Fix Bug 1425: NG-RO unable to pin VNF with vim_account config
[osm/LCM.git] / osm_lcm / ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 from typing import Any, Dict, List
21 import yaml
22 import logging
23 import logging.handlers
24 import traceback
25 import json
26 from jinja2 import (
27 Environment,
28 TemplateError,
29 TemplateNotFound,
30 StrictUndefined,
31 UndefinedError,
32 )
33
34 from osm_lcm import ROclient
35 from osm_lcm.data_utils.nsr import (
36 get_deployed_kdu,
37 get_deployed_vca,
38 get_deployed_vca_list,
39 get_nsd,
40 )
41 from osm_lcm.data_utils.vca import (
42 DeployedComponent,
43 DeployedK8sResource,
44 DeployedVCA,
45 EELevel,
46 Relation,
47 EERelation,
48 safe_get_ee_relation,
49 )
50 from osm_lcm.ng_ro import NgRoClient, NgRoException
51 from osm_lcm.lcm_utils import (
52 LcmException,
53 LcmExceptionNoMgmtIP,
54 LcmBase,
55 deep_get,
56 get_iterable,
57 populate_dict,
58 )
59 from osm_lcm.data_utils.nsd import (
60 get_ns_configuration_relation_list,
61 get_vnf_profile,
62 get_vnf_profiles,
63 )
64 from osm_lcm.data_utils.vnfd import (
65 get_relation_list,
66 get_vdu_list,
67 get_vdu_profile,
68 get_ee_sorted_initial_config_primitive_list,
69 get_ee_sorted_terminate_config_primitive_list,
70 get_kdu_list,
71 get_virtual_link_profiles,
72 get_vdu,
73 get_configuration,
74 get_vdu_index,
75 get_scaling_aspect,
76 get_number_of_instances,
77 get_juju_ee_ref,
78 get_kdu_resource_profile,
79 )
80 from osm_lcm.data_utils.list_utils import find_in_list
81 from osm_lcm.data_utils.vnfr import get_osm_params, get_vdur_index, get_kdur
82 from osm_lcm.data_utils.dict_utils import parse_yaml_strings
83 from osm_lcm.data_utils.database.vim_account import VimAccountDB
84 from n2vc.definitions import RelationEndpoint
85 from n2vc.k8s_helm_conn import K8sHelmConnector
86 from n2vc.k8s_helm3_conn import K8sHelm3Connector
87 from n2vc.k8s_juju_conn import K8sJujuConnector
88
89 from osm_common.dbbase import DbException
90 from osm_common.fsbase import FsException
91
92 from osm_lcm.data_utils.database.database import Database
93 from osm_lcm.data_utils.filesystem.filesystem import Filesystem
94
95 from n2vc.n2vc_juju_conn import N2VCJujuConnector
96 from n2vc.exceptions import N2VCException, N2VCNotFound, K8sException
97
98 from osm_lcm.lcm_helm_conn import LCMHelmConn
99 from osm_lcm.prometheus import parse_job
100
101 from copy import copy, deepcopy
102 from time import time
103 from uuid import uuid4
104
105 from random import randint
106
107 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
108
109
class NsLcm(LcmBase):
    """Network Service lifecycle manager: instantiate, terminate and scale NS."""

    # Time for a charm, counted from when it first reports blocked/error
    # status, before it is marked as failed.
    timeout_vca_on_error = (
        5 * 60
    )  # Time for charm from first time at blocked,error status to mark as failed
    timeout_ns_deploy = 2 * 3600  # default global timeout for deployment a ns
    timeout_ns_terminate = 1800  # default global timeout for un deployment a ns
    timeout_charm_delete = 10 * 60
    timeout_primitive = 30 * 60  # timeout for primitive execution
    # Maximum time without any progress while a primitive executes
    timeout_progress_primitive = (
        10 * 60
    )  # timeout for some progress in a primitive execution

    # Sentinel return values used by the sub-operation lookup helpers
    SUBOPERATION_STATUS_NOT_FOUND = -1
    SUBOPERATION_STATUS_NEW = -2
    SUBOPERATION_STATUS_SKIP = -3
    task_name_deploy_vca = "Deploying VCA"
126
def __init__(self, msg, lcm_tasks, config, loop):
    """
    Init, Connect to database, filesystem storage, and messaging
    :param msg: message bus handler
    :param lcm_tasks: tracker of running LCM tasks
    :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
    :param loop: asyncio event loop shared with all connectors
    :return: None
    """
    super().__init__(msg=msg, logger=logging.getLogger("lcm.ns"))

    self.db = Database().instance.db
    self.fs = Filesystem().instance.fs
    self.loop = loop
    self.lcm_tasks = lcm_tasks
    self.timeout = config["timeout"]
    self.ro_config = config["ro_config"]
    self.ng_ro = config["ro_config"].get("ng")
    # copy so later local changes do not leak back into the shared config dict
    self.vca_config = config["VCA"].copy()

    # create N2VC connector
    self.n2vc = N2VCJujuConnector(
        log=self.logger,
        loop=self.loop,
        on_update_db=self._on_update_n2vc_db,
        fs=self.fs,
        db=self.db,
    )

    # helm-based execution environment connector
    self.conn_helm_ee = LCMHelmConn(
        log=self.logger,
        loop=self.loop,
        vca_config=self.vca_config,
        on_update_db=self._on_update_n2vc_db,
    )

    # helm v2 connector
    self.k8sclusterhelm2 = K8sHelmConnector(
        kubectl_command=self.vca_config.get("kubectlpath"),
        helm_command=self.vca_config.get("helmpath"),
        log=self.logger,
        on_update_db=None,
        fs=self.fs,
        db=self.db,
    )

    # helm v3 connector
    self.k8sclusterhelm3 = K8sHelm3Connector(
        kubectl_command=self.vca_config.get("kubectlpath"),
        helm_command=self.vca_config.get("helm3path"),
        fs=self.fs,
        log=self.logger,
        db=self.db,
        on_update_db=None,
    )

    # juju bundle connector; reports status changes through _on_update_k8s_db
    self.k8sclusterjuju = K8sJujuConnector(
        kubectl_command=self.vca_config.get("kubectlpath"),
        juju_command=self.vca_config.get("jujupath"),
        log=self.logger,
        loop=self.loop,
        on_update_db=self._on_update_k8s_db,
        fs=self.fs,
        db=self.db,
    )

    # dispatch table: kdu deployment type -> k8s connector
    # NOTE(review): plain "chart" is routed to the helm v3 connector while
    # "helm-chart" goes to v2 -- confirm this asymmetry is intended
    self.k8scluster_map = {
        "helm-chart": self.k8sclusterhelm2,
        "helm-chart-v3": self.k8sclusterhelm3,
        "chart": self.k8sclusterhelm3,
        "juju-bundle": self.k8sclusterjuju,
        "juju": self.k8sclusterjuju,
    }

    # dispatch table: charm/ee type -> VCA connector
    self.vca_map = {
        "lxc_proxy_charm": self.n2vc,
        "native_charm": self.n2vc,
        "k8s_proxy_charm": self.n2vc,
        "helm": self.conn_helm_ee,
        "helm-v3": self.conn_helm_ee,
    }

    # create RO client
    self.RO = NgRoClient(self.loop, **self.ro_config)
206
@staticmethod
def increment_ip_mac(ip_mac, vm_index=1):
    """Return *ip_mac* with its last component incremented by *vm_index*.

    Supports IPv4 addresses (last dotted octet incremented as decimal) and
    IPv6/MAC addresses (last colon group incremented as hexadecimal,
    preserving the group's zero-padded width: 2 chars for MAC, up to 4 for
    IPv6).

    :param ip_mac: address as a string; any non-string value is returned
        unchanged
    :param vm_index: amount to add to the last address component
    :return: the incremented address string, the input itself when it is
        not a string, or None when the string cannot be parsed
    """
    if not isinstance(ip_mac, str):
        return ip_mac
    try:
        # try with ipv4: look for the last dot and bump the decimal octet
        i = ip_mac.rfind(".")
        if i > 0:
            i += 1
            return "{}{}".format(ip_mac[:i], int(ip_mac[i:]) + vm_index)
        # try with ipv6 or mac: look for the last colon, operate in hex
        i = ip_mac.rfind(":")
        if i > 0:
            i += 1
            # format in hex, len can be 2 for mac or 4 for ipv6
            return ("{}{:0" + str(len(ip_mac) - i) + "x}").format(
                ip_mac[:i], int(ip_mac[i:], 16) + vm_index
            )
    except ValueError:
        # last component is not a valid decimal/hex number: fall through
        pass
    return None
228
229 def _on_update_ro_db(self, nsrs_id, ro_descriptor):
230
231 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
232
233 try:
234 # TODO filter RO descriptor fields...
235
236 # write to database
237 db_dict = dict()
238 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
239 db_dict["deploymentStatus"] = ro_descriptor
240 self.update_db_2("nsrs", nsrs_id, db_dict)
241
242 except Exception as e:
243 self.logger.warn(
244 "Cannot write database RO deployment for ns={} -> {}".format(nsrs_id, e)
245 )
246
async def _on_update_n2vc_db(self, table, filter, path, updated_data, vca_id=None):
    """N2VC change callback: refresh the NSR's vcaStatus and READY/DEGRADED state.

    :param table: database table of the watched record (expected "nsrs")
    :param filter: database filter; its "_id" is the nsr id
    :param path: dotted path of the changed data; last component is the VCA index
    :param updated_data: changed data (the notification itself triggers a full refresh)
    :param vca_id: optional VCA identifier forwarded to N2VC
    :return: None; errors other than cancellation/timeout are only logged
    """

    # remove last dot from path (if exists)
    if path.endswith("."):
        path = path[:-1]

    try:
        nsr_id = filter.get("_id")

        # read ns record from database
        nsr = self.db.get_one(table="nsrs", q_filter=filter)
        current_ns_status = nsr.get("nsState")

        # get vca status for NS
        status_dict = await self.n2vc.get_status(
            namespace="." + nsr_id, yaml_format=False, vca_id=vca_id
        )

        # vcaStatus
        db_dict = dict()
        db_dict["vcaStatus"] = status_dict
        await self.n2vc.update_vca_status(db_dict["vcaStatus"], vca_id=vca_id)

        # update configurationStatus for this VCA
        try:
            # last path component is the index within the deployed VCA list
            vca_index = int(path[path.rfind(".") + 1 :])

            vca_list = deep_get(
                target_dict=nsr, key_list=("_admin", "deployed", "VCA")
            )
            vca_status = vca_list[vca_index].get("status")

            configuration_status_list = nsr.get("configurationStatus")
            config_status = configuration_status_list[vca_index].get("status")

            # NOTE(review): db_dict has no "configurationStatus" key here, so
            # these assignments raise KeyError and are swallowed by the except
            # below -- the intended status sync likely never reaches the
            # database. TODO confirm and fix upstream.
            if config_status == "BROKEN" and vca_status != "failed":
                db_dict["configurationStatus"][vca_index] = "READY"
            elif config_status != "BROKEN" and vca_status == "failed":
                db_dict["configurationStatus"][vca_index] = "BROKEN"
        except Exception as e:
            # not update configurationStatus
            self.logger.debug("Error updating vca_index (ignore): {}".format(e))

        # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
        # if nsState = 'DEGRADED' check if all is OK
        is_degraded = False
        if current_ns_status in ("READY", "DEGRADED"):
            error_description = ""
            # check machines
            if status_dict.get("machines"):
                for machine_id in status_dict.get("machines"):
                    machine = status_dict.get("machines").get(machine_id)
                    # check machine agent-status
                    if machine.get("agent-status"):
                        s = machine.get("agent-status").get("status")
                        if s != "started":
                            is_degraded = True
                            error_description += (
                                "machine {} agent-status={} ; ".format(
                                    machine_id, s
                                )
                            )
                    # check machine instance status
                    if machine.get("instance-status"):
                        s = machine.get("instance-status").get("status")
                        if s != "running":
                            is_degraded = True
                            error_description += (
                                "machine {} instance-status={} ; ".format(
                                    machine_id, s
                                )
                            )
            # check applications
            if status_dict.get("applications"):
                for app_id in status_dict.get("applications"):
                    app = status_dict.get("applications").get(app_id)
                    # check application status
                    if app.get("status"):
                        s = app.get("status").get("status")
                        if s != "active":
                            is_degraded = True
                            error_description += (
                                "application {} status={} ; ".format(app_id, s)
                            )

            if error_description:
                db_dict["errorDescription"] = error_description
            if current_ns_status == "READY" and is_degraded:
                db_dict["nsState"] = "DEGRADED"
            if current_ns_status == "DEGRADED" and not is_degraded:
                db_dict["nsState"] = "READY"

        # write to database
        self.update_db_2("nsrs", nsr_id, db_dict)

    except (asyncio.CancelledError, asyncio.TimeoutError):
        raise
    except Exception as e:
        self.logger.warn("Error updating NS state for ns={}: {}".format(nsr_id, e))
349
350 async def _on_update_k8s_db(
351 self, cluster_uuid, kdu_instance, filter=None, vca_id=None
352 ):
353 """
354 Updating vca status in NSR record
355 :param cluster_uuid: UUID of a k8s cluster
356 :param kdu_instance: The unique name of the KDU instance
357 :param filter: To get nsr_id
358 :return: none
359 """
360
361 # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
362 # .format(cluster_uuid, kdu_instance, filter))
363
364 try:
365 nsr_id = filter.get("_id")
366
367 # get vca status for NS
368 vca_status = await self.k8sclusterjuju.status_kdu(
369 cluster_uuid,
370 kdu_instance,
371 complete_status=True,
372 yaml_format=False,
373 vca_id=vca_id,
374 )
375 # vcaStatus
376 db_dict = dict()
377 db_dict["vcaStatus"] = {nsr_id: vca_status}
378
379 await self.k8sclusterjuju.update_vca_status(
380 db_dict["vcaStatus"],
381 kdu_instance,
382 vca_id=vca_id,
383 )
384
385 # write to database
386 self.update_db_2("nsrs", nsr_id, db_dict)
387
388 except (asyncio.CancelledError, asyncio.TimeoutError):
389 raise
390 except Exception as e:
391 self.logger.warn("Error updating NS state for ns={}: {}".format(nsr_id, e))
392
@staticmethod
def _parse_cloud_init(cloud_init_text, additional_params, vnfd_id, vdu_id):
    """Render a cloud-init Jinja2 template with the instantiation parameters.

    :param cloud_init_text: cloud-init content (Jinja2 template text)
    :param additional_params: variables for the template (may be None)
    :param vnfd_id: vnfd id, used only for error reporting
    :param vdu_id: vdu id, used only for error reporting
    :return: the rendered cloud-init text
    :raises LcmException: on undefined template variables or template errors
    """
    try:
        # StrictUndefined makes any missing variable raise instead of
        # silently rendering empty
        jinja_env = Environment(undefined=StrictUndefined)
        return jinja_env.from_string(cloud_init_text).render(
            additional_params or {}
        )
    except UndefinedError as e:
        raise LcmException(
            "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
            "file, must be provided in the instantiation parameters inside the "
            "'additionalParamsForVnf/Vdu' block".format(e, vnfd_id, vdu_id)
        )
    except (TemplateError, TemplateNotFound) as e:
        raise LcmException(
            "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
                vnfd_id, vdu_id, e
            )
        )
411
412 def _get_vdu_cloud_init_content(self, vdu, vnfd):
413 cloud_init_content = cloud_init_file = None
414 try:
415 if vdu.get("cloud-init-file"):
416 base_folder = vnfd["_admin"]["storage"]
417 if base_folder["pkg-dir"]:
418 cloud_init_file = "{}/{}/cloud_init/{}".format(
419 base_folder["folder"],
420 base_folder["pkg-dir"],
421 vdu["cloud-init-file"],
422 )
423 else:
424 cloud_init_file = "{}/Scripts/cloud_init/{}".format(
425 base_folder["folder"],
426 vdu["cloud-init-file"],
427 )
428 with self.fs.file_open(cloud_init_file, "r") as ci_file:
429 cloud_init_content = ci_file.read()
430 elif vdu.get("cloud-init"):
431 cloud_init_content = vdu["cloud-init"]
432
433 return cloud_init_content
434 except FsException as e:
435 raise LcmException(
436 "Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".format(
437 vnfd["id"], vdu["id"], cloud_init_file, e
438 )
439 )
440
def _get_vdu_additional_params(self, db_vnfr, vdu_id):
    """Return the parsed additionalParams of the first vdur matching *vdu_id*.

    :param db_vnfr: vnfr content with a "vdur" list
    :param vdu_id: vdu-id-ref to look for
    :return: additionalParams with embedded yaml strings parsed
    :raises StopIteration: when no vdur matches vdu_id
    """
    matching_vdur = next(
        v for v in db_vnfr.get("vdur") if vdu_id == v["vdu-id-ref"]
    )
    return parse_yaml_strings(matching_vdur.get("additionalParams"))
447
def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
    """
    Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
    :param vnfd: input vnfd
    :param new_id: overrides vnf id if provided
    :param additionalParams: Instantiation params for VNFs provided
    :param nsrId: Id of the NSR
    :return: copy of vnfd
    """
    vnfd_RO = deepcopy(vnfd)
    # strip configuration, monitoring, scaling and internal keys that RO
    # does not understand
    for unused_key in (
        "_id",
        "_admin",
        "monitoring-param",
        "scaling-group-descriptor",
        "kdu",
        "k8s-cluster",
    ):
        vnfd_RO.pop(unused_key, None)

    if new_id:
        vnfd_RO["id"] = new_id

    # cloud-init is rendered by LCM itself, so drop it from the RO copy
    for vdu in get_iterable(vnfd_RO, "vdu"):
        vdu.pop("cloud-init-file", None)
        vdu.pop("cloud-init", None)
    return vnfd_RO
473
@staticmethod
def ip_profile_2_RO(ip_profile):
    """Translate an OSM IM ip-profile dict into the RO ip-profile format.

    Renames dns-server -> dns-address (flattening list entries to their
    "address" field), normalizes ip-version capitalization and renames
    dhcp-params -> dhcp. The input dict is not modified.
    """
    RO_ip_profile = deepcopy(ip_profile)
    if "dns-server" in RO_ip_profile:
        dns_server = RO_ip_profile.pop("dns-server")
        if isinstance(dns_server, list):
            RO_ip_profile["dns-address"] = [ds["address"] for ds in dns_server]
        else:
            RO_ip_profile["dns-address"] = dns_server
    version_map = {"ipv4": "IPv4", "ipv6": "IPv6"}
    current_version = RO_ip_profile.get("ip-version")
    if current_version in version_map:
        RO_ip_profile["ip-version"] = version_map[current_version]
    if "dhcp-params" in RO_ip_profile:
        RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
    return RO_ip_profile
491
492 def _get_ro_vim_id_for_vim_account(self, vim_account):
493 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
494 if db_vim["_admin"]["operationalState"] != "ENABLED":
495 raise LcmException(
496 "VIM={} is not available. operationalState={}".format(
497 vim_account, db_vim["_admin"]["operationalState"]
498 )
499 )
500 RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
501 return RO_vim_id
502
def get_ro_wim_id_for_wim_account(self, wim_account):
    """Resolve the RO-side wim account id for an OSM wim_account id.

    Non-string values (e.g. None or an already-resolved account) are
    returned unchanged.

    :param wim_account: _id of the wim_accounts record, or any non-string
    :return: the RO wim account id stored at _admin.deployed.RO-account
    :raises LcmException: when the WIM is not in ENABLED operational state
    """
    if not isinstance(wim_account, str):
        return wim_account
    db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
    operational_state = db_wim["_admin"]["operationalState"]
    if operational_state != "ENABLED":
        raise LcmException(
            "WIM={} is not available. operationalState={}".format(
                wim_account, operational_state
            )
        )
    return db_wim["_admin"]["deployed"]["RO-account"]
516
def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None, mark_delete=False):
    """Scale the vdur list of a VNF record in the database and in memory.

    :param db_vnfr: vnfr content; modified in place (its "vdur" list is
        refreshed from the database at the end)
    :param vdu_create: dict of vdu-id -> number of new instances to add
    :param vdu_delete: dict of vdu-id -> number of instances to remove
    :param mark_delete: when True, instances to remove are only marked as
        DELETING in the database; otherwise they are pulled one by one
    :return: None
    """

    db_vdu_push_list = []
    db_update = {"_admin.modified": time()}
    if vdu_create:
        for vdu_id, vdu_count in vdu_create.items():
            # use the newest existing vdur of this vdu as a template
            vdur = next(
                (
                    vdur
                    for vdur in reversed(db_vnfr["vdur"])
                    if vdur["vdu-id-ref"] == vdu_id
                ),
                None,
            )
            if not vdur:
                raise LcmException(
                    "Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
                        vdu_id
                    )
                )

            for count in range(vdu_count):
                vdur_copy = deepcopy(vdur)
                vdur_copy["status"] = "BUILD"
                vdur_copy["status-detailed"] = None
                vdur_copy["ip-address"] = None
                vdur_copy["_id"] = str(uuid4())
                # NOTE(review): count-index is advanced relative to the
                # template's own index on every clone -- TODO confirm this
                # yields consecutive indexes when vdu_count > 1
                vdur_copy["count-index"] += count + 1
                vdur_copy["id"] = "{}-{}".format(
                    vdur_copy["vdu-id-ref"], vdur_copy["count-index"]
                )
                vdur_copy.pop("vim_info", None)
                for iface in vdur_copy["interfaces"]:
                    # fixed-ip/fixed-mac interfaces derive their address from
                    # the template; dynamic ones are cleared for the VIM
                    if iface.get("fixed-ip"):
                        iface["ip-address"] = self.increment_ip_mac(
                            iface["ip-address"], count + 1
                        )
                    else:
                        iface.pop("ip-address", None)
                    if iface.get("fixed-mac"):
                        iface["mac-address"] = self.increment_ip_mac(
                            iface["mac-address"], count + 1
                        )
                    else:
                        iface.pop("mac-address", None)
                    iface.pop(
                        "mgmt_vnf", None
                    )  # only first vdu can be management of vnf
                db_vdu_push_list.append(vdur_copy)
                # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
    if vdu_delete:
        for vdu_id, vdu_count in vdu_delete.items():
            if mark_delete:
                # mark the last vdu_count matching vdur as DELETING
                indexes_to_delete = [
                    iv[0]
                    for iv in enumerate(db_vnfr["vdur"])
                    if iv[1]["vdu-id-ref"] == vdu_id
                ]
                db_update.update(
                    {
                        "vdur.{}.status".format(i): "DELETING"
                        for i in indexes_to_delete[-vdu_count:]
                    }
                )
            else:
                # it must be deleted one by one because common.db does not allow otherwise
                vdus_to_delete = [
                    v
                    for v in reversed(db_vnfr["vdur"])
                    if v["vdu-id-ref"] == vdu_id
                ]
                for vdu in vdus_to_delete[:vdu_count]:
                    self.db.set_one(
                        "vnfrs",
                        {"_id": db_vnfr["_id"]},
                        None,
                        pull={"vdur": {"_id": vdu["_id"]}},
                    )
    db_push = {"vdur": db_vdu_push_list} if db_vdu_push_list else None
    self.db.set_one("vnfrs", {"_id": db_vnfr["_id"]}, db_update, push_list=db_push)
    # modify passed dictionary db_vnfr so the caller sees the new vdur list
    db_vnfr_ = self.db.get_one("vnfrs", {"_id": db_vnfr["_id"]})
    db_vnfr["vdur"] = db_vnfr_["vdur"]
600
def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
    """
    Updates database nsr with the RO info for the created vld
    :param ns_update_nsr: dictionary to be filled with the updated info
    :param db_nsr: content of db_nsr. This is also modified
    :param nsr_desc_RO: nsr descriptor from RO
    :return: Nothing, LcmException is raised on errors
    """

    for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
        # locate the RO net that corresponds to this vld
        net_RO = next(
            (
                net
                for net in get_iterable(nsr_desc_RO, "nets")
                if vld["id"] == net.get("ns_net_osm_id")
            ),
            None,
        )
        if net_RO is None:
            raise LcmException(
                "ns_update_nsr: Not found vld={} at RO info".format(vld["id"])
            )
        vld["vim-id"] = net_RO.get("vim_net_id")
        vld["name"] = net_RO.get("vim_name")
        vld["status"] = net_RO.get("status")
        vld["status-detailed"] = net_RO.get("error_msg")
        ns_update_nsr["vld.{}".format(vld_index)] = vld
624
def set_vnfr_at_error(self, db_vnfrs, error_text):
    """Mark every VNFR (and its VDU records) as ERROR in the database.

    :param db_vnfrs: dict of member-vnf-index -> vnfr content (modified in place)
    :param error_text: detailed error stored in each in-memory vdur
    :return: None; database errors are logged and swallowed
    """
    try:
        for db_vnfr in db_vnfrs.values():
            vnfr_update = {"status": "ERROR"}
            for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
                if "status" in vdur:
                    continue  # keep a status that was already reported
                vdur["status"] = "ERROR"
                vnfr_update["vdur.{}.status".format(vdu_index)] = "ERROR"
                if error_text:
                    vdur["status-detailed"] = str(error_text)
                    # NOTE(review): the DB field stores the literal "ERROR"
                    # while the in-memory record keeps the full text;
                    # preserved as-is
                    vnfr_update[
                        "vdur.{}.status-detailed".format(vdu_index)
                    ] = "ERROR"
            self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
    except DbException as e:
        self.logger.error("Cannot update vnf. {}".format(e))
641
def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
    """
    Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
    :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
    :param nsr_desc_RO: nsr descriptor from RO
    :return: Nothing, LcmException is raised on errors
    """
    for vnf_index, db_vnfr in db_vnfrs.items():
        # find the RO vnf entry matching this member index (for/else raises
        # if none is found)
        for vnf_RO in nsr_desc_RO["vnfs"]:
            if vnf_RO["member_vnf_index"] != vnf_index:
                continue
            vnfr_update = {}
            if vnf_RO.get("ip_address"):
                # RO may report several addresses separated by ';' -- keep the first
                db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO[
                    "ip_address"
                ].split(";")[0]
            elif not db_vnfr.get("ip-address"):
                if db_vnfr.get("vdur"):  # if not VDUs, there is not ip_address
                    raise LcmExceptionNoMgmtIP(
                        "ns member_vnf_index '{}' has no IP address".format(
                            vnf_index
                        )
                    )

            # match each local vdur with the RO vm of the same osm id and
            # count-index (for/else raises when no vm matches)
            for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
                vdur_RO_count_index = 0
                if vdur.get("pdu-type"):
                    # physical deployment units are not managed by the VIM
                    continue
                for vdur_RO in get_iterable(vnf_RO, "vms"):
                    if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
                        continue
                    if vdur["count-index"] != vdur_RO_count_index:
                        # skip earlier replicas of the same vdu until the
                        # count-index lines up
                        vdur_RO_count_index += 1
                        continue
                    vdur["vim-id"] = vdur_RO.get("vim_vm_id")
                    if vdur_RO.get("ip_address"):
                        vdur["ip-address"] = vdur_RO["ip_address"].split(";")[0]
                    else:
                        vdur["ip-address"] = None
                    vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
                    vdur["name"] = vdur_RO.get("vim_name")
                    vdur["status"] = vdur_RO.get("status")
                    vdur["status-detailed"] = vdur_RO.get("error_msg")
                    # copy ip/mac of each interface from the RO view
                    for ifacer in get_iterable(vdur, "interfaces"):
                        for interface_RO in get_iterable(vdur_RO, "interfaces"):
                            if ifacer["name"] == interface_RO.get("internal_name"):
                                ifacer["ip-address"] = interface_RO.get(
                                    "ip_address"
                                )
                                ifacer["mac-address"] = interface_RO.get(
                                    "mac_address"
                                )
                                break
                        else:
                            raise LcmException(
                                "ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
                                "from VIM info".format(
                                    vnf_index, vdur["vdu-id-ref"], ifacer["name"]
                                )
                            )
                    vnfr_update["vdur.{}".format(vdu_index)] = vdur
                    break
                else:
                    raise LcmException(
                        "ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
                        "VIM info".format(
                            vnf_index, vdur["vdu-id-ref"], vdur["count-index"]
                        )
                    )

            # update internal vlds with the RO net info (for/else raises when
            # a vld has no matching net)
            for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
                for net_RO in get_iterable(nsr_desc_RO, "nets"):
                    if vld["id"] != net_RO.get("vnf_net_osm_id"):
                        continue
                    vld["vim-id"] = net_RO.get("vim_net_id")
                    vld["name"] = net_RO.get("vim_name")
                    vld["status"] = net_RO.get("status")
                    vld["status-detailed"] = net_RO.get("error_msg")
                    vnfr_update["vld.{}".format(vld_index)] = vld
                    break
                else:
                    raise LcmException(
                        "ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
                            vnf_index, vld["id"]
                        )
                    )

            self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
            break

        else:
            raise LcmException(
                "ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(
                    vnf_index
                )
            )
738
739 def _get_ns_config_info(self, nsr_id):
740 """
741 Generates a mapping between vnf,vdu elements and the N2VC id
742 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
743 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
744 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
745 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
746 """
747 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
748 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
749 mapping = {}
750 ns_config_info = {"osm-config-mapping": mapping}
751 for vca in vca_deployed_list:
752 if not vca["member-vnf-index"]:
753 continue
754 if not vca["vdu_id"]:
755 mapping[vca["member-vnf-index"]] = vca["application"]
756 else:
757 mapping[
758 "{}.{}.{}".format(
759 vca["member-vnf-index"], vca["vdu_id"], vca["vdu_count_index"]
760 )
761 ] = vca["application"]
762 return ns_config_info
763
764 async def _instantiate_ng_ro(
765 self,
766 logging_text,
767 nsr_id,
768 nsd,
769 db_nsr,
770 db_nslcmop,
771 db_vnfrs,
772 db_vnfds,
773 n2vc_key_list,
774 stage,
775 start_deploy,
776 timeout_ns_deploy,
777 ):
778
779 db_vims = {}
780
781 def get_vim_account(vim_account_id):
782 nonlocal db_vims
783 if vim_account_id in db_vims:
784 return db_vims[vim_account_id]
785 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account_id})
786 db_vims[vim_account_id] = db_vim
787 return db_vim
788
789 # modify target_vld info with instantiation parameters
790 def parse_vld_instantiation_params(
791 target_vim, target_vld, vld_params, target_sdn
792 ):
793 if vld_params.get("ip-profile"):
794 target_vld["vim_info"][target_vim]["ip_profile"] = vld_params[
795 "ip-profile"
796 ]
797 if vld_params.get("provider-network"):
798 target_vld["vim_info"][target_vim]["provider_network"] = vld_params[
799 "provider-network"
800 ]
801 if "sdn-ports" in vld_params["provider-network"] and target_sdn:
802 target_vld["vim_info"][target_sdn]["sdn-ports"] = vld_params[
803 "provider-network"
804 ]["sdn-ports"]
805 if vld_params.get("wimAccountId"):
806 target_wim = "wim:{}".format(vld_params["wimAccountId"])
807 target_vld["vim_info"][target_wim] = {}
808 for param in ("vim-network-name", "vim-network-id"):
809 if vld_params.get(param):
810 if isinstance(vld_params[param], dict):
811 for vim, vim_net in vld_params[param].items():
812 other_target_vim = "vim:" + vim
813 populate_dict(
814 target_vld["vim_info"],
815 (other_target_vim, param.replace("-", "_")),
816 vim_net,
817 )
818 else: # isinstance str
819 target_vld["vim_info"][target_vim][
820 param.replace("-", "_")
821 ] = vld_params[param]
822 if vld_params.get("common_id"):
823 target_vld["common_id"] = vld_params.get("common_id")
824
825 # modify target["ns"]["vld"] with instantiation parameters to override vnf vim-account
826 def update_ns_vld_target(target, ns_params):
827 for vnf_params in ns_params.get("vnf", ()):
828 if vnf_params.get("vimAccountId"):
829 target_vnf = next(
830 (
831 vnfr
832 for vnfr in db_vnfrs.values()
833 if vnf_params["member-vnf-index"]
834 == vnfr["member-vnf-index-ref"]
835 ),
836 None,
837 )
838 vdur = next((vdur for vdur in target_vnf.get("vdur", ())), None)
839 for a_index, a_vld in enumerate(target["ns"]["vld"]):
840 target_vld = find_in_list(
841 get_iterable(vdur, "interfaces"),
842 lambda iface: iface.get("ns-vld-id") == a_vld["name"],
843 )
844 if target_vld:
845 if vnf_params.get("vimAccountId") not in a_vld.get(
846 "vim_info", {}
847 ):
848 target["ns"]["vld"][a_index].get("vim_info").update(
849 {
850 "vim:{}".format(vnf_params["vimAccountId"]): {
851 "vim_network_name": ""
852 }
853 }
854 )
855
856 nslcmop_id = db_nslcmop["_id"]
857 target = {
858 "name": db_nsr["name"],
859 "ns": {"vld": []},
860 "vnf": [],
861 "image": deepcopy(db_nsr["image"]),
862 "flavor": deepcopy(db_nsr["flavor"]),
863 "action_id": nslcmop_id,
864 "cloud_init_content": {},
865 }
866 for image in target["image"]:
867 image["vim_info"] = {}
868 for flavor in target["flavor"]:
869 flavor["vim_info"] = {}
870
871 if db_nslcmop.get("lcmOperationType") != "instantiate":
872 # get parameters of instantiation:
873 db_nslcmop_instantiate = self.db.get_list(
874 "nslcmops",
875 {
876 "nsInstanceId": db_nslcmop["nsInstanceId"],
877 "lcmOperationType": "instantiate",
878 },
879 )[-1]
880 ns_params = db_nslcmop_instantiate.get("operationParams")
881 else:
882 ns_params = db_nslcmop.get("operationParams")
883 ssh_keys_instantiation = ns_params.get("ssh_keys") or []
884 ssh_keys_all = ssh_keys_instantiation + (n2vc_key_list or [])
885
886 cp2target = {}
887 for vld_index, vld in enumerate(db_nsr.get("vld")):
888 target_vim = "vim:{}".format(ns_params["vimAccountId"])
889 target_vld = {
890 "id": vld["id"],
891 "name": vld["name"],
892 "mgmt-network": vld.get("mgmt-network", False),
893 "type": vld.get("type"),
894 "vim_info": {
895 target_vim: {
896 "vim_network_name": vld.get("vim-network-name"),
897 "vim_account_id": ns_params["vimAccountId"],
898 }
899 },
900 }
901 # check if this network needs SDN assist
902 if vld.get("pci-interfaces"):
903 db_vim = get_vim_account(ns_params["vimAccountId"])
904 sdnc_id = db_vim["config"].get("sdn-controller")
905 if sdnc_id:
906 sdn_vld = "nsrs:{}:vld.{}".format(nsr_id, vld["id"])
907 target_sdn = "sdn:{}".format(sdnc_id)
908 target_vld["vim_info"][target_sdn] = {
909 "sdn": True,
910 "target_vim": target_vim,
911 "vlds": [sdn_vld],
912 "type": vld.get("type"),
913 }
914
915 nsd_vnf_profiles = get_vnf_profiles(nsd)
916 for nsd_vnf_profile in nsd_vnf_profiles:
917 for cp in nsd_vnf_profile["virtual-link-connectivity"]:
918 if cp["virtual-link-profile-id"] == vld["id"]:
919 cp2target[
920 "member_vnf:{}.{}".format(
921 cp["constituent-cpd-id"][0][
922 "constituent-base-element-id"
923 ],
924 cp["constituent-cpd-id"][0]["constituent-cpd-id"],
925 )
926 ] = "nsrs:{}:vld.{}".format(nsr_id, vld_index)
927
928 # check at nsd descriptor, if there is an ip-profile
929 vld_params = {}
930 nsd_vlp = find_in_list(
931 get_virtual_link_profiles(nsd),
932 lambda a_link_profile: a_link_profile["virtual-link-desc-id"]
933 == vld["id"],
934 )
935 if (
936 nsd_vlp
937 and nsd_vlp.get("virtual-link-protocol-data")
938 and nsd_vlp["virtual-link-protocol-data"].get("l3-protocol-data")
939 ):
940 ip_profile_source_data = nsd_vlp["virtual-link-protocol-data"][
941 "l3-protocol-data"
942 ]
943 ip_profile_dest_data = {}
944 if "ip-version" in ip_profile_source_data:
945 ip_profile_dest_data["ip-version"] = ip_profile_source_data[
946 "ip-version"
947 ]
948 if "cidr" in ip_profile_source_data:
949 ip_profile_dest_data["subnet-address"] = ip_profile_source_data[
950 "cidr"
951 ]
952 if "gateway-ip" in ip_profile_source_data:
953 ip_profile_dest_data["gateway-address"] = ip_profile_source_data[
954 "gateway-ip"
955 ]
956 if "dhcp-enabled" in ip_profile_source_data:
957 ip_profile_dest_data["dhcp-params"] = {
958 "enabled": ip_profile_source_data["dhcp-enabled"]
959 }
960 vld_params["ip-profile"] = ip_profile_dest_data
961
962 # update vld_params with instantiation params
963 vld_instantiation_params = find_in_list(
964 get_iterable(ns_params, "vld"),
965 lambda a_vld: a_vld["name"] in (vld["name"], vld["id"]),
966 )
967 if vld_instantiation_params:
968 vld_params.update(vld_instantiation_params)
969 parse_vld_instantiation_params(target_vim, target_vld, vld_params, None)
970 target["ns"]["vld"].append(target_vld)
971 # Update the target ns_vld if vnf vim_account is overriden by instantiation params
972 update_ns_vld_target(target, ns_params)
973
974 for vnfr in db_vnfrs.values():
975 vnfd = find_in_list(
976 db_vnfds, lambda db_vnf: db_vnf["id"] == vnfr["vnfd-ref"]
977 )
978 vnf_params = find_in_list(
979 get_iterable(ns_params, "vnf"),
980 lambda a_vnf: a_vnf["member-vnf-index"] == vnfr["member-vnf-index-ref"],
981 )
982 target_vnf = deepcopy(vnfr)
983 target_vim = "vim:{}".format(vnfr["vim-account-id"])
984 for vld in target_vnf.get("vld", ()):
985 # check if connected to a ns.vld, to fill target'
986 vnf_cp = find_in_list(
987 vnfd.get("int-virtual-link-desc", ()),
988 lambda cpd: cpd.get("id") == vld["id"],
989 )
990 if vnf_cp:
991 ns_cp = "member_vnf:{}.{}".format(
992 vnfr["member-vnf-index-ref"], vnf_cp["id"]
993 )
994 if cp2target.get(ns_cp):
995 vld["target"] = cp2target[ns_cp]
996
997 vld["vim_info"] = {
998 target_vim: {"vim_network_name": vld.get("vim-network-name")}
999 }
1000 # check if this network needs SDN assist
1001 target_sdn = None
1002 if vld.get("pci-interfaces"):
1003 db_vim = get_vim_account(vnfr["vim-account-id"])
1004 sdnc_id = db_vim["config"].get("sdn-controller")
1005 if sdnc_id:
1006 sdn_vld = "vnfrs:{}:vld.{}".format(target_vnf["_id"], vld["id"])
1007 target_sdn = "sdn:{}".format(sdnc_id)
1008 vld["vim_info"][target_sdn] = {
1009 "sdn": True,
1010 "target_vim": target_vim,
1011 "vlds": [sdn_vld],
1012 "type": vld.get("type"),
1013 }
1014
1015 # check at vnfd descriptor, if there is an ip-profile
1016 vld_params = {}
1017 vnfd_vlp = find_in_list(
1018 get_virtual_link_profiles(vnfd),
1019 lambda a_link_profile: a_link_profile["id"] == vld["id"],
1020 )
1021 if (
1022 vnfd_vlp
1023 and vnfd_vlp.get("virtual-link-protocol-data")
1024 and vnfd_vlp["virtual-link-protocol-data"].get("l3-protocol-data")
1025 ):
1026 ip_profile_source_data = vnfd_vlp["virtual-link-protocol-data"][
1027 "l3-protocol-data"
1028 ]
1029 ip_profile_dest_data = {}
1030 if "ip-version" in ip_profile_source_data:
1031 ip_profile_dest_data["ip-version"] = ip_profile_source_data[
1032 "ip-version"
1033 ]
1034 if "cidr" in ip_profile_source_data:
1035 ip_profile_dest_data["subnet-address"] = ip_profile_source_data[
1036 "cidr"
1037 ]
1038 if "gateway-ip" in ip_profile_source_data:
1039 ip_profile_dest_data[
1040 "gateway-address"
1041 ] = ip_profile_source_data["gateway-ip"]
1042 if "dhcp-enabled" in ip_profile_source_data:
1043 ip_profile_dest_data["dhcp-params"] = {
1044 "enabled": ip_profile_source_data["dhcp-enabled"]
1045 }
1046
1047 vld_params["ip-profile"] = ip_profile_dest_data
1048 # update vld_params with instantiation params
1049 if vnf_params:
1050 vld_instantiation_params = find_in_list(
1051 get_iterable(vnf_params, "internal-vld"),
1052 lambda i_vld: i_vld["name"] == vld["id"],
1053 )
1054 if vld_instantiation_params:
1055 vld_params.update(vld_instantiation_params)
1056 parse_vld_instantiation_params(target_vim, vld, vld_params, target_sdn)
1057
1058 vdur_list = []
1059 for vdur in target_vnf.get("vdur", ()):
1060 if vdur.get("status") == "DELETING" or vdur.get("pdu-type"):
1061 continue # This vdu must not be created
1062 vdur["vim_info"] = {"vim_account_id": vnfr["vim-account-id"]}
1063
1064 self.logger.debug("NS > ssh_keys > {}".format(ssh_keys_all))
1065
1066 if ssh_keys_all:
1067 vdu_configuration = get_configuration(vnfd, vdur["vdu-id-ref"])
1068 vnf_configuration = get_configuration(vnfd, vnfd["id"])
1069 if (
1070 vdu_configuration
1071 and vdu_configuration.get("config-access")
1072 and vdu_configuration.get("config-access").get("ssh-access")
1073 ):
1074 vdur["ssh-keys"] = ssh_keys_all
1075 vdur["ssh-access-required"] = vdu_configuration[
1076 "config-access"
1077 ]["ssh-access"]["required"]
1078 elif (
1079 vnf_configuration
1080 and vnf_configuration.get("config-access")
1081 and vnf_configuration.get("config-access").get("ssh-access")
1082 and any(iface.get("mgmt-vnf") for iface in vdur["interfaces"])
1083 ):
1084 vdur["ssh-keys"] = ssh_keys_all
1085 vdur["ssh-access-required"] = vnf_configuration[
1086 "config-access"
1087 ]["ssh-access"]["required"]
1088 elif ssh_keys_instantiation and find_in_list(
1089 vdur["interfaces"], lambda iface: iface.get("mgmt-vnf")
1090 ):
1091 vdur["ssh-keys"] = ssh_keys_instantiation
1092
1093 self.logger.debug("NS > vdur > {}".format(vdur))
1094
1095 vdud = get_vdu(vnfd, vdur["vdu-id-ref"])
1096 # cloud-init
1097 if vdud.get("cloud-init-file"):
1098 vdur["cloud-init"] = "{}:file:{}".format(
1099 vnfd["_id"], vdud.get("cloud-init-file")
1100 )
1101 # read file and put content at target.cloul_init_content. Avoid ng_ro to use shared package system
1102 if vdur["cloud-init"] not in target["cloud_init_content"]:
1103 base_folder = vnfd["_admin"]["storage"]
1104 if base_folder["pkg-dir"]:
1105 cloud_init_file = "{}/{}/cloud_init/{}".format(
1106 base_folder["folder"],
1107 base_folder["pkg-dir"],
1108 vdud.get("cloud-init-file"),
1109 )
1110 else:
1111 cloud_init_file = "{}/Scripts/cloud_init/{}".format(
1112 base_folder["folder"],
1113 vdud.get("cloud-init-file"),
1114 )
1115 with self.fs.file_open(cloud_init_file, "r") as ci_file:
1116 target["cloud_init_content"][
1117 vdur["cloud-init"]
1118 ] = ci_file.read()
1119 elif vdud.get("cloud-init"):
1120 vdur["cloud-init"] = "{}:vdu:{}".format(
1121 vnfd["_id"], get_vdu_index(vnfd, vdur["vdu-id-ref"])
1122 )
1123 # put content at target.cloul_init_content. Avoid ng_ro read vnfd descriptor
1124 target["cloud_init_content"][vdur["cloud-init"]] = vdud[
1125 "cloud-init"
1126 ]
1127 vdur["additionalParams"] = vdur.get("additionalParams") or {}
1128 deploy_params_vdu = self._format_additional_params(
1129 vdur.get("additionalParams") or {}
1130 )
1131 deploy_params_vdu["OSM"] = get_osm_params(
1132 vnfr, vdur["vdu-id-ref"], vdur["count-index"]
1133 )
1134 vdur["additionalParams"] = deploy_params_vdu
1135
1136 # flavor
1137 ns_flavor = target["flavor"][int(vdur["ns-flavor-id"])]
1138 if target_vim not in ns_flavor["vim_info"]:
1139 ns_flavor["vim_info"][target_vim] = {}
1140
1141 # deal with images
1142 # in case alternative images are provided we must check if they should be applied
1143 # for the vim_type, modify the vim_type taking into account
1144 ns_image_id = int(vdur["ns-image-id"])
1145 if vdur.get("alt-image-ids"):
1146 db_vim = get_vim_account(vnfr["vim-account-id"])
1147 vim_type = db_vim["vim_type"]
1148 for alt_image_id in vdur.get("alt-image-ids"):
1149 ns_alt_image = target["image"][int(alt_image_id)]
1150 if vim_type == ns_alt_image.get("vim-type"):
1151 # must use alternative image
1152 self.logger.debug(
1153 "use alternative image id: {}".format(alt_image_id)
1154 )
1155 ns_image_id = alt_image_id
1156 vdur["ns-image-id"] = ns_image_id
1157 break
1158 ns_image = target["image"][int(ns_image_id)]
1159 if target_vim not in ns_image["vim_info"]:
1160 ns_image["vim_info"][target_vim] = {}
1161
1162 vdur["vim_info"] = {target_vim: {}}
1163 # instantiation parameters
1164 # if vnf_params:
1165 # vdu_instantiation_params = next((v for v in get_iterable(vnf_params, "vdu") if v["id"] ==
1166 # vdud["id"]), None)
1167 vdur_list.append(vdur)
1168 target_vnf["vdur"] = vdur_list
1169 target["vnf"].append(target_vnf)
1170
1171 desc = await self.RO.deploy(nsr_id, target)
1172 self.logger.debug("RO return > {}".format(desc))
1173 action_id = desc["action_id"]
1174 await self._wait_ng_ro(
1175 nsr_id, action_id, nslcmop_id, start_deploy, timeout_ns_deploy, stage
1176 )
1177
1178 # Updating NSR
1179 db_nsr_update = {
1180 "_admin.deployed.RO.operational-status": "running",
1181 "detailed-status": " ".join(stage),
1182 }
1183 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1184 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1185 self._write_op_status(nslcmop_id, stage)
1186 self.logger.debug(
1187 logging_text + "ns deployed at RO. RO_id={}".format(action_id)
1188 )
1189 return
1190
1191 async def _wait_ng_ro(
1192 self,
1193 nsr_id,
1194 action_id,
1195 nslcmop_id=None,
1196 start_time=None,
1197 timeout=600,
1198 stage=None,
1199 ):
1200 detailed_status_old = None
1201 db_nsr_update = {}
1202 start_time = start_time or time()
1203 while time() <= start_time + timeout:
1204 desc_status = await self.RO.status(nsr_id, action_id)
1205 self.logger.debug("Wait NG RO > {}".format(desc_status))
1206 if desc_status["status"] == "FAILED":
1207 raise NgRoException(desc_status["details"])
1208 elif desc_status["status"] == "BUILD":
1209 if stage:
1210 stage[2] = "VIM: ({})".format(desc_status["details"])
1211 elif desc_status["status"] == "DONE":
1212 if stage:
1213 stage[2] = "Deployed at VIM"
1214 break
1215 else:
1216 assert False, "ROclient.check_ns_status returns unknown {}".format(
1217 desc_status["status"]
1218 )
1219 if stage and nslcmop_id and stage[2] != detailed_status_old:
1220 detailed_status_old = stage[2]
1221 db_nsr_update["detailed-status"] = " ".join(stage)
1222 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1223 self._write_op_status(nslcmop_id, stage)
1224 await asyncio.sleep(15, loop=self.loop)
1225 else: # timeout_ns_deploy
1226 raise NgRoException("Timeout waiting ns to deploy")
1227
    async def _terminate_ng_ro(
        self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
    ):
        """
        Terminate a NS at NG-RO: request deletion of all VIM resources and wait.

        :param logging_text: prefix text to use at logging
        :param nsr_deployed: content of nsr "_admin.deployed" (not used directly here)
        :param nsr_id: nsr identity
        :param nslcmop_id: ns operation id, sent to RO as the action_id of the delete
        :param stage: list with 3 items [general stage, tasks, vim_specific]; item 2 is
            overwritten with the VIM deletion result
        :raises LcmException: if RO reports a conflict or any other deletion error
        """
        db_nsr_update = {}
        failed_detail = []
        action_id = None
        start_deploy = time()
        try:
            # deploying an empty target makes NG-RO remove every previously created item
            target = {
                "ns": {"vld": []},
                "vnf": [],
                "image": [],
                "flavor": [],
                "action_id": nslcmop_id,
            }
            desc = await self.RO.deploy(nsr_id, target)
            action_id = desc["action_id"]
            db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = action_id
            db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETING"
            self.logger.debug(
                logging_text
                + "ns terminate action at RO. action_id={}".format(action_id)
            )

            # wait until done
            delete_timeout = 20 * 60  # 20 minutes
            await self._wait_ng_ro(
                nsr_id, action_id, nslcmop_id, start_deploy, delete_timeout, stage
            )

            db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
            db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
            # delete all nsr
            await self.RO.delete(nsr_id)
        except Exception as e:
            if isinstance(e, NgRoException) and e.http_code == 404:  # not found
                # already removed at RO: mark as deleted, do not fail the operation
                db_nsr_update["_admin.deployed.RO.nsr_id"] = None
                db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
                self.logger.debug(
                    logging_text + "RO_action_id={} already deleted".format(action_id)
                )
            elif isinstance(e, NgRoException) and e.http_code == 409:  # conflict
                failed_detail.append("delete conflict: {}".format(e))
                self.logger.debug(
                    logging_text
                    + "RO_action_id={} delete conflict: {}".format(action_id, e)
                )
            else:
                failed_detail.append("delete error: {}".format(e))
                self.logger.error(
                    logging_text
                    + "RO_action_id={} delete error: {}".format(action_id, e)
                )

        # reflect final VIM deletion status at the nsr and operation records
        if failed_detail:
            stage[2] = "Error deleting from VIM"
        else:
            stage[2] = "Deleted from VIM"
        db_nsr_update["detailed-status"] = " ".join(stage)
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        self._write_op_status(nslcmop_id, stage)

        if failed_detail:
            raise LcmException("; ".join(failed_detail))
        return
1294
1295 async def instantiate_RO(
1296 self,
1297 logging_text,
1298 nsr_id,
1299 nsd,
1300 db_nsr,
1301 db_nslcmop,
1302 db_vnfrs,
1303 db_vnfds,
1304 n2vc_key_list,
1305 stage,
1306 ):
1307 """
1308 Instantiate at RO
1309 :param logging_text: preffix text to use at logging
1310 :param nsr_id: nsr identity
1311 :param nsd: database content of ns descriptor
1312 :param db_nsr: database content of ns record
1313 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
1314 :param db_vnfrs:
1315 :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
1316 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
1317 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
1318 :return: None or exception
1319 """
1320 try:
1321 start_deploy = time()
1322 ns_params = db_nslcmop.get("operationParams")
1323 if ns_params and ns_params.get("timeout_ns_deploy"):
1324 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
1325 else:
1326 timeout_ns_deploy = self.timeout.get(
1327 "ns_deploy", self.timeout_ns_deploy
1328 )
1329
1330 # Check for and optionally request placement optimization. Database will be updated if placement activated
1331 stage[2] = "Waiting for Placement."
1332 if await self._do_placement(logging_text, db_nslcmop, db_vnfrs):
1333 # in case of placement change ns_params[vimAcountId) if not present at any vnfrs
1334 for vnfr in db_vnfrs.values():
1335 if ns_params["vimAccountId"] == vnfr["vim-account-id"]:
1336 break
1337 else:
1338 ns_params["vimAccountId"] == vnfr["vim-account-id"]
1339
1340 return await self._instantiate_ng_ro(
1341 logging_text,
1342 nsr_id,
1343 nsd,
1344 db_nsr,
1345 db_nslcmop,
1346 db_vnfrs,
1347 db_vnfds,
1348 n2vc_key_list,
1349 stage,
1350 start_deploy,
1351 timeout_ns_deploy,
1352 )
1353 except Exception as e:
1354 stage[2] = "ERROR deploying at VIM"
1355 self.set_vnfr_at_error(db_vnfrs, str(e))
1356 self.logger.error(
1357 "Error deploying at VIM {}".format(e),
1358 exc_info=not isinstance(
1359 e,
1360 (
1361 ROclient.ROClientException,
1362 LcmException,
1363 DbException,
1364 NgRoException,
1365 ),
1366 ),
1367 )
1368 raise
1369
1370 async def wait_kdu_up(self, logging_text, nsr_id, vnfr_id, kdu_name):
1371 """
1372 Wait for kdu to be up, get ip address
1373 :param logging_text: prefix use for logging
1374 :param nsr_id:
1375 :param vnfr_id:
1376 :param kdu_name:
1377 :return: IP address
1378 """
1379
1380 # self.logger.debug(logging_text + "Starting wait_kdu_up")
1381 nb_tries = 0
1382
1383 while nb_tries < 360:
1384 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
1385 kdur = next(
1386 (
1387 x
1388 for x in get_iterable(db_vnfr, "kdur")
1389 if x.get("kdu-name") == kdu_name
1390 ),
1391 None,
1392 )
1393 if not kdur:
1394 raise LcmException(
1395 "Not found vnfr_id={}, kdu_name={}".format(vnfr_id, kdu_name)
1396 )
1397 if kdur.get("status"):
1398 if kdur["status"] in ("READY", "ENABLED"):
1399 return kdur.get("ip-address")
1400 else:
1401 raise LcmException(
1402 "target KDU={} is in error state".format(kdu_name)
1403 )
1404
1405 await asyncio.sleep(10, loop=self.loop)
1406 nb_tries += 1
1407 raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name))
1408
    async def wait_vm_up_insert_key_ro(
        self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None
    ):
        """
        Wait for ip address at RO, and optionally, insert public key in virtual machine
        :param logging_text: prefix use for logging
        :param nsr_id:
        :param vnfr_id:
        :param vdu_id: target vdu; None means the VNF management vdu
        :param vdu_index: vdu count-index, used together with vdu_id to select the vdur
        :param pub_key: public ssh key to inject, None to skip
        :param user: user to apply the public ssh key
        :return: IP address
        :raises LcmException: on VNF/VM error state, missing vdur, or retry exhaustion
        """

        self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
        ro_nsr_id = None
        ip_address = None
        nb_tries = 0
        target_vdu_id = None
        ro_retries = 0

        while True:

            # bounded polling: 360 iterations x 10 s sleep
            ro_retries += 1
            if ro_retries >= 360:  # 1 hour
                raise LcmException(
                    "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id)
                )

            await asyncio.sleep(10, loop=self.loop)

            # get ip address
            if not target_vdu_id:
                db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})

                if not vdu_id:  # for the VNF case
                    if db_vnfr.get("status") == "ERROR":
                        raise LcmException(
                            "Cannot inject ssh-key because target VNF is in error state"
                        )
                    ip_address = db_vnfr.get("ip-address")
                    if not ip_address:
                        continue
                    # pick the vdur that owns the VNF management ip
                    vdur = next(
                        (
                            x
                            for x in get_iterable(db_vnfr, "vdur")
                            if x.get("ip-address") == ip_address
                        ),
                        None,
                    )
                else:  # VDU case
                    vdur = next(
                        (
                            x
                            for x in get_iterable(db_vnfr, "vdur")
                            if x.get("vdu-id-ref") == vdu_id
                            and x.get("count-index") == vdu_index
                        ),
                        None,
                    )

                if (
                    not vdur and len(db_vnfr.get("vdur", ())) == 1
                ):  # If only one, this should be the target vdu
                    vdur = db_vnfr["vdur"][0]
                if not vdur:
                    raise LcmException(
                        "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
                            vnfr_id, vdu_id, vdu_index
                        )
                    )
                # New generation RO stores information at "vim_info"
                ng_ro_status = None
                target_vim = None
                if vdur.get("vim_info"):
                    target_vim = next(
                        t for t in vdur["vim_info"]
                    )  # there should be only one key
                    ng_ro_status = vdur["vim_info"][target_vim].get("vim_status")
                # PDUs have no VIM status; treat them as already up
                if (
                    vdur.get("pdu-type")
                    or vdur.get("status") == "ACTIVE"
                    or ng_ro_status == "ACTIVE"
                ):
                    ip_address = vdur.get("ip-address")
                    if not ip_address:
                        continue
                    target_vdu_id = vdur["vdu-id-ref"]
                elif vdur.get("status") == "ERROR" or ng_ro_status == "ERROR":
                    raise LcmException(
                        "Cannot inject ssh-key because target VM is in error state"
                    )

            if not target_vdu_id:
                continue

            # inject public key into machine
            if pub_key and user:
                self.logger.debug(logging_text + "Inserting RO key")
                self.logger.debug("SSH > PubKey > {}".format(pub_key))
                if vdur.get("pdu-type"):
                    # key injection is not possible on physical deployment units
                    self.logger.error(logging_text + "Cannot inject ssh-ky to a PDU")
                    return ip_address
                try:
                    ro_vm_id = "{}-{}".format(
                        db_vnfr["member-vnf-index-ref"], target_vdu_id
                    )  # TODO add vdu_index
                    if self.ng_ro:
                        # NG-RO path: key injection is requested as a deploy "action"
                        target = {
                            "action": {
                                "action": "inject_ssh_key",
                                "key": pub_key,
                                "user": user,
                            },
                            "vnf": [{"_id": vnfr_id, "vdur": [{"id": vdur["id"]}]}],
                        }
                        desc = await self.RO.deploy(nsr_id, target)
                        action_id = desc["action_id"]
                        await self._wait_ng_ro(nsr_id, action_id, timeout=600)
                        break
                    else:
                        # legacy RO path
                        # wait until NS is deployed at RO
                        if not ro_nsr_id:
                            db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
                            ro_nsr_id = deep_get(
                                db_nsrs, ("_admin", "deployed", "RO", "nsr_id")
                            )
                        if not ro_nsr_id:
                            continue
                        result_dict = await self.RO.create_action(
                            item="ns",
                            item_id_name=ro_nsr_id,
                            descriptor={
                                "add_public_key": pub_key,
                                "vms": [ro_vm_id],
                                "user": user,
                            },
                        )
                        # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
                        if not result_dict or not isinstance(result_dict, dict):
                            raise LcmException(
                                "Unknown response from RO when injecting key"
                            )
                        for result in result_dict.values():
                            if result.get("vim_result") == 200:
                                break
                        else:
                            raise ROclient.ROClientException(
                                "error injecting key: {}".format(
                                    result.get("description")
                                )
                            )
                        break
                except NgRoException as e:
                    raise LcmException(
                        "Reaching max tries injecting key. Error: {}".format(e)
                    )
                except ROclient.ROClientException as e:
                    # legacy RO errors are retried up to 20 times
                    if not nb_tries:
                        self.logger.debug(
                            logging_text
                            + "error injecting key: {}. Retrying until {} seconds".format(
                                e, 20 * 10
                            )
                        )
                    nb_tries += 1
                    if nb_tries >= 20:
                        raise LcmException(
                            "Reaching max tries injecting key. Error: {}".format(e)
                        )
            else:
                break

        return ip_address
1585
    async def _wait_dependent_n2vc(self, nsr_id, vca_deployed_list, vca_index):
        """
        Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs

        :param nsr_id: ns record id, polled from database on every iteration
        :param vca_deployed_list: content of nsr "_admin.deployed.VCA"
        :param vca_index: index of the VCA whose dependencies must become READY
        :raises LcmException: if any dependent charm is BROKEN, or on timeout
        """
        my_vca = vca_deployed_list[vca_index]
        if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
            # vdu or kdu: no dependencies
            return
        # NOTE(review): timeout decreases by 1 per 10 s sleep, so the effective wait
        # is ~300*10 s (~50 min), not 300 s — confirm this is intended
        timeout = 300
        while timeout >= 0:
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
            vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
            configuration_status_list = db_nsr["configurationStatus"]
            for index, vca_deployed in enumerate(configuration_status_list):
                if index == vca_index:
                    # myself
                    continue
                # dependencies are: every VCA of my own VNF, or every VCA when I am a NS charm
                if not my_vca.get("member-vnf-index") or (
                    vca_deployed.get("member-vnf-index")
                    == my_vca.get("member-vnf-index")
                ):
                    internal_status = configuration_status_list[index].get("status")
                    if internal_status == "READY":
                        continue
                    elif internal_status == "BROKEN":
                        raise LcmException(
                            "Configuration aborted because dependent charm/s has failed"
                        )
                    else:
                        break
            else:
                # no dependencies, return
                return
            await asyncio.sleep(10)
            timeout -= 1

        raise LcmException("Configuration aborted because dependent charm/s timeout")
1623
1624 def get_vca_id(self, db_vnfr: dict, db_nsr: dict):
1625 vca_id = None
1626 if db_vnfr:
1627 vca_id = deep_get(db_vnfr, ("vca-id",))
1628 elif db_nsr:
1629 vim_account_id = deep_get(db_nsr, ("instantiate_params", "vimAccountId"))
1630 vca_id = VimAccountDB.get_vim_account_with_id(vim_account_id).get("vca")
1631 return vca_id
1632
    async def instantiate_N2VC(
        self,
        logging_text,
        vca_index,
        nsi_id,
        db_nsr,
        db_vnfr,
        vdu_id,
        kdu_name,
        vdu_index,
        config_descriptor,
        deploy_params,
        base_folder,
        nslcmop_id,
        stage,
        vca_type,
        vca_name,
        ee_config_descriptor,
    ):
        """
        Deploy one VCA execution environment (charm or helm) for a NS/VNF/VDU/KDU
        element and run its Day-1 (initial-config) primitives.

        :param logging_text: prefix to use at logging
        :param vca_index: index of this VCA at db_nsr "_admin.deployed.VCA"
        :param nsi_id: netslice instance id, or None
        :param db_nsr: database content of the ns record
        :param db_vnfr: database content of the vnf record; None for NS-level charms
        :param vdu_id: vdu id when the charm targets a VDU, else None
        :param kdu_name: kdu name when the charm targets a KDU, else None
        :param vdu_index: vdu count-index, used to build the VCA namespace
        :param config_descriptor: descriptor section with config-access and primitives
        :param deploy_params: params to resolve primitive arguments; "rw_mgmt_ip" and
            possibly "ns_config_info" are added here
        :param base_folder: "_admin.storage" content used to locate the charm artifact
        :param nslcmop_id: ns operation id, for writing operation status
        :param stage: list with 3 items [general stage, tasks, vim_specific]; item 0 updated
        :param vca_type: native_charm | lxc_proxy_charm | k8s_proxy_charm | helm | helm-v3
        :param vca_name: charm/chart name inside the package
        :param ee_config_descriptor: execution-environment descriptor (id, metrics)
        :raises LcmException: wrapping any failure together with the step where it happened
        """
        nsr_id = db_nsr["_id"]
        db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
        vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
        vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
        osm_config = {"osm": {"ns_id": db_nsr["_id"]}}
        db_dict = {
            "collection": "nsrs",
            "filter": {"_id": nsr_id},
            "path": db_update_entry,
        }
        step = ""
        try:

            element_type = "NS"
            element_under_configuration = nsr_id

            vnfr_id = None
            if db_vnfr:
                vnfr_id = db_vnfr["_id"]
                osm_config["osm"]["vnf_id"] = vnfr_id

            namespace = "{nsi}.{ns}".format(nsi=nsi_id if nsi_id else "", ns=nsr_id)

            if vca_type == "native_charm":
                index_number = 0
            else:
                index_number = vdu_index or 0

            # refine element type/namespace down to VNF, then VDU or KDU if applicable
            if vnfr_id:
                element_type = "VNF"
                element_under_configuration = vnfr_id
                namespace += ".{}-{}".format(vnfr_id, index_number)
                if vdu_id:
                    namespace += ".{}-{}".format(vdu_id, index_number)
                    element_type = "VDU"
                    element_under_configuration = "{}-{}".format(vdu_id, index_number)
                    osm_config["osm"]["vdu_id"] = vdu_id
                elif kdu_name:
                    namespace += ".{}".format(kdu_name)
                    element_type = "KDU"
                    element_under_configuration = kdu_name
                    osm_config["osm"]["kdu_name"] = kdu_name

            # Get artifact path
            if base_folder["pkg-dir"]:
                artifact_path = "{}/{}/{}/{}".format(
                    base_folder["folder"],
                    base_folder["pkg-dir"],
                    "charms"
                    if vca_type
                    in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
                    else "helm-charts",
                    vca_name,
                )
            else:
                artifact_path = "{}/Scripts/{}/{}/".format(
                    base_folder["folder"],
                    "charms"
                    if vca_type
                    in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
                    else "helm-charts",
                    vca_name,
                )

            self.logger.debug("Artifact path > {}".format(artifact_path))

            # get initial_config_primitive_list that applies to this element
            initial_config_primitive_list = config_descriptor.get(
                "initial-config-primitive"
            )

            self.logger.debug(
                "Initial config primitive list > {}".format(
                    initial_config_primitive_list
                )
            )

            # add config if not present for NS charm
            ee_descriptor_id = ee_config_descriptor.get("id")
            self.logger.debug("EE Descriptor > {}".format(ee_descriptor_id))
            initial_config_primitive_list = get_ee_sorted_initial_config_primitive_list(
                initial_config_primitive_list, vca_deployed, ee_descriptor_id
            )

            self.logger.debug(
                "Initial config primitive list #2 > {}".format(
                    initial_config_primitive_list
                )
            )
            # n2vc_redesign STEP 3.1
            # find old ee_id if exists
            ee_id = vca_deployed.get("ee_id")

            vca_id = self.get_vca_id(db_vnfr, db_nsr)
            # create or register execution environment in VCA
            if vca_type in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):

                self._write_configuration_status(
                    nsr_id=nsr_id,
                    vca_index=vca_index,
                    status="CREATING",
                    element_under_configuration=element_under_configuration,
                    element_type=element_type,
                )

                step = "create execution environment"
                self.logger.debug(logging_text + step)

                ee_id = None
                credentials = None
                if vca_type == "k8s_proxy_charm":
                    ee_id = await self.vca_map[vca_type].install_k8s_proxy_charm(
                        charm_name=artifact_path[artifact_path.rfind("/") + 1 :],
                        namespace=namespace,
                        artifact_path=artifact_path,
                        db_dict=db_dict,
                        vca_id=vca_id,
                    )
                elif vca_type == "helm" or vca_type == "helm-v3":
                    ee_id, credentials = await self.vca_map[
                        vca_type
                    ].create_execution_environment(
                        namespace=namespace,
                        reuse_ee_id=ee_id,
                        db_dict=db_dict,
                        config=osm_config,
                        artifact_path=artifact_path,
                        vca_type=vca_type,
                    )
                else:
                    ee_id, credentials = await self.vca_map[
                        vca_type
                    ].create_execution_environment(
                        namespace=namespace,
                        reuse_ee_id=ee_id,
                        db_dict=db_dict,
                        vca_id=vca_id,
                    )

            elif vca_type == "native_charm":
                # native charm runs inside the VM, so the VM must be up first
                step = "Waiting to VM being up and getting IP address"
                self.logger.debug(logging_text + step)
                rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
                    logging_text,
                    nsr_id,
                    vnfr_id,
                    vdu_id,
                    vdu_index,
                    user=None,
                    pub_key=None,
                )
                credentials = {"hostname": rw_mgmt_ip}
                # get username
                username = deep_get(
                    config_descriptor, ("config-access", "ssh-access", "default-user")
                )
                # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
                # merged. Meanwhile let's get username from initial-config-primitive
                if not username and initial_config_primitive_list:
                    for config_primitive in initial_config_primitive_list:
                        for param in config_primitive.get("parameter", ()):
                            if param["name"] == "ssh-username":
                                username = param["value"]
                                break
                if not username:
                    raise LcmException(
                        "Cannot determine the username neither with 'initial-config-primitive' nor with "
                        "'config-access.ssh-access.default-user'"
                    )
                credentials["username"] = username
                # n2vc_redesign STEP 3.2

                self._write_configuration_status(
                    nsr_id=nsr_id,
                    vca_index=vca_index,
                    status="REGISTERING",
                    element_under_configuration=element_under_configuration,
                    element_type=element_type,
                )

                step = "register execution environment {}".format(credentials)
                self.logger.debug(logging_text + step)
                ee_id = await self.vca_map[vca_type].register_execution_environment(
                    credentials=credentials,
                    namespace=namespace,
                    db_dict=db_dict,
                    vca_id=vca_id,
                )

            # for compatibility with MON/POL modules, the need model and application name at database
            # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
            ee_id_parts = ee_id.split(".")
            db_nsr_update = {db_update_entry + "ee_id": ee_id}
            if len(ee_id_parts) >= 2:
                model_name = ee_id_parts[0]
                application_name = ee_id_parts[1]
                db_nsr_update[db_update_entry + "model"] = model_name
                db_nsr_update[db_update_entry + "application"] = application_name

            # n2vc_redesign STEP 3.3
            step = "Install configuration Software"

            self._write_configuration_status(
                nsr_id=nsr_id,
                vca_index=vca_index,
                status="INSTALLING SW",
                element_under_configuration=element_under_configuration,
                element_type=element_type,
                other_update=db_nsr_update,
            )

            # TODO check if already done
            self.logger.debug(logging_text + step)
            config = None
            if vca_type == "native_charm":
                # a "config" initial primitive is applied at install time, not executed later
                config_primitive = next(
                    (p for p in initial_config_primitive_list if p["name"] == "config"),
                    None,
                )
                if config_primitive:
                    config = self._map_primitive_params(
                        config_primitive, {}, deploy_params
                    )
            num_units = 1
            if vca_type == "lxc_proxy_charm":
                if element_type == "NS":
                    num_units = db_nsr.get("config-units") or 1
                elif element_type == "VNF":
                    num_units = db_vnfr.get("config-units") or 1
                elif element_type == "VDU":
                    for v in db_vnfr["vdur"]:
                        if vdu_id == v["vdu-id-ref"]:
                            num_units = v.get("config-units") or 1
                            break
            if vca_type != "k8s_proxy_charm":
                await self.vca_map[vca_type].install_configuration_sw(
                    ee_id=ee_id,
                    artifact_path=artifact_path,
                    db_dict=db_dict,
                    config=config,
                    num_units=num_units,
                    vca_id=vca_id,
                    vca_type=vca_type,
                )

            # write in db flag of configuration_sw already installed
            self.update_db_2(
                "nsrs", nsr_id, {db_update_entry + "config_sw_installed": True}
            )

            # add relations for this VCA (wait for other peers related with this VCA)
            await self._add_vca_relations(
                logging_text=logging_text,
                nsr_id=nsr_id,
                vca_type=vca_type,
                vca_index=vca_index,
            )

            # if SSH access is required, then get execution environment SSH public
            # if native charm we have waited already to VM be UP
            if vca_type in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
                pub_key = None
                user = None
                # self.logger.debug("get ssh key block")
                if deep_get(
                    config_descriptor, ("config-access", "ssh-access", "required")
                ):
                    # self.logger.debug("ssh key needed")
                    # Needed to inject a ssh key
                    user = deep_get(
                        config_descriptor,
                        ("config-access", "ssh-access", "default-user"),
                    )
                    step = "Install configuration Software, getting public ssh key"
                    pub_key = await self.vca_map[vca_type].get_ee_ssh_public__key(
                        ee_id=ee_id, db_dict=db_dict, vca_id=vca_id
                    )

                    step = "Insert public key into VM user={} ssh_key={}".format(
                        user, pub_key
                    )
                else:
                    # self.logger.debug("no need to get ssh key")
                    step = "Waiting to VM being up and getting IP address"
                self.logger.debug(logging_text + step)

                # n2vc_redesign STEP 5.1
                # wait for RO (ip-address) Insert pub_key into VM
                if vnfr_id:
                    if kdu_name:
                        rw_mgmt_ip = await self.wait_kdu_up(
                            logging_text, nsr_id, vnfr_id, kdu_name
                        )
                    else:
                        rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
                            logging_text,
                            nsr_id,
                            vnfr_id,
                            vdu_id,
                            vdu_index,
                            user=user,
                            pub_key=pub_key,
                        )
                else:
                    rw_mgmt_ip = None  # This is for a NS configuration

                self.logger.debug(logging_text + " VM_ip_address={}".format(rw_mgmt_ip))

            # store rw_mgmt_ip in deploy params for later replacement
            deploy_params["rw_mgmt_ip"] = rw_mgmt_ip

            # n2vc_redesign STEP 6  Execute initial config primitive
            step = "execute initial config primitive"

            # wait for dependent primitives execution (NS -> VNF -> VDU)
            if initial_config_primitive_list:
                await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)

            # stage, in function of element type: vdu, kdu, vnf or ns
            my_vca = vca_deployed_list[vca_index]
            if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
                # VDU or KDU
                stage[0] = "Stage 3/5: running Day-1 primitives for VDU."
            elif my_vca.get("member-vnf-index"):
                # VNF
                stage[0] = "Stage 4/5: running Day-1 primitives for VNF."
            else:
                # NS
                stage[0] = "Stage 5/5: running Day-1 primitives for NS."

            self._write_configuration_status(
                nsr_id=nsr_id, vca_index=vca_index, status="EXECUTING PRIMITIVE"
            )

            self._write_op_status(op_id=nslcmop_id, stage=stage)

            check_if_terminated_needed = True
            for initial_config_primitive in initial_config_primitive_list:
                # adding information on the vca_deployed if it is a NS execution environment
                if not vca_deployed["member-vnf-index"]:
                    deploy_params["ns_config_info"] = json.dumps(
                        self._get_ns_config_info(nsr_id)
                    )
                # TODO check if already done
                primitive_params_ = self._map_primitive_params(
                    initial_config_primitive, {}, deploy_params
                )

                step = "execute primitive '{}' params '{}'".format(
                    initial_config_primitive["name"], primitive_params_
                )
                self.logger.debug(logging_text + step)
                await self.vca_map[vca_type].exec_primitive(
                    ee_id=ee_id,
                    primitive_name=initial_config_primitive["name"],
                    params_dict=primitive_params_,
                    db_dict=db_dict,
                    vca_id=vca_id,
                    vca_type=vca_type,
                )
                # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
                if check_if_terminated_needed:
                    if config_descriptor.get("terminate-config-primitive"):
                        self.update_db_2(
                            "nsrs", nsr_id, {db_update_entry + "needed_terminate": True}
                        )
                    check_if_terminated_needed = False

                # TODO register in database that primitive is done

            # STEP 7 Configure metrics
            if vca_type == "helm" or vca_type == "helm-v3":
                prometheus_jobs = await self.extract_prometheus_scrape_jobs(
                    ee_id=ee_id,
                    artifact_path=artifact_path,
                    ee_config_descriptor=ee_config_descriptor,
                    vnfr_id=vnfr_id,
                    nsr_id=nsr_id,
                    target_ip=rw_mgmt_ip,
                )
                if prometheus_jobs:
                    self.update_db_2(
                        "nsrs",
                        nsr_id,
                        {db_update_entry + "prometheus_jobs": prometheus_jobs},
                    )

                    for job in prometheus_jobs:
                        self.db.set_one(
                            "prometheus_jobs",
                            {"job_name": job["job_name"]},
                            job,
                            upsert=True,
                            fail_on_empty=False,
                        )

            step = "instantiated at VCA"
            self.logger.debug(logging_text + step)

            self._write_configuration_status(
                nsr_id=nsr_id, vca_index=vca_index, status="READY"
            )

        except Exception as e:  # TODO not use Exception but N2VC exception
            # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
            if not isinstance(
                e, (DbException, N2VCException, LcmException, asyncio.CancelledError)
            ):
                self.logger.error(
                    "Exception while {} : {}".format(step, e), exc_info=True
                )
            self._write_configuration_status(
                nsr_id=nsr_id, vca_index=vca_index, status="BROKEN"
            )
            raise LcmException("{} {}".format(step, e)) from e
2067
2068 def _write_ns_status(
2069 self,
2070 nsr_id: str,
2071 ns_state: str,
2072 current_operation: str,
2073 current_operation_id: str,
2074 error_description: str = None,
2075 error_detail: str = None,
2076 other_update: dict = None,
2077 ):
2078 """
2079 Update db_nsr fields.
2080 :param nsr_id:
2081 :param ns_state:
2082 :param current_operation:
2083 :param current_operation_id:
2084 :param error_description:
2085 :param error_detail:
2086 :param other_update: Other required changes at database if provided, will be cleared
2087 :return:
2088 """
2089 try:
2090 db_dict = other_update or {}
2091 db_dict[
2092 "_admin.nslcmop"
2093 ] = current_operation_id # for backward compatibility
2094 db_dict["_admin.current-operation"] = current_operation_id
2095 db_dict["_admin.operation-type"] = (
2096 current_operation if current_operation != "IDLE" else None
2097 )
2098 db_dict["currentOperation"] = current_operation
2099 db_dict["currentOperationID"] = current_operation_id
2100 db_dict["errorDescription"] = error_description
2101 db_dict["errorDetail"] = error_detail
2102
2103 if ns_state:
2104 db_dict["nsState"] = ns_state
2105 self.update_db_2("nsrs", nsr_id, db_dict)
2106 except DbException as e:
2107 self.logger.warn("Error writing NS status, ns={}: {}".format(nsr_id, e))
2108
2109 def _write_op_status(
2110 self,
2111 op_id: str,
2112 stage: list = None,
2113 error_message: str = None,
2114 queuePosition: int = 0,
2115 operation_state: str = None,
2116 other_update: dict = None,
2117 ):
2118 try:
2119 db_dict = other_update or {}
2120 db_dict["queuePosition"] = queuePosition
2121 if isinstance(stage, list):
2122 db_dict["stage"] = stage[0]
2123 db_dict["detailed-status"] = " ".join(stage)
2124 elif stage is not None:
2125 db_dict["stage"] = str(stage)
2126
2127 if error_message is not None:
2128 db_dict["errorMessage"] = error_message
2129 if operation_state is not None:
2130 db_dict["operationState"] = operation_state
2131 db_dict["statusEnteredTime"] = time()
2132 self.update_db_2("nslcmops", op_id, db_dict)
2133 except DbException as e:
2134 self.logger.warn(
2135 "Error writing OPERATION status for op_id: {} -> {}".format(op_id, e)
2136 )
2137
2138 def _write_all_config_status(self, db_nsr: dict, status: str):
2139 try:
2140 nsr_id = db_nsr["_id"]
2141 # configurationStatus
2142 config_status = db_nsr.get("configurationStatus")
2143 if config_status:
2144 db_nsr_update = {
2145 "configurationStatus.{}.status".format(index): status
2146 for index, v in enumerate(config_status)
2147 if v
2148 }
2149 # update status
2150 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2151
2152 except DbException as e:
2153 self.logger.warn(
2154 "Error writing all configuration status, ns={}: {}".format(nsr_id, e)
2155 )
2156
2157 def _write_configuration_status(
2158 self,
2159 nsr_id: str,
2160 vca_index: int,
2161 status: str = None,
2162 element_under_configuration: str = None,
2163 element_type: str = None,
2164 other_update: dict = None,
2165 ):
2166
2167 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
2168 # .format(vca_index, status))
2169
2170 try:
2171 db_path = "configurationStatus.{}.".format(vca_index)
2172 db_dict = other_update or {}
2173 if status:
2174 db_dict[db_path + "status"] = status
2175 if element_under_configuration:
2176 db_dict[
2177 db_path + "elementUnderConfiguration"
2178 ] = element_under_configuration
2179 if element_type:
2180 db_dict[db_path + "elementType"] = element_type
2181 self.update_db_2("nsrs", nsr_id, db_dict)
2182 except DbException as e:
2183 self.logger.warn(
2184 "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
2185 status, nsr_id, vca_index, e
2186 )
2187 )
2188
2189 async def _do_placement(self, logging_text, db_nslcmop, db_vnfrs):
2190 """
2191 Check and computes the placement, (vim account where to deploy). If it is decided by an external tool, it
2192 sends the request via kafka and wait until the result is wrote at database (nslcmops _admin.plca).
2193 Database is used because the result can be obtained from a different LCM worker in case of HA.
2194 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
2195 :param db_nslcmop: database content of nslcmop
2196 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
2197 :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfr with the
2198 computed 'vim-account-id'
2199 """
2200 modified = False
2201 nslcmop_id = db_nslcmop["_id"]
2202 placement_engine = deep_get(db_nslcmop, ("operationParams", "placement-engine"))
2203 if placement_engine == "PLA":
2204 self.logger.debug(
2205 logging_text + "Invoke and wait for placement optimization"
2206 )
2207 await self.msg.aiowrite(
2208 "pla", "get_placement", {"nslcmopId": nslcmop_id}, loop=self.loop
2209 )
2210 db_poll_interval = 5
2211 wait = db_poll_interval * 10
2212 pla_result = None
2213 while not pla_result and wait >= 0:
2214 await asyncio.sleep(db_poll_interval)
2215 wait -= db_poll_interval
2216 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2217 pla_result = deep_get(db_nslcmop, ("_admin", "pla"))
2218
2219 if not pla_result:
2220 raise LcmException(
2221 "Placement timeout for nslcmopId={}".format(nslcmop_id)
2222 )
2223
2224 for pla_vnf in pla_result["vnf"]:
2225 vnfr = db_vnfrs.get(pla_vnf["member-vnf-index"])
2226 if not pla_vnf.get("vimAccountId") or not vnfr:
2227 continue
2228 modified = True
2229 self.db.set_one(
2230 "vnfrs",
2231 {"_id": vnfr["_id"]},
2232 {"vim-account-id": pla_vnf["vimAccountId"]},
2233 )
2234 # Modifies db_vnfrs
2235 vnfr["vim-account-id"] = pla_vnf["vimAccountId"]
2236 return modified
2237
2238 def update_nsrs_with_pla_result(self, params):
2239 try:
2240 nslcmop_id = deep_get(params, ("placement", "nslcmopId"))
2241 self.update_db_2(
2242 "nslcmops", nslcmop_id, {"_admin.pla": params.get("placement")}
2243 )
2244 except Exception as e:
2245 self.logger.warn("Update failed for nslcmop_id={}:{}".format(nslcmop_id, e))
2246
    async def instantiate(self, nsr_id, nslcmop_id):
        """
        Instantiate a Network Service: deploys KDUs, VMs (through RO) and VCA execution
        environments (N2VC), updating progress and final state in the database.

        :param nsr_id: ns instance to deploy
        :param nslcmop_id: operation to run
        :return: None. Results are written to the "nsrs"/"nslcmops" records and a kafka
            message with the final operation state is sent.
        """

        # Try to lock HA task here
        task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
        if not task_is_locked_by_me:
            # another LCM worker (HA) owns this operation; nothing to do here
            self.logger.debug(
                "instantiate() task is not locked by me, ns={}".format(nsr_id)
            )
            return

        logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
        self.logger.debug(logging_text + "Enter")

        # get all needed from database

        # database nsrs record
        db_nsr = None

        # database nslcmops record
        db_nslcmop = None

        # update operation on nsrs
        db_nsr_update = {}
        # update operation on nslcmops
        db_nslcmop_update = {}

        nslcmop_operation_state = None
        db_vnfrs = {}  # vnf's info indexed by member-index
        # n2vc_info = {}
        tasks_dict_info = {}  # from task to info text
        exc = None
        error_list = []
        stage = [
            "Stage 1/5: preparation of the environment.",
            "Waiting for previous operations to terminate.",
            "",
        ]
        # ^ stage, step, VIM progress
        try:
            # wait for any previous tasks in process
            await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)

            # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
            stage[1] = "Reading from database."
            # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
            db_nsr_update["detailed-status"] = "creating"
            db_nsr_update["operational-status"] = "init"
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state="BUILDING",
                current_operation="INSTANTIATING",
                current_operation_id=nslcmop_id,
                other_update=db_nsr_update,
            )
            self._write_op_status(op_id=nslcmop_id, stage=stage, queuePosition=0)

            # read from db: operation
            stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
            db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
            # additionalParamsForVnf is stored as a JSON string; decode it once here
            if db_nslcmop["operationParams"].get("additionalParamsForVnf"):
                db_nslcmop["operationParams"]["additionalParamsForVnf"] = json.loads(
                    db_nslcmop["operationParams"]["additionalParamsForVnf"]
                )
            ns_params = db_nslcmop.get("operationParams")
            # operation-level timeout overrides the configured/default one
            if ns_params and ns_params.get("timeout_ns_deploy"):
                timeout_ns_deploy = ns_params["timeout_ns_deploy"]
            else:
                timeout_ns_deploy = self.timeout.get(
                    "ns_deploy", self.timeout_ns_deploy
                )

            # read from db: ns
            stage[1] = "Getting nsr={} from db.".format(nsr_id)
            self.logger.debug(logging_text + stage[1])
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
            stage[1] = "Getting nsd={} from db.".format(db_nsr["nsd-id"])
            self.logger.debug(logging_text + stage[1])
            nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
            self.fs.sync(db_nsr["nsd-id"])
            db_nsr["nsd"] = nsd
            # nsr_name = db_nsr["name"]   # TODO short-name??

            # read from db: vnf's of this ns
            stage[1] = "Getting vnfrs from db."
            self.logger.debug(logging_text + stage[1])
            db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})

            # read from db: vnfd's for every vnf
            db_vnfds = []  # every vnfd data

            # for each vnf in ns, read vnfd
            for vnfr in db_vnfrs_list:
                if vnfr.get("kdur"):
                    kdur_list = []
                    for kdur in vnfr["kdur"]:
                        # kdu additionalParams are stored JSON-encoded; decode in place
                        if kdur.get("additionalParams"):
                            kdur["additionalParams"] = json.loads(kdur["additionalParams"])
                        kdur_list.append(kdur)
                    vnfr["kdur"] = kdur_list

                db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
                vnfd_id = vnfr["vnfd-id"]
                vnfd_ref = vnfr["vnfd-ref"]
                self.fs.sync(vnfd_id)

                # if we haven't this vnfd, read it from db
                # NOTE(review): db_vnfds holds vnfd dicts, so "vnfd_id not in db_vnfds"
                # compares an id against dicts and never matches; vnfds shared by several
                # vnfrs may be re-read from db — confirm intended
                if vnfd_id not in db_vnfds:
                    # read from db
                    stage[1] = "Getting vnfd={} id='{}' from db.".format(
                        vnfd_id, vnfd_ref
                    )
                    self.logger.debug(logging_text + stage[1])
                    vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})

                    # store vnfd
                    db_vnfds.append(vnfd)

            # Get or generates the _admin.deployed.VCA list
            vca_deployed_list = None
            if db_nsr["_admin"].get("deployed"):
                vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
            if vca_deployed_list is None:
                vca_deployed_list = []
                configuration_status_list = []
                db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
                db_nsr_update["configurationStatus"] = configuration_status_list
                # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
                populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
            elif isinstance(vca_deployed_list, dict):
                # maintain backward compatibility. Change a dict to list at database
                vca_deployed_list = list(vca_deployed_list.values())
                db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
                populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)

            if not isinstance(
                deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list
            ):
                populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
                db_nsr_update["_admin.deployed.RO.vnfd"] = []

            # set state to INSTANTIATED. When instantiated NBI will not delete directly
            db_nsr_update["_admin.nsState"] = "INSTANTIATED"
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self.db.set_list(
                "vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "INSTANTIATED"}
            )

            # n2vc_redesign STEP 2 Deploy Network Scenario
            stage[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
            self._write_op_status(op_id=nslcmop_id, stage=stage)

            stage[1] = "Deploying KDUs."
            # self.logger.debug(logging_text + "Before deploy_kdus")
            # Call to deploy_kdus in case exists the "vdu:kdu" param
            await self.deploy_kdus(
                logging_text=logging_text,
                nsr_id=nsr_id,
                nslcmop_id=nslcmop_id,
                db_vnfrs=db_vnfrs,
                db_vnfds=db_vnfds,
                task_instantiation_info=tasks_dict_info,
            )

            stage[1] = "Getting VCA public key."
            # n2vc_redesign STEP 1 Get VCA public ssh-key
            # feature 1429. Add n2vc public key to needed VMs
            n2vc_key = self.n2vc.get_public_key()
            n2vc_key_list = [n2vc_key]
            if self.vca_config.get("public_key"):
                n2vc_key_list.append(self.vca_config["public_key"])

            # RO deployment runs as a background task, tracked in tasks_dict_info and
            # awaited in the finally block together with the N2VC tasks
            stage[1] = "Deploying NS at VIM."
            task_ro = asyncio.ensure_future(
                self.instantiate_RO(
                    logging_text=logging_text,
                    nsr_id=nsr_id,
                    nsd=nsd,
                    db_nsr=db_nsr,
                    db_nslcmop=db_nslcmop,
                    db_vnfrs=db_vnfrs,
                    db_vnfds=db_vnfds,
                    n2vc_key_list=n2vc_key_list,
                    stage=stage,
                )
            )
            self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
            tasks_dict_info[task_ro] = "Deploying at VIM"

            # n2vc_redesign STEP 3 to 6 Deploy N2VC
            stage[1] = "Deploying Execution Environments."
            self.logger.debug(logging_text + stage[1])

            nsi_id = None  # TODO put nsi_id when this nsr belongs to a NSI
            for vnf_profile in get_vnf_profiles(nsd):
                vnfd_id = vnf_profile["vnfd-id"]
                vnfd = find_in_list(db_vnfds, lambda a_vnf: a_vnf["id"] == vnfd_id)
                member_vnf_index = str(vnf_profile["id"])
                db_vnfr = db_vnfrs[member_vnf_index]
                base_folder = vnfd["_admin"]["storage"]
                vdu_id = None
                vdu_index = 0
                vdu_name = None
                kdu_name = None

                # Get additional parameters
                deploy_params = {"OSM": get_osm_params(db_vnfr)}
                if db_vnfr.get("additionalParamsForVnf"):
                    deploy_params.update(
                        parse_yaml_strings(db_vnfr["additionalParamsForVnf"].copy())
                    )

                # VNF-level charm (if the vnfd declares configuration for the vnf itself)
                descriptor_config = get_configuration(vnfd, vnfd["id"])
                if descriptor_config:
                    self._deploy_n2vc(
                        logging_text=logging_text
                        + "member_vnf_index={} ".format(member_vnf_index),
                        db_nsr=db_nsr,
                        db_vnfr=db_vnfr,
                        nslcmop_id=nslcmop_id,
                        nsr_id=nsr_id,
                        nsi_id=nsi_id,
                        vnfd_id=vnfd_id,
                        vdu_id=vdu_id,
                        kdu_name=kdu_name,
                        member_vnf_index=member_vnf_index,
                        vdu_index=vdu_index,
                        vdu_name=vdu_name,
                        deploy_params=deploy_params,
                        descriptor_config=descriptor_config,
                        base_folder=base_folder,
                        task_instantiation_info=tasks_dict_info,
                        stage=stage,
                    )

                # Deploy charms for each VDU that supports one.
                for vdud in get_vdu_list(vnfd):
                    vdu_id = vdud["id"]
                    descriptor_config = get_configuration(vnfd, vdu_id)
                    vdur = find_in_list(
                        db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id
                    )

                    # NOTE(review): when the vdu has no additionalParams this aliases
                    # deploy_params, so the "OSM" key written below overwrites the
                    # VNF-level value in the shared dict — confirm intended
                    if vdur.get("additionalParams"):
                        deploy_params_vdu = parse_yaml_strings(vdur["additionalParams"])
                    else:
                        deploy_params_vdu = deploy_params
                    deploy_params_vdu["OSM"] = get_osm_params(
                        db_vnfr, vdu_id, vdu_count_index=0
                    )
                    vdud_count = get_number_of_instances(vnfd, vdu_id)

                    self.logger.debug("VDUD > {}".format(vdud))
                    self.logger.debug(
                        "Descriptor config > {}".format(descriptor_config)
                    )
                    if descriptor_config:
                        vdu_name = None
                        kdu_name = None
                        # one execution environment per vdu instance
                        for vdu_index in range(vdud_count):
                            # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
                            self._deploy_n2vc(
                                logging_text=logging_text
                                + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
                                    member_vnf_index, vdu_id, vdu_index
                                ),
                                db_nsr=db_nsr,
                                db_vnfr=db_vnfr,
                                nslcmop_id=nslcmop_id,
                                nsr_id=nsr_id,
                                nsi_id=nsi_id,
                                vnfd_id=vnfd_id,
                                vdu_id=vdu_id,
                                kdu_name=kdu_name,
                                member_vnf_index=member_vnf_index,
                                vdu_index=vdu_index,
                                vdu_name=vdu_name,
                                deploy_params=deploy_params_vdu,
                                descriptor_config=descriptor_config,
                                base_folder=base_folder,
                                task_instantiation_info=tasks_dict_info,
                                stage=stage,
                            )
                # Deploy charms for each KDU that declares configuration
                for kdud in get_kdu_list(vnfd):
                    kdu_name = kdud["name"]
                    descriptor_config = get_configuration(vnfd, kdu_name)
                    if descriptor_config:
                        vdu_id = None
                        vdu_index = 0
                        vdu_name = None
                        kdur = next(
                            x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name
                        )
                        deploy_params_kdu = {"OSM": get_osm_params(db_vnfr)}
                        if kdur.get("additionalParams"):
                            deploy_params_kdu = parse_yaml_strings(
                                kdur["additionalParams"]
                            )

                        self._deploy_n2vc(
                            logging_text=logging_text,
                            db_nsr=db_nsr,
                            db_vnfr=db_vnfr,
                            nslcmop_id=nslcmop_id,
                            nsr_id=nsr_id,
                            nsi_id=nsi_id,
                            vnfd_id=vnfd_id,
                            vdu_id=vdu_id,
                            kdu_name=kdu_name,
                            member_vnf_index=member_vnf_index,
                            vdu_index=vdu_index,
                            vdu_name=vdu_name,
                            deploy_params=deploy_params_kdu,
                            descriptor_config=descriptor_config,
                            base_folder=base_folder,
                            task_instantiation_info=tasks_dict_info,
                            stage=stage,
                        )

            # Check if this NS has a charm configuration
            descriptor_config = nsd.get("ns-configuration")
            if descriptor_config and descriptor_config.get("juju"):
                vnfd_id = None
                db_vnfr = None
                member_vnf_index = None
                vdu_id = None
                kdu_name = None
                vdu_index = 0
                vdu_name = None

                # Get additional parameters
                deploy_params = {"OSM": {"vim_account_id": ns_params["vimAccountId"]}}
                if db_nsr.get("additionalParamsForNs"):
                    deploy_params.update(
                        parse_yaml_strings(db_nsr["additionalParamsForNs"].copy())
                    )
                base_folder = nsd["_admin"]["storage"]
                self._deploy_n2vc(
                    logging_text=logging_text,
                    db_nsr=db_nsr,
                    db_vnfr=db_vnfr,
                    nslcmop_id=nslcmop_id,
                    nsr_id=nsr_id,
                    nsi_id=nsi_id,
                    vnfd_id=vnfd_id,
                    vdu_id=vdu_id,
                    kdu_name=kdu_name,
                    member_vnf_index=member_vnf_index,
                    vdu_index=vdu_index,
                    vdu_name=vdu_name,
                    deploy_params=deploy_params,
                    descriptor_config=descriptor_config,
                    base_folder=base_folder,
                    task_instantiation_info=tasks_dict_info,
                    stage=stage,
                )

            # rest of stuff will be done at finally

        except (
            ROclient.ROClientException,
            DbException,
            LcmException,
            N2VCException,
        ) as e:
            self.logger.error(
                logging_text + "Exit Exception while '{}': {}".format(stage[1], e)
            )
            exc = e
        except asyncio.CancelledError:
            self.logger.error(
                logging_text + "Cancelled Exception while '{}'".format(stage[1])
            )
            exc = "Operation was cancelled"
        except Exception as e:
            exc = traceback.format_exc()
            self.logger.critical(
                logging_text + "Exit Exception while '{}': {}".format(stage[1], e),
                exc_info=True,
            )
        finally:
            if exc:
                error_list.append(str(exc))
            try:
                # wait for pending tasks
                if tasks_dict_info:
                    stage[1] = "Waiting for instantiate pending tasks."
                    self.logger.debug(logging_text + stage[1])
                    error_list += await self._wait_for_tasks(
                        logging_text,
                        tasks_dict_info,
                        timeout_ns_deploy,
                        stage,
                        nslcmop_id,
                        nsr_id=nsr_id,
                    )
                stage[1] = stage[2] = ""
            except asyncio.CancelledError:
                error_list.append("Cancelled")
                # TODO cancel all tasks
            except Exception as exc:
                error_list.append(str(exc))

            # update operation-status
            db_nsr_update["operational-status"] = "running"
            # let's begin with VCA 'configured' status (later we can change it)
            db_nsr_update["config-status"] = "configured"
            for task, task_name in tasks_dict_info.items():
                if not task.done() or task.cancelled() or task.exception():
                    if task_name.startswith(self.task_name_deploy_vca):
                        # A N2VC task is pending
                        db_nsr_update["config-status"] = "failed"
                    else:
                        # RO or KDU task is pending
                        db_nsr_update["operational-status"] = "failed"

            # update status at database
            if error_list:
                error_detail = ". ".join(error_list)
                self.logger.error(logging_text + error_detail)
                error_description_nslcmop = "{} Detail: {}".format(
                    stage[0], error_detail
                )
                error_description_nsr = "Operation: INSTANTIATING.{}, {}".format(
                    nslcmop_id, stage[0]
                )

                db_nsr_update["detailed-status"] = (
                    error_description_nsr + " Detail: " + error_detail
                )
                db_nslcmop_update["detailed-status"] = error_detail
                nslcmop_operation_state = "FAILED"
                ns_state = "BROKEN"
            else:
                error_detail = None
                error_description_nsr = error_description_nslcmop = None
                ns_state = "READY"
                db_nsr_update["detailed-status"] = "Done"
                db_nslcmop_update["detailed-status"] = "Done"
                nslcmop_operation_state = "COMPLETED"

            if db_nsr:
                self._write_ns_status(
                    nsr_id=nsr_id,
                    ns_state=ns_state,
                    current_operation="IDLE",
                    current_operation_id=None,
                    error_description=error_description_nsr,
                    error_detail=error_detail,
                    other_update=db_nsr_update,
                )
                self._write_op_status(
                    op_id=nslcmop_id,
                    stage="",
                    error_message=error_description_nslcmop,
                    operation_state=nslcmop_operation_state,
                    other_update=db_nslcmop_update,
                )

            if nslcmop_operation_state:
                try:
                    await self.msg.aiowrite(
                        "ns",
                        "instantiated",
                        {
                            "nsr_id": nsr_id,
                            "nslcmop_id": nslcmop_id,
                            "operationState": nslcmop_operation_state,
                        },
                        loop=self.loop,
                    )
                except Exception as e:
                    self.logger.error(
                        logging_text + "kafka_write notification Exception {}".format(e)
                    )

            self.logger.debug(logging_text + "Exit")
            self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
2730
2731 def _get_vnfd(self, vnfd_id: str, cached_vnfds: Dict[str, Any]):
2732 if vnfd_id not in cached_vnfds:
2733 cached_vnfds[vnfd_id] = self.db.get_one("vnfds", {"id": vnfd_id})
2734 return cached_vnfds[vnfd_id]
2735
2736 def _get_vnfr(self, nsr_id: str, vnf_profile_id: str, cached_vnfrs: Dict[str, Any]):
2737 if vnf_profile_id not in cached_vnfrs:
2738 cached_vnfrs[vnf_profile_id] = self.db.get_one(
2739 "vnfrs",
2740 {
2741 "member-vnf-index-ref": vnf_profile_id,
2742 "nsr-id-ref": nsr_id,
2743 },
2744 )
2745 return cached_vnfrs[vnf_profile_id]
2746
2747 def _is_deployed_vca_in_relation(
2748 self, vca: DeployedVCA, relation: Relation
2749 ) -> bool:
2750 found = False
2751 for endpoint in (relation.provider, relation.requirer):
2752 if endpoint["kdu-resource-profile-id"]:
2753 continue
2754 found = (
2755 vca.vnf_profile_id == endpoint.vnf_profile_id
2756 and vca.vdu_profile_id == endpoint.vdu_profile_id
2757 and vca.execution_environment_ref == endpoint.execution_environment_ref
2758 )
2759 if found:
2760 break
2761 return found
2762
2763 def _update_ee_relation_data_with_implicit_data(
2764 self, nsr_id, nsd, ee_relation_data, cached_vnfds, vnf_profile_id: str = None
2765 ):
2766 ee_relation_data = safe_get_ee_relation(
2767 nsr_id, ee_relation_data, vnf_profile_id=vnf_profile_id
2768 )
2769 ee_relation_level = EELevel.get_level(ee_relation_data)
2770 if (ee_relation_level in (EELevel.VNF, EELevel.VDU)) and not ee_relation_data[
2771 "execution-environment-ref"
2772 ]:
2773 vnf_profile = get_vnf_profile(nsd, ee_relation_data["vnf-profile-id"])
2774 vnfd_id = vnf_profile["vnfd-id"]
2775 db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
2776 entity_id = (
2777 vnfd_id
2778 if ee_relation_level == EELevel.VNF
2779 else ee_relation_data["vdu-profile-id"]
2780 )
2781 ee = get_juju_ee_ref(db_vnfd, entity_id)
2782 if not ee:
2783 raise Exception(
2784 f"not execution environments found for ee_relation {ee_relation_data}"
2785 )
2786 ee_relation_data["execution-environment-ref"] = ee["id"]
2787 return ee_relation_data
2788
2789 def _get_ns_relations(
2790 self,
2791 nsr_id: str,
2792 nsd: Dict[str, Any],
2793 vca: DeployedVCA,
2794 cached_vnfds: Dict[str, Any],
2795 ) -> List[Relation]:
2796 relations = []
2797 db_ns_relations = get_ns_configuration_relation_list(nsd)
2798 for r in db_ns_relations:
2799 provider_dict = None
2800 requirer_dict = None
2801 if all(key in r for key in ("provider", "requirer")):
2802 provider_dict = r["provider"]
2803 requirer_dict = r["requirer"]
2804 elif "entities" in r:
2805 provider_id = r["entities"][0]["id"]
2806 provider_dict = {
2807 "nsr-id": nsr_id,
2808 "endpoint": r["entities"][0]["endpoint"],
2809 }
2810 if provider_id != nsd["id"]:
2811 provider_dict["vnf-profile-id"] = provider_id
2812 requirer_id = r["entities"][1]["id"]
2813 requirer_dict = {
2814 "nsr-id": nsr_id,
2815 "endpoint": r["entities"][1]["endpoint"],
2816 }
2817 if requirer_id != nsd["id"]:
2818 requirer_dict["vnf-profile-id"] = requirer_id
2819 else:
2820 raise Exception(
2821 "provider/requirer or entities must be included in the relation."
2822 )
2823 relation_provider = self._update_ee_relation_data_with_implicit_data(
2824 nsr_id, nsd, provider_dict, cached_vnfds
2825 )
2826 relation_requirer = self._update_ee_relation_data_with_implicit_data(
2827 nsr_id, nsd, requirer_dict, cached_vnfds
2828 )
2829 provider = EERelation(relation_provider)
2830 requirer = EERelation(relation_requirer)
2831 relation = Relation(r["name"], provider, requirer)
2832 vca_in_relation = self._is_deployed_vca_in_relation(vca, relation)
2833 if vca_in_relation:
2834 relations.append(relation)
2835 return relations
2836
2837 def _get_vnf_relations(
2838 self,
2839 nsr_id: str,
2840 nsd: Dict[str, Any],
2841 vca: DeployedVCA,
2842 cached_vnfds: Dict[str, Any],
2843 ) -> List[Relation]:
2844 relations = []
2845 vnf_profile = get_vnf_profile(nsd, vca.vnf_profile_id)
2846 vnf_profile_id = vnf_profile["id"]
2847 vnfd_id = vnf_profile["vnfd-id"]
2848 db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
2849 db_vnf_relations = get_relation_list(db_vnfd, vnfd_id)
2850 for r in db_vnf_relations:
2851 provider_dict = None
2852 requirer_dict = None
2853 if all(key in r for key in ("provider", "requirer")):
2854 provider_dict = r["provider"]
2855 requirer_dict = r["requirer"]
2856 elif "entities" in r:
2857 provider_id = r["entities"][0]["id"]
2858 provider_dict = {
2859 "nsr-id": nsr_id,
2860 "vnf-profile-id": vnf_profile_id,
2861 "endpoint": r["entities"][0]["endpoint"],
2862 }
2863 if provider_id != vnfd_id:
2864 provider_dict["vdu-profile-id"] = provider_id
2865 requirer_id = r["entities"][1]["id"]
2866 requirer_dict = {
2867 "nsr-id": nsr_id,
2868 "vnf-profile-id": vnf_profile_id,
2869 "endpoint": r["entities"][1]["endpoint"],
2870 }
2871 if requirer_id != vnfd_id:
2872 requirer_dict["vdu-profile-id"] = requirer_id
2873 else:
2874 raise Exception(
2875 "provider/requirer or entities must be included in the relation."
2876 )
2877 relation_provider = self._update_ee_relation_data_with_implicit_data(
2878 nsr_id, nsd, provider_dict, cached_vnfds, vnf_profile_id=vnf_profile_id
2879 )
2880 relation_requirer = self._update_ee_relation_data_with_implicit_data(
2881 nsr_id, nsd, requirer_dict, cached_vnfds, vnf_profile_id=vnf_profile_id
2882 )
2883 provider = EERelation(relation_provider)
2884 requirer = EERelation(relation_requirer)
2885 relation = Relation(r["name"], provider, requirer)
2886 vca_in_relation = self._is_deployed_vca_in_relation(vca, relation)
2887 if vca_in_relation:
2888 relations.append(relation)
2889 return relations
2890
2891 def _get_kdu_resource_data(
2892 self,
2893 ee_relation: EERelation,
2894 db_nsr: Dict[str, Any],
2895 cached_vnfds: Dict[str, Any],
2896 ) -> DeployedK8sResource:
2897 nsd = get_nsd(db_nsr)
2898 vnf_profiles = get_vnf_profiles(nsd)
2899 vnfd_id = find_in_list(
2900 vnf_profiles,
2901 lambda vnf_profile: vnf_profile["id"] == ee_relation.vnf_profile_id,
2902 )["vnfd-id"]
2903 db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
2904 kdu_resource_profile = get_kdu_resource_profile(
2905 db_vnfd, ee_relation.kdu_resource_profile_id
2906 )
2907 kdu_name = kdu_resource_profile["kdu-name"]
2908 deployed_kdu, _ = get_deployed_kdu(
2909 db_nsr.get("_admin", ()).get("deployed", ()),
2910 kdu_name,
2911 ee_relation.vnf_profile_id,
2912 )
2913 deployed_kdu.update({"resource-name": kdu_resource_profile["resource-name"]})
2914 return deployed_kdu
2915
2916 def _get_deployed_component(
2917 self,
2918 ee_relation: EERelation,
2919 db_nsr: Dict[str, Any],
2920 cached_vnfds: Dict[str, Any],
2921 ) -> DeployedComponent:
2922 nsr_id = db_nsr["_id"]
2923 deployed_component = None
2924 ee_level = EELevel.get_level(ee_relation)
2925 if ee_level == EELevel.NS:
2926 vca = get_deployed_vca(db_nsr, {"vdu_id": None, "member-vnf-index": None})
2927 if vca:
2928 deployed_component = DeployedVCA(nsr_id, vca)
2929 elif ee_level == EELevel.VNF:
2930 vca = get_deployed_vca(
2931 db_nsr,
2932 {
2933 "vdu_id": None,
2934 "member-vnf-index": ee_relation.vnf_profile_id,
2935 "ee_descriptor_id": ee_relation.execution_environment_ref,
2936 },
2937 )
2938 if vca:
2939 deployed_component = DeployedVCA(nsr_id, vca)
2940 elif ee_level == EELevel.VDU:
2941 vca = get_deployed_vca(
2942 db_nsr,
2943 {
2944 "vdu_id": ee_relation.vdu_profile_id,
2945 "member-vnf-index": ee_relation.vnf_profile_id,
2946 "ee_descriptor_id": ee_relation.execution_environment_ref,
2947 },
2948 )
2949 if vca:
2950 deployed_component = DeployedVCA(nsr_id, vca)
2951 elif ee_level == EELevel.KDU:
2952 kdu_resource_data = self._get_kdu_resource_data(
2953 ee_relation, db_nsr, cached_vnfds
2954 )
2955 if kdu_resource_data:
2956 deployed_component = DeployedK8sResource(kdu_resource_data)
2957 return deployed_component
2958
2959 async def _add_relation(
2960 self,
2961 relation: Relation,
2962 vca_type: str,
2963 db_nsr: Dict[str, Any],
2964 cached_vnfds: Dict[str, Any],
2965 cached_vnfrs: Dict[str, Any],
2966 ) -> bool:
2967 deployed_provider = self._get_deployed_component(
2968 relation.provider, db_nsr, cached_vnfds
2969 )
2970 deployed_requirer = self._get_deployed_component(
2971 relation.requirer, db_nsr, cached_vnfds
2972 )
2973 if (
2974 deployed_provider
2975 and deployed_requirer
2976 and deployed_provider.config_sw_installed
2977 and deployed_requirer.config_sw_installed
2978 ):
2979 provider_db_vnfr = (
2980 self._get_vnfr(
2981 relation.provider.nsr_id,
2982 relation.provider.vnf_profile_id,
2983 cached_vnfrs,
2984 )
2985 if relation.provider.vnf_profile_id
2986 else None
2987 )
2988 requirer_db_vnfr = (
2989 self._get_vnfr(
2990 relation.requirer.nsr_id,
2991 relation.requirer.vnf_profile_id,
2992 cached_vnfrs,
2993 )
2994 if relation.requirer.vnf_profile_id
2995 else None
2996 )
2997 provider_vca_id = self.get_vca_id(provider_db_vnfr, db_nsr)
2998 requirer_vca_id = self.get_vca_id(requirer_db_vnfr, db_nsr)
2999 provider_relation_endpoint = RelationEndpoint(
3000 deployed_provider.ee_id,
3001 provider_vca_id,
3002 relation.provider.endpoint,
3003 )
3004 requirer_relation_endpoint = RelationEndpoint(
3005 deployed_requirer.ee_id,
3006 requirer_vca_id,
3007 relation.requirer.endpoint,
3008 )
3009 await self.vca_map[vca_type].add_relation(
3010 provider=provider_relation_endpoint,
3011 requirer=requirer_relation_endpoint,
3012 )
3013 # remove entry from relations list
3014 return True
3015 return False
3016
    async def _add_vca_relations(
        self,
        logging_text,
        nsr_id,
        vca_type: str,
        vca_index: int,
        timeout: int = 3600,
    ) -> bool:
        """Add all descriptor-defined relations that involve this VCA.

        :param logging_text: prefix for every log message
        :param nsr_id: NS record id
        :param vca_type: key used to select the connector in self.vca_map
        :param vca_index: index of this VCA inside _admin.deployed.VCA
        :param timeout: max seconds to keep retrying until all peers are ready
        :return: True when all relations were added (or there were none);
            False on timeout or on any error (errors are logged, not raised)
        """

        # steps:
        # 1. find all relations for this VCA
        # 2. wait for other peers related
        # 3. add relations

        try:
            # STEP 1: find all relations for this VCA

            # read nsr record
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
            nsd = get_nsd(db_nsr)

            # this VCA data
            deployed_vca_dict = get_deployed_vca_list(db_nsr)[vca_index]
            my_vca = DeployedVCA(nsr_id, deployed_vca_dict)

            # caches shared across all relation lookups in this call
            cached_vnfds = {}
            cached_vnfrs = {}
            relations = []
            relations.extend(self._get_ns_relations(nsr_id, nsd, my_vca, cached_vnfds))
            relations.extend(self._get_vnf_relations(nsr_id, nsd, my_vca, cached_vnfds))

            # if no relations, terminate
            if not relations:
                self.logger.debug(logging_text + " No relations")
                return True

            self.logger.debug(logging_text + " adding relations {}".format(relations))

            # add all relations
            start = time()
            while True:
                # check timeout
                now = time()
                if now - start >= timeout:
                    self.logger.error(logging_text + " : timeout adding relations")
                    return False

                # reload nsr from database (we need to update record: _admin.deployed.VCA)
                db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

                # for each relation, find the VCA's related
                # (iterate over a copy because successfully added relations
                # are removed from the pending list while iterating)
                for relation in relations.copy():
                    added = await self._add_relation(
                        relation,
                        vca_type,
                        db_nsr,
                        cached_vnfds,
                        cached_vnfrs,
                    )
                    if added:
                        relations.remove(relation)

                if not relations:
                    self.logger.debug("Relations added")
                    break
                # some peers not ready yet: wait before retrying
                await asyncio.sleep(5.0)

            return True

        except Exception as e:
            self.logger.warn(logging_text + " ERROR adding relations: {}".format(e))
            return False
3089
    async def _install_kdu(
        self,
        nsr_id: str,
        nsr_db_path: str,
        vnfr_data: dict,
        kdu_index: int,
        kdud: dict,
        vnfd: dict,
        k8s_instance_info: dict,
        k8params: dict = None,
        timeout: int = 600,
        vca_id: str = None,
    ):
        """Install one KDU on its k8s cluster and update vnfr/nsr records.

        :param nsr_id: NS record id
        :param nsr_db_path: path of this KDU entry inside the nsr record
            (e.g. "_admin.deployed.K8s.<index>")
        :param vnfr_data: content of the VNF record the KDU belongs to
        :param kdu_index: index of the KDU inside the vnfr "kdur" list
        :param kdud: KDU descriptor (entry of the vnfd "kdu" list)
        :param vnfd: VNF descriptor content
        :param k8s_instance_info: deployment info: cluster uuid/type,
            kdu model/name, namespace, optional kdu-deployment-name
        :param k8params: parameters for the chart/bundle installation
        :param timeout: max seconds for the install and for each primitive
        :param vca_id: VCA id, propagated to the k8s connector
        :return: the kdu instance name
        :raises: re-raises any install/primitive error after recording it
            in the nsr (detailed-status) and vnfr (kdur status ERROR)
        """

        try:
            k8sclustertype = k8s_instance_info["k8scluster-type"]
            # Instantiate kdu
            db_dict_install = {
                "collection": "nsrs",
                "filter": {"_id": nsr_id},
                "path": nsr_db_path,
            }

            # use the descriptor-provided deployment name when present,
            # otherwise let the connector generate one
            if k8s_instance_info.get("kdu-deployment-name"):
                kdu_instance = k8s_instance_info.get("kdu-deployment-name")
            else:
                kdu_instance = self.k8scluster_map[
                    k8sclustertype
                ].generate_kdu_instance_name(
                    db_dict=db_dict_install,
                    kdu_model=k8s_instance_info["kdu-model"],
                    kdu_name=k8s_instance_info["kdu-name"],
                )
            # persist the instance name before installing so it is known
            # even if the installation fails
            self.update_db_2(
                "nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance}
            )
            await self.k8scluster_map[k8sclustertype].install(
                cluster_uuid=k8s_instance_info["k8scluster-uuid"],
                kdu_model=k8s_instance_info["kdu-model"],
                atomic=True,
                params=k8params,
                db_dict=db_dict_install,
                timeout=timeout,
                kdu_name=k8s_instance_info["kdu-name"],
                namespace=k8s_instance_info["namespace"],
                kdu_instance=kdu_instance,
                vca_id=vca_id,
            )
            self.update_db_2(
                "nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance}
            )

            # Obtain services to obtain management service ip
            services = await self.k8scluster_map[k8sclustertype].get_services(
                cluster_uuid=k8s_instance_info["k8scluster-uuid"],
                kdu_instance=kdu_instance,
                namespace=k8s_instance_info["namespace"],
            )

            # Obtain management service info (if exists)
            vnfr_update_dict = {}
            kdu_config = get_configuration(vnfd, kdud["name"])
            if kdu_config:
                target_ee_list = kdu_config.get("execution-environment-list", [])
            else:
                target_ee_list = []

            if services:
                vnfr_update_dict["kdur.{}.services".format(kdu_index)] = services
                mgmt_services = [
                    service
                    for service in kdud.get("service", [])
                    if service.get("mgmt-service")
                ]
                for mgmt_service in mgmt_services:
                    for service in services:
                        if service["name"].startswith(mgmt_service["name"]):
                            # Mgmt service found, Obtain service ip
                            ip = service.get("external_ip", service.get("cluster_ip"))
                            if isinstance(ip, list) and len(ip) == 1:
                                ip = ip[0]

                            vnfr_update_dict[
                                "kdur.{}.ip-address".format(kdu_index)
                            ] = ip

                            # Check if must update also mgmt ip at the vnf
                            service_external_cp = mgmt_service.get(
                                "external-connection-point-ref"
                            )
                            if service_external_cp:
                                if (
                                    deep_get(vnfd, ("mgmt-interface", "cp"))
                                    == service_external_cp
                                ):
                                    vnfr_update_dict["ip-address"] = ip

                                if find_in_list(
                                    target_ee_list,
                                    lambda ee: ee.get(
                                        "external-connection-point-ref", ""
                                    )
                                    == service_external_cp,
                                ):
                                    vnfr_update_dict[
                                        "kdur.{}.ip-address".format(kdu_index)
                                    ] = ip
                            break
                    else:
                        # inner loop found no k8s service matching this
                        # descriptor mgmt-service name
                        self.logger.warn(
                            "Mgmt service name: {} not found".format(
                                mgmt_service["name"]
                            )
                        )

            vnfr_update_dict["kdur.{}.status".format(kdu_index)] = "READY"
            self.update_db_2("vnfrs", vnfr_data.get("_id"), vnfr_update_dict)

            # Execute initial config primitives, unless a juju EE will
            # take care of configuration for this KDU
            kdu_config = get_configuration(vnfd, k8s_instance_info["kdu-name"])
            if (
                kdu_config
                and kdu_config.get("initial-config-primitive")
                and get_juju_ee_ref(vnfd, k8s_instance_info["kdu-name"]) is None
            ):
                initial_config_primitive_list = kdu_config.get(
                    "initial-config-primitive"
                )
                initial_config_primitive_list.sort(key=lambda val: int(val["seq"]))

                for initial_config_primitive in initial_config_primitive_list:
                    primitive_params_ = self._map_primitive_params(
                        initial_config_primitive, {}, {}
                    )

                    await asyncio.wait_for(
                        self.k8scluster_map[k8sclustertype].exec_primitive(
                            cluster_uuid=k8s_instance_info["k8scluster-uuid"],
                            kdu_instance=kdu_instance,
                            primitive_name=initial_config_primitive["name"],
                            params=primitive_params_,
                            db_dict=db_dict_install,
                            vca_id=vca_id,
                        ),
                        timeout=timeout,
                    )

        except Exception as e:
            # Prepare update db with error and raise exception
            try:
                self.update_db_2(
                    "nsrs", nsr_id, {nsr_db_path + ".detailed-status": str(e)}
                )
                self.update_db_2(
                    "vnfrs",
                    vnfr_data.get("_id"),
                    {"kdur.{}.status".format(kdu_index): "ERROR"},
                )
            except Exception:
                # ignore to keep original exception
                pass
            # reraise original error
            raise

        return kdu_instance
3254
    async def deploy_kdus(
        self,
        logging_text,
        nsr_id,
        nslcmop_id,
        db_vnfrs,
        db_vnfds,
        task_instantiation_info,
    ):
        """Launch one _install_kdu task per KDU found in the vnf records.

        :param logging_text: prefix for every log message
        :param nsr_id: NS record id
        :param nslcmop_id: current operation id (used to register tasks)
        :param db_vnfrs: dict of VNF records, scanned for "kdur" entries
        :param db_vnfds: list of VNF descriptors
        :param task_instantiation_info: dict task -> description, filled
            with one entry per launched KDU installation task
        :raises LcmException: on any deployment preparation error
        """
        # Launch kdus if present in the descriptor

        # cache: cluster type -> {cluster_id: connector cluster uuid}
        k8scluster_id_2_uuic = {
            "helm-chart-v3": {},
            "helm-chart": {},
            "juju-bundle": {},
        }

        async def _get_cluster_id(cluster_id, cluster_type):
            # Resolve (and cache) the connector-side uuid of a k8s cluster,
            # waiting for any in-progress cluster tasks first
            nonlocal k8scluster_id_2_uuic
            if cluster_id in k8scluster_id_2_uuic[cluster_type]:
                return k8scluster_id_2_uuic[cluster_type][cluster_id]

            # check if K8scluster is creating and wait look if previous tasks in process
            task_name, task_dependency = self.lcm_tasks.lookfor_related(
                "k8scluster", cluster_id
            )
            if task_dependency:
                text = "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
                    task_name, cluster_id
                )
                self.logger.debug(logging_text + text)
                await asyncio.wait(task_dependency, timeout=3600)

            db_k8scluster = self.db.get_one(
                "k8sclusters", {"_id": cluster_id}, fail_on_empty=False
            )
            if not db_k8scluster:
                raise LcmException("K8s cluster {} cannot be found".format(cluster_id))

            k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
            if not k8s_id:
                if cluster_type == "helm-chart-v3":
                    try:
                        # backward compatibility for existing clusters that have not been initialized for helm v3
                        k8s_credentials = yaml.safe_dump(
                            db_k8scluster.get("credentials")
                        )
                        k8s_id, uninstall_sw = await self.k8sclusterhelm3.init_env(
                            k8s_credentials, reuse_cluster_uuid=cluster_id
                        )
                        db_k8scluster_update = {}
                        db_k8scluster_update["_admin.helm-chart-v3.error_msg"] = None
                        db_k8scluster_update["_admin.helm-chart-v3.id"] = k8s_id
                        db_k8scluster_update[
                            "_admin.helm-chart-v3.created"
                        ] = uninstall_sw
                        db_k8scluster_update[
                            "_admin.helm-chart-v3.operationalState"
                        ] = "ENABLED"
                        self.update_db_2(
                            "k8sclusters", cluster_id, db_k8scluster_update
                        )
                    except Exception as e:
                        self.logger.error(
                            logging_text
                            + "error initializing helm-v3 cluster: {}".format(str(e))
                        )
                        raise LcmException(
                            "K8s cluster '{}' has not been initialized for '{}'".format(
                                cluster_id, cluster_type
                            )
                        )
                else:
                    raise LcmException(
                        "K8s cluster '{}' has not been initialized for '{}'".format(
                            cluster_id, cluster_type
                        )
                    )
            k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
            return k8s_id

        logging_text += "Deploy kdus: "
        step = ""
        try:
            db_nsr_update = {"_admin.deployed.K8s": []}
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

            index = 0
            # clusters whose helm repos have already been synchronized
            updated_cluster_list = []
            updated_v3_cluster_list = []

            for vnfr_data in db_vnfrs.values():
                vca_id = self.get_vca_id(vnfr_data, {})
                for kdu_index, kdur in enumerate(get_iterable(vnfr_data, "kdur")):
                    # Step 0: Prepare and set parameters
                    desc_params = parse_yaml_strings(kdur.get("additionalParams"))
                    vnfd_id = vnfr_data.get("vnfd-id")
                    vnfd_with_id = find_in_list(
                        db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id
                    )
                    kdud = next(
                        kdud
                        for kdud in vnfd_with_id["kdu"]
                        if kdud["name"] == kdur["kdu-name"]
                    )
                    namespace = kdur.get("k8s-namespace")
                    kdu_deployment_name = kdur.get("kdu-deployment-name")
                    if kdur.get("helm-chart"):
                        kdumodel = kdur["helm-chart"]
                        # Default version: helm3, if helm-version is v2 assign v2
                        k8sclustertype = "helm-chart-v3"
                        self.logger.debug("kdur: {}".format(kdur))
                        if (
                            kdur.get("helm-version")
                            and kdur.get("helm-version") == "v2"
                        ):
                            k8sclustertype = "helm-chart"
                    elif kdur.get("juju-bundle"):
                        kdumodel = kdur["juju-bundle"]
                        k8sclustertype = "juju-bundle"
                    else:
                        raise LcmException(
                            "kdu type for kdu='{}.{}' is neither helm-chart nor "
                            "juju-bundle. Maybe an old NBI version is running".format(
                                vnfr_data["member-vnf-index-ref"], kdur["kdu-name"]
                            )
                        )
                    # check if kdumodel is a file and exists
                    try:
                        vnfd_with_id = find_in_list(
                            db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id
                        )
                        storage = deep_get(vnfd_with_id, ("_admin", "storage"))
                        if storage:  # may be not present if vnfd has not artifacts
                            # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
                            if storage["pkg-dir"]:
                                filename = "{}/{}/{}s/{}".format(
                                    storage["folder"],
                                    storage["pkg-dir"],
                                    k8sclustertype,
                                    kdumodel,
                                )
                            else:
                                filename = "{}/Scripts/{}s/{}".format(
                                    storage["folder"],
                                    k8sclustertype,
                                    kdumodel,
                                )
                            if self.fs.file_exists(
                                filename, mode="file"
                            ) or self.fs.file_exists(filename, mode="dir"):
                                kdumodel = self.fs.path + filename
                    except (asyncio.TimeoutError, asyncio.CancelledError):
                        raise
                    except Exception:  # it is not a file
                        pass

                    k8s_cluster_id = kdur["k8s-cluster"]["id"]
                    step = "Synchronize repos for k8s cluster '{}'".format(
                        k8s_cluster_id
                    )
                    cluster_uuid = await _get_cluster_id(k8s_cluster_id, k8sclustertype)

                    # Synchronize repos
                    if (
                        k8sclustertype == "helm-chart"
                        and cluster_uuid not in updated_cluster_list
                    ) or (
                        k8sclustertype == "helm-chart-v3"
                        and cluster_uuid not in updated_v3_cluster_list
                    ):
                        del_repo_list, added_repo_dict = await asyncio.ensure_future(
                            self.k8scluster_map[k8sclustertype].synchronize_repos(
                                cluster_uuid=cluster_uuid
                            )
                        )
                        if del_repo_list or added_repo_dict:
                            if k8sclustertype == "helm-chart":
                                unset = {
                                    "_admin.helm_charts_added." + item: None
                                    for item in del_repo_list
                                }
                                updated = {
                                    "_admin.helm_charts_added." + item: name
                                    for item, name in added_repo_dict.items()
                                }
                                updated_cluster_list.append(cluster_uuid)
                            elif k8sclustertype == "helm-chart-v3":
                                unset = {
                                    "_admin.helm_charts_v3_added." + item: None
                                    for item in del_repo_list
                                }
                                updated = {
                                    "_admin.helm_charts_v3_added." + item: name
                                    for item, name in added_repo_dict.items()
                                }
                                updated_v3_cluster_list.append(cluster_uuid)
                            self.logger.debug(
                                logging_text + "repos synchronized on k8s cluster "
                                "'{}' to_delete: {}, to_add: {}".format(
                                    k8s_cluster_id, del_repo_list, added_repo_dict
                                )
                            )
                            self.db.set_one(
                                "k8sclusters",
                                {"_id": k8s_cluster_id},
                                updated,
                                unset=unset,
                            )

                    # Instantiate kdu
                    step = "Instantiating KDU {}.{} in k8s cluster {}".format(
                        vnfr_data["member-vnf-index-ref"],
                        kdur["kdu-name"],
                        k8s_cluster_id,
                    )
                    k8s_instance_info = {
                        "kdu-instance": None,
                        "k8scluster-uuid": cluster_uuid,
                        "k8scluster-type": k8sclustertype,
                        "member-vnf-index": vnfr_data["member-vnf-index-ref"],
                        "kdu-name": kdur["kdu-name"],
                        "kdu-model": kdumodel,
                        "namespace": namespace,
                        "kdu-deployment-name": kdu_deployment_name,
                    }
                    db_path = "_admin.deployed.K8s.{}".format(index)
                    db_nsr_update[db_path] = k8s_instance_info
                    self.update_db_2("nsrs", nsr_id, db_nsr_update)
                    vnfd_with_id = find_in_list(
                        db_vnfds, lambda vnf: vnf["_id"] == vnfd_id
                    )
                    # launch the actual installation as a background task
                    task = asyncio.ensure_future(
                        self._install_kdu(
                            nsr_id,
                            db_path,
                            vnfr_data,
                            kdu_index,
                            kdud,
                            vnfd_with_id,
                            k8s_instance_info,
                            k8params=desc_params,
                            timeout=600,
                            vca_id=vca_id,
                        )
                    )
                    self.lcm_tasks.register(
                        "ns",
                        nsr_id,
                        nslcmop_id,
                        "instantiate_KDU-{}".format(index),
                        task,
                    )
                    task_instantiation_info[task] = "Deploying KDU {}".format(
                        kdur["kdu-name"]
                    )

                    index += 1

        except (LcmException, asyncio.CancelledError):
            raise
        except Exception as e:
            msg = "Exception {} while {}: {}".format(type(e).__name__, step, e)
            if isinstance(e, (N2VCException, DbException)):
                self.logger.error(logging_text + msg)
            else:
                self.logger.critical(logging_text + msg, exc_info=True)
            raise LcmException(msg)
        finally:
            if db_nsr_update:
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
3526
    def _deploy_n2vc(
        self,
        logging_text,
        db_nsr,
        db_vnfr,
        nslcmop_id,
        nsr_id,
        nsi_id,
        vnfd_id,
        vdu_id,
        kdu_name,
        member_vnf_index,
        vdu_index,
        vdu_name,
        deploy_params,
        descriptor_config,
        base_folder,
        task_instantiation_info,
        stage,
    ):
        """Launch one instantiate_N2VC task per execution environment of
        *descriptor_config*, creating the _admin.deployed.VCA entry in the
        nsr when it does not exist yet.
        """
        # launch instantiate_N2VC in a asyncio task and register task object
        # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
        # if not found, create one entry and update database
        # fill db_nsr._admin.deployed.VCA.<index>

        self.logger.debug(
            logging_text + "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id, vdu_id)
        )
        if "execution-environment-list" in descriptor_config:
            ee_list = descriptor_config.get("execution-environment-list", [])
        elif "juju" in descriptor_config:
            ee_list = [descriptor_config]  # ns charms
        else:  # other types as script are not supported
            ee_list = []

        for ee_item in ee_list:
            self.logger.debug(
                logging_text
                + "_deploy_n2vc ee_item juju={}, helm={}".format(
                    ee_item.get("juju"), ee_item.get("helm-chart")
                )
            )
            ee_descriptor_id = ee_item.get("id")
            # derive the vca type (and charm name) from the ee descriptor
            if ee_item.get("juju"):
                vca_name = ee_item["juju"].get("charm")
                vca_type = (
                    "lxc_proxy_charm"
                    if ee_item["juju"].get("charm") is not None
                    else "native_charm"
                )
                if ee_item["juju"].get("cloud") == "k8s":
                    vca_type = "k8s_proxy_charm"
                elif ee_item["juju"].get("proxy") is False:
                    vca_type = "native_charm"
            elif ee_item.get("helm-chart"):
                vca_name = ee_item["helm-chart"]
                if ee_item.get("helm-version") and ee_item.get("helm-version") == "v2":
                    vca_type = "helm"
                else:
                    vca_type = "helm-v3"
            else:
                self.logger.debug(
                    logging_text + "skipping non juju neither charm configuration"
                )
                continue

            # look for an existing VCA entry matching this ee; the for/else
            # creates a new one when the loop finishes without break
            vca_index = -1
            for vca_index, vca_deployed in enumerate(
                db_nsr["_admin"]["deployed"]["VCA"]
            ):
                if not vca_deployed:
                    continue
                if (
                    vca_deployed.get("member-vnf-index") == member_vnf_index
                    and vca_deployed.get("vdu_id") == vdu_id
                    and vca_deployed.get("kdu_name") == kdu_name
                    and vca_deployed.get("vdu_count_index", 0) == vdu_index
                    and vca_deployed.get("ee_descriptor_id") == ee_descriptor_id
                ):
                    break
            else:
                # not found, create one.
                target = (
                    "ns" if not member_vnf_index else "vnf/{}".format(member_vnf_index)
                )
                if vdu_id:
                    target += "/vdu/{}/{}".format(vdu_id, vdu_index or 0)
                elif kdu_name:
                    target += "/kdu/{}".format(kdu_name)
                vca_deployed = {
                    "target_element": target,
                    # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
                    "member-vnf-index": member_vnf_index,
                    "vdu_id": vdu_id,
                    "kdu_name": kdu_name,
                    "vdu_count_index": vdu_index,
                    "operational-status": "init",  # TODO revise
                    "detailed-status": "",  # TODO revise
                    "step": "initial-deploy",  # TODO revise
                    "vnfd_id": vnfd_id,
                    "vdu_name": vdu_name,
                    "type": vca_type,
                    "ee_descriptor_id": ee_descriptor_id,
                }
                vca_index += 1

                # create VCA and configurationStatus in db
                db_dict = {
                    "_admin.deployed.VCA.{}".format(vca_index): vca_deployed,
                    "configurationStatus.{}".format(vca_index): dict(),
                }
                self.update_db_2("nsrs", nsr_id, db_dict)

                db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed)

                self.logger.debug("N2VC > NSR_ID > {}".format(nsr_id))
                self.logger.debug("N2VC > DB_NSR > {}".format(db_nsr))
                self.logger.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed))

            # Launch task
            task_n2vc = asyncio.ensure_future(
                self.instantiate_N2VC(
                    logging_text=logging_text,
                    vca_index=vca_index,
                    nsi_id=nsi_id,
                    db_nsr=db_nsr,
                    db_vnfr=db_vnfr,
                    vdu_id=vdu_id,
                    kdu_name=kdu_name,
                    vdu_index=vdu_index,
                    deploy_params=deploy_params,
                    config_descriptor=descriptor_config,
                    base_folder=base_folder,
                    nslcmop_id=nslcmop_id,
                    stage=stage,
                    vca_type=vca_type,
                    vca_name=vca_name,
                    ee_config_descriptor=ee_item,
                )
            )
            self.lcm_tasks.register(
                "ns",
                nsr_id,
                nslcmop_id,
                "instantiate_N2VC-{}".format(vca_index),
                task_n2vc,
            )
            task_instantiation_info[
                task_n2vc
            ] = self.task_name_deploy_vca + " {}.{}".format(
                member_vnf_index or "", vdu_id or ""
            )
3679
3680 @staticmethod
3681 def _create_nslcmop(nsr_id, operation, params):
3682 """
3683 Creates a ns-lcm-opp content to be stored at database.
3684 :param nsr_id: internal id of the instance
3685 :param operation: instantiate, terminate, scale, action, ...
3686 :param params: user parameters for the operation
3687 :return: dictionary following SOL005 format
3688 """
3689 # Raise exception if invalid arguments
3690 if not (nsr_id and operation and params):
3691 raise LcmException(
3692 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided"
3693 )
3694 now = time()
3695 _id = str(uuid4())
3696 nslcmop = {
3697 "id": _id,
3698 "_id": _id,
3699 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
3700 "operationState": "PROCESSING",
3701 "statusEnteredTime": now,
3702 "nsInstanceId": nsr_id,
3703 "lcmOperationType": operation,
3704 "startTime": now,
3705 "isAutomaticInvocation": False,
3706 "operationParams": params,
3707 "isCancelPending": False,
3708 "links": {
3709 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
3710 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
3711 },
3712 }
3713 return nslcmop
3714
3715 def _format_additional_params(self, params):
3716 params = params or {}
3717 for key, value in params.items():
3718 if str(value).startswith("!!yaml "):
3719 params[key] = yaml.safe_load(value[7:])
3720 return params
3721
3722 def _get_terminate_primitive_params(self, seq, vnf_index):
3723 primitive = seq.get("name")
3724 primitive_params = {}
3725 params = {
3726 "member_vnf_index": vnf_index,
3727 "primitive": primitive,
3728 "primitive_params": primitive_params,
3729 }
3730 desc_params = {}
3731 return self._map_primitive_params(seq, params, desc_params)
3732
3733 # sub-operations
3734
3735 def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
3736 op = deep_get(db_nslcmop, ("_admin", "operations"), [])[op_index]
3737 if op.get("operationState") == "COMPLETED":
3738 # b. Skip sub-operation
3739 # _ns_execute_primitive() or RO.create_action() will NOT be executed
3740 return self.SUBOPERATION_STATUS_SKIP
3741 else:
3742 # c. retry executing sub-operation
3743 # The sub-operation exists, and operationState != 'COMPLETED'
3744 # Update operationState = 'PROCESSING' to indicate a retry.
3745 operationState = "PROCESSING"
3746 detailed_status = "In progress"
3747 self._update_suboperation_status(
3748 db_nslcmop, op_index, operationState, detailed_status
3749 )
3750 # Return the sub-operation index
3751 # _ns_execute_primitive() or RO.create_action() will be called from scale()
3752 # with arguments extracted from the sub-operation
3753 return op_index
3754
3755 # Find a sub-operation where all keys in a matching dictionary must match
3756 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
3757 def _find_suboperation(self, db_nslcmop, match):
3758 if db_nslcmop and match:
3759 op_list = db_nslcmop.get("_admin", {}).get("operations", [])
3760 for i, op in enumerate(op_list):
3761 if all(op.get(k) == match[k] for k in match):
3762 return i
3763 return self.SUBOPERATION_STATUS_NOT_FOUND
3764
3765 # Update status for a sub-operation given its index
3766 def _update_suboperation_status(
3767 self, db_nslcmop, op_index, operationState, detailed_status
3768 ):
3769 # Update DB for HA tasks
3770 q_filter = {"_id": db_nslcmop["_id"]}
3771 update_dict = {
3772 "_admin.operations.{}.operationState".format(op_index): operationState,
3773 "_admin.operations.{}.detailed-status".format(op_index): detailed_status,
3774 }
3775 self.db.set_one(
3776 "nslcmops", q_filter=q_filter, update_dict=update_dict, fail_on_empty=False
3777 )
3778
3779 # Add sub-operation, return the index of the added sub-operation
3780 # Optionally, set operationState, detailed-status, and operationType
3781 # Status and type are currently set for 'scale' sub-operations:
3782 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
3783 # 'detailed-status' : status message
3784 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
3785 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
3786 def _add_suboperation(
3787 self,
3788 db_nslcmop,
3789 vnf_index,
3790 vdu_id,
3791 vdu_count_index,
3792 vdu_name,
3793 primitive,
3794 mapped_primitive_params,
3795 operationState=None,
3796 detailed_status=None,
3797 operationType=None,
3798 RO_nsr_id=None,
3799 RO_scaling_info=None,
3800 ):
3801 if not db_nslcmop:
3802 return self.SUBOPERATION_STATUS_NOT_FOUND
3803 # Get the "_admin.operations" list, if it exists
3804 db_nslcmop_admin = db_nslcmop.get("_admin", {})
3805 op_list = db_nslcmop_admin.get("operations")
3806 # Create or append to the "_admin.operations" list
3807 new_op = {
3808 "member_vnf_index": vnf_index,
3809 "vdu_id": vdu_id,
3810 "vdu_count_index": vdu_count_index,
3811 "primitive": primitive,
3812 "primitive_params": mapped_primitive_params,
3813 }
3814 if operationState:
3815 new_op["operationState"] = operationState
3816 if detailed_status:
3817 new_op["detailed-status"] = detailed_status
3818 if operationType:
3819 new_op["lcmOperationType"] = operationType
3820 if RO_nsr_id:
3821 new_op["RO_nsr_id"] = RO_nsr_id
3822 if RO_scaling_info:
3823 new_op["RO_scaling_info"] = RO_scaling_info
3824 if not op_list:
3825 # No existing operations, create key 'operations' with current operation as first list element
3826 db_nslcmop_admin.update({"operations": [new_op]})
3827 op_list = db_nslcmop_admin.get("operations")
3828 else:
3829 # Existing operations, append operation to list
3830 op_list.append(new_op)
3831
3832 db_nslcmop_update = {"_admin.operations": op_list}
3833 self.update_db_2("nslcmops", db_nslcmop["_id"], db_nslcmop_update)
3834 op_index = len(op_list) - 1
3835 return op_index
3836
3837 # Helper methods for scale() sub-operations
3838
3839 # pre-scale/post-scale:
3840 # Check for 3 different cases:
3841 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
3842 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
3843 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
3844 def _check_or_add_scale_suboperation(
3845 self,
3846 db_nslcmop,
3847 vnf_index,
3848 vnf_config_primitive,
3849 primitive_params,
3850 operationType,
3851 RO_nsr_id=None,
3852 RO_scaling_info=None,
3853 ):
3854 # Find this sub-operation
3855 if RO_nsr_id and RO_scaling_info:
3856 operationType = "SCALE-RO"
3857 match = {
3858 "member_vnf_index": vnf_index,
3859 "RO_nsr_id": RO_nsr_id,
3860 "RO_scaling_info": RO_scaling_info,
3861 }
3862 else:
3863 match = {
3864 "member_vnf_index": vnf_index,
3865 "primitive": vnf_config_primitive,
3866 "primitive_params": primitive_params,
3867 "lcmOperationType": operationType,
3868 }
3869 op_index = self._find_suboperation(db_nslcmop, match)
3870 if op_index == self.SUBOPERATION_STATUS_NOT_FOUND:
3871 # a. New sub-operation
3872 # The sub-operation does not exist, add it.
3873 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
3874 # The following parameters are set to None for all kind of scaling:
3875 vdu_id = None
3876 vdu_count_index = None
3877 vdu_name = None
3878 if RO_nsr_id and RO_scaling_info:
3879 vnf_config_primitive = None
3880 primitive_params = None
3881 else:
3882 RO_nsr_id = None
3883 RO_scaling_info = None
3884 # Initial status for sub-operation
3885 operationState = "PROCESSING"
3886 detailed_status = "In progress"
3887 # Add sub-operation for pre/post-scaling (zero or more operations)
3888 self._add_suboperation(
3889 db_nslcmop,
3890 vnf_index,
3891 vdu_id,
3892 vdu_count_index,
3893 vdu_name,
3894 vnf_config_primitive,
3895 primitive_params,
3896 operationState,
3897 detailed_status,
3898 operationType,
3899 RO_nsr_id,
3900 RO_scaling_info,
3901 )
3902 return self.SUBOPERATION_STATUS_NEW
3903 else:
3904 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
3905 # or op_index (operationState != 'COMPLETED')
3906 return self._retry_or_skip_suboperation(db_nslcmop, op_index)
3907
3908 # Function to return execution_environment id
3909
3910 def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list):
3911 # TODO vdu_index_count
3912 for vca in vca_deployed_list:
3913 if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id:
3914 return vca["ee_id"]
3915
    async def destroy_N2VC(
        self,
        logging_text,
        db_nslcmop,
        vca_deployed,
        config_descriptor,
        vca_index,
        destroy_ee=True,
        exec_primitives=True,
        scaling_in=False,
        vca_id: str = None,
    ):
        """
        Execute the terminate primitives and (optionally) destroy the execution environment.

        :param logging_text: prefix for log messages
        :param db_nslcmop: database content of the nslcmop being processed
        :param vca_deployed: Dictionary of deployment info at db_nsr._admin.deployed.VCA.<INDEX>
        :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
        :param vca_index: index in the database _admin.deployed.VCA
        :param destroy_ee: False to not destroy the execution environment here, because all of
            them will be destroyed at once later
        :param exec_primitives: False to not execute terminate primitives, because the config is
            not completed or has not executed properly
        :param scaling_in: True destroys the application, False destroys the model
        :param vca_id: VCA (juju controller) identifier, when a dedicated one is registered
        :return: None or exception
        """

        self.logger.debug(
            logging_text
            + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
                vca_index, vca_deployed, config_descriptor, destroy_ee
            )
        )

        vca_type = vca_deployed.get("type", "lxc_proxy_charm")

        # execute terminate_primitives
        if exec_primitives:
            terminate_primitives = get_ee_sorted_terminate_config_primitive_list(
                config_descriptor.get("terminate-config-primitive"),
                vca_deployed.get("ee_descriptor_id"),
            )
            vdu_id = vca_deployed.get("vdu_id")
            vdu_count_index = vca_deployed.get("vdu_count_index")
            vdu_name = vca_deployed.get("vdu_name")
            vnf_index = vca_deployed.get("member-vnf-index")
            if terminate_primitives and vca_deployed.get("needed_terminate"):
                for seq in terminate_primitives:
                    # For each sequence in list, get primitive and call _ns_execute_primitive()
                    step = "Calling terminate action for vnf_member_index={} primitive={}".format(
                        vnf_index, seq.get("name")
                    )
                    self.logger.debug(logging_text + step)
                    # Create the primitive for each sequence, i.e. "primitive": "touch"
                    primitive = seq.get("name")
                    mapped_primitive_params = self._get_terminate_primitive_params(
                        seq, vnf_index
                    )

                    # Add sub-operation
                    self._add_suboperation(
                        db_nslcmop,
                        vnf_index,
                        vdu_id,
                        vdu_count_index,
                        vdu_name,
                        primitive,
                        mapped_primitive_params,
                    )
                    # Sub-operations: Call _ns_execute_primitive() instead of action()
                    try:
                        result, result_detail = await self._ns_execute_primitive(
                            vca_deployed["ee_id"],
                            primitive,
                            mapped_primitive_params,
                            vca_type=vca_type,
                            vca_id=vca_id,
                        )
                    except LcmException:
                        # this happens when VCA is not deployed. In this case it is not needed to terminate
                        continue
                    result_ok = ["COMPLETED", "PARTIALLY_COMPLETED"]
                    if result not in result_ok:
                        raise LcmException(
                            "terminate_primitive {} for vnf_member_index={} fails with "
                            "error {}".format(seq.get("name"), vnf_index, result_detail)
                        )
                # set that this VCA do not need terminated
                db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(
                    vca_index
                )
                self.update_db_2(
                    "nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False}
                )

        # Delete Prometheus Jobs if any
        # This uses NSR_ID, so it will destroy any jobs under this index
        self.db.del_list("prometheus_jobs", {"nsr_id": db_nslcmop["nsInstanceId"]})

        if destroy_ee:
            await self.vca_map[vca_type].delete_execution_environment(
                vca_deployed["ee_id"],
                scaling_in=scaling_in,
                vca_type=vca_type,
                vca_id=vca_id,
            )
4021
4022 async def _delete_all_N2VC(self, db_nsr: dict, vca_id: str = None):
4023 self._write_all_config_status(db_nsr=db_nsr, status="TERMINATING")
4024 namespace = "." + db_nsr["_id"]
4025 try:
4026 await self.n2vc.delete_namespace(
4027 namespace=namespace,
4028 total_timeout=self.timeout_charm_delete,
4029 vca_id=vca_id,
4030 )
4031 except N2VCNotFound: # already deleted. Skip
4032 pass
4033 self._write_all_config_status(db_nsr=db_nsr, status="DELETED")
4034
    async def _terminate_RO(
        self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
    ):
        """
        Terminates a deployment from RO (classic, non-NG RO client).

        Deletes, in order: the ns instance at the VIM (polling until the RO
        delete action finishes), then the nsd and the vnfds previously
        onboarded at RO. Progress is continuously reflected in the nsr and
        nslcmop database records.

        :param logging_text: prefix for every log line
        :param nsr_deployed: db_nsr._admin.deployed
        :param nsr_id:
        :param nslcmop_id:
        :param stage: list of string with the content to write on db_nslcmop.detailed-status.
            this method will update only the index 2, but it will write on database the concatenated content of the list
        :return: None. Raises LcmException if any deletion step failed
        """
        db_nsr_update = {}
        failed_detail = []  # accumulates non-fatal errors; reported at the end
        ro_nsr_id = ro_delete_action = None
        if nsr_deployed and nsr_deployed.get("RO"):
            ro_nsr_id = nsr_deployed["RO"].get("nsr_id")
            # a pending delete action id may exist from a previous interrupted terminate
            ro_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
        try:
            if ro_nsr_id:
                stage[2] = "Deleting ns from VIM."
                db_nsr_update["detailed-status"] = " ".join(stage)
                self._write_op_status(nslcmop_id, stage)
                self.logger.debug(logging_text + stage[2])
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_op_status(nslcmop_id, stage)
                desc = await self.RO.delete("ns", ro_nsr_id)
                ro_delete_action = desc["action_id"]
                # persist the action id so an interrupted terminate can resume the wait
                db_nsr_update[
                    "_admin.deployed.RO.nsr_delete_action_id"
                ] = ro_delete_action
                db_nsr_update["_admin.deployed.RO.nsr_id"] = None
                db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
            if ro_delete_action:
                # wait until NS is deleted from VIM
                stage[2] = "Waiting ns deleted from VIM."
                detailed_status_old = None
                self.logger.debug(
                    logging_text
                    + stage[2]
                    + " RO_id={} ro_delete_action={}".format(
                        ro_nsr_id, ro_delete_action
                    )
                )
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_op_status(nslcmop_id, stage)

                delete_timeout = 20 * 60  # 20 minutes
                while delete_timeout > 0:
                    desc = await self.RO.show(
                        "ns",
                        item_id_name=ro_nsr_id,
                        extra_item="action",
                        extra_item_id=ro_delete_action,
                    )

                    # deploymentStatus: mirror the raw RO descriptor into the nsr record
                    self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)

                    ns_status, ns_status_info = self.RO.check_action_status(desc)
                    if ns_status == "ERROR":
                        raise ROclient.ROClientException(ns_status_info)
                    elif ns_status == "BUILD":
                        # delete still in progress at the VIM
                        stage[2] = "Deleting from VIM {}".format(ns_status_info)
                    elif ns_status == "ACTIVE":
                        # delete action completed
                        db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
                        db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                        break
                    else:
                        assert (
                            False
                        ), "ROclient.check_action_status returns unknown {}".format(
                            ns_status
                        )
                    # write status to DB only when it actually changed
                    if stage[2] != detailed_status_old:
                        detailed_status_old = stage[2]
                        db_nsr_update["detailed-status"] = " ".join(stage)
                        self._write_op_status(nslcmop_id, stage)
                        self.update_db_2("nsrs", nsr_id, db_nsr_update)
                    await asyncio.sleep(5, loop=self.loop)
                    delete_timeout -= 5
                else:  # delete_timeout <= 0:
                    raise ROclient.ROClientException(
                        "Timeout waiting ns deleted from VIM"
                    )

        except Exception as e:
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            if (
                isinstance(e, ROclient.ROClientException) and e.http_code == 404
            ):  # not found: treat as already deleted
                db_nsr_update["_admin.deployed.RO.nsr_id"] = None
                db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
                self.logger.debug(
                    logging_text + "RO_ns_id={} already deleted".format(ro_nsr_id)
                )
            elif (
                isinstance(e, ROclient.ROClientException) and e.http_code == 409
            ):  # conflict
                failed_detail.append("delete conflict: {}".format(e))
                self.logger.debug(
                    logging_text
                    + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id, e)
                )
            else:
                failed_detail.append("delete error: {}".format(e))
                self.logger.error(
                    logging_text + "RO_ns_id={} delete error: {}".format(ro_nsr_id, e)
                )

        # Delete nsd (only when the ns instance was removed without errors)
        if not failed_detail and deep_get(nsr_deployed, ("RO", "nsd_id")):
            ro_nsd_id = nsr_deployed["RO"]["nsd_id"]
            try:
                stage[2] = "Deleting nsd from RO."
                db_nsr_update["detailed-status"] = " ".join(stage)
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_op_status(nslcmop_id, stage)
                await self.RO.delete("nsd", ro_nsd_id)
                self.logger.debug(
                    logging_text + "ro_nsd_id={} deleted".format(ro_nsd_id)
                )
                db_nsr_update["_admin.deployed.RO.nsd_id"] = None
            except Exception as e:
                if (
                    isinstance(e, ROclient.ROClientException) and e.http_code == 404
                ):  # not found: treat as already deleted
                    db_nsr_update["_admin.deployed.RO.nsd_id"] = None
                    self.logger.debug(
                        logging_text + "ro_nsd_id={} already deleted".format(ro_nsd_id)
                    )
                elif (
                    isinstance(e, ROclient.ROClientException) and e.http_code == 409
                ):  # conflict
                    failed_detail.append(
                        "ro_nsd_id={} delete conflict: {}".format(ro_nsd_id, e)
                    )
                    self.logger.debug(logging_text + failed_detail[-1])
                else:
                    failed_detail.append(
                        "ro_nsd_id={} delete error: {}".format(ro_nsd_id, e)
                    )
                    self.logger.error(logging_text + failed_detail[-1])

        # Delete each onboarded vnfd (only when everything above succeeded)
        if not failed_detail and deep_get(nsr_deployed, ("RO", "vnfd")):
            for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
                if not vnf_deployed or not vnf_deployed["id"]:
                    continue
                try:
                    ro_vnfd_id = vnf_deployed["id"]
                    stage[
                        2
                    ] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
                        vnf_deployed["member-vnf-index"], ro_vnfd_id
                    )
                    db_nsr_update["detailed-status"] = " ".join(stage)
                    self.update_db_2("nsrs", nsr_id, db_nsr_update)
                    self._write_op_status(nslcmop_id, stage)
                    await self.RO.delete("vnfd", ro_vnfd_id)
                    self.logger.debug(
                        logging_text + "ro_vnfd_id={} deleted".format(ro_vnfd_id)
                    )
                    db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
                except Exception as e:
                    if (
                        isinstance(e, ROclient.ROClientException) and e.http_code == 404
                    ):  # not found: treat as already deleted
                        db_nsr_update[
                            "_admin.deployed.RO.vnfd.{}.id".format(index)
                        ] = None
                        self.logger.debug(
                            logging_text
                            + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id)
                        )
                    elif (
                        isinstance(e, ROclient.ROClientException) and e.http_code == 409
                    ):  # conflict
                        failed_detail.append(
                            "ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id, e)
                        )
                        self.logger.debug(logging_text + failed_detail[-1])
                    else:
                        failed_detail.append(
                            "ro_vnfd_id={} delete error: {}".format(ro_vnfd_id, e)
                        )
                        self.logger.error(logging_text + failed_detail[-1])

        # final status summary (index 2 of stage)
        if failed_detail:
            stage[2] = "Error deleting from VIM"
        else:
            stage[2] = "Deleted from VIM"
        db_nsr_update["detailed-status"] = " ".join(stage)
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        self._write_op_status(nslcmop_id, stage)

        if failed_detail:
            raise LcmException("; ".join(failed_detail))
4234
    async def terminate(self, nsr_id, nslcmop_id):
        """
        Terminate a network service instance.

        Three stages, tracked in db_nslcmop.detailed-status:
          1/3 prepare the task (lock HA, read db records),
          2/3 run per-VCA terminate primitives and destroy the execution
              environments that need individual destruction,
          3/3 delete everything else at once: remaining VCA namespace, KDUs
              at the k8s clusters and the deployment at RO/VIM.
        Final status (nsr, nslcmop, kafka "terminated" message) is always
        written in the finally section.

        :param nsr_id: ns instance id
        :param nslcmop_id: operation id
        :return: None
        """
        # Try to lock HA task here
        task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
        if not task_is_locked_by_me:
            return

        logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
        self.logger.debug(logging_text + "Enter")
        timeout_ns_terminate = self.timeout_ns_terminate
        db_nsr = None
        db_nslcmop = None
        operation_params = None
        exc = None
        error_list = []  # annotates all failed error messages
        db_nslcmop_update = {}
        autoremove = False  # autoremove after terminated
        tasks_dict_info = {}  # maps asyncio task -> human readable description
        db_nsr_update = {}
        stage = [
            "Stage 1/3: Preparing task.",
            "Waiting for previous operations to terminate.",
            "",
        ]
        # ^ contains [stage, step, VIM-status]
        try:
            # wait for any previous tasks in process
            await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)

            stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
            db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
            operation_params = db_nslcmop.get("operationParams") or {}
            if operation_params.get("timeout_ns_terminate"):
                # user-provided timeout overrides the configured default
                timeout_ns_terminate = operation_params["timeout_ns_terminate"]
            stage[1] = "Getting nsr={} from db.".format(nsr_id)
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

            db_nsr_update["operational-status"] = "terminating"
            db_nsr_update["config-status"] = "terminating"
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state="TERMINATING",
                current_operation="TERMINATING",
                current_operation_id=nslcmop_id,
                other_update=db_nsr_update,
            )
            self._write_op_status(op_id=nslcmop_id, queuePosition=0, stage=stage)
            # deepcopy so later db updates do not mutate our working view
            nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed")) or {}
            if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
                # nothing was deployed; final status is written at finally
                return

            stage[1] = "Getting vnf descriptors from db."
            db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
            db_vnfrs_dict = {
                db_vnfr["member-vnf-index-ref"]: db_vnfr for db_vnfr in db_vnfrs_list
            }
            db_vnfds_from_id = {}
            db_vnfds_from_member_index = {}
            # Loop over VNFRs, caching each vnfd so it is read from db only once
            for vnfr in db_vnfrs_list:
                vnfd_id = vnfr["vnfd-id"]
                if vnfd_id not in db_vnfds_from_id:
                    vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
                    db_vnfds_from_id[vnfd_id] = vnfd
                db_vnfds_from_member_index[
                    vnfr["member-vnf-index-ref"]
                ] = db_vnfds_from_id[vnfd_id]

            # Destroy individual execution environments when there are terminating primitives.
            # Rest of EE will be deleted at once
            # TODO - check before calling _destroy_N2VC
            # if not operation_params.get("skip_terminate_primitives"):#
            # or not vca.get("needed_terminate"):
            stage[0] = "Stage 2/3 execute terminating primitives."
            self.logger.debug(logging_text + stage[0])
            stage[1] = "Looking execution environment that needs terminate."
            self.logger.debug(logging_text + stage[1])

            for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")):
                config_descriptor = None
                vca_member_vnf_index = vca.get("member-vnf-index")
                vca_id = self.get_vca_id(
                    db_vnfrs_dict.get(vca_member_vnf_index)
                    if vca_member_vnf_index
                    else None,
                    db_nsr,
                )
                if not vca or not vca.get("ee_id"):
                    continue
                # pick the configuration descriptor matching the VCA scope
                if not vca.get("member-vnf-index"):
                    # ns
                    config_descriptor = db_nsr.get("ns-configuration")
                elif vca.get("vdu_id"):
                    db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                    config_descriptor = get_configuration(db_vnfd, vca.get("vdu_id"))
                elif vca.get("kdu_name"):
                    db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                    config_descriptor = get_configuration(db_vnfd, vca.get("kdu_name"))
                else:
                    db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                    config_descriptor = get_configuration(db_vnfd, db_vnfd["id"])
                vca_type = vca.get("type")
                exec_terminate_primitives = not operation_params.get(
                    "skip_terminate_primitives"
                ) and vca.get("needed_terminate")
                # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
                # pending native charms
                destroy_ee = (
                    True if vca_type in ("helm", "helm-v3", "native_charm") else False
                )
                # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
                #     vca_index, vca.get("ee_id"), vca_type, destroy_ee))
                task = asyncio.ensure_future(
                    self.destroy_N2VC(
                        logging_text,
                        db_nslcmop,
                        vca,
                        config_descriptor,
                        vca_index,
                        destroy_ee,
                        exec_terminate_primitives,
                        vca_id=vca_id,
                    )
                )
                tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))

            # wait for pending tasks of terminate primitives
            if tasks_dict_info:
                self.logger.debug(
                    logging_text
                    + "Waiting for tasks {}".format(list(tasks_dict_info.keys()))
                )
                error_list = await self._wait_for_tasks(
                    logging_text,
                    tasks_dict_info,
                    min(self.timeout_charm_delete, timeout_ns_terminate),
                    stage,
                    nslcmop_id,
                )
                tasks_dict_info.clear()
                if error_list:
                    # abort; the finally section reports the failure
                    return  # raise LcmException("; ".join(error_list))

            # remove All execution environments at once
            stage[0] = "Stage 3/3 delete all."

            if nsr_deployed.get("VCA"):
                stage[1] = "Deleting all execution environments."
                self.logger.debug(logging_text + stage[1])
                vca_id = self.get_vca_id({}, db_nsr)
                task_delete_ee = asyncio.ensure_future(
                    asyncio.wait_for(
                        self._delete_all_N2VC(db_nsr=db_nsr, vca_id=vca_id),
                        timeout=self.timeout_charm_delete,
                    )
                )
                # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
                tasks_dict_info[task_delete_ee] = "Terminating all VCA"

            # Delete from k8scluster
            stage[1] = "Deleting KDUs."
            self.logger.debug(logging_text + stage[1])
            # print(nsr_deployed)
            for kdu in get_iterable(nsr_deployed, "K8s"):
                if not kdu or not kdu.get("kdu-instance"):
                    continue
                kdu_instance = kdu.get("kdu-instance")
                if kdu.get("k8scluster-type") in self.k8scluster_map:
                    # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
                    vca_id = self.get_vca_id({}, db_nsr)
                    task_delete_kdu_instance = asyncio.ensure_future(
                        self.k8scluster_map[kdu["k8scluster-type"]].uninstall(
                            cluster_uuid=kdu.get("k8scluster-uuid"),
                            kdu_instance=kdu_instance,
                            vca_id=vca_id,
                        )
                    )
                else:
                    self.logger.error(
                        logging_text
                        + "Unknown k8s deployment type {}".format(
                            kdu.get("k8scluster-type")
                        )
                    )
                    continue
                tasks_dict_info[
                    task_delete_kdu_instance
                ] = "Terminating KDU '{}'".format(kdu.get("kdu-name"))

            # remove from RO (NG-RO when enabled, classic RO client otherwise)
            stage[1] = "Deleting ns from VIM."
            if self.ng_ro:
                task_delete_ro = asyncio.ensure_future(
                    self._terminate_ng_ro(
                        logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
                    )
                )
            else:
                task_delete_ro = asyncio.ensure_future(
                    self._terminate_RO(
                        logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
                    )
                )
            tasks_dict_info[task_delete_ro] = "Removing deployment from VIM"

            # rest of staff will be done at finally

        except (
            ROclient.ROClientException,
            DbException,
            LcmException,
            N2VCException,
        ) as e:
            self.logger.error(logging_text + "Exit Exception {}".format(e))
            exc = e
        except asyncio.CancelledError:
            self.logger.error(
                logging_text + "Cancelled Exception while '{}'".format(stage[1])
            )
            exc = "Operation was cancelled"
        except Exception as e:
            exc = traceback.format_exc()
            self.logger.critical(
                logging_text + "Exit Exception while '{}': {}".format(stage[1], e),
                exc_info=True,
            )
        finally:
            if exc:
                error_list.append(str(exc))
            try:
                # wait for pending tasks
                if tasks_dict_info:
                    stage[1] = "Waiting for terminate pending tasks."
                    self.logger.debug(logging_text + stage[1])
                    error_list += await self._wait_for_tasks(
                        logging_text,
                        tasks_dict_info,
                        timeout_ns_terminate,
                        stage,
                        nslcmop_id,
                    )
                stage[1] = stage[2] = ""
            except asyncio.CancelledError:
                error_list.append("Cancelled")
                # TODO cancell all tasks
            except Exception as exc:
                error_list.append(str(exc))
            # update status at database
            if error_list:
                error_detail = "; ".join(error_list)
                # self.logger.error(logging_text + error_detail)
                error_description_nslcmop = "{} Detail: {}".format(
                    stage[0], error_detail
                )
                error_description_nsr = "Operation: TERMINATING.{}, {}.".format(
                    nslcmop_id, stage[0]
                )

                db_nsr_update["operational-status"] = "failed"
                db_nsr_update["detailed-status"] = (
                    error_description_nsr + " Detail: " + error_detail
                )
                db_nslcmop_update["detailed-status"] = error_detail
                nslcmop_operation_state = "FAILED"
                ns_state = "BROKEN"
            else:
                error_detail = None
                error_description_nsr = error_description_nslcmop = None
                ns_state = "NOT_INSTANTIATED"
                db_nsr_update["operational-status"] = "terminated"
                db_nsr_update["detailed-status"] = "Done"
                db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
                db_nslcmop_update["detailed-status"] = "Done"
                nslcmop_operation_state = "COMPLETED"

            if db_nsr:
                self._write_ns_status(
                    nsr_id=nsr_id,
                    ns_state=ns_state,
                    current_operation="IDLE",
                    current_operation_id=None,
                    error_description=error_description_nsr,
                    error_detail=error_detail,
                    other_update=db_nsr_update,
                )
            self._write_op_status(
                op_id=nslcmop_id,
                stage="",
                error_message=error_description_nslcmop,
                operation_state=nslcmop_operation_state,
                other_update=db_nslcmop_update,
            )
            if ns_state == "NOT_INSTANTIATED":
                try:
                    # propagate the terminated state to every VNFR of this NS
                    self.db.set_list(
                        "vnfrs",
                        {"nsr-id-ref": nsr_id},
                        {"_admin.nsState": "NOT_INSTANTIATED"},
                    )
                except DbException as e:
                    self.logger.warn(
                        logging_text
                        + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
                            nsr_id, e
                        )
                    )
            if operation_params:
                autoremove = operation_params.get("autoremove", False)
            if nslcmop_operation_state:
                try:
                    # notify NBI (and others) that the termination finished
                    await self.msg.aiowrite(
                        "ns",
                        "terminated",
                        {
                            "nsr_id": nsr_id,
                            "nslcmop_id": nslcmop_id,
                            "operationState": nslcmop_operation_state,
                            "autoremove": autoremove,
                        },
                        loop=self.loop,
                    )
                except Exception as e:
                    self.logger.error(
                        logging_text + "kafka_write notification Exception {}".format(e)
                    )

        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
4562
    async def _wait_for_tasks(
        self, logging_text, created_tasks_info, timeout, stage, nslcmop_id, nsr_id=None
    ):
        """
        Waits for a set of asyncio tasks to complete, collecting their errors.

        Progress ("done/total") is written to stage[1] and to the nslcmop
        record on every completion; errors are appended to stage[1] and,
        when nsr_id is given, mirrored into the nsr record.

        :param logging_text: prefix for every log line
        :param created_tasks_info: dict mapping asyncio task -> description text
        :param timeout: overall timeout in seconds for all tasks together
        :param stage: 3-item status list; only index 1 is updated here
        :param nslcmop_id: operation id where progress is written
        :param nsr_id: when provided, errors are also written to this nsr record
        :return: list of error-detail strings (empty when everything succeeded)
        """
        time_start = time()
        error_detail_list = []
        error_list = []
        pending_tasks = list(created_tasks_info.keys())
        num_tasks = len(pending_tasks)
        num_done = 0
        stage[1] = "{}/{}.".format(num_done, num_tasks)
        self._write_op_status(nslcmop_id, stage)
        while pending_tasks:
            new_error = None
            # remaining time from the shared deadline
            _timeout = timeout + time_start - time()
            # note: asyncio.wait returns sets; pending_tasks becomes a set here
            done, pending_tasks = await asyncio.wait(
                pending_tasks, timeout=_timeout, return_when=asyncio.FIRST_COMPLETED
            )
            num_done += len(done)
            if not done:  # Timeout: flag every still-pending task and stop waiting
                for task in pending_tasks:
                    new_error = created_tasks_info[task] + ": Timeout"
                    error_detail_list.append(new_error)
                    error_list.append(new_error)
                break
            for task in done:
                if task.cancelled():
                    exc = "Cancelled"
                else:
                    exc = task.exception()
                if exc:
                    if isinstance(exc, asyncio.TimeoutError):
                        exc = "Timeout"
                    new_error = created_tasks_info[task] + ": {}".format(exc)
                    error_list.append(created_tasks_info[task])
                    error_detail_list.append(new_error)
                    # known exception types get a short log line; anything
                    # unexpected is logged with its full traceback
                    if isinstance(
                        exc,
                        (
                            str,
                            DbException,
                            N2VCException,
                            ROclient.ROClientException,
                            LcmException,
                            K8sException,
                            NgRoException,
                        ),
                    ):
                        self.logger.error(logging_text + new_error)
                    else:
                        exc_traceback = "".join(
                            traceback.format_exception(None, exc, exc.__traceback__)
                        )
                        self.logger.error(
                            logging_text
                            + created_tasks_info[task]
                            + " "
                            + exc_traceback
                        )
                else:
                    self.logger.debug(
                        logging_text + created_tasks_info[task] + ": Done"
                    )
            stage[1] = "{}/{}.".format(num_done, num_tasks)
            if new_error:
                stage[1] += " Errors: " + ". ".join(error_detail_list) + "."
                if nsr_id:  # update also nsr
                    self.update_db_2(
                        "nsrs",
                        nsr_id,
                        {
                            "errorDescription": "Error at: " + ", ".join(error_list),
                            "errorDetail": ". ".join(error_detail_list),
                        },
                    )
            self._write_op_status(nslcmop_id, stage)
        return error_detail_list
4639
4640 @staticmethod
4641 def _map_primitive_params(primitive_desc, params, instantiation_params):
4642 """
4643 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
4644 The default-value is used. If it is between < > it look for a value at instantiation_params
4645 :param primitive_desc: portion of VNFD/NSD that describes primitive
4646 :param params: Params provided by user
4647 :param instantiation_params: Instantiation params provided by user
4648 :return: a dictionary with the calculated params
4649 """
4650 calculated_params = {}
4651 for parameter in primitive_desc.get("parameter", ()):
4652 param_name = parameter["name"]
4653 if param_name in params:
4654 calculated_params[param_name] = params[param_name]
4655 elif "default-value" in parameter or "value" in parameter:
4656 if "value" in parameter:
4657 calculated_params[param_name] = parameter["value"]
4658 else:
4659 calculated_params[param_name] = parameter["default-value"]
4660 if (
4661 isinstance(calculated_params[param_name], str)
4662 and calculated_params[param_name].startswith("<")
4663 and calculated_params[param_name].endswith(">")
4664 ):
4665 if calculated_params[param_name][1:-1] in instantiation_params:
4666 calculated_params[param_name] = instantiation_params[
4667 calculated_params[param_name][1:-1]
4668 ]
4669 else:
4670 raise LcmException(
4671 "Parameter {} needed to execute primitive {} not provided".format(
4672 calculated_params[param_name], primitive_desc["name"]
4673 )
4674 )
4675 else:
4676 raise LcmException(
4677 "Parameter {} needed to execute primitive {} not provided".format(
4678 param_name, primitive_desc["name"]
4679 )
4680 )
4681
4682 if isinstance(calculated_params[param_name], (dict, list, tuple)):
4683 calculated_params[param_name] = yaml.safe_dump(
4684 calculated_params[param_name], default_flow_style=True, width=256
4685 )
4686 elif isinstance(calculated_params[param_name], str) and calculated_params[
4687 param_name
4688 ].startswith("!!yaml "):
4689 calculated_params[param_name] = calculated_params[param_name][7:]
4690 if parameter.get("data-type") == "INTEGER":
4691 try:
4692 calculated_params[param_name] = int(calculated_params[param_name])
4693 except ValueError: # error converting string to int
4694 raise LcmException(
4695 "Parameter {} of primitive {} must be integer".format(
4696 param_name, primitive_desc["name"]
4697 )
4698 )
4699 elif parameter.get("data-type") == "BOOLEAN":
4700 calculated_params[param_name] = not (
4701 (str(calculated_params[param_name])).lower() == "false"
4702 )
4703
4704 # add always ns_config_info if primitive name is config
4705 if primitive_desc["name"] == "config":
4706 if "ns_config_info" in instantiation_params:
4707 calculated_params["ns_config_info"] = instantiation_params[
4708 "ns_config_info"
4709 ]
4710 return calculated_params
4711
4712 def _look_for_deployed_vca(
4713 self,
4714 deployed_vca,
4715 member_vnf_index,
4716 vdu_id,
4717 vdu_count_index,
4718 kdu_name=None,
4719 ee_descriptor_id=None,
4720 ):
4721 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
4722 for vca in deployed_vca:
4723 if not vca:
4724 continue
4725 if member_vnf_index != vca["member-vnf-index"] or vdu_id != vca["vdu_id"]:
4726 continue
4727 if (
4728 vdu_count_index is not None
4729 and vdu_count_index != vca["vdu_count_index"]
4730 ):
4731 continue
4732 if kdu_name and kdu_name != vca["kdu_name"]:
4733 continue
4734 if ee_descriptor_id and ee_descriptor_id != vca["ee_descriptor_id"]:
4735 continue
4736 break
4737 else:
4738 # vca_deployed not found
4739 raise LcmException(
4740 "charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
4741 " is not deployed".format(
4742 member_vnf_index,
4743 vdu_id,
4744 vdu_count_index,
4745 kdu_name,
4746 ee_descriptor_id,
4747 )
4748 )
4749 # get ee_id
4750 ee_id = vca.get("ee_id")
4751 vca_type = vca.get(
4752 "type", "lxc_proxy_charm"
4753 ) # default value for backward compatibility - proxy charm
4754 if not ee_id:
4755 raise LcmException(
4756 "charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
4757 "execution environment".format(
4758 member_vnf_index, vdu_id, kdu_name, vdu_count_index
4759 )
4760 )
4761 return ee_id, vca_type
4762
    async def _ns_execute_primitive(
        self,
        ee_id,
        primitive,
        primitive_params,
        retries=0,
        retries_interval=30,
        timeout=None,
        vca_type=None,
        db_dict=None,
        vca_id: str = None,
    ) -> (str, str):
        """
        Executes a primitive on an execution environment, retrying on failure.

        :param ee_id: execution environment id where the primitive runs
        :param primitive: primitive name; "config" gets its params wrapped
            under a "params" key before execution
        :param primitive_params: dict of parameters for the primitive
        :param retries: number of additional attempts after a failure
        :param retries_interval: seconds to wait between attempts
        :param timeout: per-attempt timeout; defaults to self.timeout_primitive
        :param vca_type: key of self.vca_map; defaults to "lxc_proxy_charm"
        :param db_dict: where to write progress/status (passed to the VCA connector)
        :param vca_id: id of the VCA to use; None for the default one
        :return: tuple (state, detail) where state is "COMPLETED", "FAILED"
            (all retries exhausted) or "FAIL" (unexpected error)
        """
        try:
            if primitive == "config":
                primitive_params = {"params": primitive_params}

            vca_type = vca_type or "lxc_proxy_charm"

            while retries >= 0:
                try:
                    output = await asyncio.wait_for(
                        self.vca_map[vca_type].exec_primitive(
                            ee_id=ee_id,
                            primitive_name=primitive,
                            params_dict=primitive_params,
                            progress_timeout=self.timeout_progress_primitive,
                            total_timeout=self.timeout_primitive,
                            db_dict=db_dict,
                            vca_id=vca_id,
                            vca_type=vca_type,
                        ),
                        timeout=timeout or self.timeout_primitive,
                    )
                    # execution was OK
                    break
                except asyncio.CancelledError:
                    raise
                except Exception as e:  # asyncio.TimeoutError
                    if isinstance(e, asyncio.TimeoutError):
                        e = "Timeout"
                    retries -= 1
                    if retries >= 0:
                        self.logger.debug(
                            "Error executing action {} on {} -> {}".format(
                                primitive, ee_id, e
                            )
                        )
                        # wait and retry
                        await asyncio.sleep(retries_interval, loop=self.loop)
                    else:
                        # no attempts left: report the last error
                        return "FAILED", str(e)

            return "COMPLETED", output

        except (LcmException, asyncio.CancelledError):
            raise
        except Exception as e:
            return "FAIL", "Error executing action {}: {}".format(primitive, e)
4821
4822 async def vca_status_refresh(self, nsr_id, nslcmop_id):
4823 """
4824 Updating the vca_status with latest juju information in nsrs record
4825 :param: nsr_id: Id of the nsr
4826 :param: nslcmop_id: Id of the nslcmop
4827 :return: None
4828 """
4829
4830 self.logger.debug("Task ns={} action={} Enter".format(nsr_id, nslcmop_id))
4831 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
4832 vca_id = self.get_vca_id({}, db_nsr)
4833 if db_nsr["_admin"]["deployed"]["K8s"]:
4834 for k8s_index, k8s in enumerate(db_nsr["_admin"]["deployed"]["K8s"]):
4835 cluster_uuid, kdu_instance = k8s["k8scluster-uuid"], k8s["kdu-instance"]
4836 await self._on_update_k8s_db(
4837 cluster_uuid, kdu_instance, filter={"_id": nsr_id}, vca_id=vca_id
4838 )
4839 else:
4840 for vca_index, _ in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
4841 table, filter = "nsrs", {"_id": nsr_id}
4842 path = "_admin.deployed.VCA.{}.".format(vca_index)
4843 await self._on_update_n2vc_db(table, filter, path, {})
4844
4845 self.logger.debug("Task ns={} action={} Exit".format(nsr_id, nslcmop_id))
4846 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_vca_status_refresh")
4847
4848 async def action(self, nsr_id, nslcmop_id):
4849 # Try to lock HA task here
4850 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
4851 if not task_is_locked_by_me:
4852 return
4853
4854 logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
4855 self.logger.debug(logging_text + "Enter")
4856 # get all needed from database
4857 db_nsr = None
4858 db_nslcmop = None
4859 db_nsr_update = {}
4860 db_nslcmop_update = {}
4861 nslcmop_operation_state = None
4862 error_description_nslcmop = None
4863 exc = None
4864 try:
4865 # wait for any previous tasks in process
4866 step = "Waiting for previous operations to terminate"
4867 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
4868
4869 self._write_ns_status(
4870 nsr_id=nsr_id,
4871 ns_state=None,
4872 current_operation="RUNNING ACTION",
4873 current_operation_id=nslcmop_id,
4874 )
4875
4876 step = "Getting information from database"
4877 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
4878 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
4879 if db_nslcmop["operationParams"].get("primitive_params"):
4880 db_nslcmop["operationParams"]["primitive_params"] = json.loads(
4881 db_nslcmop["operationParams"]["primitive_params"]
4882 )
4883
4884 nsr_deployed = db_nsr["_admin"].get("deployed")
4885 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
4886 vdu_id = db_nslcmop["operationParams"].get("vdu_id")
4887 kdu_name = db_nslcmop["operationParams"].get("kdu_name")
4888 vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
4889 primitive = db_nslcmop["operationParams"]["primitive"]
4890 primitive_params = db_nslcmop["operationParams"]["primitive_params"]
4891 timeout_ns_action = db_nslcmop["operationParams"].get(
4892 "timeout_ns_action", self.timeout_primitive
4893 )
4894
4895 if vnf_index:
4896 step = "Getting vnfr from database"
4897 db_vnfr = self.db.get_one(
4898 "vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}
4899 )
4900 if db_vnfr.get("kdur"):
4901 kdur_list = []
4902 for kdur in db_vnfr["kdur"]:
4903 if kdur.get("additionalParams"):
4904 kdur["additionalParams"] = json.loads(kdur["additionalParams"])
4905 kdur_list.append(kdur)
4906 db_vnfr["kdur"] = kdur_list
4907 step = "Getting vnfd from database"
4908 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
4909
4910 # Sync filesystem before running a primitive
4911 self.fs.sync(db_vnfr["vnfd-id"])
4912 else:
4913 step = "Getting nsd from database"
4914 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
4915
4916 vca_id = self.get_vca_id(db_vnfr, db_nsr)
4917 # for backward compatibility
4918 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
4919 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
4920 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
4921 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4922
4923 # look for primitive
4924 config_primitive_desc = descriptor_configuration = None
4925 if vdu_id:
4926 descriptor_configuration = get_configuration(db_vnfd, vdu_id)
4927 elif kdu_name:
4928 descriptor_configuration = get_configuration(db_vnfd, kdu_name)
4929 elif vnf_index:
4930 descriptor_configuration = get_configuration(db_vnfd, db_vnfd["id"])
4931 else:
4932 descriptor_configuration = db_nsd.get("ns-configuration")
4933
4934 if descriptor_configuration and descriptor_configuration.get(
4935 "config-primitive"
4936 ):
4937 for config_primitive in descriptor_configuration["config-primitive"]:
4938 if config_primitive["name"] == primitive:
4939 config_primitive_desc = config_primitive
4940 break
4941
4942 if not config_primitive_desc:
4943 if not (kdu_name and primitive in ("upgrade", "rollback", "status")):
4944 raise LcmException(
4945 "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
4946 primitive
4947 )
4948 )
4949 primitive_name = primitive
4950 ee_descriptor_id = None
4951 else:
4952 primitive_name = config_primitive_desc.get(
4953 "execution-environment-primitive", primitive
4954 )
4955 ee_descriptor_id = config_primitive_desc.get(
4956 "execution-environment-ref"
4957 )
4958
4959 if vnf_index:
4960 if vdu_id:
4961 vdur = next(
4962 (x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None
4963 )
4964 desc_params = parse_yaml_strings(vdur.get("additionalParams"))
4965 elif kdu_name:
4966 kdur = next(
4967 (x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None
4968 )
4969 desc_params = parse_yaml_strings(kdur.get("additionalParams"))
4970 else:
4971 desc_params = parse_yaml_strings(
4972 db_vnfr.get("additionalParamsForVnf")
4973 )
4974 else:
4975 desc_params = parse_yaml_strings(db_nsr.get("additionalParamsForNs"))
4976 if kdu_name and get_configuration(db_vnfd, kdu_name):
4977 kdu_configuration = get_configuration(db_vnfd, kdu_name)
4978 actions = set()
4979 for primitive in kdu_configuration.get("initial-config-primitive", []):
4980 actions.add(primitive["name"])
4981 for primitive in kdu_configuration.get("config-primitive", []):
4982 actions.add(primitive["name"])
4983 kdu_action = True if primitive_name in actions else False
4984
4985 # TODO check if ns is in a proper status
4986 if kdu_name and (
4987 primitive_name in ("upgrade", "rollback", "status") or kdu_action
4988 ):
4989 # kdur and desc_params already set from before
4990 if primitive_params:
4991 desc_params.update(primitive_params)
4992 # TODO Check if we will need something at vnf level
4993 for index, kdu in enumerate(get_iterable(nsr_deployed, "K8s")):
4994 if (
4995 kdu_name == kdu["kdu-name"]
4996 and kdu["member-vnf-index"] == vnf_index
4997 ):
4998 break
4999 else:
5000 raise LcmException(
5001 "KDU '{}' for vnf '{}' not deployed".format(kdu_name, vnf_index)
5002 )
5003
5004 if kdu.get("k8scluster-type") not in self.k8scluster_map:
5005 msg = "unknown k8scluster-type '{}'".format(
5006 kdu.get("k8scluster-type")
5007 )
5008 raise LcmException(msg)
5009
5010 db_dict = {
5011 "collection": "nsrs",
5012 "filter": {"_id": nsr_id},
5013 "path": "_admin.deployed.K8s.{}".format(index),
5014 }
5015 self.logger.debug(
5016 logging_text
5017 + "Exec k8s {} on {}.{}".format(primitive_name, vnf_index, kdu_name)
5018 )
5019 step = "Executing kdu {}".format(primitive_name)
5020 if primitive_name == "upgrade":
5021 if desc_params.get("kdu_model"):
5022 kdu_model = desc_params.get("kdu_model")
5023 del desc_params["kdu_model"]
5024 else:
5025 kdu_model = kdu.get("kdu-model")
5026 parts = kdu_model.split(sep=":")
5027 if len(parts) == 2:
5028 kdu_model = parts[0]
5029
5030 detailed_status = await asyncio.wait_for(
5031 self.k8scluster_map[kdu["k8scluster-type"]].upgrade(
5032 cluster_uuid=kdu.get("k8scluster-uuid"),
5033 kdu_instance=kdu.get("kdu-instance"),
5034 atomic=True,
5035 kdu_model=kdu_model,
5036 params=desc_params,
5037 db_dict=db_dict,
5038 timeout=timeout_ns_action,
5039 ),
5040 timeout=timeout_ns_action + 10,
5041 )
5042 self.logger.debug(
5043 logging_text + " Upgrade of kdu {} done".format(detailed_status)
5044 )
5045 elif primitive_name == "rollback":
5046 detailed_status = await asyncio.wait_for(
5047 self.k8scluster_map[kdu["k8scluster-type"]].rollback(
5048 cluster_uuid=kdu.get("k8scluster-uuid"),
5049 kdu_instance=kdu.get("kdu-instance"),
5050 db_dict=db_dict,
5051 ),
5052 timeout=timeout_ns_action,
5053 )
5054 elif primitive_name == "status":
5055 detailed_status = await asyncio.wait_for(
5056 self.k8scluster_map[kdu["k8scluster-type"]].status_kdu(
5057 cluster_uuid=kdu.get("k8scluster-uuid"),
5058 kdu_instance=kdu.get("kdu-instance"),
5059 vca_id=vca_id,
5060 ),
5061 timeout=timeout_ns_action,
5062 )
5063 else:
5064 kdu_instance = kdu.get("kdu-instance") or "{}-{}".format(
5065 kdu["kdu-name"], nsr_id
5066 )
5067 params = self._map_primitive_params(
5068 config_primitive_desc, primitive_params, desc_params
5069 )
5070
5071 detailed_status = await asyncio.wait_for(
5072 self.k8scluster_map[kdu["k8scluster-type"]].exec_primitive(
5073 cluster_uuid=kdu.get("k8scluster-uuid"),
5074 kdu_instance=kdu_instance,
5075 primitive_name=primitive_name,
5076 params=params,
5077 db_dict=db_dict,
5078 timeout=timeout_ns_action,
5079 vca_id=vca_id,
5080 ),
5081 timeout=timeout_ns_action,
5082 )
5083
5084 if detailed_status:
5085 nslcmop_operation_state = "COMPLETED"
5086 else:
5087 detailed_status = ""
5088 nslcmop_operation_state = "FAILED"
5089 else:
5090 ee_id, vca_type = self._look_for_deployed_vca(
5091 nsr_deployed["VCA"],
5092 member_vnf_index=vnf_index,
5093 vdu_id=vdu_id,
5094 vdu_count_index=vdu_count_index,
5095 ee_descriptor_id=ee_descriptor_id,
5096 )
5097 for vca_index, vca_deployed in enumerate(
5098 db_nsr["_admin"]["deployed"]["VCA"]
5099 ):
5100 if vca_deployed.get("member-vnf-index") == vnf_index:
5101 db_dict = {
5102 "collection": "nsrs",
5103 "filter": {"_id": nsr_id},
5104 "path": "_admin.deployed.VCA.{}.".format(vca_index),
5105 }
5106 break
5107 (
5108 nslcmop_operation_state,
5109 detailed_status,
5110 ) = await self._ns_execute_primitive(
5111 ee_id,
5112 primitive=primitive_name,
5113 primitive_params=self._map_primitive_params(
5114 config_primitive_desc, primitive_params, desc_params
5115 ),
5116 timeout=timeout_ns_action,
5117 vca_type=vca_type,
5118 db_dict=db_dict,
5119 vca_id=vca_id,
5120 )
5121
5122 db_nslcmop_update["detailed-status"] = detailed_status
5123 error_description_nslcmop = (
5124 detailed_status if nslcmop_operation_state == "FAILED" else ""
5125 )
5126 self.logger.debug(
5127 logging_text
5128 + " task Done with result {} {}".format(
5129 nslcmop_operation_state, detailed_status
5130 )
5131 )
5132 return # database update is called inside finally
5133
5134 except (DbException, LcmException, N2VCException, K8sException) as e:
5135 self.logger.error(logging_text + "Exit Exception {}".format(e))
5136 exc = e
5137 except asyncio.CancelledError:
5138 self.logger.error(
5139 logging_text + "Cancelled Exception while '{}'".format(step)
5140 )
5141 exc = "Operation was cancelled"
5142 except asyncio.TimeoutError:
5143 self.logger.error(logging_text + "Timeout while '{}'".format(step))
5144 exc = "Timeout"
5145 except Exception as e:
5146 exc = traceback.format_exc()
5147 self.logger.critical(
5148 logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
5149 exc_info=True,
5150 )
5151 finally:
5152 if exc:
5153 db_nslcmop_update[
5154 "detailed-status"
5155 ] = (
5156 detailed_status
5157 ) = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
5158 nslcmop_operation_state = "FAILED"
5159 if db_nsr:
5160 self._write_ns_status(
5161 nsr_id=nsr_id,
5162 ns_state=db_nsr[
5163 "nsState"
5164 ], # TODO check if degraded. For the moment use previous status
5165 current_operation="IDLE",
5166 current_operation_id=None,
5167 # error_description=error_description_nsr,
5168 # error_detail=error_detail,
5169 other_update=db_nsr_update,
5170 )
5171
5172 self._write_op_status(
5173 op_id=nslcmop_id,
5174 stage="",
5175 error_message=error_description_nslcmop,
5176 operation_state=nslcmop_operation_state,
5177 other_update=db_nslcmop_update,
5178 )
5179
5180 if nslcmop_operation_state:
5181 try:
5182 await self.msg.aiowrite(
5183 "ns",
5184 "actioned",
5185 {
5186 "nsr_id": nsr_id,
5187 "nslcmop_id": nslcmop_id,
5188 "operationState": nslcmop_operation_state,
5189 },
5190 loop=self.loop,
5191 )
5192 except Exception as e:
5193 self.logger.error(
5194 logging_text + "kafka_write notification Exception {}".format(e)
5195 )
5196 self.logger.debug(logging_text + "Exit")
5197 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
5198 return nslcmop_operation_state, detailed_status
5199
5200 async def scale(self, nsr_id, nslcmop_id):
5201 # Try to lock HA task here
5202 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
5203 if not task_is_locked_by_me:
5204 return
5205
5206 logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
5207 stage = ["", "", ""]
5208 tasks_dict_info = {}
5209 # ^ stage, step, VIM progress
5210 self.logger.debug(logging_text + "Enter")
5211 # get all needed from database
5212 db_nsr = None
5213 db_nslcmop_update = {}
5214 db_nsr_update = {}
5215 exc = None
5216 # in case of error, indicates what part of scale was failed to put nsr at error status
5217 scale_process = None
5218 old_operational_status = ""
5219 old_config_status = ""
5220 nsi_id = None
5221 try:
5222 # wait for any previous tasks in process
5223 step = "Waiting for previous operations to terminate"
5224 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
5225 self._write_ns_status(
5226 nsr_id=nsr_id,
5227 ns_state=None,
5228 current_operation="SCALING",
5229 current_operation_id=nslcmop_id,
5230 )
5231
5232 step = "Getting nslcmop from database"
5233 self.logger.debug(
5234 step + " after having waited for previous tasks to be completed"
5235 )
5236 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5237
5238 step = "Getting nsr from database"
5239 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
5240 old_operational_status = db_nsr["operational-status"]
5241 old_config_status = db_nsr["config-status"]
5242
5243 step = "Parsing scaling parameters"
5244 db_nsr_update["operational-status"] = "scaling"
5245 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5246 nsr_deployed = db_nsr["_admin"].get("deployed")
5247
5248 vnf_index = db_nslcmop["operationParams"]["scaleVnfData"][
5249 "scaleByStepData"
5250 ]["member-vnf-index"]
5251 scaling_group = db_nslcmop["operationParams"]["scaleVnfData"][
5252 "scaleByStepData"
5253 ]["scaling-group-descriptor"]
5254 scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
5255 # for backward compatibility
5256 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
5257 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
5258 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
5259 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5260
5261 step = "Getting vnfr from database"
5262 db_vnfr = self.db.get_one(
5263 "vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}
5264 )
5265
5266 vca_id = self.get_vca_id(db_vnfr, db_nsr)
5267
5268 step = "Getting vnfd from database"
5269 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
5270
5271 base_folder = db_vnfd["_admin"]["storage"]
5272
5273 step = "Getting scaling-group-descriptor"
5274 scaling_descriptor = find_in_list(
5275 get_scaling_aspect(db_vnfd),
5276 lambda scale_desc: scale_desc["name"] == scaling_group,
5277 )
5278 if not scaling_descriptor:
5279 raise LcmException(
5280 "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
5281 "at vnfd:scaling-group-descriptor".format(scaling_group)
5282 )
5283
5284 step = "Sending scale order to VIM"
5285 # TODO check if ns is in a proper status
5286 nb_scale_op = 0
5287 if not db_nsr["_admin"].get("scaling-group"):
5288 self.update_db_2(
5289 "nsrs",
5290 nsr_id,
5291 {
5292 "_admin.scaling-group": [
5293 {"name": scaling_group, "nb-scale-op": 0}
5294 ]
5295 },
5296 )
5297 admin_scale_index = 0
5298 else:
5299 for admin_scale_index, admin_scale_info in enumerate(
5300 db_nsr["_admin"]["scaling-group"]
5301 ):
5302 if admin_scale_info["name"] == scaling_group:
5303 nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
5304 break
5305 else: # not found, set index one plus last element and add new entry with the name
5306 admin_scale_index += 1
5307 db_nsr_update[
5308 "_admin.scaling-group.{}.name".format(admin_scale_index)
5309 ] = scaling_group
5310
5311 vca_scaling_info = []
5312 scaling_info = {"scaling_group_name": scaling_group, "vdu": [], "kdu": []}
5313 if scaling_type == "SCALE_OUT":
5314 if "aspect-delta-details" not in scaling_descriptor:
5315 raise LcmException(
5316 "Aspect delta details not fount in scaling descriptor {}".format(
5317 scaling_descriptor["name"]
5318 )
5319 )
5320 # count if max-instance-count is reached
5321 deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
5322
5323 scaling_info["scaling_direction"] = "OUT"
5324 scaling_info["vdu-create"] = {}
5325 scaling_info["kdu-create"] = {}
5326 for delta in deltas:
5327 for vdu_delta in delta.get("vdu-delta", {}):
5328 vdud = get_vdu(db_vnfd, vdu_delta["id"])
5329 # vdu_index also provides the number of instance of the targeted vdu
5330 vdu_count = vdu_index = get_vdur_index(db_vnfr, vdu_delta)
5331 cloud_init_text = self._get_vdu_cloud_init_content(
5332 vdud, db_vnfd
5333 )
5334 if cloud_init_text:
5335 additional_params = (
5336 self._get_vdu_additional_params(db_vnfr, vdud["id"])
5337 or {}
5338 )
5339 cloud_init_list = []
5340
5341 vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
5342 max_instance_count = 10
5343 if vdu_profile and "max-number-of-instances" in vdu_profile:
5344 max_instance_count = vdu_profile.get(
5345 "max-number-of-instances", 10
5346 )
5347
5348 default_instance_num = get_number_of_instances(
5349 db_vnfd, vdud["id"]
5350 )
5351 instances_number = vdu_delta.get("number-of-instances", 1)
5352 nb_scale_op += instances_number
5353
5354 new_instance_count = nb_scale_op + default_instance_num
5355 # Control if new count is over max and vdu count is less than max.
5356 # Then assign new instance count
5357 if new_instance_count > max_instance_count > vdu_count:
5358 instances_number = new_instance_count - max_instance_count
5359 else:
5360 instances_number = instances_number
5361
5362 if new_instance_count > max_instance_count:
5363 raise LcmException(
5364 "reached the limit of {} (max-instance-count) "
5365 "scaling-out operations for the "
5366 "scaling-group-descriptor '{}'".format(
5367 nb_scale_op, scaling_group
5368 )
5369 )
5370 for x in range(vdu_delta.get("number-of-instances", 1)):
5371 if cloud_init_text:
5372 # TODO Information of its own ip is not available because db_vnfr is not updated.
5373 additional_params["OSM"] = get_osm_params(
5374 db_vnfr, vdu_delta["id"], vdu_index + x
5375 )
5376 cloud_init_list.append(
5377 self._parse_cloud_init(
5378 cloud_init_text,
5379 additional_params,
5380 db_vnfd["id"],
5381 vdud["id"],
5382 )
5383 )
5384 vca_scaling_info.append(
5385 {
5386 "osm_vdu_id": vdu_delta["id"],
5387 "member-vnf-index": vnf_index,
5388 "type": "create",
5389 "vdu_index": vdu_index + x,
5390 }
5391 )
5392 scaling_info["vdu-create"][vdu_delta["id"]] = instances_number
5393 for kdu_delta in delta.get("kdu-resource-delta", {}):
5394 kdu_profile = get_kdu_resource_profile(db_vnfd, kdu_delta["id"])
5395 kdu_name = kdu_profile["kdu-name"]
5396 resource_name = kdu_profile.get("resource-name", "")
5397
5398 # Might have different kdus in the same delta
5399 # Should have list for each kdu
5400 if not scaling_info["kdu-create"].get(kdu_name, None):
5401 scaling_info["kdu-create"][kdu_name] = []
5402
5403 kdur = get_kdur(db_vnfr, kdu_name)
5404 if kdur.get("helm-chart"):
5405 k8s_cluster_type = "helm-chart-v3"
5406 self.logger.debug("kdur: {}".format(kdur))
5407 if (
5408 kdur.get("helm-version")
5409 and kdur.get("helm-version") == "v2"
5410 ):
5411 k8s_cluster_type = "helm-chart"
5412 elif kdur.get("juju-bundle"):
5413 k8s_cluster_type = "juju-bundle"
5414 else:
5415 raise LcmException(
5416 "kdu type for kdu='{}.{}' is neither helm-chart nor "
5417 "juju-bundle. Maybe an old NBI version is running".format(
5418 db_vnfr["member-vnf-index-ref"], kdu_name
5419 )
5420 )
5421
5422 max_instance_count = 10
5423 if kdu_profile and "max-number-of-instances" in kdu_profile:
5424 max_instance_count = kdu_profile.get(
5425 "max-number-of-instances", 10
5426 )
5427
5428 nb_scale_op += kdu_delta.get("number-of-instances", 1)
5429 deployed_kdu, _ = get_deployed_kdu(
5430 nsr_deployed, kdu_name, vnf_index
5431 )
5432 if deployed_kdu is None:
5433 raise LcmException(
5434 "KDU '{}' for vnf '{}' not deployed".format(
5435 kdu_name, vnf_index
5436 )
5437 )
5438 kdu_instance = deployed_kdu.get("kdu-instance")
5439 instance_num = await self.k8scluster_map[
5440 k8s_cluster_type
5441 ].get_scale_count(
5442 resource_name,
5443 kdu_instance,
5444 vca_id=vca_id,
5445 cluster_uuid=deployed_kdu.get("k8scluster-uuid"),
5446 kdu_model=deployed_kdu.get("kdu-model"),
5447 )
5448 kdu_replica_count = instance_num + kdu_delta.get(
5449 "number-of-instances", 1
5450 )
5451
5452 # Control if new count is over max and instance_num is less than max.
5453 # Then assign max instance number to kdu replica count
5454 if kdu_replica_count > max_instance_count > instance_num:
5455 kdu_replica_count = max_instance_count
5456 if kdu_replica_count > max_instance_count:
5457 raise LcmException(
5458 "reached the limit of {} (max-instance-count) "
5459 "scaling-out operations for the "
5460 "scaling-group-descriptor '{}'".format(
5461 instance_num, scaling_group
5462 )
5463 )
5464
5465 for x in range(kdu_delta.get("number-of-instances", 1)):
5466 vca_scaling_info.append(
5467 {
5468 "osm_kdu_id": kdu_name,
5469 "member-vnf-index": vnf_index,
5470 "type": "create",
5471 "kdu_index": instance_num + x - 1,
5472 }
5473 )
5474 scaling_info["kdu-create"][kdu_name].append(
5475 {
5476 "member-vnf-index": vnf_index,
5477 "type": "create",
5478 "k8s-cluster-type": k8s_cluster_type,
5479 "resource-name": resource_name,
5480 "scale": kdu_replica_count,
5481 }
5482 )
5483 elif scaling_type == "SCALE_IN":
5484 deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
5485
5486 scaling_info["scaling_direction"] = "IN"
5487 scaling_info["vdu-delete"] = {}
5488 scaling_info["kdu-delete"] = {}
5489
5490 for delta in deltas:
5491 for vdu_delta in delta.get("vdu-delta", {}):
5492 vdu_count = vdu_index = get_vdur_index(db_vnfr, vdu_delta)
5493 min_instance_count = 0
5494 vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
5495 if vdu_profile and "min-number-of-instances" in vdu_profile:
5496 min_instance_count = vdu_profile["min-number-of-instances"]
5497
5498 default_instance_num = get_number_of_instances(
5499 db_vnfd, vdu_delta["id"]
5500 )
5501 instance_num = vdu_delta.get("number-of-instances", 1)
5502 nb_scale_op -= instance_num
5503
5504 new_instance_count = nb_scale_op + default_instance_num
5505
5506 if new_instance_count < min_instance_count < vdu_count:
5507 instances_number = min_instance_count - new_instance_count
5508 else:
5509 instances_number = instance_num
5510
5511 if new_instance_count < min_instance_count:
5512 raise LcmException(
5513 "reached the limit of {} (min-instance-count) scaling-in operations for the "
5514 "scaling-group-descriptor '{}'".format(
5515 nb_scale_op, scaling_group
5516 )
5517 )
5518 for x in range(vdu_delta.get("number-of-instances", 1)):
5519 vca_scaling_info.append(
5520 {
5521 "osm_vdu_id": vdu_delta["id"],
5522 "member-vnf-index": vnf_index,
5523 "type": "delete",
5524 "vdu_index": vdu_index - 1 - x,
5525 }
5526 )
5527 scaling_info["vdu-delete"][vdu_delta["id"]] = instances_number
5528 for kdu_delta in delta.get("kdu-resource-delta", {}):
5529 kdu_profile = get_kdu_resource_profile(db_vnfd, kdu_delta["id"])
5530 kdu_name = kdu_profile["kdu-name"]
5531 resource_name = kdu_profile.get("resource-name", "")
5532
5533 if not scaling_info["kdu-delete"].get(kdu_name, None):
5534 scaling_info["kdu-delete"][kdu_name] = []
5535
5536 kdur = get_kdur(db_vnfr, kdu_name)
5537 if kdur.get("helm-chart"):
5538 k8s_cluster_type = "helm-chart-v3"
5539 self.logger.debug("kdur: {}".format(kdur))
5540 if (
5541 kdur.get("helm-version")
5542 and kdur.get("helm-version") == "v2"
5543 ):
5544 k8s_cluster_type = "helm-chart"
5545 elif kdur.get("juju-bundle"):
5546 k8s_cluster_type = "juju-bundle"
5547 else:
5548 raise LcmException(
5549 "kdu type for kdu='{}.{}' is neither helm-chart nor "
5550 "juju-bundle. Maybe an old NBI version is running".format(
5551 db_vnfr["member-vnf-index-ref"], kdur["kdu-name"]
5552 )
5553 )
5554
5555 min_instance_count = 0
5556 if kdu_profile and "min-number-of-instances" in kdu_profile:
5557 min_instance_count = kdu_profile["min-number-of-instances"]
5558
5559 nb_scale_op -= kdu_delta.get("number-of-instances", 1)
5560 deployed_kdu, _ = get_deployed_kdu(
5561 nsr_deployed, kdu_name, vnf_index
5562 )
5563 if deployed_kdu is None:
5564 raise LcmException(
5565 "KDU '{}' for vnf '{}' not deployed".format(
5566 kdu_name, vnf_index
5567 )
5568 )
5569 kdu_instance = deployed_kdu.get("kdu-instance")
5570 instance_num = await self.k8scluster_map[
5571 k8s_cluster_type
5572 ].get_scale_count(
5573 resource_name,
5574 kdu_instance,
5575 vca_id=vca_id,
5576 cluster_uuid=deployed_kdu.get("k8scluster-uuid"),
5577 kdu_model=deployed_kdu.get("kdu-model"),
5578 )
5579 kdu_replica_count = instance_num - kdu_delta.get(
5580 "number-of-instances", 1
5581 )
5582
5583 if kdu_replica_count < min_instance_count < instance_num:
5584 kdu_replica_count = min_instance_count
5585 if kdu_replica_count < min_instance_count:
5586 raise LcmException(
5587 "reached the limit of {} (min-instance-count) scaling-in operations for the "
5588 "scaling-group-descriptor '{}'".format(
5589 instance_num, scaling_group
5590 )
5591 )
5592
5593 for x in range(kdu_delta.get("number-of-instances", 1)):
5594 vca_scaling_info.append(
5595 {
5596 "osm_kdu_id": kdu_name,
5597 "member-vnf-index": vnf_index,
5598 "type": "delete",
5599 "kdu_index": instance_num - x - 1,
5600 }
5601 )
5602 scaling_info["kdu-delete"][kdu_name].append(
5603 {
5604 "member-vnf-index": vnf_index,
5605 "type": "delete",
5606 "k8s-cluster-type": k8s_cluster_type,
5607 "resource-name": resource_name,
5608 "scale": kdu_replica_count,
5609 }
5610 )
5611
5612 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
5613 vdu_delete = copy(scaling_info.get("vdu-delete"))
5614 if scaling_info["scaling_direction"] == "IN":
5615 for vdur in reversed(db_vnfr["vdur"]):
5616 if vdu_delete.get(vdur["vdu-id-ref"]):
5617 vdu_delete[vdur["vdu-id-ref"]] -= 1
5618 scaling_info["vdu"].append(
5619 {
5620 "name": vdur.get("name") or vdur.get("vdu-name"),
5621 "vdu_id": vdur["vdu-id-ref"],
5622 "interface": [],
5623 }
5624 )
5625 for interface in vdur["interfaces"]:
5626 scaling_info["vdu"][-1]["interface"].append(
5627 {
5628 "name": interface["name"],
5629 "ip_address": interface["ip-address"],
5630 "mac_address": interface.get("mac-address"),
5631 }
5632 )
5633 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
5634
5635 # PRE-SCALE BEGIN
5636 step = "Executing pre-scale vnf-config-primitive"
5637 if scaling_descriptor.get("scaling-config-action"):
5638 for scaling_config_action in scaling_descriptor[
5639 "scaling-config-action"
5640 ]:
5641 if (
5642 scaling_config_action.get("trigger") == "pre-scale-in"
5643 and scaling_type == "SCALE_IN"
5644 ) or (
5645 scaling_config_action.get("trigger") == "pre-scale-out"
5646 and scaling_type == "SCALE_OUT"
5647 ):
5648 vnf_config_primitive = scaling_config_action[
5649 "vnf-config-primitive-name-ref"
5650 ]
5651 step = db_nslcmop_update[
5652 "detailed-status"
5653 ] = "executing pre-scale scaling-config-action '{}'".format(
5654 vnf_config_primitive
5655 )
5656
5657 # look for primitive
5658 for config_primitive in (
5659 get_configuration(db_vnfd, db_vnfd["id"]) or {}
5660 ).get("config-primitive", ()):
5661 if config_primitive["name"] == vnf_config_primitive:
5662 break
5663 else:
5664 raise LcmException(
5665 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
5666 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
5667 "primitive".format(scaling_group, vnf_config_primitive)
5668 )
5669
5670 vnfr_params = {"VDU_SCALE_INFO": scaling_info}
5671 if db_vnfr.get("additionalParamsForVnf"):
5672 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
5673
5674 scale_process = "VCA"
5675 db_nsr_update["config-status"] = "configuring pre-scaling"
5676 primitive_params = self._map_primitive_params(
5677 config_primitive, {}, vnfr_params
5678 )
5679
5680 # Pre-scale retry check: Check if this sub-operation has been executed before
5681 op_index = self._check_or_add_scale_suboperation(
5682 db_nslcmop,
5683 vnf_index,
5684 vnf_config_primitive,
5685 primitive_params,
5686 "PRE-SCALE",
5687 )
5688 if op_index == self.SUBOPERATION_STATUS_SKIP:
5689 # Skip sub-operation
5690 result = "COMPLETED"
5691 result_detail = "Done"
5692 self.logger.debug(
5693 logging_text
5694 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
5695 vnf_config_primitive, result, result_detail
5696 )
5697 )
5698 else:
5699 if op_index == self.SUBOPERATION_STATUS_NEW:
5700 # New sub-operation: Get index of this sub-operation
5701 op_index = (
5702 len(db_nslcmop.get("_admin", {}).get("operations"))
5703 - 1
5704 )
5705 self.logger.debug(
5706 logging_text
5707 + "vnf_config_primitive={} New sub-operation".format(
5708 vnf_config_primitive
5709 )
5710 )
5711 else:
5712 # retry: Get registered params for this existing sub-operation
5713 op = db_nslcmop.get("_admin", {}).get("operations", [])[
5714 op_index
5715 ]
5716 vnf_index = op.get("member_vnf_index")
5717 vnf_config_primitive = op.get("primitive")
5718 primitive_params = op.get("primitive_params")
5719 self.logger.debug(
5720 logging_text
5721 + "vnf_config_primitive={} Sub-operation retry".format(
5722 vnf_config_primitive
5723 )
5724 )
5725 # Execute the primitive, either with new (first-time) or registered (reintent) args
5726 ee_descriptor_id = config_primitive.get(
5727 "execution-environment-ref"
5728 )
5729 primitive_name = config_primitive.get(
5730 "execution-environment-primitive", vnf_config_primitive
5731 )
5732 ee_id, vca_type = self._look_for_deployed_vca(
5733 nsr_deployed["VCA"],
5734 member_vnf_index=vnf_index,
5735 vdu_id=None,
5736 vdu_count_index=None,
5737 ee_descriptor_id=ee_descriptor_id,
5738 )
5739 result, result_detail = await self._ns_execute_primitive(
5740 ee_id,
5741 primitive_name,
5742 primitive_params,
5743 vca_type=vca_type,
5744 vca_id=vca_id,
5745 )
5746 self.logger.debug(
5747 logging_text
5748 + "vnf_config_primitive={} Done with result {} {}".format(
5749 vnf_config_primitive, result, result_detail
5750 )
5751 )
5752 # Update operationState = COMPLETED | FAILED
5753 self._update_suboperation_status(
5754 db_nslcmop, op_index, result, result_detail
5755 )
5756
5757 if result == "FAILED":
5758 raise LcmException(result_detail)
5759 db_nsr_update["config-status"] = old_config_status
5760 scale_process = None
5761 # PRE-SCALE END
5762
5763 db_nsr_update[
5764 "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)
5765 ] = nb_scale_op
5766 db_nsr_update[
5767 "_admin.scaling-group.{}.time".format(admin_scale_index)
5768 ] = time()
5769
5770 # SCALE-IN VCA - BEGIN
5771 if vca_scaling_info:
5772 step = db_nslcmop_update[
5773 "detailed-status"
5774 ] = "Deleting the execution environments"
5775 scale_process = "VCA"
5776 for vca_info in vca_scaling_info:
5777 if vca_info["type"] == "delete" and not vca_info.get("osm_kdu_id"):
5778 member_vnf_index = str(vca_info["member-vnf-index"])
5779 self.logger.debug(
5780 logging_text + "vdu info: {}".format(vca_info)
5781 )
5782 if vca_info.get("osm_vdu_id"):
5783 vdu_id = vca_info["osm_vdu_id"]
5784 vdu_index = int(vca_info["vdu_index"])
5785 stage[
5786 1
5787 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5788 member_vnf_index, vdu_id, vdu_index
5789 )
5790 stage[2] = step = "Scaling in VCA"
5791 self._write_op_status(op_id=nslcmop_id, stage=stage)
5792 vca_update = db_nsr["_admin"]["deployed"]["VCA"]
5793 config_update = db_nsr["configurationStatus"]
5794 for vca_index, vca in enumerate(vca_update):
5795 if (
5796 (vca or vca.get("ee_id"))
5797 and vca["member-vnf-index"] == member_vnf_index
5798 and vca["vdu_count_index"] == vdu_index
5799 ):
5800 if vca.get("vdu_id"):
5801 config_descriptor = get_configuration(
5802 db_vnfd, vca.get("vdu_id")
5803 )
5804 elif vca.get("kdu_name"):
5805 config_descriptor = get_configuration(
5806 db_vnfd, vca.get("kdu_name")
5807 )
5808 else:
5809 config_descriptor = get_configuration(
5810 db_vnfd, db_vnfd["id"]
5811 )
5812 operation_params = (
5813 db_nslcmop.get("operationParams") or {}
5814 )
5815 exec_terminate_primitives = not operation_params.get(
5816 "skip_terminate_primitives"
5817 ) and vca.get("needed_terminate")
5818 task = asyncio.ensure_future(
5819 asyncio.wait_for(
5820 self.destroy_N2VC(
5821 logging_text,
5822 db_nslcmop,
5823 vca,
5824 config_descriptor,
5825 vca_index,
5826 destroy_ee=True,
5827 exec_primitives=exec_terminate_primitives,
5828 scaling_in=True,
5829 vca_id=vca_id,
5830 ),
5831 timeout=self.timeout_charm_delete,
5832 )
5833 )
5834 tasks_dict_info[task] = "Terminating VCA {}".format(
5835 vca.get("ee_id")
5836 )
5837 del vca_update[vca_index]
5838 del config_update[vca_index]
5839 # wait for pending tasks of terminate primitives
5840 if tasks_dict_info:
5841 self.logger.debug(
5842 logging_text
5843 + "Waiting for tasks {}".format(
5844 list(tasks_dict_info.keys())
5845 )
5846 )
5847 error_list = await self._wait_for_tasks(
5848 logging_text,
5849 tasks_dict_info,
5850 min(
5851 self.timeout_charm_delete, self.timeout_ns_terminate
5852 ),
5853 stage,
5854 nslcmop_id,
5855 )
5856 tasks_dict_info.clear()
5857 if error_list:
5858 raise LcmException("; ".join(error_list))
5859
5860 db_vca_and_config_update = {
5861 "_admin.deployed.VCA": vca_update,
5862 "configurationStatus": config_update,
5863 }
5864 self.update_db_2(
5865 "nsrs", db_nsr["_id"], db_vca_and_config_update
5866 )
5867 scale_process = None
5868 # SCALE-IN VCA - END
5869
5870 # SCALE RO - BEGIN
5871 if scaling_info.get("vdu-create") or scaling_info.get("vdu-delete"):
5872 scale_process = "RO"
5873 if self.ro_config.get("ng"):
5874 await self._scale_ng_ro(
5875 logging_text, db_nsr, db_nslcmop, db_vnfr, scaling_info, stage
5876 )
5877 scaling_info.pop("vdu-create", None)
5878 scaling_info.pop("vdu-delete", None)
5879
5880 scale_process = None
5881 # SCALE RO - END
5882
5883 # SCALE KDU - BEGIN
5884 if scaling_info.get("kdu-create") or scaling_info.get("kdu-delete"):
5885 scale_process = "KDU"
5886 await self._scale_kdu(
5887 logging_text, nsr_id, nsr_deployed, db_vnfd, vca_id, scaling_info
5888 )
5889 scaling_info.pop("kdu-create", None)
5890 scaling_info.pop("kdu-delete", None)
5891
5892 scale_process = None
5893 # SCALE KDU - END
5894
5895 if db_nsr_update:
5896 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5897
5898 # SCALE-UP VCA - BEGIN
5899 if vca_scaling_info:
5900 step = db_nslcmop_update[
5901 "detailed-status"
5902 ] = "Creating new execution environments"
5903 scale_process = "VCA"
5904 for vca_info in vca_scaling_info:
5905 if vca_info["type"] == "create" and not vca_info.get("osm_kdu_id"):
5906 member_vnf_index = str(vca_info["member-vnf-index"])
5907 self.logger.debug(
5908 logging_text + "vdu info: {}".format(vca_info)
5909 )
5910 vnfd_id = db_vnfr["vnfd-ref"]
5911 if vca_info.get("osm_vdu_id"):
5912 vdu_index = int(vca_info["vdu_index"])
5913 deploy_params = {"OSM": get_osm_params(db_vnfr)}
5914 if db_vnfr.get("additionalParamsForVnf"):
5915 deploy_params.update(
5916 parse_yaml_strings(
5917 db_vnfr["additionalParamsForVnf"].copy()
5918 )
5919 )
5920 descriptor_config = get_configuration(
5921 db_vnfd, db_vnfd["id"]
5922 )
5923 if descriptor_config:
5924 vdu_id = None
5925 vdu_name = None
5926 kdu_name = None
5927 self._deploy_n2vc(
5928 logging_text=logging_text
5929 + "member_vnf_index={} ".format(member_vnf_index),
5930 db_nsr=db_nsr,
5931 db_vnfr=db_vnfr,
5932 nslcmop_id=nslcmop_id,
5933 nsr_id=nsr_id,
5934 nsi_id=nsi_id,
5935 vnfd_id=vnfd_id,
5936 vdu_id=vdu_id,
5937 kdu_name=kdu_name,
5938 member_vnf_index=member_vnf_index,
5939 vdu_index=vdu_index,
5940 vdu_name=vdu_name,
5941 deploy_params=deploy_params,
5942 descriptor_config=descriptor_config,
5943 base_folder=base_folder,
5944 task_instantiation_info=tasks_dict_info,
5945 stage=stage,
5946 )
5947 vdu_id = vca_info["osm_vdu_id"]
5948 vdur = find_in_list(
5949 db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id
5950 )
5951 descriptor_config = get_configuration(db_vnfd, vdu_id)
5952 if vdur.get("additionalParams"):
5953 deploy_params_vdu = parse_yaml_strings(
5954 vdur["additionalParams"]
5955 )
5956 else:
5957 deploy_params_vdu = deploy_params
5958 deploy_params_vdu["OSM"] = get_osm_params(
5959 db_vnfr, vdu_id, vdu_count_index=vdu_index
5960 )
5961 if descriptor_config:
5962 vdu_name = None
5963 kdu_name = None
5964 stage[
5965 1
5966 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5967 member_vnf_index, vdu_id, vdu_index
5968 )
5969 stage[2] = step = "Scaling out VCA"
5970 self._write_op_status(op_id=nslcmop_id, stage=stage)
5971 self._deploy_n2vc(
5972 logging_text=logging_text
5973 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5974 member_vnf_index, vdu_id, vdu_index
5975 ),
5976 db_nsr=db_nsr,
5977 db_vnfr=db_vnfr,
5978 nslcmop_id=nslcmop_id,
5979 nsr_id=nsr_id,
5980 nsi_id=nsi_id,
5981 vnfd_id=vnfd_id,
5982 vdu_id=vdu_id,
5983 kdu_name=kdu_name,
5984 member_vnf_index=member_vnf_index,
5985 vdu_index=vdu_index,
5986 vdu_name=vdu_name,
5987 deploy_params=deploy_params_vdu,
5988 descriptor_config=descriptor_config,
5989 base_folder=base_folder,
5990 task_instantiation_info=tasks_dict_info,
5991 stage=stage,
5992 )
5993 # SCALE-UP VCA - END
5994 scale_process = None
5995
5996 # POST-SCALE BEGIN
5997 # execute primitive service POST-SCALING
5998 step = "Executing post-scale vnf-config-primitive"
5999 if scaling_descriptor.get("scaling-config-action"):
6000 for scaling_config_action in scaling_descriptor[
6001 "scaling-config-action"
6002 ]:
6003 if (
6004 scaling_config_action.get("trigger") == "post-scale-in"
6005 and scaling_type == "SCALE_IN"
6006 ) or (
6007 scaling_config_action.get("trigger") == "post-scale-out"
6008 and scaling_type == "SCALE_OUT"
6009 ):
6010 vnf_config_primitive = scaling_config_action[
6011 "vnf-config-primitive-name-ref"
6012 ]
6013 step = db_nslcmop_update[
6014 "detailed-status"
6015 ] = "executing post-scale scaling-config-action '{}'".format(
6016 vnf_config_primitive
6017 )
6018
6019 vnfr_params = {"VDU_SCALE_INFO": scaling_info}
6020 if db_vnfr.get("additionalParamsForVnf"):
6021 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
6022
6023 # look for primitive
6024 for config_primitive in (
6025 get_configuration(db_vnfd, db_vnfd["id"]) or {}
6026 ).get("config-primitive", ()):
6027 if config_primitive["name"] == vnf_config_primitive:
6028 break
6029 else:
6030 raise LcmException(
6031 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
6032 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
6033 "config-primitive".format(
6034 scaling_group, vnf_config_primitive
6035 )
6036 )
6037 scale_process = "VCA"
6038 db_nsr_update["config-status"] = "configuring post-scaling"
6039 primitive_params = self._map_primitive_params(
6040 config_primitive, {}, vnfr_params
6041 )
6042
6043 # Post-scale retry check: Check if this sub-operation has been executed before
6044 op_index = self._check_or_add_scale_suboperation(
6045 db_nslcmop,
6046 vnf_index,
6047 vnf_config_primitive,
6048 primitive_params,
6049 "POST-SCALE",
6050 )
6051 if op_index == self.SUBOPERATION_STATUS_SKIP:
6052 # Skip sub-operation
6053 result = "COMPLETED"
6054 result_detail = "Done"
6055 self.logger.debug(
6056 logging_text
6057 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6058 vnf_config_primitive, result, result_detail
6059 )
6060 )
6061 else:
6062 if op_index == self.SUBOPERATION_STATUS_NEW:
6063 # New sub-operation: Get index of this sub-operation
6064 op_index = (
6065 len(db_nslcmop.get("_admin", {}).get("operations"))
6066 - 1
6067 )
6068 self.logger.debug(
6069 logging_text
6070 + "vnf_config_primitive={} New sub-operation".format(
6071 vnf_config_primitive
6072 )
6073 )
6074 else:
6075 # retry: Get registered params for this existing sub-operation
6076 op = db_nslcmop.get("_admin", {}).get("operations", [])[
6077 op_index
6078 ]
6079 vnf_index = op.get("member_vnf_index")
6080 vnf_config_primitive = op.get("primitive")
6081 primitive_params = op.get("primitive_params")
6082 self.logger.debug(
6083 logging_text
6084 + "vnf_config_primitive={} Sub-operation retry".format(
6085 vnf_config_primitive
6086 )
6087 )
6088 # Execute the primitive, either with new (first-time) or registered (reintent) args
6089 ee_descriptor_id = config_primitive.get(
6090 "execution-environment-ref"
6091 )
6092 primitive_name = config_primitive.get(
6093 "execution-environment-primitive", vnf_config_primitive
6094 )
6095 ee_id, vca_type = self._look_for_deployed_vca(
6096 nsr_deployed["VCA"],
6097 member_vnf_index=vnf_index,
6098 vdu_id=None,
6099 vdu_count_index=None,
6100 ee_descriptor_id=ee_descriptor_id,
6101 )
6102 result, result_detail = await self._ns_execute_primitive(
6103 ee_id,
6104 primitive_name,
6105 primitive_params,
6106 vca_type=vca_type,
6107 vca_id=vca_id,
6108 )
6109 self.logger.debug(
6110 logging_text
6111 + "vnf_config_primitive={} Done with result {} {}".format(
6112 vnf_config_primitive, result, result_detail
6113 )
6114 )
6115 # Update operationState = COMPLETED | FAILED
6116 self._update_suboperation_status(
6117 db_nslcmop, op_index, result, result_detail
6118 )
6119
6120 if result == "FAILED":
6121 raise LcmException(result_detail)
6122 db_nsr_update["config-status"] = old_config_status
6123 scale_process = None
6124 # POST-SCALE END
6125
6126 db_nsr_update[
6127 "detailed-status"
6128 ] = "" # "scaled {} {}".format(scaling_group, scaling_type)
6129 db_nsr_update["operational-status"] = (
6130 "running"
6131 if old_operational_status == "failed"
6132 else old_operational_status
6133 )
6134 db_nsr_update["config-status"] = old_config_status
6135 return
6136 except (
6137 ROclient.ROClientException,
6138 DbException,
6139 LcmException,
6140 NgRoException,
6141 ) as e:
6142 self.logger.error(logging_text + "Exit Exception {}".format(e))
6143 exc = e
6144 except asyncio.CancelledError:
6145 self.logger.error(
6146 logging_text + "Cancelled Exception while '{}'".format(step)
6147 )
6148 exc = "Operation was cancelled"
6149 except Exception as e:
6150 exc = traceback.format_exc()
6151 self.logger.critical(
6152 logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
6153 exc_info=True,
6154 )
6155 finally:
6156 self._write_ns_status(
6157 nsr_id=nsr_id,
6158 ns_state=None,
6159 current_operation="IDLE",
6160 current_operation_id=None,
6161 )
6162 if tasks_dict_info:
6163 stage[1] = "Waiting for instantiate pending tasks."
6164 self.logger.debug(logging_text + stage[1])
6165 exc = await self._wait_for_tasks(
6166 logging_text,
6167 tasks_dict_info,
6168 self.timeout_ns_deploy,
6169 stage,
6170 nslcmop_id,
6171 nsr_id=nsr_id,
6172 )
6173 if exc:
6174 db_nslcmop_update[
6175 "detailed-status"
6176 ] = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
6177 nslcmop_operation_state = "FAILED"
6178 if db_nsr:
6179 db_nsr_update["operational-status"] = old_operational_status
6180 db_nsr_update["config-status"] = old_config_status
6181 db_nsr_update["detailed-status"] = ""
6182 if scale_process:
6183 if "VCA" in scale_process:
6184 db_nsr_update["config-status"] = "failed"
6185 if "RO" in scale_process:
6186 db_nsr_update["operational-status"] = "failed"
6187 db_nsr_update[
6188 "detailed-status"
6189 ] = "FAILED scaling nslcmop={} {}: {}".format(
6190 nslcmop_id, step, exc
6191 )
6192 else:
6193 error_description_nslcmop = None
6194 nslcmop_operation_state = "COMPLETED"
6195 db_nslcmop_update["detailed-status"] = "Done"
6196
6197 self._write_op_status(
6198 op_id=nslcmop_id,
6199 stage="",
6200 error_message=error_description_nslcmop,
6201 operation_state=nslcmop_operation_state,
6202 other_update=db_nslcmop_update,
6203 )
6204 if db_nsr:
6205 self._write_ns_status(
6206 nsr_id=nsr_id,
6207 ns_state=None,
6208 current_operation="IDLE",
6209 current_operation_id=None,
6210 other_update=db_nsr_update,
6211 )
6212
6213 if nslcmop_operation_state:
6214 try:
6215 msg = {
6216 "nsr_id": nsr_id,
6217 "nslcmop_id": nslcmop_id,
6218 "operationState": nslcmop_operation_state,
6219 }
6220 await self.msg.aiowrite("ns", "scaled", msg, loop=self.loop)
6221 except Exception as e:
6222 self.logger.error(
6223 logging_text + "kafka_write notification Exception {}".format(e)
6224 )
6225 self.logger.debug(logging_text + "Exit")
6226 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")
6227
    async def _scale_kdu(
        self, logging_text, nsr_id, nsr_deployed, db_vnfd, vca_id, scaling_info
    ):
        """
        Scale the KDUs listed in scaling_info ("kdu-create" / "kdu-delete").

        For each KDU: on a delete, run its terminate config primitives first;
        then perform the scale through the matching K8s connector; on a create,
        run its initial config primitives afterwards. Config primitives are
        only executed when the KDU has no juju execution environment
        (get_juju_ee_ref returns None).

        :param logging_text: prefix for log messages
        :param nsr_id: NS record id, used to address the nsr document in db
        :param nsr_deployed: "_admin.deployed" content of the nsr (K8s deployments)
        :param db_vnfd: VNF descriptor of the KDUs being scaled
        :param vca_id: VCA id, passed through to the K8s connector calls
        :param scaling_info: dict with a "kdu-create" or "kdu-delete" entry,
            each mapping kdu_name -> list of per-KDU scaling-info dicts
        """
        # NOTE(review): only one of kdu-create/kdu-delete is processed per call,
        # with kdu-create taking precedence — presumably callers never set both;
        # confirm against the scale orchestration above.
        _scaling_info = scaling_info.get("kdu-create") or scaling_info.get("kdu-delete")
        for kdu_name in _scaling_info:
            for kdu_scaling_info in _scaling_info[kdu_name]:
                # locate the deployed KDU record and its index inside
                # _admin.deployed.K8s for this kdu/member-vnf-index
                deployed_kdu, index = get_deployed_kdu(
                    nsr_deployed, kdu_name, kdu_scaling_info["member-vnf-index"]
                )
                cluster_uuid = deployed_kdu["k8scluster-uuid"]
                kdu_instance = deployed_kdu["kdu-instance"]
                kdu_model = deployed_kdu.get("kdu-model")
                scale = int(kdu_scaling_info["scale"])
                k8s_cluster_type = kdu_scaling_info["k8s-cluster-type"]

                # db location where the K8s connector reports status updates
                db_dict = {
                    "collection": "nsrs",
                    "filter": {"_id": nsr_id},
                    "path": "_admin.deployed.K8s.{}".format(index),
                }

                step = "scaling application {}".format(
                    kdu_scaling_info["resource-name"]
                )
                self.logger.debug(logging_text + step)

                # scale-in: run terminate config primitives BEFORE scaling
                if kdu_scaling_info["type"] == "delete":
                    kdu_config = get_configuration(db_vnfd, kdu_name)
                    if (
                        kdu_config
                        and kdu_config.get("terminate-config-primitive")
                        and get_juju_ee_ref(db_vnfd, kdu_name) is None
                    ):
                        terminate_config_primitive_list = kdu_config.get(
                            "terminate-config-primitive"
                        )
                        # primitives must run in ascending "seq" order
                        terminate_config_primitive_list.sort(
                            key=lambda val: int(val["seq"])
                        )

                        for (
                            terminate_config_primitive
                        ) in terminate_config_primitive_list:
                            primitive_params_ = self._map_primitive_params(
                                terminate_config_primitive, {}, {}
                            )
                            step = "execute terminate config primitive"
                            self.logger.debug(logging_text + step)
                            await asyncio.wait_for(
                                self.k8scluster_map[k8s_cluster_type].exec_primitive(
                                    cluster_uuid=cluster_uuid,
                                    kdu_instance=kdu_instance,
                                    primitive_name=terminate_config_primitive["name"],
                                    params=primitive_params_,
                                    db_dict=db_dict,
                                    vca_id=vca_id,
                                ),
                                timeout=600,
                            )

                # perform the actual scale for both create and delete types
                await asyncio.wait_for(
                    self.k8scluster_map[k8s_cluster_type].scale(
                        kdu_instance,
                        scale,
                        kdu_scaling_info["resource-name"],
                        vca_id=vca_id,
                        cluster_uuid=cluster_uuid,
                        kdu_model=kdu_model,
                        atomic=True,
                        db_dict=db_dict,
                    ),
                    timeout=self.timeout_vca_on_error,
                )

                # scale-out: run initial config primitives AFTER scaling
                if kdu_scaling_info["type"] == "create":
                    kdu_config = get_configuration(db_vnfd, kdu_name)
                    if (
                        kdu_config
                        and kdu_config.get("initial-config-primitive")
                        and get_juju_ee_ref(db_vnfd, kdu_name) is None
                    ):
                        initial_config_primitive_list = kdu_config.get(
                            "initial-config-primitive"
                        )
                        # primitives must run in ascending "seq" order
                        initial_config_primitive_list.sort(
                            key=lambda val: int(val["seq"])
                        )

                        for initial_config_primitive in initial_config_primitive_list:
                            primitive_params_ = self._map_primitive_params(
                                initial_config_primitive, {}, {}
                            )
                            step = "execute initial config primitive"
                            self.logger.debug(logging_text + step)
                            await asyncio.wait_for(
                                self.k8scluster_map[k8s_cluster_type].exec_primitive(
                                    cluster_uuid=cluster_uuid,
                                    kdu_instance=kdu_instance,
                                    primitive_name=initial_config_primitive["name"],
                                    params=primitive_params_,
                                    db_dict=db_dict,
                                    vca_id=vca_id,
                                ),
                                timeout=600,
                            )
6333
6334 async def _scale_ng_ro(
6335 self, logging_text, db_nsr, db_nslcmop, db_vnfr, vdu_scaling_info, stage
6336 ):
6337 nsr_id = db_nslcmop["nsInstanceId"]
6338 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
6339 db_vnfrs = {}
6340
6341 # read from db: vnfd's for every vnf
6342 db_vnfds = []
6343
6344 # for each vnf in ns, read vnfd
6345 for vnfr in self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}):
6346 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
6347 vnfd_id = vnfr["vnfd-id"] # vnfd uuid for this vnf
6348 # if we haven't this vnfd, read it from db
6349 if not find_in_list(db_vnfds, lambda a_vnfd: a_vnfd["id"] == vnfd_id):
6350 # read from db
6351 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
6352 db_vnfds.append(vnfd)
6353 n2vc_key = self.n2vc.get_public_key()
6354 n2vc_key_list = [n2vc_key]
6355 self.scale_vnfr(
6356 db_vnfr,
6357 vdu_scaling_info.get("vdu-create"),
6358 vdu_scaling_info.get("vdu-delete"),
6359 mark_delete=True,
6360 )
6361 # db_vnfr has been updated, update db_vnfrs to use it
6362 db_vnfrs[db_vnfr["member-vnf-index-ref"]] = db_vnfr
6363 await self._instantiate_ng_ro(
6364 logging_text,
6365 nsr_id,
6366 db_nsd,
6367 db_nsr,
6368 db_nslcmop,
6369 db_vnfrs,
6370 db_vnfds,
6371 n2vc_key_list,
6372 stage=stage,
6373 start_deploy=time(),
6374 timeout_ns_deploy=self.timeout_ns_deploy,
6375 )
6376 if vdu_scaling_info.get("vdu-delete"):
6377 self.scale_vnfr(
6378 db_vnfr, None, vdu_scaling_info["vdu-delete"], mark_delete=False
6379 )
6380
6381 async def extract_prometheus_scrape_jobs(
6382 self, ee_id, artifact_path, ee_config_descriptor, vnfr_id, nsr_id, target_ip
6383 ):
6384 # look if exist a file called 'prometheus*.j2' and
6385 artifact_content = self.fs.dir_ls(artifact_path)
6386 job_file = next(
6387 (
6388 f
6389 for f in artifact_content
6390 if f.startswith("prometheus") and f.endswith(".j2")
6391 ),
6392 None,
6393 )
6394 if not job_file:
6395 return
6396 with self.fs.file_open((artifact_path, job_file), "r") as f:
6397 job_data = f.read()
6398
6399 # TODO get_service
6400 _, _, service = ee_id.partition(".") # remove prefix "namespace."
6401 host_name = "{}-{}".format(service, ee_config_descriptor["metric-service"])
6402 host_port = "80"
6403 vnfr_id = vnfr_id.replace("-", "")
6404 variables = {
6405 "JOB_NAME": vnfr_id,
6406 "TARGET_IP": target_ip,
6407 "EXPORTER_POD_IP": host_name,
6408 "EXPORTER_POD_PORT": host_port,
6409 }
6410 job_list = parse_job(job_data, variables)
6411 # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
6412 for job in job_list:
6413 if (
6414 not isinstance(job.get("job_name"), str)
6415 or vnfr_id not in job["job_name"]
6416 ):
6417 job["job_name"] = vnfr_id + "_" + str(randint(1, 10000))
6418 job["nsr_id"] = nsr_id
6419 job["vnfr_id"] = vnfr_id
6420 return job_list
6421
6422 def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
6423 """
6424 Get VCA Cloud and VCA Cloud Credentials for the VIM account
6425
6426 :param: vim_account_id: VIM Account ID
6427
6428 :return: (cloud_name, cloud_credential)
6429 """
6430 config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
6431 return config.get("vca_cloud"), config.get("vca_cloud_credential")
6432
6433 def get_vca_k8s_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
6434 """
6435 Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account
6436
6437 :param: vim_account_id: VIM Account ID
6438
6439 :return: (cloud_name, cloud_credential)
6440 """
6441 config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
6442 return config.get("vca_k8s_cloud"), config.get("vca_k8s_cloud_credential")