Fix 1425 NG-RO unable to pin VNF with vim_account config
[osm/LCM.git] / osm_lcm / ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 from typing import Any, Dict, List
21 import yaml
22 import logging
23 import logging.handlers
24 import traceback
25 import json
26 from jinja2 import (
27 Environment,
28 TemplateError,
29 TemplateNotFound,
30 StrictUndefined,
31 UndefinedError,
32 )
33
34 from osm_lcm import ROclient
35 from osm_lcm.data_utils.nsr import (
36 get_deployed_kdu,
37 get_deployed_vca,
38 get_deployed_vca_list,
39 get_nsd,
40 )
41 from osm_lcm.data_utils.vca import (
42 DeployedComponent,
43 DeployedK8sResource,
44 DeployedVCA,
45 EELevel,
46 Relation,
47 EERelation,
48 safe_get_ee_relation,
49 )
50 from osm_lcm.ng_ro import NgRoClient, NgRoException
51 from osm_lcm.lcm_utils import (
52 LcmException,
53 LcmExceptionNoMgmtIP,
54 LcmBase,
55 deep_get,
56 get_iterable,
57 populate_dict,
58 )
59 from osm_lcm.data_utils.nsd import (
60 get_ns_configuration_relation_list,
61 get_vnf_profile,
62 get_vnf_profiles,
63 )
64 from osm_lcm.data_utils.vnfd import (
65 get_relation_list,
66 get_vdu_list,
67 get_vdu_profile,
68 get_ee_sorted_initial_config_primitive_list,
69 get_ee_sorted_terminate_config_primitive_list,
70 get_kdu_list,
71 get_virtual_link_profiles,
72 get_vdu,
73 get_configuration,
74 get_vdu_index,
75 get_scaling_aspect,
76 get_number_of_instances,
77 get_juju_ee_ref,
78 get_kdu_resource_profile,
79 )
80 from osm_lcm.data_utils.list_utils import find_in_list
81 from osm_lcm.data_utils.vnfr import get_osm_params, get_vdur_index, get_kdur
82 from osm_lcm.data_utils.dict_utils import parse_yaml_strings
83 from osm_lcm.data_utils.database.vim_account import VimAccountDB
84 from n2vc.definitions import RelationEndpoint
85 from n2vc.k8s_helm_conn import K8sHelmConnector
86 from n2vc.k8s_helm3_conn import K8sHelm3Connector
87 from n2vc.k8s_juju_conn import K8sJujuConnector
88
89 from osm_common.dbbase import DbException
90 from osm_common.fsbase import FsException
91
92 from osm_lcm.data_utils.database.database import Database
93 from osm_lcm.data_utils.filesystem.filesystem import Filesystem
94
95 from n2vc.n2vc_juju_conn import N2VCJujuConnector
96 from n2vc.exceptions import N2VCException, N2VCNotFound, K8sException
97
98 from osm_lcm.lcm_helm_conn import LCMHelmConn
99 from osm_lcm.prometheus import parse_job
100
101 from copy import copy, deepcopy
102 from time import time
103 from uuid import uuid4
104
105 from random import randint
106
107 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
108
109
class NsLcm(LcmBase):
    """NS lifecycle manager.

    Drives instantiation, update and termination of Network Services through
    the (NG-)RO client, the N2VC/Juju connectors and the K8s (helm/juju)
    connectors created in __init__.
    """

    # Time (seconds) a charm may stay at blocked/error status before it is
    # marked as failed
    timeout_vca_on_error = (
        5 * 60
    )  # Time for charm from first time at blocked,error status to mark as failed
    timeout_ns_deploy = 2 * 3600  # default global timeout for deployment a ns
    timeout_ns_terminate = 1800  # default global timeout for un deployment a ns
    timeout_charm_delete = 10 * 60  # timeout (seconds) for deleting a charm
    timeout_primitive = 30 * 60  # timeout for primitive execution
    timeout_progress_primitive = (
        10 * 60
    )  # timeout for some progress in a primitive execution

    # sentinel return codes used by sub-operation lookups elsewhere in the class
    SUBOPERATION_STATUS_NOT_FOUND = -1
    SUBOPERATION_STATUS_NEW = -2
    SUBOPERATION_STATUS_SKIP = -3
    task_name_deploy_vca = "Deploying VCA"
    def __init__(self, msg, lcm_tasks, config, loop):
        """
        Init, Connect to database, filesystem storage, and messaging
        :param msg: message handler, forwarded to LcmBase
        :param lcm_tasks: shared registry used to track LCM tasks
        :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
        :param loop: asyncio event loop shared by all LCM components
        :return: None
        """
        super().__init__(msg=msg, logger=logging.getLogger("lcm.ns"))

        self.db = Database().instance.db
        self.fs = Filesystem().instance.fs
        self.loop = loop
        self.lcm_tasks = lcm_tasks
        self.timeout = config["timeout"]
        self.ro_config = config["ro_config"]
        self.ng_ro = config["ro_config"].get("ng")  # truthy when next-gen RO is configured
        self.vca_config = config["VCA"].copy()

        # create N2VC connector
        self.n2vc = N2VCJujuConnector(
            log=self.logger,
            loop=self.loop,
            on_update_db=self._on_update_n2vc_db,
            fs=self.fs,
            db=self.db,
        )

        # connector for helm-based execution environments
        self.conn_helm_ee = LCMHelmConn(
            log=self.logger,
            loop=self.loop,
            vca_config=self.vca_config,
            on_update_db=self._on_update_n2vc_db,
        )

        # K8s connectors: helm v2, helm v3 and juju bundles
        self.k8sclusterhelm2 = K8sHelmConnector(
            kubectl_command=self.vca_config.get("kubectlpath"),
            helm_command=self.vca_config.get("helmpath"),
            log=self.logger,
            on_update_db=None,
            fs=self.fs,
            db=self.db,
        )

        self.k8sclusterhelm3 = K8sHelm3Connector(
            kubectl_command=self.vca_config.get("kubectlpath"),
            helm_command=self.vca_config.get("helm3path"),
            fs=self.fs,
            log=self.logger,
            db=self.db,
            on_update_db=None,
        )

        self.k8sclusterjuju = K8sJujuConnector(
            kubectl_command=self.vca_config.get("kubectlpath"),
            juju_command=self.vca_config.get("jujupath"),
            log=self.logger,
            loop=self.loop,
            on_update_db=self._on_update_k8s_db,
            fs=self.fs,
            db=self.db,
        )

        # kdu deployment-type -> k8s connector.
        # NOTE(review): "chart" maps to the helm v3 connector while "helm-chart"
        # maps to helm v2 — confirm this asymmetry is intended.
        self.k8scluster_map = {
            "helm-chart": self.k8sclusterhelm2,
            "helm-chart-v3": self.k8sclusterhelm3,
            "chart": self.k8sclusterhelm3,
            "juju-bundle": self.k8sclusterjuju,
            "juju": self.k8sclusterjuju,
        }

        # charm deployment-type -> connector used to drive it
        self.vca_map = {
            "lxc_proxy_charm": self.n2vc,
            "native_charm": self.n2vc,
            "k8s_proxy_charm": self.n2vc,
            "helm": self.conn_helm_ee,
            "helm-v3": self.conn_helm_ee,
        }

        # create RO client
        self.RO = NgRoClient(self.loop, **self.ro_config)
206
207 @staticmethod
208 def increment_ip_mac(ip_mac, vm_index=1):
209 if not isinstance(ip_mac, str):
210 return ip_mac
211 try:
212 # try with ipv4 look for last dot
213 i = ip_mac.rfind(".")
214 if i > 0:
215 i += 1
216 return "{}{}".format(ip_mac[:i], int(ip_mac[i:]) + vm_index)
217 # try with ipv6 or mac look for last colon. Operate in hex
218 i = ip_mac.rfind(":")
219 if i > 0:
220 i += 1
221 # format in hex, len can be 2 for mac or 4 for ipv6
222 return ("{}{:0" + str(len(ip_mac) - i) + "x}").format(
223 ip_mac[:i], int(ip_mac[i:], 16) + vm_index
224 )
225 except Exception:
226 pass
227 return None
228
229 def _on_update_ro_db(self, nsrs_id, ro_descriptor):
230
231 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
232
233 try:
234 # TODO filter RO descriptor fields...
235
236 # write to database
237 db_dict = dict()
238 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
239 db_dict["deploymentStatus"] = ro_descriptor
240 self.update_db_2("nsrs", nsrs_id, db_dict)
241
242 except Exception as e:
243 self.logger.warn(
244 "Cannot write database RO deployment for ns={} -> {}".format(nsrs_id, e)
245 )
246
    async def _on_update_n2vc_db(self, table, filter, path, updated_data, vca_id=None):
        """Callback fired by N2VC when juju data changes: refresh vcaStatus,
        per-VCA configurationStatus and the READY/DEGRADED nsState in the nsrs
        record.

        :param table: name of the table that triggered the update (unused here)
        :param filter: database filter; its "_id" is taken as the nsr id
        :param path: dotted path whose last component is the VCA index inside
            _admin.deployed.VCA
        :param updated_data: changed data (unused here; status is re-queried)
        :param vca_id: optional VCA id used when querying juju status
        Errors (other than cancellation/timeout) are logged, not raised.
        """

        # remove last dot from path (if exists)
        if path.endswith("."):
            path = path[:-1]

        # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
        #                   .format(table, filter, path, updated_data))
        try:

            nsr_id = filter.get("_id")

            # read ns record from database
            nsr = self.db.get_one(table="nsrs", q_filter=filter)
            current_ns_status = nsr.get("nsState")

            # get vca status for NS
            status_dict = await self.n2vc.get_status(
                namespace="." + nsr_id, yaml_format=False, vca_id=vca_id
            )

            # vcaStatus
            db_dict = dict()
            db_dict["vcaStatus"] = status_dict
            await self.n2vc.update_vca_status(db_dict["vcaStatus"], vca_id=vca_id)

            # update configurationStatus for this VCA
            try:
                # the VCA index is the last dotted component of the path
                vca_index = int(path[path.rfind(".") + 1 :])

                vca_list = deep_get(
                    target_dict=nsr, key_list=("_admin", "deployed", "VCA")
                )
                vca_status = vca_list[vca_index].get("status")

                configuration_status_list = nsr.get("configurationStatus")
                config_status = configuration_status_list[vca_index].get("status")

                # NOTE(review): db_dict has no "configurationStatus" key at this
                # point, so the two assignments below raise KeyError, which is
                # swallowed by the except — the configurationStatus change never
                # reaches the database. Confirm whether this is intended.
                if config_status == "BROKEN" and vca_status != "failed":
                    db_dict["configurationStatus"][vca_index] = "READY"
                elif config_status != "BROKEN" and vca_status == "failed":
                    db_dict["configurationStatus"][vca_index] = "BROKEN"
            except Exception as e:
                # not update configurationStatus
                self.logger.debug("Error updating vca_index (ignore): {}".format(e))

            # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
            # if nsState = 'DEGRADED' check if all is OK
            is_degraded = False
            if current_ns_status in ("READY", "DEGRADED"):
                error_description = ""
                # check machines
                if status_dict.get("machines"):
                    for machine_id in status_dict.get("machines"):
                        machine = status_dict.get("machines").get(machine_id)
                        # check machine agent-status
                        if machine.get("agent-status"):
                            s = machine.get("agent-status").get("status")
                            if s != "started":
                                is_degraded = True
                                error_description += (
                                    "machine {} agent-status={} ; ".format(
                                        machine_id, s
                                    )
                                )
                        # check machine instance status
                        if machine.get("instance-status"):
                            s = machine.get("instance-status").get("status")
                            if s != "running":
                                is_degraded = True
                                error_description += (
                                    "machine {} instance-status={} ; ".format(
                                        machine_id, s
                                    )
                                )
                # check applications
                if status_dict.get("applications"):
                    for app_id in status_dict.get("applications"):
                        app = status_dict.get("applications").get(app_id)
                        # check application status
                        if app.get("status"):
                            s = app.get("status").get("status")
                            if s != "active":
                                is_degraded = True
                                error_description += (
                                    "application {} status={} ; ".format(app_id, s)
                                )

                if error_description:
                    db_dict["errorDescription"] = error_description
                if current_ns_status == "READY" and is_degraded:
                    db_dict["nsState"] = "DEGRADED"
                if current_ns_status == "DEGRADED" and not is_degraded:
                    db_dict["nsState"] = "READY"

            # write to database
            self.update_db_2("nsrs", nsr_id, db_dict)

        except (asyncio.CancelledError, asyncio.TimeoutError):
            raise
        except Exception as e:
            self.logger.warn("Error updating NS state for ns={}: {}".format(nsr_id, e))
349
    async def _on_update_k8s_db(
        self, cluster_uuid, kdu_instance, filter=None, vca_id=None
    ):
        """
        Updating vca status in NSR record
        :param cluster_uuid: UUID of a k8s cluster
        :param kdu_instance: The unique name of the KDU instance
        :param filter: To get nsr_id
        :param vca_id: optional VCA id used when querying the KDU status
        :return: none
        """

        # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
        #                   .format(cluster_uuid, kdu_instance, filter))

        try:
            # NOTE(review): if filter is None this raises AttributeError before
            # nsr_id is bound, and the log line in the except below would then
            # fail on the unbound name — confirm callers always pass a filter.
            nsr_id = filter.get("_id")

            # get vca status for NS
            vca_status = await self.k8sclusterjuju.status_kdu(
                cluster_uuid,
                kdu_instance,
                complete_status=True,
                yaml_format=False,
                vca_id=vca_id,
            )
            # vcaStatus
            db_dict = dict()
            db_dict["vcaStatus"] = {nsr_id: vca_status}

            await self.k8sclusterjuju.update_vca_status(
                db_dict["vcaStatus"],
                kdu_instance,
                vca_id=vca_id,
            )

            # write to database
            self.update_db_2("nsrs", nsr_id, db_dict)

        except (asyncio.CancelledError, asyncio.TimeoutError):
            raise
        except Exception as e:
            self.logger.warn("Error updating NS state for ns={}: {}".format(nsr_id, e))
392
393 @staticmethod
394 def _parse_cloud_init(cloud_init_text, additional_params, vnfd_id, vdu_id):
395 try:
396 env = Environment(undefined=StrictUndefined)
397 template = env.from_string(cloud_init_text)
398 return template.render(additional_params or {})
399 except UndefinedError as e:
400 raise LcmException(
401 "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
402 "file, must be provided in the instantiation parameters inside the "
403 "'additionalParamsForVnf/Vdu' block".format(e, vnfd_id, vdu_id)
404 )
405 except (TemplateError, TemplateNotFound) as e:
406 raise LcmException(
407 "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
408 vnfd_id, vdu_id, e
409 )
410 )
411
    def _get_vdu_cloud_init_content(self, vdu, vnfd):
        """Return the cloud-init text for *vdu*: read from the package file
        referenced by "cloud-init-file", or taken inline from "cloud-init".

        :param vdu: vdu descriptor from the vnfd
        :param vnfd: vnfd descriptor; _admin.storage locates the package files
        :return: cloud-init content as a string, or None if the vdu has none
        :raises LcmException: if the cloud-init file cannot be read
        """
        cloud_init_content = cloud_init_file = None
        try:
            if vdu.get("cloud-init-file"):
                base_folder = vnfd["_admin"]["storage"]
                if base_folder["pkg-dir"]:
                    # package stored under a sub-directory
                    cloud_init_file = "{}/{}/cloud_init/{}".format(
                        base_folder["folder"],
                        base_folder["pkg-dir"],
                        vdu["cloud-init-file"],
                    )
                else:
                    # no pkg-dir: look under Scripts/cloud_init at the root
                    cloud_init_file = "{}/Scripts/cloud_init/{}".format(
                        base_folder["folder"],
                        vdu["cloud-init-file"],
                    )
                with self.fs.file_open(cloud_init_file, "r") as ci_file:
                    cloud_init_content = ci_file.read()
            elif vdu.get("cloud-init"):
                cloud_init_content = vdu["cloud-init"]

            return cloud_init_content
        except FsException as e:
            raise LcmException(
                "Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".format(
                    vnfd["id"], vdu["id"], cloud_init_file, e
                )
            )
440
441 def _get_vdu_additional_params(self, db_vnfr, vdu_id):
442 vdur = next(
443 vdur for vdur in db_vnfr.get("vdur") if vdu_id == vdur["vdu-id-ref"]
444 )
445 additional_params = vdur.get("additionalParams")
446 return parse_yaml_strings(additional_params)
447
448 def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
449 """
450 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
451 :param vnfd: input vnfd
452 :param new_id: overrides vnf id if provided
453 :param additionalParams: Instantiation params for VNFs provided
454 :param nsrId: Id of the NSR
455 :return: copy of vnfd
456 """
457 vnfd_RO = deepcopy(vnfd)
458 # remove unused by RO configuration, monitoring, scaling and internal keys
459 vnfd_RO.pop("_id", None)
460 vnfd_RO.pop("_admin", None)
461 vnfd_RO.pop("monitoring-param", None)
462 vnfd_RO.pop("scaling-group-descriptor", None)
463 vnfd_RO.pop("kdu", None)
464 vnfd_RO.pop("k8s-cluster", None)
465 if new_id:
466 vnfd_RO["id"] = new_id
467
468 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
469 for vdu in get_iterable(vnfd_RO, "vdu"):
470 vdu.pop("cloud-init-file", None)
471 vdu.pop("cloud-init", None)
472 return vnfd_RO
473
474 @staticmethod
475 def ip_profile_2_RO(ip_profile):
476 RO_ip_profile = deepcopy(ip_profile)
477 if "dns-server" in RO_ip_profile:
478 if isinstance(RO_ip_profile["dns-server"], list):
479 RO_ip_profile["dns-address"] = []
480 for ds in RO_ip_profile.pop("dns-server"):
481 RO_ip_profile["dns-address"].append(ds["address"])
482 else:
483 RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
484 if RO_ip_profile.get("ip-version") == "ipv4":
485 RO_ip_profile["ip-version"] = "IPv4"
486 if RO_ip_profile.get("ip-version") == "ipv6":
487 RO_ip_profile["ip-version"] = "IPv6"
488 if "dhcp-params" in RO_ip_profile:
489 RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
490 return RO_ip_profile
491
492 def _get_ro_vim_id_for_vim_account(self, vim_account):
493 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
494 if db_vim["_admin"]["operationalState"] != "ENABLED":
495 raise LcmException(
496 "VIM={} is not available. operationalState={}".format(
497 vim_account, db_vim["_admin"]["operationalState"]
498 )
499 )
500 RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
501 return RO_vim_id
502
503 def get_ro_wim_id_for_wim_account(self, wim_account):
504 if isinstance(wim_account, str):
505 db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
506 if db_wim["_admin"]["operationalState"] != "ENABLED":
507 raise LcmException(
508 "WIM={} is not available. operationalState={}".format(
509 wim_account, db_wim["_admin"]["operationalState"]
510 )
511 )
512 RO_wim_id = db_wim["_admin"]["deployed"]["RO-account"]
513 return RO_wim_id
514 else:
515 return wim_account
516
    def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None, mark_delete=False):
        """Add and/or remove vdur entries of a vnfr in the database (scale out/in).

        :param db_vnfr: vnfr content; its "vdur" list is refreshed from the DB on return
        :param vdu_create: dict vdu-id -> number of new instances to create
        :param vdu_delete: dict vdu-id -> number of instances to delete
        :param mark_delete: if True, vdurs are only marked DELETING instead of removed
        :raises LcmException: scaling out a vdu with no existing vdur to clone
        """

        db_vdu_push_list = []
        db_update = {"_admin.modified": time()}
        if vdu_create:
            for vdu_id, vdu_count in vdu_create.items():
                # use the last existing vdur of this vdu as the clone template
                vdur = next(
                    (
                        vdur
                        for vdur in reversed(db_vnfr["vdur"])
                        if vdur["vdu-id-ref"] == vdu_id
                    ),
                    None,
                )
                if not vdur:
                    raise LcmException(
                        "Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
                            vdu_id
                        )
                    )

                for count in range(vdu_count):
                    vdur_copy = deepcopy(vdur)
                    vdur_copy["status"] = "BUILD"
                    vdur_copy["status-detailed"] = None
                    vdur_copy["ip-address"] = None
                    vdur_copy["_id"] = str(uuid4())
                    # count-index of the template plus the replica offset
                    vdur_copy["count-index"] += count + 1
                    vdur_copy["id"] = "{}-{}".format(
                        vdur_copy["vdu-id-ref"], vdur_copy["count-index"]
                    )
                    vdur_copy.pop("vim_info", None)
                    for iface in vdur_copy["interfaces"]:
                        if iface.get("fixed-ip"):
                            # derive the replica's ip from the template's ip
                            iface["ip-address"] = self.increment_ip_mac(
                                iface["ip-address"], count + 1
                            )
                        else:
                            iface.pop("ip-address", None)
                        if iface.get("fixed-mac"):
                            # derive the replica's mac from the template's mac
                            iface["mac-address"] = self.increment_ip_mac(
                                iface["mac-address"], count + 1
                            )
                        else:
                            iface.pop("mac-address", None)
                        iface.pop(
                            "mgmt_vnf", None
                        )  # only first vdu can be management of vnf
                    db_vdu_push_list.append(vdur_copy)
                    # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
        if vdu_delete:
            for vdu_id, vdu_count in vdu_delete.items():
                if mark_delete:
                    # mark the last vdu_count replicas of this vdu as DELETING
                    indexes_to_delete = [
                        iv[0]
                        for iv in enumerate(db_vnfr["vdur"])
                        if iv[1]["vdu-id-ref"] == vdu_id
                    ]
                    db_update.update(
                        {
                            "vdur.{}.status".format(i): "DELETING"
                            for i in indexes_to_delete[-vdu_count:]
                        }
                    )
                else:
                    # it must be deleted one by one because common.db does not allow otherwise
                    vdus_to_delete = [
                        v
                        for v in reversed(db_vnfr["vdur"])
                        if v["vdu-id-ref"] == vdu_id
                    ]
                    for vdu in vdus_to_delete[:vdu_count]:
                        self.db.set_one(
                            "vnfrs",
                            {"_id": db_vnfr["_id"]},
                            None,
                            pull={"vdur": {"_id": vdu["_id"]}},
                        )
        db_push = {"vdur": db_vdu_push_list} if db_vdu_push_list else None
        self.db.set_one("vnfrs", {"_id": db_vnfr["_id"]}, db_update, push_list=db_push)
        # modify passed dictionary db_vnfr
        db_vnfr_ = self.db.get_one("vnfrs", {"_id": db_vnfr["_id"]})
        db_vnfr["vdur"] = db_vnfr_["vdur"]
600
601 def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
602 """
603 Updates database nsr with the RO info for the created vld
604 :param ns_update_nsr: dictionary to be filled with the updated info
605 :param db_nsr: content of db_nsr. This is also modified
606 :param nsr_desc_RO: nsr descriptor from RO
607 :return: Nothing, LcmException is raised on errors
608 """
609
610 for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
611 for net_RO in get_iterable(nsr_desc_RO, "nets"):
612 if vld["id"] != net_RO.get("ns_net_osm_id"):
613 continue
614 vld["vim-id"] = net_RO.get("vim_net_id")
615 vld["name"] = net_RO.get("vim_name")
616 vld["status"] = net_RO.get("status")
617 vld["status-detailed"] = net_RO.get("error_msg")
618 ns_update_nsr["vld.{}".format(vld_index)] = vld
619 break
620 else:
621 raise LcmException(
622 "ns_update_nsr: Not found vld={} at RO info".format(vld["id"])
623 )
624
625 def set_vnfr_at_error(self, db_vnfrs, error_text):
626 try:
627 for db_vnfr in db_vnfrs.values():
628 vnfr_update = {"status": "ERROR"}
629 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
630 if "status" not in vdur:
631 vdur["status"] = "ERROR"
632 vnfr_update["vdur.{}.status".format(vdu_index)] = "ERROR"
633 if error_text:
634 vdur["status-detailed"] = str(error_text)
635 vnfr_update[
636 "vdur.{}.status-detailed".format(vdu_index)
637 ] = "ERROR"
638 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
639 except DbException as e:
640 self.logger.error("Cannot update vnf. {}".format(e))
641
    def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
        """
        Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
        :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
        :param nsr_desc_RO: nsr descriptor from RO
        :return: Nothing, LcmException is raised on errors
        """
        for vnf_index, db_vnfr in db_vnfrs.items():
            # find the RO vnf entry that matches this member index
            for vnf_RO in nsr_desc_RO["vnfs"]:
                if vnf_RO["member_vnf_index"] != vnf_index:
                    continue
                vnfr_update = {}
                if vnf_RO.get("ip_address"):
                    # RO may report several addresses separated by ';'; keep the first
                    db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO[
                        "ip_address"
                    ].split(";")[0]
                elif not db_vnfr.get("ip-address"):
                    if db_vnfr.get("vdur"):  # if not VDUs, there is not ip_address
                        raise LcmExceptionNoMgmtIP(
                            "ns member_vnf_index '{}' has no IP address".format(
                                vnf_index
                            )
                        )

                for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
                    vdur_RO_count_index = 0
                    if vdur.get("pdu-type"):
                        # physical DUs are not reported by the VIM; skip
                        continue
                    # match RO vm by vdu id and replica (count-index) position.
                    # NOTE(review): this assumes RO lists replicas of a vdu in
                    # count-index order — confirm against the RO descriptor.
                    for vdur_RO in get_iterable(vnf_RO, "vms"):
                        if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
                            continue
                        if vdur["count-index"] != vdur_RO_count_index:
                            vdur_RO_count_index += 1
                            continue
                        vdur["vim-id"] = vdur_RO.get("vim_vm_id")
                        if vdur_RO.get("ip_address"):
                            vdur["ip-address"] = vdur_RO["ip_address"].split(";")[0]
                        else:
                            vdur["ip-address"] = None
                        vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
                        vdur["name"] = vdur_RO.get("vim_name")
                        vdur["status"] = vdur_RO.get("status")
                        vdur["status-detailed"] = vdur_RO.get("error_msg")
                        # copy per-interface addresses, matched by name
                        for ifacer in get_iterable(vdur, "interfaces"):
                            for interface_RO in get_iterable(vdur_RO, "interfaces"):
                                if ifacer["name"] == interface_RO.get("internal_name"):
                                    ifacer["ip-address"] = interface_RO.get(
                                        "ip_address"
                                    )
                                    ifacer["mac-address"] = interface_RO.get(
                                        "mac_address"
                                    )
                                    break
                            else:
                                raise LcmException(
                                    "ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
                                    "from VIM info".format(
                                        vnf_index, vdur["vdu-id-ref"], ifacer["name"]
                                    )
                                )
                        vnfr_update["vdur.{}".format(vdu_index)] = vdur
                        break
                    else:
                        raise LcmException(
                            "ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
                            "VIM info".format(
                                vnf_index, vdur["vdu-id-ref"], vdur["count-index"]
                            )
                        )

                # copy internal vld info reported by RO, matched by osm id
                for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
                    for net_RO in get_iterable(nsr_desc_RO, "nets"):
                        if vld["id"] != net_RO.get("vnf_net_osm_id"):
                            continue
                        vld["vim-id"] = net_RO.get("vim_net_id")
                        vld["name"] = net_RO.get("vim_name")
                        vld["status"] = net_RO.get("status")
                        vld["status-detailed"] = net_RO.get("error_msg")
                        vnfr_update["vld.{}".format(vld_index)] = vld
                        break
                    else:
                        raise LcmException(
                            "ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
                                vnf_index, vld["id"]
                            )
                        )

                self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
                break

            else:
                raise LcmException(
                    "ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(
                        vnf_index
                    )
                )
738
739 def _get_ns_config_info(self, nsr_id):
740 """
741 Generates a mapping between vnf,vdu elements and the N2VC id
742 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
743 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
744 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
745 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
746 """
747 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
748 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
749 mapping = {}
750 ns_config_info = {"osm-config-mapping": mapping}
751 for vca in vca_deployed_list:
752 if not vca["member-vnf-index"]:
753 continue
754 if not vca["vdu_id"]:
755 mapping[vca["member-vnf-index"]] = vca["application"]
756 else:
757 mapping[
758 "{}.{}.{}".format(
759 vca["member-vnf-index"], vca["vdu_id"], vca["vdu_count_index"]
760 )
761 ] = vca["application"]
762 return ns_config_info
763
764 async def _instantiate_ng_ro(
765 self,
766 logging_text,
767 nsr_id,
768 nsd,
769 db_nsr,
770 db_nslcmop,
771 db_vnfrs,
772 db_vnfds,
773 n2vc_key_list,
774 stage,
775 start_deploy,
776 timeout_ns_deploy,
777 ):
778
779 db_vims = {}
780
781 def get_vim_account(vim_account_id):
782 nonlocal db_vims
783 if vim_account_id in db_vims:
784 return db_vims[vim_account_id]
785 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account_id})
786 db_vims[vim_account_id] = db_vim
787 return db_vim
788
789 # modify target_vld info with instantiation parameters
790 def parse_vld_instantiation_params(
791 target_vim, target_vld, vld_params, target_sdn
792 ):
793 if vld_params.get("ip-profile"):
794 target_vld["vim_info"][target_vim]["ip_profile"] = vld_params[
795 "ip-profile"
796 ]
797 if vld_params.get("provider-network"):
798 target_vld["vim_info"][target_vim]["provider_network"] = vld_params[
799 "provider-network"
800 ]
801 if "sdn-ports" in vld_params["provider-network"] and target_sdn:
802 target_vld["vim_info"][target_sdn]["sdn-ports"] = vld_params[
803 "provider-network"
804 ]["sdn-ports"]
805 if vld_params.get("wimAccountId"):
806 target_wim = "wim:{}".format(vld_params["wimAccountId"])
807 target_vld["vim_info"][target_wim] = {}
808 for param in ("vim-network-name", "vim-network-id"):
809 if vld_params.get(param):
810 if isinstance(vld_params[param], dict):
811 for vim, vim_net in vld_params[param].items():
812 other_target_vim = "vim:" + vim
813 populate_dict(
814 target_vld["vim_info"],
815 (other_target_vim, param.replace("-", "_")),
816 vim_net,
817 )
818 else: # isinstance str
819 target_vld["vim_info"][target_vim][
820 param.replace("-", "_")
821 ] = vld_params[param]
822 if vld_params.get("common_id"):
823 target_vld["common_id"] = vld_params.get("common_id")
824
825 # modify target["ns"]["vld"] with instantiation parameters to override vnf vim-account
826 def update_ns_vld_target(target, ns_params):
827 for vnf_params in ns_params.get("vnf", ()):
828 if vnf_params.get("vimAccountId"):
829 target_vnf = next(
830 (
831 vnfr
832 for vnfr in db_vnfrs.values()
833 if vnf_params["member-vnf-index"]
834 == vnfr["member-vnf-index-ref"]
835 ),
836 None,
837 )
838 vdur = next((vdur for vdur in target_vnf.get("vdur", ())), None)
839 for a_index, a_vld in enumerate(target["ns"]["vld"]):
840 target_vld = find_in_list(
841 get_iterable(vdur, "interfaces"),
842 lambda iface: iface.get("ns-vld-id") == a_vld["name"],
843 )
844 if target_vld:
845 if vnf_params.get("vimAccountId") not in a_vld.get(
846 "vim_info", {}
847 ):
848 target["ns"]["vld"][a_index].get("vim_info").update(
849 {
850 "vim:{}".format(vnf_params["vimAccountId"]): {
851 "vim_network_name": ""
852 }
853 }
854 )
855
856 nslcmop_id = db_nslcmop["_id"]
857 target = {
858 "name": db_nsr["name"],
859 "ns": {"vld": []},
860 "vnf": [],
861 "image": deepcopy(db_nsr["image"]),
862 "flavor": deepcopy(db_nsr["flavor"]),
863 "action_id": nslcmop_id,
864 "cloud_init_content": {},
865 }
866 for image in target["image"]:
867 image["vim_info"] = {}
868 for flavor in target["flavor"]:
869 flavor["vim_info"] = {}
870
871 if db_nslcmop.get("lcmOperationType") != "instantiate":
872 # get parameters of instantiation:
873 db_nslcmop_instantiate = self.db.get_list(
874 "nslcmops",
875 {
876 "nsInstanceId": db_nslcmop["nsInstanceId"],
877 "lcmOperationType": "instantiate",
878 },
879 )[-1]
880 ns_params = db_nslcmop_instantiate.get("operationParams")
881 else:
882 ns_params = db_nslcmop.get("operationParams")
883 ssh_keys_instantiation = ns_params.get("ssh_keys") or []
884 ssh_keys_all = ssh_keys_instantiation + (n2vc_key_list or [])
885
886 cp2target = {}
887 for vld_index, vld in enumerate(db_nsr.get("vld")):
888 target_vim = "vim:{}".format(ns_params["vimAccountId"])
889 target_vld = {
890 "id": vld["id"],
891 "name": vld["name"],
892 "mgmt-network": vld.get("mgmt-network", False),
893 "type": vld.get("type"),
894 "vim_info": {
895 target_vim: {
896 "vim_network_name": vld.get("vim-network-name"),
897 "vim_account_id": ns_params["vimAccountId"],
898 }
899 },
900 }
901 # check if this network needs SDN assist
902 if vld.get("pci-interfaces"):
903 db_vim = get_vim_account(ns_params["vimAccountId"])
904 sdnc_id = db_vim["config"].get("sdn-controller")
905 if sdnc_id:
906 sdn_vld = "nsrs:{}:vld.{}".format(nsr_id, vld["id"])
907 target_sdn = "sdn:{}".format(sdnc_id)
908 target_vld["vim_info"][target_sdn] = {
909 "sdn": True,
910 "target_vim": target_vim,
911 "vlds": [sdn_vld],
912 "type": vld.get("type"),
913 }
914
915 nsd_vnf_profiles = get_vnf_profiles(nsd)
916 for nsd_vnf_profile in nsd_vnf_profiles:
917 for cp in nsd_vnf_profile["virtual-link-connectivity"]:
918 if cp["virtual-link-profile-id"] == vld["id"]:
919 cp2target[
920 "member_vnf:{}.{}".format(
921 cp["constituent-cpd-id"][0][
922 "constituent-base-element-id"
923 ],
924 cp["constituent-cpd-id"][0]["constituent-cpd-id"],
925 )
926 ] = "nsrs:{}:vld.{}".format(nsr_id, vld_index)
927
928 # check at nsd descriptor, if there is an ip-profile
929 vld_params = {}
930 nsd_vlp = find_in_list(
931 get_virtual_link_profiles(nsd),
932 lambda a_link_profile: a_link_profile["virtual-link-desc-id"]
933 == vld["id"],
934 )
935 if (
936 nsd_vlp
937 and nsd_vlp.get("virtual-link-protocol-data")
938 and nsd_vlp["virtual-link-protocol-data"].get("l3-protocol-data")
939 ):
940 ip_profile_source_data = nsd_vlp["virtual-link-protocol-data"][
941 "l3-protocol-data"
942 ]
943 ip_profile_dest_data = {}
944 if "ip-version" in ip_profile_source_data:
945 ip_profile_dest_data["ip-version"] = ip_profile_source_data[
946 "ip-version"
947 ]
948 if "cidr" in ip_profile_source_data:
949 ip_profile_dest_data["subnet-address"] = ip_profile_source_data[
950 "cidr"
951 ]
952 if "gateway-ip" in ip_profile_source_data:
953 ip_profile_dest_data["gateway-address"] = ip_profile_source_data[
954 "gateway-ip"
955 ]
956 if "dhcp-enabled" in ip_profile_source_data:
957 ip_profile_dest_data["dhcp-params"] = {
958 "enabled": ip_profile_source_data["dhcp-enabled"]
959 }
960 vld_params["ip-profile"] = ip_profile_dest_data
961
962 # update vld_params with instantiation params
963 vld_instantiation_params = find_in_list(
964 get_iterable(ns_params, "vld"),
965 lambda a_vld: a_vld["name"] in (vld["name"], vld["id"]),
966 )
967 if vld_instantiation_params:
968 vld_params.update(vld_instantiation_params)
969 parse_vld_instantiation_params(target_vim, target_vld, vld_params, None)
970 target["ns"]["vld"].append(target_vld)
971 # Update the target ns_vld if vnf vim_account is overriden by instantiation params
972 update_ns_vld_target(target, ns_params)
973
974 for vnfr in db_vnfrs.values():
975 vnfd = find_in_list(
976 db_vnfds, lambda db_vnf: db_vnf["id"] == vnfr["vnfd-ref"]
977 )
978 vnf_params = find_in_list(
979 get_iterable(ns_params, "vnf"),
980 lambda a_vnf: a_vnf["member-vnf-index"] == vnfr["member-vnf-index-ref"],
981 )
982 target_vnf = deepcopy(vnfr)
983 target_vim = "vim:{}".format(vnfr["vim-account-id"])
984 for vld in target_vnf.get("vld", ()):
985 # check if connected to a ns.vld, to fill target'
986 vnf_cp = find_in_list(
987 vnfd.get("int-virtual-link-desc", ()),
988 lambda cpd: cpd.get("id") == vld["id"],
989 )
990 if vnf_cp:
991 ns_cp = "member_vnf:{}.{}".format(
992 vnfr["member-vnf-index-ref"], vnf_cp["id"]
993 )
994 if cp2target.get(ns_cp):
995 vld["target"] = cp2target[ns_cp]
996
997 vld["vim_info"] = {
998 target_vim: {"vim_network_name": vld.get("vim-network-name")}
999 }
1000 # check if this network needs SDN assist
1001 target_sdn = None
1002 if vld.get("pci-interfaces"):
1003 db_vim = get_vim_account(vnfr["vim-account-id"])
1004 sdnc_id = db_vim["config"].get("sdn-controller")
1005 if sdnc_id:
1006 sdn_vld = "vnfrs:{}:vld.{}".format(target_vnf["_id"], vld["id"])
1007 target_sdn = "sdn:{}".format(sdnc_id)
1008 vld["vim_info"][target_sdn] = {
1009 "sdn": True,
1010 "target_vim": target_vim,
1011 "vlds": [sdn_vld],
1012 "type": vld.get("type"),
1013 }
1014
1015 # check at vnfd descriptor, if there is an ip-profile
1016 vld_params = {}
1017 vnfd_vlp = find_in_list(
1018 get_virtual_link_profiles(vnfd),
1019 lambda a_link_profile: a_link_profile["id"] == vld["id"],
1020 )
1021 if (
1022 vnfd_vlp
1023 and vnfd_vlp.get("virtual-link-protocol-data")
1024 and vnfd_vlp["virtual-link-protocol-data"].get("l3-protocol-data")
1025 ):
1026 ip_profile_source_data = vnfd_vlp["virtual-link-protocol-data"][
1027 "l3-protocol-data"
1028 ]
1029 ip_profile_dest_data = {}
1030 if "ip-version" in ip_profile_source_data:
1031 ip_profile_dest_data["ip-version"] = ip_profile_source_data[
1032 "ip-version"
1033 ]
1034 if "cidr" in ip_profile_source_data:
1035 ip_profile_dest_data["subnet-address"] = ip_profile_source_data[
1036 "cidr"
1037 ]
1038 if "gateway-ip" in ip_profile_source_data:
1039 ip_profile_dest_data[
1040 "gateway-address"
1041 ] = ip_profile_source_data["gateway-ip"]
1042 if "dhcp-enabled" in ip_profile_source_data:
1043 ip_profile_dest_data["dhcp-params"] = {
1044 "enabled": ip_profile_source_data["dhcp-enabled"]
1045 }
1046
1047 vld_params["ip-profile"] = ip_profile_dest_data
1048 # update vld_params with instantiation params
1049 if vnf_params:
1050 vld_instantiation_params = find_in_list(
1051 get_iterable(vnf_params, "internal-vld"),
1052 lambda i_vld: i_vld["name"] == vld["id"],
1053 )
1054 if vld_instantiation_params:
1055 vld_params.update(vld_instantiation_params)
1056 parse_vld_instantiation_params(target_vim, vld, vld_params, target_sdn)
1057
1058 vdur_list = []
1059 for vdur in target_vnf.get("vdur", ()):
1060 if vdur.get("status") == "DELETING" or vdur.get("pdu-type"):
1061 continue # This vdu must not be created
1062 vdur["vim_info"] = {"vim_account_id": vnfr["vim-account-id"]}
1063
1064 self.logger.debug("NS > ssh_keys > {}".format(ssh_keys_all))
1065
1066 if ssh_keys_all:
1067 vdu_configuration = get_configuration(vnfd, vdur["vdu-id-ref"])
1068 vnf_configuration = get_configuration(vnfd, vnfd["id"])
1069 if (
1070 vdu_configuration
1071 and vdu_configuration.get("config-access")
1072 and vdu_configuration.get("config-access").get("ssh-access")
1073 ):
1074 vdur["ssh-keys"] = ssh_keys_all
1075 vdur["ssh-access-required"] = vdu_configuration[
1076 "config-access"
1077 ]["ssh-access"]["required"]
1078 elif (
1079 vnf_configuration
1080 and vnf_configuration.get("config-access")
1081 and vnf_configuration.get("config-access").get("ssh-access")
1082 and any(iface.get("mgmt-vnf") for iface in vdur["interfaces"])
1083 ):
1084 vdur["ssh-keys"] = ssh_keys_all
1085 vdur["ssh-access-required"] = vnf_configuration[
1086 "config-access"
1087 ]["ssh-access"]["required"]
1088 elif ssh_keys_instantiation and find_in_list(
1089 vdur["interfaces"], lambda iface: iface.get("mgmt-vnf")
1090 ):
1091 vdur["ssh-keys"] = ssh_keys_instantiation
1092
1093 self.logger.debug("NS > vdur > {}".format(vdur))
1094
1095 vdud = get_vdu(vnfd, vdur["vdu-id-ref"])
1096 # cloud-init
1097 if vdud.get("cloud-init-file"):
1098 vdur["cloud-init"] = "{}:file:{}".format(
1099 vnfd["_id"], vdud.get("cloud-init-file")
1100 )
1101 # read file and put content at target.cloul_init_content. Avoid ng_ro to use shared package system
1102 if vdur["cloud-init"] not in target["cloud_init_content"]:
1103 base_folder = vnfd["_admin"]["storage"]
1104 if base_folder["pkg-dir"]:
1105 cloud_init_file = "{}/{}/cloud_init/{}".format(
1106 base_folder["folder"],
1107 base_folder["pkg-dir"],
1108 vdud.get("cloud-init-file"),
1109 )
1110 else:
1111 cloud_init_file = "{}/Scripts/cloud_init/{}".format(
1112 base_folder["folder"],
1113 vdud.get("cloud-init-file"),
1114 )
1115 with self.fs.file_open(cloud_init_file, "r") as ci_file:
1116 target["cloud_init_content"][
1117 vdur["cloud-init"]
1118 ] = ci_file.read()
1119 elif vdud.get("cloud-init"):
1120 vdur["cloud-init"] = "{}:vdu:{}".format(
1121 vnfd["_id"], get_vdu_index(vnfd, vdur["vdu-id-ref"])
1122 )
1123 # put content at target.cloul_init_content. Avoid ng_ro read vnfd descriptor
1124 target["cloud_init_content"][vdur["cloud-init"]] = vdud[
1125 "cloud-init"
1126 ]
1127 vdur["additionalParams"] = vdur.get("additionalParams") or {}
1128 deploy_params_vdu = self._format_additional_params(
1129 vdur.get("additionalParams") or {}
1130 )
1131 deploy_params_vdu["OSM"] = get_osm_params(
1132 vnfr, vdur["vdu-id-ref"], vdur["count-index"]
1133 )
1134 vdur["additionalParams"] = deploy_params_vdu
1135
1136 # flavor
1137 ns_flavor = target["flavor"][int(vdur["ns-flavor-id"])]
1138 if target_vim not in ns_flavor["vim_info"]:
1139 ns_flavor["vim_info"][target_vim] = {}
1140
1141 # deal with images
1142 # in case alternative images are provided we must check if they should be applied
1143 # for the vim_type, modify the vim_type taking into account
1144 ns_image_id = int(vdur["ns-image-id"])
1145 if vdur.get("alt-image-ids"):
1146 db_vim = get_vim_account(vnfr["vim-account-id"])
1147 vim_type = db_vim["vim_type"]
1148 for alt_image_id in vdur.get("alt-image-ids"):
1149 ns_alt_image = target["image"][int(alt_image_id)]
1150 if vim_type == ns_alt_image.get("vim-type"):
1151 # must use alternative image
1152 self.logger.debug(
1153 "use alternative image id: {}".format(alt_image_id)
1154 )
1155 ns_image_id = alt_image_id
1156 vdur["ns-image-id"] = ns_image_id
1157 break
1158 ns_image = target["image"][int(ns_image_id)]
1159 if target_vim not in ns_image["vim_info"]:
1160 ns_image["vim_info"][target_vim] = {}
1161
1162 vdur["vim_info"] = {target_vim: {}}
1163 # instantiation parameters
1164 # if vnf_params:
1165 # vdu_instantiation_params = next((v for v in get_iterable(vnf_params, "vdu") if v["id"] ==
1166 # vdud["id"]), None)
1167 vdur_list.append(vdur)
1168 target_vnf["vdur"] = vdur_list
1169 target["vnf"].append(target_vnf)
1170
1171 desc = await self.RO.deploy(nsr_id, target)
1172 self.logger.debug("RO return > {}".format(desc))
1173 action_id = desc["action_id"]
1174 await self._wait_ng_ro(
1175 nsr_id, action_id, nslcmop_id, start_deploy, timeout_ns_deploy, stage
1176 )
1177
1178 # Updating NSR
1179 db_nsr_update = {
1180 "_admin.deployed.RO.operational-status": "running",
1181 "detailed-status": " ".join(stage),
1182 }
1183 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1184 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1185 self._write_op_status(nslcmop_id, stage)
1186 self.logger.debug(
1187 logging_text + "ns deployed at RO. RO_id={}".format(action_id)
1188 )
1189 return
1190
1191 async def _wait_ng_ro(
1192 self,
1193 nsr_id,
1194 action_id,
1195 nslcmop_id=None,
1196 start_time=None,
1197 timeout=600,
1198 stage=None,
1199 ):
1200 detailed_status_old = None
1201 db_nsr_update = {}
1202 start_time = start_time or time()
1203 while time() <= start_time + timeout:
1204 desc_status = await self.RO.status(nsr_id, action_id)
1205 self.logger.debug("Wait NG RO > {}".format(desc_status))
1206 if desc_status["status"] == "FAILED":
1207 raise NgRoException(desc_status["details"])
1208 elif desc_status["status"] == "BUILD":
1209 if stage:
1210 stage[2] = "VIM: ({})".format(desc_status["details"])
1211 elif desc_status["status"] == "DONE":
1212 if stage:
1213 stage[2] = "Deployed at VIM"
1214 break
1215 else:
1216 assert False, "ROclient.check_ns_status returns unknown {}".format(
1217 desc_status["status"]
1218 )
1219 if stage and nslcmop_id and stage[2] != detailed_status_old:
1220 detailed_status_old = stage[2]
1221 db_nsr_update["detailed-status"] = " ".join(stage)
1222 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1223 self._write_op_status(nslcmop_id, stage)
1224 await asyncio.sleep(15, loop=self.loop)
1225 else: # timeout_ns_deploy
1226 raise NgRoException("Timeout waiting ns to deploy")
1227
1228 async def _terminate_ng_ro(
1229 self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
1230 ):
1231 db_nsr_update = {}
1232 failed_detail = []
1233 action_id = None
1234 start_deploy = time()
1235 try:
1236 target = {
1237 "ns": {"vld": []},
1238 "vnf": [],
1239 "image": [],
1240 "flavor": [],
1241 "action_id": nslcmop_id,
1242 }
1243 desc = await self.RO.deploy(nsr_id, target)
1244 action_id = desc["action_id"]
1245 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = action_id
1246 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETING"
1247 self.logger.debug(
1248 logging_text
1249 + "ns terminate action at RO. action_id={}".format(action_id)
1250 )
1251
1252 # wait until done
1253 delete_timeout = 20 * 60 # 20 minutes
1254 await self._wait_ng_ro(
1255 nsr_id, action_id, nslcmop_id, start_deploy, delete_timeout, stage
1256 )
1257
1258 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
1259 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
1260 # delete all nsr
1261 await self.RO.delete(nsr_id)
1262 except Exception as e:
1263 if isinstance(e, NgRoException) and e.http_code == 404: # not found
1264 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
1265 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
1266 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
1267 self.logger.debug(
1268 logging_text + "RO_action_id={} already deleted".format(action_id)
1269 )
1270 elif isinstance(e, NgRoException) and e.http_code == 409: # conflict
1271 failed_detail.append("delete conflict: {}".format(e))
1272 self.logger.debug(
1273 logging_text
1274 + "RO_action_id={} delete conflict: {}".format(action_id, e)
1275 )
1276 else:
1277 failed_detail.append("delete error: {}".format(e))
1278 self.logger.error(
1279 logging_text
1280 + "RO_action_id={} delete error: {}".format(action_id, e)
1281 )
1282
1283 if failed_detail:
1284 stage[2] = "Error deleting from VIM"
1285 else:
1286 stage[2] = "Deleted from VIM"
1287 db_nsr_update["detailed-status"] = " ".join(stage)
1288 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1289 self._write_op_status(nslcmop_id, stage)
1290
1291 if failed_detail:
1292 raise LcmException("; ".join(failed_detail))
1293 return
1294
1295 async def instantiate_RO(
1296 self,
1297 logging_text,
1298 nsr_id,
1299 nsd,
1300 db_nsr,
1301 db_nslcmop,
1302 db_vnfrs,
1303 db_vnfds,
1304 n2vc_key_list,
1305 stage,
1306 ):
1307 """
1308 Instantiate at RO
1309 :param logging_text: preffix text to use at logging
1310 :param nsr_id: nsr identity
1311 :param nsd: database content of ns descriptor
1312 :param db_nsr: database content of ns record
1313 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
1314 :param db_vnfrs:
1315 :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
1316 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
1317 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
1318 :return: None or exception
1319 """
1320 try:
1321 start_deploy = time()
1322 ns_params = db_nslcmop.get("operationParams")
1323 if ns_params and ns_params.get("timeout_ns_deploy"):
1324 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
1325 else:
1326 timeout_ns_deploy = self.timeout.get(
1327 "ns_deploy", self.timeout_ns_deploy
1328 )
1329
1330 # Check for and optionally request placement optimization. Database will be updated if placement activated
1331 stage[2] = "Waiting for Placement."
1332 if await self._do_placement(logging_text, db_nslcmop, db_vnfrs):
1333 # in case of placement change ns_params[vimAcountId) if not present at any vnfrs
1334 for vnfr in db_vnfrs.values():
1335 if ns_params["vimAccountId"] == vnfr["vim-account-id"]:
1336 break
1337 else:
1338 ns_params["vimAccountId"] == vnfr["vim-account-id"]
1339
1340 return await self._instantiate_ng_ro(
1341 logging_text,
1342 nsr_id,
1343 nsd,
1344 db_nsr,
1345 db_nslcmop,
1346 db_vnfrs,
1347 db_vnfds,
1348 n2vc_key_list,
1349 stage,
1350 start_deploy,
1351 timeout_ns_deploy,
1352 )
1353 except Exception as e:
1354 stage[2] = "ERROR deploying at VIM"
1355 self.set_vnfr_at_error(db_vnfrs, str(e))
1356 self.logger.error(
1357 "Error deploying at VIM {}".format(e),
1358 exc_info=not isinstance(
1359 e,
1360 (
1361 ROclient.ROClientException,
1362 LcmException,
1363 DbException,
1364 NgRoException,
1365 ),
1366 ),
1367 )
1368 raise
1369
1370 async def wait_kdu_up(self, logging_text, nsr_id, vnfr_id, kdu_name):
1371 """
1372 Wait for kdu to be up, get ip address
1373 :param logging_text: prefix use for logging
1374 :param nsr_id:
1375 :param vnfr_id:
1376 :param kdu_name:
1377 :return: IP address
1378 """
1379
1380 # self.logger.debug(logging_text + "Starting wait_kdu_up")
1381 nb_tries = 0
1382
1383 while nb_tries < 360:
1384 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
1385 kdur = next(
1386 (
1387 x
1388 for x in get_iterable(db_vnfr, "kdur")
1389 if x.get("kdu-name") == kdu_name
1390 ),
1391 None,
1392 )
1393 if not kdur:
1394 raise LcmException(
1395 "Not found vnfr_id={}, kdu_name={}".format(vnfr_id, kdu_name)
1396 )
1397 if kdur.get("status"):
1398 if kdur["status"] in ("READY", "ENABLED"):
1399 return kdur.get("ip-address")
1400 else:
1401 raise LcmException(
1402 "target KDU={} is in error state".format(kdu_name)
1403 )
1404
1405 await asyncio.sleep(10, loop=self.loop)
1406 nb_tries += 1
1407 raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name))
1408
    async def wait_vm_up_insert_key_ro(
        self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None
    ):
        """
        Wait for ip addres at RO, and optionally, insert public key in virtual machine
        :param logging_text: prefix use for logging
        :param nsr_id:
        :param vnfr_id:
        :param vdu_id: target vdu; None means "the management vdu of the VNF"
        :param vdu_index: count-index of the target vdu (only used with vdu_id)
        :param pub_key: public ssh key to inject, None to skip
        :param user: user to apply the public ssh key
        :return: IP address
        :raises LcmException: VNF/VM in error state, vdu not found, or retries exhausted
        """

        self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
        ro_nsr_id = None
        ip_address = None
        nb_tries = 0  # counts key-injection retries (legacy RO path only)
        target_vdu_id = None
        ro_retries = 0  # counts overall polling iterations

        while True:

            ro_retries += 1
            if ro_retries >= 360:  # 1 hour
                raise LcmException(
                    "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id)
                )

            await asyncio.sleep(10, loop=self.loop)

            # get ip address
            # phase 1: locate the target vdur and wait for it to become ACTIVE
            if not target_vdu_id:
                db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})

                if not vdu_id:  # for the VNF case
                    if db_vnfr.get("status") == "ERROR":
                        raise LcmException(
                            "Cannot inject ssh-key because target VNF is in error state"
                        )
                    ip_address = db_vnfr.get("ip-address")
                    if not ip_address:
                        continue
                    # the management vdur is identified by holding the VNF ip
                    vdur = next(
                        (
                            x
                            for x in get_iterable(db_vnfr, "vdur")
                            if x.get("ip-address") == ip_address
                        ),
                        None,
                    )
                else:  # VDU case
                    vdur = next(
                        (
                            x
                            for x in get_iterable(db_vnfr, "vdur")
                            if x.get("vdu-id-ref") == vdu_id
                            and x.get("count-index") == vdu_index
                        ),
                        None,
                    )

                if (
                    not vdur and len(db_vnfr.get("vdur", ())) == 1
                ):  # If only one, this should be the target vdu
                    vdur = db_vnfr["vdur"][0]
                if not vdur:
                    raise LcmException(
                        "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
                            vnfr_id, vdu_id, vdu_index
                        )
                    )
                # New generation RO stores information at "vim_info"
                ng_ro_status = None
                target_vim = None
                if vdur.get("vim_info"):
                    target_vim = next(
                        t for t in vdur["vim_info"]
                    )  # there should be only one key
                    ng_ro_status = vdur["vim_info"][target_vim].get("vim_status")
                # PDUs are considered up by definition; otherwise require ACTIVE
                # from either the legacy status field or the NG-RO vim_status
                if (
                    vdur.get("pdu-type")
                    or vdur.get("status") == "ACTIVE"
                    or ng_ro_status == "ACTIVE"
                ):
                    ip_address = vdur.get("ip-address")
                    if not ip_address:
                        continue
                    target_vdu_id = vdur["vdu-id-ref"]
                elif vdur.get("status") == "ERROR" or ng_ro_status == "ERROR":
                    raise LcmException(
                        "Cannot inject ssh-key because target VM is in error state"
                    )

            if not target_vdu_id:
                continue

            # inject public key into machine
            # phase 2: VM is up; inject the key (NG-RO or legacy RO) and exit
            if pub_key and user:
                self.logger.debug(logging_text + "Inserting RO key")
                self.logger.debug("SSH > PubKey > {}".format(pub_key))
                if vdur.get("pdu-type"):
                    # key injection is not possible on physical deployment units
                    self.logger.error(logging_text + "Cannot inject ssh-ky to a PDU")
                    return ip_address
                try:
                    ro_vm_id = "{}-{}".format(
                        db_vnfr["member-vnf-index-ref"], target_vdu_id
                    )  # TODO add vdu_index
                    if self.ng_ro:
                        # NG-RO path: key injection is modelled as a deploy action
                        target = {
                            "action": {
                                "action": "inject_ssh_key",
                                "key": pub_key,
                                "user": user,
                            },
                            "vnf": [{"_id": vnfr_id, "vdur": [{"id": vdur["id"]}]}],
                        }
                        desc = await self.RO.deploy(nsr_id, target)
                        action_id = desc["action_id"]
                        await self._wait_ng_ro(nsr_id, action_id, timeout=600)
                        break
                    else:
                        # legacy RO path
                        # wait until NS is deployed at RO
                        if not ro_nsr_id:
                            db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
                            ro_nsr_id = deep_get(
                                db_nsrs, ("_admin", "deployed", "RO", "nsr_id")
                            )
                        if not ro_nsr_id:
                            continue
                        result_dict = await self.RO.create_action(
                            item="ns",
                            item_id_name=ro_nsr_id,
                            descriptor={
                                "add_public_key": pub_key,
                                "vms": [ro_vm_id],
                                "user": user,
                            },
                        )
                        # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
                        if not result_dict or not isinstance(result_dict, dict):
                            raise LcmException(
                                "Unknown response from RO when injecting key"
                            )
                        for result in result_dict.values():
                            if result.get("vim_result") == 200:
                                break
                            else:
                                raise ROclient.ROClientException(
                                    "error injecting key: {}".format(
                                        result.get("description")
                                    )
                                )
                        break
                except NgRoException as e:
                    raise LcmException(
                        "Reaching max tries injecting key. Error: {}".format(e)
                    )
                except ROclient.ROClientException as e:
                    # legacy RO may fail transiently; retry up to 20 times
                    if not nb_tries:
                        self.logger.debug(
                            logging_text
                            + "error injecting key: {}. Retrying until {} seconds".format(
                                e, 20 * 10
                            )
                        )
                    nb_tries += 1
                    if nb_tries >= 20:
                        raise LcmException(
                            "Reaching max tries injecting key. Error: {}".format(e)
                        )
            else:
                # nothing to inject; the VM is up, so we are done
                break

        return ip_address
1585
1586 async def _wait_dependent_n2vc(self, nsr_id, vca_deployed_list, vca_index):
1587 """
1588 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1589 """
1590 my_vca = vca_deployed_list[vca_index]
1591 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
1592 # vdu or kdu: no dependencies
1593 return
1594 timeout = 300
1595 while timeout >= 0:
1596 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1597 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1598 configuration_status_list = db_nsr["configurationStatus"]
1599 for index, vca_deployed in enumerate(configuration_status_list):
1600 if index == vca_index:
1601 # myself
1602 continue
1603 if not my_vca.get("member-vnf-index") or (
1604 vca_deployed.get("member-vnf-index")
1605 == my_vca.get("member-vnf-index")
1606 ):
1607 internal_status = configuration_status_list[index].get("status")
1608 if internal_status == "READY":
1609 continue
1610 elif internal_status == "BROKEN":
1611 raise LcmException(
1612 "Configuration aborted because dependent charm/s has failed"
1613 )
1614 else:
1615 break
1616 else:
1617 # no dependencies, return
1618 return
1619 await asyncio.sleep(10)
1620 timeout -= 1
1621
1622 raise LcmException("Configuration aborted because dependent charm/s timeout")
1623
1624 def get_vca_id(self, db_vnfr: dict, db_nsr: dict):
1625 vca_id = None
1626 if db_vnfr:
1627 vca_id = deep_get(db_vnfr, ("vca-id",))
1628 elif db_nsr:
1629 vim_account_id = deep_get(db_nsr, ("instantiate_params", "vimAccountId"))
1630 vca_id = VimAccountDB.get_vim_account_with_id(vim_account_id).get("vca")
1631 return vca_id
1632
    async def instantiate_N2VC(
        self,
        logging_text,
        vca_index,
        nsi_id,
        db_nsr,
        db_vnfr,
        vdu_id,
        kdu_name,
        vdu_index,
        config_descriptor,
        deploy_params,
        base_folder,
        nslcmop_id,
        stage,
        vca_type,
        vca_name,
        ee_config_descriptor,
    ):
        """
        Create/register one VCA execution environment and run its Day-1 primitives.

        :param logging_text: prefix used for logging
        :param vca_index: index of this VCA inside _admin.deployed.VCA of the nsr
        :param nsi_id: network slice instance id, or None
        :param db_nsr: database content of the ns record
        :param db_vnfr: database content of the vnfr, or None for NS-level charms
        :param vdu_id: vdu id when configuring a VDU, else None
        :param kdu_name: kdu name when configuring a KDU, else None
        :param vdu_index: count-index of the vdu
        :param config_descriptor: descriptor section holding the configuration
        :param deploy_params: parameters for primitive rendering; mutated here
            (rw_mgmt_ip and possibly ns_config_info are added)
        :param base_folder: _admin.storage info of the package holding the charm
        :param nslcmop_id: id of the ongoing operation, used for status writes
        :param stage: 3-item status list, stage[0] is updated here
        :param vca_type: one of native_charm/lxc_proxy_charm/k8s_proxy_charm/helm/helm-v3
        :param vca_name: charm or helm-chart name inside the package
        :param ee_config_descriptor: execution-environment config descriptor
        :raises LcmException: wrapping any failure, prefixed with the failing step
        """
        nsr_id = db_nsr["_id"]
        # dot-terminated database path prefix for all updates of this VCA entry
        db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
        vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
        vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
        osm_config = {"osm": {"ns_id": db_nsr["_id"]}}
        db_dict = {
            "collection": "nsrs",
            "filter": {"_id": nsr_id},
            "path": db_update_entry,
        }
        # "step" names the current phase; the except handler reports it on failure
        step = ""
        try:

            element_type = "NS"
            element_under_configuration = nsr_id

            vnfr_id = None
            if db_vnfr:
                vnfr_id = db_vnfr["_id"]
                osm_config["osm"]["vnf_id"] = vnfr_id

            namespace = "{nsi}.{ns}".format(nsi=nsi_id if nsi_id else "", ns=nsr_id)

            if vca_type == "native_charm":
                index_number = 0
            else:
                index_number = vdu_index or 0

            # refine element type/namespace from NS down to VNF/VDU/KDU
            if vnfr_id:
                element_type = "VNF"
                element_under_configuration = vnfr_id
                namespace += ".{}-{}".format(vnfr_id, index_number)
                if vdu_id:
                    namespace += ".{}-{}".format(vdu_id, index_number)
                    element_type = "VDU"
                    element_under_configuration = "{}-{}".format(vdu_id, index_number)
                    osm_config["osm"]["vdu_id"] = vdu_id
                elif kdu_name:
                    namespace += ".{}".format(kdu_name)
                    element_type = "KDU"
                    element_under_configuration = kdu_name
                    osm_config["osm"]["kdu_name"] = kdu_name

            # Get artifact path
            if base_folder["pkg-dir"]:
                artifact_path = "{}/{}/{}/{}".format(
                    base_folder["folder"],
                    base_folder["pkg-dir"],
                    "charms"
                    if vca_type in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
                    else "helm-charts",
                    vca_name,
                )
            else:
                # SOL004-style package without pkg-dir: artifacts live under Scripts/
                artifact_path = "{}/Scripts/{}/{}/".format(
                    base_folder["folder"],
                    "charms"
                    if vca_type in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
                    else "helm-charts",
                    vca_name,
                )

            self.logger.debug("Artifact path > {}".format(artifact_path))

            # get initial_config_primitive_list that applies to this element
            initial_config_primitive_list = config_descriptor.get(
                "initial-config-primitive"
            )

            self.logger.debug(
                "Initial config primitive list > {}".format(
                    initial_config_primitive_list
                )
            )

            # add config if not present for NS charm
            ee_descriptor_id = ee_config_descriptor.get("id")
            self.logger.debug("EE Descriptor > {}".format(ee_descriptor_id))
            initial_config_primitive_list = get_ee_sorted_initial_config_primitive_list(
                initial_config_primitive_list, vca_deployed, ee_descriptor_id
            )

            self.logger.debug(
                "Initial config primitive list #2 > {}".format(
                    initial_config_primitive_list
                )
            )
            # n2vc_redesign STEP 3.1
            # find old ee_id if exists (reused when re-creating the environment)
            ee_id = vca_deployed.get("ee_id")

            vca_id = self.get_vca_id(db_vnfr, db_nsr)
            # create or register execution environment in VCA
            if vca_type in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):

                self._write_configuration_status(
                    nsr_id=nsr_id,
                    vca_index=vca_index,
                    status="CREATING",
                    element_under_configuration=element_under_configuration,
                    element_type=element_type,
                )

                step = "create execution environment"
                self.logger.debug(logging_text + step)

                ee_id = None
                credentials = None
                if vca_type == "k8s_proxy_charm":
                    ee_id = await self.vca_map[vca_type].install_k8s_proxy_charm(
                        # charm_name is the last path component of artifact_path
                        charm_name=artifact_path[artifact_path.rfind("/") + 1 :],
                        namespace=namespace,
                        artifact_path=artifact_path,
                        db_dict=db_dict,
                        vca_id=vca_id,
                    )
                elif vca_type == "helm" or vca_type == "helm-v3":
                    ee_id, credentials = await self.vca_map[
                        vca_type
                    ].create_execution_environment(
                        namespace=namespace,
                        reuse_ee_id=ee_id,
                        db_dict=db_dict,
                        config=osm_config,
                        artifact_path=artifact_path,
                        vca_type=vca_type,
                    )
                else:
                    # lxc_proxy_charm
                    ee_id, credentials = await self.vca_map[
                        vca_type
                    ].create_execution_environment(
                        namespace=namespace,
                        reuse_ee_id=ee_id,
                        db_dict=db_dict,
                        vca_id=vca_id,
                    )

            elif vca_type == "native_charm":
                # native charm runs inside the VM itself: wait for it first
                step = "Waiting to VM being up and getting IP address"
                self.logger.debug(logging_text + step)
                rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
                    logging_text,
                    nsr_id,
                    vnfr_id,
                    vdu_id,
                    vdu_index,
                    user=None,
                    pub_key=None,
                )
                credentials = {"hostname": rw_mgmt_ip}
                # get username
                username = deep_get(
                    config_descriptor, ("config-access", "ssh-access", "default-user")
                )
                # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
                # merged. Meanwhile let's get username from initial-config-primitive
                if not username and initial_config_primitive_list:
                    for config_primitive in initial_config_primitive_list:
                        for param in config_primitive.get("parameter", ()):
                            if param["name"] == "ssh-username":
                                username = param["value"]
                                break
                if not username:
                    raise LcmException(
                        "Cannot determine the username neither with 'initial-config-primitive' nor with "
                        "'config-access.ssh-access.default-user'"
                    )
                credentials["username"] = username
                # n2vc_redesign STEP 3.2

                self._write_configuration_status(
                    nsr_id=nsr_id,
                    vca_index=vca_index,
                    status="REGISTERING",
                    element_under_configuration=element_under_configuration,
                    element_type=element_type,
                )

                step = "register execution environment {}".format(credentials)
                self.logger.debug(logging_text + step)
                ee_id = await self.vca_map[vca_type].register_execution_environment(
                    credentials=credentials,
                    namespace=namespace,
                    db_dict=db_dict,
                    vca_id=vca_id,
                )

            # for compatibility with MON/POL modules, the need model and application name at database
            # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
            ee_id_parts = ee_id.split(".")
            db_nsr_update = {db_update_entry + "ee_id": ee_id}
            if len(ee_id_parts) >= 2:
                model_name = ee_id_parts[0]
                application_name = ee_id_parts[1]
                db_nsr_update[db_update_entry + "model"] = model_name
                db_nsr_update[db_update_entry + "application"] = application_name

            # n2vc_redesign STEP 3.3
            step = "Install configuration Software"

            self._write_configuration_status(
                nsr_id=nsr_id,
                vca_index=vca_index,
                status="INSTALLING SW",
                element_under_configuration=element_under_configuration,
                element_type=element_type,
                other_update=db_nsr_update,
            )

            # TODO check if already done
            self.logger.debug(logging_text + step)
            config = None
            if vca_type == "native_charm":
                # the "config" primitive, if present, seeds the charm configuration
                config_primitive = next(
                    (p for p in initial_config_primitive_list if p["name"] == "config"),
                    None,
                )
                if config_primitive:
                    config = self._map_primitive_params(
                        config_primitive, {}, deploy_params
                    )
            num_units = 1
            if vca_type == "lxc_proxy_charm":
                # number of charm units may be overridden per element level
                if element_type == "NS":
                    num_units = db_nsr.get("config-units") or 1
                elif element_type == "VNF":
                    num_units = db_vnfr.get("config-units") or 1
                elif element_type == "VDU":
                    for v in db_vnfr["vdur"]:
                        if vdu_id == v["vdu-id-ref"]:
                            num_units = v.get("config-units") or 1
                            break
            if vca_type != "k8s_proxy_charm":
                await self.vca_map[vca_type].install_configuration_sw(
                    ee_id=ee_id,
                    artifact_path=artifact_path,
                    db_dict=db_dict,
                    config=config,
                    num_units=num_units,
                    vca_id=vca_id,
                    vca_type=vca_type,
                )

            # write in db flag of configuration_sw already installed
            self.update_db_2(
                "nsrs", nsr_id, {db_update_entry + "config_sw_installed": True}
            )

            # add relations for this VCA (wait for other peers related with this VCA)
            await self._add_vca_relations(
                logging_text=logging_text,
                nsr_id=nsr_id,
                vca_type=vca_type,
                vca_index=vca_index,
            )

            # if SSH access is required, then get execution environment SSH public
            # if native charm we have waited already to VM be UP
            if vca_type in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
                pub_key = None
                user = None
                # self.logger.debug("get ssh key block")
                if deep_get(
                    config_descriptor, ("config-access", "ssh-access", "required")
                ):
                    # self.logger.debug("ssh key needed")
                    # Needed to inject a ssh key
                    user = deep_get(
                        config_descriptor,
                        ("config-access", "ssh-access", "default-user"),
                    )
                    step = "Install configuration Software, getting public ssh key"
                    pub_key = await self.vca_map[vca_type].get_ee_ssh_public__key(
                        ee_id=ee_id, db_dict=db_dict, vca_id=vca_id
                    )

                    step = "Insert public key into VM user={} ssh_key={}".format(
                        user, pub_key
                    )
                else:
                    # self.logger.debug("no need to get ssh key")
                    step = "Waiting to VM being up and getting IP address"
                self.logger.debug(logging_text + step)

                # n2vc_redesign STEP 5.1
                # wait for RO (ip-address) Insert pub_key into VM
                if vnfr_id:
                    if kdu_name:
                        rw_mgmt_ip = await self.wait_kdu_up(
                            logging_text, nsr_id, vnfr_id, kdu_name
                        )
                    else:
                        rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
                            logging_text,
                            nsr_id,
                            vnfr_id,
                            vdu_id,
                            vdu_index,
                            user=user,
                            pub_key=pub_key,
                        )
                else:
                    rw_mgmt_ip = None  # This is for a NS configuration

                self.logger.debug(logging_text + " VM_ip_address={}".format(rw_mgmt_ip))

            # store rw_mgmt_ip in deploy params for later replacement
            deploy_params["rw_mgmt_ip"] = rw_mgmt_ip

            # n2vc_redesign STEP 6  Execute initial config primitive
            step = "execute initial config primitive"

            # wait for dependent primitives execution (NS -> VNF -> VDU)
            if initial_config_primitive_list:
                await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)

            # stage, in function of element type: vdu, kdu, vnf or ns
            my_vca = vca_deployed_list[vca_index]
            if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
                # VDU or KDU
                stage[0] = "Stage 3/5: running Day-1 primitives for VDU."
            elif my_vca.get("member-vnf-index"):
                # VNF
                stage[0] = "Stage 4/5: running Day-1 primitives for VNF."
            else:
                # NS
                stage[0] = "Stage 5/5: running Day-1 primitives for NS."

            self._write_configuration_status(
                nsr_id=nsr_id, vca_index=vca_index, status="EXECUTING PRIMITIVE"
            )

            self._write_op_status(op_id=nslcmop_id, stage=stage)

            check_if_terminated_needed = True
            for initial_config_primitive in initial_config_primitive_list:
                # adding information on the vca_deployed if it is a NS execution environment
                if not vca_deployed["member-vnf-index"]:
                    deploy_params["ns_config_info"] = json.dumps(
                        self._get_ns_config_info(nsr_id)
                    )
                # TODO check if already done
                primitive_params_ = self._map_primitive_params(
                    initial_config_primitive, {}, deploy_params
                )

                step = "execute primitive '{}' params '{}'".format(
                    initial_config_primitive["name"], primitive_params_
                )
                self.logger.debug(logging_text + step)
                await self.vca_map[vca_type].exec_primitive(
                    ee_id=ee_id,
                    primitive_name=initial_config_primitive["name"],
                    params_dict=primitive_params_,
                    db_dict=db_dict,
                    vca_id=vca_id,
                    vca_type=vca_type,
                )
                # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
                if check_if_terminated_needed:
                    if config_descriptor.get("terminate-config-primitive"):
                        self.update_db_2(
                            "nsrs", nsr_id, {db_update_entry + "needed_terminate": True}
                        )
                    check_if_terminated_needed = False

                # TODO register in database that primitive is done

            # STEP 7 Configure metrics
            if vca_type == "helm" or vca_type == "helm-v3":
                prometheus_jobs = await self.extract_prometheus_scrape_jobs(
                    ee_id=ee_id,
                    artifact_path=artifact_path,
                    ee_config_descriptor=ee_config_descriptor,
                    vnfr_id=vnfr_id,
                    nsr_id=nsr_id,
                    target_ip=rw_mgmt_ip,
                )
                if prometheus_jobs:
                    self.update_db_2(
                        "nsrs",
                        nsr_id,
                        {db_update_entry + "prometheus_jobs": prometheus_jobs},
                    )

                    for job in prometheus_jobs:
                        # upsert so that re-instantiation refreshes existing jobs
                        self.db.set_one(
                            "prometheus_jobs",
                            {
                                "job_name": job["job_name"]
                            },
                            job,
                            upsert=True,
                            fail_on_empty=False
                        )

            step = "instantiated at VCA"
            self.logger.debug(logging_text + step)

            self._write_configuration_status(
                nsr_id=nsr_id, vca_index=vca_index, status="READY"
            )

        except Exception as e:  # TODO not use Exception but N2VC exception
            # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
            # only dump a traceback for unexpected exception types
            if not isinstance(
                e, (DbException, N2VCException, LcmException, asyncio.CancelledError)
            ):
                self.logger.error(
                    "Exception while {} : {}".format(step, e), exc_info=True
                )
            self._write_configuration_status(
                nsr_id=nsr_id, vca_index=vca_index, status="BROKEN"
            )
            raise LcmException("{} {}".format(step, e)) from e
2067
2068 def _write_ns_status(
2069 self,
2070 nsr_id: str,
2071 ns_state: str,
2072 current_operation: str,
2073 current_operation_id: str,
2074 error_description: str = None,
2075 error_detail: str = None,
2076 other_update: dict = None,
2077 ):
2078 """
2079 Update db_nsr fields.
2080 :param nsr_id:
2081 :param ns_state:
2082 :param current_operation:
2083 :param current_operation_id:
2084 :param error_description:
2085 :param error_detail:
2086 :param other_update: Other required changes at database if provided, will be cleared
2087 :return:
2088 """
2089 try:
2090 db_dict = other_update or {}
2091 db_dict[
2092 "_admin.nslcmop"
2093 ] = current_operation_id # for backward compatibility
2094 db_dict["_admin.current-operation"] = current_operation_id
2095 db_dict["_admin.operation-type"] = (
2096 current_operation if current_operation != "IDLE" else None
2097 )
2098 db_dict["currentOperation"] = current_operation
2099 db_dict["currentOperationID"] = current_operation_id
2100 db_dict["errorDescription"] = error_description
2101 db_dict["errorDetail"] = error_detail
2102
2103 if ns_state:
2104 db_dict["nsState"] = ns_state
2105 self.update_db_2("nsrs", nsr_id, db_dict)
2106 except DbException as e:
2107 self.logger.warn("Error writing NS status, ns={}: {}".format(nsr_id, e))
2108
2109 def _write_op_status(
2110 self,
2111 op_id: str,
2112 stage: list = None,
2113 error_message: str = None,
2114 queuePosition: int = 0,
2115 operation_state: str = None,
2116 other_update: dict = None,
2117 ):
2118 try:
2119 db_dict = other_update or {}
2120 db_dict["queuePosition"] = queuePosition
2121 if isinstance(stage, list):
2122 db_dict["stage"] = stage[0]
2123 db_dict["detailed-status"] = " ".join(stage)
2124 elif stage is not None:
2125 db_dict["stage"] = str(stage)
2126
2127 if error_message is not None:
2128 db_dict["errorMessage"] = error_message
2129 if operation_state is not None:
2130 db_dict["operationState"] = operation_state
2131 db_dict["statusEnteredTime"] = time()
2132 self.update_db_2("nslcmops", op_id, db_dict)
2133 except DbException as e:
2134 self.logger.warn(
2135 "Error writing OPERATION status for op_id: {} -> {}".format(op_id, e)
2136 )
2137
2138 def _write_all_config_status(self, db_nsr: dict, status: str):
2139 try:
2140 nsr_id = db_nsr["_id"]
2141 # configurationStatus
2142 config_status = db_nsr.get("configurationStatus")
2143 if config_status:
2144 db_nsr_update = {
2145 "configurationStatus.{}.status".format(index): status
2146 for index, v in enumerate(config_status)
2147 if v
2148 }
2149 # update status
2150 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2151
2152 except DbException as e:
2153 self.logger.warn(
2154 "Error writing all configuration status, ns={}: {}".format(nsr_id, e)
2155 )
2156
2157 def _write_configuration_status(
2158 self,
2159 nsr_id: str,
2160 vca_index: int,
2161 status: str = None,
2162 element_under_configuration: str = None,
2163 element_type: str = None,
2164 other_update: dict = None,
2165 ):
2166
2167 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
2168 # .format(vca_index, status))
2169
2170 try:
2171 db_path = "configurationStatus.{}.".format(vca_index)
2172 db_dict = other_update or {}
2173 if status:
2174 db_dict[db_path + "status"] = status
2175 if element_under_configuration:
2176 db_dict[
2177 db_path + "elementUnderConfiguration"
2178 ] = element_under_configuration
2179 if element_type:
2180 db_dict[db_path + "elementType"] = element_type
2181 self.update_db_2("nsrs", nsr_id, db_dict)
2182 except DbException as e:
2183 self.logger.warn(
2184 "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
2185 status, nsr_id, vca_index, e
2186 )
2187 )
2188
    async def _do_placement(self, logging_text, db_nslcmop, db_vnfrs):
        """
        Check and computes the placement, (vim account where to deploy). If it is decided by an external tool, it
        sends the request via kafka and wait until the result is wrote at database (nslcmops _admin.plca).
        Database is used because the result can be obtained from a different LCM worker in case of HA.
        :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
        :param db_nslcmop: database content of nslcmop
        :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
        :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfr with the
            computed 'vim-account-id'
        :raises LcmException: when PLA does not answer within the polling window.
        """
        modified = False
        nslcmop_id = db_nslcmop["_id"]
        placement_engine = deep_get(db_nslcmop, ("operationParams", "placement-engine"))
        if placement_engine == "PLA":
            self.logger.debug(
                logging_text + "Invoke and wait for placement optimization"
            )
            # ask the external PLA module for a placement decision via kafka
            await self.msg.aiowrite(
                "pla", "get_placement", {"nslcmopId": nslcmop_id}, loop=self.loop
            )
            # poll the database until the PLA answer shows up at _admin.pla
            # (the answer may be written by a different LCM worker in HA setups)
            db_poll_interval = 5
            wait = db_poll_interval * 10  # total wait: 50 seconds
            pla_result = None
            while not pla_result and wait >= 0:
                await asyncio.sleep(db_poll_interval)
                wait -= db_poll_interval
                db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
                pla_result = deep_get(db_nslcmop, ("_admin", "pla"))

            if not pla_result:
                raise LcmException(
                    "Placement timeout for nslcmopId={}".format(nslcmop_id)
                )

            # pin each vnfr to the vim account selected by PLA
            for pla_vnf in pla_result["vnf"]:
                vnfr = db_vnfrs.get(pla_vnf["member-vnf-index"])
                # skip entries without a vim account or without a matching vnfr
                if not pla_vnf.get("vimAccountId") or not vnfr:
                    continue
                modified = True
                self.db.set_one(
                    "vnfrs",
                    {"_id": vnfr["_id"]},
                    {"vim-account-id": pla_vnf["vimAccountId"]},
                )
                # Modifies db_vnfrs (in-memory copy kept in sync with the DB)
                vnfr["vim-account-id"] = pla_vnf["vimAccountId"]
        return modified
2237
2238 def update_nsrs_with_pla_result(self, params):
2239 try:
2240 nslcmop_id = deep_get(params, ("placement", "nslcmopId"))
2241 self.update_db_2(
2242 "nslcmops", nslcmop_id, {"_admin.pla": params.get("placement")}
2243 )
2244 except Exception as e:
2245 self.logger.warn("Update failed for nslcmop_id={}:{}".format(nslcmop_id, e))
2246
    async def instantiate(self, nsr_id, nslcmop_id):
        """
        Instantiate a Network Service.

        Orchestrates the whole NS instantiation: reads descriptors and records
        from the database, deploys KDUs, starts the RO (VIM) deployment as a
        concurrent task, deploys N2VC execution environments for NS/VNF/VDU/KDU
        charms, waits for all pending tasks and finally writes the result to
        the database and notifies it via kafka.

        :param nsr_id: ns instance to deploy
        :param nslcmop_id: operation to run
        :return: None. Result is written at database (nsrs/nslcmops) and
            notified via kafka on topic "ns".
        """

        # Try to lock HA task here
        task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
        if not task_is_locked_by_me:
            # another LCM worker owns this operation (HA deployment)
            self.logger.debug(
                "instantiate() task is not locked by me, ns={}".format(nsr_id)
            )
            return

        logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
        self.logger.debug(logging_text + "Enter")

        # get all needed from database

        # database nsrs record
        db_nsr = None

        # database nslcmops record
        db_nslcmop = None

        # update operation on nsrs
        db_nsr_update = {}
        # update operation on nslcmops
        db_nslcmop_update = {}

        nslcmop_operation_state = None
        db_vnfrs = {}  # vnf's info indexed by member-index
        # n2vc_info = {}
        tasks_dict_info = {}  # from task to info text
        exc = None
        error_list = []
        stage = [
            "Stage 1/5: preparation of the environment.",
            "Waiting for previous operations to terminate.",
            "",
        ]
        # ^ stage, step, VIM progress
        try:
            # wait for any previous tasks in process
            await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)

            # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
            stage[1] = "Reading from database."
            # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
            db_nsr_update["detailed-status"] = "creating"
            db_nsr_update["operational-status"] = "init"
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state="BUILDING",
                current_operation="INSTANTIATING",
                current_operation_id=nslcmop_id,
                other_update=db_nsr_update,
            )
            self._write_op_status(op_id=nslcmop_id, stage=stage, queuePosition=0)

            # read from db: operation
            stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
            db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
            # additionalParamsForVnf is stored as a JSON string; decode it
            if db_nslcmop["operationParams"].get("additionalParamsForVnf"):
                db_nslcmop["operationParams"]["additionalParamsForVnf"] = json.loads(
                    db_nslcmop["operationParams"]["additionalParamsForVnf"]
                )
            ns_params = db_nslcmop.get("operationParams")
            # operation timeout: request parameter overrides configured default
            if ns_params and ns_params.get("timeout_ns_deploy"):
                timeout_ns_deploy = ns_params["timeout_ns_deploy"]
            else:
                timeout_ns_deploy = self.timeout.get(
                    "ns_deploy", self.timeout_ns_deploy
                )

            # read from db: ns
            stage[1] = "Getting nsr={} from db.".format(nsr_id)
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
            stage[1] = "Getting nsd={} from db.".format(db_nsr["nsd-id"])
            nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
            self.fs.sync(db_nsr["nsd-id"])
            db_nsr["nsd"] = nsd
            # nsr_name = db_nsr["name"] # TODO short-name??

            # read from db: vnf's of this ns
            stage[1] = "Getting vnfrs from db."
            self.logger.debug(logging_text + stage[1])
            db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})

            # read from db: vnfd's for every vnf
            db_vnfds = []  # every vnfd data

            # for each vnf in ns, read vnfd
            for vnfr in db_vnfrs_list:
                if vnfr.get("kdur"):
                    kdur_list = []
                    for kdur in vnfr["kdur"]:
                        # kdu additionalParams are stored as JSON strings; decode them
                        if kdur.get("additionalParams"):
                            kdur["additionalParams"] = json.loads(kdur["additionalParams"])
                        kdur_list.append(kdur)
                    vnfr["kdur"] = kdur_list

                db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
                vnfd_id = vnfr["vnfd-id"]
                vnfd_ref = vnfr["vnfd-ref"]
                self.fs.sync(vnfd_id)

                # if we haven't this vnfd, read it from db
                # NOTE(review): db_vnfds holds vnfd dicts, so this id-in-list
                # membership test never matches and the vnfd is re-read for
                # every vnfr of the same vnfd — confirm and consider indexing
                # by id instead
                if vnfd_id not in db_vnfds:
                    # read from db
                    stage[1] = "Getting vnfd={} id='{}' from db.".format(
                        vnfd_id, vnfd_ref
                    )
                    self.logger.debug(logging_text + stage[1])
                    vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})

                    # store vnfd
                    db_vnfds.append(vnfd)

            # Get or generates the _admin.deployed.VCA list
            vca_deployed_list = None
            if db_nsr["_admin"].get("deployed"):
                vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
            if vca_deployed_list is None:
                vca_deployed_list = []
                configuration_status_list = []
                db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
                db_nsr_update["configurationStatus"] = configuration_status_list
                # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
                populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
            elif isinstance(vca_deployed_list, dict):
                # maintain backward compatibility. Change a dict to list at database
                vca_deployed_list = list(vca_deployed_list.values())
                db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
                populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)

            if not isinstance(
                deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list
            ):
                populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
                db_nsr_update["_admin.deployed.RO.vnfd"] = []

            # set state to INSTANTIATED. When instantiated NBI will not delete directly
            db_nsr_update["_admin.nsState"] = "INSTANTIATED"
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self.db.set_list(
                "vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "INSTANTIATED"}
            )

            # n2vc_redesign STEP 2 Deploy Network Scenario
            stage[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
            self._write_op_status(op_id=nslcmop_id, stage=stage)

            stage[1] = "Deploying KDUs."
            # self.logger.debug(logging_text + "Before deploy_kdus")
            # Call to deploy_kdus in case exists the "vdu:kdu" param
            await self.deploy_kdus(
                logging_text=logging_text,
                nsr_id=nsr_id,
                nslcmop_id=nslcmop_id,
                db_vnfrs=db_vnfrs,
                db_vnfds=db_vnfds,
                task_instantiation_info=tasks_dict_info,
            )

            stage[1] = "Getting VCA public key."
            # n2vc_redesign STEP 1 Get VCA public ssh-key
            # feature 1429. Add n2vc public key to needed VMs
            n2vc_key = self.n2vc.get_public_key()
            n2vc_key_list = [n2vc_key]
            if self.vca_config.get("public_key"):
                n2vc_key_list.append(self.vca_config["public_key"])

            stage[1] = "Deploying NS at VIM."
            # RO (VIM) deployment runs as a background task, concurrently with
            # the N2VC execution-environment deployment below
            task_ro = asyncio.ensure_future(
                self.instantiate_RO(
                    logging_text=logging_text,
                    nsr_id=nsr_id,
                    nsd=nsd,
                    db_nsr=db_nsr,
                    db_nslcmop=db_nslcmop,
                    db_vnfrs=db_vnfrs,
                    db_vnfds=db_vnfds,
                    n2vc_key_list=n2vc_key_list,
                    stage=stage,
                )
            )
            self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
            tasks_dict_info[task_ro] = "Deploying at VIM"

            # n2vc_redesign STEP 3 to 6 Deploy N2VC
            stage[1] = "Deploying Execution Environments."
            self.logger.debug(logging_text + stage[1])

            nsi_id = None  # TODO put nsi_id when this nsr belongs to a NSI
            for vnf_profile in get_vnf_profiles(nsd):
                vnfd_id = vnf_profile["vnfd-id"]
                vnfd = find_in_list(db_vnfds, lambda a_vnf: a_vnf["id"] == vnfd_id)
                member_vnf_index = str(vnf_profile["id"])
                db_vnfr = db_vnfrs[member_vnf_index]
                base_folder = vnfd["_admin"]["storage"]
                vdu_id = None
                vdu_index = 0
                vdu_name = None
                kdu_name = None

                # Get additional parameters
                deploy_params = {"OSM": get_osm_params(db_vnfr)}
                if db_vnfr.get("additionalParamsForVnf"):
                    deploy_params.update(
                        parse_yaml_strings(db_vnfr["additionalParamsForVnf"].copy())
                    )

                # deploy the VNF level charm, if the vnfd declares one
                descriptor_config = get_configuration(vnfd, vnfd["id"])
                if descriptor_config:
                    self._deploy_n2vc(
                        logging_text=logging_text
                        + "member_vnf_index={} ".format(member_vnf_index),
                        db_nsr=db_nsr,
                        db_vnfr=db_vnfr,
                        nslcmop_id=nslcmop_id,
                        nsr_id=nsr_id,
                        nsi_id=nsi_id,
                        vnfd_id=vnfd_id,
                        vdu_id=vdu_id,
                        kdu_name=kdu_name,
                        member_vnf_index=member_vnf_index,
                        vdu_index=vdu_index,
                        vdu_name=vdu_name,
                        deploy_params=deploy_params,
                        descriptor_config=descriptor_config,
                        base_folder=base_folder,
                        task_instantiation_info=tasks_dict_info,
                        stage=stage,
                    )

                # Deploy charms for each VDU that supports one.
                for vdud in get_vdu_list(vnfd):
                    vdu_id = vdud["id"]
                    descriptor_config = get_configuration(vnfd, vdu_id)
                    vdur = find_in_list(
                        db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id
                    )

                    if vdur.get("additionalParams"):
                        deploy_params_vdu = parse_yaml_strings(vdur["additionalParams"])
                    else:
                        deploy_params_vdu = deploy_params
                    deploy_params_vdu["OSM"] = get_osm_params(
                        db_vnfr, vdu_id, vdu_count_index=0
                    )
                    vdud_count = get_number_of_instances(vnfd, vdu_id)

                    self.logger.debug("VDUD > {}".format(vdud))
                    self.logger.debug(
                        "Descriptor config > {}".format(descriptor_config)
                    )
                    if descriptor_config:
                        vdu_name = None
                        kdu_name = None
                        # one execution environment per VDU instance
                        for vdu_index in range(vdud_count):
                            # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
                            self._deploy_n2vc(
                                logging_text=logging_text
                                + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
                                    member_vnf_index, vdu_id, vdu_index
                                ),
                                db_nsr=db_nsr,
                                db_vnfr=db_vnfr,
                                nslcmop_id=nslcmop_id,
                                nsr_id=nsr_id,
                                nsi_id=nsi_id,
                                vnfd_id=vnfd_id,
                                vdu_id=vdu_id,
                                kdu_name=kdu_name,
                                member_vnf_index=member_vnf_index,
                                vdu_index=vdu_index,
                                vdu_name=vdu_name,
                                deploy_params=deploy_params_vdu,
                                descriptor_config=descriptor_config,
                                base_folder=base_folder,
                                task_instantiation_info=tasks_dict_info,
                                stage=stage,
                            )
                # deploy charms for each KDU that declares one
                for kdud in get_kdu_list(vnfd):
                    kdu_name = kdud["name"]
                    descriptor_config = get_configuration(vnfd, kdu_name)
                    if descriptor_config:
                        vdu_id = None
                        vdu_index = 0
                        vdu_name = None
                        kdur = next(
                            x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name
                        )
                        deploy_params_kdu = {"OSM": get_osm_params(db_vnfr)}
                        if kdur.get("additionalParams"):
                            deploy_params_kdu = parse_yaml_strings(
                                kdur["additionalParams"]
                            )

                        self._deploy_n2vc(
                            logging_text=logging_text,
                            db_nsr=db_nsr,
                            db_vnfr=db_vnfr,
                            nslcmop_id=nslcmop_id,
                            nsr_id=nsr_id,
                            nsi_id=nsi_id,
                            vnfd_id=vnfd_id,
                            vdu_id=vdu_id,
                            kdu_name=kdu_name,
                            member_vnf_index=member_vnf_index,
                            vdu_index=vdu_index,
                            vdu_name=vdu_name,
                            deploy_params=deploy_params_kdu,
                            descriptor_config=descriptor_config,
                            base_folder=base_folder,
                            task_instantiation_info=tasks_dict_info,
                            stage=stage,
                        )

            # Check if this NS has a charm configuration
            descriptor_config = nsd.get("ns-configuration")
            if descriptor_config and descriptor_config.get("juju"):
                vnfd_id = None
                db_vnfr = None
                member_vnf_index = None
                vdu_id = None
                kdu_name = None
                vdu_index = 0
                vdu_name = None

                # Get additional parameters
                deploy_params = {"OSM": {"vim_account_id": ns_params["vimAccountId"]}}
                if db_nsr.get("additionalParamsForNs"):
                    deploy_params.update(
                        parse_yaml_strings(db_nsr["additionalParamsForNs"].copy())
                    )
                base_folder = nsd["_admin"]["storage"]
                self._deploy_n2vc(
                    logging_text=logging_text,
                    db_nsr=db_nsr,
                    db_vnfr=db_vnfr,
                    nslcmop_id=nslcmop_id,
                    nsr_id=nsr_id,
                    nsi_id=nsi_id,
                    vnfd_id=vnfd_id,
                    vdu_id=vdu_id,
                    kdu_name=kdu_name,
                    member_vnf_index=member_vnf_index,
                    vdu_index=vdu_index,
                    vdu_name=vdu_name,
                    deploy_params=deploy_params,
                    descriptor_config=descriptor_config,
                    base_folder=base_folder,
                    task_instantiation_info=tasks_dict_info,
                    stage=stage,
                )

            # rest of stuff will be done at finally

        except (
            ROclient.ROClientException,
            DbException,
            LcmException,
            N2VCException,
        ) as e:
            self.logger.error(
                logging_text + "Exit Exception while '{}': {}".format(stage[1], e)
            )
            exc = e
        except asyncio.CancelledError:
            self.logger.error(
                logging_text + "Cancelled Exception while '{}'".format(stage[1])
            )
            exc = "Operation was cancelled"
        except Exception as e:
            exc = traceback.format_exc()
            self.logger.critical(
                logging_text + "Exit Exception while '{}': {}".format(stage[1], e),
                exc_info=True,
            )
        finally:
            if exc:
                error_list.append(str(exc))
            try:
                # wait for pending tasks
                if tasks_dict_info:
                    stage[1] = "Waiting for instantiate pending tasks."
                    self.logger.debug(logging_text + stage[1])
                    error_list += await self._wait_for_tasks(
                        logging_text,
                        tasks_dict_info,
                        timeout_ns_deploy,
                        stage,
                        nslcmop_id,
                        nsr_id=nsr_id,
                    )
                stage[1] = stage[2] = ""
            except asyncio.CancelledError:
                error_list.append("Cancelled")
                # TODO cancel all tasks
            # NOTE(review): 'as exc' rebinds (and on exit unsets) the outer
            # 'exc' variable; it is not read again afterwards, so harmless,
            # but a different name would be clearer
            except Exception as exc:
                error_list.append(str(exc))

            # update operation-status
            db_nsr_update["operational-status"] = "running"
            # let's begin with VCA 'configured' status (later we can change it)
            db_nsr_update["config-status"] = "configured"
            for task, task_name in tasks_dict_info.items():
                if not task.done() or task.cancelled() or task.exception():
                    if task_name.startswith(self.task_name_deploy_vca):
                        # A N2VC task is pending
                        db_nsr_update["config-status"] = "failed"
                    else:
                        # RO or KDU task is pending
                        db_nsr_update["operational-status"] = "failed"

            # update status at database
            if error_list:
                error_detail = ". ".join(error_list)
                self.logger.error(logging_text + error_detail)
                error_description_nslcmop = "{} Detail: {}".format(
                    stage[0], error_detail
                )
                error_description_nsr = "Operation: INSTANTIATING.{}, {}".format(
                    nslcmop_id, stage[0]
                )

                db_nsr_update["detailed-status"] = (
                    error_description_nsr + " Detail: " + error_detail
                )
                db_nslcmop_update["detailed-status"] = error_detail
                nslcmop_operation_state = "FAILED"
                ns_state = "BROKEN"
            else:
                error_detail = None
                error_description_nsr = error_description_nslcmop = None
                ns_state = "READY"
                db_nsr_update["detailed-status"] = "Done"
                db_nslcmop_update["detailed-status"] = "Done"
                nslcmop_operation_state = "COMPLETED"

            if db_nsr:
                self._write_ns_status(
                    nsr_id=nsr_id,
                    ns_state=ns_state,
                    current_operation="IDLE",
                    current_operation_id=None,
                    error_description=error_description_nsr,
                    error_detail=error_detail,
                    other_update=db_nsr_update,
                )
                self._write_op_status(
                    op_id=nslcmop_id,
                    stage="",
                    error_message=error_description_nslcmop,
                    operation_state=nslcmop_operation_state,
                    other_update=db_nslcmop_update,
                )

            # notify the final operation state via kafka
            if nslcmop_operation_state:
                try:
                    await self.msg.aiowrite(
                        "ns",
                        "instantiated",
                        {
                            "nsr_id": nsr_id,
                            "nslcmop_id": nslcmop_id,
                            "operationState": nslcmop_operation_state,
                        },
                        loop=self.loop,
                    )
                except Exception as e:
                    self.logger.error(
                        logging_text + "kafka_write notification Exception {}".format(e)
                    )

            self.logger.debug(logging_text + "Exit")
            self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
2728
2729 def _get_vnfd(self, vnfd_id: str, cached_vnfds: Dict[str, Any]):
2730 if vnfd_id not in cached_vnfds:
2731 cached_vnfds[vnfd_id] = self.db.get_one("vnfds", {"id": vnfd_id})
2732 return cached_vnfds[vnfd_id]
2733
2734 def _get_vnfr(self, nsr_id: str, vnf_profile_id: str, cached_vnfrs: Dict[str, Any]):
2735 if vnf_profile_id not in cached_vnfrs:
2736 cached_vnfrs[vnf_profile_id] = self.db.get_one(
2737 "vnfrs",
2738 {
2739 "member-vnf-index-ref": vnf_profile_id,
2740 "nsr-id-ref": nsr_id,
2741 },
2742 )
2743 return cached_vnfrs[vnf_profile_id]
2744
2745 def _is_deployed_vca_in_relation(
2746 self, vca: DeployedVCA, relation: Relation
2747 ) -> bool:
2748 found = False
2749 for endpoint in (relation.provider, relation.requirer):
2750 if endpoint["kdu-resource-profile-id"]:
2751 continue
2752 found = (
2753 vca.vnf_profile_id == endpoint.vnf_profile_id
2754 and vca.vdu_profile_id == endpoint.vdu_profile_id
2755 and vca.execution_environment_ref == endpoint.execution_environment_ref
2756 )
2757 if found:
2758 break
2759 return found
2760
2761 def _update_ee_relation_data_with_implicit_data(
2762 self, nsr_id, nsd, ee_relation_data, cached_vnfds, vnf_profile_id: str = None
2763 ):
2764 ee_relation_data = safe_get_ee_relation(
2765 nsr_id, ee_relation_data, vnf_profile_id=vnf_profile_id
2766 )
2767 ee_relation_level = EELevel.get_level(ee_relation_data)
2768 if (ee_relation_level in (EELevel.VNF, EELevel.VDU)) and not ee_relation_data[
2769 "execution-environment-ref"
2770 ]:
2771 vnf_profile = get_vnf_profile(nsd, ee_relation_data["vnf-profile-id"])
2772 vnfd_id = vnf_profile["vnfd-id"]
2773 db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
2774 entity_id = (
2775 vnfd_id
2776 if ee_relation_level == EELevel.VNF
2777 else ee_relation_data["vdu-profile-id"]
2778 )
2779 ee = get_juju_ee_ref(db_vnfd, entity_id)
2780 if not ee:
2781 raise Exception(
2782 f"not execution environments found for ee_relation {ee_relation_data}"
2783 )
2784 ee_relation_data["execution-environment-ref"] = ee["id"]
2785 return ee_relation_data
2786
2787 def _get_ns_relations(
2788 self,
2789 nsr_id: str,
2790 nsd: Dict[str, Any],
2791 vca: DeployedVCA,
2792 cached_vnfds: Dict[str, Any],
2793 ) -> List[Relation]:
2794 relations = []
2795 db_ns_relations = get_ns_configuration_relation_list(nsd)
2796 for r in db_ns_relations:
2797 provider_dict = None
2798 requirer_dict = None
2799 if all(key in r for key in ("provider", "requirer")):
2800 provider_dict = r["provider"]
2801 requirer_dict = r["requirer"]
2802 elif "entities" in r:
2803 provider_id = r["entities"][0]["id"]
2804 provider_dict = {
2805 "nsr-id": nsr_id,
2806 "endpoint": r["entities"][0]["endpoint"],
2807 }
2808 if provider_id != nsd["id"]:
2809 provider_dict["vnf-profile-id"] = provider_id
2810 requirer_id = r["entities"][1]["id"]
2811 requirer_dict = {
2812 "nsr-id": nsr_id,
2813 "endpoint": r["entities"][1]["endpoint"],
2814 }
2815 if requirer_id != nsd["id"]:
2816 requirer_dict["vnf-profile-id"] = requirer_id
2817 else:
2818 raise Exception("provider/requirer or entities must be included in the relation.")
2819 relation_provider = self._update_ee_relation_data_with_implicit_data(
2820 nsr_id, nsd, provider_dict, cached_vnfds
2821 )
2822 relation_requirer = self._update_ee_relation_data_with_implicit_data(
2823 nsr_id, nsd, requirer_dict, cached_vnfds
2824 )
2825 provider = EERelation(relation_provider)
2826 requirer = EERelation(relation_requirer)
2827 relation = Relation(r["name"], provider, requirer)
2828 vca_in_relation = self._is_deployed_vca_in_relation(vca, relation)
2829 if vca_in_relation:
2830 relations.append(relation)
2831 return relations
2832
2833 def _get_vnf_relations(
2834 self,
2835 nsr_id: str,
2836 nsd: Dict[str, Any],
2837 vca: DeployedVCA,
2838 cached_vnfds: Dict[str, Any],
2839 ) -> List[Relation]:
2840 relations = []
2841 vnf_profile = get_vnf_profile(nsd, vca.vnf_profile_id)
2842 vnf_profile_id = vnf_profile["id"]
2843 vnfd_id = vnf_profile["vnfd-id"]
2844 db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
2845 db_vnf_relations = get_relation_list(db_vnfd, vnfd_id)
2846 for r in db_vnf_relations:
2847 provider_dict = None
2848 requirer_dict = None
2849 if all(key in r for key in ("provider", "requirer")):
2850 provider_dict = r["provider"]
2851 requirer_dict = r["requirer"]
2852 elif "entities" in r:
2853 provider_id = r["entities"][0]["id"]
2854 provider_dict = {
2855 "nsr-id": nsr_id,
2856 "vnf-profile-id": vnf_profile_id,
2857 "endpoint": r["entities"][0]["endpoint"],
2858 }
2859 if provider_id != vnfd_id:
2860 provider_dict["vdu-profile-id"] = provider_id
2861 requirer_id = r["entities"][1]["id"]
2862 requirer_dict = {
2863 "nsr-id": nsr_id,
2864 "vnf-profile-id": vnf_profile_id,
2865 "endpoint": r["entities"][1]["endpoint"],
2866 }
2867 if requirer_id != vnfd_id:
2868 requirer_dict["vdu-profile-id"] = requirer_id
2869 else:
2870 raise Exception("provider/requirer or entities must be included in the relation.")
2871 relation_provider = self._update_ee_relation_data_with_implicit_data(
2872 nsr_id, nsd, provider_dict, cached_vnfds, vnf_profile_id=vnf_profile_id
2873 )
2874 relation_requirer = self._update_ee_relation_data_with_implicit_data(
2875 nsr_id, nsd, requirer_dict, cached_vnfds, vnf_profile_id=vnf_profile_id
2876 )
2877 provider = EERelation(relation_provider)
2878 requirer = EERelation(relation_requirer)
2879 relation = Relation(r["name"], provider, requirer)
2880 vca_in_relation = self._is_deployed_vca_in_relation(vca, relation)
2881 if vca_in_relation:
2882 relations.append(relation)
2883 return relations
2884
2885 def _get_kdu_resource_data(
2886 self,
2887 ee_relation: EERelation,
2888 db_nsr: Dict[str, Any],
2889 cached_vnfds: Dict[str, Any],
2890 ) -> DeployedK8sResource:
2891 nsd = get_nsd(db_nsr)
2892 vnf_profiles = get_vnf_profiles(nsd)
2893 vnfd_id = find_in_list(
2894 vnf_profiles,
2895 lambda vnf_profile: vnf_profile["id"] == ee_relation.vnf_profile_id,
2896 )["vnfd-id"]
2897 db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
2898 kdu_resource_profile = get_kdu_resource_profile(
2899 db_vnfd, ee_relation.kdu_resource_profile_id
2900 )
2901 kdu_name = kdu_resource_profile["kdu-name"]
2902 deployed_kdu, _ = get_deployed_kdu(
2903 db_nsr.get("_admin", ()).get("deployed", ()),
2904 kdu_name,
2905 ee_relation.vnf_profile_id,
2906 )
2907 deployed_kdu.update({"resource-name": kdu_resource_profile["resource-name"]})
2908 return deployed_kdu
2909
2910 def _get_deployed_component(
2911 self,
2912 ee_relation: EERelation,
2913 db_nsr: Dict[str, Any],
2914 cached_vnfds: Dict[str, Any],
2915 ) -> DeployedComponent:
2916 nsr_id = db_nsr["_id"]
2917 deployed_component = None
2918 ee_level = EELevel.get_level(ee_relation)
2919 if ee_level == EELevel.NS:
2920 vca = get_deployed_vca(db_nsr, {"vdu_id": None, "member-vnf-index": None})
2921 if vca:
2922 deployed_component = DeployedVCA(nsr_id, vca)
2923 elif ee_level == EELevel.VNF:
2924 vca = get_deployed_vca(
2925 db_nsr,
2926 {
2927 "vdu_id": None,
2928 "member-vnf-index": ee_relation.vnf_profile_id,
2929 "ee_descriptor_id": ee_relation.execution_environment_ref,
2930 },
2931 )
2932 if vca:
2933 deployed_component = DeployedVCA(nsr_id, vca)
2934 elif ee_level == EELevel.VDU:
2935 vca = get_deployed_vca(
2936 db_nsr,
2937 {
2938 "vdu_id": ee_relation.vdu_profile_id,
2939 "member-vnf-index": ee_relation.vnf_profile_id,
2940 "ee_descriptor_id": ee_relation.execution_environment_ref,
2941 },
2942 )
2943 if vca:
2944 deployed_component = DeployedVCA(nsr_id, vca)
2945 elif ee_level == EELevel.KDU:
2946 kdu_resource_data = self._get_kdu_resource_data(
2947 ee_relation, db_nsr, cached_vnfds
2948 )
2949 if kdu_resource_data:
2950 deployed_component = DeployedK8sResource(kdu_resource_data)
2951 return deployed_component
2952
    async def _add_relation(
        self,
        relation: Relation,
        vca_type: str,
        db_nsr: Dict[str, Any],
        cached_vnfds: Dict[str, Any],
        cached_vnfrs: Dict[str, Any],
    ) -> bool:
        """
        Try to establish one juju relation between two deployed components.

        The relation is only added when both provider and requirer are
        deployed and have their configuration software installed; otherwise
        the caller is expected to retry later.

        :param relation: relation (name, provider, requirer) to establish.
        :param vca_type: key into self.vca_map selecting the VCA connector.
        :param db_nsr: nsrs database record.
        :param cached_vnfds: cache dict used to avoid repeated vnfd DB reads.
        :param cached_vnfrs: cache dict used to avoid repeated vnfr DB reads.
        :return: True when the relation was added (caller can drop it from its
            pending list), False when it must be retried later.
        """
        deployed_provider = self._get_deployed_component(
            relation.provider, db_nsr, cached_vnfds
        )
        deployed_requirer = self._get_deployed_component(
            relation.requirer, db_nsr, cached_vnfds
        )
        if (
            deployed_provider
            and deployed_requirer
            and deployed_provider.config_sw_installed
            and deployed_requirer.config_sw_installed
        ):
            # NS level endpoints have no vnf_profile_id and thus no vnfr
            provider_db_vnfr = (
                self._get_vnfr(
                    relation.provider.nsr_id,
                    relation.provider.vnf_profile_id,
                    cached_vnfrs,
                )
                if relation.provider.vnf_profile_id
                else None
            )
            requirer_db_vnfr = (
                self._get_vnfr(
                    relation.requirer.nsr_id,
                    relation.requirer.vnf_profile_id,
                    cached_vnfrs,
                )
                if relation.requirer.vnf_profile_id
                else None
            )
            provider_vca_id = self.get_vca_id(provider_db_vnfr, db_nsr)
            requirer_vca_id = self.get_vca_id(requirer_db_vnfr, db_nsr)
            provider_relation_endpoint = RelationEndpoint(
                deployed_provider.ee_id,
                provider_vca_id,
                relation.provider.endpoint,
            )
            requirer_relation_endpoint = RelationEndpoint(
                deployed_requirer.ee_id,
                requirer_vca_id,
                relation.requirer.endpoint,
            )
            await self.vca_map[vca_type].add_relation(
                provider=provider_relation_endpoint,
                requirer=requirer_relation_endpoint,
            )
            # remove entry from relations list
            return True
        return False
3010
3011 async def _add_vca_relations(
3012 self,
3013 logging_text,
3014 nsr_id,
3015 vca_type: str,
3016 vca_index: int,
3017 timeout: int = 3600,
3018 ) -> bool:
3019
3020 # steps:
3021 # 1. find all relations for this VCA
3022 # 2. wait for other peers related
3023 # 3. add relations
3024
3025 try:
3026 # STEP 1: find all relations for this VCA
3027
3028 # read nsr record
3029 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3030 nsd = get_nsd(db_nsr)
3031
3032 # this VCA data
3033 deployed_vca_dict = get_deployed_vca_list(db_nsr)[vca_index]
3034 my_vca = DeployedVCA(nsr_id, deployed_vca_dict)
3035
3036 cached_vnfds = {}
3037 cached_vnfrs = {}
3038 relations = []
3039 relations.extend(self._get_ns_relations(nsr_id, nsd, my_vca, cached_vnfds))
3040 relations.extend(self._get_vnf_relations(nsr_id, nsd, my_vca, cached_vnfds))
3041
3042 # if no relations, terminate
3043 if not relations:
3044 self.logger.debug(logging_text + " No relations")
3045 return True
3046
3047 self.logger.debug(logging_text + " adding relations {}".format(relations))
3048
3049 # add all relations
3050 start = time()
3051 while True:
3052 # check timeout
3053 now = time()
3054 if now - start >= timeout:
3055 self.logger.error(logging_text + " : timeout adding relations")
3056 return False
3057
3058 # reload nsr from database (we need to update record: _admin.deployed.VCA)
3059 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3060
3061 # for each relation, find the VCA's related
3062 for relation in relations.copy():
3063 added = await self._add_relation(
3064 relation,
3065 vca_type,
3066 db_nsr,
3067 cached_vnfds,
3068 cached_vnfrs,
3069 )
3070 if added:
3071 relations.remove(relation)
3072
3073 if not relations:
3074 self.logger.debug("Relations added")
3075 break
3076 await asyncio.sleep(5.0)
3077
3078 return True
3079
3080 except Exception as e:
3081 self.logger.warn(logging_text + " ERROR adding relations: {}".format(e))
3082 return False
3083
    async def _install_kdu(
        self,
        nsr_id: str,
        nsr_db_path: str,
        vnfr_data: dict,
        kdu_index: int,
        kdud: dict,
        vnfd: dict,
        k8s_instance_info: dict,
        k8params: dict = None,
        timeout: int = 600,
        vca_id: str = None,
    ):
        """Install one KDU on its k8s cluster and update nsr/vnfr records.

        Installs the kdu model, extracts the management service ip (if any)
        into the vnfr, marks the kdur READY and runs the descriptor's
        initial-config-primitives (only when there is no juju ee for the kdu).

        :param nsr_id: NS record id
        :param nsr_db_path: path of this KDU inside the nsr (_admin.deployed.K8s.<i>)
        :param vnfr_data: VNF record owning this KDU
        :param kdu_index: index of this kdur inside vnfr_data["kdur"]
        :param kdud: KDU descriptor (entry of vnfd["kdu"])
        :param vnfd: VNF descriptor
        :param k8s_instance_info: cluster type/uuid, kdu name/model, namespace, ...
        :param k8params: instantiation parameters for the kdu
        :param timeout: seconds allowed for the install and for each primitive
        :param vca_id: VCA id, if any
        :return: the kdu_instance name (generated or taken from the descriptor)
        :raises: re-raises any failure after marking the kdur status as ERROR
        """

        try:
            k8sclustertype = k8s_instance_info["k8scluster-type"]
            # Instantiate kdu
            db_dict_install = {
                "collection": "nsrs",
                "filter": {"_id": nsr_id},
                "path": nsr_db_path,
            }

            # honor an explicit deployment name from the descriptor, otherwise generate one
            if k8s_instance_info.get("kdu-deployment-name"):
                kdu_instance = k8s_instance_info.get("kdu-deployment-name")
            else:
                kdu_instance = self.k8scluster_map[
                    k8sclustertype
                ].generate_kdu_instance_name(
                    db_dict=db_dict_install,
                    kdu_model=k8s_instance_info["kdu-model"],
                    kdu_name=k8s_instance_info["kdu-name"],
                )
            self.update_db_2(
                "nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance}
            )
            await self.k8scluster_map[k8sclustertype].install(
                cluster_uuid=k8s_instance_info["k8scluster-uuid"],
                kdu_model=k8s_instance_info["kdu-model"],
                atomic=True,
                params=k8params,
                db_dict=db_dict_install,
                timeout=timeout,
                kdu_name=k8s_instance_info["kdu-name"],
                namespace=k8s_instance_info["namespace"],
                kdu_instance=kdu_instance,
                vca_id=vca_id,
            )
            self.update_db_2(
                "nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance}
            )

            # Obtain services to obtain management service ip
            services = await self.k8scluster_map[k8sclustertype].get_services(
                cluster_uuid=k8s_instance_info["k8scluster-uuid"],
                kdu_instance=kdu_instance,
                namespace=k8s_instance_info["namespace"],
            )

            # Obtain management service info (if exists)
            vnfr_update_dict = {}
            kdu_config = get_configuration(vnfd, kdud["name"])
            if kdu_config:
                target_ee_list = kdu_config.get("execution-environment-list", [])
            else:
                target_ee_list = []

            if services:
                vnfr_update_dict["kdur.{}.services".format(kdu_index)] = services
                mgmt_services = [
                    service
                    for service in kdud.get("service", [])
                    if service.get("mgmt-service")
                ]
                for mgmt_service in mgmt_services:
                    for service in services:
                        # match by prefix: deployed service names carry the release suffix
                        if service["name"].startswith(mgmt_service["name"]):
                            # Mgmt service found, Obtain service ip
                            ip = service.get("external_ip", service.get("cluster_ip"))
                            if isinstance(ip, list) and len(ip) == 1:
                                ip = ip[0]

                            vnfr_update_dict[
                                "kdur.{}.ip-address".format(kdu_index)
                            ] = ip

                            # Check if must update also mgmt ip at the vnf
                            service_external_cp = mgmt_service.get(
                                "external-connection-point-ref"
                            )
                            if service_external_cp:
                                if (
                                    deep_get(vnfd, ("mgmt-interface", "cp"))
                                    == service_external_cp
                                ):
                                    vnfr_update_dict["ip-address"] = ip

                                if find_in_list(
                                    target_ee_list,
                                    lambda ee: ee.get(
                                        "external-connection-point-ref", ""
                                    )
                                    == service_external_cp,
                                ):
                                    vnfr_update_dict[
                                        "kdur.{}.ip-address".format(kdu_index)
                                    ] = ip
                            break
                    else:
                        # for-else: no deployed service matched this mgmt-service
                        self.logger.warn(
                            "Mgmt service name: {} not found".format(
                                mgmt_service["name"]
                            )
                        )

            vnfr_update_dict["kdur.{}.status".format(kdu_index)] = "READY"
            self.update_db_2("vnfrs", vnfr_data.get("_id"), vnfr_update_dict)

            # run initial config primitives only when there is no juju ee handling them
            kdu_config = get_configuration(vnfd, k8s_instance_info["kdu-name"])
            if (
                kdu_config
                and kdu_config.get("initial-config-primitive")
                and get_juju_ee_ref(vnfd, k8s_instance_info["kdu-name"]) is None
            ):
                initial_config_primitive_list = kdu_config.get(
                    "initial-config-primitive"
                )
                initial_config_primitive_list.sort(key=lambda val: int(val["seq"]))

                for initial_config_primitive in initial_config_primitive_list:
                    primitive_params_ = self._map_primitive_params(
                        initial_config_primitive, {}, {}
                    )

                    await asyncio.wait_for(
                        self.k8scluster_map[k8sclustertype].exec_primitive(
                            cluster_uuid=k8s_instance_info["k8scluster-uuid"],
                            kdu_instance=kdu_instance,
                            primitive_name=initial_config_primitive["name"],
                            params=primitive_params_,
                            db_dict=db_dict_install,
                            vca_id=vca_id,
                        ),
                        timeout=timeout,
                    )

        except Exception as e:
            # Prepare update db with error and raise exception
            try:
                self.update_db_2(
                    "nsrs", nsr_id, {nsr_db_path + ".detailed-status": str(e)}
                )
                self.update_db_2(
                    "vnfrs",
                    vnfr_data.get("_id"),
                    {"kdur.{}.status".format(kdu_index): "ERROR"},
                )
            except Exception:
                # ignore to keep original exception
                pass
            # reraise original error
            raise

        return kdu_instance
3248
    async def deploy_kdus(
        self,
        logging_text,
        nsr_id,
        nslcmop_id,
        db_vnfrs,
        db_vnfds,
        task_instantiation_info,
    ):
        """Deploy every KDU present in the VNF records of this NS.

        For each kdur it resolves the k8s cluster, synchronizes helm repos once
        per cluster, records the instance under nsr._admin.deployed.K8s and
        launches one _install_kdu asyncio task (registered in both
        task_instantiation_info and self.lcm_tasks).

        :raises LcmException: when a cluster cannot be found/initialized or a
            kdur is neither helm-chart nor juju-bundle
        """
        # Launch kdus if present in the descriptor

        # cache: cluster type -> {cluster id -> internal cluster uuid}
        k8scluster_id_2_uuic = {
            "helm-chart-v3": {},
            "helm-chart": {},
            "juju-bundle": {},
        }

        async def _get_cluster_id(cluster_id, cluster_type):
            # Resolve (and cache) the internal uuid of a k8s cluster for the
            # given type, lazily initializing helm-v3 on legacy clusters.
            nonlocal k8scluster_id_2_uuic
            if cluster_id in k8scluster_id_2_uuic[cluster_type]:
                return k8scluster_id_2_uuic[cluster_type][cluster_id]

            # check if K8scluster is creating and wait look if previous tasks in process
            task_name, task_dependency = self.lcm_tasks.lookfor_related(
                "k8scluster", cluster_id
            )
            if task_dependency:
                text = "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
                    task_name, cluster_id
                )
                self.logger.debug(logging_text + text)
                await asyncio.wait(task_dependency, timeout=3600)

            db_k8scluster = self.db.get_one(
                "k8sclusters", {"_id": cluster_id}, fail_on_empty=False
            )
            if not db_k8scluster:
                raise LcmException("K8s cluster {} cannot be found".format(cluster_id))

            k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
            if not k8s_id:
                if cluster_type == "helm-chart-v3":
                    try:
                        # backward compatibility for existing clusters that have not been initialized for helm v3
                        k8s_credentials = yaml.safe_dump(
                            db_k8scluster.get("credentials")
                        )
                        k8s_id, uninstall_sw = await self.k8sclusterhelm3.init_env(
                            k8s_credentials, reuse_cluster_uuid=cluster_id
                        )
                        db_k8scluster_update = {}
                        db_k8scluster_update["_admin.helm-chart-v3.error_msg"] = None
                        db_k8scluster_update["_admin.helm-chart-v3.id"] = k8s_id
                        db_k8scluster_update[
                            "_admin.helm-chart-v3.created"
                        ] = uninstall_sw
                        db_k8scluster_update[
                            "_admin.helm-chart-v3.operationalState"
                        ] = "ENABLED"
                        self.update_db_2(
                            "k8sclusters", cluster_id, db_k8scluster_update
                        )
                    except Exception as e:
                        self.logger.error(
                            logging_text
                            + "error initializing helm-v3 cluster: {}".format(str(e))
                        )
                        raise LcmException(
                            "K8s cluster '{}' has not been initialized for '{}'".format(
                                cluster_id, cluster_type
                            )
                        )
                else:
                    raise LcmException(
                        "K8s cluster '{}' has not been initialized for '{}'".format(
                            cluster_id, cluster_type
                        )
                    )
            k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
            return k8s_id

        logging_text += "Deploy kdus: "
        step = ""
        try:
            db_nsr_update = {"_admin.deployed.K8s": []}
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

            index = 0
            # clusters whose helm repos were already synchronized in this run
            updated_cluster_list = []
            updated_v3_cluster_list = []

            for vnfr_data in db_vnfrs.values():
                vca_id = self.get_vca_id(vnfr_data, {})
                for kdu_index, kdur in enumerate(get_iterable(vnfr_data, "kdur")):
                    # Step 0: Prepare and set parameters
                    desc_params = parse_yaml_strings(kdur.get("additionalParams"))
                    vnfd_id = vnfr_data.get("vnfd-id")
                    vnfd_with_id = find_in_list(
                        db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id
                    )
                    kdud = next(
                        kdud
                        for kdud in vnfd_with_id["kdu"]
                        if kdud["name"] == kdur["kdu-name"]
                    )
                    namespace = kdur.get("k8s-namespace")
                    kdu_deployment_name = kdur.get("kdu-deployment-name")
                    if kdur.get("helm-chart"):
                        kdumodel = kdur["helm-chart"]
                        # Default version: helm3, if helm-version is v2 assign v2
                        k8sclustertype = "helm-chart-v3"
                        self.logger.debug("kdur: {}".format(kdur))
                        if (
                            kdur.get("helm-version")
                            and kdur.get("helm-version") == "v2"
                        ):
                            k8sclustertype = "helm-chart"
                    elif kdur.get("juju-bundle"):
                        kdumodel = kdur["juju-bundle"]
                        k8sclustertype = "juju-bundle"
                    else:
                        raise LcmException(
                            "kdu type for kdu='{}.{}' is neither helm-chart nor "
                            "juju-bundle. Maybe an old NBI version is running".format(
                                vnfr_data["member-vnf-index-ref"], kdur["kdu-name"]
                            )
                        )
                    # check if kdumodel is a file and exists
                    try:
                        vnfd_with_id = find_in_list(
                            db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id
                        )
                        storage = deep_get(vnfd_with_id, ("_admin", "storage"))
                        if storage:  # may be not present if vnfd has not artifacts
                            # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
                            if storage["pkg-dir"]:
                                filename = "{}/{}/{}s/{}".format(
                                    storage["folder"],
                                    storage["pkg-dir"],
                                    k8sclustertype,
                                    kdumodel,
                                )
                            else:
                                filename = "{}/Scripts/{}s/{}".format(
                                    storage["folder"],
                                    k8sclustertype,
                                    kdumodel,
                                )
                            if self.fs.file_exists(
                                filename, mode="file"
                            ) or self.fs.file_exists(filename, mode="dir"):
                                kdumodel = self.fs.path + filename
                    except (asyncio.TimeoutError, asyncio.CancelledError):
                        raise
                    except Exception:  # it is not a file
                        pass

                    k8s_cluster_id = kdur["k8s-cluster"]["id"]
                    step = "Synchronize repos for k8s cluster '{}'".format(
                        k8s_cluster_id
                    )
                    cluster_uuid = await _get_cluster_id(k8s_cluster_id, k8sclustertype)

                    # Synchronize repos
                    if (
                        k8sclustertype == "helm-chart"
                        and cluster_uuid not in updated_cluster_list
                    ) or (
                        k8sclustertype == "helm-chart-v3"
                        and cluster_uuid not in updated_v3_cluster_list
                    ):
                        del_repo_list, added_repo_dict = await asyncio.ensure_future(
                            self.k8scluster_map[k8sclustertype].synchronize_repos(
                                cluster_uuid=cluster_uuid
                            )
                        )
                        if del_repo_list or added_repo_dict:
                            if k8sclustertype == "helm-chart":
                                unset = {
                                    "_admin.helm_charts_added." + item: None
                                    for item in del_repo_list
                                }
                                updated = {
                                    "_admin.helm_charts_added." + item: name
                                    for item, name in added_repo_dict.items()
                                }
                                updated_cluster_list.append(cluster_uuid)
                            elif k8sclustertype == "helm-chart-v3":
                                unset = {
                                    "_admin.helm_charts_v3_added." + item: None
                                    for item in del_repo_list
                                }
                                updated = {
                                    "_admin.helm_charts_v3_added." + item: name
                                    for item, name in added_repo_dict.items()
                                }
                                updated_v3_cluster_list.append(cluster_uuid)
                            self.logger.debug(
                                logging_text + "repos synchronized on k8s cluster "
                                "'{}' to_delete: {}, to_add: {}".format(
                                    k8s_cluster_id, del_repo_list, added_repo_dict
                                )
                            )
                            self.db.set_one(
                                "k8sclusters",
                                {"_id": k8s_cluster_id},
                                updated,
                                unset=unset,
                            )

                    # Instantiate kdu
                    step = "Instantiating KDU {}.{} in k8s cluster {}".format(
                        vnfr_data["member-vnf-index-ref"],
                        kdur["kdu-name"],
                        k8s_cluster_id,
                    )
                    k8s_instance_info = {
                        "kdu-instance": None,
                        "k8scluster-uuid": cluster_uuid,
                        "k8scluster-type": k8sclustertype,
                        "member-vnf-index": vnfr_data["member-vnf-index-ref"],
                        "kdu-name": kdur["kdu-name"],
                        "kdu-model": kdumodel,
                        "namespace": namespace,
                        "kdu-deployment-name": kdu_deployment_name,
                    }
                    db_path = "_admin.deployed.K8s.{}".format(index)
                    db_nsr_update[db_path] = k8s_instance_info
                    self.update_db_2("nsrs", nsr_id, db_nsr_update)
                    vnfd_with_id = find_in_list(
                        db_vnfds, lambda vnf: vnf["_id"] == vnfd_id
                    )
                    task = asyncio.ensure_future(
                        self._install_kdu(
                            nsr_id,
                            db_path,
                            vnfr_data,
                            kdu_index,
                            kdud,
                            vnfd_with_id,
                            k8s_instance_info,
                            k8params=desc_params,
                            timeout=600,
                            vca_id=vca_id,
                        )
                    )
                    self.lcm_tasks.register(
                        "ns",
                        nsr_id,
                        nslcmop_id,
                        "instantiate_KDU-{}".format(index),
                        task,
                    )
                    task_instantiation_info[task] = "Deploying KDU {}".format(
                        kdur["kdu-name"]
                    )

                    index += 1

        except (LcmException, asyncio.CancelledError):
            raise
        except Exception as e:
            msg = "Exception {} while {}: {}".format(type(e).__name__, step, e)
            if isinstance(e, (N2VCException, DbException)):
                self.logger.error(logging_text + msg)
            else:
                self.logger.critical(logging_text + msg, exc_info=True)
            raise LcmException(msg)
        finally:
            # persist whatever was recorded, even on failure
            if db_nsr_update:
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
3520
    def _deploy_n2vc(
        self,
        logging_text,
        db_nsr,
        db_vnfr,
        nslcmop_id,
        nsr_id,
        nsi_id,
        vnfd_id,
        vdu_id,
        kdu_name,
        member_vnf_index,
        vdu_index,
        vdu_name,
        deploy_params,
        descriptor_config,
        base_folder,
        task_instantiation_info,
        stage,
    ):
        """Launch one instantiate_N2VC task per execution environment of a descriptor.

        For every charm/helm execution environment in descriptor_config it
        reuses (or creates) the matching entry at db_nsr._admin.deployed.VCA
        and registers an asyncio task that performs the actual deployment.
        """
        # launch instantiate_N2VC in a asyncio task and register task object
        # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
        # if not found, create one entry and update database
        # fill db_nsr._admin.deployed.VCA.<index>

        self.logger.debug(
            logging_text + "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id, vdu_id)
        )
        if "execution-environment-list" in descriptor_config:
            ee_list = descriptor_config.get("execution-environment-list", [])
        elif "juju" in descriptor_config:
            ee_list = [descriptor_config]  # ns charms
        else:  # other types as script are not supported
            ee_list = []

        for ee_item in ee_list:
            self.logger.debug(
                logging_text
                + "_deploy_n2vc ee_item juju={}, helm={}".format(
                    ee_item.get("juju"), ee_item.get("helm-chart")
                )
            )
            ee_descriptor_id = ee_item.get("id")
            if ee_item.get("juju"):
                vca_name = ee_item["juju"].get("charm")
                # a named charm implies a proxy charm in a lxc container;
                # k8s cloud or proxy=False override the type below
                vca_type = (
                    "lxc_proxy_charm"
                    if ee_item["juju"].get("charm") is not None
                    else "native_charm"
                )
                if ee_item["juju"].get("cloud") == "k8s":
                    vca_type = "k8s_proxy_charm"
                elif ee_item["juju"].get("proxy") is False:
                    vca_type = "native_charm"
            elif ee_item.get("helm-chart"):
                vca_name = ee_item["helm-chart"]
                # helm defaults to v3 unless the descriptor pins helm-version v2
                if ee_item.get("helm-version") and ee_item.get("helm-version") == "v2":
                    vca_type = "helm"
                else:
                    vca_type = "helm-v3"
            else:
                self.logger.debug(
                    logging_text + "skipping non juju neither charm configuration"
                )
                continue

            # look for an existing VCA record matching this ee; for-else creates one
            vca_index = -1
            for vca_index, vca_deployed in enumerate(
                db_nsr["_admin"]["deployed"]["VCA"]
            ):
                if not vca_deployed:
                    continue
                if (
                    vca_deployed.get("member-vnf-index") == member_vnf_index
                    and vca_deployed.get("vdu_id") == vdu_id
                    and vca_deployed.get("kdu_name") == kdu_name
                    and vca_deployed.get("vdu_count_index", 0) == vdu_index
                    and vca_deployed.get("ee_descriptor_id") == ee_descriptor_id
                ):
                    break
            else:
                # not found, create one.
                target = (
                    "ns" if not member_vnf_index else "vnf/{}".format(member_vnf_index)
                )
                if vdu_id:
                    target += "/vdu/{}/{}".format(vdu_id, vdu_index or 0)
                elif kdu_name:
                    target += "/kdu/{}".format(kdu_name)
                vca_deployed = {
                    "target_element": target,
                    # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
                    "member-vnf-index": member_vnf_index,
                    "vdu_id": vdu_id,
                    "kdu_name": kdu_name,
                    "vdu_count_index": vdu_index,
                    "operational-status": "init",  # TODO revise
                    "detailed-status": "",  # TODO revise
                    "step": "initial-deploy",  # TODO revise
                    "vnfd_id": vnfd_id,
                    "vdu_name": vdu_name,
                    "type": vca_type,
                    "ee_descriptor_id": ee_descriptor_id,
                }
                vca_index += 1

                # create VCA and configurationStatus in db
                db_dict = {
                    "_admin.deployed.VCA.{}".format(vca_index): vca_deployed,
                    "configurationStatus.{}".format(vca_index): dict(),
                }
                self.update_db_2("nsrs", nsr_id, db_dict)

                db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed)

                self.logger.debug("N2VC > NSR_ID > {}".format(nsr_id))
                self.logger.debug("N2VC > DB_NSR > {}".format(db_nsr))
                self.logger.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed))

            # Launch task
            task_n2vc = asyncio.ensure_future(
                self.instantiate_N2VC(
                    logging_text=logging_text,
                    vca_index=vca_index,
                    nsi_id=nsi_id,
                    db_nsr=db_nsr,
                    db_vnfr=db_vnfr,
                    vdu_id=vdu_id,
                    kdu_name=kdu_name,
                    vdu_index=vdu_index,
                    deploy_params=deploy_params,
                    config_descriptor=descriptor_config,
                    base_folder=base_folder,
                    nslcmop_id=nslcmop_id,
                    stage=stage,
                    vca_type=vca_type,
                    vca_name=vca_name,
                    ee_config_descriptor=ee_item,
                )
            )
            self.lcm_tasks.register(
                "ns",
                nsr_id,
                nslcmop_id,
                "instantiate_N2VC-{}".format(vca_index),
                task_n2vc,
            )
            task_instantiation_info[
                task_n2vc
            ] = self.task_name_deploy_vca + " {}.{}".format(
                member_vnf_index or "", vdu_id or ""
            )
3673
3674 @staticmethod
3675 def _create_nslcmop(nsr_id, operation, params):
3676 """
3677 Creates a ns-lcm-opp content to be stored at database.
3678 :param nsr_id: internal id of the instance
3679 :param operation: instantiate, terminate, scale, action, ...
3680 :param params: user parameters for the operation
3681 :return: dictionary following SOL005 format
3682 """
3683 # Raise exception if invalid arguments
3684 if not (nsr_id and operation and params):
3685 raise LcmException(
3686 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided"
3687 )
3688 now = time()
3689 _id = str(uuid4())
3690 nslcmop = {
3691 "id": _id,
3692 "_id": _id,
3693 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
3694 "operationState": "PROCESSING",
3695 "statusEnteredTime": now,
3696 "nsInstanceId": nsr_id,
3697 "lcmOperationType": operation,
3698 "startTime": now,
3699 "isAutomaticInvocation": False,
3700 "operationParams": params,
3701 "isCancelPending": False,
3702 "links": {
3703 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
3704 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
3705 },
3706 }
3707 return nslcmop
3708
3709 def _format_additional_params(self, params):
3710 params = params or {}
3711 for key, value in params.items():
3712 if str(value).startswith("!!yaml "):
3713 params[key] = yaml.safe_load(value[7:])
3714 return params
3715
3716 def _get_terminate_primitive_params(self, seq, vnf_index):
3717 primitive = seq.get("name")
3718 primitive_params = {}
3719 params = {
3720 "member_vnf_index": vnf_index,
3721 "primitive": primitive,
3722 "primitive_params": primitive_params,
3723 }
3724 desc_params = {}
3725 return self._map_primitive_params(seq, params, desc_params)
3726
3727 # sub-operations
3728
3729 def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
3730 op = deep_get(db_nslcmop, ("_admin", "operations"), [])[op_index]
3731 if op.get("operationState") == "COMPLETED":
3732 # b. Skip sub-operation
3733 # _ns_execute_primitive() or RO.create_action() will NOT be executed
3734 return self.SUBOPERATION_STATUS_SKIP
3735 else:
3736 # c. retry executing sub-operation
3737 # The sub-operation exists, and operationState != 'COMPLETED'
3738 # Update operationState = 'PROCESSING' to indicate a retry.
3739 operationState = "PROCESSING"
3740 detailed_status = "In progress"
3741 self._update_suboperation_status(
3742 db_nslcmop, op_index, operationState, detailed_status
3743 )
3744 # Return the sub-operation index
3745 # _ns_execute_primitive() or RO.create_action() will be called from scale()
3746 # with arguments extracted from the sub-operation
3747 return op_index
3748
3749 # Find a sub-operation where all keys in a matching dictionary must match
3750 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
3751 def _find_suboperation(self, db_nslcmop, match):
3752 if db_nslcmop and match:
3753 op_list = db_nslcmop.get("_admin", {}).get("operations", [])
3754 for i, op in enumerate(op_list):
3755 if all(op.get(k) == match[k] for k in match):
3756 return i
3757 return self.SUBOPERATION_STATUS_NOT_FOUND
3758
3759 # Update status for a sub-operation given its index
3760 def _update_suboperation_status(
3761 self, db_nslcmop, op_index, operationState, detailed_status
3762 ):
3763 # Update DB for HA tasks
3764 q_filter = {"_id": db_nslcmop["_id"]}
3765 update_dict = {
3766 "_admin.operations.{}.operationState".format(op_index): operationState,
3767 "_admin.operations.{}.detailed-status".format(op_index): detailed_status,
3768 }
3769 self.db.set_one(
3770 "nslcmops", q_filter=q_filter, update_dict=update_dict, fail_on_empty=False
3771 )
3772
3773 # Add sub-operation, return the index of the added sub-operation
3774 # Optionally, set operationState, detailed-status, and operationType
3775 # Status and type are currently set for 'scale' sub-operations:
3776 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
3777 # 'detailed-status' : status message
3778 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
3779 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
3780 def _add_suboperation(
3781 self,
3782 db_nslcmop,
3783 vnf_index,
3784 vdu_id,
3785 vdu_count_index,
3786 vdu_name,
3787 primitive,
3788 mapped_primitive_params,
3789 operationState=None,
3790 detailed_status=None,
3791 operationType=None,
3792 RO_nsr_id=None,
3793 RO_scaling_info=None,
3794 ):
3795 if not db_nslcmop:
3796 return self.SUBOPERATION_STATUS_NOT_FOUND
3797 # Get the "_admin.operations" list, if it exists
3798 db_nslcmop_admin = db_nslcmop.get("_admin", {})
3799 op_list = db_nslcmop_admin.get("operations")
3800 # Create or append to the "_admin.operations" list
3801 new_op = {
3802 "member_vnf_index": vnf_index,
3803 "vdu_id": vdu_id,
3804 "vdu_count_index": vdu_count_index,
3805 "primitive": primitive,
3806 "primitive_params": mapped_primitive_params,
3807 }
3808 if operationState:
3809 new_op["operationState"] = operationState
3810 if detailed_status:
3811 new_op["detailed-status"] = detailed_status
3812 if operationType:
3813 new_op["lcmOperationType"] = operationType
3814 if RO_nsr_id:
3815 new_op["RO_nsr_id"] = RO_nsr_id
3816 if RO_scaling_info:
3817 new_op["RO_scaling_info"] = RO_scaling_info
3818 if not op_list:
3819 # No existing operations, create key 'operations' with current operation as first list element
3820 db_nslcmop_admin.update({"operations": [new_op]})
3821 op_list = db_nslcmop_admin.get("operations")
3822 else:
3823 # Existing operations, append operation to list
3824 op_list.append(new_op)
3825
3826 db_nslcmop_update = {"_admin.operations": op_list}
3827 self.update_db_2("nslcmops", db_nslcmop["_id"], db_nslcmop_update)
3828 op_index = len(op_list) - 1
3829 return op_index
3830
3831 # Helper methods for scale() sub-operations
3832
3833 # pre-scale/post-scale:
3834 # Check for 3 different cases:
3835 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
3836 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
3837 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
3838 def _check_or_add_scale_suboperation(
3839 self,
3840 db_nslcmop,
3841 vnf_index,
3842 vnf_config_primitive,
3843 primitive_params,
3844 operationType,
3845 RO_nsr_id=None,
3846 RO_scaling_info=None,
3847 ):
3848 # Find this sub-operation
3849 if RO_nsr_id and RO_scaling_info:
3850 operationType = "SCALE-RO"
3851 match = {
3852 "member_vnf_index": vnf_index,
3853 "RO_nsr_id": RO_nsr_id,
3854 "RO_scaling_info": RO_scaling_info,
3855 }
3856 else:
3857 match = {
3858 "member_vnf_index": vnf_index,
3859 "primitive": vnf_config_primitive,
3860 "primitive_params": primitive_params,
3861 "lcmOperationType": operationType,
3862 }
3863 op_index = self._find_suboperation(db_nslcmop, match)
3864 if op_index == self.SUBOPERATION_STATUS_NOT_FOUND:
3865 # a. New sub-operation
3866 # The sub-operation does not exist, add it.
3867 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
3868 # The following parameters are set to None for all kind of scaling:
3869 vdu_id = None
3870 vdu_count_index = None
3871 vdu_name = None
3872 if RO_nsr_id and RO_scaling_info:
3873 vnf_config_primitive = None
3874 primitive_params = None
3875 else:
3876 RO_nsr_id = None
3877 RO_scaling_info = None
3878 # Initial status for sub-operation
3879 operationState = "PROCESSING"
3880 detailed_status = "In progress"
3881 # Add sub-operation for pre/post-scaling (zero or more operations)
3882 self._add_suboperation(
3883 db_nslcmop,
3884 vnf_index,
3885 vdu_id,
3886 vdu_count_index,
3887 vdu_name,
3888 vnf_config_primitive,
3889 primitive_params,
3890 operationState,
3891 detailed_status,
3892 operationType,
3893 RO_nsr_id,
3894 RO_scaling_info,
3895 )
3896 return self.SUBOPERATION_STATUS_NEW
3897 else:
3898 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
3899 # or op_index (operationState != 'COMPLETED')
3900 return self._retry_or_skip_suboperation(db_nslcmop, op_index)
3901
3902 # Function to return execution_environment id
3903
3904 def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list):
3905 # TODO vdu_index_count
3906 for vca in vca_deployed_list:
3907 if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id:
3908 return vca["ee_id"]
3909
    async def destroy_N2VC(
        self,
        logging_text,
        db_nslcmop,
        vca_deployed,
        config_descriptor,
        vca_index,
        destroy_ee=True,
        exec_primitives=True,
        scaling_in=False,
        vca_id: str = None,
    ):
        """
        Execute the terminate primitives and destroy the execution environment (if destroy_ee=True)
        :param logging_text: prefix for log messages
        :param db_nslcmop: nslcmop record; terminate actions are registered as sub-operations on it
        :param vca_deployed: Dictionary of deployment info at db_nsr._admin.deployed.VCA.<INDEX>
        :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
        :param vca_index: index in the database _admin.deployed.VCA
        :param destroy_ee: False to do not destroy, because it will be destroyed all of them at once
        :param exec_primitives: False to do not execute terminate primitives, because the config is not completed or has
            not executed properly
        :param scaling_in: True destroys the application, False destroys the model
        :param vca_id: VCA id, if any
        :return: None or exception
        """

        self.logger.debug(
            logging_text
            + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
                vca_index, vca_deployed, config_descriptor, destroy_ee
            )
        )

        # default to proxy charm when the VCA record does not state its type
        vca_type = vca_deployed.get("type", "lxc_proxy_charm")

        # execute terminate_primitives
        if exec_primitives:
            terminate_primitives = get_ee_sorted_terminate_config_primitive_list(
                config_descriptor.get("terminate-config-primitive"),
                vca_deployed.get("ee_descriptor_id"),
            )
            vdu_id = vca_deployed.get("vdu_id")
            vdu_count_index = vca_deployed.get("vdu_count_index")
            vdu_name = vca_deployed.get("vdu_name")
            vnf_index = vca_deployed.get("member-vnf-index")
            if terminate_primitives and vca_deployed.get("needed_terminate"):
                for seq in terminate_primitives:
                    # For each sequence in list, get primitive and call _ns_execute_primitive()
                    step = "Calling terminate action for vnf_member_index={} primitive={}".format(
                        vnf_index, seq.get("name")
                    )
                    self.logger.debug(logging_text + step)
                    # Create the primitive for each sequence, i.e. "primitive": "touch"
                    primitive = seq.get("name")
                    mapped_primitive_params = self._get_terminate_primitive_params(
                        seq, vnf_index
                    )

                    # Add sub-operation
                    self._add_suboperation(
                        db_nslcmop,
                        vnf_index,
                        vdu_id,
                        vdu_count_index,
                        vdu_name,
                        primitive,
                        mapped_primitive_params,
                    )
                    # Sub-operations: Call _ns_execute_primitive() instead of action()
                    try:
                        result, result_detail = await self._ns_execute_primitive(
                            vca_deployed["ee_id"],
                            primitive,
                            mapped_primitive_params,
                            vca_type=vca_type,
                            vca_id=vca_id,
                        )
                    except LcmException:
                        # this happens when VCA is not deployed. In this case it is not needed to terminate
                        continue
                    result_ok = ["COMPLETED", "PARTIALLY_COMPLETED"]
                    if result not in result_ok:
                        raise LcmException(
                            "terminate_primitive {} for vnf_member_index={} fails with "
                            "error {}".format(seq.get("name"), vnf_index, result_detail)
                        )
                # set that this VCA do not need terminated
                db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(
                    vca_index
                )
                self.update_db_2(
                    "nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False}
                )

        # Delete Prometheus Jobs if any
        # This uses NSR_ID, so it will destroy any jobs under this index
        self.db.del_list("prometheus_jobs", {"nsr_id": db_nslcmop["nsInstanceId"]})

        if destroy_ee:
            await self.vca_map[vca_type].delete_execution_environment(
                vca_deployed["ee_id"],
                scaling_in=scaling_in,
                vca_type=vca_type,
                vca_id=vca_id,
            )
4015
4016 async def _delete_all_N2VC(self, db_nsr: dict, vca_id: str = None):
4017 self._write_all_config_status(db_nsr=db_nsr, status="TERMINATING")
4018 namespace = "." + db_nsr["_id"]
4019 try:
4020 await self.n2vc.delete_namespace(
4021 namespace=namespace,
4022 total_timeout=self.timeout_charm_delete,
4023 vca_id=vca_id,
4024 )
4025 except N2VCNotFound: # already deleted. Skip
4026 pass
4027 self._write_all_config_status(db_nsr=db_nsr, status="DELETED")
4028
    async def _terminate_RO(
        self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
    ):
        """
        Terminates a deployment from RO (classic, non-NG flavor): deletes the ns
        instance from the VIM, then the nsd and the vnfds registered at RO.
        :param logging_text: prefix for every log message
        :param nsr_deployed: db_nsr._admin.deployed
        :param nsr_id: ns instance id (nsrs database record)
        :param nslcmop_id: operation id (nslcmops database record)
        :param stage: list of string with the content to write on db_nslcmop.detailed-status.
            this method will update only the index 2, but it will write on database the concatenated content of the list
        :return: None. Raises LcmException if any deletion step fails
        """
        db_nsr_update = {}
        failed_detail = []  # accumulated per-step errors; reported at the end
        ro_nsr_id = ro_delete_action = None
        if nsr_deployed and nsr_deployed.get("RO"):
            ro_nsr_id = nsr_deployed["RO"].get("nsr_id")
            # non-None when a previous (interrupted) terminate already launched the delete
            ro_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
        try:
            if ro_nsr_id:
                # launch the ns deletion at RO and record its action id, so a
                # retried terminate can resume waiting instead of deleting again
                stage[2] = "Deleting ns from VIM."
                db_nsr_update["detailed-status"] = " ".join(stage)
                self._write_op_status(nslcmop_id, stage)
                self.logger.debug(logging_text + stage[2])
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_op_status(nslcmop_id, stage)
                desc = await self.RO.delete("ns", ro_nsr_id)
                ro_delete_action = desc["action_id"]
                db_nsr_update[
                    "_admin.deployed.RO.nsr_delete_action_id"
                ] = ro_delete_action
                db_nsr_update["_admin.deployed.RO.nsr_id"] = None
                db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
            if ro_delete_action:
                # wait until NS is deleted from VIM
                stage[2] = "Waiting ns deleted from VIM."
                detailed_status_old = None
                self.logger.debug(
                    logging_text
                    + stage[2]
                    + " RO_id={} ro_delete_action={}".format(
                        ro_nsr_id, ro_delete_action
                    )
                )
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_op_status(nslcmop_id, stage)

                # poll RO every 5 seconds until the delete action finishes
                delete_timeout = 20 * 60  # 20 minutes
                while delete_timeout > 0:
                    desc = await self.RO.show(
                        "ns",
                        item_id_name=ro_nsr_id,
                        extra_item="action",
                        extra_item_id=ro_delete_action,
                    )

                    # deploymentStatus
                    self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)

                    ns_status, ns_status_info = self.RO.check_action_status(desc)
                    if ns_status == "ERROR":
                        raise ROclient.ROClientException(ns_status_info)
                    elif ns_status == "BUILD":
                        # deletion still in progress at the VIM
                        stage[2] = "Deleting from VIM {}".format(ns_status_info)
                    elif ns_status == "ACTIVE":
                        # delete action completed
                        db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
                        db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                        break
                    else:
                        assert (
                            False
                        ), "ROclient.check_action_status returns unknown {}".format(
                            ns_status
                        )
                    # only write to db when the status text actually changed
                    if stage[2] != detailed_status_old:
                        detailed_status_old = stage[2]
                        db_nsr_update["detailed-status"] = " ".join(stage)
                        self._write_op_status(nslcmop_id, stage)
                        self.update_db_2("nsrs", nsr_id, db_nsr_update)
                    await asyncio.sleep(5, loop=self.loop)
                    delete_timeout -= 5
                else:  # delete_timeout <= 0:
                    raise ROclient.ROClientException(
                        "Timeout waiting ns deleted from VIM"
                    )

        except Exception as e:
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            if (
                isinstance(e, ROclient.ROClientException) and e.http_code == 404
            ):  # not found
                # already deleted at RO; treat as success
                db_nsr_update["_admin.deployed.RO.nsr_id"] = None
                db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
                self.logger.debug(
                    logging_text + "RO_ns_id={} already deleted".format(ro_nsr_id)
                )
            elif (
                isinstance(e, ROclient.ROClientException) and e.http_code == 409
            ):  # conflict
                failed_detail.append("delete conflict: {}".format(e))
                self.logger.debug(
                    logging_text
                    + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id, e)
                )
            else:
                failed_detail.append("delete error: {}".format(e))
                self.logger.error(
                    logging_text + "RO_ns_id={} delete error: {}".format(ro_nsr_id, e)
                )

        # Delete nsd (only when the ns instance deletion above did not fail)
        if not failed_detail and deep_get(nsr_deployed, ("RO", "nsd_id")):
            ro_nsd_id = nsr_deployed["RO"]["nsd_id"]
            try:
                stage[2] = "Deleting nsd from RO."
                db_nsr_update["detailed-status"] = " ".join(stage)
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_op_status(nslcmop_id, stage)
                await self.RO.delete("nsd", ro_nsd_id)
                self.logger.debug(
                    logging_text + "ro_nsd_id={} deleted".format(ro_nsd_id)
                )
                db_nsr_update["_admin.deployed.RO.nsd_id"] = None
            except Exception as e:
                if (
                    isinstance(e, ROclient.ROClientException) and e.http_code == 404
                ):  # not found
                    db_nsr_update["_admin.deployed.RO.nsd_id"] = None
                    self.logger.debug(
                        logging_text + "ro_nsd_id={} already deleted".format(ro_nsd_id)
                    )
                elif (
                    isinstance(e, ROclient.ROClientException) and e.http_code == 409
                ):  # conflict
                    failed_detail.append(
                        "ro_nsd_id={} delete conflict: {}".format(ro_nsd_id, e)
                    )
                    self.logger.debug(logging_text + failed_detail[-1])
                else:
                    failed_detail.append(
                        "ro_nsd_id={} delete error: {}".format(ro_nsd_id, e)
                    )
                    self.logger.error(logging_text + failed_detail[-1])

        # Delete every vnfd registered at RO for this deployment
        if not failed_detail and deep_get(nsr_deployed, ("RO", "vnfd")):
            for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
                if not vnf_deployed or not vnf_deployed["id"]:
                    continue
                try:
                    ro_vnfd_id = vnf_deployed["id"]
                    stage[
                        2
                    ] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
                        vnf_deployed["member-vnf-index"], ro_vnfd_id
                    )
                    db_nsr_update["detailed-status"] = " ".join(stage)
                    self.update_db_2("nsrs", nsr_id, db_nsr_update)
                    self._write_op_status(nslcmop_id, stage)
                    await self.RO.delete("vnfd", ro_vnfd_id)
                    self.logger.debug(
                        logging_text + "ro_vnfd_id={} deleted".format(ro_vnfd_id)
                    )
                    db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
                except Exception as e:
                    if (
                        isinstance(e, ROclient.ROClientException) and e.http_code == 404
                    ):  # not found
                        db_nsr_update[
                            "_admin.deployed.RO.vnfd.{}.id".format(index)
                        ] = None
                        self.logger.debug(
                            logging_text
                            + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id)
                        )
                    elif (
                        isinstance(e, ROclient.ROClientException) and e.http_code == 409
                    ):  # conflict
                        failed_detail.append(
                            "ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id, e)
                        )
                        self.logger.debug(logging_text + failed_detail[-1])
                    else:
                        failed_detail.append(
                            "ro_vnfd_id={} delete error: {}".format(ro_vnfd_id, e)
                        )
                        self.logger.error(logging_text + failed_detail[-1])

        # final status write; raise if anything above was collected as a failure
        if failed_detail:
            stage[2] = "Error deleting from VIM"
        else:
            stage[2] = "Deleted from VIM"
        db_nsr_update["detailed-status"] = " ".join(stage)
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        self._write_op_status(nslcmop_id, stage)

        if failed_detail:
            raise LcmException("; ".join(failed_detail))
4228
    async def terminate(self, nsr_id, nslcmop_id):
        """
        Terminate a Network Service in 3 stages: run terminating primitives on the
        deployed VCAs, delete all execution environments and KDU instances, and
        remove the deployment from RO/VIM. Result is reported through database
        updates on nsrs/nslcmops and a kafka "terminated" notification.
        :param nsr_id: ns instance id (nsrs database record)
        :param nslcmop_id: operation id (nslcmops database record)
        :return: None
        """
        # Try to lock HA task here
        task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
        if not task_is_locked_by_me:
            return

        logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
        self.logger.debug(logging_text + "Enter")
        timeout_ns_terminate = self.timeout_ns_terminate
        db_nsr = None
        db_nslcmop = None
        operation_params = None
        exc = None
        error_list = []  # annotates all failed error messages
        db_nslcmop_update = {}
        autoremove = False  # autoremove after terminated
        tasks_dict_info = {}  # maps asyncio task -> human readable description
        db_nsr_update = {}
        stage = [
            "Stage 1/3: Preparing task.",
            "Waiting for previous operations to terminate.",
            "",
        ]
        # ^ contains [stage, step, VIM-status]
        try:
            # wait for any previous tasks in process
            await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)

            stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
            db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
            operation_params = db_nslcmop.get("operationParams") or {}
            if operation_params.get("timeout_ns_terminate"):
                timeout_ns_terminate = operation_params["timeout_ns_terminate"]
            stage[1] = "Getting nsr={} from db.".format(nsr_id)
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

            db_nsr_update["operational-status"] = "terminating"
            db_nsr_update["config-status"] = "terminating"
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state="TERMINATING",
                current_operation="TERMINATING",
                current_operation_id=nslcmop_id,
                other_update=db_nsr_update,
            )
            self._write_op_status(op_id=nslcmop_id, queuePosition=0, stage=stage)
            # deepcopy so later db updates do not mutate the working snapshot
            nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed")) or {}
            if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
                # nothing was ever deployed; finally-block still writes final status
                return

            stage[1] = "Getting vnf descriptors from db."
            db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
            db_vnfrs_dict = {
                db_vnfr["member-vnf-index-ref"]: db_vnfr for db_vnfr in db_vnfrs_list
            }
            # cache vnfds by id and index them also by member-vnf-index
            db_vnfds_from_id = {}
            db_vnfds_from_member_index = {}
            # Loop over VNFRs
            for vnfr in db_vnfrs_list:
                vnfd_id = vnfr["vnfd-id"]
                if vnfd_id not in db_vnfds_from_id:
                    vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
                    db_vnfds_from_id[vnfd_id] = vnfd
                db_vnfds_from_member_index[
                    vnfr["member-vnf-index-ref"]
                ] = db_vnfds_from_id[vnfd_id]

            # Destroy individual execution environments when there are terminating primitives.
            # Rest of EE will be deleted at once
            # TODO - check before calling _destroy_N2VC
            # if not operation_params.get("skip_terminate_primitives"):#
            # or not vca.get("needed_terminate"):
            stage[0] = "Stage 2/3 execute terminating primitives."
            self.logger.debug(logging_text + stage[0])
            stage[1] = "Looking execution environment that needs terminate."
            self.logger.debug(logging_text + stage[1])

            for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")):
                config_descriptor = None
                vca_member_vnf_index = vca.get("member-vnf-index")
                vca_id = self.get_vca_id(
                    db_vnfrs_dict.get(vca_member_vnf_index)
                    if vca_member_vnf_index
                    else None,
                    db_nsr,
                )
                if not vca or not vca.get("ee_id"):
                    continue
                # pick the config descriptor at the level this VCA was deployed:
                # ns-level, vdu-level, kdu-level or vnf-level
                if not vca.get("member-vnf-index"):
                    # ns
                    config_descriptor = db_nsr.get("ns-configuration")
                elif vca.get("vdu_id"):
                    db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                    config_descriptor = get_configuration(db_vnfd, vca.get("vdu_id"))
                elif vca.get("kdu_name"):
                    db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                    config_descriptor = get_configuration(db_vnfd, vca.get("kdu_name"))
                else:
                    db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                    config_descriptor = get_configuration(db_vnfd, db_vnfd["id"])
                vca_type = vca.get("type")
                exec_terminate_primitives = not operation_params.get(
                    "skip_terminate_primitives"
                ) and vca.get("needed_terminate")
                # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
                # pending native charms
                destroy_ee = (
                    True if vca_type in ("helm", "helm-v3", "native_charm") else False
                )
                # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
                #     vca_index, vca.get("ee_id"), vca_type, destroy_ee))
                task = asyncio.ensure_future(
                    self.destroy_N2VC(
                        logging_text,
                        db_nslcmop,
                        vca,
                        config_descriptor,
                        vca_index,
                        destroy_ee,
                        exec_terminate_primitives,
                        vca_id=vca_id,
                    )
                )
                tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))

            # wait for pending tasks of terminate primitives
            if tasks_dict_info:
                self.logger.debug(
                    logging_text
                    + "Waiting for tasks {}".format(list(tasks_dict_info.keys()))
                )
                error_list = await self._wait_for_tasks(
                    logging_text,
                    tasks_dict_info,
                    min(self.timeout_charm_delete, timeout_ns_terminate),
                    stage,
                    nslcmop_id,
                )
                tasks_dict_info.clear()
                if error_list:
                    # errors are reported at the finally block; do not proceed to stage 3
                    return  # raise LcmException("; ".join(error_list))

            # remove All execution environments at once
            stage[0] = "Stage 3/3 delete all."

            if nsr_deployed.get("VCA"):
                stage[1] = "Deleting all execution environments."
                self.logger.debug(logging_text + stage[1])
                vca_id = self.get_vca_id({}, db_nsr)
                task_delete_ee = asyncio.ensure_future(
                    asyncio.wait_for(
                        self._delete_all_N2VC(db_nsr=db_nsr, vca_id=vca_id),
                        timeout=self.timeout_charm_delete,
                    )
                )
                # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
                tasks_dict_info[task_delete_ee] = "Terminating all VCA"

            # Delete from k8scluster
            stage[1] = "Deleting KDUs."
            self.logger.debug(logging_text + stage[1])
            # print(nsr_deployed)
            for kdu in get_iterable(nsr_deployed, "K8s"):
                if not kdu or not kdu.get("kdu-instance"):
                    continue
                kdu_instance = kdu.get("kdu-instance")
                if kdu.get("k8scluster-type") in self.k8scluster_map:
                    # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
                    vca_id = self.get_vca_id({}, db_nsr)
                    task_delete_kdu_instance = asyncio.ensure_future(
                        self.k8scluster_map[kdu["k8scluster-type"]].uninstall(
                            cluster_uuid=kdu.get("k8scluster-uuid"),
                            kdu_instance=kdu_instance,
                            vca_id=vca_id,
                        )
                    )
                else:
                    self.logger.error(
                        logging_text
                        + "Unknown k8s deployment type {}".format(
                            kdu.get("k8scluster-type")
                        )
                    )
                    continue
                tasks_dict_info[
                    task_delete_kdu_instance
                ] = "Terminating KDU '{}'".format(kdu.get("kdu-name"))

            # remove from RO
            stage[1] = "Deleting ns from VIM."
            if self.ng_ro:
                task_delete_ro = asyncio.ensure_future(
                    self._terminate_ng_ro(
                        logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
                    )
                )
            else:
                task_delete_ro = asyncio.ensure_future(
                    self._terminate_RO(
                        logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
                    )
                )
            tasks_dict_info[task_delete_ro] = "Removing deployment from VIM"

            # rest of staff will be done at finally

        except (
            ROclient.ROClientException,
            DbException,
            LcmException,
            N2VCException,
        ) as e:
            self.logger.error(logging_text + "Exit Exception {}".format(e))
            exc = e
        except asyncio.CancelledError:
            self.logger.error(
                logging_text + "Cancelled Exception while '{}'".format(stage[1])
            )
            exc = "Operation was cancelled"
        except Exception as e:
            exc = traceback.format_exc()
            self.logger.critical(
                logging_text + "Exit Exception while '{}': {}".format(stage[1], e),
                exc_info=True,
            )
        finally:
            if exc:
                error_list.append(str(exc))
            try:
                # wait for pending tasks
                if tasks_dict_info:
                    stage[1] = "Waiting for terminate pending tasks."
                    self.logger.debug(logging_text + stage[1])
                    error_list += await self._wait_for_tasks(
                        logging_text,
                        tasks_dict_info,
                        timeout_ns_terminate,
                        stage,
                        nslcmop_id,
                    )
                stage[1] = stage[2] = ""
            except asyncio.CancelledError:
                error_list.append("Cancelled")
                # TODO cancell all tasks
            except Exception as exc:
                error_list.append(str(exc))
            # update status at database
            if error_list:
                error_detail = "; ".join(error_list)
                # self.logger.error(logging_text + error_detail)
                error_description_nslcmop = "{} Detail: {}".format(
                    stage[0], error_detail
                )
                error_description_nsr = "Operation: TERMINATING.{}, {}.".format(
                    nslcmop_id, stage[0]
                )

                db_nsr_update["operational-status"] = "failed"
                db_nsr_update["detailed-status"] = (
                    error_description_nsr + " Detail: " + error_detail
                )
                db_nslcmop_update["detailed-status"] = error_detail
                nslcmop_operation_state = "FAILED"
                ns_state = "BROKEN"
            else:
                error_detail = None
                error_description_nsr = error_description_nslcmop = None
                ns_state = "NOT_INSTANTIATED"
                db_nsr_update["operational-status"] = "terminated"
                db_nsr_update["detailed-status"] = "Done"
                db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
                db_nslcmop_update["detailed-status"] = "Done"
                nslcmop_operation_state = "COMPLETED"

            if db_nsr:
                self._write_ns_status(
                    nsr_id=nsr_id,
                    ns_state=ns_state,
                    current_operation="IDLE",
                    current_operation_id=None,
                    error_description=error_description_nsr,
                    error_detail=error_detail,
                    other_update=db_nsr_update,
                )
            self._write_op_status(
                op_id=nslcmop_id,
                stage="",
                error_message=error_description_nslcmop,
                operation_state=nslcmop_operation_state,
                other_update=db_nslcmop_update,
            )
            if ns_state == "NOT_INSTANTIATED":
                try:
                    # propagate the final state to every vnfr of this ns
                    self.db.set_list(
                        "vnfrs",
                        {"nsr-id-ref": nsr_id},
                        {"_admin.nsState": "NOT_INSTANTIATED"},
                    )
                except DbException as e:
                    self.logger.warn(
                        logging_text
                        + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
                            nsr_id, e
                        )
                    )
            if operation_params:
                autoremove = operation_params.get("autoremove", False)
            if nslcmop_operation_state:
                try:
                    # notify NBI and other subscribers about the operation result
                    await self.msg.aiowrite(
                        "ns",
                        "terminated",
                        {
                            "nsr_id": nsr_id,
                            "nslcmop_id": nslcmop_id,
                            "operationState": nslcmop_operation_state,
                            "autoremove": autoremove,
                        },
                        loop=self.loop,
                    )
                except Exception as e:
                    self.logger.error(
                        logging_text + "kafka_write notification Exception {}".format(e)
                    )

            self.logger.debug(logging_text + "Exit")
            self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
4556
    async def _wait_for_tasks(
        self, logging_text, created_tasks_info, timeout, stage, nslcmop_id, nsr_id=None
    ):
        """
        Wait for a set of asyncio tasks to finish, updating the operation status
        (stage index 1) as they complete and collecting errors.
        :param logging_text: prefix for every log message
        :param created_tasks_info: dict mapping each task to a human readable description
        :param timeout: maximum seconds to wait for all tasks
        :param stage: 3-item status list; index 1 is overwritten with progress "done/total"
        :param nslcmop_id: operation id whose status is updated
        :param nsr_id: if provided, errorDescription/errorDetail are also written to this nsr
        :return: list of error detail strings (empty when all tasks succeeded)
        """
        time_start = time()
        error_detail_list = []
        error_list = []
        pending_tasks = list(created_tasks_info.keys())
        num_tasks = len(pending_tasks)
        num_done = 0
        stage[1] = "{}/{}.".format(num_done, num_tasks)
        self._write_op_status(nslcmop_id, stage)
        while pending_tasks:
            new_error = None
            # remaining time of the global timeout for this wait round
            _timeout = timeout + time_start - time()
            done, pending_tasks = await asyncio.wait(
                pending_tasks, timeout=_timeout, return_when=asyncio.FIRST_COMPLETED
            )
            num_done += len(done)
            if not done:  # Timeout
                # annotate every still-pending task as timed out and stop waiting
                for task in pending_tasks:
                    new_error = created_tasks_info[task] + ": Timeout"
                    error_detail_list.append(new_error)
                    error_list.append(new_error)
                break
            for task in done:
                if task.cancelled():
                    exc = "Cancelled"
                else:
                    exc = task.exception()
                if exc:
                    if isinstance(exc, asyncio.TimeoutError):
                        exc = "Timeout"
                    new_error = created_tasks_info[task] + ": {}".format(exc)
                    error_list.append(created_tasks_info[task])
                    error_detail_list.append(new_error)
                    # known/expected exception types are logged without traceback
                    if isinstance(
                        exc,
                        (
                            str,
                            DbException,
                            N2VCException,
                            ROclient.ROClientException,
                            LcmException,
                            K8sException,
                            NgRoException,
                        ),
                    ):
                        self.logger.error(logging_text + new_error)
                    else:
                        # unexpected exception: log the full traceback
                        exc_traceback = "".join(
                            traceback.format_exception(None, exc, exc.__traceback__)
                        )
                        self.logger.error(
                            logging_text
                            + created_tasks_info[task]
                            + " "
                            + exc_traceback
                        )
                else:
                    self.logger.debug(
                        logging_text + created_tasks_info[task] + ": Done"
                    )
            stage[1] = "{}/{}.".format(num_done, num_tasks)
            if new_error:
                stage[1] += " Errors: " + ". ".join(error_detail_list) + "."
                if nsr_id:  # update also nsr
                    self.update_db_2(
                        "nsrs",
                        nsr_id,
                        {
                            "errorDescription": "Error at: " + ", ".join(error_list),
                            "errorDetail": ". ".join(error_detail_list),
                        },
                    )
            self._write_op_status(nslcmop_id, stage)
        return error_detail_list
4633
4634 @staticmethod
4635 def _map_primitive_params(primitive_desc, params, instantiation_params):
4636 """
4637 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
4638 The default-value is used. If it is between < > it look for a value at instantiation_params
4639 :param primitive_desc: portion of VNFD/NSD that describes primitive
4640 :param params: Params provided by user
4641 :param instantiation_params: Instantiation params provided by user
4642 :return: a dictionary with the calculated params
4643 """
4644 calculated_params = {}
4645 for parameter in primitive_desc.get("parameter", ()):
4646 param_name = parameter["name"]
4647 if param_name in params:
4648 calculated_params[param_name] = params[param_name]
4649 elif "default-value" in parameter or "value" in parameter:
4650 if "value" in parameter:
4651 calculated_params[param_name] = parameter["value"]
4652 else:
4653 calculated_params[param_name] = parameter["default-value"]
4654 if (
4655 isinstance(calculated_params[param_name], str)
4656 and calculated_params[param_name].startswith("<")
4657 and calculated_params[param_name].endswith(">")
4658 ):
4659 if calculated_params[param_name][1:-1] in instantiation_params:
4660 calculated_params[param_name] = instantiation_params[
4661 calculated_params[param_name][1:-1]
4662 ]
4663 else:
4664 raise LcmException(
4665 "Parameter {} needed to execute primitive {} not provided".format(
4666 calculated_params[param_name], primitive_desc["name"]
4667 )
4668 )
4669 else:
4670 raise LcmException(
4671 "Parameter {} needed to execute primitive {} not provided".format(
4672 param_name, primitive_desc["name"]
4673 )
4674 )
4675
4676 if isinstance(calculated_params[param_name], (dict, list, tuple)):
4677 calculated_params[param_name] = yaml.safe_dump(
4678 calculated_params[param_name], default_flow_style=True, width=256
4679 )
4680 elif isinstance(calculated_params[param_name], str) and calculated_params[
4681 param_name
4682 ].startswith("!!yaml "):
4683 calculated_params[param_name] = calculated_params[param_name][7:]
4684 if parameter.get("data-type") == "INTEGER":
4685 try:
4686 calculated_params[param_name] = int(calculated_params[param_name])
4687 except ValueError: # error converting string to int
4688 raise LcmException(
4689 "Parameter {} of primitive {} must be integer".format(
4690 param_name, primitive_desc["name"]
4691 )
4692 )
4693 elif parameter.get("data-type") == "BOOLEAN":
4694 calculated_params[param_name] = not (
4695 (str(calculated_params[param_name])).lower() == "false"
4696 )
4697
4698 # add always ns_config_info if primitive name is config
4699 if primitive_desc["name"] == "config":
4700 if "ns_config_info" in instantiation_params:
4701 calculated_params["ns_config_info"] = instantiation_params[
4702 "ns_config_info"
4703 ]
4704 return calculated_params
4705
4706 def _look_for_deployed_vca(
4707 self,
4708 deployed_vca,
4709 member_vnf_index,
4710 vdu_id,
4711 vdu_count_index,
4712 kdu_name=None,
4713 ee_descriptor_id=None,
4714 ):
4715 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
4716 for vca in deployed_vca:
4717 if not vca:
4718 continue
4719 if member_vnf_index != vca["member-vnf-index"] or vdu_id != vca["vdu_id"]:
4720 continue
4721 if (
4722 vdu_count_index is not None
4723 and vdu_count_index != vca["vdu_count_index"]
4724 ):
4725 continue
4726 if kdu_name and kdu_name != vca["kdu_name"]:
4727 continue
4728 if ee_descriptor_id and ee_descriptor_id != vca["ee_descriptor_id"]:
4729 continue
4730 break
4731 else:
4732 # vca_deployed not found
4733 raise LcmException(
4734 "charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
4735 " is not deployed".format(
4736 member_vnf_index,
4737 vdu_id,
4738 vdu_count_index,
4739 kdu_name,
4740 ee_descriptor_id,
4741 )
4742 )
4743 # get ee_id
4744 ee_id = vca.get("ee_id")
4745 vca_type = vca.get(
4746 "type", "lxc_proxy_charm"
4747 ) # default value for backward compatibility - proxy charm
4748 if not ee_id:
4749 raise LcmException(
4750 "charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
4751 "execution environment".format(
4752 member_vnf_index, vdu_id, kdu_name, vdu_count_index
4753 )
4754 )
4755 return ee_id, vca_type
4756
    async def _ns_execute_primitive(
        self,
        ee_id,
        primitive,
        primitive_params,
        retries=0,
        retries_interval=30,
        timeout=None,
        vca_type=None,
        db_dict=None,
        vca_id: str = None,
    ) -> (str, str):
        """
        Execute a primitive on an execution environment, retrying on failure.
        :param ee_id: execution environment id
        :param primitive: primitive name to run
        :param primitive_params: dict of params; for primitive "config" they are
            wrapped under a "params" key before being sent
        :param retries: number of extra attempts after a failed execution
        :param retries_interval: seconds to sleep between attempts
        :param timeout: per-attempt timeout; defaults to self.timeout_primitive
        :param vca_type: key into self.vca_map; defaults to "lxc_proxy_charm"
        :param db_dict: db info passed to the connector for status updates
        :param vca_id: id of the VCA where the primitive runs
        :return: tuple (state, detail): ("COMPLETED", output) on success,
            ("FAILED", error) when all retries are exhausted, or
            ("FAIL", error) on an unexpected exception outside the retry loop
        """
        try:
            if primitive == "config":
                # the generic "config" primitive expects its params under "params"
                primitive_params = {"params": primitive_params}

            vca_type = vca_type or "lxc_proxy_charm"

            while retries >= 0:
                try:
                    output = await asyncio.wait_for(
                        self.vca_map[vca_type].exec_primitive(
                            ee_id=ee_id,
                            primitive_name=primitive,
                            params_dict=primitive_params,
                            progress_timeout=self.timeout_progress_primitive,
                            total_timeout=self.timeout_primitive,
                            db_dict=db_dict,
                            vca_id=vca_id,
                            vca_type=vca_type,
                        ),
                        timeout=timeout or self.timeout_primitive,
                    )
                    # execution was OK
                    break
                except asyncio.CancelledError:
                    raise
                except Exception as e:  # asyncio.TimeoutError
                    if isinstance(e, asyncio.TimeoutError):
                        e = "Timeout"
                    retries -= 1
                    if retries >= 0:
                        self.logger.debug(
                            "Error executing action {} on {} -> {}".format(
                                primitive, ee_id, e
                            )
                        )
                        # wait and retry
                        await asyncio.sleep(retries_interval, loop=self.loop)
                    else:
                        # retries exhausted: report the last error
                        return "FAILED", str(e)

            return "COMPLETED", output

        except (LcmException, asyncio.CancelledError):
            raise
        except Exception as e:
            return "FAIL", "Error executing action {}: {}".format(primitive, e)
4815
4816 async def vca_status_refresh(self, nsr_id, nslcmop_id):
4817 """
4818 Updating the vca_status with latest juju information in nsrs record
4819 :param: nsr_id: Id of the nsr
4820 :param: nslcmop_id: Id of the nslcmop
4821 :return: None
4822 """
4823
4824 self.logger.debug("Task ns={} action={} Enter".format(nsr_id, nslcmop_id))
4825 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
4826 vca_id = self.get_vca_id({}, db_nsr)
4827 if db_nsr["_admin"]["deployed"]["K8s"]:
4828 for k8s_index, k8s in enumerate(db_nsr["_admin"]["deployed"]["K8s"]):
4829 cluster_uuid, kdu_instance = k8s["k8scluster-uuid"], k8s["kdu-instance"]
4830 await self._on_update_k8s_db(
4831 cluster_uuid, kdu_instance, filter={"_id": nsr_id}, vca_id=vca_id
4832 )
4833 else:
4834 for vca_index, _ in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
4835 table, filter = "nsrs", {"_id": nsr_id}
4836 path = "_admin.deployed.VCA.{}.".format(vca_index)
4837 await self._on_update_n2vc_db(table, filter, path, {})
4838
4839 self.logger.debug("Task ns={} action={} Exit".format(nsr_id, nslcmop_id))
4840 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_vca_status_refresh")
4841
4842 async def action(self, nsr_id, nslcmop_id):
4843 # Try to lock HA task here
4844 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
4845 if not task_is_locked_by_me:
4846 return
4847
4848 logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
4849 self.logger.debug(logging_text + "Enter")
4850 # get all needed from database
4851 db_nsr = None
4852 db_nslcmop = None
4853 db_nsr_update = {}
4854 db_nslcmop_update = {}
4855 nslcmop_operation_state = None
4856 error_description_nslcmop = None
4857 exc = None
4858 try:
4859 # wait for any previous tasks in process
4860 step = "Waiting for previous operations to terminate"
4861 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
4862
4863 self._write_ns_status(
4864 nsr_id=nsr_id,
4865 ns_state=None,
4866 current_operation="RUNNING ACTION",
4867 current_operation_id=nslcmop_id,
4868 )
4869
4870 step = "Getting information from database"
4871 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
4872 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
4873 if db_nslcmop["operationParams"].get("primitive_params"):
4874 db_nslcmop["operationParams"]["primitive_params"] = json.loads(
4875 db_nslcmop["operationParams"]["primitive_params"]
4876 )
4877
4878 nsr_deployed = db_nsr["_admin"].get("deployed")
4879 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
4880 vdu_id = db_nslcmop["operationParams"].get("vdu_id")
4881 kdu_name = db_nslcmop["operationParams"].get("kdu_name")
4882 vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
4883 primitive = db_nslcmop["operationParams"]["primitive"]
4884 primitive_params = db_nslcmop["operationParams"]["primitive_params"]
4885 timeout_ns_action = db_nslcmop["operationParams"].get(
4886 "timeout_ns_action", self.timeout_primitive
4887 )
4888
4889 if vnf_index:
4890 step = "Getting vnfr from database"
4891 db_vnfr = self.db.get_one(
4892 "vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}
4893 )
4894 if db_vnfr.get("kdur"):
4895 kdur_list = []
4896 for kdur in db_vnfr["kdur"]:
4897 if kdur.get("additionalParams"):
4898 kdur["additionalParams"] = json.loads(kdur["additionalParams"])
4899 kdur_list.append(kdur)
4900 db_vnfr["kdur"] = kdur_list
4901 step = "Getting vnfd from database"
4902 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
4903 else:
4904 step = "Getting nsd from database"
4905 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
4906
4907 vca_id = self.get_vca_id(db_vnfr, db_nsr)
4908 # for backward compatibility
4909 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
4910 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
4911 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
4912 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4913
4914 # look for primitive
4915 config_primitive_desc = descriptor_configuration = None
4916 if vdu_id:
4917 descriptor_configuration = get_configuration(db_vnfd, vdu_id)
4918 elif kdu_name:
4919 descriptor_configuration = get_configuration(db_vnfd, kdu_name)
4920 elif vnf_index:
4921 descriptor_configuration = get_configuration(db_vnfd, db_vnfd["id"])
4922 else:
4923 descriptor_configuration = db_nsd.get("ns-configuration")
4924
4925 if descriptor_configuration and descriptor_configuration.get(
4926 "config-primitive"
4927 ):
4928 for config_primitive in descriptor_configuration["config-primitive"]:
4929 if config_primitive["name"] == primitive:
4930 config_primitive_desc = config_primitive
4931 break
4932
4933 if not config_primitive_desc:
4934 if not (kdu_name and primitive in ("upgrade", "rollback", "status")):
4935 raise LcmException(
4936 "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
4937 primitive
4938 )
4939 )
4940 primitive_name = primitive
4941 ee_descriptor_id = None
4942 else:
4943 primitive_name = config_primitive_desc.get(
4944 "execution-environment-primitive", primitive
4945 )
4946 ee_descriptor_id = config_primitive_desc.get(
4947 "execution-environment-ref"
4948 )
4949
4950 if vnf_index:
4951 if vdu_id:
4952 vdur = next(
4953 (x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None
4954 )
4955 desc_params = parse_yaml_strings(vdur.get("additionalParams"))
4956 elif kdu_name:
4957 kdur = next(
4958 (x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None
4959 )
4960 desc_params = parse_yaml_strings(kdur.get("additionalParams"))
4961 else:
4962 desc_params = parse_yaml_strings(
4963 db_vnfr.get("additionalParamsForVnf")
4964 )
4965 else:
4966 desc_params = parse_yaml_strings(db_nsr.get("additionalParamsForNs"))
4967 if kdu_name and get_configuration(db_vnfd, kdu_name):
4968 kdu_configuration = get_configuration(db_vnfd, kdu_name)
4969 actions = set()
4970 for primitive in kdu_configuration.get("initial-config-primitive", []):
4971 actions.add(primitive["name"])
4972 for primitive in kdu_configuration.get("config-primitive", []):
4973 actions.add(primitive["name"])
4974 kdu_action = True if primitive_name in actions else False
4975
4976 # TODO check if ns is in a proper status
4977 if kdu_name and (
4978 primitive_name in ("upgrade", "rollback", "status") or kdu_action
4979 ):
4980 # kdur and desc_params already set from before
4981 if primitive_params:
4982 desc_params.update(primitive_params)
4983 # TODO Check if we will need something at vnf level
4984 for index, kdu in enumerate(get_iterable(nsr_deployed, "K8s")):
4985 if (
4986 kdu_name == kdu["kdu-name"]
4987 and kdu["member-vnf-index"] == vnf_index
4988 ):
4989 break
4990 else:
4991 raise LcmException(
4992 "KDU '{}' for vnf '{}' not deployed".format(kdu_name, vnf_index)
4993 )
4994
4995 if kdu.get("k8scluster-type") not in self.k8scluster_map:
4996 msg = "unknown k8scluster-type '{}'".format(
4997 kdu.get("k8scluster-type")
4998 )
4999 raise LcmException(msg)
5000
5001 db_dict = {
5002 "collection": "nsrs",
5003 "filter": {"_id": nsr_id},
5004 "path": "_admin.deployed.K8s.{}".format(index),
5005 }
5006 self.logger.debug(
5007 logging_text
5008 + "Exec k8s {} on {}.{}".format(primitive_name, vnf_index, kdu_name)
5009 )
5010 step = "Executing kdu {}".format(primitive_name)
5011 if primitive_name == "upgrade":
5012 if desc_params.get("kdu_model"):
5013 kdu_model = desc_params.get("kdu_model")
5014 del desc_params["kdu_model"]
5015 else:
5016 kdu_model = kdu.get("kdu-model")
5017 parts = kdu_model.split(sep=":")
5018 if len(parts) == 2:
5019 kdu_model = parts[0]
5020
5021 detailed_status = await asyncio.wait_for(
5022 self.k8scluster_map[kdu["k8scluster-type"]].upgrade(
5023 cluster_uuid=kdu.get("k8scluster-uuid"),
5024 kdu_instance=kdu.get("kdu-instance"),
5025 atomic=True,
5026 kdu_model=kdu_model,
5027 params=desc_params,
5028 db_dict=db_dict,
5029 timeout=timeout_ns_action,
5030 ),
5031 timeout=timeout_ns_action + 10,
5032 )
5033 self.logger.debug(
5034 logging_text + " Upgrade of kdu {} done".format(detailed_status)
5035 )
5036 elif primitive_name == "rollback":
5037 detailed_status = await asyncio.wait_for(
5038 self.k8scluster_map[kdu["k8scluster-type"]].rollback(
5039 cluster_uuid=kdu.get("k8scluster-uuid"),
5040 kdu_instance=kdu.get("kdu-instance"),
5041 db_dict=db_dict,
5042 ),
5043 timeout=timeout_ns_action,
5044 )
5045 elif primitive_name == "status":
5046 detailed_status = await asyncio.wait_for(
5047 self.k8scluster_map[kdu["k8scluster-type"]].status_kdu(
5048 cluster_uuid=kdu.get("k8scluster-uuid"),
5049 kdu_instance=kdu.get("kdu-instance"),
5050 vca_id=vca_id,
5051 ),
5052 timeout=timeout_ns_action,
5053 )
5054 else:
5055 kdu_instance = kdu.get("kdu-instance") or "{}-{}".format(
5056 kdu["kdu-name"], nsr_id
5057 )
5058 params = self._map_primitive_params(
5059 config_primitive_desc, primitive_params, desc_params
5060 )
5061
5062 detailed_status = await asyncio.wait_for(
5063 self.k8scluster_map[kdu["k8scluster-type"]].exec_primitive(
5064 cluster_uuid=kdu.get("k8scluster-uuid"),
5065 kdu_instance=kdu_instance,
5066 primitive_name=primitive_name,
5067 params=params,
5068 db_dict=db_dict,
5069 timeout=timeout_ns_action,
5070 vca_id=vca_id,
5071 ),
5072 timeout=timeout_ns_action,
5073 )
5074
5075 if detailed_status:
5076 nslcmop_operation_state = "COMPLETED"
5077 else:
5078 detailed_status = ""
5079 nslcmop_operation_state = "FAILED"
5080 else:
5081 ee_id, vca_type = self._look_for_deployed_vca(
5082 nsr_deployed["VCA"],
5083 member_vnf_index=vnf_index,
5084 vdu_id=vdu_id,
5085 vdu_count_index=vdu_count_index,
5086 ee_descriptor_id=ee_descriptor_id,
5087 )
5088 for vca_index, vca_deployed in enumerate(
5089 db_nsr["_admin"]["deployed"]["VCA"]
5090 ):
5091 if vca_deployed.get("member-vnf-index") == vnf_index:
5092 db_dict = {
5093 "collection": "nsrs",
5094 "filter": {"_id": nsr_id},
5095 "path": "_admin.deployed.VCA.{}.".format(vca_index),
5096 }
5097 break
5098 (
5099 nslcmop_operation_state,
5100 detailed_status,
5101 ) = await self._ns_execute_primitive(
5102 ee_id,
5103 primitive=primitive_name,
5104 primitive_params=self._map_primitive_params(
5105 config_primitive_desc, primitive_params, desc_params
5106 ),
5107 timeout=timeout_ns_action,
5108 vca_type=vca_type,
5109 db_dict=db_dict,
5110 vca_id=vca_id,
5111 )
5112
5113 db_nslcmop_update["detailed-status"] = detailed_status
5114 error_description_nslcmop = (
5115 detailed_status if nslcmop_operation_state == "FAILED" else ""
5116 )
5117 self.logger.debug(
5118 logging_text
5119 + " task Done with result {} {}".format(
5120 nslcmop_operation_state, detailed_status
5121 )
5122 )
5123 return # database update is called inside finally
5124
5125 except (DbException, LcmException, N2VCException, K8sException) as e:
5126 self.logger.error(logging_text + "Exit Exception {}".format(e))
5127 exc = e
5128 except asyncio.CancelledError:
5129 self.logger.error(
5130 logging_text + "Cancelled Exception while '{}'".format(step)
5131 )
5132 exc = "Operation was cancelled"
5133 except asyncio.TimeoutError:
5134 self.logger.error(logging_text + "Timeout while '{}'".format(step))
5135 exc = "Timeout"
5136 except Exception as e:
5137 exc = traceback.format_exc()
5138 self.logger.critical(
5139 logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
5140 exc_info=True,
5141 )
5142 finally:
5143 if exc:
5144 db_nslcmop_update[
5145 "detailed-status"
5146 ] = (
5147 detailed_status
5148 ) = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
5149 nslcmop_operation_state = "FAILED"
5150 if db_nsr:
5151 self._write_ns_status(
5152 nsr_id=nsr_id,
5153 ns_state=db_nsr[
5154 "nsState"
5155 ], # TODO check if degraded. For the moment use previous status
5156 current_operation="IDLE",
5157 current_operation_id=None,
5158 # error_description=error_description_nsr,
5159 # error_detail=error_detail,
5160 other_update=db_nsr_update,
5161 )
5162
5163 self._write_op_status(
5164 op_id=nslcmop_id,
5165 stage="",
5166 error_message=error_description_nslcmop,
5167 operation_state=nslcmop_operation_state,
5168 other_update=db_nslcmop_update,
5169 )
5170
5171 if nslcmop_operation_state:
5172 try:
5173 await self.msg.aiowrite(
5174 "ns",
5175 "actioned",
5176 {
5177 "nsr_id": nsr_id,
5178 "nslcmop_id": nslcmop_id,
5179 "operationState": nslcmop_operation_state,
5180 },
5181 loop=self.loop,
5182 )
5183 except Exception as e:
5184 self.logger.error(
5185 logging_text + "kafka_write notification Exception {}".format(e)
5186 )
5187 self.logger.debug(logging_text + "Exit")
5188 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
5189 return nslcmop_operation_state, detailed_status
5190
5191 async def scale(self, nsr_id, nslcmop_id):
5192 # Try to lock HA task here
5193 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
5194 if not task_is_locked_by_me:
5195 return
5196
5197 logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
5198 stage = ["", "", ""]
5199 tasks_dict_info = {}
5200 # ^ stage, step, VIM progress
5201 self.logger.debug(logging_text + "Enter")
5202 # get all needed from database
5203 db_nsr = None
5204 db_nslcmop_update = {}
5205 db_nsr_update = {}
5206 exc = None
5207 # in case of error, indicates what part of scale was failed to put nsr at error status
5208 scale_process = None
5209 old_operational_status = ""
5210 old_config_status = ""
5211 nsi_id = None
5212 try:
5213 # wait for any previous tasks in process
5214 step = "Waiting for previous operations to terminate"
5215 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
5216 self._write_ns_status(
5217 nsr_id=nsr_id,
5218 ns_state=None,
5219 current_operation="SCALING",
5220 current_operation_id=nslcmop_id,
5221 )
5222
5223 step = "Getting nslcmop from database"
5224 self.logger.debug(
5225 step + " after having waited for previous tasks to be completed"
5226 )
5227 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5228
5229 step = "Getting nsr from database"
5230 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
5231 old_operational_status = db_nsr["operational-status"]
5232 old_config_status = db_nsr["config-status"]
5233
5234 step = "Parsing scaling parameters"
5235 db_nsr_update["operational-status"] = "scaling"
5236 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5237 nsr_deployed = db_nsr["_admin"].get("deployed")
5238
5239 vnf_index = db_nslcmop["operationParams"]["scaleVnfData"][
5240 "scaleByStepData"
5241 ]["member-vnf-index"]
5242 scaling_group = db_nslcmop["operationParams"]["scaleVnfData"][
5243 "scaleByStepData"
5244 ]["scaling-group-descriptor"]
5245 scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
5246 # for backward compatibility
5247 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
5248 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
5249 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
5250 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5251
5252 step = "Getting vnfr from database"
5253 db_vnfr = self.db.get_one(
5254 "vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}
5255 )
5256
5257 vca_id = self.get_vca_id(db_vnfr, db_nsr)
5258
5259 step = "Getting vnfd from database"
5260 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
5261
5262 base_folder = db_vnfd["_admin"]["storage"]
5263
5264 step = "Getting scaling-group-descriptor"
5265 scaling_descriptor = find_in_list(
5266 get_scaling_aspect(db_vnfd),
5267 lambda scale_desc: scale_desc["name"] == scaling_group,
5268 )
5269 if not scaling_descriptor:
5270 raise LcmException(
5271 "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
5272 "at vnfd:scaling-group-descriptor".format(scaling_group)
5273 )
5274
5275 step = "Sending scale order to VIM"
5276 # TODO check if ns is in a proper status
5277 nb_scale_op = 0
5278 if not db_nsr["_admin"].get("scaling-group"):
5279 self.update_db_2(
5280 "nsrs",
5281 nsr_id,
5282 {
5283 "_admin.scaling-group": [
5284 {"name": scaling_group, "nb-scale-op": 0}
5285 ]
5286 },
5287 )
5288 admin_scale_index = 0
5289 else:
5290 for admin_scale_index, admin_scale_info in enumerate(
5291 db_nsr["_admin"]["scaling-group"]
5292 ):
5293 if admin_scale_info["name"] == scaling_group:
5294 nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
5295 break
5296 else: # not found, set index one plus last element and add new entry with the name
5297 admin_scale_index += 1
5298 db_nsr_update[
5299 "_admin.scaling-group.{}.name".format(admin_scale_index)
5300 ] = scaling_group
5301
5302 vca_scaling_info = []
5303 scaling_info = {"scaling_group_name": scaling_group, "vdu": [], "kdu": []}
5304 if scaling_type == "SCALE_OUT":
5305 if "aspect-delta-details" not in scaling_descriptor:
5306 raise LcmException(
5307 "Aspect delta details not fount in scaling descriptor {}".format(
5308 scaling_descriptor["name"]
5309 )
5310 )
5311 # count if max-instance-count is reached
5312 deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
5313
5314 scaling_info["scaling_direction"] = "OUT"
5315 scaling_info["vdu-create"] = {}
5316 scaling_info["kdu-create"] = {}
5317 for delta in deltas:
5318 for vdu_delta in delta.get("vdu-delta", {}):
5319 vdud = get_vdu(db_vnfd, vdu_delta["id"])
5320 # vdu_index also provides the number of instance of the targeted vdu
5321 vdu_count = vdu_index = get_vdur_index(db_vnfr, vdu_delta)
5322 cloud_init_text = self._get_vdu_cloud_init_content(
5323 vdud, db_vnfd
5324 )
5325 if cloud_init_text:
5326 additional_params = (
5327 self._get_vdu_additional_params(db_vnfr, vdud["id"])
5328 or {}
5329 )
5330 cloud_init_list = []
5331
5332 vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
5333 max_instance_count = 10
5334 if vdu_profile and "max-number-of-instances" in vdu_profile:
5335 max_instance_count = vdu_profile.get(
5336 "max-number-of-instances", 10
5337 )
5338
5339 default_instance_num = get_number_of_instances(
5340 db_vnfd, vdud["id"]
5341 )
5342 instances_number = vdu_delta.get("number-of-instances", 1)
5343 nb_scale_op += instances_number
5344
5345 new_instance_count = nb_scale_op + default_instance_num
5346 # Control if new count is over max and vdu count is less than max.
5347 # Then assign new instance count
5348 if new_instance_count > max_instance_count > vdu_count:
5349 instances_number = new_instance_count - max_instance_count
5350 else:
5351 instances_number = instances_number
5352
5353 if new_instance_count > max_instance_count:
5354 raise LcmException(
5355 "reached the limit of {} (max-instance-count) "
5356 "scaling-out operations for the "
5357 "scaling-group-descriptor '{}'".format(
5358 nb_scale_op, scaling_group
5359 )
5360 )
5361 for x in range(vdu_delta.get("number-of-instances", 1)):
5362 if cloud_init_text:
5363 # TODO Information of its own ip is not available because db_vnfr is not updated.
5364 additional_params["OSM"] = get_osm_params(
5365 db_vnfr, vdu_delta["id"], vdu_index + x
5366 )
5367 cloud_init_list.append(
5368 self._parse_cloud_init(
5369 cloud_init_text,
5370 additional_params,
5371 db_vnfd["id"],
5372 vdud["id"],
5373 )
5374 )
5375 vca_scaling_info.append(
5376 {
5377 "osm_vdu_id": vdu_delta["id"],
5378 "member-vnf-index": vnf_index,
5379 "type": "create",
5380 "vdu_index": vdu_index + x,
5381 }
5382 )
5383 scaling_info["vdu-create"][vdu_delta["id"]] = instances_number
5384 for kdu_delta in delta.get("kdu-resource-delta", {}):
5385 kdu_profile = get_kdu_resource_profile(db_vnfd, kdu_delta["id"])
5386 kdu_name = kdu_profile["kdu-name"]
5387 resource_name = kdu_profile["resource-name"]
5388
5389 # Might have different kdus in the same delta
5390 # Should have list for each kdu
5391 if not scaling_info["kdu-create"].get(kdu_name, None):
5392 scaling_info["kdu-create"][kdu_name] = []
5393
5394 kdur = get_kdur(db_vnfr, kdu_name)
5395 if kdur.get("helm-chart"):
5396 k8s_cluster_type = "helm-chart-v3"
5397 self.logger.debug("kdur: {}".format(kdur))
5398 if (
5399 kdur.get("helm-version")
5400 and kdur.get("helm-version") == "v2"
5401 ):
5402 k8s_cluster_type = "helm-chart"
5403 raise NotImplementedError
5404 elif kdur.get("juju-bundle"):
5405 k8s_cluster_type = "juju-bundle"
5406 else:
5407 raise LcmException(
5408 "kdu type for kdu='{}.{}' is neither helm-chart nor "
5409 "juju-bundle. Maybe an old NBI version is running".format(
5410 db_vnfr["member-vnf-index-ref"], kdu_name
5411 )
5412 )
5413
5414 max_instance_count = 10
5415 if kdu_profile and "max-number-of-instances" in kdu_profile:
5416 max_instance_count = kdu_profile.get(
5417 "max-number-of-instances", 10
5418 )
5419
5420 nb_scale_op += kdu_delta.get("number-of-instances", 1)
5421 deployed_kdu, _ = get_deployed_kdu(
5422 nsr_deployed, kdu_name, vnf_index
5423 )
5424 if deployed_kdu is None:
5425 raise LcmException(
5426 "KDU '{}' for vnf '{}' not deployed".format(
5427 kdu_name, vnf_index
5428 )
5429 )
5430 kdu_instance = deployed_kdu.get("kdu-instance")
5431 instance_num = await self.k8scluster_map[
5432 k8s_cluster_type
5433 ].get_scale_count(resource_name, kdu_instance, vca_id=vca_id)
5434 kdu_replica_count = instance_num + kdu_delta.get(
5435 "number-of-instances", 1
5436 )
5437
5438 # Control if new count is over max and instance_num is less than max.
5439 # Then assign max instance number to kdu replica count
5440 if kdu_replica_count > max_instance_count > instance_num:
5441 kdu_replica_count = max_instance_count
5442 if kdu_replica_count > max_instance_count:
5443 raise LcmException(
5444 "reached the limit of {} (max-instance-count) "
5445 "scaling-out operations for the "
5446 "scaling-group-descriptor '{}'".format(
5447 instance_num, scaling_group
5448 )
5449 )
5450
5451 for x in range(kdu_delta.get("number-of-instances", 1)):
5452 vca_scaling_info.append(
5453 {
5454 "osm_kdu_id": kdu_name,
5455 "member-vnf-index": vnf_index,
5456 "type": "create",
5457 "kdu_index": instance_num + x - 1,
5458 }
5459 )
5460 scaling_info["kdu-create"][kdu_name].append(
5461 {
5462 "member-vnf-index": vnf_index,
5463 "type": "create",
5464 "k8s-cluster-type": k8s_cluster_type,
5465 "resource-name": resource_name,
5466 "scale": kdu_replica_count,
5467 }
5468 )
5469 elif scaling_type == "SCALE_IN":
5470 deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
5471
5472 scaling_info["scaling_direction"] = "IN"
5473 scaling_info["vdu-delete"] = {}
5474 scaling_info["kdu-delete"] = {}
5475
5476 for delta in deltas:
5477 for vdu_delta in delta.get("vdu-delta", {}):
5478 vdu_count = vdu_index = get_vdur_index(db_vnfr, vdu_delta)
5479 min_instance_count = 0
5480 vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
5481 if vdu_profile and "min-number-of-instances" in vdu_profile:
5482 min_instance_count = vdu_profile["min-number-of-instances"]
5483
5484 default_instance_num = get_number_of_instances(
5485 db_vnfd, vdu_delta["id"]
5486 )
5487 instance_num = vdu_delta.get("number-of-instances", 1)
5488 nb_scale_op -= instance_num
5489
5490 new_instance_count = nb_scale_op + default_instance_num
5491
5492 if new_instance_count < min_instance_count < vdu_count:
5493 instances_number = min_instance_count - new_instance_count
5494 else:
5495 instances_number = instance_num
5496
5497 if new_instance_count < min_instance_count:
5498 raise LcmException(
5499 "reached the limit of {} (min-instance-count) scaling-in operations for the "
5500 "scaling-group-descriptor '{}'".format(
5501 nb_scale_op, scaling_group
5502 )
5503 )
5504 for x in range(vdu_delta.get("number-of-instances", 1)):
5505 vca_scaling_info.append(
5506 {
5507 "osm_vdu_id": vdu_delta["id"],
5508 "member-vnf-index": vnf_index,
5509 "type": "delete",
5510 "vdu_index": vdu_index - 1 - x,
5511 }
5512 )
5513 scaling_info["vdu-delete"][vdu_delta["id"]] = instances_number
5514 for kdu_delta in delta.get("kdu-resource-delta", {}):
5515 kdu_profile = get_kdu_resource_profile(db_vnfd, kdu_delta["id"])
5516 kdu_name = kdu_profile["kdu-name"]
5517 resource_name = kdu_profile["resource-name"]
5518
5519 if not scaling_info["kdu-delete"].get(kdu_name, None):
5520 scaling_info["kdu-delete"][kdu_name] = []
5521
5522 kdur = get_kdur(db_vnfr, kdu_name)
5523 if kdur.get("helm-chart"):
5524 k8s_cluster_type = "helm-chart-v3"
5525 self.logger.debug("kdur: {}".format(kdur))
5526 if (
5527 kdur.get("helm-version")
5528 and kdur.get("helm-version") == "v2"
5529 ):
5530 k8s_cluster_type = "helm-chart"
5531 raise NotImplementedError
5532 elif kdur.get("juju-bundle"):
5533 k8s_cluster_type = "juju-bundle"
5534 else:
5535 raise LcmException(
5536 "kdu type for kdu='{}.{}' is neither helm-chart nor "
5537 "juju-bundle. Maybe an old NBI version is running".format(
5538 db_vnfr["member-vnf-index-ref"], kdur["kdu-name"]
5539 )
5540 )
5541
5542 min_instance_count = 0
5543 if kdu_profile and "min-number-of-instances" in kdu_profile:
5544 min_instance_count = kdu_profile["min-number-of-instances"]
5545
5546 nb_scale_op -= kdu_delta.get("number-of-instances", 1)
5547 deployed_kdu, _ = get_deployed_kdu(
5548 nsr_deployed, kdu_name, vnf_index
5549 )
5550 if deployed_kdu is None:
5551 raise LcmException(
5552 "KDU '{}' for vnf '{}' not deployed".format(
5553 kdu_name, vnf_index
5554 )
5555 )
5556 kdu_instance = deployed_kdu.get("kdu-instance")
5557 instance_num = await self.k8scluster_map[
5558 k8s_cluster_type
5559 ].get_scale_count(resource_name, kdu_instance, vca_id=vca_id)
5560 kdu_replica_count = instance_num - kdu_delta.get(
5561 "number-of-instances", 1
5562 )
5563
5564 if kdu_replica_count < min_instance_count < instance_num:
5565 kdu_replica_count = min_instance_count
5566 if kdu_replica_count < min_instance_count:
5567 raise LcmException(
5568 "reached the limit of {} (min-instance-count) scaling-in operations for the "
5569 "scaling-group-descriptor '{}'".format(
5570 instance_num, scaling_group
5571 )
5572 )
5573
5574 for x in range(kdu_delta.get("number-of-instances", 1)):
5575 vca_scaling_info.append(
5576 {
5577 "osm_kdu_id": kdu_name,
5578 "member-vnf-index": vnf_index,
5579 "type": "delete",
5580 "kdu_index": instance_num - x - 1,
5581 }
5582 )
5583 scaling_info["kdu-delete"][kdu_name].append(
5584 {
5585 "member-vnf-index": vnf_index,
5586 "type": "delete",
5587 "k8s-cluster-type": k8s_cluster_type,
5588 "resource-name": resource_name,
5589 "scale": kdu_replica_count,
5590 }
5591 )
5592
5593 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
5594 vdu_delete = copy(scaling_info.get("vdu-delete"))
5595 if scaling_info["scaling_direction"] == "IN":
5596 for vdur in reversed(db_vnfr["vdur"]):
5597 if vdu_delete.get(vdur["vdu-id-ref"]):
5598 vdu_delete[vdur["vdu-id-ref"]] -= 1
5599 scaling_info["vdu"].append(
5600 {
5601 "name": vdur.get("name") or vdur.get("vdu-name"),
5602 "vdu_id": vdur["vdu-id-ref"],
5603 "interface": [],
5604 }
5605 )
5606 for interface in vdur["interfaces"]:
5607 scaling_info["vdu"][-1]["interface"].append(
5608 {
5609 "name": interface["name"],
5610 "ip_address": interface["ip-address"],
5611 "mac_address": interface.get("mac-address"),
5612 }
5613 )
5614 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
5615
5616 # PRE-SCALE BEGIN
5617 step = "Executing pre-scale vnf-config-primitive"
5618 if scaling_descriptor.get("scaling-config-action"):
5619 for scaling_config_action in scaling_descriptor[
5620 "scaling-config-action"
5621 ]:
5622 if (
5623 scaling_config_action.get("trigger") == "pre-scale-in"
5624 and scaling_type == "SCALE_IN"
5625 ) or (
5626 scaling_config_action.get("trigger") == "pre-scale-out"
5627 and scaling_type == "SCALE_OUT"
5628 ):
5629 vnf_config_primitive = scaling_config_action[
5630 "vnf-config-primitive-name-ref"
5631 ]
5632 step = db_nslcmop_update[
5633 "detailed-status"
5634 ] = "executing pre-scale scaling-config-action '{}'".format(
5635 vnf_config_primitive
5636 )
5637
5638 # look for primitive
5639 for config_primitive in (
5640 get_configuration(db_vnfd, db_vnfd["id"]) or {}
5641 ).get("config-primitive", ()):
5642 if config_primitive["name"] == vnf_config_primitive:
5643 break
5644 else:
5645 raise LcmException(
5646 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
5647 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
5648 "primitive".format(scaling_group, vnf_config_primitive)
5649 )
5650
5651 vnfr_params = {"VDU_SCALE_INFO": scaling_info}
5652 if db_vnfr.get("additionalParamsForVnf"):
5653 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
5654
5655 scale_process = "VCA"
5656 db_nsr_update["config-status"] = "configuring pre-scaling"
5657 primitive_params = self._map_primitive_params(
5658 config_primitive, {}, vnfr_params
5659 )
5660
5661 # Pre-scale retry check: Check if this sub-operation has been executed before
5662 op_index = self._check_or_add_scale_suboperation(
5663 db_nslcmop,
5664 vnf_index,
5665 vnf_config_primitive,
5666 primitive_params,
5667 "PRE-SCALE",
5668 )
5669 if op_index == self.SUBOPERATION_STATUS_SKIP:
5670 # Skip sub-operation
5671 result = "COMPLETED"
5672 result_detail = "Done"
5673 self.logger.debug(
5674 logging_text
5675 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
5676 vnf_config_primitive, result, result_detail
5677 )
5678 )
5679 else:
5680 if op_index == self.SUBOPERATION_STATUS_NEW:
5681 # New sub-operation: Get index of this sub-operation
5682 op_index = (
5683 len(db_nslcmop.get("_admin", {}).get("operations"))
5684 - 1
5685 )
5686 self.logger.debug(
5687 logging_text
5688 + "vnf_config_primitive={} New sub-operation".format(
5689 vnf_config_primitive
5690 )
5691 )
5692 else:
5693 # retry: Get registered params for this existing sub-operation
5694 op = db_nslcmop.get("_admin", {}).get("operations", [])[
5695 op_index
5696 ]
5697 vnf_index = op.get("member_vnf_index")
5698 vnf_config_primitive = op.get("primitive")
5699 primitive_params = op.get("primitive_params")
5700 self.logger.debug(
5701 logging_text
5702 + "vnf_config_primitive={} Sub-operation retry".format(
5703 vnf_config_primitive
5704 )
5705 )
5706 # Execute the primitive, either with new (first-time) or registered (reintent) args
5707 ee_descriptor_id = config_primitive.get(
5708 "execution-environment-ref"
5709 )
5710 primitive_name = config_primitive.get(
5711 "execution-environment-primitive", vnf_config_primitive
5712 )
5713 ee_id, vca_type = self._look_for_deployed_vca(
5714 nsr_deployed["VCA"],
5715 member_vnf_index=vnf_index,
5716 vdu_id=None,
5717 vdu_count_index=None,
5718 ee_descriptor_id=ee_descriptor_id,
5719 )
5720 result, result_detail = await self._ns_execute_primitive(
5721 ee_id,
5722 primitive_name,
5723 primitive_params,
5724 vca_type=vca_type,
5725 vca_id=vca_id,
5726 )
5727 self.logger.debug(
5728 logging_text
5729 + "vnf_config_primitive={} Done with result {} {}".format(
5730 vnf_config_primitive, result, result_detail
5731 )
5732 )
5733 # Update operationState = COMPLETED | FAILED
5734 self._update_suboperation_status(
5735 db_nslcmop, op_index, result, result_detail
5736 )
5737
5738 if result == "FAILED":
5739 raise LcmException(result_detail)
5740 db_nsr_update["config-status"] = old_config_status
5741 scale_process = None
5742 # PRE-SCALE END
5743
5744 db_nsr_update[
5745 "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)
5746 ] = nb_scale_op
5747 db_nsr_update[
5748 "_admin.scaling-group.{}.time".format(admin_scale_index)
5749 ] = time()
5750
5751 # SCALE-IN VCA - BEGIN
5752 if vca_scaling_info:
5753 step = db_nslcmop_update[
5754 "detailed-status"
5755 ] = "Deleting the execution environments"
5756 scale_process = "VCA"
5757 for vca_info in vca_scaling_info:
5758 if vca_info["type"] == "delete":
5759 member_vnf_index = str(vca_info["member-vnf-index"])
5760 self.logger.debug(
5761 logging_text + "vdu info: {}".format(vca_info)
5762 )
5763 if vca_info.get("osm_vdu_id"):
5764 vdu_id = vca_info["osm_vdu_id"]
5765 vdu_index = int(vca_info["vdu_index"])
5766 stage[
5767 1
5768 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5769 member_vnf_index, vdu_id, vdu_index
5770 )
5771 else:
5772 vdu_index = 0
5773 kdu_id = vca_info["osm_kdu_id"]
5774 stage[
5775 1
5776 ] = "Scaling member_vnf_index={}, kdu_id={}, vdu_index={} ".format(
5777 member_vnf_index, kdu_id, vdu_index
5778 )
5779 stage[2] = step = "Scaling in VCA"
5780 self._write_op_status(op_id=nslcmop_id, stage=stage)
5781 vca_update = db_nsr["_admin"]["deployed"]["VCA"]
5782 config_update = db_nsr["configurationStatus"]
5783 for vca_index, vca in enumerate(vca_update):
5784 if (
5785 (vca or vca.get("ee_id"))
5786 and vca["member-vnf-index"] == member_vnf_index
5787 and vca["vdu_count_index"] == vdu_index
5788 ):
5789 if vca.get("vdu_id"):
5790 config_descriptor = get_configuration(
5791 db_vnfd, vca.get("vdu_id")
5792 )
5793 elif vca.get("kdu_name"):
5794 config_descriptor = get_configuration(
5795 db_vnfd, vca.get("kdu_name")
5796 )
5797 else:
5798 config_descriptor = get_configuration(
5799 db_vnfd, db_vnfd["id"]
5800 )
5801 operation_params = (
5802 db_nslcmop.get("operationParams") or {}
5803 )
5804 exec_terminate_primitives = not operation_params.get(
5805 "skip_terminate_primitives"
5806 ) and vca.get("needed_terminate")
5807 task = asyncio.ensure_future(
5808 asyncio.wait_for(
5809 self.destroy_N2VC(
5810 logging_text,
5811 db_nslcmop,
5812 vca,
5813 config_descriptor,
5814 vca_index,
5815 destroy_ee=True,
5816 exec_primitives=exec_terminate_primitives,
5817 scaling_in=True,
5818 vca_id=vca_id,
5819 ),
5820 timeout=self.timeout_charm_delete,
5821 )
5822 )
5823 tasks_dict_info[task] = "Terminating VCA {}".format(
5824 vca.get("ee_id")
5825 )
5826 del vca_update[vca_index]
5827 del config_update[vca_index]
5828 # wait for pending tasks of terminate primitives
5829 if tasks_dict_info:
5830 self.logger.debug(
5831 logging_text
5832 + "Waiting for tasks {}".format(
5833 list(tasks_dict_info.keys())
5834 )
5835 )
5836 error_list = await self._wait_for_tasks(
5837 logging_text,
5838 tasks_dict_info,
5839 min(
5840 self.timeout_charm_delete, self.timeout_ns_terminate
5841 ),
5842 stage,
5843 nslcmop_id,
5844 )
5845 tasks_dict_info.clear()
5846 if error_list:
5847 raise LcmException("; ".join(error_list))
5848
5849 db_vca_and_config_update = {
5850 "_admin.deployed.VCA": vca_update,
5851 "configurationStatus": config_update,
5852 }
5853 self.update_db_2(
5854 "nsrs", db_nsr["_id"], db_vca_and_config_update
5855 )
5856 scale_process = None
5857 # SCALE-IN VCA - END
5858
5859 # SCALE RO - BEGIN
5860 if scaling_info.get("vdu-create") or scaling_info.get("vdu-delete"):
5861 scale_process = "RO"
5862 if self.ro_config.get("ng"):
5863 await self._scale_ng_ro(
5864 logging_text, db_nsr, db_nslcmop, db_vnfr, scaling_info, stage
5865 )
5866 scaling_info.pop("vdu-create", None)
5867 scaling_info.pop("vdu-delete", None)
5868
5869 scale_process = None
5870 # SCALE RO - END
5871
5872 # SCALE KDU - BEGIN
5873 if scaling_info.get("kdu-create") or scaling_info.get("kdu-delete"):
5874 scale_process = "KDU"
5875 await self._scale_kdu(
5876 logging_text, nsr_id, nsr_deployed, db_vnfd, vca_id, scaling_info
5877 )
5878 scaling_info.pop("kdu-create", None)
5879 scaling_info.pop("kdu-delete", None)
5880
5881 scale_process = None
5882 # SCALE KDU - END
5883
5884 if db_nsr_update:
5885 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5886
5887 # SCALE-UP VCA - BEGIN
5888 if vca_scaling_info:
5889 step = db_nslcmop_update[
5890 "detailed-status"
5891 ] = "Creating new execution environments"
5892 scale_process = "VCA"
5893 for vca_info in vca_scaling_info:
5894 if vca_info["type"] == "create":
5895 member_vnf_index = str(vca_info["member-vnf-index"])
5896 self.logger.debug(
5897 logging_text + "vdu info: {}".format(vca_info)
5898 )
5899 vnfd_id = db_vnfr["vnfd-ref"]
5900 if vca_info.get("osm_vdu_id"):
5901 vdu_index = int(vca_info["vdu_index"])
5902 deploy_params = {"OSM": get_osm_params(db_vnfr)}
5903 if db_vnfr.get("additionalParamsForVnf"):
5904 deploy_params.update(
5905 parse_yaml_strings(
5906 db_vnfr["additionalParamsForVnf"].copy()
5907 )
5908 )
5909 descriptor_config = get_configuration(
5910 db_vnfd, db_vnfd["id"]
5911 )
5912 if descriptor_config:
5913 vdu_id = None
5914 vdu_name = None
5915 kdu_name = None
5916 self._deploy_n2vc(
5917 logging_text=logging_text
5918 + "member_vnf_index={} ".format(member_vnf_index),
5919 db_nsr=db_nsr,
5920 db_vnfr=db_vnfr,
5921 nslcmop_id=nslcmop_id,
5922 nsr_id=nsr_id,
5923 nsi_id=nsi_id,
5924 vnfd_id=vnfd_id,
5925 vdu_id=vdu_id,
5926 kdu_name=kdu_name,
5927 member_vnf_index=member_vnf_index,
5928 vdu_index=vdu_index,
5929 vdu_name=vdu_name,
5930 deploy_params=deploy_params,
5931 descriptor_config=descriptor_config,
5932 base_folder=base_folder,
5933 task_instantiation_info=tasks_dict_info,
5934 stage=stage,
5935 )
5936 vdu_id = vca_info["osm_vdu_id"]
5937 vdur = find_in_list(
5938 db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id
5939 )
5940 descriptor_config = get_configuration(db_vnfd, vdu_id)
5941 if vdur.get("additionalParams"):
5942 deploy_params_vdu = parse_yaml_strings(
5943 vdur["additionalParams"]
5944 )
5945 else:
5946 deploy_params_vdu = deploy_params
5947 deploy_params_vdu["OSM"] = get_osm_params(
5948 db_vnfr, vdu_id, vdu_count_index=vdu_index
5949 )
5950 if descriptor_config:
5951 vdu_name = None
5952 kdu_name = None
5953 stage[
5954 1
5955 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5956 member_vnf_index, vdu_id, vdu_index
5957 )
5958 stage[2] = step = "Scaling out VCA"
5959 self._write_op_status(op_id=nslcmop_id, stage=stage)
5960 self._deploy_n2vc(
5961 logging_text=logging_text
5962 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5963 member_vnf_index, vdu_id, vdu_index
5964 ),
5965 db_nsr=db_nsr,
5966 db_vnfr=db_vnfr,
5967 nslcmop_id=nslcmop_id,
5968 nsr_id=nsr_id,
5969 nsi_id=nsi_id,
5970 vnfd_id=vnfd_id,
5971 vdu_id=vdu_id,
5972 kdu_name=kdu_name,
5973 member_vnf_index=member_vnf_index,
5974 vdu_index=vdu_index,
5975 vdu_name=vdu_name,
5976 deploy_params=deploy_params_vdu,
5977 descriptor_config=descriptor_config,
5978 base_folder=base_folder,
5979 task_instantiation_info=tasks_dict_info,
5980 stage=stage,
5981 )
5982 else:
5983 kdu_name = vca_info["osm_kdu_id"]
5984 descriptor_config = get_configuration(db_vnfd, kdu_name)
5985 if descriptor_config:
5986 vdu_id = None
5987 kdu_index = int(vca_info["kdu_index"])
5988 vdu_name = None
5989 kdur = next(
5990 x
5991 for x in db_vnfr["kdur"]
5992 if x["kdu-name"] == kdu_name
5993 )
5994 deploy_params_kdu = {"OSM": get_osm_params(db_vnfr)}
5995 if kdur.get("additionalParams"):
5996 deploy_params_kdu = parse_yaml_strings(
5997 kdur["additionalParams"]
5998 )
5999
6000 self._deploy_n2vc(
6001 logging_text=logging_text,
6002 db_nsr=db_nsr,
6003 db_vnfr=db_vnfr,
6004 nslcmop_id=nslcmop_id,
6005 nsr_id=nsr_id,
6006 nsi_id=nsi_id,
6007 vnfd_id=vnfd_id,
6008 vdu_id=vdu_id,
6009 kdu_name=kdu_name,
6010 member_vnf_index=member_vnf_index,
6011 vdu_index=kdu_index,
6012 vdu_name=vdu_name,
6013 deploy_params=deploy_params_kdu,
6014 descriptor_config=descriptor_config,
6015 base_folder=base_folder,
6016 task_instantiation_info=tasks_dict_info,
6017 stage=stage,
6018 )
6019 # SCALE-UP VCA - END
6020 scale_process = None
6021
6022 # POST-SCALE BEGIN
6023 # execute primitive service POST-SCALING
6024 step = "Executing post-scale vnf-config-primitive"
6025 if scaling_descriptor.get("scaling-config-action"):
6026 for scaling_config_action in scaling_descriptor[
6027 "scaling-config-action"
6028 ]:
6029 if (
6030 scaling_config_action.get("trigger") == "post-scale-in"
6031 and scaling_type == "SCALE_IN"
6032 ) or (
6033 scaling_config_action.get("trigger") == "post-scale-out"
6034 and scaling_type == "SCALE_OUT"
6035 ):
6036 vnf_config_primitive = scaling_config_action[
6037 "vnf-config-primitive-name-ref"
6038 ]
6039 step = db_nslcmop_update[
6040 "detailed-status"
6041 ] = "executing post-scale scaling-config-action '{}'".format(
6042 vnf_config_primitive
6043 )
6044
6045 vnfr_params = {"VDU_SCALE_INFO": scaling_info}
6046 if db_vnfr.get("additionalParamsForVnf"):
6047 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
6048
6049 # look for primitive
6050 for config_primitive in (
6051 get_configuration(db_vnfd, db_vnfd["id"]) or {}
6052 ).get("config-primitive", ()):
6053 if config_primitive["name"] == vnf_config_primitive:
6054 break
6055 else:
6056 raise LcmException(
6057 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
6058 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
6059 "config-primitive".format(
6060 scaling_group, vnf_config_primitive
6061 )
6062 )
6063 scale_process = "VCA"
6064 db_nsr_update["config-status"] = "configuring post-scaling"
6065 primitive_params = self._map_primitive_params(
6066 config_primitive, {}, vnfr_params
6067 )
6068
6069 # Post-scale retry check: Check if this sub-operation has been executed before
6070 op_index = self._check_or_add_scale_suboperation(
6071 db_nslcmop,
6072 vnf_index,
6073 vnf_config_primitive,
6074 primitive_params,
6075 "POST-SCALE",
6076 )
6077 if op_index == self.SUBOPERATION_STATUS_SKIP:
6078 # Skip sub-operation
6079 result = "COMPLETED"
6080 result_detail = "Done"
6081 self.logger.debug(
6082 logging_text
6083 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6084 vnf_config_primitive, result, result_detail
6085 )
6086 )
6087 else:
6088 if op_index == self.SUBOPERATION_STATUS_NEW:
6089 # New sub-operation: Get index of this sub-operation
6090 op_index = (
6091 len(db_nslcmop.get("_admin", {}).get("operations"))
6092 - 1
6093 )
6094 self.logger.debug(
6095 logging_text
6096 + "vnf_config_primitive={} New sub-operation".format(
6097 vnf_config_primitive
6098 )
6099 )
6100 else:
6101 # retry: Get registered params for this existing sub-operation
6102 op = db_nslcmop.get("_admin", {}).get("operations", [])[
6103 op_index
6104 ]
6105 vnf_index = op.get("member_vnf_index")
6106 vnf_config_primitive = op.get("primitive")
6107 primitive_params = op.get("primitive_params")
6108 self.logger.debug(
6109 logging_text
6110 + "vnf_config_primitive={} Sub-operation retry".format(
6111 vnf_config_primitive
6112 )
6113 )
6114 # Execute the primitive, either with new (first-time) or registered (reintent) args
6115 ee_descriptor_id = config_primitive.get(
6116 "execution-environment-ref"
6117 )
6118 primitive_name = config_primitive.get(
6119 "execution-environment-primitive", vnf_config_primitive
6120 )
6121 ee_id, vca_type = self._look_for_deployed_vca(
6122 nsr_deployed["VCA"],
6123 member_vnf_index=vnf_index,
6124 vdu_id=None,
6125 vdu_count_index=None,
6126 ee_descriptor_id=ee_descriptor_id,
6127 )
6128 result, result_detail = await self._ns_execute_primitive(
6129 ee_id,
6130 primitive_name,
6131 primitive_params,
6132 vca_type=vca_type,
6133 vca_id=vca_id,
6134 )
6135 self.logger.debug(
6136 logging_text
6137 + "vnf_config_primitive={} Done with result {} {}".format(
6138 vnf_config_primitive, result, result_detail
6139 )
6140 )
6141 # Update operationState = COMPLETED | FAILED
6142 self._update_suboperation_status(
6143 db_nslcmop, op_index, result, result_detail
6144 )
6145
6146 if result == "FAILED":
6147 raise LcmException(result_detail)
6148 db_nsr_update["config-status"] = old_config_status
6149 scale_process = None
6150 # POST-SCALE END
6151
6152 db_nsr_update[
6153 "detailed-status"
6154 ] = "" # "scaled {} {}".format(scaling_group, scaling_type)
6155 db_nsr_update["operational-status"] = (
6156 "running"
6157 if old_operational_status == "failed"
6158 else old_operational_status
6159 )
6160 db_nsr_update["config-status"] = old_config_status
6161 return
6162 except (
6163 ROclient.ROClientException,
6164 DbException,
6165 LcmException,
6166 NgRoException,
6167 ) as e:
6168 self.logger.error(logging_text + "Exit Exception {}".format(e))
6169 exc = e
6170 except asyncio.CancelledError:
6171 self.logger.error(
6172 logging_text + "Cancelled Exception while '{}'".format(step)
6173 )
6174 exc = "Operation was cancelled"
6175 except Exception as e:
6176 exc = traceback.format_exc()
6177 self.logger.critical(
6178 logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
6179 exc_info=True,
6180 )
6181 finally:
6182 self._write_ns_status(
6183 nsr_id=nsr_id,
6184 ns_state=None,
6185 current_operation="IDLE",
6186 current_operation_id=None,
6187 )
6188 if tasks_dict_info:
6189 stage[1] = "Waiting for instantiate pending tasks."
6190 self.logger.debug(logging_text + stage[1])
6191 exc = await self._wait_for_tasks(
6192 logging_text,
6193 tasks_dict_info,
6194 self.timeout_ns_deploy,
6195 stage,
6196 nslcmop_id,
6197 nsr_id=nsr_id,
6198 )
6199 if exc:
6200 db_nslcmop_update[
6201 "detailed-status"
6202 ] = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
6203 nslcmop_operation_state = "FAILED"
6204 if db_nsr:
6205 db_nsr_update["operational-status"] = old_operational_status
6206 db_nsr_update["config-status"] = old_config_status
6207 db_nsr_update["detailed-status"] = ""
6208 if scale_process:
6209 if "VCA" in scale_process:
6210 db_nsr_update["config-status"] = "failed"
6211 if "RO" in scale_process:
6212 db_nsr_update["operational-status"] = "failed"
6213 db_nsr_update[
6214 "detailed-status"
6215 ] = "FAILED scaling nslcmop={} {}: {}".format(
6216 nslcmop_id, step, exc
6217 )
6218 else:
6219 error_description_nslcmop = None
6220 nslcmop_operation_state = "COMPLETED"
6221 db_nslcmop_update["detailed-status"] = "Done"
6222
6223 self._write_op_status(
6224 op_id=nslcmop_id,
6225 stage="",
6226 error_message=error_description_nslcmop,
6227 operation_state=nslcmop_operation_state,
6228 other_update=db_nslcmop_update,
6229 )
6230 if db_nsr:
6231 self._write_ns_status(
6232 nsr_id=nsr_id,
6233 ns_state=None,
6234 current_operation="IDLE",
6235 current_operation_id=None,
6236 other_update=db_nsr_update,
6237 )
6238
6239 if nslcmop_operation_state:
6240 try:
6241 msg = {
6242 "nsr_id": nsr_id,
6243 "nslcmop_id": nslcmop_id,
6244 "operationState": nslcmop_operation_state,
6245 }
6246 await self.msg.aiowrite("ns", "scaled", msg, loop=self.loop)
6247 except Exception as e:
6248 self.logger.error(
6249 logging_text + "kafka_write notification Exception {}".format(e)
6250 )
6251 self.logger.debug(logging_text + "Exit")
6252 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")
6253
    async def _scale_kdu(
        self, logging_text, nsr_id, nsr_deployed, db_vnfd, vca_id, scaling_info
    ):
        """Scale the KDUs of a VNF according to scaling_info.

        For every KDU referenced in scaling_info["kdu-create"] or
        scaling_info["kdu-delete"], and for each scaling entry of that KDU:
        run the terminate config primitives (scale-in only), perform the k8s
        scale operation itself, then run the initial config primitives
        (scale-out only).

        :param logging_text: prefix for log messages
        :param nsr_id: NS record id (used for status updates in "nsrs")
        :param nsr_deployed: "_admin.deployed" section of the NS record
        :param db_vnfd: VNF descriptor owning the KDUs
        :param vca_id: VCA id passed through to the k8s connector
        :param scaling_info: dict with "kdu-create" and/or "kdu-delete" maps,
            each keyed by kdu-name
        :raises asyncio.TimeoutError: if a primitive or the scale operation
            exceeds its timeout
        """
        # NOTE(review): "or" picks only the first non-empty map, so a call
        # carrying both kdu-create and kdu-delete would silently ignore the
        # delete part — confirm callers never pass both at once.
        _scaling_info = scaling_info.get("kdu-create") or scaling_info.get("kdu-delete")
        for kdu_name in _scaling_info:
            for kdu_scaling_info in _scaling_info[kdu_name]:
                # locate the deployed KDU entry for this member-vnf-index
                deployed_kdu, index = get_deployed_kdu(
                    nsr_deployed, kdu_name, kdu_scaling_info["member-vnf-index"]
                )
                cluster_uuid = deployed_kdu["k8scluster-uuid"]
                kdu_instance = deployed_kdu["kdu-instance"]
                scale = int(kdu_scaling_info["scale"])
                k8s_cluster_type = kdu_scaling_info["k8s-cluster-type"]

                # db location where the k8s connector reports operation status
                db_dict = {
                    "collection": "nsrs",
                    "filter": {"_id": nsr_id},
                    "path": "_admin.deployed.K8s.{}".format(index),
                }

                step = "scaling application {}".format(
                    kdu_scaling_info["resource-name"]
                )
                self.logger.debug(logging_text + step)

                if kdu_scaling_info["type"] == "delete":
                    kdu_config = get_configuration(db_vnfd, kdu_name)
                    # run terminate primitives directly only when no juju
                    # execution environment is referenced for this KDU
                    if (
                        kdu_config
                        and kdu_config.get("terminate-config-primitive")
                        and get_juju_ee_ref(db_vnfd, kdu_name) is None
                    ):
                        terminate_config_primitive_list = kdu_config.get(
                            "terminate-config-primitive"
                        )
                        # execute in ascending "seq" order
                        terminate_config_primitive_list.sort(
                            key=lambda val: int(val["seq"])
                        )

                        for (
                            terminate_config_primitive
                        ) in terminate_config_primitive_list:
                            primitive_params_ = self._map_primitive_params(
                                terminate_config_primitive, {}, {}
                            )
                            step = "execute terminate config primitive"
                            self.logger.debug(logging_text + step)
                            await asyncio.wait_for(
                                self.k8scluster_map[k8s_cluster_type].exec_primitive(
                                    cluster_uuid=cluster_uuid,
                                    kdu_instance=kdu_instance,
                                    primitive_name=terminate_config_primitive["name"],
                                    params=primitive_params_,
                                    db_dict=db_dict,
                                    vca_id=vca_id,
                                ),
                                timeout=600,
                            )

                # perform the actual scale (both for create and delete flows)
                await asyncio.wait_for(
                    self.k8scluster_map[k8s_cluster_type].scale(
                        kdu_instance,
                        scale,
                        kdu_scaling_info["resource-name"],
                        vca_id=vca_id,
                    ),
                    timeout=self.timeout_vca_on_error,
                )

                if kdu_scaling_info["type"] == "create":
                    kdu_config = get_configuration(db_vnfd, kdu_name)
                    # run initial primitives directly only when no juju
                    # execution environment is referenced for this KDU
                    if (
                        kdu_config
                        and kdu_config.get("initial-config-primitive")
                        and get_juju_ee_ref(db_vnfd, kdu_name) is None
                    ):
                        initial_config_primitive_list = kdu_config.get(
                            "initial-config-primitive"
                        )
                        # execute in ascending "seq" order
                        initial_config_primitive_list.sort(
                            key=lambda val: int(val["seq"])
                        )

                        for initial_config_primitive in initial_config_primitive_list:
                            primitive_params_ = self._map_primitive_params(
                                initial_config_primitive, {}, {}
                            )
                            step = "execute initial config primitive"
                            self.logger.debug(logging_text + step)
                            await asyncio.wait_for(
                                self.k8scluster_map[k8s_cluster_type].exec_primitive(
                                    cluster_uuid=cluster_uuid,
                                    kdu_instance=kdu_instance,
                                    primitive_name=initial_config_primitive["name"],
                                    params=primitive_params_,
                                    db_dict=db_dict,
                                    vca_id=vca_id,
                                ),
                                timeout=600,
                            )
6354
6355 async def _scale_ng_ro(
6356 self, logging_text, db_nsr, db_nslcmop, db_vnfr, vdu_scaling_info, stage
6357 ):
6358 nsr_id = db_nslcmop["nsInstanceId"]
6359 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
6360 db_vnfrs = {}
6361
6362 # read from db: vnfd's for every vnf
6363 db_vnfds = []
6364
6365 # for each vnf in ns, read vnfd
6366 for vnfr in self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}):
6367 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
6368 vnfd_id = vnfr["vnfd-id"] # vnfd uuid for this vnf
6369 # if we haven't this vnfd, read it from db
6370 if not find_in_list(db_vnfds, lambda a_vnfd: a_vnfd["id"] == vnfd_id):
6371 # read from db
6372 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
6373 db_vnfds.append(vnfd)
6374 n2vc_key = self.n2vc.get_public_key()
6375 n2vc_key_list = [n2vc_key]
6376 self.scale_vnfr(
6377 db_vnfr,
6378 vdu_scaling_info.get("vdu-create"),
6379 vdu_scaling_info.get("vdu-delete"),
6380 mark_delete=True,
6381 )
6382 # db_vnfr has been updated, update db_vnfrs to use it
6383 db_vnfrs[db_vnfr["member-vnf-index-ref"]] = db_vnfr
6384 await self._instantiate_ng_ro(
6385 logging_text,
6386 nsr_id,
6387 db_nsd,
6388 db_nsr,
6389 db_nslcmop,
6390 db_vnfrs,
6391 db_vnfds,
6392 n2vc_key_list,
6393 stage=stage,
6394 start_deploy=time(),
6395 timeout_ns_deploy=self.timeout_ns_deploy,
6396 )
6397 if vdu_scaling_info.get("vdu-delete"):
6398 self.scale_vnfr(
6399 db_vnfr, None, vdu_scaling_info["vdu-delete"], mark_delete=False
6400 )
6401
6402 async def extract_prometheus_scrape_jobs(
6403 self,
6404 ee_id,
6405 artifact_path,
6406 ee_config_descriptor,
6407 vnfr_id,
6408 nsr_id,
6409 target_ip
6410 ):
6411 # look if exist a file called 'prometheus*.j2' and
6412 artifact_content = self.fs.dir_ls(artifact_path)
6413 job_file = next(
6414 (
6415 f
6416 for f in artifact_content
6417 if f.startswith("prometheus") and f.endswith(".j2")
6418 ),
6419 None,
6420 )
6421 if not job_file:
6422 return
6423 with self.fs.file_open((artifact_path, job_file), "r") as f:
6424 job_data = f.read()
6425
6426 # TODO get_service
6427 _, _, service = ee_id.partition(".") # remove prefix "namespace."
6428 host_name = "{}-{}".format(service, ee_config_descriptor["metric-service"])
6429 host_port = "80"
6430 vnfr_id = vnfr_id.replace("-", "")
6431 variables = {
6432 "JOB_NAME": vnfr_id,
6433 "TARGET_IP": target_ip,
6434 "EXPORTER_POD_IP": host_name,
6435 "EXPORTER_POD_PORT": host_port,
6436 }
6437 job_list = parse_job(job_data, variables)
6438 # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
6439 for job in job_list:
6440 if (
6441 not isinstance(job.get("job_name"), str)
6442 or vnfr_id not in job["job_name"]
6443 ):
6444 job["job_name"] = vnfr_id + "_" + str(randint(1, 10000))
6445 job["nsr_id"] = nsr_id
6446 job["vnfr_id"] = vnfr_id
6447 return job_list
6448
6449 def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
6450 """
6451 Get VCA Cloud and VCA Cloud Credentials for the VIM account
6452
6453 :param: vim_account_id: VIM Account ID
6454
6455 :return: (cloud_name, cloud_credential)
6456 """
6457 config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
6458 return config.get("vca_cloud"), config.get("vca_cloud_credential")
6459
6460 def get_vca_k8s_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
6461 """
6462 Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account
6463
6464 :param: vim_account_id: VIM Account ID
6465
6466 :return: (cloud_name, cloud_credential)
6467 """
6468 config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
6469 return config.get("vca_k8s_cloud"), config.get("vca_k8s_cloud_credential")