Feature 10906: Support for Anti-Affinity groups
[osm/LCM.git] / osm_lcm / ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 from typing import Any, Dict, List
21 import yaml
22 import logging
23 import logging.handlers
24 import traceback
25 import json
26 from jinja2 import (
27 Environment,
28 TemplateError,
29 TemplateNotFound,
30 StrictUndefined,
31 UndefinedError,
32 )
33
34 from osm_lcm import ROclient
35 from osm_lcm.data_utils.nsr import (
36 get_deployed_kdu,
37 get_deployed_vca,
38 get_deployed_vca_list,
39 get_nsd,
40 )
41 from osm_lcm.data_utils.vca import (
42 DeployedComponent,
43 DeployedK8sResource,
44 DeployedVCA,
45 EELevel,
46 Relation,
47 EERelation,
48 safe_get_ee_relation,
49 )
50 from osm_lcm.ng_ro import NgRoClient, NgRoException
51 from osm_lcm.lcm_utils import (
52 LcmException,
53 LcmExceptionNoMgmtIP,
54 LcmBase,
55 deep_get,
56 get_iterable,
57 populate_dict,
58 )
59 from osm_lcm.data_utils.nsd import (
60 get_ns_configuration_relation_list,
61 get_vnf_profile,
62 get_vnf_profiles,
63 )
64 from osm_lcm.data_utils.vnfd import (
65 get_relation_list,
66 get_vdu_list,
67 get_vdu_profile,
68 get_ee_sorted_initial_config_primitive_list,
69 get_ee_sorted_terminate_config_primitive_list,
70 get_kdu_list,
71 get_virtual_link_profiles,
72 get_vdu,
73 get_configuration,
74 get_vdu_index,
75 get_scaling_aspect,
76 get_number_of_instances,
77 get_juju_ee_ref,
78 get_kdu_resource_profile,
79 )
80 from osm_lcm.data_utils.list_utils import find_in_list
81 from osm_lcm.data_utils.vnfr import get_osm_params, get_vdur_index, get_kdur
82 from osm_lcm.data_utils.dict_utils import parse_yaml_strings
83 from osm_lcm.data_utils.database.vim_account import VimAccountDB
84 from n2vc.definitions import RelationEndpoint
85 from n2vc.k8s_helm_conn import K8sHelmConnector
86 from n2vc.k8s_helm3_conn import K8sHelm3Connector
87 from n2vc.k8s_juju_conn import K8sJujuConnector
88
89 from osm_common.dbbase import DbException
90 from osm_common.fsbase import FsException
91
92 from osm_lcm.data_utils.database.database import Database
93 from osm_lcm.data_utils.filesystem.filesystem import Filesystem
94
95 from n2vc.n2vc_juju_conn import N2VCJujuConnector
96 from n2vc.exceptions import N2VCException, N2VCNotFound, K8sException
97
98 from osm_lcm.lcm_helm_conn import LCMHelmConn
99 from osm_lcm.prometheus import parse_job
100
101 from copy import copy, deepcopy
102 from time import time
103 from uuid import uuid4
104
105 from random import randint
106
107 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
108
109
110 class NsLcm(LcmBase):
111 timeout_vca_on_error = (
112 5 * 60
113 ) # Time from when a charm first reaches blocked/error status until it is marked as failed
114 timeout_ns_deploy = 2 * 3600 # default global timeout for deploying an NS
115 timeout_ns_terminate = 1800 # default global timeout for terminating an NS
116 timeout_charm_delete = 10 * 60
117 timeout_primitive = 30 * 60 # timeout for primitive execution
118 timeout_progress_primitive = (
119 10 * 60
120 ) # timeout for some progress in a primitive execution
121
122 SUBOPERATION_STATUS_NOT_FOUND = -1
123 SUBOPERATION_STATUS_NEW = -2
124 SUBOPERATION_STATUS_SKIP = -3
125 task_name_deploy_vca = "Deploying VCA"
126
127 def __init__(self, msg, lcm_tasks, config, loop):
128 """
129 Initialize and connect to database, filesystem storage, and messaging
130 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
131 :return: None
132 """
133 super().__init__(msg=msg, logger=logging.getLogger("lcm.ns"))
134
135 self.db = Database().instance.db
136 self.fs = Filesystem().instance.fs
137 self.loop = loop
138 self.lcm_tasks = lcm_tasks
139 self.timeout = config["timeout"]
140 self.ro_config = config["ro_config"]
141 self.ng_ro = config["ro_config"].get("ng")
142 self.vca_config = config["VCA"].copy()
143
144 # create N2VC connector
145 self.n2vc = N2VCJujuConnector(
146 log=self.logger,
147 loop=self.loop,
148 on_update_db=self._on_update_n2vc_db,
149 fs=self.fs,
150 db=self.db,
151 )
152
153 self.conn_helm_ee = LCMHelmConn(
154 log=self.logger,
155 loop=self.loop,
156 vca_config=self.vca_config,
157 on_update_db=self._on_update_n2vc_db,
158 )
159
160 self.k8sclusterhelm2 = K8sHelmConnector(
161 kubectl_command=self.vca_config.get("kubectlpath"),
162 helm_command=self.vca_config.get("helmpath"),
163 log=self.logger,
164 on_update_db=None,
165 fs=self.fs,
166 db=self.db,
167 )
168
169 self.k8sclusterhelm3 = K8sHelm3Connector(
170 kubectl_command=self.vca_config.get("kubectlpath"),
171 helm_command=self.vca_config.get("helm3path"),
172 fs=self.fs,
173 log=self.logger,
174 db=self.db,
175 on_update_db=None,
176 )
177
178 self.k8sclusterjuju = K8sJujuConnector(
179 kubectl_command=self.vca_config.get("kubectlpath"),
180 juju_command=self.vca_config.get("jujupath"),
181 log=self.logger,
182 loop=self.loop,
183 on_update_db=self._on_update_k8s_db,
184 fs=self.fs,
185 db=self.db,
186 )
187
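# Map each OSM deployment-type string to its K8s connector; note that both
# "chart" and "helm-chart-v3" resolve to the Helm v3 connector, while the
# legacy "helm-chart" type keeps using the Helm v2 connector.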
188 self.k8scluster_map = {
189 "helm-chart": self.k8sclusterhelm2,
190 "helm-chart-v3": self.k8sclusterhelm3,
191 "chart": self.k8sclusterhelm3,
192 "juju-bundle": self.k8sclusterjuju,
193 "juju": self.k8sclusterjuju,
194 }
195
196 self.vca_map = {
197 "lxc_proxy_charm": self.n2vc,
198 "native_charm": self.n2vc,
199 "k8s_proxy_charm": self.n2vc,
200 "helm": self.conn_helm_ee,
201 "helm-v3": self.conn_helm_ee,
202 }
203
204 # create RO client
205 self.RO = NgRoClient(self.loop, **self.ro_config)
206
207 @staticmethod
208 def increment_ip_mac(ip_mac, vm_index=1):
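# Increment the last numeric component of an IPv4/IPv6 address or MAC by
# vm_index; returns None if the value cannot be parsed. For example:
#   increment_ip_mac("10.0.0.5", 2)          -> "10.0.0.7"
#   increment_ip_mac("fa:16:3e:00:00:0a", 1) -> "fa:16:3e:00:00:0b"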
209 if not isinstance(ip_mac, str):
210 return ip_mac
211 try:
212 # try IPv4 first: look for the last dot
213 i = ip_mac.rfind(".")
214 if i > 0:
215 i += 1
216 return "{}{}".format(ip_mac[:i], int(ip_mac[i:]) + vm_index)
217 # otherwise try IPv6 or MAC: look for the last colon and operate in hex
218 i = ip_mac.rfind(":")
219 if i > 0:
220 i += 1
221 # format in hex, len can be 2 for mac or 4 for ipv6
222 return ("{}{:0" + str(len(ip_mac) - i) + "x}").format(
223 ip_mac[:i], int(ip_mac[i:], 16) + vm_index
224 )
225 except Exception:
226 pass
227 return None
228
229 def _on_update_ro_db(self, nsrs_id, ro_descriptor):
230
231 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
232
233 try:
234 # TODO filter RO descriptor fields...
235
236 # write to database
237 db_dict = dict()
238 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
239 db_dict["deploymentStatus"] = ro_descriptor
240 self.update_db_2("nsrs", nsrs_id, db_dict)
241
242 except Exception as e:
243 self.logger.warn(
244 "Cannot write database RO deployment for ns={} -> {}".format(nsrs_id, e)
245 )
246
247 async def _on_update_n2vc_db(self, table, filter, path, updated_data, vca_id=None):
248
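# Callback invoked by N2VC when a VCA status changes: refresh the NSR
# vcaStatus, re-evaluate the configurationStatus of the affected VCA and,
# for an NS in READY/DEGRADED state, toggle nsState according to the Juju
# machine/application health reported in status_dict.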
249 # remove last dot from path (if exists)
250 if path.endswith("."):
251 path = path[:-1]
252
253 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
254 # .format(table, filter, path, updated_data))
255 try:
256
257 nsr_id = filter.get("_id")
258
259 # read ns record from database
260 nsr = self.db.get_one(table="nsrs", q_filter=filter)
261 current_ns_status = nsr.get("nsState")
262
263 # get vca status for NS
264 status_dict = await self.n2vc.get_status(
265 namespace="." + nsr_id, yaml_format=False, vca_id=vca_id
266 )
267
268 # vcaStatus
269 db_dict = dict()
270 db_dict["vcaStatus"] = status_dict
271 await self.n2vc.update_vca_status(db_dict["vcaStatus"], vca_id=vca_id)
272
273 # update configurationStatus for this VCA
274 try:
275 vca_index = int(path[path.rfind(".") + 1 :])
276
277 vca_list = deep_get(
278 target_dict=nsr, key_list=("_admin", "deployed", "VCA")
279 )
280 vca_status = vca_list[vca_index].get("status")
281
282 configuration_status_list = nsr.get("configurationStatus")
283 config_status = configuration_status_list[vca_index].get("status")
284
285 if config_status == "BROKEN" and vca_status != "failed":
286 db_dict["configurationStatus"][vca_index] = "READY"
287 elif config_status != "BROKEN" and vca_status == "failed":
288 db_dict["configurationStatus"][vca_index] = "BROKEN"
289 except Exception as e:
290 # do not update configurationStatus
291 self.logger.debug("Error updating vca_index (ignore): {}".format(e))
292
293 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
294 # if nsState = 'DEGRADED' check if all is OK
295 is_degraded = False
296 if current_ns_status in ("READY", "DEGRADED"):
297 error_description = ""
298 # check machines
299 if status_dict.get("machines"):
300 for machine_id in status_dict.get("machines"):
301 machine = status_dict.get("machines").get(machine_id)
302 # check machine agent-status
303 if machine.get("agent-status"):
304 s = machine.get("agent-status").get("status")
305 if s != "started":
306 is_degraded = True
307 error_description += (
308 "machine {} agent-status={} ; ".format(
309 machine_id, s
310 )
311 )
312 # check machine instance status
313 if machine.get("instance-status"):
314 s = machine.get("instance-status").get("status")
315 if s != "running":
316 is_degraded = True
317 error_description += (
318 "machine {} instance-status={} ; ".format(
319 machine_id, s
320 )
321 )
322 # check applications
323 if status_dict.get("applications"):
324 for app_id in status_dict.get("applications"):
325 app = status_dict.get("applications").get(app_id)
326 # check application status
327 if app.get("status"):
328 s = app.get("status").get("status")
329 if s != "active":
330 is_degraded = True
331 error_description += (
332 "application {} status={} ; ".format(app_id, s)
333 )
334
335 if error_description:
336 db_dict["errorDescription"] = error_description
337 if current_ns_status == "READY" and is_degraded:
338 db_dict["nsState"] = "DEGRADED"
339 if current_ns_status == "DEGRADED" and not is_degraded:
340 db_dict["nsState"] = "READY"
341
342 # write to database
343 self.update_db_2("nsrs", nsr_id, db_dict)
344
345 except (asyncio.CancelledError, asyncio.TimeoutError):
346 raise
347 except Exception as e:
348 self.logger.warn("Error updating NS state for ns={}: {}".format(nsr_id, e))
349
350 async def _on_update_k8s_db(
351 self, cluster_uuid, kdu_instance, filter=None, vca_id=None
352 ):
353 """
354 Updating vca status in NSR record
355 :param cluster_uuid: UUID of a k8s cluster
356 :param kdu_instance: The unique name of the KDU instance
357 :param filter: To get nsr_id
358 :return: none
359 """
360
361 # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
362 # .format(cluster_uuid, kdu_instance, filter))
363
364 try:
365 nsr_id = filter.get("_id")
366
367 # get vca status for NS
368 vca_status = await self.k8sclusterjuju.status_kdu(
369 cluster_uuid,
370 kdu_instance,
371 complete_status=True,
372 yaml_format=False,
373 vca_id=vca_id,
374 )
375 # vcaStatus
376 db_dict = dict()
377 db_dict["vcaStatus"] = {nsr_id: vca_status}
378
379 await self.k8sclusterjuju.update_vca_status(
380 db_dict["vcaStatus"],
381 kdu_instance,
382 vca_id=vca_id,
383 )
384
385 # write to database
386 self.update_db_2("nsrs", nsr_id, db_dict)
387
388 except (asyncio.CancelledError, asyncio.TimeoutError):
389 raise
390 except Exception as e:
391 self.logger.warn("Error updating NS state for ns={}: {}".format(nsr_id, e))
392
393 @staticmethod
394 def _parse_cloud_init(cloud_init_text, additional_params, vnfd_id, vdu_id):
395 try:
396 env = Environment(undefined=StrictUndefined)
397 template = env.from_string(cloud_init_text)
398 return template.render(additional_params or {})
399 except UndefinedError as e:
400 raise LcmException(
401 "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
402 "file, must be provided in the instantiation parameters inside the "
403 "'additionalParamsForVnf/Vdu' block".format(e, vnfd_id, vdu_id)
404 )
405 except (TemplateError, TemplateNotFound) as e:
406 raise LcmException(
407 "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
408 vnfd_id, vdu_id, e
409 )
410 )
411
412 def _get_vdu_cloud_init_content(self, vdu, vnfd):
413 cloud_init_content = cloud_init_file = None
414 try:
415 if vdu.get("cloud-init-file"):
416 base_folder = vnfd["_admin"]["storage"]
417 if base_folder["pkg-dir"]:
418 cloud_init_file = "{}/{}/cloud_init/{}".format(
419 base_folder["folder"],
420 base_folder["pkg-dir"],
421 vdu["cloud-init-file"],
422 )
423 else:
424 cloud_init_file = "{}/Scripts/cloud_init/{}".format(
425 base_folder["folder"],
426 vdu["cloud-init-file"],
427 )
428 with self.fs.file_open(cloud_init_file, "r") as ci_file:
429 cloud_init_content = ci_file.read()
430 elif vdu.get("cloud-init"):
431 cloud_init_content = vdu["cloud-init"]
432
433 return cloud_init_content
434 except FsException as e:
435 raise LcmException(
436 "Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".format(
437 vnfd["id"], vdu["id"], cloud_init_file, e
438 )
439 )
440
441 def _get_vdu_additional_params(self, db_vnfr, vdu_id):
442 vdur = next(
443 (vdur for vdur in db_vnfr.get("vdur") if vdu_id == vdur["vdu-id-ref"]),
444 {}
445 )
446 additional_params = vdur.get("additionalParams")
447 return parse_yaml_strings(additional_params)
448
449 def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
450 """
451 Creates a new vnfd descriptor for RO based on the input OSM IM vnfd
452 :param vnfd: input vnfd
453 :param new_id: overrides vnf id if provided
454 :param additionalParams: Instantiation params for VNFs provided
455 :param nsrId: Id of the NSR
456 :return: copy of vnfd
457 """
458 vnfd_RO = deepcopy(vnfd)
459 # remove keys unused by RO: configuration, monitoring, scaling and internal keys
460 vnfd_RO.pop("_id", None)
461 vnfd_RO.pop("_admin", None)
462 vnfd_RO.pop("monitoring-param", None)
463 vnfd_RO.pop("scaling-group-descriptor", None)
464 vnfd_RO.pop("kdu", None)
465 vnfd_RO.pop("k8s-cluster", None)
466 if new_id:
467 vnfd_RO["id"] = new_id
468
469 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
470 for vdu in get_iterable(vnfd_RO, "vdu"):
471 vdu.pop("cloud-init-file", None)
472 vdu.pop("cloud-init", None)
473 return vnfd_RO
474
475 @staticmethod
476 def ip_profile_2_RO(ip_profile):
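# Translate an OSM IM ip-profile into the RO format: "dns-server" becomes
# "dns-address", "dhcp-params" becomes "dhcp" and "ip-version" values are
# normalized to "IPv4"/"IPv6".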
477 RO_ip_profile = deepcopy(ip_profile)
478 if "dns-server" in RO_ip_profile:
479 if isinstance(RO_ip_profile["dns-server"], list):
480 RO_ip_profile["dns-address"] = []
481 for ds in RO_ip_profile.pop("dns-server"):
482 RO_ip_profile["dns-address"].append(ds["address"])
483 else:
484 RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
485 if RO_ip_profile.get("ip-version") == "ipv4":
486 RO_ip_profile["ip-version"] = "IPv4"
487 if RO_ip_profile.get("ip-version") == "ipv6":
488 RO_ip_profile["ip-version"] = "IPv6"
489 if "dhcp-params" in RO_ip_profile:
490 RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
491 return RO_ip_profile
492
493 def _get_ro_vim_id_for_vim_account(self, vim_account):
494 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
495 if db_vim["_admin"]["operationalState"] != "ENABLED":
496 raise LcmException(
497 "VIM={} is not available. operationalState={}".format(
498 vim_account, db_vim["_admin"]["operationalState"]
499 )
500 )
501 RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
502 return RO_vim_id
503
504 def get_ro_wim_id_for_wim_account(self, wim_account):
505 if isinstance(wim_account, str):
506 db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
507 if db_wim["_admin"]["operationalState"] != "ENABLED":
508 raise LcmException(
509 "WIM={} is not available. operationalState={}".format(
510 wim_account, db_wim["_admin"]["operationalState"]
511 )
512 )
513 RO_wim_id = db_wim["_admin"]["deployed"]["RO-account"]
514 return RO_wim_id
515 else:
516 return wim_account
517
518 def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None, mark_delete=False):
519
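# Scale-out: clone the newest vdur of each vdu-id (or the saved
# "vdur-template" when scaling from 0 instances) and push the copies with
# incremented count-index and IP/MAC addresses. Scale-in: either mark the
# affected vdur entries as DELETING (mark_delete=True) or pull them from the
# database one by one; when the last instance is removed, that vdur is kept
# in "vdur-template" so a later scale-out can recreate it.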
520 db_vdu_push_list = []
521 template_vdur = []
522 db_update = {"_admin.modified": time()}
523 if vdu_create:
524 for vdu_id, vdu_count in vdu_create.items():
525 vdur = next(
526 (
527 vdur
528 for vdur in reversed(db_vnfr["vdur"])
529 if vdur["vdu-id-ref"] == vdu_id
530 ),
531 None,
532 )
533 if not vdur:
534 # Read the template saved in the db:
535 self.logger.debug(f"No vdur in the database. Using the vdur-template to scale")
536 vdur_template = db_vnfr.get("vdur-template")
537 if not vdur_template:
538 raise LcmException(
539 "Error scaling OUT VNFR for {}. No vnfr or template exists".format(
540 vdu_id
541 )
542 )
543 vdur = vdur_template[0]
544 # Delete the template from the database after it has been used
545 self.db.set_one("vnfrs",
546 {"_id": db_vnfr["_id"]},
547 None,
548 pull={"vdur-template": {"_id": vdur['_id']}}
549 )
550 for count in range(vdu_count):
551 vdur_copy = deepcopy(vdur)
552 vdur_copy["status"] = "BUILD"
553 vdur_copy["status-detailed"] = None
554 vdur_copy["ip-address"] = None
555 vdur_copy["_id"] = str(uuid4())
556 vdur_copy["count-index"] += count + 1
557 vdur_copy["id"] = "{}-{}".format(
558 vdur_copy["vdu-id-ref"], vdur_copy["count-index"]
559 )
560 vdur_copy.pop("vim_info", None)
561 for iface in vdur_copy["interfaces"]:
562 if iface.get("fixed-ip"):
563 iface["ip-address"] = self.increment_ip_mac(
564 iface["ip-address"], count + 1
565 )
566 else:
567 iface.pop("ip-address", None)
568 if iface.get("fixed-mac"):
569 iface["mac-address"] = self.increment_ip_mac(
570 iface["mac-address"], count + 1
571 )
572 else:
573 iface.pop("mac-address", None)
574 if db_vnfr["vdur"]:
575 iface.pop(
576 "mgmt_vnf", None
577 ) # only the first vdu can be management of the vnf
578 db_vdu_push_list.append(vdur_copy)
579 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
580 if vdu_delete:
581 if len(db_vnfr["vdur"]) == 1:
582 # The scale will move to 0 instances
583 self.logger.debug(f"Scaling to 0 !, creating the template with the last vdur")
584 template_vdur = [db_vnfr["vdur"][0]]
585 for vdu_id, vdu_count in vdu_delete.items():
586 if mark_delete:
587 indexes_to_delete = [
588 iv[0]
589 for iv in enumerate(db_vnfr["vdur"])
590 if iv[1]["vdu-id-ref"] == vdu_id
591 ]
592 db_update.update(
593 {
594 "vdur.{}.status".format(i): "DELETING"
595 for i in indexes_to_delete[-vdu_count:]
596 }
597 )
598 else:
599 # they must be deleted one by one because common.db does not allow otherwise
600 vdus_to_delete = [
601 v
602 for v in reversed(db_vnfr["vdur"])
603 if v["vdu-id-ref"] == vdu_id
604 ]
605 for vdu in vdus_to_delete[:vdu_count]:
606 self.db.set_one(
607 "vnfrs",
608 {"_id": db_vnfr["_id"]},
609 None,
610 pull={"vdur": {"_id": vdu["_id"]}},
611 )
612 db_push = {}
613 if db_vdu_push_list:
614 db_push["vdur"] = db_vdu_push_list
615 if template_vdur:
616 db_push["vdur-template"] = template_vdur
617 if not db_push:
618 db_push = None
619 db_vnfr["vdur-template"] = template_vdur
620 self.db.set_one("vnfrs", {"_id": db_vnfr["_id"]}, db_update, push_list=db_push)
621 # modify passed dictionary db_vnfr
622 db_vnfr_ = self.db.get_one("vnfrs", {"_id": db_vnfr["_id"]})
623 db_vnfr["vdur"] = db_vnfr_["vdur"]
624
625 def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
626 """
627 Updates database nsr with the RO info for the created vld
628 :param ns_update_nsr: dictionary to be filled with the updated info
629 :param db_nsr: content of db_nsr. This is also modified
630 :param nsr_desc_RO: nsr descriptor from RO
631 :return: Nothing, LcmException is raised on errors
632 """
633
634 for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
635 for net_RO in get_iterable(nsr_desc_RO, "nets"):
636 if vld["id"] != net_RO.get("ns_net_osm_id"):
637 continue
638 vld["vim-id"] = net_RO.get("vim_net_id")
639 vld["name"] = net_RO.get("vim_name")
640 vld["status"] = net_RO.get("status")
641 vld["status-detailed"] = net_RO.get("error_msg")
642 ns_update_nsr["vld.{}".format(vld_index)] = vld
643 break
644 else:
645 raise LcmException(
646 "ns_update_nsr: Not found vld={} at RO info".format(vld["id"])
647 )
648
649 def set_vnfr_at_error(self, db_vnfrs, error_text):
650 try:
651 for db_vnfr in db_vnfrs.values():
652 vnfr_update = {"status": "ERROR"}
653 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
654 if "status" not in vdur:
655 vdur["status"] = "ERROR"
656 vnfr_update["vdur.{}.status".format(vdu_index)] = "ERROR"
657 if error_text:
658 vdur["status-detailed"] = str(error_text)
659 vnfr_update[
660 "vdur.{}.status-detailed".format(vdu_index)
661 ] = "ERROR"
662 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
663 except DbException as e:
664 self.logger.error("Cannot update vnf. {}".format(e))
665
666 def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
667 """
668 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
669 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
670 :param nsr_desc_RO: nsr descriptor from RO
671 :return: Nothing, LcmException is raised on errors
672 """
673 for vnf_index, db_vnfr in db_vnfrs.items():
674 for vnf_RO in nsr_desc_RO["vnfs"]:
675 if vnf_RO["member_vnf_index"] != vnf_index:
676 continue
677 vnfr_update = {}
678 if vnf_RO.get("ip_address"):
679 db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO[
680 "ip_address"
681 ].split(";")[0]
682 elif not db_vnfr.get("ip-address"):
683 if db_vnfr.get("vdur"): # if not VDUs, there is not ip_address
684 raise LcmExceptionNoMgmtIP(
685 "ns member_vnf_index '{}' has no IP address".format(
686 vnf_index
687 )
688 )
689
690 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
691 vdur_RO_count_index = 0
692 if vdur.get("pdu-type"):
693 continue
694 for vdur_RO in get_iterable(vnf_RO, "vms"):
695 if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
696 continue
697 if vdur["count-index"] != vdur_RO_count_index:
698 vdur_RO_count_index += 1
699 continue
700 vdur["vim-id"] = vdur_RO.get("vim_vm_id")
701 if vdur_RO.get("ip_address"):
702 vdur["ip-address"] = vdur_RO["ip_address"].split(";")[0]
703 else:
704 vdur["ip-address"] = None
705 vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
706 vdur["name"] = vdur_RO.get("vim_name")
707 vdur["status"] = vdur_RO.get("status")
708 vdur["status-detailed"] = vdur_RO.get("error_msg")
709 for ifacer in get_iterable(vdur, "interfaces"):
710 for interface_RO in get_iterable(vdur_RO, "interfaces"):
711 if ifacer["name"] == interface_RO.get("internal_name"):
712 ifacer["ip-address"] = interface_RO.get(
713 "ip_address"
714 )
715 ifacer["mac-address"] = interface_RO.get(
716 "mac_address"
717 )
718 break
719 else:
720 raise LcmException(
721 "ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
722 "from VIM info".format(
723 vnf_index, vdur["vdu-id-ref"], ifacer["name"]
724 )
725 )
726 vnfr_update["vdur.{}".format(vdu_index)] = vdur
727 break
728 else:
729 raise LcmException(
730 "ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
731 "VIM info".format(
732 vnf_index, vdur["vdu-id-ref"], vdur["count-index"]
733 )
734 )
735
736 for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
737 for net_RO in get_iterable(nsr_desc_RO, "nets"):
738 if vld["id"] != net_RO.get("vnf_net_osm_id"):
739 continue
740 vld["vim-id"] = net_RO.get("vim_net_id")
741 vld["name"] = net_RO.get("vim_name")
742 vld["status"] = net_RO.get("status")
743 vld["status-detailed"] = net_RO.get("error_msg")
744 vnfr_update["vld.{}".format(vld_index)] = vld
745 break
746 else:
747 raise LcmException(
748 "ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
749 vnf_index, vld["id"]
750 )
751 )
752
753 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
754 break
755
756 else:
757 raise LcmException(
758 "ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(
759 vnf_index
760 )
761 )
762
763 def _get_ns_config_info(self, nsr_id):
764 """
765 Generates a mapping between vnf,vdu elements and the N2VC id
766 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
767 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
768 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
769 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
770 """
771 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
772 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
773 mapping = {}
774 ns_config_info = {"osm-config-mapping": mapping}
775 for vca in vca_deployed_list:
776 if not vca["member-vnf-index"]:
777 continue
778 if not vca["vdu_id"]:
779 mapping[vca["member-vnf-index"]] = vca["application"]
780 else:
781 mapping[
782 "{}.{}.{}".format(
783 vca["member-vnf-index"], vca["vdu_id"], vca["vdu_count_index"]
784 )
785 ] = vca["application"]
786 return ns_config_info
787
788 async def _instantiate_ng_ro(
789 self,
790 logging_text,
791 nsr_id,
792 nsd,
793 db_nsr,
794 db_nslcmop,
795 db_vnfrs,
796 db_vnfds,
797 n2vc_key_list,
798 stage,
799 start_deploy,
800 timeout_ns_deploy,
801 ):
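# Build the NG-RO "target" dictionary (ns.vld, vnf/vdur, image, flavor,
# affinity-or-anti-affinity-group and cloud_init_content), call RO.deploy()
# with it and wait for the RO action to finish before updating the NSR.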
802
803 db_vims = {}
804
805 def get_vim_account(vim_account_id):
806 nonlocal db_vims
807 if vim_account_id in db_vims:
808 return db_vims[vim_account_id]
809 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account_id})
810 db_vims[vim_account_id] = db_vim
811 return db_vim
812
813 # modify target_vld info with instantiation parameters
814 def parse_vld_instantiation_params(
815 target_vim, target_vld, vld_params, target_sdn
816 ):
817 if vld_params.get("ip-profile"):
818 target_vld["vim_info"][target_vim]["ip_profile"] = vld_params[
819 "ip-profile"
820 ]
821 if vld_params.get("provider-network"):
822 target_vld["vim_info"][target_vim]["provider_network"] = vld_params[
823 "provider-network"
824 ]
825 if "sdn-ports" in vld_params["provider-network"] and target_sdn:
826 target_vld["vim_info"][target_sdn]["sdn-ports"] = vld_params[
827 "provider-network"
828 ]["sdn-ports"]
829 if vld_params.get("wimAccountId"):
830 target_wim = "wim:{}".format(vld_params["wimAccountId"])
831 target_vld["vim_info"][target_wim] = {}
832 for param in ("vim-network-name", "vim-network-id"):
833 if vld_params.get(param):
834 if isinstance(vld_params[param], dict):
835 for vim, vim_net in vld_params[param].items():
836 other_target_vim = "vim:" + vim
837 populate_dict(
838 target_vld["vim_info"],
839 (other_target_vim, param.replace("-", "_")),
840 vim_net,
841 )
842 else: # isinstance str
843 target_vld["vim_info"][target_vim][
844 param.replace("-", "_")
845 ] = vld_params[param]
846 if vld_params.get("common_id"):
847 target_vld["common_id"] = vld_params.get("common_id")
848
849 # modify target["ns"]["vld"] with instantiation parameters to override vnf vim-account
850 def update_ns_vld_target(target, ns_params):
851 for vnf_params in ns_params.get("vnf", ()):
852 if vnf_params.get("vimAccountId"):
853 target_vnf = next(
854 (
855 vnfr
856 for vnfr in db_vnfrs.values()
857 if vnf_params["member-vnf-index"]
858 == vnfr["member-vnf-index-ref"]
859 ),
860 None,
861 )
862 vdur = next((vdur for vdur in target_vnf.get("vdur", ())), None)
863 for a_index, a_vld in enumerate(target["ns"]["vld"]):
864 target_vld = find_in_list(
865 get_iterable(vdur, "interfaces"),
866 lambda iface: iface.get("ns-vld-id") == a_vld["name"],
867 )
868 if target_vld:
869 if vnf_params.get("vimAccountId") not in a_vld.get(
870 "vim_info", {}
871 ):
872 target["ns"]["vld"][a_index].get("vim_info").update(
873 {
874 "vim:{}".format(vnf_params["vimAccountId"]): {
875 "vim_network_name": ""
876 }
877 }
878 )
879
880 nslcmop_id = db_nslcmop["_id"]
881 target = {
882 "name": db_nsr["name"],
883 "ns": {"vld": []},
884 "vnf": [],
885 "image": deepcopy(db_nsr["image"]),
886 "flavor": deepcopy(db_nsr["flavor"]),
887 "action_id": nslcmop_id,
888 "cloud_init_content": {},
889 }
890 for image in target["image"]:
891 image["vim_info"] = {}
892 for flavor in target["flavor"]:
893 flavor["vim_info"] = {}
894 if db_nsr.get("affinity-or-anti-affinity-group"):
895 target["affinity-or-anti-affinity-group"] = deepcopy(db_nsr["affinity-or-anti-affinity-group"])
896 for affinity_or_anti_affinity_group in target["affinity-or-anti-affinity-group"]:
897 affinity_or_anti_affinity_group["vim_info"] = {}
898
899 if db_nslcmop.get("lcmOperationType") != "instantiate":
900 # get parameters of instantiation:
901 db_nslcmop_instantiate = self.db.get_list(
902 "nslcmops",
903 {
904 "nsInstanceId": db_nslcmop["nsInstanceId"],
905 "lcmOperationType": "instantiate",
906 },
907 )[-1]
908 ns_params = db_nslcmop_instantiate.get("operationParams")
909 else:
910 ns_params = db_nslcmop.get("operationParams")
911 ssh_keys_instantiation = ns_params.get("ssh_keys") or []
912 ssh_keys_all = ssh_keys_instantiation + (n2vc_key_list or [])
913
914 cp2target = {}
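# cp2target maps each NS connection point, keyed as
# "member_vnf:<constituent-base-element-id>.<constituent-cpd-id>", to the RO
# reference "nsrs:<nsr_id>:vld.<vld_index>" of the NS VLD it is attached to.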
915 for vld_index, vld in enumerate(db_nsr.get("vld")):
916 target_vim = "vim:{}".format(ns_params["vimAccountId"])
917 target_vld = {
918 "id": vld["id"],
919 "name": vld["name"],
920 "mgmt-network": vld.get("mgmt-network", False),
921 "type": vld.get("type"),
922 "vim_info": {
923 target_vim: {
924 "vim_network_name": vld.get("vim-network-name"),
925 "vim_account_id": ns_params["vimAccountId"],
926 }
927 },
928 }
929 # check if this network needs SDN assist
930 if vld.get("pci-interfaces"):
931 db_vim = get_vim_account(ns_params["vimAccountId"])
932 sdnc_id = db_vim["config"].get("sdn-controller")
933 if sdnc_id:
934 sdn_vld = "nsrs:{}:vld.{}".format(nsr_id, vld["id"])
935 target_sdn = "sdn:{}".format(sdnc_id)
936 target_vld["vim_info"][target_sdn] = {
937 "sdn": True,
938 "target_vim": target_vim,
939 "vlds": [sdn_vld],
940 "type": vld.get("type"),
941 }
942
943 nsd_vnf_profiles = get_vnf_profiles(nsd)
944 for nsd_vnf_profile in nsd_vnf_profiles:
945 for cp in nsd_vnf_profile["virtual-link-connectivity"]:
946 if cp["virtual-link-profile-id"] == vld["id"]:
947 cp2target[
948 "member_vnf:{}.{}".format(
949 cp["constituent-cpd-id"][0][
950 "constituent-base-element-id"
951 ],
952 cp["constituent-cpd-id"][0]["constituent-cpd-id"],
953 )
954 ] = "nsrs:{}:vld.{}".format(nsr_id, vld_index)
955
956 # check at nsd descriptor, if there is an ip-profile
957 vld_params = {}
958 nsd_vlp = find_in_list(
959 get_virtual_link_profiles(nsd),
960 lambda a_link_profile: a_link_profile["virtual-link-desc-id"]
961 == vld["id"],
962 )
963 if (
964 nsd_vlp
965 and nsd_vlp.get("virtual-link-protocol-data")
966 and nsd_vlp["virtual-link-protocol-data"].get("l3-protocol-data")
967 ):
968 ip_profile_source_data = nsd_vlp["virtual-link-protocol-data"][
969 "l3-protocol-data"
970 ]
971 ip_profile_dest_data = {}
972 if "ip-version" in ip_profile_source_data:
973 ip_profile_dest_data["ip-version"] = ip_profile_source_data[
974 "ip-version"
975 ]
976 if "cidr" in ip_profile_source_data:
977 ip_profile_dest_data["subnet-address"] = ip_profile_source_data[
978 "cidr"
979 ]
980 if "gateway-ip" in ip_profile_source_data:
981 ip_profile_dest_data["gateway-address"] = ip_profile_source_data[
982 "gateway-ip"
983 ]
984 if "dhcp-enabled" in ip_profile_source_data:
985 ip_profile_dest_data["dhcp-params"] = {
986 "enabled": ip_profile_source_data["dhcp-enabled"]
987 }
988 vld_params["ip-profile"] = ip_profile_dest_data
989
990 # update vld_params with instantiation params
991 vld_instantiation_params = find_in_list(
992 get_iterable(ns_params, "vld"),
993 lambda a_vld: a_vld["name"] in (vld["name"], vld["id"]),
994 )
995 if vld_instantiation_params:
996 vld_params.update(vld_instantiation_params)
997 parse_vld_instantiation_params(target_vim, target_vld, vld_params, None)
998 target["ns"]["vld"].append(target_vld)
999 # Update the target ns_vld if vnf vim_account is overridden by instantiation params
1000 update_ns_vld_target(target, ns_params)
1001
1002 for vnfr in db_vnfrs.values():
1003 vnfd = find_in_list(
1004 db_vnfds, lambda db_vnf: db_vnf["id"] == vnfr["vnfd-ref"]
1005 )
1006 vnf_params = find_in_list(
1007 get_iterable(ns_params, "vnf"),
1008 lambda a_vnf: a_vnf["member-vnf-index"] == vnfr["member-vnf-index-ref"],
1009 )
1010 target_vnf = deepcopy(vnfr)
1011 target_vim = "vim:{}".format(vnfr["vim-account-id"])
1012 for vld in target_vnf.get("vld", ()):
1013 # check if connected to a ns.vld, to fill target
1014 vnf_cp = find_in_list(
1015 vnfd.get("int-virtual-link-desc", ()),
1016 lambda cpd: cpd.get("id") == vld["id"],
1017 )
1018 if vnf_cp:
1019 ns_cp = "member_vnf:{}.{}".format(
1020 vnfr["member-vnf-index-ref"], vnf_cp["id"]
1021 )
1022 if cp2target.get(ns_cp):
1023 vld["target"] = cp2target[ns_cp]
1024
1025 vld["vim_info"] = {
1026 target_vim: {"vim_network_name": vld.get("vim-network-name")}
1027 }
1028 # check if this network needs SDN assist
1029 target_sdn = None
1030 if vld.get("pci-interfaces"):
1031 db_vim = get_vim_account(vnfr["vim-account-id"])
1032 sdnc_id = db_vim["config"].get("sdn-controller")
1033 if sdnc_id:
1034 sdn_vld = "vnfrs:{}:vld.{}".format(target_vnf["_id"], vld["id"])
1035 target_sdn = "sdn:{}".format(sdnc_id)
1036 vld["vim_info"][target_sdn] = {
1037 "sdn": True,
1038 "target_vim": target_vim,
1039 "vlds": [sdn_vld],
1040 "type": vld.get("type"),
1041 }
1042
1043 # check at vnfd descriptor, if there is an ip-profile
1044 vld_params = {}
1045 vnfd_vlp = find_in_list(
1046 get_virtual_link_profiles(vnfd),
1047 lambda a_link_profile: a_link_profile["id"] == vld["id"],
1048 )
1049 if (
1050 vnfd_vlp
1051 and vnfd_vlp.get("virtual-link-protocol-data")
1052 and vnfd_vlp["virtual-link-protocol-data"].get("l3-protocol-data")
1053 ):
1054 ip_profile_source_data = vnfd_vlp["virtual-link-protocol-data"][
1055 "l3-protocol-data"
1056 ]
1057 ip_profile_dest_data = {}
1058 if "ip-version" in ip_profile_source_data:
1059 ip_profile_dest_data["ip-version"] = ip_profile_source_data[
1060 "ip-version"
1061 ]
1062 if "cidr" in ip_profile_source_data:
1063 ip_profile_dest_data["subnet-address"] = ip_profile_source_data[
1064 "cidr"
1065 ]
1066 if "gateway-ip" in ip_profile_source_data:
1067 ip_profile_dest_data[
1068 "gateway-address"
1069 ] = ip_profile_source_data["gateway-ip"]
1070 if "dhcp-enabled" in ip_profile_source_data:
1071 ip_profile_dest_data["dhcp-params"] = {
1072 "enabled": ip_profile_source_data["dhcp-enabled"]
1073 }
1074
1075 vld_params["ip-profile"] = ip_profile_dest_data
1076 # update vld_params with instantiation params
1077 if vnf_params:
1078 vld_instantiation_params = find_in_list(
1079 get_iterable(vnf_params, "internal-vld"),
1080 lambda i_vld: i_vld["name"] == vld["id"],
1081 )
1082 if vld_instantiation_params:
1083 vld_params.update(vld_instantiation_params)
1084 parse_vld_instantiation_params(target_vim, vld, vld_params, target_sdn)
1085
1086 vdur_list = []
1087 for vdur in target_vnf.get("vdur", ()):
1088 if vdur.get("status") == "DELETING" or vdur.get("pdu-type"):
1089 continue # This vdu must not be created
1090 vdur["vim_info"] = {"vim_account_id": vnfr["vim-account-id"]}
1091
1092 self.logger.debug("NS > ssh_keys > {}".format(ssh_keys_all))
1093
1094 if ssh_keys_all:
1095 vdu_configuration = get_configuration(vnfd, vdur["vdu-id-ref"])
1096 vnf_configuration = get_configuration(vnfd, vnfd["id"])
1097 if (
1098 vdu_configuration
1099 and vdu_configuration.get("config-access")
1100 and vdu_configuration.get("config-access").get("ssh-access")
1101 ):
1102 vdur["ssh-keys"] = ssh_keys_all
1103 vdur["ssh-access-required"] = vdu_configuration[
1104 "config-access"
1105 ]["ssh-access"]["required"]
1106 elif (
1107 vnf_configuration
1108 and vnf_configuration.get("config-access")
1109 and vnf_configuration.get("config-access").get("ssh-access")
1110 and any(iface.get("mgmt-vnf") for iface in vdur["interfaces"])
1111 ):
1112 vdur["ssh-keys"] = ssh_keys_all
1113 vdur["ssh-access-required"] = vnf_configuration[
1114 "config-access"
1115 ]["ssh-access"]["required"]
1116 elif ssh_keys_instantiation and find_in_list(
1117 vdur["interfaces"], lambda iface: iface.get("mgmt-vnf")
1118 ):
1119 vdur["ssh-keys"] = ssh_keys_instantiation
1120
1121 self.logger.debug("NS > vdur > {}".format(vdur))
1122
1123 vdud = get_vdu(vnfd, vdur["vdu-id-ref"])
1124 # cloud-init
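# The vdur "cloud-init" field is a reference of the form
# "<vnfd _id>:file:<cloud-init-file>" or "<vnfd _id>:vdu:<vdu_index>", used
# as the key into target["cloud_init_content"] where the cloud-init text is
# stored so NG-RO does not need to access the package itself.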
1125 if vdud.get("cloud-init-file"):
1126 vdur["cloud-init"] = "{}:file:{}".format(
1127 vnfd["_id"], vdud.get("cloud-init-file")
1128 )
1129 # read the file and put its content at target.cloud_init_content, so that ng_ro does not need to use the shared package system
1130 if vdur["cloud-init"] not in target["cloud_init_content"]:
1131 base_folder = vnfd["_admin"]["storage"]
1132 if base_folder["pkg-dir"]:
1133 cloud_init_file = "{}/{}/cloud_init/{}".format(
1134 base_folder["folder"],
1135 base_folder["pkg-dir"],
1136 vdud.get("cloud-init-file"),
1137 )
1138 else:
1139 cloud_init_file = "{}/Scripts/cloud_init/{}".format(
1140 base_folder["folder"],
1141 vdud.get("cloud-init-file"),
1142 )
1143 with self.fs.file_open(cloud_init_file, "r") as ci_file:
1144 target["cloud_init_content"][
1145 vdur["cloud-init"]
1146 ] = ci_file.read()
1147 elif vdud.get("cloud-init"):
1148 vdur["cloud-init"] = "{}:vdu:{}".format(
1149 vnfd["_id"], get_vdu_index(vnfd, vdur["vdu-id-ref"])
1150 )
1151 # put the content at target.cloud_init_content, so that ng_ro does not need to read the vnfd descriptor
1152 target["cloud_init_content"][vdur["cloud-init"]] = vdud[
1153 "cloud-init"
1154 ]
1155 vdur["additionalParams"] = vdur.get("additionalParams") or {}
1156 deploy_params_vdu = self._format_additional_params(
1157 vdur.get("additionalParams") or {}
1158 )
1159 deploy_params_vdu["OSM"] = get_osm_params(
1160 vnfr, vdur["vdu-id-ref"], vdur["count-index"]
1161 )
1162 vdur["additionalParams"] = deploy_params_vdu
1163
1164 # flavor
1165 ns_flavor = target["flavor"][int(vdur["ns-flavor-id"])]
1166 if target_vim not in ns_flavor["vim_info"]:
1167 ns_flavor["vim_info"][target_vim] = {}
1168
1169 # deal with images
1170 # in case alternative images are provided, check whether they apply to the
1171 # vim_type of the target VIM and, if so, use the alternative image instead
1172 ns_image_id = int(vdur["ns-image-id"])
1173 if vdur.get("alt-image-ids"):
1174 db_vim = get_vim_account(vnfr["vim-account-id"])
1175 vim_type = db_vim["vim_type"]
1176 for alt_image_id in vdur.get("alt-image-ids"):
1177 ns_alt_image = target["image"][int(alt_image_id)]
1178 if vim_type == ns_alt_image.get("vim-type"):
1179 # must use alternative image
1180 self.logger.debug(
1181 "use alternative image id: {}".format(alt_image_id)
1182 )
1183 ns_image_id = alt_image_id
1184 vdur["ns-image-id"] = ns_image_id
1185 break
1186 ns_image = target["image"][int(ns_image_id)]
1187 if target_vim not in ns_image["vim_info"]:
1188 ns_image["vim_info"][target_vim] = {}
1189
1190 # Affinity groups
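# Register this target VIM in the vim_info of every affinity/anti-affinity
# group referenced by the VDU, so the group is instantiated at that VIM.
# Illustrative entry after this step (field names as in the OSM IM, values
# hypothetical):
#   {"id": "affinity-group-1", "type": "anti-affinity", "scope": "nfvi-node",
#    "vim_info": {"vim:<vim_account_id>": {}}}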
1191 if vdur.get("affinity-or-anti-affinity-group-id"):
1192 for ags_id in vdur["affinity-or-anti-affinity-group-id"]:
1193 ns_ags = target["affinity-or-anti-affinity-group"][int(ags_id)]
1194 if target_vim not in ns_ags["vim_info"]:
1195 ns_ags["vim_info"][target_vim] = {}
1196
1197 vdur["vim_info"] = {target_vim: {}}
1198 # instantiation parameters
1199 # if vnf_params:
1200 # vdu_instantiation_params = next((v for v in get_iterable(vnf_params, "vdu") if v["id"] ==
1201 # vdud["id"]), None)
1202 vdur_list.append(vdur)
1203 target_vnf["vdur"] = vdur_list
1204 target["vnf"].append(target_vnf)
1205
1206 desc = await self.RO.deploy(nsr_id, target)
1207 self.logger.debug("RO return > {}".format(desc))
1208 action_id = desc["action_id"]
1209 await self._wait_ng_ro(
1210 nsr_id, action_id, nslcmop_id, start_deploy, timeout_ns_deploy, stage
1211 )
1212
1213 # Updating NSR
1214 db_nsr_update = {
1215 "_admin.deployed.RO.operational-status": "running",
1216 "detailed-status": " ".join(stage),
1217 }
1218 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1219 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1220 self._write_op_status(nslcmop_id, stage)
1221 self.logger.debug(
1222 logging_text + "ns deployed at RO. RO_id={}".format(action_id)
1223 )
1224 return
1225
1226 async def _wait_ng_ro(
1227 self,
1228 nsr_id,
1229 action_id,
1230 nslcmop_id=None,
1231 start_time=None,
1232 timeout=600,
1233 stage=None,
1234 ):
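# Poll RO.status() every 15 seconds until the action reports DONE or FAILED,
# updating the operation detailed-status on every change; the while/else
# branch raises NgRoException when the timeout expires.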
1235 detailed_status_old = None
1236 db_nsr_update = {}
1237 start_time = start_time or time()
1238 while time() <= start_time + timeout:
1239 desc_status = await self.RO.status(nsr_id, action_id)
1240 self.logger.debug("Wait NG RO > {}".format(desc_status))
1241 if desc_status["status"] == "FAILED":
1242 raise NgRoException(desc_status["details"])
1243 elif desc_status["status"] == "BUILD":
1244 if stage:
1245 stage[2] = "VIM: ({})".format(desc_status["details"])
1246 elif desc_status["status"] == "DONE":
1247 if stage:
1248 stage[2] = "Deployed at VIM"
1249 break
1250 else:
1251 assert False, "ROclient.check_ns_status returns unknown {}".format(
1252 desc_status["status"]
1253 )
1254 if stage and nslcmop_id and stage[2] != detailed_status_old:
1255 detailed_status_old = stage[2]
1256 db_nsr_update["detailed-status"] = " ".join(stage)
1257 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1258 self._write_op_status(nslcmop_id, stage)
1259 await asyncio.sleep(15, loop=self.loop)
1260 else: # timeout_ns_deploy
1261 raise NgRoException("Timeout waiting ns to deploy")
1262
1263 async def _terminate_ng_ro(
1264 self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
1265 ):
1266 db_nsr_update = {}
1267 failed_detail = []
1268 action_id = None
1269 start_deploy = time()
1270 try:
1271 target = {
1272 "ns": {"vld": []},
1273 "vnf": [],
1274 "image": [],
1275 "flavor": [],
1276 "action_id": nslcmop_id,
1277 }
1278 desc = await self.RO.deploy(nsr_id, target)
1279 action_id = desc["action_id"]
1280 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = action_id
1281 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETING"
1282 self.logger.debug(
1283 logging_text
1284 + "ns terminate action at RO. action_id={}".format(action_id)
1285 )
1286
1287 # wait until done
1288 delete_timeout = 20 * 60 # 20 minutes
1289 await self._wait_ng_ro(
1290 nsr_id, action_id, nslcmop_id, start_deploy, delete_timeout, stage
1291 )
1292
1293 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
1294 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
1295 # delete all nsr
1296 await self.RO.delete(nsr_id)
1297 except Exception as e:
1298 if isinstance(e, NgRoException) and e.http_code == 404: # not found
1299 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
1300 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
1301 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
1302 self.logger.debug(
1303 logging_text + "RO_action_id={} already deleted".format(action_id)
1304 )
1305 elif isinstance(e, NgRoException) and e.http_code == 409: # conflict
1306 failed_detail.append("delete conflict: {}".format(e))
1307 self.logger.debug(
1308 logging_text
1309 + "RO_action_id={} delete conflict: {}".format(action_id, e)
1310 )
1311 else:
1312 failed_detail.append("delete error: {}".format(e))
1313 self.logger.error(
1314 logging_text
1315 + "RO_action_id={} delete error: {}".format(action_id, e)
1316 )
1317
1318 if failed_detail:
1319 stage[2] = "Error deleting from VIM"
1320 else:
1321 stage[2] = "Deleted from VIM"
1322 db_nsr_update["detailed-status"] = " ".join(stage)
1323 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1324 self._write_op_status(nslcmop_id, stage)
1325
1326 if failed_detail:
1327 raise LcmException("; ".join(failed_detail))
1328 return
1329
1330 async def instantiate_RO(
1331 self,
1332 logging_text,
1333 nsr_id,
1334 nsd,
1335 db_nsr,
1336 db_nslcmop,
1337 db_vnfrs,
1338 db_vnfds,
1339 n2vc_key_list,
1340 stage,
1341 ):
1342 """
1343 Instantiate at RO
1344 :param logging_text: prefix text to use at logging
1345 :param nsr_id: nsr identity
1346 :param nsd: database content of ns descriptor
1347 :param db_nsr: database content of ns record
1348 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
1349 :param db_vnfrs:
1350 :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
1351 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
1352 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
1353 :return: None or exception
1354 """
1355 try:
1356 start_deploy = time()
1357 ns_params = db_nslcmop.get("operationParams")
1358 if ns_params and ns_params.get("timeout_ns_deploy"):
1359 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
1360 else:
1361 timeout_ns_deploy = self.timeout.get(
1362 "ns_deploy", self.timeout_ns_deploy
1363 )
1364
1365 # Check for and optionally request placement optimization. Database will be updated if placement activated
1366 stage[2] = "Waiting for Placement."
1367 if await self._do_placement(logging_text, db_nslcmop, db_vnfrs):
1368 # in case of placement, change ns_params["vimAccountId"] if it is not present at any vnfr
1369 for vnfr in db_vnfrs.values():
1370 if ns_params["vimAccountId"] == vnfr["vim-account-id"]:
1371 break
1372 else:
1373 ns_params["vimAccountId"] == vnfr["vim-account-id"]
1374
1375 return await self._instantiate_ng_ro(
1376 logging_text,
1377 nsr_id,
1378 nsd,
1379 db_nsr,
1380 db_nslcmop,
1381 db_vnfrs,
1382 db_vnfds,
1383 n2vc_key_list,
1384 stage,
1385 start_deploy,
1386 timeout_ns_deploy,
1387 )
1388 except Exception as e:
1389 stage[2] = "ERROR deploying at VIM"
1390 self.set_vnfr_at_error(db_vnfrs, str(e))
1391 self.logger.error(
1392 "Error deploying at VIM {}".format(e),
1393 exc_info=not isinstance(
1394 e,
1395 (
1396 ROclient.ROClientException,
1397 LcmException,
1398 DbException,
1399 NgRoException,
1400 ),
1401 ),
1402 )
1403 raise
1404
1405 async def wait_kdu_up(self, logging_text, nsr_id, vnfr_id, kdu_name):
1406 """
1407 Wait for kdu to be up, get ip address
1408 :param logging_text: prefix use for logging
1409 :param nsr_id:
1410 :param vnfr_id:
1411 :param kdu_name:
1412 :return: IP address
1413 """
1414
1415 # self.logger.debug(logging_text + "Starting wait_kdu_up")
1416 nb_tries = 0
1417
1418 while nb_tries < 360:
1419 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
1420 kdur = next(
1421 (
1422 x
1423 for x in get_iterable(db_vnfr, "kdur")
1424 if x.get("kdu-name") == kdu_name
1425 ),
1426 None,
1427 )
1428 if not kdur:
1429 raise LcmException(
1430 "Not found vnfr_id={}, kdu_name={}".format(vnfr_id, kdu_name)
1431 )
1432 if kdur.get("status"):
1433 if kdur["status"] in ("READY", "ENABLED"):
1434 return kdur.get("ip-address")
1435 else:
1436 raise LcmException(
1437 "target KDU={} is in error state".format(kdu_name)
1438 )
1439
1440 await asyncio.sleep(10, loop=self.loop)
1441 nb_tries += 1
1442 raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name))
1443
1444 async def wait_vm_up_insert_key_ro(
1445 self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None
1446 ):
1447 """
1448 Wait for the IP address at RO and, optionally, insert a public key in the virtual machine
1449 :param logging_text: prefix use for logging
1450 :param nsr_id:
1451 :param vnfr_id:
1452 :param vdu_id:
1453 :param vdu_index:
1454 :param pub_key: public ssh key to inject, None to skip
1455 :param user: user to apply the public ssh key
1456 :return: IP address
1457 """
1458
1459 self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
1460 ro_nsr_id = None
1461 ip_address = None
1462 nb_tries = 0
1463 target_vdu_id = None
1464 ro_retries = 0
1465
1466 while True:
1467
1468 ro_retries += 1
1469 if ro_retries >= 360: # 1 hour
1470 raise LcmException(
1471 "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id)
1472 )
1473
1474 await asyncio.sleep(10, loop=self.loop)
1475
1476 # get ip address
1477 if not target_vdu_id:
1478 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
1479
1480 if not vdu_id: # for the VNF case
1481 if db_vnfr.get("status") == "ERROR":
1482 raise LcmException(
1483 "Cannot inject ssh-key because target VNF is in error state"
1484 )
1485 ip_address = db_vnfr.get("ip-address")
1486 if not ip_address:
1487 continue
1488 vdur = next(
1489 (
1490 x
1491 for x in get_iterable(db_vnfr, "vdur")
1492 if x.get("ip-address") == ip_address
1493 ),
1494 None,
1495 )
1496 else: # VDU case
1497 vdur = next(
1498 (
1499 x
1500 for x in get_iterable(db_vnfr, "vdur")
1501 if x.get("vdu-id-ref") == vdu_id
1502 and x.get("count-index") == vdu_index
1503 ),
1504 None,
1505 )
1506
1507 if (
1508 not vdur and len(db_vnfr.get("vdur", ())) == 1
1509 ): # If only one, this should be the target vdu
1510 vdur = db_vnfr["vdur"][0]
1511 if not vdur:
1512 raise LcmException(
1513 "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
1514 vnfr_id, vdu_id, vdu_index
1515 )
1516 )
1517 # New generation RO stores information at "vim_info"
1518 ng_ro_status = None
1519 target_vim = None
1520 if vdur.get("vim_info"):
1521 target_vim = next(
1522 t for t in vdur["vim_info"]
1523 ) # there should be only one key
1524 ng_ro_status = vdur["vim_info"][target_vim].get("vim_status")
1525 if (
1526 vdur.get("pdu-type")
1527 or vdur.get("status") == "ACTIVE"
1528 or ng_ro_status == "ACTIVE"
1529 ):
1530 ip_address = vdur.get("ip-address")
1531 if not ip_address:
1532 continue
1533 target_vdu_id = vdur["vdu-id-ref"]
1534 elif vdur.get("status") == "ERROR" or ng_ro_status == "ERROR":
1535 raise LcmException(
1536 "Cannot inject ssh-key because target VM is in error state"
1537 )
1538
1539 if not target_vdu_id:
1540 continue
1541
1542 # inject public key into machine
1543 if pub_key and user:
1544 self.logger.debug(logging_text + "Inserting RO key")
1545 self.logger.debug("SSH > PubKey > {}".format(pub_key))
1546 if vdur.get("pdu-type"):
1547 self.logger.error(logging_text + "Cannot inject ssh-key to a PDU")
1548 return ip_address
1549 try:
1550 ro_vm_id = "{}-{}".format(
1551 db_vnfr["member-vnf-index-ref"], target_vdu_id
1552 ) # TODO add vdu_index
1553 if self.ng_ro:
1554 target = {
1555 "action": {
1556 "action": "inject_ssh_key",
1557 "key": pub_key,
1558 "user": user,
1559 },
1560 "vnf": [{"_id": vnfr_id, "vdur": [{"id": vdur["id"]}]}],
1561 }
1562 desc = await self.RO.deploy(nsr_id, target)
1563 action_id = desc["action_id"]
1564 await self._wait_ng_ro(nsr_id, action_id, timeout=600)
1565 break
1566 else:
1567 # wait until NS is deployed at RO
1568 if not ro_nsr_id:
1569 db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
1570 ro_nsr_id = deep_get(
1571 db_nsrs, ("_admin", "deployed", "RO", "nsr_id")
1572 )
1573 if not ro_nsr_id:
1574 continue
1575 result_dict = await self.RO.create_action(
1576 item="ns",
1577 item_id_name=ro_nsr_id,
1578 descriptor={
1579 "add_public_key": pub_key,
1580 "vms": [ro_vm_id],
1581 "user": user,
1582 },
1583 )
1584 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
1585 if not result_dict or not isinstance(result_dict, dict):
1586 raise LcmException(
1587 "Unknown response from RO when injecting key"
1588 )
1589 for result in result_dict.values():
1590 if result.get("vim_result") == 200:
1591 break
1592 else:
1593 raise ROclient.ROClientException(
1594 "error injecting key: {}".format(
1595 result.get("description")
1596 )
1597 )
1598 break
1599 except NgRoException as e:
1600 raise LcmException(
1601 "Reaching max tries injecting key. Error: {}".format(e)
1602 )
1603 except ROclient.ROClientException as e:
1604 if not nb_tries:
1605 self.logger.debug(
1606 logging_text
1607 + "error injecting key: {}. Retrying until {} seconds".format(
1608 e, 20 * 10
1609 )
1610 )
1611 nb_tries += 1
1612 if nb_tries >= 20:
1613 raise LcmException(
1614 "Reaching max tries injecting key. Error: {}".format(e)
1615 )
1616 else:
1617 break
1618
1619 return ip_address
1620
1621 async def _wait_dependent_n2vc(self, nsr_id, vca_deployed_list, vca_index):
1622 """
1623 Wait until dependent VCA deployments have finished. The NS waits for VNFs and VDUs; VNFs wait for VDUs
1624 """
1625 my_vca = vca_deployed_list[vca_index]
1626 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
1627 # vdu or kdu: no dependencies
1628 return
1629 timeout = 300
1630 while timeout >= 0:
1631 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1632 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1633 configuration_status_list = db_nsr["configurationStatus"]
1634 for index, vca_deployed in enumerate(configuration_status_list):
1635 if index == vca_index:
1636 # myself
1637 continue
1638 if not my_vca.get("member-vnf-index") or (
1639 vca_deployed.get("member-vnf-index")
1640 == my_vca.get("member-vnf-index")
1641 ):
1642 internal_status = configuration_status_list[index].get("status")
1643 if internal_status == "READY":
1644 continue
1645 elif internal_status == "BROKEN":
1646 raise LcmException(
1647 "Configuration aborted because dependent charm/s has failed"
1648 )
1649 else:
1650 break
1651 else:
1652 # no dependencies, return
1653 return
1654 await asyncio.sleep(10)
1655 timeout -= 1
1656
1657 raise LcmException("Configuration aborted because dependent charm/s timeout")
1658
1659 def get_vca_id(self, db_vnfr: dict, db_nsr: dict):
1660 vca_id = None
1661 if db_vnfr:
1662 vca_id = deep_get(db_vnfr, ("vca-id",))
1663 elif db_nsr:
1664 vim_account_id = deep_get(db_nsr, ("instantiate_params", "vimAccountId"))
1665 vca_id = VimAccountDB.get_vim_account_with_id(vim_account_id).get("vca")
1666 return vca_id
1667
1668 async def instantiate_N2VC(
1669 self,
1670 logging_text,
1671 vca_index,
1672 nsi_id,
1673 db_nsr,
1674 db_vnfr,
1675 vdu_id,
1676 kdu_name,
1677 vdu_index,
1678 config_descriptor,
1679 deploy_params,
1680 base_folder,
1681 nslcmop_id,
1682 stage,
1683 vca_type,
1684 vca_name,
1685 ee_config_descriptor,
1686 ):
1687 nsr_id = db_nsr["_id"]
1688 db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
1689 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1690 vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
1691 osm_config = {"osm": {"ns_id": db_nsr["_id"]}}
1692 db_dict = {
1693 "collection": "nsrs",
1694 "filter": {"_id": nsr_id},
1695 "path": db_update_entry,
1696 }
1697 step = ""
1698 try:
1699
1700 element_type = "NS"
1701 element_under_configuration = nsr_id
1702
1703 vnfr_id = None
1704 if db_vnfr:
1705 vnfr_id = db_vnfr["_id"]
1706 osm_config["osm"]["vnf_id"] = vnfr_id
1707
1708 namespace = "{nsi}.{ns}".format(nsi=nsi_id if nsi_id else "", ns=nsr_id)
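# The VCA namespace starts as "<nsi_id>.<nsr_id>" and is extended below with
# ".<vnfr_id>-<index>" for VNFs, ".<vdu_id>-<index>" for VDUs or
# ".<kdu_name>" for KDUs, matching the element under configuration.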
1709
1710 if vca_type == "native_charm":
1711 index_number = 0
1712 else:
1713 index_number = vdu_index or 0
1714
1715 if vnfr_id:
1716 element_type = "VNF"
1717 element_under_configuration = vnfr_id
1718 namespace += ".{}-{}".format(vnfr_id, index_number)
1719 if vdu_id:
1720 namespace += ".{}-{}".format(vdu_id, index_number)
1721 element_type = "VDU"
1722 element_under_configuration = "{}-{}".format(vdu_id, index_number)
1723 osm_config["osm"]["vdu_id"] = vdu_id
1724 elif kdu_name:
1725 namespace += ".{}".format(kdu_name)
1726 element_type = "KDU"
1727 element_under_configuration = kdu_name
1728 osm_config["osm"]["kdu_name"] = kdu_name
1729
1730 # Get artifact path
1731 if base_folder["pkg-dir"]:
1732 artifact_path = "{}/{}/{}/{}".format(
1733 base_folder["folder"],
1734 base_folder["pkg-dir"],
1735 "charms"
1736 if vca_type in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1737 else "helm-charts",
1738 vca_name,
1739 )
1740 else:
1741 artifact_path = "{}/Scripts/{}/{}/".format(
1742 base_folder["folder"],
1743 "charms"
1744 if vca_type in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1745 else "helm-charts",
1746 vca_name,
1747 )
1748
1749 self.logger.debug("Artifact path > {}".format(artifact_path))
1750
1751 # get initial_config_primitive_list that applies to this element
1752 initial_config_primitive_list = config_descriptor.get(
1753 "initial-config-primitive"
1754 )
1755
1756 self.logger.debug(
1757 "Initial config primitive list > {}".format(
1758 initial_config_primitive_list
1759 )
1760 )
1761
1762 # add config if not present for NS charm
1763 ee_descriptor_id = ee_config_descriptor.get("id")
1764 self.logger.debug("EE Descriptor > {}".format(ee_descriptor_id))
1765 initial_config_primitive_list = get_ee_sorted_initial_config_primitive_list(
1766 initial_config_primitive_list, vca_deployed, ee_descriptor_id
1767 )
1768
1769 self.logger.debug(
1770 "Initial config primitive list #2 > {}".format(
1771 initial_config_primitive_list
1772 )
1773 )
1774 # n2vc_redesign STEP 3.1
1775 # find old ee_id if exists
1776 ee_id = vca_deployed.get("ee_id")
1777
1778 vca_id = self.get_vca_id(db_vnfr, db_nsr)
1779 # create or register execution environment in VCA
1780 if vca_type in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1781
1782 self._write_configuration_status(
1783 nsr_id=nsr_id,
1784 vca_index=vca_index,
1785 status="CREATING",
1786 element_under_configuration=element_under_configuration,
1787 element_type=element_type,
1788 )
1789
1790 step = "create execution environment"
1791 self.logger.debug(logging_text + step)
1792
1793 ee_id = None
1794 credentials = None
1795 if vca_type == "k8s_proxy_charm":
1796 ee_id = await self.vca_map[vca_type].install_k8s_proxy_charm(
1797 charm_name=artifact_path[artifact_path.rfind("/") + 1 :],
1798 namespace=namespace,
1799 artifact_path=artifact_path,
1800 db_dict=db_dict,
1801 vca_id=vca_id,
1802 )
1803 elif vca_type == "helm" or vca_type == "helm-v3":
1804 ee_id, credentials = await self.vca_map[
1805 vca_type
1806 ].create_execution_environment(
1807 namespace=namespace,
1808 reuse_ee_id=ee_id,
1809 db_dict=db_dict,
1810 config=osm_config,
1811 artifact_path=artifact_path,
1812 vca_type=vca_type,
1813 )
1814 else:
1815 ee_id, credentials = await self.vca_map[
1816 vca_type
1817 ].create_execution_environment(
1818 namespace=namespace,
1819 reuse_ee_id=ee_id,
1820 db_dict=db_dict,
1821 vca_id=vca_id,
1822 )
1823
1824 elif vca_type == "native_charm":
1825 step = "Waiting to VM being up and getting IP address"
1826 self.logger.debug(logging_text + step)
1827 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
1828 logging_text,
1829 nsr_id,
1830 vnfr_id,
1831 vdu_id,
1832 vdu_index,
1833 user=None,
1834 pub_key=None,
1835 )
1836 credentials = {"hostname": rw_mgmt_ip}
1837 # get username
1838 username = deep_get(
1839 config_descriptor, ("config-access", "ssh-access", "default-user")
1840 )
1841 # TODO remove this when the IM changes regarding config-access:ssh-access:default-user are
1842 # merged. Meanwhile, get the username from initial-config-primitive
1843 if not username and initial_config_primitive_list:
1844 for config_primitive in initial_config_primitive_list:
1845 for param in config_primitive.get("parameter", ()):
1846 if param["name"] == "ssh-username":
1847 username = param["value"]
1848 break
1849 if not username:
1850 raise LcmException(
1851 "Cannot determine the username neither with 'initial-config-primitive' nor with "
1852 "'config-access.ssh-access.default-user'"
1853 )
1854 credentials["username"] = username
1855 # n2vc_redesign STEP 3.2
1856
1857 self._write_configuration_status(
1858 nsr_id=nsr_id,
1859 vca_index=vca_index,
1860 status="REGISTERING",
1861 element_under_configuration=element_under_configuration,
1862 element_type=element_type,
1863 )
1864
1865 step = "register execution environment {}".format(credentials)
1866 self.logger.debug(logging_text + step)
1867 ee_id = await self.vca_map[vca_type].register_execution_environment(
1868 credentials=credentials,
1869 namespace=namespace,
1870 db_dict=db_dict,
1871 vca_id=vca_id,
1872 )
1873
1874 # for compatibility with MON/POL modules, they need the model and application name at database
1875 # TODO ask MON/POL whether it is still needed to assume the format "model_name.application_name"
1876 ee_id_parts = ee_id.split(".")
1877 db_nsr_update = {db_update_entry + "ee_id": ee_id}
1878 if len(ee_id_parts) >= 2:
1879 model_name = ee_id_parts[0]
1880 application_name = ee_id_parts[1]
1881 db_nsr_update[db_update_entry + "model"] = model_name
1882 db_nsr_update[db_update_entry + "application"] = application_name
1883
1884 # n2vc_redesign STEP 3.3
1885 step = "Install configuration Software"
1886
1887 self._write_configuration_status(
1888 nsr_id=nsr_id,
1889 vca_index=vca_index,
1890 status="INSTALLING SW",
1891 element_under_configuration=element_under_configuration,
1892 element_type=element_type,
1893 other_update=db_nsr_update,
1894 )
1895
1896 # TODO check if already done
1897 self.logger.debug(logging_text + step)
1898 config = None
1899 if vca_type == "native_charm":
1900 config_primitive = next(
1901 (p for p in initial_config_primitive_list if p["name"] == "config"),
1902 None,
1903 )
1904 if config_primitive:
1905 config = self._map_primitive_params(
1906 config_primitive, {}, deploy_params
1907 )
1908 num_units = 1
1909 if vca_type == "lxc_proxy_charm":
1910 if element_type == "NS":
1911 num_units = db_nsr.get("config-units") or 1
1912 elif element_type == "VNF":
1913 num_units = db_vnfr.get("config-units") or 1
1914 elif element_type == "VDU":
1915 for v in db_vnfr["vdur"]:
1916 if vdu_id == v["vdu-id-ref"]:
1917 num_units = v.get("config-units") or 1
1918 break
1919 if vca_type != "k8s_proxy_charm":
1920 await self.vca_map[vca_type].install_configuration_sw(
1921 ee_id=ee_id,
1922 artifact_path=artifact_path,
1923 db_dict=db_dict,
1924 config=config,
1925 num_units=num_units,
1926 vca_id=vca_id,
1927 vca_type=vca_type,
1928 )
1929
1930 # write in db flag of configuration_sw already installed
1931 self.update_db_2(
1932 "nsrs", nsr_id, {db_update_entry + "config_sw_installed": True}
1933 )
1934
1935 # add relations for this VCA (wait for other peers related with this VCA)
1936 await self._add_vca_relations(
1937 logging_text=logging_text,
1938 nsr_id=nsr_id,
1939 vca_type=vca_type,
1940 vca_index=vca_index,
1941 )
1942
1943 # if SSH access is required, then get the execution environment SSH public key
1944 # if native charm, we have already waited for the VM to be up
1945 if vca_type in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
1946 pub_key = None
1947 user = None
1948 # self.logger.debug("get ssh key block")
1949 if deep_get(
1950 config_descriptor, ("config-access", "ssh-access", "required")
1951 ):
1952 # self.logger.debug("ssh key needed")
1953 # Needed to inject a ssh key
1954 user = deep_get(
1955 config_descriptor,
1956 ("config-access", "ssh-access", "default-user"),
1957 )
1958 step = "Install configuration Software, getting public ssh key"
1959 pub_key = await self.vca_map[vca_type].get_ee_ssh_public__key(
1960 ee_id=ee_id, db_dict=db_dict, vca_id=vca_id
1961 )
1962
1963 step = "Insert public key into VM user={} ssh_key={}".format(
1964 user, pub_key
1965 )
1966 else:
1967 # self.logger.debug("no need to get ssh key")
1968 step = "Waiting to VM being up and getting IP address"
1969 self.logger.debug(logging_text + step)
1970
1971 # n2vc_redesign STEP 5.1
1972 # wait for RO (ip-address) and insert pub_key into VM
1973 if vnfr_id:
1974 if kdu_name:
1975 rw_mgmt_ip = await self.wait_kdu_up(
1976 logging_text, nsr_id, vnfr_id, kdu_name
1977 )
1978 else:
1979 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
1980 logging_text,
1981 nsr_id,
1982 vnfr_id,
1983 vdu_id,
1984 vdu_index,
1985 user=user,
1986 pub_key=pub_key,
1987 )
1988 else:
1989 rw_mgmt_ip = None # This is for a NS configuration
1990
1991 self.logger.debug(logging_text + " VM_ip_address={}".format(rw_mgmt_ip))
1992
1993 # store rw_mgmt_ip in deploy params for later replacement
1994 deploy_params["rw_mgmt_ip"] = rw_mgmt_ip
1995
1996 # n2vc_redesign STEP 6 Execute initial config primitive
1997 step = "execute initial config primitive"
1998
1999 # wait for dependent primitives execution (NS -> VNF -> VDU)
2000 if initial_config_primitive_list:
2001 await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)
2002
2003 # stage, depending on the element type: vdu, kdu, vnf or ns
2004 my_vca = vca_deployed_list[vca_index]
2005 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
2006 # VDU or KDU
2007 stage[0] = "Stage 3/5: running Day-1 primitives for VDU."
2008 elif my_vca.get("member-vnf-index"):
2009 # VNF
2010 stage[0] = "Stage 4/5: running Day-1 primitives for VNF."
2011 else:
2012 # NS
2013 stage[0] = "Stage 5/5: running Day-1 primitives for NS."
2014
2015 self._write_configuration_status(
2016 nsr_id=nsr_id, vca_index=vca_index, status="EXECUTING PRIMITIVE"
2017 )
2018
2019 self._write_op_status(op_id=nslcmop_id, stage=stage)
2020
2021 check_if_terminated_needed = True
2022 for initial_config_primitive in initial_config_primitive_list:
2023 # add information to the vca_deployed if it is an NS execution environment
2024 if not vca_deployed["member-vnf-index"]:
2025 deploy_params["ns_config_info"] = json.dumps(
2026 self._get_ns_config_info(nsr_id)
2027 )
2028 # TODO check if already done
2029 primitive_params_ = self._map_primitive_params(
2030 initial_config_primitive, {}, deploy_params
2031 )
2032
2033 step = "execute primitive '{}' params '{}'".format(
2034 initial_config_primitive["name"], primitive_params_
2035 )
2036 self.logger.debug(logging_text + step)
2037 await self.vca_map[vca_type].exec_primitive(
2038 ee_id=ee_id,
2039 primitive_name=initial_config_primitive["name"],
2040 params_dict=primitive_params_,
2041 db_dict=db_dict,
2042 vca_id=vca_id,
2043 vca_type=vca_type,
2044 )
2045 # Once some primitive has been executed, check and write at db whether terminate primitives need to be executed
2046 if check_if_terminated_needed:
2047 if config_descriptor.get("terminate-config-primitive"):
2048 self.update_db_2(
2049 "nsrs", nsr_id, {db_update_entry + "needed_terminate": True}
2050 )
2051 check_if_terminated_needed = False
2052
2053 # TODO register in database that primitive is done
2054
2055 # STEP 7 Configure metrics
2056 if vca_type == "helm" or vca_type == "helm-v3":
2057 prometheus_jobs = await self.extract_prometheus_scrape_jobs(
2058 ee_id=ee_id,
2059 artifact_path=artifact_path,
2060 ee_config_descriptor=ee_config_descriptor,
2061 vnfr_id=vnfr_id,
2062 nsr_id=nsr_id,
2063 target_ip=rw_mgmt_ip,
2064 )
2065 if prometheus_jobs:
2066 self.update_db_2(
2067 "nsrs",
2068 nsr_id,
2069 {db_update_entry + "prometheus_jobs": prometheus_jobs},
2070 )
2071
2072 for job in prometheus_jobs:
2073 self.db.set_one(
2074 "prometheus_jobs",
2075 {
2076 "job_name": job["job_name"]
2077 },
2078 job,
2079 upsert=True,
2080 fail_on_empty=False
2081 )
2082
2083 step = "instantiated at VCA"
2084 self.logger.debug(logging_text + step)
2085
2086 self._write_configuration_status(
2087 nsr_id=nsr_id, vca_index=vca_index, status="READY"
2088 )
2089
2090 except Exception as e: # TODO do not use bare Exception but N2VC exceptions
2091 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
2092 if not isinstance(
2093 e, (DbException, N2VCException, LcmException, asyncio.CancelledError)
2094 ):
2095 self.logger.error(
2096 "Exception while {} : {}".format(step, e), exc_info=True
2097 )
2098 self._write_configuration_status(
2099 nsr_id=nsr_id, vca_index=vca_index, status="BROKEN"
2100 )
2101 raise LcmException("{} {}".format(step, e)) from e
2102
2103 def _write_ns_status(
2104 self,
2105 nsr_id: str,
2106 ns_state: str,
2107 current_operation: str,
2108 current_operation_id: str,
2109 error_description: str = None,
2110 error_detail: str = None,
2111 other_update: dict = None,
2112 ):
2113 """
2114 Update db_nsr fields.
2115 :param nsr_id:
2116 :param ns_state:
2117 :param current_operation:
2118 :param current_operation_id:
2119 :param error_description:
2120 :param error_detail:
2121 :param other_update: other required changes at the database, if provided; note the passed dict is modified
2122 :return:
2123 """
2124 try:
2125 db_dict = other_update or {}
2126 db_dict[
2127 "_admin.nslcmop"
2128 ] = current_operation_id # for backward compatibility
2129 db_dict["_admin.current-operation"] = current_operation_id
2130 db_dict["_admin.operation-type"] = (
2131 current_operation if current_operation != "IDLE" else None
2132 )
2133 db_dict["currentOperation"] = current_operation
2134 db_dict["currentOperationID"] = current_operation_id
2135 db_dict["errorDescription"] = error_description
2136 db_dict["errorDetail"] = error_detail
2137
2138 if ns_state:
2139 db_dict["nsState"] = ns_state
2140 self.update_db_2("nsrs", nsr_id, db_dict)
2141 except DbException as e:
2142 self.logger.warn("Error writing NS status, ns={}: {}".format(nsr_id, e))
2143
2144 def _write_op_status(
2145 self,
2146 op_id: str,
2147 stage: list = None,
2148 error_message: str = None,
2149 queuePosition: int = 0,
2150 operation_state: str = None,
2151 other_update: dict = None,
2152 ):
2153 try:
2154 db_dict = other_update or {}
2155 db_dict["queuePosition"] = queuePosition
2156 if isinstance(stage, list):
2157 db_dict["stage"] = stage[0]
2158 db_dict["detailed-status"] = " ".join(stage)
2159 elif stage is not None:
2160 db_dict["stage"] = str(stage)
2161
2162 if error_message is not None:
2163 db_dict["errorMessage"] = error_message
2164 if operation_state is not None:
2165 db_dict["operationState"] = operation_state
2166 db_dict["statusEnteredTime"] = time()
2167 self.update_db_2("nslcmops", op_id, db_dict)
2168 except DbException as e:
2169 self.logger.warn(
2170 "Error writing OPERATION status for op_id: {} -> {}".format(op_id, e)
2171 )
2172
2173 def _write_all_config_status(self, db_nsr: dict, status: str):
2174 try:
2175 nsr_id = db_nsr["_id"]
2176 # configurationStatus
2177 config_status = db_nsr.get("configurationStatus")
2178 if config_status:
2179 db_nsr_update = {
2180 "configurationStatus.{}.status".format(index): status
2181 for index, v in enumerate(config_status)
2182 if v
2183 }
2184 # update status
2185 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2186
2187 except DbException as e:
2188 self.logger.warn(
2189 "Error writing all configuration status, ns={}: {}".format(nsr_id, e)
2190 )
2191
2192 def _write_configuration_status(
2193 self,
2194 nsr_id: str,
2195 vca_index: int,
2196 status: str = None,
2197 element_under_configuration: str = None,
2198 element_type: str = None,
2199 other_update: dict = None,
2200 ):
2201
2202 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
2203 # .format(vca_index, status))
2204
2205 try:
2206 db_path = "configurationStatus.{}.".format(vca_index)
2207 db_dict = other_update or {}
2208 if status:
2209 db_dict[db_path + "status"] = status
2210 if element_under_configuration:
2211 db_dict[
2212 db_path + "elementUnderConfiguration"
2213 ] = element_under_configuration
2214 if element_type:
2215 db_dict[db_path + "elementType"] = element_type
2216 self.update_db_2("nsrs", nsr_id, db_dict)
2217 except DbException as e:
2218 self.logger.warn(
2219 "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
2220 status, nsr_id, vca_index, e
2221 )
2222 )
2223
2224 async def _do_placement(self, logging_text, db_nslcmop, db_vnfrs):
2225 """
2226 Checks and computes the placement (the VIM account where to deploy). If it is decided by an external tool, it
2227 sends the request via kafka and waits until the result is written at the database (nslcmops _admin.pla).
2228 Database is used because the result can be obtained from a different LCM worker in case of HA.
2229 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
2230 :param db_nslcmop: database content of nslcmop
2231 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
2232 :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfr with the
2233 computed 'vim-account-id'
2234 """
2235 modified = False
2236 nslcmop_id = db_nslcmop["_id"]
2237 placement_engine = deep_get(db_nslcmop, ("operationParams", "placement-engine"))
2238 if placement_engine == "PLA":
2239 self.logger.debug(
2240 logging_text + "Invoke and wait for placement optimization"
2241 )
2242 await self.msg.aiowrite(
2243 "pla", "get_placement", {"nslcmopId": nslcmop_id}, loop=self.loop
2244 )
2245 db_poll_interval = 5
2246 wait = db_poll_interval * 10
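# overall wait budget: 10 polls of db_poll_interval seconds (50 s with the value above)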
2247 pla_result = None
2248 while not pla_result and wait >= 0:
2249 await asyncio.sleep(db_poll_interval)
2250 wait -= db_poll_interval
2251 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2252 pla_result = deep_get(db_nslcmop, ("_admin", "pla"))
2253
2254 if not pla_result:
2255 raise LcmException(
2256 "Placement timeout for nslcmopId={}".format(nslcmop_id)
2257 )
2258
2259 for pla_vnf in pla_result["vnf"]:
2260 vnfr = db_vnfrs.get(pla_vnf["member-vnf-index"])
2261 if not pla_vnf.get("vimAccountId") or not vnfr:
2262 continue
2263 modified = True
2264 self.db.set_one(
2265 "vnfrs",
2266 {"_id": vnfr["_id"]},
2267 {"vim-account-id": pla_vnf["vimAccountId"]},
2268 )
2269 # Modifies db_vnfrs
2270 vnfr["vim-account-id"] = pla_vnf["vimAccountId"]
2271 return modified
2272
2273 def update_nsrs_with_pla_result(self, params):
2274 try:
2275 nslcmop_id = deep_get(params, ("placement", "nslcmopId"))
2276 self.update_db_2(
2277 "nslcmops", nslcmop_id, {"_admin.pla": params.get("placement")}
2278 )
2279 except Exception as e:
2280 self.logger.warn("Update failed for nslcmop_id={}:{}".format(nslcmop_id, e))
2281
2282 async def instantiate(self, nsr_id, nslcmop_id):
2283 """
2284
2285 :param nsr_id: ns instance to deploy
2286 :param nslcmop_id: operation to run
2287 :return:
2288 """
2289
2290 # Try to lock HA task here
2291 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
2292 if not task_is_locked_by_me:
2293 self.logger.debug(
2294 "instantiate() task is not locked by me, ns={}".format(nsr_id)
2295 )
2296 return
2297
2298 logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
2299 self.logger.debug(logging_text + "Enter")
2300
2301 # get all needed from database
2302
2303 # database nsrs record
2304 db_nsr = None
2305
2306 # database nslcmops record
2307 db_nslcmop = None
2308
2309 # update operation on nsrs
2310 db_nsr_update = {}
2311 # update operation on nslcmops
2312 db_nslcmop_update = {}
2313
2314 nslcmop_operation_state = None
2315 db_vnfrs = {} # vnf's info indexed by member-index
2316 # n2vc_info = {}
2317 tasks_dict_info = {} # from task to info text
2318 exc = None
2319 error_list = []
2320 stage = [
2321 "Stage 1/5: preparation of the environment.",
2322 "Waiting for previous operations to terminate.",
2323 "",
2324 ]
2325 # ^ stage, step, VIM progress
2326 try:
2327 # wait for any previous tasks in process
2328 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
2329
2330 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
2331 stage[1] = "Reading from database."
2332 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
2333 db_nsr_update["detailed-status"] = "creating"
2334 db_nsr_update["operational-status"] = "init"
2335 self._write_ns_status(
2336 nsr_id=nsr_id,
2337 ns_state="BUILDING",
2338 current_operation="INSTANTIATING",
2339 current_operation_id=nslcmop_id,
2340 other_update=db_nsr_update,
2341 )
2342 self._write_op_status(op_id=nslcmop_id, stage=stage, queuePosition=0)
2343
2344 # read from db: operation
2345 stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
2346 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2347 if db_nslcmop["operationParams"].get("additionalParamsForVnf"):
2348 db_nslcmop["operationParams"]["additionalParamsForVnf"] = json.loads(
2349 db_nslcmop["operationParams"]["additionalParamsForVnf"]
2350 )
2351 ns_params = db_nslcmop.get("operationParams")
2352 if ns_params and ns_params.get("timeout_ns_deploy"):
2353 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
2354 else:
2355 timeout_ns_deploy = self.timeout.get(
2356 "ns_deploy", self.timeout_ns_deploy
2357 )
2358
2359 # read from db: ns
2360 stage[1] = "Getting nsr={} from db.".format(nsr_id)
2361 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2362 stage[1] = "Getting nsd={} from db.".format(db_nsr["nsd-id"])
2363 nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
2364 self.fs.sync(db_nsr["nsd-id"])
2365 db_nsr["nsd"] = nsd
2366 # nsr_name = db_nsr["name"] # TODO short-name??
2367
2368 # read from db: vnf's of this ns
2369 stage[1] = "Getting vnfrs from db."
2370 self.logger.debug(logging_text + stage[1])
2371 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
2372
2373 # read from db: vnfd's for every vnf
2374 db_vnfds = [] # every vnfd data
2375
2376 # for each vnf in ns, read vnfd
2377 for vnfr in db_vnfrs_list:
2378 if vnfr.get("kdur"):
2379 kdur_list = []
2380 for kdur in vnfr["kdur"]:
2381 if kdur.get("additionalParams"):
2382 kdur["additionalParams"] = json.loads(
2383 kdur["additionalParams"]
2384 )
2385 kdur_list.append(kdur)
2386 vnfr["kdur"] = kdur_list
2387
2388 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
2389 vnfd_id = vnfr["vnfd-id"]
2390 vnfd_ref = vnfr["vnfd-ref"]
2391 self.fs.sync(vnfd_id)
2392
2393 # if we do not have this vnfd yet, read it from db
2394 if not find_in_list(db_vnfds, lambda a_vnfd: a_vnfd["_id"] == vnfd_id):
2395 # read from db
2396 stage[1] = "Getting vnfd={} id='{}' from db.".format(
2397 vnfd_id, vnfd_ref
2398 )
2399 self.logger.debug(logging_text + stage[1])
2400 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
2401
2402 # store vnfd
2403 db_vnfds.append(vnfd)
2404
2405 # Get or generates the _admin.deployed.VCA list
2406 vca_deployed_list = None
2407 if db_nsr["_admin"].get("deployed"):
2408 vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
2409 if vca_deployed_list is None:
2410 vca_deployed_list = []
2411 configuration_status_list = []
2412 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
2413 db_nsr_update["configurationStatus"] = configuration_status_list
2414 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
2415 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
2416 elif isinstance(vca_deployed_list, dict):
2417 # maintain backward compatibility. Change a dict to list at database
2418 vca_deployed_list = list(vca_deployed_list.values())
2419 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
2420 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
2421
2422 if not isinstance(
2423 deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list
2424 ):
2425 populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
2426 db_nsr_update["_admin.deployed.RO.vnfd"] = []
2427
2428 # set state to INSTANTIATED. Once instantiated, NBI will not delete it directly
2429 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
2430 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2431 self.db.set_list(
2432 "vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "INSTANTIATED"}
2433 )
2434
2435 # n2vc_redesign STEP 2 Deploy Network Scenario
2436 stage[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
2437 self._write_op_status(op_id=nslcmop_id, stage=stage)
2438
2439 stage[1] = "Deploying KDUs."
2440 # self.logger.debug(logging_text + "Before deploy_kdus")
2441 # Call deploy_kdus in case the "vdu:kdu" param exists
2442 await self.deploy_kdus(
2443 logging_text=logging_text,
2444 nsr_id=nsr_id,
2445 nslcmop_id=nslcmop_id,
2446 db_vnfrs=db_vnfrs,
2447 db_vnfds=db_vnfds,
2448 task_instantiation_info=tasks_dict_info,
2449 )
2450
2451 stage[1] = "Getting VCA public key."
2452 # n2vc_redesign STEP 1 Get VCA public ssh-key
2453 # feature 1429. Add n2vc public key to needed VMs
2454 n2vc_key = self.n2vc.get_public_key()
2455 n2vc_key_list = [n2vc_key]
2456 if self.vca_config.get("public_key"):
2457 n2vc_key_list.append(self.vca_config["public_key"])
2458
2459 stage[1] = "Deploying NS at VIM."
2460 task_ro = asyncio.ensure_future(
2461 self.instantiate_RO(
2462 logging_text=logging_text,
2463 nsr_id=nsr_id,
2464 nsd=nsd,
2465 db_nsr=db_nsr,
2466 db_nslcmop=db_nslcmop,
2467 db_vnfrs=db_vnfrs,
2468 db_vnfds=db_vnfds,
2469 n2vc_key_list=n2vc_key_list,
2470 stage=stage,
2471 )
2472 )
2473 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
2474 tasks_dict_info[task_ro] = "Deploying at VIM"
2475
2476 # n2vc_redesign STEP 3 to 6 Deploy N2VC
2477 stage[1] = "Deploying Execution Environments."
2478 self.logger.debug(logging_text + stage[1])
2479
2480 nsi_id = None # TODO put nsi_id when this nsr belongs to a NSI
2481 for vnf_profile in get_vnf_profiles(nsd):
2482 vnfd_id = vnf_profile["vnfd-id"]
2483 vnfd = find_in_list(db_vnfds, lambda a_vnf: a_vnf["id"] == vnfd_id)
2484 member_vnf_index = str(vnf_profile["id"])
2485 db_vnfr = db_vnfrs[member_vnf_index]
2486 base_folder = vnfd["_admin"]["storage"]
2487 vdu_id = None
2488 vdu_index = 0
2489 vdu_name = None
2490 kdu_name = None
2491
2492 # Get additional parameters
2493 deploy_params = {"OSM": get_osm_params(db_vnfr)}
2494 if db_vnfr.get("additionalParamsForVnf"):
2495 deploy_params.update(
2496 parse_yaml_strings(db_vnfr["additionalParamsForVnf"].copy())
2497 )
2498
2499 descriptor_config = get_configuration(vnfd, vnfd["id"])
2500 if descriptor_config:
2501 self._deploy_n2vc(
2502 logging_text=logging_text
2503 + "member_vnf_index={} ".format(member_vnf_index),
2504 db_nsr=db_nsr,
2505 db_vnfr=db_vnfr,
2506 nslcmop_id=nslcmop_id,
2507 nsr_id=nsr_id,
2508 nsi_id=nsi_id,
2509 vnfd_id=vnfd_id,
2510 vdu_id=vdu_id,
2511 kdu_name=kdu_name,
2512 member_vnf_index=member_vnf_index,
2513 vdu_index=vdu_index,
2514 vdu_name=vdu_name,
2515 deploy_params=deploy_params,
2516 descriptor_config=descriptor_config,
2517 base_folder=base_folder,
2518 task_instantiation_info=tasks_dict_info,
2519 stage=stage,
2520 )
2521
2522 # Deploy charms for each VDU that supports one.
2523 for vdud in get_vdu_list(vnfd):
2524 vdu_id = vdud["id"]
2525 descriptor_config = get_configuration(vnfd, vdu_id)
2526 vdur = find_in_list(
2527 db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id
2528 )
2529
2530 if vdur.get("additionalParams"):
2531 deploy_params_vdu = parse_yaml_strings(vdur["additionalParams"])
2532 else:
2533 deploy_params_vdu = deploy_params
2534 deploy_params_vdu["OSM"] = get_osm_params(
2535 db_vnfr, vdu_id, vdu_count_index=0
2536 )
2537 vdud_count = get_number_of_instances(vnfd, vdu_id)
2538
2539 self.logger.debug("VDUD > {}".format(vdud))
2540 self.logger.debug(
2541 "Descriptor config > {}".format(descriptor_config)
2542 )
2543 if descriptor_config:
2544 vdu_name = None
2545 kdu_name = None
2546 for vdu_index in range(vdud_count):
2547 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
2548 self._deploy_n2vc(
2549 logging_text=logging_text
2550 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
2551 member_vnf_index, vdu_id, vdu_index
2552 ),
2553 db_nsr=db_nsr,
2554 db_vnfr=db_vnfr,
2555 nslcmop_id=nslcmop_id,
2556 nsr_id=nsr_id,
2557 nsi_id=nsi_id,
2558 vnfd_id=vnfd_id,
2559 vdu_id=vdu_id,
2560 kdu_name=kdu_name,
2561 member_vnf_index=member_vnf_index,
2562 vdu_index=vdu_index,
2563 vdu_name=vdu_name,
2564 deploy_params=deploy_params_vdu,
2565 descriptor_config=descriptor_config,
2566 base_folder=base_folder,
2567 task_instantiation_info=tasks_dict_info,
2568 stage=stage,
2569 )
2570 for kdud in get_kdu_list(vnfd):
2571 kdu_name = kdud["name"]
2572 descriptor_config = get_configuration(vnfd, kdu_name)
2573 if descriptor_config:
2574 vdu_id = None
2575 vdu_index = 0
2576 vdu_name = None
2577 kdur = next(
2578 x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name
2579 )
2580 deploy_params_kdu = {"OSM": get_osm_params(db_vnfr)}
2581 if kdur.get("additionalParams"):
2582 deploy_params_kdu.update(
2583 parse_yaml_strings(kdur["additionalParams"].copy())
2584 )
2585
2586 self._deploy_n2vc(
2587 logging_text=logging_text,
2588 db_nsr=db_nsr,
2589 db_vnfr=db_vnfr,
2590 nslcmop_id=nslcmop_id,
2591 nsr_id=nsr_id,
2592 nsi_id=nsi_id,
2593 vnfd_id=vnfd_id,
2594 vdu_id=vdu_id,
2595 kdu_name=kdu_name,
2596 member_vnf_index=member_vnf_index,
2597 vdu_index=vdu_index,
2598 vdu_name=vdu_name,
2599 deploy_params=deploy_params_kdu,
2600 descriptor_config=descriptor_config,
2601 base_folder=base_folder,
2602 task_instantiation_info=tasks_dict_info,
2603 stage=stage,
2604 )
2605
2606 # Check if this NS has a charm configuration
2607 descriptor_config = nsd.get("ns-configuration")
2608 if descriptor_config and descriptor_config.get("juju"):
2609 vnfd_id = None
2610 db_vnfr = None
2611 member_vnf_index = None
2612 vdu_id = None
2613 kdu_name = None
2614 vdu_index = 0
2615 vdu_name = None
2616
2617 # Get additional parameters
2618 deploy_params = {"OSM": {"vim_account_id": ns_params["vimAccountId"]}}
2619 if db_nsr.get("additionalParamsForNs"):
2620 deploy_params.update(
2621 parse_yaml_strings(db_nsr["additionalParamsForNs"].copy())
2622 )
2623 base_folder = nsd["_admin"]["storage"]
2624 self._deploy_n2vc(
2625 logging_text=logging_text,
2626 db_nsr=db_nsr,
2627 db_vnfr=db_vnfr,
2628 nslcmop_id=nslcmop_id,
2629 nsr_id=nsr_id,
2630 nsi_id=nsi_id,
2631 vnfd_id=vnfd_id,
2632 vdu_id=vdu_id,
2633 kdu_name=kdu_name,
2634 member_vnf_index=member_vnf_index,
2635 vdu_index=vdu_index,
2636 vdu_name=vdu_name,
2637 deploy_params=deploy_params,
2638 descriptor_config=descriptor_config,
2639 base_folder=base_folder,
2640 task_instantiation_info=tasks_dict_info,
2641 stage=stage,
2642 )
2643
2644 # the rest of the work is done in the finally block
2645
2646 except (
2647 ROclient.ROClientException,
2648 DbException,
2649 LcmException,
2650 N2VCException,
2651 ) as e:
2652 self.logger.error(
2653 logging_text + "Exit Exception while '{}': {}".format(stage[1], e)
2654 )
2655 exc = e
2656 except asyncio.CancelledError:
2657 self.logger.error(
2658 logging_text + "Cancelled Exception while '{}'".format(stage[1])
2659 )
2660 exc = "Operation was cancelled"
2661 except Exception as e:
2662 exc = traceback.format_exc()
2663 self.logger.critical(
2664 logging_text + "Exit Exception while '{}': {}".format(stage[1], e),
2665 exc_info=True,
2666 )
2667 finally:
2668 if exc:
2669 error_list.append(str(exc))
2670 try:
2671 # wait for pending tasks
2672 if tasks_dict_info:
2673 stage[1] = "Waiting for instantiate pending tasks."
2674 self.logger.debug(logging_text + stage[1])
2675 error_list += await self._wait_for_tasks(
2676 logging_text,
2677 tasks_dict_info,
2678 timeout_ns_deploy,
2679 stage,
2680 nslcmop_id,
2681 nsr_id=nsr_id,
2682 )
2683 stage[1] = stage[2] = ""
2684 except asyncio.CancelledError:
2685 error_list.append("Cancelled")
2686 # TODO cancel all tasks
2687 except Exception as exc:
2688 error_list.append(str(exc))
2689
2690 # update operation-status
2691 db_nsr_update["operational-status"] = "running"
2692 # let's begin with VCA 'configured' status (later we can change it)
2693 db_nsr_update["config-status"] = "configured"
2694 for task, task_name in tasks_dict_info.items():
2695 if not task.done() or task.cancelled() or task.exception():
2696 if task_name.startswith(self.task_name_deploy_vca):
2697 # A N2VC task is pending
2698 db_nsr_update["config-status"] = "failed"
2699 else:
2700 # RO or KDU task is pending
2701 db_nsr_update["operational-status"] = "failed"
2702
2703 # update status at database
2704 if error_list:
2705 error_detail = ". ".join(error_list)
2706 self.logger.error(logging_text + error_detail)
2707 error_description_nslcmop = "{} Detail: {}".format(
2708 stage[0], error_detail
2709 )
2710 error_description_nsr = "Operation: INSTANTIATING.{}, {}".format(
2711 nslcmop_id, stage[0]
2712 )
2713
2714 db_nsr_update["detailed-status"] = (
2715 error_description_nsr + " Detail: " + error_detail
2716 )
2717 db_nslcmop_update["detailed-status"] = error_detail
2718 nslcmop_operation_state = "FAILED"
2719 ns_state = "BROKEN"
2720 else:
2721 error_detail = None
2722 error_description_nsr = error_description_nslcmop = None
2723 ns_state = "READY"
2724 db_nsr_update["detailed-status"] = "Done"
2725 db_nslcmop_update["detailed-status"] = "Done"
2726 nslcmop_operation_state = "COMPLETED"
2727
2728 if db_nsr:
2729 self._write_ns_status(
2730 nsr_id=nsr_id,
2731 ns_state=ns_state,
2732 current_operation="IDLE",
2733 current_operation_id=None,
2734 error_description=error_description_nsr,
2735 error_detail=error_detail,
2736 other_update=db_nsr_update,
2737 )
2738 self._write_op_status(
2739 op_id=nslcmop_id,
2740 stage="",
2741 error_message=error_description_nslcmop,
2742 operation_state=nslcmop_operation_state,
2743 other_update=db_nslcmop_update,
2744 )
2745
2746 if nslcmop_operation_state:
2747 try:
2748 await self.msg.aiowrite(
2749 "ns",
2750 "instantiated",
2751 {
2752 "nsr_id": nsr_id,
2753 "nslcmop_id": nslcmop_id,
2754 "operationState": nslcmop_operation_state,
2755 },
2756 loop=self.loop,
2757 )
2758 except Exception as e:
2759 self.logger.error(
2760 logging_text + "kafka_write notification Exception {}".format(e)
2761 )
2762
2763 self.logger.debug(logging_text + "Exit")
2764 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
2765
2766 def _get_vnfd(self, vnfd_id: str, cached_vnfds: Dict[str, Any]):
2767 if vnfd_id not in cached_vnfds:
2768 cached_vnfds[vnfd_id] = self.db.get_one("vnfds", {"id": vnfd_id})
2769 return cached_vnfds[vnfd_id]
2770
2771 def _get_vnfr(self, nsr_id: str, vnf_profile_id: str, cached_vnfrs: Dict[str, Any]):
2772 if vnf_profile_id not in cached_vnfrs:
2773 cached_vnfrs[vnf_profile_id] = self.db.get_one(
2774 "vnfrs",
2775 {
2776 "member-vnf-index-ref": vnf_profile_id,
2777 "nsr-id-ref": nsr_id,
2778 },
2779 )
2780 return cached_vnfrs[vnf_profile_id]
2781
2782 def _is_deployed_vca_in_relation(
2783 self, vca: DeployedVCA, relation: Relation
2784 ) -> bool:
2785 found = False
2786 for endpoint in (relation.provider, relation.requirer):
2787 if endpoint["kdu-resource-profile-id"]:
2788 continue
2789 found = (
2790 vca.vnf_profile_id == endpoint.vnf_profile_id
2791 and vca.vdu_profile_id == endpoint.vdu_profile_id
2792 and vca.execution_environment_ref == endpoint.execution_environment_ref
2793 )
2794 if found:
2795 break
2796 return found
2797
2798 def _update_ee_relation_data_with_implicit_data(
2799 self, nsr_id, nsd, ee_relation_data, cached_vnfds, vnf_profile_id: str = None
2800 ):
2801 ee_relation_data = safe_get_ee_relation(
2802 nsr_id, ee_relation_data, vnf_profile_id=vnf_profile_id
2803 )
2804 ee_relation_level = EELevel.get_level(ee_relation_data)
2805 if (ee_relation_level in (EELevel.VNF, EELevel.VDU)) and not ee_relation_data[
2806 "execution-environment-ref"
2807 ]:
2808 vnf_profile = get_vnf_profile(nsd, ee_relation_data["vnf-profile-id"])
2809 vnfd_id = vnf_profile["vnfd-id"]
2810 db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
2811 entity_id = (
2812 vnfd_id
2813 if ee_relation_level == EELevel.VNF
2814 else ee_relation_data["vdu-profile-id"]
2815 )
2816 ee = get_juju_ee_ref(db_vnfd, entity_id)
2817 if not ee:
2818 raise Exception(
2819 f"not execution environments found for ee_relation {ee_relation_data}"
2820 )
2821 ee_relation_data["execution-environment-ref"] = ee["id"]
2822 return ee_relation_data
2823
2824 def _get_ns_relations(
2825 self,
2826 nsr_id: str,
2827 nsd: Dict[str, Any],
2828 vca: DeployedVCA,
2829 cached_vnfds: Dict[str, Any],
2830 ) -> List[Relation]:
2831 relations = []
2832 db_ns_relations = get_ns_configuration_relation_list(nsd)
2833 for r in db_ns_relations:
2834 provider_dict = None
2835 requirer_dict = None
2836 if all(key in r for key in ("provider", "requirer")):
2837 provider_dict = r["provider"]
2838 requirer_dict = r["requirer"]
2839 elif "entities" in r:
2840 provider_id = r["entities"][0]["id"]
2841 provider_dict = {
2842 "nsr-id": nsr_id,
2843 "endpoint": r["entities"][0]["endpoint"],
2844 }
2845 if provider_id != nsd["id"]:
2846 provider_dict["vnf-profile-id"] = provider_id
2847 requirer_id = r["entities"][1]["id"]
2848 requirer_dict = {
2849 "nsr-id": nsr_id,
2850 "endpoint": r["entities"][1]["endpoint"],
2851 }
2852 if requirer_id != nsd["id"]:
2853 requirer_dict["vnf-profile-id"] = requirer_id
2854 else:
2855 raise Exception("provider/requirer or entities must be included in the relation.")
2856 relation_provider = self._update_ee_relation_data_with_implicit_data(
2857 nsr_id, nsd, provider_dict, cached_vnfds
2858 )
2859 relation_requirer = self._update_ee_relation_data_with_implicit_data(
2860 nsr_id, nsd, requirer_dict, cached_vnfds
2861 )
2862 provider = EERelation(relation_provider)
2863 requirer = EERelation(relation_requirer)
2864 relation = Relation(r["name"], provider, requirer)
2865 vca_in_relation = self._is_deployed_vca_in_relation(vca, relation)
2866 if vca_in_relation:
2867 relations.append(relation)
2868 return relations
2869
2870 def _get_vnf_relations(
2871 self,
2872 nsr_id: str,
2873 nsd: Dict[str, Any],
2874 vca: DeployedVCA,
2875 cached_vnfds: Dict[str, Any],
2876 ) -> List[Relation]:
2877 relations = []
2878 vnf_profile = get_vnf_profile(nsd, vca.vnf_profile_id)
2879 vnf_profile_id = vnf_profile["id"]
2880 vnfd_id = vnf_profile["vnfd-id"]
2881 db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
2882 db_vnf_relations = get_relation_list(db_vnfd, vnfd_id)
2883 for r in db_vnf_relations:
2884 provider_dict = None
2885 requirer_dict = None
2886 if all(key in r for key in ("provider", "requirer")):
2887 provider_dict = r["provider"]
2888 requirer_dict = r["requirer"]
2889 elif "entities" in r:
2890 provider_id = r["entities"][0]["id"]
2891 provider_dict = {
2892 "nsr-id": nsr_id,
2893 "vnf-profile-id": vnf_profile_id,
2894 "endpoint": r["entities"][0]["endpoint"],
2895 }
2896 if provider_id != vnfd_id:
2897 provider_dict["vdu-profile-id"] = provider_id
2898 requirer_id = r["entities"][1]["id"]
2899 requirer_dict = {
2900 "nsr-id": nsr_id,
2901 "vnf-profile-id": vnf_profile_id,
2902 "endpoint": r["entities"][1]["endpoint"],
2903 }
2904 if requirer_id != vnfd_id:
2905 requirer_dict["vdu-profile-id"] = requirer_id
2906 else:
2907 raise Exception("provider/requirer or entities must be included in the relation.")
2908 relation_provider = self._update_ee_relation_data_with_implicit_data(
2909 nsr_id, nsd, provider_dict, cached_vnfds, vnf_profile_id=vnf_profile_id
2910 )
2911 relation_requirer = self._update_ee_relation_data_with_implicit_data(
2912 nsr_id, nsd, requirer_dict, cached_vnfds, vnf_profile_id=vnf_profile_id
2913 )
2914 provider = EERelation(relation_provider)
2915 requirer = EERelation(relation_requirer)
2916 relation = Relation(r["name"], provider, requirer)
2917 vca_in_relation = self._is_deployed_vca_in_relation(vca, relation)
2918 if vca_in_relation:
2919 relations.append(relation)
2920 return relations
2921
2922 def _get_kdu_resource_data(
2923 self,
2924 ee_relation: EERelation,
2925 db_nsr: Dict[str, Any],
2926 cached_vnfds: Dict[str, Any],
2927 ) -> DeployedK8sResource:
2928 nsd = get_nsd(db_nsr)
2929 vnf_profiles = get_vnf_profiles(nsd)
2930 vnfd_id = find_in_list(
2931 vnf_profiles,
2932 lambda vnf_profile: vnf_profile["id"] == ee_relation.vnf_profile_id,
2933 )["vnfd-id"]
2934 db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
2935 kdu_resource_profile = get_kdu_resource_profile(
2936 db_vnfd, ee_relation.kdu_resource_profile_id
2937 )
2938 kdu_name = kdu_resource_profile["kdu-name"]
2939 deployed_kdu, _ = get_deployed_kdu(
2940 db_nsr.get("_admin", ()).get("deployed", ()),
2941 kdu_name,
2942 ee_relation.vnf_profile_id,
2943 )
2944 deployed_kdu.update({"resource-name": kdu_resource_profile["resource-name"]})
2945 return deployed_kdu
2946
2947 def _get_deployed_component(
2948 self,
2949 ee_relation: EERelation,
2950 db_nsr: Dict[str, Any],
2951 cached_vnfds: Dict[str, Any],
2952 ) -> DeployedComponent:
2953 nsr_id = db_nsr["_id"]
2954 deployed_component = None
2955 ee_level = EELevel.get_level(ee_relation)
2956 if ee_level == EELevel.NS:
2957 vca = get_deployed_vca(db_nsr, {"vdu_id": None, "member-vnf-index": None})
2958 if vca:
2959 deployed_component = DeployedVCA(nsr_id, vca)
2960 elif ee_level == EELevel.VNF:
2961 vca = get_deployed_vca(
2962 db_nsr,
2963 {
2964 "vdu_id": None,
2965 "member-vnf-index": ee_relation.vnf_profile_id,
2966 "ee_descriptor_id": ee_relation.execution_environment_ref,
2967 },
2968 )
2969 if vca:
2970 deployed_component = DeployedVCA(nsr_id, vca)
2971 elif ee_level == EELevel.VDU:
2972 vca = get_deployed_vca(
2973 db_nsr,
2974 {
2975 "vdu_id": ee_relation.vdu_profile_id,
2976 "member-vnf-index": ee_relation.vnf_profile_id,
2977 "ee_descriptor_id": ee_relation.execution_environment_ref,
2978 },
2979 )
2980 if vca:
2981 deployed_component = DeployedVCA(nsr_id, vca)
2982 elif ee_level == EELevel.KDU:
2983 kdu_resource_data = self._get_kdu_resource_data(
2984 ee_relation, db_nsr, cached_vnfds
2985 )
2986 if kdu_resource_data:
2987 deployed_component = DeployedK8sResource(kdu_resource_data)
2988 return deployed_component
2989
2990 async def _add_relation(
2991 self,
2992 relation: Relation,
2993 vca_type: str,
2994 db_nsr: Dict[str, Any],
2995 cached_vnfds: Dict[str, Any],
2996 cached_vnfrs: Dict[str, Any],
2997 ) -> bool:
2998 deployed_provider = self._get_deployed_component(
2999 relation.provider, db_nsr, cached_vnfds
3000 )
3001 deployed_requirer = self._get_deployed_component(
3002 relation.requirer, db_nsr, cached_vnfds
3003 )
3004 if (
3005 deployed_provider
3006 and deployed_requirer
3007 and deployed_provider.config_sw_installed
3008 and deployed_requirer.config_sw_installed
3009 ):
3010 provider_db_vnfr = (
3011 self._get_vnfr(
3012 relation.provider.nsr_id,
3013 relation.provider.vnf_profile_id,
3014 cached_vnfrs,
3015 )
3016 if relation.provider.vnf_profile_id
3017 else None
3018 )
3019 requirer_db_vnfr = (
3020 self._get_vnfr(
3021 relation.requirer.nsr_id,
3022 relation.requirer.vnf_profile_id,
3023 cached_vnfrs,
3024 )
3025 if relation.requirer.vnf_profile_id
3026 else None
3027 )
3028 provider_vca_id = self.get_vca_id(provider_db_vnfr, db_nsr)
3029 requirer_vca_id = self.get_vca_id(requirer_db_vnfr, db_nsr)
3030 provider_relation_endpoint = RelationEndpoint(
3031 deployed_provider.ee_id,
3032 provider_vca_id,
3033 relation.provider.endpoint,
3034 )
3035 requirer_relation_endpoint = RelationEndpoint(
3036 deployed_requirer.ee_id,
3037 requirer_vca_id,
3038 relation.requirer.endpoint,
3039 )
3040 await self.vca_map[vca_type].add_relation(
3041 provider=provider_relation_endpoint,
3042 requirer=requirer_relation_endpoint,
3043 )
3044 # remove entry from relations list
3045 return True
3046 return False
3047
3048 async def _add_vca_relations(
3049 self,
3050 logging_text,
3051 nsr_id,
3052 vca_type: str,
3053 vca_index: int,
3054 timeout: int = 3600,
3055 ) -> bool:
3056
3057 # steps:
3058 # 1. find all relations for this VCA
3059 # 2. wait for other peers related
3060 # 3. add relations
3061
3062 try:
3063 # STEP 1: find all relations for this VCA
3064
3065 # read nsr record
3066 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3067 nsd = get_nsd(db_nsr)
3068
3069 # this VCA data
3070 deployed_vca_dict = get_deployed_vca_list(db_nsr)[vca_index]
3071 my_vca = DeployedVCA(nsr_id, deployed_vca_dict)
3072
3073 cached_vnfds = {}
3074 cached_vnfrs = {}
3075 relations = []
3076 relations.extend(self._get_ns_relations(nsr_id, nsd, my_vca, cached_vnfds))
3077 relations.extend(self._get_vnf_relations(nsr_id, nsd, my_vca, cached_vnfds))
3078
3079 # if no relations, terminate
3080 if not relations:
3081 self.logger.debug(logging_text + " No relations")
3082 return True
3083
3084 self.logger.debug(logging_text + " adding relations {}".format(relations))
3085
3086 # add all relations
3087 start = time()
3088 while True:
3089 # check timeout
3090 now = time()
3091 if now - start >= timeout:
3092 self.logger.error(logging_text + " : timeout adding relations")
3093 return False
3094
3095 # reload nsr from database (we need to update record: _admin.deployed.VCA)
3096 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3097
3098 # for each relation, find the VCA's related
3099 for relation in relations.copy():
3100 added = await self._add_relation(
3101 relation,
3102 vca_type,
3103 db_nsr,
3104 cached_vnfds,
3105 cached_vnfrs,
3106 )
3107 if added:
3108 relations.remove(relation)
3109
3110 if not relations:
3111 self.logger.debug("Relations added")
3112 break
3113 await asyncio.sleep(5.0)
3114
3115 return True
3116
3117 except Exception as e:
3118 self.logger.warn(logging_text + " ERROR adding relations: {}".format(e))
3119 return False
3120
3121 async def _install_kdu(
3122 self,
3123 nsr_id: str,
3124 nsr_db_path: str,
3125 vnfr_data: dict,
3126 kdu_index: int,
3127 kdud: dict,
3128 vnfd: dict,
3129 k8s_instance_info: dict,
3130 k8params: dict = None,
3131 timeout: int = 600,
3132 vca_id: str = None,
3133 ):
3134
3135 try:
3136 k8sclustertype = k8s_instance_info["k8scluster-type"]
3137 # Instantiate kdu
3138 db_dict_install = {
3139 "collection": "nsrs",
3140 "filter": {"_id": nsr_id},
3141 "path": nsr_db_path,
3142 }
3143
3144 if k8s_instance_info.get("kdu-deployment-name"):
3145 kdu_instance = k8s_instance_info.get("kdu-deployment-name")
3146 else:
3147 kdu_instance = self.k8scluster_map[
3148 k8sclustertype
3149 ].generate_kdu_instance_name(
3150 db_dict=db_dict_install,
3151 kdu_model=k8s_instance_info["kdu-model"],
3152 kdu_name=k8s_instance_info["kdu-name"],
3153 )
3154 self.update_db_2(
3155 "nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance}
3156 )
3157 await self.k8scluster_map[k8sclustertype].install(
3158 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
3159 kdu_model=k8s_instance_info["kdu-model"],
3160 atomic=True,
3161 params=k8params,
3162 db_dict=db_dict_install,
3163 timeout=timeout,
3164 kdu_name=k8s_instance_info["kdu-name"],
3165 namespace=k8s_instance_info["namespace"],
3166 kdu_instance=kdu_instance,
3167 vca_id=vca_id,
3168 )
3169 self.update_db_2(
3170 "nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance}
3171 )
3172
3173 # Obtain services to get the management service ip
3174 services = await self.k8scluster_map[k8sclustertype].get_services(
3175 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
3176 kdu_instance=kdu_instance,
3177 namespace=k8s_instance_info["namespace"],
3178 )
3179
3180 # Obtain management service info (if exists)
3181 vnfr_update_dict = {}
3182 kdu_config = get_configuration(vnfd, kdud["name"])
3183 if kdu_config:
3184 target_ee_list = kdu_config.get("execution-environment-list", [])
3185 else:
3186 target_ee_list = []
3187
3188 if services:
3189 vnfr_update_dict["kdur.{}.services".format(kdu_index)] = services
3190 mgmt_services = [
3191 service
3192 for service in kdud.get("service", [])
3193 if service.get("mgmt-service")
3194 ]
3195 for mgmt_service in mgmt_services:
3196 for service in services:
3197 if service["name"].startswith(mgmt_service["name"]):
3198 # Mgmt service found, Obtain service ip
3199 ip = service.get("external_ip", service.get("cluster_ip"))
3200 if isinstance(ip, list) and len(ip) == 1:
3201 ip = ip[0]
3202
3203 vnfr_update_dict[
3204 "kdur.{}.ip-address".format(kdu_index)
3205 ] = ip
3206
3207 # Check whether the mgmt ip at the vnf must also be updated
3208 service_external_cp = mgmt_service.get(
3209 "external-connection-point-ref"
3210 )
3211 if service_external_cp:
3212 if (
3213 deep_get(vnfd, ("mgmt-interface", "cp"))
3214 == service_external_cp
3215 ):
3216 vnfr_update_dict["ip-address"] = ip
3217
3218 if find_in_list(
3219 target_ee_list,
3220 lambda ee: ee.get(
3221 "external-connection-point-ref", ""
3222 )
3223 == service_external_cp,
3224 ):
3225 vnfr_update_dict[
3226 "kdur.{}.ip-address".format(kdu_index)
3227 ] = ip
3228 break
3229 else:
3230 self.logger.warn(
3231 "Mgmt service name: {} not found".format(
3232 mgmt_service["name"]
3233 )
3234 )
3235
3236 vnfr_update_dict["kdur.{}.status".format(kdu_index)] = "READY"
3237 self.update_db_2("vnfrs", vnfr_data.get("_id"), vnfr_update_dict)
3238
3239 kdu_config = get_configuration(vnfd, k8s_instance_info["kdu-name"])
3240 if (
3241 kdu_config
3242 and kdu_config.get("initial-config-primitive")
3243 and get_juju_ee_ref(vnfd, k8s_instance_info["kdu-name"]) is None
3244 ):
3245 initial_config_primitive_list = kdu_config.get(
3246 "initial-config-primitive"
3247 )
3248 initial_config_primitive_list.sort(key=lambda val: int(val["seq"]))
3249
3250 for initial_config_primitive in initial_config_primitive_list:
3251 primitive_params_ = self._map_primitive_params(
3252 initial_config_primitive, {}, {}
3253 )
3254
3255 await asyncio.wait_for(
3256 self.k8scluster_map[k8sclustertype].exec_primitive(
3257 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
3258 kdu_instance=kdu_instance,
3259 primitive_name=initial_config_primitive["name"],
3260 params=primitive_params_,
3261 db_dict=db_dict_install,
3262 vca_id=vca_id,
3263 ),
3264 timeout=timeout,
3265 )
3266
3267 except Exception as e:
3268 # Prepare update db with error and raise exception
3269 try:
3270 self.update_db_2(
3271 "nsrs", nsr_id, {nsr_db_path + ".detailed-status": str(e)}
3272 )
3273 self.update_db_2(
3274 "vnfrs",
3275 vnfr_data.get("_id"),
3276 {"kdur.{}.status".format(kdu_index): "ERROR"},
3277 )
3278 except Exception:
3279 # ignore to keep original exception
3280 pass
3281 # reraise original error
3282 raise
3283
3284 return kdu_instance
3285
3286 async def deploy_kdus(
3287 self,
3288 logging_text,
3289 nsr_id,
3290 nslcmop_id,
3291 db_vnfrs,
3292 db_vnfds,
3293 task_instantiation_info,
3294 ):
3295 # Launch kdus if present in the descriptor
3296
3297 k8scluster_id_2_uuic = {
3298 "helm-chart-v3": {},
3299 "helm-chart": {},
3300 "juju-bundle": {},
3301 }
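# cache mapping K8s cluster ids to the connector-internal cluster uuid, e.g. (hypothetical ids)
# {"helm-chart-v3": {"<k8scluster _id>": "<cluster uuid>"}, "helm-chart": {}, "juju-bundle": {}}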
3302
3303 async def _get_cluster_id(cluster_id, cluster_type):
3304 nonlocal k8scluster_id_2_uuic
3305 if cluster_id in k8scluster_id_2_uuic[cluster_type]:
3306 return k8scluster_id_2_uuic[cluster_type][cluster_id]
3307
3308 # check if the K8s cluster is being created; wait for previous related tasks in process
3309 task_name, task_dependency = self.lcm_tasks.lookfor_related(
3310 "k8scluster", cluster_id
3311 )
3312 if task_dependency:
3313 text = "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
3314 task_name, cluster_id
3315 )
3316 self.logger.debug(logging_text + text)
3317 await asyncio.wait(task_dependency, timeout=3600)
3318
3319 db_k8scluster = self.db.get_one(
3320 "k8sclusters", {"_id": cluster_id}, fail_on_empty=False
3321 )
3322 if not db_k8scluster:
3323 raise LcmException("K8s cluster {} cannot be found".format(cluster_id))
3324
3325 k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
3326 if not k8s_id:
3327 if cluster_type == "helm-chart-v3":
3328 try:
3329 # backward compatibility for existing clusters that have not been initialized for helm v3
3330 k8s_credentials = yaml.safe_dump(
3331 db_k8scluster.get("credentials")
3332 )
3333 k8s_id, uninstall_sw = await self.k8sclusterhelm3.init_env(
3334 k8s_credentials, reuse_cluster_uuid=cluster_id
3335 )
3336 db_k8scluster_update = {}
3337 db_k8scluster_update["_admin.helm-chart-v3.error_msg"] = None
3338 db_k8scluster_update["_admin.helm-chart-v3.id"] = k8s_id
3339 db_k8scluster_update[
3340 "_admin.helm-chart-v3.created"
3341 ] = uninstall_sw
3342 db_k8scluster_update[
3343 "_admin.helm-chart-v3.operationalState"
3344 ] = "ENABLED"
3345 self.update_db_2(
3346 "k8sclusters", cluster_id, db_k8scluster_update
3347 )
3348 except Exception as e:
3349 self.logger.error(
3350 logging_text
3351 + "error initializing helm-v3 cluster: {}".format(str(e))
3352 )
3353 raise LcmException(
3354 "K8s cluster '{}' has not been initialized for '{}'".format(
3355 cluster_id, cluster_type
3356 )
3357 )
3358 else:
3359 raise LcmException(
3360 "K8s cluster '{}' has not been initialized for '{}'".format(
3361 cluster_id, cluster_type
3362 )
3363 )
3364 k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
3365 return k8s_id
3366
3367 logging_text += "Deploy kdus: "
3368 step = ""
3369 try:
3370 db_nsr_update = {"_admin.deployed.K8s": []}
3371 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3372
3373 index = 0
3374 updated_cluster_list = []
3375 updated_v3_cluster_list = []
3376
3377 for vnfr_data in db_vnfrs.values():
3378 vca_id = self.get_vca_id(vnfr_data, {})
3379 for kdu_index, kdur in enumerate(get_iterable(vnfr_data, "kdur")):
3380 # Step 0: Prepare and set parameters
3381 desc_params = parse_yaml_strings(kdur.get("additionalParams"))
3382 vnfd_id = vnfr_data.get("vnfd-id")
3383 vnfd_with_id = find_in_list(
3384 db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id
3385 )
3386 kdud = next(
3387 kdud
3388 for kdud in vnfd_with_id["kdu"]
3389 if kdud["name"] == kdur["kdu-name"]
3390 )
3391 namespace = kdur.get("k8s-namespace")
3392 kdu_deployment_name = kdur.get("kdu-deployment-name")
3393 if kdur.get("helm-chart"):
3394 kdumodel = kdur["helm-chart"]
3395 # Default version is helm3; if helm-version is v2, use the helm-chart (v2) connector
3396 k8sclustertype = "helm-chart-v3"
3397 self.logger.debug("kdur: {}".format(kdur))
3398 if (
3399 kdur.get("helm-version")
3400 and kdur.get("helm-version") == "v2"
3401 ):
3402 k8sclustertype = "helm-chart"
3403 elif kdur.get("juju-bundle"):
3404 kdumodel = kdur["juju-bundle"]
3405 k8sclustertype = "juju-bundle"
3406 else:
3407 raise LcmException(
3408 "kdu type for kdu='{}.{}' is neither helm-chart nor "
3409 "juju-bundle. Maybe an old NBI version is running".format(
3410 vnfr_data["member-vnf-index-ref"], kdur["kdu-name"]
3411 )
3412 )
3413 # check if kdumodel is a file and exists
3414 try:
3415 vnfd_with_id = find_in_list(
3416 db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id
3417 )
3418 storage = deep_get(vnfd_with_id, ("_admin", "storage"))
3419 if storage: # may not be present if the vnfd has no artifacts
3420 # path format: /vnfdid/pkgdir/helm-charts|juju-bundles/kdumodel
3421 if storage["pkg-dir"]:
3422 filename = "{}/{}/{}s/{}".format(
3423 storage["folder"],
3424 storage["pkg-dir"],
3425 k8sclustertype,
3426 kdumodel,
3427 )
3428 else:
3429 filename = "{}/Scripts/{}s/{}".format(
3430 storage["folder"],
3431 k8sclustertype,
3432 kdumodel,
3433 )
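# e.g. (hypothetical package): "<vnfd folder>/<pkg-dir>/helm-charts/mychart" or, without a
# pkg-dir, "<vnfd folder>/Scripts/juju-bundles/mybundle"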
3434 if self.fs.file_exists(
3435 filename, mode="file"
3436 ) or self.fs.file_exists(filename, mode="dir"):
3437 kdumodel = self.fs.path + filename
3438 except (asyncio.TimeoutError, asyncio.CancelledError):
3439 raise
3440 except Exception: # it is not a file
3441 pass
3442
3443 k8s_cluster_id = kdur["k8s-cluster"]["id"]
3444 step = "Synchronize repos for k8s cluster '{}'".format(
3445 k8s_cluster_id
3446 )
3447 cluster_uuid = await _get_cluster_id(k8s_cluster_id, k8sclustertype)
3448
3449 # Synchronize repos
3450 if (
3451 k8sclustertype == "helm-chart"
3452 and cluster_uuid not in updated_cluster_list
3453 ) or (
3454 k8sclustertype == "helm-chart-v3"
3455 and cluster_uuid not in updated_v3_cluster_list
3456 ):
3457 del_repo_list, added_repo_dict = await asyncio.ensure_future(
3458 self.k8scluster_map[k8sclustertype].synchronize_repos(
3459 cluster_uuid=cluster_uuid
3460 )
3461 )
3462 if del_repo_list or added_repo_dict:
3463 if k8sclustertype == "helm-chart":
3464 unset = {
3465 "_admin.helm_charts_added." + item: None
3466 for item in del_repo_list
3467 }
3468 updated = {
3469 "_admin.helm_charts_added." + item: name
3470 for item, name in added_repo_dict.items()
3471 }
3472 updated_cluster_list.append(cluster_uuid)
3473 elif k8sclustertype == "helm-chart-v3":
3474 unset = {
3475 "_admin.helm_charts_v3_added." + item: None
3476 for item in del_repo_list
3477 }
3478 updated = {
3479 "_admin.helm_charts_v3_added." + item: name
3480 for item, name in added_repo_dict.items()
3481 }
3482 updated_v3_cluster_list.append(cluster_uuid)
3483 self.logger.debug(
3484 logging_text + "repos synchronized on k8s cluster "
3485 "'{}' to_delete: {}, to_add: {}".format(
3486 k8s_cluster_id, del_repo_list, added_repo_dict
3487 )
3488 )
3489 self.db.set_one(
3490 "k8sclusters",
3491 {"_id": k8s_cluster_id},
3492 updated,
3493 unset=unset,
3494 )
3495
3496 # Instantiate kdu
3497 step = "Instantiating KDU {}.{} in k8s cluster {}".format(
3498 vnfr_data["member-vnf-index-ref"],
3499 kdur["kdu-name"],
3500 k8s_cluster_id,
3501 )
3502 k8s_instance_info = {
3503 "kdu-instance": None,
3504 "k8scluster-uuid": cluster_uuid,
3505 "k8scluster-type": k8sclustertype,
3506 "member-vnf-index": vnfr_data["member-vnf-index-ref"],
3507 "kdu-name": kdur["kdu-name"],
3508 "kdu-model": kdumodel,
3509 "namespace": namespace,
3510 "kdu-deployment-name": kdu_deployment_name,
3511 }
3512 db_path = "_admin.deployed.K8s.{}".format(index)
3513 db_nsr_update[db_path] = k8s_instance_info
3514 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3515 vnfd_with_id = find_in_list(
3516 db_vnfds, lambda vnf: vnf["_id"] == vnfd_id
3517 )
3518 task = asyncio.ensure_future(
3519 self._install_kdu(
3520 nsr_id,
3521 db_path,
3522 vnfr_data,
3523 kdu_index,
3524 kdud,
3525 vnfd_with_id,
3526 k8s_instance_info,
3527 k8params=desc_params,
3528 timeout=600,
3529 vca_id=vca_id,
3530 )
3531 )
3532 self.lcm_tasks.register(
3533 "ns",
3534 nsr_id,
3535 nslcmop_id,
3536 "instantiate_KDU-{}".format(index),
3537 task,
3538 )
3539 task_instantiation_info[task] = "Deploying KDU {}".format(
3540 kdur["kdu-name"]
3541 )
3542
3543 index += 1
3544
3545 except (LcmException, asyncio.CancelledError):
3546 raise
3547 except Exception as e:
3548 msg = "Exception {} while {}: {}".format(type(e).__name__, step, e)
3549 if isinstance(e, (N2VCException, DbException)):
3550 self.logger.error(logging_text + msg)
3551 else:
3552 self.logger.critical(logging_text + msg, exc_info=True)
3553 raise LcmException(msg)
3554 finally:
3555 if db_nsr_update:
3556 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3557
3558 def _deploy_n2vc(
3559 self,
3560 logging_text,
3561 db_nsr,
3562 db_vnfr,
3563 nslcmop_id,
3564 nsr_id,
3565 nsi_id,
3566 vnfd_id,
3567 vdu_id,
3568 kdu_name,
3569 member_vnf_index,
3570 vdu_index,
3571 vdu_name,
3572 deploy_params,
3573 descriptor_config,
3574 base_folder,
3575 task_instantiation_info,
3576 stage,
3577 ):
3578 # launch instantiate_N2VC in an asyncio task and register the task object
3579 # Look up this charm's information in the database at <nsrs>._admin.deployed.VCA
3580 # if not found, create one entry and update database
3581 # fill db_nsr._admin.deployed.VCA.<index>
3582
3583 self.logger.debug(
3584 logging_text + "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id, vdu_id)
3585 )
3586 if "execution-environment-list" in descriptor_config:
3587 ee_list = descriptor_config.get("execution-environment-list", [])
3588 elif "juju" in descriptor_config:
3589 ee_list = [descriptor_config] # ns charms
3590 else: # other types, such as scripts, are not supported
3591 ee_list = []
3592
3593 for ee_item in ee_list:
3594 self.logger.debug(
3595 logging_text
3596 + "_deploy_n2vc ee_item juju={}, helm={}".format(
3597 ee_item.get("juju"), ee_item.get("helm-chart")
3598 )
3599 )
3600 ee_descriptor_id = ee_item.get("id")
3601 if ee_item.get("juju"):
3602 vca_name = ee_item["juju"].get("charm")
3603 vca_type = (
3604 "lxc_proxy_charm"
3605 if ee_item["juju"].get("charm") is not None
3606 else "native_charm"
3607 )
3608 if ee_item["juju"].get("cloud") == "k8s":
3609 vca_type = "k8s_proxy_charm"
3610 elif ee_item["juju"].get("proxy") is False:
3611 vca_type = "native_charm"
3612 elif ee_item.get("helm-chart"):
3613 vca_name = ee_item["helm-chart"]
3614 if ee_item.get("helm-version") == "v2":
3615 vca_type = "helm"
3616 else:
3617 vca_type = "helm-v3"
3618 else:
3619 self.logger.debug(
3620 logging_text + "skipping configuration that is neither juju nor helm-chart"
3621 )
3622 continue
3623
3624 vca_index = -1
3625 for vca_index, vca_deployed in enumerate(
3626 db_nsr["_admin"]["deployed"]["VCA"]
3627 ):
3628 if not vca_deployed:
3629 continue
3630 if (
3631 vca_deployed.get("member-vnf-index") == member_vnf_index
3632 and vca_deployed.get("vdu_id") == vdu_id
3633 and vca_deployed.get("kdu_name") == kdu_name
3634 and vca_deployed.get("vdu_count_index", 0) == vdu_index
3635 and vca_deployed.get("ee_descriptor_id") == ee_descriptor_id
3636 ):
3637 break
3638 else:
3639 # not found, create one.
3640 target = (
3641 "ns" if not member_vnf_index else "vnf/{}".format(member_vnf_index)
3642 )
3643 if vdu_id:
3644 target += "/vdu/{}/{}".format(vdu_id, vdu_index or 0)
3645 elif kdu_name:
3646 target += "/kdu/{}".format(kdu_name)
3647 vca_deployed = {
3648 "target_element": target,
3649 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
3650 "member-vnf-index": member_vnf_index,
3651 "vdu_id": vdu_id,
3652 "kdu_name": kdu_name,
3653 "vdu_count_index": vdu_index,
3654 "operational-status": "init", # TODO revise
3655 "detailed-status": "", # TODO revise
3656 "step": "initial-deploy", # TODO revise
3657 "vnfd_id": vnfd_id,
3658 "vdu_name": vdu_name,
3659 "type": vca_type,
3660 "ee_descriptor_id": ee_descriptor_id,
3661 }
3662 vca_index += 1
3663
3664 # create VCA and configurationStatus in db
3665 db_dict = {
3666 "_admin.deployed.VCA.{}".format(vca_index): vca_deployed,
3667 "configurationStatus.{}".format(vca_index): dict(),
3668 }
3669 self.update_db_2("nsrs", nsr_id, db_dict)
3670
3671 db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed)
3672
3673 self.logger.debug("N2VC > NSR_ID > {}".format(nsr_id))
3674 self.logger.debug("N2VC > DB_NSR > {}".format(db_nsr))
3675 self.logger.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed))
3676
3677 # Launch task
3678 task_n2vc = asyncio.ensure_future(
3679 self.instantiate_N2VC(
3680 logging_text=logging_text,
3681 vca_index=vca_index,
3682 nsi_id=nsi_id,
3683 db_nsr=db_nsr,
3684 db_vnfr=db_vnfr,
3685 vdu_id=vdu_id,
3686 kdu_name=kdu_name,
3687 vdu_index=vdu_index,
3688 deploy_params=deploy_params,
3689 config_descriptor=descriptor_config,
3690 base_folder=base_folder,
3691 nslcmop_id=nslcmop_id,
3692 stage=stage,
3693 vca_type=vca_type,
3694 vca_name=vca_name,
3695 ee_config_descriptor=ee_item,
3696 )
3697 )
3698 self.lcm_tasks.register(
3699 "ns",
3700 nsr_id,
3701 nslcmop_id,
3702 "instantiate_N2VC-{}".format(vca_index),
3703 task_n2vc,
3704 )
3705 task_instantiation_info[
3706 task_n2vc
3707 ] = self.task_name_deploy_vca + " {}.{}".format(
3708 member_vnf_index or "", vdu_id or ""
3709 )
3710
3711 @staticmethod
3712 def _create_nslcmop(nsr_id, operation, params):
3713 """
3714 Creates an ns-lcm-op content to be stored in the database.
3715 :param nsr_id: internal id of the instance
3716 :param operation: instantiate, terminate, scale, action, ...
3717 :param params: user parameters for the operation
3718 :return: dictionary following SOL005 format
3719 """
3720 # Raise exception if invalid arguments
3721 if not (nsr_id and operation and params):
3722 raise LcmException(
3723 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided"
3724 )
3725 now = time()
3726 _id = str(uuid4())
3727 nslcmop = {
3728 "id": _id,
3729 "_id": _id,
3730 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
3731 "operationState": "PROCESSING",
3732 "statusEnteredTime": now,
3733 "nsInstanceId": nsr_id,
3734 "lcmOperationType": operation,
3735 "startTime": now,
3736 "isAutomaticInvocation": False,
3737 "operationParams": params,
3738 "isCancelPending": False,
3739 "links": {
3740 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
3741 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
3742 },
3743 }
3744 return nslcmop
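# Illustrative sketch (not in the original code): the dictionary built above can be
# stored directly in the "nslcmops" collection, e.g. (ids and params are hypothetical,
# and the create() call assumes the osm_common DB API):
#
#   nslcmop = NsLcm._create_nslcmop(
#       nsr_id="0123456789abcdef",
#       operation="action",
#       params={"member_vnf_index": "1", "primitive": "touch", "primitive_params": {}},
#   )
#   self.db.create("nslcmops", nslcmop)
#
# operationState starts at "PROCESSING" and is updated as the operation progresses.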
3745
3746 def _format_additional_params(self, params):
3747 params = params or {}
3748 for key, value in params.items():
3749 if str(value).startswith("!!yaml "):
3750 params[key] = yaml.safe_load(value[7:])
3751 return params
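# Example (illustrative, not in the original code): only values prefixed with
# "!!yaml " are parsed; everything else is returned untouched.
#
#   self._format_additional_params({"a": "1", "b": "!!yaml [1, 2, 3]"})
#   # -> {"a": "1", "b": [1, 2, 3]}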
3752
3753 def _get_terminate_primitive_params(self, seq, vnf_index):
3754 primitive = seq.get("name")
3755 primitive_params = {}
3756 params = {
3757 "member_vnf_index": vnf_index,
3758 "primitive": primitive,
3759 "primitive_params": primitive_params,
3760 }
3761 desc_params = {}
3762 return self._map_primitive_params(seq, params, desc_params)
3763
3764 # sub-operations
3765
3766 def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
3767 op = deep_get(db_nslcmop, ("_admin", "operations"), [])[op_index]
3768 if op.get("operationState") == "COMPLETED":
3769 # b. Skip sub-operation
3770 # _ns_execute_primitive() or RO.create_action() will NOT be executed
3771 return self.SUBOPERATION_STATUS_SKIP
3772 else:
3773 # c. retry executing sub-operation
3774 # The sub-operation exists, and operationState != 'COMPLETED'
3775 # Update operationState = 'PROCESSING' to indicate a retry.
3776 operationState = "PROCESSING"
3777 detailed_status = "In progress"
3778 self._update_suboperation_status(
3779 db_nslcmop, op_index, operationState, detailed_status
3780 )
3781 # Return the sub-operation index
3782 # _ns_execute_primitive() or RO.create_action() will be called from scale()
3783 # with arguments extracted from the sub-operation
3784 return op_index
3785
3786 # Find a sub-operation where all keys of a matching dictionary match
3787 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if there is no match
3788 def _find_suboperation(self, db_nslcmop, match):
3789 if db_nslcmop and match:
3790 op_list = db_nslcmop.get("_admin", {}).get("operations", [])
3791 for i, op in enumerate(op_list):
3792 if all(op.get(k) == match[k] for k in match):
3793 return i
3794 return self.SUBOPERATION_STATUS_NOT_FOUND
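# Illustrative example (not in the original code): a typical 'match' dictionary for a
# pre-scale sub-operation; every key present in 'match' must be equal in the stored
# sub-operation (the primitive name is hypothetical).
#
#   match = {
#       "member_vnf_index": "1",
#       "primitive": "touch",
#       "primitive_params": {},
#       "lcmOperationType": "PRE-SCALE",
#   }
#   op_index = self._find_suboperation(db_nslcmop, match)
#   # -> index within _admin.operations, or SUBOPERATION_STATUS_NOT_FOUND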
3795
3796 # Update status for a sub-operation given its index
3797 def _update_suboperation_status(
3798 self, db_nslcmop, op_index, operationState, detailed_status
3799 ):
3800 # Update DB for HA tasks
3801 q_filter = {"_id": db_nslcmop["_id"]}
3802 update_dict = {
3803 "_admin.operations.{}.operationState".format(op_index): operationState,
3804 "_admin.operations.{}.detailed-status".format(op_index): detailed_status,
3805 }
3806 self.db.set_one(
3807 "nslcmops", q_filter=q_filter, update_dict=update_dict, fail_on_empty=False
3808 )
3809
3810 # Add sub-operation, return the index of the added sub-operation
3811 # Optionally, set operationState, detailed-status, and operationType
3812 # Status and type are currently set for 'scale' sub-operations:
3813 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
3814 # 'detailed-status' : status message
3815 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
3816 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
3817 def _add_suboperation(
3818 self,
3819 db_nslcmop,
3820 vnf_index,
3821 vdu_id,
3822 vdu_count_index,
3823 vdu_name,
3824 primitive,
3825 mapped_primitive_params,
3826 operationState=None,
3827 detailed_status=None,
3828 operationType=None,
3829 RO_nsr_id=None,
3830 RO_scaling_info=None,
3831 ):
3832 if not db_nslcmop:
3833 return self.SUBOPERATION_STATUS_NOT_FOUND
3834 # Get the "_admin.operations" list, if it exists
3835 db_nslcmop_admin = db_nslcmop.get("_admin", {})
3836 op_list = db_nslcmop_admin.get("operations")
3837 # Create or append to the "_admin.operations" list
3838 new_op = {
3839 "member_vnf_index": vnf_index,
3840 "vdu_id": vdu_id,
3841 "vdu_count_index": vdu_count_index,
3842 "primitive": primitive,
3843 "primitive_params": mapped_primitive_params,
3844 }
3845 if operationState:
3846 new_op["operationState"] = operationState
3847 if detailed_status:
3848 new_op["detailed-status"] = detailed_status
3849 if operationType:
3850 new_op["lcmOperationType"] = operationType
3851 if RO_nsr_id:
3852 new_op["RO_nsr_id"] = RO_nsr_id
3853 if RO_scaling_info:
3854 new_op["RO_scaling_info"] = RO_scaling_info
3855 if not op_list:
3856 # No existing operations, create key 'operations' with current operation as first list element
3857 db_nslcmop_admin.update({"operations": [new_op]})
3858 op_list = db_nslcmop_admin.get("operations")
3859 else:
3860 # Existing operations, append operation to list
3861 op_list.append(new_op)
3862
3863 db_nslcmop_update = {"_admin.operations": op_list}
3864 self.update_db_2("nslcmops", db_nslcmop["_id"], db_nslcmop_update)
3865 op_index = len(op_list) - 1
3866 return op_index
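# Usage sketch (illustrative, not in the original code): adding a terminate
# sub-operation with only the mandatory fields; the optional state/type/RO fields
# are omitted, so just the five base keys are stored (primitive name is hypothetical).
#
#   op_index = self._add_suboperation(
#       db_nslcmop, vnf_index="1", vdu_id=None, vdu_count_index=None,
#       vdu_name=None, primitive="touch", mapped_primitive_params={},
#   )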
3867
3868 # Helper methods for scale() sub-operations
3869
3870 # pre-scale/post-scale:
3871 # Check for 3 different cases:
3872 # a. New: first-time execution, return SUBOPERATION_STATUS_NEW
3873 # b. Skip: the sub-operation already exists and operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
3874 # c. Retry: the sub-operation already exists and operationState != 'COMPLETED', return op_index to re-execute
3875 def _check_or_add_scale_suboperation(
3876 self,
3877 db_nslcmop,
3878 vnf_index,
3879 vnf_config_primitive,
3880 primitive_params,
3881 operationType,
3882 RO_nsr_id=None,
3883 RO_scaling_info=None,
3884 ):
3885 # Find this sub-operation
3886 if RO_nsr_id and RO_scaling_info:
3887 operationType = "SCALE-RO"
3888 match = {
3889 "member_vnf_index": vnf_index,
3890 "RO_nsr_id": RO_nsr_id,
3891 "RO_scaling_info": RO_scaling_info,
3892 }
3893 else:
3894 match = {
3895 "member_vnf_index": vnf_index,
3896 "primitive": vnf_config_primitive,
3897 "primitive_params": primitive_params,
3898 "lcmOperationType": operationType,
3899 }
3900 op_index = self._find_suboperation(db_nslcmop, match)
3901 if op_index == self.SUBOPERATION_STATUS_NOT_FOUND:
3902 # a. New sub-operation
3903 # The sub-operation does not exist, add it.
3904 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
3905 # The following parameters are set to None for all kind of scaling:
3906 vdu_id = None
3907 vdu_count_index = None
3908 vdu_name = None
3909 if RO_nsr_id and RO_scaling_info:
3910 vnf_config_primitive = None
3911 primitive_params = None
3912 else:
3913 RO_nsr_id = None
3914 RO_scaling_info = None
3915 # Initial status for sub-operation
3916 operationState = "PROCESSING"
3917 detailed_status = "In progress"
3918 # Add sub-operation for pre/post-scaling (zero or more operations)
3919 self._add_suboperation(
3920 db_nslcmop,
3921 vnf_index,
3922 vdu_id,
3923 vdu_count_index,
3924 vdu_name,
3925 vnf_config_primitive,
3926 primitive_params,
3927 operationState,
3928 detailed_status,
3929 operationType,
3930 RO_nsr_id,
3931 RO_scaling_info,
3932 )
3933 return self.SUBOPERATION_STATUS_NEW
3934 else:
3935 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
3936 # or op_index (operationState != 'COMPLETED')
3937 return self._retry_or_skip_suboperation(db_nslcmop, op_index)
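# Illustrative outcomes (not in the original code), assuming a pre-scale config
# primitive named "touch" (hypothetical):
#
#   result = self._check_or_add_scale_suboperation(
#       db_nslcmop, "1", "touch", {}, "PRE-SCALE")
#   # first call               -> SUBOPERATION_STATUS_NEW  (sub-operation added)
#   # retry, already COMPLETED -> SUBOPERATION_STATUS_SKIP (nothing re-executed)
#   # retry, not yet COMPLETED -> op_index                 (re-executed from scale())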
3938
3939 # Function to return execution_environment id
3940
3941 def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list):
3942 # TODO vdu_index_count
3943 for vca in vca_deployed_list:
3944 if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id:
3945 return vca["ee_id"]
3946
3947 async def destroy_N2VC(
3948 self,
3949 logging_text,
3950 db_nslcmop,
3951 vca_deployed,
3952 config_descriptor,
3953 vca_index,
3954 destroy_ee=True,
3955 exec_primitives=True,
3956 scaling_in=False,
3957 vca_id: str = None,
3958 ):
3959 """
3960 Execute the terminate primitives and destroy the execution environment (if destroy_ee=True)
3961 :param logging_text:
3962 :param db_nslcmop:
3963 :param vca_deployed: Dictionary of deployment info at db_nsr._admin.deployed.VCA.<INDEX>
3964 :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
3965 :param vca_index: index in the database _admin.deployed.VCA
3966 :param destroy_ee: False to skip destroying it here, because all execution environments will be destroyed at once later
3967 :param exec_primitives: False to skip executing terminate primitives, because the config is not completed or did
3968 not execute properly
3969 :param scaling_in: True destroys the application, False destroys the model
3970 :return: None or exception
3971 """
3972
3973 self.logger.debug(
3974 logging_text
3975 + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
3976 vca_index, vca_deployed, config_descriptor, destroy_ee
3977 )
3978 )
3979
3980 vca_type = vca_deployed.get("type", "lxc_proxy_charm")
3981
3982 # execute terminate_primitives
3983 if exec_primitives:
3984 terminate_primitives = get_ee_sorted_terminate_config_primitive_list(
3985 config_descriptor.get("terminate-config-primitive"),
3986 vca_deployed.get("ee_descriptor_id"),
3987 )
3988 vdu_id = vca_deployed.get("vdu_id")
3989 vdu_count_index = vca_deployed.get("vdu_count_index")
3990 vdu_name = vca_deployed.get("vdu_name")
3991 vnf_index = vca_deployed.get("member-vnf-index")
3992 if terminate_primitives and vca_deployed.get("needed_terminate"):
3993 for seq in terminate_primitives:
3994 # For each sequence in list, get primitive and call _ns_execute_primitive()
3995 step = "Calling terminate action for vnf_member_index={} primitive={}".format(
3996 vnf_index, seq.get("name")
3997 )
3998 self.logger.debug(logging_text + step)
3999 # Create the primitive for each sequence, i.e. "primitive": "touch"
4000 primitive = seq.get("name")
4001 mapped_primitive_params = self._get_terminate_primitive_params(
4002 seq, vnf_index
4003 )
4004
4005 # Add sub-operation
4006 self._add_suboperation(
4007 db_nslcmop,
4008 vnf_index,
4009 vdu_id,
4010 vdu_count_index,
4011 vdu_name,
4012 primitive,
4013 mapped_primitive_params,
4014 )
4015 # Sub-operations: Call _ns_execute_primitive() instead of action()
4016 try:
4017 result, result_detail = await self._ns_execute_primitive(
4018 vca_deployed["ee_id"],
4019 primitive,
4020 mapped_primitive_params,
4021 vca_type=vca_type,
4022 vca_id=vca_id,
4023 )
4024 except LcmException:
4025 # this happens when the VCA is not deployed. In this case there is nothing to terminate
4026 continue
4027 result_ok = ["COMPLETED", "PARTIALLY_COMPLETED"]
4028 if result not in result_ok:
4029 raise LcmException(
4030 "terminate_primitive {} for vnf_member_index={} fails with "
4031 "error {}".format(seq.get("name"), vnf_index, result_detail)
4032 )
4033 # mark that this VCA no longer needs to be terminated
4034 db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(
4035 vca_index
4036 )
4037 self.update_db_2(
4038 "nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False}
4039 )
4040
4041 # Delete Prometheus Jobs if any
4042 # This uses NSR_ID, so it will destroy any jobs under this index
4043 self.db.del_list("prometheus_jobs", {"nsr_id": db_nslcmop["nsInstanceId"]})
4044
4045 if destroy_ee:
4046 await self.vca_map[vca_type].delete_execution_environment(
4047 vca_deployed["ee_id"],
4048 scaling_in=scaling_in,
4049 vca_type=vca_type,
4050 vca_id=vca_id,
4051 )
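# Call sketch (illustrative): this is how terminate() hands each deployed VCA to
# destroy_N2VC, one asyncio task per VCA; destroy_ee is True for helm and native
# charms, otherwise the EE is removed later in _delete_all_N2VC.
#
#   await self.destroy_N2VC(
#       logging_text, db_nslcmop, vca_deployed, config_descriptor,
#       vca_index, destroy_ee, exec_terminate_primitives, vca_id=vca_id)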
4052
4053 async def _delete_all_N2VC(self, db_nsr: dict, vca_id: str = None):
4054 self._write_all_config_status(db_nsr=db_nsr, status="TERMINATING")
4055 namespace = "." + db_nsr["_id"]
4056 try:
4057 await self.n2vc.delete_namespace(
4058 namespace=namespace,
4059 total_timeout=self.timeout_charm_delete,
4060 vca_id=vca_id,
4061 )
4062 except N2VCNotFound: # already deleted. Skip
4063 pass
4064 self._write_all_config_status(db_nsr=db_nsr, status="DELETED")
4065
4066 async def _terminate_RO(
4067 self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
4068 ):
4069 """
4070 Terminates a deployment from RO
4071 :param logging_text:
4072 :param nsr_deployed: db_nsr._admin.deployed
4073 :param nsr_id:
4074 :param nslcmop_id:
4075 :param stage: list of strings with the content to write to db_nslcmop.detailed-status.
4076 this method updates only index 2, but it writes the concatenated content of the list to the database
4077 :return:
4078 """
4079 db_nsr_update = {}
4080 failed_detail = []
4081 ro_nsr_id = ro_delete_action = None
4082 if nsr_deployed and nsr_deployed.get("RO"):
4083 ro_nsr_id = nsr_deployed["RO"].get("nsr_id")
4084 ro_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
4085 try:
4086 if ro_nsr_id:
4087 stage[2] = "Deleting ns from VIM."
4088 db_nsr_update["detailed-status"] = " ".join(stage)
4089 self._write_op_status(nslcmop_id, stage)
4090 self.logger.debug(logging_text + stage[2])
4091 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4092 self._write_op_status(nslcmop_id, stage)
4093 desc = await self.RO.delete("ns", ro_nsr_id)
4094 ro_delete_action = desc["action_id"]
4095 db_nsr_update[
4096 "_admin.deployed.RO.nsr_delete_action_id"
4097 ] = ro_delete_action
4098 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
4099 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
4100 if ro_delete_action:
4101 # wait until NS is deleted from VIM
4102 stage[2] = "Waiting for ns to be deleted from VIM."
4103 detailed_status_old = None
4104 self.logger.debug(
4105 logging_text
4106 + stage[2]
4107 + " RO_id={} ro_delete_action={}".format(
4108 ro_nsr_id, ro_delete_action
4109 )
4110 )
4111 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4112 self._write_op_status(nslcmop_id, stage)
4113
4114 delete_timeout = 20 * 60 # 20 minutes
4115 while delete_timeout > 0:
4116 desc = await self.RO.show(
4117 "ns",
4118 item_id_name=ro_nsr_id,
4119 extra_item="action",
4120 extra_item_id=ro_delete_action,
4121 )
4122
4123 # deploymentStatus
4124 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
4125
4126 ns_status, ns_status_info = self.RO.check_action_status(desc)
4127 if ns_status == "ERROR":
4128 raise ROclient.ROClientException(ns_status_info)
4129 elif ns_status == "BUILD":
4130 stage[2] = "Deleting from VIM {}".format(ns_status_info)
4131 elif ns_status == "ACTIVE":
4132 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
4133 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
4134 break
4135 else:
4136 assert (
4137 False
4138 ), "ROclient.check_action_status returns unknown {}".format(
4139 ns_status
4140 )
4141 if stage[2] != detailed_status_old:
4142 detailed_status_old = stage[2]
4143 db_nsr_update["detailed-status"] = " ".join(stage)
4144 self._write_op_status(nslcmop_id, stage)
4145 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4146 await asyncio.sleep(5, loop=self.loop)
4147 delete_timeout -= 5
4148 else: # delete_timeout <= 0:
4149 raise ROclient.ROClientException(
4150 "Timeout waiting ns deleted from VIM"
4151 )
4152
4153 except Exception as e:
4154 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4155 if (
4156 isinstance(e, ROclient.ROClientException) and e.http_code == 404
4157 ): # not found
4158 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
4159 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
4160 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
4161 self.logger.debug(
4162 logging_text + "RO_ns_id={} already deleted".format(ro_nsr_id)
4163 )
4164 elif (
4165 isinstance(e, ROclient.ROClientException) and e.http_code == 409
4166 ): # conflict
4167 failed_detail.append("delete conflict: {}".format(e))
4168 self.logger.debug(
4169 logging_text
4170 + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id, e)
4171 )
4172 else:
4173 failed_detail.append("delete error: {}".format(e))
4174 self.logger.error(
4175 logging_text + "RO_ns_id={} delete error: {}".format(ro_nsr_id, e)
4176 )
4177
4178 # Delete nsd
4179 if not failed_detail and deep_get(nsr_deployed, ("RO", "nsd_id")):
4180 ro_nsd_id = nsr_deployed["RO"]["nsd_id"]
4181 try:
4182 stage[2] = "Deleting nsd from RO."
4183 db_nsr_update["detailed-status"] = " ".join(stage)
4184 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4185 self._write_op_status(nslcmop_id, stage)
4186 await self.RO.delete("nsd", ro_nsd_id)
4187 self.logger.debug(
4188 logging_text + "ro_nsd_id={} deleted".format(ro_nsd_id)
4189 )
4190 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
4191 except Exception as e:
4192 if (
4193 isinstance(e, ROclient.ROClientException) and e.http_code == 404
4194 ): # not found
4195 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
4196 self.logger.debug(
4197 logging_text + "ro_nsd_id={} already deleted".format(ro_nsd_id)
4198 )
4199 elif (
4200 isinstance(e, ROclient.ROClientException) and e.http_code == 409
4201 ): # conflict
4202 failed_detail.append(
4203 "ro_nsd_id={} delete conflict: {}".format(ro_nsd_id, e)
4204 )
4205 self.logger.debug(logging_text + failed_detail[-1])
4206 else:
4207 failed_detail.append(
4208 "ro_nsd_id={} delete error: {}".format(ro_nsd_id, e)
4209 )
4210 self.logger.error(logging_text + failed_detail[-1])
4211
4212 if not failed_detail and deep_get(nsr_deployed, ("RO", "vnfd")):
4213 for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
4214 if not vnf_deployed or not vnf_deployed["id"]:
4215 continue
4216 try:
4217 ro_vnfd_id = vnf_deployed["id"]
4218 stage[
4219 2
4220 ] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
4221 vnf_deployed["member-vnf-index"], ro_vnfd_id
4222 )
4223 db_nsr_update["detailed-status"] = " ".join(stage)
4224 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4225 self._write_op_status(nslcmop_id, stage)
4226 await self.RO.delete("vnfd", ro_vnfd_id)
4227 self.logger.debug(
4228 logging_text + "ro_vnfd_id={} deleted".format(ro_vnfd_id)
4229 )
4230 db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
4231 except Exception as e:
4232 if (
4233 isinstance(e, ROclient.ROClientException) and e.http_code == 404
4234 ): # not found
4235 db_nsr_update[
4236 "_admin.deployed.RO.vnfd.{}.id".format(index)
4237 ] = None
4238 self.logger.debug(
4239 logging_text
4240 + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id)
4241 )
4242 elif (
4243 isinstance(e, ROclient.ROClientException) and e.http_code == 409
4244 ): # conflict
4245 failed_detail.append(
4246 "ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id, e)
4247 )
4248 self.logger.debug(logging_text + failed_detail[-1])
4249 else:
4250 failed_detail.append(
4251 "ro_vnfd_id={} delete error: {}".format(ro_vnfd_id, e)
4252 )
4253 self.logger.error(logging_text + failed_detail[-1])
4254
4255 if failed_detail:
4256 stage[2] = "Error deleting from VIM"
4257 else:
4258 stage[2] = "Deleted from VIM"
4259 db_nsr_update["detailed-status"] = " ".join(stage)
4260 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4261 self._write_op_status(nslcmop_id, stage)
4262
4263 if failed_detail:
4264 raise LcmException("; ".join(failed_detail))
4265
4266 async def terminate(self, nsr_id, nslcmop_id):
4267 # Try to lock HA task here
4268 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
4269 if not task_is_locked_by_me:
4270 return
4271
4272 logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
4273 self.logger.debug(logging_text + "Enter")
4274 timeout_ns_terminate = self.timeout_ns_terminate
4275 db_nsr = None
4276 db_nslcmop = None
4277 operation_params = None
4278 exc = None
4279 error_list = [] # collects all failure error messages
4280 db_nslcmop_update = {}
4281 autoremove = False # autoremove after termination
4282 tasks_dict_info = {}
4283 db_nsr_update = {}
4284 stage = [
4285 "Stage 1/3: Preparing task.",
4286 "Waiting for previous operations to terminate.",
4287 "",
4288 ]
4289 # ^ contains [stage, step, VIM-status]
4290 try:
4291 # wait for any previous tasks in process
4292 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
4293
4294 stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
4295 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
4296 operation_params = db_nslcmop.get("operationParams") or {}
4297 if operation_params.get("timeout_ns_terminate"):
4298 timeout_ns_terminate = operation_params["timeout_ns_terminate"]
4299 stage[1] = "Getting nsr={} from db.".format(nsr_id)
4300 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
4301
4302 db_nsr_update["operational-status"] = "terminating"
4303 db_nsr_update["config-status"] = "terminating"
4304 self._write_ns_status(
4305 nsr_id=nsr_id,
4306 ns_state="TERMINATING",
4307 current_operation="TERMINATING",
4308 current_operation_id=nslcmop_id,
4309 other_update=db_nsr_update,
4310 )
4311 self._write_op_status(op_id=nslcmop_id, queuePosition=0, stage=stage)
4312 nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed")) or {}
4313 if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
4314 return
4315
4316 stage[1] = "Getting vnf descriptors from db."
4317 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
4318 db_vnfrs_dict = {
4319 db_vnfr["member-vnf-index-ref"]: db_vnfr for db_vnfr in db_vnfrs_list
4320 }
4321 db_vnfds_from_id = {}
4322 db_vnfds_from_member_index = {}
4323 # Loop over VNFRs
4324 for vnfr in db_vnfrs_list:
4325 vnfd_id = vnfr["vnfd-id"]
4326 if vnfd_id not in db_vnfds_from_id:
4327 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
4328 db_vnfds_from_id[vnfd_id] = vnfd
4329 db_vnfds_from_member_index[
4330 vnfr["member-vnf-index-ref"]
4331 ] = db_vnfds_from_id[vnfd_id]
4332
4333 # Destroy individual execution environments when there are terminating primitives.
4334 # Rest of EE will be deleted at once
4335 # TODO - check before calling _destroy_N2VC
4336 # if not operation_params.get("skip_terminate_primitives"):#
4337 # or not vca.get("needed_terminate"):
4338 stage[0] = "Stage 2/3 execute terminating primitives."
4339 self.logger.debug(logging_text + stage[0])
4340 stage[1] = "Looking for execution environments that need terminate primitives."
4341 self.logger.debug(logging_text + stage[1])
4342
4343 for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")):
4344 config_descriptor = None
4345 vca_member_vnf_index = vca.get("member-vnf-index")
4346 vca_id = self.get_vca_id(
4347 db_vnfrs_dict.get(vca_member_vnf_index)
4348 if vca_member_vnf_index
4349 else None,
4350 db_nsr,
4351 )
4352 if not vca or not vca.get("ee_id"):
4353 continue
4354 if not vca.get("member-vnf-index"):
4355 # ns
4356 config_descriptor = db_nsr.get("ns-configuration")
4357 elif vca.get("vdu_id"):
4358 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
4359 config_descriptor = get_configuration(db_vnfd, vca.get("vdu_id"))
4360 elif vca.get("kdu_name"):
4361 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
4362 config_descriptor = get_configuration(db_vnfd, vca.get("kdu_name"))
4363 else:
4364 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
4365 config_descriptor = get_configuration(db_vnfd, db_vnfd["id"])
4366 vca_type = vca.get("type")
4367 exec_terminate_primitives = not operation_params.get(
4368 "skip_terminate_primitives"
4369 ) and vca.get("needed_terminate")
4370 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
4371 # pending native charms
4372 destroy_ee = (
4373 True if vca_type in ("helm", "helm-v3", "native_charm") else False
4374 )
4375 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
4376 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
4377 task = asyncio.ensure_future(
4378 self.destroy_N2VC(
4379 logging_text,
4380 db_nslcmop,
4381 vca,
4382 config_descriptor,
4383 vca_index,
4384 destroy_ee,
4385 exec_terminate_primitives,
4386 vca_id=vca_id,
4387 )
4388 )
4389 tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))
4390
4391 # wait for pending tasks of terminate primitives
4392 if tasks_dict_info:
4393 self.logger.debug(
4394 logging_text
4395 + "Waiting for tasks {}".format(list(tasks_dict_info.keys()))
4396 )
4397 error_list = await self._wait_for_tasks(
4398 logging_text,
4399 tasks_dict_info,
4400 min(self.timeout_charm_delete, timeout_ns_terminate),
4401 stage,
4402 nslcmop_id,
4403 )
4404 tasks_dict_info.clear()
4405 if error_list:
4406 return # raise LcmException("; ".join(error_list))
4407
4408 # remove All execution environments at once
4409 stage[0] = "Stage 3/3 delete all."
4410
4411 if nsr_deployed.get("VCA"):
4412 stage[1] = "Deleting all execution environments."
4413 self.logger.debug(logging_text + stage[1])
4414 vca_id = self.get_vca_id({}, db_nsr)
4415 task_delete_ee = asyncio.ensure_future(
4416 asyncio.wait_for(
4417 self._delete_all_N2VC(db_nsr=db_nsr, vca_id=vca_id),
4418 timeout=self.timeout_charm_delete,
4419 )
4420 )
4421 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
4422 tasks_dict_info[task_delete_ee] = "Terminating all VCA"
4423
4424 # Delete from k8scluster
4425 stage[1] = "Deleting KDUs."
4426 self.logger.debug(logging_text + stage[1])
4427 # print(nsr_deployed)
4428 for kdu in get_iterable(nsr_deployed, "K8s"):
4429 if not kdu or not kdu.get("kdu-instance"):
4430 continue
4431 kdu_instance = kdu.get("kdu-instance")
4432 if kdu.get("k8scluster-type") in self.k8scluster_map:
4433 # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
4434 vca_id = self.get_vca_id({}, db_nsr)
4435 task_delete_kdu_instance = asyncio.ensure_future(
4436 self.k8scluster_map[kdu["k8scluster-type"]].uninstall(
4437 cluster_uuid=kdu.get("k8scluster-uuid"),
4438 kdu_instance=kdu_instance,
4439 vca_id=vca_id,
4440 )
4441 )
4442 else:
4443 self.logger.error(
4444 logging_text
4445 + "Unknown k8s deployment type {}".format(
4446 kdu.get("k8scluster-type")
4447 )
4448 )
4449 continue
4450 tasks_dict_info[
4451 task_delete_kdu_instance
4452 ] = "Terminating KDU '{}'".format(kdu.get("kdu-name"))
4453
4454 # remove from RO
4455 stage[1] = "Deleting ns from VIM."
4456 if self.ng_ro:
4457 task_delete_ro = asyncio.ensure_future(
4458 self._terminate_ng_ro(
4459 logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
4460 )
4461 )
4462 else:
4463 task_delete_ro = asyncio.ensure_future(
4464 self._terminate_RO(
4465 logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
4466 )
4467 )
4468 tasks_dict_info[task_delete_ro] = "Removing deployment from VIM"
4469
4470 # the rest of the work is done in the finally block
4471
4472 except (
4473 ROclient.ROClientException,
4474 DbException,
4475 LcmException,
4476 N2VCException,
4477 ) as e:
4478 self.logger.error(logging_text + "Exit Exception {}".format(e))
4479 exc = e
4480 except asyncio.CancelledError:
4481 self.logger.error(
4482 logging_text + "Cancelled Exception while '{}'".format(stage[1])
4483 )
4484 exc = "Operation was cancelled"
4485 except Exception as e:
4486 exc = traceback.format_exc()
4487 self.logger.critical(
4488 logging_text + "Exit Exception while '{}': {}".format(stage[1], e),
4489 exc_info=True,
4490 )
4491 finally:
4492 if exc:
4493 error_list.append(str(exc))
4494 try:
4495 # wait for pending tasks
4496 if tasks_dict_info:
4497 stage[1] = "Waiting for terminate pending tasks."
4498 self.logger.debug(logging_text + stage[1])
4499 error_list += await self._wait_for_tasks(
4500 logging_text,
4501 tasks_dict_info,
4502 timeout_ns_terminate,
4503 stage,
4504 nslcmop_id,
4505 )
4506 stage[1] = stage[2] = ""
4507 except asyncio.CancelledError:
4508 error_list.append("Cancelled")
4509 # TODO cancel all tasks
4510 except Exception as exc:
4511 error_list.append(str(exc))
4512 # update status at database
4513 if error_list:
4514 error_detail = "; ".join(error_list)
4515 # self.logger.error(logging_text + error_detail)
4516 error_description_nslcmop = "{} Detail: {}".format(
4517 stage[0], error_detail
4518 )
4519 error_description_nsr = "Operation: TERMINATING.{}, {}.".format(
4520 nslcmop_id, stage[0]
4521 )
4522
4523 db_nsr_update["operational-status"] = "failed"
4524 db_nsr_update["detailed-status"] = (
4525 error_description_nsr + " Detail: " + error_detail
4526 )
4527 db_nslcmop_update["detailed-status"] = error_detail
4528 nslcmop_operation_state = "FAILED"
4529 ns_state = "BROKEN"
4530 else:
4531 error_detail = None
4532 error_description_nsr = error_description_nslcmop = None
4533 ns_state = "NOT_INSTANTIATED"
4534 db_nsr_update["operational-status"] = "terminated"
4535 db_nsr_update["detailed-status"] = "Done"
4536 db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
4537 db_nslcmop_update["detailed-status"] = "Done"
4538 nslcmop_operation_state = "COMPLETED"
4539
4540 if db_nsr:
4541 self._write_ns_status(
4542 nsr_id=nsr_id,
4543 ns_state=ns_state,
4544 current_operation="IDLE",
4545 current_operation_id=None,
4546 error_description=error_description_nsr,
4547 error_detail=error_detail,
4548 other_update=db_nsr_update,
4549 )
4550 self._write_op_status(
4551 op_id=nslcmop_id,
4552 stage="",
4553 error_message=error_description_nslcmop,
4554 operation_state=nslcmop_operation_state,
4555 other_update=db_nslcmop_update,
4556 )
4557 if ns_state == "NOT_INSTANTIATED":
4558 try:
4559 self.db.set_list(
4560 "vnfrs",
4561 {"nsr-id-ref": nsr_id},
4562 {"_admin.nsState": "NOT_INSTANTIATED"},
4563 )
4564 except DbException as e:
4565 self.logger.warn(
4566 logging_text
4567 + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
4568 nsr_id, e
4569 )
4570 )
4571 if operation_params:
4572 autoremove = operation_params.get("autoremove", False)
4573 if nslcmop_operation_state:
4574 try:
4575 await self.msg.aiowrite(
4576 "ns",
4577 "terminated",
4578 {
4579 "nsr_id": nsr_id,
4580 "nslcmop_id": nslcmop_id,
4581 "operationState": nslcmop_operation_state,
4582 "autoremove": autoremove,
4583 },
4584 loop=self.loop,
4585 )
4586 except Exception as e:
4587 self.logger.error(
4588 logging_text + "kafka_write notification Exception {}".format(e)
4589 )
4590
4591 self.logger.debug(logging_text + "Exit")
4592 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
4593
4594 async def _wait_for_tasks(
4595 self, logging_text, created_tasks_info, timeout, stage, nslcmop_id, nsr_id=None
4596 ):
4597 time_start = time()
4598 error_detail_list = []
4599 error_list = []
4600 pending_tasks = list(created_tasks_info.keys())
4601 num_tasks = len(pending_tasks)
4602 num_done = 0
4603 stage[1] = "{}/{}.".format(num_done, num_tasks)
4604 self._write_op_status(nslcmop_id, stage)
4605 while pending_tasks:
4606 new_error = None
4607 _timeout = timeout + time_start - time()
4608 done, pending_tasks = await asyncio.wait(
4609 pending_tasks, timeout=_timeout, return_when=asyncio.FIRST_COMPLETED
4610 )
4611 num_done += len(done)
4612 if not done: # Timeout
4613 for task in pending_tasks:
4614 new_error = created_tasks_info[task] + ": Timeout"
4615 error_detail_list.append(new_error)
4616 error_list.append(new_error)
4617 break
4618 for task in done:
4619 if task.cancelled():
4620 exc = "Cancelled"
4621 else:
4622 exc = task.exception()
4623 if exc:
4624 if isinstance(exc, asyncio.TimeoutError):
4625 exc = "Timeout"
4626 new_error = created_tasks_info[task] + ": {}".format(exc)
4627 error_list.append(created_tasks_info[task])
4628 error_detail_list.append(new_error)
4629 if isinstance(
4630 exc,
4631 (
4632 str,
4633 DbException,
4634 N2VCException,
4635 ROclient.ROClientException,
4636 LcmException,
4637 K8sException,
4638 NgRoException,
4639 ),
4640 ):
4641 self.logger.error(logging_text + new_error)
4642 else:
4643 exc_traceback = "".join(
4644 traceback.format_exception(None, exc, exc.__traceback__)
4645 )
4646 self.logger.error(
4647 logging_text
4648 + created_tasks_info[task]
4649 + " "
4650 + exc_traceback
4651 )
4652 else:
4653 self.logger.debug(
4654 logging_text + created_tasks_info[task] + ": Done"
4655 )
4656 stage[1] = "{}/{}.".format(num_done, num_tasks)
4657 if new_error:
4658 stage[1] += " Errors: " + ". ".join(error_detail_list) + "."
4659 if nsr_id: # update also nsr
4660 self.update_db_2(
4661 "nsrs",
4662 nsr_id,
4663 {
4664 "errorDescription": "Error at: " + ", ".join(error_list),
4665 "errorDetail": ". ".join(error_detail_list),
4666 },
4667 )
4668 self._write_op_status(nslcmop_id, stage)
4669 return error_detail_list
4670
4671 @staticmethod
4672 def _map_primitive_params(primitive_desc, params, instantiation_params):
4673 """
4674 Generates the params to be provided to the charm before executing a primitive. If the user does not provide a parameter,
4675 the default-value is used. If it is enclosed in < >, the value is looked up in instantiation_params
4676 :param primitive_desc: portion of VNFD/NSD that describes primitive
4677 :param params: Params provided by user
4678 :param instantiation_params: Instantiation params provided by user
4679 :return: a dictionary with the calculated params
4680 """
4681 calculated_params = {}
4682 for parameter in primitive_desc.get("parameter", ()):
4683 param_name = parameter["name"]
4684 if param_name in params:
4685 calculated_params[param_name] = params[param_name]
4686 elif "default-value" in parameter or "value" in parameter:
4687 if "value" in parameter:
4688 calculated_params[param_name] = parameter["value"]
4689 else:
4690 calculated_params[param_name] = parameter["default-value"]
4691 if (
4692 isinstance(calculated_params[param_name], str)
4693 and calculated_params[param_name].startswith("<")
4694 and calculated_params[param_name].endswith(">")
4695 ):
4696 if calculated_params[param_name][1:-1] in instantiation_params:
4697 calculated_params[param_name] = instantiation_params[
4698 calculated_params[param_name][1:-1]
4699 ]
4700 else:
4701 raise LcmException(
4702 "Parameter {} needed to execute primitive {} not provided".format(
4703 calculated_params[param_name], primitive_desc["name"]
4704 )
4705 )
4706 else:
4707 raise LcmException(
4708 "Parameter {} needed to execute primitive {} not provided".format(
4709 param_name, primitive_desc["name"]
4710 )
4711 )
4712
4713 if isinstance(calculated_params[param_name], (dict, list, tuple)):
4714 calculated_params[param_name] = yaml.safe_dump(
4715 calculated_params[param_name], default_flow_style=True, width=256
4716 )
4717 elif isinstance(calculated_params[param_name], str) and calculated_params[
4718 param_name
4719 ].startswith("!!yaml "):
4720 calculated_params[param_name] = calculated_params[param_name][7:]
4721 if parameter.get("data-type") == "INTEGER":
4722 try:
4723 calculated_params[param_name] = int(calculated_params[param_name])
4724 except ValueError: # error converting string to int
4725 raise LcmException(
4726 "Parameter {} of primitive {} must be integer".format(
4727 param_name, primitive_desc["name"]
4728 )
4729 )
4730 elif parameter.get("data-type") == "BOOLEAN":
4731 calculated_params[param_name] = not (
4732 (str(calculated_params[param_name])).lower() == "false"
4733 )
4734
4735 # add always ns_config_info if primitive name is config
4736 if primitive_desc["name"] == "config":
4737 if "ns_config_info" in instantiation_params:
4738 calculated_params["ns_config_info"] = instantiation_params[
4739 "ns_config_info"
4740 ]
4741 return calculated_params
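# Worked example (illustrative, not in the original code): a default-value enclosed
# in < > is resolved against the instantiation params (names are hypothetical).
#
#   primitive_desc = {
#       "name": "touch",
#       "parameter": [{"name": "filename", "default-value": "<touch_filename>"}],
#   }
#   NsLcm._map_primitive_params(
#       primitive_desc, params={}, instantiation_params={"touch_filename": "/tmp/x"})
#   # -> {"filename": "/tmp/x"}
#
# A value supplied in 'params' for "filename" would take precedence over the default.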
4742
4743 def _look_for_deployed_vca(
4744 self,
4745 deployed_vca,
4746 member_vnf_index,
4747 vdu_id,
4748 vdu_count_index,
4749 kdu_name=None,
4750 ee_descriptor_id=None,
4751 ):
4752 # find the vca_deployed record for this action. Raise LcmException if it is not found or has no ee_id.
4753 for vca in deployed_vca:
4754 if not vca:
4755 continue
4756 if member_vnf_index != vca["member-vnf-index"] or vdu_id != vca["vdu_id"]:
4757 continue
4758 if (
4759 vdu_count_index is not None
4760 and vdu_count_index != vca["vdu_count_index"]
4761 ):
4762 continue
4763 if kdu_name and kdu_name != vca["kdu_name"]:
4764 continue
4765 if ee_descriptor_id and ee_descriptor_id != vca["ee_descriptor_id"]:
4766 continue
4767 break
4768 else:
4769 # vca_deployed not found
4770 raise LcmException(
4771 "charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
4772 " is not deployed".format(
4773 member_vnf_index,
4774 vdu_id,
4775 vdu_count_index,
4776 kdu_name,
4777 ee_descriptor_id,
4778 )
4779 )
4780 # get ee_id
4781 ee_id = vca.get("ee_id")
4782 vca_type = vca.get(
4783 "type", "lxc_proxy_charm"
4784 ) # default value for backward compatibility - proxy charm
4785 if not ee_id:
4786 raise LcmException(
4787 "charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
4788 "execution environment".format(
4789 member_vnf_index, vdu_id, kdu_name, vdu_count_index
4790 )
4791 )
4792 return ee_id, vca_type
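# Usage sketch (illustrative, not in the original code): resolving the execution
# environment of the charm attached to a given VDU before running a primitive on it
# (member index and vdu id are hypothetical).
#
#   ee_id, vca_type = self._look_for_deployed_vca(
#       nsr_deployed["VCA"], member_vnf_index="1", vdu_id="mgmtVM", vdu_count_index=0)
#   # raises LcmException if no matching charm is found or it has no ee_id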
4793
4794 async def _ns_execute_primitive(
4795 self,
4796 ee_id,
4797 primitive,
4798 primitive_params,
4799 retries=0,
4800 retries_interval=30,
4801 timeout=None,
4802 vca_type=None,
4803 db_dict=None,
4804 vca_id: str = None,
4805 ) -> (str, str):
4806 try:
4807 if primitive == "config":
4808 primitive_params = {"params": primitive_params}
4809
4810 vca_type = vca_type or "lxc_proxy_charm"
4811
4812 while retries >= 0:
4813 try:
4814 output = await asyncio.wait_for(
4815 self.vca_map[vca_type].exec_primitive(
4816 ee_id=ee_id,
4817 primitive_name=primitive,
4818 params_dict=primitive_params,
4819 progress_timeout=self.timeout_progress_primitive,
4820 total_timeout=self.timeout_primitive,
4821 db_dict=db_dict,
4822 vca_id=vca_id,
4823 vca_type=vca_type,
4824 ),
4825 timeout=timeout or self.timeout_primitive,
4826 )
4827 # execution was OK
4828 break
4829 except asyncio.CancelledError:
4830 raise
4831 except Exception as e: # asyncio.TimeoutError
4832 if isinstance(e, asyncio.TimeoutError):
4833 e = "Timeout"
4834 retries -= 1
4835 if retries >= 0:
4836 self.logger.debug(
4837 "Error executing action {} on {} -> {}".format(
4838 primitive, ee_id, e
4839 )
4840 )
4841 # wait and retry
4842 await asyncio.sleep(retries_interval, loop=self.loop)
4843 else:
4844 return "FAILED", str(e)
4845
4846 return "COMPLETED", output
4847
4848 except (LcmException, asyncio.CancelledError):
4849 raise
4850 except Exception as e:
4851 return "FAIL", "Error executing action {}: {}".format(primitive, e)
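# Call sketch (illustrative, not in the original code): executing a config primitive
# with two retries, 30 seconds apart, on a proxy charm (primitive and params are
# hypothetical).
#
#   result, detail = await self._ns_execute_primitive(
#       ee_id, "touch", {"filename": "/tmp/x"},
#       retries=2, retries_interval=30, vca_type="lxc_proxy_charm")
#   # result is "COMPLETED", "FAILED" or "FAIL"; detail carries the output or error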
4852
4853 async def vca_status_refresh(self, nsr_id, nslcmop_id):
4854 """
4855 Updates the vca_status with the latest juju information in the nsrs record
4856 :param nsr_id: Id of the nsr
4857 :param nslcmop_id: Id of the nslcmop
4858 :return: None
4859 """
4860
4861 self.logger.debug("Task ns={} action={} Enter".format(nsr_id, nslcmop_id))
4862 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
4863 vca_id = self.get_vca_id({}, db_nsr)
4864 if db_nsr["_admin"]["deployed"]["K8s"]:
4865 for k8s_index, k8s in enumerate(db_nsr["_admin"]["deployed"]["K8s"]):
4866 cluster_uuid, kdu_instance = k8s["k8scluster-uuid"], k8s["kdu-instance"]
4867 await self._on_update_k8s_db(
4868 cluster_uuid, kdu_instance, filter={"_id": nsr_id}, vca_id=vca_id
4869 )
4870 else:
4871 for vca_index, _ in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
4872 table, filter = "nsrs", {"_id": nsr_id}
4873 path = "_admin.deployed.VCA.{}.".format(vca_index)
4874 await self._on_update_n2vc_db(table, filter, path, {})
4875
4876 self.logger.debug("Task ns={} action={} Exit".format(nsr_id, nslcmop_id))
4877 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_vca_status_refresh")
4878
4879 async def action(self, nsr_id, nslcmop_id):
4880 # Try to lock HA task here
4881 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
4882 if not task_is_locked_by_me:
4883 return
4884
4885 logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
4886 self.logger.debug(logging_text + "Enter")
4887 # get all needed from database
4888 db_nsr = None
4889 db_nslcmop = None
4890 db_nsr_update = {}
4891 db_nslcmop_update = {}
4892 nslcmop_operation_state = None
4893 error_description_nslcmop = None
4894 exc = None
4895 try:
4896 # wait for any previous tasks in process
4897 step = "Waiting for previous operations to terminate"
4898 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
4899
4900 self._write_ns_status(
4901 nsr_id=nsr_id,
4902 ns_state=None,
4903 current_operation="RUNNING ACTION",
4904 current_operation_id=nslcmop_id,
4905 )
4906
4907 step = "Getting information from database"
4908 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
4909 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
4910 if db_nslcmop["operationParams"].get("primitive_params"):
4911 db_nslcmop["operationParams"]["primitive_params"] = json.loads(
4912 db_nslcmop["operationParams"]["primitive_params"]
4913 )
4914
4915 nsr_deployed = db_nsr["_admin"].get("deployed")
4916 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
4917 vdu_id = db_nslcmop["operationParams"].get("vdu_id")
4918 kdu_name = db_nslcmop["operationParams"].get("kdu_name")
4919 vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
4920 primitive = db_nslcmop["operationParams"]["primitive"]
4921 primitive_params = db_nslcmop["operationParams"]["primitive_params"]
4922 timeout_ns_action = db_nslcmop["operationParams"].get(
4923 "timeout_ns_action", self.timeout_primitive
4924 )
4925
4926 if vnf_index:
4927 step = "Getting vnfr from database"
4928 db_vnfr = self.db.get_one(
4929 "vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}
4930 )
4931 if db_vnfr.get("kdur"):
4932 kdur_list = []
4933 for kdur in db_vnfr["kdur"]:
4934 if kdur.get("additionalParams"):
4935 kdur["additionalParams"] = json.loads(
4936 kdur["additionalParams"]
4937 )
4938 kdur_list.append(kdur)
4939 db_vnfr["kdur"] = kdur_list
4940 step = "Getting vnfd from database"
4941 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
4942 else:
4943 step = "Getting nsd from database"
4944 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
4945
4946 vca_id = self.get_vca_id(db_vnfr, db_nsr)
4947 # for backward compatibility
4948 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
4949 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
4950 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
4951 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4952
4953 # look for primitive
4954 config_primitive_desc = descriptor_configuration = None
4955 if vdu_id:
4956 descriptor_configuration = get_configuration(db_vnfd, vdu_id)
4957 elif kdu_name:
4958 descriptor_configuration = get_configuration(db_vnfd, kdu_name)
4959 elif vnf_index:
4960 descriptor_configuration = get_configuration(db_vnfd, db_vnfd["id"])
4961 else:
4962 descriptor_configuration = db_nsd.get("ns-configuration")
4963
4964 if descriptor_configuration and descriptor_configuration.get(
4965 "config-primitive"
4966 ):
4967 for config_primitive in descriptor_configuration["config-primitive"]:
4968 if config_primitive["name"] == primitive:
4969 config_primitive_desc = config_primitive
4970 break
4971
4972 if not config_primitive_desc:
4973 if not (kdu_name and primitive in ("upgrade", "rollback", "status")):
4974 raise LcmException(
4975 "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
4976 primitive
4977 )
4978 )
4979 primitive_name = primitive
4980 ee_descriptor_id = None
4981 else:
4982 primitive_name = config_primitive_desc.get(
4983 "execution-environment-primitive", primitive
4984 )
4985 ee_descriptor_id = config_primitive_desc.get(
4986 "execution-environment-ref"
4987 )
4988
4989 if vnf_index:
4990 if vdu_id:
4991 vdur = next(
4992 (x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None
4993 )
4994 desc_params = parse_yaml_strings(vdur.get("additionalParams"))
4995 elif kdu_name:
4996 kdur = next(
4997 (x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None
4998 )
4999 desc_params = parse_yaml_strings(kdur.get("additionalParams"))
5000 else:
5001 desc_params = parse_yaml_strings(
5002 db_vnfr.get("additionalParamsForVnf")
5003 )
5004 else:
5005 desc_params = parse_yaml_strings(db_nsr.get("additionalParamsForNs"))
5006 if kdu_name and get_configuration(db_vnfd, kdu_name):
5007 kdu_configuration = get_configuration(db_vnfd, kdu_name)
5008 actions = set()
5009 for kdu_primitive in kdu_configuration.get("initial-config-primitive", []):
5010 actions.add(kdu_primitive["name"])
5011 for kdu_primitive in kdu_configuration.get("config-primitive", []):
5012 actions.add(kdu_primitive["name"])
5013 kdu_action = primitive_name in actions
5014
5015 # TODO check if ns is in a proper status
5016 if kdu_name and (
5017 primitive_name in ("upgrade", "rollback", "status") or kdu_action
5018 ):
5019 # kdur and desc_params already set from before
5020 if primitive_params:
5021 desc_params.update(primitive_params)
5022 # TODO Check if we will need something at vnf level
5023 for index, kdu in enumerate(get_iterable(nsr_deployed, "K8s")):
5024 if (
5025 kdu_name == kdu["kdu-name"]
5026 and kdu["member-vnf-index"] == vnf_index
5027 ):
5028 break
5029 else:
5030 raise LcmException(
5031 "KDU '{}' for vnf '{}' not deployed".format(kdu_name, vnf_index)
5032 )
5033
5034 if kdu.get("k8scluster-type") not in self.k8scluster_map:
5035 msg = "unknown k8scluster-type '{}'".format(
5036 kdu.get("k8scluster-type")
5037 )
5038 raise LcmException(msg)
5039
5040 db_dict = {
5041 "collection": "nsrs",
5042 "filter": {"_id": nsr_id},
5043 "path": "_admin.deployed.K8s.{}".format(index),
5044 }
5045 self.logger.debug(
5046 logging_text
5047 + "Exec k8s {} on {}.{}".format(primitive_name, vnf_index, kdu_name)
5048 )
5049 step = "Executing kdu {}".format(primitive_name)
5050 if primitive_name == "upgrade":
5051 if desc_params.get("kdu_model"):
5052 kdu_model = desc_params.get("kdu_model")
5053 del desc_params["kdu_model"]
5054 else:
5055 kdu_model = kdu.get("kdu-model")
5056 parts = kdu_model.split(sep=":")
5057 if len(parts) == 2:
5058 kdu_model = parts[0]
5059
5060 detailed_status = await asyncio.wait_for(
5061 self.k8scluster_map[kdu["k8scluster-type"]].upgrade(
5062 cluster_uuid=kdu.get("k8scluster-uuid"),
5063 kdu_instance=kdu.get("kdu-instance"),
5064 atomic=True,
5065 kdu_model=kdu_model,
5066 params=desc_params,
5067 db_dict=db_dict,
5068 timeout=timeout_ns_action,
5069 ),
5070 timeout=timeout_ns_action + 10,
5071 )
5072 self.logger.debug(
5073 logging_text + " Upgrade of kdu {} done".format(detailed_status)
5074 )
5075 elif primitive_name == "rollback":
5076 detailed_status = await asyncio.wait_for(
5077 self.k8scluster_map[kdu["k8scluster-type"]].rollback(
5078 cluster_uuid=kdu.get("k8scluster-uuid"),
5079 kdu_instance=kdu.get("kdu-instance"),
5080 db_dict=db_dict,
5081 ),
5082 timeout=timeout_ns_action,
5083 )
5084 elif primitive_name == "status":
5085 detailed_status = await asyncio.wait_for(
5086 self.k8scluster_map[kdu["k8scluster-type"]].status_kdu(
5087 cluster_uuid=kdu.get("k8scluster-uuid"),
5088 kdu_instance=kdu.get("kdu-instance"),
5089 vca_id=vca_id,
5090 ),
5091 timeout=timeout_ns_action,
5092 )
5093 else:
5094 kdu_instance = kdu.get("kdu-instance") or "{}-{}".format(
5095 kdu["kdu-name"], nsr_id
5096 )
5097 params = self._map_primitive_params(
5098 config_primitive_desc, primitive_params, desc_params
5099 )
5100
5101 detailed_status = await asyncio.wait_for(
5102 self.k8scluster_map[kdu["k8scluster-type"]].exec_primitive(
5103 cluster_uuid=kdu.get("k8scluster-uuid"),
5104 kdu_instance=kdu_instance,
5105 primitive_name=primitive_name,
5106 params=params,
5107 db_dict=db_dict,
5108 timeout=timeout_ns_action,
5109 vca_id=vca_id,
5110 ),
5111 timeout=timeout_ns_action,
5112 )
5113
5114 if detailed_status:
5115 nslcmop_operation_state = "COMPLETED"
5116 else:
5117 detailed_status = ""
5118 nslcmop_operation_state = "FAILED"
5119 else:
5120 ee_id, vca_type = self._look_for_deployed_vca(
5121 nsr_deployed["VCA"],
5122 member_vnf_index=vnf_index,
5123 vdu_id=vdu_id,
5124 vdu_count_index=vdu_count_index,
5125 ee_descriptor_id=ee_descriptor_id,
5126 )
5127 for vca_index, vca_deployed in enumerate(
5128 db_nsr["_admin"]["deployed"]["VCA"]
5129 ):
5130 if vca_deployed.get("member-vnf-index") == vnf_index:
5131 db_dict = {
5132 "collection": "nsrs",
5133 "filter": {"_id": nsr_id},
5134 "path": "_admin.deployed.VCA.{}.".format(vca_index),
5135 }
5136 break
5137 (
5138 nslcmop_operation_state,
5139 detailed_status,
5140 ) = await self._ns_execute_primitive(
5141 ee_id,
5142 primitive=primitive_name,
5143 primitive_params=self._map_primitive_params(
5144 config_primitive_desc, primitive_params, desc_params
5145 ),
5146 timeout=timeout_ns_action,
5147 vca_type=vca_type,
5148 db_dict=db_dict,
5149 vca_id=vca_id,
5150 )
5151
5152 db_nslcmop_update["detailed-status"] = detailed_status
5153 error_description_nslcmop = (
5154 detailed_status if nslcmop_operation_state == "FAILED" else ""
5155 )
5156 self.logger.debug(
5157 logging_text
5158 + " task Done with result {} {}".format(
5159 nslcmop_operation_state, detailed_status
5160 )
5161 )
5162 return # database update is called inside finally
5163
5164 except (DbException, LcmException, N2VCException, K8sException) as e:
5165 self.logger.error(logging_text + "Exit Exception {}".format(e))
5166 exc = e
5167 except asyncio.CancelledError:
5168 self.logger.error(
5169 logging_text + "Cancelled Exception while '{}'".format(step)
5170 )
5171 exc = "Operation was cancelled"
5172 except asyncio.TimeoutError:
5173 self.logger.error(logging_text + "Timeout while '{}'".format(step))
5174 exc = "Timeout"
5175 except Exception as e:
5176 exc = traceback.format_exc()
5177 self.logger.critical(
5178 logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
5179 exc_info=True,
5180 )
5181 finally:
5182 if exc:
5183 db_nslcmop_update[
5184 "detailed-status"
5185 ] = (
5186 detailed_status
5187 ) = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
5188 nslcmop_operation_state = "FAILED"
5189 if db_nsr:
5190 self._write_ns_status(
5191 nsr_id=nsr_id,
5192 ns_state=db_nsr[
5193 "nsState"
5194 ], # TODO check if degraded. For the moment use previous status
5195 current_operation="IDLE",
5196 current_operation_id=None,
5197 # error_description=error_description_nsr,
5198 # error_detail=error_detail,
5199 other_update=db_nsr_update,
5200 )
5201
5202 self._write_op_status(
5203 op_id=nslcmop_id,
5204 stage="",
5205 error_message=error_description_nslcmop,
5206 operation_state=nslcmop_operation_state,
5207 other_update=db_nslcmop_update,
5208 )
5209
5210 if nslcmop_operation_state:
5211 try:
5212 await self.msg.aiowrite(
5213 "ns",
5214 "actioned",
5215 {
5216 "nsr_id": nsr_id,
5217 "nslcmop_id": nslcmop_id,
5218 "operationState": nslcmop_operation_state,
5219 },
5220 loop=self.loop,
5221 )
5222 except Exception as e:
5223 self.logger.error(
5224 logging_text + "kafka_write notification Exception {}".format(e)
5225 )
5226 self.logger.debug(logging_text + "Exit")
5227 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
5228 return nslcmop_operation_state, detailed_status
5229
5230 async def scale(self, nsr_id, nslcmop_id):
5231 # Try to lock HA task here
5232 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
5233 if not task_is_locked_by_me:
5234 return
5235
5236 logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
5237 stage = ["", "", ""]
5238 tasks_dict_info = {}
5239 # ^ stage, step, VIM progress
5240 self.logger.debug(logging_text + "Enter")
5241 # get all needed from database
5242 db_nsr = None
5243 db_nslcmop_update = {}
5244 db_nsr_update = {}
5245 exc = None
5246 # in case of error, indicates which part of the scale operation failed, to set the nsr to error status
5247 scale_process = None
5248 old_operational_status = ""
5249 old_config_status = ""
5250 nsi_id = None
5251 try:
5252 # wait for any previous tasks in process
5253 step = "Waiting for previous operations to terminate"
5254 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
5255 self._write_ns_status(
5256 nsr_id=nsr_id,
5257 ns_state=None,
5258 current_operation="SCALING",
5259 current_operation_id=nslcmop_id,
5260 )
5261
5262 step = "Getting nslcmop from database"
5263 self.logger.debug(
5264 step + " after having waited for previous tasks to be completed"
5265 )
5266 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5267
5268 step = "Getting nsr from database"
5269 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
5270 old_operational_status = db_nsr["operational-status"]
5271 old_config_status = db_nsr["config-status"]
5272
5273 step = "Parsing scaling parameters"
5274 db_nsr_update["operational-status"] = "scaling"
5275 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5276 nsr_deployed = db_nsr["_admin"].get("deployed")
5277
5278 vnf_index = db_nslcmop["operationParams"]["scaleVnfData"][
5279 "scaleByStepData"
5280 ]["member-vnf-index"]
5281 scaling_group = db_nslcmop["operationParams"]["scaleVnfData"][
5282 "scaleByStepData"
5283 ]["scaling-group-descriptor"]
5284 scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
5285 # for backward compatibility
5286 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
5287 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
5288 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
5289 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5290
5291 step = "Getting vnfr from database"
5292 db_vnfr = self.db.get_one(
5293 "vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}
5294 )
5295
5296 vca_id = self.get_vca_id(db_vnfr, db_nsr)
5297
5298 step = "Getting vnfd from database"
5299 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
5300
5301 base_folder = db_vnfd["_admin"]["storage"]
5302
5303 step = "Getting scaling-group-descriptor"
5304 scaling_descriptor = find_in_list(
5305 get_scaling_aspect(db_vnfd),
5306 lambda scale_desc: scale_desc["name"] == scaling_group,
5307 )
5308 if not scaling_descriptor:
5309 raise LcmException(
5310 "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
5311 "at vnfd:scaling-group-descriptor".format(scaling_group)
5312 )
5313
5314 step = "Sending scale order to VIM"
5315 # TODO check if ns is in a proper status
5316 nb_scale_op = 0
5317 if not db_nsr["_admin"].get("scaling-group"):
5318 self.update_db_2(
5319 "nsrs",
5320 nsr_id,
5321 {
5322 "_admin.scaling-group": [
5323 {"name": scaling_group, "nb-scale-op": 0}
5324 ]
5325 },
5326 )
5327 admin_scale_index = 0
5328 else:
5329 for admin_scale_index, admin_scale_info in enumerate(
5330 db_nsr["_admin"]["scaling-group"]
5331 ):
5332 if admin_scale_info["name"] == scaling_group:
5333 nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
5334 break
5335 else:  # not found: use the next index and add a new entry with this name
5336 admin_scale_index += 1
5337 db_nsr_update[
5338 "_admin.scaling-group.{}.name".format(admin_scale_index)
5339 ] = scaling_group
5340
5341 vca_scaling_info = []
5342 scaling_info = {"scaling_group_name": scaling_group, "vdu": [], "kdu": []}
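# vca_scaling_info collects one entry per VCA instance to create/delete,
# while scaling_info aggregates the vdu/kdu create/delete counts consumed
# later by the RO and KDU scaling steps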
5343 if scaling_type == "SCALE_OUT":
5344 if "aspect-delta-details" not in scaling_descriptor:
5345 raise LcmException(
5346 "Aspect delta details not fount in scaling descriptor {}".format(
5347 scaling_descriptor["name"]
5348 )
5349 )
5350 # check whether max-instance-count would be exceeded
5351 deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
5352
5353 scaling_info["scaling_direction"] = "OUT"
5354 scaling_info["vdu-create"] = {}
5355 scaling_info["kdu-create"] = {}
5356 for delta in deltas:
5357 for vdu_delta in delta.get("vdu-delta", {}):
5358 vdud = get_vdu(db_vnfd, vdu_delta["id"])
5359 # vdu_index also provides the number of instances of the targeted vdu
5360 vdu_count = vdu_index = get_vdur_index(db_vnfr, vdu_delta)
5361 cloud_init_text = self._get_vdu_cloud_init_content(
5362 vdud, db_vnfd
5363 )
5364 if cloud_init_text:
5365 additional_params = (
5366 self._get_vdu_additional_params(db_vnfr, vdud["id"])
5367 or {}
5368 )
5369 cloud_init_list = []
5370
5371 vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
5372 max_instance_count = 10
5373 if vdu_profile and "max-number-of-instances" in vdu_profile:
5374 max_instance_count = vdu_profile.get(
5375 "max-number-of-instances", 10
5376 )
5377
5378 default_instance_num = get_number_of_instances(
5379 db_vnfd, vdud["id"]
5380 )
5381 instances_number = vdu_delta.get("number-of-instances", 1)
5382 nb_scale_op += instances_number
5383
5384 new_instance_count = nb_scale_op + default_instance_num
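# e.g. a descriptor count of 1 with nb_scale_op reaching 2 gives
# new_instance_count 3, which is rejected below if max-number-of-instances is 2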
5385 # Check whether the new count exceeds the max while the current vdu count
5386 # is still below it; if so, adjust the number of instances to create
5387 if new_instance_count > max_instance_count > vdu_count:
5388 instances_number = new_instance_count - max_instance_count
5391
5392 if new_instance_count > max_instance_count:
5393 raise LcmException(
5394 "reached the limit of {} (max-instance-count) "
5395 "scaling-out operations for the "
5396 "scaling-group-descriptor '{}'".format(
5397 nb_scale_op, scaling_group
5398 )
5399 )
5400 for x in range(vdu_delta.get("number-of-instances", 1)):
5401 if cloud_init_text:
5402 # TODO: the VDU's own IP is not available yet because db_vnfr has not been updated.
5403 additional_params["OSM"] = get_osm_params(
5404 db_vnfr, vdu_delta["id"], vdu_index + x
5405 )
5406 cloud_init_list.append(
5407 self._parse_cloud_init(
5408 cloud_init_text,
5409 additional_params,
5410 db_vnfd["id"],
5411 vdud["id"],
5412 )
5413 )
5414 vca_scaling_info.append(
5415 {
5416 "osm_vdu_id": vdu_delta["id"],
5417 "member-vnf-index": vnf_index,
5418 "type": "create",
5419 "vdu_index": vdu_index + x,
5420 }
5421 )
5422 scaling_info["vdu-create"][vdu_delta["id"]] = instances_number
5423 for kdu_delta in delta.get("kdu-resource-delta", {}):
5424 kdu_profile = get_kdu_resource_profile(db_vnfd, kdu_delta["id"])
5425 kdu_name = kdu_profile["kdu-name"]
5426 resource_name = kdu_profile["resource-name"]
5427
5428 # The same delta may reference several KDUs,
5429 # so keep a separate list per kdu_name
5430 if not scaling_info["kdu-create"].get(kdu_name, None):
5431 scaling_info["kdu-create"][kdu_name] = []
5432
5433 kdur = get_kdur(db_vnfr, kdu_name)
5434 if kdur.get("helm-chart"):
5435 k8s_cluster_type = "helm-chart-v3"
5436 self.logger.debug("kdur: {}".format(kdur))
5437 if kdur.get("helm-version") == "v2":
5441 k8s_cluster_type = "helm-chart"
5442 raise NotImplementedError
5443 elif kdur.get("juju-bundle"):
5444 k8s_cluster_type = "juju-bundle"
5445 else:
5446 raise LcmException(
5447 "kdu type for kdu='{}.{}' is neither helm-chart nor "
5448 "juju-bundle. Maybe an old NBI version is running".format(
5449 db_vnfr["member-vnf-index-ref"], kdu_name
5450 )
5451 )
5452
5453 max_instance_count = 10
5454 if kdu_profile and "max-number-of-instances" in kdu_profile:
5455 max_instance_count = kdu_profile.get(
5456 "max-number-of-instances", 10
5457 )
5458
5459 nb_scale_op += kdu_delta.get("number-of-instances", 1)
5460 deployed_kdu, _ = get_deployed_kdu(
5461 nsr_deployed, kdu_name, vnf_index
5462 )
5463 if deployed_kdu is None:
5464 raise LcmException(
5465 "KDU '{}' for vnf '{}' not deployed".format(
5466 kdu_name, vnf_index
5467 )
5468 )
5469 kdu_instance = deployed_kdu.get("kdu-instance")
5470 instance_num = await self.k8scluster_map[
5471 k8s_cluster_type
5472 ].get_scale_count(resource_name, kdu_instance, vca_id=vca_id)
5473 kdu_replica_count = instance_num + kdu_delta.get(
5474 "number-of-instances", 1
5475 )
5476
5477 # If the new replica count exceeds the max while the current count is below it,
5478 # cap the kdu replica count at the max
5479 if kdu_replica_count > max_instance_count > instance_num:
5480 kdu_replica_count = max_instance_count
5481 if kdu_replica_count > max_instance_count:
5482 raise LcmException(
5483 "reached the limit of {} (max-instance-count) "
5484 "scaling-out operations for the "
5485 "scaling-group-descriptor '{}'".format(
5486 instance_num, scaling_group
5487 )
5488 )
5489
5490 for x in range(kdu_delta.get("number-of-instances", 1)):
5491 vca_scaling_info.append(
5492 {
5493 "osm_kdu_id": kdu_name,
5494 "member-vnf-index": vnf_index,
5495 "type": "create",
5496 "kdu_index": instance_num + x - 1,
5497 }
5498 )
5499 scaling_info["kdu-create"][kdu_name].append(
5500 {
5501 "member-vnf-index": vnf_index,
5502 "type": "create",
5503 "k8s-cluster-type": k8s_cluster_type,
5504 "resource-name": resource_name,
5505 "scale": kdu_replica_count,
5506 }
5507 )
5508 elif scaling_type == "SCALE_IN":
5509 deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
5510
5511 scaling_info["scaling_direction"] = "IN"
5512 scaling_info["vdu-delete"] = {}
5513 scaling_info["kdu-delete"] = {}
5514
5515 for delta in deltas:
5516 for vdu_delta in delta.get("vdu-delta", {}):
5517 vdu_count = vdu_index = get_vdur_index(db_vnfr, vdu_delta)
5518 min_instance_count = 0
5519 vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
5520 if vdu_profile and "min-number-of-instances" in vdu_profile:
5521 min_instance_count = vdu_profile["min-number-of-instances"]
5522
5523 default_instance_num = get_number_of_instances(
5524 db_vnfd, vdu_delta["id"]
5525 )
5526 instance_num = vdu_delta.get("number-of-instances", 1)
5527 nb_scale_op -= instance_num
5528
5529 new_instance_count = nb_scale_op + default_instance_num
5530
5531 if new_instance_count < min_instance_count < vdu_count:
5532 instances_number = min_instance_count - new_instance_count
5533 else:
5534 instances_number = instance_num
5535
5536 if new_instance_count < min_instance_count:
5537 raise LcmException(
5538 "reached the limit of {} (min-instance-count) scaling-in operations for the "
5539 "scaling-group-descriptor '{}'".format(
5540 nb_scale_op, scaling_group
5541 )
5542 )
5543 for x in range(vdu_delta.get("number-of-instances", 1)):
5544 vca_scaling_info.append(
5545 {
5546 "osm_vdu_id": vdu_delta["id"],
5547 "member-vnf-index": vnf_index,
5548 "type": "delete",
5549 "vdu_index": vdu_index - 1 - x,
5550 }
5551 )
5552 scaling_info["vdu-delete"][vdu_delta["id"]] = instances_number
5553 for kdu_delta in delta.get("kdu-resource-delta", {}):
5554 kdu_profile = get_kdu_resource_profile(db_vnfd, kdu_delta["id"])
5555 kdu_name = kdu_profile["kdu-name"]
5556 resource_name = kdu_profile["resource-name"]
5557
5558 if not scaling_info["kdu-delete"].get(kdu_name, None):
5559 scaling_info["kdu-delete"][kdu_name] = []
5560
5561 kdur = get_kdur(db_vnfr, kdu_name)
5562 if kdur.get("helm-chart"):
5563 k8s_cluster_type = "helm-chart-v3"
5564 self.logger.debug("kdur: {}".format(kdur))
5565 if kdur.get("helm-version") == "v2":
5569 k8s_cluster_type = "helm-chart"
5570 raise NotImplementedError
5571 elif kdur.get("juju-bundle"):
5572 k8s_cluster_type = "juju-bundle"
5573 else:
5574 raise LcmException(
5575 "kdu type for kdu='{}.{}' is neither helm-chart nor "
5576 "juju-bundle. Maybe an old NBI version is running".format(
5577 db_vnfr["member-vnf-index-ref"], kdur["kdu-name"]
5578 )
5579 )
5580
5581 min_instance_count = 0
5582 if kdu_profile and "min-number-of-instances" in kdu_profile:
5583 min_instance_count = kdu_profile["min-number-of-instances"]
5584
5585 nb_scale_op -= kdu_delta.get("number-of-instances", 1)
5586 deployed_kdu, _ = get_deployed_kdu(
5587 nsr_deployed, kdu_name, vnf_index
5588 )
5589 if deployed_kdu is None:
5590 raise LcmException(
5591 "KDU '{}' for vnf '{}' not deployed".format(
5592 kdu_name, vnf_index
5593 )
5594 )
5595 kdu_instance = deployed_kdu.get("kdu-instance")
5596 instance_num = await self.k8scluster_map[
5597 k8s_cluster_type
5598 ].get_scale_count(resource_name, kdu_instance, vca_id=vca_id)
5599 kdu_replica_count = instance_num - kdu_delta.get(
5600 "number-of-instances", 1
5601 )
5602
5603 if kdu_replica_count < min_instance_count < instance_num:
5604 kdu_replica_count = min_instance_count
5605 if kdu_replica_count < min_instance_count:
5606 raise LcmException(
5607 "reached the limit of {} (min-instance-count) scaling-in operations for the "
5608 "scaling-group-descriptor '{}'".format(
5609 instance_num, scaling_group
5610 )
5611 )
5612
5613 for x in range(kdu_delta.get("number-of-instances", 1)):
5614 vca_scaling_info.append(
5615 {
5616 "osm_kdu_id": kdu_name,
5617 "member-vnf-index": vnf_index,
5618 "type": "delete",
5619 "kdu_index": instance_num - x - 1,
5620 }
5621 )
5622 scaling_info["kdu-delete"][kdu_name].append(
5623 {
5624 "member-vnf-index": vnf_index,
5625 "type": "delete",
5626 "k8s-cluster-type": k8s_cluster_type,
5627 "resource-name": resource_name,
5628 "scale": kdu_replica_count,
5629 }
5630 )
5631
5632 # record in scaling_info (VDU_SCALE_INFO) the name and IP addresses of the VDU instances to be deleted
5633 vdu_delete = copy(scaling_info.get("vdu-delete"))
5634 if scaling_info["scaling_direction"] == "IN":
5635 for vdur in reversed(db_vnfr["vdur"]):
5636 if vdu_delete.get(vdur["vdu-id-ref"]):
5637 vdu_delete[vdur["vdu-id-ref"]] -= 1
5638 scaling_info["vdu"].append(
5639 {
5640 "name": vdur.get("name") or vdur.get("vdu-name"),
5641 "vdu_id": vdur["vdu-id-ref"],
5642 "interface": [],
5643 }
5644 )
5645 for interface in vdur["interfaces"]:
5646 scaling_info["vdu"][-1]["interface"].append(
5647 {
5648 "name": interface["name"],
5649 "ip_address": interface["ip-address"],
5650 "mac_address": interface.get("mac-address"),
5651 }
5652 )
5653 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
5654
5655 # PRE-SCALE BEGIN
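# For every scaling-config-action whose trigger matches the scaling
# direction, look up the referenced config-primitive in the VNFD and run
# it on the deployed VCA; each execution is registered as a sub-operation
# so that HA retries can skip or resume it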
5656 step = "Executing pre-scale vnf-config-primitive"
5657 if scaling_descriptor.get("scaling-config-action"):
5658 for scaling_config_action in scaling_descriptor[
5659 "scaling-config-action"
5660 ]:
5661 if (
5662 scaling_config_action.get("trigger") == "pre-scale-in"
5663 and scaling_type == "SCALE_IN"
5664 ) or (
5665 scaling_config_action.get("trigger") == "pre-scale-out"
5666 and scaling_type == "SCALE_OUT"
5667 ):
5668 vnf_config_primitive = scaling_config_action[
5669 "vnf-config-primitive-name-ref"
5670 ]
5671 step = db_nslcmop_update[
5672 "detailed-status"
5673 ] = "executing pre-scale scaling-config-action '{}'".format(
5674 vnf_config_primitive
5675 )
5676
5677 # look for primitive
5678 for config_primitive in (
5679 get_configuration(db_vnfd, db_vnfd["id"]) or {}
5680 ).get("config-primitive", ()):
5681 if config_primitive["name"] == vnf_config_primitive:
5682 break
5683 else:
5684 raise LcmException(
5685 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
5686 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
5687 "primitive".format(scaling_group, vnf_config_primitive)
5688 )
5689
5690 vnfr_params = {"VDU_SCALE_INFO": scaling_info}
5691 if db_vnfr.get("additionalParamsForVnf"):
5692 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
5693
5694 scale_process = "VCA"
5695 db_nsr_update["config-status"] = "configuring pre-scaling"
5696 primitive_params = self._map_primitive_params(
5697 config_primitive, {}, vnfr_params
5698 )
5699
5700 # Pre-scale retry check: Check if this sub-operation has been executed before
5701 op_index = self._check_or_add_scale_suboperation(
5702 db_nslcmop,
5703 vnf_index,
5704 vnf_config_primitive,
5705 primitive_params,
5706 "PRE-SCALE",
5707 )
5708 if op_index == self.SUBOPERATION_STATUS_SKIP:
5709 # Skip sub-operation
5710 result = "COMPLETED"
5711 result_detail = "Done"
5712 self.logger.debug(
5713 logging_text
5714 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
5715 vnf_config_primitive, result, result_detail
5716 )
5717 )
5718 else:
5719 if op_index == self.SUBOPERATION_STATUS_NEW:
5720 # New sub-operation: Get index of this sub-operation
5721 op_index = (
5722 len(db_nslcmop.get("_admin", {}).get("operations"))
5723 - 1
5724 )
5725 self.logger.debug(
5726 logging_text
5727 + "vnf_config_primitive={} New sub-operation".format(
5728 vnf_config_primitive
5729 )
5730 )
5731 else:
5732 # retry: Get registered params for this existing sub-operation
5733 op = db_nslcmop.get("_admin", {}).get("operations", [])[
5734 op_index
5735 ]
5736 vnf_index = op.get("member_vnf_index")
5737 vnf_config_primitive = op.get("primitive")
5738 primitive_params = op.get("primitive_params")
5739 self.logger.debug(
5740 logging_text
5741 + "vnf_config_primitive={} Sub-operation retry".format(
5742 vnf_config_primitive
5743 )
5744 )
5745 # Execute the primitive, either with new (first-time) or registered (retry) args
5746 ee_descriptor_id = config_primitive.get(
5747 "execution-environment-ref"
5748 )
5749 primitive_name = config_primitive.get(
5750 "execution-environment-primitive", vnf_config_primitive
5751 )
5752 ee_id, vca_type = self._look_for_deployed_vca(
5753 nsr_deployed["VCA"],
5754 member_vnf_index=vnf_index,
5755 vdu_id=None,
5756 vdu_count_index=None,
5757 ee_descriptor_id=ee_descriptor_id,
5758 )
5759 result, result_detail = await self._ns_execute_primitive(
5760 ee_id,
5761 primitive_name,
5762 primitive_params,
5763 vca_type=vca_type,
5764 vca_id=vca_id,
5765 )
5766 self.logger.debug(
5767 logging_text
5768 + "vnf_config_primitive={} Done with result {} {}".format(
5769 vnf_config_primitive, result, result_detail
5770 )
5771 )
5772 # Update operationState = COMPLETED | FAILED
5773 self._update_suboperation_status(
5774 db_nslcmop, op_index, result, result_detail
5775 )
5776
5777 if result == "FAILED":
5778 raise LcmException(result_detail)
5779 db_nsr_update["config-status"] = old_config_status
5780 scale_process = None
5781 # PRE-SCALE END
5782
5783 db_nsr_update[
5784 "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)
5785 ] = nb_scale_op
5786 db_nsr_update[
5787 "_admin.scaling-group.{}.time".format(admin_scale_index)
5788 ] = time()
5789
5790 # SCALE-IN VCA - BEGIN
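# When scaling in, destroy the execution environments of the removed
# VDU/KDU instances, running their terminate primitives unless the
# operation parameters request to skip them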
5791 if vca_scaling_info:
5792 step = db_nslcmop_update[
5793 "detailed-status"
5794 ] = "Deleting the execution environments"
5795 scale_process = "VCA"
5796 for vca_info in vca_scaling_info:
5797 if vca_info["type"] == "delete":
5798 member_vnf_index = str(vca_info["member-vnf-index"])
5799 self.logger.debug(
5800 logging_text + "vdu info: {}".format(vca_info)
5801 )
5802 if vca_info.get("osm_vdu_id"):
5803 vdu_id = vca_info["osm_vdu_id"]
5804 vdu_index = int(vca_info["vdu_index"])
5805 stage[
5806 1
5807 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5808 member_vnf_index, vdu_id, vdu_index
5809 )
5810 else:
5811 vdu_index = 0
5812 kdu_id = vca_info["osm_kdu_id"]
5813 stage[
5814 1
5815 ] = "Scaling member_vnf_index={}, kdu_id={}, vdu_index={} ".format(
5816 member_vnf_index, kdu_id, vdu_index
5817 )
5818 stage[2] = step = "Scaling in VCA"
5819 self._write_op_status(op_id=nslcmop_id, stage=stage)
5820 vca_update = db_nsr["_admin"]["deployed"]["VCA"]
5821 config_update = db_nsr["configurationStatus"]
5822 for vca_index, vca in enumerate(vca_update):
5823 if (
5824 (vca or vca.get("ee_id"))
5825 and vca["member-vnf-index"] == member_vnf_index
5826 and vca["vdu_count_index"] == vdu_index
5827 ):
5828 if vca.get("vdu_id"):
5829 config_descriptor = get_configuration(
5830 db_vnfd, vca.get("vdu_id")
5831 )
5832 elif vca.get("kdu_name"):
5833 config_descriptor = get_configuration(
5834 db_vnfd, vca.get("kdu_name")
5835 )
5836 else:
5837 config_descriptor = get_configuration(
5838 db_vnfd, db_vnfd["id"]
5839 )
5840 operation_params = (
5841 db_nslcmop.get("operationParams") or {}
5842 )
5843 exec_terminate_primitives = not operation_params.get(
5844 "skip_terminate_primitives"
5845 ) and vca.get("needed_terminate")
5846 task = asyncio.ensure_future(
5847 asyncio.wait_for(
5848 self.destroy_N2VC(
5849 logging_text,
5850 db_nslcmop,
5851 vca,
5852 config_descriptor,
5853 vca_index,
5854 destroy_ee=True,
5855 exec_primitives=exec_terminate_primitives,
5856 scaling_in=True,
5857 vca_id=vca_id,
5858 ),
5859 timeout=self.timeout_charm_delete,
5860 )
5861 )
5862 tasks_dict_info[task] = "Terminating VCA {}".format(
5863 vca.get("ee_id")
5864 )
5865 del vca_update[vca_index]
5866 del config_update[vca_index]
5867 # wait for pending tasks of terminate primitives
5868 if tasks_dict_info:
5869 self.logger.debug(
5870 logging_text
5871 + "Waiting for tasks {}".format(
5872 list(tasks_dict_info.keys())
5873 )
5874 )
5875 error_list = await self._wait_for_tasks(
5876 logging_text,
5877 tasks_dict_info,
5878 min(
5879 self.timeout_charm_delete, self.timeout_ns_terminate
5880 ),
5881 stage,
5882 nslcmop_id,
5883 )
5884 tasks_dict_info.clear()
5885 if error_list:
5886 raise LcmException("; ".join(error_list))
5887
5888 db_vca_and_config_update = {
5889 "_admin.deployed.VCA": vca_update,
5890 "configurationStatus": config_update,
5891 }
5892 self.update_db_2(
5893 "nsrs", db_nsr["_id"], db_vca_and_config_update
5894 )
5895 scale_process = None
5896 # SCALE-IN VCA - END
5897
5898 # SCALE RO - BEGIN
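# Delegate the creation/deletion of VDU instances to NG-RO; only executed
# when the "ng" RO configuration is enabled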
5899 if scaling_info.get("vdu-create") or scaling_info.get("vdu-delete"):
5900 scale_process = "RO"
5901 if self.ro_config.get("ng"):
5902 await self._scale_ng_ro(
5903 logging_text, db_nsr, db_nslcmop, db_vnfr, scaling_info, stage
5904 )
5905 scaling_info.pop("vdu-create", None)
5906 scaling_info.pop("vdu-delete", None)
5907
5908 scale_process = None
5909 # SCALE RO - END
5910
5911 # SCALE KDU - BEGIN
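# Scale the KDU replica counts on the K8s clusters, running the
# terminate/initial config primitives defined in the descriptor (see
# _scale_kdu below)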
5912 if scaling_info.get("kdu-create") or scaling_info.get("kdu-delete"):
5913 scale_process = "KDU"
5914 await self._scale_kdu(
5915 logging_text, nsr_id, nsr_deployed, db_vnfd, vca_id, scaling_info
5916 )
5917 scaling_info.pop("kdu-create", None)
5918 scaling_info.pop("kdu-delete", None)
5919
5920 scale_process = None
5921 # SCALE KDU - END
5922
5923 if db_nsr_update:
5924 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5925
5926 # SCALE-UP VCA - BEGIN
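# When scaling out, deploy new execution environments (VNF-, VDU- or
# KDU-level charms) for each created instance via _deploy_n2vc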
5927 if vca_scaling_info:
5928 step = db_nslcmop_update[
5929 "detailed-status"
5930 ] = "Creating new execution environments"
5931 scale_process = "VCA"
5932 for vca_info in vca_scaling_info:
5933 if vca_info["type"] == "create":
5934 member_vnf_index = str(vca_info["member-vnf-index"])
5935 self.logger.debug(
5936 logging_text + "vdu info: {}".format(vca_info)
5937 )
5938 vnfd_id = db_vnfr["vnfd-ref"]
5939 if vca_info.get("osm_vdu_id"):
5940 vdu_index = int(vca_info["vdu_index"])
5941 deploy_params = {"OSM": get_osm_params(db_vnfr)}
5942 if db_vnfr.get("additionalParamsForVnf"):
5943 deploy_params.update(
5944 parse_yaml_strings(
5945 db_vnfr["additionalParamsForVnf"].copy()
5946 )
5947 )
5948 descriptor_config = get_configuration(
5949 db_vnfd, db_vnfd["id"]
5950 )
5951 if descriptor_config:
5952 vdu_id = None
5953 vdu_name = None
5954 kdu_name = None
5955 self._deploy_n2vc(
5956 logging_text=logging_text
5957 + "member_vnf_index={} ".format(member_vnf_index),
5958 db_nsr=db_nsr,
5959 db_vnfr=db_vnfr,
5960 nslcmop_id=nslcmop_id,
5961 nsr_id=nsr_id,
5962 nsi_id=nsi_id,
5963 vnfd_id=vnfd_id,
5964 vdu_id=vdu_id,
5965 kdu_name=kdu_name,
5966 member_vnf_index=member_vnf_index,
5967 vdu_index=vdu_index,
5968 vdu_name=vdu_name,
5969 deploy_params=deploy_params,
5970 descriptor_config=descriptor_config,
5971 base_folder=base_folder,
5972 task_instantiation_info=tasks_dict_info,
5973 stage=stage,
5974 )
5975 vdu_id = vca_info["osm_vdu_id"]
5976 vdur = find_in_list(
5977 db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id
5978 )
5979 descriptor_config = get_configuration(db_vnfd, vdu_id)
5980 if vdur.get("additionalParams"):
5981 deploy_params_vdu = parse_yaml_strings(
5982 vdur["additionalParams"]
5983 )
5984 else:
5985 deploy_params_vdu = deploy_params
5986 deploy_params_vdu["OSM"] = get_osm_params(
5987 db_vnfr, vdu_id, vdu_count_index=vdu_index
5988 )
5989 if descriptor_config:
5990 vdu_name = None
5991 kdu_name = None
5992 stage[
5993 1
5994 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5995 member_vnf_index, vdu_id, vdu_index
5996 )
5997 stage[2] = step = "Scaling out VCA"
5998 self._write_op_status(op_id=nslcmop_id, stage=stage)
5999 self._deploy_n2vc(
6000 logging_text=logging_text
6001 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6002 member_vnf_index, vdu_id, vdu_index
6003 ),
6004 db_nsr=db_nsr,
6005 db_vnfr=db_vnfr,
6006 nslcmop_id=nslcmop_id,
6007 nsr_id=nsr_id,
6008 nsi_id=nsi_id,
6009 vnfd_id=vnfd_id,
6010 vdu_id=vdu_id,
6011 kdu_name=kdu_name,
6012 member_vnf_index=member_vnf_index,
6013 vdu_index=vdu_index,
6014 vdu_name=vdu_name,
6015 deploy_params=deploy_params_vdu,
6016 descriptor_config=descriptor_config,
6017 base_folder=base_folder,
6018 task_instantiation_info=tasks_dict_info,
6019 stage=stage,
6020 )
6021 else:
6022 kdu_name = vca_info["osm_kdu_id"]
6023 descriptor_config = get_configuration(db_vnfd, kdu_name)
6024 if descriptor_config:
6025 vdu_id = None
6026 kdu_index = int(vca_info["kdu_index"])
6027 vdu_name = None
6028 kdur = next(
6029 x
6030 for x in db_vnfr["kdur"]
6031 if x["kdu-name"] == kdu_name
6032 )
6033 deploy_params_kdu = {"OSM": get_osm_params(db_vnfr)}
6034 if kdur.get("additionalParams"):
6035 deploy_params_kdu = parse_yaml_strings(
6036 kdur["additionalParams"]
6037 )
6038
6039 self._deploy_n2vc(
6040 logging_text=logging_text,
6041 db_nsr=db_nsr,
6042 db_vnfr=db_vnfr,
6043 nslcmop_id=nslcmop_id,
6044 nsr_id=nsr_id,
6045 nsi_id=nsi_id,
6046 vnfd_id=vnfd_id,
6047 vdu_id=vdu_id,
6048 kdu_name=kdu_name,
6049 member_vnf_index=member_vnf_index,
6050 vdu_index=kdu_index,
6051 vdu_name=vdu_name,
6052 deploy_params=deploy_params_kdu,
6053 descriptor_config=descriptor_config,
6054 base_folder=base_folder,
6055 task_instantiation_info=tasks_dict_info,
6056 stage=stage,
6057 )
6058 # SCALE-UP VCA - END
6059 scale_process = None
6060
6061 # POST-SCALE BEGIN
6062 # execute primitive service POST-SCALING
6063 step = "Executing post-scale vnf-config-primitive"
6064 if scaling_descriptor.get("scaling-config-action"):
6065 for scaling_config_action in scaling_descriptor[
6066 "scaling-config-action"
6067 ]:
6068 if (
6069 scaling_config_action.get("trigger") == "post-scale-in"
6070 and scaling_type == "SCALE_IN"
6071 ) or (
6072 scaling_config_action.get("trigger") == "post-scale-out"
6073 and scaling_type == "SCALE_OUT"
6074 ):
6075 vnf_config_primitive = scaling_config_action[
6076 "vnf-config-primitive-name-ref"
6077 ]
6078 step = db_nslcmop_update[
6079 "detailed-status"
6080 ] = "executing post-scale scaling-config-action '{}'".format(
6081 vnf_config_primitive
6082 )
6083
6084 vnfr_params = {"VDU_SCALE_INFO": scaling_info}
6085 if db_vnfr.get("additionalParamsForVnf"):
6086 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
6087
6088 # look for primitive
6089 for config_primitive in (
6090 get_configuration(db_vnfd, db_vnfd["id"]) or {}
6091 ).get("config-primitive", ()):
6092 if config_primitive["name"] == vnf_config_primitive:
6093 break
6094 else:
6095 raise LcmException(
6096 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
6097 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
6098 "config-primitive".format(
6099 scaling_group, vnf_config_primitive
6100 )
6101 )
6102 scale_process = "VCA"
6103 db_nsr_update["config-status"] = "configuring post-scaling"
6104 primitive_params = self._map_primitive_params(
6105 config_primitive, {}, vnfr_params
6106 )
6107
6108 # Post-scale retry check: Check if this sub-operation has been executed before
6109 op_index = self._check_or_add_scale_suboperation(
6110 db_nslcmop,
6111 vnf_index,
6112 vnf_config_primitive,
6113 primitive_params,
6114 "POST-SCALE",
6115 )
6116 if op_index == self.SUBOPERATION_STATUS_SKIP:
6117 # Skip sub-operation
6118 result = "COMPLETED"
6119 result_detail = "Done"
6120 self.logger.debug(
6121 logging_text
6122 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6123 vnf_config_primitive, result, result_detail
6124 )
6125 )
6126 else:
6127 if op_index == self.SUBOPERATION_STATUS_NEW:
6128 # New sub-operation: Get index of this sub-operation
6129 op_index = (
6130 len(db_nslcmop.get("_admin", {}).get("operations"))
6131 - 1
6132 )
6133 self.logger.debug(
6134 logging_text
6135 + "vnf_config_primitive={} New sub-operation".format(
6136 vnf_config_primitive
6137 )
6138 )
6139 else:
6140 # retry: Get registered params for this existing sub-operation
6141 op = db_nslcmop.get("_admin", {}).get("operations", [])[
6142 op_index
6143 ]
6144 vnf_index = op.get("member_vnf_index")
6145 vnf_config_primitive = op.get("primitive")
6146 primitive_params = op.get("primitive_params")
6147 self.logger.debug(
6148 logging_text
6149 + "vnf_config_primitive={} Sub-operation retry".format(
6150 vnf_config_primitive
6151 )
6152 )
6153 # Execute the primitive, either with new (first-time) or registered (retry) args
6154 ee_descriptor_id = config_primitive.get(
6155 "execution-environment-ref"
6156 )
6157 primitive_name = config_primitive.get(
6158 "execution-environment-primitive", vnf_config_primitive
6159 )
6160 ee_id, vca_type = self._look_for_deployed_vca(
6161 nsr_deployed["VCA"],
6162 member_vnf_index=vnf_index,
6163 vdu_id=None,
6164 vdu_count_index=None,
6165 ee_descriptor_id=ee_descriptor_id,
6166 )
6167 result, result_detail = await self._ns_execute_primitive(
6168 ee_id,
6169 primitive_name,
6170 primitive_params,
6171 vca_type=vca_type,
6172 vca_id=vca_id,
6173 )
6174 self.logger.debug(
6175 logging_text
6176 + "vnf_config_primitive={} Done with result {} {}".format(
6177 vnf_config_primitive, result, result_detail
6178 )
6179 )
6180 # Update operationState = COMPLETED | FAILED
6181 self._update_suboperation_status(
6182 db_nslcmop, op_index, result, result_detail
6183 )
6184
6185 if result == "FAILED":
6186 raise LcmException(result_detail)
6187 db_nsr_update["config-status"] = old_config_status
6188 scale_process = None
6189 # POST-SCALE END
6190
6191 db_nsr_update[
6192 "detailed-status"
6193 ] = "" # "scaled {} {}".format(scaling_group, scaling_type)
6194 db_nsr_update["operational-status"] = (
6195 "running"
6196 if old_operational_status == "failed"
6197 else old_operational_status
6198 )
6199 db_nsr_update["config-status"] = old_config_status
6200 return
6201 except (
6202 ROclient.ROClientException,
6203 DbException,
6204 LcmException,
6205 NgRoException,
6206 ) as e:
6207 self.logger.error(logging_text + "Exit Exception {}".format(e))
6208 exc = e
6209 except asyncio.CancelledError:
6210 self.logger.error(
6211 logging_text + "Cancelled Exception while '{}'".format(step)
6212 )
6213 exc = "Operation was cancelled"
6214 except Exception as e:
6215 exc = traceback.format_exc()
6216 self.logger.critical(
6217 logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
6218 exc_info=True,
6219 )
6220 finally:
6221 self._write_ns_status(
6222 nsr_id=nsr_id,
6223 ns_state=None,
6224 current_operation="IDLE",
6225 current_operation_id=None,
6226 )
6227 if tasks_dict_info:
6228 stage[1] = "Waiting for pending tasks."
6229 self.logger.debug(logging_text + stage[1])
6230 exc = await self._wait_for_tasks(
6231 logging_text,
6232 tasks_dict_info,
6233 self.timeout_ns_deploy,
6234 stage,
6235 nslcmop_id,
6236 nsr_id=nsr_id,
6237 )
6238 if exc:
6239 db_nslcmop_update[
6240 "detailed-status"
6241 ] = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
6242 nslcmop_operation_state = "FAILED"
6243 if db_nsr:
6244 db_nsr_update["operational-status"] = old_operational_status
6245 db_nsr_update["config-status"] = old_config_status
6246 db_nsr_update["detailed-status"] = ""
6247 if scale_process:
6248 if "VCA" in scale_process:
6249 db_nsr_update["config-status"] = "failed"
6250 if "RO" in scale_process:
6251 db_nsr_update["operational-status"] = "failed"
6252 db_nsr_update[
6253 "detailed-status"
6254 ] = "FAILED scaling nslcmop={} {}: {}".format(
6255 nslcmop_id, step, exc
6256 )
6257 else:
6258 error_description_nslcmop = None
6259 nslcmop_operation_state = "COMPLETED"
6260 db_nslcmop_update["detailed-status"] = "Done"
6261
6262 self._write_op_status(
6263 op_id=nslcmop_id,
6264 stage="",
6265 error_message=error_description_nslcmop,
6266 operation_state=nslcmop_operation_state,
6267 other_update=db_nslcmop_update,
6268 )
6269 if db_nsr:
6270 self._write_ns_status(
6271 nsr_id=nsr_id,
6272 ns_state=None,
6273 current_operation="IDLE",
6274 current_operation_id=None,
6275 other_update=db_nsr_update,
6276 )
6277
6278 if nslcmop_operation_state:
6279 try:
6280 msg = {
6281 "nsr_id": nsr_id,
6282 "nslcmop_id": nslcmop_id,
6283 "operationState": nslcmop_operation_state,
6284 }
6285 await self.msg.aiowrite("ns", "scaled", msg, loop=self.loop)
6286 except Exception as e:
6287 self.logger.error(
6288 logging_text + "kafka_write notification Exception {}".format(e)
6289 )
6290 self.logger.debug(logging_text + "Exit")
6291 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")
6292
6293 async def _scale_kdu(
6294 self, logging_text, nsr_id, nsr_deployed, db_vnfd, vca_id, scaling_info
6295 ):
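# For each KDU listed in scaling_info: run the terminate config primitives
# before scaling in, scale the resource through the K8s connector, and run
# the initial config primitives after scaling out (primitives only apply
# when the descriptor defines them and no juju execution environment is used)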
6296 _scaling_info = scaling_info.get("kdu-create") or scaling_info.get("kdu-delete")
6297 for kdu_name in _scaling_info:
6298 for kdu_scaling_info in _scaling_info[kdu_name]:
6299 deployed_kdu, index = get_deployed_kdu(
6300 nsr_deployed, kdu_name, kdu_scaling_info["member-vnf-index"]
6301 )
6302 cluster_uuid = deployed_kdu["k8scluster-uuid"]
6303 kdu_instance = deployed_kdu["kdu-instance"]
6304 scale = int(kdu_scaling_info["scale"])
6305 k8s_cluster_type = kdu_scaling_info["k8s-cluster-type"]
6306
6307 db_dict = {
6308 "collection": "nsrs",
6309 "filter": {"_id": nsr_id},
6310 "path": "_admin.deployed.K8s.{}".format(index),
6311 }
6312
6313 step = "scaling application {}".format(
6314 kdu_scaling_info["resource-name"]
6315 )
6316 self.logger.debug(logging_text + step)
6317
6318 if kdu_scaling_info["type"] == "delete":
6319 kdu_config = get_configuration(db_vnfd, kdu_name)
6320 if (
6321 kdu_config
6322 and kdu_config.get("terminate-config-primitive")
6323 and get_juju_ee_ref(db_vnfd, kdu_name) is None
6324 ):
6325 terminate_config_primitive_list = kdu_config.get(
6326 "terminate-config-primitive"
6327 )
6328 terminate_config_primitive_list.sort(
6329 key=lambda val: int(val["seq"])
6330 )
6331
6332 for (
6333 terminate_config_primitive
6334 ) in terminate_config_primitive_list:
6335 primitive_params_ = self._map_primitive_params(
6336 terminate_config_primitive, {}, {}
6337 )
6338 step = "execute terminate config primitive"
6339 self.logger.debug(logging_text + step)
6340 await asyncio.wait_for(
6341 self.k8scluster_map[k8s_cluster_type].exec_primitive(
6342 cluster_uuid=cluster_uuid,
6343 kdu_instance=kdu_instance,
6344 primitive_name=terminate_config_primitive["name"],
6345 params=primitive_params_,
6346 db_dict=db_dict,
6347 vca_id=vca_id,
6348 ),
6349 timeout=600,
6350 )
6351
6352 await asyncio.wait_for(
6353 self.k8scluster_map[k8s_cluster_type].scale(
6354 kdu_instance,
6355 scale,
6356 kdu_scaling_info["resource-name"],
6357 vca_id=vca_id,
6358 ),
6359 timeout=self.timeout_vca_on_error,
6360 )
6361
6362 if kdu_scaling_info["type"] == "create":
6363 kdu_config = get_configuration(db_vnfd, kdu_name)
6364 if (
6365 kdu_config
6366 and kdu_config.get("initial-config-primitive")
6367 and get_juju_ee_ref(db_vnfd, kdu_name) is None
6368 ):
6369 initial_config_primitive_list = kdu_config.get(
6370 "initial-config-primitive"
6371 )
6372 initial_config_primitive_list.sort(
6373 key=lambda val: int(val["seq"])
6374 )
6375
6376 for initial_config_primitive in initial_config_primitive_list:
6377 primitive_params_ = self._map_primitive_params(
6378 initial_config_primitive, {}, {}
6379 )
6380 step = "execute initial config primitive"
6381 self.logger.debug(logging_text + step)
6382 await asyncio.wait_for(
6383 self.k8scluster_map[k8s_cluster_type].exec_primitive(
6384 cluster_uuid=cluster_uuid,
6385 kdu_instance=kdu_instance,
6386 primitive_name=initial_config_primitive["name"],
6387 params=primitive_params_,
6388 db_dict=db_dict,
6389 vca_id=vca_id,
6390 ),
6391 timeout=600,
6392 )
6393
6394 async def _scale_ng_ro(
6395 self, logging_text, db_nsr, db_nslcmop, db_vnfr, vdu_scaling_info, stage
6396 ):
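# Mark the VDUs to create/delete in the VNFR and let NG-RO reconcile the
# VIM resources; the deleted vdur entries are removed from the VNFR once
# RO has finished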
6397 nsr_id = db_nslcmop["nsInstanceId"]
6398 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
6399 db_vnfrs = {}
6400
6401 # read from db: vnfds for every vnf
6402 db_vnfds = []
6403
6404 # for each vnf in ns, read vnfd
6405 for vnfr in self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}):
6406 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
6407 vnfd_id = vnfr["vnfd-id"] # vnfd uuid for this vnf
6408 # if we do not have this vnfd yet, read it from db
6409 if not find_in_list(db_vnfds, lambda a_vnfd: a_vnfd["id"] == vnfd_id):
6410 # read from db
6411 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
6412 db_vnfds.append(vnfd)
6413 n2vc_key = self.n2vc.get_public_key()
6414 n2vc_key_list = [n2vc_key]
6415 self.scale_vnfr(
6416 db_vnfr,
6417 vdu_scaling_info.get("vdu-create"),
6418 vdu_scaling_info.get("vdu-delete"),
6419 mark_delete=True,
6420 )
6421 # db_vnfr has been updated, update db_vnfrs to use it
6422 db_vnfrs[db_vnfr["member-vnf-index-ref"]] = db_vnfr
6423 await self._instantiate_ng_ro(
6424 logging_text,
6425 nsr_id,
6426 db_nsd,
6427 db_nsr,
6428 db_nslcmop,
6429 db_vnfrs,
6430 db_vnfds,
6431 n2vc_key_list,
6432 stage=stage,
6433 start_deploy=time(),
6434 timeout_ns_deploy=self.timeout_ns_deploy,
6435 )
6436 if vdu_scaling_info.get("vdu-delete"):
6437 self.scale_vnfr(
6438 db_vnfr, None, vdu_scaling_info["vdu-delete"], mark_delete=False
6439 )
6440
6441 async def extract_prometheus_scrape_jobs(
6442 self,
6443 ee_id,
6444 artifact_path,
6445 ee_config_descriptor,
6446 vnfr_id,
6447 nsr_id,
6448 target_ip
6449 ):
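# Render the "prometheus*.j2" template shipped in the VNF package (if any)
# into a list of Prometheus scrape jobs, substituting job name, target IP
# and exporter service endpoint, and tag every job with nsr_id and vnfr_id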
6450 # look for a file called 'prometheus*.j2' in the artifact folder
6451 artifact_content = self.fs.dir_ls(artifact_path)
6452 job_file = next(
6453 (
6454 f
6455 for f in artifact_content
6456 if f.startswith("prometheus") and f.endswith(".j2")
6457 ),
6458 None,
6459 )
6460 if not job_file:
6461 return
6462 with self.fs.file_open((artifact_path, job_file), "r") as f:
6463 job_data = f.read()
6464
6465 # TODO get_service
6466 _, _, service = ee_id.partition(".") # remove prefix "namespace."
6467 host_name = "{}-{}".format(service, ee_config_descriptor["metric-service"])
6468 host_port = "80"
6469 vnfr_id = vnfr_id.replace("-", "")
6470 variables = {
6471 "JOB_NAME": vnfr_id,
6472 "TARGET_IP": target_ip,
6473 "EXPORTER_POD_IP": host_name,
6474 "EXPORTER_POD_PORT": host_port,
6475 }
6476 job_list = parse_job(job_data, variables)
6477 # ensure job_name includes the vnfr_id and add nsr_id/vnfr_id as metadata
6478 for job in job_list:
6479 if (
6480 not isinstance(job.get("job_name"), str)
6481 or vnfr_id not in job["job_name"]
6482 ):
6483 job["job_name"] = vnfr_id + "_" + str(randint(1, 10000))
6484 job["nsr_id"] = nsr_id
6485 job["vnfr_id"] = vnfr_id
6486 return job_list
6487
6488 def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
6489 """
6490 Get VCA Cloud and VCA Cloud Credentials for the VIM account
6491
6492 :param: vim_account_id: VIM Account ID
6493
6494 :return: (cloud_name, cloud_credential)
6495 """
6496 config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
6497 return config.get("vca_cloud"), config.get("vca_cloud_credential")
6498
6499 def get_vca_k8s_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
6500 """
6501 Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account
6502
6503 :param: vim_account_id: VIM Account ID
6504
6505 :return: (cloud_name, cloud_credential)
6506 """
6507 config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
6508 return config.get("vca_k8s_cloud"), config.get("vca_k8s_cloud_credential")