5bbbb21e1e2f4fdeb840580504516917f850e3ef
[osm/LCM.git] / osm_lcm / ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 from typing import Any, Dict, List
21 import yaml
22 import logging
23 import logging.handlers
24 import traceback
25 import json
26 from jinja2 import (
27 Environment,
28 TemplateError,
29 TemplateNotFound,
30 StrictUndefined,
31 UndefinedError,
32 )
33
34 from osm_lcm import ROclient
35 from osm_lcm.data_utils.nsr import (
36 get_deployed_kdu,
37 get_deployed_vca,
38 get_deployed_vca_list,
39 get_nsd,
40 )
41 from osm_lcm.data_utils.vca import (
42 DeployedComponent,
43 DeployedK8sResource,
44 DeployedVCA,
45 EELevel,
46 Relation,
47 EERelation,
48 safe_get_ee_relation,
49 )
50 from osm_lcm.ng_ro import NgRoClient, NgRoException
51 from osm_lcm.lcm_utils import (
52 LcmException,
53 LcmExceptionNoMgmtIP,
54 LcmBase,
55 deep_get,
56 get_iterable,
57 populate_dict,
58 )
59 from osm_lcm.data_utils.nsd import (
60 get_ns_configuration_relation_list,
61 get_vnf_profile,
62 get_vnf_profiles,
63 )
64 from osm_lcm.data_utils.vnfd import (
65 get_relation_list,
66 get_vdu_list,
67 get_vdu_profile,
68 get_ee_sorted_initial_config_primitive_list,
69 get_ee_sorted_terminate_config_primitive_list,
70 get_kdu_list,
71 get_virtual_link_profiles,
72 get_vdu,
73 get_configuration,
74 get_vdu_index,
75 get_scaling_aspect,
76 get_number_of_instances,
77 get_juju_ee_ref,
78 get_kdu_resource_profile,
79 )
80 from osm_lcm.data_utils.list_utils import find_in_list
81 from osm_lcm.data_utils.vnfr import get_osm_params, get_vdur_index, get_kdur
82 from osm_lcm.data_utils.dict_utils import parse_yaml_strings
83 from osm_lcm.data_utils.database.vim_account import VimAccountDB
84 from n2vc.definitions import RelationEndpoint
85 from n2vc.k8s_helm_conn import K8sHelmConnector
86 from n2vc.k8s_helm3_conn import K8sHelm3Connector
87 from n2vc.k8s_juju_conn import K8sJujuConnector
88
89 from osm_common.dbbase import DbException
90 from osm_common.fsbase import FsException
91
92 from osm_lcm.data_utils.database.database import Database
93 from osm_lcm.data_utils.filesystem.filesystem import Filesystem
94
95 from n2vc.n2vc_juju_conn import N2VCJujuConnector
96 from n2vc.exceptions import N2VCException, N2VCNotFound, K8sException
97
98 from osm_lcm.lcm_helm_conn import LCMHelmConn
99 from osm_lcm.prometheus import parse_job
100
101 from copy import copy, deepcopy
102 from time import time
103 from uuid import uuid4
104
105 from random import randint
106
107 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
108
109
class NsLcm(LcmBase):
    """Network Service lifecycle manager.

    Handles NS-level LCM operations (instantiate, terminate, scale, action),
    coordinating the RO client, the VCA connectors (N2VC/helm) and the K8s
    cluster connectors created in __init__.
    """

    timeout_vca_on_error = (
        5 * 60
    )  # Time for charm from first time at blocked,error status to mark as failed
    timeout_ns_deploy = 2 * 3600  # default global timeout for deployment a ns
    timeout_ns_terminate = 1800  # default global timeout for un deployment a ns
    timeout_charm_delete = 10 * 60  # timeout (seconds) for charm deletion
    timeout_primitive = 30 * 60  # timeout for primitive execution
    timeout_progress_primitive = (
        10 * 60
    )  # timeout for some progress in a primitive execution

    # Special status codes (negative so they cannot collide with list indexes)
    SUBOPERATION_STATUS_NOT_FOUND = -1
    SUBOPERATION_STATUS_NEW = -2
    SUBOPERATION_STATUS_SKIP = -3
    task_name_deploy_vca = "Deploying VCA"  # task display name (runtime string)
    def __init__(self, msg, lcm_tasks, config, loop):
        """
        Init, Connect to database, filesystem storage, and messaging
        :param msg: messaging (kafka) client, forwarded to LcmBase
        :param lcm_tasks: task registry used to track asyncio LCM tasks
        :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
            'timeout', 'ro_config' and 'VCA' sections (only the last three are read here)
        :param loop: asyncio event loop shared by all connectors
        :return: None
        """
        super().__init__(msg=msg, logger=logging.getLogger("lcm.ns"))

        # Database and filesystem are process-wide singletons
        self.db = Database().instance.db
        self.fs = Filesystem().instance.fs
        self.loop = loop
        self.lcm_tasks = lcm_tasks
        self.timeout = config["timeout"]
        self.ro_config = config["ro_config"]
        self.ng_ro = config["ro_config"].get("ng")
        # copy so later local mutations do not leak into the shared config dict
        self.vca_config = config["VCA"].copy()

        # create N2VC connector (juju charms); DB updates flow back through
        # _on_update_n2vc_db
        self.n2vc = N2VCJujuConnector(
            log=self.logger,
            loop=self.loop,
            on_update_db=self._on_update_n2vc_db,
            fs=self.fs,
            db=self.db,
        )

        # helm-based execution environments share the same DB-update callback
        self.conn_helm_ee = LCMHelmConn(
            log=self.logger,
            loop=self.loop,
            vca_config=self.vca_config,
            on_update_db=self._on_update_n2vc_db,
        )

        # helm v2 connector; no DB callback (status not tracked this way)
        self.k8sclusterhelm2 = K8sHelmConnector(
            kubectl_command=self.vca_config.get("kubectlpath"),
            helm_command=self.vca_config.get("helmpath"),
            log=self.logger,
            on_update_db=None,
            fs=self.fs,
            db=self.db,
        )

        # helm v3 connector; note it uses the separate 'helm3path' binary
        self.k8sclusterhelm3 = K8sHelm3Connector(
            kubectl_command=self.vca_config.get("kubectlpath"),
            helm_command=self.vca_config.get("helm3path"),
            fs=self.fs,
            log=self.logger,
            db=self.db,
        on_update_db=None,
        )

        # juju-bundle connector; status updates flow through _on_update_k8s_db
        self.k8sclusterjuju = K8sJujuConnector(
            kubectl_command=self.vca_config.get("kubectlpath"),
            juju_command=self.vca_config.get("jujupath"),
            log=self.logger,
            loop=self.loop,
            on_update_db=self._on_update_k8s_db,
            fs=self.fs,
            db=self.db,
        )

        # maps the k8s deployment unit type (from the descriptor) to a connector;
        # NOTE(review): plain "chart" maps to helm v3 while "helm-chart" maps to
        # helm v2 — confirm this asymmetry is intended
        self.k8scluster_map = {
            "helm-chart": self.k8sclusterhelm2,
            "helm-chart-v3": self.k8sclusterhelm3,
            "chart": self.k8sclusterhelm3,
            "juju-bundle": self.k8sclusterjuju,
            "juju": self.k8sclusterjuju,
        }

        # maps the VCA (execution environment) type to a connector
        self.vca_map = {
            "lxc_proxy_charm": self.n2vc,
            "native_charm": self.n2vc,
            "k8s_proxy_charm": self.n2vc,
            "helm": self.conn_helm_ee,
            "helm-v3": self.conn_helm_ee,
        }

        # create RO client
        self.RO = NgRoClient(self.loop, **self.ro_config)
206
207 @staticmethod
208 def increment_ip_mac(ip_mac, vm_index=1):
209 if not isinstance(ip_mac, str):
210 return ip_mac
211 try:
212 # try with ipv4 look for last dot
213 i = ip_mac.rfind(".")
214 if i > 0:
215 i += 1
216 return "{}{}".format(ip_mac[:i], int(ip_mac[i:]) + vm_index)
217 # try with ipv6 or mac look for last colon. Operate in hex
218 i = ip_mac.rfind(":")
219 if i > 0:
220 i += 1
221 # format in hex, len can be 2 for mac or 4 for ipv6
222 return ("{}{:0" + str(len(ip_mac) - i) + "x}").format(
223 ip_mac[:i], int(ip_mac[i:], 16) + vm_index
224 )
225 except Exception:
226 pass
227 return None
228
229 def _on_update_ro_db(self, nsrs_id, ro_descriptor):
230
231 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
232
233 try:
234 # TODO filter RO descriptor fields...
235
236 # write to database
237 db_dict = dict()
238 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
239 db_dict["deploymentStatus"] = ro_descriptor
240 self.update_db_2("nsrs", nsrs_id, db_dict)
241
242 except Exception as e:
243 self.logger.warn(
244 "Cannot write database RO deployment for ns={} -> {}".format(nsrs_id, e)
245 )
246
    async def _on_update_n2vc_db(self, table, filter, path, updated_data, vca_id=None):
        """Callback invoked by N2VC when juju data changes; refreshes NS status.

        Reads the nsrs record, queries juju for the current VCA status, and
        writes back 'vcaStatus' plus a possible READY<->DEGRADED transition of
        'nsState'. Any non-cancellation error is logged and swallowed.

        :param table: origin table of the update (unused here; nsrs is read directly)
        :param filter: dict with at least '_id' of the nsrs record
        :param path: dotted path of the changed data; its last component is the VCA index
        :param updated_data: changed data (unused; full status is re-read from juju)
        :param vca_id: optional VCA identifier passed through to n2vc calls
        """

        # remove last dot from path (if exists)
        if path.endswith("."):
            path = path[:-1]

        try:

            nsr_id = filter.get("_id")

            # read ns record from database
            nsr = self.db.get_one(table="nsrs", q_filter=filter)
            current_ns_status = nsr.get("nsState")

            # get vca status for NS (fresh snapshot from juju, not updated_data)
            status_dict = await self.n2vc.get_status(
                namespace="." + nsr_id, yaml_format=False, vca_id=vca_id
            )

            # vcaStatus
            db_dict = dict()
            db_dict["vcaStatus"] = status_dict
            await self.n2vc.update_vca_status(db_dict["vcaStatus"], vca_id=vca_id)

            # update configurationStatus for this VCA
            try:
                # the VCA index is the last dotted component of the path
                vca_index = int(path[path.rfind(".") + 1 :])

                vca_list = deep_get(
                    target_dict=nsr, key_list=("_admin", "deployed", "VCA")
                )
                vca_status = vca_list[vca_index].get("status")

                configuration_status_list = nsr.get("configurationStatus")
                config_status = configuration_status_list[vca_index].get("status")

                # NOTE(review): db_dict has no "configurationStatus" key at this
                # point, so both assignments below raise KeyError, which is then
                # swallowed by the except — the config status likely never
                # reaches the DB. Confirm intended write path/format.
                if config_status == "BROKEN" and vca_status != "failed":
                    db_dict["configurationStatus"][vca_index] = "READY"
                elif config_status != "BROKEN" and vca_status == "failed":
                    db_dict["configurationStatus"][vca_index] = "BROKEN"
            except Exception as e:
                # not update configurationStatus
                self.logger.debug("Error updating vca_index (ignore): {}".format(e))

            # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
            # if nsState = 'DEGRADED' check if all is OK
            is_degraded = False
            if current_ns_status in ("READY", "DEGRADED"):
                error_description = ""
                # check machines
                if status_dict.get("machines"):
                    for machine_id in status_dict.get("machines"):
                        machine = status_dict.get("machines").get(machine_id)
                        # check machine agent-status
                        if machine.get("agent-status"):
                            s = machine.get("agent-status").get("status")
                            if s != "started":
                                is_degraded = True
                                error_description += (
                                    "machine {} agent-status={} ; ".format(
                                        machine_id, s
                                    )
                                )
                        # check machine instance status
                        if machine.get("instance-status"):
                            s = machine.get("instance-status").get("status")
                            if s != "running":
                                is_degraded = True
                                error_description += (
                                    "machine {} instance-status={} ; ".format(
                                        machine_id, s
                                    )
                                )
                # check applications
                if status_dict.get("applications"):
                    for app_id in status_dict.get("applications"):
                        app = status_dict.get("applications").get(app_id)
                        # check application status
                        if app.get("status"):
                            s = app.get("status").get("status")
                            if s != "active":
                                is_degraded = True
                                error_description += (
                                    "application {} status={} ; ".format(app_id, s)
                                )

                if error_description:
                    db_dict["errorDescription"] = error_description
                # only transition between READY and DEGRADED; other states untouched
                if current_ns_status == "READY" and is_degraded:
                    db_dict["nsState"] = "DEGRADED"
                if current_ns_status == "DEGRADED" and not is_degraded:
                    db_dict["nsState"] = "READY"

            # write to database
            self.update_db_2("nsrs", nsr_id, db_dict)

        except (asyncio.CancelledError, asyncio.TimeoutError):
            raise
        except Exception as e:
            self.logger.warn("Error updating NS state for ns={}: {}".format(nsr_id, e))
349
    async def _on_update_k8s_db(
        self, cluster_uuid, kdu_instance, filter=None, vca_id=None, cluster_type="juju"
    ):
        """
        Updating vca status in NSR record
        :param cluster_uuid: UUID of a k8s cluster
        :param kdu_instance: The unique name of the KDU instance
        :param filter: To get nsr_id; must contain '_id'
        :param vca_id: optional VCA identifier passed through to status calls
        :cluster_type: The cluster type (juju, k8s)
        :return: none
        """

        # NOTE(review): filter defaults to None but is dereferenced
        # unconditionally below — a caller omitting it gets AttributeError.
        # Presumably all callers pass it; confirm.
        nsr_id = filter.get("_id")
        try:
            # query the connector matching the cluster type for the KDU status
            vca_status = await self.k8scluster_map[cluster_type].status_kdu(
                cluster_uuid=cluster_uuid,
                kdu_instance=kdu_instance,
                yaml_format=False,
                complete_status=True,
                vca_id=vca_id,
            )

            # vcaStatus keyed by ns id
            db_dict = dict()
            db_dict["vcaStatus"] = {nsr_id: vca_status}

            if cluster_type in ("juju-bundle", "juju"):
                # TODO -> this should be done in a more uniform way, I think in N2VC, in order to update the K8s VCA
                # status in a similar way between Juju Bundles and Helm Charts on this side
                await self.k8sclusterjuju.update_vca_status(
                    db_dict["vcaStatus"],
                    kdu_instance,
                    vca_id=vca_id,
                )

            self.logger.debug(
                f"Obtained VCA status for cluster type '{cluster_type}': {vca_status}"
            )

            # write to database
            self.update_db_2("nsrs", nsr_id, db_dict)
        except (asyncio.CancelledError, asyncio.TimeoutError):
            raise
        except Exception as e:
            self.logger.warn("Error updating NS state for ns={}: {}".format(nsr_id, e))
398
399 @staticmethod
400 def _parse_cloud_init(cloud_init_text, additional_params, vnfd_id, vdu_id):
401 try:
402 env = Environment(undefined=StrictUndefined)
403 template = env.from_string(cloud_init_text)
404 return template.render(additional_params or {})
405 except UndefinedError as e:
406 raise LcmException(
407 "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
408 "file, must be provided in the instantiation parameters inside the "
409 "'additionalParamsForVnf/Vdu' block".format(e, vnfd_id, vdu_id)
410 )
411 except (TemplateError, TemplateNotFound) as e:
412 raise LcmException(
413 "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
414 vnfd_id, vdu_id, e
415 )
416 )
417
418 def _get_vdu_cloud_init_content(self, vdu, vnfd):
419 cloud_init_content = cloud_init_file = None
420 try:
421 if vdu.get("cloud-init-file"):
422 base_folder = vnfd["_admin"]["storage"]
423 if base_folder["pkg-dir"]:
424 cloud_init_file = "{}/{}/cloud_init/{}".format(
425 base_folder["folder"],
426 base_folder["pkg-dir"],
427 vdu["cloud-init-file"],
428 )
429 else:
430 cloud_init_file = "{}/Scripts/cloud_init/{}".format(
431 base_folder["folder"],
432 vdu["cloud-init-file"],
433 )
434 with self.fs.file_open(cloud_init_file, "r") as ci_file:
435 cloud_init_content = ci_file.read()
436 elif vdu.get("cloud-init"):
437 cloud_init_content = vdu["cloud-init"]
438
439 return cloud_init_content
440 except FsException as e:
441 raise LcmException(
442 "Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".format(
443 vnfd["id"], vdu["id"], cloud_init_file, e
444 )
445 )
446
447 def _get_vdu_additional_params(self, db_vnfr, vdu_id):
448 vdur = next(
449 (vdur for vdur in db_vnfr.get("vdur") if vdu_id == vdur["vdu-id-ref"]),
450 {}
451 )
452 additional_params = vdur.get("additionalParams")
453 return parse_yaml_strings(additional_params)
454
455 def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
456 """
457 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
458 :param vnfd: input vnfd
459 :param new_id: overrides vnf id if provided
460 :param additionalParams: Instantiation params for VNFs provided
461 :param nsrId: Id of the NSR
462 :return: copy of vnfd
463 """
464 vnfd_RO = deepcopy(vnfd)
465 # remove unused by RO configuration, monitoring, scaling and internal keys
466 vnfd_RO.pop("_id", None)
467 vnfd_RO.pop("_admin", None)
468 vnfd_RO.pop("monitoring-param", None)
469 vnfd_RO.pop("scaling-group-descriptor", None)
470 vnfd_RO.pop("kdu", None)
471 vnfd_RO.pop("k8s-cluster", None)
472 if new_id:
473 vnfd_RO["id"] = new_id
474
475 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
476 for vdu in get_iterable(vnfd_RO, "vdu"):
477 vdu.pop("cloud-init-file", None)
478 vdu.pop("cloud-init", None)
479 return vnfd_RO
480
481 @staticmethod
482 def ip_profile_2_RO(ip_profile):
483 RO_ip_profile = deepcopy(ip_profile)
484 if "dns-server" in RO_ip_profile:
485 if isinstance(RO_ip_profile["dns-server"], list):
486 RO_ip_profile["dns-address"] = []
487 for ds in RO_ip_profile.pop("dns-server"):
488 RO_ip_profile["dns-address"].append(ds["address"])
489 else:
490 RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
491 if RO_ip_profile.get("ip-version") == "ipv4":
492 RO_ip_profile["ip-version"] = "IPv4"
493 if RO_ip_profile.get("ip-version") == "ipv6":
494 RO_ip_profile["ip-version"] = "IPv6"
495 if "dhcp-params" in RO_ip_profile:
496 RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
497 return RO_ip_profile
498
499 def _get_ro_vim_id_for_vim_account(self, vim_account):
500 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
501 if db_vim["_admin"]["operationalState"] != "ENABLED":
502 raise LcmException(
503 "VIM={} is not available. operationalState={}".format(
504 vim_account, db_vim["_admin"]["operationalState"]
505 )
506 )
507 RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
508 return RO_vim_id
509
510 def get_ro_wim_id_for_wim_account(self, wim_account):
511 if isinstance(wim_account, str):
512 db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
513 if db_wim["_admin"]["operationalState"] != "ENABLED":
514 raise LcmException(
515 "WIM={} is not available. operationalState={}".format(
516 wim_account, db_wim["_admin"]["operationalState"]
517 )
518 )
519 RO_wim_id = db_wim["_admin"]["deployed"]["RO-account"]
520 return RO_wim_id
521 else:
522 return wim_account
523
    def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None, mark_delete=False):
        """Apply a scale operation to a vnfr record in the database.

        Scale-out: clones the newest existing vdur of each vdu_id (or the saved
        'vdur-template' when scaling up from 0 instances) and pushes the copies.
        Scale-in: either marks vdurs as DELETING (mark_delete=True) or pulls
        them from the record one by one; when the last vdur is removed, it is
        saved as 'vdur-template' for future scale-out from 0.
        The passed db_vnfr dict is refreshed from the database at the end.

        :param db_vnfr: vnfr record (modified in place: 'vdur' is reloaded)
        :param vdu_create: dict vdu-id-ref -> number of instances to add
        :param vdu_delete: dict vdu-id-ref -> number of instances to remove
        :param mark_delete: when True only flag vdurs DELETING instead of pulling
        :return: None
        :raises LcmException: scaling out with neither vdur nor template available
        """

        db_vdu_push_list = []
        template_vdur = []
        db_update = {"_admin.modified": time()}
        if vdu_create:
            for vdu_id, vdu_count in vdu_create.items():
                # clone from the newest vdur with this vdu-id-ref
                vdur = next(
                    (
                        vdur
                        for vdur in reversed(db_vnfr["vdur"])
                        if vdur["vdu-id-ref"] == vdu_id
                    ),
                    None,
                )
                if not vdur:
                    # Read the template saved in the db (scale up from 0 instances)
                    self.logger.debug(f"No vdur in the database. Using the vdur-template to scale")
                    vdur_template = db_vnfr.get("vdur-template")
                    if not vdur_template:
                        raise LcmException(
                            "Error scaling OUT VNFR for {}. No vnfr or template exists".format(
                                vdu_id
                            )
                        )
                    vdur = vdur_template[0]
                    # Delete the template from the database after using it
                    self.db.set_one("vnfrs",
                        {"_id": db_vnfr["_id"]},
                        None,
                        pull={"vdur-template": {"_id": vdur['_id']}}
                    )
                for count in range(vdu_count):
                    vdur_copy = deepcopy(vdur)
                    # reset the per-instance fields of the clone
                    vdur_copy["status"] = "BUILD"
                    vdur_copy["status-detailed"] = None
                    vdur_copy["ip-address"] = None
                    vdur_copy["_id"] = str(uuid4())
                    vdur_copy["count-index"] += count + 1
                    vdur_copy["id"] = "{}-{}".format(
                        vdur_copy["vdu-id-ref"], vdur_copy["count-index"]
                    )
                    vdur_copy.pop("vim_info", None)
                    for iface in vdur_copy["interfaces"]:
                        # fixed addresses are incremented per instance; dynamic
                        # ones are cleared so the VIM re-assigns them
                        if iface.get("fixed-ip"):
                            iface["ip-address"] = self.increment_ip_mac(
                                iface["ip-address"], count + 1
                            )
                        else:
                            iface.pop("ip-address", None)
                        if iface.get("fixed-mac"):
                            iface["mac-address"] = self.increment_ip_mac(
                                iface["mac-address"], count + 1
                            )
                        else:
                            iface.pop("mac-address", None)
                        if db_vnfr["vdur"]:
                            iface.pop(
                                "mgmt_vnf", None
                            )  # only first vdu can be managment of vnf
                    db_vdu_push_list.append(vdur_copy)
                    # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
        if vdu_delete:
            if len(db_vnfr["vdur"]) == 1:
                # The scale will move to 0 instances: keep the last vdur as template
                self.logger.debug(f"Scaling to 0 !, creating the template with the last vdur")
                template_vdur = [db_vnfr["vdur"][0]]
            for vdu_id, vdu_count in vdu_delete.items():
                if mark_delete:
                    indexes_to_delete = [
                        iv[0]
                        for iv in enumerate(db_vnfr["vdur"])
                        if iv[1]["vdu-id-ref"] == vdu_id
                    ]
                    # flag the newest vdu_count instances as DELETING
                    db_update.update(
                        {
                            "vdur.{}.status".format(i): "DELETING"
                            for i in indexes_to_delete[-vdu_count:]
                        }
                    )
                else:
                    # it must be deleted one by one because common.db does not allow otherwise
                    vdus_to_delete = [
                        v
                        for v in reversed(db_vnfr["vdur"])
                        if v["vdu-id-ref"] == vdu_id
                    ]
                    for vdu in vdus_to_delete[:vdu_count]:
                        self.db.set_one(
                            "vnfrs",
                            {"_id": db_vnfr["_id"]},
                            None,
                            pull={"vdur": {"_id": vdu["_id"]}},
                        )
        db_push = {}
        if db_vdu_push_list:
            db_push["vdur"] = db_vdu_push_list
        if template_vdur:
            db_push["vdur-template"] = template_vdur
        if not db_push:
            db_push = None
        db_vnfr["vdur-template"] = template_vdur
        self.db.set_one("vnfrs", {"_id": db_vnfr["_id"]}, db_update, push_list=db_push)
        # modify passed dictionary db_vnfr
        db_vnfr_ = self.db.get_one("vnfrs", {"_id": db_vnfr["_id"]})
        db_vnfr["vdur"] = db_vnfr_["vdur"]
630
631 def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
632 """
633 Updates database nsr with the RO info for the created vld
634 :param ns_update_nsr: dictionary to be filled with the updated info
635 :param db_nsr: content of db_nsr. This is also modified
636 :param nsr_desc_RO: nsr descriptor from RO
637 :return: Nothing, LcmException is raised on errors
638 """
639
640 for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
641 for net_RO in get_iterable(nsr_desc_RO, "nets"):
642 if vld["id"] != net_RO.get("ns_net_osm_id"):
643 continue
644 vld["vim-id"] = net_RO.get("vim_net_id")
645 vld["name"] = net_RO.get("vim_name")
646 vld["status"] = net_RO.get("status")
647 vld["status-detailed"] = net_RO.get("error_msg")
648 ns_update_nsr["vld.{}".format(vld_index)] = vld
649 break
650 else:
651 raise LcmException(
652 "ns_update_nsr: Not found vld={} at RO info".format(vld["id"])
653 )
654
    def set_vnfr_at_error(self, db_vnfrs, error_text):
        """Mark all given vnfrs (and their status-less vdurs) as ERROR in the DB.

        Only vdurs that have no 'status' yet are touched; vdurs that already
        carry a status are left as-is. DB errors are logged and swallowed.

        :param db_vnfrs: dict member-vnf-index -> vnfr record (modified in place)
        :param error_text: detail stored into the in-memory vdur records
        :return: None
        """
        try:
            for db_vnfr in db_vnfrs.values():
                vnfr_update = {"status": "ERROR"}
                for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
                    if "status" not in vdur:
                        vdur["status"] = "ERROR"
                        vnfr_update["vdur.{}.status".format(vdu_index)] = "ERROR"
                        if error_text:
                            vdur["status-detailed"] = str(error_text)
                            # NOTE(review): the DB gets the literal "ERROR" here
                            # while the in-memory vdur gets error_text — looks
                            # inconsistent; confirm whether the DB should store
                            # str(error_text) instead.
                            vnfr_update[
                                "vdur.{}.status-detailed".format(vdu_index)
                            ] = "ERROR"
                self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
        except DbException as e:
            self.logger.error("Cannot update vnf. {}".format(e))
671
    def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
        """
        Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
        :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
        :param nsr_desc_RO: nsr descriptor from RO
        :return: Nothing, LcmException is raised on errors
        """
        for vnf_index, db_vnfr in db_vnfrs.items():
            # find the RO vnf matching this member index; the trailing for-else
            # raises when no match exists
            for vnf_RO in nsr_desc_RO["vnfs"]:
                if vnf_RO["member_vnf_index"] != vnf_index:
                    continue
                vnfr_update = {}
                if vnf_RO.get("ip_address"):
                    # RO may report several addresses separated by ';' — keep the first
                    db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO[
                        "ip_address"
                    ].split(";")[0]
                elif not db_vnfr.get("ip-address"):
                    if db_vnfr.get("vdur"):  # if not VDUs, there is not ip_address
                        raise LcmExceptionNoMgmtIP(
                            "ns member_vnf_index '{}' has no IP address".format(
                                vnf_index
                            )
                        )

                for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
                    vdur_RO_count_index = 0
                    if vdur.get("pdu-type"):
                        # physical DUs are not deployed by RO; skip them
                        continue
                    for vdur_RO in get_iterable(vnf_RO, "vms"):
                        if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
                            continue
                        # match the Nth RO vm with the same osm id against
                        # this vdur's count-index
                        if vdur["count-index"] != vdur_RO_count_index:
                            vdur_RO_count_index += 1
                            continue
                        vdur["vim-id"] = vdur_RO.get("vim_vm_id")
                        if vdur_RO.get("ip_address"):
                            vdur["ip-address"] = vdur_RO["ip_address"].split(";")[0]
                        else:
                            vdur["ip-address"] = None
                        vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
                        vdur["name"] = vdur_RO.get("vim_name")
                        vdur["status"] = vdur_RO.get("status")
                        vdur["status-detailed"] = vdur_RO.get("error_msg")
                        # copy per-interface addresses reported by the VIM
                        for ifacer in get_iterable(vdur, "interfaces"):
                            for interface_RO in get_iterable(vdur_RO, "interfaces"):
                                if ifacer["name"] == interface_RO.get("internal_name"):
                                    ifacer["ip-address"] = interface_RO.get(
                                        "ip_address"
                                    )
                                    ifacer["mac-address"] = interface_RO.get(
                                        "mac_address"
                                    )
                                    break
                            else:
                                raise LcmException(
                                    "ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
                                    "from VIM info".format(
                                        vnf_index, vdur["vdu-id-ref"], ifacer["name"]
                                    )
                                )
                        vnfr_update["vdur.{}".format(vdu_index)] = vdur
                        break
                    else:
                        raise LcmException(
                            "ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
                            "VIM info".format(
                                vnf_index, vdur["vdu-id-ref"], vdur["count-index"]
                            )
                        )

                for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
                    for net_RO in get_iterable(nsr_desc_RO, "nets"):
                        if vld["id"] != net_RO.get("vnf_net_osm_id"):
                            continue
                        vld["vim-id"] = net_RO.get("vim_net_id")
                        vld["name"] = net_RO.get("vim_name")
                        vld["status"] = net_RO.get("status")
                        vld["status-detailed"] = net_RO.get("error_msg")
                        vnfr_update["vld.{}".format(vld_index)] = vld
                        break
                    else:
                        raise LcmException(
                            "ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
                                vnf_index, vld["id"]
                            )
                        )

                self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
                break

            else:
                raise LcmException(
                    "ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(
                        vnf_index
                    )
                )
768
769 def _get_ns_config_info(self, nsr_id):
770 """
771 Generates a mapping between vnf,vdu elements and the N2VC id
772 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
773 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
774 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
775 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
776 """
777 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
778 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
779 mapping = {}
780 ns_config_info = {"osm-config-mapping": mapping}
781 for vca in vca_deployed_list:
782 if not vca["member-vnf-index"]:
783 continue
784 if not vca["vdu_id"]:
785 mapping[vca["member-vnf-index"]] = vca["application"]
786 else:
787 mapping[
788 "{}.{}.{}".format(
789 vca["member-vnf-index"], vca["vdu_id"], vca["vdu_count_index"]
790 )
791 ] = vca["application"]
792 return ns_config_info
793
794 async def _instantiate_ng_ro(
795 self,
796 logging_text,
797 nsr_id,
798 nsd,
799 db_nsr,
800 db_nslcmop,
801 db_vnfrs,
802 db_vnfds,
803 n2vc_key_list,
804 stage,
805 start_deploy,
806 timeout_ns_deploy,
807 ):
808
809 db_vims = {}
810
811 def get_vim_account(vim_account_id):
812 nonlocal db_vims
813 if vim_account_id in db_vims:
814 return db_vims[vim_account_id]
815 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account_id})
816 db_vims[vim_account_id] = db_vim
817 return db_vim
818
819 # modify target_vld info with instantiation parameters
820 def parse_vld_instantiation_params(
821 target_vim, target_vld, vld_params, target_sdn
822 ):
823 if vld_params.get("ip-profile"):
824 target_vld["vim_info"][target_vim]["ip_profile"] = vld_params[
825 "ip-profile"
826 ]
827 if vld_params.get("provider-network"):
828 target_vld["vim_info"][target_vim]["provider_network"] = vld_params[
829 "provider-network"
830 ]
831 if "sdn-ports" in vld_params["provider-network"] and target_sdn:
832 target_vld["vim_info"][target_sdn]["sdn-ports"] = vld_params[
833 "provider-network"
834 ]["sdn-ports"]
835 if vld_params.get("wimAccountId"):
836 target_wim = "wim:{}".format(vld_params["wimAccountId"])
837 target_vld["vim_info"][target_wim] = {}
838 for param in ("vim-network-name", "vim-network-id"):
839 if vld_params.get(param):
840 if isinstance(vld_params[param], dict):
841 for vim, vim_net in vld_params[param].items():
842 other_target_vim = "vim:" + vim
843 populate_dict(
844 target_vld["vim_info"],
845 (other_target_vim, param.replace("-", "_")),
846 vim_net,
847 )
848 else: # isinstance str
849 target_vld["vim_info"][target_vim][
850 param.replace("-", "_")
851 ] = vld_params[param]
852 if vld_params.get("common_id"):
853 target_vld["common_id"] = vld_params.get("common_id")
854
855 # modify target["ns"]["vld"] with instantiation parameters to override vnf vim-account
856 def update_ns_vld_target(target, ns_params):
857 for vnf_params in ns_params.get("vnf", ()):
858 if vnf_params.get("vimAccountId"):
859 target_vnf = next(
860 (
861 vnfr
862 for vnfr in db_vnfrs.values()
863 if vnf_params["member-vnf-index"]
864 == vnfr["member-vnf-index-ref"]
865 ),
866 None,
867 )
868 vdur = next((vdur for vdur in target_vnf.get("vdur", ())), None)
869 for a_index, a_vld in enumerate(target["ns"]["vld"]):
870 target_vld = find_in_list(
871 get_iterable(vdur, "interfaces"),
872 lambda iface: iface.get("ns-vld-id") == a_vld["name"],
873 )
874 if target_vld:
875 if vnf_params.get("vimAccountId") not in a_vld.get(
876 "vim_info", {}
877 ):
878 target["ns"]["vld"][a_index].get("vim_info").update(
879 {
880 "vim:{}".format(vnf_params["vimAccountId"]): {
881 "vim_network_name": ""
882 }
883 }
884 )
885
886 nslcmop_id = db_nslcmop["_id"]
887 target = {
888 "name": db_nsr["name"],
889 "ns": {"vld": []},
890 "vnf": [],
891 "image": deepcopy(db_nsr["image"]),
892 "flavor": deepcopy(db_nsr["flavor"]),
893 "action_id": nslcmop_id,
894 "cloud_init_content": {},
895 }
896 for image in target["image"]:
897 image["vim_info"] = {}
898 for flavor in target["flavor"]:
899 flavor["vim_info"] = {}
900 if db_nsr.get("affinity-or-anti-affinity-group"):
901 target["affinity-or-anti-affinity-group"] = deepcopy(
902 db_nsr["affinity-or-anti-affinity-group"]
903 )
904 for affinity_or_anti_affinity_group in target[
905 "affinity-or-anti-affinity-group"
906 ]:
907 affinity_or_anti_affinity_group["vim_info"] = {}
908
909 if db_nslcmop.get("lcmOperationType") != "instantiate":
910 # get parameters of instantiation:
911 db_nslcmop_instantiate = self.db.get_list(
912 "nslcmops",
913 {
914 "nsInstanceId": db_nslcmop["nsInstanceId"],
915 "lcmOperationType": "instantiate",
916 },
917 )[-1]
918 ns_params = db_nslcmop_instantiate.get("operationParams")
919 else:
920 ns_params = db_nslcmop.get("operationParams")
921 ssh_keys_instantiation = ns_params.get("ssh_keys") or []
922 ssh_keys_all = ssh_keys_instantiation + (n2vc_key_list or [])
923
924 cp2target = {}
925 for vld_index, vld in enumerate(db_nsr.get("vld")):
926 target_vim = "vim:{}".format(ns_params["vimAccountId"])
927 target_vld = {
928 "id": vld["id"],
929 "name": vld["name"],
930 "mgmt-network": vld.get("mgmt-network", False),
931 "type": vld.get("type"),
932 "vim_info": {
933 target_vim: {
934 "vim_network_name": vld.get("vim-network-name"),
935 "vim_account_id": ns_params["vimAccountId"],
936 }
937 },
938 }
939 # check if this network needs SDN assist
940 if vld.get("pci-interfaces"):
941 db_vim = get_vim_account(ns_params["vimAccountId"])
942 sdnc_id = db_vim["config"].get("sdn-controller")
943 if sdnc_id:
944 sdn_vld = "nsrs:{}:vld.{}".format(nsr_id, vld["id"])
945 target_sdn = "sdn:{}".format(sdnc_id)
946 target_vld["vim_info"][target_sdn] = {
947 "sdn": True,
948 "target_vim": target_vim,
949 "vlds": [sdn_vld],
950 "type": vld.get("type"),
951 }
952
953 nsd_vnf_profiles = get_vnf_profiles(nsd)
954 for nsd_vnf_profile in nsd_vnf_profiles:
955 for cp in nsd_vnf_profile["virtual-link-connectivity"]:
956 if cp["virtual-link-profile-id"] == vld["id"]:
957 cp2target[
958 "member_vnf:{}.{}".format(
959 cp["constituent-cpd-id"][0][
960 "constituent-base-element-id"
961 ],
962 cp["constituent-cpd-id"][0]["constituent-cpd-id"],
963 )
964 ] = "nsrs:{}:vld.{}".format(nsr_id, vld_index)
965
966 # check at nsd descriptor, if there is an ip-profile
967 vld_params = {}
968 nsd_vlp = find_in_list(
969 get_virtual_link_profiles(nsd),
970 lambda a_link_profile: a_link_profile["virtual-link-desc-id"]
971 == vld["id"],
972 )
973 if (
974 nsd_vlp
975 and nsd_vlp.get("virtual-link-protocol-data")
976 and nsd_vlp["virtual-link-protocol-data"].get("l3-protocol-data")
977 ):
978 ip_profile_source_data = nsd_vlp["virtual-link-protocol-data"][
979 "l3-protocol-data"
980 ]
981 ip_profile_dest_data = {}
982 if "ip-version" in ip_profile_source_data:
983 ip_profile_dest_data["ip-version"] = ip_profile_source_data[
984 "ip-version"
985 ]
986 if "cidr" in ip_profile_source_data:
987 ip_profile_dest_data["subnet-address"] = ip_profile_source_data[
988 "cidr"
989 ]
990 if "gateway-ip" in ip_profile_source_data:
991 ip_profile_dest_data["gateway-address"] = ip_profile_source_data[
992 "gateway-ip"
993 ]
994 if "dhcp-enabled" in ip_profile_source_data:
995 ip_profile_dest_data["dhcp-params"] = {
996 "enabled": ip_profile_source_data["dhcp-enabled"]
997 }
998 vld_params["ip-profile"] = ip_profile_dest_data
999
1000 # update vld_params with instantiation params
1001 vld_instantiation_params = find_in_list(
1002 get_iterable(ns_params, "vld"),
1003 lambda a_vld: a_vld["name"] in (vld["name"], vld["id"]),
1004 )
1005 if vld_instantiation_params:
1006 vld_params.update(vld_instantiation_params)
1007 parse_vld_instantiation_params(target_vim, target_vld, vld_params, None)
1008 target["ns"]["vld"].append(target_vld)
1009 # Update the target ns_vld if vnf vim_account is overriden by instantiation params
1010 update_ns_vld_target(target, ns_params)
1011
1012 for vnfr in db_vnfrs.values():
1013 vnfd = find_in_list(
1014 db_vnfds, lambda db_vnf: db_vnf["id"] == vnfr["vnfd-ref"]
1015 )
1016 vnf_params = find_in_list(
1017 get_iterable(ns_params, "vnf"),
1018 lambda a_vnf: a_vnf["member-vnf-index"] == vnfr["member-vnf-index-ref"],
1019 )
1020 target_vnf = deepcopy(vnfr)
1021 target_vim = "vim:{}".format(vnfr["vim-account-id"])
1022 for vld in target_vnf.get("vld", ()):
1023 # check if connected to a ns.vld, to fill target'
1024 vnf_cp = find_in_list(
1025 vnfd.get("int-virtual-link-desc", ()),
1026 lambda cpd: cpd.get("id") == vld["id"],
1027 )
1028 if vnf_cp:
1029 ns_cp = "member_vnf:{}.{}".format(
1030 vnfr["member-vnf-index-ref"], vnf_cp["id"]
1031 )
1032 if cp2target.get(ns_cp):
1033 vld["target"] = cp2target[ns_cp]
1034
1035 vld["vim_info"] = {
1036 target_vim: {"vim_network_name": vld.get("vim-network-name")}
1037 }
1038 # check if this network needs SDN assist
1039 target_sdn = None
1040 if vld.get("pci-interfaces"):
1041 db_vim = get_vim_account(vnfr["vim-account-id"])
1042 sdnc_id = db_vim["config"].get("sdn-controller")
1043 if sdnc_id:
1044 sdn_vld = "vnfrs:{}:vld.{}".format(target_vnf["_id"], vld["id"])
1045 target_sdn = "sdn:{}".format(sdnc_id)
1046 vld["vim_info"][target_sdn] = {
1047 "sdn": True,
1048 "target_vim": target_vim,
1049 "vlds": [sdn_vld],
1050 "type": vld.get("type"),
1051 }
1052
1053 # check at vnfd descriptor, if there is an ip-profile
1054 vld_params = {}
1055 vnfd_vlp = find_in_list(
1056 get_virtual_link_profiles(vnfd),
1057 lambda a_link_profile: a_link_profile["id"] == vld["id"],
1058 )
1059 if (
1060 vnfd_vlp
1061 and vnfd_vlp.get("virtual-link-protocol-data")
1062 and vnfd_vlp["virtual-link-protocol-data"].get("l3-protocol-data")
1063 ):
1064 ip_profile_source_data = vnfd_vlp["virtual-link-protocol-data"][
1065 "l3-protocol-data"
1066 ]
1067 ip_profile_dest_data = {}
1068 if "ip-version" in ip_profile_source_data:
1069 ip_profile_dest_data["ip-version"] = ip_profile_source_data[
1070 "ip-version"
1071 ]
1072 if "cidr" in ip_profile_source_data:
1073 ip_profile_dest_data["subnet-address"] = ip_profile_source_data[
1074 "cidr"
1075 ]
1076 if "gateway-ip" in ip_profile_source_data:
1077 ip_profile_dest_data[
1078 "gateway-address"
1079 ] = ip_profile_source_data["gateway-ip"]
1080 if "dhcp-enabled" in ip_profile_source_data:
1081 ip_profile_dest_data["dhcp-params"] = {
1082 "enabled": ip_profile_source_data["dhcp-enabled"]
1083 }
1084
1085 vld_params["ip-profile"] = ip_profile_dest_data
1086 # update vld_params with instantiation params
1087 if vnf_params:
1088 vld_instantiation_params = find_in_list(
1089 get_iterable(vnf_params, "internal-vld"),
1090 lambda i_vld: i_vld["name"] == vld["id"],
1091 )
1092 if vld_instantiation_params:
1093 vld_params.update(vld_instantiation_params)
1094 parse_vld_instantiation_params(target_vim, vld, vld_params, target_sdn)
1095
1096 vdur_list = []
1097 for vdur in target_vnf.get("vdur", ()):
1098 if vdur.get("status") == "DELETING" or vdur.get("pdu-type"):
1099 continue # This vdu must not be created
1100 vdur["vim_info"] = {"vim_account_id": vnfr["vim-account-id"]}
1101
1102 self.logger.debug("NS > ssh_keys > {}".format(ssh_keys_all))
1103
1104 if ssh_keys_all:
1105 vdu_configuration = get_configuration(vnfd, vdur["vdu-id-ref"])
1106 vnf_configuration = get_configuration(vnfd, vnfd["id"])
1107 if (
1108 vdu_configuration
1109 and vdu_configuration.get("config-access")
1110 and vdu_configuration.get("config-access").get("ssh-access")
1111 ):
1112 vdur["ssh-keys"] = ssh_keys_all
1113 vdur["ssh-access-required"] = vdu_configuration[
1114 "config-access"
1115 ]["ssh-access"]["required"]
1116 elif (
1117 vnf_configuration
1118 and vnf_configuration.get("config-access")
1119 and vnf_configuration.get("config-access").get("ssh-access")
1120 and any(iface.get("mgmt-vnf") for iface in vdur["interfaces"])
1121 ):
1122 vdur["ssh-keys"] = ssh_keys_all
1123 vdur["ssh-access-required"] = vnf_configuration[
1124 "config-access"
1125 ]["ssh-access"]["required"]
1126 elif ssh_keys_instantiation and find_in_list(
1127 vdur["interfaces"], lambda iface: iface.get("mgmt-vnf")
1128 ):
1129 vdur["ssh-keys"] = ssh_keys_instantiation
1130
1131 self.logger.debug("NS > vdur > {}".format(vdur))
1132
1133 vdud = get_vdu(vnfd, vdur["vdu-id-ref"])
1134 # cloud-init
1135 if vdud.get("cloud-init-file"):
1136 vdur["cloud-init"] = "{}:file:{}".format(
1137 vnfd["_id"], vdud.get("cloud-init-file")
1138 )
1139 # read file and put content at target.cloul_init_content. Avoid ng_ro to use shared package system
1140 if vdur["cloud-init"] not in target["cloud_init_content"]:
1141 base_folder = vnfd["_admin"]["storage"]
1142 if base_folder["pkg-dir"]:
1143 cloud_init_file = "{}/{}/cloud_init/{}".format(
1144 base_folder["folder"],
1145 base_folder["pkg-dir"],
1146 vdud.get("cloud-init-file"),
1147 )
1148 else:
1149 cloud_init_file = "{}/Scripts/cloud_init/{}".format(
1150 base_folder["folder"],
1151 vdud.get("cloud-init-file"),
1152 )
1153 with self.fs.file_open(cloud_init_file, "r") as ci_file:
1154 target["cloud_init_content"][
1155 vdur["cloud-init"]
1156 ] = ci_file.read()
1157 elif vdud.get("cloud-init"):
1158 vdur["cloud-init"] = "{}:vdu:{}".format(
1159 vnfd["_id"], get_vdu_index(vnfd, vdur["vdu-id-ref"])
1160 )
1161 # put content at target.cloul_init_content. Avoid ng_ro read vnfd descriptor
1162 target["cloud_init_content"][vdur["cloud-init"]] = vdud[
1163 "cloud-init"
1164 ]
1165 vdur["additionalParams"] = vdur.get("additionalParams") or {}
1166 deploy_params_vdu = self._format_additional_params(
1167 vdur.get("additionalParams") or {}
1168 )
1169 deploy_params_vdu["OSM"] = get_osm_params(
1170 vnfr, vdur["vdu-id-ref"], vdur["count-index"]
1171 )
1172 vdur["additionalParams"] = deploy_params_vdu
1173
1174 # flavor
1175 ns_flavor = target["flavor"][int(vdur["ns-flavor-id"])]
1176 if target_vim not in ns_flavor["vim_info"]:
1177 ns_flavor["vim_info"][target_vim] = {}
1178
1179 # deal with images
1180 # in case alternative images are provided we must check if they should be applied
1181 # for the vim_type, modify the vim_type taking into account
1182 ns_image_id = int(vdur["ns-image-id"])
1183 if vdur.get("alt-image-ids"):
1184 db_vim = get_vim_account(vnfr["vim-account-id"])
1185 vim_type = db_vim["vim_type"]
1186 for alt_image_id in vdur.get("alt-image-ids"):
1187 ns_alt_image = target["image"][int(alt_image_id)]
1188 if vim_type == ns_alt_image.get("vim-type"):
1189 # must use alternative image
1190 self.logger.debug(
1191 "use alternative image id: {}".format(alt_image_id)
1192 )
1193 ns_image_id = alt_image_id
1194 vdur["ns-image-id"] = ns_image_id
1195 break
1196 ns_image = target["image"][int(ns_image_id)]
1197 if target_vim not in ns_image["vim_info"]:
1198 ns_image["vim_info"][target_vim] = {}
1199
1200 # Affinity groups
1201 if vdur.get("affinity-or-anti-affinity-group-id"):
1202 for ags_id in vdur["affinity-or-anti-affinity-group-id"]:
1203 ns_ags = target["affinity-or-anti-affinity-group"][int(ags_id)]
1204 if target_vim not in ns_ags["vim_info"]:
1205 ns_ags["vim_info"][target_vim] = {}
1206
1207 vdur["vim_info"] = {target_vim: {}}
1208 # instantiation parameters
1209 # if vnf_params:
1210 # vdu_instantiation_params = next((v for v in get_iterable(vnf_params, "vdu") if v["id"] ==
1211 # vdud["id"]), None)
1212 vdur_list.append(vdur)
1213 target_vnf["vdur"] = vdur_list
1214 target["vnf"].append(target_vnf)
1215
1216 desc = await self.RO.deploy(nsr_id, target)
1217 self.logger.debug("RO return > {}".format(desc))
1218 action_id = desc["action_id"]
1219 await self._wait_ng_ro(
1220 nsr_id, action_id, nslcmop_id, start_deploy, timeout_ns_deploy, stage
1221 )
1222
1223 # Updating NSR
1224 db_nsr_update = {
1225 "_admin.deployed.RO.operational-status": "running",
1226 "detailed-status": " ".join(stage),
1227 }
1228 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1229 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1230 self._write_op_status(nslcmop_id, stage)
1231 self.logger.debug(
1232 logging_text + "ns deployed at RO. RO_id={}".format(action_id)
1233 )
1234 return
1235
1236 async def _wait_ng_ro(
1237 self,
1238 nsr_id,
1239 action_id,
1240 nslcmop_id=None,
1241 start_time=None,
1242 timeout=600,
1243 stage=None,
1244 ):
1245 detailed_status_old = None
1246 db_nsr_update = {}
1247 start_time = start_time or time()
1248 while time() <= start_time + timeout:
1249 desc_status = await self.RO.status(nsr_id, action_id)
1250 self.logger.debug("Wait NG RO > {}".format(desc_status))
1251 if desc_status["status"] == "FAILED":
1252 raise NgRoException(desc_status["details"])
1253 elif desc_status["status"] == "BUILD":
1254 if stage:
1255 stage[2] = "VIM: ({})".format(desc_status["details"])
1256 elif desc_status["status"] == "DONE":
1257 if stage:
1258 stage[2] = "Deployed at VIM"
1259 break
1260 else:
1261 assert False, "ROclient.check_ns_status returns unknown {}".format(
1262 desc_status["status"]
1263 )
1264 if stage and nslcmop_id and stage[2] != detailed_status_old:
1265 detailed_status_old = stage[2]
1266 db_nsr_update["detailed-status"] = " ".join(stage)
1267 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1268 self._write_op_status(nslcmop_id, stage)
1269 await asyncio.sleep(15, loop=self.loop)
1270 else: # timeout_ns_deploy
1271 raise NgRoException("Timeout waiting ns to deploy")
1272
    async def _terminate_ng_ro(
        self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
    ):
        """
        Terminate a NS at NG-RO: request deletion of all VIM resources, wait for
        completion and finally remove the nsr at RO.

        :param logging_text: prefix text to use at logging
        :param nsr_deployed: deployed info from the nsr record (currently unused here)
        :param nsr_id: ns record identifier
        :param nslcmop_id: ns operation identifier; also used as RO action_id in target
        :param stage: 3-item status list; item 2 is overwritten with the VIM result
        :return: None
        :raises LcmException: if deletion at RO failed (except a 404 "not found",
            which is treated as already deleted)
        """
        db_nsr_update = {}
        failed_detail = []
        action_id = None
        start_deploy = time()
        try:
            # An "empty" target (no vld/vnf/image/flavor) tells NG-RO to remove
            # everything previously deployed for this nsr_id
            target = {
                "ns": {"vld": []},
                "vnf": [],
                "image": [],
                "flavor": [],
                "action_id": nslcmop_id,
            }
            desc = await self.RO.deploy(nsr_id, target)
            action_id = desc["action_id"]
            db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = action_id
            db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETING"
            self.logger.debug(
                logging_text
                + "ns terminate action at RO. action_id={}".format(action_id)
            )

            # wait until done
            delete_timeout = 20 * 60  # 20 minutes
            await self._wait_ng_ro(
                nsr_id, action_id, nslcmop_id, start_deploy, delete_timeout, stage
            )

            db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
            db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
            # delete all nsr
            await self.RO.delete(nsr_id)
        except Exception as e:
            # 404: the nsr was already gone at RO; treat as success
            if isinstance(e, NgRoException) and e.http_code == 404:  # not found
                db_nsr_update["_admin.deployed.RO.nsr_id"] = None
                db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
                self.logger.debug(
                    logging_text + "RO_action_id={} already deleted".format(action_id)
                )
            # 409: conflict; recorded as failure but logged at debug level
            elif isinstance(e, NgRoException) and e.http_code == 409:  # conflict
                failed_detail.append("delete conflict: {}".format(e))
                self.logger.debug(
                    logging_text
                    + "RO_action_id={} delete conflict: {}".format(action_id, e)
                )
            else:
                failed_detail.append("delete error: {}".format(e))
                self.logger.error(
                    logging_text
                    + "RO_action_id={} delete error: {}".format(action_id, e)
                )

        # Persist the final result regardless of success or failure
        if failed_detail:
            stage[2] = "Error deleting from VIM"
        else:
            stage[2] = "Deleted from VIM"
        db_nsr_update["detailed-status"] = " ".join(stage)
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        self._write_op_status(nslcmop_id, stage)

        if failed_detail:
            raise LcmException("; ".join(failed_detail))
        return
1339
1340 async def instantiate_RO(
1341 self,
1342 logging_text,
1343 nsr_id,
1344 nsd,
1345 db_nsr,
1346 db_nslcmop,
1347 db_vnfrs,
1348 db_vnfds,
1349 n2vc_key_list,
1350 stage,
1351 ):
1352 """
1353 Instantiate at RO
1354 :param logging_text: preffix text to use at logging
1355 :param nsr_id: nsr identity
1356 :param nsd: database content of ns descriptor
1357 :param db_nsr: database content of ns record
1358 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
1359 :param db_vnfrs:
1360 :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
1361 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
1362 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
1363 :return: None or exception
1364 """
1365 try:
1366 start_deploy = time()
1367 ns_params = db_nslcmop.get("operationParams")
1368 if ns_params and ns_params.get("timeout_ns_deploy"):
1369 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
1370 else:
1371 timeout_ns_deploy = self.timeout.get(
1372 "ns_deploy", self.timeout_ns_deploy
1373 )
1374
1375 # Check for and optionally request placement optimization. Database will be updated if placement activated
1376 stage[2] = "Waiting for Placement."
1377 if await self._do_placement(logging_text, db_nslcmop, db_vnfrs):
1378 # in case of placement change ns_params[vimAcountId) if not present at any vnfrs
1379 for vnfr in db_vnfrs.values():
1380 if ns_params["vimAccountId"] == vnfr["vim-account-id"]:
1381 break
1382 else:
1383 ns_params["vimAccountId"] == vnfr["vim-account-id"]
1384
1385 return await self._instantiate_ng_ro(
1386 logging_text,
1387 nsr_id,
1388 nsd,
1389 db_nsr,
1390 db_nslcmop,
1391 db_vnfrs,
1392 db_vnfds,
1393 n2vc_key_list,
1394 stage,
1395 start_deploy,
1396 timeout_ns_deploy,
1397 )
1398 except Exception as e:
1399 stage[2] = "ERROR deploying at VIM"
1400 self.set_vnfr_at_error(db_vnfrs, str(e))
1401 self.logger.error(
1402 "Error deploying at VIM {}".format(e),
1403 exc_info=not isinstance(
1404 e,
1405 (
1406 ROclient.ROClientException,
1407 LcmException,
1408 DbException,
1409 NgRoException,
1410 ),
1411 ),
1412 )
1413 raise
1414
1415 async def wait_kdu_up(self, logging_text, nsr_id, vnfr_id, kdu_name):
1416 """
1417 Wait for kdu to be up, get ip address
1418 :param logging_text: prefix use for logging
1419 :param nsr_id:
1420 :param vnfr_id:
1421 :param kdu_name:
1422 :return: IP address
1423 """
1424
1425 # self.logger.debug(logging_text + "Starting wait_kdu_up")
1426 nb_tries = 0
1427
1428 while nb_tries < 360:
1429 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
1430 kdur = next(
1431 (
1432 x
1433 for x in get_iterable(db_vnfr, "kdur")
1434 if x.get("kdu-name") == kdu_name
1435 ),
1436 None,
1437 )
1438 if not kdur:
1439 raise LcmException(
1440 "Not found vnfr_id={}, kdu_name={}".format(vnfr_id, kdu_name)
1441 )
1442 if kdur.get("status"):
1443 if kdur["status"] in ("READY", "ENABLED"):
1444 return kdur.get("ip-address")
1445 else:
1446 raise LcmException(
1447 "target KDU={} is in error state".format(kdu_name)
1448 )
1449
1450 await asyncio.sleep(10, loop=self.loop)
1451 nb_tries += 1
1452 raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name))
1453
    async def wait_vm_up_insert_key_ro(
        self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None
    ):
        """
        Wait for ip addres at RO, and optionally, insert public key in virtual machine
        :param logging_text: prefix use for logging
        :param nsr_id: ns record identifier
        :param vnfr_id: vnf record identifier
        :param vdu_id: target vdu id; None means the VNF management address
        :param vdu_index: count-index of the target vdu (used with vdu_id)
        :param pub_key: public ssh key to inject, None to skip
        :param user: user to apply the public ssh key
        :return: IP address
        :raises LcmException: VNF/VM in error state, target vdu not found,
            retries exhausted, or key injection failed
        """

        self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
        ro_nsr_id = None
        ip_address = None
        nb_tries = 0  # retry counter for legacy-RO key injection only
        target_vdu_id = None
        ro_retries = 0  # overall loop counter (bounded below)

        while True:

            ro_retries += 1
            if ro_retries >= 360:  # 1 hour
                raise LcmException(
                    "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id)
                )

            await asyncio.sleep(10, loop=self.loop)

            # get ip address
            if not target_vdu_id:
                db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})

                if not vdu_id:  # for the VNF case
                    if db_vnfr.get("status") == "ERROR":
                        raise LcmException(
                            "Cannot inject ssh-key because target VNF is in error state"
                        )
                    ip_address = db_vnfr.get("ip-address")
                    if not ip_address:
                        continue
                    # locate the vdur that owns the VNF management IP
                    vdur = next(
                        (
                            x
                            for x in get_iterable(db_vnfr, "vdur")
                            if x.get("ip-address") == ip_address
                        ),
                        None,
                    )
                else:  # VDU case
                    vdur = next(
                        (
                            x
                            for x in get_iterable(db_vnfr, "vdur")
                            if x.get("vdu-id-ref") == vdu_id
                            and x.get("count-index") == vdu_index
                        ),
                        None,
                    )

                if (
                    not vdur and len(db_vnfr.get("vdur", ())) == 1
                ):  # If only one, this should be the target vdu
                    vdur = db_vnfr["vdur"][0]
                if not vdur:
                    raise LcmException(
                        "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
                            vnfr_id, vdu_id, vdu_index
                        )
                    )
                # New generation RO stores information at "vim_info"
                ng_ro_status = None
                target_vim = None
                if vdur.get("vim_info"):
                    target_vim = next(
                        t for t in vdur["vim_info"]
                    )  # there should be only one key
                    ng_ro_status = vdur["vim_info"][target_vim].get("vim_status")
                # PDUs are assumed reachable; otherwise wait for ACTIVE status
                # reported either by legacy RO ("status") or NG-RO ("vim_status")
                if (
                    vdur.get("pdu-type")
                    or vdur.get("status") == "ACTIVE"
                    or ng_ro_status == "ACTIVE"
                ):
                    ip_address = vdur.get("ip-address")
                    if not ip_address:
                        continue
                    target_vdu_id = vdur["vdu-id-ref"]
                elif vdur.get("status") == "ERROR" or ng_ro_status == "ERROR":
                    raise LcmException(
                        "Cannot inject ssh-key because target VM is in error state"
                    )

            if not target_vdu_id:
                continue

            # inject public key into machine
            if pub_key and user:
                self.logger.debug(logging_text + "Inserting RO key")
                self.logger.debug("SSH > PubKey > {}".format(pub_key))
                if vdur.get("pdu-type"):
                    # cannot reach inside a PDU; return its address without injecting
                    self.logger.error(logging_text + "Cannot inject ssh-ky to a PDU")
                    return ip_address
                try:
                    ro_vm_id = "{}-{}".format(
                        db_vnfr["member-vnf-index-ref"], target_vdu_id
                    )  # TODO add vdu_index
                    if self.ng_ro:
                        # NG-RO path: key injection is a deploy action
                        target = {
                            "action": {
                                "action": "inject_ssh_key",
                                "key": pub_key,
                                "user": user,
                            },
                            "vnf": [{"_id": vnfr_id, "vdur": [{"id": vdur["id"]}]}],
                        }
                        desc = await self.RO.deploy(nsr_id, target)
                        action_id = desc["action_id"]
                        await self._wait_ng_ro(nsr_id, action_id, timeout=600)
                        break
                    else:
                        # legacy-RO path
                        # wait until NS is deployed at RO
                        if not ro_nsr_id:
                            db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
                            ro_nsr_id = deep_get(
                                db_nsrs, ("_admin", "deployed", "RO", "nsr_id")
                            )
                        if not ro_nsr_id:
                            continue
                        result_dict = await self.RO.create_action(
                            item="ns",
                            item_id_name=ro_nsr_id,
                            descriptor={
                                "add_public_key": pub_key,
                                "vms": [ro_vm_id],
                                "user": user,
                            },
                        )
                        # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
                        if not result_dict or not isinstance(result_dict, dict):
                            raise LcmException(
                                "Unknown response from RO when injecting key"
                            )
                        # NOTE(review): only the first entry is effectively
                        # checked, since both branches end the for loop
                        for result in result_dict.values():
                            if result.get("vim_result") == 200:
                                break
                            else:
                                raise ROclient.ROClientException(
                                    "error injecting key: {}".format(
                                        result.get("description")
                                    )
                                )
                        break
                except NgRoException as e:
                    raise LcmException(
                        "Reaching max tries injecting key. Error: {}".format(e)
                    )
                except ROclient.ROClientException as e:
                    # retry up to 20 times on legacy-RO client errors
                    if not nb_tries:
                        self.logger.debug(
                            logging_text
                            + "error injecting key: {}. Retrying until {} seconds".format(
                                e, 20 * 10
                            )
                        )
                    nb_tries += 1
                    if nb_tries >= 20:
                        raise LcmException(
                            "Reaching max tries injecting key. Error: {}".format(e)
                        )
            else:
                # no key to inject: having the IP address is enough
                break

        return ip_address
1630
    async def _wait_dependent_n2vc(self, nsr_id, vca_deployed_list, vca_index):
        """
        Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs

        :param nsr_id: ns record identifier used to re-read configuration status
        :param vca_deployed_list: list of deployed VCAs from the nsr record
        :param vca_index: index in vca_deployed_list of the VCA to wait for
        :raises LcmException: if any dependency is BROKEN or the wait times out
        """
        my_vca = vca_deployed_list[vca_index]
        if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
            # vdu or kdu: no dependencies
            return
        # NOTE(review): 'timeout' counts 10-second iterations, not seconds:
        # 300 iterations x 10s sleep is ~50 minutes of actual waiting
        timeout = 300
        while timeout >= 0:
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
            vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
            configuration_status_list = db_nsr["configurationStatus"]
            # scan every other VCA of the same scope; for/else falls through
            # to "done" only if none of them is still pending
            for index, vca_deployed in enumerate(configuration_status_list):
                if index == vca_index:
                    # myself
                    continue
                # NS-level VCA (no member-vnf-index) depends on every other VCA;
                # a VNF-level one depends only on VCAs of the same vnf
                if not my_vca.get("member-vnf-index") or (
                    vca_deployed.get("member-vnf-index")
                    == my_vca.get("member-vnf-index")
                ):
                    internal_status = configuration_status_list[index].get("status")
                    if internal_status == "READY":
                        continue
                    elif internal_status == "BROKEN":
                        raise LcmException(
                            "Configuration aborted because dependent charm/s has failed"
                        )
                    else:
                        break
            else:
                # no dependencies, return
                return
            await asyncio.sleep(10)
            timeout -= 1

        raise LcmException("Configuration aborted because dependent charm/s timeout")
1668
1669 def get_vca_id(self, db_vnfr: dict, db_nsr: dict):
1670 vca_id = None
1671 if db_vnfr:
1672 vca_id = deep_get(db_vnfr, ("vca-id",))
1673 elif db_nsr:
1674 vim_account_id = deep_get(db_nsr, ("instantiate_params", "vimAccountId"))
1675 vca_id = VimAccountDB.get_vim_account_with_id(vim_account_id).get("vca")
1676 return vca_id
1677
1678 async def instantiate_N2VC(
1679 self,
1680 logging_text,
1681 vca_index,
1682 nsi_id,
1683 db_nsr,
1684 db_vnfr,
1685 vdu_id,
1686 kdu_name,
1687 vdu_index,
1688 config_descriptor,
1689 deploy_params,
1690 base_folder,
1691 nslcmop_id,
1692 stage,
1693 vca_type,
1694 vca_name,
1695 ee_config_descriptor,
1696 ):
1697 nsr_id = db_nsr["_id"]
1698 db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
1699 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1700 vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
1701 osm_config = {"osm": {"ns_id": db_nsr["_id"]}}
1702 db_dict = {
1703 "collection": "nsrs",
1704 "filter": {"_id": nsr_id},
1705 "path": db_update_entry,
1706 }
1707 step = ""
1708 try:
1709
1710 element_type = "NS"
1711 element_under_configuration = nsr_id
1712
1713 vnfr_id = None
1714 if db_vnfr:
1715 vnfr_id = db_vnfr["_id"]
1716 osm_config["osm"]["vnf_id"] = vnfr_id
1717
1718 namespace = "{nsi}.{ns}".format(nsi=nsi_id if nsi_id else "", ns=nsr_id)
1719
1720 if vca_type == "native_charm":
1721 index_number = 0
1722 else:
1723 index_number = vdu_index or 0
1724
1725 if vnfr_id:
1726 element_type = "VNF"
1727 element_under_configuration = vnfr_id
1728 namespace += ".{}-{}".format(vnfr_id, index_number)
1729 if vdu_id:
1730 namespace += ".{}-{}".format(vdu_id, index_number)
1731 element_type = "VDU"
1732 element_under_configuration = "{}-{}".format(vdu_id, index_number)
1733 osm_config["osm"]["vdu_id"] = vdu_id
1734 elif kdu_name:
1735 namespace += ".{}".format(kdu_name)
1736 element_type = "KDU"
1737 element_under_configuration = kdu_name
1738 osm_config["osm"]["kdu_name"] = kdu_name
1739
1740 # Get artifact path
1741 if base_folder["pkg-dir"]:
1742 artifact_path = "{}/{}/{}/{}".format(
1743 base_folder["folder"],
1744 base_folder["pkg-dir"],
1745 "charms"
1746 if vca_type
1747 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1748 else "helm-charts",
1749 vca_name,
1750 )
1751 else:
1752 artifact_path = "{}/Scripts/{}/{}/".format(
1753 base_folder["folder"],
1754 "charms"
1755 if vca_type
1756 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1757 else "helm-charts",
1758 vca_name,
1759 )
1760
1761 self.logger.debug("Artifact path > {}".format(artifact_path))
1762
1763 # get initial_config_primitive_list that applies to this element
1764 initial_config_primitive_list = config_descriptor.get(
1765 "initial-config-primitive"
1766 )
1767
1768 self.logger.debug(
1769 "Initial config primitive list > {}".format(
1770 initial_config_primitive_list
1771 )
1772 )
1773
1774 # add config if not present for NS charm
1775 ee_descriptor_id = ee_config_descriptor.get("id")
1776 self.logger.debug("EE Descriptor > {}".format(ee_descriptor_id))
1777 initial_config_primitive_list = get_ee_sorted_initial_config_primitive_list(
1778 initial_config_primitive_list, vca_deployed, ee_descriptor_id
1779 )
1780
1781 self.logger.debug(
1782 "Initial config primitive list #2 > {}".format(
1783 initial_config_primitive_list
1784 )
1785 )
1786 # n2vc_redesign STEP 3.1
1787 # find old ee_id if exists
1788 ee_id = vca_deployed.get("ee_id")
1789
1790 vca_id = self.get_vca_id(db_vnfr, db_nsr)
1791 # create or register execution environment in VCA
1792 if vca_type in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1793
1794 self._write_configuration_status(
1795 nsr_id=nsr_id,
1796 vca_index=vca_index,
1797 status="CREATING",
1798 element_under_configuration=element_under_configuration,
1799 element_type=element_type,
1800 )
1801
1802 step = "create execution environment"
1803 self.logger.debug(logging_text + step)
1804
1805 ee_id = None
1806 credentials = None
1807 if vca_type == "k8s_proxy_charm":
1808 ee_id = await self.vca_map[vca_type].install_k8s_proxy_charm(
1809 charm_name=artifact_path[artifact_path.rfind("/") + 1 :],
1810 namespace=namespace,
1811 artifact_path=artifact_path,
1812 db_dict=db_dict,
1813 vca_id=vca_id,
1814 )
1815 elif vca_type == "helm" or vca_type == "helm-v3":
1816 ee_id, credentials = await self.vca_map[
1817 vca_type
1818 ].create_execution_environment(
1819 namespace=namespace,
1820 reuse_ee_id=ee_id,
1821 db_dict=db_dict,
1822 config=osm_config,
1823 artifact_path=artifact_path,
1824 vca_type=vca_type,
1825 )
1826 else:
1827 ee_id, credentials = await self.vca_map[
1828 vca_type
1829 ].create_execution_environment(
1830 namespace=namespace,
1831 reuse_ee_id=ee_id,
1832 db_dict=db_dict,
1833 vca_id=vca_id,
1834 )
1835
1836 elif vca_type == "native_charm":
1837 step = "Waiting to VM being up and getting IP address"
1838 self.logger.debug(logging_text + step)
1839 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
1840 logging_text,
1841 nsr_id,
1842 vnfr_id,
1843 vdu_id,
1844 vdu_index,
1845 user=None,
1846 pub_key=None,
1847 )
1848 credentials = {"hostname": rw_mgmt_ip}
1849 # get username
1850 username = deep_get(
1851 config_descriptor, ("config-access", "ssh-access", "default-user")
1852 )
1853 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1854 # merged. Meanwhile let's get username from initial-config-primitive
1855 if not username and initial_config_primitive_list:
1856 for config_primitive in initial_config_primitive_list:
1857 for param in config_primitive.get("parameter", ()):
1858 if param["name"] == "ssh-username":
1859 username = param["value"]
1860 break
1861 if not username:
1862 raise LcmException(
1863 "Cannot determine the username neither with 'initial-config-primitive' nor with "
1864 "'config-access.ssh-access.default-user'"
1865 )
1866 credentials["username"] = username
1867 # n2vc_redesign STEP 3.2
1868
1869 self._write_configuration_status(
1870 nsr_id=nsr_id,
1871 vca_index=vca_index,
1872 status="REGISTERING",
1873 element_under_configuration=element_under_configuration,
1874 element_type=element_type,
1875 )
1876
1877 step = "register execution environment {}".format(credentials)
1878 self.logger.debug(logging_text + step)
1879 ee_id = await self.vca_map[vca_type].register_execution_environment(
1880 credentials=credentials,
1881 namespace=namespace,
1882 db_dict=db_dict,
1883 vca_id=vca_id,
1884 )
1885
1886 # for compatibility with MON/POL modules, the need model and application name at database
1887 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
1888 ee_id_parts = ee_id.split(".")
1889 db_nsr_update = {db_update_entry + "ee_id": ee_id}
1890 if len(ee_id_parts) >= 2:
1891 model_name = ee_id_parts[0]
1892 application_name = ee_id_parts[1]
1893 db_nsr_update[db_update_entry + "model"] = model_name
1894 db_nsr_update[db_update_entry + "application"] = application_name
1895
1896 # n2vc_redesign STEP 3.3
1897 step = "Install configuration Software"
1898
1899 self._write_configuration_status(
1900 nsr_id=nsr_id,
1901 vca_index=vca_index,
1902 status="INSTALLING SW",
1903 element_under_configuration=element_under_configuration,
1904 element_type=element_type,
1905 other_update=db_nsr_update,
1906 )
1907
1908 # TODO check if already done
1909 self.logger.debug(logging_text + step)
1910 config = None
1911 if vca_type == "native_charm":
1912 config_primitive = next(
1913 (p for p in initial_config_primitive_list if p["name"] == "config"),
1914 None,
1915 )
1916 if config_primitive:
1917 config = self._map_primitive_params(
1918 config_primitive, {}, deploy_params
1919 )
1920 num_units = 1
1921 if vca_type == "lxc_proxy_charm":
1922 if element_type == "NS":
1923 num_units = db_nsr.get("config-units") or 1
1924 elif element_type == "VNF":
1925 num_units = db_vnfr.get("config-units") or 1
1926 elif element_type == "VDU":
1927 for v in db_vnfr["vdur"]:
1928 if vdu_id == v["vdu-id-ref"]:
1929 num_units = v.get("config-units") or 1
1930 break
1931 if vca_type != "k8s_proxy_charm":
1932 await self.vca_map[vca_type].install_configuration_sw(
1933 ee_id=ee_id,
1934 artifact_path=artifact_path,
1935 db_dict=db_dict,
1936 config=config,
1937 num_units=num_units,
1938 vca_id=vca_id,
1939 vca_type=vca_type,
1940 )
1941
1942 # write in db flag of configuration_sw already installed
1943 self.update_db_2(
1944 "nsrs", nsr_id, {db_update_entry + "config_sw_installed": True}
1945 )
1946
1947 # add relations for this VCA (wait for other peers related with this VCA)
1948 await self._add_vca_relations(
1949 logging_text=logging_text,
1950 nsr_id=nsr_id,
1951 vca_type=vca_type,
1952 vca_index=vca_index,
1953 )
1954
1955 # if SSH access is required, then get execution environment SSH public
1956 # if native charm we have waited already to VM be UP
1957 if vca_type in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
1958 pub_key = None
1959 user = None
1960 # self.logger.debug("get ssh key block")
1961 if deep_get(
1962 config_descriptor, ("config-access", "ssh-access", "required")
1963 ):
1964 # self.logger.debug("ssh key needed")
1965 # Needed to inject a ssh key
1966 user = deep_get(
1967 config_descriptor,
1968 ("config-access", "ssh-access", "default-user"),
1969 )
1970 step = "Install configuration Software, getting public ssh key"
1971 pub_key = await self.vca_map[vca_type].get_ee_ssh_public__key(
1972 ee_id=ee_id, db_dict=db_dict, vca_id=vca_id
1973 )
1974
1975 step = "Insert public key into VM user={} ssh_key={}".format(
1976 user, pub_key
1977 )
1978 else:
1979 # self.logger.debug("no need to get ssh key")
1980 step = "Waiting to VM being up and getting IP address"
1981 self.logger.debug(logging_text + step)
1982
1983 # n2vc_redesign STEP 5.1
1984 # wait for RO (ip-address) Insert pub_key into VM
1985 if vnfr_id:
1986 if kdu_name:
1987 rw_mgmt_ip = await self.wait_kdu_up(
1988 logging_text, nsr_id, vnfr_id, kdu_name
1989 )
1990 else:
1991 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
1992 logging_text,
1993 nsr_id,
1994 vnfr_id,
1995 vdu_id,
1996 vdu_index,
1997 user=user,
1998 pub_key=pub_key,
1999 )
2000 else:
2001 rw_mgmt_ip = None # This is for a NS configuration
2002
2003 self.logger.debug(logging_text + " VM_ip_address={}".format(rw_mgmt_ip))
2004
2005 # store rw_mgmt_ip in deploy params for later replacement
2006 deploy_params["rw_mgmt_ip"] = rw_mgmt_ip
2007
2008 # n2vc_redesign STEP 6 Execute initial config primitive
2009 step = "execute initial config primitive"
2010
2011 # wait for dependent primitives execution (NS -> VNF -> VDU)
2012 if initial_config_primitive_list:
2013 await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)
2014
2015 # stage, in function of element type: vdu, kdu, vnf or ns
2016 my_vca = vca_deployed_list[vca_index]
2017 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
2018 # VDU or KDU
2019 stage[0] = "Stage 3/5: running Day-1 primitives for VDU."
2020 elif my_vca.get("member-vnf-index"):
2021 # VNF
2022 stage[0] = "Stage 4/5: running Day-1 primitives for VNF."
2023 else:
2024 # NS
2025 stage[0] = "Stage 5/5: running Day-1 primitives for NS."
2026
2027 self._write_configuration_status(
2028 nsr_id=nsr_id, vca_index=vca_index, status="EXECUTING PRIMITIVE"
2029 )
2030
2031 self._write_op_status(op_id=nslcmop_id, stage=stage)
2032
2033 check_if_terminated_needed = True
2034 for initial_config_primitive in initial_config_primitive_list:
2035 # adding information on the vca_deployed if it is a NS execution environment
2036 if not vca_deployed["member-vnf-index"]:
2037 deploy_params["ns_config_info"] = json.dumps(
2038 self._get_ns_config_info(nsr_id)
2039 )
2040 # TODO check if already done
2041 primitive_params_ = self._map_primitive_params(
2042 initial_config_primitive, {}, deploy_params
2043 )
2044
2045 step = "execute primitive '{}' params '{}'".format(
2046 initial_config_primitive["name"], primitive_params_
2047 )
2048 self.logger.debug(logging_text + step)
2049 await self.vca_map[vca_type].exec_primitive(
2050 ee_id=ee_id,
2051 primitive_name=initial_config_primitive["name"],
2052 params_dict=primitive_params_,
2053 db_dict=db_dict,
2054 vca_id=vca_id,
2055 vca_type=vca_type,
2056 )
2057 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
2058 if check_if_terminated_needed:
2059 if config_descriptor.get("terminate-config-primitive"):
2060 self.update_db_2(
2061 "nsrs", nsr_id, {db_update_entry + "needed_terminate": True}
2062 )
2063 check_if_terminated_needed = False
2064
2065 # TODO register in database that primitive is done
2066
2067 # STEP 7 Configure metrics
2068 if vca_type == "helm" or vca_type == "helm-v3":
2069 prometheus_jobs = await self.extract_prometheus_scrape_jobs(
2070 ee_id=ee_id,
2071 artifact_path=artifact_path,
2072 ee_config_descriptor=ee_config_descriptor,
2073 vnfr_id=vnfr_id,
2074 nsr_id=nsr_id,
2075 target_ip=rw_mgmt_ip,
2076 )
2077 if prometheus_jobs:
2078 self.update_db_2(
2079 "nsrs",
2080 nsr_id,
2081 {db_update_entry + "prometheus_jobs": prometheus_jobs},
2082 )
2083
2084 for job in prometheus_jobs:
2085 self.db.set_one(
2086 "prometheus_jobs",
2087 {"job_name": job["job_name"]},
2088 job,
2089 upsert=True,
2090 fail_on_empty=False,
2091 )
2092
2093 step = "instantiated at VCA"
2094 self.logger.debug(logging_text + step)
2095
2096 self._write_configuration_status(
2097 nsr_id=nsr_id, vca_index=vca_index, status="READY"
2098 )
2099
2100 except Exception as e: # TODO not use Exception but N2VC exception
2101 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
2102 if not isinstance(
2103 e, (DbException, N2VCException, LcmException, asyncio.CancelledError)
2104 ):
2105 self.logger.error(
2106 "Exception while {} : {}".format(step, e), exc_info=True
2107 )
2108 self._write_configuration_status(
2109 nsr_id=nsr_id, vca_index=vca_index, status="BROKEN"
2110 )
2111 raise LcmException("{} {}".format(step, e)) from e
2112
2113 def _write_ns_status(
2114 self,
2115 nsr_id: str,
2116 ns_state: str,
2117 current_operation: str,
2118 current_operation_id: str,
2119 error_description: str = None,
2120 error_detail: str = None,
2121 other_update: dict = None,
2122 ):
2123 """
2124 Update db_nsr fields.
2125 :param nsr_id:
2126 :param ns_state:
2127 :param current_operation:
2128 :param current_operation_id:
2129 :param error_description:
2130 :param error_detail:
2131 :param other_update: Other required changes at database if provided, will be cleared
2132 :return:
2133 """
2134 try:
2135 db_dict = other_update or {}
2136 db_dict[
2137 "_admin.nslcmop"
2138 ] = current_operation_id # for backward compatibility
2139 db_dict["_admin.current-operation"] = current_operation_id
2140 db_dict["_admin.operation-type"] = (
2141 current_operation if current_operation != "IDLE" else None
2142 )
2143 db_dict["currentOperation"] = current_operation
2144 db_dict["currentOperationID"] = current_operation_id
2145 db_dict["errorDescription"] = error_description
2146 db_dict["errorDetail"] = error_detail
2147
2148 if ns_state:
2149 db_dict["nsState"] = ns_state
2150 self.update_db_2("nsrs", nsr_id, db_dict)
2151 except DbException as e:
2152 self.logger.warn("Error writing NS status, ns={}: {}".format(nsr_id, e))
2153
2154 def _write_op_status(
2155 self,
2156 op_id: str,
2157 stage: list = None,
2158 error_message: str = None,
2159 queuePosition: int = 0,
2160 operation_state: str = None,
2161 other_update: dict = None,
2162 ):
2163 try:
2164 db_dict = other_update or {}
2165 db_dict["queuePosition"] = queuePosition
2166 if isinstance(stage, list):
2167 db_dict["stage"] = stage[0]
2168 db_dict["detailed-status"] = " ".join(stage)
2169 elif stage is not None:
2170 db_dict["stage"] = str(stage)
2171
2172 if error_message is not None:
2173 db_dict["errorMessage"] = error_message
2174 if operation_state is not None:
2175 db_dict["operationState"] = operation_state
2176 db_dict["statusEnteredTime"] = time()
2177 self.update_db_2("nslcmops", op_id, db_dict)
2178 except DbException as e:
2179 self.logger.warn(
2180 "Error writing OPERATION status for op_id: {} -> {}".format(op_id, e)
2181 )
2182
2183 def _write_all_config_status(self, db_nsr: dict, status: str):
2184 try:
2185 nsr_id = db_nsr["_id"]
2186 # configurationStatus
2187 config_status = db_nsr.get("configurationStatus")
2188 if config_status:
2189 db_nsr_update = {
2190 "configurationStatus.{}.status".format(index): status
2191 for index, v in enumerate(config_status)
2192 if v
2193 }
2194 # update status
2195 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2196
2197 except DbException as e:
2198 self.logger.warn(
2199 "Error writing all configuration status, ns={}: {}".format(nsr_id, e)
2200 )
2201
2202 def _write_configuration_status(
2203 self,
2204 nsr_id: str,
2205 vca_index: int,
2206 status: str = None,
2207 element_under_configuration: str = None,
2208 element_type: str = None,
2209 other_update: dict = None,
2210 ):
2211
2212 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
2213 # .format(vca_index, status))
2214
2215 try:
2216 db_path = "configurationStatus.{}.".format(vca_index)
2217 db_dict = other_update or {}
2218 if status:
2219 db_dict[db_path + "status"] = status
2220 if element_under_configuration:
2221 db_dict[
2222 db_path + "elementUnderConfiguration"
2223 ] = element_under_configuration
2224 if element_type:
2225 db_dict[db_path + "elementType"] = element_type
2226 self.update_db_2("nsrs", nsr_id, db_dict)
2227 except DbException as e:
2228 self.logger.warn(
2229 "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
2230 status, nsr_id, vca_index, e
2231 )
2232 )
2233
2234 async def _do_placement(self, logging_text, db_nslcmop, db_vnfrs):
2235 """
2236 Check and computes the placement, (vim account where to deploy). If it is decided by an external tool, it
2237 sends the request via kafka and wait until the result is wrote at database (nslcmops _admin.plca).
2238 Database is used because the result can be obtained from a different LCM worker in case of HA.
2239 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
2240 :param db_nslcmop: database content of nslcmop
2241 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
2242 :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfr with the
2243 computed 'vim-account-id'
2244 """
2245 modified = False
2246 nslcmop_id = db_nslcmop["_id"]
2247 placement_engine = deep_get(db_nslcmop, ("operationParams", "placement-engine"))
2248 if placement_engine == "PLA":
2249 self.logger.debug(
2250 logging_text + "Invoke and wait for placement optimization"
2251 )
2252 await self.msg.aiowrite(
2253 "pla", "get_placement", {"nslcmopId": nslcmop_id}, loop=self.loop
2254 )
2255 db_poll_interval = 5
2256 wait = db_poll_interval * 10
2257 pla_result = None
2258 while not pla_result and wait >= 0:
2259 await asyncio.sleep(db_poll_interval)
2260 wait -= db_poll_interval
2261 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2262 pla_result = deep_get(db_nslcmop, ("_admin", "pla"))
2263
2264 if not pla_result:
2265 raise LcmException(
2266 "Placement timeout for nslcmopId={}".format(nslcmop_id)
2267 )
2268
2269 for pla_vnf in pla_result["vnf"]:
2270 vnfr = db_vnfrs.get(pla_vnf["member-vnf-index"])
2271 if not pla_vnf.get("vimAccountId") or not vnfr:
2272 continue
2273 modified = True
2274 self.db.set_one(
2275 "vnfrs",
2276 {"_id": vnfr["_id"]},
2277 {"vim-account-id": pla_vnf["vimAccountId"]},
2278 )
2279 # Modifies db_vnfrs
2280 vnfr["vim-account-id"] = pla_vnf["vimAccountId"]
2281 return modified
2282
2283 def update_nsrs_with_pla_result(self, params):
2284 try:
2285 nslcmop_id = deep_get(params, ("placement", "nslcmopId"))
2286 self.update_db_2(
2287 "nslcmops", nslcmop_id, {"_admin.pla": params.get("placement")}
2288 )
2289 except Exception as e:
2290 self.logger.warn("Update failed for nslcmop_id={}:{}".format(nslcmop_id, e))
2291
    async def instantiate(self, nsr_id, nslcmop_id):
        """
        Instantiate a Network Service: deploy at VIM (RO), deploy KDUs and execution
        environments (charms), and launch Day-1 configuration. The outcome is written
        to the database (nsrs/nslcmops) and notified via kafka; nothing is returned.

        :param nsr_id: ns instance to deploy
        :param nslcmop_id: operation to run
        :return:
        """

        # Try to lock HA task here
        task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
        if not task_is_locked_by_me:
            # another LCM worker owns this operation; nothing to do in this process
            self.logger.debug(
                "instantiate() task is not locked by me, ns={}".format(nsr_id)
            )
            return

        logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
        self.logger.debug(logging_text + "Enter")

        # get all needed from database

        # database nsrs record
        db_nsr = None

        # database nslcmops record
        db_nslcmop = None

        # update operation on nsrs
        db_nsr_update = {}
        # update operation on nslcmops
        db_nslcmop_update = {}

        nslcmop_operation_state = None
        db_vnfrs = {}  # vnf's info indexed by member-index
        # n2vc_info = {}
        tasks_dict_info = {}  # from task to info text
        exc = None
        error_list = []
        stage = [
            "Stage 1/5: preparation of the environment.",
            "Waiting for previous operations to terminate.",
            "",
        ]
        # ^ stage, step, VIM progress
        try:
            # wait for any previous tasks in process
            await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)

            # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
            stage[1] = "Reading from database."
            # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
            db_nsr_update["detailed-status"] = "creating"
            db_nsr_update["operational-status"] = "init"
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state="BUILDING",
                current_operation="INSTANTIATING",
                current_operation_id=nslcmop_id,
                other_update=db_nsr_update,
            )
            self._write_op_status(op_id=nslcmop_id, stage=stage, queuePosition=0)

            # read from db: operation
            stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
            db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
            # additionalParamsForVnf is stored as a JSON string; decode it in place
            if db_nslcmop["operationParams"].get("additionalParamsForVnf"):
                db_nslcmop["operationParams"]["additionalParamsForVnf"] = json.loads(
                    db_nslcmop["operationParams"]["additionalParamsForVnf"]
                )
            ns_params = db_nslcmop.get("operationParams")
            if ns_params and ns_params.get("timeout_ns_deploy"):
                timeout_ns_deploy = ns_params["timeout_ns_deploy"]
            else:
                timeout_ns_deploy = self.timeout.get(
                    "ns_deploy", self.timeout_ns_deploy
                )

            # read from db: ns
            stage[1] = "Getting nsr={} from db.".format(nsr_id)
            self.logger.debug(logging_text + stage[1])
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
            stage[1] = "Getting nsd={} from db.".format(db_nsr["nsd-id"])
            self.logger.debug(logging_text + stage[1])
            nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
            self.fs.sync(db_nsr["nsd-id"])
            db_nsr["nsd"] = nsd
            # nsr_name = db_nsr["name"] # TODO short-name??

            # read from db: vnf's of this ns
            stage[1] = "Getting vnfrs from db."
            self.logger.debug(logging_text + stage[1])
            db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})

            # read from db: vnfd's for every vnf
            db_vnfds = []  # every vnfd data

            # for each vnf in ns, read vnfd
            for vnfr in db_vnfrs_list:
                # kdur additionalParams are stored as JSON strings; decode them in place
                if vnfr.get("kdur"):
                    kdur_list = []
                    for kdur in vnfr["kdur"]:
                        if kdur.get("additionalParams"):
                            kdur["additionalParams"] = json.loads(
                                kdur["additionalParams"]
                            )
                        kdur_list.append(kdur)
                    vnfr["kdur"] = kdur_list

                db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
                vnfd_id = vnfr["vnfd-id"]
                vnfd_ref = vnfr["vnfd-ref"]
                self.fs.sync(vnfd_id)

                # if we do not have this vnfd yet, read it from db
                # NOTE(review): db_vnfds is a list of vnfd dicts, so "vnfd_id not in
                # db_vnfds" is always True and the vnfd is re-read (and re-appended)
                # for every vnfr sharing the same vnfd — TODO confirm and dedupe by id
                if vnfd_id not in db_vnfds:
                    # read from db
                    stage[1] = "Getting vnfd={} id='{}' from db.".format(
                        vnfd_id, vnfd_ref
                    )
                    self.logger.debug(logging_text + stage[1])
                    vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})

                    # store vnfd
                    db_vnfds.append(vnfd)

            # Get or generates the _admin.deployed.VCA list
            vca_deployed_list = None
            if db_nsr["_admin"].get("deployed"):
                vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
            if vca_deployed_list is None:
                vca_deployed_list = []
                configuration_status_list = []
                db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
                db_nsr_update["configurationStatus"] = configuration_status_list
                # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
                populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
            elif isinstance(vca_deployed_list, dict):
                # maintain backward compatibility. Change a dict to list at database
                vca_deployed_list = list(vca_deployed_list.values())
                db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
                populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)

            # ensure _admin.deployed.RO.vnfd is a list (it may be missing or malformed)
            if not isinstance(
                deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list
            ):
                populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
                db_nsr_update["_admin.deployed.RO.vnfd"] = []

            # set state to INSTANTIATED. When instantiated NBI will not delete directly
            db_nsr_update["_admin.nsState"] = "INSTANTIATED"
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self.db.set_list(
                "vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "INSTANTIATED"}
            )

            # n2vc_redesign STEP 2 Deploy Network Scenario
            stage[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
            self._write_op_status(op_id=nslcmop_id, stage=stage)

            stage[1] = "Deploying KDUs."
            # self.logger.debug(logging_text + "Before deploy_kdus")
            # Call to deploy_kdus in case exists the "vdu:kdu" param
            await self.deploy_kdus(
                logging_text=logging_text,
                nsr_id=nsr_id,
                nslcmop_id=nslcmop_id,
                db_vnfrs=db_vnfrs,
                db_vnfds=db_vnfds,
                task_instantiation_info=tasks_dict_info,
            )

            stage[1] = "Getting VCA public key."
            # n2vc_redesign STEP 1 Get VCA public ssh-key
            # feature 1429. Add n2vc public key to needed VMs
            n2vc_key = self.n2vc.get_public_key()
            n2vc_key_list = [n2vc_key]
            if self.vca_config.get("public_key"):
                n2vc_key_list.append(self.vca_config["public_key"])

            stage[1] = "Deploying NS at VIM."
            # RO deployment runs as a background task; it is awaited in the finally
            # section together with the N2VC deployment tasks
            task_ro = asyncio.ensure_future(
                self.instantiate_RO(
                    logging_text=logging_text,
                    nsr_id=nsr_id,
                    nsd=nsd,
                    db_nsr=db_nsr,
                    db_nslcmop=db_nslcmop,
                    db_vnfrs=db_vnfrs,
                    db_vnfds=db_vnfds,
                    n2vc_key_list=n2vc_key_list,
                    stage=stage,
                )
            )
            self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
            tasks_dict_info[task_ro] = "Deploying at VIM"

            # n2vc_redesign STEP 3 to 6 Deploy N2VC
            stage[1] = "Deploying Execution Environments."
            self.logger.debug(logging_text + stage[1])

            nsi_id = None  # TODO put nsi_id when this nsr belongs to a NSI
            for vnf_profile in get_vnf_profiles(nsd):
                vnfd_id = vnf_profile["vnfd-id"]
                vnfd = find_in_list(db_vnfds, lambda a_vnf: a_vnf["id"] == vnfd_id)
                member_vnf_index = str(vnf_profile["id"])
                db_vnfr = db_vnfrs[member_vnf_index]
                base_folder = vnfd["_admin"]["storage"]
                vdu_id = None
                vdu_index = 0
                vdu_name = None
                kdu_name = None

                # Get additional parameters
                deploy_params = {"OSM": get_osm_params(db_vnfr)}
                if db_vnfr.get("additionalParamsForVnf"):
                    deploy_params.update(
                        parse_yaml_strings(db_vnfr["additionalParamsForVnf"].copy())
                    )

                # VNF-level charm, if the vnfd declares one
                descriptor_config = get_configuration(vnfd, vnfd["id"])
                if descriptor_config:
                    self._deploy_n2vc(
                        logging_text=logging_text
                        + "member_vnf_index={} ".format(member_vnf_index),
                        db_nsr=db_nsr,
                        db_vnfr=db_vnfr,
                        nslcmop_id=nslcmop_id,
                        nsr_id=nsr_id,
                        nsi_id=nsi_id,
                        vnfd_id=vnfd_id,
                        vdu_id=vdu_id,
                        kdu_name=kdu_name,
                        member_vnf_index=member_vnf_index,
                        vdu_index=vdu_index,
                        vdu_name=vdu_name,
                        deploy_params=deploy_params,
                        descriptor_config=descriptor_config,
                        base_folder=base_folder,
                        task_instantiation_info=tasks_dict_info,
                        stage=stage,
                    )

                # Deploy charms for each VDU that supports one.
                for vdud in get_vdu_list(vnfd):
                    vdu_id = vdud["id"]
                    descriptor_config = get_configuration(vnfd, vdu_id)
                    vdur = find_in_list(
                        db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id
                    )

                    if vdur.get("additionalParams"):
                        deploy_params_vdu = parse_yaml_strings(vdur["additionalParams"])
                    else:
                        deploy_params_vdu = deploy_params
                    deploy_params_vdu["OSM"] = get_osm_params(
                        db_vnfr, vdu_id, vdu_count_index=0
                    )
                    vdud_count = get_number_of_instances(vnfd, vdu_id)

                    self.logger.debug("VDUD > {}".format(vdud))
                    self.logger.debug(
                        "Descriptor config > {}".format(descriptor_config)
                    )
                    if descriptor_config:
                        vdu_name = None
                        kdu_name = None
                        # one execution environment per VDU instance
                        for vdu_index in range(vdud_count):
                            # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
                            self._deploy_n2vc(
                                logging_text=logging_text
                                + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
                                    member_vnf_index, vdu_id, vdu_index
                                ),
                                db_nsr=db_nsr,
                                db_vnfr=db_vnfr,
                                nslcmop_id=nslcmop_id,
                                nsr_id=nsr_id,
                                nsi_id=nsi_id,
                                vnfd_id=vnfd_id,
                                vdu_id=vdu_id,
                                kdu_name=kdu_name,
                                member_vnf_index=member_vnf_index,
                                vdu_index=vdu_index,
                                vdu_name=vdu_name,
                                deploy_params=deploy_params_vdu,
                                descriptor_config=descriptor_config,
                                base_folder=base_folder,
                                task_instantiation_info=tasks_dict_info,
                                stage=stage,
                            )
                # Deploy charms for each KDU that supports one.
                for kdud in get_kdu_list(vnfd):
                    kdu_name = kdud["name"]
                    descriptor_config = get_configuration(vnfd, kdu_name)
                    if descriptor_config:
                        vdu_id = None
                        vdu_index = 0
                        vdu_name = None
                        kdur = next(
                            x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name
                        )
                        deploy_params_kdu = {"OSM": get_osm_params(db_vnfr)}
                        if kdur.get("additionalParams"):
                            deploy_params_kdu.update(
                                parse_yaml_strings(kdur["additionalParams"].copy())
                            )

                        self._deploy_n2vc(
                            logging_text=logging_text,
                            db_nsr=db_nsr,
                            db_vnfr=db_vnfr,
                            nslcmop_id=nslcmop_id,
                            nsr_id=nsr_id,
                            nsi_id=nsi_id,
                            vnfd_id=vnfd_id,
                            vdu_id=vdu_id,
                            kdu_name=kdu_name,
                            member_vnf_index=member_vnf_index,
                            vdu_index=vdu_index,
                            vdu_name=vdu_name,
                            deploy_params=deploy_params_kdu,
                            descriptor_config=descriptor_config,
                            base_folder=base_folder,
                            task_instantiation_info=tasks_dict_info,
                            stage=stage,
                        )

            # Check if this NS has a charm configuration
            descriptor_config = nsd.get("ns-configuration")
            if descriptor_config and descriptor_config.get("juju"):
                vnfd_id = None
                db_vnfr = None
                member_vnf_index = None
                vdu_id = None
                kdu_name = None
                vdu_index = 0
                vdu_name = None

                # Get additional parameters
                deploy_params = {"OSM": {"vim_account_id": ns_params["vimAccountId"]}}
                if db_nsr.get("additionalParamsForNs"):
                    deploy_params.update(
                        parse_yaml_strings(db_nsr["additionalParamsForNs"].copy())
                    )
                base_folder = nsd["_admin"]["storage"]
                self._deploy_n2vc(
                    logging_text=logging_text,
                    db_nsr=db_nsr,
                    db_vnfr=db_vnfr,
                    nslcmop_id=nslcmop_id,
                    nsr_id=nsr_id,
                    nsi_id=nsi_id,
                    vnfd_id=vnfd_id,
                    vdu_id=vdu_id,
                    kdu_name=kdu_name,
                    member_vnf_index=member_vnf_index,
                    vdu_index=vdu_index,
                    vdu_name=vdu_name,
                    deploy_params=deploy_params,
                    descriptor_config=descriptor_config,
                    base_folder=base_folder,
                    task_instantiation_info=tasks_dict_info,
                    stage=stage,
                )

            # rest of the work (waiting for tasks, status update, kafka notification)
            # is done at the finally section below

        except (
            ROclient.ROClientException,
            DbException,
            LcmException,
            N2VCException,
        ) as e:
            self.logger.error(
                logging_text + "Exit Exception while '{}': {}".format(stage[1], e)
            )
            exc = e
        except asyncio.CancelledError:
            self.logger.error(
                logging_text + "Cancelled Exception while '{}'".format(stage[1])
            )
            exc = "Operation was cancelled"
        except Exception as e:
            exc = traceback.format_exc()
            self.logger.critical(
                logging_text + "Exit Exception while '{}': {}".format(stage[1], e),
                exc_info=True,
            )
        finally:
            if exc:
                error_list.append(str(exc))
            try:
                # wait for pending tasks
                if tasks_dict_info:
                    stage[1] = "Waiting for instantiate pending tasks."
                    self.logger.debug(logging_text + stage[1])
                    error_list += await self._wait_for_tasks(
                        logging_text,
                        tasks_dict_info,
                        timeout_ns_deploy,
                        stage,
                        nslcmop_id,
                        nsr_id=nsr_id,
                    )
                    stage[1] = stage[2] = ""
            except asyncio.CancelledError:
                error_list.append("Cancelled")
                # TODO cancel all tasks
            except Exception as exc:
                # NOTE: this rebinds the outer "exc"; harmless, as it is not read again
                error_list.append(str(exc))

            # update operation-status
            db_nsr_update["operational-status"] = "running"
            # let's begin with VCA 'configured' status (later we can change it)
            db_nsr_update["config-status"] = "configured"
            for task, task_name in tasks_dict_info.items():
                if not task.done() or task.cancelled() or task.exception():
                    if task_name.startswith(self.task_name_deploy_vca):
                        # A N2VC task is pending
                        db_nsr_update["config-status"] = "failed"
                    else:
                        # RO or KDU task is pending
                        db_nsr_update["operational-status"] = "failed"

            # update status at database
            if error_list:
                error_detail = ". ".join(error_list)
                self.logger.error(logging_text + error_detail)
                error_description_nslcmop = "{} Detail: {}".format(
                    stage[0], error_detail
                )
                error_description_nsr = "Operation: INSTANTIATING.{}, {}".format(
                    nslcmop_id, stage[0]
                )

                db_nsr_update["detailed-status"] = (
                    error_description_nsr + " Detail: " + error_detail
                )
                db_nslcmop_update["detailed-status"] = error_detail
                nslcmop_operation_state = "FAILED"
                ns_state = "BROKEN"
            else:
                error_detail = None
                error_description_nsr = error_description_nslcmop = None
                ns_state = "READY"
                db_nsr_update["detailed-status"] = "Done"
                db_nslcmop_update["detailed-status"] = "Done"
                nslcmop_operation_state = "COMPLETED"

            if db_nsr:
                self._write_ns_status(
                    nsr_id=nsr_id,
                    ns_state=ns_state,
                    current_operation="IDLE",
                    current_operation_id=None,
                    error_description=error_description_nsr,
                    error_detail=error_detail,
                    other_update=db_nsr_update,
                )
                self._write_op_status(
                    op_id=nslcmop_id,
                    stage="",
                    error_message=error_description_nslcmop,
                    operation_state=nslcmop_operation_state,
                    other_update=db_nslcmop_update,
                )

            # notify the final operation state through kafka (best effort)
            if nslcmop_operation_state:
                try:
                    await self.msg.aiowrite(
                        "ns",
                        "instantiated",
                        {
                            "nsr_id": nsr_id,
                            "nslcmop_id": nslcmop_id,
                            "operationState": nslcmop_operation_state,
                        },
                        loop=self.loop,
                    )
                except Exception as e:
                    self.logger.error(
                        logging_text + "kafka_write notification Exception {}".format(e)
                    )

            self.logger.debug(logging_text + "Exit")
            self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
2777
2778 def _get_vnfd(self, vnfd_id: str, cached_vnfds: Dict[str, Any]):
2779 if vnfd_id not in cached_vnfds:
2780 cached_vnfds[vnfd_id] = self.db.get_one("vnfds", {"id": vnfd_id})
2781 return cached_vnfds[vnfd_id]
2782
2783 def _get_vnfr(self, nsr_id: str, vnf_profile_id: str, cached_vnfrs: Dict[str, Any]):
2784 if vnf_profile_id not in cached_vnfrs:
2785 cached_vnfrs[vnf_profile_id] = self.db.get_one(
2786 "vnfrs",
2787 {
2788 "member-vnf-index-ref": vnf_profile_id,
2789 "nsr-id-ref": nsr_id,
2790 },
2791 )
2792 return cached_vnfrs[vnf_profile_id]
2793
2794 def _is_deployed_vca_in_relation(
2795 self, vca: DeployedVCA, relation: Relation
2796 ) -> bool:
2797 found = False
2798 for endpoint in (relation.provider, relation.requirer):
2799 if endpoint["kdu-resource-profile-id"]:
2800 continue
2801 found = (
2802 vca.vnf_profile_id == endpoint.vnf_profile_id
2803 and vca.vdu_profile_id == endpoint.vdu_profile_id
2804 and vca.execution_environment_ref == endpoint.execution_environment_ref
2805 )
2806 if found:
2807 break
2808 return found
2809
2810 def _update_ee_relation_data_with_implicit_data(
2811 self, nsr_id, nsd, ee_relation_data, cached_vnfds, vnf_profile_id: str = None
2812 ):
2813 ee_relation_data = safe_get_ee_relation(
2814 nsr_id, ee_relation_data, vnf_profile_id=vnf_profile_id
2815 )
2816 ee_relation_level = EELevel.get_level(ee_relation_data)
2817 if (ee_relation_level in (EELevel.VNF, EELevel.VDU)) and not ee_relation_data[
2818 "execution-environment-ref"
2819 ]:
2820 vnf_profile = get_vnf_profile(nsd, ee_relation_data["vnf-profile-id"])
2821 vnfd_id = vnf_profile["vnfd-id"]
2822 db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
2823 entity_id = (
2824 vnfd_id
2825 if ee_relation_level == EELevel.VNF
2826 else ee_relation_data["vdu-profile-id"]
2827 )
2828 ee = get_juju_ee_ref(db_vnfd, entity_id)
2829 if not ee:
2830 raise Exception(
2831 f"not execution environments found for ee_relation {ee_relation_data}"
2832 )
2833 ee_relation_data["execution-environment-ref"] = ee["id"]
2834 return ee_relation_data
2835
2836 def _get_ns_relations(
2837 self,
2838 nsr_id: str,
2839 nsd: Dict[str, Any],
2840 vca: DeployedVCA,
2841 cached_vnfds: Dict[str, Any],
2842 ) -> List[Relation]:
2843 relations = []
2844 db_ns_relations = get_ns_configuration_relation_list(nsd)
2845 for r in db_ns_relations:
2846 provider_dict = None
2847 requirer_dict = None
2848 if all(key in r for key in ("provider", "requirer")):
2849 provider_dict = r["provider"]
2850 requirer_dict = r["requirer"]
2851 elif "entities" in r:
2852 provider_id = r["entities"][0]["id"]
2853 provider_dict = {
2854 "nsr-id": nsr_id,
2855 "endpoint": r["entities"][0]["endpoint"],
2856 }
2857 if provider_id != nsd["id"]:
2858 provider_dict["vnf-profile-id"] = provider_id
2859 requirer_id = r["entities"][1]["id"]
2860 requirer_dict = {
2861 "nsr-id": nsr_id,
2862 "endpoint": r["entities"][1]["endpoint"],
2863 }
2864 if requirer_id != nsd["id"]:
2865 requirer_dict["vnf-profile-id"] = requirer_id
2866 else:
2867 raise Exception(
2868 "provider/requirer or entities must be included in the relation."
2869 )
2870 relation_provider = self._update_ee_relation_data_with_implicit_data(
2871 nsr_id, nsd, provider_dict, cached_vnfds
2872 )
2873 relation_requirer = self._update_ee_relation_data_with_implicit_data(
2874 nsr_id, nsd, requirer_dict, cached_vnfds
2875 )
2876 provider = EERelation(relation_provider)
2877 requirer = EERelation(relation_requirer)
2878 relation = Relation(r["name"], provider, requirer)
2879 vca_in_relation = self._is_deployed_vca_in_relation(vca, relation)
2880 if vca_in_relation:
2881 relations.append(relation)
2882 return relations
2883
2884 def _get_vnf_relations(
2885 self,
2886 nsr_id: str,
2887 nsd: Dict[str, Any],
2888 vca: DeployedVCA,
2889 cached_vnfds: Dict[str, Any],
2890 ) -> List[Relation]:
2891 relations = []
2892 vnf_profile = get_vnf_profile(nsd, vca.vnf_profile_id)
2893 vnf_profile_id = vnf_profile["id"]
2894 vnfd_id = vnf_profile["vnfd-id"]
2895 db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
2896 db_vnf_relations = get_relation_list(db_vnfd, vnfd_id)
2897 for r in db_vnf_relations:
2898 provider_dict = None
2899 requirer_dict = None
2900 if all(key in r for key in ("provider", "requirer")):
2901 provider_dict = r["provider"]
2902 requirer_dict = r["requirer"]
2903 elif "entities" in r:
2904 provider_id = r["entities"][0]["id"]
2905 provider_dict = {
2906 "nsr-id": nsr_id,
2907 "vnf-profile-id": vnf_profile_id,
2908 "endpoint": r["entities"][0]["endpoint"],
2909 }
2910 if provider_id != vnfd_id:
2911 provider_dict["vdu-profile-id"] = provider_id
2912 requirer_id = r["entities"][1]["id"]
2913 requirer_dict = {
2914 "nsr-id": nsr_id,
2915 "vnf-profile-id": vnf_profile_id,
2916 "endpoint": r["entities"][1]["endpoint"],
2917 }
2918 if requirer_id != vnfd_id:
2919 requirer_dict["vdu-profile-id"] = requirer_id
2920 else:
2921 raise Exception(
2922 "provider/requirer or entities must be included in the relation."
2923 )
2924 relation_provider = self._update_ee_relation_data_with_implicit_data(
2925 nsr_id, nsd, provider_dict, cached_vnfds, vnf_profile_id=vnf_profile_id
2926 )
2927 relation_requirer = self._update_ee_relation_data_with_implicit_data(
2928 nsr_id, nsd, requirer_dict, cached_vnfds, vnf_profile_id=vnf_profile_id
2929 )
2930 provider = EERelation(relation_provider)
2931 requirer = EERelation(relation_requirer)
2932 relation = Relation(r["name"], provider, requirer)
2933 vca_in_relation = self._is_deployed_vca_in_relation(vca, relation)
2934 if vca_in_relation:
2935 relations.append(relation)
2936 return relations
2937
2938 def _get_kdu_resource_data(
2939 self,
2940 ee_relation: EERelation,
2941 db_nsr: Dict[str, Any],
2942 cached_vnfds: Dict[str, Any],
2943 ) -> DeployedK8sResource:
2944 nsd = get_nsd(db_nsr)
2945 vnf_profiles = get_vnf_profiles(nsd)
2946 vnfd_id = find_in_list(
2947 vnf_profiles,
2948 lambda vnf_profile: vnf_profile["id"] == ee_relation.vnf_profile_id,
2949 )["vnfd-id"]
2950 db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
2951 kdu_resource_profile = get_kdu_resource_profile(
2952 db_vnfd, ee_relation.kdu_resource_profile_id
2953 )
2954 kdu_name = kdu_resource_profile["kdu-name"]
2955 deployed_kdu, _ = get_deployed_kdu(
2956 db_nsr.get("_admin", ()).get("deployed", ()),
2957 kdu_name,
2958 ee_relation.vnf_profile_id,
2959 )
2960 deployed_kdu.update({"resource-name": kdu_resource_profile["resource-name"]})
2961 return deployed_kdu
2962
2963 def _get_deployed_component(
2964 self,
2965 ee_relation: EERelation,
2966 db_nsr: Dict[str, Any],
2967 cached_vnfds: Dict[str, Any],
2968 ) -> DeployedComponent:
2969 nsr_id = db_nsr["_id"]
2970 deployed_component = None
2971 ee_level = EELevel.get_level(ee_relation)
2972 if ee_level == EELevel.NS:
2973 vca = get_deployed_vca(db_nsr, {"vdu_id": None, "member-vnf-index": None})
2974 if vca:
2975 deployed_component = DeployedVCA(nsr_id, vca)
2976 elif ee_level == EELevel.VNF:
2977 vca = get_deployed_vca(
2978 db_nsr,
2979 {
2980 "vdu_id": None,
2981 "member-vnf-index": ee_relation.vnf_profile_id,
2982 "ee_descriptor_id": ee_relation.execution_environment_ref,
2983 },
2984 )
2985 if vca:
2986 deployed_component = DeployedVCA(nsr_id, vca)
2987 elif ee_level == EELevel.VDU:
2988 vca = get_deployed_vca(
2989 db_nsr,
2990 {
2991 "vdu_id": ee_relation.vdu_profile_id,
2992 "member-vnf-index": ee_relation.vnf_profile_id,
2993 "ee_descriptor_id": ee_relation.execution_environment_ref,
2994 },
2995 )
2996 if vca:
2997 deployed_component = DeployedVCA(nsr_id, vca)
2998 elif ee_level == EELevel.KDU:
2999 kdu_resource_data = self._get_kdu_resource_data(
3000 ee_relation, db_nsr, cached_vnfds
3001 )
3002 if kdu_resource_data:
3003 deployed_component = DeployedK8sResource(kdu_resource_data)
3004 return deployed_component
3005
3006 async def _add_relation(
3007 self,
3008 relation: Relation,
3009 vca_type: str,
3010 db_nsr: Dict[str, Any],
3011 cached_vnfds: Dict[str, Any],
3012 cached_vnfrs: Dict[str, Any],
3013 ) -> bool:
3014 deployed_provider = self._get_deployed_component(
3015 relation.provider, db_nsr, cached_vnfds
3016 )
3017 deployed_requirer = self._get_deployed_component(
3018 relation.requirer, db_nsr, cached_vnfds
3019 )
3020 if (
3021 deployed_provider
3022 and deployed_requirer
3023 and deployed_provider.config_sw_installed
3024 and deployed_requirer.config_sw_installed
3025 ):
3026 provider_db_vnfr = (
3027 self._get_vnfr(
3028 relation.provider.nsr_id,
3029 relation.provider.vnf_profile_id,
3030 cached_vnfrs,
3031 )
3032 if relation.provider.vnf_profile_id
3033 else None
3034 )
3035 requirer_db_vnfr = (
3036 self._get_vnfr(
3037 relation.requirer.nsr_id,
3038 relation.requirer.vnf_profile_id,
3039 cached_vnfrs,
3040 )
3041 if relation.requirer.vnf_profile_id
3042 else None
3043 )
3044 provider_vca_id = self.get_vca_id(provider_db_vnfr, db_nsr)
3045 requirer_vca_id = self.get_vca_id(requirer_db_vnfr, db_nsr)
3046 provider_relation_endpoint = RelationEndpoint(
3047 deployed_provider.ee_id,
3048 provider_vca_id,
3049 relation.provider.endpoint,
3050 )
3051 requirer_relation_endpoint = RelationEndpoint(
3052 deployed_requirer.ee_id,
3053 requirer_vca_id,
3054 relation.requirer.endpoint,
3055 )
3056 await self.vca_map[vca_type].add_relation(
3057 provider=provider_relation_endpoint,
3058 requirer=requirer_relation_endpoint,
3059 )
3060 # remove entry from relations list
3061 return True
3062 return False
3063
    async def _add_vca_relations(
        self,
        logging_text,
        nsr_id,
        vca_type: str,
        vca_index: int,
        timeout: int = 3600,
    ) -> bool:
        """Find and create every relation that involves the VCA at *vca_index*.

        Collects the NS-level and VNF-level relations declared in the nsd for
        this VCA, then polls (reloading the nsr record each pass) until every
        relation could be added — both peers deployed and configured — or the
        timeout elapses.

        :param logging_text: prefix for log messages
        :param nsr_id: id of the nsr record
        :param vca_type: key of self.vca_map used to add the relations
        :param vca_index: position of this VCA inside _admin.deployed.VCA
        :param timeout: maximum seconds to wait for related peers
        :return: True when all relations were added (or there were none);
            False on timeout or on any error (errors are logged, not raised)
        """

        # steps:
        # 1. find all relations for this VCA
        # 2. wait for other peers related
        # 3. add relations

        try:
            # STEP 1: find all relations for this VCA

            # read nsr record
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
            nsd = get_nsd(db_nsr)

            # this VCA data
            deployed_vca_dict = get_deployed_vca_list(db_nsr)[vca_index]
            my_vca = DeployedVCA(nsr_id, deployed_vca_dict)

            # caches shared by all lookups below, to avoid re-reading records
            cached_vnfds = {}
            cached_vnfrs = {}
            relations = []
            relations.extend(self._get_ns_relations(nsr_id, nsd, my_vca, cached_vnfds))
            relations.extend(self._get_vnf_relations(nsr_id, nsd, my_vca, cached_vnfds))

            # if no relations, terminate
            if not relations:
                self.logger.debug(logging_text + " No relations")
                return True

            self.logger.debug(logging_text + " adding relations {}".format(relations))

            # add all relations
            start = time()
            while True:
                # check timeout
                now = time()
                if now - start >= timeout:
                    self.logger.error(logging_text + " : timeout adding relations")
                    return False

                # reload nsr from database (we need to update record: _admin.deployed.VCA)
                db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

                # for each relation, find the VCA's related
                # iterate over a copy: relations successfully added are removed
                # from the original list while looping
                for relation in relations.copy():
                    added = await self._add_relation(
                        relation,
                        vca_type,
                        db_nsr,
                        cached_vnfds,
                        cached_vnfrs,
                    )
                    if added:
                        relations.remove(relation)

                if not relations:
                    self.logger.debug("Relations added")
                    break
                # some peers not ready yet: wait before polling again
                await asyncio.sleep(5.0)

            return True

        except Exception as e:
            self.logger.warn(logging_text + " ERROR adding relations: {}".format(e))
            return False
3136
    async def _install_kdu(
        self,
        nsr_id: str,
        nsr_db_path: str,
        vnfr_data: dict,
        kdu_index: int,
        kdud: dict,
        vnfd: dict,
        k8s_instance_info: dict,
        k8params: dict = None,
        timeout: int = 600,
        vca_id: str = None,
    ):
        """Instantiate one KDU on its k8s cluster and update nsr/vnfr records.

        Installs the helm chart / juju bundle, stores the kdu-instance name at
        *nsr_db_path* in the nsr, records the deployed services in the kdur
        entry (filling its ip-address and, when the mgmt connection point
        matches, the vnf ip-address), and finally runs the KDU
        initial-config-primitives when no juju execution environment is
        declared for it.

        :param nsr_id: id of the nsr record
        :param nsr_db_path: dotted db path of this KDU inside the nsr record
            (e.g. "_admin.deployed.K8s.<n>")
        :param vnfr_data: vnfr record of the vnf that owns the KDU
        :param kdu_index: index of the KDU inside the vnfr "kdur" list
        :param kdud: kdu descriptor taken from the vnfd
        :param vnfd: vnf descriptor
        :param k8s_instance_info: cluster type/uuid, kdu name/model, namespace
            and optional kdu-deployment-name for this instantiation
        :param k8params: parameters passed to the kdu installation
        :param timeout: seconds allowed for the install and for each primitive
        :param vca_id: optional VCA id used for the k8s connector operations
        :return: the kdu instance name
        :raises Exception: re-raises any failure after recording the error in
            the nsr (detailed-status) and vnfr (kdur status ERROR) records
        """

        try:
            k8sclustertype = k8s_instance_info["k8scluster-type"]
            # Instantiate kdu
            db_dict_install = {
                "collection": "nsrs",
                "filter": {"_id": nsr_id},
                "path": nsr_db_path,
            }

            # honour an explicit deployment name; otherwise let the connector
            # generate one (and persist it before the actual install)
            if k8s_instance_info.get("kdu-deployment-name"):
                kdu_instance = k8s_instance_info.get("kdu-deployment-name")
            else:
                kdu_instance = self.k8scluster_map[
                    k8sclustertype
                ].generate_kdu_instance_name(
                    db_dict=db_dict_install,
                    kdu_model=k8s_instance_info["kdu-model"],
                    kdu_name=k8s_instance_info["kdu-name"],
                )
            self.update_db_2(
                "nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance}
            )
            await self.k8scluster_map[k8sclustertype].install(
                cluster_uuid=k8s_instance_info["k8scluster-uuid"],
                kdu_model=k8s_instance_info["kdu-model"],
                atomic=True,
                params=k8params,
                db_dict=db_dict_install,
                timeout=timeout,
                kdu_name=k8s_instance_info["kdu-name"],
                namespace=k8s_instance_info["namespace"],
                kdu_instance=kdu_instance,
                vca_id=vca_id,
            )
            self.update_db_2(
                "nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance}
            )

            # Obtain services to obtain management service ip
            services = await self.k8scluster_map[k8sclustertype].get_services(
                cluster_uuid=k8s_instance_info["k8scluster-uuid"],
                kdu_instance=kdu_instance,
                namespace=k8s_instance_info["namespace"],
            )

            # Obtain management service info (if exists)
            vnfr_update_dict = {}
            kdu_config = get_configuration(vnfd, kdud["name"])
            if kdu_config:
                target_ee_list = kdu_config.get("execution-environment-list", [])
            else:
                target_ee_list = []

            if services:
                vnfr_update_dict["kdur.{}.services".format(kdu_index)] = services
                mgmt_services = [
                    service
                    for service in kdud.get("service", [])
                    if service.get("mgmt-service")
                ]
                for mgmt_service in mgmt_services:
                    for service in services:
                        if service["name"].startswith(mgmt_service["name"]):
                            # Mgmt service found, Obtain service ip
                            ip = service.get("external_ip", service.get("cluster_ip"))
                            if isinstance(ip, list) and len(ip) == 1:
                                ip = ip[0]

                            vnfr_update_dict[
                                "kdur.{}.ip-address".format(kdu_index)
                            ] = ip

                            # Check if must update also mgmt ip at the vnf
                            service_external_cp = mgmt_service.get(
                                "external-connection-point-ref"
                            )
                            if service_external_cp:
                                if (
                                    deep_get(vnfd, ("mgmt-interface", "cp"))
                                    == service_external_cp
                                ):
                                    vnfr_update_dict["ip-address"] = ip

                                if find_in_list(
                                    target_ee_list,
                                    lambda ee: ee.get(
                                        "external-connection-point-ref", ""
                                    )
                                    == service_external_cp,
                                ):
                                    vnfr_update_dict[
                                        "kdur.{}.ip-address".format(kdu_index)
                                    ] = ip
                            break
                    else:
                        # for/else: no deployed service name matched this
                        # mgmt-service declaration
                        self.logger.warn(
                            "Mgmt service name: {} not found".format(
                                mgmt_service["name"]
                            )
                        )

            vnfr_update_dict["kdur.{}.status".format(kdu_index)] = "READY"
            self.update_db_2("vnfrs", vnfr_data.get("_id"), vnfr_update_dict)

            # Run initial-config-primitives through the k8s connector only when
            # the KDU has no juju execution environment declared for them
            kdu_config = get_configuration(vnfd, k8s_instance_info["kdu-name"])
            if (
                kdu_config
                and kdu_config.get("initial-config-primitive")
                and get_juju_ee_ref(vnfd, k8s_instance_info["kdu-name"]) is None
            ):
                initial_config_primitive_list = kdu_config.get(
                    "initial-config-primitive"
                )
                initial_config_primitive_list.sort(key=lambda val: int(val["seq"]))

                for initial_config_primitive in initial_config_primitive_list:
                    primitive_params_ = self._map_primitive_params(
                        initial_config_primitive, {}, {}
                    )

                    await asyncio.wait_for(
                        self.k8scluster_map[k8sclustertype].exec_primitive(
                            cluster_uuid=k8s_instance_info["k8scluster-uuid"],
                            kdu_instance=kdu_instance,
                            primitive_name=initial_config_primitive["name"],
                            params=primitive_params_,
                            db_dict=db_dict_install,
                            vca_id=vca_id,
                        ),
                        timeout=timeout,
                    )

        except Exception as e:
            # Prepare update db with error and raise exception
            try:
                self.update_db_2(
                    "nsrs", nsr_id, {nsr_db_path + ".detailed-status": str(e)}
                )
                self.update_db_2(
                    "vnfrs",
                    vnfr_data.get("_id"),
                    {"kdur.{}.status".format(kdu_index): "ERROR"},
                )
            except Exception:
                # ignore to keep original exception
                pass
            # reraise original error
            raise

        return kdu_instance
3301
    async def deploy_kdus(
        self,
        logging_text,
        nsr_id,
        nslcmop_id,
        db_vnfrs,
        db_vnfds,
        task_instantiation_info,
    ):
        """Launch the deployment of every KDU declared in the vnf records.

        For each "kdur" entry of each vnfr: resolves its k8s cluster id
        (initializing helm-v3 on legacy clusters if needed), synchronizes helm
        repos once per cluster, stores the KDU info at _admin.deployed.K8s.<n>
        in the nsr, and spawns an _install_kdu asyncio task registered in the
        LCM task list and in *task_instantiation_info*.

        :param logging_text: prefix for log messages
        :param nsr_id: id of the nsr record
        :param nslcmop_id: id of the operation record (used to register tasks)
        :param db_vnfrs: map member-vnf-index -> vnfr record
        :param db_vnfds: list of vnfd records
        :param task_instantiation_info: dict task -> description, filled here
        :raises LcmException: on any preparation/deployment error
        """
        # Launch kdus if present in the descriptor

        # cache: cluster type -> {cluster record id -> internal cluster id}
        k8scluster_id_2_uuic = {
            "helm-chart-v3": {},
            "helm-chart": {},
            "juju-bundle": {},
        }

        async def _get_cluster_id(cluster_id, cluster_type):
            """Resolve (and cache) the internal k8s cluster id for the given
            cluster record and type, waiting for any related pending task."""
            nonlocal k8scluster_id_2_uuic
            if cluster_id in k8scluster_id_2_uuic[cluster_type]:
                return k8scluster_id_2_uuic[cluster_type][cluster_id]

            # check if K8scluster is creating and wait look if previous tasks in process
            task_name, task_dependency = self.lcm_tasks.lookfor_related(
                "k8scluster", cluster_id
            )
            if task_dependency:
                text = "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
                    task_name, cluster_id
                )
                self.logger.debug(logging_text + text)
                await asyncio.wait(task_dependency, timeout=3600)

            db_k8scluster = self.db.get_one(
                "k8sclusters", {"_id": cluster_id}, fail_on_empty=False
            )
            if not db_k8scluster:
                raise LcmException("K8s cluster {} cannot be found".format(cluster_id))

            k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
            if not k8s_id:
                if cluster_type == "helm-chart-v3":
                    try:
                        # backward compatibility for existing clusters that have not been initialized for helm v3
                        k8s_credentials = yaml.safe_dump(
                            db_k8scluster.get("credentials")
                        )
                        k8s_id, uninstall_sw = await self.k8sclusterhelm3.init_env(
                            k8s_credentials, reuse_cluster_uuid=cluster_id
                        )
                        db_k8scluster_update = {}
                        db_k8scluster_update["_admin.helm-chart-v3.error_msg"] = None
                        db_k8scluster_update["_admin.helm-chart-v3.id"] = k8s_id
                        db_k8scluster_update[
                            "_admin.helm-chart-v3.created"
                        ] = uninstall_sw
                        db_k8scluster_update[
                            "_admin.helm-chart-v3.operationalState"
                        ] = "ENABLED"
                        self.update_db_2(
                            "k8sclusters", cluster_id, db_k8scluster_update
                        )
                    except Exception as e:
                        self.logger.error(
                            logging_text
                            + "error initializing helm-v3 cluster: {}".format(str(e))
                        )
                        raise LcmException(
                            "K8s cluster '{}' has not been initialized for '{}'".format(
                                cluster_id, cluster_type
                            )
                        )
                else:
                    raise LcmException(
                        "K8s cluster '{}' has not been initialized for '{}'".format(
                            cluster_id, cluster_type
                        )
                    )
            k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
            return k8s_id

        logging_text += "Deploy kdus: "
        step = ""
        try:
            db_nsr_update = {"_admin.deployed.K8s": []}
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

            index = 0
            updated_cluster_list = []
            updated_v3_cluster_list = []

            for vnfr_data in db_vnfrs.values():
                vca_id = self.get_vca_id(vnfr_data, {})
                for kdu_index, kdur in enumerate(get_iterable(vnfr_data, "kdur")):
                    # Step 0: Prepare and set parameters
                    desc_params = parse_yaml_strings(kdur.get("additionalParams"))
                    vnfd_id = vnfr_data.get("vnfd-id")
                    vnfd_with_id = find_in_list(
                        db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id
                    )
                    kdud = next(
                        kdud
                        for kdud in vnfd_with_id["kdu"]
                        if kdud["name"] == kdur["kdu-name"]
                    )
                    namespace = kdur.get("k8s-namespace")
                    kdu_deployment_name = kdur.get("kdu-deployment-name")
                    if kdur.get("helm-chart"):
                        kdumodel = kdur["helm-chart"]
                        # Default version: helm3, if helm-version is v2 assign v2
                        k8sclustertype = "helm-chart-v3"
                        self.logger.debug("kdur: {}".format(kdur))
                        if (
                            kdur.get("helm-version")
                            and kdur.get("helm-version") == "v2"
                        ):
                            k8sclustertype = "helm-chart"
                    elif kdur.get("juju-bundle"):
                        kdumodel = kdur["juju-bundle"]
                        k8sclustertype = "juju-bundle"
                    else:
                        raise LcmException(
                            "kdu type for kdu='{}.{}' is neither helm-chart nor "
                            "juju-bundle. Maybe an old NBI version is running".format(
                                vnfr_data["member-vnf-index-ref"], kdur["kdu-name"]
                            )
                        )
                    # check if kdumodel is a file and exists
                    try:
                        vnfd_with_id = find_in_list(
                            db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id
                        )
                        storage = deep_get(vnfd_with_id, ("_admin", "storage"))
                        if storage:  # may be not present if vnfd has not artifacts
                            # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
                            if storage["pkg-dir"]:
                                filename = "{}/{}/{}s/{}".format(
                                    storage["folder"],
                                    storage["pkg-dir"],
                                    k8sclustertype,
                                    kdumodel,
                                )
                            else:
                                filename = "{}/Scripts/{}s/{}".format(
                                    storage["folder"],
                                    k8sclustertype,
                                    kdumodel,
                                )
                            if self.fs.file_exists(
                                filename, mode="file"
                            ) or self.fs.file_exists(filename, mode="dir"):
                                kdumodel = self.fs.path + filename
                    except (asyncio.TimeoutError, asyncio.CancelledError):
                        raise
                    except Exception:  # it is not a file
                        pass

                    k8s_cluster_id = kdur["k8s-cluster"]["id"]
                    step = "Synchronize repos for k8s cluster '{}'".format(
                        k8s_cluster_id
                    )
                    cluster_uuid = await _get_cluster_id(k8s_cluster_id, k8sclustertype)

                    # Synchronize repos
                    if (
                        k8sclustertype == "helm-chart"
                        and cluster_uuid not in updated_cluster_list
                    ) or (
                        k8sclustertype == "helm-chart-v3"
                        and cluster_uuid not in updated_v3_cluster_list
                    ):
                        del_repo_list, added_repo_dict = await asyncio.ensure_future(
                            self.k8scluster_map[k8sclustertype].synchronize_repos(
                                cluster_uuid=cluster_uuid
                            )
                        )
                        if del_repo_list or added_repo_dict:
                            if k8sclustertype == "helm-chart":
                                unset = {
                                    "_admin.helm_charts_added." + item: None
                                    for item in del_repo_list
                                }
                                updated = {
                                    "_admin.helm_charts_added." + item: name
                                    for item, name in added_repo_dict.items()
                                }
                                updated_cluster_list.append(cluster_uuid)
                            elif k8sclustertype == "helm-chart-v3":
                                unset = {
                                    "_admin.helm_charts_v3_added." + item: None
                                    for item in del_repo_list
                                }
                                updated = {
                                    "_admin.helm_charts_v3_added." + item: name
                                    for item, name in added_repo_dict.items()
                                }
                                updated_v3_cluster_list.append(cluster_uuid)
                            self.logger.debug(
                                logging_text + "repos synchronized on k8s cluster "
                                "'{}' to_delete: {}, to_add: {}".format(
                                    k8s_cluster_id, del_repo_list, added_repo_dict
                                )
                            )
                            self.db.set_one(
                                "k8sclusters",
                                {"_id": k8s_cluster_id},
                                updated,
                                unset=unset,
                            )

                    # Instantiate kdu
                    step = "Instantiating KDU {}.{} in k8s cluster {}".format(
                        vnfr_data["member-vnf-index-ref"],
                        kdur["kdu-name"],
                        k8s_cluster_id,
                    )
                    k8s_instance_info = {
                        "kdu-instance": None,
                        "k8scluster-uuid": cluster_uuid,
                        "k8scluster-type": k8sclustertype,
                        "member-vnf-index": vnfr_data["member-vnf-index-ref"],
                        "kdu-name": kdur["kdu-name"],
                        "kdu-model": kdumodel,
                        "namespace": namespace,
                        "kdu-deployment-name": kdu_deployment_name,
                    }
                    db_path = "_admin.deployed.K8s.{}".format(index)
                    db_nsr_update[db_path] = k8s_instance_info
                    self.update_db_2("nsrs", nsr_id, db_nsr_update)
                    vnfd_with_id = find_in_list(
                        db_vnfds, lambda vnf: vnf["_id"] == vnfd_id
                    )
                    task = asyncio.ensure_future(
                        self._install_kdu(
                            nsr_id,
                            db_path,
                            vnfr_data,
                            kdu_index,
                            kdud,
                            vnfd_with_id,
                            k8s_instance_info,
                            k8params=desc_params,
                            timeout=600,
                            vca_id=vca_id,
                        )
                    )
                    self.lcm_tasks.register(
                        "ns",
                        nsr_id,
                        nslcmop_id,
                        "instantiate_KDU-{}".format(index),
                        task,
                    )
                    task_instantiation_info[task] = "Deploying KDU {}".format(
                        kdur["kdu-name"]
                    )

                    index += 1

        except (LcmException, asyncio.CancelledError):
            raise
        except Exception as e:
            msg = "Exception {} while {}: {}".format(type(e).__name__, step, e)
            if isinstance(e, (N2VCException, DbException)):
                self.logger.error(logging_text + msg)
            else:
                self.logger.critical(logging_text + msg, exc_info=True)
            raise LcmException(msg)
        finally:
            if db_nsr_update:
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
3573
    def _deploy_n2vc(
        self,
        logging_text,
        db_nsr,
        db_vnfr,
        nslcmop_id,
        nsr_id,
        nsi_id,
        vnfd_id,
        vdu_id,
        kdu_name,
        member_vnf_index,
        vdu_index,
        vdu_name,
        deploy_params,
        descriptor_config,
        base_folder,
        task_instantiation_info,
        stage,
    ):
        """Spawn one instantiate_N2VC task per execution environment declared
        in *descriptor_config*.

        For every juju/helm execution-environment item it reuses (or creates)
        the matching entry in db_nsr._admin.deployed.VCA, then launches
        instantiate_N2VC as an asyncio task registered in the LCM task list
        and recorded in *task_instantiation_info*.
        """
        # launch instantiate_N2VC in a asyncio task and register task object
        # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
        # if not found, create one entry and update database
        # fill db_nsr._admin.deployed.VCA.<index>

        self.logger.debug(
            logging_text + "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id, vdu_id)
        )
        if "execution-environment-list" in descriptor_config:
            ee_list = descriptor_config.get("execution-environment-list", [])
        elif "juju" in descriptor_config:
            ee_list = [descriptor_config]  # ns charms
        else:  # other types as script are not supported
            ee_list = []

        for ee_item in ee_list:
            self.logger.debug(
                logging_text
                + "_deploy_n2vc ee_item juju={}, helm={}".format(
                    ee_item.get("juju"), ee_item.get("helm-chart")
                )
            )
            ee_descriptor_id = ee_item.get("id")
            if ee_item.get("juju"):
                vca_name = ee_item["juju"].get("charm")
                # proxy charm when a charm name is given, native charm otherwise
                vca_type = (
                    "lxc_proxy_charm"
                    if ee_item["juju"].get("charm") is not None
                    else "native_charm"
                )
                if ee_item["juju"].get("cloud") == "k8s":
                    vca_type = "k8s_proxy_charm"
                elif ee_item["juju"].get("proxy") is False:
                    vca_type = "native_charm"
            elif ee_item.get("helm-chart"):
                vca_name = ee_item["helm-chart"]
                if ee_item.get("helm-version") and ee_item.get("helm-version") == "v2":
                    vca_type = "helm"
                else:
                    vca_type = "helm-v3"
            else:
                self.logger.debug(
                    logging_text + "skipping non juju neither charm configuration"
                )
                continue

            # look for an existing deployed-VCA entry matching this ee_item
            vca_index = -1
            for vca_index, vca_deployed in enumerate(
                db_nsr["_admin"]["deployed"]["VCA"]
            ):
                if not vca_deployed:
                    continue
                if (
                    vca_deployed.get("member-vnf-index") == member_vnf_index
                    and vca_deployed.get("vdu_id") == vdu_id
                    and vca_deployed.get("kdu_name") == kdu_name
                    and vca_deployed.get("vdu_count_index", 0) == vdu_index
                    and vca_deployed.get("ee_descriptor_id") == ee_descriptor_id
                ):
                    break
            else:
                # not found, create one.
                target = (
                    "ns" if not member_vnf_index else "vnf/{}".format(member_vnf_index)
                )
                if vdu_id:
                    target += "/vdu/{}/{}".format(vdu_id, vdu_index or 0)
                elif kdu_name:
                    target += "/kdu/{}".format(kdu_name)
                vca_deployed = {
                    "target_element": target,
                    # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
                    "member-vnf-index": member_vnf_index,
                    "vdu_id": vdu_id,
                    "kdu_name": kdu_name,
                    "vdu_count_index": vdu_index,
                    "operational-status": "init",  # TODO revise
                    "detailed-status": "",  # TODO revise
                    "step": "initial-deploy",  # TODO revise
                    "vnfd_id": vnfd_id,
                    "vdu_name": vdu_name,
                    "type": vca_type,
                    "ee_descriptor_id": ee_descriptor_id,
                }
                vca_index += 1

                # create VCA and configurationStatus in db
                db_dict = {
                    "_admin.deployed.VCA.{}".format(vca_index): vca_deployed,
                    "configurationStatus.{}".format(vca_index): dict(),
                }
                self.update_db_2("nsrs", nsr_id, db_dict)

                db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed)

                self.logger.debug("N2VC > NSR_ID > {}".format(nsr_id))
                self.logger.debug("N2VC > DB_NSR > {}".format(db_nsr))
                self.logger.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed))

            # Launch task
            task_n2vc = asyncio.ensure_future(
                self.instantiate_N2VC(
                    logging_text=logging_text,
                    vca_index=vca_index,
                    nsi_id=nsi_id,
                    db_nsr=db_nsr,
                    db_vnfr=db_vnfr,
                    vdu_id=vdu_id,
                    kdu_name=kdu_name,
                    vdu_index=vdu_index,
                    deploy_params=deploy_params,
                    config_descriptor=descriptor_config,
                    base_folder=base_folder,
                    nslcmop_id=nslcmop_id,
                    stage=stage,
                    vca_type=vca_type,
                    vca_name=vca_name,
                    ee_config_descriptor=ee_item,
                )
            )
            self.lcm_tasks.register(
                "ns",
                nsr_id,
                nslcmop_id,
                "instantiate_N2VC-{}".format(vca_index),
                task_n2vc,
            )
            task_instantiation_info[
                task_n2vc
            ] = self.task_name_deploy_vca + " {}.{}".format(
                member_vnf_index or "", vdu_id or ""
            )
3726
3727 @staticmethod
3728 def _create_nslcmop(nsr_id, operation, params):
3729 """
3730 Creates a ns-lcm-opp content to be stored at database.
3731 :param nsr_id: internal id of the instance
3732 :param operation: instantiate, terminate, scale, action, ...
3733 :param params: user parameters for the operation
3734 :return: dictionary following SOL005 format
3735 """
3736 # Raise exception if invalid arguments
3737 if not (nsr_id and operation and params):
3738 raise LcmException(
3739 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided"
3740 )
3741 now = time()
3742 _id = str(uuid4())
3743 nslcmop = {
3744 "id": _id,
3745 "_id": _id,
3746 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
3747 "operationState": "PROCESSING",
3748 "statusEnteredTime": now,
3749 "nsInstanceId": nsr_id,
3750 "lcmOperationType": operation,
3751 "startTime": now,
3752 "isAutomaticInvocation": False,
3753 "operationParams": params,
3754 "isCancelPending": False,
3755 "links": {
3756 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
3757 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
3758 },
3759 }
3760 return nslcmop
3761
3762 def _format_additional_params(self, params):
3763 params = params or {}
3764 for key, value in params.items():
3765 if str(value).startswith("!!yaml "):
3766 params[key] = yaml.safe_load(value[7:])
3767 return params
3768
3769 def _get_terminate_primitive_params(self, seq, vnf_index):
3770 primitive = seq.get("name")
3771 primitive_params = {}
3772 params = {
3773 "member_vnf_index": vnf_index,
3774 "primitive": primitive,
3775 "primitive_params": primitive_params,
3776 }
3777 desc_params = {}
3778 return self._map_primitive_params(seq, params, desc_params)
3779
3780 # sub-operations
3781
3782 def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
3783 op = deep_get(db_nslcmop, ("_admin", "operations"), [])[op_index]
3784 if op.get("operationState") == "COMPLETED":
3785 # b. Skip sub-operation
3786 # _ns_execute_primitive() or RO.create_action() will NOT be executed
3787 return self.SUBOPERATION_STATUS_SKIP
3788 else:
3789 # c. retry executing sub-operation
3790 # The sub-operation exists, and operationState != 'COMPLETED'
3791 # Update operationState = 'PROCESSING' to indicate a retry.
3792 operationState = "PROCESSING"
3793 detailed_status = "In progress"
3794 self._update_suboperation_status(
3795 db_nslcmop, op_index, operationState, detailed_status
3796 )
3797 # Return the sub-operation index
3798 # _ns_execute_primitive() or RO.create_action() will be called from scale()
3799 # with arguments extracted from the sub-operation
3800 return op_index
3801
3802 # Find a sub-operation where all keys in a matching dictionary must match
3803 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
3804 def _find_suboperation(self, db_nslcmop, match):
3805 if db_nslcmop and match:
3806 op_list = db_nslcmop.get("_admin", {}).get("operations", [])
3807 for i, op in enumerate(op_list):
3808 if all(op.get(k) == match[k] for k in match):
3809 return i
3810 return self.SUBOPERATION_STATUS_NOT_FOUND
3811
3812 # Update status for a sub-operation given its index
3813 def _update_suboperation_status(
3814 self, db_nslcmop, op_index, operationState, detailed_status
3815 ):
3816 # Update DB for HA tasks
3817 q_filter = {"_id": db_nslcmop["_id"]}
3818 update_dict = {
3819 "_admin.operations.{}.operationState".format(op_index): operationState,
3820 "_admin.operations.{}.detailed-status".format(op_index): detailed_status,
3821 }
3822 self.db.set_one(
3823 "nslcmops", q_filter=q_filter, update_dict=update_dict, fail_on_empty=False
3824 )
3825
3826 # Add sub-operation, return the index of the added sub-operation
3827 # Optionally, set operationState, detailed-status, and operationType
3828 # Status and type are currently set for 'scale' sub-operations:
3829 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
3830 # 'detailed-status' : status message
3831 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
3832 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
3833 def _add_suboperation(
3834 self,
3835 db_nslcmop,
3836 vnf_index,
3837 vdu_id,
3838 vdu_count_index,
3839 vdu_name,
3840 primitive,
3841 mapped_primitive_params,
3842 operationState=None,
3843 detailed_status=None,
3844 operationType=None,
3845 RO_nsr_id=None,
3846 RO_scaling_info=None,
3847 ):
3848 if not db_nslcmop:
3849 return self.SUBOPERATION_STATUS_NOT_FOUND
3850 # Get the "_admin.operations" list, if it exists
3851 db_nslcmop_admin = db_nslcmop.get("_admin", {})
3852 op_list = db_nslcmop_admin.get("operations")
3853 # Create or append to the "_admin.operations" list
3854 new_op = {
3855 "member_vnf_index": vnf_index,
3856 "vdu_id": vdu_id,
3857 "vdu_count_index": vdu_count_index,
3858 "primitive": primitive,
3859 "primitive_params": mapped_primitive_params,
3860 }
3861 if operationState:
3862 new_op["operationState"] = operationState
3863 if detailed_status:
3864 new_op["detailed-status"] = detailed_status
3865 if operationType:
3866 new_op["lcmOperationType"] = operationType
3867 if RO_nsr_id:
3868 new_op["RO_nsr_id"] = RO_nsr_id
3869 if RO_scaling_info:
3870 new_op["RO_scaling_info"] = RO_scaling_info
3871 if not op_list:
3872 # No existing operations, create key 'operations' with current operation as first list element
3873 db_nslcmop_admin.update({"operations": [new_op]})
3874 op_list = db_nslcmop_admin.get("operations")
3875 else:
3876 # Existing operations, append operation to list
3877 op_list.append(new_op)
3878
3879 db_nslcmop_update = {"_admin.operations": op_list}
3880 self.update_db_2("nslcmops", db_nslcmop["_id"], db_nslcmop_update)
3881 op_index = len(op_list) - 1
3882 return op_index
3883
3884 # Helper methods for scale() sub-operations
3885
3886 # pre-scale/post-scale:
3887 # Check for 3 different cases:
3888 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
3889 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
3890 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
3891 def _check_or_add_scale_suboperation(
3892 self,
3893 db_nslcmop,
3894 vnf_index,
3895 vnf_config_primitive,
3896 primitive_params,
3897 operationType,
3898 RO_nsr_id=None,
3899 RO_scaling_info=None,
3900 ):
3901 # Find this sub-operation
3902 if RO_nsr_id and RO_scaling_info:
3903 operationType = "SCALE-RO"
3904 match = {
3905 "member_vnf_index": vnf_index,
3906 "RO_nsr_id": RO_nsr_id,
3907 "RO_scaling_info": RO_scaling_info,
3908 }
3909 else:
3910 match = {
3911 "member_vnf_index": vnf_index,
3912 "primitive": vnf_config_primitive,
3913 "primitive_params": primitive_params,
3914 "lcmOperationType": operationType,
3915 }
3916 op_index = self._find_suboperation(db_nslcmop, match)
3917 if op_index == self.SUBOPERATION_STATUS_NOT_FOUND:
3918 # a. New sub-operation
3919 # The sub-operation does not exist, add it.
3920 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
3921 # The following parameters are set to None for all kind of scaling:
3922 vdu_id = None
3923 vdu_count_index = None
3924 vdu_name = None
3925 if RO_nsr_id and RO_scaling_info:
3926 vnf_config_primitive = None
3927 primitive_params = None
3928 else:
3929 RO_nsr_id = None
3930 RO_scaling_info = None
3931 # Initial status for sub-operation
3932 operationState = "PROCESSING"
3933 detailed_status = "In progress"
3934 # Add sub-operation for pre/post-scaling (zero or more operations)
3935 self._add_suboperation(
3936 db_nslcmop,
3937 vnf_index,
3938 vdu_id,
3939 vdu_count_index,
3940 vdu_name,
3941 vnf_config_primitive,
3942 primitive_params,
3943 operationState,
3944 detailed_status,
3945 operationType,
3946 RO_nsr_id,
3947 RO_scaling_info,
3948 )
3949 return self.SUBOPERATION_STATUS_NEW
3950 else:
3951 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
3952 # or op_index (operationState != 'COMPLETED')
3953 return self._retry_or_skip_suboperation(db_nslcmop, op_index)
3954
3955 # Function to return execution_environment id
3956
3957 def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list):
3958 # TODO vdu_index_count
3959 for vca in vca_deployed_list:
3960 if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id:
3961 return vca["ee_id"]
3962
    async def destroy_N2VC(
        self,
        logging_text,
        db_nslcmop,
        vca_deployed,
        config_descriptor,
        vca_index,
        destroy_ee=True,
        exec_primitives=True,
        scaling_in=False,
        vca_id: str = None,
    ):
        """
        Execute the terminate primitives and destroy the execution environment (if destroy_ee=True)
        :param logging_text: prefix added to every log message
        :param db_nslcmop: database record of the ongoing nslcmop (used to register sub-operations)
        :param vca_deployed: Dictionary of deployment info at db_nsr._admin.deployed.VCA.<INDEX>
        :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
        :param vca_index: index in the database _admin.deployed.VCA
        :param destroy_ee: False to not destroy here, because all of them will be destroyed at once later
        :param exec_primitives: False to not execute terminate primitives, because the config is not completed or has
            not executed properly
        :param scaling_in: True destroys the application, False destroys the model
        :param vca_id: VCA (juju controller) identifier, if any
        :return: None or exception
        """

        self.logger.debug(
            logging_text
            + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
                vca_index, vca_deployed, config_descriptor, destroy_ee
            )
        )

        # default vca type kept for backward compatibility with old records
        vca_type = vca_deployed.get("type", "lxc_proxy_charm")

        # execute terminate_primitives
        if exec_primitives:
            terminate_primitives = get_ee_sorted_terminate_config_primitive_list(
                config_descriptor.get("terminate-config-primitive"),
                vca_deployed.get("ee_descriptor_id"),
            )
            vdu_id = vca_deployed.get("vdu_id")
            vdu_count_index = vca_deployed.get("vdu_count_index")
            vdu_name = vca_deployed.get("vdu_name")
            vnf_index = vca_deployed.get("member-vnf-index")
            if terminate_primitives and vca_deployed.get("needed_terminate"):
                for seq in terminate_primitives:
                    # For each sequence in list, get primitive and call _ns_execute_primitive()
                    step = "Calling terminate action for vnf_member_index={} primitive={}".format(
                        vnf_index, seq.get("name")
                    )
                    self.logger.debug(logging_text + step)
                    # Create the primitive for each sequence, i.e. "primitive": "touch"
                    primitive = seq.get("name")
                    mapped_primitive_params = self._get_terminate_primitive_params(
                        seq, vnf_index
                    )

                    # Add sub-operation
                    self._add_suboperation(
                        db_nslcmop,
                        vnf_index,
                        vdu_id,
                        vdu_count_index,
                        vdu_name,
                        primitive,
                        mapped_primitive_params,
                    )
                    # Sub-operations: Call _ns_execute_primitive() instead of action()
                    try:
                        result, result_detail = await self._ns_execute_primitive(
                            vca_deployed["ee_id"],
                            primitive,
                            mapped_primitive_params,
                            vca_type=vca_type,
                            vca_id=vca_id,
                        )
                    except LcmException:
                        # this happens when VCA is not deployed. In this case it is not needed to terminate
                        continue
                    result_ok = ["COMPLETED", "PARTIALLY_COMPLETED"]
                    if result not in result_ok:
                        raise LcmException(
                            "terminate_primitive {} for vnf_member_index={} fails with "
                            "error {}".format(seq.get("name"), vnf_index, result_detail)
                        )
                # mark in the database that this VCA no longer needs terminate primitives
                db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(
                    vca_index
                )
                self.update_db_2(
                    "nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False}
                )

        # Delete Prometheus Jobs if any
        # This uses NSR_ID, so it will destroy any jobs under this index
        self.db.del_list("prometheus_jobs", {"nsr_id": db_nslcmop["nsInstanceId"]})

        if destroy_ee:
            await self.vca_map[vca_type].delete_execution_environment(
                vca_deployed["ee_id"],
                scaling_in=scaling_in,
                vca_type=vca_type,
                vca_id=vca_id,
            )
4068
4069 async def _delete_all_N2VC(self, db_nsr: dict, vca_id: str = None):
4070 self._write_all_config_status(db_nsr=db_nsr, status="TERMINATING")
4071 namespace = "." + db_nsr["_id"]
4072 try:
4073 await self.n2vc.delete_namespace(
4074 namespace=namespace,
4075 total_timeout=self.timeout_charm_delete,
4076 vca_id=vca_id,
4077 )
4078 except N2VCNotFound: # already deleted. Skip
4079 pass
4080 self._write_all_config_status(db_nsr=db_nsr, status="DELETED")
4081
    async def _terminate_RO(
        self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
    ):
        """
        Terminates a deployment from RO
        :param logging_text: prefix added to every log message
        :param nsr_deployed: db_nsr._admin.deployed
        :param nsr_id: NS record id
        :param nslcmop_id: operation id, used to report progress
        :param stage: list of string with the content to write on db_nslcmop.detailed-status.
        this method will update only the index 2, but it will write on database the concatenated content of the list
        :return: None. Raises LcmException when any RO deletion fails
        """
        db_nsr_update = {}
        failed_detail = []
        ro_nsr_id = ro_delete_action = None
        if nsr_deployed and nsr_deployed.get("RO"):
            ro_nsr_id = nsr_deployed["RO"].get("nsr_id")
            ro_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
        try:
            if ro_nsr_id:
                stage[2] = "Deleting ns from VIM."
                db_nsr_update["detailed-status"] = " ".join(stage)
                self._write_op_status(nslcmop_id, stage)
                self.logger.debug(logging_text + stage[2])
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_op_status(nslcmop_id, stage)
                desc = await self.RO.delete("ns", ro_nsr_id)
                ro_delete_action = desc["action_id"]
                db_nsr_update[
                    "_admin.deployed.RO.nsr_delete_action_id"
                ] = ro_delete_action
                db_nsr_update["_admin.deployed.RO.nsr_id"] = None
                db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
            if ro_delete_action:
                # wait until NS is deleted from VIM
                stage[2] = "Waiting ns deleted from VIM."
                detailed_status_old = None
                self.logger.debug(
                    logging_text
                    + stage[2]
                    + " RO_id={} ro_delete_action={}".format(
                        ro_nsr_id, ro_delete_action
                    )
                )
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_op_status(nslcmop_id, stage)

                delete_timeout = 20 * 60  # 20 minutes
                # poll RO every 5 seconds until the delete action finishes or times out
                while delete_timeout > 0:
                    desc = await self.RO.show(
                        "ns",
                        item_id_name=ro_nsr_id,
                        extra_item="action",
                        extra_item_id=ro_delete_action,
                    )

                    # deploymentStatus
                    self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)

                    ns_status, ns_status_info = self.RO.check_action_status(desc)
                    if ns_status == "ERROR":
                        raise ROclient.ROClientException(ns_status_info)
                    elif ns_status == "BUILD":
                        stage[2] = "Deleting from VIM {}".format(ns_status_info)
                    elif ns_status == "ACTIVE":
                        db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
                        db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                        break
                    else:
                        assert (
                            False
                        ), "ROclient.check_action_status returns unknown {}".format(
                            ns_status
                        )
                    # write progress to DB only when the status text changed
                    if stage[2] != detailed_status_old:
                        detailed_status_old = stage[2]
                        db_nsr_update["detailed-status"] = " ".join(stage)
                        self._write_op_status(nslcmop_id, stage)
                        self.update_db_2("nsrs", nsr_id, db_nsr_update)
                    await asyncio.sleep(5, loop=self.loop)
                    delete_timeout -= 5
                else:  # delete_timeout <= 0:
                    raise ROclient.ROClientException(
                        "Timeout waiting ns deleted from VIM"
                    )

        except Exception as e:
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            if (
                isinstance(e, ROclient.ROClientException) and e.http_code == 404
            ):  # not found
                db_nsr_update["_admin.deployed.RO.nsr_id"] = None
                db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
                self.logger.debug(
                    logging_text + "RO_ns_id={} already deleted".format(ro_nsr_id)
                )
            elif (
                isinstance(e, ROclient.ROClientException) and e.http_code == 409
            ):  # conflict
                failed_detail.append("delete conflict: {}".format(e))
                self.logger.debug(
                    logging_text
                    + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id, e)
                )
            else:
                failed_detail.append("delete error: {}".format(e))
                self.logger.error(
                    logging_text + "RO_ns_id={} delete error: {}".format(ro_nsr_id, e)
                )

        # Delete nsd (only attempted when the ns deletion above succeeded)
        if not failed_detail and deep_get(nsr_deployed, ("RO", "nsd_id")):
            ro_nsd_id = nsr_deployed["RO"]["nsd_id"]
            try:
                stage[2] = "Deleting nsd from RO."
                db_nsr_update["detailed-status"] = " ".join(stage)
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_op_status(nslcmop_id, stage)
                await self.RO.delete("nsd", ro_nsd_id)
                self.logger.debug(
                    logging_text + "ro_nsd_id={} deleted".format(ro_nsd_id)
                )
                db_nsr_update["_admin.deployed.RO.nsd_id"] = None
            except Exception as e:
                if (
                    isinstance(e, ROclient.ROClientException) and e.http_code == 404
                ):  # not found
                    db_nsr_update["_admin.deployed.RO.nsd_id"] = None
                    self.logger.debug(
                        logging_text + "ro_nsd_id={} already deleted".format(ro_nsd_id)
                    )
                elif (
                    isinstance(e, ROclient.ROClientException) and e.http_code == 409
                ):  # conflict
                    failed_detail.append(
                        "ro_nsd_id={} delete conflict: {}".format(ro_nsd_id, e)
                    )
                    self.logger.debug(logging_text + failed_detail[-1])
                else:
                    failed_detail.append(
                        "ro_nsd_id={} delete error: {}".format(ro_nsd_id, e)
                    )
                    self.logger.error(logging_text + failed_detail[-1])

        # Delete the vnfds registered in RO for this NS
        if not failed_detail and deep_get(nsr_deployed, ("RO", "vnfd")):
            for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
                if not vnf_deployed or not vnf_deployed["id"]:
                    continue
                try:
                    ro_vnfd_id = vnf_deployed["id"]
                    stage[
                        2
                    ] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
                        vnf_deployed["member-vnf-index"], ro_vnfd_id
                    )
                    db_nsr_update["detailed-status"] = " ".join(stage)
                    self.update_db_2("nsrs", nsr_id, db_nsr_update)
                    self._write_op_status(nslcmop_id, stage)
                    await self.RO.delete("vnfd", ro_vnfd_id)
                    self.logger.debug(
                        logging_text + "ro_vnfd_id={} deleted".format(ro_vnfd_id)
                    )
                    db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
                except Exception as e:
                    if (
                        isinstance(e, ROclient.ROClientException) and e.http_code == 404
                    ):  # not found
                        db_nsr_update[
                            "_admin.deployed.RO.vnfd.{}.id".format(index)
                        ] = None
                        self.logger.debug(
                            logging_text
                            + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id)
                        )
                    elif (
                        isinstance(e, ROclient.ROClientException) and e.http_code == 409
                    ):  # conflict
                        failed_detail.append(
                            "ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id, e)
                        )
                        self.logger.debug(logging_text + failed_detail[-1])
                    else:
                        failed_detail.append(
                            "ro_vnfd_id={} delete error: {}".format(ro_vnfd_id, e)
                        )
                        self.logger.error(logging_text + failed_detail[-1])

        if failed_detail:
            stage[2] = "Error deleting from VIM"
        else:
            stage[2] = "Deleted from VIM"
        db_nsr_update["detailed-status"] = " ".join(stage)
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        self._write_op_status(nslcmop_id, stage)

        if failed_detail:
            raise LcmException("; ".join(failed_detail))
4281
    async def terminate(self, nsr_id, nslcmop_id):
        """Terminate a NS instance: run terminate primitives, delete all
        execution environments and KDU instances, and remove the deployment
        from RO/VIM. Final status is written to the database and notified via
        kafka at the end.
        :param nsr_id: NS record id to terminate
        :param nslcmop_id: NS LCM operation id driving this termination
        :return: None
        """
        # Try to lock HA task here
        task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
        if not task_is_locked_by_me:
            return

        logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
        self.logger.debug(logging_text + "Enter")
        timeout_ns_terminate = self.timeout_ns_terminate
        db_nsr = None
        db_nslcmop = None
        operation_params = None
        exc = None
        error_list = []  # annotates all failed error messages
        db_nslcmop_update = {}
        autoremove = False  # autoremove after terminated
        tasks_dict_info = {}
        db_nsr_update = {}
        stage = [
            "Stage 1/3: Preparing task.",
            "Waiting for previous operations to terminate.",
            "",
        ]
        # ^ contains [stage, step, VIM-status]
        try:
            # wait for any previous tasks in process
            await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)

            stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
            db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
            operation_params = db_nslcmop.get("operationParams") or {}
            if operation_params.get("timeout_ns_terminate"):
                timeout_ns_terminate = operation_params["timeout_ns_terminate"]
            stage[1] = "Getting nsr={} from db.".format(nsr_id)
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

            db_nsr_update["operational-status"] = "terminating"
            db_nsr_update["config-status"] = "terminating"
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state="TERMINATING",
                current_operation="TERMINATING",
                current_operation_id=nslcmop_id,
                other_update=db_nsr_update,
            )
            self._write_op_status(op_id=nslcmop_id, queuePosition=0, stage=stage)
            nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed")) or {}
            # nothing to undeploy; final status is written in the finally block
            if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
                return

            stage[1] = "Getting vnf descriptors from db."
            db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
            db_vnfrs_dict = {
                db_vnfr["member-vnf-index-ref"]: db_vnfr for db_vnfr in db_vnfrs_list
            }
            db_vnfds_from_id = {}
            db_vnfds_from_member_index = {}
            # Loop over VNFRs, fetching each distinct VNFD only once
            for vnfr in db_vnfrs_list:
                vnfd_id = vnfr["vnfd-id"]
                if vnfd_id not in db_vnfds_from_id:
                    vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
                    db_vnfds_from_id[vnfd_id] = vnfd
                db_vnfds_from_member_index[
                    vnfr["member-vnf-index-ref"]
                ] = db_vnfds_from_id[vnfd_id]

            # Destroy individual execution environments when there are terminating primitives.
            # Rest of EE will be deleted at once
            # TODO - check before calling _destroy_N2VC
            # if not operation_params.get("skip_terminate_primitives"):#
            # or not vca.get("needed_terminate"):
            stage[0] = "Stage 2/3 execute terminating primitives."
            self.logger.debug(logging_text + stage[0])
            stage[1] = "Looking execution environment that needs terminate."
            self.logger.debug(logging_text + stage[1])

            for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")):
                config_descriptor = None
                vca_member_vnf_index = vca.get("member-vnf-index")
                vca_id = self.get_vca_id(
                    db_vnfrs_dict.get(vca_member_vnf_index)
                    if vca_member_vnf_index
                    else None,
                    db_nsr,
                )
                if not vca or not vca.get("ee_id"):
                    continue
                # pick the configuration descriptor matching this VCA scope
                if not vca.get("member-vnf-index"):
                    # ns
                    config_descriptor = db_nsr.get("ns-configuration")
                elif vca.get("vdu_id"):
                    db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                    config_descriptor = get_configuration(db_vnfd, vca.get("vdu_id"))
                elif vca.get("kdu_name"):
                    db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                    config_descriptor = get_configuration(db_vnfd, vca.get("kdu_name"))
                else:
                    db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                    config_descriptor = get_configuration(db_vnfd, db_vnfd["id"])
                vca_type = vca.get("type")
                exec_terminate_primitives = not operation_params.get(
                    "skip_terminate_primitives"
                ) and vca.get("needed_terminate")
                # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
                # pending native charms
                destroy_ee = (
                    True if vca_type in ("helm", "helm-v3", "native_charm") else False
                )
                # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
                #     vca_index, vca.get("ee_id"), vca_type, destroy_ee))
                task = asyncio.ensure_future(
                    self.destroy_N2VC(
                        logging_text,
                        db_nslcmop,
                        vca,
                        config_descriptor,
                        vca_index,
                        destroy_ee,
                        exec_terminate_primitives,
                        vca_id=vca_id,
                    )
                )
                tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))

            # wait for pending tasks of terminate primitives
            if tasks_dict_info:
                self.logger.debug(
                    logging_text
                    + "Waiting for tasks {}".format(list(tasks_dict_info.keys()))
                )
                error_list = await self._wait_for_tasks(
                    logging_text,
                    tasks_dict_info,
                    min(self.timeout_charm_delete, timeout_ns_terminate),
                    stage,
                    nslcmop_id,
                )
                tasks_dict_info.clear()
                if error_list:
                    return  # raise LcmException("; ".join(error_list))

            # remove All execution environments at once
            stage[0] = "Stage 3/3 delete all."

            if nsr_deployed.get("VCA"):
                stage[1] = "Deleting all execution environments."
                self.logger.debug(logging_text + stage[1])
                vca_id = self.get_vca_id({}, db_nsr)
                task_delete_ee = asyncio.ensure_future(
                    asyncio.wait_for(
                        self._delete_all_N2VC(db_nsr=db_nsr, vca_id=vca_id),
                        timeout=self.timeout_charm_delete,
                    )
                )
                # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
                tasks_dict_info[task_delete_ee] = "Terminating all VCA"

            # Delete from k8scluster
            stage[1] = "Deleting KDUs."
            self.logger.debug(logging_text + stage[1])
            # print(nsr_deployed)
            for kdu in get_iterable(nsr_deployed, "K8s"):
                if not kdu or not kdu.get("kdu-instance"):
                    continue
                kdu_instance = kdu.get("kdu-instance")
                if kdu.get("k8scluster-type") in self.k8scluster_map:
                    # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
                    vca_id = self.get_vca_id({}, db_nsr)
                    task_delete_kdu_instance = asyncio.ensure_future(
                        self.k8scluster_map[kdu["k8scluster-type"]].uninstall(
                            cluster_uuid=kdu.get("k8scluster-uuid"),
                            kdu_instance=kdu_instance,
                            vca_id=vca_id,
                        )
                    )
                else:
                    self.logger.error(
                        logging_text
                        + "Unknown k8s deployment type {}".format(
                            kdu.get("k8scluster-type")
                        )
                    )
                    continue
                tasks_dict_info[
                    task_delete_kdu_instance
                ] = "Terminating KDU '{}'".format(kdu.get("kdu-name"))

            # remove from RO
            stage[1] = "Deleting ns from VIM."
            if self.ng_ro:
                task_delete_ro = asyncio.ensure_future(
                    self._terminate_ng_ro(
                        logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
                    )
                )
            else:
                task_delete_ro = asyncio.ensure_future(
                    self._terminate_RO(
                        logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
                    )
                )
            tasks_dict_info[task_delete_ro] = "Removing deployment from VIM"

            # rest of staff will be done at finally

        except (
            ROclient.ROClientException,
            DbException,
            LcmException,
            N2VCException,
        ) as e:
            self.logger.error(logging_text + "Exit Exception {}".format(e))
            exc = e
        except asyncio.CancelledError:
            self.logger.error(
                logging_text + "Cancelled Exception while '{}'".format(stage[1])
            )
            exc = "Operation was cancelled"
        except Exception as e:
            exc = traceback.format_exc()
            self.logger.critical(
                logging_text + "Exit Exception while '{}': {}".format(stage[1], e),
                exc_info=True,
            )
        finally:
            if exc:
                error_list.append(str(exc))
            try:
                # wait for pending tasks
                if tasks_dict_info:
                    stage[1] = "Waiting for terminate pending tasks."
                    self.logger.debug(logging_text + stage[1])
                    error_list += await self._wait_for_tasks(
                        logging_text,
                        tasks_dict_info,
                        timeout_ns_terminate,
                        stage,
                        nslcmop_id,
                    )
                stage[1] = stage[2] = ""
            except asyncio.CancelledError:
                error_list.append("Cancelled")
                # TODO cancell all tasks
            except Exception as exc:
                error_list.append(str(exc))
            # update status at database
            if error_list:
                error_detail = "; ".join(error_list)
                # self.logger.error(logging_text + error_detail)
                error_description_nslcmop = "{} Detail: {}".format(
                    stage[0], error_detail
                )
                error_description_nsr = "Operation: TERMINATING.{}, {}.".format(
                    nslcmop_id, stage[0]
                )

                db_nsr_update["operational-status"] = "failed"
                db_nsr_update["detailed-status"] = (
                    error_description_nsr + " Detail: " + error_detail
                )
                db_nslcmop_update["detailed-status"] = error_detail
                nslcmop_operation_state = "FAILED"
                ns_state = "BROKEN"
            else:
                error_detail = None
                error_description_nsr = error_description_nslcmop = None
                ns_state = "NOT_INSTANTIATED"
                db_nsr_update["operational-status"] = "terminated"
                db_nsr_update["detailed-status"] = "Done"
                db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
                db_nslcmop_update["detailed-status"] = "Done"
                nslcmop_operation_state = "COMPLETED"

            if db_nsr:
                self._write_ns_status(
                    nsr_id=nsr_id,
                    ns_state=ns_state,
                    current_operation="IDLE",
                    current_operation_id=None,
                    error_description=error_description_nsr,
                    error_detail=error_detail,
                    other_update=db_nsr_update,
                )
                self._write_op_status(
                    op_id=nslcmop_id,
                    stage="",
                    error_message=error_description_nslcmop,
                    operation_state=nslcmop_operation_state,
                    other_update=db_nslcmop_update,
                )
                if ns_state == "NOT_INSTANTIATED":
                    try:
                        self.db.set_list(
                            "vnfrs",
                            {"nsr-id-ref": nsr_id},
                            {"_admin.nsState": "NOT_INSTANTIATED"},
                        )
                    except DbException as e:
                        self.logger.warn(
                            logging_text
                            + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
                                nsr_id, e
                            )
                        )
            if operation_params:
                autoremove = operation_params.get("autoremove", False)
            if nslcmop_operation_state:
                try:
                    await self.msg.aiowrite(
                        "ns",
                        "terminated",
                        {
                            "nsr_id": nsr_id,
                            "nslcmop_id": nslcmop_id,
                            "operationState": nslcmop_operation_state,
                            "autoremove": autoremove,
                        },
                        loop=self.loop,
                    )
                except Exception as e:
                    self.logger.error(
                        logging_text + "kafka_write notification Exception {}".format(e)
                    )

            self.logger.debug(logging_text + "Exit")
            self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
4609
    async def _wait_for_tasks(
        self, logging_text, created_tasks_info, timeout, stage, nslcmop_id, nsr_id=None
    ):
        """Wait for a group of asyncio tasks to finish, collecting errors.
        :param logging_text: prefix for log messages
        :param created_tasks_info: dict mapping each task to a human-readable description
        :param timeout: overall timeout in seconds for the whole group
        :param stage: 3-item status list; index 1 is updated here with progress/errors
        :param nslcmop_id: operation id used to report progress
        :param nsr_id: if provided, errors are also written to the nsr record
        :return: list of error-detail strings (empty when every task succeeded)
        """
        time_start = time()
        error_detail_list = []
        error_list = []
        pending_tasks = list(created_tasks_info.keys())
        num_tasks = len(pending_tasks)
        num_done = 0
        stage[1] = "{}/{}.".format(num_done, num_tasks)
        self._write_op_status(nslcmop_id, stage)
        while pending_tasks:
            new_error = None
            # remaining time of the global timeout
            _timeout = timeout + time_start - time()
            done, pending_tasks = await asyncio.wait(
                pending_tasks, timeout=_timeout, return_when=asyncio.FIRST_COMPLETED
            )
            num_done += len(done)
            if not done:  # Timeout
                for task in pending_tasks:
                    new_error = created_tasks_info[task] + ": Timeout"
                    error_detail_list.append(new_error)
                    error_list.append(new_error)
                break
            for task in done:
                if task.cancelled():
                    exc = "Cancelled"
                else:
                    exc = task.exception()
                if exc:
                    if isinstance(exc, asyncio.TimeoutError):
                        exc = "Timeout"
                    new_error = created_tasks_info[task] + ": {}".format(exc)
                    error_list.append(created_tasks_info[task])
                    error_detail_list.append(new_error)
                    # known/expected exception types are logged briefly; anything
                    # else gets a full traceback
                    if isinstance(
                        exc,
                        (
                            str,
                            DbException,
                            N2VCException,
                            ROclient.ROClientException,
                            LcmException,
                            K8sException,
                            NgRoException,
                        ),
                    ):
                        self.logger.error(logging_text + new_error)
                    else:
                        exc_traceback = "".join(
                            traceback.format_exception(None, exc, exc.__traceback__)
                        )
                        self.logger.error(
                            logging_text
                            + created_tasks_info[task]
                            + " "
                            + exc_traceback
                        )
                else:
                    self.logger.debug(
                        logging_text + created_tasks_info[task] + ": Done"
                    )
            stage[1] = "{}/{}.".format(num_done, num_tasks)
            if new_error:
                stage[1] += " Errors: " + ". ".join(error_detail_list) + "."
                if nsr_id:  # update also nsr
                    self.update_db_2(
                        "nsrs",
                        nsr_id,
                        {
                            "errorDescription": "Error at: " + ", ".join(error_list),
                            "errorDetail": ". ".join(error_detail_list),
                        },
                    )
            self._write_op_status(nslcmop_id, stage)
        return error_detail_list
4686
4687 @staticmethod
4688 def _map_primitive_params(primitive_desc, params, instantiation_params):
4689 """
4690 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
4691 The default-value is used. If it is between < > it look for a value at instantiation_params
4692 :param primitive_desc: portion of VNFD/NSD that describes primitive
4693 :param params: Params provided by user
4694 :param instantiation_params: Instantiation params provided by user
4695 :return: a dictionary with the calculated params
4696 """
4697 calculated_params = {}
4698 for parameter in primitive_desc.get("parameter", ()):
4699 param_name = parameter["name"]
4700 if param_name in params:
4701 calculated_params[param_name] = params[param_name]
4702 elif "default-value" in parameter or "value" in parameter:
4703 if "value" in parameter:
4704 calculated_params[param_name] = parameter["value"]
4705 else:
4706 calculated_params[param_name] = parameter["default-value"]
4707 if (
4708 isinstance(calculated_params[param_name], str)
4709 and calculated_params[param_name].startswith("<")
4710 and calculated_params[param_name].endswith(">")
4711 ):
4712 if calculated_params[param_name][1:-1] in instantiation_params:
4713 calculated_params[param_name] = instantiation_params[
4714 calculated_params[param_name][1:-1]
4715 ]
4716 else:
4717 raise LcmException(
4718 "Parameter {} needed to execute primitive {} not provided".format(
4719 calculated_params[param_name], primitive_desc["name"]
4720 )
4721 )
4722 else:
4723 raise LcmException(
4724 "Parameter {} needed to execute primitive {} not provided".format(
4725 param_name, primitive_desc["name"]
4726 )
4727 )
4728
4729 if isinstance(calculated_params[param_name], (dict, list, tuple)):
4730 calculated_params[param_name] = yaml.safe_dump(
4731 calculated_params[param_name], default_flow_style=True, width=256
4732 )
4733 elif isinstance(calculated_params[param_name], str) and calculated_params[
4734 param_name
4735 ].startswith("!!yaml "):
4736 calculated_params[param_name] = calculated_params[param_name][7:]
4737 if parameter.get("data-type") == "INTEGER":
4738 try:
4739 calculated_params[param_name] = int(calculated_params[param_name])
4740 except ValueError: # error converting string to int
4741 raise LcmException(
4742 "Parameter {} of primitive {} must be integer".format(
4743 param_name, primitive_desc["name"]
4744 )
4745 )
4746 elif parameter.get("data-type") == "BOOLEAN":
4747 calculated_params[param_name] = not (
4748 (str(calculated_params[param_name])).lower() == "false"
4749 )
4750
4751 # add always ns_config_info if primitive name is config
4752 if primitive_desc["name"] == "config":
4753 if "ns_config_info" in instantiation_params:
4754 calculated_params["ns_config_info"] = instantiation_params[
4755 "ns_config_info"
4756 ]
4757 return calculated_params
4758
4759 def _look_for_deployed_vca(
4760 self,
4761 deployed_vca,
4762 member_vnf_index,
4763 vdu_id,
4764 vdu_count_index,
4765 kdu_name=None,
4766 ee_descriptor_id=None,
4767 ):
4768 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
4769 for vca in deployed_vca:
4770 if not vca:
4771 continue
4772 if member_vnf_index != vca["member-vnf-index"] or vdu_id != vca["vdu_id"]:
4773 continue
4774 if (
4775 vdu_count_index is not None
4776 and vdu_count_index != vca["vdu_count_index"]
4777 ):
4778 continue
4779 if kdu_name and kdu_name != vca["kdu_name"]:
4780 continue
4781 if ee_descriptor_id and ee_descriptor_id != vca["ee_descriptor_id"]:
4782 continue
4783 break
4784 else:
4785 # vca_deployed not found
4786 raise LcmException(
4787 "charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
4788 " is not deployed".format(
4789 member_vnf_index,
4790 vdu_id,
4791 vdu_count_index,
4792 kdu_name,
4793 ee_descriptor_id,
4794 )
4795 )
4796 # get ee_id
4797 ee_id = vca.get("ee_id")
4798 vca_type = vca.get(
4799 "type", "lxc_proxy_charm"
4800 ) # default value for backward compatibility - proxy charm
4801 if not ee_id:
4802 raise LcmException(
4803 "charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
4804 "execution environment".format(
4805 member_vnf_index, vdu_id, kdu_name, vdu_count_index
4806 )
4807 )
4808 return ee_id, vca_type
4809
    async def _ns_execute_primitive(
        self,
        ee_id,
        primitive,
        primitive_params,
        retries=0,
        retries_interval=30,
        timeout=None,
        vca_type=None,
        db_dict=None,
        vca_id: str = None,
    ) -> (str, str):
        """Execute a primitive on the given execution environment, retrying on failure.
        :param ee_id: execution environment id
        :param primitive: primitive (action) name; "config" wraps the params in {"params": ...}
        :param primitive_params: parameters for the primitive
        :param retries: number of extra attempts after a failed execution
        :param retries_interval: seconds to wait between attempts
        :param timeout: per-attempt timeout; defaults to self.timeout_primitive
        :param vca_type: key of self.vca_map; defaults to "lxc_proxy_charm"
        :param db_dict: database info where to write execution progress, if any
        :param vca_id: VCA (juju controller) identifier, if any
        :return: tuple (status, detail); status is "COMPLETED", "FAILED" or "FAIL"
        """
        try:
            if primitive == "config":
                primitive_params = {"params": primitive_params}

            vca_type = vca_type or "lxc_proxy_charm"

            while retries >= 0:
                try:
                    output = await asyncio.wait_for(
                        self.vca_map[vca_type].exec_primitive(
                            ee_id=ee_id,
                            primitive_name=primitive,
                            params_dict=primitive_params,
                            progress_timeout=self.timeout_progress_primitive,
                            total_timeout=self.timeout_primitive,
                            db_dict=db_dict,
                            vca_id=vca_id,
                            vca_type=vca_type,
                        ),
                        timeout=timeout or self.timeout_primitive,
                    )
                    # execution was OK
                    break
                except asyncio.CancelledError:
                    raise
                except Exception as e:  # asyncio.TimeoutError
                    if isinstance(e, asyncio.TimeoutError):
                        e = "Timeout"
                    retries -= 1
                    if retries >= 0:
                        self.logger.debug(
                            "Error executing action {} on {} -> {}".format(
                                primitive, ee_id, e
                            )
                        )
                        # wait and retry
                        await asyncio.sleep(retries_interval, loop=self.loop)
                    else:
                        # retries exhausted
                        return "FAILED", str(e)

            return "COMPLETED", output

        except (LcmException, asyncio.CancelledError):
            raise
        except Exception as e:
            return "FAIL", "Error executing action {}: {}".format(primitive, e)
4868
4869 async def vca_status_refresh(self, nsr_id, nslcmop_id):
4870 """
4871 Updating the vca_status with latest juju information in nsrs record
4872 :param: nsr_id: Id of the nsr
4873 :param: nslcmop_id: Id of the nslcmop
4874 :return: None
4875 """
4876
4877 self.logger.debug("Task ns={} action={} Enter".format(nsr_id, nslcmop_id))
4878 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
4879 vca_id = self.get_vca_id({}, db_nsr)
4880 if db_nsr["_admin"]["deployed"]["K8s"]:
4881 for _, k8s in enumerate(db_nsr["_admin"]["deployed"]["K8s"]):
4882 cluster_uuid, kdu_instance, cluster_type = (
4883 k8s["k8scluster-uuid"],
4884 k8s["kdu-instance"],
4885 k8s["k8scluster-type"],
4886 )
4887 await self._on_update_k8s_db(
4888 cluster_uuid=cluster_uuid,
4889 kdu_instance=kdu_instance,
4890 filter={"_id": nsr_id},
4891 vca_id=vca_id,
4892 cluster_type=cluster_type,
4893 )
4894 else:
4895 for vca_index, _ in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
4896 table, filter = "nsrs", {"_id": nsr_id}
4897 path = "_admin.deployed.VCA.{}.".format(vca_index)
4898 await self._on_update_n2vc_db(table, filter, path, {})
4899
4900 self.logger.debug("Task ns={} action={} Exit".format(nsr_id, nslcmop_id))
4901 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_vca_status_refresh")
4902
4903 async def action(self, nsr_id, nslcmop_id):
4904 # Try to lock HA task here
4905 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
4906 if not task_is_locked_by_me:
4907 return
4908
4909 logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
4910 self.logger.debug(logging_text + "Enter")
4911 # get all needed from database
4912 db_nsr = None
4913 db_nslcmop = None
4914 db_nsr_update = {}
4915 db_nslcmop_update = {}
4916 nslcmop_operation_state = None
4917 error_description_nslcmop = None
4918 exc = None
4919 try:
4920 # wait for any previous tasks in process
4921 step = "Waiting for previous operations to terminate"
4922 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
4923
4924 self._write_ns_status(
4925 nsr_id=nsr_id,
4926 ns_state=None,
4927 current_operation="RUNNING ACTION",
4928 current_operation_id=nslcmop_id,
4929 )
4930
4931 step = "Getting information from database"
4932 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
4933 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
4934 if db_nslcmop["operationParams"].get("primitive_params"):
4935 db_nslcmop["operationParams"]["primitive_params"] = json.loads(
4936 db_nslcmop["operationParams"]["primitive_params"]
4937 )
4938
4939 nsr_deployed = db_nsr["_admin"].get("deployed")
4940 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
4941 vdu_id = db_nslcmop["operationParams"].get("vdu_id")
4942 kdu_name = db_nslcmop["operationParams"].get("kdu_name")
4943 vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
4944 primitive = db_nslcmop["operationParams"]["primitive"]
4945 primitive_params = db_nslcmop["operationParams"]["primitive_params"]
4946 timeout_ns_action = db_nslcmop["operationParams"].get(
4947 "timeout_ns_action", self.timeout_primitive
4948 )
4949
4950 if vnf_index:
4951 step = "Getting vnfr from database"
4952 db_vnfr = self.db.get_one(
4953 "vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}
4954 )
4955 if db_vnfr.get("kdur"):
4956 kdur_list = []
4957 for kdur in db_vnfr["kdur"]:
4958 if kdur.get("additionalParams"):
4959 kdur["additionalParams"] = json.loads(
4960 kdur["additionalParams"]
4961 )
4962 kdur_list.append(kdur)
4963 db_vnfr["kdur"] = kdur_list
4964 step = "Getting vnfd from database"
4965 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
4966
4967 # Sync filesystem before running a primitive
4968 self.fs.sync(db_vnfr["vnfd-id"])
4969 else:
4970 step = "Getting nsd from database"
4971 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
4972
4973 vca_id = self.get_vca_id(db_vnfr, db_nsr)
4974 # for backward compatibility
4975 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
4976 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
4977 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
4978 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4979
4980 # look for primitive
4981 config_primitive_desc = descriptor_configuration = None
4982 if vdu_id:
4983 descriptor_configuration = get_configuration(db_vnfd, vdu_id)
4984 elif kdu_name:
4985 descriptor_configuration = get_configuration(db_vnfd, kdu_name)
4986 elif vnf_index:
4987 descriptor_configuration = get_configuration(db_vnfd, db_vnfd["id"])
4988 else:
4989 descriptor_configuration = db_nsd.get("ns-configuration")
4990
4991 if descriptor_configuration and descriptor_configuration.get(
4992 "config-primitive"
4993 ):
4994 for config_primitive in descriptor_configuration["config-primitive"]:
4995 if config_primitive["name"] == primitive:
4996 config_primitive_desc = config_primitive
4997 break
4998
4999 if not config_primitive_desc:
5000 if not (kdu_name and primitive in ("upgrade", "rollback", "status")):
5001 raise LcmException(
5002 "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
5003 primitive
5004 )
5005 )
5006 primitive_name = primitive
5007 ee_descriptor_id = None
5008 else:
5009 primitive_name = config_primitive_desc.get(
5010 "execution-environment-primitive", primitive
5011 )
5012 ee_descriptor_id = config_primitive_desc.get(
5013 "execution-environment-ref"
5014 )
5015
5016 if vnf_index:
5017 if vdu_id:
5018 vdur = next(
5019 (x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None
5020 )
5021 desc_params = parse_yaml_strings(vdur.get("additionalParams"))
5022 elif kdu_name:
5023 kdur = next(
5024 (x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None
5025 )
5026 desc_params = parse_yaml_strings(kdur.get("additionalParams"))
5027 else:
5028 desc_params = parse_yaml_strings(
5029 db_vnfr.get("additionalParamsForVnf")
5030 )
5031 else:
5032 desc_params = parse_yaml_strings(db_nsr.get("additionalParamsForNs"))
5033 if kdu_name and get_configuration(db_vnfd, kdu_name):
5034 kdu_configuration = get_configuration(db_vnfd, kdu_name)
5035 actions = set()
5036 for primitive in kdu_configuration.get("initial-config-primitive", []):
5037 actions.add(primitive["name"])
5038 for primitive in kdu_configuration.get("config-primitive", []):
5039 actions.add(primitive["name"])
5040 kdu_action = True if primitive_name in actions else False
5041
5042 # TODO check if ns is in a proper status
5043 if kdu_name and (
5044 primitive_name in ("upgrade", "rollback", "status") or kdu_action
5045 ):
5046 # kdur and desc_params already set from before
5047 if primitive_params:
5048 desc_params.update(primitive_params)
5049 # TODO Check if we will need something at vnf level
5050 for index, kdu in enumerate(get_iterable(nsr_deployed, "K8s")):
5051 if (
5052 kdu_name == kdu["kdu-name"]
5053 and kdu["member-vnf-index"] == vnf_index
5054 ):
5055 break
5056 else:
5057 raise LcmException(
5058 "KDU '{}' for vnf '{}' not deployed".format(kdu_name, vnf_index)
5059 )
5060
5061 if kdu.get("k8scluster-type") not in self.k8scluster_map:
5062 msg = "unknown k8scluster-type '{}'".format(
5063 kdu.get("k8scluster-type")
5064 )
5065 raise LcmException(msg)
5066
5067 db_dict = {
5068 "collection": "nsrs",
5069 "filter": {"_id": nsr_id},
5070 "path": "_admin.deployed.K8s.{}".format(index),
5071 }
5072 self.logger.debug(
5073 logging_text
5074 + "Exec k8s {} on {}.{}".format(primitive_name, vnf_index, kdu_name)
5075 )
5076 step = "Executing kdu {}".format(primitive_name)
5077 if primitive_name == "upgrade":
5078 if desc_params.get("kdu_model"):
5079 kdu_model = desc_params.get("kdu_model")
5080 del desc_params["kdu_model"]
5081 else:
5082 kdu_model = kdu.get("kdu-model")
5083 parts = kdu_model.split(sep=":")
5084 if len(parts) == 2:
5085 kdu_model = parts[0]
5086
5087 detailed_status = await asyncio.wait_for(
5088 self.k8scluster_map[kdu["k8scluster-type"]].upgrade(
5089 cluster_uuid=kdu.get("k8scluster-uuid"),
5090 kdu_instance=kdu.get("kdu-instance"),
5091 atomic=True,
5092 kdu_model=kdu_model,
5093 params=desc_params,
5094 db_dict=db_dict,
5095 timeout=timeout_ns_action,
5096 ),
5097 timeout=timeout_ns_action + 10,
5098 )
5099 self.logger.debug(
5100 logging_text + " Upgrade of kdu {} done".format(detailed_status)
5101 )
5102 elif primitive_name == "rollback":
5103 detailed_status = await asyncio.wait_for(
5104 self.k8scluster_map[kdu["k8scluster-type"]].rollback(
5105 cluster_uuid=kdu.get("k8scluster-uuid"),
5106 kdu_instance=kdu.get("kdu-instance"),
5107 db_dict=db_dict,
5108 ),
5109 timeout=timeout_ns_action,
5110 )
5111 elif primitive_name == "status":
5112 detailed_status = await asyncio.wait_for(
5113 self.k8scluster_map[kdu["k8scluster-type"]].status_kdu(
5114 cluster_uuid=kdu.get("k8scluster-uuid"),
5115 kdu_instance=kdu.get("kdu-instance"),
5116 vca_id=vca_id,
5117 ),
5118 timeout=timeout_ns_action,
5119 )
5120 else:
5121 kdu_instance = kdu.get("kdu-instance") or "{}-{}".format(
5122 kdu["kdu-name"], nsr_id
5123 )
5124 params = self._map_primitive_params(
5125 config_primitive_desc, primitive_params, desc_params
5126 )
5127
5128 detailed_status = await asyncio.wait_for(
5129 self.k8scluster_map[kdu["k8scluster-type"]].exec_primitive(
5130 cluster_uuid=kdu.get("k8scluster-uuid"),
5131 kdu_instance=kdu_instance,
5132 primitive_name=primitive_name,
5133 params=params,
5134 db_dict=db_dict,
5135 timeout=timeout_ns_action,
5136 vca_id=vca_id,
5137 ),
5138 timeout=timeout_ns_action,
5139 )
5140
5141 if detailed_status:
5142 nslcmop_operation_state = "COMPLETED"
5143 else:
5144 detailed_status = ""
5145 nslcmop_operation_state = "FAILED"
5146 else:
5147 ee_id, vca_type = self._look_for_deployed_vca(
5148 nsr_deployed["VCA"],
5149 member_vnf_index=vnf_index,
5150 vdu_id=vdu_id,
5151 vdu_count_index=vdu_count_index,
5152 ee_descriptor_id=ee_descriptor_id,
5153 )
5154 for vca_index, vca_deployed in enumerate(
5155 db_nsr["_admin"]["deployed"]["VCA"]
5156 ):
5157 if vca_deployed.get("member-vnf-index") == vnf_index:
5158 db_dict = {
5159 "collection": "nsrs",
5160 "filter": {"_id": nsr_id},
5161 "path": "_admin.deployed.VCA.{}.".format(vca_index),
5162 }
5163 break
5164 (
5165 nslcmop_operation_state,
5166 detailed_status,
5167 ) = await self._ns_execute_primitive(
5168 ee_id,
5169 primitive=primitive_name,
5170 primitive_params=self._map_primitive_params(
5171 config_primitive_desc, primitive_params, desc_params
5172 ),
5173 timeout=timeout_ns_action,
5174 vca_type=vca_type,
5175 db_dict=db_dict,
5176 vca_id=vca_id,
5177 )
5178
5179 db_nslcmop_update["detailed-status"] = detailed_status
5180 error_description_nslcmop = (
5181 detailed_status if nslcmop_operation_state == "FAILED" else ""
5182 )
5183 self.logger.debug(
5184 logging_text
5185 + " task Done with result {} {}".format(
5186 nslcmop_operation_state, detailed_status
5187 )
5188 )
5189 return # database update is called inside finally
5190
5191 except (DbException, LcmException, N2VCException, K8sException) as e:
5192 self.logger.error(logging_text + "Exit Exception {}".format(e))
5193 exc = e
5194 except asyncio.CancelledError:
5195 self.logger.error(
5196 logging_text + "Cancelled Exception while '{}'".format(step)
5197 )
5198 exc = "Operation was cancelled"
5199 except asyncio.TimeoutError:
5200 self.logger.error(logging_text + "Timeout while '{}'".format(step))
5201 exc = "Timeout"
5202 except Exception as e:
5203 exc = traceback.format_exc()
5204 self.logger.critical(
5205 logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
5206 exc_info=True,
5207 )
5208 finally:
5209 if exc:
5210 db_nslcmop_update[
5211 "detailed-status"
5212 ] = (
5213 detailed_status
5214 ) = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
5215 nslcmop_operation_state = "FAILED"
5216 if db_nsr:
5217 self._write_ns_status(
5218 nsr_id=nsr_id,
5219 ns_state=db_nsr[
5220 "nsState"
5221 ], # TODO check if degraded. For the moment use previous status
5222 current_operation="IDLE",
5223 current_operation_id=None,
5224 # error_description=error_description_nsr,
5225 # error_detail=error_detail,
5226 other_update=db_nsr_update,
5227 )
5228
5229 self._write_op_status(
5230 op_id=nslcmop_id,
5231 stage="",
5232 error_message=error_description_nslcmop,
5233 operation_state=nslcmop_operation_state,
5234 other_update=db_nslcmop_update,
5235 )
5236
5237 if nslcmop_operation_state:
5238 try:
5239 await self.msg.aiowrite(
5240 "ns",
5241 "actioned",
5242 {
5243 "nsr_id": nsr_id,
5244 "nslcmop_id": nslcmop_id,
5245 "operationState": nslcmop_operation_state,
5246 },
5247 loop=self.loop,
5248 )
5249 except Exception as e:
5250 self.logger.error(
5251 logging_text + "kafka_write notification Exception {}".format(e)
5252 )
5253 self.logger.debug(logging_text + "Exit")
5254 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
5255 return nslcmop_operation_state, detailed_status
5256
5257 async def scale(self, nsr_id, nslcmop_id):
5258 # Try to lock HA task here
5259 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
5260 if not task_is_locked_by_me:
5261 return
5262
5263 logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
5264 stage = ["", "", ""]
5265 tasks_dict_info = {}
5266 # ^ stage, step, VIM progress
5267 self.logger.debug(logging_text + "Enter")
5268 # get all needed from database
5269 db_nsr = None
5270 db_nslcmop_update = {}
5271 db_nsr_update = {}
5272 exc = None
5273 # in case of error, indicates what part of scale was failed to put nsr at error status
5274 scale_process = None
5275 old_operational_status = ""
5276 old_config_status = ""
5277 nsi_id = None
5278 try:
5279 # wait for any previous tasks in process
5280 step = "Waiting for previous operations to terminate"
5281 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
5282 self._write_ns_status(
5283 nsr_id=nsr_id,
5284 ns_state=None,
5285 current_operation="SCALING",
5286 current_operation_id=nslcmop_id,
5287 )
5288
5289 step = "Getting nslcmop from database"
5290 self.logger.debug(
5291 step + " after having waited for previous tasks to be completed"
5292 )
5293 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5294
5295 step = "Getting nsr from database"
5296 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
5297 old_operational_status = db_nsr["operational-status"]
5298 old_config_status = db_nsr["config-status"]
5299
5300 step = "Parsing scaling parameters"
5301 db_nsr_update["operational-status"] = "scaling"
5302 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5303 nsr_deployed = db_nsr["_admin"].get("deployed")
5304
5305 vnf_index = db_nslcmop["operationParams"]["scaleVnfData"][
5306 "scaleByStepData"
5307 ]["member-vnf-index"]
5308 scaling_group = db_nslcmop["operationParams"]["scaleVnfData"][
5309 "scaleByStepData"
5310 ]["scaling-group-descriptor"]
5311 scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
5312 # for backward compatibility
5313 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
5314 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
5315 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
5316 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5317
5318 step = "Getting vnfr from database"
5319 db_vnfr = self.db.get_one(
5320 "vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}
5321 )
5322
5323 vca_id = self.get_vca_id(db_vnfr, db_nsr)
5324
5325 step = "Getting vnfd from database"
5326 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
5327
5328 base_folder = db_vnfd["_admin"]["storage"]
5329
5330 step = "Getting scaling-group-descriptor"
5331 scaling_descriptor = find_in_list(
5332 get_scaling_aspect(db_vnfd),
5333 lambda scale_desc: scale_desc["name"] == scaling_group,
5334 )
5335 if not scaling_descriptor:
5336 raise LcmException(
5337 "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
5338 "at vnfd:scaling-group-descriptor".format(scaling_group)
5339 )
5340
5341 step = "Sending scale order to VIM"
5342 # TODO check if ns is in a proper status
5343 nb_scale_op = 0
5344 if not db_nsr["_admin"].get("scaling-group"):
5345 self.update_db_2(
5346 "nsrs",
5347 nsr_id,
5348 {
5349 "_admin.scaling-group": [
5350 {"name": scaling_group, "nb-scale-op": 0}
5351 ]
5352 },
5353 )
5354 admin_scale_index = 0
5355 else:
5356 for admin_scale_index, admin_scale_info in enumerate(
5357 db_nsr["_admin"]["scaling-group"]
5358 ):
5359 if admin_scale_info["name"] == scaling_group:
5360 nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
5361 break
5362 else: # not found, set index one plus last element and add new entry with the name
5363 admin_scale_index += 1
5364 db_nsr_update[
5365 "_admin.scaling-group.{}.name".format(admin_scale_index)
5366 ] = scaling_group
5367
5368 vca_scaling_info = []
5369 scaling_info = {"scaling_group_name": scaling_group, "vdu": [], "kdu": []}
5370 if scaling_type == "SCALE_OUT":
5371 if "aspect-delta-details" not in scaling_descriptor:
5372 raise LcmException(
5373 "Aspect delta details not fount in scaling descriptor {}".format(
5374 scaling_descriptor["name"]
5375 )
5376 )
5377 # count if max-instance-count is reached
5378 deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
5379
5380 scaling_info["scaling_direction"] = "OUT"
5381 scaling_info["vdu-create"] = {}
5382 scaling_info["kdu-create"] = {}
5383 for delta in deltas:
5384 for vdu_delta in delta.get("vdu-delta", {}):
5385 vdud = get_vdu(db_vnfd, vdu_delta["id"])
5386 # vdu_index also provides the number of instance of the targeted vdu
5387 vdu_count = vdu_index = get_vdur_index(db_vnfr, vdu_delta)
5388 cloud_init_text = self._get_vdu_cloud_init_content(
5389 vdud, db_vnfd
5390 )
5391 if cloud_init_text:
5392 additional_params = (
5393 self._get_vdu_additional_params(db_vnfr, vdud["id"])
5394 or {}
5395 )
5396 cloud_init_list = []
5397
5398 vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
5399 max_instance_count = 10
5400 if vdu_profile and "max-number-of-instances" in vdu_profile:
5401 max_instance_count = vdu_profile.get(
5402 "max-number-of-instances", 10
5403 )
5404
5405 default_instance_num = get_number_of_instances(
5406 db_vnfd, vdud["id"]
5407 )
5408 instances_number = vdu_delta.get("number-of-instances", 1)
5409 nb_scale_op += instances_number
5410
5411 new_instance_count = nb_scale_op + default_instance_num
5412 # Control if new count is over max and vdu count is less than max.
5413 # Then assign new instance count
5414 if new_instance_count > max_instance_count > vdu_count:
5415 instances_number = new_instance_count - max_instance_count
5416 else:
5417 instances_number = instances_number
5418
5419 if new_instance_count > max_instance_count:
5420 raise LcmException(
5421 "reached the limit of {} (max-instance-count) "
5422 "scaling-out operations for the "
5423 "scaling-group-descriptor '{}'".format(
5424 nb_scale_op, scaling_group
5425 )
5426 )
5427 for x in range(vdu_delta.get("number-of-instances", 1)):
5428 if cloud_init_text:
5429 # TODO Information of its own ip is not available because db_vnfr is not updated.
5430 additional_params["OSM"] = get_osm_params(
5431 db_vnfr, vdu_delta["id"], vdu_index + x
5432 )
5433 cloud_init_list.append(
5434 self._parse_cloud_init(
5435 cloud_init_text,
5436 additional_params,
5437 db_vnfd["id"],
5438 vdud["id"],
5439 )
5440 )
5441 vca_scaling_info.append(
5442 {
5443 "osm_vdu_id": vdu_delta["id"],
5444 "member-vnf-index": vnf_index,
5445 "type": "create",
5446 "vdu_index": vdu_index + x,
5447 }
5448 )
5449 scaling_info["vdu-create"][vdu_delta["id"]] = instances_number
5450 for kdu_delta in delta.get("kdu-resource-delta", {}):
5451 kdu_profile = get_kdu_resource_profile(db_vnfd, kdu_delta["id"])
5452 kdu_name = kdu_profile["kdu-name"]
5453 resource_name = kdu_profile.get("resource-name", "")
5454
5455 # Might have different kdus in the same delta
5456 # Should have list for each kdu
5457 if not scaling_info["kdu-create"].get(kdu_name, None):
5458 scaling_info["kdu-create"][kdu_name] = []
5459
5460 kdur = get_kdur(db_vnfr, kdu_name)
5461 if kdur.get("helm-chart"):
5462 k8s_cluster_type = "helm-chart-v3"
5463 self.logger.debug("kdur: {}".format(kdur))
5464 if (
5465 kdur.get("helm-version")
5466 and kdur.get("helm-version") == "v2"
5467 ):
5468 k8s_cluster_type = "helm-chart"
5469 elif kdur.get("juju-bundle"):
5470 k8s_cluster_type = "juju-bundle"
5471 else:
5472 raise LcmException(
5473 "kdu type for kdu='{}.{}' is neither helm-chart nor "
5474 "juju-bundle. Maybe an old NBI version is running".format(
5475 db_vnfr["member-vnf-index-ref"], kdu_name
5476 )
5477 )
5478
5479 max_instance_count = 10
5480 if kdu_profile and "max-number-of-instances" in kdu_profile:
5481 max_instance_count = kdu_profile.get(
5482 "max-number-of-instances", 10
5483 )
5484
5485 nb_scale_op += kdu_delta.get("number-of-instances", 1)
5486 deployed_kdu, _ = get_deployed_kdu(
5487 nsr_deployed, kdu_name, vnf_index
5488 )
5489 if deployed_kdu is None:
5490 raise LcmException(
5491 "KDU '{}' for vnf '{}' not deployed".format(
5492 kdu_name, vnf_index
5493 )
5494 )
5495 kdu_instance = deployed_kdu.get("kdu-instance")
5496 instance_num = await self.k8scluster_map[
5497 k8s_cluster_type
5498 ].get_scale_count(
5499 resource_name,
5500 kdu_instance,
5501 vca_id=vca_id,
5502 cluster_uuid=deployed_kdu.get("k8scluster-uuid"),
5503 kdu_model=deployed_kdu.get("kdu-model"),
5504 )
5505 kdu_replica_count = instance_num + kdu_delta.get(
5506 "number-of-instances", 1
5507 )
5508
5509 # Control if new count is over max and instance_num is less than max.
5510 # Then assign max instance number to kdu replica count
5511 if kdu_replica_count > max_instance_count > instance_num:
5512 kdu_replica_count = max_instance_count
5513 if kdu_replica_count > max_instance_count:
5514 raise LcmException(
5515 "reached the limit of {} (max-instance-count) "
5516 "scaling-out operations for the "
5517 "scaling-group-descriptor '{}'".format(
5518 instance_num, scaling_group
5519 )
5520 )
5521
5522 for x in range(kdu_delta.get("number-of-instances", 1)):
5523 vca_scaling_info.append(
5524 {
5525 "osm_kdu_id": kdu_name,
5526 "member-vnf-index": vnf_index,
5527 "type": "create",
5528 "kdu_index": instance_num + x - 1,
5529 }
5530 )
5531 scaling_info["kdu-create"][kdu_name].append(
5532 {
5533 "member-vnf-index": vnf_index,
5534 "type": "create",
5535 "k8s-cluster-type": k8s_cluster_type,
5536 "resource-name": resource_name,
5537 "scale": kdu_replica_count,
5538 }
5539 )
5540 elif scaling_type == "SCALE_IN":
5541 deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
5542
5543 scaling_info["scaling_direction"] = "IN"
5544 scaling_info["vdu-delete"] = {}
5545 scaling_info["kdu-delete"] = {}
5546
5547 for delta in deltas:
5548 for vdu_delta in delta.get("vdu-delta", {}):
5549 vdu_count = vdu_index = get_vdur_index(db_vnfr, vdu_delta)
5550 min_instance_count = 0
5551 vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
5552 if vdu_profile and "min-number-of-instances" in vdu_profile:
5553 min_instance_count = vdu_profile["min-number-of-instances"]
5554
5555 default_instance_num = get_number_of_instances(
5556 db_vnfd, vdu_delta["id"]
5557 )
5558 instance_num = vdu_delta.get("number-of-instances", 1)
5559 nb_scale_op -= instance_num
5560
5561 new_instance_count = nb_scale_op + default_instance_num
5562
5563 if new_instance_count < min_instance_count < vdu_count:
5564 instances_number = min_instance_count - new_instance_count
5565 else:
5566 instances_number = instance_num
5567
5568 if new_instance_count < min_instance_count:
5569 raise LcmException(
5570 "reached the limit of {} (min-instance-count) scaling-in operations for the "
5571 "scaling-group-descriptor '{}'".format(
5572 nb_scale_op, scaling_group
5573 )
5574 )
5575 for x in range(vdu_delta.get("number-of-instances", 1)):
5576 vca_scaling_info.append(
5577 {
5578 "osm_vdu_id": vdu_delta["id"],
5579 "member-vnf-index": vnf_index,
5580 "type": "delete",
5581 "vdu_index": vdu_index - 1 - x,
5582 }
5583 )
5584 scaling_info["vdu-delete"][vdu_delta["id"]] = instances_number
5585 for kdu_delta in delta.get("kdu-resource-delta", {}):
5586 kdu_profile = get_kdu_resource_profile(db_vnfd, kdu_delta["id"])
5587 kdu_name = kdu_profile["kdu-name"]
5588 resource_name = kdu_profile.get("resource-name", "")
5589
5590 if not scaling_info["kdu-delete"].get(kdu_name, None):
5591 scaling_info["kdu-delete"][kdu_name] = []
5592
5593 kdur = get_kdur(db_vnfr, kdu_name)
5594 if kdur.get("helm-chart"):
5595 k8s_cluster_type = "helm-chart-v3"
5596 self.logger.debug("kdur: {}".format(kdur))
5597 if (
5598 kdur.get("helm-version")
5599 and kdur.get("helm-version") == "v2"
5600 ):
5601 k8s_cluster_type = "helm-chart"
5602 elif kdur.get("juju-bundle"):
5603 k8s_cluster_type = "juju-bundle"
5604 else:
5605 raise LcmException(
5606 "kdu type for kdu='{}.{}' is neither helm-chart nor "
5607 "juju-bundle. Maybe an old NBI version is running".format(
5608 db_vnfr["member-vnf-index-ref"], kdur["kdu-name"]
5609 )
5610 )
5611
5612 min_instance_count = 0
5613 if kdu_profile and "min-number-of-instances" in kdu_profile:
5614 min_instance_count = kdu_profile["min-number-of-instances"]
5615
5616 nb_scale_op -= kdu_delta.get("number-of-instances", 1)
5617 deployed_kdu, _ = get_deployed_kdu(
5618 nsr_deployed, kdu_name, vnf_index
5619 )
5620 if deployed_kdu is None:
5621 raise LcmException(
5622 "KDU '{}' for vnf '{}' not deployed".format(
5623 kdu_name, vnf_index
5624 )
5625 )
5626 kdu_instance = deployed_kdu.get("kdu-instance")
5627 instance_num = await self.k8scluster_map[
5628 k8s_cluster_type
5629 ].get_scale_count(
5630 resource_name,
5631 kdu_instance,
5632 vca_id=vca_id,
5633 cluster_uuid=deployed_kdu.get("k8scluster-uuid"),
5634 kdu_model=deployed_kdu.get("kdu-model"),
5635 )
5636 kdu_replica_count = instance_num - kdu_delta.get(
5637 "number-of-instances", 1
5638 )
5639
5640 if kdu_replica_count < min_instance_count < instance_num:
5641 kdu_replica_count = min_instance_count
5642 if kdu_replica_count < min_instance_count:
5643 raise LcmException(
5644 "reached the limit of {} (min-instance-count) scaling-in operations for the "
5645 "scaling-group-descriptor '{}'".format(
5646 instance_num, scaling_group
5647 )
5648 )
5649
5650 for x in range(kdu_delta.get("number-of-instances", 1)):
5651 vca_scaling_info.append(
5652 {
5653 "osm_kdu_id": kdu_name,
5654 "member-vnf-index": vnf_index,
5655 "type": "delete",
5656 "kdu_index": instance_num - x - 1,
5657 }
5658 )
5659 scaling_info["kdu-delete"][kdu_name].append(
5660 {
5661 "member-vnf-index": vnf_index,
5662 "type": "delete",
5663 "k8s-cluster-type": k8s_cluster_type,
5664 "resource-name": resource_name,
5665 "scale": kdu_replica_count,
5666 }
5667 )
5668
5669 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
5670 vdu_delete = copy(scaling_info.get("vdu-delete"))
5671 if scaling_info["scaling_direction"] == "IN":
5672 for vdur in reversed(db_vnfr["vdur"]):
5673 if vdu_delete.get(vdur["vdu-id-ref"]):
5674 vdu_delete[vdur["vdu-id-ref"]] -= 1
5675 scaling_info["vdu"].append(
5676 {
5677 "name": vdur.get("name") or vdur.get("vdu-name"),
5678 "vdu_id": vdur["vdu-id-ref"],
5679 "interface": [],
5680 }
5681 )
5682 for interface in vdur["interfaces"]:
5683 scaling_info["vdu"][-1]["interface"].append(
5684 {
5685 "name": interface["name"],
5686 "ip_address": interface["ip-address"],
5687 "mac_address": interface.get("mac-address"),
5688 }
5689 )
5690 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
5691
5692 # PRE-SCALE BEGIN
5693 step = "Executing pre-scale vnf-config-primitive"
5694 if scaling_descriptor.get("scaling-config-action"):
5695 for scaling_config_action in scaling_descriptor[
5696 "scaling-config-action"
5697 ]:
5698 if (
5699 scaling_config_action.get("trigger") == "pre-scale-in"
5700 and scaling_type == "SCALE_IN"
5701 ) or (
5702 scaling_config_action.get("trigger") == "pre-scale-out"
5703 and scaling_type == "SCALE_OUT"
5704 ):
5705 vnf_config_primitive = scaling_config_action[
5706 "vnf-config-primitive-name-ref"
5707 ]
5708 step = db_nslcmop_update[
5709 "detailed-status"
5710 ] = "executing pre-scale scaling-config-action '{}'".format(
5711 vnf_config_primitive
5712 )
5713
5714 # look for primitive
5715 for config_primitive in (
5716 get_configuration(db_vnfd, db_vnfd["id"]) or {}
5717 ).get("config-primitive", ()):
5718 if config_primitive["name"] == vnf_config_primitive:
5719 break
5720 else:
5721 raise LcmException(
5722 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
5723 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
5724 "primitive".format(scaling_group, vnf_config_primitive)
5725 )
5726
5727 vnfr_params = {"VDU_SCALE_INFO": scaling_info}
5728 if db_vnfr.get("additionalParamsForVnf"):
5729 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
5730
5731 scale_process = "VCA"
5732 db_nsr_update["config-status"] = "configuring pre-scaling"
5733 primitive_params = self._map_primitive_params(
5734 config_primitive, {}, vnfr_params
5735 )
5736
5737 # Pre-scale retry check: Check if this sub-operation has been executed before
5738 op_index = self._check_or_add_scale_suboperation(
5739 db_nslcmop,
5740 vnf_index,
5741 vnf_config_primitive,
5742 primitive_params,
5743 "PRE-SCALE",
5744 )
5745 if op_index == self.SUBOPERATION_STATUS_SKIP:
5746 # Skip sub-operation
5747 result = "COMPLETED"
5748 result_detail = "Done"
5749 self.logger.debug(
5750 logging_text
5751 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
5752 vnf_config_primitive, result, result_detail
5753 )
5754 )
5755 else:
5756 if op_index == self.SUBOPERATION_STATUS_NEW:
5757 # New sub-operation: Get index of this sub-operation
5758 op_index = (
5759 len(db_nslcmop.get("_admin", {}).get("operations"))
5760 - 1
5761 )
5762 self.logger.debug(
5763 logging_text
5764 + "vnf_config_primitive={} New sub-operation".format(
5765 vnf_config_primitive
5766 )
5767 )
5768 else:
5769 # retry: Get registered params for this existing sub-operation
5770 op = db_nslcmop.get("_admin", {}).get("operations", [])[
5771 op_index
5772 ]
5773 vnf_index = op.get("member_vnf_index")
5774 vnf_config_primitive = op.get("primitive")
5775 primitive_params = op.get("primitive_params")
5776 self.logger.debug(
5777 logging_text
5778 + "vnf_config_primitive={} Sub-operation retry".format(
5779 vnf_config_primitive
5780 )
5781 )
5782 # Execute the primitive, either with new (first-time) or registered (reintent) args
5783 ee_descriptor_id = config_primitive.get(
5784 "execution-environment-ref"
5785 )
5786 primitive_name = config_primitive.get(
5787 "execution-environment-primitive", vnf_config_primitive
5788 )
5789 ee_id, vca_type = self._look_for_deployed_vca(
5790 nsr_deployed["VCA"],
5791 member_vnf_index=vnf_index,
5792 vdu_id=None,
5793 vdu_count_index=None,
5794 ee_descriptor_id=ee_descriptor_id,
5795 )
5796 result, result_detail = await self._ns_execute_primitive(
5797 ee_id,
5798 primitive_name,
5799 primitive_params,
5800 vca_type=vca_type,
5801 vca_id=vca_id,
5802 )
5803 self.logger.debug(
5804 logging_text
5805 + "vnf_config_primitive={} Done with result {} {}".format(
5806 vnf_config_primitive, result, result_detail
5807 )
5808 )
5809 # Update operationState = COMPLETED | FAILED
5810 self._update_suboperation_status(
5811 db_nslcmop, op_index, result, result_detail
5812 )
5813
5814 if result == "FAILED":
5815 raise LcmException(result_detail)
5816 db_nsr_update["config-status"] = old_config_status
5817 scale_process = None
5818 # PRE-SCALE END
5819
5820 db_nsr_update[
5821 "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)
5822 ] = nb_scale_op
5823 db_nsr_update[
5824 "_admin.scaling-group.{}.time".format(admin_scale_index)
5825 ] = time()
5826
5827 # SCALE-IN VCA - BEGIN
5828 if vca_scaling_info:
5829 step = db_nslcmop_update[
5830 "detailed-status"
5831 ] = "Deleting the execution environments"
5832 scale_process = "VCA"
5833 for vca_info in vca_scaling_info:
5834 if vca_info["type"] == "delete" and not vca_info.get("osm_kdu_id"):
5835 member_vnf_index = str(vca_info["member-vnf-index"])
5836 self.logger.debug(
5837 logging_text + "vdu info: {}".format(vca_info)
5838 )
5839 if vca_info.get("osm_vdu_id"):
5840 vdu_id = vca_info["osm_vdu_id"]
5841 vdu_index = int(vca_info["vdu_index"])
5842 stage[
5843 1
5844 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5845 member_vnf_index, vdu_id, vdu_index
5846 )
5847 stage[2] = step = "Scaling in VCA"
5848 self._write_op_status(op_id=nslcmop_id, stage=stage)
5849 vca_update = db_nsr["_admin"]["deployed"]["VCA"]
5850 config_update = db_nsr["configurationStatus"]
5851 for vca_index, vca in enumerate(vca_update):
5852 if (
5853 (vca or vca.get("ee_id"))
5854 and vca["member-vnf-index"] == member_vnf_index
5855 and vca["vdu_count_index"] == vdu_index
5856 ):
5857 if vca.get("vdu_id"):
5858 config_descriptor = get_configuration(
5859 db_vnfd, vca.get("vdu_id")
5860 )
5861 elif vca.get("kdu_name"):
5862 config_descriptor = get_configuration(
5863 db_vnfd, vca.get("kdu_name")
5864 )
5865 else:
5866 config_descriptor = get_configuration(
5867 db_vnfd, db_vnfd["id"]
5868 )
5869 operation_params = (
5870 db_nslcmop.get("operationParams") or {}
5871 )
5872 exec_terminate_primitives = not operation_params.get(
5873 "skip_terminate_primitives"
5874 ) and vca.get("needed_terminate")
5875 task = asyncio.ensure_future(
5876 asyncio.wait_for(
5877 self.destroy_N2VC(
5878 logging_text,
5879 db_nslcmop,
5880 vca,
5881 config_descriptor,
5882 vca_index,
5883 destroy_ee=True,
5884 exec_primitives=exec_terminate_primitives,
5885 scaling_in=True,
5886 vca_id=vca_id,
5887 ),
5888 timeout=self.timeout_charm_delete,
5889 )
5890 )
5891 tasks_dict_info[task] = "Terminating VCA {}".format(
5892 vca.get("ee_id")
5893 )
5894 del vca_update[vca_index]
5895 del config_update[vca_index]
5896 # wait for pending tasks of terminate primitives
5897 if tasks_dict_info:
5898 self.logger.debug(
5899 logging_text
5900 + "Waiting for tasks {}".format(
5901 list(tasks_dict_info.keys())
5902 )
5903 )
5904 error_list = await self._wait_for_tasks(
5905 logging_text,
5906 tasks_dict_info,
5907 min(
5908 self.timeout_charm_delete, self.timeout_ns_terminate
5909 ),
5910 stage,
5911 nslcmop_id,
5912 )
5913 tasks_dict_info.clear()
5914 if error_list:
5915 raise LcmException("; ".join(error_list))
5916
5917 db_vca_and_config_update = {
5918 "_admin.deployed.VCA": vca_update,
5919 "configurationStatus": config_update,
5920 }
5921 self.update_db_2(
5922 "nsrs", db_nsr["_id"], db_vca_and_config_update
5923 )
5924 scale_process = None
5925 # SCALE-IN VCA - END
5926
5927 # SCALE RO - BEGIN
5928 if scaling_info.get("vdu-create") or scaling_info.get("vdu-delete"):
5929 scale_process = "RO"
5930 if self.ro_config.get("ng"):
5931 await self._scale_ng_ro(
5932 logging_text, db_nsr, db_nslcmop, db_vnfr, scaling_info, stage
5933 )
5934 scaling_info.pop("vdu-create", None)
5935 scaling_info.pop("vdu-delete", None)
5936
5937 scale_process = None
5938 # SCALE RO - END
5939
5940 # SCALE KDU - BEGIN
5941 if scaling_info.get("kdu-create") or scaling_info.get("kdu-delete"):
5942 scale_process = "KDU"
5943 await self._scale_kdu(
5944 logging_text, nsr_id, nsr_deployed, db_vnfd, vca_id, scaling_info
5945 )
5946 scaling_info.pop("kdu-create", None)
5947 scaling_info.pop("kdu-delete", None)
5948
5949 scale_process = None
5950 # SCALE KDU - END
5951
5952 if db_nsr_update:
5953 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5954
5955 # SCALE-UP VCA - BEGIN
5956 if vca_scaling_info:
5957 step = db_nslcmop_update[
5958 "detailed-status"
5959 ] = "Creating new execution environments"
5960 scale_process = "VCA"
5961 for vca_info in vca_scaling_info:
5962 if vca_info["type"] == "create" and not vca_info.get("osm_kdu_id"):
5963 member_vnf_index = str(vca_info["member-vnf-index"])
5964 self.logger.debug(
5965 logging_text + "vdu info: {}".format(vca_info)
5966 )
5967 vnfd_id = db_vnfr["vnfd-ref"]
5968 if vca_info.get("osm_vdu_id"):
5969 vdu_index = int(vca_info["vdu_index"])
5970 deploy_params = {"OSM": get_osm_params(db_vnfr)}
5971 if db_vnfr.get("additionalParamsForVnf"):
5972 deploy_params.update(
5973 parse_yaml_strings(
5974 db_vnfr["additionalParamsForVnf"].copy()
5975 )
5976 )
5977 descriptor_config = get_configuration(
5978 db_vnfd, db_vnfd["id"]
5979 )
5980 if descriptor_config:
5981 vdu_id = None
5982 vdu_name = None
5983 kdu_name = None
5984 self._deploy_n2vc(
5985 logging_text=logging_text
5986 + "member_vnf_index={} ".format(member_vnf_index),
5987 db_nsr=db_nsr,
5988 db_vnfr=db_vnfr,
5989 nslcmop_id=nslcmop_id,
5990 nsr_id=nsr_id,
5991 nsi_id=nsi_id,
5992 vnfd_id=vnfd_id,
5993 vdu_id=vdu_id,
5994 kdu_name=kdu_name,
5995 member_vnf_index=member_vnf_index,
5996 vdu_index=vdu_index,
5997 vdu_name=vdu_name,
5998 deploy_params=deploy_params,
5999 descriptor_config=descriptor_config,
6000 base_folder=base_folder,
6001 task_instantiation_info=tasks_dict_info,
6002 stage=stage,
6003 )
6004 vdu_id = vca_info["osm_vdu_id"]
6005 vdur = find_in_list(
6006 db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id
6007 )
6008 descriptor_config = get_configuration(db_vnfd, vdu_id)
6009 if vdur.get("additionalParams"):
6010 deploy_params_vdu = parse_yaml_strings(
6011 vdur["additionalParams"]
6012 )
6013 else:
6014 deploy_params_vdu = deploy_params
6015 deploy_params_vdu["OSM"] = get_osm_params(
6016 db_vnfr, vdu_id, vdu_count_index=vdu_index
6017 )
6018 if descriptor_config:
6019 vdu_name = None
6020 kdu_name = None
6021 stage[
6022 1
6023 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6024 member_vnf_index, vdu_id, vdu_index
6025 )
6026 stage[2] = step = "Scaling out VCA"
6027 self._write_op_status(op_id=nslcmop_id, stage=stage)
6028 self._deploy_n2vc(
6029 logging_text=logging_text
6030 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6031 member_vnf_index, vdu_id, vdu_index
6032 ),
6033 db_nsr=db_nsr,
6034 db_vnfr=db_vnfr,
6035 nslcmop_id=nslcmop_id,
6036 nsr_id=nsr_id,
6037 nsi_id=nsi_id,
6038 vnfd_id=vnfd_id,
6039 vdu_id=vdu_id,
6040 kdu_name=kdu_name,
6041 member_vnf_index=member_vnf_index,
6042 vdu_index=vdu_index,
6043 vdu_name=vdu_name,
6044 deploy_params=deploy_params_vdu,
6045 descriptor_config=descriptor_config,
6046 base_folder=base_folder,
6047 task_instantiation_info=tasks_dict_info,
6048 stage=stage,
6049 )
6050 # SCALE-UP VCA - END
6051 scale_process = None
6052
6053 # POST-SCALE BEGIN
6054 # execute primitive service POST-SCALING
6055 step = "Executing post-scale vnf-config-primitive"
6056 if scaling_descriptor.get("scaling-config-action"):
6057 for scaling_config_action in scaling_descriptor[
6058 "scaling-config-action"
6059 ]:
6060 if (
6061 scaling_config_action.get("trigger") == "post-scale-in"
6062 and scaling_type == "SCALE_IN"
6063 ) or (
6064 scaling_config_action.get("trigger") == "post-scale-out"
6065 and scaling_type == "SCALE_OUT"
6066 ):
6067 vnf_config_primitive = scaling_config_action[
6068 "vnf-config-primitive-name-ref"
6069 ]
6070 step = db_nslcmop_update[
6071 "detailed-status"
6072 ] = "executing post-scale scaling-config-action '{}'".format(
6073 vnf_config_primitive
6074 )
6075
6076 vnfr_params = {"VDU_SCALE_INFO": scaling_info}
6077 if db_vnfr.get("additionalParamsForVnf"):
6078 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
6079
6080 # look for primitive
6081 for config_primitive in (
6082 get_configuration(db_vnfd, db_vnfd["id"]) or {}
6083 ).get("config-primitive", ()):
6084 if config_primitive["name"] == vnf_config_primitive:
6085 break
6086 else:
6087 raise LcmException(
6088 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
6089 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
6090 "config-primitive".format(
6091 scaling_group, vnf_config_primitive
6092 )
6093 )
6094 scale_process = "VCA"
6095 db_nsr_update["config-status"] = "configuring post-scaling"
6096 primitive_params = self._map_primitive_params(
6097 config_primitive, {}, vnfr_params
6098 )
6099
6100 # Post-scale retry check: Check if this sub-operation has been executed before
6101 op_index = self._check_or_add_scale_suboperation(
6102 db_nslcmop,
6103 vnf_index,
6104 vnf_config_primitive,
6105 primitive_params,
6106 "POST-SCALE",
6107 )
6108 if op_index == self.SUBOPERATION_STATUS_SKIP:
6109 # Skip sub-operation
6110 result = "COMPLETED"
6111 result_detail = "Done"
6112 self.logger.debug(
6113 logging_text
6114 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6115 vnf_config_primitive, result, result_detail
6116 )
6117 )
6118 else:
6119 if op_index == self.SUBOPERATION_STATUS_NEW:
6120 # New sub-operation: Get index of this sub-operation
6121 op_index = (
6122 len(db_nslcmop.get("_admin", {}).get("operations"))
6123 - 1
6124 )
6125 self.logger.debug(
6126 logging_text
6127 + "vnf_config_primitive={} New sub-operation".format(
6128 vnf_config_primitive
6129 )
6130 )
6131 else:
6132 # retry: Get registered params for this existing sub-operation
6133 op = db_nslcmop.get("_admin", {}).get("operations", [])[
6134 op_index
6135 ]
6136 vnf_index = op.get("member_vnf_index")
6137 vnf_config_primitive = op.get("primitive")
6138 primitive_params = op.get("primitive_params")
6139 self.logger.debug(
6140 logging_text
6141 + "vnf_config_primitive={} Sub-operation retry".format(
6142 vnf_config_primitive
6143 )
6144 )
6145 # Execute the primitive, either with new (first-time) or registered (reintent) args
6146 ee_descriptor_id = config_primitive.get(
6147 "execution-environment-ref"
6148 )
6149 primitive_name = config_primitive.get(
6150 "execution-environment-primitive", vnf_config_primitive
6151 )
6152 ee_id, vca_type = self._look_for_deployed_vca(
6153 nsr_deployed["VCA"],
6154 member_vnf_index=vnf_index,
6155 vdu_id=None,
6156 vdu_count_index=None,
6157 ee_descriptor_id=ee_descriptor_id,
6158 )
6159 result, result_detail = await self._ns_execute_primitive(
6160 ee_id,
6161 primitive_name,
6162 primitive_params,
6163 vca_type=vca_type,
6164 vca_id=vca_id,
6165 )
6166 self.logger.debug(
6167 logging_text
6168 + "vnf_config_primitive={} Done with result {} {}".format(
6169 vnf_config_primitive, result, result_detail
6170 )
6171 )
6172 # Update operationState = COMPLETED | FAILED
6173 self._update_suboperation_status(
6174 db_nslcmop, op_index, result, result_detail
6175 )
6176
6177 if result == "FAILED":
6178 raise LcmException(result_detail)
6179 db_nsr_update["config-status"] = old_config_status
6180 scale_process = None
6181 # POST-SCALE END
6182
6183 db_nsr_update[
6184 "detailed-status"
6185 ] = "" # "scaled {} {}".format(scaling_group, scaling_type)
6186 db_nsr_update["operational-status"] = (
6187 "running"
6188 if old_operational_status == "failed"
6189 else old_operational_status
6190 )
6191 db_nsr_update["config-status"] = old_config_status
6192 return
6193 except (
6194 ROclient.ROClientException,
6195 DbException,
6196 LcmException,
6197 NgRoException,
6198 ) as e:
6199 self.logger.error(logging_text + "Exit Exception {}".format(e))
6200 exc = e
6201 except asyncio.CancelledError:
6202 self.logger.error(
6203 logging_text + "Cancelled Exception while '{}'".format(step)
6204 )
6205 exc = "Operation was cancelled"
6206 except Exception as e:
6207 exc = traceback.format_exc()
6208 self.logger.critical(
6209 logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
6210 exc_info=True,
6211 )
6212 finally:
6213 self._write_ns_status(
6214 nsr_id=nsr_id,
6215 ns_state=None,
6216 current_operation="IDLE",
6217 current_operation_id=None,
6218 )
6219 if tasks_dict_info:
6220 stage[1] = "Waiting for instantiate pending tasks."
6221 self.logger.debug(logging_text + stage[1])
6222 exc = await self._wait_for_tasks(
6223 logging_text,
6224 tasks_dict_info,
6225 self.timeout_ns_deploy,
6226 stage,
6227 nslcmop_id,
6228 nsr_id=nsr_id,
6229 )
6230 if exc:
6231 db_nslcmop_update[
6232 "detailed-status"
6233 ] = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
6234 nslcmop_operation_state = "FAILED"
6235 if db_nsr:
6236 db_nsr_update["operational-status"] = old_operational_status
6237 db_nsr_update["config-status"] = old_config_status
6238 db_nsr_update["detailed-status"] = ""
6239 if scale_process:
6240 if "VCA" in scale_process:
6241 db_nsr_update["config-status"] = "failed"
6242 if "RO" in scale_process:
6243 db_nsr_update["operational-status"] = "failed"
6244 db_nsr_update[
6245 "detailed-status"
6246 ] = "FAILED scaling nslcmop={} {}: {}".format(
6247 nslcmop_id, step, exc
6248 )
6249 else:
6250 error_description_nslcmop = None
6251 nslcmop_operation_state = "COMPLETED"
6252 db_nslcmop_update["detailed-status"] = "Done"
6253
6254 self._write_op_status(
6255 op_id=nslcmop_id,
6256 stage="",
6257 error_message=error_description_nslcmop,
6258 operation_state=nslcmop_operation_state,
6259 other_update=db_nslcmop_update,
6260 )
6261 if db_nsr:
6262 self._write_ns_status(
6263 nsr_id=nsr_id,
6264 ns_state=None,
6265 current_operation="IDLE",
6266 current_operation_id=None,
6267 other_update=db_nsr_update,
6268 )
6269
6270 if nslcmop_operation_state:
6271 try:
6272 msg = {
6273 "nsr_id": nsr_id,
6274 "nslcmop_id": nslcmop_id,
6275 "operationState": nslcmop_operation_state,
6276 }
6277 await self.msg.aiowrite("ns", "scaled", msg, loop=self.loop)
6278 except Exception as e:
6279 self.logger.error(
6280 logging_text + "kafka_write notification Exception {}".format(e)
6281 )
6282 self.logger.debug(logging_text + "Exit")
6283 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")
6284
    async def _scale_kdu(
        self, logging_text, nsr_id, nsr_deployed, db_vnfd, vca_id, scaling_info
    ):
        """Scale the KDU applications of a NS according to scaling_info.

        For every KDU listed in scaling_info["kdu-create"] or
        scaling_info["kdu-delete"], runs the descriptor's terminate/initial
        config primitives (only when no juju execution environment handles
        them) and scales the deployed k8s application through the matching
        k8s connector.

        :param logging_text: prefix for log messages
        :param nsr_id: NS record id, used to build the db path for status updates
        :param nsr_deployed: content of db_nsr["_admin"]["deployed"]
        :param db_vnfd: vnf descriptor owning the KDUs being scaled
        :param vca_id: VCA id, passed through to the k8s connector calls
        :param scaling_info: dict with "kdu-create"/"kdu-delete" scaling data
        """
        # NOTE(review): if both "kdu-create" and "kdu-delete" are present only
        # "kdu-create" is processed; presumably callers send one at a time —
        # confirm against the caller in the scale operation
        _scaling_info = scaling_info.get("kdu-create") or scaling_info.get("kdu-delete")
        for kdu_name in _scaling_info:
            for kdu_scaling_info in _scaling_info[kdu_name]:
                # locate the deployed KDU entry (and its index inside
                # _admin.deployed.K8s) for this kdu/member-vnf-index
                deployed_kdu, index = get_deployed_kdu(
                    nsr_deployed, kdu_name, kdu_scaling_info["member-vnf-index"]
                )
                cluster_uuid = deployed_kdu["k8scluster-uuid"]
                kdu_instance = deployed_kdu["kdu-instance"]
                kdu_model = deployed_kdu.get("kdu-model")
                scale = int(kdu_scaling_info["scale"])
                k8s_cluster_type = kdu_scaling_info["k8s-cluster-type"]

                # db location where the k8s connector writes detailed status
                db_dict = {
                    "collection": "nsrs",
                    "filter": {"_id": nsr_id},
                    "path": "_admin.deployed.K8s.{}".format(index),
                }

                step = "scaling application {}".format(
                    kdu_scaling_info["resource-name"]
                )
                self.logger.debug(logging_text + step)

                if kdu_scaling_info["type"] == "delete":
                    kdu_config = get_configuration(db_vnfd, kdu_name)
                    # run terminate primitives before scaling in, but only when
                    # they are not delegated to a juju execution environment
                    if (
                        kdu_config
                        and kdu_config.get("terminate-config-primitive")
                        and get_juju_ee_ref(db_vnfd, kdu_name) is None
                    ):
                        terminate_config_primitive_list = kdu_config.get(
                            "terminate-config-primitive"
                        )
                        # execute in the order given by the "seq" field
                        terminate_config_primitive_list.sort(
                            key=lambda val: int(val["seq"])
                        )

                        for (
                            terminate_config_primitive
                        ) in terminate_config_primitive_list:
                            primitive_params_ = self._map_primitive_params(
                                terminate_config_primitive, {}, {}
                            )
                            step = "execute terminate config primitive"
                            self.logger.debug(logging_text + step)
                            await asyncio.wait_for(
                                self.k8scluster_map[k8s_cluster_type].exec_primitive(
                                    cluster_uuid=cluster_uuid,
                                    kdu_instance=kdu_instance,
                                    primitive_name=terminate_config_primitive["name"],
                                    params=primitive_params_,
                                    db_dict=db_dict,
                                    vca_id=vca_id,
                                ),
                                timeout=600,
                            )

                # scale the k8s application itself (both create and delete)
                await asyncio.wait_for(
                    self.k8scluster_map[k8s_cluster_type].scale(
                        kdu_instance,
                        scale,
                        kdu_scaling_info["resource-name"],
                        vca_id=vca_id,
                        cluster_uuid=cluster_uuid,
                        kdu_model=kdu_model,
                        atomic=True,
                        db_dict=db_dict,
                    ),
                    timeout=self.timeout_vca_on_error,
                )

                if kdu_scaling_info["type"] == "create":
                    kdu_config = get_configuration(db_vnfd, kdu_name)
                    # run initial primitives after scaling out, but only when
                    # they are not delegated to a juju execution environment
                    if (
                        kdu_config
                        and kdu_config.get("initial-config-primitive")
                        and get_juju_ee_ref(db_vnfd, kdu_name) is None
                    ):
                        initial_config_primitive_list = kdu_config.get(
                            "initial-config-primitive"
                        )
                        # execute in the order given by the "seq" field
                        initial_config_primitive_list.sort(
                            key=lambda val: int(val["seq"])
                        )

                        for initial_config_primitive in initial_config_primitive_list:
                            primitive_params_ = self._map_primitive_params(
                                initial_config_primitive, {}, {}
                            )
                            step = "execute initial config primitive"
                            self.logger.debug(logging_text + step)
                            await asyncio.wait_for(
                                self.k8scluster_map[k8s_cluster_type].exec_primitive(
                                    cluster_uuid=cluster_uuid,
                                    kdu_instance=kdu_instance,
                                    primitive_name=initial_config_primitive["name"],
                                    params=primitive_params_,
                                    db_dict=db_dict,
                                    vca_id=vca_id,
                                ),
                                timeout=600,
                            )
6390
6391 async def _scale_ng_ro(
6392 self, logging_text, db_nsr, db_nslcmop, db_vnfr, vdu_scaling_info, stage
6393 ):
6394 nsr_id = db_nslcmop["nsInstanceId"]
6395 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
6396 db_vnfrs = {}
6397
6398 # read from db: vnfd's for every vnf
6399 db_vnfds = []
6400
6401 # for each vnf in ns, read vnfd
6402 for vnfr in self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}):
6403 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
6404 vnfd_id = vnfr["vnfd-id"] # vnfd uuid for this vnf
6405 # if we haven't this vnfd, read it from db
6406 if not find_in_list(db_vnfds, lambda a_vnfd: a_vnfd["id"] == vnfd_id):
6407 # read from db
6408 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
6409 db_vnfds.append(vnfd)
6410 n2vc_key = self.n2vc.get_public_key()
6411 n2vc_key_list = [n2vc_key]
6412 self.scale_vnfr(
6413 db_vnfr,
6414 vdu_scaling_info.get("vdu-create"),
6415 vdu_scaling_info.get("vdu-delete"),
6416 mark_delete=True,
6417 )
6418 # db_vnfr has been updated, update db_vnfrs to use it
6419 db_vnfrs[db_vnfr["member-vnf-index-ref"]] = db_vnfr
6420 await self._instantiate_ng_ro(
6421 logging_text,
6422 nsr_id,
6423 db_nsd,
6424 db_nsr,
6425 db_nslcmop,
6426 db_vnfrs,
6427 db_vnfds,
6428 n2vc_key_list,
6429 stage=stage,
6430 start_deploy=time(),
6431 timeout_ns_deploy=self.timeout_ns_deploy,
6432 )
6433 if vdu_scaling_info.get("vdu-delete"):
6434 self.scale_vnfr(
6435 db_vnfr, None, vdu_scaling_info["vdu-delete"], mark_delete=False
6436 )
6437
6438 async def extract_prometheus_scrape_jobs(
6439 self, ee_id, artifact_path, ee_config_descriptor, vnfr_id, nsr_id, target_ip
6440 ):
6441 # look if exist a file called 'prometheus*.j2' and
6442 artifact_content = self.fs.dir_ls(artifact_path)
6443 job_file = next(
6444 (
6445 f
6446 for f in artifact_content
6447 if f.startswith("prometheus") and f.endswith(".j2")
6448 ),
6449 None,
6450 )
6451 if not job_file:
6452 return
6453 with self.fs.file_open((artifact_path, job_file), "r") as f:
6454 job_data = f.read()
6455
6456 # TODO get_service
6457 _, _, service = ee_id.partition(".") # remove prefix "namespace."
6458 host_name = "{}-{}".format(service, ee_config_descriptor["metric-service"])
6459 host_port = "80"
6460 vnfr_id = vnfr_id.replace("-", "")
6461 variables = {
6462 "JOB_NAME": vnfr_id,
6463 "TARGET_IP": target_ip,
6464 "EXPORTER_POD_IP": host_name,
6465 "EXPORTER_POD_PORT": host_port,
6466 }
6467 job_list = parse_job(job_data, variables)
6468 # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
6469 for job in job_list:
6470 if (
6471 not isinstance(job.get("job_name"), str)
6472 or vnfr_id not in job["job_name"]
6473 ):
6474 job["job_name"] = vnfr_id + "_" + str(randint(1, 10000))
6475 job["nsr_id"] = nsr_id
6476 job["vnfr_id"] = vnfr_id
6477 return job_list
6478
6479 def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
6480 """
6481 Get VCA Cloud and VCA Cloud Credentials for the VIM account
6482
6483 :param: vim_account_id: VIM Account ID
6484
6485 :return: (cloud_name, cloud_credential)
6486 """
6487 config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
6488 return config.get("vca_cloud"), config.get("vca_cloud_credential")
6489
6490 def get_vca_k8s_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
6491 """
6492 Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account
6493
6494 :param: vim_account_id: VIM Account ID
6495
6496 :return: (cloud_name, cloud_credential)
6497 """
6498 config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
6499 return config.get("vca_k8s_cloud"), config.get("vca_k8s_cloud_credential")