osm_lcm/ns.py (osm/LCM.git @ 8eccf05523faa1ef1b1dbd9d677146a000086358)
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 from typing import Any, Dict, List
21 import yaml
22 import logging
23 import logging.handlers
24 import traceback
25 import json
26 from jinja2 import (
27 Environment,
28 TemplateError,
29 TemplateNotFound,
30 StrictUndefined,
31 UndefinedError,
32 )
33
34 from osm_lcm import ROclient
35 from osm_lcm.data_utils.nsr import (
36 get_deployed_kdu,
37 get_deployed_vca,
38 get_deployed_vca_list,
39 get_nsd,
40 )
41 from osm_lcm.data_utils.vca import (
42 DeployedComponent,
43 DeployedK8sResource,
44 DeployedVCA,
45 EELevel,
46 Relation,
47 EERelation,
48 safe_get_ee_relation,
49 )
50 from osm_lcm.ng_ro import NgRoClient, NgRoException
51 from osm_lcm.lcm_utils import (
52 LcmException,
53 LcmExceptionNoMgmtIP,
54 LcmBase,
55 deep_get,
56 get_iterable,
57 populate_dict,
58 )
59 from osm_lcm.data_utils.nsd import (
60 get_ns_configuration_relation_list,
61 get_vnf_profile,
62 get_vnf_profiles,
63 )
64 from osm_lcm.data_utils.vnfd import (
65 get_relation_list,
66 get_vdu_list,
67 get_vdu_profile,
68 get_ee_sorted_initial_config_primitive_list,
69 get_ee_sorted_terminate_config_primitive_list,
70 get_kdu_list,
71 get_virtual_link_profiles,
72 get_vdu,
73 get_configuration,
74 get_vdu_index,
75 get_scaling_aspect,
76 get_number_of_instances,
77 get_juju_ee_ref,
78 get_kdu_resource_profile,
79 )
80 from osm_lcm.data_utils.list_utils import find_in_list
81 from osm_lcm.data_utils.vnfr import get_osm_params, get_vdur_index, get_kdur
82 from osm_lcm.data_utils.dict_utils import parse_yaml_strings
83 from osm_lcm.data_utils.database.vim_account import VimAccountDB
84 from n2vc.definitions import RelationEndpoint
85 from n2vc.k8s_helm_conn import K8sHelmConnector
86 from n2vc.k8s_helm3_conn import K8sHelm3Connector
87 from n2vc.k8s_juju_conn import K8sJujuConnector
88
89 from osm_common.dbbase import DbException
90 from osm_common.fsbase import FsException
91
92 from osm_lcm.data_utils.database.database import Database
93 from osm_lcm.data_utils.filesystem.filesystem import Filesystem
94
95 from n2vc.n2vc_juju_conn import N2VCJujuConnector
96 from n2vc.exceptions import N2VCException, N2VCNotFound, K8sException
97
98 from osm_lcm.lcm_helm_conn import LCMHelmConn
99 from osm_lcm.prometheus import parse_job
100
101 from copy import copy, deepcopy
102 from time import time
103 from uuid import uuid4
104
105 from random import randint
106
107 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
108
109
110 class NsLcm(LcmBase):
111 timeout_vca_on_error = (
112 5 * 60
113 ) # Time for a charm from first entering blocked/error status until it is marked as failed
114 timeout_ns_deploy = 2 * 3600 # default global timeout for deploying an NS
115 timeout_ns_terminate = 1800 # default global timeout for undeploying an NS
116 timeout_charm_delete = 10 * 60
117 timeout_primitive = 30 * 60 # timeout for primitive execution
118 timeout_progress_primitive = (
119 10 * 60
120 ) # timeout for some progress in a primitive execution
121
122 SUBOPERATION_STATUS_NOT_FOUND = -1
123 SUBOPERATION_STATUS_NEW = -2
124 SUBOPERATION_STATUS_SKIP = -3
125 task_name_deploy_vca = "Deploying VCA"
126
127 def __init__(self, msg, lcm_tasks, config, loop):
128 """
129 Init: connect to database, filesystem storage and messaging
130 :param config: two-level dictionary with the configuration. The top level should contain 'database', 'storage',
131 :return: None
132 """
133 super().__init__(msg=msg, logger=logging.getLogger("lcm.ns"))
134
135 self.db = Database().instance.db
136 self.fs = Filesystem().instance.fs
137 self.loop = loop
138 self.lcm_tasks = lcm_tasks
139 self.timeout = config["timeout"]
140 self.ro_config = config["ro_config"]
141 self.ng_ro = config["ro_config"].get("ng")
142 self.vca_config = config["VCA"].copy()
143
144 # create N2VC connector
145 self.n2vc = N2VCJujuConnector(
146 log=self.logger,
147 loop=self.loop,
148 on_update_db=self._on_update_n2vc_db,
149 fs=self.fs,
150 db=self.db,
151 )
152
153 self.conn_helm_ee = LCMHelmConn(
154 log=self.logger,
155 loop=self.loop,
156 vca_config=self.vca_config,
157 on_update_db=self._on_update_n2vc_db,
158 )
159
160 self.k8sclusterhelm2 = K8sHelmConnector(
161 kubectl_command=self.vca_config.get("kubectlpath"),
162 helm_command=self.vca_config.get("helmpath"),
163 log=self.logger,
164 on_update_db=None,
165 fs=self.fs,
166 db=self.db,
167 )
168
169 self.k8sclusterhelm3 = K8sHelm3Connector(
170 kubectl_command=self.vca_config.get("kubectlpath"),
171 helm_command=self.vca_config.get("helm3path"),
172 fs=self.fs,
173 log=self.logger,
174 db=self.db,
175 on_update_db=None,
176 )
177
178 self.k8sclusterjuju = K8sJujuConnector(
179 kubectl_command=self.vca_config.get("kubectlpath"),
180 juju_command=self.vca_config.get("jujupath"),
181 log=self.logger,
182 loop=self.loop,
183 on_update_db=self._on_update_k8s_db,
184 fs=self.fs,
185 db=self.db,
186 )
187
188 self.k8scluster_map = {
189 "helm-chart": self.k8sclusterhelm2,
190 "helm-chart-v3": self.k8sclusterhelm3,
191 "chart": self.k8sclusterhelm3,
192 "juju-bundle": self.k8sclusterjuju,
193 "juju": self.k8sclusterjuju,
194 }
195
196 self.vca_map = {
197 "lxc_proxy_charm": self.n2vc,
198 "native_charm": self.n2vc,
199 "k8s_proxy_charm": self.n2vc,
200 "helm": self.conn_helm_ee,
201 "helm-v3": self.conn_helm_ee,
202 }
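# Editorial note (not in the original source): vca_map is used further down in this
# class as a dispatch table keyed by the VCA type string, for example:
#   ee_id = await self.vca_map[vca_type].install_k8s_proxy_charm(...)        # vca_type == "k8s_proxy_charm"
#   ee_id, credentials = await self.vca_map[vca_type].create_execution_environment(...)
# k8scluster_map plays the same role for KDU deploy types ("helm-chart", "juju-bundle", ...),
# although its connector methods are not invoked through the map within this excerpt.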
203
204 # create RO client
205 self.RO = NgRoClient(self.loop, **self.ro_config)
206
207 @staticmethod
208 def increment_ip_mac(ip_mac, vm_index=1):
209 if not isinstance(ip_mac, str):
210 return ip_mac
211 try:
212 # try with ipv4: look for the last dot
213 i = ip_mac.rfind(".")
214 if i > 0:
215 i += 1
216 return "{}{}".format(ip_mac[:i], int(ip_mac[i:]) + vm_index)
217 # try with ipv6 or mac: look for the last colon and operate in hex
218 i = ip_mac.rfind(":")
219 if i > 0:
220 i += 1
221 # format in hex, len can be 2 for mac or 4 for ipv6
222 return ("{}{:0" + str(len(ip_mac) - i) + "x}").format(
223 ip_mac[:i], int(ip_mac[i:], 16) + vm_index
224 )
225 except Exception:
226 pass
227 return None
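# Illustrative examples (not in the original source) of increment_ip_mac, assuming the
# inputs below; the last numeric group is incremented by vm_index, non-string inputs are
# returned unchanged, and unparsable strings return None:
#   NsLcm.increment_ip_mac("10.0.0.5", vm_index=2)            # -> "10.0.0.7"
#   NsLcm.increment_ip_mac("fd00::a", vm_index=1)             # -> "fd00::b"
#   NsLcm.increment_ip_mac("52:54:00:12:34:0a", vm_index=1)   # -> "52:54:00:12:34:0b"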
228
229 def _on_update_ro_db(self, nsrs_id, ro_descriptor):
230
231 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
232
233 try:
234 # TODO filter RO descriptor fields...
235
236 # write to database
237 db_dict = dict()
238 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
239 db_dict["deploymentStatus"] = ro_descriptor
240 self.update_db_2("nsrs", nsrs_id, db_dict)
241
242 except Exception as e:
243 self.logger.warn(
244 "Cannot write database RO deployment for ns={} -> {}".format(nsrs_id, e)
245 )
246
247 async def _on_update_n2vc_db(self, table, filter, path, updated_data, vca_id=None):
248
249 # remove last dot from path (if exists)
250 if path.endswith("."):
251 path = path[:-1]
252
253 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
254 # .format(table, filter, path, updated_data))
255 try:
256
257 nsr_id = filter.get("_id")
258
259 # read ns record from database
260 nsr = self.db.get_one(table="nsrs", q_filter=filter)
261 current_ns_status = nsr.get("nsState")
262
263 # get vca status for NS
264 status_dict = await self.n2vc.get_status(
265 namespace="." + nsr_id, yaml_format=False, vca_id=vca_id
266 )
267
268 # vcaStatus
269 db_dict = dict()
270 db_dict["vcaStatus"] = status_dict
271 await self.n2vc.update_vca_status(db_dict["vcaStatus"], vca_id=vca_id)
272
273 # update configurationStatus for this VCA
274 try:
275 vca_index = int(path[path.rfind(".") + 1 :])
276
277 vca_list = deep_get(
278 target_dict=nsr, key_list=("_admin", "deployed", "VCA")
279 )
280 vca_status = vca_list[vca_index].get("status")
281
282 configuration_status_list = nsr.get("configurationStatus")
283 config_status = configuration_status_list[vca_index].get("status")
284
285 if config_status == "BROKEN" and vca_status != "failed":
286 db_dict["configurationStatus"][vca_index] = "READY"
287 elif config_status != "BROKEN" and vca_status == "failed":
288 db_dict["configurationStatus"][vca_index] = "BROKEN"
289 except Exception as e:
290 # do not update configurationStatus
291 self.logger.debug("Error updating vca_index (ignore): {}".format(e))
292
293 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
294 # if nsState = 'DEGRADED' check if all is OK
295 is_degraded = False
296 if current_ns_status in ("READY", "DEGRADED"):
297 error_description = ""
298 # check machines
299 if status_dict.get("machines"):
300 for machine_id in status_dict.get("machines"):
301 machine = status_dict.get("machines").get(machine_id)
302 # check machine agent-status
303 if machine.get("agent-status"):
304 s = machine.get("agent-status").get("status")
305 if s != "started":
306 is_degraded = True
307 error_description += (
308 "machine {} agent-status={} ; ".format(
309 machine_id, s
310 )
311 )
312 # check machine instance status
313 if machine.get("instance-status"):
314 s = machine.get("instance-status").get("status")
315 if s != "running":
316 is_degraded = True
317 error_description += (
318 "machine {} instance-status={} ; ".format(
319 machine_id, s
320 )
321 )
322 # check applications
323 if status_dict.get("applications"):
324 for app_id in status_dict.get("applications"):
325 app = status_dict.get("applications").get(app_id)
326 # check application status
327 if app.get("status"):
328 s = app.get("status").get("status")
329 if s != "active":
330 is_degraded = True
331 error_description += (
332 "application {} status={} ; ".format(app_id, s)
333 )
334
335 if error_description:
336 db_dict["errorDescription"] = error_description
337 if current_ns_status == "READY" and is_degraded:
338 db_dict["nsState"] = "DEGRADED"
339 if current_ns_status == "DEGRADED" and not is_degraded:
340 db_dict["nsState"] = "READY"
341
342 # write to database
343 self.update_db_2("nsrs", nsr_id, db_dict)
344
345 except (asyncio.CancelledError, asyncio.TimeoutError):
346 raise
347 except Exception as e:
348 self.logger.warn("Error updating NS state for ns={}: {}".format(nsr_id, e))
349
350 async def _on_update_k8s_db(
351 self, cluster_uuid, kdu_instance, filter=None, vca_id=None
352 ):
353 """
354 Updating vca status in NSR record
355 :param cluster_uuid: UUID of a k8s cluster
356 :param kdu_instance: The unique name of the KDU instance
357 :param filter: To get nsr_id
358 :return: none
359 """
360
361 # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
362 # .format(cluster_uuid, kdu_instance, filter))
363
364 try:
365 nsr_id = filter.get("_id")
366
367 # get vca status for NS
368 vca_status = await self.k8sclusterjuju.status_kdu(
369 cluster_uuid,
370 kdu_instance,
371 complete_status=True,
372 yaml_format=False,
373 vca_id=vca_id,
374 )
375 # vcaStatus
376 db_dict = dict()
377 db_dict["vcaStatus"] = {nsr_id: vca_status}
378
379 await self.k8sclusterjuju.update_vca_status(
380 db_dict["vcaStatus"],
381 kdu_instance,
382 vca_id=vca_id,
383 )
384
385 # write to database
386 self.update_db_2("nsrs", nsr_id, db_dict)
387
388 except (asyncio.CancelledError, asyncio.TimeoutError):
389 raise
390 except Exception as e:
391 self.logger.warn("Error updating NS state for ns={}: {}".format(nsr_id, e))
392
393 @staticmethod
394 def _parse_cloud_init(cloud_init_text, additional_params, vnfd_id, vdu_id):
395 try:
396 env = Environment(undefined=StrictUndefined)
397 template = env.from_string(cloud_init_text)
398 return template.render(additional_params or {})
399 except UndefinedError as e:
400 raise LcmException(
401 "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
402 "file, must be provided in the instantiation parameters inside the "
403 "'additionalParamsForVnf/Vdu' block".format(e, vnfd_id, vdu_id)
404 )
405 except (TemplateError, TemplateNotFound) as e:
406 raise LcmException(
407 "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
408 vnfd_id, vdu_id, e
409 )
410 )
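# Sketch (not in the original source) of how _parse_cloud_init renders a template; the
# values are made up. StrictUndefined makes any variable missing from additional_params
# raise the LcmException above instead of rendering silently as empty:
#   NsLcm._parse_cloud_init(
#       "hostname: {{ hostname }}", {"hostname": "vnf1-vdu1"}, "my-vnfd", "my-vdu"
#   )   # -> "hostname: vnf1-vdu1"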
411
412 def _get_vdu_cloud_init_content(self, vdu, vnfd):
413 cloud_init_content = cloud_init_file = None
414 try:
415 if vdu.get("cloud-init-file"):
416 base_folder = vnfd["_admin"]["storage"]
417 if base_folder["pkg-dir"]:
418 cloud_init_file = "{}/{}/cloud_init/{}".format(
419 base_folder["folder"],
420 base_folder["pkg-dir"],
421 vdu["cloud-init-file"],
422 )
423 else:
424 cloud_init_file = "{}/Scripts/cloud_init/{}".format(
425 base_folder["folder"],
426 vdu["cloud-init-file"],
427 )
428 with self.fs.file_open(cloud_init_file, "r") as ci_file:
429 cloud_init_content = ci_file.read()
430 elif vdu.get("cloud-init"):
431 cloud_init_content = vdu["cloud-init"]
432
433 return cloud_init_content
434 except FsException as e:
435 raise LcmException(
436 "Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".format(
437 vnfd["id"], vdu["id"], cloud_init_file, e
438 )
439 )
440
441 def _get_vdu_additional_params(self, db_vnfr, vdu_id):
442 vdur = next(
443 (vdur for vdur in db_vnfr.get("vdur") if vdu_id == vdur["vdu-id-ref"]),
444 {}
445 )
446 additional_params = vdur.get("additionalParams")
447 return parse_yaml_strings(additional_params)
448
449 def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
450 """
451 Creates a new vnfd descriptor for RO based on the input OSM IM vnfd
452 :param vnfd: input vnfd
453 :param new_id: overrides vnf id if provided
454 :param additionalParams: Instantiation params for VNFs provided
455 :param nsrId: Id of the NSR
456 :return: copy of vnfd
457 """
458 vnfd_RO = deepcopy(vnfd)
459 # remove keys not used by RO: configuration, monitoring, scaling and internal keys
460 vnfd_RO.pop("_id", None)
461 vnfd_RO.pop("_admin", None)
462 vnfd_RO.pop("monitoring-param", None)
463 vnfd_RO.pop("scaling-group-descriptor", None)
464 vnfd_RO.pop("kdu", None)
465 vnfd_RO.pop("k8s-cluster", None)
466 if new_id:
467 vnfd_RO["id"] = new_id
468
469 # remove cloud-init and cloud-init-file; cloud-init is handled by LCM and not passed to RO
470 for vdu in get_iterable(vnfd_RO, "vdu"):
471 vdu.pop("cloud-init-file", None)
472 vdu.pop("cloud-init", None)
473 return vnfd_RO
474
475 @staticmethod
476 def ip_profile_2_RO(ip_profile):
477 RO_ip_profile = deepcopy(ip_profile)
478 if "dns-server" in RO_ip_profile:
479 if isinstance(RO_ip_profile["dns-server"], list):
480 RO_ip_profile["dns-address"] = []
481 for ds in RO_ip_profile.pop("dns-server"):
482 RO_ip_profile["dns-address"].append(ds["address"])
483 else:
484 RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
485 if RO_ip_profile.get("ip-version") == "ipv4":
486 RO_ip_profile["ip-version"] = "IPv4"
487 if RO_ip_profile.get("ip-version") == "ipv6":
488 RO_ip_profile["ip-version"] = "IPv6"
489 if "dhcp-params" in RO_ip_profile:
490 RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
491 return RO_ip_profile
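# Illustrative input/output of ip_profile_2_RO (not in the original source); the values
# are made up and simply follow the key renames done above:
#   NsLcm.ip_profile_2_RO({
#       "ip-version": "ipv4",
#       "subnet-address": "10.0.0.0/24",
#       "dns-server": [{"address": "8.8.8.8"}],
#       "dhcp-params": {"enabled": True},
#   })
#   # -> {"ip-version": "IPv4", "subnet-address": "10.0.0.0/24",
#   #     "dns-address": ["8.8.8.8"], "dhcp": {"enabled": True}}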
492
493 def _get_ro_vim_id_for_vim_account(self, vim_account):
494 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
495 if db_vim["_admin"]["operationalState"] != "ENABLED":
496 raise LcmException(
497 "VIM={} is not available. operationalState={}".format(
498 vim_account, db_vim["_admin"]["operationalState"]
499 )
500 )
501 RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
502 return RO_vim_id
503
504 def get_ro_wim_id_for_wim_account(self, wim_account):
505 if isinstance(wim_account, str):
506 db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
507 if db_wim["_admin"]["operationalState"] != "ENABLED":
508 raise LcmException(
509 "WIM={} is not available. operationalState={}".format(
510 wim_account, db_wim["_admin"]["operationalState"]
511 )
512 )
513 RO_wim_id = db_wim["_admin"]["deployed"]["RO-account"]
514 return RO_wim_id
515 else:
516 return wim_account
517
518 def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None, mark_delete=False):
519
520 db_vdu_push_list = []
521 template_vdur = []
522 db_update = {"_admin.modified": time()}
523 if vdu_create:
524 for vdu_id, vdu_count in vdu_create.items():
525 vdur = next(
526 (
527 vdur
528 for vdur in reversed(db_vnfr["vdur"])
529 if vdur["vdu-id-ref"] == vdu_id
530 ),
531 None,
532 )
533 if not vdur:
534 # Read the template saved in the db:
535 self.logger.debug("No vdur in the database. Using the vdur-template to scale")
536 vdur_template = db_vnfr.get("vdur-template")
537 if not vdur_template:
538 raise LcmException(
539 "Error scaling OUT VNFR for {}. No vnfr or template exists".format(
540 vdu_id
541 )
542 )
543 vdur = vdur_template[0]
544 # Delete the template from the database after using it
545 self.db.set_one("vnfrs",
546 {"_id": db_vnfr["_id"]},
547 None,
548 pull={"vdur-template": {"_id": vdur["_id"]}},
549 )
550 for count in range(vdu_count):
551 vdur_copy = deepcopy(vdur)
552 vdur_copy["status"] = "BUILD"
553 vdur_copy["status-detailed"] = None
554 vdur_copy["ip-address"] = None
555 vdur_copy["_id"] = str(uuid4())
556 vdur_copy["count-index"] += count + 1
557 vdur_copy["id"] = "{}-{}".format(
558 vdur_copy["vdu-id-ref"], vdur_copy["count-index"]
559 )
560 vdur_copy.pop("vim_info", None)
561 for iface in vdur_copy["interfaces"]:
562 if iface.get("fixed-ip"):
563 iface["ip-address"] = self.increment_ip_mac(
564 iface["ip-address"], count + 1
565 )
566 else:
567 iface.pop("ip-address", None)
568 if iface.get("fixed-mac"):
569 iface["mac-address"] = self.increment_ip_mac(
570 iface["mac-address"], count + 1
571 )
572 else:
573 iface.pop("mac-address", None)
574 if db_vnfr["vdur"]:
575 iface.pop(
576 "mgmt_vnf", None
577 ) # only the first vdu can be the management vdu of the vnf
578 db_vdu_push_list.append(vdur_copy)
579 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
580 if vdu_delete:
581 if len(db_vnfr["vdur"]) == 1:
582 # The scale will move to 0 instances
583 self.logger.debug("Scaling to 0, creating the template with the last vdur")
584 template_vdur = [db_vnfr["vdur"][0]]
585 for vdu_id, vdu_count in vdu_delete.items():
586 if mark_delete:
587 indexes_to_delete = [
588 iv[0]
589 for iv in enumerate(db_vnfr["vdur"])
590 if iv[1]["vdu-id-ref"] == vdu_id
591 ]
592 db_update.update(
593 {
594 "vdur.{}.status".format(i): "DELETING"
595 for i in indexes_to_delete[-vdu_count:]
596 }
597 )
598 else:
599 # they must be deleted one by one because common.db does not allow deleting several in one operation
600 vdus_to_delete = [
601 v
602 for v in reversed(db_vnfr["vdur"])
603 if v["vdu-id-ref"] == vdu_id
604 ]
605 for vdu in vdus_to_delete[:vdu_count]:
606 self.db.set_one(
607 "vnfrs",
608 {"_id": db_vnfr["_id"]},
609 None,
610 pull={"vdur": {"_id": vdu["_id"]}},
611 )
612 db_push = {}
613 if db_vdu_push_list:
614 db_push["vdur"] = db_vdu_push_list
615 if template_vdur:
616 db_push["vdur-template"] = template_vdur
617 if not db_push:
618 db_push = None
619 db_vnfr["vdur-template"] = template_vdur
620 self.db.set_one("vnfrs", {"_id": db_vnfr["_id"]}, db_update, push_list=db_push)
621 # modify passed dictionary db_vnfr
622 db_vnfr_ = self.db.get_one("vnfrs", {"_id": db_vnfr["_id"]})
623 db_vnfr["vdur"] = db_vnfr_["vdur"]
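# Usage sketch (not in the original source); the VDU id "dataVM" is made up:
#   # add two instances of VDU "dataVM": copies the newest matching vdur, or the saved
#   # vdur-template when scaling up from zero
#   self.scale_vnfr(db_vnfr, vdu_create={"dataVM": 2})
#   # mark the last instance of "dataVM" as DELETING (records are only marked here;
#   # the actual removal happens elsewhere)
#   self.scale_vnfr(db_vnfr, vdu_delete={"dataVM": 1}, mark_delete=True)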
624
625 def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
626 """
627 Updates database nsr with the RO info for the created vld
628 :param ns_update_nsr: dictionary to be filled with the updated info
629 :param db_nsr: content of db_nsr. This is also modified
630 :param nsr_desc_RO: nsr descriptor from RO
631 :return: Nothing, LcmException is raised on errors
632 """
633
634 for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
635 for net_RO in get_iterable(nsr_desc_RO, "nets"):
636 if vld["id"] != net_RO.get("ns_net_osm_id"):
637 continue
638 vld["vim-id"] = net_RO.get("vim_net_id")
639 vld["name"] = net_RO.get("vim_name")
640 vld["status"] = net_RO.get("status")
641 vld["status-detailed"] = net_RO.get("error_msg")
642 ns_update_nsr["vld.{}".format(vld_index)] = vld
643 break
644 else:
645 raise LcmException(
646 "ns_update_nsr: Not found vld={} at RO info".format(vld["id"])
647 )
648
649 def set_vnfr_at_error(self, db_vnfrs, error_text):
650 try:
651 for db_vnfr in db_vnfrs.values():
652 vnfr_update = {"status": "ERROR"}
653 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
654 if "status" not in vdur:
655 vdur["status"] = "ERROR"
656 vnfr_update["vdur.{}.status".format(vdu_index)] = "ERROR"
657 if error_text:
658 vdur["status-detailed"] = str(error_text)
659 vnfr_update[
660 "vdur.{}.status-detailed".format(vdu_index)
661 ] = "ERROR"
662 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
663 except DbException as e:
664 self.logger.error("Cannot update vnf. {}".format(e))
665
666 def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
667 """
668 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
669 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
670 :param nsr_desc_RO: nsr descriptor from RO
671 :return: Nothing, LcmException is raised on errors
672 """
673 for vnf_index, db_vnfr in db_vnfrs.items():
674 for vnf_RO in nsr_desc_RO["vnfs"]:
675 if vnf_RO["member_vnf_index"] != vnf_index:
676 continue
677 vnfr_update = {}
678 if vnf_RO.get("ip_address"):
679 db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO[
680 "ip_address"
681 ].split(";")[0]
682 elif not db_vnfr.get("ip-address"):
683 if db_vnfr.get("vdur"): # if there are no VDUs, there is no ip_address
684 raise LcmExceptionNoMgmtIP(
685 "ns member_vnf_index '{}' has no IP address".format(
686 vnf_index
687 )
688 )
689
690 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
691 vdur_RO_count_index = 0
692 if vdur.get("pdu-type"):
693 continue
694 for vdur_RO in get_iterable(vnf_RO, "vms"):
695 if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
696 continue
697 if vdur["count-index"] != vdur_RO_count_index:
698 vdur_RO_count_index += 1
699 continue
700 vdur["vim-id"] = vdur_RO.get("vim_vm_id")
701 if vdur_RO.get("ip_address"):
702 vdur["ip-address"] = vdur_RO["ip_address"].split(";")[0]
703 else:
704 vdur["ip-address"] = None
705 vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
706 vdur["name"] = vdur_RO.get("vim_name")
707 vdur["status"] = vdur_RO.get("status")
708 vdur["status-detailed"] = vdur_RO.get("error_msg")
709 for ifacer in get_iterable(vdur, "interfaces"):
710 for interface_RO in get_iterable(vdur_RO, "interfaces"):
711 if ifacer["name"] == interface_RO.get("internal_name"):
712 ifacer["ip-address"] = interface_RO.get(
713 "ip_address"
714 )
715 ifacer["mac-address"] = interface_RO.get(
716 "mac_address"
717 )
718 break
719 else:
720 raise LcmException(
721 "ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
722 "from VIM info".format(
723 vnf_index, vdur["vdu-id-ref"], ifacer["name"]
724 )
725 )
726 vnfr_update["vdur.{}".format(vdu_index)] = vdur
727 break
728 else:
729 raise LcmException(
730 "ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
731 "VIM info".format(
732 vnf_index, vdur["vdu-id-ref"], vdur["count-index"]
733 )
734 )
735
736 for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
737 for net_RO in get_iterable(nsr_desc_RO, "nets"):
738 if vld["id"] != net_RO.get("vnf_net_osm_id"):
739 continue
740 vld["vim-id"] = net_RO.get("vim_net_id")
741 vld["name"] = net_RO.get("vim_name")
742 vld["status"] = net_RO.get("status")
743 vld["status-detailed"] = net_RO.get("error_msg")
744 vnfr_update["vld.{}".format(vld_index)] = vld
745 break
746 else:
747 raise LcmException(
748 "ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
749 vnf_index, vld["id"]
750 )
751 )
752
753 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
754 break
755
756 else:
757 raise LcmException(
758 "ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(
759 vnf_index
760 )
761 )
762
763 def _get_ns_config_info(self, nsr_id):
764 """
765 Generates a mapping between vnf,vdu elements and the N2VC id
766 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
767 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
768 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
769 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
770 """
771 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
772 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
773 mapping = {}
774 ns_config_info = {"osm-config-mapping": mapping}
775 for vca in vca_deployed_list:
776 if not vca["member-vnf-index"]:
777 continue
778 if not vca["vdu_id"]:
779 mapping[vca["member-vnf-index"]] = vca["application"]
780 else:
781 mapping[
782 "{}.{}.{}".format(
783 vca["member-vnf-index"], vca["vdu_id"], vca["vdu_count_index"]
784 )
785 ] = vca["application"]
786 return ns_config_info
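# Illustrative return value (not in the original source), following the docstring above;
# the member-vnf-index values and application names are made up:
#   {"osm-config-mapping": {
#       "1": "app-vnf-1",                  # vnf-level configuration
#       "2.mgmtVM.0": "app-vnf-2-mgmtvm",  # vdu-level configuration (vdu replica 0)
#   }}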
787
788 async def _instantiate_ng_ro(
789 self,
790 logging_text,
791 nsr_id,
792 nsd,
793 db_nsr,
794 db_nslcmop,
795 db_vnfrs,
796 db_vnfds,
797 n2vc_key_list,
798 stage,
799 start_deploy,
800 timeout_ns_deploy,
801 ):
802
803 db_vims = {}
804
805 def get_vim_account(vim_account_id):
806 nonlocal db_vims
807 if vim_account_id in db_vims:
808 return db_vims[vim_account_id]
809 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account_id})
810 db_vims[vim_account_id] = db_vim
811 return db_vim
812
813 # modify target_vld info with instantiation parameters
814 def parse_vld_instantiation_params(
815 target_vim, target_vld, vld_params, target_sdn
816 ):
817 if vld_params.get("ip-profile"):
818 target_vld["vim_info"][target_vim]["ip_profile"] = vld_params[
819 "ip-profile"
820 ]
821 if vld_params.get("provider-network"):
822 target_vld["vim_info"][target_vim]["provider_network"] = vld_params[
823 "provider-network"
824 ]
825 if "sdn-ports" in vld_params["provider-network"] and target_sdn:
826 target_vld["vim_info"][target_sdn]["sdn-ports"] = vld_params[
827 "provider-network"
828 ]["sdn-ports"]
829 if vld_params.get("wimAccountId"):
830 target_wim = "wim:{}".format(vld_params["wimAccountId"])
831 target_vld["vim_info"][target_wim] = {}
832 for param in ("vim-network-name", "vim-network-id"):
833 if vld_params.get(param):
834 if isinstance(vld_params[param], dict):
835 for vim, vim_net in vld_params[param].items():
836 other_target_vim = "vim:" + vim
837 populate_dict(
838 target_vld["vim_info"],
839 (other_target_vim, param.replace("-", "_")),
840 vim_net,
841 )
842 else: # isinstance str
843 target_vld["vim_info"][target_vim][
844 param.replace("-", "_")
845 ] = vld_params[param]
846 if vld_params.get("common_id"):
847 target_vld["common_id"] = vld_params.get("common_id")
848
849 # modify target["ns"]["vld"] with instantiation parameters to override vnf vim-account
850 def update_ns_vld_target(target, ns_params):
851 for vnf_params in ns_params.get("vnf", ()):
852 if vnf_params.get("vimAccountId"):
853 target_vnf = next(
854 (
855 vnfr
856 for vnfr in db_vnfrs.values()
857 if vnf_params["member-vnf-index"]
858 == vnfr["member-vnf-index-ref"]
859 ),
860 None,
861 )
862 vdur = next((vdur for vdur in target_vnf.get("vdur", ())), None)
863 for a_index, a_vld in enumerate(target["ns"]["vld"]):
864 target_vld = find_in_list(
865 get_iterable(vdur, "interfaces"),
866 lambda iface: iface.get("ns-vld-id") == a_vld["name"],
867 )
868 if target_vld:
869 if vnf_params.get("vimAccountId") not in a_vld.get(
870 "vim_info", {}
871 ):
872 target["ns"]["vld"][a_index].get("vim_info").update(
873 {
874 "vim:{}".format(vnf_params["vimAccountId"]): {
875 "vim_network_name": ""
876 }
877 }
878 )
879
880 nslcmop_id = db_nslcmop["_id"]
881 target = {
882 "name": db_nsr["name"],
883 "ns": {"vld": []},
884 "vnf": [],
885 "image": deepcopy(db_nsr["image"]),
886 "flavor": deepcopy(db_nsr["flavor"]),
887 "action_id": nslcmop_id,
888 "cloud_init_content": {},
889 }
890 for image in target["image"]:
891 image["vim_info"] = {}
892 for flavor in target["flavor"]:
893 flavor["vim_info"] = {}
894 if db_nsr.get("affinity-or-anti-affinity-group"):
895 target["affinity-or-anti-affinity-group"] = deepcopy(
896 db_nsr["affinity-or-anti-affinity-group"]
897 )
898 for affinity_or_anti_affinity_group in target[
899 "affinity-or-anti-affinity-group"
900 ]:
901 affinity_or_anti_affinity_group["vim_info"] = {}
902
903 if db_nslcmop.get("lcmOperationType") != "instantiate":
904 # get parameters of instantiation:
905 db_nslcmop_instantiate = self.db.get_list(
906 "nslcmops",
907 {
908 "nsInstanceId": db_nslcmop["nsInstanceId"],
909 "lcmOperationType": "instantiate",
910 },
911 )[-1]
912 ns_params = db_nslcmop_instantiate.get("operationParams")
913 else:
914 ns_params = db_nslcmop.get("operationParams")
915 ssh_keys_instantiation = ns_params.get("ssh_keys") or []
916 ssh_keys_all = ssh_keys_instantiation + (n2vc_key_list or [])
917
918 cp2target = {}
919 for vld_index, vld in enumerate(db_nsr.get("vld")):
920 target_vim = "vim:{}".format(ns_params["vimAccountId"])
921 target_vld = {
922 "id": vld["id"],
923 "name": vld["name"],
924 "mgmt-network": vld.get("mgmt-network", False),
925 "type": vld.get("type"),
926 "vim_info": {
927 target_vim: {
928 "vim_network_name": vld.get("vim-network-name"),
929 "vim_account_id": ns_params["vimAccountId"],
930 }
931 },
932 }
933 # check if this network needs SDN assist
934 if vld.get("pci-interfaces"):
935 db_vim = get_vim_account(ns_params["vimAccountId"])
936 sdnc_id = db_vim["config"].get("sdn-controller")
937 if sdnc_id:
938 sdn_vld = "nsrs:{}:vld.{}".format(nsr_id, vld["id"])
939 target_sdn = "sdn:{}".format(sdnc_id)
940 target_vld["vim_info"][target_sdn] = {
941 "sdn": True,
942 "target_vim": target_vim,
943 "vlds": [sdn_vld],
944 "type": vld.get("type"),
945 }
946
947 nsd_vnf_profiles = get_vnf_profiles(nsd)
948 for nsd_vnf_profile in nsd_vnf_profiles:
949 for cp in nsd_vnf_profile["virtual-link-connectivity"]:
950 if cp["virtual-link-profile-id"] == vld["id"]:
951 cp2target[
952 "member_vnf:{}.{}".format(
953 cp["constituent-cpd-id"][0][
954 "constituent-base-element-id"
955 ],
956 cp["constituent-cpd-id"][0]["constituent-cpd-id"],
957 )
958 ] = "nsrs:{}:vld.{}".format(nsr_id, vld_index)
959
960 # check at nsd descriptor, if there is an ip-profile
961 vld_params = {}
962 nsd_vlp = find_in_list(
963 get_virtual_link_profiles(nsd),
964 lambda a_link_profile: a_link_profile["virtual-link-desc-id"]
965 == vld["id"],
966 )
967 if (
968 nsd_vlp
969 and nsd_vlp.get("virtual-link-protocol-data")
970 and nsd_vlp["virtual-link-protocol-data"].get("l3-protocol-data")
971 ):
972 ip_profile_source_data = nsd_vlp["virtual-link-protocol-data"][
973 "l3-protocol-data"
974 ]
975 ip_profile_dest_data = {}
976 if "ip-version" in ip_profile_source_data:
977 ip_profile_dest_data["ip-version"] = ip_profile_source_data[
978 "ip-version"
979 ]
980 if "cidr" in ip_profile_source_data:
981 ip_profile_dest_data["subnet-address"] = ip_profile_source_data[
982 "cidr"
983 ]
984 if "gateway-ip" in ip_profile_source_data:
985 ip_profile_dest_data["gateway-address"] = ip_profile_source_data[
986 "gateway-ip"
987 ]
988 if "dhcp-enabled" in ip_profile_source_data:
989 ip_profile_dest_data["dhcp-params"] = {
990 "enabled": ip_profile_source_data["dhcp-enabled"]
991 }
992 vld_params["ip-profile"] = ip_profile_dest_data
993
994 # update vld_params with instantiation params
995 vld_instantiation_params = find_in_list(
996 get_iterable(ns_params, "vld"),
997 lambda a_vld: a_vld["name"] in (vld["name"], vld["id"]),
998 )
999 if vld_instantiation_params:
1000 vld_params.update(vld_instantiation_params)
1001 parse_vld_instantiation_params(target_vim, target_vld, vld_params, None)
1002 target["ns"]["vld"].append(target_vld)
1003 # Update the target ns_vld if the vnf vim_account is overridden by instantiation params
1004 update_ns_vld_target(target, ns_params)
1005
1006 for vnfr in db_vnfrs.values():
1007 vnfd = find_in_list(
1008 db_vnfds, lambda db_vnf: db_vnf["id"] == vnfr["vnfd-ref"]
1009 )
1010 vnf_params = find_in_list(
1011 get_iterable(ns_params, "vnf"),
1012 lambda a_vnf: a_vnf["member-vnf-index"] == vnfr["member-vnf-index-ref"],
1013 )
1014 target_vnf = deepcopy(vnfr)
1015 target_vim = "vim:{}".format(vnfr["vim-account-id"])
1016 for vld in target_vnf.get("vld", ()):
1017 # check if connected to a ns.vld, to fill target
1018 vnf_cp = find_in_list(
1019 vnfd.get("int-virtual-link-desc", ()),
1020 lambda cpd: cpd.get("id") == vld["id"],
1021 )
1022 if vnf_cp:
1023 ns_cp = "member_vnf:{}.{}".format(
1024 vnfr["member-vnf-index-ref"], vnf_cp["id"]
1025 )
1026 if cp2target.get(ns_cp):
1027 vld["target"] = cp2target[ns_cp]
1028
1029 vld["vim_info"] = {
1030 target_vim: {"vim_network_name": vld.get("vim-network-name")}
1031 }
1032 # check if this network needs SDN assist
1033 target_sdn = None
1034 if vld.get("pci-interfaces"):
1035 db_vim = get_vim_account(vnfr["vim-account-id"])
1036 sdnc_id = db_vim["config"].get("sdn-controller")
1037 if sdnc_id:
1038 sdn_vld = "vnfrs:{}:vld.{}".format(target_vnf["_id"], vld["id"])
1039 target_sdn = "sdn:{}".format(sdnc_id)
1040 vld["vim_info"][target_sdn] = {
1041 "sdn": True,
1042 "target_vim": target_vim,
1043 "vlds": [sdn_vld],
1044 "type": vld.get("type"),
1045 }
1046
1047 # check at vnfd descriptor, if there is an ip-profile
1048 vld_params = {}
1049 vnfd_vlp = find_in_list(
1050 get_virtual_link_profiles(vnfd),
1051 lambda a_link_profile: a_link_profile["id"] == vld["id"],
1052 )
1053 if (
1054 vnfd_vlp
1055 and vnfd_vlp.get("virtual-link-protocol-data")
1056 and vnfd_vlp["virtual-link-protocol-data"].get("l3-protocol-data")
1057 ):
1058 ip_profile_source_data = vnfd_vlp["virtual-link-protocol-data"][
1059 "l3-protocol-data"
1060 ]
1061 ip_profile_dest_data = {}
1062 if "ip-version" in ip_profile_source_data:
1063 ip_profile_dest_data["ip-version"] = ip_profile_source_data[
1064 "ip-version"
1065 ]
1066 if "cidr" in ip_profile_source_data:
1067 ip_profile_dest_data["subnet-address"] = ip_profile_source_data[
1068 "cidr"
1069 ]
1070 if "gateway-ip" in ip_profile_source_data:
1071 ip_profile_dest_data[
1072 "gateway-address"
1073 ] = ip_profile_source_data["gateway-ip"]
1074 if "dhcp-enabled" in ip_profile_source_data:
1075 ip_profile_dest_data["dhcp-params"] = {
1076 "enabled": ip_profile_source_data["dhcp-enabled"]
1077 }
1078
1079 vld_params["ip-profile"] = ip_profile_dest_data
1080 # update vld_params with instantiation params
1081 if vnf_params:
1082 vld_instantiation_params = find_in_list(
1083 get_iterable(vnf_params, "internal-vld"),
1084 lambda i_vld: i_vld["name"] == vld["id"],
1085 )
1086 if vld_instantiation_params:
1087 vld_params.update(vld_instantiation_params)
1088 parse_vld_instantiation_params(target_vim, vld, vld_params, target_sdn)
1089
1090 vdur_list = []
1091 for vdur in target_vnf.get("vdur", ()):
1092 if vdur.get("status") == "DELETING" or vdur.get("pdu-type"):
1093 continue # This vdu must not be created
1094 vdur["vim_info"] = {"vim_account_id": vnfr["vim-account-id"]}
1095
1096 self.logger.debug("NS > ssh_keys > {}".format(ssh_keys_all))
1097
1098 if ssh_keys_all:
1099 vdu_configuration = get_configuration(vnfd, vdur["vdu-id-ref"])
1100 vnf_configuration = get_configuration(vnfd, vnfd["id"])
1101 if (
1102 vdu_configuration
1103 and vdu_configuration.get("config-access")
1104 and vdu_configuration.get("config-access").get("ssh-access")
1105 ):
1106 vdur["ssh-keys"] = ssh_keys_all
1107 vdur["ssh-access-required"] = vdu_configuration[
1108 "config-access"
1109 ]["ssh-access"]["required"]
1110 elif (
1111 vnf_configuration
1112 and vnf_configuration.get("config-access")
1113 and vnf_configuration.get("config-access").get("ssh-access")
1114 and any(iface.get("mgmt-vnf") for iface in vdur["interfaces"])
1115 ):
1116 vdur["ssh-keys"] = ssh_keys_all
1117 vdur["ssh-access-required"] = vnf_configuration[
1118 "config-access"
1119 ]["ssh-access"]["required"]
1120 elif ssh_keys_instantiation and find_in_list(
1121 vdur["interfaces"], lambda iface: iface.get("mgmt-vnf")
1122 ):
1123 vdur["ssh-keys"] = ssh_keys_instantiation
1124
1125 self.logger.debug("NS > vdur > {}".format(vdur))
1126
1127 vdud = get_vdu(vnfd, vdur["vdu-id-ref"])
1128 # cloud-init
1129 if vdud.get("cloud-init-file"):
1130 vdur["cloud-init"] = "{}:file:{}".format(
1131 vnfd["_id"], vdud.get("cloud-init-file")
1132 )
1133 # read the file and put its content at target.cloud_init_content, so ng_ro does not need to use the shared package system
1134 if vdur["cloud-init"] not in target["cloud_init_content"]:
1135 base_folder = vnfd["_admin"]["storage"]
1136 if base_folder["pkg-dir"]:
1137 cloud_init_file = "{}/{}/cloud_init/{}".format(
1138 base_folder["folder"],
1139 base_folder["pkg-dir"],
1140 vdud.get("cloud-init-file"),
1141 )
1142 else:
1143 cloud_init_file = "{}/Scripts/cloud_init/{}".format(
1144 base_folder["folder"],
1145 vdud.get("cloud-init-file"),
1146 )
1147 with self.fs.file_open(cloud_init_file, "r") as ci_file:
1148 target["cloud_init_content"][
1149 vdur["cloud-init"]
1150 ] = ci_file.read()
1151 elif vdud.get("cloud-init"):
1152 vdur["cloud-init"] = "{}:vdu:{}".format(
1153 vnfd["_id"], get_vdu_index(vnfd, vdur["vdu-id-ref"])
1154 )
1155 # put the content at target.cloud_init_content, so ng_ro does not need to read the vnfd descriptor
1156 target["cloud_init_content"][vdur["cloud-init"]] = vdud[
1157 "cloud-init"
1158 ]
1159 vdur["additionalParams"] = vdur.get("additionalParams") or {}
1160 deploy_params_vdu = self._format_additional_params(
1161 vdur.get("additionalParams") or {}
1162 )
1163 deploy_params_vdu["OSM"] = get_osm_params(
1164 vnfr, vdur["vdu-id-ref"], vdur["count-index"]
1165 )
1166 vdur["additionalParams"] = deploy_params_vdu
1167
1168 # flavor
1169 ns_flavor = target["flavor"][int(vdur["ns-flavor-id"])]
1170 if target_vim not in ns_flavor["vim_info"]:
1171 ns_flavor["vim_info"][target_vim] = {}
1172
1173 # deal with images
1174 # in case alternative images are provided we must check if they should be applied
1175 # for the vim_type, modify the vim_type taking into account
1176 ns_image_id = int(vdur["ns-image-id"])
1177 if vdur.get("alt-image-ids"):
1178 db_vim = get_vim_account(vnfr["vim-account-id"])
1179 vim_type = db_vim["vim_type"]
1180 for alt_image_id in vdur.get("alt-image-ids"):
1181 ns_alt_image = target["image"][int(alt_image_id)]
1182 if vim_type == ns_alt_image.get("vim-type"):
1183 # must use alternative image
1184 self.logger.debug(
1185 "use alternative image id: {}".format(alt_image_id)
1186 )
1187 ns_image_id = alt_image_id
1188 vdur["ns-image-id"] = ns_image_id
1189 break
1190 ns_image = target["image"][int(ns_image_id)]
1191 if target_vim not in ns_image["vim_info"]:
1192 ns_image["vim_info"][target_vim] = {}
1193
1194 # Affinity groups
1195 if vdur.get("affinity-or-anti-affinity-group-id"):
1196 for ags_id in vdur["affinity-or-anti-affinity-group-id"]:
1197 ns_ags = target["affinity-or-anti-affinity-group"][int(ags_id)]
1198 if target_vim not in ns_ags["vim_info"]:
1199 ns_ags["vim_info"][target_vim] = {}
1200
1201 vdur["vim_info"] = {target_vim: {}}
1202 # instantiation parameters
1203 # if vnf_params:
1204 # vdu_instantiation_params = next((v for v in get_iterable(vnf_params, "vdu") if v["id"] ==
1205 # vdud["id"]), None)
1206 vdur_list.append(vdur)
1207 target_vnf["vdur"] = vdur_list
1208 target["vnf"].append(target_vnf)
1209
1210 desc = await self.RO.deploy(nsr_id, target)
1211 self.logger.debug("RO return > {}".format(desc))
1212 action_id = desc["action_id"]
1213 await self._wait_ng_ro(
1214 nsr_id, action_id, nslcmop_id, start_deploy, timeout_ns_deploy, stage
1215 )
1216
1217 # Updating NSR
1218 db_nsr_update = {
1219 "_admin.deployed.RO.operational-status": "running",
1220 "detailed-status": " ".join(stage),
1221 }
1222 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1223 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1224 self._write_op_status(nslcmop_id, stage)
1225 self.logger.debug(
1226 logging_text + "ns deployed at RO. RO_id={}".format(action_id)
1227 )
1228 return
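# Editorial sketch (not in the original source): minimal skeleton of the "target" dict
# built above and sent to NG-RO via self.RO.deploy(nsr_id, target); keys are taken from
# this method, values are placeholders:
#   target = {
#       "name": "<ns name>",
#       "ns": {"vld": [...]},            # NS-level virtual links with per-VIM vim_info
#       "vnf": [...],                    # one entry per vnfr, each carrying its vdur list
#       "image": [...],
#       "flavor": [...],
#       "action_id": "<nslcmop id>",
#       "cloud_init_content": {...},     # indexed by "<vnfd _id>:file:<name>" or "<vnfd _id>:vdu:<index>"
#   }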
1229
1230 async def _wait_ng_ro(
1231 self,
1232 nsr_id,
1233 action_id,
1234 nslcmop_id=None,
1235 start_time=None,
1236 timeout=600,
1237 stage=None,
1238 ):
1239 detailed_status_old = None
1240 db_nsr_update = {}
1241 start_time = start_time or time()
1242 while time() <= start_time + timeout:
1243 desc_status = await self.RO.status(nsr_id, action_id)
1244 self.logger.debug("Wait NG RO > {}".format(desc_status))
1245 if desc_status["status"] == "FAILED":
1246 raise NgRoException(desc_status["details"])
1247 elif desc_status["status"] == "BUILD":
1248 if stage:
1249 stage[2] = "VIM: ({})".format(desc_status["details"])
1250 elif desc_status["status"] == "DONE":
1251 if stage:
1252 stage[2] = "Deployed at VIM"
1253 break
1254 else:
1255 assert False, "RO.status returns unknown {}".format(
1256 desc_status["status"]
1257 )
1258 if stage and nslcmop_id and stage[2] != detailed_status_old:
1259 detailed_status_old = stage[2]
1260 db_nsr_update["detailed-status"] = " ".join(stage)
1261 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1262 self._write_op_status(nslcmop_id, stage)
1263 await asyncio.sleep(15, loop=self.loop)
1264 else: # timeout_ns_deploy
1265 raise NgRoException("Timeout waiting ns to deploy")
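# Note (not in the original source): the NG-RO status document polled above exposes at
# least the fields used here, e.g. {"status": "BUILD", "details": "<progress text>"}
# (the details text is illustrative). Polling stops on "DONE", raises NgRoException on
# "FAILED", and raises the timeout NgRoException above when the deadline passes.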
1266
1267 async def _terminate_ng_ro(
1268 self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
1269 ):
1270 db_nsr_update = {}
1271 failed_detail = []
1272 action_id = None
1273 start_deploy = time()
1274 try:
1275 target = {
1276 "ns": {"vld": []},
1277 "vnf": [],
1278 "image": [],
1279 "flavor": [],
1280 "action_id": nslcmop_id,
1281 }
1282 desc = await self.RO.deploy(nsr_id, target)
1283 action_id = desc["action_id"]
1284 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = action_id
1285 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETING"
1286 self.logger.debug(
1287 logging_text
1288 + "ns terminate action at RO. action_id={}".format(action_id)
1289 )
1290
1291 # wait until done
1292 delete_timeout = 20 * 60 # 20 minutes
1293 await self._wait_ng_ro(
1294 nsr_id, action_id, nslcmop_id, start_deploy, delete_timeout, stage
1295 )
1296
1297 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
1298 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
1299 # delete all nsr
1300 await self.RO.delete(nsr_id)
1301 except Exception as e:
1302 if isinstance(e, NgRoException) and e.http_code == 404: # not found
1303 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
1304 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
1305 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
1306 self.logger.debug(
1307 logging_text + "RO_action_id={} already deleted".format(action_id)
1308 )
1309 elif isinstance(e, NgRoException) and e.http_code == 409: # conflict
1310 failed_detail.append("delete conflict: {}".format(e))
1311 self.logger.debug(
1312 logging_text
1313 + "RO_action_id={} delete conflict: {}".format(action_id, e)
1314 )
1315 else:
1316 failed_detail.append("delete error: {}".format(e))
1317 self.logger.error(
1318 logging_text
1319 + "RO_action_id={} delete error: {}".format(action_id, e)
1320 )
1321
1322 if failed_detail:
1323 stage[2] = "Error deleting from VIM"
1324 else:
1325 stage[2] = "Deleted from VIM"
1326 db_nsr_update["detailed-status"] = " ".join(stage)
1327 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1328 self._write_op_status(nslcmop_id, stage)
1329
1330 if failed_detail:
1331 raise LcmException("; ".join(failed_detail))
1332 return
1333
1334 async def instantiate_RO(
1335 self,
1336 logging_text,
1337 nsr_id,
1338 nsd,
1339 db_nsr,
1340 db_nslcmop,
1341 db_vnfrs,
1342 db_vnfds,
1343 n2vc_key_list,
1344 stage,
1345 ):
1346 """
1347 Instantiate at RO
1348 :param logging_text: prefix text to use at logging
1349 :param nsr_id: nsr identity
1350 :param nsd: database content of ns descriptor
1351 :param db_nsr: database content of ns record
1352 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
1353 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
1354 :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
1355 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
1356 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
1357 :return: None or exception
1358 """
1359 try:
1360 start_deploy = time()
1361 ns_params = db_nslcmop.get("operationParams")
1362 if ns_params and ns_params.get("timeout_ns_deploy"):
1363 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
1364 else:
1365 timeout_ns_deploy = self.timeout.get(
1366 "ns_deploy", self.timeout_ns_deploy
1367 )
1368
1369 # Check for and optionally request placement optimization. Database will be updated if placement activated
1370 stage[2] = "Waiting for Placement."
1371 if await self._do_placement(logging_text, db_nslcmop, db_vnfrs):
1372 # in case of placement, change ns_params["vimAccountId"] if it is not present at any vnfr
1373 for vnfr in db_vnfrs.values():
1374 if ns_params["vimAccountId"] == vnfr["vim-account-id"]:
1375 break
1376 else:
1377 ns_params["vimAccountId"] = vnfr["vim-account-id"]
1378
1379 return await self._instantiate_ng_ro(
1380 logging_text,
1381 nsr_id,
1382 nsd,
1383 db_nsr,
1384 db_nslcmop,
1385 db_vnfrs,
1386 db_vnfds,
1387 n2vc_key_list,
1388 stage,
1389 start_deploy,
1390 timeout_ns_deploy,
1391 )
1392 except Exception as e:
1393 stage[2] = "ERROR deploying at VIM"
1394 self.set_vnfr_at_error(db_vnfrs, str(e))
1395 self.logger.error(
1396 "Error deploying at VIM {}".format(e),
1397 exc_info=not isinstance(
1398 e,
1399 (
1400 ROclient.ROClientException,
1401 LcmException,
1402 DbException,
1403 NgRoException,
1404 ),
1405 ),
1406 )
1407 raise
1408
1409 async def wait_kdu_up(self, logging_text, nsr_id, vnfr_id, kdu_name):
1410 """
1411 Wait for kdu to be up, get ip address
1412 :param logging_text: prefix use for logging
1413 :param nsr_id:
1414 :param vnfr_id:
1415 :param kdu_name:
1416 :return: IP address
1417 """
1418
1419 # self.logger.debug(logging_text + "Starting wait_kdu_up")
1420 nb_tries = 0
1421
1422 while nb_tries < 360:
1423 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
1424 kdur = next(
1425 (
1426 x
1427 for x in get_iterable(db_vnfr, "kdur")
1428 if x.get("kdu-name") == kdu_name
1429 ),
1430 None,
1431 )
1432 if not kdur:
1433 raise LcmException(
1434 "Not found vnfr_id={}, kdu_name={}".format(vnfr_id, kdu_name)
1435 )
1436 if kdur.get("status"):
1437 if kdur["status"] in ("READY", "ENABLED"):
1438 return kdur.get("ip-address")
1439 else:
1440 raise LcmException(
1441 "target KDU={} is in error state".format(kdu_name)
1442 )
1443
1444 await asyncio.sleep(10, loop=self.loop)
1445 nb_tries += 1
1446 raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name))
1447
1448 async def wait_vm_up_insert_key_ro(
1449 self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None
1450 ):
1451 """
1452 Wait for the ip address at RO and, optionally, insert a public key in the virtual machine
1453 :param logging_text: prefix use for logging
1454 :param nsr_id:
1455 :param vnfr_id:
1456 :param vdu_id:
1457 :param vdu_index:
1458 :param pub_key: public ssh key to inject, None to skip
1459 :param user: user to apply the public ssh key
1460 :return: IP address
1461 """
1462
1463 self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
1464 ro_nsr_id = None
1465 ip_address = None
1466 nb_tries = 0
1467 target_vdu_id = None
1468 ro_retries = 0
1469
1470 while True:
1471
1472 ro_retries += 1
1473 if ro_retries >= 360: # 1 hour
1474 raise LcmException(
1475 "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id)
1476 )
1477
1478 await asyncio.sleep(10, loop=self.loop)
1479
1480 # get ip address
1481 if not target_vdu_id:
1482 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
1483
1484 if not vdu_id: # for the VNF case
1485 if db_vnfr.get("status") == "ERROR":
1486 raise LcmException(
1487 "Cannot inject ssh-key because target VNF is in error state"
1488 )
1489 ip_address = db_vnfr.get("ip-address")
1490 if not ip_address:
1491 continue
1492 vdur = next(
1493 (
1494 x
1495 for x in get_iterable(db_vnfr, "vdur")
1496 if x.get("ip-address") == ip_address
1497 ),
1498 None,
1499 )
1500 else: # VDU case
1501 vdur = next(
1502 (
1503 x
1504 for x in get_iterable(db_vnfr, "vdur")
1505 if x.get("vdu-id-ref") == vdu_id
1506 and x.get("count-index") == vdu_index
1507 ),
1508 None,
1509 )
1510
1511 if (
1512 not vdur and len(db_vnfr.get("vdur", ())) == 1
1513 ): # If only one, this should be the target vdu
1514 vdur = db_vnfr["vdur"][0]
1515 if not vdur:
1516 raise LcmException(
1517 "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
1518 vnfr_id, vdu_id, vdu_index
1519 )
1520 )
1521 # New generation RO stores information at "vim_info"
1522 ng_ro_status = None
1523 target_vim = None
1524 if vdur.get("vim_info"):
1525 target_vim = next(
1526 t for t in vdur["vim_info"]
1527 ) # there should be only one key
1528 ng_ro_status = vdur["vim_info"][target_vim].get("vim_status")
1529 if (
1530 vdur.get("pdu-type")
1531 or vdur.get("status") == "ACTIVE"
1532 or ng_ro_status == "ACTIVE"
1533 ):
1534 ip_address = vdur.get("ip-address")
1535 if not ip_address:
1536 continue
1537 target_vdu_id = vdur["vdu-id-ref"]
1538 elif vdur.get("status") == "ERROR" or ng_ro_status == "ERROR":
1539 raise LcmException(
1540 "Cannot inject ssh-key because target VM is in error state"
1541 )
1542
1543 if not target_vdu_id:
1544 continue
1545
1546 # inject public key into machine
1547 if pub_key and user:
1548 self.logger.debug(logging_text + "Inserting RO key")
1549 self.logger.debug("SSH > PubKey > {}".format(pub_key))
1550 if vdur.get("pdu-type"):
1551 self.logger.error(logging_text + "Cannot inject ssh-key to a PDU")
1552 return ip_address
1553 try:
1554 ro_vm_id = "{}-{}".format(
1555 db_vnfr["member-vnf-index-ref"], target_vdu_id
1556 ) # TODO add vdu_index
1557 if self.ng_ro:
1558 target = {
1559 "action": {
1560 "action": "inject_ssh_key",
1561 "key": pub_key,
1562 "user": user,
1563 },
1564 "vnf": [{"_id": vnfr_id, "vdur": [{"id": vdur["id"]}]}],
1565 }
1566 desc = await self.RO.deploy(nsr_id, target)
1567 action_id = desc["action_id"]
1568 await self._wait_ng_ro(nsr_id, action_id, timeout=600)
1569 break
1570 else:
1571 # wait until NS is deployed at RO
1572 if not ro_nsr_id:
1573 db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
1574 ro_nsr_id = deep_get(
1575 db_nsrs, ("_admin", "deployed", "RO", "nsr_id")
1576 )
1577 if not ro_nsr_id:
1578 continue
1579 result_dict = await self.RO.create_action(
1580 item="ns",
1581 item_id_name=ro_nsr_id,
1582 descriptor={
1583 "add_public_key": pub_key,
1584 "vms": [ro_vm_id],
1585 "user": user,
1586 },
1587 )
1588 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
1589 if not result_dict or not isinstance(result_dict, dict):
1590 raise LcmException(
1591 "Unknown response from RO when injecting key"
1592 )
1593 for result in result_dict.values():
1594 if result.get("vim_result") == 200:
1595 break
1596 else:
1597 raise ROclient.ROClientException(
1598 "error injecting key: {}".format(
1599 result.get("description")
1600 )
1601 )
1602 break
1603 except NgRoException as e:
1604 raise LcmException(
1605 "Reaching max tries injecting key. Error: {}".format(e)
1606 )
1607 except ROclient.ROClientException as e:
1608 if not nb_tries:
1609 self.logger.debug(
1610 logging_text
1611 + "error injecting key: {}. Retrying until {} seconds".format(
1612 e, 20 * 10
1613 )
1614 )
1615 nb_tries += 1
1616 if nb_tries >= 20:
1617 raise LcmException(
1618 "Reaching max tries injecting key. Error: {}".format(e)
1619 )
1620 else:
1621 break
1622
1623 return ip_address
1624
1625 async def _wait_dependent_n2vc(self, nsr_id, vca_deployed_list, vca_index):
1626 """
1627 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1628 """
1629 my_vca = vca_deployed_list[vca_index]
1630 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
1631 # vdu or kdu: no dependencies
1632 return
1633 timeout = 300
1634 while timeout >= 0:
1635 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1636 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1637 configuration_status_list = db_nsr["configurationStatus"]
1638 for index, vca_deployed in enumerate(configuration_status_list):
1639 if index == vca_index:
1640 # myself
1641 continue
1642 if not my_vca.get("member-vnf-index") or (
1643 vca_deployed.get("member-vnf-index")
1644 == my_vca.get("member-vnf-index")
1645 ):
1646 internal_status = configuration_status_list[index].get("status")
1647 if internal_status == "READY":
1648 continue
1649 elif internal_status == "BROKEN":
1650 raise LcmException(
1651 "Configuration aborted because dependent charm/s has failed"
1652 )
1653 else:
1654 break
1655 else:
1656 # no dependencies, return
1657 return
1658 await asyncio.sleep(10)
1659 timeout -= 1
1660
1661 raise LcmException("Configuration aborted because dependent charm/s timeout")
1662
1663 def get_vca_id(self, db_vnfr: dict, db_nsr: dict):
1664 vca_id = None
1665 if db_vnfr:
1666 vca_id = deep_get(db_vnfr, ("vca-id",))
1667 elif db_nsr:
1668 vim_account_id = deep_get(db_nsr, ("instantiate_params", "vimAccountId"))
1669 vca_id = VimAccountDB.get_vim_account_with_id(vim_account_id).get("vca")
1670 return vca_id
1671
1672 async def instantiate_N2VC(
1673 self,
1674 logging_text,
1675 vca_index,
1676 nsi_id,
1677 db_nsr,
1678 db_vnfr,
1679 vdu_id,
1680 kdu_name,
1681 vdu_index,
1682 config_descriptor,
1683 deploy_params,
1684 base_folder,
1685 nslcmop_id,
1686 stage,
1687 vca_type,
1688 vca_name,
1689 ee_config_descriptor,
1690 ):
1691 nsr_id = db_nsr["_id"]
1692 db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
1693 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1694 vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
1695 osm_config = {"osm": {"ns_id": db_nsr["_id"]}}
1696 db_dict = {
1697 "collection": "nsrs",
1698 "filter": {"_id": nsr_id},
1699 "path": db_update_entry,
1700 }
1701 step = ""
1702 try:
1703
1704 element_type = "NS"
1705 element_under_configuration = nsr_id
1706
1707 vnfr_id = None
1708 if db_vnfr:
1709 vnfr_id = db_vnfr["_id"]
1710 osm_config["osm"]["vnf_id"] = vnfr_id
1711
1712 namespace = "{nsi}.{ns}".format(nsi=nsi_id if nsi_id else "", ns=nsr_id)
1713
1714 if vca_type == "native_charm":
1715 index_number = 0
1716 else:
1717 index_number = vdu_index or 0
1718
1719 if vnfr_id:
1720 element_type = "VNF"
1721 element_under_configuration = vnfr_id
1722 namespace += ".{}-{}".format(vnfr_id, index_number)
1723 if vdu_id:
1724 namespace += ".{}-{}".format(vdu_id, index_number)
1725 element_type = "VDU"
1726 element_under_configuration = "{}-{}".format(vdu_id, index_number)
1727 osm_config["osm"]["vdu_id"] = vdu_id
1728 elif kdu_name:
1729 namespace += ".{}".format(kdu_name)
1730 element_type = "KDU"
1731 element_under_configuration = kdu_name
1732 osm_config["osm"]["kdu_name"] = kdu_name
1733
1734 # Get artifact path
1735 if base_folder["pkg-dir"]:
1736 artifact_path = "{}/{}/{}/{}".format(
1737 base_folder["folder"],
1738 base_folder["pkg-dir"],
1739 "charms"
1740 if vca_type
1741 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1742 else "helm-charts",
1743 vca_name,
1744 )
1745 else:
1746 artifact_path = "{}/Scripts/{}/{}/".format(
1747 base_folder["folder"],
1748 "charms"
1749 if vca_type
1750 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1751 else "helm-charts",
1752 vca_name,
1753 )
1754
1755 self.logger.debug("Artifact path > {}".format(artifact_path))
1756
1757 # get initial_config_primitive_list that applies to this element
1758 initial_config_primitive_list = config_descriptor.get(
1759 "initial-config-primitive"
1760 )
1761
1762 self.logger.debug(
1763 "Initial config primitive list > {}".format(
1764 initial_config_primitive_list
1765 )
1766 )
1767
1768 # add config if not present for NS charm
1769 ee_descriptor_id = ee_config_descriptor.get("id")
1770 self.logger.debug("EE Descriptor > {}".format(ee_descriptor_id))
1771 initial_config_primitive_list = get_ee_sorted_initial_config_primitive_list(
1772 initial_config_primitive_list, vca_deployed, ee_descriptor_id
1773 )
1774
1775 self.logger.debug(
1776 "Initial config primitive list #2 > {}".format(
1777 initial_config_primitive_list
1778 )
1779 )
1780 # n2vc_redesign STEP 3.1
1781 # find old ee_id if exists
1782 ee_id = vca_deployed.get("ee_id")
1783
1784 vca_id = self.get_vca_id(db_vnfr, db_nsr)
1785 # create or register execution environment in VCA
1786 if vca_type in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1787
1788 self._write_configuration_status(
1789 nsr_id=nsr_id,
1790 vca_index=vca_index,
1791 status="CREATING",
1792 element_under_configuration=element_under_configuration,
1793 element_type=element_type,
1794 )
1795
1796 step = "create execution environment"
1797 self.logger.debug(logging_text + step)
1798
1799 ee_id = None
1800 credentials = None
1801 if vca_type == "k8s_proxy_charm":
1802 ee_id = await self.vca_map[vca_type].install_k8s_proxy_charm(
1803 charm_name=artifact_path[artifact_path.rfind("/") + 1 :],
1804 namespace=namespace,
1805 artifact_path=artifact_path,
1806 db_dict=db_dict,
1807 vca_id=vca_id,
1808 )
1809 elif vca_type == "helm" or vca_type == "helm-v3":
1810 ee_id, credentials = await self.vca_map[
1811 vca_type
1812 ].create_execution_environment(
1813 namespace=namespace,
1814 reuse_ee_id=ee_id,
1815 db_dict=db_dict,
1816 config=osm_config,
1817 artifact_path=artifact_path,
1818 vca_type=vca_type,
1819 )
1820 else:
1821 ee_id, credentials = await self.vca_map[
1822 vca_type
1823 ].create_execution_environment(
1824 namespace=namespace,
1825 reuse_ee_id=ee_id,
1826 db_dict=db_dict,
1827 vca_id=vca_id,
1828 )
1829
1830 elif vca_type == "native_charm":
1831 step = "Waiting for VM to be up and getting IP address"
1832 self.logger.debug(logging_text + step)
1833 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
1834 logging_text,
1835 nsr_id,
1836 vnfr_id,
1837 vdu_id,
1838 vdu_index,
1839 user=None,
1840 pub_key=None,
1841 )
1842 credentials = {"hostname": rw_mgmt_ip}
1843 # get username
1844 username = deep_get(
1845 config_descriptor, ("config-access", "ssh-access", "default-user")
1846 )
1847 # TODO remove this when the IM changes regarding config-access:ssh-access:default-user are
1848 # merged. Meanwhile, get the username from the initial-config-primitive
1849 if not username and initial_config_primitive_list:
1850 for config_primitive in initial_config_primitive_list:
1851 for param in config_primitive.get("parameter", ()):
1852 if param["name"] == "ssh-username":
1853 username = param["value"]
1854 break
1855 if not username:
1856 raise LcmException(
1857 "Cannot determine the username either with 'initial-config-primitive' or with "
1858 "'config-access.ssh-access.default-user'"
1859 )
1860 credentials["username"] = username
1861 # n2vc_redesign STEP 3.2
1862
1863 self._write_configuration_status(
1864 nsr_id=nsr_id,
1865 vca_index=vca_index,
1866 status="REGISTERING",
1867 element_under_configuration=element_under_configuration,
1868 element_type=element_type,
1869 )
1870
1871 step = "register execution environment {}".format(credentials)
1872 self.logger.debug(logging_text + step)
1873 ee_id = await self.vca_map[vca_type].register_execution_environment(
1874 credentials=credentials,
1875 namespace=namespace,
1876 db_dict=db_dict,
1877 vca_id=vca_id,
1878 )
1879
1880 # for compatibility with MON/POL modules, they need the model and application name at the database
1881 # TODO ask MON/POL whether it is still needed to assume the format "model_name.application_name"
1882 ee_id_parts = ee_id.split(".")
1883 db_nsr_update = {db_update_entry + "ee_id": ee_id}
1884 if len(ee_id_parts) >= 2:
1885 model_name = ee_id_parts[0]
1886 application_name = ee_id_parts[1]
1887 db_nsr_update[db_update_entry + "model"] = model_name
1888 db_nsr_update[db_update_entry + "application"] = application_name
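# Illustrative example (hypothetical value): ee_id "osm-ns-1234.app-vnf-1-vdu-0" would be stored
# with model "osm-ns-1234" and application "app-vnf-1-vdu-0"; any further dot-separated parts
# (e.g. a machine id) are ignored here.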
1889
1890 # n2vc_redesign STEP 3.3
1891 step = "Install configuration Software"
1892
1893 self._write_configuration_status(
1894 nsr_id=nsr_id,
1895 vca_index=vca_index,
1896 status="INSTALLING SW",
1897 element_under_configuration=element_under_configuration,
1898 element_type=element_type,
1899 other_update=db_nsr_update,
1900 )
1901
1902 # TODO check if already done
1903 self.logger.debug(logging_text + step)
1904 config = None
1905 if vca_type == "native_charm":
1906 config_primitive = next(
1907 (p for p in initial_config_primitive_list if p["name"] == "config"),
1908 None,
1909 )
1910 if config_primitive:
1911 config = self._map_primitive_params(
1912 config_primitive, {}, deploy_params
1913 )
1914 num_units = 1
1915 if vca_type == "lxc_proxy_charm":
1916 if element_type == "NS":
1917 num_units = db_nsr.get("config-units") or 1
1918 elif element_type == "VNF":
1919 num_units = db_vnfr.get("config-units") or 1
1920 elif element_type == "VDU":
1921 for v in db_vnfr["vdur"]:
1922 if vdu_id == v["vdu-id-ref"]:
1923 num_units = v.get("config-units") or 1
1924 break
1925 if vca_type != "k8s_proxy_charm":
1926 await self.vca_map[vca_type].install_configuration_sw(
1927 ee_id=ee_id,
1928 artifact_path=artifact_path,
1929 db_dict=db_dict,
1930 config=config,
1931 num_units=num_units,
1932 vca_id=vca_id,
1933 vca_type=vca_type,
1934 )
1935
1936 # write in db the flag indicating that configuration_sw is already installed
1937 self.update_db_2(
1938 "nsrs", nsr_id, {db_update_entry + "config_sw_installed": True}
1939 )
1940
1941 # add relations for this VCA (wait for other peers related with this VCA)
1942 await self._add_vca_relations(
1943 logging_text=logging_text,
1944 nsr_id=nsr_id,
1945 vca_type=vca_type,
1946 vca_index=vca_index,
1947 )
1948
1949 # if SSH access is required, then get the execution environment SSH public key
1950 # for a native charm we have already waited for the VM to be UP
1951 if vca_type in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
1952 pub_key = None
1953 user = None
1954 # self.logger.debug("get ssh key block")
1955 if deep_get(
1956 config_descriptor, ("config-access", "ssh-access", "required")
1957 ):
1958 # self.logger.debug("ssh key needed")
1959 # Needed to inject a ssh key
1960 user = deep_get(
1961 config_descriptor,
1962 ("config-access", "ssh-access", "default-user"),
1963 )
1964 step = "Install configuration Software, getting public ssh key"
1965 pub_key = await self.vca_map[vca_type].get_ee_ssh_public__key(
1966 ee_id=ee_id, db_dict=db_dict, vca_id=vca_id
1967 )
1968
1969 step = "Insert public key into VM user={} ssh_key={}".format(
1970 user, pub_key
1971 )
1972 else:
1973 # self.logger.debug("no need to get ssh key")
1974 step = "Waiting for VM to be up and getting IP address"
1975 self.logger.debug(logging_text + step)
1976
1977 # n2vc_redesign STEP 5.1
1978 # wait for RO to report the ip-address and insert pub_key into the VM
1979 if vnfr_id:
1980 if kdu_name:
1981 rw_mgmt_ip = await self.wait_kdu_up(
1982 logging_text, nsr_id, vnfr_id, kdu_name
1983 )
1984 else:
1985 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
1986 logging_text,
1987 nsr_id,
1988 vnfr_id,
1989 vdu_id,
1990 vdu_index,
1991 user=user,
1992 pub_key=pub_key,
1993 )
1994 else:
1995 rw_mgmt_ip = None # This is for a NS configuration
1996
1997 self.logger.debug(logging_text + " VM_ip_address={}".format(rw_mgmt_ip))
1998
1999 # store rw_mgmt_ip in deploy params for later replacement
2000 deploy_params["rw_mgmt_ip"] = rw_mgmt_ip
2001
2002 # n2vc_redesign STEP 6 Execute initial config primitive
2003 step = "execute initial config primitive"
2004
2005 # wait for dependent primitives execution (NS -> VNF -> VDU)
2006 if initial_config_primitive_list:
2007 await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)
2008
2009 # set stage depending on the element type: vdu, kdu, vnf or ns
2010 my_vca = vca_deployed_list[vca_index]
2011 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
2012 # VDU or KDU
2013 stage[0] = "Stage 3/5: running Day-1 primitives for VDU."
2014 elif my_vca.get("member-vnf-index"):
2015 # VNF
2016 stage[0] = "Stage 4/5: running Day-1 primitives for VNF."
2017 else:
2018 # NS
2019 stage[0] = "Stage 5/5: running Day-1 primitives for NS."
2020
2021 self._write_configuration_status(
2022 nsr_id=nsr_id, vca_index=vca_index, status="EXECUTING PRIMITIVE"
2023 )
2024
2025 self._write_op_status(op_id=nslcmop_id, stage=stage)
2026
2027 check_if_terminated_needed = True
2028 for initial_config_primitive in initial_config_primitive_list:
2029 # add information on the vca_deployed if it is an NS execution environment
2030 if not vca_deployed["member-vnf-index"]:
2031 deploy_params["ns_config_info"] = json.dumps(
2032 self._get_ns_config_info(nsr_id)
2033 )
2034 # TODO check if already done
2035 primitive_params_ = self._map_primitive_params(
2036 initial_config_primitive, {}, deploy_params
2037 )
2038
2039 step = "execute primitive '{}' params '{}'".format(
2040 initial_config_primitive["name"], primitive_params_
2041 )
2042 self.logger.debug(logging_text + step)
2043 await self.vca_map[vca_type].exec_primitive(
2044 ee_id=ee_id,
2045 primitive_name=initial_config_primitive["name"],
2046 params_dict=primitive_params_,
2047 db_dict=db_dict,
2048 vca_id=vca_id,
2049 vca_type=vca_type,
2050 )
2051 # Once some primitive has been executed, check and record at db whether terminate primitives will need to be executed
2052 if check_if_terminated_needed:
2053 if config_descriptor.get("terminate-config-primitive"):
2054 self.update_db_2(
2055 "nsrs", nsr_id, {db_update_entry + "needed_terminate": True}
2056 )
2057 check_if_terminated_needed = False
2058
2059 # TODO register in database that primitive is done
2060
2061 # STEP 7 Configure metrics
2062 if vca_type == "helm" or vca_type == "helm-v3":
2063 prometheus_jobs = await self.extract_prometheus_scrape_jobs(
2064 ee_id=ee_id,
2065 artifact_path=artifact_path,
2066 ee_config_descriptor=ee_config_descriptor,
2067 vnfr_id=vnfr_id,
2068 nsr_id=nsr_id,
2069 target_ip=rw_mgmt_ip,
2070 )
2071 if prometheus_jobs:
2072 self.update_db_2(
2073 "nsrs",
2074 nsr_id,
2075 {db_update_entry + "prometheus_jobs": prometheus_jobs},
2076 )
2077
2078 for job in prometheus_jobs:
2079 self.db.set_one(
2080 "prometheus_jobs",
2081 {"job_name": job["job_name"]},
2082 job,
2083 upsert=True,
2084 fail_on_empty=False,
2085 )
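# Jobs are upserted keyed by job_name, so re-running the instantiation does not
# duplicate entries in the prometheus_jobs collection.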
2086
2087 step = "instantiated at VCA"
2088 self.logger.debug(logging_text + step)
2089
2090 self._write_configuration_status(
2091 nsr_id=nsr_id, vca_index=vca_index, status="READY"
2092 )
2093
2094 except Exception as e: # TODO not use Exception but N2VC exception
2095 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
2096 if not isinstance(
2097 e, (DbException, N2VCException, LcmException, asyncio.CancelledError)
2098 ):
2099 self.logger.error(
2100 "Exception while {} : {}".format(step, e), exc_info=True
2101 )
2102 self._write_configuration_status(
2103 nsr_id=nsr_id, vca_index=vca_index, status="BROKEN"
2104 )
2105 raise LcmException("{} {}".format(step, e)) from e
2106
2107 def _write_ns_status(
2108 self,
2109 nsr_id: str,
2110 ns_state: str,
2111 current_operation: str,
2112 current_operation_id: str,
2113 error_description: str = None,
2114 error_detail: str = None,
2115 other_update: dict = None,
2116 ):
2117 """
2118 Update db_nsr fields.
2119 :param nsr_id:
2120 :param ns_state:
2121 :param current_operation:
2122 :param current_operation_id:
2123 :param error_description:
2124 :param error_detail:
2125 :param other_update: Other required changes at database if provided, will be cleared
2126 :return:
2127 """
2128 try:
2129 db_dict = other_update or {}
2130 db_dict[
2131 "_admin.nslcmop"
2132 ] = current_operation_id # for backward compatibility
2133 db_dict["_admin.current-operation"] = current_operation_id
2134 db_dict["_admin.operation-type"] = (
2135 current_operation if current_operation != "IDLE" else None
2136 )
2137 db_dict["currentOperation"] = current_operation
2138 db_dict["currentOperationID"] = current_operation_id
2139 db_dict["errorDescription"] = error_description
2140 db_dict["errorDetail"] = error_detail
2141
2142 if ns_state:
2143 db_dict["nsState"] = ns_state
2144 self.update_db_2("nsrs", nsr_id, db_dict)
2145 except DbException as e:
2146 self.logger.warn("Error writing NS status, ns={}: {}".format(nsr_id, e))
2147
2148 def _write_op_status(
2149 self,
2150 op_id: str,
2151 stage: list = None,
2152 error_message: str = None,
2153 queuePosition: int = 0,
2154 operation_state: str = None,
2155 other_update: dict = None,
2156 ):
2157 try:
2158 db_dict = other_update or {}
2159 db_dict["queuePosition"] = queuePosition
2160 if isinstance(stage, list):
2161 db_dict["stage"] = stage[0]
2162 db_dict["detailed-status"] = " ".join(stage)
2163 elif stage is not None:
2164 db_dict["stage"] = str(stage)
2165
2166 if error_message is not None:
2167 db_dict["errorMessage"] = error_message
2168 if operation_state is not None:
2169 db_dict["operationState"] = operation_state
2170 db_dict["statusEnteredTime"] = time()
2171 self.update_db_2("nslcmops", op_id, db_dict)
2172 except DbException as e:
2173 self.logger.warn(
2174 "Error writing OPERATION status for op_id: {} -> {}".format(op_id, e)
2175 )
2176
2177 def _write_all_config_status(self, db_nsr: dict, status: str):
2178 try:
2179 nsr_id = db_nsr["_id"]
2180 # configurationStatus
2181 config_status = db_nsr.get("configurationStatus")
2182 if config_status:
2183 db_nsr_update = {
2184 "configurationStatus.{}.status".format(index): status
2185 for index, v in enumerate(config_status)
2186 if v
2187 }
2188 # update status
2189 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2190
2191 except DbException as e:
2192 self.logger.warn(
2193 "Error writing all configuration status, ns={}: {}".format(nsr_id, e)
2194 )
2195
2196 def _write_configuration_status(
2197 self,
2198 nsr_id: str,
2199 vca_index: int,
2200 status: str = None,
2201 element_under_configuration: str = None,
2202 element_type: str = None,
2203 other_update: dict = None,
2204 ):
2205
2206 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
2207 # .format(vca_index, status))
2208
2209 try:
2210 db_path = "configurationStatus.{}.".format(vca_index)
2211 db_dict = other_update or {}
2212 if status:
2213 db_dict[db_path + "status"] = status
2214 if element_under_configuration:
2215 db_dict[
2216 db_path + "elementUnderConfiguration"
2217 ] = element_under_configuration
2218 if element_type:
2219 db_dict[db_path + "elementType"] = element_type
2220 self.update_db_2("nsrs", nsr_id, db_dict)
2221 except DbException as e:
2222 self.logger.warn(
2223 "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
2224 status, nsr_id, vca_index, e
2225 )
2226 )
2227
2228 async def _do_placement(self, logging_text, db_nslcmop, db_vnfrs):
2229 """
2230 Checks and computes the placement (vim account where to deploy). If it is decided by an external tool, it
2231 sends the request via kafka and waits until the result is written at database (nslcmops _admin.pla).
2232 The database is used because the result can be obtained from a different LCM worker in case of HA.
2233 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
2234 :param db_nslcmop: database content of nslcmop
2235 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
2236 :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfrs with the
2237 computed 'vim-account-id'
2238 """
2239 modified = False
2240 nslcmop_id = db_nslcmop["_id"]
2241 placement_engine = deep_get(db_nslcmop, ("operationParams", "placement-engine"))
2242 if placement_engine == "PLA":
2243 self.logger.debug(
2244 logging_text + "Invoke and wait for placement optimization"
2245 )
2246 await self.msg.aiowrite(
2247 "pla", "get_placement", {"nslcmopId": nslcmop_id}, loop=self.loop
2248 )
2249 db_poll_interval = 5
2250 wait = db_poll_interval * 10
2251 pla_result = None
2252 while not pla_result and wait >= 0:
2253 await asyncio.sleep(db_poll_interval)
2254 wait -= db_poll_interval
2255 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2256 pla_result = deep_get(db_nslcmop, ("_admin", "pla"))
2257
2258 if not pla_result:
2259 raise LcmException(
2260 "Placement timeout for nslcmopId={}".format(nslcmop_id)
2261 )
2262
2263 for pla_vnf in pla_result["vnf"]:
2264 vnfr = db_vnfrs.get(pla_vnf["member-vnf-index"])
2265 if not pla_vnf.get("vimAccountId") or not vnfr:
2266 continue
2267 modified = True
2268 self.db.set_one(
2269 "vnfrs",
2270 {"_id": vnfr["_id"]},
2271 {"vim-account-id": pla_vnf["vimAccountId"]},
2272 )
2273 # Modifies db_vnfrs
2274 vnfr["vim-account-id"] = pla_vnf["vimAccountId"]
2275 return modified
2276
2277 def update_nsrs_with_pla_result(self, params):
2278 try:
2279 nslcmop_id = deep_get(params, ("placement", "nslcmopId"))
2280 self.update_db_2(
2281 "nslcmops", nslcmop_id, {"_admin.pla": params.get("placement")}
2282 )
2283 except Exception as e:
2284 self.logger.warn("Update failed for nslcmop_id={}:{}".format(nslcmop_id, e))
2285
2286 async def instantiate(self, nsr_id, nslcmop_id):
2287 """
2288
2289 :param nsr_id: ns instance to deploy
2290 :param nslcmop_id: operation to run
2291 :return:
2292 """
2293
2294 # Try to lock HA task here
2295 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
2296 if not task_is_locked_by_me:
2297 self.logger.debug(
2298 "instantiate() task is not locked by me, ns={}".format(nsr_id)
2299 )
2300 return
2301
2302 logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
2303 self.logger.debug(logging_text + "Enter")
2304
2305 # get all needed from database
2306
2307 # database nsrs record
2308 db_nsr = None
2309
2310 # database nslcmops record
2311 db_nslcmop = None
2312
2313 # update operation on nsrs
2314 db_nsr_update = {}
2315 # update operation on nslcmops
2316 db_nslcmop_update = {}
2317
2318 nslcmop_operation_state = None
2319 db_vnfrs = {} # vnf's info indexed by member-index
2320 # n2vc_info = {}
2321 tasks_dict_info = {} # from task to info text
2322 exc = None
2323 error_list = []
2324 stage = [
2325 "Stage 1/5: preparation of the environment.",
2326 "Waiting for previous operations to terminate.",
2327 "",
2328 ]
2329 # ^ stage, step, VIM progress
2330 try:
2331 # wait for any previous tasks in process
2332 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
2333
2334 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
2335 stage[1] = "Reading from database."
2336 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
2337 db_nsr_update["detailed-status"] = "creating"
2338 db_nsr_update["operational-status"] = "init"
2339 self._write_ns_status(
2340 nsr_id=nsr_id,
2341 ns_state="BUILDING",
2342 current_operation="INSTANTIATING",
2343 current_operation_id=nslcmop_id,
2344 other_update=db_nsr_update,
2345 )
2346 self._write_op_status(op_id=nslcmop_id, stage=stage, queuePosition=0)
2347
2348 # read from db: operation
2349 stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
2350 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2351 if db_nslcmop["operationParams"].get("additionalParamsForVnf"):
2352 db_nslcmop["operationParams"]["additionalParamsForVnf"] = json.loads(
2353 db_nslcmop["operationParams"]["additionalParamsForVnf"]
2354 )
2355 ns_params = db_nslcmop.get("operationParams")
2356 if ns_params and ns_params.get("timeout_ns_deploy"):
2357 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
2358 else:
2359 timeout_ns_deploy = self.timeout.get(
2360 "ns_deploy", self.timeout_ns_deploy
2361 )
2362
2363 # read from db: ns
2364 stage[1] = "Getting nsr={} from db.".format(nsr_id)
2365 self.logger.debug(logging_text + stage[1])
2366 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2367 stage[1] = "Getting nsd={} from db.".format(db_nsr["nsd-id"])
2368 self.logger.debug(logging_text + stage[1])
2369 nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
2370 self.fs.sync(db_nsr["nsd-id"])
2371 db_nsr["nsd"] = nsd
2372 # nsr_name = db_nsr["name"] # TODO short-name??
2373
2374 # read from db: vnf's of this ns
2375 stage[1] = "Getting vnfrs from db."
2376 self.logger.debug(logging_text + stage[1])
2377 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
2378
2379 # read from db: vnfd's for every vnf
2380 db_vnfds = [] # every vnfd data
2381
2382 # for each vnf in ns, read vnfd
2383 for vnfr in db_vnfrs_list:
2384 if vnfr.get("kdur"):
2385 kdur_list = []
2386 for kdur in vnfr["kdur"]:
2387 if kdur.get("additionalParams"):
2388 kdur["additionalParams"] = json.loads(
2389 kdur["additionalParams"]
2390 )
2391 kdur_list.append(kdur)
2392 vnfr["kdur"] = kdur_list
2393
2394 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
2395 vnfd_id = vnfr["vnfd-id"]
2396 vnfd_ref = vnfr["vnfd-ref"]
2397 self.fs.sync(vnfd_id)
2398
2399 # if we do not have this vnfd yet, read it from db (db_vnfds holds vnfd dicts, so look it up by _id)
2400 if not find_in_list(db_vnfds, lambda a_vnfd: a_vnfd["_id"] == vnfd_id):
2401 # read from db
2402 stage[1] = "Getting vnfd={} id='{}' from db.".format(
2403 vnfd_id, vnfd_ref
2404 )
2405 self.logger.debug(logging_text + stage[1])
2406 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
2407
2408 # store vnfd
2409 db_vnfds.append(vnfd)
2410
2411 # Get or generate the _admin.deployed.VCA list
2412 vca_deployed_list = None
2413 if db_nsr["_admin"].get("deployed"):
2414 vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
2415 if vca_deployed_list is None:
2416 vca_deployed_list = []
2417 configuration_status_list = []
2418 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
2419 db_nsr_update["configurationStatus"] = configuration_status_list
2420 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
2421 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
2422 elif isinstance(vca_deployed_list, dict):
2423 # maintain backward compatibility. Change a dict to list at database
2424 vca_deployed_list = list(vca_deployed_list.values())
2425 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
2426 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
2427
2428 if not isinstance(
2429 deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list
2430 ):
2431 populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
2432 db_nsr_update["_admin.deployed.RO.vnfd"] = []
2433
2434 # set state to INSTANTIATED. Once instantiated, NBI will not delete it directly
2435 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
2436 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2437 self.db.set_list(
2438 "vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "INSTANTIATED"}
2439 )
2440
2441 # n2vc_redesign STEP 2 Deploy Network Scenario
2442 stage[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
2443 self._write_op_status(op_id=nslcmop_id, stage=stage)
2444
2445 stage[1] = "Deploying KDUs."
2446 # self.logger.debug(logging_text + "Before deploy_kdus")
2447 # Call deploy_kdus in case the "vdu:kdu" param exists
2448 await self.deploy_kdus(
2449 logging_text=logging_text,
2450 nsr_id=nsr_id,
2451 nslcmop_id=nslcmop_id,
2452 db_vnfrs=db_vnfrs,
2453 db_vnfds=db_vnfds,
2454 task_instantiation_info=tasks_dict_info,
2455 )
2456
2457 stage[1] = "Getting VCA public key."
2458 # n2vc_redesign STEP 1 Get VCA public ssh-key
2459 # feature 1429. Add n2vc public key to needed VMs
2460 n2vc_key = self.n2vc.get_public_key()
2461 n2vc_key_list = [n2vc_key]
2462 if self.vca_config.get("public_key"):
2463 n2vc_key_list.append(self.vca_config["public_key"])
2464
2465 stage[1] = "Deploying NS at VIM."
2466 task_ro = asyncio.ensure_future(
2467 self.instantiate_RO(
2468 logging_text=logging_text,
2469 nsr_id=nsr_id,
2470 nsd=nsd,
2471 db_nsr=db_nsr,
2472 db_nslcmop=db_nslcmop,
2473 db_vnfrs=db_vnfrs,
2474 db_vnfds=db_vnfds,
2475 n2vc_key_list=n2vc_key_list,
2476 stage=stage,
2477 )
2478 )
2479 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
2480 tasks_dict_info[task_ro] = "Deploying at VIM"
2481
2482 # n2vc_redesign STEP 3 to 6 Deploy N2VC
2483 stage[1] = "Deploying Execution Environments."
2484 self.logger.debug(logging_text + stage[1])
2485
2486 nsi_id = None # TODO put nsi_id when this nsr belongs to an NSI
2487 for vnf_profile in get_vnf_profiles(nsd):
2488 vnfd_id = vnf_profile["vnfd-id"]
2489 vnfd = find_in_list(db_vnfds, lambda a_vnf: a_vnf["id"] == vnfd_id)
2490 member_vnf_index = str(vnf_profile["id"])
2491 db_vnfr = db_vnfrs[member_vnf_index]
2492 base_folder = vnfd["_admin"]["storage"]
2493 vdu_id = None
2494 vdu_index = 0
2495 vdu_name = None
2496 kdu_name = None
2497
2498 # Get additional parameters
2499 deploy_params = {"OSM": get_osm_params(db_vnfr)}
2500 if db_vnfr.get("additionalParamsForVnf"):
2501 deploy_params.update(
2502 parse_yaml_strings(db_vnfr["additionalParamsForVnf"].copy())
2503 )
2504
2505 descriptor_config = get_configuration(vnfd, vnfd["id"])
2506 if descriptor_config:
2507 self._deploy_n2vc(
2508 logging_text=logging_text
2509 + "member_vnf_index={} ".format(member_vnf_index),
2510 db_nsr=db_nsr,
2511 db_vnfr=db_vnfr,
2512 nslcmop_id=nslcmop_id,
2513 nsr_id=nsr_id,
2514 nsi_id=nsi_id,
2515 vnfd_id=vnfd_id,
2516 vdu_id=vdu_id,
2517 kdu_name=kdu_name,
2518 member_vnf_index=member_vnf_index,
2519 vdu_index=vdu_index,
2520 vdu_name=vdu_name,
2521 deploy_params=deploy_params,
2522 descriptor_config=descriptor_config,
2523 base_folder=base_folder,
2524 task_instantiation_info=tasks_dict_info,
2525 stage=stage,
2526 )
2527
2528 # Deploy charms for each VDU that supports one.
2529 for vdud in get_vdu_list(vnfd):
2530 vdu_id = vdud["id"]
2531 descriptor_config = get_configuration(vnfd, vdu_id)
2532 vdur = find_in_list(
2533 db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id
2534 )
2535
2536 if vdur.get("additionalParams"):
2537 deploy_params_vdu = parse_yaml_strings(vdur["additionalParams"])
2538 else:
2539 deploy_params_vdu = deploy_params
2540 deploy_params_vdu["OSM"] = get_osm_params(
2541 db_vnfr, vdu_id, vdu_count_index=0
2542 )
2543 vdud_count = get_number_of_instances(vnfd, vdu_id)
2544
2545 self.logger.debug("VDUD > {}".format(vdud))
2546 self.logger.debug(
2547 "Descriptor config > {}".format(descriptor_config)
2548 )
2549 if descriptor_config:
2550 vdu_name = None
2551 kdu_name = None
2552 for vdu_index in range(vdud_count):
2553 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
2554 self._deploy_n2vc(
2555 logging_text=logging_text
2556 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
2557 member_vnf_index, vdu_id, vdu_index
2558 ),
2559 db_nsr=db_nsr,
2560 db_vnfr=db_vnfr,
2561 nslcmop_id=nslcmop_id,
2562 nsr_id=nsr_id,
2563 nsi_id=nsi_id,
2564 vnfd_id=vnfd_id,
2565 vdu_id=vdu_id,
2566 kdu_name=kdu_name,
2567 member_vnf_index=member_vnf_index,
2568 vdu_index=vdu_index,
2569 vdu_name=vdu_name,
2570 deploy_params=deploy_params_vdu,
2571 descriptor_config=descriptor_config,
2572 base_folder=base_folder,
2573 task_instantiation_info=tasks_dict_info,
2574 stage=stage,
2575 )
2576 for kdud in get_kdu_list(vnfd):
2577 kdu_name = kdud["name"]
2578 descriptor_config = get_configuration(vnfd, kdu_name)
2579 if descriptor_config:
2580 vdu_id = None
2581 vdu_index = 0
2582 vdu_name = None
2583 kdur = next(
2584 x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name
2585 )
2586 deploy_params_kdu = {"OSM": get_osm_params(db_vnfr)}
2587 if kdur.get("additionalParams"):
2588 deploy_params_kdu.update(
2589 parse_yaml_strings(kdur["additionalParams"].copy())
2590 )
2591
2592 self._deploy_n2vc(
2593 logging_text=logging_text,
2594 db_nsr=db_nsr,
2595 db_vnfr=db_vnfr,
2596 nslcmop_id=nslcmop_id,
2597 nsr_id=nsr_id,
2598 nsi_id=nsi_id,
2599 vnfd_id=vnfd_id,
2600 vdu_id=vdu_id,
2601 kdu_name=kdu_name,
2602 member_vnf_index=member_vnf_index,
2603 vdu_index=vdu_index,
2604 vdu_name=vdu_name,
2605 deploy_params=deploy_params_kdu,
2606 descriptor_config=descriptor_config,
2607 base_folder=base_folder,
2608 task_instantiation_info=tasks_dict_info,
2609 stage=stage,
2610 )
2611
2612 # Check if this NS has a charm configuration
2613 descriptor_config = nsd.get("ns-configuration")
2614 if descriptor_config and descriptor_config.get("juju"):
2615 vnfd_id = None
2616 db_vnfr = None
2617 member_vnf_index = None
2618 vdu_id = None
2619 kdu_name = None
2620 vdu_index = 0
2621 vdu_name = None
2622
2623 # Get additional parameters
2624 deploy_params = {"OSM": {"vim_account_id": ns_params["vimAccountId"]}}
2625 if db_nsr.get("additionalParamsForNs"):
2626 deploy_params.update(
2627 parse_yaml_strings(db_nsr["additionalParamsForNs"].copy())
2628 )
2629 base_folder = nsd["_admin"]["storage"]
2630 self._deploy_n2vc(
2631 logging_text=logging_text,
2632 db_nsr=db_nsr,
2633 db_vnfr=db_vnfr,
2634 nslcmop_id=nslcmop_id,
2635 nsr_id=nsr_id,
2636 nsi_id=nsi_id,
2637 vnfd_id=vnfd_id,
2638 vdu_id=vdu_id,
2639 kdu_name=kdu_name,
2640 member_vnf_index=member_vnf_index,
2641 vdu_index=vdu_index,
2642 vdu_name=vdu_name,
2643 deploy_params=deploy_params,
2644 descriptor_config=descriptor_config,
2645 base_folder=base_folder,
2646 task_instantiation_info=tasks_dict_info,
2647 stage=stage,
2648 )
2649
2650 # the rest of the work is done in the finally block
2651
2652 except (
2653 ROclient.ROClientException,
2654 DbException,
2655 LcmException,
2656 N2VCException,
2657 ) as e:
2658 self.logger.error(
2659 logging_text + "Exit Exception while '{}': {}".format(stage[1], e)
2660 )
2661 exc = e
2662 except asyncio.CancelledError:
2663 self.logger.error(
2664 logging_text + "Cancelled Exception while '{}'".format(stage[1])
2665 )
2666 exc = "Operation was cancelled"
2667 except Exception as e:
2668 exc = traceback.format_exc()
2669 self.logger.critical(
2670 logging_text + "Exit Exception while '{}': {}".format(stage[1], e),
2671 exc_info=True,
2672 )
2673 finally:
2674 if exc:
2675 error_list.append(str(exc))
2676 try:
2677 # wait for pending tasks
2678 if tasks_dict_info:
2679 stage[1] = "Waiting for instantiate pending tasks."
2680 self.logger.debug(logging_text + stage[1])
2681 error_list += await self._wait_for_tasks(
2682 logging_text,
2683 tasks_dict_info,
2684 timeout_ns_deploy,
2685 stage,
2686 nslcmop_id,
2687 nsr_id=nsr_id,
2688 )
2689 stage[1] = stage[2] = ""
2690 except asyncio.CancelledError:
2691 error_list.append("Cancelled")
2692 # TODO cancel all tasks
2693 except Exception as exc:
2694 error_list.append(str(exc))
2695
2696 # update operation-status
2697 db_nsr_update["operational-status"] = "running"
2698 # let's begin with VCA 'configured' status (later we can change it)
2699 db_nsr_update["config-status"] = "configured"
2700 for task, task_name in tasks_dict_info.items():
2701 if not task.done() or task.cancelled() or task.exception():
2702 if task_name.startswith(self.task_name_deploy_vca):
2703 # A N2VC task is pending
2704 db_nsr_update["config-status"] = "failed"
2705 else:
2706 # RO or KDU task is pending
2707 db_nsr_update["operational-status"] = "failed"
2708
2709 # update status at database
2710 if error_list:
2711 error_detail = ". ".join(error_list)
2712 self.logger.error(logging_text + error_detail)
2713 error_description_nslcmop = "{} Detail: {}".format(
2714 stage[0], error_detail
2715 )
2716 error_description_nsr = "Operation: INSTANTIATING.{}, {}".format(
2717 nslcmop_id, stage[0]
2718 )
2719
2720 db_nsr_update["detailed-status"] = (
2721 error_description_nsr + " Detail: " + error_detail
2722 )
2723 db_nslcmop_update["detailed-status"] = error_detail
2724 nslcmop_operation_state = "FAILED"
2725 ns_state = "BROKEN"
2726 else:
2727 error_detail = None
2728 error_description_nsr = error_description_nslcmop = None
2729 ns_state = "READY"
2730 db_nsr_update["detailed-status"] = "Done"
2731 db_nslcmop_update["detailed-status"] = "Done"
2732 nslcmop_operation_state = "COMPLETED"
2733
2734 if db_nsr:
2735 self._write_ns_status(
2736 nsr_id=nsr_id,
2737 ns_state=ns_state,
2738 current_operation="IDLE",
2739 current_operation_id=None,
2740 error_description=error_description_nsr,
2741 error_detail=error_detail,
2742 other_update=db_nsr_update,
2743 )
2744 self._write_op_status(
2745 op_id=nslcmop_id,
2746 stage="",
2747 error_message=error_description_nslcmop,
2748 operation_state=nslcmop_operation_state,
2749 other_update=db_nslcmop_update,
2750 )
2751
2752 if nslcmop_operation_state:
2753 try:
2754 await self.msg.aiowrite(
2755 "ns",
2756 "instantiated",
2757 {
2758 "nsr_id": nsr_id,
2759 "nslcmop_id": nslcmop_id,
2760 "operationState": nslcmop_operation_state,
2761 },
2762 loop=self.loop,
2763 )
2764 except Exception as e:
2765 self.logger.error(
2766 logging_text + "kafka_write notification Exception {}".format(e)
2767 )
2768
2769 self.logger.debug(logging_text + "Exit")
2770 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
2771
2772 def _get_vnfd(self, vnfd_id: str, cached_vnfds: Dict[str, Any]):
2773 if vnfd_id not in cached_vnfds:
2774 cached_vnfds[vnfd_id] = self.db.get_one("vnfds", {"id": vnfd_id})
2775 return cached_vnfds[vnfd_id]
2776
2777 def _get_vnfr(self, nsr_id: str, vnf_profile_id: str, cached_vnfrs: Dict[str, Any]):
2778 if vnf_profile_id not in cached_vnfrs:
2779 cached_vnfrs[vnf_profile_id] = self.db.get_one(
2780 "vnfrs",
2781 {
2782 "member-vnf-index-ref": vnf_profile_id,
2783 "nsr-id-ref": nsr_id,
2784 },
2785 )
2786 return cached_vnfrs[vnf_profile_id]
2787
2788 def _is_deployed_vca_in_relation(
2789 self, vca: DeployedVCA, relation: Relation
2790 ) -> bool:
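# A deployed VCA belongs to a relation when one of its endpoints (provider or requirer)
# matches the VCA's vnf profile, vdu profile and execution-environment reference.
# Endpoints with a kdu-resource-profile-id are skipped here; KDU endpoints are resolved
# through _get_kdu_resource_data instead.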
2791 found = False
2792 for endpoint in (relation.provider, relation.requirer):
2793 if endpoint["kdu-resource-profile-id"]:
2794 continue
2795 found = (
2796 vca.vnf_profile_id == endpoint.vnf_profile_id
2797 and vca.vdu_profile_id == endpoint.vdu_profile_id
2798 and vca.execution_environment_ref == endpoint.execution_environment_ref
2799 )
2800 if found:
2801 break
2802 return found
2803
2804 def _update_ee_relation_data_with_implicit_data(
2805 self, nsr_id, nsd, ee_relation_data, cached_vnfds, vnf_profile_id: str = None
2806 ):
2807 ee_relation_data = safe_get_ee_relation(
2808 nsr_id, ee_relation_data, vnf_profile_id=vnf_profile_id
2809 )
2810 ee_relation_level = EELevel.get_level(ee_relation_data)
2811 if (ee_relation_level in (EELevel.VNF, EELevel.VDU)) and not ee_relation_data[
2812 "execution-environment-ref"
2813 ]:
2814 vnf_profile = get_vnf_profile(nsd, ee_relation_data["vnf-profile-id"])
2815 vnfd_id = vnf_profile["vnfd-id"]
2816 db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
2817 entity_id = (
2818 vnfd_id
2819 if ee_relation_level == EELevel.VNF
2820 else ee_relation_data["vdu-profile-id"]
2821 )
2822 ee = get_juju_ee_ref(db_vnfd, entity_id)
2823 if not ee:
2824 raise Exception(
2825 f"no execution environment found for ee_relation {ee_relation_data}"
2826 )
2827 ee_relation_data["execution-environment-ref"] = ee["id"]
2828 return ee_relation_data
2829
2830 def _get_ns_relations(
2831 self,
2832 nsr_id: str,
2833 nsd: Dict[str, Any],
2834 vca: DeployedVCA,
2835 cached_vnfds: Dict[str, Any],
2836 ) -> List[Relation]:
2837 relations = []
2838 db_ns_relations = get_ns_configuration_relation_list(nsd)
2839 for r in db_ns_relations:
2840 provider_dict = None
2841 requirer_dict = None
2842 if all(key in r for key in ("provider", "requirer")):
2843 provider_dict = r["provider"]
2844 requirer_dict = r["requirer"]
2845 elif "entities" in r:
2846 provider_id = r["entities"][0]["id"]
2847 provider_dict = {
2848 "nsr-id": nsr_id,
2849 "endpoint": r["entities"][0]["endpoint"],
2850 }
2851 if provider_id != nsd["id"]:
2852 provider_dict["vnf-profile-id"] = provider_id
2853 requirer_id = r["entities"][1]["id"]
2854 requirer_dict = {
2855 "nsr-id": nsr_id,
2856 "endpoint": r["entities"][1]["endpoint"],
2857 }
2858 if requirer_id != nsd["id"]:
2859 requirer_dict["vnf-profile-id"] = requirer_id
2860 else:
2861 raise Exception(
2862 "provider/requirer or entities must be included in the relation."
2863 )
2864 relation_provider = self._update_ee_relation_data_with_implicit_data(
2865 nsr_id, nsd, provider_dict, cached_vnfds
2866 )
2867 relation_requirer = self._update_ee_relation_data_with_implicit_data(
2868 nsr_id, nsd, requirer_dict, cached_vnfds
2869 )
2870 provider = EERelation(relation_provider)
2871 requirer = EERelation(relation_requirer)
2872 relation = Relation(r["name"], provider, requirer)
2873 vca_in_relation = self._is_deployed_vca_in_relation(vca, relation)
2874 if vca_in_relation:
2875 relations.append(relation)
2876 return relations
2877
2878 def _get_vnf_relations(
2879 self,
2880 nsr_id: str,
2881 nsd: Dict[str, Any],
2882 vca: DeployedVCA,
2883 cached_vnfds: Dict[str, Any],
2884 ) -> List[Relation]:
2885 relations = []
2886 vnf_profile = get_vnf_profile(nsd, vca.vnf_profile_id)
2887 vnf_profile_id = vnf_profile["id"]
2888 vnfd_id = vnf_profile["vnfd-id"]
2889 db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
2890 db_vnf_relations = get_relation_list(db_vnfd, vnfd_id)
2891 for r in db_vnf_relations:
2892 provider_dict = None
2893 requirer_dict = None
2894 if all(key in r for key in ("provider", "requirer")):
2895 provider_dict = r["provider"]
2896 requirer_dict = r["requirer"]
2897 elif "entities" in r:
2898 provider_id = r["entities"][0]["id"]
2899 provider_dict = {
2900 "nsr-id": nsr_id,
2901 "vnf-profile-id": vnf_profile_id,
2902 "endpoint": r["entities"][0]["endpoint"],
2903 }
2904 if provider_id != vnfd_id:
2905 provider_dict["vdu-profile-id"] = provider_id
2906 requirer_id = r["entities"][1]["id"]
2907 requirer_dict = {
2908 "nsr-id": nsr_id,
2909 "vnf-profile-id": vnf_profile_id,
2910 "endpoint": r["entities"][1]["endpoint"],
2911 }
2912 if requirer_id != vnfd_id:
2913 requirer_dict["vdu-profile-id"] = requirer_id
2914 else:
2915 raise Exception(
2916 "provider/requirer or entities must be included in the relation."
2917 )
2918 relation_provider = self._update_ee_relation_data_with_implicit_data(
2919 nsr_id, nsd, provider_dict, cached_vnfds, vnf_profile_id=vnf_profile_id
2920 )
2921 relation_requirer = self._update_ee_relation_data_with_implicit_data(
2922 nsr_id, nsd, requirer_dict, cached_vnfds, vnf_profile_id=vnf_profile_id
2923 )
2924 provider = EERelation(relation_provider)
2925 requirer = EERelation(relation_requirer)
2926 relation = Relation(r["name"], provider, requirer)
2927 vca_in_relation = self._is_deployed_vca_in_relation(vca, relation)
2928 if vca_in_relation:
2929 relations.append(relation)
2930 return relations
2931
2932 def _get_kdu_resource_data(
2933 self,
2934 ee_relation: EERelation,
2935 db_nsr: Dict[str, Any],
2936 cached_vnfds: Dict[str, Any],
2937 ) -> DeployedK8sResource:
2938 nsd = get_nsd(db_nsr)
2939 vnf_profiles = get_vnf_profiles(nsd)
2940 vnfd_id = find_in_list(
2941 vnf_profiles,
2942 lambda vnf_profile: vnf_profile["id"] == ee_relation.vnf_profile_id,
2943 )["vnfd-id"]
2944 db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
2945 kdu_resource_profile = get_kdu_resource_profile(
2946 db_vnfd, ee_relation.kdu_resource_profile_id
2947 )
2948 kdu_name = kdu_resource_profile["kdu-name"]
2949 deployed_kdu, _ = get_deployed_kdu(
2950 db_nsr.get("_admin", ()).get("deployed", ()),
2951 kdu_name,
2952 ee_relation.vnf_profile_id,
2953 )
2954 deployed_kdu.update({"resource-name": kdu_resource_profile["resource-name"]})
2955 return deployed_kdu
2956
2957 def _get_deployed_component(
2958 self,
2959 ee_relation: EERelation,
2960 db_nsr: Dict[str, Any],
2961 cached_vnfds: Dict[str, Any],
2962 ) -> DeployedComponent:
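# Map the relation endpoint to the deployed element it refers to, depending on its level:
# NS/VNF/VDU levels resolve to a deployed VCA (filtered by member-vnf-index, vdu_id and
# ee_descriptor_id), while KDU level resolves to the deployed K8s resource.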
2963 nsr_id = db_nsr["_id"]
2964 deployed_component = None
2965 ee_level = EELevel.get_level(ee_relation)
2966 if ee_level == EELevel.NS:
2967 vca = get_deployed_vca(db_nsr, {"vdu_id": None, "member-vnf-index": None})
2968 if vca:
2969 deployed_component = DeployedVCA(nsr_id, vca)
2970 elif ee_level == EELevel.VNF:
2971 vca = get_deployed_vca(
2972 db_nsr,
2973 {
2974 "vdu_id": None,
2975 "member-vnf-index": ee_relation.vnf_profile_id,
2976 "ee_descriptor_id": ee_relation.execution_environment_ref,
2977 },
2978 )
2979 if vca:
2980 deployed_component = DeployedVCA(nsr_id, vca)
2981 elif ee_level == EELevel.VDU:
2982 vca = get_deployed_vca(
2983 db_nsr,
2984 {
2985 "vdu_id": ee_relation.vdu_profile_id,
2986 "member-vnf-index": ee_relation.vnf_profile_id,
2987 "ee_descriptor_id": ee_relation.execution_environment_ref,
2988 },
2989 )
2990 if vca:
2991 deployed_component = DeployedVCA(nsr_id, vca)
2992 elif ee_level == EELevel.KDU:
2993 kdu_resource_data = self._get_kdu_resource_data(
2994 ee_relation, db_nsr, cached_vnfds
2995 )
2996 if kdu_resource_data:
2997 deployed_component = DeployedK8sResource(kdu_resource_data)
2998 return deployed_component
2999
3000 async def _add_relation(
3001 self,
3002 relation: Relation,
3003 vca_type: str,
3004 db_nsr: Dict[str, Any],
3005 cached_vnfds: Dict[str, Any],
3006 cached_vnfrs: Dict[str, Any],
3007 ) -> bool:
3008 deployed_provider = self._get_deployed_component(
3009 relation.provider, db_nsr, cached_vnfds
3010 )
3011 deployed_requirer = self._get_deployed_component(
3012 relation.requirer, db_nsr, cached_vnfds
3013 )
3014 if (
3015 deployed_provider
3016 and deployed_requirer
3017 and deployed_provider.config_sw_installed
3018 and deployed_requirer.config_sw_installed
3019 ):
3020 provider_db_vnfr = (
3021 self._get_vnfr(
3022 relation.provider.nsr_id,
3023 relation.provider.vnf_profile_id,
3024 cached_vnfrs,
3025 )
3026 if relation.provider.vnf_profile_id
3027 else None
3028 )
3029 requirer_db_vnfr = (
3030 self._get_vnfr(
3031 relation.requirer.nsr_id,
3032 relation.requirer.vnf_profile_id,
3033 cached_vnfrs,
3034 )
3035 if relation.requirer.vnf_profile_id
3036 else None
3037 )
3038 provider_vca_id = self.get_vca_id(provider_db_vnfr, db_nsr)
3039 requirer_vca_id = self.get_vca_id(requirer_db_vnfr, db_nsr)
3040 provider_relation_endpoint = RelationEndpoint(
3041 deployed_provider.ee_id,
3042 provider_vca_id,
3043 relation.provider.endpoint,
3044 )
3045 requirer_relation_endpoint = RelationEndpoint(
3046 deployed_requirer.ee_id,
3047 requirer_vca_id,
3048 relation.requirer.endpoint,
3049 )
3050 await self.vca_map[vca_type].add_relation(
3051 provider=provider_relation_endpoint,
3052 requirer=requirer_relation_endpoint,
3053 )
3054 # returning True tells the caller to remove this relation from its pending list
3055 return True
3056 return False
3057
3058 async def _add_vca_relations(
3059 self,
3060 logging_text,
3061 nsr_id,
3062 vca_type: str,
3063 vca_index: int,
3064 timeout: int = 3600,
3065 ) -> bool:
3066
3067 # steps:
3068 # 1. find all relations for this VCA
3069 # 2. wait for other peers related
3070 # 3. add relations
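# The loop below re-reads the NSR from the database and retries every 5 seconds until all
# relations have been added or the timeout (3600 s by default) expires.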
3071
3072 try:
3073 # STEP 1: find all relations for this VCA
3074
3075 # read nsr record
3076 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3077 nsd = get_nsd(db_nsr)
3078
3079 # this VCA data
3080 deployed_vca_dict = get_deployed_vca_list(db_nsr)[vca_index]
3081 my_vca = DeployedVCA(nsr_id, deployed_vca_dict)
3082
3083 cached_vnfds = {}
3084 cached_vnfrs = {}
3085 relations = []
3086 relations.extend(self._get_ns_relations(nsr_id, nsd, my_vca, cached_vnfds))
3087 relations.extend(self._get_vnf_relations(nsr_id, nsd, my_vca, cached_vnfds))
3088
3089 # if no relations, terminate
3090 if not relations:
3091 self.logger.debug(logging_text + " No relations")
3092 return True
3093
3094 self.logger.debug(logging_text + " adding relations {}".format(relations))
3095
3096 # add all relations
3097 start = time()
3098 while True:
3099 # check timeout
3100 now = time()
3101 if now - start >= timeout:
3102 self.logger.error(logging_text + " : timeout adding relations")
3103 return False
3104
3105 # reload nsr from database (we need to update record: _admin.deployed.VCA)
3106 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3107
3108 # for each relation, find the VCA's related
3109 for relation in relations.copy():
3110 added = await self._add_relation(
3111 relation,
3112 vca_type,
3113 db_nsr,
3114 cached_vnfds,
3115 cached_vnfrs,
3116 )
3117 if added:
3118 relations.remove(relation)
3119
3120 if not relations:
3121 self.logger.debug("Relations added")
3122 break
3123 await asyncio.sleep(5.0)
3124
3125 return True
3126
3127 except Exception as e:
3128 self.logger.warn(logging_text + " ERROR adding relations: {}".format(e))
3129 return False
3130
3131 async def _install_kdu(
3132 self,
3133 nsr_id: str,
3134 nsr_db_path: str,
3135 vnfr_data: dict,
3136 kdu_index: int,
3137 kdud: dict,
3138 vnfd: dict,
3139 k8s_instance_info: dict,
3140 k8params: dict = None,
3141 timeout: int = 600,
3142 vca_id: str = None,
3143 ):
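# Installs a single KDU on its K8s cluster, stores the generated kdu-instance name and the
# discovered services/mgmt IP in the nsr/vnfr records, runs any initial config primitives
# (when no juju execution environment handles them), and returns the kdu_instance name.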
3144
3145 try:
3146 k8sclustertype = k8s_instance_info["k8scluster-type"]
3147 # Instantiate kdu
3148 db_dict_install = {
3149 "collection": "nsrs",
3150 "filter": {"_id": nsr_id},
3151 "path": nsr_db_path,
3152 }
3153
3154 if k8s_instance_info.get("kdu-deployment-name"):
3155 kdu_instance = k8s_instance_info.get("kdu-deployment-name")
3156 else:
3157 kdu_instance = self.k8scluster_map[
3158 k8sclustertype
3159 ].generate_kdu_instance_name(
3160 db_dict=db_dict_install,
3161 kdu_model=k8s_instance_info["kdu-model"],
3162 kdu_name=k8s_instance_info["kdu-name"],
3163 )
3164 self.update_db_2(
3165 "nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance}
3166 )
3167 await self.k8scluster_map[k8sclustertype].install(
3168 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
3169 kdu_model=k8s_instance_info["kdu-model"],
3170 atomic=True,
3171 params=k8params,
3172 db_dict=db_dict_install,
3173 timeout=timeout,
3174 kdu_name=k8s_instance_info["kdu-name"],
3175 namespace=k8s_instance_info["namespace"],
3176 kdu_instance=kdu_instance,
3177 vca_id=vca_id,
3178 )
3179 self.update_db_2(
3180 "nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance}
3181 )
3182
3183 # Obtain services in order to get the management service ip
3184 services = await self.k8scluster_map[k8sclustertype].get_services(
3185 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
3186 kdu_instance=kdu_instance,
3187 namespace=k8s_instance_info["namespace"],
3188 )
3189
3190 # Obtain management service info (if exists)
3191 vnfr_update_dict = {}
3192 kdu_config = get_configuration(vnfd, kdud["name"])
3193 if kdu_config:
3194 target_ee_list = kdu_config.get("execution-environment-list", [])
3195 else:
3196 target_ee_list = []
3197
3198 if services:
3199 vnfr_update_dict["kdur.{}.services".format(kdu_index)] = services
3200 mgmt_services = [
3201 service
3202 for service in kdud.get("service", [])
3203 if service.get("mgmt-service")
3204 ]
3205 for mgmt_service in mgmt_services:
3206 for service in services:
3207 if service["name"].startswith(mgmt_service["name"]):
3208 # Mgmt service found, Obtain service ip
3209 ip = service.get("external_ip", service.get("cluster_ip"))
3210 if isinstance(ip, list) and len(ip) == 1:
3211 ip = ip[0]
3212
3213 vnfr_update_dict[
3214 "kdur.{}.ip-address".format(kdu_index)
3215 ] = ip
3216
3217 # Check whether the mgmt ip at the vnf level must also be updated
3218 service_external_cp = mgmt_service.get(
3219 "external-connection-point-ref"
3220 )
3221 if service_external_cp:
3222 if (
3223 deep_get(vnfd, ("mgmt-interface", "cp"))
3224 == service_external_cp
3225 ):
3226 vnfr_update_dict["ip-address"] = ip
3227
3228 if find_in_list(
3229 target_ee_list,
3230 lambda ee: ee.get(
3231 "external-connection-point-ref", ""
3232 )
3233 == service_external_cp,
3234 ):
3235 vnfr_update_dict[
3236 "kdur.{}.ip-address".format(kdu_index)
3237 ] = ip
3238 break
3239 else:
3240 self.logger.warn(
3241 "Mgmt service name: {} not found".format(
3242 mgmt_service["name"]
3243 )
3244 )
3245
3246 vnfr_update_dict["kdur.{}.status".format(kdu_index)] = "READY"
3247 self.update_db_2("vnfrs", vnfr_data.get("_id"), vnfr_update_dict)
3248
3249 kdu_config = get_configuration(vnfd, k8s_instance_info["kdu-name"])
3250 if (
3251 kdu_config
3252 and kdu_config.get("initial-config-primitive")
3253 and get_juju_ee_ref(vnfd, k8s_instance_info["kdu-name"]) is None
3254 ):
3255 initial_config_primitive_list = kdu_config.get(
3256 "initial-config-primitive"
3257 )
3258 initial_config_primitive_list.sort(key=lambda val: int(val["seq"]))
3259
3260 for initial_config_primitive in initial_config_primitive_list:
3261 primitive_params_ = self._map_primitive_params(
3262 initial_config_primitive, {}, {}
3263 )
3264
3265 await asyncio.wait_for(
3266 self.k8scluster_map[k8sclustertype].exec_primitive(
3267 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
3268 kdu_instance=kdu_instance,
3269 primitive_name=initial_config_primitive["name"],
3270 params=primitive_params_,
3271 db_dict=db_dict_install,
3272 vca_id=vca_id,
3273 ),
3274 timeout=timeout,
3275 )
3276
3277 except Exception as e:
3278 # Prepare update db with error and raise exception
3279 try:
3280 self.update_db_2(
3281 "nsrs", nsr_id, {nsr_db_path + ".detailed-status": str(e)}
3282 )
3283 self.update_db_2(
3284 "vnfrs",
3285 vnfr_data.get("_id"),
3286 {"kdur.{}.status".format(kdu_index): "ERROR"},
3287 )
3288 except Exception:
3289 # ignore to keep original exception
3290 pass
3291 # reraise original error
3292 raise
3293
3294 return kdu_instance
3295
3296 async def deploy_kdus(
3297 self,
3298 logging_text,
3299 nsr_id,
3300 nslcmop_id,
3301 db_vnfrs,
3302 db_vnfds,
3303 task_instantiation_info,
3304 ):
3305 # Launch kdus if present in the descriptor
3306
3307 k8scluster_id_2_uuic = {
3308 "helm-chart-v3": {},
3309 "helm-chart": {},
3310 "juju-bundle": {},
3311 }
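# Cache of initialized cluster ids per cluster type: maps the OSM k8scluster _id to the
# id returned by the corresponding connector, so each cluster is resolved only once.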
3312
3313 async def _get_cluster_id(cluster_id, cluster_type):
3314 nonlocal k8scluster_id_2_uuic
3315 if cluster_id in k8scluster_id_2_uuic[cluster_type]:
3316 return k8scluster_id_2_uuic[cluster_type][cluster_id]
3317
3318 # check if the K8s cluster is being created and wait for any related previous tasks in process
3319 task_name, task_dependency = self.lcm_tasks.lookfor_related(
3320 "k8scluster", cluster_id
3321 )
3322 if task_dependency:
3323 text = "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
3324 task_name, cluster_id
3325 )
3326 self.logger.debug(logging_text + text)
3327 await asyncio.wait(task_dependency, timeout=3600)
3328
3329 db_k8scluster = self.db.get_one(
3330 "k8sclusters", {"_id": cluster_id}, fail_on_empty=False
3331 )
3332 if not db_k8scluster:
3333 raise LcmException("K8s cluster {} cannot be found".format(cluster_id))
3334
3335 k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
3336 if not k8s_id:
3337 if cluster_type == "helm-chart-v3":
3338 try:
3339 # backward compatibility for existing clusters that have not been initialized for helm v3
3340 k8s_credentials = yaml.safe_dump(
3341 db_k8scluster.get("credentials")
3342 )
3343 k8s_id, uninstall_sw = await self.k8sclusterhelm3.init_env(
3344 k8s_credentials, reuse_cluster_uuid=cluster_id
3345 )
3346 db_k8scluster_update = {}
3347 db_k8scluster_update["_admin.helm-chart-v3.error_msg"] = None
3348 db_k8scluster_update["_admin.helm-chart-v3.id"] = k8s_id
3349 db_k8scluster_update[
3350 "_admin.helm-chart-v3.created"
3351 ] = uninstall_sw
3352 db_k8scluster_update[
3353 "_admin.helm-chart-v3.operationalState"
3354 ] = "ENABLED"
3355 self.update_db_2(
3356 "k8sclusters", cluster_id, db_k8scluster_update
3357 )
3358 except Exception as e:
3359 self.logger.error(
3360 logging_text
3361 + "error initializing helm-v3 cluster: {}".format(str(e))
3362 )
3363 raise LcmException(
3364 "K8s cluster '{}' has not been initialized for '{}'".format(
3365 cluster_id, cluster_type
3366 )
3367 )
3368 else:
3369 raise LcmException(
3370 "K8s cluster '{}' has not been initialized for '{}'".format(
3371 cluster_id, cluster_type
3372 )
3373 )
3374 k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
3375 return k8s_id
3376
3377 logging_text += "Deploy kdus: "
3378 step = ""
3379 try:
3380 db_nsr_update = {"_admin.deployed.K8s": []}
3381 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3382
3383 index = 0
3384 updated_cluster_list = []
3385 updated_v3_cluster_list = []
3386
3387 for vnfr_data in db_vnfrs.values():
3388 vca_id = self.get_vca_id(vnfr_data, {})
3389 for kdu_index, kdur in enumerate(get_iterable(vnfr_data, "kdur")):
3390 # Step 0: Prepare and set parameters
3391 desc_params = parse_yaml_strings(kdur.get("additionalParams"))
3392 vnfd_id = vnfr_data.get("vnfd-id")
3393 vnfd_with_id = find_in_list(
3394 db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id
3395 )
3396 kdud = next(
3397 kdud
3398 for kdud in vnfd_with_id["kdu"]
3399 if kdud["name"] == kdur["kdu-name"]
3400 )
3401 namespace = kdur.get("k8s-namespace")
3402 kdu_deployment_name = kdur.get("kdu-deployment-name")
3403 if kdur.get("helm-chart"):
3404 kdumodel = kdur["helm-chart"]
3405 # Default version is helm3; if helm-version is v2, use the helm-chart (v2) cluster type
3406 k8sclustertype = "helm-chart-v3"
3407 self.logger.debug("kdur: {}".format(kdur))
3408 if (
3409 kdur.get("helm-version")
3410 and kdur.get("helm-version") == "v2"
3411 ):
3412 k8sclustertype = "helm-chart"
3413 elif kdur.get("juju-bundle"):
3414 kdumodel = kdur["juju-bundle"]
3415 k8sclustertype = "juju-bundle"
3416 else:
3417 raise LcmException(
3418 "kdu type for kdu='{}.{}' is neither helm-chart nor "
3419 "juju-bundle. Maybe an old NBI version is running".format(
3420 vnfr_data["member-vnf-index-ref"], kdur["kdu-name"]
3421 )
3422 )
3423 # check if kdumodel is a file and exists
3424 try:
3425 vnfd_with_id = find_in_list(
3426 db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id
3427 )
3428 storage = deep_get(vnfd_with_id, ("_admin", "storage"))
3429 if storage: # may not be present if the vnfd has no artifacts
3430 # path format: /vnfdid/pkgdir/helm-charts|juju-bundles/kdumodel
3431 if storage["pkg-dir"]:
3432 filename = "{}/{}/{}s/{}".format(
3433 storage["folder"],
3434 storage["pkg-dir"],
3435 k8sclustertype,
3436 kdumodel,
3437 )
3438 else:
3439 filename = "{}/Scripts/{}s/{}".format(
3440 storage["folder"],
3441 k8sclustertype,
3442 kdumodel,
3443 )
3444 if self.fs.file_exists(
3445 filename, mode="file"
3446 ) or self.fs.file_exists(filename, mode="dir"):
3447 kdumodel = self.fs.path + filename
3448 except (asyncio.TimeoutError, asyncio.CancelledError):
3449 raise
3450 except Exception: # it is not a file
3451 pass
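# If the kdu model matched a packaged artifact, kdumodel now points to its absolute path on
# the shared filesystem; otherwise the original reference is kept (e.g. a chart name coming
# from a helm repository).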
3452
3453 k8s_cluster_id = kdur["k8s-cluster"]["id"]
3454 step = "Synchronize repos for k8s cluster '{}'".format(
3455 k8s_cluster_id
3456 )
3457 cluster_uuid = await _get_cluster_id(k8s_cluster_id, k8sclustertype)
3458
3459 # Synchronize repos
3460 if (
3461 k8sclustertype == "helm-chart"
3462 and cluster_uuid not in updated_cluster_list
3463 ) or (
3464 k8sclustertype == "helm-chart-v3"
3465 and cluster_uuid not in updated_v3_cluster_list
3466 ):
3467 del_repo_list, added_repo_dict = await asyncio.ensure_future(
3468 self.k8scluster_map[k8sclustertype].synchronize_repos(
3469 cluster_uuid=cluster_uuid
3470 )
3471 )
3472 if del_repo_list or added_repo_dict:
3473 if k8sclustertype == "helm-chart":
3474 unset = {
3475 "_admin.helm_charts_added." + item: None
3476 for item in del_repo_list
3477 }
3478 updated = {
3479 "_admin.helm_charts_added." + item: name
3480 for item, name in added_repo_dict.items()
3481 }
3482 updated_cluster_list.append(cluster_uuid)
3483 elif k8sclustertype == "helm-chart-v3":
3484 unset = {
3485 "_admin.helm_charts_v3_added." + item: None
3486 for item in del_repo_list
3487 }
3488 updated = {
3489 "_admin.helm_charts_v3_added." + item: name
3490 for item, name in added_repo_dict.items()
3491 }
3492 updated_v3_cluster_list.append(cluster_uuid)
3493 self.logger.debug(
3494 logging_text + "repos synchronized on k8s cluster "
3495 "'{}' to_delete: {}, to_add: {}".format(
3496 k8s_cluster_id, del_repo_list, added_repo_dict
3497 )
3498 )
3499 self.db.set_one(
3500 "k8sclusters",
3501 {"_id": k8s_cluster_id},
3502 updated,
3503 unset=unset,
3504 )
3505
3506 # Instantiate kdu
3507 step = "Instantiating KDU {}.{} in k8s cluster {}".format(
3508 vnfr_data["member-vnf-index-ref"],
3509 kdur["kdu-name"],
3510 k8s_cluster_id,
3511 )
3512 k8s_instance_info = {
3513 "kdu-instance": None,
3514 "k8scluster-uuid": cluster_uuid,
3515 "k8scluster-type": k8sclustertype,
3516 "member-vnf-index": vnfr_data["member-vnf-index-ref"],
3517 "kdu-name": kdur["kdu-name"],
3518 "kdu-model": kdumodel,
3519 "namespace": namespace,
3520 "kdu-deployment-name": kdu_deployment_name,
3521 }
3522 db_path = "_admin.deployed.K8s.{}".format(index)
3523 db_nsr_update[db_path] = k8s_instance_info
3524 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3525 vnfd_with_id = find_in_list(
3526 db_vnfds, lambda vnf: vnf["_id"] == vnfd_id
3527 )
3528 task = asyncio.ensure_future(
3529 self._install_kdu(
3530 nsr_id,
3531 db_path,
3532 vnfr_data,
3533 kdu_index,
3534 kdud,
3535 vnfd_with_id,
3536 k8s_instance_info,
3537 k8params=desc_params,
3538 timeout=600,
3539 vca_id=vca_id,
3540 )
3541 )
3542 self.lcm_tasks.register(
3543 "ns",
3544 nsr_id,
3545 nslcmop_id,
3546 "instantiate_KDU-{}".format(index),
3547 task,
3548 )
3549 task_instantiation_info[task] = "Deploying KDU {}".format(
3550 kdur["kdu-name"]
3551 )
3552
3553 index += 1
3554
3555 except (LcmException, asyncio.CancelledError):
3556 raise
3557 except Exception as e:
3558 msg = "Exception {} while {}: {}".format(type(e).__name__, step, e)
3559 if isinstance(e, (N2VCException, DbException)):
3560 self.logger.error(logging_text + msg)
3561 else:
3562 self.logger.critical(logging_text + msg, exc_info=True)
3563 raise LcmException(msg)
3564 finally:
3565 if db_nsr_update:
3566 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3567
3568 def _deploy_n2vc(
3569 self,
3570 logging_text,
3571 db_nsr,
3572 db_vnfr,
3573 nslcmop_id,
3574 nsr_id,
3575 nsi_id,
3576 vnfd_id,
3577 vdu_id,
3578 kdu_name,
3579 member_vnf_index,
3580 vdu_index,
3581 vdu_name,
3582 deploy_params,
3583 descriptor_config,
3584 base_folder,
3585 task_instantiation_info,
3586 stage,
3587 ):
3588         # launch instantiate_N2VC in an asyncio task and register the task object
3589         # Look up this charm's information in the database at <nsrs>._admin.deployed.VCA;
3590         # if not found, create one entry and update the database
3591 # fill db_nsr._admin.deployed.VCA.<index>
3592
3593 self.logger.debug(
3594 logging_text + "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id, vdu_id)
3595 )
3596 if "execution-environment-list" in descriptor_config:
3597 ee_list = descriptor_config.get("execution-environment-list", [])
3598 elif "juju" in descriptor_config:
3599 ee_list = [descriptor_config] # ns charms
3600         else:  # other types, such as script, are not supported
3601 ee_list = []
3602
3603 for ee_item in ee_list:
3604 self.logger.debug(
3605 logging_text
3606 + "_deploy_n2vc ee_item juju={}, helm={}".format(
3607 ee_item.get("juju"), ee_item.get("helm-chart")
3608 )
3609 )
3610 ee_descriptor_id = ee_item.get("id")
3611 if ee_item.get("juju"):
3612 vca_name = ee_item["juju"].get("charm")
3613 vca_type = (
3614 "lxc_proxy_charm"
3615 if ee_item["juju"].get("charm") is not None
3616 else "native_charm"
3617 )
3618 if ee_item["juju"].get("cloud") == "k8s":
3619 vca_type = "k8s_proxy_charm"
3620 elif ee_item["juju"].get("proxy") is False:
3621 vca_type = "native_charm"
3622 elif ee_item.get("helm-chart"):
3623 vca_name = ee_item["helm-chart"]
3624                 if ee_item.get("helm-version") == "v2":
3625 vca_type = "helm"
3626 else:
3627 vca_type = "helm-v3"
3628 else:
3629 self.logger.debug(
3630                     logging_text + "skipping: configuration is neither a juju charm nor a helm chart"
3631 )
3632 continue
3633
3634 vca_index = -1
3635 for vca_index, vca_deployed in enumerate(
3636 db_nsr["_admin"]["deployed"]["VCA"]
3637 ):
3638 if not vca_deployed:
3639 continue
3640 if (
3641 vca_deployed.get("member-vnf-index") == member_vnf_index
3642 and vca_deployed.get("vdu_id") == vdu_id
3643 and vca_deployed.get("kdu_name") == kdu_name
3644 and vca_deployed.get("vdu_count_index", 0) == vdu_index
3645 and vca_deployed.get("ee_descriptor_id") == ee_descriptor_id
3646 ):
3647 break
3648 else:
3649 # not found, create one.
3650 target = (
3651 "ns" if not member_vnf_index else "vnf/{}".format(member_vnf_index)
3652 )
3653 if vdu_id:
3654 target += "/vdu/{}/{}".format(vdu_id, vdu_index or 0)
3655 elif kdu_name:
3656 target += "/kdu/{}".format(kdu_name)
3657 vca_deployed = {
3658 "target_element": target,
3659 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
3660 "member-vnf-index": member_vnf_index,
3661 "vdu_id": vdu_id,
3662 "kdu_name": kdu_name,
3663 "vdu_count_index": vdu_index,
3664 "operational-status": "init", # TODO revise
3665 "detailed-status": "", # TODO revise
3666 "step": "initial-deploy", # TODO revise
3667 "vnfd_id": vnfd_id,
3668 "vdu_name": vdu_name,
3669 "type": vca_type,
3670 "ee_descriptor_id": ee_descriptor_id,
3671 }
3672 vca_index += 1
3673
3674 # create VCA and configurationStatus in db
3675 db_dict = {
3676 "_admin.deployed.VCA.{}".format(vca_index): vca_deployed,
3677 "configurationStatus.{}".format(vca_index): dict(),
3678 }
3679 self.update_db_2("nsrs", nsr_id, db_dict)
3680
3681 db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed)
3682
3683 self.logger.debug("N2VC > NSR_ID > {}".format(nsr_id))
3684 self.logger.debug("N2VC > DB_NSR > {}".format(db_nsr))
3685 self.logger.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed))
3686
3687 # Launch task
3688 task_n2vc = asyncio.ensure_future(
3689 self.instantiate_N2VC(
3690 logging_text=logging_text,
3691 vca_index=vca_index,
3692 nsi_id=nsi_id,
3693 db_nsr=db_nsr,
3694 db_vnfr=db_vnfr,
3695 vdu_id=vdu_id,
3696 kdu_name=kdu_name,
3697 vdu_index=vdu_index,
3698 deploy_params=deploy_params,
3699 config_descriptor=descriptor_config,
3700 base_folder=base_folder,
3701 nslcmop_id=nslcmop_id,
3702 stage=stage,
3703 vca_type=vca_type,
3704 vca_name=vca_name,
3705 ee_config_descriptor=ee_item,
3706 )
3707 )
3708 self.lcm_tasks.register(
3709 "ns",
3710 nsr_id,
3711 nslcmop_id,
3712 "instantiate_N2VC-{}".format(vca_index),
3713 task_n2vc,
3714 )
3715 task_instantiation_info[
3716 task_n2vc
3717 ] = self.task_name_deploy_vca + " {}.{}".format(
3718 member_vnf_index or "", vdu_id or ""
3719 )
3720
3721 @staticmethod
3722 def _create_nslcmop(nsr_id, operation, params):
3723 """
3724         Creates an ns-lcm-op content to be stored in the database.
3725 :param nsr_id: internal id of the instance
3726 :param operation: instantiate, terminate, scale, action, ...
3727 :param params: user parameters for the operation
3728 :return: dictionary following SOL005 format
3729 """
3730 # Raise exception if invalid arguments
3731 if not (nsr_id and operation and params):
3732 raise LcmException(
3733 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided"
3734 )
3735 now = time()
3736 _id = str(uuid4())
3737 nslcmop = {
3738 "id": _id,
3739 "_id": _id,
3740 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
3741 "operationState": "PROCESSING",
3742 "statusEnteredTime": now,
3743 "nsInstanceId": nsr_id,
3744 "lcmOperationType": operation,
3745 "startTime": now,
3746 "isAutomaticInvocation": False,
3747 "operationParams": params,
3748 "isCancelPending": False,
3749 "links": {
3750 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
3751 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
3752 },
3753 }
3754 return nslcmop
3755
3756 def _format_additional_params(self, params):
3757 params = params or {}
3758 for key, value in params.items():
3759 if str(value).startswith("!!yaml "):
3760 params[key] = yaml.safe_load(value[7:])
3761 return params
3762
3763 def _get_terminate_primitive_params(self, seq, vnf_index):
3764 primitive = seq.get("name")
3765 primitive_params = {}
3766 params = {
3767 "member_vnf_index": vnf_index,
3768 "primitive": primitive,
3769 "primitive_params": primitive_params,
3770 }
3771 desc_params = {}
3772 return self._map_primitive_params(seq, params, desc_params)
3773
3774 # sub-operations
3775
3776 def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
3777 op = deep_get(db_nslcmop, ("_admin", "operations"), [])[op_index]
3778 if op.get("operationState") == "COMPLETED":
3779 # b. Skip sub-operation
3780 # _ns_execute_primitive() or RO.create_action() will NOT be executed
3781 return self.SUBOPERATION_STATUS_SKIP
3782 else:
3783 # c. retry executing sub-operation
3784 # The sub-operation exists, and operationState != 'COMPLETED'
3785 # Update operationState = 'PROCESSING' to indicate a retry.
3786 operationState = "PROCESSING"
3787 detailed_status = "In progress"
3788 self._update_suboperation_status(
3789 db_nslcmop, op_index, operationState, detailed_status
3790 )
3791 # Return the sub-operation index
3792 # _ns_execute_primitive() or RO.create_action() will be called from scale()
3793 # with arguments extracted from the sub-operation
3794 return op_index
3795
3796 # Find a sub-operation where all keys in a matching dictionary must match
3797 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
3798 def _find_suboperation(self, db_nslcmop, match):
3799 if db_nslcmop and match:
3800 op_list = db_nslcmop.get("_admin", {}).get("operations", [])
3801 for i, op in enumerate(op_list):
3802 if all(op.get(k) == match[k] for k in match):
3803 return i
3804 return self.SUBOPERATION_STATUS_NOT_FOUND
3805
3806 # Update status for a sub-operation given its index
3807 def _update_suboperation_status(
3808 self, db_nslcmop, op_index, operationState, detailed_status
3809 ):
3810 # Update DB for HA tasks
3811 q_filter = {"_id": db_nslcmop["_id"]}
3812 update_dict = {
3813 "_admin.operations.{}.operationState".format(op_index): operationState,
3814 "_admin.operations.{}.detailed-status".format(op_index): detailed_status,
3815 }
3816 self.db.set_one(
3817 "nslcmops", q_filter=q_filter, update_dict=update_dict, fail_on_empty=False
3818 )
3819
3820 # Add sub-operation, return the index of the added sub-operation
3821 # Optionally, set operationState, detailed-status, and operationType
3822 # Status and type are currently set for 'scale' sub-operations:
3823 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
3824 # 'detailed-status' : status message
3825 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
3826 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
3827 def _add_suboperation(
3828 self,
3829 db_nslcmop,
3830 vnf_index,
3831 vdu_id,
3832 vdu_count_index,
3833 vdu_name,
3834 primitive,
3835 mapped_primitive_params,
3836 operationState=None,
3837 detailed_status=None,
3838 operationType=None,
3839 RO_nsr_id=None,
3840 RO_scaling_info=None,
3841 ):
3842 if not db_nslcmop:
3843 return self.SUBOPERATION_STATUS_NOT_FOUND
3844 # Get the "_admin.operations" list, if it exists
3845 db_nslcmop_admin = db_nslcmop.get("_admin", {})
3846 op_list = db_nslcmop_admin.get("operations")
3847 # Create or append to the "_admin.operations" list
3848 new_op = {
3849 "member_vnf_index": vnf_index,
3850 "vdu_id": vdu_id,
3851 "vdu_count_index": vdu_count_index,
3852 "primitive": primitive,
3853 "primitive_params": mapped_primitive_params,
3854 }
3855 if operationState:
3856 new_op["operationState"] = operationState
3857 if detailed_status:
3858 new_op["detailed-status"] = detailed_status
3859 if operationType:
3860 new_op["lcmOperationType"] = operationType
3861 if RO_nsr_id:
3862 new_op["RO_nsr_id"] = RO_nsr_id
3863 if RO_scaling_info:
3864 new_op["RO_scaling_info"] = RO_scaling_info
3865 if not op_list:
3866 # No existing operations, create key 'operations' with current operation as first list element
3867 db_nslcmop_admin.update({"operations": [new_op]})
3868 op_list = db_nslcmop_admin.get("operations")
3869 else:
3870 # Existing operations, append operation to list
3871 op_list.append(new_op)
3872
3873 db_nslcmop_update = {"_admin.operations": op_list}
3874 self.update_db_2("nslcmops", db_nslcmop["_id"], db_nslcmop_update)
3875 op_index = len(op_list) - 1
3876 return op_index
3877
3878 # Helper methods for scale() sub-operations
3879
3880 # pre-scale/post-scale:
3881 # Check for 3 different cases:
3882 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
3883 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
3884 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
3885 def _check_or_add_scale_suboperation(
3886 self,
3887 db_nslcmop,
3888 vnf_index,
3889 vnf_config_primitive,
3890 primitive_params,
3891 operationType,
3892 RO_nsr_id=None,
3893 RO_scaling_info=None,
3894 ):
3895 # Find this sub-operation
3896 if RO_nsr_id and RO_scaling_info:
3897 operationType = "SCALE-RO"
3898 match = {
3899 "member_vnf_index": vnf_index,
3900 "RO_nsr_id": RO_nsr_id,
3901 "RO_scaling_info": RO_scaling_info,
3902 }
3903 else:
3904 match = {
3905 "member_vnf_index": vnf_index,
3906 "primitive": vnf_config_primitive,
3907 "primitive_params": primitive_params,
3908 "lcmOperationType": operationType,
3909 }
3910 op_index = self._find_suboperation(db_nslcmop, match)
3911 if op_index == self.SUBOPERATION_STATUS_NOT_FOUND:
3912 # a. New sub-operation
3913 # The sub-operation does not exist, add it.
3914 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
3915             # The following parameters are set to None for all kinds of scaling:
3916 vdu_id = None
3917 vdu_count_index = None
3918 vdu_name = None
3919 if RO_nsr_id and RO_scaling_info:
3920 vnf_config_primitive = None
3921 primitive_params = None
3922 else:
3923 RO_nsr_id = None
3924 RO_scaling_info = None
3925 # Initial status for sub-operation
3926 operationState = "PROCESSING"
3927 detailed_status = "In progress"
3928 # Add sub-operation for pre/post-scaling (zero or more operations)
3929 self._add_suboperation(
3930 db_nslcmop,
3931 vnf_index,
3932 vdu_id,
3933 vdu_count_index,
3934 vdu_name,
3935 vnf_config_primitive,
3936 primitive_params,
3937 operationState,
3938 detailed_status,
3939 operationType,
3940 RO_nsr_id,
3941 RO_scaling_info,
3942 )
3943 return self.SUBOPERATION_STATUS_NEW
3944 else:
3945 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
3946 # or op_index (operationState != 'COMPLETED')
3947 return self._retry_or_skip_suboperation(db_nslcmop, op_index)
3948
3949 # Function to return execution_environment id
3950
3951 def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list):
3952 # TODO vdu_index_count
3953 for vca in vca_deployed_list:
3954 if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id:
3955 return vca["ee_id"]
3956
3957 async def destroy_N2VC(
3958 self,
3959 logging_text,
3960 db_nslcmop,
3961 vca_deployed,
3962 config_descriptor,
3963 vca_index,
3964 destroy_ee=True,
3965 exec_primitives=True,
3966 scaling_in=False,
3967 vca_id: str = None,
3968 ):
3969 """
3970         Execute the terminate primitives and destroy the execution environment (if destroy_ee=True)
3971         :param logging_text:
3972         :param db_nslcmop:
3973         :param vca_deployed: Dictionary of deployment info at db_nsr._admin.deployed.VCA.<INDEX>
3974         :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
3975         :param vca_index: index in the database _admin.deployed.VCA
3976         :param destroy_ee: False to skip destroying the execution environment here, because all of them will be destroyed at once later
3977         :param exec_primitives: False to skip executing the terminate primitives, because the configuration is not
3978             completed or did not execute properly
3979 :param scaling_in: True destroys the application, False destroys the model
3980 :return: None or exception
3981 """
3982
3983 self.logger.debug(
3984 logging_text
3985 + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
3986 vca_index, vca_deployed, config_descriptor, destroy_ee
3987 )
3988 )
3989
3990 vca_type = vca_deployed.get("type", "lxc_proxy_charm")
3991
3992 # execute terminate_primitives
3993 if exec_primitives:
3994 terminate_primitives = get_ee_sorted_terminate_config_primitive_list(
3995 config_descriptor.get("terminate-config-primitive"),
3996 vca_deployed.get("ee_descriptor_id"),
3997 )
3998 vdu_id = vca_deployed.get("vdu_id")
3999 vdu_count_index = vca_deployed.get("vdu_count_index")
4000 vdu_name = vca_deployed.get("vdu_name")
4001 vnf_index = vca_deployed.get("member-vnf-index")
4002 if terminate_primitives and vca_deployed.get("needed_terminate"):
4003 for seq in terminate_primitives:
4004 # For each sequence in list, get primitive and call _ns_execute_primitive()
4005 step = "Calling terminate action for vnf_member_index={} primitive={}".format(
4006 vnf_index, seq.get("name")
4007 )
4008 self.logger.debug(logging_text + step)
4009 # Create the primitive for each sequence, i.e. "primitive": "touch"
4010 primitive = seq.get("name")
4011 mapped_primitive_params = self._get_terminate_primitive_params(
4012 seq, vnf_index
4013 )
4014
4015 # Add sub-operation
4016 self._add_suboperation(
4017 db_nslcmop,
4018 vnf_index,
4019 vdu_id,
4020 vdu_count_index,
4021 vdu_name,
4022 primitive,
4023 mapped_primitive_params,
4024 )
4025 # Sub-operations: Call _ns_execute_primitive() instead of action()
4026 try:
4027 result, result_detail = await self._ns_execute_primitive(
4028 vca_deployed["ee_id"],
4029 primitive,
4030 mapped_primitive_params,
4031 vca_type=vca_type,
4032 vca_id=vca_id,
4033 )
4034 except LcmException:
4035                         # this happens when the VCA is not deployed; in that case there is nothing to terminate
4036 continue
4037 result_ok = ["COMPLETED", "PARTIALLY_COMPLETED"]
4038 if result not in result_ok:
4039 raise LcmException(
4040 "terminate_primitive {} for vnf_member_index={} fails with "
4041 "error {}".format(seq.get("name"), vnf_index, result_detail)
4042 )
4043             # mark that this VCA no longer needs to be terminated
4044 db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(
4045 vca_index
4046 )
4047 self.update_db_2(
4048 "nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False}
4049 )
4050
4051 # Delete Prometheus Jobs if any
4052 # This uses NSR_ID, so it will destroy any jobs under this index
4053 self.db.del_list("prometheus_jobs", {"nsr_id": db_nslcmop["nsInstanceId"]})
4054
4055 if destroy_ee:
4056 await self.vca_map[vca_type].delete_execution_environment(
4057 vca_deployed["ee_id"],
4058 scaling_in=scaling_in,
4059 vca_type=vca_type,
4060 vca_id=vca_id,
4061 )
4062
4063 async def _delete_all_N2VC(self, db_nsr: dict, vca_id: str = None):
4064 self._write_all_config_status(db_nsr=db_nsr, status="TERMINATING")
4065 namespace = "." + db_nsr["_id"]
4066 try:
4067 await self.n2vc.delete_namespace(
4068 namespace=namespace,
4069 total_timeout=self.timeout_charm_delete,
4070 vca_id=vca_id,
4071 )
4072 except N2VCNotFound: # already deleted. Skip
4073 pass
4074 self._write_all_config_status(db_nsr=db_nsr, status="DELETED")
4075
4076 async def _terminate_RO(
4077 self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
4078 ):
4079 """
4080 Terminates a deployment from RO
4081 :param logging_text:
4082 :param nsr_deployed: db_nsr._admin.deployed
4083 :param nsr_id:
4084 :param nslcmop_id:
4085         :param stage: list of strings with the content to write to db_nslcmop.detailed-status.
4086             This method only updates index 2, but it writes the concatenated content of the whole list to the database
4087 :return:
4088 """
4089 db_nsr_update = {}
4090 failed_detail = []
4091 ro_nsr_id = ro_delete_action = None
4092 if nsr_deployed and nsr_deployed.get("RO"):
4093 ro_nsr_id = nsr_deployed["RO"].get("nsr_id")
4094 ro_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
4095 try:
4096 if ro_nsr_id:
4097 stage[2] = "Deleting ns from VIM."
4098 db_nsr_update["detailed-status"] = " ".join(stage)
4099 self._write_op_status(nslcmop_id, stage)
4100 self.logger.debug(logging_text + stage[2])
4101 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4102 self._write_op_status(nslcmop_id, stage)
4103 desc = await self.RO.delete("ns", ro_nsr_id)
4104 ro_delete_action = desc["action_id"]
4105 db_nsr_update[
4106 "_admin.deployed.RO.nsr_delete_action_id"
4107 ] = ro_delete_action
4108 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
4109 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
4110 if ro_delete_action:
4111 # wait until NS is deleted from VIM
4112                     stage[2] = "Waiting for ns to be deleted from VIM."
4113 detailed_status_old = None
4114 self.logger.debug(
4115 logging_text
4116 + stage[2]
4117 + " RO_id={} ro_delete_action={}".format(
4118 ro_nsr_id, ro_delete_action
4119 )
4120 )
4121 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4122 self._write_op_status(nslcmop_id, stage)
4123
4124 delete_timeout = 20 * 60 # 20 minutes
4125 while delete_timeout > 0:
4126 desc = await self.RO.show(
4127 "ns",
4128 item_id_name=ro_nsr_id,
4129 extra_item="action",
4130 extra_item_id=ro_delete_action,
4131 )
4132
4133 # deploymentStatus
4134 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
4135
4136 ns_status, ns_status_info = self.RO.check_action_status(desc)
4137 if ns_status == "ERROR":
4138 raise ROclient.ROClientException(ns_status_info)
4139 elif ns_status == "BUILD":
4140 stage[2] = "Deleting from VIM {}".format(ns_status_info)
4141 elif ns_status == "ACTIVE":
4142 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
4143 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
4144 break
4145 else:
4146 assert (
4147 False
4148 ), "ROclient.check_action_status returns unknown {}".format(
4149 ns_status
4150 )
4151 if stage[2] != detailed_status_old:
4152 detailed_status_old = stage[2]
4153 db_nsr_update["detailed-status"] = " ".join(stage)
4154 self._write_op_status(nslcmop_id, stage)
4155 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4156 await asyncio.sleep(5, loop=self.loop)
4157 delete_timeout -= 5
4158 else: # delete_timeout <= 0:
4159 raise ROclient.ROClientException(
4160                         "Timeout waiting for ns to be deleted from VIM"
4161 )
4162
4163 except Exception as e:
4164 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4165 if (
4166 isinstance(e, ROclient.ROClientException) and e.http_code == 404
4167 ): # not found
4168 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
4169 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
4170 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
4171 self.logger.debug(
4172 logging_text + "RO_ns_id={} already deleted".format(ro_nsr_id)
4173 )
4174 elif (
4175 isinstance(e, ROclient.ROClientException) and e.http_code == 409
4176 ): # conflict
4177 failed_detail.append("delete conflict: {}".format(e))
4178 self.logger.debug(
4179 logging_text
4180 + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id, e)
4181 )
4182 else:
4183 failed_detail.append("delete error: {}".format(e))
4184 self.logger.error(
4185 logging_text + "RO_ns_id={} delete error: {}".format(ro_nsr_id, e)
4186 )
4187
4188 # Delete nsd
4189 if not failed_detail and deep_get(nsr_deployed, ("RO", "nsd_id")):
4190 ro_nsd_id = nsr_deployed["RO"]["nsd_id"]
4191 try:
4192 stage[2] = "Deleting nsd from RO."
4193 db_nsr_update["detailed-status"] = " ".join(stage)
4194 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4195 self._write_op_status(nslcmop_id, stage)
4196 await self.RO.delete("nsd", ro_nsd_id)
4197 self.logger.debug(
4198 logging_text + "ro_nsd_id={} deleted".format(ro_nsd_id)
4199 )
4200 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
4201 except Exception as e:
4202 if (
4203 isinstance(e, ROclient.ROClientException) and e.http_code == 404
4204 ): # not found
4205 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
4206 self.logger.debug(
4207 logging_text + "ro_nsd_id={} already deleted".format(ro_nsd_id)
4208 )
4209 elif (
4210 isinstance(e, ROclient.ROClientException) and e.http_code == 409
4211 ): # conflict
4212 failed_detail.append(
4213 "ro_nsd_id={} delete conflict: {}".format(ro_nsd_id, e)
4214 )
4215 self.logger.debug(logging_text + failed_detail[-1])
4216 else:
4217 failed_detail.append(
4218 "ro_nsd_id={} delete error: {}".format(ro_nsd_id, e)
4219 )
4220 self.logger.error(logging_text + failed_detail[-1])
4221
4222 if not failed_detail and deep_get(nsr_deployed, ("RO", "vnfd")):
4223 for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
4224 if not vnf_deployed or not vnf_deployed["id"]:
4225 continue
4226 try:
4227 ro_vnfd_id = vnf_deployed["id"]
4228 stage[
4229 2
4230 ] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
4231 vnf_deployed["member-vnf-index"], ro_vnfd_id
4232 )
4233 db_nsr_update["detailed-status"] = " ".join(stage)
4234 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4235 self._write_op_status(nslcmop_id, stage)
4236 await self.RO.delete("vnfd", ro_vnfd_id)
4237 self.logger.debug(
4238 logging_text + "ro_vnfd_id={} deleted".format(ro_vnfd_id)
4239 )
4240 db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
4241 except Exception as e:
4242 if (
4243 isinstance(e, ROclient.ROClientException) and e.http_code == 404
4244 ): # not found
4245 db_nsr_update[
4246 "_admin.deployed.RO.vnfd.{}.id".format(index)
4247 ] = None
4248 self.logger.debug(
4249 logging_text
4250 + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id)
4251 )
4252 elif (
4253 isinstance(e, ROclient.ROClientException) and e.http_code == 409
4254 ): # conflict
4255 failed_detail.append(
4256 "ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id, e)
4257 )
4258 self.logger.debug(logging_text + failed_detail[-1])
4259 else:
4260 failed_detail.append(
4261 "ro_vnfd_id={} delete error: {}".format(ro_vnfd_id, e)
4262 )
4263 self.logger.error(logging_text + failed_detail[-1])
4264
4265 if failed_detail:
4266 stage[2] = "Error deleting from VIM"
4267 else:
4268 stage[2] = "Deleted from VIM"
4269 db_nsr_update["detailed-status"] = " ".join(stage)
4270 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4271 self._write_op_status(nslcmop_id, stage)
4272
4273 if failed_detail:
4274 raise LcmException("; ".join(failed_detail))
4275
4276 async def terminate(self, nsr_id, nslcmop_id):
4277 # Try to lock HA task here
4278 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
4279 if not task_is_locked_by_me:
4280 return
4281
4282 logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
4283 self.logger.debug(logging_text + "Enter")
4284 timeout_ns_terminate = self.timeout_ns_terminate
4285 db_nsr = None
4286 db_nslcmop = None
4287 operation_params = None
4288 exc = None
4289         error_list = []  # accumulates all failure/error messages
4290         db_nslcmop_update = {}
4291         autoremove = False  # autoremove after termination
4292 tasks_dict_info = {}
4293 db_nsr_update = {}
4294 stage = [
4295 "Stage 1/3: Preparing task.",
4296 "Waiting for previous operations to terminate.",
4297 "",
4298 ]
4299 # ^ contains [stage, step, VIM-status]
4300 try:
4301 # wait for any previous tasks in process
4302 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
4303
4304 stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
4305 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
4306 operation_params = db_nslcmop.get("operationParams") or {}
4307 if operation_params.get("timeout_ns_terminate"):
4308 timeout_ns_terminate = operation_params["timeout_ns_terminate"]
4309 stage[1] = "Getting nsr={} from db.".format(nsr_id)
4310 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
4311
4312 db_nsr_update["operational-status"] = "terminating"
4313 db_nsr_update["config-status"] = "terminating"
4314 self._write_ns_status(
4315 nsr_id=nsr_id,
4316 ns_state="TERMINATING",
4317 current_operation="TERMINATING",
4318 current_operation_id=nslcmop_id,
4319 other_update=db_nsr_update,
4320 )
4321 self._write_op_status(op_id=nslcmop_id, queuePosition=0, stage=stage)
4322 nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed")) or {}
4323 if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
4324 return
4325
4326 stage[1] = "Getting vnf descriptors from db."
4327 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
4328 db_vnfrs_dict = {
4329 db_vnfr["member-vnf-index-ref"]: db_vnfr for db_vnfr in db_vnfrs_list
4330 }
4331 db_vnfds_from_id = {}
4332 db_vnfds_from_member_index = {}
4333 # Loop over VNFRs
4334 for vnfr in db_vnfrs_list:
4335 vnfd_id = vnfr["vnfd-id"]
4336 if vnfd_id not in db_vnfds_from_id:
4337 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
4338 db_vnfds_from_id[vnfd_id] = vnfd
4339 db_vnfds_from_member_index[
4340 vnfr["member-vnf-index-ref"]
4341 ] = db_vnfds_from_id[vnfd_id]
4342
4343 # Destroy individual execution environments when there are terminating primitives.
4344 # Rest of EE will be deleted at once
4345 # TODO - check before calling _destroy_N2VC
4346 # if not operation_params.get("skip_terminate_primitives"):#
4347 # or not vca.get("needed_terminate"):
4348 stage[0] = "Stage 2/3 execute terminating primitives."
4349 self.logger.debug(logging_text + stage[0])
4350             stage[1] = "Looking for execution environments that need terminate primitives."
4351 self.logger.debug(logging_text + stage[1])
4352
4353 for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")):
4354 config_descriptor = None
4355 vca_member_vnf_index = vca.get("member-vnf-index")
4356 vca_id = self.get_vca_id(
4357 db_vnfrs_dict.get(vca_member_vnf_index)
4358 if vca_member_vnf_index
4359 else None,
4360 db_nsr,
4361 )
4362 if not vca or not vca.get("ee_id"):
4363 continue
4364 if not vca.get("member-vnf-index"):
4365 # ns
4366 config_descriptor = db_nsr.get("ns-configuration")
4367 elif vca.get("vdu_id"):
4368 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
4369 config_descriptor = get_configuration(db_vnfd, vca.get("vdu_id"))
4370 elif vca.get("kdu_name"):
4371 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
4372 config_descriptor = get_configuration(db_vnfd, vca.get("kdu_name"))
4373 else:
4374 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
4375 config_descriptor = get_configuration(db_vnfd, db_vnfd["id"])
4376 vca_type = vca.get("type")
4377 exec_terminate_primitives = not operation_params.get(
4378 "skip_terminate_primitives"
4379 ) and vca.get("needed_terminate")
4380                 # For helm we must destroy the execution environment. Also for native_charm,
4381                 # as the juju model cannot be deleted if there are pending native charms
4382                 destroy_ee = (
4383                     vca_type in ("helm", "helm-v3", "native_charm")
4384                 )
4385 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
4386 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
4387 task = asyncio.ensure_future(
4388 self.destroy_N2VC(
4389 logging_text,
4390 db_nslcmop,
4391 vca,
4392 config_descriptor,
4393 vca_index,
4394 destroy_ee,
4395 exec_terminate_primitives,
4396 vca_id=vca_id,
4397 )
4398 )
4399 tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))
4400
4401 # wait for pending tasks of terminate primitives
4402 if tasks_dict_info:
4403 self.logger.debug(
4404 logging_text
4405 + "Waiting for tasks {}".format(list(tasks_dict_info.keys()))
4406 )
4407 error_list = await self._wait_for_tasks(
4408 logging_text,
4409 tasks_dict_info,
4410 min(self.timeout_charm_delete, timeout_ns_terminate),
4411 stage,
4412 nslcmop_id,
4413 )
4414 tasks_dict_info.clear()
4415 if error_list:
4416 return # raise LcmException("; ".join(error_list))
4417
4418             # remove all execution environments at once
4419 stage[0] = "Stage 3/3 delete all."
4420
4421 if nsr_deployed.get("VCA"):
4422 stage[1] = "Deleting all execution environments."
4423 self.logger.debug(logging_text + stage[1])
4424 vca_id = self.get_vca_id({}, db_nsr)
4425 task_delete_ee = asyncio.ensure_future(
4426 asyncio.wait_for(
4427 self._delete_all_N2VC(db_nsr=db_nsr, vca_id=vca_id),
4428 timeout=self.timeout_charm_delete,
4429 )
4430 )
4431 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
4432 tasks_dict_info[task_delete_ee] = "Terminating all VCA"
4433
4434 # Delete from k8scluster
4435 stage[1] = "Deleting KDUs."
4436 self.logger.debug(logging_text + stage[1])
4437 # print(nsr_deployed)
4438 for kdu in get_iterable(nsr_deployed, "K8s"):
4439 if not kdu or not kdu.get("kdu-instance"):
4440 continue
4441 kdu_instance = kdu.get("kdu-instance")
4442 if kdu.get("k8scluster-type") in self.k8scluster_map:
4443 # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
4444 vca_id = self.get_vca_id({}, db_nsr)
4445 task_delete_kdu_instance = asyncio.ensure_future(
4446 self.k8scluster_map[kdu["k8scluster-type"]].uninstall(
4447 cluster_uuid=kdu.get("k8scluster-uuid"),
4448 kdu_instance=kdu_instance,
4449 vca_id=vca_id,
4450 )
4451 )
4452 else:
4453 self.logger.error(
4454 logging_text
4455 + "Unknown k8s deployment type {}".format(
4456 kdu.get("k8scluster-type")
4457 )
4458 )
4459 continue
4460 tasks_dict_info[
4461 task_delete_kdu_instance
4462 ] = "Terminating KDU '{}'".format(kdu.get("kdu-name"))
4463
4464 # remove from RO
4465 stage[1] = "Deleting ns from VIM."
4466 if self.ng_ro:
4467 task_delete_ro = asyncio.ensure_future(
4468 self._terminate_ng_ro(
4469 logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
4470 )
4471 )
4472 else:
4473 task_delete_ro = asyncio.ensure_future(
4474 self._terminate_RO(
4475 logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
4476 )
4477 )
4478 tasks_dict_info[task_delete_ro] = "Removing deployment from VIM"
4479
4480             # the rest of the work will be done in the finally block
4481
4482 except (
4483 ROclient.ROClientException,
4484 DbException,
4485 LcmException,
4486 N2VCException,
4487 ) as e:
4488 self.logger.error(logging_text + "Exit Exception {}".format(e))
4489 exc = e
4490 except asyncio.CancelledError:
4491 self.logger.error(
4492 logging_text + "Cancelled Exception while '{}'".format(stage[1])
4493 )
4494 exc = "Operation was cancelled"
4495 except Exception as e:
4496 exc = traceback.format_exc()
4497 self.logger.critical(
4498 logging_text + "Exit Exception while '{}': {}".format(stage[1], e),
4499 exc_info=True,
4500 )
4501 finally:
4502 if exc:
4503 error_list.append(str(exc))
4504 try:
4505 # wait for pending tasks
4506 if tasks_dict_info:
4507 stage[1] = "Waiting for terminate pending tasks."
4508 self.logger.debug(logging_text + stage[1])
4509 error_list += await self._wait_for_tasks(
4510 logging_text,
4511 tasks_dict_info,
4512 timeout_ns_terminate,
4513 stage,
4514 nslcmop_id,
4515 )
4516 stage[1] = stage[2] = ""
4517 except asyncio.CancelledError:
4518 error_list.append("Cancelled")
4519                 # TODO cancel all tasks
4520 except Exception as exc:
4521 error_list.append(str(exc))
4522 # update status at database
4523 if error_list:
4524 error_detail = "; ".join(error_list)
4525 # self.logger.error(logging_text + error_detail)
4526 error_description_nslcmop = "{} Detail: {}".format(
4527 stage[0], error_detail
4528 )
4529 error_description_nsr = "Operation: TERMINATING.{}, {}.".format(
4530 nslcmop_id, stage[0]
4531 )
4532
4533 db_nsr_update["operational-status"] = "failed"
4534 db_nsr_update["detailed-status"] = (
4535 error_description_nsr + " Detail: " + error_detail
4536 )
4537 db_nslcmop_update["detailed-status"] = error_detail
4538 nslcmop_operation_state = "FAILED"
4539 ns_state = "BROKEN"
4540 else:
4541 error_detail = None
4542 error_description_nsr = error_description_nslcmop = None
4543 ns_state = "NOT_INSTANTIATED"
4544 db_nsr_update["operational-status"] = "terminated"
4545 db_nsr_update["detailed-status"] = "Done"
4546 db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
4547 db_nslcmop_update["detailed-status"] = "Done"
4548 nslcmop_operation_state = "COMPLETED"
4549
4550 if db_nsr:
4551 self._write_ns_status(
4552 nsr_id=nsr_id,
4553 ns_state=ns_state,
4554 current_operation="IDLE",
4555 current_operation_id=None,
4556 error_description=error_description_nsr,
4557 error_detail=error_detail,
4558 other_update=db_nsr_update,
4559 )
4560 self._write_op_status(
4561 op_id=nslcmop_id,
4562 stage="",
4563 error_message=error_description_nslcmop,
4564 operation_state=nslcmop_operation_state,
4565 other_update=db_nslcmop_update,
4566 )
4567 if ns_state == "NOT_INSTANTIATED":
4568 try:
4569 self.db.set_list(
4570 "vnfrs",
4571 {"nsr-id-ref": nsr_id},
4572 {"_admin.nsState": "NOT_INSTANTIATED"},
4573 )
4574 except DbException as e:
4575 self.logger.warn(
4576 logging_text
4577 + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
4578 nsr_id, e
4579 )
4580 )
4581 if operation_params:
4582 autoremove = operation_params.get("autoremove", False)
4583 if nslcmop_operation_state:
4584 try:
4585 await self.msg.aiowrite(
4586 "ns",
4587 "terminated",
4588 {
4589 "nsr_id": nsr_id,
4590 "nslcmop_id": nslcmop_id,
4591 "operationState": nslcmop_operation_state,
4592 "autoremove": autoremove,
4593 },
4594 loop=self.loop,
4595 )
4596 except Exception as e:
4597 self.logger.error(
4598 logging_text + "kafka_write notification Exception {}".format(e)
4599 )
4600
4601 self.logger.debug(logging_text + "Exit")
4602 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
4603
4604 async def _wait_for_tasks(
4605 self, logging_text, created_tasks_info, timeout, stage, nslcmop_id, nsr_id=None
4606 ):
4607 time_start = time()
4608 error_detail_list = []
4609 error_list = []
4610 pending_tasks = list(created_tasks_info.keys())
4611 num_tasks = len(pending_tasks)
4612 num_done = 0
4613 stage[1] = "{}/{}.".format(num_done, num_tasks)
4614 self._write_op_status(nslcmop_id, stage)
4615 while pending_tasks:
4616 new_error = None
4617 _timeout = timeout + time_start - time()
4618 done, pending_tasks = await asyncio.wait(
4619 pending_tasks, timeout=_timeout, return_when=asyncio.FIRST_COMPLETED
4620 )
4621 num_done += len(done)
4622 if not done: # Timeout
4623 for task in pending_tasks:
4624 new_error = created_tasks_info[task] + ": Timeout"
4625 error_detail_list.append(new_error)
4626 error_list.append(new_error)
4627 break
4628 for task in done:
4629 if task.cancelled():
4630 exc = "Cancelled"
4631 else:
4632 exc = task.exception()
4633 if exc:
4634 if isinstance(exc, asyncio.TimeoutError):
4635 exc = "Timeout"
4636 new_error = created_tasks_info[task] + ": {}".format(exc)
4637 error_list.append(created_tasks_info[task])
4638 error_detail_list.append(new_error)
4639 if isinstance(
4640 exc,
4641 (
4642 str,
4643 DbException,
4644 N2VCException,
4645 ROclient.ROClientException,
4646 LcmException,
4647 K8sException,
4648 NgRoException,
4649 ),
4650 ):
4651 self.logger.error(logging_text + new_error)
4652 else:
4653 exc_traceback = "".join(
4654 traceback.format_exception(None, exc, exc.__traceback__)
4655 )
4656 self.logger.error(
4657 logging_text
4658 + created_tasks_info[task]
4659 + " "
4660 + exc_traceback
4661 )
4662 else:
4663 self.logger.debug(
4664 logging_text + created_tasks_info[task] + ": Done"
4665 )
4666 stage[1] = "{}/{}.".format(num_done, num_tasks)
4667 if new_error:
4668 stage[1] += " Errors: " + ". ".join(error_detail_list) + "."
4669 if nsr_id: # update also nsr
4670 self.update_db_2(
4671 "nsrs",
4672 nsr_id,
4673 {
4674 "errorDescription": "Error at: " + ", ".join(error_list),
4675 "errorDetail": ". ".join(error_detail_list),
4676 },
4677 )
4678 self._write_op_status(nslcmop_id, stage)
4679 return error_detail_list
4680
4681 @staticmethod
4682 def _map_primitive_params(primitive_desc, params, instantiation_params):
4683 """
4684         Generates the params to be provided to the charm before executing a primitive. If the user does not provide a
4685         parameter, the default-value is used. If it is between < >, it looks for a value in instantiation_params
4686 :param primitive_desc: portion of VNFD/NSD that describes primitive
4687 :param params: Params provided by user
4688 :param instantiation_params: Instantiation params provided by user
4689 :return: a dictionary with the calculated params
4690 """
4691 calculated_params = {}
4692 for parameter in primitive_desc.get("parameter", ()):
4693 param_name = parameter["name"]
4694 if param_name in params:
4695 calculated_params[param_name] = params[param_name]
4696 elif "default-value" in parameter or "value" in parameter:
4697 if "value" in parameter:
4698 calculated_params[param_name] = parameter["value"]
4699 else:
4700 calculated_params[param_name] = parameter["default-value"]
4701 if (
4702 isinstance(calculated_params[param_name], str)
4703 and calculated_params[param_name].startswith("<")
4704 and calculated_params[param_name].endswith(">")
4705 ):
4706 if calculated_params[param_name][1:-1] in instantiation_params:
4707 calculated_params[param_name] = instantiation_params[
4708 calculated_params[param_name][1:-1]
4709 ]
4710 else:
4711 raise LcmException(
4712 "Parameter {} needed to execute primitive {} not provided".format(
4713 calculated_params[param_name], primitive_desc["name"]
4714 )
4715 )
4716 else:
4717 raise LcmException(
4718 "Parameter {} needed to execute primitive {} not provided".format(
4719 param_name, primitive_desc["name"]
4720 )
4721 )
4722
4723 if isinstance(calculated_params[param_name], (dict, list, tuple)):
4724 calculated_params[param_name] = yaml.safe_dump(
4725 calculated_params[param_name], default_flow_style=True, width=256
4726 )
4727 elif isinstance(calculated_params[param_name], str) and calculated_params[
4728 param_name
4729 ].startswith("!!yaml "):
4730 calculated_params[param_name] = calculated_params[param_name][7:]
4731 if parameter.get("data-type") == "INTEGER":
4732 try:
4733 calculated_params[param_name] = int(calculated_params[param_name])
4734 except ValueError: # error converting string to int
4735 raise LcmException(
4736 "Parameter {} of primitive {} must be integer".format(
4737 param_name, primitive_desc["name"]
4738 )
4739 )
4740 elif parameter.get("data-type") == "BOOLEAN":
4741                 calculated_params[param_name] = (
4742                     str(calculated_params[param_name]).lower() != "false"
4743                 )
4744
4745         # always add ns_config_info if the primitive name is config
4746 if primitive_desc["name"] == "config":
4747 if "ns_config_info" in instantiation_params:
4748 calculated_params["ns_config_info"] = instantiation_params[
4749 "ns_config_info"
4750 ]
4751 return calculated_params
4752
4753 def _look_for_deployed_vca(
4754 self,
4755 deployed_vca,
4756 member_vnf_index,
4757 vdu_id,
4758 vdu_count_index,
4759 kdu_name=None,
4760 ee_descriptor_id=None,
4761 ):
4762         # find the vca_deployed record for this action. Raise LcmException if not found or if it has no ee_id.
4763 for vca in deployed_vca:
4764 if not vca:
4765 continue
4766 if member_vnf_index != vca["member-vnf-index"] or vdu_id != vca["vdu_id"]:
4767 continue
4768 if (
4769 vdu_count_index is not None
4770 and vdu_count_index != vca["vdu_count_index"]
4771 ):
4772 continue
4773 if kdu_name and kdu_name != vca["kdu_name"]:
4774 continue
4775 if ee_descriptor_id and ee_descriptor_id != vca["ee_descriptor_id"]:
4776 continue
4777 break
4778 else:
4779 # vca_deployed not found
4780 raise LcmException(
4781 "charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
4782 " is not deployed".format(
4783 member_vnf_index,
4784 vdu_id,
4785 vdu_count_index,
4786 kdu_name,
4787 ee_descriptor_id,
4788 )
4789 )
4790 # get ee_id
4791 ee_id = vca.get("ee_id")
4792 vca_type = vca.get(
4793 "type", "lxc_proxy_charm"
4794 ) # default value for backward compatibility - proxy charm
4795 if not ee_id:
4796 raise LcmException(
4797                 "charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has no "
4798 "execution environment".format(
4799 member_vnf_index, vdu_id, kdu_name, vdu_count_index
4800 )
4801 )
4802 return ee_id, vca_type
4803
4804 async def _ns_execute_primitive(
4805 self,
4806 ee_id,
4807 primitive,
4808 primitive_params,
4809 retries=0,
4810 retries_interval=30,
4811 timeout=None,
4812 vca_type=None,
4813 db_dict=None,
4814 vca_id: str = None,
4815 ) -> (str, str):
4816 try:
4817 if primitive == "config":
4818 primitive_params = {"params": primitive_params}
4819
4820 vca_type = vca_type or "lxc_proxy_charm"
4821
4822 while retries >= 0:
4823 try:
4824 output = await asyncio.wait_for(
4825 self.vca_map[vca_type].exec_primitive(
4826 ee_id=ee_id,
4827 primitive_name=primitive,
4828 params_dict=primitive_params,
4829 progress_timeout=self.timeout_progress_primitive,
4830 total_timeout=self.timeout_primitive,
4831 db_dict=db_dict,
4832 vca_id=vca_id,
4833 vca_type=vca_type,
4834 ),
4835 timeout=timeout or self.timeout_primitive,
4836 )
4837 # execution was OK
4838 break
4839 except asyncio.CancelledError:
4840 raise
4841 except Exception as e: # asyncio.TimeoutError
4842 if isinstance(e, asyncio.TimeoutError):
4843 e = "Timeout"
4844 retries -= 1
4845 if retries >= 0:
4846 self.logger.debug(
4847 "Error executing action {} on {} -> {}".format(
4848 primitive, ee_id, e
4849 )
4850 )
4851 # wait and retry
4852 await asyncio.sleep(retries_interval, loop=self.loop)
4853 else:
4854 return "FAILED", str(e)
4855
4856 return "COMPLETED", output
4857
4858 except (LcmException, asyncio.CancelledError):
4859 raise
4860 except Exception as e:
4861 return "FAIL", "Error executing action {}: {}".format(primitive, e)
4862
4863 async def vca_status_refresh(self, nsr_id, nslcmop_id):
4864 """
4865         Updates the vca_status in the nsrs record with the latest juju information
4866         :param nsr_id: Id of the nsr
4867         :param nslcmop_id: Id of the nslcmop
4868 :return: None
4869 """
4870
4871 self.logger.debug("Task ns={} action={} Enter".format(nsr_id, nslcmop_id))
4872 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
4873 vca_id = self.get_vca_id({}, db_nsr)
4874 if db_nsr["_admin"]["deployed"]["K8s"]:
4875 for k8s_index, k8s in enumerate(db_nsr["_admin"]["deployed"]["K8s"]):
4876 cluster_uuid, kdu_instance = k8s["k8scluster-uuid"], k8s["kdu-instance"]
4877 await self._on_update_k8s_db(
4878 cluster_uuid, kdu_instance, filter={"_id": nsr_id}, vca_id=vca_id
4879 )
4880 else:
4881 for vca_index, _ in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
4882 table, filter = "nsrs", {"_id": nsr_id}
4883 path = "_admin.deployed.VCA.{}.".format(vca_index)
4884 await self._on_update_n2vc_db(table, filter, path, {})
4885
4886 self.logger.debug("Task ns={} action={} Exit".format(nsr_id, nslcmop_id))
4887 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_vca_status_refresh")
4888
4889 async def action(self, nsr_id, nslcmop_id):
4890 # Try to lock HA task here
4891 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
4892 if not task_is_locked_by_me:
4893 return
4894
4895 logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
4896 self.logger.debug(logging_text + "Enter")
4897 # get all needed from database
4898 db_nsr = None
4899 db_nslcmop = None
4900 db_nsr_update = {}
4901 db_nslcmop_update = {}
4902 nslcmop_operation_state = None
4903 error_description_nslcmop = None
4904 exc = None
4905 try:
4906 # wait for any previous tasks in process
4907 step = "Waiting for previous operations to terminate"
4908 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
4909
4910 self._write_ns_status(
4911 nsr_id=nsr_id,
4912 ns_state=None,
4913 current_operation="RUNNING ACTION",
4914 current_operation_id=nslcmop_id,
4915 )
4916
4917 step = "Getting information from database"
4918 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
4919 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
4920 if db_nslcmop["operationParams"].get("primitive_params"):
4921 db_nslcmop["operationParams"]["primitive_params"] = json.loads(
4922 db_nslcmop["operationParams"]["primitive_params"]
4923 )
4924
4925 nsr_deployed = db_nsr["_admin"].get("deployed")
4926 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
4927 vdu_id = db_nslcmop["operationParams"].get("vdu_id")
4928 kdu_name = db_nslcmop["operationParams"].get("kdu_name")
4929 vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
4930 primitive = db_nslcmop["operationParams"]["primitive"]
4931 primitive_params = db_nslcmop["operationParams"]["primitive_params"]
4932 timeout_ns_action = db_nslcmop["operationParams"].get(
4933 "timeout_ns_action", self.timeout_primitive
4934 )
4935
4936 if vnf_index:
4937 step = "Getting vnfr from database"
4938 db_vnfr = self.db.get_one(
4939 "vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}
4940 )
4941 if db_vnfr.get("kdur"):
4942 kdur_list = []
4943 for kdur in db_vnfr["kdur"]:
4944 if kdur.get("additionalParams"):
4945 kdur["additionalParams"] = json.loads(
4946 kdur["additionalParams"]
4947 )
4948 kdur_list.append(kdur)
4949 db_vnfr["kdur"] = kdur_list
4950 step = "Getting vnfd from database"
4951 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
4952
4953 # Sync filesystem before running a primitive
4954 self.fs.sync(db_vnfr["vnfd-id"])
4955 else:
4956 step = "Getting nsd from database"
4957 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
4958
4959 vca_id = self.get_vca_id(db_vnfr, db_nsr)
4960 # for backward compatibility
4961 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
4962 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
4963 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
4964 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4965
4966 # look for primitive
4967 config_primitive_desc = descriptor_configuration = None
4968 if vdu_id:
4969 descriptor_configuration = get_configuration(db_vnfd, vdu_id)
4970 elif kdu_name:
4971 descriptor_configuration = get_configuration(db_vnfd, kdu_name)
4972 elif vnf_index:
4973 descriptor_configuration = get_configuration(db_vnfd, db_vnfd["id"])
4974 else:
4975 descriptor_configuration = db_nsd.get("ns-configuration")
4976
4977 if descriptor_configuration and descriptor_configuration.get(
4978 "config-primitive"
4979 ):
4980 for config_primitive in descriptor_configuration["config-primitive"]:
4981 if config_primitive["name"] == primitive:
4982 config_primitive_desc = config_primitive
4983 break
4984
4985 if not config_primitive_desc:
4986 if not (kdu_name and primitive in ("upgrade", "rollback", "status")):
4987 raise LcmException(
4988 "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
4989 primitive
4990 )
4991 )
4992 primitive_name = primitive
4993 ee_descriptor_id = None
4994 else:
4995 primitive_name = config_primitive_desc.get(
4996 "execution-environment-primitive", primitive
4997 )
4998 ee_descriptor_id = config_primitive_desc.get(
4999 "execution-environment-ref"
5000 )
5001
5002 if vnf_index:
5003 if vdu_id:
5004 vdur = next(
5005 (x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None
5006 )
5007 desc_params = parse_yaml_strings(vdur.get("additionalParams"))
5008 elif kdu_name:
5009 kdur = next(
5010 (x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None
5011 )
5012 desc_params = parse_yaml_strings(kdur.get("additionalParams"))
5013 else:
5014 desc_params = parse_yaml_strings(
5015 db_vnfr.get("additionalParamsForVnf")
5016 )
5017 else:
5018 desc_params = parse_yaml_strings(db_nsr.get("additionalParamsForNs"))
5019 if kdu_name and get_configuration(db_vnfd, kdu_name):
5020 kdu_configuration = get_configuration(db_vnfd, kdu_name)
5021 actions = set()
5022 for primitive in kdu_configuration.get("initial-config-primitive", []):
5023 actions.add(primitive["name"])
5024 for primitive in kdu_configuration.get("config-primitive", []):
5025 actions.add(primitive["name"])
5026             kdu_action = primitive_name in actions
5027
5028 # TODO check if ns is in a proper status
5029 if kdu_name and (
5030 primitive_name in ("upgrade", "rollback", "status") or kdu_action
5031 ):
5032 # kdur and desc_params already set from before
5033 if primitive_params:
5034 desc_params.update(primitive_params)
5035 # TODO Check if we will need something at vnf level
5036 for index, kdu in enumerate(get_iterable(nsr_deployed, "K8s")):
5037 if (
5038 kdu_name == kdu["kdu-name"]
5039 and kdu["member-vnf-index"] == vnf_index
5040 ):
5041 break
5042 else:
5043 raise LcmException(
5044 "KDU '{}' for vnf '{}' not deployed".format(kdu_name, vnf_index)
5045 )
5046
5047 if kdu.get("k8scluster-type") not in self.k8scluster_map:
5048 msg = "unknown k8scluster-type '{}'".format(
5049 kdu.get("k8scluster-type")
5050 )
5051 raise LcmException(msg)
5052
5053 db_dict = {
5054 "collection": "nsrs",
5055 "filter": {"_id": nsr_id},
5056 "path": "_admin.deployed.K8s.{}".format(index),
5057 }
5058 self.logger.debug(
5059 logging_text
5060 + "Exec k8s {} on {}.{}".format(primitive_name, vnf_index, kdu_name)
5061 )
5062 step = "Executing kdu {}".format(primitive_name)
5063 if primitive_name == "upgrade":
5064 if desc_params.get("kdu_model"):
5065 kdu_model = desc_params.get("kdu_model")
5066 del desc_params["kdu_model"]
5067 else:
5068 kdu_model = kdu.get("kdu-model")
5069 parts = kdu_model.split(sep=":")
5070 if len(parts) == 2:
5071 kdu_model = parts[0]
5072
5073 detailed_status = await asyncio.wait_for(
5074 self.k8scluster_map[kdu["k8scluster-type"]].upgrade(
5075 cluster_uuid=kdu.get("k8scluster-uuid"),
5076 kdu_instance=kdu.get("kdu-instance"),
5077 atomic=True,
5078 kdu_model=kdu_model,
5079 params=desc_params,
5080 db_dict=db_dict,
5081 timeout=timeout_ns_action,
5082 ),
5083 timeout=timeout_ns_action + 10,
5084 )
5085 self.logger.debug(
5086 logging_text + " Upgrade of kdu {} done".format(detailed_status)
5087 )
5088 elif primitive_name == "rollback":
5089 detailed_status = await asyncio.wait_for(
5090 self.k8scluster_map[kdu["k8scluster-type"]].rollback(
5091 cluster_uuid=kdu.get("k8scluster-uuid"),
5092 kdu_instance=kdu.get("kdu-instance"),
5093 db_dict=db_dict,
5094 ),
5095 timeout=timeout_ns_action,
5096 )
5097 elif primitive_name == "status":
5098 detailed_status = await asyncio.wait_for(
5099 self.k8scluster_map[kdu["k8scluster-type"]].status_kdu(
5100 cluster_uuid=kdu.get("k8scluster-uuid"),
5101 kdu_instance=kdu.get("kdu-instance"),
5102 vca_id=vca_id,
5103 ),
5104 timeout=timeout_ns_action,
5105 )
5106 else:
5107 kdu_instance = kdu.get("kdu-instance") or "{}-{}".format(
5108 kdu["kdu-name"], nsr_id
5109 )
5110 params = self._map_primitive_params(
5111 config_primitive_desc, primitive_params, desc_params
5112 )
5113
5114 detailed_status = await asyncio.wait_for(
5115 self.k8scluster_map[kdu["k8scluster-type"]].exec_primitive(
5116 cluster_uuid=kdu.get("k8scluster-uuid"),
5117 kdu_instance=kdu_instance,
5118 primitive_name=primitive_name,
5119 params=params,
5120 db_dict=db_dict,
5121 timeout=timeout_ns_action,
5122 vca_id=vca_id,
5123 ),
5124 timeout=timeout_ns_action,
5125 )
5126
5127 if detailed_status:
5128 nslcmop_operation_state = "COMPLETED"
5129 else:
5130 detailed_status = ""
5131 nslcmop_operation_state = "FAILED"
5132 else:
5133 ee_id, vca_type = self._look_for_deployed_vca(
5134 nsr_deployed["VCA"],
5135 member_vnf_index=vnf_index,
5136 vdu_id=vdu_id,
5137 vdu_count_index=vdu_count_index,
5138 ee_descriptor_id=ee_descriptor_id,
5139 )
5140 for vca_index, vca_deployed in enumerate(
5141 db_nsr["_admin"]["deployed"]["VCA"]
5142 ):
5143 if vca_deployed.get("member-vnf-index") == vnf_index:
5144 db_dict = {
5145 "collection": "nsrs",
5146 "filter": {"_id": nsr_id},
5147 "path": "_admin.deployed.VCA.{}.".format(vca_index),
5148 }
5149 break
5150 (
5151 nslcmop_operation_state,
5152 detailed_status,
5153 ) = await self._ns_execute_primitive(
5154 ee_id,
5155 primitive=primitive_name,
5156 primitive_params=self._map_primitive_params(
5157 config_primitive_desc, primitive_params, desc_params
5158 ),
5159 timeout=timeout_ns_action,
5160 vca_type=vca_type,
5161 db_dict=db_dict,
5162 vca_id=vca_id,
5163 )
5164
5165 db_nslcmop_update["detailed-status"] = detailed_status
5166 error_description_nslcmop = (
5167 detailed_status if nslcmop_operation_state == "FAILED" else ""
5168 )
5169 self.logger.debug(
5170 logging_text
5171 + " task Done with result {} {}".format(
5172 nslcmop_operation_state, detailed_status
5173 )
5174 )
5175 return # database update is called inside finally
5176
5177 except (DbException, LcmException, N2VCException, K8sException) as e:
5178 self.logger.error(logging_text + "Exit Exception {}".format(e))
5179 exc = e
5180 except asyncio.CancelledError:
5181 self.logger.error(
5182 logging_text + "Cancelled Exception while '{}'".format(step)
5183 )
5184 exc = "Operation was cancelled"
5185 except asyncio.TimeoutError:
5186 self.logger.error(logging_text + "Timeout while '{}'".format(step))
5187 exc = "Timeout"
5188 except Exception as e:
5189 exc = traceback.format_exc()
5190 self.logger.critical(
5191 logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
5192 exc_info=True,
5193 )
5194 finally:
5195 if exc:
5196 db_nslcmop_update[
5197 "detailed-status"
5198 ] = (
5199 detailed_status
5200 ) = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
5201 nslcmop_operation_state = "FAILED"
5202 if db_nsr:
5203 self._write_ns_status(
5204 nsr_id=nsr_id,
5205 ns_state=db_nsr[
5206 "nsState"
5207 ], # TODO check if degraded. For the moment use previous status
5208 current_operation="IDLE",
5209 current_operation_id=None,
5210 # error_description=error_description_nsr,
5211 # error_detail=error_detail,
5212 other_update=db_nsr_update,
5213 )
5214
5215 self._write_op_status(
5216 op_id=nslcmop_id,
5217 stage="",
5218 error_message=error_description_nslcmop,
5219 operation_state=nslcmop_operation_state,
5220 other_update=db_nslcmop_update,
5221 )
5222
5223 if nslcmop_operation_state:
5224 try:
5225 await self.msg.aiowrite(
5226 "ns",
5227 "actioned",
5228 {
5229 "nsr_id": nsr_id,
5230 "nslcmop_id": nslcmop_id,
5231 "operationState": nslcmop_operation_state,
5232 },
5233 loop=self.loop,
5234 )
5235 except Exception as e:
5236 self.logger.error(
5237 logging_text + "kafka_write notification Exception {}".format(e)
5238 )
5239 self.logger.debug(logging_text + "Exit")
5240 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
5241 return nslcmop_operation_state, detailed_status
5242
5243 async def scale(self, nsr_id, nslcmop_id):
5244 # Try to lock HA task here
5245 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
5246 if not task_is_locked_by_me:
5247 return
5248
5249 logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
5250 stage = ["", "", ""]
5251 tasks_dict_info = {}
5252 # ^ stage, step, VIM progress
5253 self.logger.debug(logging_text + "Enter")
5254 # get all needed from database
5255 db_nsr = None
5256 db_nslcmop_update = {}
5257 db_nsr_update = {}
5258 exc = None
5259         # in case of error, indicates which part of the scale operation failed, so the nsr can be set to error status
5260 scale_process = None
5261 old_operational_status = ""
5262 old_config_status = ""
5263 nsi_id = None
5264 try:
5265 # wait for any previous tasks in process
5266 step = "Waiting for previous operations to terminate"
5267 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
5268 self._write_ns_status(
5269 nsr_id=nsr_id,
5270 ns_state=None,
5271 current_operation="SCALING",
5272 current_operation_id=nslcmop_id,
5273 )
5274
5275 step = "Getting nslcmop from database"
5276 self.logger.debug(
5277 step + " after having waited for previous tasks to be completed"
5278 )
5279 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5280
5281 step = "Getting nsr from database"
5282 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
5283 old_operational_status = db_nsr["operational-status"]
5284 old_config_status = db_nsr["config-status"]
5285
5286 step = "Parsing scaling parameters"
5287 db_nsr_update["operational-status"] = "scaling"
5288 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5289 nsr_deployed = db_nsr["_admin"].get("deployed")
5290
5291 vnf_index = db_nslcmop["operationParams"]["scaleVnfData"][
5292 "scaleByStepData"
5293 ]["member-vnf-index"]
5294 scaling_group = db_nslcmop["operationParams"]["scaleVnfData"][
5295 "scaleByStepData"
5296 ]["scaling-group-descriptor"]
5297 scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
5298 # for backward compatibility
5299 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
5300 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
5301 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
5302 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5303
5304 step = "Getting vnfr from database"
5305 db_vnfr = self.db.get_one(
5306 "vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}
5307 )
5308
5309 vca_id = self.get_vca_id(db_vnfr, db_nsr)
5310
5311 step = "Getting vnfd from database"
5312 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
5313
5314 base_folder = db_vnfd["_admin"]["storage"]
5315
5316 step = "Getting scaling-group-descriptor"
5317 scaling_descriptor = find_in_list(
5318 get_scaling_aspect(db_vnfd),
5319 lambda scale_desc: scale_desc["name"] == scaling_group,
5320 )
5321 if not scaling_descriptor:
5322 raise LcmException(
5323 "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
5324 "at vnfd:scaling-group-descriptor".format(scaling_group)
5325 )
5326
5327 step = "Sending scale order to VIM"
5328 # TODO check if ns is in a proper status
5329 nb_scale_op = 0
5330 if not db_nsr["_admin"].get("scaling-group"):
5331 self.update_db_2(
5332 "nsrs",
5333 nsr_id,
5334 {
5335 "_admin.scaling-group": [
5336 {"name": scaling_group, "nb-scale-op": 0}
5337 ]
5338 },
5339 )
5340 admin_scale_index = 0
5341 else:
5342 for admin_scale_index, admin_scale_info in enumerate(
5343 db_nsr["_admin"]["scaling-group"]
5344 ):
5345 if admin_scale_info["name"] == scaling_group:
5346 nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
5347 break
5348 else:  # not found: point the index one past the last element and add a new entry with this name
5349 admin_scale_index += 1
5350 db_nsr_update[
5351 "_admin.scaling-group.{}.name".format(admin_scale_index)
5352 ] = scaling_group
5353
5354 vca_scaling_info = []
5355 scaling_info = {"scaling_group_name": scaling_group, "vdu": [], "kdu": []}
5356 if scaling_type == "SCALE_OUT":
5357 if "aspect-delta-details" not in scaling_descriptor:
5358 raise LcmException(
5359 "Aspect delta details not fount in scaling descriptor {}".format(
5360 scaling_descriptor["name"]
5361 )
5362 )
5363 # count if max-instance-count is reached
5364 deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
5365
5366 scaling_info["scaling_direction"] = "OUT"
5367 scaling_info["vdu-create"] = {}
5368 scaling_info["kdu-create"] = {}
5369 for delta in deltas:
5370 for vdu_delta in delta.get("vdu-delta", {}):
5371 vdud = get_vdu(db_vnfd, vdu_delta["id"])
5372 # vdu_index also provides the number of instances of the targeted vdu
5373 vdu_count = vdu_index = get_vdur_index(db_vnfr, vdu_delta)
5374 cloud_init_text = self._get_vdu_cloud_init_content(
5375 vdud, db_vnfd
5376 )
5377 if cloud_init_text:
5378 additional_params = (
5379 self._get_vdu_additional_params(db_vnfr, vdud["id"])
5380 or {}
5381 )
5382 cloud_init_list = []
5383
5384 vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
5385 max_instance_count = 10
5386 if vdu_profile and "max-number-of-instances" in vdu_profile:
5387 max_instance_count = vdu_profile.get(
5388 "max-number-of-instances", 10
5389 )
5390
5391 default_instance_num = get_number_of_instances(
5392 db_vnfd, vdud["id"]
5393 )
5394 instances_number = vdu_delta.get("number-of-instances", 1)
5395 nb_scale_op += instances_number
5396
5397 new_instance_count = nb_scale_op + default_instance_num
5398 # If the new count exceeds the max while the current vdu count is below it,
5399 # adjust the number of instances to create accordingly
5400 if new_instance_count > max_instance_count > vdu_count:
5401 instances_number = new_instance_count - max_instance_count
5402 else:
5403 instances_number = instances_number
5404
5405 if new_instance_count > max_instance_count:
5406 raise LcmException(
5407 "reached the limit of {} (max-instance-count) "
5408 "scaling-out operations for the "
5409 "scaling-group-descriptor '{}'".format(
5410 nb_scale_op, scaling_group
5411 )
5412 )
5413 for x in range(vdu_delta.get("number-of-instances", 1)):
5414 if cloud_init_text:
5415 # TODO The VDU's own IP information is not available because db_vnfr has not been updated yet.
5416 additional_params["OSM"] = get_osm_params(
5417 db_vnfr, vdu_delta["id"], vdu_index + x
5418 )
5419 cloud_init_list.append(
5420 self._parse_cloud_init(
5421 cloud_init_text,
5422 additional_params,
5423 db_vnfd["id"],
5424 vdud["id"],
5425 )
5426 )
5427 vca_scaling_info.append(
5428 {
5429 "osm_vdu_id": vdu_delta["id"],
5430 "member-vnf-index": vnf_index,
5431 "type": "create",
5432 "vdu_index": vdu_index + x,
5433 }
5434 )
5435 scaling_info["vdu-create"][vdu_delta["id"]] = instances_number
5436 for kdu_delta in delta.get("kdu-resource-delta", {}):
5437 kdu_profile = get_kdu_resource_profile(db_vnfd, kdu_delta["id"])
5438 kdu_name = kdu_profile["kdu-name"]
5439 resource_name = kdu_profile.get("resource-name", "")
5440
5441 # The same delta might reference different kdus,
5442 # so keep a separate list for each kdu
5443 if not scaling_info["kdu-create"].get(kdu_name, None):
5444 scaling_info["kdu-create"][kdu_name] = []
5445
5446 kdur = get_kdur(db_vnfr, kdu_name)
5447 if kdur.get("helm-chart"):
5448 k8s_cluster_type = "helm-chart-v3"
5449 self.logger.debug("kdur: {}".format(kdur))
5450 if (
5451 kdur.get("helm-version")
5452 and kdur.get("helm-version") == "v2"
5453 ):
5454 k8s_cluster_type = "helm-chart"
5455 elif kdur.get("juju-bundle"):
5456 k8s_cluster_type = "juju-bundle"
5457 else:
5458 raise LcmException(
5459 "kdu type for kdu='{}.{}' is neither helm-chart nor "
5460 "juju-bundle. Maybe an old NBI version is running".format(
5461 db_vnfr["member-vnf-index-ref"], kdu_name
5462 )
5463 )
5464
5465 max_instance_count = 10
5466 if kdu_profile and "max-number-of-instances" in kdu_profile:
5467 max_instance_count = kdu_profile.get(
5468 "max-number-of-instances", 10
5469 )
5470
5471 nb_scale_op += kdu_delta.get("number-of-instances", 1)
5472 deployed_kdu, _ = get_deployed_kdu(
5473 nsr_deployed, kdu_name, vnf_index
5474 )
5475 if deployed_kdu is None:
5476 raise LcmException(
5477 "KDU '{}' for vnf '{}' not deployed".format(
5478 kdu_name, vnf_index
5479 )
5480 )
5481 kdu_instance = deployed_kdu.get("kdu-instance")
5482 instance_num = await self.k8scluster_map[
5483 k8s_cluster_type
5484 ].get_scale_count(
5485 resource_name,
5486 kdu_instance,
5487 vca_id=vca_id,
5488 cluster_uuid=deployed_kdu.get("k8scluster-uuid"),
5489 kdu_model=deployed_kdu.get("kdu-model"),
5490 )
5491 kdu_replica_count = instance_num + kdu_delta.get(
5492 "number-of-instances", 1
5493 )
5494
5495 # If the new replica count exceeds the max while the current instance count is below it,
5496 # cap the kdu replica count at the maximum
5497 if kdu_replica_count > max_instance_count > instance_num:
5498 kdu_replica_count = max_instance_count
5499 if kdu_replica_count > max_instance_count:
5500 raise LcmException(
5501 "reached the limit of {} (max-instance-count) "
5502 "scaling-out operations for the "
5503 "scaling-group-descriptor '{}'".format(
5504 instance_num, scaling_group
5505 )
5506 )
5507
5508 for x in range(kdu_delta.get("number-of-instances", 1)):
5509 vca_scaling_info.append(
5510 {
5511 "osm_kdu_id": kdu_name,
5512 "member-vnf-index": vnf_index,
5513 "type": "create",
5514 "kdu_index": instance_num + x - 1,
5515 }
5516 )
5517 scaling_info["kdu-create"][kdu_name].append(
5518 {
5519 "member-vnf-index": vnf_index,
5520 "type": "create",
5521 "k8s-cluster-type": k8s_cluster_type,
5522 "resource-name": resource_name,
5523 "scale": kdu_replica_count,
5524 }
5525 )
5526 elif scaling_type == "SCALE_IN":
5527 deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
5528
5529 scaling_info["scaling_direction"] = "IN"
5530 scaling_info["vdu-delete"] = {}
5531 scaling_info["kdu-delete"] = {}
5532
5533 for delta in deltas:
5534 for vdu_delta in delta.get("vdu-delta", {}):
5535 vdu_count = vdu_index = get_vdur_index(db_vnfr, vdu_delta)
5536 min_instance_count = 0
5537 vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
5538 if vdu_profile and "min-number-of-instances" in vdu_profile:
5539 min_instance_count = vdu_profile["min-number-of-instances"]
5540
5541 default_instance_num = get_number_of_instances(
5542 db_vnfd, vdu_delta["id"]
5543 )
5544 instance_num = vdu_delta.get("number-of-instances", 1)
5545 nb_scale_op -= instance_num
5546
5547 new_instance_count = nb_scale_op + default_instance_num
5548
5549 if new_instance_count < min_instance_count < vdu_count:
5550 instances_number = min_instance_count - new_instance_count
5551 else:
5552 instances_number = instance_num
5553
5554 if new_instance_count < min_instance_count:
5555 raise LcmException(
5556 "reached the limit of {} (min-instance-count) scaling-in operations for the "
5557 "scaling-group-descriptor '{}'".format(
5558 nb_scale_op, scaling_group
5559 )
5560 )
5561 for x in range(vdu_delta.get("number-of-instances", 1)):
5562 vca_scaling_info.append(
5563 {
5564 "osm_vdu_id": vdu_delta["id"],
5565 "member-vnf-index": vnf_index,
5566 "type": "delete",
5567 "vdu_index": vdu_index - 1 - x,
5568 }
5569 )
5570 scaling_info["vdu-delete"][vdu_delta["id"]] = instances_number
5571 for kdu_delta in delta.get("kdu-resource-delta", {}):
5572 kdu_profile = get_kdu_resource_profile(db_vnfd, kdu_delta["id"])
5573 kdu_name = kdu_profile["kdu-name"]
5574 resource_name = kdu_profile.get("resource-name", "")
5575
5576 if not scaling_info["kdu-delete"].get(kdu_name, None):
5577 scaling_info["kdu-delete"][kdu_name] = []
5578
5579 kdur = get_kdur(db_vnfr, kdu_name)
5580 if kdur.get("helm-chart"):
5581 k8s_cluster_type = "helm-chart-v3"
5582 self.logger.debug("kdur: {}".format(kdur))
5583 if (
5584 kdur.get("helm-version")
5585 and kdur.get("helm-version") == "v2"
5586 ):
5587 k8s_cluster_type = "helm-chart"
5588 elif kdur.get("juju-bundle"):
5589 k8s_cluster_type = "juju-bundle"
5590 else:
5591 raise LcmException(
5592 "kdu type for kdu='{}.{}' is neither helm-chart nor "
5593 "juju-bundle. Maybe an old NBI version is running".format(
5594 db_vnfr["member-vnf-index-ref"], kdur["kdu-name"]
5595 )
5596 )
5597
5598 min_instance_count = 0
5599 if kdu_profile and "min-number-of-instances" in kdu_profile:
5600 min_instance_count = kdu_profile["min-number-of-instances"]
5601
5602 nb_scale_op -= kdu_delta.get("number-of-instances", 1)
5603 deployed_kdu, _ = get_deployed_kdu(
5604 nsr_deployed, kdu_name, vnf_index
5605 )
5606 if deployed_kdu is None:
5607 raise LcmException(
5608 "KDU '{}' for vnf '{}' not deployed".format(
5609 kdu_name, vnf_index
5610 )
5611 )
5612 kdu_instance = deployed_kdu.get("kdu-instance")
5613 instance_num = await self.k8scluster_map[
5614 k8s_cluster_type
5615 ].get_scale_count(
5616 resource_name,
5617 kdu_instance,
5618 vca_id=vca_id,
5619 cluster_uuid=deployed_kdu.get("k8scluster-uuid"),
5620 kdu_model=deployed_kdu.get("kdu-model"),
5621 )
5622 kdu_replica_count = instance_num - kdu_delta.get(
5623 "number-of-instances", 1
5624 )
5625
5626 if kdu_replica_count < min_instance_count < instance_num:
5627 kdu_replica_count = min_instance_count
5628 if kdu_replica_count < min_instance_count:
5629 raise LcmException(
5630 "reached the limit of {} (min-instance-count) scaling-in operations for the "
5631 "scaling-group-descriptor '{}'".format(
5632 instance_num, scaling_group
5633 )
5634 )
5635
5636 for x in range(kdu_delta.get("number-of-instances", 1)):
5637 vca_scaling_info.append(
5638 {
5639 "osm_kdu_id": kdu_name,
5640 "member-vnf-index": vnf_index,
5641 "type": "delete",
5642 "kdu_index": instance_num - x - 1,
5643 }
5644 )
5645 scaling_info["kdu-delete"][kdu_name].append(
5646 {
5647 "member-vnf-index": vnf_index,
5648 "type": "delete",
5649 "k8s-cluster-type": k8s_cluster_type,
5650 "resource-name": resource_name,
5651 "scale": kdu_replica_count,
5652 }
5653 )
5654
5655 # update scaling_info with the ip addresses of the VDUs that are going to be deleted
5656 vdu_delete = copy(scaling_info.get("vdu-delete"))
5657 if scaling_info["scaling_direction"] == "IN":
5658 for vdur in reversed(db_vnfr["vdur"]):
5659 if vdu_delete.get(vdur["vdu-id-ref"]):
5660 vdu_delete[vdur["vdu-id-ref"]] -= 1
5661 scaling_info["vdu"].append(
5662 {
5663 "name": vdur.get("name") or vdur.get("vdu-name"),
5664 "vdu_id": vdur["vdu-id-ref"],
5665 "interface": [],
5666 }
5667 )
5668 for interface in vdur["interfaces"]:
5669 scaling_info["vdu"][-1]["interface"].append(
5670 {
5671 "name": interface["name"],
5672 "ip_address": interface["ip-address"],
5673 "mac_address": interface.get("mac-address"),
5674 }
5675 )
5676 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
5677
5678 # PRE-SCALE BEGIN
5679 step = "Executing pre-scale vnf-config-primitive"
5680 if scaling_descriptor.get("scaling-config-action"):
5681 for scaling_config_action in scaling_descriptor[
5682 "scaling-config-action"
5683 ]:
5684 if (
5685 scaling_config_action.get("trigger") == "pre-scale-in"
5686 and scaling_type == "SCALE_IN"
5687 ) or (
5688 scaling_config_action.get("trigger") == "pre-scale-out"
5689 and scaling_type == "SCALE_OUT"
5690 ):
5691 vnf_config_primitive = scaling_config_action[
5692 "vnf-config-primitive-name-ref"
5693 ]
5694 step = db_nslcmop_update[
5695 "detailed-status"
5696 ] = "executing pre-scale scaling-config-action '{}'".format(
5697 vnf_config_primitive
5698 )
5699
5700 # look for primitive
5701 for config_primitive in (
5702 get_configuration(db_vnfd, db_vnfd["id"]) or {}
5703 ).get("config-primitive", ()):
5704 if config_primitive["name"] == vnf_config_primitive:
5705 break
5706 else:
5707 raise LcmException(
5708 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
5709 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
5710 "primitive".format(scaling_group, vnf_config_primitive)
5711 )
5712
5713 vnfr_params = {"VDU_SCALE_INFO": scaling_info}
5714 if db_vnfr.get("additionalParamsForVnf"):
5715 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
5716
5717 scale_process = "VCA"
5718 db_nsr_update["config-status"] = "configuring pre-scaling"
5719 primitive_params = self._map_primitive_params(
5720 config_primitive, {}, vnfr_params
5721 )
5722
5723 # Pre-scale retry check: Check if this sub-operation has been executed before
5724 op_index = self._check_or_add_scale_suboperation(
5725 db_nslcmop,
5726 vnf_index,
5727 vnf_config_primitive,
5728 primitive_params,
5729 "PRE-SCALE",
5730 )
5731 if op_index == self.SUBOPERATION_STATUS_SKIP:
5732 # Skip sub-operation
5733 result = "COMPLETED"
5734 result_detail = "Done"
5735 self.logger.debug(
5736 logging_text
5737 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
5738 vnf_config_primitive, result, result_detail
5739 )
5740 )
5741 else:
5742 if op_index == self.SUBOPERATION_STATUS_NEW:
5743 # New sub-operation: Get index of this sub-operation
5744 op_index = (
5745 len(db_nslcmop.get("_admin", {}).get("operations"))
5746 - 1
5747 )
5748 self.logger.debug(
5749 logging_text
5750 + "vnf_config_primitive={} New sub-operation".format(
5751 vnf_config_primitive
5752 )
5753 )
5754 else:
5755 # retry: Get registered params for this existing sub-operation
5756 op = db_nslcmop.get("_admin", {}).get("operations", [])[
5757 op_index
5758 ]
5759 vnf_index = op.get("member_vnf_index")
5760 vnf_config_primitive = op.get("primitive")
5761 primitive_params = op.get("primitive_params")
5762 self.logger.debug(
5763 logging_text
5764 + "vnf_config_primitive={} Sub-operation retry".format(
5765 vnf_config_primitive
5766 )
5767 )
5768 # Execute the primitive, either with new (first-time) or registered (retry) args
5769 ee_descriptor_id = config_primitive.get(
5770 "execution-environment-ref"
5771 )
5772 primitive_name = config_primitive.get(
5773 "execution-environment-primitive", vnf_config_primitive
5774 )
5775 ee_id, vca_type = self._look_for_deployed_vca(
5776 nsr_deployed["VCA"],
5777 member_vnf_index=vnf_index,
5778 vdu_id=None,
5779 vdu_count_index=None,
5780 ee_descriptor_id=ee_descriptor_id,
5781 )
5782 result, result_detail = await self._ns_execute_primitive(
5783 ee_id,
5784 primitive_name,
5785 primitive_params,
5786 vca_type=vca_type,
5787 vca_id=vca_id,
5788 )
5789 self.logger.debug(
5790 logging_text
5791 + "vnf_config_primitive={} Done with result {} {}".format(
5792 vnf_config_primitive, result, result_detail
5793 )
5794 )
5795 # Update operationState = COMPLETED | FAILED
5796 self._update_suboperation_status(
5797 db_nslcmop, op_index, result, result_detail
5798 )
5799
5800 if result == "FAILED":
5801 raise LcmException(result_detail)
5802 db_nsr_update["config-status"] = old_config_status
5803 scale_process = None
5804 # PRE-SCALE END
5805
5806 db_nsr_update[
5807 "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)
5808 ] = nb_scale_op
5809 db_nsr_update[
5810 "_admin.scaling-group.{}.time".format(admin_scale_index)
5811 ] = time()
5812
5813 # SCALE-IN VCA - BEGIN
5814 if vca_scaling_info:
5815 step = db_nslcmop_update[
5816 "detailed-status"
5817 ] = "Deleting the execution environments"
5818 scale_process = "VCA"
5819 for vca_info in vca_scaling_info:
5820 if vca_info["type"] == "delete" and not vca_info.get("osm_kdu_id"):
5821 member_vnf_index = str(vca_info["member-vnf-index"])
5822 self.logger.debug(
5823 logging_text + "vdu info: {}".format(vca_info)
5824 )
5825 if vca_info.get("osm_vdu_id"):
5826 vdu_id = vca_info["osm_vdu_id"]
5827 vdu_index = int(vca_info["vdu_index"])
5828 stage[
5829 1
5830 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5831 member_vnf_index, vdu_id, vdu_index
5832 )
5833 stage[2] = step = "Scaling in VCA"
5834 self._write_op_status(op_id=nslcmop_id, stage=stage)
5835 vca_update = db_nsr["_admin"]["deployed"]["VCA"]
5836 config_update = db_nsr["configurationStatus"]
5837 for vca_index, vca in enumerate(vca_update):
5838 if (
5839 (vca and vca.get("ee_id"))
5840 and vca["member-vnf-index"] == member_vnf_index
5841 and vca["vdu_count_index"] == vdu_index
5842 ):
5843 if vca.get("vdu_id"):
5844 config_descriptor = get_configuration(
5845 db_vnfd, vca.get("vdu_id")
5846 )
5847 elif vca.get("kdu_name"):
5848 config_descriptor = get_configuration(
5849 db_vnfd, vca.get("kdu_name")
5850 )
5851 else:
5852 config_descriptor = get_configuration(
5853 db_vnfd, db_vnfd["id"]
5854 )
5855 operation_params = (
5856 db_nslcmop.get("operationParams") or {}
5857 )
5858 exec_terminate_primitives = not operation_params.get(
5859 "skip_terminate_primitives"
5860 ) and vca.get("needed_terminate")
5861 task = asyncio.ensure_future(
5862 asyncio.wait_for(
5863 self.destroy_N2VC(
5864 logging_text,
5865 db_nslcmop,
5866 vca,
5867 config_descriptor,
5868 vca_index,
5869 destroy_ee=True,
5870 exec_primitives=exec_terminate_primitives,
5871 scaling_in=True,
5872 vca_id=vca_id,
5873 ),
5874 timeout=self.timeout_charm_delete,
5875 )
5876 )
5877 tasks_dict_info[task] = "Terminating VCA {}".format(
5878 vca.get("ee_id")
5879 )
5880 del vca_update[vca_index]
5881 del config_update[vca_index]
5882 # wait for pending tasks of terminate primitives
5883 if tasks_dict_info:
5884 self.logger.debug(
5885 logging_text
5886 + "Waiting for tasks {}".format(
5887 list(tasks_dict_info.keys())
5888 )
5889 )
5890 error_list = await self._wait_for_tasks(
5891 logging_text,
5892 tasks_dict_info,
5893 min(
5894 self.timeout_charm_delete, self.timeout_ns_terminate
5895 ),
5896 stage,
5897 nslcmop_id,
5898 )
5899 tasks_dict_info.clear()
5900 if error_list:
5901 raise LcmException("; ".join(error_list))
5902
5903 db_vca_and_config_update = {
5904 "_admin.deployed.VCA": vca_update,
5905 "configurationStatus": config_update,
5906 }
5907 self.update_db_2(
5908 "nsrs", db_nsr["_id"], db_vca_and_config_update
5909 )
5910 scale_process = None
5911 # SCALE-IN VCA - END
5912
5913 # SCALE RO - BEGIN
5914 if scaling_info.get("vdu-create") or scaling_info.get("vdu-delete"):
5915 scale_process = "RO"
5916 if self.ro_config.get("ng"):
5917 await self._scale_ng_ro(
5918 logging_text, db_nsr, db_nslcmop, db_vnfr, scaling_info, stage
5919 )
5920 scaling_info.pop("vdu-create", None)
5921 scaling_info.pop("vdu-delete", None)
5922
5923 scale_process = None
5924 # SCALE RO - END
5925
5926 # SCALE KDU - BEGIN
5927 if scaling_info.get("kdu-create") or scaling_info.get("kdu-delete"):
5928 scale_process = "KDU"
5929 await self._scale_kdu(
5930 logging_text, nsr_id, nsr_deployed, db_vnfd, vca_id, scaling_info
5931 )
5932 scaling_info.pop("kdu-create", None)
5933 scaling_info.pop("kdu-delete", None)
5934
5935 scale_process = None
5936 # SCALE KDU - END
5937
5938 if db_nsr_update:
5939 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5940
5941 # SCALE-UP VCA - BEGIN
5942 if vca_scaling_info:
5943 step = db_nslcmop_update[
5944 "detailed-status"
5945 ] = "Creating new execution environments"
5946 scale_process = "VCA"
5947 for vca_info in vca_scaling_info:
5948 if vca_info["type"] == "create" and not vca_info.get("osm_kdu_id"):
5949 member_vnf_index = str(vca_info["member-vnf-index"])
5950 self.logger.debug(
5951 logging_text + "vdu info: {}".format(vca_info)
5952 )
5953 vnfd_id = db_vnfr["vnfd-ref"]
5954 if vca_info.get("osm_vdu_id"):
5955 vdu_index = int(vca_info["vdu_index"])
5956 deploy_params = {"OSM": get_osm_params(db_vnfr)}
5957 if db_vnfr.get("additionalParamsForVnf"):
5958 deploy_params.update(
5959 parse_yaml_strings(
5960 db_vnfr["additionalParamsForVnf"].copy()
5961 )
5962 )
5963 descriptor_config = get_configuration(
5964 db_vnfd, db_vnfd["id"]
5965 )
5966 if descriptor_config:
5967 vdu_id = None
5968 vdu_name = None
5969 kdu_name = None
5970 self._deploy_n2vc(
5971 logging_text=logging_text
5972 + "member_vnf_index={} ".format(member_vnf_index),
5973 db_nsr=db_nsr,
5974 db_vnfr=db_vnfr,
5975 nslcmop_id=nslcmop_id,
5976 nsr_id=nsr_id,
5977 nsi_id=nsi_id,
5978 vnfd_id=vnfd_id,
5979 vdu_id=vdu_id,
5980 kdu_name=kdu_name,
5981 member_vnf_index=member_vnf_index,
5982 vdu_index=vdu_index,
5983 vdu_name=vdu_name,
5984 deploy_params=deploy_params,
5985 descriptor_config=descriptor_config,
5986 base_folder=base_folder,
5987 task_instantiation_info=tasks_dict_info,
5988 stage=stage,
5989 )
5990 vdu_id = vca_info["osm_vdu_id"]
5991 vdur = find_in_list(
5992 db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id
5993 )
5994 descriptor_config = get_configuration(db_vnfd, vdu_id)
5995 if vdur.get("additionalParams"):
5996 deploy_params_vdu = parse_yaml_strings(
5997 vdur["additionalParams"]
5998 )
5999 else:
6000 deploy_params_vdu = deploy_params
6001 deploy_params_vdu["OSM"] = get_osm_params(
6002 db_vnfr, vdu_id, vdu_count_index=vdu_index
6003 )
6004 if descriptor_config:
6005 vdu_name = None
6006 kdu_name = None
6007 stage[
6008 1
6009 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6010 member_vnf_index, vdu_id, vdu_index
6011 )
6012 stage[2] = step = "Scaling out VCA"
6013 self._write_op_status(op_id=nslcmop_id, stage=stage)
6014 self._deploy_n2vc(
6015 logging_text=logging_text
6016 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6017 member_vnf_index, vdu_id, vdu_index
6018 ),
6019 db_nsr=db_nsr,
6020 db_vnfr=db_vnfr,
6021 nslcmop_id=nslcmop_id,
6022 nsr_id=nsr_id,
6023 nsi_id=nsi_id,
6024 vnfd_id=vnfd_id,
6025 vdu_id=vdu_id,
6026 kdu_name=kdu_name,
6027 member_vnf_index=member_vnf_index,
6028 vdu_index=vdu_index,
6029 vdu_name=vdu_name,
6030 deploy_params=deploy_params_vdu,
6031 descriptor_config=descriptor_config,
6032 base_folder=base_folder,
6033 task_instantiation_info=tasks_dict_info,
6034 stage=stage,
6035 )
6036 # SCALE-UP VCA - END
6037 scale_process = None
6038
6039 # POST-SCALE BEGIN
6040 # execute primitive service POST-SCALING
6041 step = "Executing post-scale vnf-config-primitive"
6042 if scaling_descriptor.get("scaling-config-action"):
6043 for scaling_config_action in scaling_descriptor[
6044 "scaling-config-action"
6045 ]:
6046 if (
6047 scaling_config_action.get("trigger") == "post-scale-in"
6048 and scaling_type == "SCALE_IN"
6049 ) or (
6050 scaling_config_action.get("trigger") == "post-scale-out"
6051 and scaling_type == "SCALE_OUT"
6052 ):
6053 vnf_config_primitive = scaling_config_action[
6054 "vnf-config-primitive-name-ref"
6055 ]
6056 step = db_nslcmop_update[
6057 "detailed-status"
6058 ] = "executing post-scale scaling-config-action '{}'".format(
6059 vnf_config_primitive
6060 )
6061
6062 vnfr_params = {"VDU_SCALE_INFO": scaling_info}
6063 if db_vnfr.get("additionalParamsForVnf"):
6064 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
6065
6066 # look for primitive
6067 for config_primitive in (
6068 get_configuration(db_vnfd, db_vnfd["id"]) or {}
6069 ).get("config-primitive", ()):
6070 if config_primitive["name"] == vnf_config_primitive:
6071 break
6072 else:
6073 raise LcmException(
6074 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
6075 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
6076 "config-primitive".format(
6077 scaling_group, vnf_config_primitive
6078 )
6079 )
6080 scale_process = "VCA"
6081 db_nsr_update["config-status"] = "configuring post-scaling"
6082 primitive_params = self._map_primitive_params(
6083 config_primitive, {}, vnfr_params
6084 )
6085
6086 # Post-scale retry check: Check if this sub-operation has been executed before
6087 op_index = self._check_or_add_scale_suboperation(
6088 db_nslcmop,
6089 vnf_index,
6090 vnf_config_primitive,
6091 primitive_params,
6092 "POST-SCALE",
6093 )
6094 if op_index == self.SUBOPERATION_STATUS_SKIP:
6095 # Skip sub-operation
6096 result = "COMPLETED"
6097 result_detail = "Done"
6098 self.logger.debug(
6099 logging_text
6100 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6101 vnf_config_primitive, result, result_detail
6102 )
6103 )
6104 else:
6105 if op_index == self.SUBOPERATION_STATUS_NEW:
6106 # New sub-operation: Get index of this sub-operation
6107 op_index = (
6108 len(db_nslcmop.get("_admin", {}).get("operations"))
6109 - 1
6110 )
6111 self.logger.debug(
6112 logging_text
6113 + "vnf_config_primitive={} New sub-operation".format(
6114 vnf_config_primitive
6115 )
6116 )
6117 else:
6118 # retry: Get registered params for this existing sub-operation
6119 op = db_nslcmop.get("_admin", {}).get("operations", [])[
6120 op_index
6121 ]
6122 vnf_index = op.get("member_vnf_index")
6123 vnf_config_primitive = op.get("primitive")
6124 primitive_params = op.get("primitive_params")
6125 self.logger.debug(
6126 logging_text
6127 + "vnf_config_primitive={} Sub-operation retry".format(
6128 vnf_config_primitive
6129 )
6130 )
6131 # Execute the primitive, either with new (first-time) or registered (retry) args
6132 ee_descriptor_id = config_primitive.get(
6133 "execution-environment-ref"
6134 )
6135 primitive_name = config_primitive.get(
6136 "execution-environment-primitive", vnf_config_primitive
6137 )
6138 ee_id, vca_type = self._look_for_deployed_vca(
6139 nsr_deployed["VCA"],
6140 member_vnf_index=vnf_index,
6141 vdu_id=None,
6142 vdu_count_index=None,
6143 ee_descriptor_id=ee_descriptor_id,
6144 )
6145 result, result_detail = await self._ns_execute_primitive(
6146 ee_id,
6147 primitive_name,
6148 primitive_params,
6149 vca_type=vca_type,
6150 vca_id=vca_id,
6151 )
6152 self.logger.debug(
6153 logging_text
6154 + "vnf_config_primitive={} Done with result {} {}".format(
6155 vnf_config_primitive, result, result_detail
6156 )
6157 )
6158 # Update operationState = COMPLETED | FAILED
6159 self._update_suboperation_status(
6160 db_nslcmop, op_index, result, result_detail
6161 )
6162
6163 if result == "FAILED":
6164 raise LcmException(result_detail)
6165 db_nsr_update["config-status"] = old_config_status
6166 scale_process = None
6167 # POST-SCALE END
6168
6169 db_nsr_update[
6170 "detailed-status"
6171 ] = "" # "scaled {} {}".format(scaling_group, scaling_type)
6172 db_nsr_update["operational-status"] = (
6173 "running"
6174 if old_operational_status == "failed"
6175 else old_operational_status
6176 )
6177 db_nsr_update["config-status"] = old_config_status
6178 return
6179 except (
6180 ROclient.ROClientException,
6181 DbException,
6182 LcmException,
6183 NgRoException,
6184 ) as e:
6185 self.logger.error(logging_text + "Exit Exception {}".format(e))
6186 exc = e
6187 except asyncio.CancelledError:
6188 self.logger.error(
6189 logging_text + "Cancelled Exception while '{}'".format(step)
6190 )
6191 exc = "Operation was cancelled"
6192 except Exception as e:
6193 exc = traceback.format_exc()
6194 self.logger.critical(
6195 logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
6196 exc_info=True,
6197 )
6198 finally:
6199 self._write_ns_status(
6200 nsr_id=nsr_id,
6201 ns_state=None,
6202 current_operation="IDLE",
6203 current_operation_id=None,
6204 )
6205 if tasks_dict_info:
6206 stage[1] = "Waiting for instantiate pending tasks."
6207 self.logger.debug(logging_text + stage[1])
6208 exc = await self._wait_for_tasks(
6209 logging_text,
6210 tasks_dict_info,
6211 self.timeout_ns_deploy,
6212 stage,
6213 nslcmop_id,
6214 nsr_id=nsr_id,
6215 )
6216 if exc:
6217 db_nslcmop_update[
6218 "detailed-status"
6219 ] = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
6220 nslcmop_operation_state = "FAILED"
6221 if db_nsr:
6222 db_nsr_update["operational-status"] = old_operational_status
6223 db_nsr_update["config-status"] = old_config_status
6224 db_nsr_update["detailed-status"] = ""
6225 if scale_process:
6226 if "VCA" in scale_process:
6227 db_nsr_update["config-status"] = "failed"
6228 if "RO" in scale_process:
6229 db_nsr_update["operational-status"] = "failed"
6230 db_nsr_update[
6231 "detailed-status"
6232 ] = "FAILED scaling nslcmop={} {}: {}".format(
6233 nslcmop_id, step, exc
6234 )
6235 else:
6236 error_description_nslcmop = None
6237 nslcmop_operation_state = "COMPLETED"
6238 db_nslcmop_update["detailed-status"] = "Done"
6239
6240 self._write_op_status(
6241 op_id=nslcmop_id,
6242 stage="",
6243 error_message=error_description_nslcmop,
6244 operation_state=nslcmop_operation_state,
6245 other_update=db_nslcmop_update,
6246 )
6247 if db_nsr:
6248 self._write_ns_status(
6249 nsr_id=nsr_id,
6250 ns_state=None,
6251 current_operation="IDLE",
6252 current_operation_id=None,
6253 other_update=db_nsr_update,
6254 )
6255
6256 if nslcmop_operation_state:
6257 try:
6258 msg = {
6259 "nsr_id": nsr_id,
6260 "nslcmop_id": nslcmop_id,
6261 "operationState": nslcmop_operation_state,
6262 }
6263 await self.msg.aiowrite("ns", "scaled", msg, loop=self.loop)
6264 except Exception as e:
6265 self.logger.error(
6266 logging_text + "kafka_write notification Exception {}".format(e)
6267 )
6268 self.logger.debug(logging_text + "Exit")
6269 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")
6270
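Both scaling directions in the method above follow the same bounded-counter pattern: the running number of scale operations is combined with the descriptor's default instance count and compared against max-number-of-instances (or min-number-of-instances) before anything is created or deleted. A rough sketch of the SCALE_OUT guard under those assumptions; the helper name and the plain ValueError stand in for the LcmException raised by the real code:

# Illustrative sketch of the SCALE_OUT guard used above; names are placeholders.
def check_scale_out(
    nb_scale_op: int,
    default_instance_num: int,
    requested_instances: int,
    max_instance_count: int = 10,
) -> int:
    """Return the updated nb_scale_op, or raise if max-number-of-instances would be exceeded."""
    nb_scale_op += requested_instances
    new_instance_count = nb_scale_op + default_instance_num
    if new_instance_count > max_instance_count:
        raise ValueError(
            "reached the limit of {} (max-instance-count) scaling-out operations".format(
                nb_scale_op
            )
        )
    return nb_scale_op


# Example: one default instance, already scaled out 5 times, asking for 2 more
print(check_scale_out(nb_scale_op=5, default_instance_num=1, requested_instances=2))
# 7  (8 total instances, still within the default maximum of 10)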
6271 async def _scale_kdu(
6272 self, logging_text, nsr_id, nsr_deployed, db_vnfd, vca_id, scaling_info
6273 ):
6274 _scaling_info = scaling_info.get("kdu-create") or scaling_info.get("kdu-delete")
6275 for kdu_name in _scaling_info:
6276 for kdu_scaling_info in _scaling_info[kdu_name]:
6277 deployed_kdu, index = get_deployed_kdu(
6278 nsr_deployed, kdu_name, kdu_scaling_info["member-vnf-index"]
6279 )
6280 cluster_uuid = deployed_kdu["k8scluster-uuid"]
6281 kdu_instance = deployed_kdu["kdu-instance"]
6282 kdu_model = deployed_kdu.get("kdu-model")
6283 scale = int(kdu_scaling_info["scale"])
6284 k8s_cluster_type = kdu_scaling_info["k8s-cluster-type"]
6285
6286 db_dict = {
6287 "collection": "nsrs",
6288 "filter": {"_id": nsr_id},
6289 "path": "_admin.deployed.K8s.{}".format(index),
6290 }
6291
6292 step = "scaling application {}".format(
6293 kdu_scaling_info["resource-name"]
6294 )
6295 self.logger.debug(logging_text + step)
6296
6297 if kdu_scaling_info["type"] == "delete":
6298 kdu_config = get_configuration(db_vnfd, kdu_name)
6299 if (
6300 kdu_config
6301 and kdu_config.get("terminate-config-primitive")
6302 and get_juju_ee_ref(db_vnfd, kdu_name) is None
6303 ):
6304 terminate_config_primitive_list = kdu_config.get(
6305 "terminate-config-primitive"
6306 )
6307 terminate_config_primitive_list.sort(
6308 key=lambda val: int(val["seq"])
6309 )
6310
6311 for (
6312 terminate_config_primitive
6313 ) in terminate_config_primitive_list:
6314 primitive_params_ = self._map_primitive_params(
6315 terminate_config_primitive, {}, {}
6316 )
6317 step = "execute terminate config primitive"
6318 self.logger.debug(logging_text + step)
6319 await asyncio.wait_for(
6320 self.k8scluster_map[k8s_cluster_type].exec_primitive(
6321 cluster_uuid=cluster_uuid,
6322 kdu_instance=kdu_instance,
6323 primitive_name=terminate_config_primitive["name"],
6324 params=primitive_params_,
6325 db_dict=db_dict,
6326 vca_id=vca_id,
6327 ),
6328 timeout=600,
6329 )
6330
6331 await asyncio.wait_for(
6332 self.k8scluster_map[k8s_cluster_type].scale(
6333 kdu_instance,
6334 scale,
6335 kdu_scaling_info["resource-name"],
6336 vca_id=vca_id,
6337 cluster_uuid=cluster_uuid,
6338 kdu_model=kdu_model,
6339 atomic=True,
6340 db_dict=db_dict,
6341 ),
6342 timeout=self.timeout_vca_on_error,
6343 )
6344
6345 if kdu_scaling_info["type"] == "create":
6346 kdu_config = get_configuration(db_vnfd, kdu_name)
6347 if (
6348 kdu_config
6349 and kdu_config.get("initial-config-primitive")
6350 and get_juju_ee_ref(db_vnfd, kdu_name) is None
6351 ):
6352 initial_config_primitive_list = kdu_config.get(
6353 "initial-config-primitive"
6354 )
6355 initial_config_primitive_list.sort(
6356 key=lambda val: int(val["seq"])
6357 )
6358
6359 for initial_config_primitive in initial_config_primitive_list:
6360 primitive_params_ = self._map_primitive_params(
6361 initial_config_primitive, {}, {}
6362 )
6363 step = "execute initial config primitive"
6364 self.logger.debug(logging_text + step)
6365 await asyncio.wait_for(
6366 self.k8scluster_map[k8s_cluster_type].exec_primitive(
6367 cluster_uuid=cluster_uuid,
6368 kdu_instance=kdu_instance,
6369 primitive_name=initial_config_primitive["name"],
6370 params=primitive_params_,
6371 db_dict=db_dict,
6372 vca_id=vca_id,
6373 ),
6374 timeout=600,
6375 )
6376
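_scale_kdu runs terminate and initial config primitives in ascending "seq" order, converting the field to int before sorting. A small sketch of that ordering step with invented primitive names; the int() conversion matters because "seq" is stored as a string and "10" would otherwise sort before "2":

# Illustrative only: primitives are executed in ascending "seq" order,
# as in _scale_kdu above. The primitive names are invented sample data.
primitives = [
    {"seq": "2", "name": "configure"},
    {"seq": "10", "name": "verify"},
    {"seq": "1", "name": "prepare"},
]
primitives.sort(key=lambda val: int(val["seq"]))
print([p["name"] for p in primitives])  # ['prepare', 'configure', 'verify']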
6377 async def _scale_ng_ro(
6378 self, logging_text, db_nsr, db_nslcmop, db_vnfr, vdu_scaling_info, stage
6379 ):
6380 nsr_id = db_nslcmop["nsInstanceId"]
6381 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
6382 db_vnfrs = {}
6383
6384 # read from db: vnfds for every vnf
6385 db_vnfds = []
6386
6387 # for each vnf in ns, read vnfd
6388 for vnfr in self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}):
6389 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
6390 vnfd_id = vnfr["vnfd-id"] # vnfd uuid for this vnf
6391 # if we don't have this vnfd yet, read it from db
6392 if not find_in_list(db_vnfds, lambda a_vnfd: a_vnfd["id"] == vnfd_id):
6393 # read from db
6394 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
6395 db_vnfds.append(vnfd)
6396 n2vc_key = self.n2vc.get_public_key()
6397 n2vc_key_list = [n2vc_key]
6398 self.scale_vnfr(
6399 db_vnfr,
6400 vdu_scaling_info.get("vdu-create"),
6401 vdu_scaling_info.get("vdu-delete"),
6402 mark_delete=True,
6403 )
6404 # db_vnfr has been updated, update db_vnfrs to use it
6405 db_vnfrs[db_vnfr["member-vnf-index-ref"]] = db_vnfr
6406 await self._instantiate_ng_ro(
6407 logging_text,
6408 nsr_id,
6409 db_nsd,
6410 db_nsr,
6411 db_nslcmop,
6412 db_vnfrs,
6413 db_vnfds,
6414 n2vc_key_list,
6415 stage=stage,
6416 start_deploy=time(),
6417 timeout_ns_deploy=self.timeout_ns_deploy,
6418 )
6419 if vdu_scaling_info.get("vdu-delete"):
6420 self.scale_vnfr(
6421 db_vnfr, None, vdu_scaling_info["vdu-delete"], mark_delete=False
6422 )
6423
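_scale_ng_ro reads each vnfd from the database only once, even when several vnfrs of the NS point at the same descriptor. A simplified sketch of that deduplication with made-up records; the real code keys on the vnfr's "vnfd-id" and fetches the descriptor from the "vnfds" collection:

# Illustrative sketch of how db_vnfds is filled only once per vnfd in
# _scale_ng_ro above; the records below are invented sample data.
vnfrs = [
    {"member-vnf-index-ref": "1", "vnfd-id": "vnfd-a"},
    {"member-vnf-index-ref": "2", "vnfd-id": "vnfd-a"},
    {"member-vnf-index-ref": "3", "vnfd-id": "vnfd-b"},
]
db_vnfds = []
for vnfr in vnfrs:
    vnfd_id = vnfr["vnfd-id"]
    if not any(vnfd["id"] == vnfd_id for vnfd in db_vnfds):
        # the real code reads the descriptor from the database here
        db_vnfds.append({"id": vnfd_id})
print([vnfd["id"] for vnfd in db_vnfds])  # ['vnfd-a', 'vnfd-b']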
6424 async def extract_prometheus_scrape_jobs(
6425 self, ee_id, artifact_path, ee_config_descriptor, vnfr_id, nsr_id, target_ip
6426 ):
6427 # look for a file called 'prometheus*.j2' in the artifact folder
6428 artifact_content = self.fs.dir_ls(artifact_path)
6429 job_file = next(
6430 (
6431 f
6432 for f in artifact_content
6433 if f.startswith("prometheus") and f.endswith(".j2")
6434 ),
6435 None,
6436 )
6437 if not job_file:
6438 return
6439 with self.fs.file_open((artifact_path, job_file), "r") as f:
6440 job_data = f.read()
6441
6442 # TODO get_service
6443 _, _, service = ee_id.partition(".") # remove prefix "namespace."
6444 host_name = "{}-{}".format(service, ee_config_descriptor["metric-service"])
6445 host_port = "80"
6446 vnfr_id = vnfr_id.replace("-", "")
6447 variables = {
6448 "JOB_NAME": vnfr_id,
6449 "TARGET_IP": target_ip,
6450 "EXPORTER_POD_IP": host_name,
6451 "EXPORTER_POD_PORT": host_port,
6452 }
6453 job_list = parse_job(job_data, variables)
6454 # ensure job_name uses the vnfr_id; add nsr_id and vnfr_id as metadata
6455 for job in job_list:
6456 if (
6457 not isinstance(job.get("job_name"), str)
6458 or vnfr_id not in job["job_name"]
6459 ):
6460 job["job_name"] = vnfr_id + "_" + str(randint(1, 10000))
6461 job["nsr_id"] = nsr_id
6462 job["vnfr_id"] = vnfr_id
6463 return job_list
6464
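After rendering the prometheus*.j2 template with the variables above, the method forces every job_name to contain the dash-stripped vnfr id and tags each job with nsr_id and vnfr_id. A sketch of that normalization loop with invented sample jobs:

# Illustrative only: mirrors the job_name normalization loop above;
# the ids and job names below are placeholders.
from random import randint

vnfr_id = "1234abcd"  # already stripped of dashes, as in the method above
nsr_id = "ns-0001"    # sample value
job_list = [{"job_name": "node_exporter"}, {"job_name": vnfr_id + "_metrics"}]

for job in job_list:
    if not isinstance(job.get("job_name"), str) or vnfr_id not in job["job_name"]:
        job["job_name"] = vnfr_id + "_" + str(randint(1, 10000))
    job["nsr_id"] = nsr_id
    job["vnfr_id"] = vnfr_id
print(job_list)  # the first job gets renamed, the second is kept as-is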
6465 def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
6466 """
6467 Get VCA Cloud and VCA Cloud Credentials for the VIM account
6468
6469 :param: vim_account_id: VIM Account ID
6470
6471 :return: (cloud_name, cloud_credential)
6472 """
6473 config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
6474 return config.get("vca_cloud"), config.get("vca_cloud_credential")
6475
6476 def get_vca_k8s_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
6477 """
6478 Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account
6479
6480 :param: vim_account_id: VIM Account ID
6481
6482 :return: (cloud_name, cloud_credential)
6483 """
6484 config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
6485 return config.get("vca_k8s_cloud"), config.get("vca_k8s_cloud_credential")
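Both helpers only read optional keys from the VIM account's config dictionary, so a VIM account without a registered VCA cloud simply yields (None, None). A sketch of the config shape they expect, with placeholder values:

# Illustrative only: example of the VIM account "config" keys read by the two
# helpers above; the values are placeholders, not real credentials.
vim_account_config = {
    "vca_cloud": "openstack-cloud",
    "vca_cloud_credential": "openstack-cloud-cred",
    "vca_k8s_cloud": "k8s-cloud",
    "vca_k8s_cloud_credential": "k8s-cloud-cred",
}

# equivalent of get_vca_cloud_and_credentials / get_vca_k8s_cloud_and_credentials
print(vim_account_config.get("vca_cloud"), vim_account_config.get("vca_cloud_credential"))
print(vim_account_config.get("vca_k8s_cloud"), vim_account_config.get("vca_k8s_cloud_credential"))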