Fix bug 1865: Manually scaling VDU from 0 to 1 instance fails
[osm/LCM.git] / osm_lcm / ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 from typing import Any, Dict, List
21 import yaml
22 import logging
23 import logging.handlers
24 import traceback
25 import json
26 from jinja2 import (
27 Environment,
28 TemplateError,
29 TemplateNotFound,
30 StrictUndefined,
31 UndefinedError,
32 )
33
34 from osm_lcm import ROclient
35 from osm_lcm.data_utils.nsr import (
36 get_deployed_kdu,
37 get_deployed_vca,
38 get_deployed_vca_list,
39 get_nsd,
40 )
41 from osm_lcm.data_utils.vca import (
42 DeployedComponent,
43 DeployedK8sResource,
44 DeployedVCA,
45 EELevel,
46 Relation,
47 EERelation,
48 safe_get_ee_relation,
49 )
50 from osm_lcm.ng_ro import NgRoClient, NgRoException
51 from osm_lcm.lcm_utils import (
52 LcmException,
53 LcmExceptionNoMgmtIP,
54 LcmBase,
55 deep_get,
56 get_iterable,
57 populate_dict,
58 )
59 from osm_lcm.data_utils.nsd import (
60 get_ns_configuration_relation_list,
61 get_vnf_profile,
62 get_vnf_profiles,
63 )
64 from osm_lcm.data_utils.vnfd import (
65 get_relation_list,
66 get_vdu_list,
67 get_vdu_profile,
68 get_ee_sorted_initial_config_primitive_list,
69 get_ee_sorted_terminate_config_primitive_list,
70 get_kdu_list,
71 get_virtual_link_profiles,
72 get_vdu,
73 get_configuration,
74 get_vdu_index,
75 get_scaling_aspect,
76 get_number_of_instances,
77 get_juju_ee_ref,
78 get_kdu_resource_profile,
79 )
80 from osm_lcm.data_utils.list_utils import find_in_list
81 from osm_lcm.data_utils.vnfr import get_osm_params, get_vdur_index, get_kdur
82 from osm_lcm.data_utils.dict_utils import parse_yaml_strings
83 from osm_lcm.data_utils.database.vim_account import VimAccountDB
84 from n2vc.definitions import RelationEndpoint
85 from n2vc.k8s_helm_conn import K8sHelmConnector
86 from n2vc.k8s_helm3_conn import K8sHelm3Connector
87 from n2vc.k8s_juju_conn import K8sJujuConnector
88
89 from osm_common.dbbase import DbException
90 from osm_common.fsbase import FsException
91
92 from osm_lcm.data_utils.database.database import Database
93 from osm_lcm.data_utils.filesystem.filesystem import Filesystem
94
95 from n2vc.n2vc_juju_conn import N2VCJujuConnector
96 from n2vc.exceptions import N2VCException, N2VCNotFound, K8sException
97
98 from osm_lcm.lcm_helm_conn import LCMHelmConn
99 from osm_lcm.prometheus import parse_job
100
101 from copy import copy, deepcopy
102 from time import time
103 from uuid import uuid4
104
105 from random import randint
106
107 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
108
109
110 class NsLcm(LcmBase):
111 timeout_vca_on_error = (
112 5 * 60
113 ) # Time from when a charm first reaches blocked/error status until it is marked as failed
114 timeout_ns_deploy = 2 * 3600 # default global timeout for deploying an ns
115 timeout_ns_terminate = 1800 # default global timeout for undeploying an ns
116 timeout_charm_delete = 10 * 60
117 timeout_primitive = 30 * 60 # timeout for primitive execution
118 timeout_progress_primitive = (
119 10 * 60
120 ) # timeout for some progress in a primitive execution
121
122 SUBOPERATION_STATUS_NOT_FOUND = -1
123 SUBOPERATION_STATUS_NEW = -2
124 SUBOPERATION_STATUS_SKIP = -3
125 task_name_deploy_vca = "Deploying VCA"
126
127 def __init__(self, msg, lcm_tasks, config, loop):
128 """
129 Init: connect to database, filesystem storage, and messaging
130 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
131 :return: None
132 """
133 super().__init__(msg=msg, logger=logging.getLogger("lcm.ns"))
134
135 self.db = Database().instance.db
136 self.fs = Filesystem().instance.fs
137 self.loop = loop
138 self.lcm_tasks = lcm_tasks
139 self.timeout = config["timeout"]
140 self.ro_config = config["ro_config"]
141 self.ng_ro = config["ro_config"].get("ng")
142 self.vca_config = config["VCA"].copy()
143
144 # create N2VC connector
145 self.n2vc = N2VCJujuConnector(
146 log=self.logger,
147 loop=self.loop,
148 on_update_db=self._on_update_n2vc_db,
149 fs=self.fs,
150 db=self.db,
151 )
152
153 self.conn_helm_ee = LCMHelmConn(
154 log=self.logger,
155 loop=self.loop,
156 vca_config=self.vca_config,
157 on_update_db=self._on_update_n2vc_db,
158 )
159
160 self.k8sclusterhelm2 = K8sHelmConnector(
161 kubectl_command=self.vca_config.get("kubectlpath"),
162 helm_command=self.vca_config.get("helmpath"),
163 log=self.logger,
164 on_update_db=None,
165 fs=self.fs,
166 db=self.db,
167 )
168
169 self.k8sclusterhelm3 = K8sHelm3Connector(
170 kubectl_command=self.vca_config.get("kubectlpath"),
171 helm_command=self.vca_config.get("helm3path"),
172 fs=self.fs,
173 log=self.logger,
174 db=self.db,
175 on_update_db=None,
176 )
177
178 self.k8sclusterjuju = K8sJujuConnector(
179 kubectl_command=self.vca_config.get("kubectlpath"),
180 juju_command=self.vca_config.get("jujupath"),
181 log=self.logger,
182 loop=self.loop,
183 on_update_db=self._on_update_k8s_db,
184 fs=self.fs,
185 db=self.db,
186 )
187
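# Note: k8scluster_map below routes each kdu-model type to its K8s connector
# ("helm-chart" -> Helm v2, "helm-chart-v3"/"chart" -> Helm v3, "juju-bundle"/"juju" -> Juju),
# and vca_map routes each charm/execution-environment type to its VCA backend
# (proxy and native charms -> N2VC, "helm"/"helm-v3" -> LCMHelmConn).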
188 self.k8scluster_map = {
189 "helm-chart": self.k8sclusterhelm2,
190 "helm-chart-v3": self.k8sclusterhelm3,
191 "chart": self.k8sclusterhelm3,
192 "juju-bundle": self.k8sclusterjuju,
193 "juju": self.k8sclusterjuju,
194 }
195
196 self.vca_map = {
197 "lxc_proxy_charm": self.n2vc,
198 "native_charm": self.n2vc,
199 "k8s_proxy_charm": self.n2vc,
200 "helm": self.conn_helm_ee,
201 "helm-v3": self.conn_helm_ee,
202 }
203
204 # create RO client
205 self.RO = NgRoClient(self.loop, **self.ro_config)
206
207 @staticmethod
208 def increment_ip_mac(ip_mac, vm_index=1):
209 if not isinstance(ip_mac, str):
210 return ip_mac
211 try:
212 # try with IPv4: look for the last dot
213 i = ip_mac.rfind(".")
214 if i > 0:
215 i += 1
216 return "{}{}".format(ip_mac[:i], int(ip_mac[i:]) + vm_index)
217 # try with IPv6 or MAC: look for the last colon; operate in hex
218 i = ip_mac.rfind(":")
219 if i > 0:
220 i += 1
221 # format in hex, len can be 2 for mac or 4 for ipv6
222 return ("{}{:0" + str(len(ip_mac) - i) + "x}").format(
223 ip_mac[:i], int(ip_mac[i:], 16) + vm_index
224 )
225 except Exception:
226 pass
227 return None
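# Illustrative examples of the logic above:
#   increment_ip_mac("10.0.0.5", 1)          -> "10.0.0.6"
#   increment_ip_mac("52:54:00:00:00:0a", 1) -> "52:54:00:00:00:0b"
# Non-string input is returned unchanged; unparsable strings return None.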
228
229 def _on_update_ro_db(self, nsrs_id, ro_descriptor):
230
231 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
232
233 try:
234 # TODO filter RO descriptor fields...
235
236 # write to database
237 db_dict = dict()
238 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
239 db_dict["deploymentStatus"] = ro_descriptor
240 self.update_db_2("nsrs", nsrs_id, db_dict)
241
242 except Exception as e:
243 self.logger.warn(
244 "Cannot write database RO deployment for ns={} -> {}".format(nsrs_id, e)
245 )
246
247 async def _on_update_n2vc_db(self, table, filter, path, updated_data, vca_id=None):
248
249 # remove last dot from path (if exists)
250 if path.endswith("."):
251 path = path[:-1]
252
253 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
254 # .format(table, filter, path, updated_data))
255 try:
256
257 nsr_id = filter.get("_id")
258
259 # read ns record from database
260 nsr = self.db.get_one(table="nsrs", q_filter=filter)
261 current_ns_status = nsr.get("nsState")
262
263 # get vca status for NS
264 status_dict = await self.n2vc.get_status(
265 namespace="." + nsr_id, yaml_format=False, vca_id=vca_id
266 )
267
268 # vcaStatus
269 db_dict = dict()
270 db_dict["vcaStatus"] = status_dict
271 await self.n2vc.update_vca_status(db_dict["vcaStatus"], vca_id=vca_id)
272
273 # update configurationStatus for this VCA
274 try:
275 vca_index = int(path[path.rfind(".") + 1 :])
276
277 vca_list = deep_get(
278 target_dict=nsr, key_list=("_admin", "deployed", "VCA")
279 )
280 vca_status = vca_list[vca_index].get("status")
281
282 configuration_status_list = nsr.get("configurationStatus")
283 config_status = configuration_status_list[vca_index].get("status")
284
285 if config_status == "BROKEN" and vca_status != "failed":
286 db_dict["configurationStatus"][vca_index] = "READY"
287 elif config_status != "BROKEN" and vca_status == "failed":
288 db_dict["configurationStatus"][vca_index] = "BROKEN"
289 except Exception as e:
290 # do not update configurationStatus
291 self.logger.debug("Error updating vca_index (ignore): {}".format(e))
292
293 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
294 # if nsState = 'DEGRADED' check if all is OK
295 is_degraded = False
296 if current_ns_status in ("READY", "DEGRADED"):
297 error_description = ""
298 # check machines
299 if status_dict.get("machines"):
300 for machine_id in status_dict.get("machines"):
301 machine = status_dict.get("machines").get(machine_id)
302 # check machine agent-status
303 if machine.get("agent-status"):
304 s = machine.get("agent-status").get("status")
305 if s != "started":
306 is_degraded = True
307 error_description += (
308 "machine {} agent-status={} ; ".format(
309 machine_id, s
310 )
311 )
312 # check machine instance status
313 if machine.get("instance-status"):
314 s = machine.get("instance-status").get("status")
315 if s != "running":
316 is_degraded = True
317 error_description += (
318 "machine {} instance-status={} ; ".format(
319 machine_id, s
320 )
321 )
322 # check applications
323 if status_dict.get("applications"):
324 for app_id in status_dict.get("applications"):
325 app = status_dict.get("applications").get(app_id)
326 # check application status
327 if app.get("status"):
328 s = app.get("status").get("status")
329 if s != "active":
330 is_degraded = True
331 error_description += (
332 "application {} status={} ; ".format(app_id, s)
333 )
334
335 if error_description:
336 db_dict["errorDescription"] = error_description
337 if current_ns_status == "READY" and is_degraded:
338 db_dict["nsState"] = "DEGRADED"
339 if current_ns_status == "DEGRADED" and not is_degraded:
340 db_dict["nsState"] = "READY"
341
342 # write to database
343 self.update_db_2("nsrs", nsr_id, db_dict)
344
345 except (asyncio.CancelledError, asyncio.TimeoutError):
346 raise
347 except Exception as e:
348 self.logger.warn("Error updating NS state for ns={}: {}".format(nsr_id, e))
349
350 async def _on_update_k8s_db(
351 self, cluster_uuid, kdu_instance, filter=None, vca_id=None
352 ):
353 """
354 Updates the vca status in the NSR record
355 :param cluster_uuid: UUID of a k8s cluster
356 :param kdu_instance: The unique name of the KDU instance
357 :param filter: To get nsr_id
358 :return: none
359 """
360
361 # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
362 # .format(cluster_uuid, kdu_instance, filter))
363
364 try:
365 nsr_id = filter.get("_id")
366
367 # get vca status for NS
368 vca_status = await self.k8sclusterjuju.status_kdu(
369 cluster_uuid,
370 kdu_instance,
371 complete_status=True,
372 yaml_format=False,
373 vca_id=vca_id,
374 )
375 # vcaStatus
376 db_dict = dict()
377 db_dict["vcaStatus"] = {nsr_id: vca_status}
378
379 await self.k8sclusterjuju.update_vca_status(
380 db_dict["vcaStatus"],
381 kdu_instance,
382 vca_id=vca_id,
383 )
384
385 # write to database
386 self.update_db_2("nsrs", nsr_id, db_dict)
387
388 except (asyncio.CancelledError, asyncio.TimeoutError):
389 raise
390 except Exception as e:
391 self.logger.warn("Error updating NS state for ns={}: {}".format(nsr_id, e))
392
393 @staticmethod
394 def _parse_cloud_init(cloud_init_text, additional_params, vnfd_id, vdu_id):
395 try:
396 env = Environment(undefined=StrictUndefined)
397 template = env.from_string(cloud_init_text)
398 return template.render(additional_params or {})
399 except UndefinedError as e:
400 raise LcmException(
401 "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
402 "file, must be provided in the instantiation parameters inside the "
403 "'additionalParamsForVnf/Vdu' block".format(e, vnfd_id, vdu_id)
404 )
405 except (TemplateError, TemplateNotFound) as e:
406 raise LcmException(
407 "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
408 vnfd_id, vdu_id, e
409 )
410 )
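# Illustrative rendering example (hypothetical template and params):
#   cloud_init_text   = "#cloud-config\npassword: {{ password }}"
#   additional_params = {"password": "osm4u"}
#   -> "#cloud-config\npassword: osm4u"
# With StrictUndefined, any variable missing from additional_params raises UndefinedError,
# reported as an LcmException pointing at the 'additionalParamsForVnf/Vdu' block.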
411
412 def _get_vdu_cloud_init_content(self, vdu, vnfd):
413 cloud_init_content = cloud_init_file = None
414 try:
415 if vdu.get("cloud-init-file"):
416 base_folder = vnfd["_admin"]["storage"]
417 if base_folder["pkg-dir"]:
418 cloud_init_file = "{}/{}/cloud_init/{}".format(
419 base_folder["folder"],
420 base_folder["pkg-dir"],
421 vdu["cloud-init-file"],
422 )
423 else:
424 cloud_init_file = "{}/Scripts/cloud_init/{}".format(
425 base_folder["folder"],
426 vdu["cloud-init-file"],
427 )
428 with self.fs.file_open(cloud_init_file, "r") as ci_file:
429 cloud_init_content = ci_file.read()
430 elif vdu.get("cloud-init"):
431 cloud_init_content = vdu["cloud-init"]
432
433 return cloud_init_content
434 except FsException as e:
435 raise LcmException(
436 "Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".format(
437 vnfd["id"], vdu["id"], cloud_init_file, e
438 )
439 )
440
441 def _get_vdu_additional_params(self, db_vnfr, vdu_id):
442 vdur = next(
443 (vdur for vdur in db_vnfr.get("vdur") if vdu_id == vdur["vdu-id-ref"]),
444 {}
445 )
446 additional_params = vdur.get("additionalParams")
447 return parse_yaml_strings(additional_params)
448
449 def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
450 """
451 Creates a new vnfd descriptor for RO based on the input OSM IM vnfd
452 :param vnfd: input vnfd
453 :param new_id: overrides vnf id if provided
454 :param additionalParams: Instantiation params for VNFs provided
455 :param nsrId: Id of the NSR
456 :return: copy of vnfd
457 """
458 vnfd_RO = deepcopy(vnfd)
459 # remove keys not used by RO: configuration, monitoring, scaling and internal keys
460 vnfd_RO.pop("_id", None)
461 vnfd_RO.pop("_admin", None)
462 vnfd_RO.pop("monitoring-param", None)
463 vnfd_RO.pop("scaling-group-descriptor", None)
464 vnfd_RO.pop("kdu", None)
465 vnfd_RO.pop("k8s-cluster", None)
466 if new_id:
467 vnfd_RO["id"] = new_id
468
469 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
470 for vdu in get_iterable(vnfd_RO, "vdu"):
471 vdu.pop("cloud-init-file", None)
472 vdu.pop("cloud-init", None)
473 return vnfd_RO
474
475 @staticmethod
476 def ip_profile_2_RO(ip_profile):
477 RO_ip_profile = deepcopy(ip_profile)
478 if "dns-server" in RO_ip_profile:
479 if isinstance(RO_ip_profile["dns-server"], list):
480 RO_ip_profile["dns-address"] = []
481 for ds in RO_ip_profile.pop("dns-server"):
482 RO_ip_profile["dns-address"].append(ds["address"])
483 else:
484 RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
485 if RO_ip_profile.get("ip-version") == "ipv4":
486 RO_ip_profile["ip-version"] = "IPv4"
487 if RO_ip_profile.get("ip-version") == "ipv6":
488 RO_ip_profile["ip-version"] = "IPv6"
489 if "dhcp-params" in RO_ip_profile:
490 RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
491 return RO_ip_profile
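# Illustrative example of the IM -> RO mapping above (hypothetical values):
#   {"ip-version": "ipv4", "dns-server": [{"address": "8.8.8.8"}], "dhcp-params": {"enabled": True}}
#   -> {"ip-version": "IPv4", "dns-address": ["8.8.8.8"], "dhcp": {"enabled": True}}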
492
493 def _get_ro_vim_id_for_vim_account(self, vim_account):
494 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
495 if db_vim["_admin"]["operationalState"] != "ENABLED":
496 raise LcmException(
497 "VIM={} is not available. operationalState={}".format(
498 vim_account, db_vim["_admin"]["operationalState"]
499 )
500 )
501 RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
502 return RO_vim_id
503
504 def get_ro_wim_id_for_wim_account(self, wim_account):
505 if isinstance(wim_account, str):
506 db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
507 if db_wim["_admin"]["operationalState"] != "ENABLED":
508 raise LcmException(
509 "WIM={} is not available. operationalState={}".format(
510 wim_account, db_wim["_admin"]["operationalState"]
511 )
512 )
513 RO_wim_id = db_wim["_admin"]["deployed"]["RO-account"]
514 return RO_wim_id
515 else:
516 return wim_account
517
518 def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None, mark_delete=False):
519
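# Scales the VNFR record in the database. vdu_create/vdu_delete are dicts of
# {vdu-id-ref: number of instances to add/remove}. New vdur entries are cloned from an
# existing vdur or, when scaling out from 0 instances, from the stored "vdur-template"
# (bug 1865). When scaling in to 0 instances, the last vdur is preserved as
# "vdur-template" so a later scale-out can rebuild it. With mark_delete=True the
# affected vdur entries are only marked as DELETING instead of being removed.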
520 db_vdu_push_list = []
521 template_vdur = []
522 db_update = {"_admin.modified": time()}
523 if vdu_create:
524 for vdu_id, vdu_count in vdu_create.items():
525 vdur = next(
526 (
527 vdur
528 for vdur in reversed(db_vnfr["vdur"])
529 if vdur["vdu-id-ref"] == vdu_id
530 ),
531 None,
532 )
533 if not vdur:
534 # Read the template saved in the db:
535 self.logger.debug(f"No vdur in the database. Using the vdur-template to scale")
536 vdur_template = db_vnfr.get("vdur-template")
537 if not vdur_template:
538 raise LcmException(
539 "Error scaling OUT VNFR for {}. No vnfr or template exists".format(
540 vdu_id
541 )
542 )
543 vdur = vdur_template[0]
544 # Delete the template from the database after using it
545 self.db.set_one("vnfrs",
546 {"_id": db_vnfr["_id"]},
547 None,
548 pull={"vdur-template": {"_id": vdur['_id']}}
549 )
550 for count in range(vdu_count):
551 vdur_copy = deepcopy(vdur)
552 vdur_copy["status"] = "BUILD"
553 vdur_copy["status-detailed"] = None
554 vdur_copy["ip-address"] = None
555 vdur_copy["_id"] = str(uuid4())
556 vdur_copy["count-index"] += count + 1
557 vdur_copy["id"] = "{}-{}".format(
558 vdur_copy["vdu-id-ref"], vdur_copy["count-index"]
559 )
560 vdur_copy.pop("vim_info", None)
561 for iface in vdur_copy["interfaces"]:
562 if iface.get("fixed-ip"):
563 iface["ip-address"] = self.increment_ip_mac(
564 iface["ip-address"], count + 1
565 )
566 else:
567 iface.pop("ip-address", None)
568 if iface.get("fixed-mac"):
569 iface["mac-address"] = self.increment_ip_mac(
570 iface["mac-address"], count + 1
571 )
572 else:
573 iface.pop("mac-address", None)
574 if db_vnfr["vdur"]:
575 iface.pop(
576 "mgmt_vnf", None
577 ) # only the first vdu can be management of the vnf
578 db_vdu_push_list.append(vdur_copy)
579 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
580 if vdu_delete:
581 if len(db_vnfr["vdur"]) == 1:
582 # The scale will move to 0 instances
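# Keep the last vdur as a "vdur-template" so that a later scale-out from
# 0 instances still has a vdur to clone from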
583 self.logger.debug(f"Scaling to 0 !, creating the template with the last vdur")
584 template_vdur = [db_vnfr["vdur"][0]]
585 for vdu_id, vdu_count in vdu_delete.items():
586 if mark_delete:
587 indexes_to_delete = [
588 iv[0]
589 for iv in enumerate(db_vnfr["vdur"])
590 if iv[1]["vdu-id-ref"] == vdu_id
591 ]
592 db_update.update(
593 {
594 "vdur.{}.status".format(i): "DELETING"
595 for i in indexes_to_delete[-vdu_count:]
596 }
597 )
598 else:
599 # they must be deleted one by one because common.db does not allow otherwise
600 vdus_to_delete = [
601 v
602 for v in reversed(db_vnfr["vdur"])
603 if v["vdu-id-ref"] == vdu_id
604 ]
605 for vdu in vdus_to_delete[:vdu_count]:
606 self.db.set_one(
607 "vnfrs",
608 {"_id": db_vnfr["_id"]},
609 None,
610 pull={"vdur": {"_id": vdu["_id"]}},
611 )
612 db_push = {}
613 if db_vdu_push_list:
614 db_push["vdur"] = db_vdu_push_list
615 if template_vdur:
616 db_push["vdur-template"] = template_vdur
617 if not db_push:
618 db_push = None
619 db_vnfr["vdur-template"] = template_vdur
620 self.db.set_one("vnfrs", {"_id": db_vnfr["_id"]}, db_update, push_list=db_push)
621 # modify passed dictionary db_vnfr
622 db_vnfr_ = self.db.get_one("vnfrs", {"_id": db_vnfr["_id"]})
623 db_vnfr["vdur"] = db_vnfr_["vdur"]
624
625 def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
626 """
627 Updates database nsr with the RO info for the created vld
628 :param ns_update_nsr: dictionary to be filled with the updated info
629 :param db_nsr: content of db_nsr. This is also modified
630 :param nsr_desc_RO: nsr descriptor from RO
631 :return: Nothing, LcmException is raised on errors
632 """
633
634 for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
635 for net_RO in get_iterable(nsr_desc_RO, "nets"):
636 if vld["id"] != net_RO.get("ns_net_osm_id"):
637 continue
638 vld["vim-id"] = net_RO.get("vim_net_id")
639 vld["name"] = net_RO.get("vim_name")
640 vld["status"] = net_RO.get("status")
641 vld["status-detailed"] = net_RO.get("error_msg")
642 ns_update_nsr["vld.{}".format(vld_index)] = vld
643 break
644 else:
645 raise LcmException(
646 "ns_update_nsr: Not found vld={} at RO info".format(vld["id"])
647 )
648
649 def set_vnfr_at_error(self, db_vnfrs, error_text):
650 try:
651 for db_vnfr in db_vnfrs.values():
652 vnfr_update = {"status": "ERROR"}
653 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
654 if "status" not in vdur:
655 vdur["status"] = "ERROR"
656 vnfr_update["vdur.{}.status".format(vdu_index)] = "ERROR"
657 if error_text:
658 vdur["status-detailed"] = str(error_text)
659 vnfr_update[
660 "vdur.{}.status-detailed".format(vdu_index)
661 ] = "ERROR"
662 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
663 except DbException as e:
664 self.logger.error("Cannot update vnf. {}".format(e))
665
666 def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
667 """
668 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
669 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
670 :param nsr_desc_RO: nsr descriptor from RO
671 :return: Nothing, LcmException is raised on errors
672 """
673 for vnf_index, db_vnfr in db_vnfrs.items():
674 for vnf_RO in nsr_desc_RO["vnfs"]:
675 if vnf_RO["member_vnf_index"] != vnf_index:
676 continue
677 vnfr_update = {}
678 if vnf_RO.get("ip_address"):
679 db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO[
680 "ip_address"
681 ].split(";")[0]
682 elif not db_vnfr.get("ip-address"):
683 if db_vnfr.get("vdur"): # if not VDUs, there is not ip_address
684 raise LcmExceptionNoMgmtIP(
685 "ns member_vnf_index '{}' has no IP address".format(
686 vnf_index
687 )
688 )
689
690 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
691 vdur_RO_count_index = 0
692 if vdur.get("pdu-type"):
693 continue
694 for vdur_RO in get_iterable(vnf_RO, "vms"):
695 if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
696 continue
697 if vdur["count-index"] != vdur_RO_count_index:
698 vdur_RO_count_index += 1
699 continue
700 vdur["vim-id"] = vdur_RO.get("vim_vm_id")
701 if vdur_RO.get("ip_address"):
702 vdur["ip-address"] = vdur_RO["ip_address"].split(";")[0]
703 else:
704 vdur["ip-address"] = None
705 vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
706 vdur["name"] = vdur_RO.get("vim_name")
707 vdur["status"] = vdur_RO.get("status")
708 vdur["status-detailed"] = vdur_RO.get("error_msg")
709 for ifacer in get_iterable(vdur, "interfaces"):
710 for interface_RO in get_iterable(vdur_RO, "interfaces"):
711 if ifacer["name"] == interface_RO.get("internal_name"):
712 ifacer["ip-address"] = interface_RO.get(
713 "ip_address"
714 )
715 ifacer["mac-address"] = interface_RO.get(
716 "mac_address"
717 )
718 break
719 else:
720 raise LcmException(
721 "ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
722 "from VIM info".format(
723 vnf_index, vdur["vdu-id-ref"], ifacer["name"]
724 )
725 )
726 vnfr_update["vdur.{}".format(vdu_index)] = vdur
727 break
728 else:
729 raise LcmException(
730 "ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
731 "VIM info".format(
732 vnf_index, vdur["vdu-id-ref"], vdur["count-index"]
733 )
734 )
735
736 for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
737 for net_RO in get_iterable(nsr_desc_RO, "nets"):
738 if vld["id"] != net_RO.get("vnf_net_osm_id"):
739 continue
740 vld["vim-id"] = net_RO.get("vim_net_id")
741 vld["name"] = net_RO.get("vim_name")
742 vld["status"] = net_RO.get("status")
743 vld["status-detailed"] = net_RO.get("error_msg")
744 vnfr_update["vld.{}".format(vld_index)] = vld
745 break
746 else:
747 raise LcmException(
748 "ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
749 vnf_index, vld["id"]
750 )
751 )
752
753 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
754 break
755
756 else:
757 raise LcmException(
758 "ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(
759 vnf_index
760 )
761 )
762
763 def _get_ns_config_info(self, nsr_id):
764 """
765 Generates a mapping between vnf,vdu elements and the N2VC id
766 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
767 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
768 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
769 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
770 """
771 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
772 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
773 mapping = {}
774 ns_config_info = {"osm-config-mapping": mapping}
775 for vca in vca_deployed_list:
776 if not vca["member-vnf-index"]:
777 continue
778 if not vca["vdu_id"]:
779 mapping[vca["member-vnf-index"]] = vca["application"]
780 else:
781 mapping[
782 "{}.{}.{}".format(
783 vca["member-vnf-index"], vca["vdu_id"], vca["vdu_count_index"]
784 )
785 ] = vca["application"]
786 return ns_config_info
787
788 async def _instantiate_ng_ro(
789 self,
790 logging_text,
791 nsr_id,
792 nsd,
793 db_nsr,
794 db_nslcmop,
795 db_vnfrs,
796 db_vnfds,
797 n2vc_key_list,
798 stage,
799 start_deploy,
800 timeout_ns_deploy,
801 ):
802
803 db_vims = {}
804
805 def get_vim_account(vim_account_id):
806 nonlocal db_vims
807 if vim_account_id in db_vims:
808 return db_vims[vim_account_id]
809 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account_id})
810 db_vims[vim_account_id] = db_vim
811 return db_vim
812
813 # modify target_vld info with instantiation parameters
814 def parse_vld_instantiation_params(
815 target_vim, target_vld, vld_params, target_sdn
816 ):
817 if vld_params.get("ip-profile"):
818 target_vld["vim_info"][target_vim]["ip_profile"] = vld_params[
819 "ip-profile"
820 ]
821 if vld_params.get("provider-network"):
822 target_vld["vim_info"][target_vim]["provider_network"] = vld_params[
823 "provider-network"
824 ]
825 if "sdn-ports" in vld_params["provider-network"] and target_sdn:
826 target_vld["vim_info"][target_sdn]["sdn-ports"] = vld_params[
827 "provider-network"
828 ]["sdn-ports"]
829 if vld_params.get("wimAccountId"):
830 target_wim = "wim:{}".format(vld_params["wimAccountId"])
831 target_vld["vim_info"][target_wim] = {}
832 for param in ("vim-network-name", "vim-network-id"):
833 if vld_params.get(param):
834 if isinstance(vld_params[param], dict):
835 for vim, vim_net in vld_params[param].items():
836 other_target_vim = "vim:" + vim
837 populate_dict(
838 target_vld["vim_info"],
839 (other_target_vim, param.replace("-", "_")),
840 vim_net,
841 )
842 else: # isinstance str
843 target_vld["vim_info"][target_vim][
844 param.replace("-", "_")
845 ] = vld_params[param]
846 if vld_params.get("common_id"):
847 target_vld["common_id"] = vld_params.get("common_id")
848
849 # modify target["ns"]["vld"] with instantiation parameters to override vnf vim-account
850 def update_ns_vld_target(target, ns_params):
851 for vnf_params in ns_params.get("vnf", ()):
852 if vnf_params.get("vimAccountId"):
853 target_vnf = next(
854 (
855 vnfr
856 for vnfr in db_vnfrs.values()
857 if vnf_params["member-vnf-index"]
858 == vnfr["member-vnf-index-ref"]
859 ),
860 None,
861 )
862 vdur = next((vdur for vdur in target_vnf.get("vdur", ())), None)
863 for a_index, a_vld in enumerate(target["ns"]["vld"]):
864 target_vld = find_in_list(
865 get_iterable(vdur, "interfaces"),
866 lambda iface: iface.get("ns-vld-id") == a_vld["name"],
867 )
868 if target_vld:
869 if vnf_params.get("vimAccountId") not in a_vld.get(
870 "vim_info", {}
871 ):
872 target["ns"]["vld"][a_index].get("vim_info").update(
873 {
874 "vim:{}".format(vnf_params["vimAccountId"]): {
875 "vim_network_name": ""
876 }
877 }
878 )
879
880 nslcmop_id = db_nslcmop["_id"]
881 target = {
882 "name": db_nsr["name"],
883 "ns": {"vld": []},
884 "vnf": [],
885 "image": deepcopy(db_nsr["image"]),
886 "flavor": deepcopy(db_nsr["flavor"]),
887 "action_id": nslcmop_id,
888 "cloud_init_content": {},
889 }
890 for image in target["image"]:
891 image["vim_info"] = {}
892 for flavor in target["flavor"]:
893 flavor["vim_info"] = {}
894
895 if db_nslcmop.get("lcmOperationType") != "instantiate":
896 # get parameters of instantiation:
897 db_nslcmop_instantiate = self.db.get_list(
898 "nslcmops",
899 {
900 "nsInstanceId": db_nslcmop["nsInstanceId"],
901 "lcmOperationType": "instantiate",
902 },
903 )[-1]
904 ns_params = db_nslcmop_instantiate.get("operationParams")
905 else:
906 ns_params = db_nslcmop.get("operationParams")
907 ssh_keys_instantiation = ns_params.get("ssh_keys") or []
908 ssh_keys_all = ssh_keys_instantiation + (n2vc_key_list or [])
909
910 cp2target = {}
911 for vld_index, vld in enumerate(db_nsr.get("vld")):
912 target_vim = "vim:{}".format(ns_params["vimAccountId"])
913 target_vld = {
914 "id": vld["id"],
915 "name": vld["name"],
916 "mgmt-network": vld.get("mgmt-network", False),
917 "type": vld.get("type"),
918 "vim_info": {
919 target_vim: {
920 "vim_network_name": vld.get("vim-network-name"),
921 "vim_account_id": ns_params["vimAccountId"],
922 }
923 },
924 }
925 # check if this network needs SDN assist
926 if vld.get("pci-interfaces"):
927 db_vim = get_vim_account(ns_params["vimAccountId"])
928 sdnc_id = db_vim["config"].get("sdn-controller")
929 if sdnc_id:
930 sdn_vld = "nsrs:{}:vld.{}".format(nsr_id, vld["id"])
931 target_sdn = "sdn:{}".format(sdnc_id)
932 target_vld["vim_info"][target_sdn] = {
933 "sdn": True,
934 "target_vim": target_vim,
935 "vlds": [sdn_vld],
936 "type": vld.get("type"),
937 }
938
939 nsd_vnf_profiles = get_vnf_profiles(nsd)
940 for nsd_vnf_profile in nsd_vnf_profiles:
941 for cp in nsd_vnf_profile["virtual-link-connectivity"]:
942 if cp["virtual-link-profile-id"] == vld["id"]:
943 cp2target[
944 "member_vnf:{}.{}".format(
945 cp["constituent-cpd-id"][0][
946 "constituent-base-element-id"
947 ],
948 cp["constituent-cpd-id"][0]["constituent-cpd-id"],
949 )
950 ] = "nsrs:{}:vld.{}".format(nsr_id, vld_index)
951
952 # check at nsd descriptor, if there is an ip-profile
953 vld_params = {}
954 nsd_vlp = find_in_list(
955 get_virtual_link_profiles(nsd),
956 lambda a_link_profile: a_link_profile["virtual-link-desc-id"]
957 == vld["id"],
958 )
959 if (
960 nsd_vlp
961 and nsd_vlp.get("virtual-link-protocol-data")
962 and nsd_vlp["virtual-link-protocol-data"].get("l3-protocol-data")
963 ):
964 ip_profile_source_data = nsd_vlp["virtual-link-protocol-data"][
965 "l3-protocol-data"
966 ]
967 ip_profile_dest_data = {}
968 if "ip-version" in ip_profile_source_data:
969 ip_profile_dest_data["ip-version"] = ip_profile_source_data[
970 "ip-version"
971 ]
972 if "cidr" in ip_profile_source_data:
973 ip_profile_dest_data["subnet-address"] = ip_profile_source_data[
974 "cidr"
975 ]
976 if "gateway-ip" in ip_profile_source_data:
977 ip_profile_dest_data["gateway-address"] = ip_profile_source_data[
978 "gateway-ip"
979 ]
980 if "dhcp-enabled" in ip_profile_source_data:
981 ip_profile_dest_data["dhcp-params"] = {
982 "enabled": ip_profile_source_data["dhcp-enabled"]
983 }
984 vld_params["ip-profile"] = ip_profile_dest_data
985
986 # update vld_params with instantiation params
987 vld_instantiation_params = find_in_list(
988 get_iterable(ns_params, "vld"),
989 lambda a_vld: a_vld["name"] in (vld["name"], vld["id"]),
990 )
991 if vld_instantiation_params:
992 vld_params.update(vld_instantiation_params)
993 parse_vld_instantiation_params(target_vim, target_vld, vld_params, None)
994 target["ns"]["vld"].append(target_vld)
995 # Update the target ns_vld if vnf vim_account is overriden by instantiation params
996 update_ns_vld_target(target, ns_params)
997
998 for vnfr in db_vnfrs.values():
999 vnfd = find_in_list(
1000 db_vnfds, lambda db_vnf: db_vnf["id"] == vnfr["vnfd-ref"]
1001 )
1002 vnf_params = find_in_list(
1003 get_iterable(ns_params, "vnf"),
1004 lambda a_vnf: a_vnf["member-vnf-index"] == vnfr["member-vnf-index-ref"],
1005 )
1006 target_vnf = deepcopy(vnfr)
1007 target_vim = "vim:{}".format(vnfr["vim-account-id"])
1008 for vld in target_vnf.get("vld", ()):
1009 # check if connected to a ns.vld, to fill target
1010 vnf_cp = find_in_list(
1011 vnfd.get("int-virtual-link-desc", ()),
1012 lambda cpd: cpd.get("id") == vld["id"],
1013 )
1014 if vnf_cp:
1015 ns_cp = "member_vnf:{}.{}".format(
1016 vnfr["member-vnf-index-ref"], vnf_cp["id"]
1017 )
1018 if cp2target.get(ns_cp):
1019 vld["target"] = cp2target[ns_cp]
1020
1021 vld["vim_info"] = {
1022 target_vim: {"vim_network_name": vld.get("vim-network-name")}
1023 }
1024 # check if this network needs SDN assist
1025 target_sdn = None
1026 if vld.get("pci-interfaces"):
1027 db_vim = get_vim_account(vnfr["vim-account-id"])
1028 sdnc_id = db_vim["config"].get("sdn-controller")
1029 if sdnc_id:
1030 sdn_vld = "vnfrs:{}:vld.{}".format(target_vnf["_id"], vld["id"])
1031 target_sdn = "sdn:{}".format(sdnc_id)
1032 vld["vim_info"][target_sdn] = {
1033 "sdn": True,
1034 "target_vim": target_vim,
1035 "vlds": [sdn_vld],
1036 "type": vld.get("type"),
1037 }
1038
1039 # check at vnfd descriptor, if there is an ip-profile
1040 vld_params = {}
1041 vnfd_vlp = find_in_list(
1042 get_virtual_link_profiles(vnfd),
1043 lambda a_link_profile: a_link_profile["id"] == vld["id"],
1044 )
1045 if (
1046 vnfd_vlp
1047 and vnfd_vlp.get("virtual-link-protocol-data")
1048 and vnfd_vlp["virtual-link-protocol-data"].get("l3-protocol-data")
1049 ):
1050 ip_profile_source_data = vnfd_vlp["virtual-link-protocol-data"][
1051 "l3-protocol-data"
1052 ]
1053 ip_profile_dest_data = {}
1054 if "ip-version" in ip_profile_source_data:
1055 ip_profile_dest_data["ip-version"] = ip_profile_source_data[
1056 "ip-version"
1057 ]
1058 if "cidr" in ip_profile_source_data:
1059 ip_profile_dest_data["subnet-address"] = ip_profile_source_data[
1060 "cidr"
1061 ]
1062 if "gateway-ip" in ip_profile_source_data:
1063 ip_profile_dest_data[
1064 "gateway-address"
1065 ] = ip_profile_source_data["gateway-ip"]
1066 if "dhcp-enabled" in ip_profile_source_data:
1067 ip_profile_dest_data["dhcp-params"] = {
1068 "enabled": ip_profile_source_data["dhcp-enabled"]
1069 }
1070
1071 vld_params["ip-profile"] = ip_profile_dest_data
1072 # update vld_params with instantiation params
1073 if vnf_params:
1074 vld_instantiation_params = find_in_list(
1075 get_iterable(vnf_params, "internal-vld"),
1076 lambda i_vld: i_vld["name"] == vld["id"],
1077 )
1078 if vld_instantiation_params:
1079 vld_params.update(vld_instantiation_params)
1080 parse_vld_instantiation_params(target_vim, vld, vld_params, target_sdn)
1081
1082 vdur_list = []
1083 for vdur in target_vnf.get("vdur", ()):
1084 if vdur.get("status") == "DELETING" or vdur.get("pdu-type"):
1085 continue # This vdu must not be created
1086 vdur["vim_info"] = {"vim_account_id": vnfr["vim-account-id"]}
1087
1088 self.logger.debug("NS > ssh_keys > {}".format(ssh_keys_all))
1089
1090 if ssh_keys_all:
1091 vdu_configuration = get_configuration(vnfd, vdur["vdu-id-ref"])
1092 vnf_configuration = get_configuration(vnfd, vnfd["id"])
1093 if (
1094 vdu_configuration
1095 and vdu_configuration.get("config-access")
1096 and vdu_configuration.get("config-access").get("ssh-access")
1097 ):
1098 vdur["ssh-keys"] = ssh_keys_all
1099 vdur["ssh-access-required"] = vdu_configuration[
1100 "config-access"
1101 ]["ssh-access"]["required"]
1102 elif (
1103 vnf_configuration
1104 and vnf_configuration.get("config-access")
1105 and vnf_configuration.get("config-access").get("ssh-access")
1106 and any(iface.get("mgmt-vnf") for iface in vdur["interfaces"])
1107 ):
1108 vdur["ssh-keys"] = ssh_keys_all
1109 vdur["ssh-access-required"] = vnf_configuration[
1110 "config-access"
1111 ]["ssh-access"]["required"]
1112 elif ssh_keys_instantiation and find_in_list(
1113 vdur["interfaces"], lambda iface: iface.get("mgmt-vnf")
1114 ):
1115 vdur["ssh-keys"] = ssh_keys_instantiation
1116
1117 self.logger.debug("NS > vdur > {}".format(vdur))
1118
1119 vdud = get_vdu(vnfd, vdur["vdu-id-ref"])
1120 # cloud-init
1121 if vdud.get("cloud-init-file"):
1122 vdur["cloud-init"] = "{}:file:{}".format(
1123 vnfd["_id"], vdud.get("cloud-init-file")
1124 )
1125 # read file and put content at target.cloud_init_content. Avoids ng_ro having to use the shared package system
1126 if vdur["cloud-init"] not in target["cloud_init_content"]:
1127 base_folder = vnfd["_admin"]["storage"]
1128 if base_folder["pkg-dir"]:
1129 cloud_init_file = "{}/{}/cloud_init/{}".format(
1130 base_folder["folder"],
1131 base_folder["pkg-dir"],
1132 vdud.get("cloud-init-file"),
1133 )
1134 else:
1135 cloud_init_file = "{}/Scripts/cloud_init/{}".format(
1136 base_folder["folder"],
1137 vdud.get("cloud-init-file"),
1138 )
1139 with self.fs.file_open(cloud_init_file, "r") as ci_file:
1140 target["cloud_init_content"][
1141 vdur["cloud-init"]
1142 ] = ci_file.read()
1143 elif vdud.get("cloud-init"):
1144 vdur["cloud-init"] = "{}:vdu:{}".format(
1145 vnfd["_id"], get_vdu_index(vnfd, vdur["vdu-id-ref"])
1146 )
1147 # put content at target.cloud_init_content. Avoids ng_ro having to read the vnfd descriptor
1148 target["cloud_init_content"][vdur["cloud-init"]] = vdud[
1149 "cloud-init"
1150 ]
1151 vdur["additionalParams"] = vdur.get("additionalParams") or {}
1152 deploy_params_vdu = self._format_additional_params(
1153 vdur.get("additionalParams") or {}
1154 )
1155 deploy_params_vdu["OSM"] = get_osm_params(
1156 vnfr, vdur["vdu-id-ref"], vdur["count-index"]
1157 )
1158 vdur["additionalParams"] = deploy_params_vdu
1159
1160 # flavor
1161 ns_flavor = target["flavor"][int(vdur["ns-flavor-id"])]
1162 if target_vim not in ns_flavor["vim_info"]:
1163 ns_flavor["vim_info"][target_vim] = {}
1164
1165 # deal with images
1166 # in case alternative images are provided, check whether they should be applied
1167 # for this vim_type and, if so, use the alternative image instead
1168 ns_image_id = int(vdur["ns-image-id"])
1169 if vdur.get("alt-image-ids"):
1170 db_vim = get_vim_account(vnfr["vim-account-id"])
1171 vim_type = db_vim["vim_type"]
1172 for alt_image_id in vdur.get("alt-image-ids"):
1173 ns_alt_image = target["image"][int(alt_image_id)]
1174 if vim_type == ns_alt_image.get("vim-type"):
1175 # must use alternative image
1176 self.logger.debug(
1177 "use alternative image id: {}".format(alt_image_id)
1178 )
1179 ns_image_id = alt_image_id
1180 vdur["ns-image-id"] = ns_image_id
1181 break
1182 ns_image = target["image"][int(ns_image_id)]
1183 if target_vim not in ns_image["vim_info"]:
1184 ns_image["vim_info"][target_vim] = {}
1185
1186 vdur["vim_info"] = {target_vim: {}}
1187 # instantiation parameters
1188 # if vnf_params:
1189 # vdu_instantiation_params = next((v for v in get_iterable(vnf_params, "vdu") if v["id"] ==
1190 # vdud["id"]), None)
1191 vdur_list.append(vdur)
1192 target_vnf["vdur"] = vdur_list
1193 target["vnf"].append(target_vnf)
1194
1195 desc = await self.RO.deploy(nsr_id, target)
1196 self.logger.debug("RO return > {}".format(desc))
1197 action_id = desc["action_id"]
1198 await self._wait_ng_ro(
1199 nsr_id, action_id, nslcmop_id, start_deploy, timeout_ns_deploy, stage
1200 )
1201
1202 # Updating NSR
1203 db_nsr_update = {
1204 "_admin.deployed.RO.operational-status": "running",
1205 "detailed-status": " ".join(stage),
1206 }
1207 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1208 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1209 self._write_op_status(nslcmop_id, stage)
1210 self.logger.debug(
1211 logging_text + "ns deployed at RO. RO_id={}".format(action_id)
1212 )
1213 return
1214
1215 async def _wait_ng_ro(
1216 self,
1217 nsr_id,
1218 action_id,
1219 nslcmop_id=None,
1220 start_time=None,
1221 timeout=600,
1222 stage=None,
1223 ):
1224 detailed_status_old = None
1225 db_nsr_update = {}
1226 start_time = start_time or time()
1227 while time() <= start_time + timeout:
1228 desc_status = await self.RO.status(nsr_id, action_id)
1229 self.logger.debug("Wait NG RO > {}".format(desc_status))
1230 if desc_status["status"] == "FAILED":
1231 raise NgRoException(desc_status["details"])
1232 elif desc_status["status"] == "BUILD":
1233 if stage:
1234 stage[2] = "VIM: ({})".format(desc_status["details"])
1235 elif desc_status["status"] == "DONE":
1236 if stage:
1237 stage[2] = "Deployed at VIM"
1238 break
1239 else:
1240 assert False, "ROclient.check_ns_status returns unknown {}".format(
1241 desc_status["status"]
1242 )
1243 if stage and nslcmop_id and stage[2] != detailed_status_old:
1244 detailed_status_old = stage[2]
1245 db_nsr_update["detailed-status"] = " ".join(stage)
1246 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1247 self._write_op_status(nslcmop_id, stage)
1248 await asyncio.sleep(15, loop=self.loop)
1249 else: # timeout_ns_deploy
1250 raise NgRoException("Timeout waiting ns to deploy")
1251
1252 async def _terminate_ng_ro(
1253 self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
1254 ):
1255 db_nsr_update = {}
1256 failed_detail = []
1257 action_id = None
1258 start_deploy = time()
1259 try:
1260 target = {
1261 "ns": {"vld": []},
1262 "vnf": [],
1263 "image": [],
1264 "flavor": [],
1265 "action_id": nslcmop_id,
1266 }
1267 desc = await self.RO.deploy(nsr_id, target)
1268 action_id = desc["action_id"]
1269 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = action_id
1270 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETING"
1271 self.logger.debug(
1272 logging_text
1273 + "ns terminate action at RO. action_id={}".format(action_id)
1274 )
1275
1276 # wait until done
1277 delete_timeout = 20 * 60 # 20 minutes
1278 await self._wait_ng_ro(
1279 nsr_id, action_id, nslcmop_id, start_deploy, delete_timeout, stage
1280 )
1281
1282 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
1283 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
1284 # delete all nsr
1285 await self.RO.delete(nsr_id)
1286 except Exception as e:
1287 if isinstance(e, NgRoException) and e.http_code == 404: # not found
1288 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
1289 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
1290 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
1291 self.logger.debug(
1292 logging_text + "RO_action_id={} already deleted".format(action_id)
1293 )
1294 elif isinstance(e, NgRoException) and e.http_code == 409: # conflict
1295 failed_detail.append("delete conflict: {}".format(e))
1296 self.logger.debug(
1297 logging_text
1298 + "RO_action_id={} delete conflict: {}".format(action_id, e)
1299 )
1300 else:
1301 failed_detail.append("delete error: {}".format(e))
1302 self.logger.error(
1303 logging_text
1304 + "RO_action_id={} delete error: {}".format(action_id, e)
1305 )
1306
1307 if failed_detail:
1308 stage[2] = "Error deleting from VIM"
1309 else:
1310 stage[2] = "Deleted from VIM"
1311 db_nsr_update["detailed-status"] = " ".join(stage)
1312 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1313 self._write_op_status(nslcmop_id, stage)
1314
1315 if failed_detail:
1316 raise LcmException("; ".join(failed_detail))
1317 return
1318
1319 async def instantiate_RO(
1320 self,
1321 logging_text,
1322 nsr_id,
1323 nsd,
1324 db_nsr,
1325 db_nslcmop,
1326 db_vnfrs,
1327 db_vnfds,
1328 n2vc_key_list,
1329 stage,
1330 ):
1331 """
1332 Instantiate at RO
1333 :param logging_text: prefix text to use at logging
1334 :param nsr_id: nsr identity
1335 :param nsd: database content of ns descriptor
1336 :param db_nsr: database content of ns record
1337 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
1338 :param db_vnfrs:
1339 :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
1340 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
1341 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
1342 :return: None or exception
1343 """
1344 try:
1345 start_deploy = time()
1346 ns_params = db_nslcmop.get("operationParams")
1347 if ns_params and ns_params.get("timeout_ns_deploy"):
1348 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
1349 else:
1350 timeout_ns_deploy = self.timeout.get(
1351 "ns_deploy", self.timeout_ns_deploy
1352 )
1353
1354 # Check for and optionally request placement optimization. Database will be updated if placement activated
1355 stage[2] = "Waiting for Placement."
1356 if await self._do_placement(logging_text, db_nslcmop, db_vnfrs):
1357 # in case of placement, set ns_params["vimAccountId"] if not present at any vnfr
1358 for vnfr in db_vnfrs.values():
1359 if ns_params["vimAccountId"] == vnfr["vim-account-id"]:
1360 break
1361 else:
1362 ns_params["vimAccountId"] = vnfr["vim-account-id"]
1363
1364 return await self._instantiate_ng_ro(
1365 logging_text,
1366 nsr_id,
1367 nsd,
1368 db_nsr,
1369 db_nslcmop,
1370 db_vnfrs,
1371 db_vnfds,
1372 n2vc_key_list,
1373 stage,
1374 start_deploy,
1375 timeout_ns_deploy,
1376 )
1377 except Exception as e:
1378 stage[2] = "ERROR deploying at VIM"
1379 self.set_vnfr_at_error(db_vnfrs, str(e))
1380 self.logger.error(
1381 "Error deploying at VIM {}".format(e),
1382 exc_info=not isinstance(
1383 e,
1384 (
1385 ROclient.ROClientException,
1386 LcmException,
1387 DbException,
1388 NgRoException,
1389 ),
1390 ),
1391 )
1392 raise
1393
1394 async def wait_kdu_up(self, logging_text, nsr_id, vnfr_id, kdu_name):
1395 """
1396 Wait for kdu to be up, get ip address
1397 :param logging_text: prefix use for logging
1398 :param nsr_id:
1399 :param vnfr_id:
1400 :param kdu_name:
1401 :return: IP address
1402 """
1403
1404 # self.logger.debug(logging_text + "Starting wait_kdu_up")
1405 nb_tries = 0
1406
1407 while nb_tries < 360:
1408 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
1409 kdur = next(
1410 (
1411 x
1412 for x in get_iterable(db_vnfr, "kdur")
1413 if x.get("kdu-name") == kdu_name
1414 ),
1415 None,
1416 )
1417 if not kdur:
1418 raise LcmException(
1419 "Not found vnfr_id={}, kdu_name={}".format(vnfr_id, kdu_name)
1420 )
1421 if kdur.get("status"):
1422 if kdur["status"] in ("READY", "ENABLED"):
1423 return kdur.get("ip-address")
1424 else:
1425 raise LcmException(
1426 "target KDU={} is in error state".format(kdu_name)
1427 )
1428
1429 await asyncio.sleep(10, loop=self.loop)
1430 nb_tries += 1
1431 raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name))
1432
1433 async def wait_vm_up_insert_key_ro(
1434 self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None
1435 ):
1436 """
1437 Wait for ip address at RO, and optionally insert a public key into the virtual machine
1438 :param logging_text: prefix use for logging
1439 :param nsr_id:
1440 :param vnfr_id:
1441 :param vdu_id:
1442 :param vdu_index:
1443 :param pub_key: public ssh key to inject, None to skip
1444 :param user: user to apply the public ssh key
1445 :return: IP address
1446 """
1447
1448 self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
1449 ro_nsr_id = None
1450 ip_address = None
1451 nb_tries = 0
1452 target_vdu_id = None
1453 ro_retries = 0
1454
1455 while True:
1456
1457 ro_retries += 1
1458 if ro_retries >= 360: # 1 hour
1459 raise LcmException(
1460 "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id)
1461 )
1462
1463 await asyncio.sleep(10, loop=self.loop)
1464
1465 # get ip address
1466 if not target_vdu_id:
1467 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
1468
1469 if not vdu_id: # for the VNF case
1470 if db_vnfr.get("status") == "ERROR":
1471 raise LcmException(
1472 "Cannot inject ssh-key because target VNF is in error state"
1473 )
1474 ip_address = db_vnfr.get("ip-address")
1475 if not ip_address:
1476 continue
1477 vdur = next(
1478 (
1479 x
1480 for x in get_iterable(db_vnfr, "vdur")
1481 if x.get("ip-address") == ip_address
1482 ),
1483 None,
1484 )
1485 else: # VDU case
1486 vdur = next(
1487 (
1488 x
1489 for x in get_iterable(db_vnfr, "vdur")
1490 if x.get("vdu-id-ref") == vdu_id
1491 and x.get("count-index") == vdu_index
1492 ),
1493 None,
1494 )
1495
1496 if (
1497 not vdur and len(db_vnfr.get("vdur", ())) == 1
1498 ): # If only one, this should be the target vdu
1499 vdur = db_vnfr["vdur"][0]
1500 if not vdur:
1501 raise LcmException(
1502 "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
1503 vnfr_id, vdu_id, vdu_index
1504 )
1505 )
1506 # New generation RO stores information at "vim_info"
1507 ng_ro_status = None
1508 target_vim = None
1509 if vdur.get("vim_info"):
1510 target_vim = next(
1511 t for t in vdur["vim_info"]
1512 ) # there should be only one key
1513 ng_ro_status = vdur["vim_info"][target_vim].get("vim_status")
1514 if (
1515 vdur.get("pdu-type")
1516 or vdur.get("status") == "ACTIVE"
1517 or ng_ro_status == "ACTIVE"
1518 ):
1519 ip_address = vdur.get("ip-address")
1520 if not ip_address:
1521 continue
1522 target_vdu_id = vdur["vdu-id-ref"]
1523 elif vdur.get("status") == "ERROR" or ng_ro_status == "ERROR":
1524 raise LcmException(
1525 "Cannot inject ssh-key because target VM is in error state"
1526 )
1527
1528 if not target_vdu_id:
1529 continue
1530
1531 # inject public key into machine
1532 if pub_key and user:
1533 self.logger.debug(logging_text + "Inserting RO key")
1534 self.logger.debug("SSH > PubKey > {}".format(pub_key))
1535 if vdur.get("pdu-type"):
1536 self.logger.error(logging_text + "Cannot inject ssh-key into a PDU")
1537 return ip_address
1538 try:
1539 ro_vm_id = "{}-{}".format(
1540 db_vnfr["member-vnf-index-ref"], target_vdu_id
1541 ) # TODO add vdu_index
1542 if self.ng_ro:
1543 target = {
1544 "action": {
1545 "action": "inject_ssh_key",
1546 "key": pub_key,
1547 "user": user,
1548 },
1549 "vnf": [{"_id": vnfr_id, "vdur": [{"id": vdur["id"]}]}],
1550 }
1551 desc = await self.RO.deploy(nsr_id, target)
1552 action_id = desc["action_id"]
1553 await self._wait_ng_ro(nsr_id, action_id, timeout=600)
1554 break
1555 else:
1556 # wait until NS is deployed at RO
1557 if not ro_nsr_id:
1558 db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
1559 ro_nsr_id = deep_get(
1560 db_nsrs, ("_admin", "deployed", "RO", "nsr_id")
1561 )
1562 if not ro_nsr_id:
1563 continue
1564 result_dict = await self.RO.create_action(
1565 item="ns",
1566 item_id_name=ro_nsr_id,
1567 descriptor={
1568 "add_public_key": pub_key,
1569 "vms": [ro_vm_id],
1570 "user": user,
1571 },
1572 )
1573 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
1574 if not result_dict or not isinstance(result_dict, dict):
1575 raise LcmException(
1576 "Unknown response from RO when injecting key"
1577 )
1578 for result in result_dict.values():
1579 if result.get("vim_result") == 200:
1580 break
1581 else:
1582 raise ROclient.ROClientException(
1583 "error injecting key: {}".format(
1584 result.get("description")
1585 )
1586 )
1587 break
1588 except NgRoException as e:
1589 raise LcmException(
1590 "Reaching max tries injecting key. Error: {}".format(e)
1591 )
1592 except ROclient.ROClientException as e:
1593 if not nb_tries:
1594 self.logger.debug(
1595 logging_text
1596 + "error injecting key: {}. Retrying until {} seconds".format(
1597 e, 20 * 10
1598 )
1599 )
1600 nb_tries += 1
1601 if nb_tries >= 20:
1602 raise LcmException(
1603 "Reaching max tries injecting key. Error: {}".format(e)
1604 )
1605 else:
1606 break
1607
1608 return ip_address
1609
1610 async def _wait_dependent_n2vc(self, nsr_id, vca_deployed_list, vca_index):
1611 """
1612 Wait until dependent VCA deployments have finished. The NS waits for VNFs and VDUs; VNFs wait for VDUs
1613 """
1614 my_vca = vca_deployed_list[vca_index]
1615 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
1616 # vdu or kdu: no dependencies
1617 return
1618 timeout = 300
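# each pass of the loop below sleeps 10 seconds and decrements timeout by 1,
# so dependent VCAs are polled for up to ~300 cycles (about 50 minutes)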
1619 while timeout >= 0:
1620 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1621 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1622 configuration_status_list = db_nsr["configurationStatus"]
1623 for index, vca_deployed in enumerate(configuration_status_list):
1624 if index == vca_index:
1625 # myself
1626 continue
1627 if not my_vca.get("member-vnf-index") or (
1628 vca_deployed.get("member-vnf-index")
1629 == my_vca.get("member-vnf-index")
1630 ):
1631 internal_status = configuration_status_list[index].get("status")
1632 if internal_status == "READY":
1633 continue
1634 elif internal_status == "BROKEN":
1635 raise LcmException(
1636 "Configuration aborted because dependent charm/s has failed"
1637 )
1638 else:
1639 break
1640 else:
1641 # no dependencies, return
1642 return
1643 await asyncio.sleep(10)
1644 timeout -= 1
1645
1646 raise LcmException("Configuration aborted because dependent charm/s timeout")
1647
1648 def get_vca_id(self, db_vnfr: dict, db_nsr: dict):
1649 vca_id = None
1650 if db_vnfr:
1651 vca_id = deep_get(db_vnfr, ("vca-id",))
1652 elif db_nsr:
1653 vim_account_id = deep_get(db_nsr, ("instantiate_params", "vimAccountId"))
1654 vca_id = VimAccountDB.get_vim_account_with_id(vim_account_id).get("vca")
1655 return vca_id
1656
1657 async def instantiate_N2VC(
1658 self,
1659 logging_text,
1660 vca_index,
1661 nsi_id,
1662 db_nsr,
1663 db_vnfr,
1664 vdu_id,
1665 kdu_name,
1666 vdu_index,
1667 config_descriptor,
1668 deploy_params,
1669 base_folder,
1670 nslcmop_id,
1671 stage,
1672 vca_type,
1673 vca_name,
1674 ee_config_descriptor,
1675 ):
1676 nsr_id = db_nsr["_id"]
1677 db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
1678 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1679 vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
1680 osm_config = {"osm": {"ns_id": db_nsr["_id"]}}
1681 db_dict = {
1682 "collection": "nsrs",
1683 "filter": {"_id": nsr_id},
1684 "path": db_update_entry,
1685 }
1686 step = ""
1687 try:
1688
1689 element_type = "NS"
1690 element_under_configuration = nsr_id
1691
1692 vnfr_id = None
1693 if db_vnfr:
1694 vnfr_id = db_vnfr["_id"]
1695 osm_config["osm"]["vnf_id"] = vnfr_id
1696
1697 namespace = "{nsi}.{ns}".format(nsi=nsi_id if nsi_id else "", ns=nsr_id)
1698
1699 if vca_type == "native_charm":
1700 index_number = 0
1701 else:
1702 index_number = vdu_index or 0
1703
1704 if vnfr_id:
1705 element_type = "VNF"
1706 element_under_configuration = vnfr_id
1707 namespace += ".{}-{}".format(vnfr_id, index_number)
1708 if vdu_id:
1709 namespace += ".{}-{}".format(vdu_id, index_number)
1710 element_type = "VDU"
1711 element_under_configuration = "{}-{}".format(vdu_id, index_number)
1712 osm_config["osm"]["vdu_id"] = vdu_id
1713 elif kdu_name:
1714 namespace += ".{}".format(kdu_name)
1715 element_type = "KDU"
1716 element_under_configuration = kdu_name
1717 osm_config["osm"]["kdu_name"] = kdu_name
1718
1719 # Get artifact path
1720 if base_folder["pkg-dir"]:
1721 artifact_path = "{}/{}/{}/{}".format(
1722 base_folder["folder"],
1723 base_folder["pkg-dir"],
1724 "charms"
1725 if vca_type in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1726 else "helm-charts",
1727 vca_name,
1728 )
1729 else:
1730 artifact_path = "{}/Scripts/{}/{}/".format(
1731 base_folder["folder"],
1732 "charms"
1733 if vca_type in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1734 else "helm-charts",
1735 vca_name,
1736 )
1737
1738 self.logger.debug("Artifact path > {}".format(artifact_path))
1739
1740 # get initial_config_primitive_list that applies to this element
1741 initial_config_primitive_list = config_descriptor.get(
1742 "initial-config-primitive"
1743 )
1744
1745 self.logger.debug(
1746 "Initial config primitive list > {}".format(
1747 initial_config_primitive_list
1748 )
1749 )
1750
1751 # add config if not present for NS charm
1752 ee_descriptor_id = ee_config_descriptor.get("id")
1753 self.logger.debug("EE Descriptor > {}".format(ee_descriptor_id))
1754 initial_config_primitive_list = get_ee_sorted_initial_config_primitive_list(
1755 initial_config_primitive_list, vca_deployed, ee_descriptor_id
1756 )
1757
1758 self.logger.debug(
1759 "Initial config primitive list #2 > {}".format(
1760 initial_config_primitive_list
1761 )
1762 )
1763 # n2vc_redesign STEP 3.1
1764 # find old ee_id if exists
1765 ee_id = vca_deployed.get("ee_id")
1766
1767 vca_id = self.get_vca_id(db_vnfr, db_nsr)
1768 # create or register execution environment in VCA
1769 if vca_type in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1770
1771 self._write_configuration_status(
1772 nsr_id=nsr_id,
1773 vca_index=vca_index,
1774 status="CREATING",
1775 element_under_configuration=element_under_configuration,
1776 element_type=element_type,
1777 )
1778
1779 step = "create execution environment"
1780 self.logger.debug(logging_text + step)
1781
1782 ee_id = None
1783 credentials = None
1784 if vca_type == "k8s_proxy_charm":
1785 ee_id = await self.vca_map[vca_type].install_k8s_proxy_charm(
1786 charm_name=artifact_path[artifact_path.rfind("/") + 1 :],
1787 namespace=namespace,
1788 artifact_path=artifact_path,
1789 db_dict=db_dict,
1790 vca_id=vca_id,
1791 )
1792 elif vca_type == "helm" or vca_type == "helm-v3":
1793 ee_id, credentials = await self.vca_map[
1794 vca_type
1795 ].create_execution_environment(
1796 namespace=namespace,
1797 reuse_ee_id=ee_id,
1798 db_dict=db_dict,
1799 config=osm_config,
1800 artifact_path=artifact_path,
1801 vca_type=vca_type,
1802 )
1803 else:
1804 ee_id, credentials = await self.vca_map[
1805 vca_type
1806 ].create_execution_environment(
1807 namespace=namespace,
1808 reuse_ee_id=ee_id,
1809 db_dict=db_dict,
1810 vca_id=vca_id,
1811 )
1812
1813 elif vca_type == "native_charm":
1814 step = "Waiting to VM being up and getting IP address"
1815 self.logger.debug(logging_text + step)
1816 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
1817 logging_text,
1818 nsr_id,
1819 vnfr_id,
1820 vdu_id,
1821 vdu_index,
1822 user=None,
1823 pub_key=None,
1824 )
1825 credentials = {"hostname": rw_mgmt_ip}
1826 # get username
1827 username = deep_get(
1828 config_descriptor, ("config-access", "ssh-access", "default-user")
1829 )
1830 # TODO remove this when the changes on IM regarding config-access:ssh-access:default-user are
1831 # merged. Meanwhile, get the username from initial-config-primitive
1832 if not username and initial_config_primitive_list:
1833 for config_primitive in initial_config_primitive_list:
1834 for param in config_primitive.get("parameter", ()):
1835 if param["name"] == "ssh-username":
1836 username = param["value"]
1837 break
1838 if not username:
1839 raise LcmException(
1840 "Cannot determine the username neither with 'initial-config-primitive' nor with "
1841 "'config-access.ssh-access.default-user'"
1842 )
1843 credentials["username"] = username
1844 # n2vc_redesign STEP 3.2
1845
1846 self._write_configuration_status(
1847 nsr_id=nsr_id,
1848 vca_index=vca_index,
1849 status="REGISTERING",
1850 element_under_configuration=element_under_configuration,
1851 element_type=element_type,
1852 )
1853
1854 step = "register execution environment {}".format(credentials)
1855 self.logger.debug(logging_text + step)
1856 ee_id = await self.vca_map[vca_type].register_execution_environment(
1857 credentials=credentials,
1858 namespace=namespace,
1859 db_dict=db_dict,
1860 vca_id=vca_id,
1861 )
1862
1863 # for compatibility with MON/POL modules, they need the model and application name at database
1864 # TODO ask MON/POL whether they still need to assume the format "model_name.application_name"
1865 ee_id_parts = ee_id.split(".")
1866 db_nsr_update = {db_update_entry + "ee_id": ee_id}
1867 if len(ee_id_parts) >= 2:
1868 model_name = ee_id_parts[0]
1869 application_name = ee_id_parts[1]
1870 db_nsr_update[db_update_entry + "model"] = model_name
1871 db_nsr_update[db_update_entry + "application"] = application_name
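# Illustrative split (hypothetical ee_id): "osm-ns-xyz.app-vnf-1.0" yields
# model "osm-ns-xyz" and application "app-vnf-1"; an ee_id without "." only updates ee_id.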
1872
1873 # n2vc_redesign STEP 3.3
1874 step = "Install configuration Software"
1875
1876 self._write_configuration_status(
1877 nsr_id=nsr_id,
1878 vca_index=vca_index,
1879 status="INSTALLING SW",
1880 element_under_configuration=element_under_configuration,
1881 element_type=element_type,
1882 other_update=db_nsr_update,
1883 )
1884
1885 # TODO check if already done
1886 self.logger.debug(logging_text + step)
1887 config = None
1888 if vca_type == "native_charm":
1889 config_primitive = next(
1890 (p for p in initial_config_primitive_list if p["name"] == "config"),
1891 None,
1892 )
1893 if config_primitive:
1894 config = self._map_primitive_params(
1895 config_primitive, {}, deploy_params
1896 )
1897 num_units = 1
1898 if vca_type == "lxc_proxy_charm":
1899 if element_type == "NS":
1900 num_units = db_nsr.get("config-units") or 1
1901 elif element_type == "VNF":
1902 num_units = db_vnfr.get("config-units") or 1
1903 elif element_type == "VDU":
1904 for v in db_vnfr["vdur"]:
1905 if vdu_id == v["vdu-id-ref"]:
1906 num_units = v.get("config-units") or 1
1907 break
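# num_units resolution (sketch): NS -> db_nsr["config-units"], VNF -> db_vnfr["config-units"],
# VDU -> the matching vdur's "config-units"; all default to 1 when unset.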
1908 if vca_type != "k8s_proxy_charm":
1909 await self.vca_map[vca_type].install_configuration_sw(
1910 ee_id=ee_id,
1911 artifact_path=artifact_path,
1912 db_dict=db_dict,
1913 config=config,
1914 num_units=num_units,
1915 vca_id=vca_id,
1916 vca_type=vca_type,
1917 )
1918
1919 # write in db flag of configuration_sw already installed
1920 self.update_db_2(
1921 "nsrs", nsr_id, {db_update_entry + "config_sw_installed": True}
1922 )
1923
1924 # add relations for this VCA (wait for other peers related with this VCA)
1925 await self._add_vca_relations(
1926 logging_text=logging_text,
1927 nsr_id=nsr_id,
1928 vca_type=vca_type,
1929 vca_index=vca_index,
1930 )
1931
1932 # if SSH access is required, then get the execution environment SSH public key
1933 # for native charms we have already waited for the VM to be up
1934 if vca_type in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
1935 pub_key = None
1936 user = None
1937 # self.logger.debug("get ssh key block")
1938 if deep_get(
1939 config_descriptor, ("config-access", "ssh-access", "required")
1940 ):
1941 # self.logger.debug("ssh key needed")
1942 # Needed to inject a ssh key
1943 user = deep_get(
1944 config_descriptor,
1945 ("config-access", "ssh-access", "default-user"),
1946 )
1947 step = "Install configuration Software, getting public ssh key"
1948 pub_key = await self.vca_map[vca_type].get_ee_ssh_public__key(
1949 ee_id=ee_id, db_dict=db_dict, vca_id=vca_id
1950 )
1951
1952 step = "Insert public key into VM user={} ssh_key={}".format(
1953 user, pub_key
1954 )
1955 else:
1956 # self.logger.debug("no need to get ssh key")
1957 step = "Waiting to VM being up and getting IP address"
1958 self.logger.debug(logging_text + step)
1959
1960 # n2vc_redesign STEP 5.1
1961 # wait for RO (ip-address) and insert pub_key into the VM
1962 if vnfr_id:
1963 if kdu_name:
1964 rw_mgmt_ip = await self.wait_kdu_up(
1965 logging_text, nsr_id, vnfr_id, kdu_name
1966 )
1967 else:
1968 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
1969 logging_text,
1970 nsr_id,
1971 vnfr_id,
1972 vdu_id,
1973 vdu_index,
1974 user=user,
1975 pub_key=pub_key,
1976 )
1977 else:
1978 rw_mgmt_ip = None # This is for a NS configuration
1979
1980 self.logger.debug(logging_text + " VM_ip_address={}".format(rw_mgmt_ip))
1981
1982 # store rw_mgmt_ip in deploy params for later replacement
1983 deploy_params["rw_mgmt_ip"] = rw_mgmt_ip
1984
1985 # n2vc_redesign STEP 6 Execute initial config primitive
1986 step = "execute initial config primitive"
1987
1988 # wait for dependent primitives execution (NS -> VNF -> VDU)
1989 if initial_config_primitive_list:
1990 await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)
1991
1992 # stage, depending on the element type: vdu, kdu, vnf or ns
1993 my_vca = vca_deployed_list[vca_index]
1994 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
1995 # VDU or KDU
1996 stage[0] = "Stage 3/5: running Day-1 primitives for VDU."
1997 elif my_vca.get("member-vnf-index"):
1998 # VNF
1999 stage[0] = "Stage 4/5: running Day-1 primitives for VNF."
2000 else:
2001 # NS
2002 stage[0] = "Stage 5/5: running Day-1 primitives for NS."
2003
2004 self._write_configuration_status(
2005 nsr_id=nsr_id, vca_index=vca_index, status="EXECUTING PRIMITIVE"
2006 )
2007
2008 self._write_op_status(op_id=nslcmop_id, stage=stage)
2009
2010 check_if_terminated_needed = True
2011 for initial_config_primitive in initial_config_primitive_list:
2012 # add ns_config_info to the deploy params if this is a NS execution environment
2013 if not vca_deployed["member-vnf-index"]:
2014 deploy_params["ns_config_info"] = json.dumps(
2015 self._get_ns_config_info(nsr_id)
2016 )
2017 # TODO check if already done
2018 primitive_params_ = self._map_primitive_params(
2019 initial_config_primitive, {}, deploy_params
2020 )
2021
2022 step = "execute primitive '{}' params '{}'".format(
2023 initial_config_primitive["name"], primitive_params_
2024 )
2025 self.logger.debug(logging_text + step)
2026 await self.vca_map[vca_type].exec_primitive(
2027 ee_id=ee_id,
2028 primitive_name=initial_config_primitive["name"],
2029 params_dict=primitive_params_,
2030 db_dict=db_dict,
2031 vca_id=vca_id,
2032 vca_type=vca_type,
2033 )
2034 # Once some primitive has been executed, check and record at db whether terminate primitives need to be executed
2035 if check_if_terminated_needed:
2036 if config_descriptor.get("terminate-config-primitive"):
2037 self.update_db_2(
2038 "nsrs", nsr_id, {db_update_entry + "needed_terminate": True}
2039 )
2040 check_if_terminated_needed = False
2041
2042 # TODO register in database that primitive is done
2043
2044 # STEP 7 Configure metrics
2045 if vca_type == "helm" or vca_type == "helm-v3":
2046 prometheus_jobs = await self.extract_prometheus_scrape_jobs(
2047 ee_id=ee_id,
2048 artifact_path=artifact_path,
2049 ee_config_descriptor=ee_config_descriptor,
2050 vnfr_id=vnfr_id,
2051 nsr_id=nsr_id,
2052 target_ip=rw_mgmt_ip,
2053 )
2054 if prometheus_jobs:
2055 self.update_db_2(
2056 "nsrs",
2057 nsr_id,
2058 {db_update_entry + "prometheus_jobs": prometheus_jobs},
2059 )
2060
2061 for job in prometheus_jobs:
2062 self.db.set_one(
2063 "prometheus_jobs",
2064 {
2065 "job_name": job["job_name"]
2066 },
2067 job,
2068 upsert=True,
2069 fail_on_empty=False
2070 )
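# Each scrape job is upserted by its "job_name", the only key this loop relies on, e.g.
# {"job_name": "vnf_<id>_metrics", ...} (hypothetical shape returned by
# extract_prometheus_scrape_jobs).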
2071
2072 step = "instantiated at VCA"
2073 self.logger.debug(logging_text + step)
2074
2075 self._write_configuration_status(
2076 nsr_id=nsr_id, vca_index=vca_index, status="READY"
2077 )
2078
2079 except Exception as e: # TODO not use Exception but N2VC exception
2080 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
2081 if not isinstance(
2082 e, (DbException, N2VCException, LcmException, asyncio.CancelledError)
2083 ):
2084 self.logger.error(
2085 "Exception while {} : {}".format(step, e), exc_info=True
2086 )
2087 self._write_configuration_status(
2088 nsr_id=nsr_id, vca_index=vca_index, status="BROKEN"
2089 )
2090 raise LcmException("{} {}".format(step, e)) from e
2091
2092 def _write_ns_status(
2093 self,
2094 nsr_id: str,
2095 ns_state: str,
2096 current_operation: str,
2097 current_operation_id: str,
2098 error_description: str = None,
2099 error_detail: str = None,
2100 other_update: dict = None,
2101 ):
2102 """
2103 Update db_nsr fields.
2104 :param nsr_id:
2105 :param ns_state:
2106 :param current_operation:
2107 :param current_operation_id:
2108 :param error_description:
2109 :param error_detail:
2110 :param other_update: Other required changes at database; if provided, it will be cleared
2111 :return:
2112 """
2113 try:
2114 db_dict = other_update or {}
2115 db_dict[
2116 "_admin.nslcmop"
2117 ] = current_operation_id # for backward compatibility
2118 db_dict["_admin.current-operation"] = current_operation_id
2119 db_dict["_admin.operation-type"] = (
2120 current_operation if current_operation != "IDLE" else None
2121 )
2122 db_dict["currentOperation"] = current_operation
2123 db_dict["currentOperationID"] = current_operation_id
2124 db_dict["errorDescription"] = error_description
2125 db_dict["errorDetail"] = error_detail
2126
2127 if ns_state:
2128 db_dict["nsState"] = ns_state
2129 self.update_db_2("nsrs", nsr_id, db_dict)
2130 except DbException as e:
2131 self.logger.warn("Error writing NS status, ns={}: {}".format(nsr_id, e))
2132
2133 def _write_op_status(
2134 self,
2135 op_id: str,
2136 stage: list = None,
2137 error_message: str = None,
2138 queuePosition: int = 0,
2139 operation_state: str = None,
2140 other_update: dict = None,
2141 ):
2142 try:
2143 db_dict = other_update or {}
2144 db_dict["queuePosition"] = queuePosition
2145 if isinstance(stage, list):
2146 db_dict["stage"] = stage[0]
2147 db_dict["detailed-status"] = " ".join(stage)
2148 elif stage is not None:
2149 db_dict["stage"] = str(stage)
2150
2151 if error_message is not None:
2152 db_dict["errorMessage"] = error_message
2153 if operation_state is not None:
2154 db_dict["operationState"] = operation_state
2155 db_dict["statusEnteredTime"] = time()
2156 self.update_db_2("nslcmops", op_id, db_dict)
2157 except DbException as e:
2158 self.logger.warn(
2159 "Error writing OPERATION status for op_id: {} -> {}".format(op_id, e)
2160 )
2161
2162 def _write_all_config_status(self, db_nsr: dict, status: str):
2163 try:
2164 nsr_id = db_nsr["_id"]
2165 # configurationStatus
2166 config_status = db_nsr.get("configurationStatus")
2167 if config_status:
2168 db_nsr_update = {
2169 "configurationStatus.{}.status".format(index): status
2170 for index, v in enumerate(config_status)
2171 if v
2172 }
2173 # update status
2174 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2175
2176 except DbException as e:
2177 self.logger.warn(
2178 "Error writing all configuration status, ns={}: {}".format(nsr_id, e)
2179 )
2180
2181 def _write_configuration_status(
2182 self,
2183 nsr_id: str,
2184 vca_index: int,
2185 status: str = None,
2186 element_under_configuration: str = None,
2187 element_type: str = None,
2188 other_update: dict = None,
2189 ):
2190
2191 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
2192 # .format(vca_index, status))
2193
2194 try:
2195 db_path = "configurationStatus.{}.".format(vca_index)
2196 db_dict = other_update or {}
2197 if status:
2198 db_dict[db_path + "status"] = status
2199 if element_under_configuration:
2200 db_dict[
2201 db_path + "elementUnderConfiguration"
2202 ] = element_under_configuration
2203 if element_type:
2204 db_dict[db_path + "elementType"] = element_type
2205 self.update_db_2("nsrs", nsr_id, db_dict)
2206 except DbException as e:
2207 self.logger.warn(
2208 "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
2209 status, nsr_id, vca_index, e
2210 )
2211 )
2212
2213 async def _do_placement(self, logging_text, db_nslcmop, db_vnfrs):
2214 """
2215 Check and compute the placement (vim account where to deploy). If it is decided by an external tool, it
2216 sends the request via kafka and waits until the result is written at database (nslcmops _admin.pla).
2217 The database is used because the result can be obtained from a different LCM worker in case of HA.
2218 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
2219 :param db_nslcmop: database content of nslcmop
2220 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
2221 :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfrs with the
2222 computed 'vim-account-id'
2223 """
2224 modified = False
2225 nslcmop_id = db_nslcmop["_id"]
2226 placement_engine = deep_get(db_nslcmop, ("operationParams", "placement-engine"))
2227 if placement_engine == "PLA":
2228 self.logger.debug(
2229 logging_text + "Invoke and wait for placement optimization"
2230 )
2231 await self.msg.aiowrite(
2232 "pla", "get_placement", {"nslcmopId": nslcmop_id}, loop=self.loop
2233 )
2234 db_poll_interval = 5
2235 wait = db_poll_interval * 10
2236 pla_result = None
2237 while not pla_result and wait >= 0:
2238 await asyncio.sleep(db_poll_interval)
2239 wait -= db_poll_interval
2240 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2241 pla_result = deep_get(db_nslcmop, ("_admin", "pla"))
2242
2243 if not pla_result:
2244 raise LcmException(
2245 "Placement timeout for nslcmopId={}".format(nslcmop_id)
2246 )
2247
2248 for pla_vnf in pla_result["vnf"]:
2249 vnfr = db_vnfrs.get(pla_vnf["member-vnf-index"])
2250 if not pla_vnf.get("vimAccountId") or not vnfr:
2251 continue
2252 modified = True
2253 self.db.set_one(
2254 "vnfrs",
2255 {"_id": vnfr["_id"]},
2256 {"vim-account-id": pla_vnf["vimAccountId"]},
2257 )
2258 # Modifies db_vnfrs
2259 vnfr["vim-account-id"] = pla_vnf["vimAccountId"]
2260 return modified
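# Illustrative _admin.pla payload consumed above (hypothetical values):
#   {"vnf": [{"member-vnf-index": "1", "vimAccountId": "0f1e2d3c-vim-uuid"}]}
# Each entry updates the matching vnfr's "vim-account-id" in both the database and db_vnfrs.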
2261
2262 def update_nsrs_with_pla_result(self, params):
2263 try:
2264 nslcmop_id = deep_get(params, ("placement", "nslcmopId"))
2265 self.update_db_2(
2266 "nslcmops", nslcmop_id, {"_admin.pla": params.get("placement")}
2267 )
2268 except Exception as e:
2269 self.logger.warn("Update failed for nslcmop_id={}:{}".format(nslcmop_id, e))
2270
2271 async def instantiate(self, nsr_id, nslcmop_id):
2272 """
2273
2274 :param nsr_id: ns instance to deploy
2275 :param nslcmop_id: operation to run
2276 :return:
2277 """
2278
2279 # Try to lock HA task here
2280 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
2281 if not task_is_locked_by_me:
2282 self.logger.debug(
2283 "instantiate() task is not locked by me, ns={}".format(nsr_id)
2284 )
2285 return
2286
2287 logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
2288 self.logger.debug(logging_text + "Enter")
2289
2290 # get all needed from database
2291
2292 # database nsrs record
2293 db_nsr = None
2294
2295 # database nslcmops record
2296 db_nslcmop = None
2297
2298 # update operation on nsrs
2299 db_nsr_update = {}
2300 # update operation on nslcmops
2301 db_nslcmop_update = {}
2302
2303 nslcmop_operation_state = None
2304 db_vnfrs = {} # vnf's info indexed by member-index
2305 # n2vc_info = {}
2306 tasks_dict_info = {} # from task to info text
2307 exc = None
2308 error_list = []
2309 stage = [
2310 "Stage 1/5: preparation of the environment.",
2311 "Waiting for previous operations to terminate.",
2312 "",
2313 ]
2314 # ^ stage, step, VIM progress
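# _write_op_status() stores stage[0] as "stage" and " ".join(stage) as "detailed-status",
# e.g. "Stage 1/5: preparation of the environment. Waiting for previous operations to terminate."
# (illustrative).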
2315 try:
2316 # wait for any previous tasks in process
2317 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
2318
2319 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
2320 stage[1] = "Reading from database."
2321 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
2322 db_nsr_update["detailed-status"] = "creating"
2323 db_nsr_update["operational-status"] = "init"
2324 self._write_ns_status(
2325 nsr_id=nsr_id,
2326 ns_state="BUILDING",
2327 current_operation="INSTANTIATING",
2328 current_operation_id=nslcmop_id,
2329 other_update=db_nsr_update,
2330 )
2331 self._write_op_status(op_id=nslcmop_id, stage=stage, queuePosition=0)
2332
2333 # read from db: operation
2334 stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
2335 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2336 if db_nslcmop["operationParams"].get("additionalParamsForVnf"):
2337 db_nslcmop["operationParams"]["additionalParamsForVnf"] = json.loads(
2338 db_nslcmop["operationParams"]["additionalParamsForVnf"]
2339 )
2340 ns_params = db_nslcmop.get("operationParams")
2341 if ns_params and ns_params.get("timeout_ns_deploy"):
2342 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
2343 else:
2344 timeout_ns_deploy = self.timeout.get(
2345 "ns_deploy", self.timeout_ns_deploy
2346 )
2347
2348 # read from db: ns
2349 stage[1] = "Getting nsr={} from db.".format(nsr_id)
2350 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2351 stage[1] = "Getting nsd={} from db.".format(db_nsr["nsd-id"])
2352 nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
2353 self.fs.sync(db_nsr["nsd-id"])
2354 db_nsr["nsd"] = nsd
2355 # nsr_name = db_nsr["name"] # TODO short-name??
2356
2357 # read from db: vnf's of this ns
2358 stage[1] = "Getting vnfrs from db."
2359 self.logger.debug(logging_text + stage[1])
2360 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
2361
2362 # read from db: vnfd's for every vnf
2363 db_vnfds = [] # every vnfd data
2364
2365 # for each vnf in ns, read vnfd
2366 for vnfr in db_vnfrs_list:
2367 if vnfr.get("kdur"):
2368 kdur_list = []
2369 for kdur in vnfr["kdur"]:
2370 if kdur.get("additionalParams"):
2371 kdur["additionalParams"] = json.loads(
2372 kdur["additionalParams"]
2373 )
2374 kdur_list.append(kdur)
2375 vnfr["kdur"] = kdur_list
2376
2377 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
2378 vnfd_id = vnfr["vnfd-id"]
2379 vnfd_ref = vnfr["vnfd-ref"]
2380 self.fs.sync(vnfd_id)
2381
2382 # if we do not have this vnfd yet, read it from db (compare by _id: db_vnfds holds vnfd dicts)
2383 if not find_in_list(db_vnfds, lambda a_vnfd: a_vnfd["_id"] == vnfd_id):
2384 # read from db
2385 stage[1] = "Getting vnfd={} id='{}' from db.".format(
2386 vnfd_id, vnfd_ref
2387 )
2388 self.logger.debug(logging_text + stage[1])
2389 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
2390
2391 # store vnfd
2392 db_vnfds.append(vnfd)
2393
2394 # Get or generates the _admin.deployed.VCA list
2395 vca_deployed_list = None
2396 if db_nsr["_admin"].get("deployed"):
2397 vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
2398 if vca_deployed_list is None:
2399 vca_deployed_list = []
2400 configuration_status_list = []
2401 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
2402 db_nsr_update["configurationStatus"] = configuration_status_list
2403 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
2404 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
2405 elif isinstance(vca_deployed_list, dict):
2406 # maintain backward compatibility. Change a dict to list at database
2407 vca_deployed_list = list(vca_deployed_list.values())
2408 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
2409 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
2410
2411 if not isinstance(
2412 deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list
2413 ):
2414 populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
2415 db_nsr_update["_admin.deployed.RO.vnfd"] = []
2416
2417 # set state to INSTANTIATED. When instantiated NBI will not delete directly
2418 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
2419 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2420 self.db.set_list(
2421 "vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "INSTANTIATED"}
2422 )
2423
2424 # n2vc_redesign STEP 2 Deploy Network Scenario
2425 stage[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
2426 self._write_op_status(op_id=nslcmop_id, stage=stage)
2427
2428 stage[1] = "Deploying KDUs."
2429 # self.logger.debug(logging_text + "Before deploy_kdus")
2430 # Call deploy_kdus in case the "vdu:kdu" param exists
2431 await self.deploy_kdus(
2432 logging_text=logging_text,
2433 nsr_id=nsr_id,
2434 nslcmop_id=nslcmop_id,
2435 db_vnfrs=db_vnfrs,
2436 db_vnfds=db_vnfds,
2437 task_instantiation_info=tasks_dict_info,
2438 )
2439
2440 stage[1] = "Getting VCA public key."
2441 # n2vc_redesign STEP 1 Get VCA public ssh-key
2442 # feature 1429. Add n2vc public key to needed VMs
2443 n2vc_key = self.n2vc.get_public_key()
2444 n2vc_key_list = [n2vc_key]
2445 if self.vca_config.get("public_key"):
2446 n2vc_key_list.append(self.vca_config["public_key"])
2447
2448 stage[1] = "Deploying NS at VIM."
2449 task_ro = asyncio.ensure_future(
2450 self.instantiate_RO(
2451 logging_text=logging_text,
2452 nsr_id=nsr_id,
2453 nsd=nsd,
2454 db_nsr=db_nsr,
2455 db_nslcmop=db_nslcmop,
2456 db_vnfrs=db_vnfrs,
2457 db_vnfds=db_vnfds,
2458 n2vc_key_list=n2vc_key_list,
2459 stage=stage,
2460 )
2461 )
2462 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
2463 tasks_dict_info[task_ro] = "Deploying at VIM"
2464
2465 # n2vc_redesign STEP 3 to 6 Deploy N2VC
2466 stage[1] = "Deploying Execution Environments."
2467 self.logger.debug(logging_text + stage[1])
2468
2469 nsi_id = None # TODO put nsi_id when this nsr belongs to a NSI
2470 for vnf_profile in get_vnf_profiles(nsd):
2471 vnfd_id = vnf_profile["vnfd-id"]
2472 vnfd = find_in_list(db_vnfds, lambda a_vnf: a_vnf["id"] == vnfd_id)
2473 member_vnf_index = str(vnf_profile["id"])
2474 db_vnfr = db_vnfrs[member_vnf_index]
2475 base_folder = vnfd["_admin"]["storage"]
2476 vdu_id = None
2477 vdu_index = 0
2478 vdu_name = None
2479 kdu_name = None
2480
2481 # Get additional parameters
2482 deploy_params = {"OSM": get_osm_params(db_vnfr)}
2483 if db_vnfr.get("additionalParamsForVnf"):
2484 deploy_params.update(
2485 parse_yaml_strings(db_vnfr["additionalParamsForVnf"].copy())
2486 )
2487
2488 descriptor_config = get_configuration(vnfd, vnfd["id"])
2489 if descriptor_config:
2490 self._deploy_n2vc(
2491 logging_text=logging_text
2492 + "member_vnf_index={} ".format(member_vnf_index),
2493 db_nsr=db_nsr,
2494 db_vnfr=db_vnfr,
2495 nslcmop_id=nslcmop_id,
2496 nsr_id=nsr_id,
2497 nsi_id=nsi_id,
2498 vnfd_id=vnfd_id,
2499 vdu_id=vdu_id,
2500 kdu_name=kdu_name,
2501 member_vnf_index=member_vnf_index,
2502 vdu_index=vdu_index,
2503 vdu_name=vdu_name,
2504 deploy_params=deploy_params,
2505 descriptor_config=descriptor_config,
2506 base_folder=base_folder,
2507 task_instantiation_info=tasks_dict_info,
2508 stage=stage,
2509 )
2510
2511 # Deploy charms for each VDU that supports one.
2512 for vdud in get_vdu_list(vnfd):
2513 vdu_id = vdud["id"]
2514 descriptor_config = get_configuration(vnfd, vdu_id)
2515 vdur = find_in_list(
2516 db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id
2517 )
2518
2519 if vdur.get("additionalParams"):
2520 deploy_params_vdu = parse_yaml_strings(vdur["additionalParams"])
2521 else:
2522 deploy_params_vdu = deploy_params
2523 deploy_params_vdu["OSM"] = get_osm_params(
2524 db_vnfr, vdu_id, vdu_count_index=0
2525 )
2526 vdud_count = get_number_of_instances(vnfd, vdu_id)
2527
2528 self.logger.debug("VDUD > {}".format(vdud))
2529 self.logger.debug(
2530 "Descriptor config > {}".format(descriptor_config)
2531 )
2532 if descriptor_config:
2533 vdu_name = None
2534 kdu_name = None
2535 for vdu_index in range(vdud_count):
2536 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
2537 self._deploy_n2vc(
2538 logging_text=logging_text
2539 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
2540 member_vnf_index, vdu_id, vdu_index
2541 ),
2542 db_nsr=db_nsr,
2543 db_vnfr=db_vnfr,
2544 nslcmop_id=nslcmop_id,
2545 nsr_id=nsr_id,
2546 nsi_id=nsi_id,
2547 vnfd_id=vnfd_id,
2548 vdu_id=vdu_id,
2549 kdu_name=kdu_name,
2550 member_vnf_index=member_vnf_index,
2551 vdu_index=vdu_index,
2552 vdu_name=vdu_name,
2553 deploy_params=deploy_params_vdu,
2554 descriptor_config=descriptor_config,
2555 base_folder=base_folder,
2556 task_instantiation_info=tasks_dict_info,
2557 stage=stage,
2558 )
2559 for kdud in get_kdu_list(vnfd):
2560 kdu_name = kdud["name"]
2561 descriptor_config = get_configuration(vnfd, kdu_name)
2562 if descriptor_config:
2563 vdu_id = None
2564 vdu_index = 0
2565 vdu_name = None
2566 kdur = next(
2567 x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name
2568 )
2569 deploy_params_kdu = {"OSM": get_osm_params(db_vnfr)}
2570 if kdur.get("additionalParams"):
2571 deploy_params_kdu.update(
2572 parse_yaml_strings(kdur["additionalParams"].copy())
2573 )
2574
2575 self._deploy_n2vc(
2576 logging_text=logging_text,
2577 db_nsr=db_nsr,
2578 db_vnfr=db_vnfr,
2579 nslcmop_id=nslcmop_id,
2580 nsr_id=nsr_id,
2581 nsi_id=nsi_id,
2582 vnfd_id=vnfd_id,
2583 vdu_id=vdu_id,
2584 kdu_name=kdu_name,
2585 member_vnf_index=member_vnf_index,
2586 vdu_index=vdu_index,
2587 vdu_name=vdu_name,
2588 deploy_params=deploy_params_kdu,
2589 descriptor_config=descriptor_config,
2590 base_folder=base_folder,
2591 task_instantiation_info=tasks_dict_info,
2592 stage=stage,
2593 )
2594
2595 # Check if this NS has a charm configuration
2596 descriptor_config = nsd.get("ns-configuration")
2597 if descriptor_config and descriptor_config.get("juju"):
2598 vnfd_id = None
2599 db_vnfr = None
2600 member_vnf_index = None
2601 vdu_id = None
2602 kdu_name = None
2603 vdu_index = 0
2604 vdu_name = None
2605
2606 # Get additional parameters
2607 deploy_params = {"OSM": {"vim_account_id": ns_params["vimAccountId"]}}
2608 if db_nsr.get("additionalParamsForNs"):
2609 deploy_params.update(
2610 parse_yaml_strings(db_nsr["additionalParamsForNs"].copy())
2611 )
2612 base_folder = nsd["_admin"]["storage"]
2613 self._deploy_n2vc(
2614 logging_text=logging_text,
2615 db_nsr=db_nsr,
2616 db_vnfr=db_vnfr,
2617 nslcmop_id=nslcmop_id,
2618 nsr_id=nsr_id,
2619 nsi_id=nsi_id,
2620 vnfd_id=vnfd_id,
2621 vdu_id=vdu_id,
2622 kdu_name=kdu_name,
2623 member_vnf_index=member_vnf_index,
2624 vdu_index=vdu_index,
2625 vdu_name=vdu_name,
2626 deploy_params=deploy_params,
2627 descriptor_config=descriptor_config,
2628 base_folder=base_folder,
2629 task_instantiation_info=tasks_dict_info,
2630 stage=stage,
2631 )
2632
2633 # the rest of the work will be done in the finally block
2634
2635 except (
2636 ROclient.ROClientException,
2637 DbException,
2638 LcmException,
2639 N2VCException,
2640 ) as e:
2641 self.logger.error(
2642 logging_text + "Exit Exception while '{}': {}".format(stage[1], e)
2643 )
2644 exc = e
2645 except asyncio.CancelledError:
2646 self.logger.error(
2647 logging_text + "Cancelled Exception while '{}'".format(stage[1])
2648 )
2649 exc = "Operation was cancelled"
2650 except Exception as e:
2651 exc = traceback.format_exc()
2652 self.logger.critical(
2653 logging_text + "Exit Exception while '{}': {}".format(stage[1], e),
2654 exc_info=True,
2655 )
2656 finally:
2657 if exc:
2658 error_list.append(str(exc))
2659 try:
2660 # wait for pending tasks
2661 if tasks_dict_info:
2662 stage[1] = "Waiting for instantiate pending tasks."
2663 self.logger.debug(logging_text + stage[1])
2664 error_list += await self._wait_for_tasks(
2665 logging_text,
2666 tasks_dict_info,
2667 timeout_ns_deploy,
2668 stage,
2669 nslcmop_id,
2670 nsr_id=nsr_id,
2671 )
2672 stage[1] = stage[2] = ""
2673 except asyncio.CancelledError:
2674 error_list.append("Cancelled")
2675 # TODO cancel all tasks
2676 except Exception as exc:
2677 error_list.append(str(exc))
2678
2679 # update operation-status
2680 db_nsr_update["operational-status"] = "running"
2681 # let's begin with VCA 'configured' status (later we can change it)
2682 db_nsr_update["config-status"] = "configured"
2683 for task, task_name in tasks_dict_info.items():
2684 if not task.done() or task.cancelled() or task.exception():
2685 if task_name.startswith(self.task_name_deploy_vca):
2686 # A N2VC task is pending
2687 db_nsr_update["config-status"] = "failed"
2688 else:
2689 # RO or KDU task is pending
2690 db_nsr_update["operational-status"] = "failed"
2691
2692 # update status at database
2693 if error_list:
2694 error_detail = ". ".join(error_list)
2695 self.logger.error(logging_text + error_detail)
2696 error_description_nslcmop = "{} Detail: {}".format(
2697 stage[0], error_detail
2698 )
2699 error_description_nsr = "Operation: INSTANTIATING.{}, {}".format(
2700 nslcmop_id, stage[0]
2701 )
2702
2703 db_nsr_update["detailed-status"] = (
2704 error_description_nsr + " Detail: " + error_detail
2705 )
2706 db_nslcmop_update["detailed-status"] = error_detail
2707 nslcmop_operation_state = "FAILED"
2708 ns_state = "BROKEN"
2709 else:
2710 error_detail = None
2711 error_description_nsr = error_description_nslcmop = None
2712 ns_state = "READY"
2713 db_nsr_update["detailed-status"] = "Done"
2714 db_nslcmop_update["detailed-status"] = "Done"
2715 nslcmop_operation_state = "COMPLETED"
2716
2717 if db_nsr:
2718 self._write_ns_status(
2719 nsr_id=nsr_id,
2720 ns_state=ns_state,
2721 current_operation="IDLE",
2722 current_operation_id=None,
2723 error_description=error_description_nsr,
2724 error_detail=error_detail,
2725 other_update=db_nsr_update,
2726 )
2727 self._write_op_status(
2728 op_id=nslcmop_id,
2729 stage="",
2730 error_message=error_description_nslcmop,
2731 operation_state=nslcmop_operation_state,
2732 other_update=db_nslcmop_update,
2733 )
2734
2735 if nslcmop_operation_state:
2736 try:
2737 await self.msg.aiowrite(
2738 "ns",
2739 "instantiated",
2740 {
2741 "nsr_id": nsr_id,
2742 "nslcmop_id": nslcmop_id,
2743 "operationState": nslcmop_operation_state,
2744 },
2745 loop=self.loop,
2746 )
2747 except Exception as e:
2748 self.logger.error(
2749 logging_text + "kafka_write notification Exception {}".format(e)
2750 )
2751
2752 self.logger.debug(logging_text + "Exit")
2753 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
2754
2755 def _get_vnfd(self, vnfd_id: str, cached_vnfds: Dict[str, Any]):
2756 if vnfd_id not in cached_vnfds:
2757 cached_vnfds[vnfd_id] = self.db.get_one("vnfds", {"id": vnfd_id})
2758 return cached_vnfds[vnfd_id]
2759
2760 def _get_vnfr(self, nsr_id: str, vnf_profile_id: str, cached_vnfrs: Dict[str, Any]):
2761 if vnf_profile_id not in cached_vnfrs:
2762 cached_vnfrs[vnf_profile_id] = self.db.get_one(
2763 "vnfrs",
2764 {
2765 "member-vnf-index-ref": vnf_profile_id,
2766 "nsr-id-ref": nsr_id,
2767 },
2768 )
2769 return cached_vnfrs[vnf_profile_id]
2770
2771 def _is_deployed_vca_in_relation(
2772 self, vca: DeployedVCA, relation: Relation
2773 ) -> bool:
2774 found = False
2775 for endpoint in (relation.provider, relation.requirer):
2776 if endpoint["kdu-resource-profile-id"]:
2777 continue
2778 found = (
2779 vca.vnf_profile_id == endpoint.vnf_profile_id
2780 and vca.vdu_profile_id == endpoint.vdu_profile_id
2781 and vca.execution_environment_ref == endpoint.execution_environment_ref
2782 )
2783 if found:
2784 break
2785 return found
2786
2787 def _update_ee_relation_data_with_implicit_data(
2788 self, nsr_id, nsd, ee_relation_data, cached_vnfds, vnf_profile_id: str = None
2789 ):
2790 ee_relation_data = safe_get_ee_relation(
2791 nsr_id, ee_relation_data, vnf_profile_id=vnf_profile_id
2792 )
2793 ee_relation_level = EELevel.get_level(ee_relation_data)
2794 if (ee_relation_level in (EELevel.VNF, EELevel.VDU)) and not ee_relation_data[
2795 "execution-environment-ref"
2796 ]:
2797 vnf_profile = get_vnf_profile(nsd, ee_relation_data["vnf-profile-id"])
2798 vnfd_id = vnf_profile["vnfd-id"]
2799 db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
2800 entity_id = (
2801 vnfd_id
2802 if ee_relation_level == EELevel.VNF
2803 else ee_relation_data["vdu-profile-id"]
2804 )
2805 ee = get_juju_ee_ref(db_vnfd, entity_id)
2806 if not ee:
2807 raise Exception(
2808 f"not execution environments found for ee_relation {ee_relation_data}"
2809 )
2810 ee_relation_data["execution-environment-ref"] = ee["id"]
2811 return ee_relation_data
2812
2813 def _get_ns_relations(
2814 self,
2815 nsr_id: str,
2816 nsd: Dict[str, Any],
2817 vca: DeployedVCA,
2818 cached_vnfds: Dict[str, Any],
2819 ) -> List[Relation]:
2820 relations = []
2821 db_ns_relations = get_ns_configuration_relation_list(nsd)
2822 for r in db_ns_relations:
2823 provider_dict = None
2824 requirer_dict = None
2825 if all(key in r for key in ("provider", "requirer")):
2826 provider_dict = r["provider"]
2827 requirer_dict = r["requirer"]
2828 elif "entities" in r:
2829 provider_id = r["entities"][0]["id"]
2830 provider_dict = {
2831 "nsr-id": nsr_id,
2832 "endpoint": r["entities"][0]["endpoint"],
2833 }
2834 if provider_id != nsd["id"]:
2835 provider_dict["vnf-profile-id"] = provider_id
2836 requirer_id = r["entities"][1]["id"]
2837 requirer_dict = {
2838 "nsr-id": nsr_id,
2839 "endpoint": r["entities"][1]["endpoint"],
2840 }
2841 if requirer_id != nsd["id"]:
2842 requirer_dict["vnf-profile-id"] = requirer_id
2843 else:
2844 raise Exception("provider/requirer or entities must be included in the relation.")
2845 relation_provider = self._update_ee_relation_data_with_implicit_data(
2846 nsr_id, nsd, provider_dict, cached_vnfds
2847 )
2848 relation_requirer = self._update_ee_relation_data_with_implicit_data(
2849 nsr_id, nsd, requirer_dict, cached_vnfds
2850 )
2851 provider = EERelation(relation_provider)
2852 requirer = EERelation(relation_requirer)
2853 relation = Relation(r["name"], provider, requirer)
2854 vca_in_relation = self._is_deployed_vca_in_relation(vca, relation)
2855 if vca_in_relation:
2856 relations.append(relation)
2857 return relations
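# Illustrative relation entries accepted above (hypothetical names/endpoints):
#   {"name": "rel1", "provider": {...}, "requirer": {...}}                      # explicit form
#   {"name": "rel1", "entities": [{"id": "vnf1", "endpoint": "itf"},
#                                 {"id": "<nsd-id>", "endpoint": "itf"}]}       # entities form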
2858
2859 def _get_vnf_relations(
2860 self,
2861 nsr_id: str,
2862 nsd: Dict[str, Any],
2863 vca: DeployedVCA,
2864 cached_vnfds: Dict[str, Any],
2865 ) -> List[Relation]:
2866 relations = []
2867 vnf_profile = get_vnf_profile(nsd, vca.vnf_profile_id)
2868 vnf_profile_id = vnf_profile["id"]
2869 vnfd_id = vnf_profile["vnfd-id"]
2870 db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
2871 db_vnf_relations = get_relation_list(db_vnfd, vnfd_id)
2872 for r in db_vnf_relations:
2873 provider_dict = None
2874 requirer_dict = None
2875 if all(key in r for key in ("provider", "requirer")):
2876 provider_dict = r["provider"]
2877 requirer_dict = r["requirer"]
2878 elif "entities" in r:
2879 provider_id = r["entities"][0]["id"]
2880 provider_dict = {
2881 "nsr-id": nsr_id,
2882 "vnf-profile-id": vnf_profile_id,
2883 "endpoint": r["entities"][0]["endpoint"],
2884 }
2885 if provider_id != vnfd_id:
2886 provider_dict["vdu-profile-id"] = provider_id
2887 requirer_id = r["entities"][1]["id"]
2888 requirer_dict = {
2889 "nsr-id": nsr_id,
2890 "vnf-profile-id": vnf_profile_id,
2891 "endpoint": r["entities"][1]["endpoint"],
2892 }
2893 if requirer_id != vnfd_id:
2894 requirer_dict["vdu-profile-id"] = requirer_id
2895 else:
2896 raise Exception("provider/requirer or entities must be included in the relation.")
2897 relation_provider = self._update_ee_relation_data_with_implicit_data(
2898 nsr_id, nsd, provider_dict, cached_vnfds, vnf_profile_id=vnf_profile_id
2899 )
2900 relation_requirer = self._update_ee_relation_data_with_implicit_data(
2901 nsr_id, nsd, requirer_dict, cached_vnfds, vnf_profile_id=vnf_profile_id
2902 )
2903 provider = EERelation(relation_provider)
2904 requirer = EERelation(relation_requirer)
2905 relation = Relation(r["name"], provider, requirer)
2906 vca_in_relation = self._is_deployed_vca_in_relation(vca, relation)
2907 if vca_in_relation:
2908 relations.append(relation)
2909 return relations
2910
2911 def _get_kdu_resource_data(
2912 self,
2913 ee_relation: EERelation,
2914 db_nsr: Dict[str, Any],
2915 cached_vnfds: Dict[str, Any],
2916 ) -> DeployedK8sResource:
2917 nsd = get_nsd(db_nsr)
2918 vnf_profiles = get_vnf_profiles(nsd)
2919 vnfd_id = find_in_list(
2920 vnf_profiles,
2921 lambda vnf_profile: vnf_profile["id"] == ee_relation.vnf_profile_id,
2922 )["vnfd-id"]
2923 db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
2924 kdu_resource_profile = get_kdu_resource_profile(
2925 db_vnfd, ee_relation.kdu_resource_profile_id
2926 )
2927 kdu_name = kdu_resource_profile["kdu-name"]
2928 deployed_kdu, _ = get_deployed_kdu(
2929 db_nsr.get("_admin", ()).get("deployed", ()),
2930 kdu_name,
2931 ee_relation.vnf_profile_id,
2932 )
2933 deployed_kdu.update({"resource-name": kdu_resource_profile["resource-name"]})
2934 return deployed_kdu
2935
2936 def _get_deployed_component(
2937 self,
2938 ee_relation: EERelation,
2939 db_nsr: Dict[str, Any],
2940 cached_vnfds: Dict[str, Any],
2941 ) -> DeployedComponent:
2942 nsr_id = db_nsr["_id"]
2943 deployed_component = None
2944 ee_level = EELevel.get_level(ee_relation)
2945 if ee_level == EELevel.NS:
2946 vca = get_deployed_vca(db_nsr, {"vdu_id": None, "member-vnf-index": None})
2947 if vca:
2948 deployed_component = DeployedVCA(nsr_id, vca)
2949 elif ee_level == EELevel.VNF:
2950 vca = get_deployed_vca(
2951 db_nsr,
2952 {
2953 "vdu_id": None,
2954 "member-vnf-index": ee_relation.vnf_profile_id,
2955 "ee_descriptor_id": ee_relation.execution_environment_ref,
2956 },
2957 )
2958 if vca:
2959 deployed_component = DeployedVCA(nsr_id, vca)
2960 elif ee_level == EELevel.VDU:
2961 vca = get_deployed_vca(
2962 db_nsr,
2963 {
2964 "vdu_id": ee_relation.vdu_profile_id,
2965 "member-vnf-index": ee_relation.vnf_profile_id,
2966 "ee_descriptor_id": ee_relation.execution_environment_ref,
2967 },
2968 )
2969 if vca:
2970 deployed_component = DeployedVCA(nsr_id, vca)
2971 elif ee_level == EELevel.KDU:
2972 kdu_resource_data = self._get_kdu_resource_data(
2973 ee_relation, db_nsr, cached_vnfds
2974 )
2975 if kdu_resource_data:
2976 deployed_component = DeployedK8sResource(kdu_resource_data)
2977 return deployed_component
2978
2979 async def _add_relation(
2980 self,
2981 relation: Relation,
2982 vca_type: str,
2983 db_nsr: Dict[str, Any],
2984 cached_vnfds: Dict[str, Any],
2985 cached_vnfrs: Dict[str, Any],
2986 ) -> bool:
2987 deployed_provider = self._get_deployed_component(
2988 relation.provider, db_nsr, cached_vnfds
2989 )
2990 deployed_requirer = self._get_deployed_component(
2991 relation.requirer, db_nsr, cached_vnfds
2992 )
2993 if (
2994 deployed_provider
2995 and deployed_requirer
2996 and deployed_provider.config_sw_installed
2997 and deployed_requirer.config_sw_installed
2998 ):
2999 provider_db_vnfr = (
3000 self._get_vnfr(
3001 relation.provider.nsr_id,
3002 relation.provider.vnf_profile_id,
3003 cached_vnfrs,
3004 )
3005 if relation.provider.vnf_profile_id
3006 else None
3007 )
3008 requirer_db_vnfr = (
3009 self._get_vnfr(
3010 relation.requirer.nsr_id,
3011 relation.requirer.vnf_profile_id,
3012 cached_vnfrs,
3013 )
3014 if relation.requirer.vnf_profile_id
3015 else None
3016 )
3017 provider_vca_id = self.get_vca_id(provider_db_vnfr, db_nsr)
3018 requirer_vca_id = self.get_vca_id(requirer_db_vnfr, db_nsr)
3019 provider_relation_endpoint = RelationEndpoint(
3020 deployed_provider.ee_id,
3021 provider_vca_id,
3022 relation.provider.endpoint,
3023 )
3024 requirer_relation_endpoint = RelationEndpoint(
3025 deployed_requirer.ee_id,
3026 requirer_vca_id,
3027 relation.requirer.endpoint,
3028 )
3029 await self.vca_map[vca_type].add_relation(
3030 provider=provider_relation_endpoint,
3031 requirer=requirer_relation_endpoint,
3032 )
3033 # remove entry from relations list
3034 return True
3035 return False
3036
3037 async def _add_vca_relations(
3038 self,
3039 logging_text,
3040 nsr_id,
3041 vca_type: str,
3042 vca_index: int,
3043 timeout: int = 3600,
3044 ) -> bool:
3045
3046 # steps:
3047 # 1. find all relations for this VCA
3048 # 2. wait for other peers related
3049 # 3. add relations
3050
3051 try:
3052 # STEP 1: find all relations for this VCA
3053
3054 # read nsr record
3055 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3056 nsd = get_nsd(db_nsr)
3057
3058 # this VCA data
3059 deployed_vca_dict = get_deployed_vca_list(db_nsr)[vca_index]
3060 my_vca = DeployedVCA(nsr_id, deployed_vca_dict)
3061
3062 cached_vnfds = {}
3063 cached_vnfrs = {}
3064 relations = []
3065 relations.extend(self._get_ns_relations(nsr_id, nsd, my_vca, cached_vnfds))
3066 relations.extend(self._get_vnf_relations(nsr_id, nsd, my_vca, cached_vnfds))
3067
3068 # if no relations, terminate
3069 if not relations:
3070 self.logger.debug(logging_text + " No relations")
3071 return True
3072
3073 self.logger.debug(logging_text + " adding relations {}".format(relations))
3074
3075 # add all relations
3076 start = time()
3077 while True:
3078 # check timeout
3079 now = time()
3080 if now - start >= timeout:
3081 self.logger.error(logging_text + " : timeout adding relations")
3082 return False
3083
3084 # reload nsr from database (we need to update record: _admin.deployed.VCA)
3085 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3086
3087 # for each relation, find the VCA's related
3088 for relation in relations.copy():
3089 added = await self._add_relation(
3090 relation,
3091 vca_type,
3092 db_nsr,
3093 cached_vnfds,
3094 cached_vnfrs,
3095 )
3096 if added:
3097 relations.remove(relation)
3098
3099 if not relations:
3100 self.logger.debug("Relations added")
3101 break
3102 await asyncio.sleep(5.0)
3103
3104 return True
3105
3106 except Exception as e:
3107 self.logger.warn(logging_text + " ERROR adding relations: {}".format(e))
3108 return False
3109
3110 async def _install_kdu(
3111 self,
3112 nsr_id: str,
3113 nsr_db_path: str,
3114 vnfr_data: dict,
3115 kdu_index: int,
3116 kdud: dict,
3117 vnfd: dict,
3118 k8s_instance_info: dict,
3119 k8params: dict = None,
3120 timeout: int = 600,
3121 vca_id: str = None,
3122 ):
3123
3124 try:
3125 k8sclustertype = k8s_instance_info["k8scluster-type"]
3126 # Instantiate kdu
3127 db_dict_install = {
3128 "collection": "nsrs",
3129 "filter": {"_id": nsr_id},
3130 "path": nsr_db_path,
3131 }
3132
3133 if k8s_instance_info.get("kdu-deployment-name"):
3134 kdu_instance = k8s_instance_info.get("kdu-deployment-name")
3135 else:
3136 kdu_instance = self.k8scluster_map[
3137 k8sclustertype
3138 ].generate_kdu_instance_name(
3139 db_dict=db_dict_install,
3140 kdu_model=k8s_instance_info["kdu-model"],
3141 kdu_name=k8s_instance_info["kdu-name"],
3142 )
3143 self.update_db_2(
3144 "nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance}
3145 )
3146 await self.k8scluster_map[k8sclustertype].install(
3147 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
3148 kdu_model=k8s_instance_info["kdu-model"],
3149 atomic=True,
3150 params=k8params,
3151 db_dict=db_dict_install,
3152 timeout=timeout,
3153 kdu_name=k8s_instance_info["kdu-name"],
3154 namespace=k8s_instance_info["namespace"],
3155 kdu_instance=kdu_instance,
3156 vca_id=vca_id,
3157 )
3158 self.update_db_2(
3159 "nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance}
3160 )
3161
3162 # Obtain services to obtain management service ip
3163 services = await self.k8scluster_map[k8sclustertype].get_services(
3164 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
3165 kdu_instance=kdu_instance,
3166 namespace=k8s_instance_info["namespace"],
3167 )
3168
3169 # Obtain management service info (if exists)
3170 vnfr_update_dict = {}
3171 kdu_config = get_configuration(vnfd, kdud["name"])
3172 if kdu_config:
3173 target_ee_list = kdu_config.get("execution-environment-list", [])
3174 else:
3175 target_ee_list = []
3176
3177 if services:
3178 vnfr_update_dict["kdur.{}.services".format(kdu_index)] = services
3179 mgmt_services = [
3180 service
3181 for service in kdud.get("service", [])
3182 if service.get("mgmt-service")
3183 ]
3184 for mgmt_service in mgmt_services:
3185 for service in services:
3186 if service["name"].startswith(mgmt_service["name"]):
3187 # Mgmt service found, Obtain service ip
3188 ip = service.get("external_ip", service.get("cluster_ip"))
3189 if isinstance(ip, list) and len(ip) == 1:
3190 ip = ip[0]
3191
3192 vnfr_update_dict[
3193 "kdur.{}.ip-address".format(kdu_index)
3194 ] = ip
3195
3196 # Check whether the mgmt ip at the vnf must also be updated
3197 service_external_cp = mgmt_service.get(
3198 "external-connection-point-ref"
3199 )
3200 if service_external_cp:
3201 if (
3202 deep_get(vnfd, ("mgmt-interface", "cp"))
3203 == service_external_cp
3204 ):
3205 vnfr_update_dict["ip-address"] = ip
3206
3207 if find_in_list(
3208 target_ee_list,
3209 lambda ee: ee.get(
3210 "external-connection-point-ref", ""
3211 )
3212 == service_external_cp,
3213 ):
3214 vnfr_update_dict[
3215 "kdur.{}.ip-address".format(kdu_index)
3216 ] = ip
3217 break
3218 else:
3219 self.logger.warn(
3220 "Mgmt service name: {} not found".format(
3221 mgmt_service["name"]
3222 )
3223 )
3224
3225 vnfr_update_dict["kdur.{}.status".format(kdu_index)] = "READY"
3226 self.update_db_2("vnfrs", vnfr_data.get("_id"), vnfr_update_dict)
3227
3228 kdu_config = get_configuration(vnfd, k8s_instance_info["kdu-name"])
3229 if (
3230 kdu_config
3231 and kdu_config.get("initial-config-primitive")
3232 and get_juju_ee_ref(vnfd, k8s_instance_info["kdu-name"]) is None
3233 ):
3234 initial_config_primitive_list = kdu_config.get(
3235 "initial-config-primitive"
3236 )
3237 initial_config_primitive_list.sort(key=lambda val: int(val["seq"]))
3238
3239 for initial_config_primitive in initial_config_primitive_list:
3240 primitive_params_ = self._map_primitive_params(
3241 initial_config_primitive, {}, {}
3242 )
3243
3244 await asyncio.wait_for(
3245 self.k8scluster_map[k8sclustertype].exec_primitive(
3246 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
3247 kdu_instance=kdu_instance,
3248 primitive_name=initial_config_primitive["name"],
3249 params=primitive_params_,
3250 db_dict=db_dict_install,
3251 vca_id=vca_id,
3252 ),
3253 timeout=timeout,
3254 )
3255
3256 except Exception as e:
3257 # Prepare update db with error and raise exception
3258 try:
3259 self.update_db_2(
3260 "nsrs", nsr_id, {nsr_db_path + ".detailed-status": str(e)}
3261 )
3262 self.update_db_2(
3263 "vnfrs",
3264 vnfr_data.get("_id"),
3265 {"kdur.{}.status".format(kdu_index): "ERROR"},
3266 )
3267 except Exception:
3268 # ignore to keep original exception
3269 pass
3270 # reraise original error
3271 raise
3272
3273 return kdu_instance
3274
3275 async def deploy_kdus(
3276 self,
3277 logging_text,
3278 nsr_id,
3279 nslcmop_id,
3280 db_vnfrs,
3281 db_vnfds,
3282 task_instantiation_info,
3283 ):
3284 # Launch kdus if present in the descriptor
3285
3286 k8scluster_id_2_uuic = {
3287 "helm-chart-v3": {},
3288 "helm-chart": {},
3289 "juju-bundle": {},
3290 }
3291
3292 async def _get_cluster_id(cluster_id, cluster_type):
3293 nonlocal k8scluster_id_2_uuic
3294 if cluster_id in k8scluster_id_2_uuic[cluster_type]:
3295 return k8scluster_id_2_uuic[cluster_type][cluster_id]
3296
3297 # check if the K8s cluster is being created and wait for related tasks in process to finish
3298 task_name, task_dependency = self.lcm_tasks.lookfor_related(
3299 "k8scluster", cluster_id
3300 )
3301 if task_dependency:
3302 text = "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
3303 task_name, cluster_id
3304 )
3305 self.logger.debug(logging_text + text)
3306 await asyncio.wait(task_dependency, timeout=3600)
3307
3308 db_k8scluster = self.db.get_one(
3309 "k8sclusters", {"_id": cluster_id}, fail_on_empty=False
3310 )
3311 if not db_k8scluster:
3312 raise LcmException("K8s cluster {} cannot be found".format(cluster_id))
3313
3314 k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
3315 if not k8s_id:
3316 if cluster_type == "helm-chart-v3":
3317 try:
3318 # backward compatibility for existing clusters that have not been initialized for helm v3
3319 k8s_credentials = yaml.safe_dump(
3320 db_k8scluster.get("credentials")
3321 )
3322 k8s_id, uninstall_sw = await self.k8sclusterhelm3.init_env(
3323 k8s_credentials, reuse_cluster_uuid=cluster_id
3324 )
3325 db_k8scluster_update = {}
3326 db_k8scluster_update["_admin.helm-chart-v3.error_msg"] = None
3327 db_k8scluster_update["_admin.helm-chart-v3.id"] = k8s_id
3328 db_k8scluster_update[
3329 "_admin.helm-chart-v3.created"
3330 ] = uninstall_sw
3331 db_k8scluster_update[
3332 "_admin.helm-chart-v3.operationalState"
3333 ] = "ENABLED"
3334 self.update_db_2(
3335 "k8sclusters", cluster_id, db_k8scluster_update
3336 )
3337 except Exception as e:
3338 self.logger.error(
3339 logging_text
3340 + "error initializing helm-v3 cluster: {}".format(str(e))
3341 )
3342 raise LcmException(
3343 "K8s cluster '{}' has not been initialized for '{}'".format(
3344 cluster_id, cluster_type
3345 )
3346 )
3347 else:
3348 raise LcmException(
3349 "K8s cluster '{}' has not been initialized for '{}'".format(
3350 cluster_id, cluster_type
3351 )
3352 )
3353 k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
3354 return k8s_id
3355
3356 logging_text += "Deploy kdus: "
3357 step = ""
3358 try:
3359 db_nsr_update = {"_admin.deployed.K8s": []}
3360 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3361
3362 index = 0
3363 updated_cluster_list = []
3364 updated_v3_cluster_list = []
3365
3366 for vnfr_data in db_vnfrs.values():
3367 vca_id = self.get_vca_id(vnfr_data, {})
3368 for kdu_index, kdur in enumerate(get_iterable(vnfr_data, "kdur")):
3369 # Step 0: Prepare and set parameters
3370 desc_params = parse_yaml_strings(kdur.get("additionalParams"))
3371 vnfd_id = vnfr_data.get("vnfd-id")
3372 vnfd_with_id = find_in_list(
3373 db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id
3374 )
3375 kdud = next(
3376 kdud
3377 for kdud in vnfd_with_id["kdu"]
3378 if kdud["name"] == kdur["kdu-name"]
3379 )
3380 namespace = kdur.get("k8s-namespace")
3381 kdu_deployment_name = kdur.get("kdu-deployment-name")
3382 if kdur.get("helm-chart"):
3383 kdumodel = kdur["helm-chart"]
3384 # Default version: helm3; if helm-version is v2, assign v2
3385 k8sclustertype = "helm-chart-v3"
3386 self.logger.debug("kdur: {}".format(kdur))
3387 if (
3388 kdur.get("helm-version")
3389 and kdur.get("helm-version") == "v2"
3390 ):
3391 k8sclustertype = "helm-chart"
3392 elif kdur.get("juju-bundle"):
3393 kdumodel = kdur["juju-bundle"]
3394 k8sclustertype = "juju-bundle"
3395 else:
3396 raise LcmException(
3397 "kdu type for kdu='{}.{}' is neither helm-chart nor "
3398 "juju-bundle. Maybe an old NBI version is running".format(
3399 vnfr_data["member-vnf-index-ref"], kdur["kdu-name"]
3400 )
3401 )
3402 # check if kdumodel is a file and exists
3403 try:
3404 vnfd_with_id = find_in_list(
3405 db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id
3406 )
3407 storage = deep_get(vnfd_with_id, ("_admin", "storage"))
3408 if storage: # may not be present if the vnfd has no artifacts
3409 # path format: /vnfdid/pkgdir/helm-charts|juju-bundles/kdumodel
3410 if storage["pkg-dir"]:
3411 filename = "{}/{}/{}s/{}".format(
3412 storage["folder"],
3413 storage["pkg-dir"],
3414 k8sclustertype,
3415 kdumodel,
3416 )
3417 else:
3418 filename = "{}/Scripts/{}s/{}".format(
3419 storage["folder"],
3420 k8sclustertype,
3421 kdumodel,
3422 )
3423 if self.fs.file_exists(
3424 filename, mode="file"
3425 ) or self.fs.file_exists(filename, mode="dir"):
3426 kdumodel = self.fs.path + filename
3427 except (asyncio.TimeoutError, asyncio.CancelledError):
3428 raise
3429 except Exception: # it is not a file
3430 pass
3431
3432 k8s_cluster_id = kdur["k8s-cluster"]["id"]
3433 step = "Synchronize repos for k8s cluster '{}'".format(
3434 k8s_cluster_id
3435 )
3436 cluster_uuid = await _get_cluster_id(k8s_cluster_id, k8sclustertype)
3437
3438 # Synchronize repos
3439 if (
3440 k8sclustertype == "helm-chart"
3441 and cluster_uuid not in updated_cluster_list
3442 ) or (
3443 k8sclustertype == "helm-chart-v3"
3444 and cluster_uuid not in updated_v3_cluster_list
3445 ):
3446 del_repo_list, added_repo_dict = await asyncio.ensure_future(
3447 self.k8scluster_map[k8sclustertype].synchronize_repos(
3448 cluster_uuid=cluster_uuid
3449 )
3450 )
3451 if del_repo_list or added_repo_dict:
3452 if k8sclustertype == "helm-chart":
3453 unset = {
3454 "_admin.helm_charts_added." + item: None
3455 for item in del_repo_list
3456 }
3457 updated = {
3458 "_admin.helm_charts_added." + item: name
3459 for item, name in added_repo_dict.items()
3460 }
3461 updated_cluster_list.append(cluster_uuid)
3462 elif k8sclustertype == "helm-chart-v3":
3463 unset = {
3464 "_admin.helm_charts_v3_added." + item: None
3465 for item in del_repo_list
3466 }
3467 updated = {
3468 "_admin.helm_charts_v3_added." + item: name
3469 for item, name in added_repo_dict.items()
3470 }
3471 updated_v3_cluster_list.append(cluster_uuid)
3472 self.logger.debug(
3473 logging_text + "repos synchronized on k8s cluster "
3474 "'{}' to_delete: {}, to_add: {}".format(
3475 k8s_cluster_id, del_repo_list, added_repo_dict
3476 )
3477 )
3478 self.db.set_one(
3479 "k8sclusters",
3480 {"_id": k8s_cluster_id},
3481 updated,
3482 unset=unset,
3483 )
3484
3485 # Instantiate kdu
3486 step = "Instantiating KDU {}.{} in k8s cluster {}".format(
3487 vnfr_data["member-vnf-index-ref"],
3488 kdur["kdu-name"],
3489 k8s_cluster_id,
3490 )
3491 k8s_instance_info = {
3492 "kdu-instance": None,
3493 "k8scluster-uuid": cluster_uuid,
3494 "k8scluster-type": k8sclustertype,
3495 "member-vnf-index": vnfr_data["member-vnf-index-ref"],
3496 "kdu-name": kdur["kdu-name"],
3497 "kdu-model": kdumodel,
3498 "namespace": namespace,
3499 "kdu-deployment-name": kdu_deployment_name,
3500 }
3501 db_path = "_admin.deployed.K8s.{}".format(index)
3502 db_nsr_update[db_path] = k8s_instance_info
3503 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3504 vnfd_with_id = find_in_list(
3505 db_vnfds, lambda vnf: vnf["_id"] == vnfd_id
3506 )
3507 task = asyncio.ensure_future(
3508 self._install_kdu(
3509 nsr_id,
3510 db_path,
3511 vnfr_data,
3512 kdu_index,
3513 kdud,
3514 vnfd_with_id,
3515 k8s_instance_info,
3516 k8params=desc_params,
3517 timeout=600,
3518 vca_id=vca_id,
3519 )
3520 )
3521 self.lcm_tasks.register(
3522 "ns",
3523 nsr_id,
3524 nslcmop_id,
3525 "instantiate_KDU-{}".format(index),
3526 task,
3527 )
3528 task_instantiation_info[task] = "Deploying KDU {}".format(
3529 kdur["kdu-name"]
3530 )
3531
3532 index += 1
3533
3534 except (LcmException, asyncio.CancelledError):
3535 raise
3536 except Exception as e:
3537 msg = "Exception {} while {}: {}".format(type(e).__name__, step, e)
3538 if isinstance(e, (N2VCException, DbException)):
3539 self.logger.error(logging_text + msg)
3540 else:
3541 self.logger.critical(logging_text + msg, exc_info=True)
3542 raise LcmException(msg)
3543 finally:
3544 if db_nsr_update:
3545 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3546
3547 def _deploy_n2vc(
3548 self,
3549 logging_text,
3550 db_nsr,
3551 db_vnfr,
3552 nslcmop_id,
3553 nsr_id,
3554 nsi_id,
3555 vnfd_id,
3556 vdu_id,
3557 kdu_name,
3558 member_vnf_index,
3559 vdu_index,
3560 vdu_name,
3561 deploy_params,
3562 descriptor_config,
3563 base_folder,
3564 task_instantiation_info,
3565 stage,
3566 ):
3567 # launch instantiate_N2VC in an asyncio task and register the task object
3568 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
3569 # if not found, create one entry and update database
3570 # fill db_nsr._admin.deployed.VCA.<index>
3571
3572 self.logger.debug(
3573 logging_text + "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id, vdu_id)
3574 )
3575 if "execution-environment-list" in descriptor_config:
3576 ee_list = descriptor_config.get("execution-environment-list", [])
3577 elif "juju" in descriptor_config:
3578 ee_list = [descriptor_config] # ns charms
3579 else:  # other types such as script are not supported
3580 ee_list = []
3581
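# The loop below creates one VCA record per execution environment. As an illustrative
# sketch (descriptor names are hypothetical): a configuration with
#   execution-environment-list: [{"id": "monitor-ee", "juju": {"charm": "simple"}}]
# produces ee_list == [{"id": "monitor-ee", "juju": {"charm": "simple"}}], which the loop
# deploys as a single "lxc_proxy_charm" execution environment.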
3582 for ee_item in ee_list:
3583 self.logger.debug(
3584 logging_text
3585 + "_deploy_n2vc ee_item juju={}, helm={}".format(
3586 ee_item.get("juju"), ee_item.get("helm-chart")
3587 )
3588 )
3589 ee_descriptor_id = ee_item.get("id")
3590 if ee_item.get("juju"):
3591 vca_name = ee_item["juju"].get("charm")
3592 vca_type = (
3593 "lxc_proxy_charm"
3594 if ee_item["juju"].get("charm") is not None
3595 else "native_charm"
3596 )
3597 if ee_item["juju"].get("cloud") == "k8s":
3598 vca_type = "k8s_proxy_charm"
3599 elif ee_item["juju"].get("proxy") is False:
3600 vca_type = "native_charm"
3601 elif ee_item.get("helm-chart"):
3602 vca_name = ee_item["helm-chart"]
3603 if ee_item.get("helm-version") and ee_item.get("helm-version") == "v2":
3604 vca_type = "helm"
3605 else:
3606 vca_type = "helm-v3"
3607 else:
3608 self.logger.debug(
3609 logging_text + "skipping: configuration is neither a juju charm nor a helm chart"
3610 )
3611 continue
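# Summary of the vca_type selection above:
#   juju with charm and cloud == "k8s"      -> "k8s_proxy_charm"
#   juju with charm (default, proxied)      -> "lxc_proxy_charm"
#   juju without charm, or proxy == False   -> "native_charm"
#   helm-chart with helm-version == "v2"    -> "helm"
#   helm-chart otherwise                    -> "helm-v3"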
3612
3613 vca_index = -1
3614 for vca_index, vca_deployed in enumerate(
3615 db_nsr["_admin"]["deployed"]["VCA"]
3616 ):
3617 if not vca_deployed:
3618 continue
3619 if (
3620 vca_deployed.get("member-vnf-index") == member_vnf_index
3621 and vca_deployed.get("vdu_id") == vdu_id
3622 and vca_deployed.get("kdu_name") == kdu_name
3623 and vca_deployed.get("vdu_count_index", 0) == vdu_index
3624 and vca_deployed.get("ee_descriptor_id") == ee_descriptor_id
3625 ):
3626 break
3627 else:
3628 # not found, create one.
3629 target = (
3630 "ns" if not member_vnf_index else "vnf/{}".format(member_vnf_index)
3631 )
3632 if vdu_id:
3633 target += "/vdu/{}/{}".format(vdu_id, vdu_index or 0)
3634 elif kdu_name:
3635 target += "/kdu/{}".format(kdu_name)
3636 vca_deployed = {
3637 "target_element": target,
3638 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
3639 "member-vnf-index": member_vnf_index,
3640 "vdu_id": vdu_id,
3641 "kdu_name": kdu_name,
3642 "vdu_count_index": vdu_index,
3643 "operational-status": "init", # TODO revise
3644 "detailed-status": "", # TODO revise
3645 "step": "initial-deploy", # TODO revise
3646 "vnfd_id": vnfd_id,
3647 "vdu_name": vdu_name,
3648 "type": vca_type,
3649 "ee_descriptor_id": ee_descriptor_id,
3650 }
3651 vca_index += 1
3652
3653 # create VCA and configurationStatus in db
3654 db_dict = {
3655 "_admin.deployed.VCA.{}".format(vca_index): vca_deployed,
3656 "configurationStatus.{}".format(vca_index): dict(),
3657 }
3658 self.update_db_2("nsrs", nsr_id, db_dict)
3659
3660 db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed)
3661
3662 self.logger.debug("N2VC > NSR_ID > {}".format(nsr_id))
3663 self.logger.debug("N2VC > DB_NSR > {}".format(db_nsr))
3664 self.logger.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed))
3665
3666 # Launch task
3667 task_n2vc = asyncio.ensure_future(
3668 self.instantiate_N2VC(
3669 logging_text=logging_text,
3670 vca_index=vca_index,
3671 nsi_id=nsi_id,
3672 db_nsr=db_nsr,
3673 db_vnfr=db_vnfr,
3674 vdu_id=vdu_id,
3675 kdu_name=kdu_name,
3676 vdu_index=vdu_index,
3677 deploy_params=deploy_params,
3678 config_descriptor=descriptor_config,
3679 base_folder=base_folder,
3680 nslcmop_id=nslcmop_id,
3681 stage=stage,
3682 vca_type=vca_type,
3683 vca_name=vca_name,
3684 ee_config_descriptor=ee_item,
3685 )
3686 )
3687 self.lcm_tasks.register(
3688 "ns",
3689 nsr_id,
3690 nslcmop_id,
3691 "instantiate_N2VC-{}".format(vca_index),
3692 task_n2vc,
3693 )
3694 task_instantiation_info[
3695 task_n2vc
3696 ] = self.task_name_deploy_vca + " {}.{}".format(
3697 member_vnf_index or "", vdu_id or ""
3698 )
3699
3700 @staticmethod
3701 def _create_nslcmop(nsr_id, operation, params):
3702 """
3703 Creates an ns-lcm-op content to be stored in the database.
3704 :param nsr_id: internal id of the instance
3705 :param operation: instantiate, terminate, scale, action, ...
3706 :param params: user parameters for the operation
3707 :return: dictionary following SOL005 format
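Illustrative use (ids and params are hypothetical):
    op = NsLcm._create_nslcmop(nsr_id, "terminate", {"autoremove": True})
    # op["lcmOperationType"] == "terminate", op["operationState"] == "PROCESSING"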
3708 """
3709 # Raise exception if invalid arguments
3710 if not (nsr_id and operation and params):
3711 raise LcmException(
3712 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided"
3713 )
3714 now = time()
3715 _id = str(uuid4())
3716 nslcmop = {
3717 "id": _id,
3718 "_id": _id,
3719 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
3720 "operationState": "PROCESSING",
3721 "statusEnteredTime": now,
3722 "nsInstanceId": nsr_id,
3723 "lcmOperationType": operation,
3724 "startTime": now,
3725 "isAutomaticInvocation": False,
3726 "operationParams": params,
3727 "isCancelPending": False,
3728 "links": {
3729 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
3730 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
3731 },
3732 }
3733 return nslcmop
3734
3735 def _format_additional_params(self, params):
3736 params = params or {}
3737 for key, value in params.items():
3738 if str(value).startswith("!!yaml "):
3739 params[key] = yaml.safe_load(value[7:])
3740 return params
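# Illustrative behaviour (key names are hypothetical): {"vdu-config": "!!yaml {eth: [eth0]}"}
# is returned as {"vdu-config": {"eth": ["eth0"]}}; values without the "!!yaml " prefix are
# passed through unchanged.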
3741
3742 def _get_terminate_primitive_params(self, seq, vnf_index):
3743 primitive = seq.get("name")
3744 primitive_params = {}
3745 params = {
3746 "member_vnf_index": vnf_index,
3747 "primitive": primitive,
3748 "primitive_params": primitive_params,
3749 }
3750 desc_params = {}
3751 return self._map_primitive_params(seq, params, desc_params)
3752
3753 # sub-operations
3754
3755 def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
3756 op = deep_get(db_nslcmop, ("_admin", "operations"), [])[op_index]
3757 if op.get("operationState") == "COMPLETED":
3758 # b. Skip sub-operation
3759 # _ns_execute_primitive() or RO.create_action() will NOT be executed
3760 return self.SUBOPERATION_STATUS_SKIP
3761 else:
3762 # c. retry executing sub-operation
3763 # The sub-operation exists, and operationState != 'COMPLETED'
3764 # Update operationState = 'PROCESSING' to indicate a retry.
3765 operationState = "PROCESSING"
3766 detailed_status = "In progress"
3767 self._update_suboperation_status(
3768 db_nslcmop, op_index, operationState, detailed_status
3769 )
3770 # Return the sub-operation index
3771 # _ns_execute_primitive() or RO.create_action() will be called from scale()
3772 # with arguments extracted from the sub-operation
3773 return op_index
3774
3775 # Find a sub-operation where all keys in a matching dictionary must match
3776 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
3777 def _find_suboperation(self, db_nslcmop, match):
3778 if db_nslcmop and match:
3779 op_list = db_nslcmop.get("_admin", {}).get("operations", [])
3780 for i, op in enumerate(op_list):
3781 if all(op.get(k) == match[k] for k in match):
3782 return i
3783 return self.SUBOPERATION_STATUS_NOT_FOUND
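# Illustrative call (field values are hypothetical):
#   self._find_suboperation(db_nslcmop, {"member_vnf_index": "1", "primitive": "touch",
#       "lcmOperationType": "PRE-SCALE"})
# returns the index of the first stored sub-operation whose fields all equal those values,
# or SUBOPERATION_STATUS_NOT_FOUND.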
3784
3785 # Update status for a sub-operation given its index
3786 def _update_suboperation_status(
3787 self, db_nslcmop, op_index, operationState, detailed_status
3788 ):
3789 # Update DB for HA tasks
3790 q_filter = {"_id": db_nslcmop["_id"]}
3791 update_dict = {
3792 "_admin.operations.{}.operationState".format(op_index): operationState,
3793 "_admin.operations.{}.detailed-status".format(op_index): detailed_status,
3794 }
3795 self.db.set_one(
3796 "nslcmops", q_filter=q_filter, update_dict=update_dict, fail_on_empty=False
3797 )
3798
3799 # Add sub-operation, return the index of the added sub-operation
3800 # Optionally, set operationState, detailed-status, and operationType
3801 # Status and type are currently set for 'scale' sub-operations:
3802 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
3803 # 'detailed-status' : status message
3804 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
3805 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
3806 def _add_suboperation(
3807 self,
3808 db_nslcmop,
3809 vnf_index,
3810 vdu_id,
3811 vdu_count_index,
3812 vdu_name,
3813 primitive,
3814 mapped_primitive_params,
3815 operationState=None,
3816 detailed_status=None,
3817 operationType=None,
3818 RO_nsr_id=None,
3819 RO_scaling_info=None,
3820 ):
3821 if not db_nslcmop:
3822 return self.SUBOPERATION_STATUS_NOT_FOUND
3823 # Get the "_admin.operations" list, if it exists
3824 db_nslcmop_admin = db_nslcmop.get("_admin", {})
3825 op_list = db_nslcmop_admin.get("operations")
3826 # Create or append to the "_admin.operations" list
3827 new_op = {
3828 "member_vnf_index": vnf_index,
3829 "vdu_id": vdu_id,
3830 "vdu_count_index": vdu_count_index,
3831 "primitive": primitive,
3832 "primitive_params": mapped_primitive_params,
3833 }
3834 if operationState:
3835 new_op["operationState"] = operationState
3836 if detailed_status:
3837 new_op["detailed-status"] = detailed_status
3838 if operationType:
3839 new_op["lcmOperationType"] = operationType
3840 if RO_nsr_id:
3841 new_op["RO_nsr_id"] = RO_nsr_id
3842 if RO_scaling_info:
3843 new_op["RO_scaling_info"] = RO_scaling_info
3844 if not op_list:
3845 # No existing operations, create key 'operations' with current operation as first list element
3846 db_nslcmop_admin.update({"operations": [new_op]})
3847 op_list = db_nslcmop_admin.get("operations")
3848 else:
3849 # Existing operations, append operation to list
3850 op_list.append(new_op)
3851
3852 db_nslcmop_update = {"_admin.operations": op_list}
3853 self.update_db_2("nslcmops", db_nslcmop["_id"], db_nslcmop_update)
3854 op_index = len(op_list) - 1
3855 return op_index
3856
3857 # Helper methods for scale() sub-operations
3858
3859 # pre-scale/post-scale:
3860 # Check for 3 different cases:
3861 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
3862 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
3863 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
3864 def _check_or_add_scale_suboperation(
3865 self,
3866 db_nslcmop,
3867 vnf_index,
3868 vnf_config_primitive,
3869 primitive_params,
3870 operationType,
3871 RO_nsr_id=None,
3872 RO_scaling_info=None,
3873 ):
3874 # Find this sub-operation
3875 if RO_nsr_id and RO_scaling_info:
3876 operationType = "SCALE-RO"
3877 match = {
3878 "member_vnf_index": vnf_index,
3879 "RO_nsr_id": RO_nsr_id,
3880 "RO_scaling_info": RO_scaling_info,
3881 }
3882 else:
3883 match = {
3884 "member_vnf_index": vnf_index,
3885 "primitive": vnf_config_primitive,
3886 "primitive_params": primitive_params,
3887 "lcmOperationType": operationType,
3888 }
3889 op_index = self._find_suboperation(db_nslcmop, match)
3890 if op_index == self.SUBOPERATION_STATUS_NOT_FOUND:
3891 # a. New sub-operation
3892 # The sub-operation does not exist, add it.
3893 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
3894 # The following parameters are set to None for all kind of scaling:
3895 vdu_id = None
3896 vdu_count_index = None
3897 vdu_name = None
3898 if RO_nsr_id and RO_scaling_info:
3899 vnf_config_primitive = None
3900 primitive_params = None
3901 else:
3902 RO_nsr_id = None
3903 RO_scaling_info = None
3904 # Initial status for sub-operation
3905 operationState = "PROCESSING"
3906 detailed_status = "In progress"
3907 # Add sub-operation for pre/post-scaling (zero or more operations)
3908 self._add_suboperation(
3909 db_nslcmop,
3910 vnf_index,
3911 vdu_id,
3912 vdu_count_index,
3913 vdu_name,
3914 vnf_config_primitive,
3915 primitive_params,
3916 operationState,
3917 detailed_status,
3918 operationType,
3919 RO_nsr_id,
3920 RO_scaling_info,
3921 )
3922 return self.SUBOPERATION_STATUS_NEW
3923 else:
3924 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
3925 # or op_index (operationState != 'COMPLETED')
3926 return self._retry_or_skip_suboperation(db_nslcmop, op_index)
3927
3928 # Returns the execution environment (ee) id for the given vnf_index and vdu_id
3929
3930 def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list):
3931 # TODO vdu_index_count
3932 for vca in vca_deployed_list:
3933 if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id:
3934 return vca["ee_id"]
3935
3936 async def destroy_N2VC(
3937 self,
3938 logging_text,
3939 db_nslcmop,
3940 vca_deployed,
3941 config_descriptor,
3942 vca_index,
3943 destroy_ee=True,
3944 exec_primitives=True,
3945 scaling_in=False,
3946 vca_id: str = None,
3947 ):
3948 """
3949 Execute the terminate primitives and, if requested, destroy the execution environment
3950 :param logging_text:
3951 :param db_nslcmop:
3952 :param vca_deployed: Dictionary of deployment info at db_nsr._admin.deployed.VCA.<INDEX>
3953 :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
3954 :param vca_index: index in the database _admin.deployed.VCA
3955 :param destroy_ee: False to skip destruction here, because all execution environments will be destroyed at once later
3956 :param exec_primitives: False to skip the terminate primitives, because the configuration did not complete or did
3957 not execute properly
3958 :param scaling_in: True destroys the application, False destroys the model
3959 :return: None or exception
3960 """
3961
3962 self.logger.debug(
3963 logging_text
3964 + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
3965 vca_index, vca_deployed, config_descriptor, destroy_ee
3966 )
3967 )
3968
3969 vca_type = vca_deployed.get("type", "lxc_proxy_charm")
3970
3971 # execute terminate_primitives
3972 if exec_primitives:
3973 terminate_primitives = get_ee_sorted_terminate_config_primitive_list(
3974 config_descriptor.get("terminate-config-primitive"),
3975 vca_deployed.get("ee_descriptor_id"),
3976 )
3977 vdu_id = vca_deployed.get("vdu_id")
3978 vdu_count_index = vca_deployed.get("vdu_count_index")
3979 vdu_name = vca_deployed.get("vdu_name")
3980 vnf_index = vca_deployed.get("member-vnf-index")
3981 if terminate_primitives and vca_deployed.get("needed_terminate"):
3982 for seq in terminate_primitives:
3983 # For each sequence in list, get primitive and call _ns_execute_primitive()
3984 step = "Calling terminate action for vnf_member_index={} primitive={}".format(
3985 vnf_index, seq.get("name")
3986 )
3987 self.logger.debug(logging_text + step)
3988 # Create the primitive for each sequence, i.e. "primitive": "touch"
3989 primitive = seq.get("name")
3990 mapped_primitive_params = self._get_terminate_primitive_params(
3991 seq, vnf_index
3992 )
3993
3994 # Add sub-operation
3995 self._add_suboperation(
3996 db_nslcmop,
3997 vnf_index,
3998 vdu_id,
3999 vdu_count_index,
4000 vdu_name,
4001 primitive,
4002 mapped_primitive_params,
4003 )
4004 # Sub-operations: Call _ns_execute_primitive() instead of action()
4005 try:
4006 result, result_detail = await self._ns_execute_primitive(
4007 vca_deployed["ee_id"],
4008 primitive,
4009 mapped_primitive_params,
4010 vca_type=vca_type,
4011 vca_id=vca_id,
4012 )
4013 except LcmException:
4014 # this happens when the VCA is not deployed; in that case there is nothing to terminate
4015 continue
4016 result_ok = ["COMPLETED", "PARTIALLY_COMPLETED"]
4017 if result not in result_ok:
4018 raise LcmException(
4019 "terminate_primitive {} for vnf_member_index={} fails with "
4020 "error {}".format(seq.get("name"), vnf_index, result_detail)
4021 )
4022 # record that this VCA no longer needs to be terminated
4023 db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(
4024 vca_index
4025 )
4026 self.update_db_2(
4027 "nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False}
4028 )
4029
4030 # Delete Prometheus Jobs if any
4031 # This uses NSR_ID, so it will destroy any jobs under this index
4032 self.db.del_list("prometheus_jobs", {"nsr_id": db_nslcmop["nsInstanceId"]})
4033
4034 if destroy_ee:
4035 await self.vca_map[vca_type].delete_execution_environment(
4036 vca_deployed["ee_id"],
4037 scaling_in=scaling_in,
4038 vca_type=vca_type,
4039 vca_id=vca_id,
4040 )
4041
4042 async def _delete_all_N2VC(self, db_nsr: dict, vca_id: str = None):
4043 self._write_all_config_status(db_nsr=db_nsr, status="TERMINATING")
4044 namespace = "." + db_nsr["_id"]
4045 try:
4046 await self.n2vc.delete_namespace(
4047 namespace=namespace,
4048 total_timeout=self.timeout_charm_delete,
4049 vca_id=vca_id,
4050 )
4051 except N2VCNotFound: # already deleted. Skip
4052 pass
4053 self._write_all_config_status(db_nsr=db_nsr, status="DELETED")
4054
4055 async def _terminate_RO(
4056 self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
4057 ):
4058 """
4059 Terminates a deployment from RO
4060 :param logging_text:
4061 :param nsr_deployed: db_nsr._admin.deployed
4062 :param nsr_id:
4063 :param nslcmop_id:
4064 :param stage: list of strings with the content to write to db_nslcmop.detailed-status.
4065 this method updates only index 2, but writes the concatenated content of the whole list to the database
4066 :return:
4067 """
4068 db_nsr_update = {}
4069 failed_detail = []
4070 ro_nsr_id = ro_delete_action = None
4071 if nsr_deployed and nsr_deployed.get("RO"):
4072 ro_nsr_id = nsr_deployed["RO"].get("nsr_id")
4073 ro_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
4074 try:
4075 if ro_nsr_id:
4076 stage[2] = "Deleting ns from VIM."
4077 db_nsr_update["detailed-status"] = " ".join(stage)
4078 self._write_op_status(nslcmop_id, stage)
4079 self.logger.debug(logging_text + stage[2])
4080 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4081 self._write_op_status(nslcmop_id, stage)
4082 desc = await self.RO.delete("ns", ro_nsr_id)
4083 ro_delete_action = desc["action_id"]
4084 db_nsr_update[
4085 "_admin.deployed.RO.nsr_delete_action_id"
4086 ] = ro_delete_action
4087 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
4088 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
4089 if ro_delete_action:
4090 # wait until NS is deleted from VIM
4091 stage[2] = "Waiting for ns to be deleted from VIM."
4092 detailed_status_old = None
4093 self.logger.debug(
4094 logging_text
4095 + stage[2]
4096 + " RO_id={} ro_delete_action={}".format(
4097 ro_nsr_id, ro_delete_action
4098 )
4099 )
4100 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4101 self._write_op_status(nslcmop_id, stage)
4102
4103 delete_timeout = 20 * 60 # 20 minutes
4104 while delete_timeout > 0:
4105 desc = await self.RO.show(
4106 "ns",
4107 item_id_name=ro_nsr_id,
4108 extra_item="action",
4109 extra_item_id=ro_delete_action,
4110 )
4111
4112 # deploymentStatus
4113 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
4114
4115 ns_status, ns_status_info = self.RO.check_action_status(desc)
4116 if ns_status == "ERROR":
4117 raise ROclient.ROClientException(ns_status_info)
4118 elif ns_status == "BUILD":
4119 stage[2] = "Deleting from VIM {}".format(ns_status_info)
4120 elif ns_status == "ACTIVE":
4121 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
4122 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
4123 break
4124 else:
4125 assert (
4126 False
4127 ), "ROclient.check_action_status returns unknown {}".format(
4128 ns_status
4129 )
4130 if stage[2] != detailed_status_old:
4131 detailed_status_old = stage[2]
4132 db_nsr_update["detailed-status"] = " ".join(stage)
4133 self._write_op_status(nslcmop_id, stage)
4134 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4135 await asyncio.sleep(5, loop=self.loop)
4136 delete_timeout -= 5
4137 else: # delete_timeout <= 0:
4138 raise ROclient.ROClientException(
4139 "Timeout waiting ns deleted from VIM"
4140 )
4141
4142 except Exception as e:
4143 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4144 if (
4145 isinstance(e, ROclient.ROClientException) and e.http_code == 404
4146 ): # not found
4147 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
4148 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
4149 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
4150 self.logger.debug(
4151 logging_text + "RO_ns_id={} already deleted".format(ro_nsr_id)
4152 )
4153 elif (
4154 isinstance(e, ROclient.ROClientException) and e.http_code == 409
4155 ): # conflict
4156 failed_detail.append("delete conflict: {}".format(e))
4157 self.logger.debug(
4158 logging_text
4159 + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id, e)
4160 )
4161 else:
4162 failed_detail.append("delete error: {}".format(e))
4163 self.logger.error(
4164 logging_text + "RO_ns_id={} delete error: {}".format(ro_nsr_id, e)
4165 )
4166
4167 # Delete nsd
4168 if not failed_detail and deep_get(nsr_deployed, ("RO", "nsd_id")):
4169 ro_nsd_id = nsr_deployed["RO"]["nsd_id"]
4170 try:
4171 stage[2] = "Deleting nsd from RO."
4172 db_nsr_update["detailed-status"] = " ".join(stage)
4173 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4174 self._write_op_status(nslcmop_id, stage)
4175 await self.RO.delete("nsd", ro_nsd_id)
4176 self.logger.debug(
4177 logging_text + "ro_nsd_id={} deleted".format(ro_nsd_id)
4178 )
4179 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
4180 except Exception as e:
4181 if (
4182 isinstance(e, ROclient.ROClientException) and e.http_code == 404
4183 ): # not found
4184 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
4185 self.logger.debug(
4186 logging_text + "ro_nsd_id={} already deleted".format(ro_nsd_id)
4187 )
4188 elif (
4189 isinstance(e, ROclient.ROClientException) and e.http_code == 409
4190 ): # conflict
4191 failed_detail.append(
4192 "ro_nsd_id={} delete conflict: {}".format(ro_nsd_id, e)
4193 )
4194 self.logger.debug(logging_text + failed_detail[-1])
4195 else:
4196 failed_detail.append(
4197 "ro_nsd_id={} delete error: {}".format(ro_nsd_id, e)
4198 )
4199 self.logger.error(logging_text + failed_detail[-1])
4200
4201 if not failed_detail and deep_get(nsr_deployed, ("RO", "vnfd")):
4202 for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
4203 if not vnf_deployed or not vnf_deployed["id"]:
4204 continue
4205 try:
4206 ro_vnfd_id = vnf_deployed["id"]
4207 stage[
4208 2
4209 ] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
4210 vnf_deployed["member-vnf-index"], ro_vnfd_id
4211 )
4212 db_nsr_update["detailed-status"] = " ".join(stage)
4213 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4214 self._write_op_status(nslcmop_id, stage)
4215 await self.RO.delete("vnfd", ro_vnfd_id)
4216 self.logger.debug(
4217 logging_text + "ro_vnfd_id={} deleted".format(ro_vnfd_id)
4218 )
4219 db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
4220 except Exception as e:
4221 if (
4222 isinstance(e, ROclient.ROClientException) and e.http_code == 404
4223 ): # not found
4224 db_nsr_update[
4225 "_admin.deployed.RO.vnfd.{}.id".format(index)
4226 ] = None
4227 self.logger.debug(
4228 logging_text
4229 + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id)
4230 )
4231 elif (
4232 isinstance(e, ROclient.ROClientException) and e.http_code == 409
4233 ): # conflict
4234 failed_detail.append(
4235 "ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id, e)
4236 )
4237 self.logger.debug(logging_text + failed_detail[-1])
4238 else:
4239 failed_detail.append(
4240 "ro_vnfd_id={} delete error: {}".format(ro_vnfd_id, e)
4241 )
4242 self.logger.error(logging_text + failed_detail[-1])
4243
4244 if failed_detail:
4245 stage[2] = "Error deleting from VIM"
4246 else:
4247 stage[2] = "Deleted from VIM"
4248 db_nsr_update["detailed-status"] = " ".join(stage)
4249 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4250 self._write_op_status(nslcmop_id, stage)
4251
4252 if failed_detail:
4253 raise LcmException("; ".join(failed_detail))
4254
4255 async def terminate(self, nsr_id, nslcmop_id):
4256 # Try to lock HA task here
4257 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
4258 if not task_is_locked_by_me:
4259 return
4260
4261 logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
4262 self.logger.debug(logging_text + "Enter")
4263 timeout_ns_terminate = self.timeout_ns_terminate
4264 db_nsr = None
4265 db_nslcmop = None
4266 operation_params = None
4267 exc = None
4268 error_list = []  # accumulates all error messages
4269 db_nslcmop_update = {}
4270 autoremove = False # autoremove after terminated
4271 tasks_dict_info = {}
4272 db_nsr_update = {}
4273 stage = [
4274 "Stage 1/3: Preparing task.",
4275 "Waiting for previous operations to terminate.",
4276 "",
4277 ]
4278 # ^ contains [stage, step, VIM-status]
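# Illustrative progression of the list above while termination advances:
#   ["Stage 2/3 execute terminating primitives.", "Deleting KDUs.", ""] and later
#   ["Stage 3/3 delete all.", "Deleting ns from VIM.", "Deleted from VIM"];
# the three elements are joined with spaces when written to detailed-status.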
4279 try:
4280 # wait for any previous tasks in process
4281 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
4282
4283 stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
4284 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
4285 operation_params = db_nslcmop.get("operationParams") or {}
4286 if operation_params.get("timeout_ns_terminate"):
4287 timeout_ns_terminate = operation_params["timeout_ns_terminate"]
4288 stage[1] = "Getting nsr={} from db.".format(nsr_id)
4289 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
4290
4291 db_nsr_update["operational-status"] = "terminating"
4292 db_nsr_update["config-status"] = "terminating"
4293 self._write_ns_status(
4294 nsr_id=nsr_id,
4295 ns_state="TERMINATING",
4296 current_operation="TERMINATING",
4297 current_operation_id=nslcmop_id,
4298 other_update=db_nsr_update,
4299 )
4300 self._write_op_status(op_id=nslcmop_id, queuePosition=0, stage=stage)
4301 nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed")) or {}
4302 if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
4303 return
4304
4305 stage[1] = "Getting vnf descriptors from db."
4306 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
4307 db_vnfrs_dict = {
4308 db_vnfr["member-vnf-index-ref"]: db_vnfr for db_vnfr in db_vnfrs_list
4309 }
4310 db_vnfds_from_id = {}
4311 db_vnfds_from_member_index = {}
4312 # Loop over VNFRs
4313 for vnfr in db_vnfrs_list:
4314 vnfd_id = vnfr["vnfd-id"]
4315 if vnfd_id not in db_vnfds_from_id:
4316 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
4317 db_vnfds_from_id[vnfd_id] = vnfd
4318 db_vnfds_from_member_index[
4319 vnfr["member-vnf-index-ref"]
4320 ] = db_vnfds_from_id[vnfd_id]
4321
4322 # Destroy individual execution environments when there are terminating primitives.
4323 # The rest of the EEs will be deleted at once afterwards
4324 # TODO - check before calling _destroy_N2VC
4325 # if not operation_params.get("skip_terminate_primitives"):#
4326 # or not vca.get("needed_terminate"):
4327 stage[0] = "Stage 2/3 execute terminating primitives."
4328 self.logger.debug(logging_text + stage[0])
4329 stage[1] = "Looking for execution environments that need terminate primitives."
4330 self.logger.debug(logging_text + stage[1])
4331
4332 for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")):
4333 config_descriptor = None
4334 vca_member_vnf_index = vca.get("member-vnf-index")
4335 vca_id = self.get_vca_id(
4336 db_vnfrs_dict.get(vca_member_vnf_index)
4337 if vca_member_vnf_index
4338 else None,
4339 db_nsr,
4340 )
4341 if not vca or not vca.get("ee_id"):
4342 continue
4343 if not vca.get("member-vnf-index"):
4344 # ns
4345 config_descriptor = db_nsr.get("ns-configuration")
4346 elif vca.get("vdu_id"):
4347 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
4348 config_descriptor = get_configuration(db_vnfd, vca.get("vdu_id"))
4349 elif vca.get("kdu_name"):
4350 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
4351 config_descriptor = get_configuration(db_vnfd, vca.get("kdu_name"))
4352 else:
4353 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
4354 config_descriptor = get_configuration(db_vnfd, db_vnfd["id"])
4355 vca_type = vca.get("type")
4356 exec_terminate_primitives = not operation_params.get(
4357 "skip_terminate_primitives"
4358 ) and vca.get("needed_terminate")
4359 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
4360 # pending native charms
4361 destroy_ee = vca_type in ("helm", "helm-v3", "native_charm")
4364 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
4365 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
4366 task = asyncio.ensure_future(
4367 self.destroy_N2VC(
4368 logging_text,
4369 db_nslcmop,
4370 vca,
4371 config_descriptor,
4372 vca_index,
4373 destroy_ee,
4374 exec_terminate_primitives,
4375 vca_id=vca_id,
4376 )
4377 )
4378 tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))
4379
4380 # wait for pending tasks of terminate primitives
4381 if tasks_dict_info:
4382 self.logger.debug(
4383 logging_text
4384 + "Waiting for tasks {}".format(list(tasks_dict_info.keys()))
4385 )
4386 error_list = await self._wait_for_tasks(
4387 logging_text,
4388 tasks_dict_info,
4389 min(self.timeout_charm_delete, timeout_ns_terminate),
4390 stage,
4391 nslcmop_id,
4392 )
4393 tasks_dict_info.clear()
4394 if error_list:
4395 return # raise LcmException("; ".join(error_list))
4396
4397 # remove All execution environments at once
4398 stage[0] = "Stage 3/3 delete all."
4399
4400 if nsr_deployed.get("VCA"):
4401 stage[1] = "Deleting all execution environments."
4402 self.logger.debug(logging_text + stage[1])
4403 vca_id = self.get_vca_id({}, db_nsr)
4404 task_delete_ee = asyncio.ensure_future(
4405 asyncio.wait_for(
4406 self._delete_all_N2VC(db_nsr=db_nsr, vca_id=vca_id),
4407 timeout=self.timeout_charm_delete,
4408 )
4409 )
4410 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
4411 tasks_dict_info[task_delete_ee] = "Terminating all VCA"
4412
4413 # Delete from k8scluster
4414 stage[1] = "Deleting KDUs."
4415 self.logger.debug(logging_text + stage[1])
4416 # print(nsr_deployed)
4417 for kdu in get_iterable(nsr_deployed, "K8s"):
4418 if not kdu or not kdu.get("kdu-instance"):
4419 continue
4420 kdu_instance = kdu.get("kdu-instance")
4421 if kdu.get("k8scluster-type") in self.k8scluster_map:
4422 # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
4423 vca_id = self.get_vca_id({}, db_nsr)
4424 task_delete_kdu_instance = asyncio.ensure_future(
4425 self.k8scluster_map[kdu["k8scluster-type"]].uninstall(
4426 cluster_uuid=kdu.get("k8scluster-uuid"),
4427 kdu_instance=kdu_instance,
4428 vca_id=vca_id,
4429 )
4430 )
4431 else:
4432 self.logger.error(
4433 logging_text
4434 + "Unknown k8s deployment type {}".format(
4435 kdu.get("k8scluster-type")
4436 )
4437 )
4438 continue
4439 tasks_dict_info[
4440 task_delete_kdu_instance
4441 ] = "Terminating KDU '{}'".format(kdu.get("kdu-name"))
4442
4443 # remove from RO
4444 stage[1] = "Deleting ns from VIM."
4445 if self.ng_ro:
4446 task_delete_ro = asyncio.ensure_future(
4447 self._terminate_ng_ro(
4448 logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
4449 )
4450 )
4451 else:
4452 task_delete_ro = asyncio.ensure_future(
4453 self._terminate_RO(
4454 logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
4455 )
4456 )
4457 tasks_dict_info[task_delete_ro] = "Removing deployment from VIM"
4458
4459 # the rest of the work is done in the finally block
4460
4461 except (
4462 ROclient.ROClientException,
4463 DbException,
4464 LcmException,
4465 N2VCException,
4466 ) as e:
4467 self.logger.error(logging_text + "Exit Exception {}".format(e))
4468 exc = e
4469 except asyncio.CancelledError:
4470 self.logger.error(
4471 logging_text + "Cancelled Exception while '{}'".format(stage[1])
4472 )
4473 exc = "Operation was cancelled"
4474 except Exception as e:
4475 exc = traceback.format_exc()
4476 self.logger.critical(
4477 logging_text + "Exit Exception while '{}': {}".format(stage[1], e),
4478 exc_info=True,
4479 )
4480 finally:
4481 if exc:
4482 error_list.append(str(exc))
4483 try:
4484 # wait for pending tasks
4485 if tasks_dict_info:
4486 stage[1] = "Waiting for terminate pending tasks."
4487 self.logger.debug(logging_text + stage[1])
4488 error_list += await self._wait_for_tasks(
4489 logging_text,
4490 tasks_dict_info,
4491 timeout_ns_terminate,
4492 stage,
4493 nslcmop_id,
4494 )
4495 stage[1] = stage[2] = ""
4496 except asyncio.CancelledError:
4497 error_list.append("Cancelled")
4498 # TODO cancel all tasks
4499 except Exception as exc:
4500 error_list.append(str(exc))
4501 # update status at database
4502 if error_list:
4503 error_detail = "; ".join(error_list)
4504 # self.logger.error(logging_text + error_detail)
4505 error_description_nslcmop = "{} Detail: {}".format(
4506 stage[0], error_detail
4507 )
4508 error_description_nsr = "Operation: TERMINATING.{}, {}.".format(
4509 nslcmop_id, stage[0]
4510 )
4511
4512 db_nsr_update["operational-status"] = "failed"
4513 db_nsr_update["detailed-status"] = (
4514 error_description_nsr + " Detail: " + error_detail
4515 )
4516 db_nslcmop_update["detailed-status"] = error_detail
4517 nslcmop_operation_state = "FAILED"
4518 ns_state = "BROKEN"
4519 else:
4520 error_detail = None
4521 error_description_nsr = error_description_nslcmop = None
4522 ns_state = "NOT_INSTANTIATED"
4523 db_nsr_update["operational-status"] = "terminated"
4524 db_nsr_update["detailed-status"] = "Done"
4525 db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
4526 db_nslcmop_update["detailed-status"] = "Done"
4527 nslcmop_operation_state = "COMPLETED"
4528
4529 if db_nsr:
4530 self._write_ns_status(
4531 nsr_id=nsr_id,
4532 ns_state=ns_state,
4533 current_operation="IDLE",
4534 current_operation_id=None,
4535 error_description=error_description_nsr,
4536 error_detail=error_detail,
4537 other_update=db_nsr_update,
4538 )
4539 self._write_op_status(
4540 op_id=nslcmop_id,
4541 stage="",
4542 error_message=error_description_nslcmop,
4543 operation_state=nslcmop_operation_state,
4544 other_update=db_nslcmop_update,
4545 )
4546 if ns_state == "NOT_INSTANTIATED":
4547 try:
4548 self.db.set_list(
4549 "vnfrs",
4550 {"nsr-id-ref": nsr_id},
4551 {"_admin.nsState": "NOT_INSTANTIATED"},
4552 )
4553 except DbException as e:
4554 self.logger.warn(
4555 logging_text
4556 + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
4557 nsr_id, e
4558 )
4559 )
4560 if operation_params:
4561 autoremove = operation_params.get("autoremove", False)
4562 if nslcmop_operation_state:
4563 try:
4564 await self.msg.aiowrite(
4565 "ns",
4566 "terminated",
4567 {
4568 "nsr_id": nsr_id,
4569 "nslcmop_id": nslcmop_id,
4570 "operationState": nslcmop_operation_state,
4571 "autoremove": autoremove,
4572 },
4573 loop=self.loop,
4574 )
4575 except Exception as e:
4576 self.logger.error(
4577 logging_text + "kafka_write notification Exception {}".format(e)
4578 )
4579
4580 self.logger.debug(logging_text + "Exit")
4581 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
4582
4583 async def _wait_for_tasks(
4584 self, logging_text, created_tasks_info, timeout, stage, nslcmop_id, nsr_id=None
4585 ):
4586 time_start = time()
4587 error_detail_list = []
4588 error_list = []
4589 pending_tasks = list(created_tasks_info.keys())
4590 num_tasks = len(pending_tasks)
4591 num_done = 0
4592 stage[1] = "{}/{}.".format(num_done, num_tasks)
4593 self._write_op_status(nslcmop_id, stage)
4594 while pending_tasks:
4595 new_error = None
4596 _timeout = timeout + time_start - time()
4597 done, pending_tasks = await asyncio.wait(
4598 pending_tasks, timeout=_timeout, return_when=asyncio.FIRST_COMPLETED
4599 )
4600 num_done += len(done)
4601 if not done: # Timeout
4602 for task in pending_tasks:
4603 new_error = created_tasks_info[task] + ": Timeout"
4604 error_detail_list.append(new_error)
4605 error_list.append(new_error)
4606 break
4607 for task in done:
4608 if task.cancelled():
4609 exc = "Cancelled"
4610 else:
4611 exc = task.exception()
4612 if exc:
4613 if isinstance(exc, asyncio.TimeoutError):
4614 exc = "Timeout"
4615 new_error = created_tasks_info[task] + ": {}".format(exc)
4616 error_list.append(created_tasks_info[task])
4617 error_detail_list.append(new_error)
4618 if isinstance(
4619 exc,
4620 (
4621 str,
4622 DbException,
4623 N2VCException,
4624 ROclient.ROClientException,
4625 LcmException,
4626 K8sException,
4627 NgRoException,
4628 ),
4629 ):
4630 self.logger.error(logging_text + new_error)
4631 else:
4632 exc_traceback = "".join(
4633 traceback.format_exception(None, exc, exc.__traceback__)
4634 )
4635 self.logger.error(
4636 logging_text
4637 + created_tasks_info[task]
4638 + " "
4639 + exc_traceback
4640 )
4641 else:
4642 self.logger.debug(
4643 logging_text + created_tasks_info[task] + ": Done"
4644 )
4645 stage[1] = "{}/{}.".format(num_done, num_tasks)
4646 if new_error:
4647 stage[1] += " Errors: " + ". ".join(error_detail_list) + "."
4648 if nsr_id: # update also nsr
4649 self.update_db_2(
4650 "nsrs",
4651 nsr_id,
4652 {
4653 "errorDescription": "Error at: " + ", ".join(error_list),
4654 "errorDetail": ". ".join(error_detail_list),
4655 },
4656 )
4657 self._write_op_status(nslcmop_id, stage)
4658 return error_detail_list
4659
4660 @staticmethod
4661 def _map_primitive_params(primitive_desc, params, instantiation_params):
4662 """
4663 Generates the params to be provided to the charm before executing a primitive. If the user does not provide a
4664 parameter, the default-value is used. If the value is enclosed in < >, it is looked up in instantiation_params
4665 :param primitive_desc: portion of VNFD/NSD that describes primitive
4666 :param params: Params provided by user
4667 :param instantiation_params: Instantiation params provided by user
4668 :return: a dictionary with the calculated params
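Illustrative example (parameter names are hypothetical): a primitive parameter
{"name": "interval", "default-value": "<monitor-interval>"} with params={} and
instantiation_params={"monitor-interval": 30} yields {"interval": 30}; a declared
data-type of INTEGER or BOOLEAN additionally coerces string values.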
4669 """
4670 calculated_params = {}
4671 for parameter in primitive_desc.get("parameter", ()):
4672 param_name = parameter["name"]
4673 if param_name in params:
4674 calculated_params[param_name] = params[param_name]
4675 elif "default-value" in parameter or "value" in parameter:
4676 if "value" in parameter:
4677 calculated_params[param_name] = parameter["value"]
4678 else:
4679 calculated_params[param_name] = parameter["default-value"]
4680 if (
4681 isinstance(calculated_params[param_name], str)
4682 and calculated_params[param_name].startswith("<")
4683 and calculated_params[param_name].endswith(">")
4684 ):
4685 if calculated_params[param_name][1:-1] in instantiation_params:
4686 calculated_params[param_name] = instantiation_params[
4687 calculated_params[param_name][1:-1]
4688 ]
4689 else:
4690 raise LcmException(
4691 "Parameter {} needed to execute primitive {} not provided".format(
4692 calculated_params[param_name], primitive_desc["name"]
4693 )
4694 )
4695 else:
4696 raise LcmException(
4697 "Parameter {} needed to execute primitive {} not provided".format(
4698 param_name, primitive_desc["name"]
4699 )
4700 )
4701
4702 if isinstance(calculated_params[param_name], (dict, list, tuple)):
4703 calculated_params[param_name] = yaml.safe_dump(
4704 calculated_params[param_name], default_flow_style=True, width=256
4705 )
4706 elif isinstance(calculated_params[param_name], str) and calculated_params[
4707 param_name
4708 ].startswith("!!yaml "):
4709 calculated_params[param_name] = calculated_params[param_name][7:]
4710 if parameter.get("data-type") == "INTEGER":
4711 try:
4712 calculated_params[param_name] = int(calculated_params[param_name])
4713 except ValueError: # error converting string to int
4714 raise LcmException(
4715 "Parameter {} of primitive {} must be integer".format(
4716 param_name, primitive_desc["name"]
4717 )
4718 )
4719 elif parameter.get("data-type") == "BOOLEAN":
4720 calculated_params[param_name] = (
4721 str(calculated_params[param_name]).lower() != "false"
4722 )
4723
4724 # add always ns_config_info if primitive name is config
4725 if primitive_desc["name"] == "config":
4726 if "ns_config_info" in instantiation_params:
4727 calculated_params["ns_config_info"] = instantiation_params[
4728 "ns_config_info"
4729 ]
4730 return calculated_params
4731
4732 def _look_for_deployed_vca(
4733 self,
4734 deployed_vca,
4735 member_vnf_index,
4736 vdu_id,
4737 vdu_count_index,
4738 kdu_name=None,
4739 ee_descriptor_id=None,
4740 ):
4741 # find the vca_deployed record for this action. Raise LcmException if not found or it has no ee_id.
4742 for vca in deployed_vca:
4743 if not vca:
4744 continue
4745 if member_vnf_index != vca["member-vnf-index"] or vdu_id != vca["vdu_id"]:
4746 continue
4747 if (
4748 vdu_count_index is not None
4749 and vdu_count_index != vca["vdu_count_index"]
4750 ):
4751 continue
4752 if kdu_name and kdu_name != vca["kdu_name"]:
4753 continue
4754 if ee_descriptor_id and ee_descriptor_id != vca["ee_descriptor_id"]:
4755 continue
4756 break
4757 else:
4758 # vca_deployed not found
4759 raise LcmException(
4760 "charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
4761 " is not deployed".format(
4762 member_vnf_index,
4763 vdu_id,
4764 vdu_count_index,
4765 kdu_name,
4766 ee_descriptor_id,
4767 )
4768 )
4769 # get ee_id
4770 ee_id = vca.get("ee_id")
4771 vca_type = vca.get(
4772 "type", "lxc_proxy_charm"
4773 ) # default value for backward compatibility - proxy charm
4774 if not ee_id:
4775 raise LcmException(
4776 "charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
4777 "execution environment".format(
4778 member_vnf_index, vdu_id, kdu_name, vdu_count_index
4779 )
4780 )
4781 return ee_id, vca_type
4782
4783 async def _ns_execute_primitive(
4784 self,
4785 ee_id,
4786 primitive,
4787 primitive_params,
4788 retries=0,
4789 retries_interval=30,
4790 timeout=None,
4791 vca_type=None,
4792 db_dict=None,
4793 vca_id: str = None,
4794 ) -> (str, str):
4795 try:
4796 if primitive == "config":
4797 primitive_params = {"params": primitive_params}
4798
4799 vca_type = vca_type or "lxc_proxy_charm"
4800
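# Retry semantics (illustrative): with retries=3 and retries_interval=30 the primitive is
# attempted up to 4 times, sleeping 30 seconds between failed attempts, before giving up
# and returning ("FAILED", <last error>).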
4801 while retries >= 0:
4802 try:
4803 output = await asyncio.wait_for(
4804 self.vca_map[vca_type].exec_primitive(
4805 ee_id=ee_id,
4806 primitive_name=primitive,
4807 params_dict=primitive_params,
4808 progress_timeout=self.timeout_progress_primitive,
4809 total_timeout=self.timeout_primitive,
4810 db_dict=db_dict,
4811 vca_id=vca_id,
4812 vca_type=vca_type,
4813 ),
4814 timeout=timeout or self.timeout_primitive,
4815 )
4816 # execution was OK
4817 break
4818 except asyncio.CancelledError:
4819 raise
4820 except Exception as e: # asyncio.TimeoutError
4821 if isinstance(e, asyncio.TimeoutError):
4822 e = "Timeout"
4823 retries -= 1
4824 if retries >= 0:
4825 self.logger.debug(
4826 "Error executing action {} on {} -> {}".format(
4827 primitive, ee_id, e
4828 )
4829 )
4830 # wait and retry
4831 await asyncio.sleep(retries_interval, loop=self.loop)
4832 else:
4833 return "FAILED", str(e)
4834
4835 return "COMPLETED", output
4836
4837 except (LcmException, asyncio.CancelledError):
4838 raise
4839 except Exception as e:
4840 return "FAIL", "Error executing action {}: {}".format(primitive, e)
4841
4842 async def vca_status_refresh(self, nsr_id, nslcmop_id):
4843 """
4844 Updates the vca_status in the nsrs record with the latest juju information
4845 :param: nsr_id: Id of the nsr
4846 :param: nslcmop_id: Id of the nslcmop
4847 :return: None
4848 """
4849
4850 self.logger.debug("Task ns={} action={} Enter".format(nsr_id, nslcmop_id))
4851 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
4852 vca_id = self.get_vca_id({}, db_nsr)
4853 if db_nsr["_admin"]["deployed"]["K8s"]:
4854 for k8s_index, k8s in enumerate(db_nsr["_admin"]["deployed"]["K8s"]):
4855 cluster_uuid, kdu_instance = k8s["k8scluster-uuid"], k8s["kdu-instance"]
4856 await self._on_update_k8s_db(
4857 cluster_uuid, kdu_instance, filter={"_id": nsr_id}, vca_id=vca_id
4858 )
4859 else:
4860 for vca_index, _ in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
4861 table, filter = "nsrs", {"_id": nsr_id}
4862 path = "_admin.deployed.VCA.{}.".format(vca_index)
4863 await self._on_update_n2vc_db(table, filter, path, {})
4864
4865 self.logger.debug("Task ns={} action={} Exit".format(nsr_id, nslcmop_id))
4866 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_vca_status_refresh")
4867
4868 async def action(self, nsr_id, nslcmop_id):
4869 # Try to lock HA task here
4870 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
4871 if not task_is_locked_by_me:
4872 return
4873
4874 logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
4875 self.logger.debug(logging_text + "Enter")
4876 # get all needed from database
4877 db_nsr = None
4878 db_nslcmop = None
4879 db_nsr_update = {}
4880 db_nslcmop_update = {}
4881 nslcmop_operation_state = None
4882 error_description_nslcmop = None
4883 exc = None
4884 try:
4885 # wait for any previous tasks in process
4886 step = "Waiting for previous operations to terminate"
4887 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
4888
4889 self._write_ns_status(
4890 nsr_id=nsr_id,
4891 ns_state=None,
4892 current_operation="RUNNING ACTION",
4893 current_operation_id=nslcmop_id,
4894 )
4895
4896 step = "Getting information from database"
4897 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
4898 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
4899 if db_nslcmop["operationParams"].get("primitive_params"):
4900 db_nslcmop["operationParams"]["primitive_params"] = json.loads(
4901 db_nslcmop["operationParams"]["primitive_params"]
4902 )
4903
4904 nsr_deployed = db_nsr["_admin"].get("deployed")
4905 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
4906 vdu_id = db_nslcmop["operationParams"].get("vdu_id")
4907 kdu_name = db_nslcmop["operationParams"].get("kdu_name")
4908 vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
4909 primitive = db_nslcmop["operationParams"]["primitive"]
4910 primitive_params = db_nslcmop["operationParams"]["primitive_params"]
4911 timeout_ns_action = db_nslcmop["operationParams"].get(
4912 "timeout_ns_action", self.timeout_primitive
4913 )
4914
4915 if vnf_index:
4916 step = "Getting vnfr from database"
4917 db_vnfr = self.db.get_one(
4918 "vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}
4919 )
4920 if db_vnfr.get("kdur"):
4921 kdur_list = []
4922 for kdur in db_vnfr["kdur"]:
4923 if kdur.get("additionalParams"):
4924 kdur["additionalParams"] = json.loads(
4925 kdur["additionalParams"]
4926 )
4927 kdur_list.append(kdur)
4928 db_vnfr["kdur"] = kdur_list
4929 step = "Getting vnfd from database"
4930 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
4931 else:
db_vnfr = None  # ns-level action: no vnfr involved
4932 step = "Getting nsd from database"
4933 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
4934
4935 vca_id = self.get_vca_id(db_vnfr, db_nsr)
4936 # for backward compatibility
4937 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
4938 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
4939 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
4940 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4941
4942 # look for primitive
4943 config_primitive_desc = descriptor_configuration = None
4944 if vdu_id:
4945 descriptor_configuration = get_configuration(db_vnfd, vdu_id)
4946 elif kdu_name:
4947 descriptor_configuration = get_configuration(db_vnfd, kdu_name)
4948 elif vnf_index:
4949 descriptor_configuration = get_configuration(db_vnfd, db_vnfd["id"])
4950 else:
4951 descriptor_configuration = db_nsd.get("ns-configuration")
4952
4953 if descriptor_configuration and descriptor_configuration.get(
4954 "config-primitive"
4955 ):
4956 for config_primitive in descriptor_configuration["config-primitive"]:
4957 if config_primitive["name"] == primitive:
4958 config_primitive_desc = config_primitive
4959 break
4960
4961 if not config_primitive_desc:
4962 if not (kdu_name and primitive in ("upgrade", "rollback", "status")):
4963 raise LcmException(
4964 "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
4965 primitive
4966 )
4967 )
4968 primitive_name = primitive
4969 ee_descriptor_id = None
4970 else:
4971 primitive_name = config_primitive_desc.get(
4972 "execution-environment-primitive", primitive
4973 )
4974 ee_descriptor_id = config_primitive_desc.get(
4975 "execution-environment-ref"
4976 )
4977
4978 if vnf_index:
4979 if vdu_id:
4980 vdur = next(
4981 (x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None
4982 )
4983 desc_params = parse_yaml_strings(vdur.get("additionalParams"))
4984 elif kdu_name:
4985 kdur = next(
4986 (x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None
4987 )
4988 desc_params = parse_yaml_strings(kdur.get("additionalParams"))
4989 else:
4990 desc_params = parse_yaml_strings(
4991 db_vnfr.get("additionalParamsForVnf")
4992 )
4993 else:
4994 desc_params = parse_yaml_strings(db_nsr.get("additionalParamsForNs"))
4995 if kdu_name and get_configuration(db_vnfd, kdu_name):
4996 kdu_configuration = get_configuration(db_vnfd, kdu_name)
4997 actions = set()
4998 for primitive in kdu_configuration.get("initial-config-primitive", []):
4999 actions.add(primitive["name"])
5000 for primitive in kdu_configuration.get("config-primitive", []):
5001 actions.add(primitive["name"])
5002 kdu_action = primitive_name in actions
5003
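# Branching summary for the code below: KDU primitives ("upgrade", "rollback", "status",
# or any action declared in the KDU configuration) are executed through the matching
# k8scluster connector; every other primitive runs on the deployed VCA execution
# environment via _ns_execute_primitive().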
5004 # TODO check if ns is in a proper status
5005 if kdu_name and (
5006 primitive_name in ("upgrade", "rollback", "status") or kdu_action
5007 ):
5008 # kdur and desc_params already set from before
5009 if primitive_params:
5010 desc_params.update(primitive_params)
5011 # TODO Check if we will need something at vnf level
5012 for index, kdu in enumerate(get_iterable(nsr_deployed, "K8s")):
5013 if (
5014 kdu_name == kdu["kdu-name"]
5015 and kdu["member-vnf-index"] == vnf_index
5016 ):
5017 break
5018 else:
5019 raise LcmException(
5020 "KDU '{}' for vnf '{}' not deployed".format(kdu_name, vnf_index)
5021 )
5022
5023 if kdu.get("k8scluster-type") not in self.k8scluster_map:
5024 msg = "unknown k8scluster-type '{}'".format(
5025 kdu.get("k8scluster-type")
5026 )
5027 raise LcmException(msg)
5028
5029 db_dict = {
5030 "collection": "nsrs",
5031 "filter": {"_id": nsr_id},
5032 "path": "_admin.deployed.K8s.{}".format(index),
5033 }
5034 self.logger.debug(
5035 logging_text
5036 + "Exec k8s {} on {}.{}".format(primitive_name, vnf_index, kdu_name)
5037 )
5038 step = "Executing kdu {}".format(primitive_name)
5039 if primitive_name == "upgrade":
5040 if desc_params.get("kdu_model"):
5041 kdu_model = desc_params.get("kdu_model")
5042 del desc_params["kdu_model"]
5043 else:
5044 kdu_model = kdu.get("kdu-model")
5045 parts = kdu_model.split(sep=":")
5046 if len(parts) == 2:
5047 kdu_model = parts[0]
5048
5049 detailed_status = await asyncio.wait_for(
5050 self.k8scluster_map[kdu["k8scluster-type"]].upgrade(
5051 cluster_uuid=kdu.get("k8scluster-uuid"),
5052 kdu_instance=kdu.get("kdu-instance"),
5053 atomic=True,
5054 kdu_model=kdu_model,
5055 params=desc_params,
5056 db_dict=db_dict,
5057 timeout=timeout_ns_action,
5058 ),
5059 timeout=timeout_ns_action + 10,
5060 )
5061 self.logger.debug(
5062 logging_text + " Upgrade of kdu {} done".format(detailed_status)
5063 )
5064 elif primitive_name == "rollback":
5065 detailed_status = await asyncio.wait_for(
5066 self.k8scluster_map[kdu["k8scluster-type"]].rollback(
5067 cluster_uuid=kdu.get("k8scluster-uuid"),
5068 kdu_instance=kdu.get("kdu-instance"),
5069 db_dict=db_dict,
5070 ),
5071 timeout=timeout_ns_action,
5072 )
5073 elif primitive_name == "status":
5074 detailed_status = await asyncio.wait_for(
5075 self.k8scluster_map[kdu["k8scluster-type"]].status_kdu(
5076 cluster_uuid=kdu.get("k8scluster-uuid"),
5077 kdu_instance=kdu.get("kdu-instance"),
5078 vca_id=vca_id,
5079 ),
5080 timeout=timeout_ns_action,
5081 )
5082 else:
5083 kdu_instance = kdu.get("kdu-instance") or "{}-{}".format(
5084 kdu["kdu-name"], nsr_id
5085 )
5086 params = self._map_primitive_params(
5087 config_primitive_desc, primitive_params, desc_params
5088 )
5089
5090 detailed_status = await asyncio.wait_for(
5091 self.k8scluster_map[kdu["k8scluster-type"]].exec_primitive(
5092 cluster_uuid=kdu.get("k8scluster-uuid"),
5093 kdu_instance=kdu_instance,
5094 primitive_name=primitive_name,
5095 params=params,
5096 db_dict=db_dict,
5097 timeout=timeout_ns_action,
5098 vca_id=vca_id,
5099 ),
5100 timeout=timeout_ns_action,
5101 )
5102
5103 if detailed_status:
5104 nslcmop_operation_state = "COMPLETED"
5105 else:
5106 detailed_status = ""
5107 nslcmop_operation_state = "FAILED"
5108 else:
5109 ee_id, vca_type = self._look_for_deployed_vca(
5110 nsr_deployed["VCA"],
5111 member_vnf_index=vnf_index,
5112 vdu_id=vdu_id,
5113 vdu_count_index=vdu_count_index,
5114 ee_descriptor_id=ee_descriptor_id,
5115 )
5116 for vca_index, vca_deployed in enumerate(
5117 db_nsr["_admin"]["deployed"]["VCA"]
5118 ):
5119 if vca_deployed.get("member-vnf-index") == vnf_index:
5120 db_dict = {
5121 "collection": "nsrs",
5122 "filter": {"_id": nsr_id},
5123 "path": "_admin.deployed.VCA.{}.".format(vca_index),
5124 }
5125 break
5126 (
5127 nslcmop_operation_state,
5128 detailed_status,
5129 ) = await self._ns_execute_primitive(
5130 ee_id,
5131 primitive=primitive_name,
5132 primitive_params=self._map_primitive_params(
5133 config_primitive_desc, primitive_params, desc_params
5134 ),
5135 timeout=timeout_ns_action,
5136 vca_type=vca_type,
5137 db_dict=db_dict,
5138 vca_id=vca_id,
5139 )
5140
5141 db_nslcmop_update["detailed-status"] = detailed_status
5142 error_description_nslcmop = (
5143 detailed_status if nslcmop_operation_state == "FAILED" else ""
5144 )
5145 self.logger.debug(
5146 logging_text
5147 + " task Done with result {} {}".format(
5148 nslcmop_operation_state, detailed_status
5149 )
5150 )
5151 return # database update is called inside finally
5152
5153 except (DbException, LcmException, N2VCException, K8sException) as e:
5154 self.logger.error(logging_text + "Exit Exception {}".format(e))
5155 exc = e
5156 except asyncio.CancelledError:
5157 self.logger.error(
5158 logging_text + "Cancelled Exception while '{}'".format(step)
5159 )
5160 exc = "Operation was cancelled"
5161 except asyncio.TimeoutError:
5162 self.logger.error(logging_text + "Timeout while '{}'".format(step))
5163 exc = "Timeout"
5164 except Exception as e:
5165 exc = traceback.format_exc()
5166 self.logger.critical(
5167 logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
5168 exc_info=True,
5169 )
5170 finally:
5171 if exc:
5172 db_nslcmop_update[
5173 "detailed-status"
5174 ] = (
5175 detailed_status
5176 ) = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
5177 nslcmop_operation_state = "FAILED"
5178 if db_nsr:
5179 self._write_ns_status(
5180 nsr_id=nsr_id,
5181 ns_state=db_nsr[
5182 "nsState"
5183 ], # TODO check if degraded. For the moment use previous status
5184 current_operation="IDLE",
5185 current_operation_id=None,
5186 # error_description=error_description_nsr,
5187 # error_detail=error_detail,
5188 other_update=db_nsr_update,
5189 )
5190
5191 self._write_op_status(
5192 op_id=nslcmop_id,
5193 stage="",
5194 error_message=error_description_nslcmop,
5195 operation_state=nslcmop_operation_state,
5196 other_update=db_nslcmop_update,
5197 )
5198
5199 if nslcmop_operation_state:
5200 try:
5201 await self.msg.aiowrite(
5202 "ns",
5203 "actioned",
5204 {
5205 "nsr_id": nsr_id,
5206 "nslcmop_id": nslcmop_id,
5207 "operationState": nslcmop_operation_state,
5208 },
5209 loop=self.loop,
5210 )
5211 except Exception as e:
5212 self.logger.error(
5213 logging_text + "kafka_write notification Exception {}".format(e)
5214 )
5215 self.logger.debug(logging_text + "Exit")
5216 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
5217 return nslcmop_operation_state, detailed_status
5218
5219 async def scale(self, nsr_id, nslcmop_id):
5220 # Try to lock HA task here
5221 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
5222 if not task_is_locked_by_me:
5223 return
5224
5225 logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
5226 stage = ["", "", ""]
5227 tasks_dict_info = {}
5228 # ^ stage, step, VIM progress
5229 self.logger.debug(logging_text + "Enter")
5230 # get all needed from database
5231 db_nsr = None
5232 db_nslcmop_update = {}
5233 db_nsr_update = {}
5234 exc = None
5235 # in case of error, indicates which part of the scale failed, to set the nsr error status
5236 scale_process = None
5237 old_operational_status = ""
5238 old_config_status = ""
5239 nsi_id = None
5240 try:
5241 # wait for any previous tasks in process
5242 step = "Waiting for previous operations to terminate"
5243 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
5244 self._write_ns_status(
5245 nsr_id=nsr_id,
5246 ns_state=None,
5247 current_operation="SCALING",
5248 current_operation_id=nslcmop_id,
5249 )
5250
5251 step = "Getting nslcmop from database"
5252 self.logger.debug(
5253 step + " after having waited for previous tasks to be completed"
5254 )
5255 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5256
5257 step = "Getting nsr from database"
5258 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
5259 old_operational_status = db_nsr["operational-status"]
5260 old_config_status = db_nsr["config-status"]
5261
5262 step = "Parsing scaling parameters"
5263 db_nsr_update["operational-status"] = "scaling"
5264 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5265 nsr_deployed = db_nsr["_admin"].get("deployed")
5266
5267 vnf_index = db_nslcmop["operationParams"]["scaleVnfData"][
5268 "scaleByStepData"
5269 ]["member-vnf-index"]
5270 scaling_group = db_nslcmop["operationParams"]["scaleVnfData"][
5271 "scaleByStepData"
5272 ]["scaling-group-descriptor"]
5273 scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
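            # Illustrative shape of the parameters read above (only the keys are taken
            # from the lookups; the values are hypothetical):
            #   operationParams["scaleVnfData"] = {
            #       "scaleVnfType": "SCALE_OUT",  # or "SCALE_IN"
            #       "scaleByStepData": {
            #           "member-vnf-index": "1",
            #           "scaling-group-descriptor": "vdu_autoscale",
            #       },
            #   }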
5274 # for backward compatibility
5275 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
5276 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
5277 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
5278 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5279
5280 step = "Getting vnfr from database"
5281 db_vnfr = self.db.get_one(
5282 "vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}
5283 )
5284
5285 vca_id = self.get_vca_id(db_vnfr, db_nsr)
5286
5287 step = "Getting vnfd from database"
5288 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
5289
5290 base_folder = db_vnfd["_admin"]["storage"]
5291
5292 step = "Getting scaling-group-descriptor"
5293 scaling_descriptor = find_in_list(
5294 get_scaling_aspect(db_vnfd),
5295 lambda scale_desc: scale_desc["name"] == scaling_group,
5296 )
5297 if not scaling_descriptor:
5298 raise LcmException(
5299 "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
5300 "at vnfd:scaling-group-descriptor".format(scaling_group)
5301 )
5302
5303 step = "Sending scale order to VIM"
5304 # TODO check if ns is in a proper status
5305 nb_scale_op = 0
5306 if not db_nsr["_admin"].get("scaling-group"):
5307 self.update_db_2(
5308 "nsrs",
5309 nsr_id,
5310 {
5311 "_admin.scaling-group": [
5312 {"name": scaling_group, "nb-scale-op": 0}
5313 ]
5314 },
5315 )
5316 admin_scale_index = 0
5317 else:
5318 for admin_scale_index, admin_scale_info in enumerate(
5319 db_nsr["_admin"]["scaling-group"]
5320 ):
5321 if admin_scale_info["name"] == scaling_group:
5322 nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
5323 break
5324                 else:  # not found: point the index past the last element and add a new entry with this name
5325 admin_scale_index += 1
5326 db_nsr_update[
5327 "_admin.scaling-group.{}.name".format(admin_scale_index)
5328 ] = scaling_group
5329
5330 vca_scaling_info = []
5331 scaling_info = {"scaling_group_name": scaling_group, "vdu": [], "kdu": []}
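            # scaling_info is filled in below; roughly (illustrative, not exhaustive):
            #   {
            #       "scaling_group_name": <scaling group>,
            #       "scaling_direction": "OUT" or "IN",
            #       "vdu-create"/"vdu-delete": {<vdu-id>: <number of instances>},
            #       "kdu-create"/"kdu-delete": {<kdu-name>: [<scale entries>]},
            #       "vdu": [<data of the VDU instances to delete, with their interfaces>],
            #   }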
5332 if scaling_type == "SCALE_OUT":
5333 if "aspect-delta-details" not in scaling_descriptor:
5334 raise LcmException(
5335                         "Aspect delta details not found in scaling descriptor {}".format(
5336 scaling_descriptor["name"]
5337 )
5338 )
5339                 # check whether max-instance-count is reached
5340 deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
5341
5342 scaling_info["scaling_direction"] = "OUT"
5343 scaling_info["vdu-create"] = {}
5344 scaling_info["kdu-create"] = {}
5345 for delta in deltas:
5346 for vdu_delta in delta.get("vdu-delta", {}):
5347 vdud = get_vdu(db_vnfd, vdu_delta["id"])
5348                     # vdu_index also provides the number of instances of the targeted vdu
5349 vdu_count = vdu_index = get_vdur_index(db_vnfr, vdu_delta)
5350 cloud_init_text = self._get_vdu_cloud_init_content(
5351 vdud, db_vnfd
5352 )
5353 if cloud_init_text:
5354 additional_params = (
5355 self._get_vdu_additional_params(db_vnfr, vdud["id"])
5356 or {}
5357 )
5358 cloud_init_list = []
5359
5360 vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
5361 max_instance_count = 10
5362 if vdu_profile and "max-number-of-instances" in vdu_profile:
5363 max_instance_count = vdu_profile.get(
5364 "max-number-of-instances", 10
5365 )
5366
5367 default_instance_num = get_number_of_instances(
5368 db_vnfd, vdud["id"]
5369 )
5370 instances_number = vdu_delta.get("number-of-instances", 1)
5371 nb_scale_op += instances_number
5372
5373 new_instance_count = nb_scale_op + default_instance_num
5374                     # If the new count exceeds the max while the current vdu count is still below it,
5375                     # adjust the number of instances to create
5376                     if new_instance_count > max_instance_count > vdu_count:
5377                         instances_number = new_instance_count - max_instance_count
5380
5381 if new_instance_count > max_instance_count:
5382 raise LcmException(
5383 "reached the limit of {} (max-instance-count) "
5384 "scaling-out operations for the "
5385 "scaling-group-descriptor '{}'".format(
5386 nb_scale_op, scaling_group
5387 )
5388 )
5389 for x in range(vdu_delta.get("number-of-instances", 1)):
5390 if cloud_init_text:
5391                                 # TODO: the VDU's own IP is not yet available because db_vnfr has not been updated
5392 additional_params["OSM"] = get_osm_params(
5393 db_vnfr, vdu_delta["id"], vdu_index + x
5394 )
5395 cloud_init_list.append(
5396 self._parse_cloud_init(
5397 cloud_init_text,
5398 additional_params,
5399 db_vnfd["id"],
5400 vdud["id"],
5401 )
5402 )
5403 vca_scaling_info.append(
5404 {
5405 "osm_vdu_id": vdu_delta["id"],
5406 "member-vnf-index": vnf_index,
5407 "type": "create",
5408 "vdu_index": vdu_index + x,
5409 }
5410 )
5411 scaling_info["vdu-create"][vdu_delta["id"]] = instances_number
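                        # Each VDU instance to add is also tracked in vca_scaling_info so that
                        # a matching execution environment can be deployed later, e.g.
                        # (illustrative values):
                        #   {"osm_vdu_id": "dataVM", "member-vnf-index": "1",
                        #    "type": "create", "vdu_index": 1}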
5412 for kdu_delta in delta.get("kdu-resource-delta", {}):
5413 kdu_profile = get_kdu_resource_profile(db_vnfd, kdu_delta["id"])
5414 kdu_name = kdu_profile["kdu-name"]
5415 resource_name = kdu_profile["resource-name"]
5416
5417                     # A delta may contain several different kdus,
5418                     # so keep a separate list per kdu
5419 if not scaling_info["kdu-create"].get(kdu_name, None):
5420 scaling_info["kdu-create"][kdu_name] = []
5421
5422 kdur = get_kdur(db_vnfr, kdu_name)
5423 if kdur.get("helm-chart"):
5424 k8s_cluster_type = "helm-chart-v3"
5425 self.logger.debug("kdur: {}".format(kdur))
5426 if (
5427 kdur.get("helm-version")
5428 and kdur.get("helm-version") == "v2"
5429 ):
5430 k8s_cluster_type = "helm-chart"
5431 raise NotImplementedError
5432 elif kdur.get("juju-bundle"):
5433 k8s_cluster_type = "juju-bundle"
5434 else:
5435 raise LcmException(
5436 "kdu type for kdu='{}.{}' is neither helm-chart nor "
5437 "juju-bundle. Maybe an old NBI version is running".format(
5438 db_vnfr["member-vnf-index-ref"], kdu_name
5439 )
5440 )
5441
5442 max_instance_count = 10
5443 if kdu_profile and "max-number-of-instances" in kdu_profile:
5444 max_instance_count = kdu_profile.get(
5445 "max-number-of-instances", 10
5446 )
5447
5448 nb_scale_op += kdu_delta.get("number-of-instances", 1)
5449 deployed_kdu, _ = get_deployed_kdu(
5450 nsr_deployed, kdu_name, vnf_index
5451 )
5452 if deployed_kdu is None:
5453 raise LcmException(
5454 "KDU '{}' for vnf '{}' not deployed".format(
5455 kdu_name, vnf_index
5456 )
5457 )
5458 kdu_instance = deployed_kdu.get("kdu-instance")
5459 instance_num = await self.k8scluster_map[
5460 k8s_cluster_type
5461 ].get_scale_count(resource_name, kdu_instance, vca_id=vca_id)
5462 kdu_replica_count = instance_num + kdu_delta.get(
5463 "number-of-instances", 1
5464 )
5465
5466                     # If the new count exceeds the max while instance_num is still below it,
5467                     # cap the kdu replica count at the max instance number
5468 if kdu_replica_count > max_instance_count > instance_num:
5469 kdu_replica_count = max_instance_count
5470 if kdu_replica_count > max_instance_count:
5471 raise LcmException(
5472 "reached the limit of {} (max-instance-count) "
5473 "scaling-out operations for the "
5474 "scaling-group-descriptor '{}'".format(
5475 instance_num, scaling_group
5476 )
5477 )
5478
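                        # The loop below records one vca_scaling_info entry per new KDU
                        # instance and a kdu-create entry with the target replica count,
                        # which _scale_kdu() later applies on the k8s cluster.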
5479 for x in range(kdu_delta.get("number-of-instances", 1)):
5480 vca_scaling_info.append(
5481 {
5482 "osm_kdu_id": kdu_name,
5483 "member-vnf-index": vnf_index,
5484 "type": "create",
5485 "kdu_index": instance_num + x - 1,
5486 }
5487 )
5488 scaling_info["kdu-create"][kdu_name].append(
5489 {
5490 "member-vnf-index": vnf_index,
5491 "type": "create",
5492 "k8s-cluster-type": k8s_cluster_type,
5493 "resource-name": resource_name,
5494 "scale": kdu_replica_count,
5495 }
5496 )
5497 elif scaling_type == "SCALE_IN":
5498 deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
5499
5500 scaling_info["scaling_direction"] = "IN"
5501 scaling_info["vdu-delete"] = {}
5502 scaling_info["kdu-delete"] = {}
5503
5504 for delta in deltas:
5505 for vdu_delta in delta.get("vdu-delta", {}):
5506 vdu_count = vdu_index = get_vdur_index(db_vnfr, vdu_delta)
5507 min_instance_count = 0
5508 vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
5509 if vdu_profile and "min-number-of-instances" in vdu_profile:
5510 min_instance_count = vdu_profile["min-number-of-instances"]
5511
5512 default_instance_num = get_number_of_instances(
5513 db_vnfd, vdu_delta["id"]
5514 )
5515 instance_num = vdu_delta.get("number-of-instances", 1)
5516 nb_scale_op -= instance_num
5517
5518 new_instance_count = nb_scale_op + default_instance_num
5519
5520 if new_instance_count < min_instance_count < vdu_count:
5521 instances_number = min_instance_count - new_instance_count
5522 else:
5523 instances_number = instance_num
5524
5525 if new_instance_count < min_instance_count:
5526 raise LcmException(
5527 "reached the limit of {} (min-instance-count) scaling-in operations for the "
5528 "scaling-group-descriptor '{}'".format(
5529 nb_scale_op, scaling_group
5530 )
5531 )
5532 for x in range(vdu_delta.get("number-of-instances", 1)):
5533 vca_scaling_info.append(
5534 {
5535 "osm_vdu_id": vdu_delta["id"],
5536 "member-vnf-index": vnf_index,
5537 "type": "delete",
5538 "vdu_index": vdu_index - 1 - x,
5539 }
5540 )
5541 scaling_info["vdu-delete"][vdu_delta["id"]] = instances_number
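                        # For SCALE_IN, the "vdu_index - 1 - x" values above select the
                        # highest-indexed instances first, i.e. typically the most recently
                        # added VDU instances are the ones removed.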
5542 for kdu_delta in delta.get("kdu-resource-delta", {}):
5543 kdu_profile = get_kdu_resource_profile(db_vnfd, kdu_delta["id"])
5544 kdu_name = kdu_profile["kdu-name"]
5545 resource_name = kdu_profile["resource-name"]
5546
5547 if not scaling_info["kdu-delete"].get(kdu_name, None):
5548 scaling_info["kdu-delete"][kdu_name] = []
5549
5550 kdur = get_kdur(db_vnfr, kdu_name)
5551 if kdur.get("helm-chart"):
5552 k8s_cluster_type = "helm-chart-v3"
5553 self.logger.debug("kdur: {}".format(kdur))
5554 if (
5555 kdur.get("helm-version")
5556 and kdur.get("helm-version") == "v2"
5557 ):
5558 k8s_cluster_type = "helm-chart"
5559 raise NotImplementedError
5560 elif kdur.get("juju-bundle"):
5561 k8s_cluster_type = "juju-bundle"
5562 else:
5563 raise LcmException(
5564 "kdu type for kdu='{}.{}' is neither helm-chart nor "
5565 "juju-bundle. Maybe an old NBI version is running".format(
5566 db_vnfr["member-vnf-index-ref"], kdur["kdu-name"]
5567 )
5568 )
5569
5570 min_instance_count = 0
5571 if kdu_profile and "min-number-of-instances" in kdu_profile:
5572 min_instance_count = kdu_profile["min-number-of-instances"]
5573
5574 nb_scale_op -= kdu_delta.get("number-of-instances", 1)
5575 deployed_kdu, _ = get_deployed_kdu(
5576 nsr_deployed, kdu_name, vnf_index
5577 )
5578 if deployed_kdu is None:
5579 raise LcmException(
5580 "KDU '{}' for vnf '{}' not deployed".format(
5581 kdu_name, vnf_index
5582 )
5583 )
5584 kdu_instance = deployed_kdu.get("kdu-instance")
5585 instance_num = await self.k8scluster_map[
5586 k8s_cluster_type
5587 ].get_scale_count(resource_name, kdu_instance, vca_id=vca_id)
5588 kdu_replica_count = instance_num - kdu_delta.get(
5589 "number-of-instances", 1
5590 )
5591
5592 if kdu_replica_count < min_instance_count < instance_num:
5593 kdu_replica_count = min_instance_count
5594 if kdu_replica_count < min_instance_count:
5595 raise LcmException(
5596 "reached the limit of {} (min-instance-count) scaling-in operations for the "
5597 "scaling-group-descriptor '{}'".format(
5598 instance_num, scaling_group
5599 )
5600 )
5601
5602 for x in range(kdu_delta.get("number-of-instances", 1)):
5603 vca_scaling_info.append(
5604 {
5605 "osm_kdu_id": kdu_name,
5606 "member-vnf-index": vnf_index,
5607 "type": "delete",
5608 "kdu_index": instance_num - x - 1,
5609 }
5610 )
5611 scaling_info["kdu-delete"][kdu_name].append(
5612 {
5613 "member-vnf-index": vnf_index,
5614 "type": "delete",
5615 "k8s-cluster-type": k8s_cluster_type,
5616 "resource-name": resource_name,
5617 "scale": kdu_replica_count,
5618 }
5619 )
5620
5621             # update scaling_info with the ip addresses of the VDU instances to be deleted
5622 vdu_delete = copy(scaling_info.get("vdu-delete"))
5623 if scaling_info["scaling_direction"] == "IN":
5624 for vdur in reversed(db_vnfr["vdur"]):
5625 if vdu_delete.get(vdur["vdu-id-ref"]):
5626 vdu_delete[vdur["vdu-id-ref"]] -= 1
5627 scaling_info["vdu"].append(
5628 {
5629 "name": vdur.get("name") or vdur.get("vdu-name"),
5630 "vdu_id": vdur["vdu-id-ref"],
5631 "interface": [],
5632 }
5633 )
5634 for interface in vdur["interfaces"]:
5635 scaling_info["vdu"][-1]["interface"].append(
5636 {
5637 "name": interface["name"],
5638 "ip_address": interface["ip-address"],
5639 "mac_address": interface.get("mac-address"),
5640 }
5641 )
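                    # Resulting entry (illustrative values):
                    #   {"name": "vnf1-dataVM-1", "vdu_id": "dataVM",
                    #    "interface": [{"name": "eth0", "ip_address": "10.0.0.5",
                    #                   "mac_address": "fa:16:3e:00:00:01"}]}
                    # These addresses reach the scaling config primitives through the
                    # VDU_SCALE_INFO parameter built further below.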
5642 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
5643
5644 # PRE-SCALE BEGIN
5645 step = "Executing pre-scale vnf-config-primitive"
5646 if scaling_descriptor.get("scaling-config-action"):
5647 for scaling_config_action in scaling_descriptor[
5648 "scaling-config-action"
5649 ]:
5650 if (
5651 scaling_config_action.get("trigger") == "pre-scale-in"
5652 and scaling_type == "SCALE_IN"
5653 ) or (
5654 scaling_config_action.get("trigger") == "pre-scale-out"
5655 and scaling_type == "SCALE_OUT"
5656 ):
5657 vnf_config_primitive = scaling_config_action[
5658 "vnf-config-primitive-name-ref"
5659 ]
5660 step = db_nslcmop_update[
5661 "detailed-status"
5662 ] = "executing pre-scale scaling-config-action '{}'".format(
5663 vnf_config_primitive
5664 )
5665
5666 # look for primitive
5667 for config_primitive in (
5668 get_configuration(db_vnfd, db_vnfd["id"]) or {}
5669 ).get("config-primitive", ()):
5670 if config_primitive["name"] == vnf_config_primitive:
5671 break
5672 else:
5673 raise LcmException(
5674                                 "Invalid vnfd descriptor: scaling-group-descriptor[name='{}']:scaling-config-action"
5675 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
5676 "primitive".format(scaling_group, vnf_config_primitive)
5677 )
5678
5679 vnfr_params = {"VDU_SCALE_INFO": scaling_info}
5680 if db_vnfr.get("additionalParamsForVnf"):
5681 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
5682
5683 scale_process = "VCA"
5684 db_nsr_update["config-status"] = "configuring pre-scaling"
5685 primitive_params = self._map_primitive_params(
5686 config_primitive, {}, vnfr_params
5687 )
5688
5689 # Pre-scale retry check: Check if this sub-operation has been executed before
5690 op_index = self._check_or_add_scale_suboperation(
5691 db_nslcmop,
5692 vnf_index,
5693 vnf_config_primitive,
5694 primitive_params,
5695 "PRE-SCALE",
5696 )
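                            # op_index is either the position of an existing sub-operation
                            # (retry case), SUBOPERATION_STATUS_NEW for a first execution,
                            # or SUBOPERATION_STATUS_SKIP when it already completed and can
                            # be skipped, as handled below.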
5697 if op_index == self.SUBOPERATION_STATUS_SKIP:
5698 # Skip sub-operation
5699 result = "COMPLETED"
5700 result_detail = "Done"
5701 self.logger.debug(
5702 logging_text
5703 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
5704 vnf_config_primitive, result, result_detail
5705 )
5706 )
5707 else:
5708 if op_index == self.SUBOPERATION_STATUS_NEW:
5709 # New sub-operation: Get index of this sub-operation
5710 op_index = (
5711 len(db_nslcmop.get("_admin", {}).get("operations"))
5712 - 1
5713 )
5714 self.logger.debug(
5715 logging_text
5716 + "vnf_config_primitive={} New sub-operation".format(
5717 vnf_config_primitive
5718 )
5719 )
5720 else:
5721 # retry: Get registered params for this existing sub-operation
5722 op = db_nslcmop.get("_admin", {}).get("operations", [])[
5723 op_index
5724 ]
5725 vnf_index = op.get("member_vnf_index")
5726 vnf_config_primitive = op.get("primitive")
5727 primitive_params = op.get("primitive_params")
5728 self.logger.debug(
5729 logging_text
5730 + "vnf_config_primitive={} Sub-operation retry".format(
5731 vnf_config_primitive
5732 )
5733 )
5734                             # Execute the primitive, either with new (first-time) or registered (retry) args
5735 ee_descriptor_id = config_primitive.get(
5736 "execution-environment-ref"
5737 )
5738 primitive_name = config_primitive.get(
5739 "execution-environment-primitive", vnf_config_primitive
5740 )
5741 ee_id, vca_type = self._look_for_deployed_vca(
5742 nsr_deployed["VCA"],
5743 member_vnf_index=vnf_index,
5744 vdu_id=None,
5745 vdu_count_index=None,
5746 ee_descriptor_id=ee_descriptor_id,
5747 )
5748 result, result_detail = await self._ns_execute_primitive(
5749 ee_id,
5750 primitive_name,
5751 primitive_params,
5752 vca_type=vca_type,
5753 vca_id=vca_id,
5754 )
5755 self.logger.debug(
5756 logging_text
5757 + "vnf_config_primitive={} Done with result {} {}".format(
5758 vnf_config_primitive, result, result_detail
5759 )
5760 )
5761 # Update operationState = COMPLETED | FAILED
5762 self._update_suboperation_status(
5763 db_nslcmop, op_index, result, result_detail
5764 )
5765
5766 if result == "FAILED":
5767 raise LcmException(result_detail)
5768 db_nsr_update["config-status"] = old_config_status
5769 scale_process = None
5770 # PRE-SCALE END
5771
5772 db_nsr_update[
5773 "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)
5774 ] = nb_scale_op
5775 db_nsr_update[
5776 "_admin.scaling-group.{}.time".format(admin_scale_index)
5777 ] = time()
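            # Persist the per-scaling-group bookkeeping, e.g. (index 0 is illustrative):
            #   db_nsr_update["_admin.scaling-group.0.nb-scale-op"] = nb_scale_op
            #   db_nsr_update["_admin.scaling-group.0.time"] = time()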
5778
5779 # SCALE-IN VCA - BEGIN
5780 if vca_scaling_info:
5781 step = db_nslcmop_update[
5782 "detailed-status"
5783 ] = "Deleting the execution environments"
5784 scale_process = "VCA"
5785 for vca_info in vca_scaling_info:
5786 if vca_info["type"] == "delete":
5787 member_vnf_index = str(vca_info["member-vnf-index"])
5788 self.logger.debug(
5789 logging_text + "vdu info: {}".format(vca_info)
5790 )
5791 if vca_info.get("osm_vdu_id"):
5792 vdu_id = vca_info["osm_vdu_id"]
5793 vdu_index = int(vca_info["vdu_index"])
5794 stage[
5795 1
5796 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5797 member_vnf_index, vdu_id, vdu_index
5798 )
5799 else:
5800 vdu_index = 0
5801 kdu_id = vca_info["osm_kdu_id"]
5802 stage[
5803 1
5804 ] = "Scaling member_vnf_index={}, kdu_id={}, vdu_index={} ".format(
5805 member_vnf_index, kdu_id, vdu_index
5806 )
5807 stage[2] = step = "Scaling in VCA"
5808 self._write_op_status(op_id=nslcmop_id, stage=stage)
5809 vca_update = db_nsr["_admin"]["deployed"]["VCA"]
5810 config_update = db_nsr["configurationStatus"]
5811 for vca_index, vca in enumerate(vca_update):
5812 if (
5813 (vca or vca.get("ee_id"))
5814 and vca["member-vnf-index"] == member_vnf_index
5815 and vca["vdu_count_index"] == vdu_index
5816 ):
5817 if vca.get("vdu_id"):
5818 config_descriptor = get_configuration(
5819 db_vnfd, vca.get("vdu_id")
5820 )
5821 elif vca.get("kdu_name"):
5822 config_descriptor = get_configuration(
5823 db_vnfd, vca.get("kdu_name")
5824 )
5825 else:
5826 config_descriptor = get_configuration(
5827 db_vnfd, db_vnfd["id"]
5828 )
5829 operation_params = (
5830 db_nslcmop.get("operationParams") or {}
5831 )
5832 exec_terminate_primitives = not operation_params.get(
5833 "skip_terminate_primitives"
5834 ) and vca.get("needed_terminate")
5835 task = asyncio.ensure_future(
5836 asyncio.wait_for(
5837 self.destroy_N2VC(
5838 logging_text,
5839 db_nslcmop,
5840 vca,
5841 config_descriptor,
5842 vca_index,
5843 destroy_ee=True,
5844 exec_primitives=exec_terminate_primitives,
5845 scaling_in=True,
5846 vca_id=vca_id,
5847 ),
5848 timeout=self.timeout_charm_delete,
5849 )
5850 )
5851 tasks_dict_info[task] = "Terminating VCA {}".format(
5852 vca.get("ee_id")
5853 )
5854 del vca_update[vca_index]
5855 del config_update[vca_index]
5856 # wait for pending tasks of terminate primitives
5857 if tasks_dict_info:
5858 self.logger.debug(
5859 logging_text
5860 + "Waiting for tasks {}".format(
5861 list(tasks_dict_info.keys())
5862 )
5863 )
5864 error_list = await self._wait_for_tasks(
5865 logging_text,
5866 tasks_dict_info,
5867 min(
5868 self.timeout_charm_delete, self.timeout_ns_terminate
5869 ),
5870 stage,
5871 nslcmop_id,
5872 )
5873 tasks_dict_info.clear()
5874 if error_list:
5875 raise LcmException("; ".join(error_list))
5876
5877 db_vca_and_config_update = {
5878 "_admin.deployed.VCA": vca_update,
5879 "configurationStatus": config_update,
5880 }
5881 self.update_db_2(
5882 "nsrs", db_nsr["_id"], db_vca_and_config_update
5883 )
5884 scale_process = None
5885 # SCALE-IN VCA - END
5886
5887 # SCALE RO - BEGIN
5888 if scaling_info.get("vdu-create") or scaling_info.get("vdu-delete"):
5889 scale_process = "RO"
5890 if self.ro_config.get("ng"):
5891 await self._scale_ng_ro(
5892 logging_text, db_nsr, db_nslcmop, db_vnfr, scaling_info, stage
5893 )
5894 scaling_info.pop("vdu-create", None)
5895 scaling_info.pop("vdu-delete", None)
5896
5897 scale_process = None
5898 # SCALE RO - END
5899
5900 # SCALE KDU - BEGIN
5901 if scaling_info.get("kdu-create") or scaling_info.get("kdu-delete"):
5902 scale_process = "KDU"
5903 await self._scale_kdu(
5904 logging_text, nsr_id, nsr_deployed, db_vnfd, vca_id, scaling_info
5905 )
5906 scaling_info.pop("kdu-create", None)
5907 scaling_info.pop("kdu-delete", None)
5908
5909 scale_process = None
5910 # SCALE KDU - END
5911
5912 if db_nsr_update:
5913 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5914
5915 # SCALE-UP VCA - BEGIN
5916 if vca_scaling_info:
5917 step = db_nslcmop_update[
5918 "detailed-status"
5919 ] = "Creating new execution environments"
5920 scale_process = "VCA"
5921 for vca_info in vca_scaling_info:
5922 if vca_info["type"] == "create":
5923 member_vnf_index = str(vca_info["member-vnf-index"])
5924 self.logger.debug(
5925 logging_text + "vdu info: {}".format(vca_info)
5926 )
5927 vnfd_id = db_vnfr["vnfd-ref"]
5928 if vca_info.get("osm_vdu_id"):
5929 vdu_index = int(vca_info["vdu_index"])
5930 deploy_params = {"OSM": get_osm_params(db_vnfr)}
5931 if db_vnfr.get("additionalParamsForVnf"):
5932 deploy_params.update(
5933 parse_yaml_strings(
5934 db_vnfr["additionalParamsForVnf"].copy()
5935 )
5936 )
5937 descriptor_config = get_configuration(
5938 db_vnfd, db_vnfd["id"]
5939 )
5940 if descriptor_config:
5941 vdu_id = None
5942 vdu_name = None
5943 kdu_name = None
5944 self._deploy_n2vc(
5945 logging_text=logging_text
5946 + "member_vnf_index={} ".format(member_vnf_index),
5947 db_nsr=db_nsr,
5948 db_vnfr=db_vnfr,
5949 nslcmop_id=nslcmop_id,
5950 nsr_id=nsr_id,
5951 nsi_id=nsi_id,
5952 vnfd_id=vnfd_id,
5953 vdu_id=vdu_id,
5954 kdu_name=kdu_name,
5955 member_vnf_index=member_vnf_index,
5956 vdu_index=vdu_index,
5957 vdu_name=vdu_name,
5958 deploy_params=deploy_params,
5959 descriptor_config=descriptor_config,
5960 base_folder=base_folder,
5961 task_instantiation_info=tasks_dict_info,
5962 stage=stage,
5963 )
5964 vdu_id = vca_info["osm_vdu_id"]
5965 vdur = find_in_list(
5966 db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id
5967 )
5968 descriptor_config = get_configuration(db_vnfd, vdu_id)
5969 if vdur.get("additionalParams"):
5970 deploy_params_vdu = parse_yaml_strings(
5971 vdur["additionalParams"]
5972 )
5973 else:
5974 deploy_params_vdu = deploy_params
5975 deploy_params_vdu["OSM"] = get_osm_params(
5976 db_vnfr, vdu_id, vdu_count_index=vdu_index
5977 )
5978 if descriptor_config:
5979 vdu_name = None
5980 kdu_name = None
5981 stage[
5982 1
5983 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5984 member_vnf_index, vdu_id, vdu_index
5985 )
5986 stage[2] = step = "Scaling out VCA"
5987 self._write_op_status(op_id=nslcmop_id, stage=stage)
5988 self._deploy_n2vc(
5989 logging_text=logging_text
5990 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5991 member_vnf_index, vdu_id, vdu_index
5992 ),
5993 db_nsr=db_nsr,
5994 db_vnfr=db_vnfr,
5995 nslcmop_id=nslcmop_id,
5996 nsr_id=nsr_id,
5997 nsi_id=nsi_id,
5998 vnfd_id=vnfd_id,
5999 vdu_id=vdu_id,
6000 kdu_name=kdu_name,
6001 member_vnf_index=member_vnf_index,
6002 vdu_index=vdu_index,
6003 vdu_name=vdu_name,
6004 deploy_params=deploy_params_vdu,
6005 descriptor_config=descriptor_config,
6006 base_folder=base_folder,
6007 task_instantiation_info=tasks_dict_info,
6008 stage=stage,
6009 )
6010 else:
6011 kdu_name = vca_info["osm_kdu_id"]
6012 descriptor_config = get_configuration(db_vnfd, kdu_name)
6013 if descriptor_config:
6014 vdu_id = None
6015 kdu_index = int(vca_info["kdu_index"])
6016 vdu_name = None
6017 kdur = next(
6018 x
6019 for x in db_vnfr["kdur"]
6020 if x["kdu-name"] == kdu_name
6021 )
6022 deploy_params_kdu = {"OSM": get_osm_params(db_vnfr)}
6023 if kdur.get("additionalParams"):
6024 deploy_params_kdu = parse_yaml_strings(
6025 kdur["additionalParams"]
6026 )
6027
6028 self._deploy_n2vc(
6029 logging_text=logging_text,
6030 db_nsr=db_nsr,
6031 db_vnfr=db_vnfr,
6032 nslcmop_id=nslcmop_id,
6033 nsr_id=nsr_id,
6034 nsi_id=nsi_id,
6035 vnfd_id=vnfd_id,
6036 vdu_id=vdu_id,
6037 kdu_name=kdu_name,
6038 member_vnf_index=member_vnf_index,
6039 vdu_index=kdu_index,
6040 vdu_name=vdu_name,
6041 deploy_params=deploy_params_kdu,
6042 descriptor_config=descriptor_config,
6043 base_folder=base_folder,
6044 task_instantiation_info=tasks_dict_info,
6045 stage=stage,
6046 )
6047 # SCALE-UP VCA - END
6048 scale_process = None
6049
6050 # POST-SCALE BEGIN
6051 # execute primitive service POST-SCALING
6052 step = "Executing post-scale vnf-config-primitive"
6053 if scaling_descriptor.get("scaling-config-action"):
6054 for scaling_config_action in scaling_descriptor[
6055 "scaling-config-action"
6056 ]:
6057 if (
6058 scaling_config_action.get("trigger") == "post-scale-in"
6059 and scaling_type == "SCALE_IN"
6060 ) or (
6061 scaling_config_action.get("trigger") == "post-scale-out"
6062 and scaling_type == "SCALE_OUT"
6063 ):
6064 vnf_config_primitive = scaling_config_action[
6065 "vnf-config-primitive-name-ref"
6066 ]
6067 step = db_nslcmop_update[
6068 "detailed-status"
6069 ] = "executing post-scale scaling-config-action '{}'".format(
6070 vnf_config_primitive
6071 )
6072
6073 vnfr_params = {"VDU_SCALE_INFO": scaling_info}
6074 if db_vnfr.get("additionalParamsForVnf"):
6075 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
6076
6077 # look for primitive
6078 for config_primitive in (
6079 get_configuration(db_vnfd, db_vnfd["id"]) or {}
6080 ).get("config-primitive", ()):
6081 if config_primitive["name"] == vnf_config_primitive:
6082 break
6083 else:
6084 raise LcmException(
6085                                 "Invalid vnfd descriptor: scaling-group-descriptor[name='{}']:scaling-config-"
6086 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
6087 "config-primitive".format(
6088 scaling_group, vnf_config_primitive
6089 )
6090 )
6091 scale_process = "VCA"
6092 db_nsr_update["config-status"] = "configuring post-scaling"
6093 primitive_params = self._map_primitive_params(
6094 config_primitive, {}, vnfr_params
6095 )
6096
6097 # Post-scale retry check: Check if this sub-operation has been executed before
6098 op_index = self._check_or_add_scale_suboperation(
6099 db_nslcmop,
6100 vnf_index,
6101 vnf_config_primitive,
6102 primitive_params,
6103 "POST-SCALE",
6104 )
6105 if op_index == self.SUBOPERATION_STATUS_SKIP:
6106 # Skip sub-operation
6107 result = "COMPLETED"
6108 result_detail = "Done"
6109 self.logger.debug(
6110 logging_text
6111 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6112 vnf_config_primitive, result, result_detail
6113 )
6114 )
6115 else:
6116 if op_index == self.SUBOPERATION_STATUS_NEW:
6117 # New sub-operation: Get index of this sub-operation
6118 op_index = (
6119 len(db_nslcmop.get("_admin", {}).get("operations"))
6120 - 1
6121 )
6122 self.logger.debug(
6123 logging_text
6124 + "vnf_config_primitive={} New sub-operation".format(
6125 vnf_config_primitive
6126 )
6127 )
6128 else:
6129 # retry: Get registered params for this existing sub-operation
6130 op = db_nslcmop.get("_admin", {}).get("operations", [])[
6131 op_index
6132 ]
6133 vnf_index = op.get("member_vnf_index")
6134 vnf_config_primitive = op.get("primitive")
6135 primitive_params = op.get("primitive_params")
6136 self.logger.debug(
6137 logging_text
6138 + "vnf_config_primitive={} Sub-operation retry".format(
6139 vnf_config_primitive
6140 )
6141 )
6142                             # Execute the primitive, either with new (first-time) or registered (retry) args
6143 ee_descriptor_id = config_primitive.get(
6144 "execution-environment-ref"
6145 )
6146 primitive_name = config_primitive.get(
6147 "execution-environment-primitive", vnf_config_primitive
6148 )
6149 ee_id, vca_type = self._look_for_deployed_vca(
6150 nsr_deployed["VCA"],
6151 member_vnf_index=vnf_index,
6152 vdu_id=None,
6153 vdu_count_index=None,
6154 ee_descriptor_id=ee_descriptor_id,
6155 )
6156 result, result_detail = await self._ns_execute_primitive(
6157 ee_id,
6158 primitive_name,
6159 primitive_params,
6160 vca_type=vca_type,
6161 vca_id=vca_id,
6162 )
6163 self.logger.debug(
6164 logging_text
6165 + "vnf_config_primitive={} Done with result {} {}".format(
6166 vnf_config_primitive, result, result_detail
6167 )
6168 )
6169 # Update operationState = COMPLETED | FAILED
6170 self._update_suboperation_status(
6171 db_nslcmop, op_index, result, result_detail
6172 )
6173
6174 if result == "FAILED":
6175 raise LcmException(result_detail)
6176 db_nsr_update["config-status"] = old_config_status
6177 scale_process = None
6178 # POST-SCALE END
6179
6180 db_nsr_update[
6181 "detailed-status"
6182 ] = "" # "scaled {} {}".format(scaling_group, scaling_type)
6183 db_nsr_update["operational-status"] = (
6184 "running"
6185 if old_operational_status == "failed"
6186 else old_operational_status
6187 )
6188 db_nsr_update["config-status"] = old_config_status
6189 return
6190 except (
6191 ROclient.ROClientException,
6192 DbException,
6193 LcmException,
6194 NgRoException,
6195 ) as e:
6196 self.logger.error(logging_text + "Exit Exception {}".format(e))
6197 exc = e
6198 except asyncio.CancelledError:
6199 self.logger.error(
6200 logging_text + "Cancelled Exception while '{}'".format(step)
6201 )
6202 exc = "Operation was cancelled"
6203 except Exception as e:
6204 exc = traceback.format_exc()
6205 self.logger.critical(
6206 logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
6207 exc_info=True,
6208 )
6209 finally:
6210 self._write_ns_status(
6211 nsr_id=nsr_id,
6212 ns_state=None,
6213 current_operation="IDLE",
6214 current_operation_id=None,
6215 )
6216 if tasks_dict_info:
6217                 stage[1] = "Waiting for pending tasks."
6218 self.logger.debug(logging_text + stage[1])
6219 exc = await self._wait_for_tasks(
6220 logging_text,
6221 tasks_dict_info,
6222 self.timeout_ns_deploy,
6223 stage,
6224 nslcmop_id,
6225 nsr_id=nsr_id,
6226 )
6227 if exc:
6228 db_nslcmop_update[
6229 "detailed-status"
6230 ] = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
6231 nslcmop_operation_state = "FAILED"
6232 if db_nsr:
6233 db_nsr_update["operational-status"] = old_operational_status
6234 db_nsr_update["config-status"] = old_config_status
6235 db_nsr_update["detailed-status"] = ""
6236 if scale_process:
6237 if "VCA" in scale_process:
6238 db_nsr_update["config-status"] = "failed"
6239 if "RO" in scale_process:
6240 db_nsr_update["operational-status"] = "failed"
6241 db_nsr_update[
6242 "detailed-status"
6243 ] = "FAILED scaling nslcmop={} {}: {}".format(
6244 nslcmop_id, step, exc
6245 )
6246 else:
6247 error_description_nslcmop = None
6248 nslcmop_operation_state = "COMPLETED"
6249 db_nslcmop_update["detailed-status"] = "Done"
6250
6251 self._write_op_status(
6252 op_id=nslcmop_id,
6253 stage="",
6254 error_message=error_description_nslcmop,
6255 operation_state=nslcmop_operation_state,
6256 other_update=db_nslcmop_update,
6257 )
6258 if db_nsr:
6259 self._write_ns_status(
6260 nsr_id=nsr_id,
6261 ns_state=None,
6262 current_operation="IDLE",
6263 current_operation_id=None,
6264 other_update=db_nsr_update,
6265 )
6266
6267 if nslcmop_operation_state:
6268 try:
6269 msg = {
6270 "nsr_id": nsr_id,
6271 "nslcmop_id": nslcmop_id,
6272 "operationState": nslcmop_operation_state,
6273 }
6274 await self.msg.aiowrite("ns", "scaled", msg, loop=self.loop)
6275 except Exception as e:
6276 self.logger.error(
6277 logging_text + "kafka_write notification Exception {}".format(e)
6278 )
6279 self.logger.debug(logging_text + "Exit")
6280 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")
6281
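    # _scale_kdu applies the kdu-create / kdu-delete part of scaling_info on the
    # corresponding k8s cluster, running the KDU terminate/initial config primitives
    # when defined (and no juju execution environment is used). A single entry looks
    # roughly like (illustrative values):
    #   {"member-vnf-index": "1", "type": "create", "k8s-cluster-type": "helm-chart-v3",
    #    "resource-name": "my-deployment", "scale": 3}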
6282 async def _scale_kdu(
6283 self, logging_text, nsr_id, nsr_deployed, db_vnfd, vca_id, scaling_info
6284 ):
6285 _scaling_info = scaling_info.get("kdu-create") or scaling_info.get("kdu-delete")
6286 for kdu_name in _scaling_info:
6287 for kdu_scaling_info in _scaling_info[kdu_name]:
6288 deployed_kdu, index = get_deployed_kdu(
6289 nsr_deployed, kdu_name, kdu_scaling_info["member-vnf-index"]
6290 )
6291 cluster_uuid = deployed_kdu["k8scluster-uuid"]
6292 kdu_instance = deployed_kdu["kdu-instance"]
6293 scale = int(kdu_scaling_info["scale"])
6294 k8s_cluster_type = kdu_scaling_info["k8s-cluster-type"]
6295
6296 db_dict = {
6297 "collection": "nsrs",
6298 "filter": {"_id": nsr_id},
6299 "path": "_admin.deployed.K8s.{}".format(index),
6300 }
6301
6302 step = "scaling application {}".format(
6303 kdu_scaling_info["resource-name"]
6304 )
6305 self.logger.debug(logging_text + step)
6306
6307 if kdu_scaling_info["type"] == "delete":
6308 kdu_config = get_configuration(db_vnfd, kdu_name)
6309 if (
6310 kdu_config
6311 and kdu_config.get("terminate-config-primitive")
6312 and get_juju_ee_ref(db_vnfd, kdu_name) is None
6313 ):
6314 terminate_config_primitive_list = kdu_config.get(
6315 "terminate-config-primitive"
6316 )
6317 terminate_config_primitive_list.sort(
6318 key=lambda val: int(val["seq"])
6319 )
6320
6321 for (
6322 terminate_config_primitive
6323 ) in terminate_config_primitive_list:
6324 primitive_params_ = self._map_primitive_params(
6325 terminate_config_primitive, {}, {}
6326 )
6327 step = "execute terminate config primitive"
6328 self.logger.debug(logging_text + step)
6329 await asyncio.wait_for(
6330 self.k8scluster_map[k8s_cluster_type].exec_primitive(
6331 cluster_uuid=cluster_uuid,
6332 kdu_instance=kdu_instance,
6333 primitive_name=terminate_config_primitive["name"],
6334 params=primitive_params_,
6335 db_dict=db_dict,
6336 vca_id=vca_id,
6337 ),
6338 timeout=600,
6339 )
6340
6341 await asyncio.wait_for(
6342 self.k8scluster_map[k8s_cluster_type].scale(
6343 kdu_instance,
6344 scale,
6345 kdu_scaling_info["resource-name"],
6346 vca_id=vca_id,
6347 ),
6348 timeout=self.timeout_vca_on_error,
6349 )
6350
6351 if kdu_scaling_info["type"] == "create":
6352 kdu_config = get_configuration(db_vnfd, kdu_name)
6353 if (
6354 kdu_config
6355 and kdu_config.get("initial-config-primitive")
6356 and get_juju_ee_ref(db_vnfd, kdu_name) is None
6357 ):
6358 initial_config_primitive_list = kdu_config.get(
6359 "initial-config-primitive"
6360 )
6361 initial_config_primitive_list.sort(
6362 key=lambda val: int(val["seq"])
6363 )
6364
6365 for initial_config_primitive in initial_config_primitive_list:
6366 primitive_params_ = self._map_primitive_params(
6367 initial_config_primitive, {}, {}
6368 )
6369 step = "execute initial config primitive"
6370 self.logger.debug(logging_text + step)
6371 await asyncio.wait_for(
6372 self.k8scluster_map[k8s_cluster_type].exec_primitive(
6373 cluster_uuid=cluster_uuid,
6374 kdu_instance=kdu_instance,
6375 primitive_name=initial_config_primitive["name"],
6376 params=primitive_params_,
6377 db_dict=db_dict,
6378 vca_id=vca_id,
6379 ),
6380 timeout=600,
6381 )
6382
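    # _scale_ng_ro translates the vdu-create / vdu-delete counts into an NG-RO request:
    # it marks the affected VDU records in the vnfr, re-runs the NG-RO instantiation
    # with the updated records, and finally calls scale_vnfr again for the VDUs to
    # delete once NG-RO has processed them.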
6383 async def _scale_ng_ro(
6384 self, logging_text, db_nsr, db_nslcmop, db_vnfr, vdu_scaling_info, stage
6385 ):
6386 nsr_id = db_nslcmop["nsInstanceId"]
6387 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
6388 db_vnfrs = {}
6389
6390         # read from db: the vnfd of every vnf
6391 db_vnfds = []
6392
6393 # for each vnf in ns, read vnfd
6394 for vnfr in self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}):
6395 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
6396 vnfd_id = vnfr["vnfd-id"] # vnfd uuid for this vnf
6397             # if we do not have this vnfd yet, read it from db
6398 if not find_in_list(db_vnfds, lambda a_vnfd: a_vnfd["id"] == vnfd_id):
6399 # read from db
6400 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
6401 db_vnfds.append(vnfd)
6402 n2vc_key = self.n2vc.get_public_key()
6403 n2vc_key_list = [n2vc_key]
6404 self.scale_vnfr(
6405 db_vnfr,
6406 vdu_scaling_info.get("vdu-create"),
6407 vdu_scaling_info.get("vdu-delete"),
6408 mark_delete=True,
6409 )
6410 # db_vnfr has been updated, update db_vnfrs to use it
6411 db_vnfrs[db_vnfr["member-vnf-index-ref"]] = db_vnfr
6412 await self._instantiate_ng_ro(
6413 logging_text,
6414 nsr_id,
6415 db_nsd,
6416 db_nsr,
6417 db_nslcmop,
6418 db_vnfrs,
6419 db_vnfds,
6420 n2vc_key_list,
6421 stage=stage,
6422 start_deploy=time(),
6423 timeout_ns_deploy=self.timeout_ns_deploy,
6424 )
6425 if vdu_scaling_info.get("vdu-delete"):
6426 self.scale_vnfr(
6427 db_vnfr, None, vdu_scaling_info["vdu-delete"], mark_delete=False
6428 )
6429
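    # extract_prometheus_scrape_jobs renders an optional 'prometheus*.j2' artifact into
    # Prometheus scrape jobs, substituting JOB_NAME, TARGET_IP, EXPORTER_POD_IP and
    # EXPORTER_POD_PORT, and tags each resulting job with nsr_id/vnfr_id.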
6430 async def extract_prometheus_scrape_jobs(
6431 self,
6432 ee_id,
6433 artifact_path,
6434 ee_config_descriptor,
6435 vnfr_id,
6436 nsr_id,
6437         target_ip,
6438 ):
6439         # look for a file called 'prometheus*.j2' in the artifact folder
6440 artifact_content = self.fs.dir_ls(artifact_path)
6441 job_file = next(
6442 (
6443 f
6444 for f in artifact_content
6445 if f.startswith("prometheus") and f.endswith(".j2")
6446 ),
6447 None,
6448 )
6449 if not job_file:
6450 return
6451 with self.fs.file_open((artifact_path, job_file), "r") as f:
6452 job_data = f.read()
6453
6454 # TODO get_service
6455 _, _, service = ee_id.partition(".") # remove prefix "namespace."
6456 host_name = "{}-{}".format(service, ee_config_descriptor["metric-service"])
6457 host_port = "80"
6458 vnfr_id = vnfr_id.replace("-", "")
6459 variables = {
6460 "JOB_NAME": vnfr_id,
6461 "TARGET_IP": target_ip,
6462 "EXPORTER_POD_IP": host_name,
6463 "EXPORTER_POD_PORT": host_port,
6464 }
6465 job_list = parse_job(job_data, variables)
6466         # ensure job_name uses the vnfr_id; add nsr_id and vnfr_id as metadata
6467 for job in job_list:
6468 if (
6469 not isinstance(job.get("job_name"), str)
6470 or vnfr_id not in job["job_name"]
6471 ):
6472 job["job_name"] = vnfr_id + "_" + str(randint(1, 10000))
6473 job["nsr_id"] = nsr_id
6474 job["vnfr_id"] = vnfr_id
6475 return job_list
6476
6477 def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
6478 """
6479 Get VCA Cloud and VCA Cloud Credentials for the VIM account
6480
6481 :param: vim_account_id: VIM Account ID
6482
6483 :return: (cloud_name, cloud_credential)
6484 """
6485 config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
6486 return config.get("vca_cloud"), config.get("vca_cloud_credential")
6487
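    # These helpers read the optional VCA-related keys from the VIM account "config"
    # dict, e.g. (illustrative values):
    #   {"vca_cloud": "lxd-cloud", "vca_cloud_credential": "lxd-cred",
    #    "vca_k8s_cloud": "k8s-cloud", "vca_k8s_cloud_credential": "k8s-cred"}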
6488 def get_vca_k8s_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
6489 """
6490 Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account
6491
6492 :param: vim_account_id: VIM Account ID
6493
6494 :return: (cloud_name, cloud_credential)
6495 """
6496 config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
6497 return config.get("vca_k8s_cloud"), config.get("vca_k8s_cloud_credential")