Feature 10908: LCM process NS update request
[osm/LCM.git] / osm_lcm / ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 import shutil
21 from typing import Any, Dict, List
22 import yaml
23 import logging
24 import logging.handlers
25 import traceback
26 import json
27 from jinja2 import (
28 Environment,
29 TemplateError,
30 TemplateNotFound,
31 StrictUndefined,
32 UndefinedError,
33 )
34
35 from osm_lcm import ROclient
36 from osm_lcm.data_utils.nsr import (
37 get_deployed_kdu,
38 get_deployed_vca,
39 get_deployed_vca_list,
40 get_nsd,
41 )
42 from osm_lcm.data_utils.vca import (
43 DeployedComponent,
44 DeployedK8sResource,
45 DeployedVCA,
46 EELevel,
47 Relation,
48 EERelation,
49 safe_get_ee_relation,
50 )
51 from osm_lcm.ng_ro import NgRoClient, NgRoException
52 from osm_lcm.lcm_utils import (
53 LcmException,
54 LcmExceptionNoMgmtIP,
55 LcmBase,
56 deep_get,
57 get_iterable,
58 populate_dict,
59 check_juju_bundle_existence,
60 get_charm_artifact_path,
61 )
62 from osm_lcm.data_utils.nsd import (
63 get_ns_configuration_relation_list,
64 get_vnf_profile,
65 get_vnf_profiles,
66 )
67 from osm_lcm.data_utils.vnfd import (
68 get_kdu,
69 get_kdu_services,
70 get_relation_list,
71 get_vdu_list,
72 get_vdu_profile,
73 get_ee_sorted_initial_config_primitive_list,
74 get_ee_sorted_terminate_config_primitive_list,
75 get_kdu_list,
76 get_virtual_link_profiles,
77 get_vdu,
78 get_configuration,
79 get_vdu_index,
80 get_scaling_aspect,
81 get_number_of_instances,
82 get_juju_ee_ref,
83 get_kdu_resource_profile,
84 find_software_version,
85 )
86 from osm_lcm.data_utils.list_utils import find_in_list
87 from osm_lcm.data_utils.vnfr import get_osm_params, get_vdur_index, get_kdur
88 from osm_lcm.data_utils.dict_utils import parse_yaml_strings
89 from osm_lcm.data_utils.database.vim_account import VimAccountDB
90 from n2vc.definitions import RelationEndpoint
91 from n2vc.k8s_helm_conn import K8sHelmConnector
92 from n2vc.k8s_helm3_conn import K8sHelm3Connector
93 from n2vc.k8s_juju_conn import K8sJujuConnector
94
95 from osm_common.dbbase import DbException
96 from osm_common.fsbase import FsException
97
98 from osm_lcm.data_utils.database.database import Database
99 from osm_lcm.data_utils.filesystem.filesystem import Filesystem
100
101 from n2vc.n2vc_juju_conn import N2VCJujuConnector
102 from n2vc.exceptions import N2VCException, N2VCNotFound, K8sException
103
104 from osm_lcm.lcm_helm_conn import LCMHelmConn
105 from osm_lcm.osm_config import OsmConfigBuilder
106 from osm_lcm.prometheus import parse_job
107
108 from copy import copy, deepcopy
109 from time import time
110 from uuid import uuid4
111
112 from random import randint
113
114 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
115
116
class NsLcm(LcmBase):
    """NS lifecycle manager: deploys, configures, scales and updates network services."""

    timeout_vca_on_error = (
        5 * 60
    )  # Time for charm from first time at blocked,error status to mark as failed
    timeout_ns_deploy = 2 * 3600  # default global timeout for deployment a ns
    timeout_ns_terminate = 1800  # default global timeout for un deployment a ns
    timeout_charm_delete = 10 * 60  # timeout for deleting a deployed charm
    timeout_primitive = 30 * 60  # timeout for primitive execution
    timeout_ns_update = 30 * 60  # timeout for ns update
    timeout_progress_primitive = (
        10 * 60
    )  # timeout for some progress in a primitive execution

    # Sentinel results when looking up a sub-operation inside an nslcmop.
    # NOTE(review): semantics inferred from the names; confirm against the
    # sub-operation lookup helpers elsewhere in this class.
    SUBOPERATION_STATUS_NOT_FOUND = -1
    SUBOPERATION_STATUS_NEW = -2
    SUBOPERATION_STATUS_SKIP = -3
    # human-readable task label used when registering VCA deployment tasks
    task_name_deploy_vca = "Deploying VCA"
134
    def __init__(self, msg, lcm_tasks, config, loop):
        """
        Init, Connect to database, filesystem storage, and messaging
        :param msg: message-bus handler, forwarded to LcmBase
        :param lcm_tasks: registry used to track running LCM operations
        :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
        :param loop: asyncio event loop shared by every connector created here
        :return: None
        """
        super().__init__(msg=msg, logger=logging.getLogger("lcm.ns"))

        self.db = Database().instance.db
        self.fs = Filesystem().instance.fs
        self.loop = loop
        self.lcm_tasks = lcm_tasks
        self.timeout = config["timeout"]
        self.ro_config = config["ro_config"]
        self.ng_ro = config["ro_config"].get("ng")
        # work on a copy so later additions do not mutate the shared config dict
        self.vca_config = config["VCA"].copy()

        # create N2VC connector; DB updates flow back through _on_update_n2vc_db
        self.n2vc = N2VCJujuConnector(
            log=self.logger,
            loop=self.loop,
            on_update_db=self._on_update_n2vc_db,
            fs=self.fs,
            db=self.db,
        )

        # helm execution-environment connector (used by the vca_map below)
        self.conn_helm_ee = LCMHelmConn(
            log=self.logger,
            loop=self.loop,
            vca_config=self.vca_config,
            on_update_db=self._on_update_n2vc_db,
        )

        # helm v2 connector (no db-update callback wired)
        self.k8sclusterhelm2 = K8sHelmConnector(
            kubectl_command=self.vca_config.get("kubectlpath"),
            helm_command=self.vca_config.get("helmpath"),
            log=self.logger,
            on_update_db=None,
            fs=self.fs,
            db=self.db,
        )

        # helm v3 connector (no db-update callback wired)
        self.k8sclusterhelm3 = K8sHelm3Connector(
            kubectl_command=self.vca_config.get("kubectlpath"),
            helm_command=self.vca_config.get("helm3path"),
            fs=self.fs,
            log=self.logger,
            db=self.db,
            on_update_db=None,
        )

        # juju-bundle connector; DB updates flow back through _on_update_k8s_db
        self.k8sclusterjuju = K8sJujuConnector(
            kubectl_command=self.vca_config.get("kubectlpath"),
            juju_command=self.vca_config.get("jujupath"),
            log=self.logger,
            loop=self.loop,
            on_update_db=self._on_update_k8s_db,
            fs=self.fs,
            db=self.db,
        )

        # kdu deployment type -> k8s connector ("chart" aliases helm v3)
        self.k8scluster_map = {
            "helm-chart": self.k8sclusterhelm2,
            "helm-chart-v3": self.k8sclusterhelm3,
            "chart": self.k8sclusterhelm3,
            "juju-bundle": self.k8sclusterjuju,
            "juju": self.k8sclusterjuju,
        }

        # charm deployment type -> execution-environment connector
        self.vca_map = {
            "lxc_proxy_charm": self.n2vc,
            "native_charm": self.n2vc,
            "k8s_proxy_charm": self.n2vc,
            "helm": self.conn_helm_ee,
            "helm-v3": self.conn_helm_ee,
        }

        # create RO client
        self.RO = NgRoClient(self.loop, **self.ro_config)
214
215 @staticmethod
216 def increment_ip_mac(ip_mac, vm_index=1):
217 if not isinstance(ip_mac, str):
218 return ip_mac
219 try:
220 # try with ipv4 look for last dot
221 i = ip_mac.rfind(".")
222 if i > 0:
223 i += 1
224 return "{}{}".format(ip_mac[:i], int(ip_mac[i:]) + vm_index)
225 # try with ipv6 or mac look for last colon. Operate in hex
226 i = ip_mac.rfind(":")
227 if i > 0:
228 i += 1
229 # format in hex, len can be 2 for mac or 4 for ipv6
230 return ("{}{:0" + str(len(ip_mac) - i) + "x}").format(
231 ip_mac[:i], int(ip_mac[i:], 16) + vm_index
232 )
233 except Exception:
234 pass
235 return None
236
237 def _on_update_ro_db(self, nsrs_id, ro_descriptor):
238
239 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
240
241 try:
242 # TODO filter RO descriptor fields...
243
244 # write to database
245 db_dict = dict()
246 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
247 db_dict["deploymentStatus"] = ro_descriptor
248 self.update_db_2("nsrs", nsrs_id, db_dict)
249
250 except Exception as e:
251 self.logger.warn(
252 "Cannot write database RO deployment for ns={} -> {}".format(nsrs_id, e)
253 )
254
    async def _on_update_n2vc_db(self, table, filter, path, updated_data, vca_id=None):
        """
        N2VC callback: refresh the NS record after a charm/VCA status change.

        Fetches the juju status for the whole NS, stores it under 'vcaStatus'
        in the nsr, and re-evaluates READY/DEGRADED from the machine and
        application statuses. Best-effort: any error other than cancellation
        or timeout is logged and swallowed.

        :param table: originating table of the notification (unused here)
        :param filter: db filter; only its '_id' (the nsr id) is used
        :param path: dotted db path of the change; its last component is the VCA index
        :param updated_data: changed data (unused here)
        :param vca_id: optional id selecting the VCA to query
        """

        # remove last dot from path (if exists)
        if path.endswith("."):
            path = path[:-1]

        # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
        #                   .format(table, filter, path, updated_data))
        try:

            nsr_id = filter.get("_id")

            # read ns record from database
            nsr = self.db.get_one(table="nsrs", q_filter=filter)
            current_ns_status = nsr.get("nsState")

            # get vca status for NS
            status_dict = await self.n2vc.get_status(
                namespace="." + nsr_id, yaml_format=False, vca_id=vca_id
            )

            # vcaStatus
            db_dict = dict()
            db_dict["vcaStatus"] = status_dict
            await self.n2vc.update_vca_status(db_dict["vcaStatus"], vca_id=vca_id)

            # update configurationStatus for this VCA
            try:
                # the VCA index is the last dotted component of the notified path
                vca_index = int(path[path.rfind(".") + 1 :])

                vca_list = deep_get(
                    target_dict=nsr, key_list=("_admin", "deployed", "VCA")
                )
                vca_status = vca_list[vca_index].get("status")

                configuration_status_list = nsr.get("configurationStatus")
                config_status = configuration_status_list[vca_index].get("status")

                # NOTE(review): db_dict has no "configurationStatus" key at this
                # point, so these assignments raise KeyError and are swallowed by
                # the except below -- configurationStatus is never actually
                # written here; confirm intended behavior.
                if config_status == "BROKEN" and vca_status != "failed":
                    db_dict["configurationStatus"][vca_index] = "READY"
                elif config_status != "BROKEN" and vca_status == "failed":
                    db_dict["configurationStatus"][vca_index] = "BROKEN"
            except Exception as e:
                # not update configurationStatus
                self.logger.debug("Error updating vca_index (ignore): {}".format(e))

            # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
            # if nsState = 'DEGRADED' check if all is OK
            is_degraded = False
            if current_ns_status in ("READY", "DEGRADED"):
                error_description = ""
                # check machines
                if status_dict.get("machines"):
                    for machine_id in status_dict.get("machines"):
                        machine = status_dict.get("machines").get(machine_id)
                        # check machine agent-status
                        if machine.get("agent-status"):
                            s = machine.get("agent-status").get("status")
                            if s != "started":
                                is_degraded = True
                                error_description += (
                                    "machine {} agent-status={} ; ".format(
                                        machine_id, s
                                    )
                                )
                        # check machine instance status
                        if machine.get("instance-status"):
                            s = machine.get("instance-status").get("status")
                            if s != "running":
                                is_degraded = True
                                error_description += (
                                    "machine {} instance-status={} ; ".format(
                                        machine_id, s
                                    )
                                )
                # check applications
                if status_dict.get("applications"):
                    for app_id in status_dict.get("applications"):
                        app = status_dict.get("applications").get(app_id)
                        # check application status
                        if app.get("status"):
                            s = app.get("status").get("status")
                            if s != "active":
                                is_degraded = True
                                error_description += (
                                    "application {} status={} ; ".format(app_id, s)
                                )

                if error_description:
                    db_dict["errorDescription"] = error_description
                if current_ns_status == "READY" and is_degraded:
                    db_dict["nsState"] = "DEGRADED"
                if current_ns_status == "DEGRADED" and not is_degraded:
                    db_dict["nsState"] = "READY"

            # write to database
            self.update_db_2("nsrs", nsr_id, db_dict)

        except (asyncio.CancelledError, asyncio.TimeoutError):
            raise
        except Exception as e:
            self.logger.warn("Error updating NS state for ns={}: {}".format(nsr_id, e))
357
358 async def _on_update_k8s_db(
359 self, cluster_uuid, kdu_instance, filter=None, vca_id=None, cluster_type="juju"
360 ):
361 """
362 Updating vca status in NSR record
363 :param cluster_uuid: UUID of a k8s cluster
364 :param kdu_instance: The unique name of the KDU instance
365 :param filter: To get nsr_id
366 :cluster_type: The cluster type (juju, k8s)
367 :return: none
368 """
369
370 # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
371 # .format(cluster_uuid, kdu_instance, filter))
372
373 nsr_id = filter.get("_id")
374 try:
375 vca_status = await self.k8scluster_map[cluster_type].status_kdu(
376 cluster_uuid=cluster_uuid,
377 kdu_instance=kdu_instance,
378 yaml_format=False,
379 complete_status=True,
380 vca_id=vca_id,
381 )
382
383 # vcaStatus
384 db_dict = dict()
385 db_dict["vcaStatus"] = {nsr_id: vca_status}
386
387 if cluster_type in ("juju-bundle", "juju"):
388 # TODO -> this should be done in a more uniform way, I think in N2VC, in order to update the K8s VCA
389 # status in a similar way between Juju Bundles and Helm Charts on this side
390 await self.k8sclusterjuju.update_vca_status(
391 db_dict["vcaStatus"],
392 kdu_instance,
393 vca_id=vca_id,
394 )
395
396 self.logger.debug(
397 f"Obtained VCA status for cluster type '{cluster_type}': {vca_status}"
398 )
399
400 # write to database
401 self.update_db_2("nsrs", nsr_id, db_dict)
402 except (asyncio.CancelledError, asyncio.TimeoutError):
403 raise
404 except Exception as e:
405 self.logger.warn("Error updating NS state for ns={}: {}".format(nsr_id, e))
406
407 @staticmethod
408 def _parse_cloud_init(cloud_init_text, additional_params, vnfd_id, vdu_id):
409 try:
410 env = Environment(undefined=StrictUndefined)
411 template = env.from_string(cloud_init_text)
412 return template.render(additional_params or {})
413 except UndefinedError as e:
414 raise LcmException(
415 "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
416 "file, must be provided in the instantiation parameters inside the "
417 "'additionalParamsForVnf/Vdu' block".format(e, vnfd_id, vdu_id)
418 )
419 except (TemplateError, TemplateNotFound) as e:
420 raise LcmException(
421 "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
422 vnfd_id, vdu_id, e
423 )
424 )
425
426 def _get_vdu_cloud_init_content(self, vdu, vnfd):
427 cloud_init_content = cloud_init_file = None
428 try:
429 if vdu.get("cloud-init-file"):
430 base_folder = vnfd["_admin"]["storage"]
431 if base_folder["pkg-dir"]:
432 cloud_init_file = "{}/{}/cloud_init/{}".format(
433 base_folder["folder"],
434 base_folder["pkg-dir"],
435 vdu["cloud-init-file"],
436 )
437 else:
438 cloud_init_file = "{}/Scripts/cloud_init/{}".format(
439 base_folder["folder"],
440 vdu["cloud-init-file"],
441 )
442 with self.fs.file_open(cloud_init_file, "r") as ci_file:
443 cloud_init_content = ci_file.read()
444 elif vdu.get("cloud-init"):
445 cloud_init_content = vdu["cloud-init"]
446
447 return cloud_init_content
448 except FsException as e:
449 raise LcmException(
450 "Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".format(
451 vnfd["id"], vdu["id"], cloud_init_file, e
452 )
453 )
454
455 def _get_vdu_additional_params(self, db_vnfr, vdu_id):
456 vdur = next(
457 (vdur for vdur in db_vnfr.get("vdur") if vdu_id == vdur["vdu-id-ref"]),
458 {}
459 )
460 additional_params = vdur.get("additionalParams")
461 return parse_yaml_strings(additional_params)
462
463 def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
464 """
465 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
466 :param vnfd: input vnfd
467 :param new_id: overrides vnf id if provided
468 :param additionalParams: Instantiation params for VNFs provided
469 :param nsrId: Id of the NSR
470 :return: copy of vnfd
471 """
472 vnfd_RO = deepcopy(vnfd)
473 # remove unused by RO configuration, monitoring, scaling and internal keys
474 vnfd_RO.pop("_id", None)
475 vnfd_RO.pop("_admin", None)
476 vnfd_RO.pop("monitoring-param", None)
477 vnfd_RO.pop("scaling-group-descriptor", None)
478 vnfd_RO.pop("kdu", None)
479 vnfd_RO.pop("k8s-cluster", None)
480 if new_id:
481 vnfd_RO["id"] = new_id
482
483 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
484 for vdu in get_iterable(vnfd_RO, "vdu"):
485 vdu.pop("cloud-init-file", None)
486 vdu.pop("cloud-init", None)
487 return vnfd_RO
488
489 @staticmethod
490 def ip_profile_2_RO(ip_profile):
491 RO_ip_profile = deepcopy(ip_profile)
492 if "dns-server" in RO_ip_profile:
493 if isinstance(RO_ip_profile["dns-server"], list):
494 RO_ip_profile["dns-address"] = []
495 for ds in RO_ip_profile.pop("dns-server"):
496 RO_ip_profile["dns-address"].append(ds["address"])
497 else:
498 RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
499 if RO_ip_profile.get("ip-version") == "ipv4":
500 RO_ip_profile["ip-version"] = "IPv4"
501 if RO_ip_profile.get("ip-version") == "ipv6":
502 RO_ip_profile["ip-version"] = "IPv6"
503 if "dhcp-params" in RO_ip_profile:
504 RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
505 return RO_ip_profile
506
507 def _get_ro_vim_id_for_vim_account(self, vim_account):
508 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
509 if db_vim["_admin"]["operationalState"] != "ENABLED":
510 raise LcmException(
511 "VIM={} is not available. operationalState={}".format(
512 vim_account, db_vim["_admin"]["operationalState"]
513 )
514 )
515 RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
516 return RO_vim_id
517
518 def get_ro_wim_id_for_wim_account(self, wim_account):
519 if isinstance(wim_account, str):
520 db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
521 if db_wim["_admin"]["operationalState"] != "ENABLED":
522 raise LcmException(
523 "WIM={} is not available. operationalState={}".format(
524 wim_account, db_wim["_admin"]["operationalState"]
525 )
526 )
527 RO_wim_id = db_wim["_admin"]["deployed"]["RO-account"]
528 return RO_wim_id
529 else:
530 return wim_account
531
    def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None, mark_delete=False):
        """
        Add or remove vdur entries of a vnfr in the database when scaling.

        :param db_vnfr: vnfr content; its 'vdur' list is refreshed in place
            from the database at the end
        :param vdu_create: dict vdu-id -> number of instances to add
        :param vdu_delete: dict vdu-id -> number of instances to remove
        :param mark_delete: when True, vdur entries are only marked "DELETING"
            instead of being pulled from the database
        :return: None
        :raises LcmException: when scaling out and neither a vdur nor a
            vdur-template exists for the vdu
        """

        db_vdu_push_list = []
        # when scaling in to 0 instances, the last vdur is saved as a template
        # so a later scale-out can rebuild the record
        template_vdur = []
        db_update = {"_admin.modified": time()}
        if vdu_create:
            for vdu_id, vdu_count in vdu_create.items():
                # use the highest count-index vdur of this vdu as the base copy
                vdur = next(
                    (
                        vdur
                        for vdur in reversed(db_vnfr["vdur"])
                        if vdur["vdu-id-ref"] == vdu_id
                    ),
                    None,
                )
                if not vdur:
                    # no live vdur (previously scaled to 0): read the template saved in the db
                    self.logger.debug(f"No vdur in the database. Using the vdur-template to scale")
                    vdur_template = db_vnfr.get("vdur-template")
                    if not vdur_template:
                        raise LcmException(
                            "Error scaling OUT VNFR for {}. No vnfr or template exists".format(
                                vdu_id
                            )
                        )
                    vdur = vdur_template[0]
                    # delete the template from the database after using it
                    self.db.set_one("vnfrs",
                        {"_id": db_vnfr["_id"]},
                        None,
                        pull={"vdur-template": {"_id": vdur['_id']}}
                    )
                for count in range(vdu_count):
                    # each copy starts from the base vdur, so count-index values
                    # come out consecutive: base+1, base+2, ...
                    vdur_copy = deepcopy(vdur)
                    vdur_copy["status"] = "BUILD"
                    vdur_copy["status-detailed"] = None
                    vdur_copy["ip-address"] = None
                    vdur_copy["_id"] = str(uuid4())
                    vdur_copy["count-index"] += count + 1
                    vdur_copy["id"] = "{}-{}".format(
                        vdur_copy["vdu-id-ref"], vdur_copy["count-index"]
                    )
                    # vim_info belongs to the original instance, not the new one
                    vdur_copy.pop("vim_info", None)
                    for iface in vdur_copy["interfaces"]:
                        if iface.get("fixed-ip"):
                            # fixed addressing: derive the address from the base one
                            iface["ip-address"] = self.increment_ip_mac(
                                iface["ip-address"], count + 1
                            )
                        else:
                            iface.pop("ip-address", None)
                        if iface.get("fixed-mac"):
                            iface["mac-address"] = self.increment_ip_mac(
                                iface["mac-address"], count + 1
                            )
                        else:
                            iface.pop("mac-address", None)
                        if db_vnfr["vdur"]:
                            iface.pop(
                                "mgmt_vnf", None
                            )  # only first vdu can be managment of vnf
                    db_vdu_push_list.append(vdur_copy)
                    # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
        if vdu_delete:
            if len(db_vnfr["vdur"]) == 1:
                # The scale will move to 0 instances
                self.logger.debug(f"Scaling to 0 !, creating the template with the last vdur")
                template_vdur = [db_vnfr["vdur"][0]]
            for vdu_id, vdu_count in vdu_delete.items():
                if mark_delete:
                    indexes_to_delete = [
                        iv[0]
                        for iv in enumerate(db_vnfr["vdur"])
                        if iv[1]["vdu-id-ref"] == vdu_id
                    ]
                    # mark only the last vdu_count matching vdur entries
                    db_update.update(
                        {
                            "vdur.{}.status".format(i): "DELETING"
                            for i in indexes_to_delete[-vdu_count:]
                        }
                    )
                else:
                    # it must be deleted one by one because common.db does not allow otherwise
                    vdus_to_delete = [
                        v
                        for v in reversed(db_vnfr["vdur"])
                        if v["vdu-id-ref"] == vdu_id
                    ]
                    for vdu in vdus_to_delete[:vdu_count]:
                        self.db.set_one(
                            "vnfrs",
                            {"_id": db_vnfr["_id"]},
                            None,
                            pull={"vdur": {"_id": vdu["_id"]}},
                        )
        db_push = {}
        if db_vdu_push_list:
            db_push["vdur"] = db_vdu_push_list
        if template_vdur:
            db_push["vdur-template"] = template_vdur
        if not db_push:
            db_push = None
        db_vnfr["vdur-template"] = template_vdur
        self.db.set_one("vnfrs", {"_id": db_vnfr["_id"]}, db_update, push_list=db_push)
        # modify passed dictionary db_vnfr
        db_vnfr_ = self.db.get_one("vnfrs", {"_id": db_vnfr["_id"]})
        db_vnfr["vdur"] = db_vnfr_["vdur"]
638
639 def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
640 """
641 Updates database nsr with the RO info for the created vld
642 :param ns_update_nsr: dictionary to be filled with the updated info
643 :param db_nsr: content of db_nsr. This is also modified
644 :param nsr_desc_RO: nsr descriptor from RO
645 :return: Nothing, LcmException is raised on errors
646 """
647
648 for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
649 for net_RO in get_iterable(nsr_desc_RO, "nets"):
650 if vld["id"] != net_RO.get("ns_net_osm_id"):
651 continue
652 vld["vim-id"] = net_RO.get("vim_net_id")
653 vld["name"] = net_RO.get("vim_name")
654 vld["status"] = net_RO.get("status")
655 vld["status-detailed"] = net_RO.get("error_msg")
656 ns_update_nsr["vld.{}".format(vld_index)] = vld
657 break
658 else:
659 raise LcmException(
660 "ns_update_nsr: Not found vld={} at RO info".format(vld["id"])
661 )
662
663 def set_vnfr_at_error(self, db_vnfrs, error_text):
664 try:
665 for db_vnfr in db_vnfrs.values():
666 vnfr_update = {"status": "ERROR"}
667 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
668 if "status" not in vdur:
669 vdur["status"] = "ERROR"
670 vnfr_update["vdur.{}.status".format(vdu_index)] = "ERROR"
671 if error_text:
672 vdur["status-detailed"] = str(error_text)
673 vnfr_update[
674 "vdur.{}.status-detailed".format(vdu_index)
675 ] = "ERROR"
676 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
677 except DbException as e:
678 self.logger.error("Cannot update vnf. {}".format(e))
679
    def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
        """
        Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
        :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
        :param nsr_desc_RO: nsr descriptor from RO
        :return: Nothing, LcmException is raised on errors
        """
        for vnf_index, db_vnfr in db_vnfrs.items():
            for vnf_RO in nsr_desc_RO["vnfs"]:
                if vnf_RO["member_vnf_index"] != vnf_index:
                    continue
                vnfr_update = {}
                if vnf_RO.get("ip_address"):
                    # RO may report several addresses separated by ';' -- keep the first
                    db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO[
                        "ip_address"
                    ].split(";")[0]
                elif not db_vnfr.get("ip-address"):
                    if db_vnfr.get("vdur"):  # if not VDUs, there is not ip_address
                        raise LcmExceptionNoMgmtIP(
                            "ns member_vnf_index '{}' has no IP address".format(
                                vnf_index
                            )
                        )

                for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
                    # count-index pairs the n-th replica of a vdu with the n-th
                    # matching VM reported by RO
                    vdur_RO_count_index = 0
                    if vdur.get("pdu-type"):
                        # physical DUs are not deployed through RO: skip
                        continue
                    for vdur_RO in get_iterable(vnf_RO, "vms"):
                        if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
                            continue
                        if vdur["count-index"] != vdur_RO_count_index:
                            vdur_RO_count_index += 1
                            continue
                        vdur["vim-id"] = vdur_RO.get("vim_vm_id")
                        if vdur_RO.get("ip_address"):
                            # keep only the first address of a ';'-separated list
                            vdur["ip-address"] = vdur_RO["ip_address"].split(";")[0]
                        else:
                            vdur["ip-address"] = None
                        vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
                        vdur["name"] = vdur_RO.get("vim_name")
                        vdur["status"] = vdur_RO.get("status")
                        vdur["status-detailed"] = vdur_RO.get("error_msg")
                        for ifacer in get_iterable(vdur, "interfaces"):
                            for interface_RO in get_iterable(vdur_RO, "interfaces"):
                                if ifacer["name"] == interface_RO.get("internal_name"):
                                    ifacer["ip-address"] = interface_RO.get(
                                        "ip_address"
                                    )
                                    ifacer["mac-address"] = interface_RO.get(
                                        "mac_address"
                                    )
                                    break
                            else:
                                # for-else: no RO interface matched this one
                                raise LcmException(
                                    "ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
                                    "from VIM info".format(
                                        vnf_index, vdur["vdu-id-ref"], ifacer["name"]
                                    )
                                )
                        vnfr_update["vdur.{}".format(vdu_index)] = vdur
                        break
                    else:
                        # for-else: RO reported no VM for this vdur replica
                        raise LcmException(
                            "ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
                            "VIM info".format(
                                vnf_index, vdur["vdu-id-ref"], vdur["count-index"]
                            )
                        )

                for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
                    for net_RO in get_iterable(nsr_desc_RO, "nets"):
                        if vld["id"] != net_RO.get("vnf_net_osm_id"):
                            continue
                        vld["vim-id"] = net_RO.get("vim_net_id")
                        vld["name"] = net_RO.get("vim_name")
                        vld["status"] = net_RO.get("status")
                        vld["status-detailed"] = net_RO.get("error_msg")
                        vnfr_update["vld.{}".format(vld_index)] = vld
                        break
                    else:
                        # for-else: RO reported no net for this internal vld
                        raise LcmException(
                            "ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
                                vnf_index, vld["id"]
                            )
                        )

                self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
                break

            else:
                # for-else: RO descriptor contained no entry for this vnf
                raise LcmException(
                    "ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(
                        vnf_index
                    )
                )
776
777 def _get_ns_config_info(self, nsr_id):
778 """
779 Generates a mapping between vnf,vdu elements and the N2VC id
780 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
781 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
782 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
783 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
784 """
785 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
786 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
787 mapping = {}
788 ns_config_info = {"osm-config-mapping": mapping}
789 for vca in vca_deployed_list:
790 if not vca["member-vnf-index"]:
791 continue
792 if not vca["vdu_id"]:
793 mapping[vca["member-vnf-index"]] = vca["application"]
794 else:
795 mapping[
796 "{}.{}.{}".format(
797 vca["member-vnf-index"], vca["vdu_id"], vca["vdu_count_index"]
798 )
799 ] = vca["application"]
800 return ns_config_info
801
802 async def _instantiate_ng_ro(
803 self,
804 logging_text,
805 nsr_id,
806 nsd,
807 db_nsr,
808 db_nslcmop,
809 db_vnfrs,
810 db_vnfds,
811 n2vc_key_list,
812 stage,
813 start_deploy,
814 timeout_ns_deploy,
815 ):
816
817 db_vims = {}
818
819 def get_vim_account(vim_account_id):
820 nonlocal db_vims
821 if vim_account_id in db_vims:
822 return db_vims[vim_account_id]
823 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account_id})
824 db_vims[vim_account_id] = db_vim
825 return db_vim
826
827 # modify target_vld info with instantiation parameters
828 def parse_vld_instantiation_params(
829 target_vim, target_vld, vld_params, target_sdn
830 ):
831 if vld_params.get("ip-profile"):
832 target_vld["vim_info"][target_vim]["ip_profile"] = vld_params[
833 "ip-profile"
834 ]
835 if vld_params.get("provider-network"):
836 target_vld["vim_info"][target_vim]["provider_network"] = vld_params[
837 "provider-network"
838 ]
839 if "sdn-ports" in vld_params["provider-network"] and target_sdn:
840 target_vld["vim_info"][target_sdn]["sdn-ports"] = vld_params[
841 "provider-network"
842 ]["sdn-ports"]
843 if vld_params.get("wimAccountId"):
844 target_wim = "wim:{}".format(vld_params["wimAccountId"])
845 target_vld["vim_info"][target_wim] = {}
846 for param in ("vim-network-name", "vim-network-id"):
847 if vld_params.get(param):
848 if isinstance(vld_params[param], dict):
849 for vim, vim_net in vld_params[param].items():
850 other_target_vim = "vim:" + vim
851 populate_dict(
852 target_vld["vim_info"],
853 (other_target_vim, param.replace("-", "_")),
854 vim_net,
855 )
856 else: # isinstance str
857 target_vld["vim_info"][target_vim][
858 param.replace("-", "_")
859 ] = vld_params[param]
860 if vld_params.get("common_id"):
861 target_vld["common_id"] = vld_params.get("common_id")
862
863 # modify target["ns"]["vld"] with instantiation parameters to override vnf vim-account
864 def update_ns_vld_target(target, ns_params):
865 for vnf_params in ns_params.get("vnf", ()):
866 if vnf_params.get("vimAccountId"):
867 target_vnf = next(
868 (
869 vnfr
870 for vnfr in db_vnfrs.values()
871 if vnf_params["member-vnf-index"]
872 == vnfr["member-vnf-index-ref"]
873 ),
874 None,
875 )
876 vdur = next((vdur for vdur in target_vnf.get("vdur", ())), None)
877 for a_index, a_vld in enumerate(target["ns"]["vld"]):
878 target_vld = find_in_list(
879 get_iterable(vdur, "interfaces"),
880 lambda iface: iface.get("ns-vld-id") == a_vld["name"],
881 )
882 if target_vld:
883 if vnf_params.get("vimAccountId") not in a_vld.get(
884 "vim_info", {}
885 ):
886 target["ns"]["vld"][a_index].get("vim_info").update(
887 {
888 "vim:{}".format(vnf_params["vimAccountId"]): {
889 "vim_network_name": ""
890 }
891 }
892 )
893
894 nslcmop_id = db_nslcmop["_id"]
895 target = {
896 "name": db_nsr["name"],
897 "ns": {"vld": []},
898 "vnf": [],
899 "image": deepcopy(db_nsr["image"]),
900 "flavor": deepcopy(db_nsr["flavor"]),
901 "action_id": nslcmop_id,
902 "cloud_init_content": {},
903 }
904 for image in target["image"]:
905 image["vim_info"] = {}
906 for flavor in target["flavor"]:
907 flavor["vim_info"] = {}
908 if db_nsr.get("affinity-or-anti-affinity-group"):
909 target["affinity-or-anti-affinity-group"] = deepcopy(
910 db_nsr["affinity-or-anti-affinity-group"]
911 )
912 for affinity_or_anti_affinity_group in target[
913 "affinity-or-anti-affinity-group"
914 ]:
915 affinity_or_anti_affinity_group["vim_info"] = {}
916
917 if db_nslcmop.get("lcmOperationType") != "instantiate":
918 # get parameters of instantiation:
919 db_nslcmop_instantiate = self.db.get_list(
920 "nslcmops",
921 {
922 "nsInstanceId": db_nslcmop["nsInstanceId"],
923 "lcmOperationType": "instantiate",
924 },
925 )[-1]
926 ns_params = db_nslcmop_instantiate.get("operationParams")
927 else:
928 ns_params = db_nslcmop.get("operationParams")
929 ssh_keys_instantiation = ns_params.get("ssh_keys") or []
930 ssh_keys_all = ssh_keys_instantiation + (n2vc_key_list or [])
931
932 cp2target = {}
933 for vld_index, vld in enumerate(db_nsr.get("vld")):
934 target_vim = "vim:{}".format(ns_params["vimAccountId"])
935 target_vld = {
936 "id": vld["id"],
937 "name": vld["name"],
938 "mgmt-network": vld.get("mgmt-network", False),
939 "type": vld.get("type"),
940 "vim_info": {
941 target_vim: {
942 "vim_network_name": vld.get("vim-network-name"),
943 "vim_account_id": ns_params["vimAccountId"],
944 }
945 },
946 }
947 # check if this network needs SDN assist
948 if vld.get("pci-interfaces"):
949 db_vim = get_vim_account(ns_params["vimAccountId"])
950 sdnc_id = db_vim["config"].get("sdn-controller")
951 if sdnc_id:
952 sdn_vld = "nsrs:{}:vld.{}".format(nsr_id, vld["id"])
953 target_sdn = "sdn:{}".format(sdnc_id)
954 target_vld["vim_info"][target_sdn] = {
955 "sdn": True,
956 "target_vim": target_vim,
957 "vlds": [sdn_vld],
958 "type": vld.get("type"),
959 }
960
961 nsd_vnf_profiles = get_vnf_profiles(nsd)
962 for nsd_vnf_profile in nsd_vnf_profiles:
963 for cp in nsd_vnf_profile["virtual-link-connectivity"]:
964 if cp["virtual-link-profile-id"] == vld["id"]:
965 cp2target[
966 "member_vnf:{}.{}".format(
967 cp["constituent-cpd-id"][0][
968 "constituent-base-element-id"
969 ],
970 cp["constituent-cpd-id"][0]["constituent-cpd-id"],
971 )
972 ] = "nsrs:{}:vld.{}".format(nsr_id, vld_index)
973
974 # check at nsd descriptor, if there is an ip-profile
975 vld_params = {}
976 nsd_vlp = find_in_list(
977 get_virtual_link_profiles(nsd),
978 lambda a_link_profile: a_link_profile["virtual-link-desc-id"]
979 == vld["id"],
980 )
981 if (
982 nsd_vlp
983 and nsd_vlp.get("virtual-link-protocol-data")
984 and nsd_vlp["virtual-link-protocol-data"].get("l3-protocol-data")
985 ):
986 ip_profile_source_data = nsd_vlp["virtual-link-protocol-data"][
987 "l3-protocol-data"
988 ]
989 ip_profile_dest_data = {}
990 if "ip-version" in ip_profile_source_data:
991 ip_profile_dest_data["ip-version"] = ip_profile_source_data[
992 "ip-version"
993 ]
994 if "cidr" in ip_profile_source_data:
995 ip_profile_dest_data["subnet-address"] = ip_profile_source_data[
996 "cidr"
997 ]
998 if "gateway-ip" in ip_profile_source_data:
999 ip_profile_dest_data["gateway-address"] = ip_profile_source_data[
1000 "gateway-ip"
1001 ]
1002 if "dhcp-enabled" in ip_profile_source_data:
1003 ip_profile_dest_data["dhcp-params"] = {
1004 "enabled": ip_profile_source_data["dhcp-enabled"]
1005 }
1006 vld_params["ip-profile"] = ip_profile_dest_data
1007
1008 # update vld_params with instantiation params
1009 vld_instantiation_params = find_in_list(
1010 get_iterable(ns_params, "vld"),
1011 lambda a_vld: a_vld["name"] in (vld["name"], vld["id"]),
1012 )
1013 if vld_instantiation_params:
1014 vld_params.update(vld_instantiation_params)
1015 parse_vld_instantiation_params(target_vim, target_vld, vld_params, None)
1016 target["ns"]["vld"].append(target_vld)
1017 # Update the target ns_vld if vnf vim_account is overriden by instantiation params
1018 update_ns_vld_target(target, ns_params)
1019
1020 for vnfr in db_vnfrs.values():
1021 vnfd = find_in_list(
1022 db_vnfds, lambda db_vnf: db_vnf["id"] == vnfr["vnfd-ref"]
1023 )
1024 vnf_params = find_in_list(
1025 get_iterable(ns_params, "vnf"),
1026 lambda a_vnf: a_vnf["member-vnf-index"] == vnfr["member-vnf-index-ref"],
1027 )
1028 target_vnf = deepcopy(vnfr)
1029 target_vim = "vim:{}".format(vnfr["vim-account-id"])
1030 for vld in target_vnf.get("vld", ()):
1031 # check if connected to a ns.vld, to fill target'
1032 vnf_cp = find_in_list(
1033 vnfd.get("int-virtual-link-desc", ()),
1034 lambda cpd: cpd.get("id") == vld["id"],
1035 )
1036 if vnf_cp:
1037 ns_cp = "member_vnf:{}.{}".format(
1038 vnfr["member-vnf-index-ref"], vnf_cp["id"]
1039 )
1040 if cp2target.get(ns_cp):
1041 vld["target"] = cp2target[ns_cp]
1042
1043 vld["vim_info"] = {
1044 target_vim: {"vim_network_name": vld.get("vim-network-name")}
1045 }
1046 # check if this network needs SDN assist
1047 target_sdn = None
1048 if vld.get("pci-interfaces"):
1049 db_vim = get_vim_account(vnfr["vim-account-id"])
1050 sdnc_id = db_vim["config"].get("sdn-controller")
1051 if sdnc_id:
1052 sdn_vld = "vnfrs:{}:vld.{}".format(target_vnf["_id"], vld["id"])
1053 target_sdn = "sdn:{}".format(sdnc_id)
1054 vld["vim_info"][target_sdn] = {
1055 "sdn": True,
1056 "target_vim": target_vim,
1057 "vlds": [sdn_vld],
1058 "type": vld.get("type"),
1059 }
1060
1061 # check at vnfd descriptor, if there is an ip-profile
1062 vld_params = {}
1063 vnfd_vlp = find_in_list(
1064 get_virtual_link_profiles(vnfd),
1065 lambda a_link_profile: a_link_profile["id"] == vld["id"],
1066 )
1067 if (
1068 vnfd_vlp
1069 and vnfd_vlp.get("virtual-link-protocol-data")
1070 and vnfd_vlp["virtual-link-protocol-data"].get("l3-protocol-data")
1071 ):
1072 ip_profile_source_data = vnfd_vlp["virtual-link-protocol-data"][
1073 "l3-protocol-data"
1074 ]
1075 ip_profile_dest_data = {}
1076 if "ip-version" in ip_profile_source_data:
1077 ip_profile_dest_data["ip-version"] = ip_profile_source_data[
1078 "ip-version"
1079 ]
1080 if "cidr" in ip_profile_source_data:
1081 ip_profile_dest_data["subnet-address"] = ip_profile_source_data[
1082 "cidr"
1083 ]
1084 if "gateway-ip" in ip_profile_source_data:
1085 ip_profile_dest_data[
1086 "gateway-address"
1087 ] = ip_profile_source_data["gateway-ip"]
1088 if "dhcp-enabled" in ip_profile_source_data:
1089 ip_profile_dest_data["dhcp-params"] = {
1090 "enabled": ip_profile_source_data["dhcp-enabled"]
1091 }
1092
1093 vld_params["ip-profile"] = ip_profile_dest_data
1094 # update vld_params with instantiation params
1095 if vnf_params:
1096 vld_instantiation_params = find_in_list(
1097 get_iterable(vnf_params, "internal-vld"),
1098 lambda i_vld: i_vld["name"] == vld["id"],
1099 )
1100 if vld_instantiation_params:
1101 vld_params.update(vld_instantiation_params)
1102 parse_vld_instantiation_params(target_vim, vld, vld_params, target_sdn)
1103
1104 vdur_list = []
1105 for vdur in target_vnf.get("vdur", ()):
1106 if vdur.get("status") == "DELETING" or vdur.get("pdu-type"):
1107 continue # This vdu must not be created
1108 vdur["vim_info"] = {"vim_account_id": vnfr["vim-account-id"]}
1109
1110 self.logger.debug("NS > ssh_keys > {}".format(ssh_keys_all))
1111
1112 if ssh_keys_all:
1113 vdu_configuration = get_configuration(vnfd, vdur["vdu-id-ref"])
1114 vnf_configuration = get_configuration(vnfd, vnfd["id"])
1115 if (
1116 vdu_configuration
1117 and vdu_configuration.get("config-access")
1118 and vdu_configuration.get("config-access").get("ssh-access")
1119 ):
1120 vdur["ssh-keys"] = ssh_keys_all
1121 vdur["ssh-access-required"] = vdu_configuration[
1122 "config-access"
1123 ]["ssh-access"]["required"]
1124 elif (
1125 vnf_configuration
1126 and vnf_configuration.get("config-access")
1127 and vnf_configuration.get("config-access").get("ssh-access")
1128 and any(iface.get("mgmt-vnf") for iface in vdur["interfaces"])
1129 ):
1130 vdur["ssh-keys"] = ssh_keys_all
1131 vdur["ssh-access-required"] = vnf_configuration[
1132 "config-access"
1133 ]["ssh-access"]["required"]
1134 elif ssh_keys_instantiation and find_in_list(
1135 vdur["interfaces"], lambda iface: iface.get("mgmt-vnf")
1136 ):
1137 vdur["ssh-keys"] = ssh_keys_instantiation
1138
1139 self.logger.debug("NS > vdur > {}".format(vdur))
1140
1141 vdud = get_vdu(vnfd, vdur["vdu-id-ref"])
1142 # cloud-init
1143 if vdud.get("cloud-init-file"):
1144 vdur["cloud-init"] = "{}:file:{}".format(
1145 vnfd["_id"], vdud.get("cloud-init-file")
1146 )
1147 # read file and put content at target.cloul_init_content. Avoid ng_ro to use shared package system
1148 if vdur["cloud-init"] not in target["cloud_init_content"]:
1149 base_folder = vnfd["_admin"]["storage"]
1150 if base_folder["pkg-dir"]:
1151 cloud_init_file = "{}/{}/cloud_init/{}".format(
1152 base_folder["folder"],
1153 base_folder["pkg-dir"],
1154 vdud.get("cloud-init-file"),
1155 )
1156 else:
1157 cloud_init_file = "{}/Scripts/cloud_init/{}".format(
1158 base_folder["folder"],
1159 vdud.get("cloud-init-file"),
1160 )
1161 with self.fs.file_open(cloud_init_file, "r") as ci_file:
1162 target["cloud_init_content"][
1163 vdur["cloud-init"]
1164 ] = ci_file.read()
1165 elif vdud.get("cloud-init"):
1166 vdur["cloud-init"] = "{}:vdu:{}".format(
1167 vnfd["_id"], get_vdu_index(vnfd, vdur["vdu-id-ref"])
1168 )
1169 # put content at target.cloul_init_content. Avoid ng_ro read vnfd descriptor
1170 target["cloud_init_content"][vdur["cloud-init"]] = vdud[
1171 "cloud-init"
1172 ]
1173 vdur["additionalParams"] = vdur.get("additionalParams") or {}
1174 deploy_params_vdu = self._format_additional_params(
1175 vdur.get("additionalParams") or {}
1176 )
1177 deploy_params_vdu["OSM"] = get_osm_params(
1178 vnfr, vdur["vdu-id-ref"], vdur["count-index"]
1179 )
1180 vdur["additionalParams"] = deploy_params_vdu
1181
1182 # flavor
1183 ns_flavor = target["flavor"][int(vdur["ns-flavor-id"])]
1184 if target_vim not in ns_flavor["vim_info"]:
1185 ns_flavor["vim_info"][target_vim] = {}
1186
1187 # deal with images
1188 # in case alternative images are provided we must check if they should be applied
1189 # for the vim_type, modify the vim_type taking into account
1190 ns_image_id = int(vdur["ns-image-id"])
1191 if vdur.get("alt-image-ids"):
1192 db_vim = get_vim_account(vnfr["vim-account-id"])
1193 vim_type = db_vim["vim_type"]
1194 for alt_image_id in vdur.get("alt-image-ids"):
1195 ns_alt_image = target["image"][int(alt_image_id)]
1196 if vim_type == ns_alt_image.get("vim-type"):
1197 # must use alternative image
1198 self.logger.debug(
1199 "use alternative image id: {}".format(alt_image_id)
1200 )
1201 ns_image_id = alt_image_id
1202 vdur["ns-image-id"] = ns_image_id
1203 break
1204 ns_image = target["image"][int(ns_image_id)]
1205 if target_vim not in ns_image["vim_info"]:
1206 ns_image["vim_info"][target_vim] = {}
1207
1208 # Affinity groups
1209 if vdur.get("affinity-or-anti-affinity-group-id"):
1210 for ags_id in vdur["affinity-or-anti-affinity-group-id"]:
1211 ns_ags = target["affinity-or-anti-affinity-group"][int(ags_id)]
1212 if target_vim not in ns_ags["vim_info"]:
1213 ns_ags["vim_info"][target_vim] = {}
1214
1215 vdur["vim_info"] = {target_vim: {}}
1216 # instantiation parameters
1217 # if vnf_params:
1218 # vdu_instantiation_params = next((v for v in get_iterable(vnf_params, "vdu") if v["id"] ==
1219 # vdud["id"]), None)
1220 vdur_list.append(vdur)
1221 target_vnf["vdur"] = vdur_list
1222 target["vnf"].append(target_vnf)
1223
1224 desc = await self.RO.deploy(nsr_id, target)
1225 self.logger.debug("RO return > {}".format(desc))
1226 action_id = desc["action_id"]
1227 await self._wait_ng_ro(
1228 nsr_id, action_id, nslcmop_id, start_deploy, timeout_ns_deploy, stage
1229 )
1230
1231 # Updating NSR
1232 db_nsr_update = {
1233 "_admin.deployed.RO.operational-status": "running",
1234 "detailed-status": " ".join(stage),
1235 }
1236 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1237 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1238 self._write_op_status(nslcmop_id, stage)
1239 self.logger.debug(
1240 logging_text + "ns deployed at RO. RO_id={}".format(action_id)
1241 )
1242 return
1243
1244 async def _wait_ng_ro(
1245 self,
1246 nsr_id,
1247 action_id,
1248 nslcmop_id=None,
1249 start_time=None,
1250 timeout=600,
1251 stage=None,
1252 ):
1253 detailed_status_old = None
1254 db_nsr_update = {}
1255 start_time = start_time or time()
1256 while time() <= start_time + timeout:
1257 desc_status = await self.RO.status(nsr_id, action_id)
1258 self.logger.debug("Wait NG RO > {}".format(desc_status))
1259 if desc_status["status"] == "FAILED":
1260 raise NgRoException(desc_status["details"])
1261 elif desc_status["status"] == "BUILD":
1262 if stage:
1263 stage[2] = "VIM: ({})".format(desc_status["details"])
1264 elif desc_status["status"] == "DONE":
1265 if stage:
1266 stage[2] = "Deployed at VIM"
1267 break
1268 else:
1269 assert False, "ROclient.check_ns_status returns unknown {}".format(
1270 desc_status["status"]
1271 )
1272 if stage and nslcmop_id and stage[2] != detailed_status_old:
1273 detailed_status_old = stage[2]
1274 db_nsr_update["detailed-status"] = " ".join(stage)
1275 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1276 self._write_op_status(nslcmop_id, stage)
1277 await asyncio.sleep(15, loop=self.loop)
1278 else: # timeout_ns_deploy
1279 raise NgRoException("Timeout waiting ns to deploy")
1280
1281 async def _terminate_ng_ro(
1282 self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
1283 ):
1284 db_nsr_update = {}
1285 failed_detail = []
1286 action_id = None
1287 start_deploy = time()
1288 try:
1289 target = {
1290 "ns": {"vld": []},
1291 "vnf": [],
1292 "image": [],
1293 "flavor": [],
1294 "action_id": nslcmop_id,
1295 }
1296 desc = await self.RO.deploy(nsr_id, target)
1297 action_id = desc["action_id"]
1298 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = action_id
1299 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETING"
1300 self.logger.debug(
1301 logging_text
1302 + "ns terminate action at RO. action_id={}".format(action_id)
1303 )
1304
1305 # wait until done
1306 delete_timeout = 20 * 60 # 20 minutes
1307 await self._wait_ng_ro(
1308 nsr_id, action_id, nslcmop_id, start_deploy, delete_timeout, stage
1309 )
1310
1311 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
1312 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
1313 # delete all nsr
1314 await self.RO.delete(nsr_id)
1315 except Exception as e:
1316 if isinstance(e, NgRoException) and e.http_code == 404: # not found
1317 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
1318 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
1319 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
1320 self.logger.debug(
1321 logging_text + "RO_action_id={} already deleted".format(action_id)
1322 )
1323 elif isinstance(e, NgRoException) and e.http_code == 409: # conflict
1324 failed_detail.append("delete conflict: {}".format(e))
1325 self.logger.debug(
1326 logging_text
1327 + "RO_action_id={} delete conflict: {}".format(action_id, e)
1328 )
1329 else:
1330 failed_detail.append("delete error: {}".format(e))
1331 self.logger.error(
1332 logging_text
1333 + "RO_action_id={} delete error: {}".format(action_id, e)
1334 )
1335
1336 if failed_detail:
1337 stage[2] = "Error deleting from VIM"
1338 else:
1339 stage[2] = "Deleted from VIM"
1340 db_nsr_update["detailed-status"] = " ".join(stage)
1341 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1342 self._write_op_status(nslcmop_id, stage)
1343
1344 if failed_detail:
1345 raise LcmException("; ".join(failed_detail))
1346 return
1347
1348 async def instantiate_RO(
1349 self,
1350 logging_text,
1351 nsr_id,
1352 nsd,
1353 db_nsr,
1354 db_nslcmop,
1355 db_vnfrs,
1356 db_vnfds,
1357 n2vc_key_list,
1358 stage,
1359 ):
1360 """
1361 Instantiate at RO
1362 :param logging_text: preffix text to use at logging
1363 :param nsr_id: nsr identity
1364 :param nsd: database content of ns descriptor
1365 :param db_nsr: database content of ns record
1366 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
1367 :param db_vnfrs:
1368 :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
1369 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
1370 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
1371 :return: None or exception
1372 """
1373 try:
1374 start_deploy = time()
1375 ns_params = db_nslcmop.get("operationParams")
1376 if ns_params and ns_params.get("timeout_ns_deploy"):
1377 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
1378 else:
1379 timeout_ns_deploy = self.timeout.get(
1380 "ns_deploy", self.timeout_ns_deploy
1381 )
1382
1383 # Check for and optionally request placement optimization. Database will be updated if placement activated
1384 stage[2] = "Waiting for Placement."
1385 if await self._do_placement(logging_text, db_nslcmop, db_vnfrs):
1386 # in case of placement change ns_params[vimAcountId) if not present at any vnfrs
1387 for vnfr in db_vnfrs.values():
1388 if ns_params["vimAccountId"] == vnfr["vim-account-id"]:
1389 break
1390 else:
1391 ns_params["vimAccountId"] == vnfr["vim-account-id"]
1392
1393 return await self._instantiate_ng_ro(
1394 logging_text,
1395 nsr_id,
1396 nsd,
1397 db_nsr,
1398 db_nslcmop,
1399 db_vnfrs,
1400 db_vnfds,
1401 n2vc_key_list,
1402 stage,
1403 start_deploy,
1404 timeout_ns_deploy,
1405 )
1406 except Exception as e:
1407 stage[2] = "ERROR deploying at VIM"
1408 self.set_vnfr_at_error(db_vnfrs, str(e))
1409 self.logger.error(
1410 "Error deploying at VIM {}".format(e),
1411 exc_info=not isinstance(
1412 e,
1413 (
1414 ROclient.ROClientException,
1415 LcmException,
1416 DbException,
1417 NgRoException,
1418 ),
1419 ),
1420 )
1421 raise
1422
1423 async def wait_kdu_up(self, logging_text, nsr_id, vnfr_id, kdu_name):
1424 """
1425 Wait for kdu to be up, get ip address
1426 :param logging_text: prefix use for logging
1427 :param nsr_id:
1428 :param vnfr_id:
1429 :param kdu_name:
1430 :return: IP address, K8s services
1431 """
1432
1433 # self.logger.debug(logging_text + "Starting wait_kdu_up")
1434 nb_tries = 0
1435
1436 while nb_tries < 360:
1437 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
1438 kdur = next(
1439 (
1440 x
1441 for x in get_iterable(db_vnfr, "kdur")
1442 if x.get("kdu-name") == kdu_name
1443 ),
1444 None,
1445 )
1446 if not kdur:
1447 raise LcmException(
1448 "Not found vnfr_id={}, kdu_name={}".format(vnfr_id, kdu_name)
1449 )
1450 if kdur.get("status"):
1451 if kdur["status"] in ("READY", "ENABLED"):
1452 return kdur.get("ip-address"), kdur.get("services")
1453 else:
1454 raise LcmException(
1455 "target KDU={} is in error state".format(kdu_name)
1456 )
1457
1458 await asyncio.sleep(10, loop=self.loop)
1459 nb_tries += 1
1460 raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name))
1461
    async def wait_vm_up_insert_key_ro(
        self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None
    ):
        """
        Wait for ip address at RO, and optionally, insert public key in virtual machine
        :param logging_text: prefix use for logging
        :param nsr_id: ns record id, used to address RO
        :param vnfr_id: vnfr record id, polled from database for ip/status
        :param vdu_id: target vdu; None means the VNF mgmt address is used
        :param vdu_index: count-index of the vdu (used together with vdu_id)
        :param pub_key: public ssh key to inject, None to skip
        :param user: user to apply the public ssh key
        :return: IP address
        """

        self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
        ro_nsr_id = None
        ip_address = None
        nb_tries = 0  # retry counter for the key-injection RO call (legacy RO path)
        target_vdu_id = None
        ro_retries = 0  # overall polling counter, 10 s per iteration

        while True:

            ro_retries += 1
            if ro_retries >= 360:  # 1 hour
                raise LcmException(
                    "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id)
                )

            await asyncio.sleep(10, loop=self.loop)

            # get ip address: re-read the vnfr until the target vdu is ACTIVE
            if not target_vdu_id:
                db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})

                if not vdu_id:  # for the VNF case
                    if db_vnfr.get("status") == "ERROR":
                        raise LcmException(
                            "Cannot inject ssh-key because target VNF is in error state"
                        )
                    ip_address = db_vnfr.get("ip-address")
                    if not ip_address:
                        continue
                    # pick the vdur that owns the VNF mgmt ip address
                    vdur = next(
                        (
                            x
                            for x in get_iterable(db_vnfr, "vdur")
                            if x.get("ip-address") == ip_address
                        ),
                        None,
                    )
                else:  # VDU case
                    # pick the vdur by id and count-index
                    vdur = next(
                        (
                            x
                            for x in get_iterable(db_vnfr, "vdur")
                            if x.get("vdu-id-ref") == vdu_id
                            and x.get("count-index") == vdu_index
                        ),
                        None,
                    )

                if (
                    not vdur and len(db_vnfr.get("vdur", ())) == 1
                ):  # If only one, this should be the target vdu
                    vdur = db_vnfr["vdur"][0]
                if not vdur:
                    raise LcmException(
                        "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
                            vnfr_id, vdu_id, vdu_index
                        )
                    )
                # New generation RO stores information at "vim_info"
                ng_ro_status = None
                target_vim = None
                if vdur.get("vim_info"):
                    target_vim = next(
                        t for t in vdur["vim_info"]
                    )  # there should be only one key
                    ng_ro_status = vdur["vim_info"][target_vim].get("vim_status")
                # PDUs are externally managed, so they count as "up" immediately
                if (
                    vdur.get("pdu-type")
                    or vdur.get("status") == "ACTIVE"
                    or ng_ro_status == "ACTIVE"
                ):
                    ip_address = vdur.get("ip-address")
                    if not ip_address:
                        continue
                    target_vdu_id = vdur["vdu-id-ref"]
                elif vdur.get("status") == "ERROR" or ng_ro_status == "ERROR":
                    raise LcmException(
                        "Cannot inject ssh-key because target VM is in error state"
                    )

            if not target_vdu_id:
                continue

            # inject public key into machine
            if pub_key and user:
                self.logger.debug(logging_text + "Inserting RO key")
                self.logger.debug("SSH > PubKey > {}".format(pub_key))
                if vdur.get("pdu-type"):
                    # cannot manage the guest OS of a PDU; return what we have
                    self.logger.error(logging_text + "Cannot inject ssh-ky to a PDU")
                    return ip_address
                try:
                    ro_vm_id = "{}-{}".format(
                        db_vnfr["member-vnf-index-ref"], target_vdu_id
                    )  # TODO add vdu_index
                    if self.ng_ro:
                        # new-generation RO: key injection is an RO "action" deploy
                        target = {
                            "action": {
                                "action": "inject_ssh_key",
                                "key": pub_key,
                                "user": user,
                            },
                            "vnf": [{"_id": vnfr_id, "vdur": [{"id": vdur["id"]}]}],
                        }
                        desc = await self.RO.deploy(nsr_id, target)
                        action_id = desc["action_id"]
                        await self._wait_ng_ro(nsr_id, action_id, timeout=600)
                        break
                    else:
                        # legacy RO path
                        # wait until NS is deployed at RO
                        if not ro_nsr_id:
                            db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
                            ro_nsr_id = deep_get(
                                db_nsrs, ("_admin", "deployed", "RO", "nsr_id")
                            )
                            if not ro_nsr_id:
                                continue
                        result_dict = await self.RO.create_action(
                            item="ns",
                            item_id_name=ro_nsr_id,
                            descriptor={
                                "add_public_key": pub_key,
                                "vms": [ro_vm_id],
                                "user": user,
                            },
                        )
                        # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
                        if not result_dict or not isinstance(result_dict, dict):
                            raise LcmException(
                                "Unknown response from RO when injecting key"
                            )
                        for result in result_dict.values():
                            if result.get("vim_result") == 200:
                                break
                        else:
                            raise ROclient.ROClientException(
                                "error injecting key: {}".format(
                                    result.get("description")
                                )
                            )
                        break
                except NgRoException as e:
                    raise LcmException(
                        "Reaching max tries injecting key. Error: {}".format(e)
                    )
                except ROclient.ROClientException as e:
                    # legacy RO may transiently fail; retry up to 20 times
                    if not nb_tries:
                        self.logger.debug(
                            logging_text
                            + "error injecting key: {}. Retrying until {} seconds".format(
                                e, 20 * 10
                            )
                        )
                    nb_tries += 1
                    if nb_tries >= 20:
                        raise LcmException(
                            "Reaching max tries injecting key. Error: {}".format(e)
                        )
            else:
                # no key to inject: the VM is up, we are done
                break

        return ip_address
1638
1639 async def _wait_dependent_n2vc(self, nsr_id, vca_deployed_list, vca_index):
1640 """
1641 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1642 """
1643 my_vca = vca_deployed_list[vca_index]
1644 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
1645 # vdu or kdu: no dependencies
1646 return
1647 timeout = 300
1648 while timeout >= 0:
1649 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1650 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1651 configuration_status_list = db_nsr["configurationStatus"]
1652 for index, vca_deployed in enumerate(configuration_status_list):
1653 if index == vca_index:
1654 # myself
1655 continue
1656 if not my_vca.get("member-vnf-index") or (
1657 vca_deployed.get("member-vnf-index")
1658 == my_vca.get("member-vnf-index")
1659 ):
1660 internal_status = configuration_status_list[index].get("status")
1661 if internal_status == "READY":
1662 continue
1663 elif internal_status == "BROKEN":
1664 raise LcmException(
1665 "Configuration aborted because dependent charm/s has failed"
1666 )
1667 else:
1668 break
1669 else:
1670 # no dependencies, return
1671 return
1672 await asyncio.sleep(10)
1673 timeout -= 1
1674
1675 raise LcmException("Configuration aborted because dependent charm/s timeout")
1676
1677 def get_vca_id(self, db_vnfr: dict, db_nsr: dict):
1678 vca_id = None
1679 if db_vnfr:
1680 vca_id = deep_get(db_vnfr, ("vca-id",))
1681 elif db_nsr:
1682 vim_account_id = deep_get(db_nsr, ("instantiate_params", "vimAccountId"))
1683 vca_id = VimAccountDB.get_vim_account_with_id(vim_account_id).get("vca")
1684 return vca_id
1685
1686 async def instantiate_N2VC(
1687 self,
1688 logging_text,
1689 vca_index,
1690 nsi_id,
1691 db_nsr,
1692 db_vnfr,
1693 vdu_id,
1694 kdu_name,
1695 vdu_index,
1696 config_descriptor,
1697 deploy_params,
1698 base_folder,
1699 nslcmop_id,
1700 stage,
1701 vca_type,
1702 vca_name,
1703 ee_config_descriptor,
1704 ):
1705 nsr_id = db_nsr["_id"]
1706 db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
1707 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1708 vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
1709 osm_config = {"osm": {"ns_id": db_nsr["_id"]}}
1710 db_dict = {
1711 "collection": "nsrs",
1712 "filter": {"_id": nsr_id},
1713 "path": db_update_entry,
1714 }
1715 step = ""
1716 try:
1717
1718 element_type = "NS"
1719 element_under_configuration = nsr_id
1720
1721 vnfr_id = None
1722 if db_vnfr:
1723 vnfr_id = db_vnfr["_id"]
1724 osm_config["osm"]["vnf_id"] = vnfr_id
1725
1726 namespace = "{nsi}.{ns}".format(nsi=nsi_id if nsi_id else "", ns=nsr_id)
1727
1728 if vca_type == "native_charm":
1729 index_number = 0
1730 else:
1731 index_number = vdu_index or 0
1732
1733 if vnfr_id:
1734 element_type = "VNF"
1735 element_under_configuration = vnfr_id
1736 namespace += ".{}-{}".format(vnfr_id, index_number)
1737 if vdu_id:
1738 namespace += ".{}-{}".format(vdu_id, index_number)
1739 element_type = "VDU"
1740 element_under_configuration = "{}-{}".format(vdu_id, index_number)
1741 osm_config["osm"]["vdu_id"] = vdu_id
1742 elif kdu_name:
1743 namespace += ".{}".format(kdu_name)
1744 element_type = "KDU"
1745 element_under_configuration = kdu_name
1746 osm_config["osm"]["kdu_name"] = kdu_name
1747
1748 # Get artifact path
1749 if base_folder["pkg-dir"]:
1750 artifact_path = "{}/{}/{}/{}".format(
1751 base_folder["folder"],
1752 base_folder["pkg-dir"],
1753 "charms"
1754 if vca_type
1755 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1756 else "helm-charts",
1757 vca_name,
1758 )
1759 else:
1760 artifact_path = "{}/Scripts/{}/{}/".format(
1761 base_folder["folder"],
1762 "charms"
1763 if vca_type
1764 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1765 else "helm-charts",
1766 vca_name,
1767 )
1768
1769 self.logger.debug("Artifact path > {}".format(artifact_path))
1770
1771 # get initial_config_primitive_list that applies to this element
1772 initial_config_primitive_list = config_descriptor.get(
1773 "initial-config-primitive"
1774 )
1775
1776 self.logger.debug(
1777 "Initial config primitive list > {}".format(
1778 initial_config_primitive_list
1779 )
1780 )
1781
1782 # add config if not present for NS charm
1783 ee_descriptor_id = ee_config_descriptor.get("id")
1784 self.logger.debug("EE Descriptor > {}".format(ee_descriptor_id))
1785 initial_config_primitive_list = get_ee_sorted_initial_config_primitive_list(
1786 initial_config_primitive_list, vca_deployed, ee_descriptor_id
1787 )
1788
1789 self.logger.debug(
1790 "Initial config primitive list #2 > {}".format(
1791 initial_config_primitive_list
1792 )
1793 )
1794 # n2vc_redesign STEP 3.1
1795 # find old ee_id if exists
1796 ee_id = vca_deployed.get("ee_id")
1797
1798 vca_id = self.get_vca_id(db_vnfr, db_nsr)
1799 # create or register execution environment in VCA
1800 if vca_type in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1801
1802 self._write_configuration_status(
1803 nsr_id=nsr_id,
1804 vca_index=vca_index,
1805 status="CREATING",
1806 element_under_configuration=element_under_configuration,
1807 element_type=element_type,
1808 )
1809
1810 step = "create execution environment"
1811 self.logger.debug(logging_text + step)
1812
1813 ee_id = None
1814 credentials = None
1815 if vca_type == "k8s_proxy_charm":
1816 ee_id = await self.vca_map[vca_type].install_k8s_proxy_charm(
1817 charm_name=artifact_path[artifact_path.rfind("/") + 1 :],
1818 namespace=namespace,
1819 artifact_path=artifact_path,
1820 db_dict=db_dict,
1821 vca_id=vca_id,
1822 )
1823 elif vca_type == "helm" or vca_type == "helm-v3":
1824 ee_id, credentials = await self.vca_map[
1825 vca_type
1826 ].create_execution_environment(
1827 namespace=namespace,
1828 reuse_ee_id=ee_id,
1829 db_dict=db_dict,
1830 config=osm_config,
1831 artifact_path=artifact_path,
1832 vca_type=vca_type,
1833 )
1834 else:
1835 ee_id, credentials = await self.vca_map[
1836 vca_type
1837 ].create_execution_environment(
1838 namespace=namespace,
1839 reuse_ee_id=ee_id,
1840 db_dict=db_dict,
1841 vca_id=vca_id,
1842 )
1843
1844 elif vca_type == "native_charm":
1845 step = "Waiting to VM being up and getting IP address"
1846 self.logger.debug(logging_text + step)
1847 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
1848 logging_text,
1849 nsr_id,
1850 vnfr_id,
1851 vdu_id,
1852 vdu_index,
1853 user=None,
1854 pub_key=None,
1855 )
1856 credentials = {"hostname": rw_mgmt_ip}
1857 # get username
1858 username = deep_get(
1859 config_descriptor, ("config-access", "ssh-access", "default-user")
1860 )
1861 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1862 # merged. Meanwhile let's get username from initial-config-primitive
1863 if not username and initial_config_primitive_list:
1864 for config_primitive in initial_config_primitive_list:
1865 for param in config_primitive.get("parameter", ()):
1866 if param["name"] == "ssh-username":
1867 username = param["value"]
1868 break
1869 if not username:
1870 raise LcmException(
1871 "Cannot determine the username neither with 'initial-config-primitive' nor with "
1872 "'config-access.ssh-access.default-user'"
1873 )
1874 credentials["username"] = username
1875 # n2vc_redesign STEP 3.2
1876
1877 self._write_configuration_status(
1878 nsr_id=nsr_id,
1879 vca_index=vca_index,
1880 status="REGISTERING",
1881 element_under_configuration=element_under_configuration,
1882 element_type=element_type,
1883 )
1884
1885 step = "register execution environment {}".format(credentials)
1886 self.logger.debug(logging_text + step)
1887 ee_id = await self.vca_map[vca_type].register_execution_environment(
1888 credentials=credentials,
1889 namespace=namespace,
1890 db_dict=db_dict,
1891 vca_id=vca_id,
1892 )
1893
1894 # for compatibility with MON/POL modules, the need model and application name at database
1895 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
1896 ee_id_parts = ee_id.split(".")
1897 db_nsr_update = {db_update_entry + "ee_id": ee_id}
1898 if len(ee_id_parts) >= 2:
1899 model_name = ee_id_parts[0]
1900 application_name = ee_id_parts[1]
1901 db_nsr_update[db_update_entry + "model"] = model_name
1902 db_nsr_update[db_update_entry + "application"] = application_name
1903
1904 # n2vc_redesign STEP 3.3
1905 step = "Install configuration Software"
1906
1907 self._write_configuration_status(
1908 nsr_id=nsr_id,
1909 vca_index=vca_index,
1910 status="INSTALLING SW",
1911 element_under_configuration=element_under_configuration,
1912 element_type=element_type,
1913 other_update=db_nsr_update,
1914 )
1915
1916 # TODO check if already done
1917 self.logger.debug(logging_text + step)
1918 config = None
1919 if vca_type == "native_charm":
1920 config_primitive = next(
1921 (p for p in initial_config_primitive_list if p["name"] == "config"),
1922 None,
1923 )
1924 if config_primitive:
1925 config = self._map_primitive_params(
1926 config_primitive, {}, deploy_params
1927 )
1928 num_units = 1
1929 if vca_type == "lxc_proxy_charm":
1930 if element_type == "NS":
1931 num_units = db_nsr.get("config-units") or 1
1932 elif element_type == "VNF":
1933 num_units = db_vnfr.get("config-units") or 1
1934 elif element_type == "VDU":
1935 for v in db_vnfr["vdur"]:
1936 if vdu_id == v["vdu-id-ref"]:
1937 num_units = v.get("config-units") or 1
1938 break
1939 if vca_type != "k8s_proxy_charm":
1940 await self.vca_map[vca_type].install_configuration_sw(
1941 ee_id=ee_id,
1942 artifact_path=artifact_path,
1943 db_dict=db_dict,
1944 config=config,
1945 num_units=num_units,
1946 vca_id=vca_id,
1947 vca_type=vca_type,
1948 )
1949
1950 # write in db flag of configuration_sw already installed
1951 self.update_db_2(
1952 "nsrs", nsr_id, {db_update_entry + "config_sw_installed": True}
1953 )
1954
1955 # add relations for this VCA (wait for other peers related with this VCA)
1956 await self._add_vca_relations(
1957 logging_text=logging_text,
1958 nsr_id=nsr_id,
1959 vca_type=vca_type,
1960 vca_index=vca_index,
1961 )
1962
1963 # if SSH access is required, then get execution environment SSH public
1964 # if native charm we have waited already to VM be UP
1965 if vca_type in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
1966 pub_key = None
1967 user = None
1968 # self.logger.debug("get ssh key block")
1969 if deep_get(
1970 config_descriptor, ("config-access", "ssh-access", "required")
1971 ):
1972 # self.logger.debug("ssh key needed")
1973 # Needed to inject a ssh key
1974 user = deep_get(
1975 config_descriptor,
1976 ("config-access", "ssh-access", "default-user"),
1977 )
1978 step = "Install configuration Software, getting public ssh key"
1979 pub_key = await self.vca_map[vca_type].get_ee_ssh_public__key(
1980 ee_id=ee_id, db_dict=db_dict, vca_id=vca_id
1981 )
1982
1983 step = "Insert public key into VM user={} ssh_key={}".format(
1984 user, pub_key
1985 )
1986 else:
1987 # self.logger.debug("no need to get ssh key")
1988 step = "Waiting to VM being up and getting IP address"
1989 self.logger.debug(logging_text + step)
1990
1991 # n2vc_redesign STEP 5.1
1992 # wait for RO (ip-address) Insert pub_key into VM
1993 if vnfr_id:
1994 if kdu_name:
1995 rw_mgmt_ip, services = await self.wait_kdu_up(
1996 logging_text, nsr_id, vnfr_id, kdu_name
1997 )
1998 vnfd = self.db.get_one(
1999 "vnfds_revisions",
2000 {"_id": f'{db_vnfr["vnfd-id"]}:{db_vnfr["revision"]}'},
2001 )
2002 kdu = get_kdu(vnfd, kdu_name)
2003 kdu_services = [
2004 service["name"] for service in get_kdu_services(kdu)
2005 ]
2006 exposed_services = []
2007 for service in services:
2008 if any(s in service["name"] for s in kdu_services):
2009 exposed_services.append(service)
2010 await self.vca_map[vca_type].exec_primitive(
2011 ee_id=ee_id,
2012 primitive_name="config",
2013 params_dict={
2014 "osm-config": json.dumps(
2015 OsmConfigBuilder(
2016 k8s={"services": exposed_services}
2017 ).build()
2018 )
2019 },
2020 vca_id=vca_id,
2021 )
2022 else:
2023 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
2024 logging_text,
2025 nsr_id,
2026 vnfr_id,
2027 vdu_id,
2028 vdu_index,
2029 user=user,
2030 pub_key=pub_key,
2031 )
2032
2033 else:
2034 rw_mgmt_ip = None # This is for a NS configuration
2035
2036 self.logger.debug(logging_text + " VM_ip_address={}".format(rw_mgmt_ip))
2037
2038 # store rw_mgmt_ip in deploy params for later replacement
2039 deploy_params["rw_mgmt_ip"] = rw_mgmt_ip
2040
2041 # n2vc_redesign STEP 6 Execute initial config primitive
2042 step = "execute initial config primitive"
2043
2044 # wait for dependent primitives execution (NS -> VNF -> VDU)
2045 if initial_config_primitive_list:
2046 await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)
2047
2048 # stage, in function of element type: vdu, kdu, vnf or ns
2049 my_vca = vca_deployed_list[vca_index]
2050 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
2051 # VDU or KDU
2052 stage[0] = "Stage 3/5: running Day-1 primitives for VDU."
2053 elif my_vca.get("member-vnf-index"):
2054 # VNF
2055 stage[0] = "Stage 4/5: running Day-1 primitives for VNF."
2056 else:
2057 # NS
2058 stage[0] = "Stage 5/5: running Day-1 primitives for NS."
2059
2060 self._write_configuration_status(
2061 nsr_id=nsr_id, vca_index=vca_index, status="EXECUTING PRIMITIVE"
2062 )
2063
2064 self._write_op_status(op_id=nslcmop_id, stage=stage)
2065
2066 check_if_terminated_needed = True
2067 for initial_config_primitive in initial_config_primitive_list:
2068 # adding information on the vca_deployed if it is a NS execution environment
2069 if not vca_deployed["member-vnf-index"]:
2070 deploy_params["ns_config_info"] = json.dumps(
2071 self._get_ns_config_info(nsr_id)
2072 )
2073 # TODO check if already done
2074 primitive_params_ = self._map_primitive_params(
2075 initial_config_primitive, {}, deploy_params
2076 )
2077
2078 step = "execute primitive '{}' params '{}'".format(
2079 initial_config_primitive["name"], primitive_params_
2080 )
2081 self.logger.debug(logging_text + step)
2082 await self.vca_map[vca_type].exec_primitive(
2083 ee_id=ee_id,
2084 primitive_name=initial_config_primitive["name"],
2085 params_dict=primitive_params_,
2086 db_dict=db_dict,
2087 vca_id=vca_id,
2088 vca_type=vca_type,
2089 )
2090 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
2091 if check_if_terminated_needed:
2092 if config_descriptor.get("terminate-config-primitive"):
2093 self.update_db_2(
2094 "nsrs", nsr_id, {db_update_entry + "needed_terminate": True}
2095 )
2096 check_if_terminated_needed = False
2097
2098 # TODO register in database that primitive is done
2099
2100 # STEP 7 Configure metrics
2101 if vca_type == "helm" or vca_type == "helm-v3":
2102 prometheus_jobs = await self.extract_prometheus_scrape_jobs(
2103 ee_id=ee_id,
2104 artifact_path=artifact_path,
2105 ee_config_descriptor=ee_config_descriptor,
2106 vnfr_id=vnfr_id,
2107 nsr_id=nsr_id,
2108 target_ip=rw_mgmt_ip,
2109 )
2110 if prometheus_jobs:
2111 self.update_db_2(
2112 "nsrs",
2113 nsr_id,
2114 {db_update_entry + "prometheus_jobs": prometheus_jobs},
2115 )
2116
2117 for job in prometheus_jobs:
2118 self.db.set_one(
2119 "prometheus_jobs",
2120 {"job_name": job["job_name"]},
2121 job,
2122 upsert=True,
2123 fail_on_empty=False,
2124 )
2125
2126 step = "instantiated at VCA"
2127 self.logger.debug(logging_text + step)
2128
2129 self._write_configuration_status(
2130 nsr_id=nsr_id, vca_index=vca_index, status="READY"
2131 )
2132
2133 except Exception as e: # TODO not use Exception but N2VC exception
2134 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
2135 if not isinstance(
2136 e, (DbException, N2VCException, LcmException, asyncio.CancelledError)
2137 ):
2138 self.logger.error(
2139 "Exception while {} : {}".format(step, e), exc_info=True
2140 )
2141 self._write_configuration_status(
2142 nsr_id=nsr_id, vca_index=vca_index, status="BROKEN"
2143 )
2144 raise LcmException("{} {}".format(step, e)) from e
2145
2146 def _write_ns_status(
2147 self,
2148 nsr_id: str,
2149 ns_state: str,
2150 current_operation: str,
2151 current_operation_id: str,
2152 error_description: str = None,
2153 error_detail: str = None,
2154 other_update: dict = None,
2155 ):
2156 """
2157 Update db_nsr fields.
2158 :param nsr_id:
2159 :param ns_state:
2160 :param current_operation:
2161 :param current_operation_id:
2162 :param error_description:
2163 :param error_detail:
2164 :param other_update: Other required changes at database if provided, will be cleared
2165 :return:
2166 """
2167 try:
2168 db_dict = other_update or {}
2169 db_dict[
2170 "_admin.nslcmop"
2171 ] = current_operation_id # for backward compatibility
2172 db_dict["_admin.current-operation"] = current_operation_id
2173 db_dict["_admin.operation-type"] = (
2174 current_operation if current_operation != "IDLE" else None
2175 )
2176 db_dict["currentOperation"] = current_operation
2177 db_dict["currentOperationID"] = current_operation_id
2178 db_dict["errorDescription"] = error_description
2179 db_dict["errorDetail"] = error_detail
2180
2181 if ns_state:
2182 db_dict["nsState"] = ns_state
2183 self.update_db_2("nsrs", nsr_id, db_dict)
2184 except DbException as e:
2185 self.logger.warn("Error writing NS status, ns={}: {}".format(nsr_id, e))
2186
2187 def _write_op_status(
2188 self,
2189 op_id: str,
2190 stage: list = None,
2191 error_message: str = None,
2192 queuePosition: int = 0,
2193 operation_state: str = None,
2194 other_update: dict = None,
2195 ):
2196 try:
2197 db_dict = other_update or {}
2198 db_dict["queuePosition"] = queuePosition
2199 if isinstance(stage, list):
2200 db_dict["stage"] = stage[0]
2201 db_dict["detailed-status"] = " ".join(stage)
2202 elif stage is not None:
2203 db_dict["stage"] = str(stage)
2204
2205 if error_message is not None:
2206 db_dict["errorMessage"] = error_message
2207 if operation_state is not None:
2208 db_dict["operationState"] = operation_state
2209 db_dict["statusEnteredTime"] = time()
2210 self.update_db_2("nslcmops", op_id, db_dict)
2211 except DbException as e:
2212 self.logger.warn(
2213 "Error writing OPERATION status for op_id: {} -> {}".format(op_id, e)
2214 )
2215
2216 def _write_all_config_status(self, db_nsr: dict, status: str):
2217 try:
2218 nsr_id = db_nsr["_id"]
2219 # configurationStatus
2220 config_status = db_nsr.get("configurationStatus")
2221 if config_status:
2222 db_nsr_update = {
2223 "configurationStatus.{}.status".format(index): status
2224 for index, v in enumerate(config_status)
2225 if v
2226 }
2227 # update status
2228 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2229
2230 except DbException as e:
2231 self.logger.warn(
2232 "Error writing all configuration status, ns={}: {}".format(nsr_id, e)
2233 )
2234
2235 def _write_configuration_status(
2236 self,
2237 nsr_id: str,
2238 vca_index: int,
2239 status: str = None,
2240 element_under_configuration: str = None,
2241 element_type: str = None,
2242 other_update: dict = None,
2243 ):
2244
2245 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
2246 # .format(vca_index, status))
2247
2248 try:
2249 db_path = "configurationStatus.{}.".format(vca_index)
2250 db_dict = other_update or {}
2251 if status:
2252 db_dict[db_path + "status"] = status
2253 if element_under_configuration:
2254 db_dict[
2255 db_path + "elementUnderConfiguration"
2256 ] = element_under_configuration
2257 if element_type:
2258 db_dict[db_path + "elementType"] = element_type
2259 self.update_db_2("nsrs", nsr_id, db_dict)
2260 except DbException as e:
2261 self.logger.warn(
2262 "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
2263 status, nsr_id, vca_index, e
2264 )
2265 )
2266
    async def _do_placement(self, logging_text, db_nslcmop, db_vnfrs):
        """
        Check and compute the placement (the VIM account where each VNF is deployed).

        If the operation delegates placement to the external "PLA" engine, a request
        is sent via kafka and this method polls the database until the result is
        written at nslcmops "_admin.pla". The database is used (instead of a direct
        reply) because the result can be obtained from a different LCM worker in
        case of HA.
        :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
        :param db_nslcmop: database content of nslcmop
        :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
        :return: True if some modification is done. Modifies database vnfrs and the
            db_vnfrs parameter entries with the computed 'vim-account-id'
        :raises LcmException: when the placement engine does not answer in time
        """
        modified = False
        nslcmop_id = db_nslcmop["_id"]
        # only delegate when the operation explicitly requests the "PLA" engine
        placement_engine = deep_get(db_nslcmop, ("operationParams", "placement-engine"))
        if placement_engine == "PLA":
            self.logger.debug(
                logging_text + "Invoke and wait for placement optimization"
            )
            await self.msg.aiowrite(
                "pla", "get_placement", {"nslcmopId": nslcmop_id}, loop=self.loop
            )
            # poll the database for the PLA answer (written there by whichever LCM
            # worker consumes the kafka reply); give up after 10 polls of 5 seconds
            db_poll_interval = 5
            wait = db_poll_interval * 10
            pla_result = None
            while not pla_result and wait >= 0:
                await asyncio.sleep(db_poll_interval)
                wait -= db_poll_interval
                db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
                pla_result = deep_get(db_nslcmop, ("_admin", "pla"))

            if not pla_result:
                raise LcmException(
                    "Placement timeout for nslcmopId={}".format(nslcmop_id)
                )

            for pla_vnf in pla_result["vnf"]:
                vnfr = db_vnfrs.get(pla_vnf["member-vnf-index"])
                # skip entries without a placement decision or not part of this NS
                if not pla_vnf.get("vimAccountId") or not vnfr:
                    continue
                modified = True
                self.db.set_one(
                    "vnfrs",
                    {"_id": vnfr["_id"]},
                    {"vim-account-id": pla_vnf["vimAccountId"]},
                )
                # Modifies db_vnfrs
                vnfr["vim-account-id"] = pla_vnf["vimAccountId"]
        return modified
2315
2316 def update_nsrs_with_pla_result(self, params):
2317 try:
2318 nslcmop_id = deep_get(params, ("placement", "nslcmopId"))
2319 self.update_db_2(
2320 "nslcmops", nslcmop_id, {"_admin.pla": params.get("placement")}
2321 )
2322 except Exception as e:
2323 self.logger.warn("Update failed for nslcmop_id={}:{}".format(nslcmop_id, e))
2324
    async def instantiate(self, nsr_id, nslcmop_id):
        """
        Deploy a NS instance: KDUs, VMs (through RO) and execution environments.

        Runs under the HA task lock, reads everything needed from the database,
        launches the RO deployment and the N2VC/charm deployments as parallel
        tasks and, in the finally section, waits for all of them, consolidating
        errors and writing the resulting status to the nsrs/nslcmops records and
        a notification to kafka.
        :param nsr_id: ns instance to deploy
        :param nslcmop_id: operation to run
        :return: None
        """

        # Try to lock HA task here
        task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
        if not task_is_locked_by_me:
            self.logger.debug(
                "instantiate() task is not locked by me, ns={}".format(nsr_id)
            )
            return

        logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
        self.logger.debug(logging_text + "Enter")

        # get all needed from database

        # database nsrs record
        db_nsr = None

        # database nslcmops record
        db_nslcmop = None

        # update operation on nsrs
        db_nsr_update = {}
        # update operation on nslcmops
        db_nslcmop_update = {}

        nslcmop_operation_state = None
        db_vnfrs = {}  # vnf's info indexed by member-index
        # n2vc_info = {}
        tasks_dict_info = {}  # from task to info text
        exc = None
        error_list = []
        stage = [
            "Stage 1/5: preparation of the environment.",
            "Waiting for previous operations to terminate.",
            "",
        ]
        # ^ stage, step, VIM progress
        try:
            # wait for any previous tasks in process
            await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)

            # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
            stage[1] = "Reading from database."
            # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
            db_nsr_update["detailed-status"] = "creating"
            db_nsr_update["operational-status"] = "init"
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state="BUILDING",
                current_operation="INSTANTIATING",
                current_operation_id=nslcmop_id,
                other_update=db_nsr_update,
            )
            self._write_op_status(op_id=nslcmop_id, stage=stage, queuePosition=0)

            # read from db: operation
            stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
            db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
            # additionalParamsForVnf is stored as a JSON string; decode it once here
            if db_nslcmop["operationParams"].get("additionalParamsForVnf"):
                db_nslcmop["operationParams"]["additionalParamsForVnf"] = json.loads(
                    db_nslcmop["operationParams"]["additionalParamsForVnf"]
                )
            ns_params = db_nslcmop.get("operationParams")
            # operation-level timeout wins over the configured/default one
            if ns_params and ns_params.get("timeout_ns_deploy"):
                timeout_ns_deploy = ns_params["timeout_ns_deploy"]
            else:
                timeout_ns_deploy = self.timeout.get(
                    "ns_deploy", self.timeout_ns_deploy
                )

            # read from db: ns
            stage[1] = "Getting nsr={} from db.".format(nsr_id)
            self.logger.debug(logging_text + stage[1])
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
            stage[1] = "Getting nsd={} from db.".format(db_nsr["nsd-id"])
            self.logger.debug(logging_text + stage[1])
            nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
            self.fs.sync(db_nsr["nsd-id"])
            db_nsr["nsd"] = nsd
            # nsr_name = db_nsr["name"]   # TODO short-name??

            # read from db: vnf's of this ns
            stage[1] = "Getting vnfrs from db."
            self.logger.debug(logging_text + stage[1])
            db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})

            # read from db: vnfd's for every vnf
            db_vnfds = []  # every vnfd data

            # for each vnf in ns, read vnfd
            for vnfr in db_vnfrs_list:
                # kdur additionalParams are stored as JSON strings; decode them
                if vnfr.get("kdur"):
                    kdur_list = []
                    for kdur in vnfr["kdur"]:
                        if kdur.get("additionalParams"):
                            kdur["additionalParams"] = json.loads(
                                kdur["additionalParams"]
                            )
                        kdur_list.append(kdur)
                    vnfr["kdur"] = kdur_list

                db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
                vnfd_id = vnfr["vnfd-id"]
                vnfd_ref = vnfr["vnfd-ref"]
                self.fs.sync(vnfd_id)

                # if we haven't this vnfd, read it from db
                # NOTE(review): db_vnfds holds vnfd dicts while vnfd_id is a string,
                # so this membership test never matches and every vnfr re-reads its
                # vnfd from db — verify whether deduplication was intended
                if vnfd_id not in db_vnfds:
                    # read from db
                    stage[1] = "Getting vnfd={} id='{}' from db.".format(
                        vnfd_id, vnfd_ref
                    )
                    self.logger.debug(logging_text + stage[1])
                    vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})

                    # store vnfd
                    db_vnfds.append(vnfd)

            # Get or generates the _admin.deployed.VCA list
            vca_deployed_list = None
            if db_nsr["_admin"].get("deployed"):
                vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
            if vca_deployed_list is None:
                vca_deployed_list = []
                configuration_status_list = []
                db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
                db_nsr_update["configurationStatus"] = configuration_status_list
                # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
                populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
            elif isinstance(vca_deployed_list, dict):
                # maintain backward compatibility. Change a dict to list at database
                vca_deployed_list = list(vca_deployed_list.values())
                db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
                populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)

            if not isinstance(
                deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list
            ):
                populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
                db_nsr_update["_admin.deployed.RO.vnfd"] = []

            # set state to INSTANTIATED. When instantiated NBI will not delete directly
            db_nsr_update["_admin.nsState"] = "INSTANTIATED"
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self.db.set_list(
                "vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "INSTANTIATED"}
            )

            # n2vc_redesign STEP 2 Deploy Network Scenario
            stage[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
            self._write_op_status(op_id=nslcmop_id, stage=stage)

            stage[1] = "Deploying KDUs."
            # self.logger.debug(logging_text + "Before deploy_kdus")
            # Call to deploy_kdus in case exists the "vdu:kdu" param
            await self.deploy_kdus(
                logging_text=logging_text,
                nsr_id=nsr_id,
                nslcmop_id=nslcmop_id,
                db_vnfrs=db_vnfrs,
                db_vnfds=db_vnfds,
                task_instantiation_info=tasks_dict_info,
            )

            stage[1] = "Getting VCA public key."
            # n2vc_redesign STEP 1 Get VCA public ssh-key
            # feature 1429. Add n2vc public key to needed VMs
            n2vc_key = self.n2vc.get_public_key()
            n2vc_key_list = [n2vc_key]
            if self.vca_config.get("public_key"):
                n2vc_key_list.append(self.vca_config["public_key"])

            stage[1] = "Deploying NS at VIM."
            # RO deployment runs concurrently with the N2VC deployments below;
            # completion is awaited in the finally section via _wait_for_tasks
            task_ro = asyncio.ensure_future(
                self.instantiate_RO(
                    logging_text=logging_text,
                    nsr_id=nsr_id,
                    nsd=nsd,
                    db_nsr=db_nsr,
                    db_nslcmop=db_nslcmop,
                    db_vnfrs=db_vnfrs,
                    db_vnfds=db_vnfds,
                    n2vc_key_list=n2vc_key_list,
                    stage=stage,
                )
            )
            self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
            tasks_dict_info[task_ro] = "Deploying at VIM"

            # n2vc_redesign STEP 3 to 6 Deploy N2VC
            stage[1] = "Deploying Execution Environments."
            self.logger.debug(logging_text + stage[1])

            nsi_id = None  # TODO put nsi_id when this nsr belongs to a NSI
            for vnf_profile in get_vnf_profiles(nsd):
                vnfd_id = vnf_profile["vnfd-id"]
                vnfd = find_in_list(db_vnfds, lambda a_vnf: a_vnf["id"] == vnfd_id)
                member_vnf_index = str(vnf_profile["id"])
                db_vnfr = db_vnfrs[member_vnf_index]
                base_folder = vnfd["_admin"]["storage"]
                vdu_id = None
                vdu_index = 0
                vdu_name = None
                kdu_name = None

                # Get additional parameters
                deploy_params = {"OSM": get_osm_params(db_vnfr)}
                if db_vnfr.get("additionalParamsForVnf"):
                    deploy_params.update(
                        parse_yaml_strings(db_vnfr["additionalParamsForVnf"].copy())
                    )

                # VNF-level charm, if the vnfd declares one
                descriptor_config = get_configuration(vnfd, vnfd["id"])
                if descriptor_config:
                    self._deploy_n2vc(
                        logging_text=logging_text
                        + "member_vnf_index={} ".format(member_vnf_index),
                        db_nsr=db_nsr,
                        db_vnfr=db_vnfr,
                        nslcmop_id=nslcmop_id,
                        nsr_id=nsr_id,
                        nsi_id=nsi_id,
                        vnfd_id=vnfd_id,
                        vdu_id=vdu_id,
                        kdu_name=kdu_name,
                        member_vnf_index=member_vnf_index,
                        vdu_index=vdu_index,
                        vdu_name=vdu_name,
                        deploy_params=deploy_params,
                        descriptor_config=descriptor_config,
                        base_folder=base_folder,
                        task_instantiation_info=tasks_dict_info,
                        stage=stage,
                    )

                # Deploy charms for each VDU that supports one.
                for vdud in get_vdu_list(vnfd):
                    vdu_id = vdud["id"]
                    descriptor_config = get_configuration(vnfd, vdu_id)
                    vdur = find_in_list(
                        db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id
                    )

                    if vdur.get("additionalParams"):
                        deploy_params_vdu = parse_yaml_strings(vdur["additionalParams"])
                    else:
                        deploy_params_vdu = deploy_params
                    deploy_params_vdu["OSM"] = get_osm_params(
                        db_vnfr, vdu_id, vdu_count_index=0
                    )
                    vdud_count = get_number_of_instances(vnfd, vdu_id)

                    self.logger.debug("VDUD > {}".format(vdud))
                    self.logger.debug(
                        "Descriptor config > {}".format(descriptor_config)
                    )
                    if descriptor_config:
                        vdu_name = None
                        kdu_name = None
                        # one execution environment per VDU instance
                        for vdu_index in range(vdud_count):
                            # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
                            self._deploy_n2vc(
                                logging_text=logging_text
                                + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
                                    member_vnf_index, vdu_id, vdu_index
                                ),
                                db_nsr=db_nsr,
                                db_vnfr=db_vnfr,
                                nslcmop_id=nslcmop_id,
                                nsr_id=nsr_id,
                                nsi_id=nsi_id,
                                vnfd_id=vnfd_id,
                                vdu_id=vdu_id,
                                kdu_name=kdu_name,
                                member_vnf_index=member_vnf_index,
                                vdu_index=vdu_index,
                                vdu_name=vdu_name,
                                deploy_params=deploy_params_vdu,
                                descriptor_config=descriptor_config,
                                base_folder=base_folder,
                                task_instantiation_info=tasks_dict_info,
                                stage=stage,
                            )
                # KDU-level charms
                for kdud in get_kdu_list(vnfd):
                    kdu_name = kdud["name"]
                    descriptor_config = get_configuration(vnfd, kdu_name)
                    if descriptor_config:
                        vdu_id = None
                        vdu_index = 0
                        vdu_name = None
                        kdur = next(
                            x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name
                        )
                        deploy_params_kdu = {"OSM": get_osm_params(db_vnfr)}
                        if kdur.get("additionalParams"):
                            deploy_params_kdu.update(
                                parse_yaml_strings(kdur["additionalParams"].copy())
                            )

                        self._deploy_n2vc(
                            logging_text=logging_text,
                            db_nsr=db_nsr,
                            db_vnfr=db_vnfr,
                            nslcmop_id=nslcmop_id,
                            nsr_id=nsr_id,
                            nsi_id=nsi_id,
                            vnfd_id=vnfd_id,
                            vdu_id=vdu_id,
                            kdu_name=kdu_name,
                            member_vnf_index=member_vnf_index,
                            vdu_index=vdu_index,
                            vdu_name=vdu_name,
                            deploy_params=deploy_params_kdu,
                            descriptor_config=descriptor_config,
                            base_folder=base_folder,
                            task_instantiation_info=tasks_dict_info,
                            stage=stage,
                        )

            # Check if this NS has a charm configuration
            descriptor_config = nsd.get("ns-configuration")
            if descriptor_config and descriptor_config.get("juju"):
                vnfd_id = None
                db_vnfr = None
                member_vnf_index = None
                vdu_id = None
                kdu_name = None
                vdu_index = 0
                vdu_name = None

                # Get additional parameters
                deploy_params = {"OSM": {"vim_account_id": ns_params["vimAccountId"]}}
                if db_nsr.get("additionalParamsForNs"):
                    deploy_params.update(
                        parse_yaml_strings(db_nsr["additionalParamsForNs"].copy())
                    )
                base_folder = nsd["_admin"]["storage"]
                self._deploy_n2vc(
                    logging_text=logging_text,
                    db_nsr=db_nsr,
                    db_vnfr=db_vnfr,
                    nslcmop_id=nslcmop_id,
                    nsr_id=nsr_id,
                    nsi_id=nsi_id,
                    vnfd_id=vnfd_id,
                    vdu_id=vdu_id,
                    kdu_name=kdu_name,
                    member_vnf_index=member_vnf_index,
                    vdu_index=vdu_index,
                    vdu_name=vdu_name,
                    deploy_params=deploy_params,
                    descriptor_config=descriptor_config,
                    base_folder=base_folder,
                    task_instantiation_info=tasks_dict_info,
                    stage=stage,
                )

            # rest of staff will be done at finally

        except (
            ROclient.ROClientException,
            DbException,
            LcmException,
            N2VCException,
        ) as e:
            self.logger.error(
                logging_text + "Exit Exception while '{}': {}".format(stage[1], e)
            )
            exc = e
        except asyncio.CancelledError:
            self.logger.error(
                logging_text + "Cancelled Exception while '{}'".format(stage[1])
            )
            exc = "Operation was cancelled"
        except Exception as e:
            exc = traceback.format_exc()
            self.logger.critical(
                logging_text + "Exit Exception while '{}': {}".format(stage[1], e),
                exc_info=True,
            )
        finally:
            if exc:
                error_list.append(str(exc))
            try:
                # wait for pending tasks
                if tasks_dict_info:
                    stage[1] = "Waiting for instantiate pending tasks."
                    self.logger.debug(logging_text + stage[1])
                    error_list += await self._wait_for_tasks(
                        logging_text,
                        tasks_dict_info,
                        timeout_ns_deploy,
                        stage,
                        nslcmop_id,
                        nsr_id=nsr_id,
                    )
                stage[1] = stage[2] = ""
            except asyncio.CancelledError:
                error_list.append("Cancelled")
                # TODO cancel all tasks
            # NOTE: "as exc" shadows the outer exc flag (Python unsets the name
            # after the handler); exc is not read again, so this is harmless
            except Exception as exc:
                error_list.append(str(exc))

            # update operation-status
            db_nsr_update["operational-status"] = "running"
            # let's begin with VCA 'configured' status (later we can change it)
            db_nsr_update["config-status"] = "configured"
            for task, task_name in tasks_dict_info.items():
                if not task.done() or task.cancelled() or task.exception():
                    if task_name.startswith(self.task_name_deploy_vca):
                        # A N2VC task is pending
                        db_nsr_update["config-status"] = "failed"
                    else:
                        # RO or KDU task is pending
                        db_nsr_update["operational-status"] = "failed"

            # update status at database
            if error_list:
                error_detail = ". ".join(error_list)
                self.logger.error(logging_text + error_detail)
                error_description_nslcmop = "{} Detail: {}".format(
                    stage[0], error_detail
                )
                error_description_nsr = "Operation: INSTANTIATING.{}, {}".format(
                    nslcmop_id, stage[0]
                )

                db_nsr_update["detailed-status"] = (
                    error_description_nsr + " Detail: " + error_detail
                )
                db_nslcmop_update["detailed-status"] = error_detail
                nslcmop_operation_state = "FAILED"
                ns_state = "BROKEN"
            else:
                error_detail = None
                error_description_nsr = error_description_nslcmop = None
                ns_state = "READY"
                db_nsr_update["detailed-status"] = "Done"
                db_nslcmop_update["detailed-status"] = "Done"
                nslcmop_operation_state = "COMPLETED"

            if db_nsr:
                self._write_ns_status(
                    nsr_id=nsr_id,
                    ns_state=ns_state,
                    current_operation="IDLE",
                    current_operation_id=None,
                    error_description=error_description_nsr,
                    error_detail=error_detail,
                    other_update=db_nsr_update,
                )
            self._write_op_status(
                op_id=nslcmop_id,
                stage="",
                error_message=error_description_nslcmop,
                operation_state=nslcmop_operation_state,
                other_update=db_nslcmop_update,
            )

            if nslcmop_operation_state:
                try:
                    await self.msg.aiowrite(
                        "ns",
                        "instantiated",
                        {
                            "nsr_id": nsr_id,
                            "nslcmop_id": nslcmop_id,
                            "operationState": nslcmop_operation_state,
                        },
                        loop=self.loop,
                    )
                except Exception as e:
                    self.logger.error(
                        logging_text + "kafka_write notification Exception {}".format(e)
                    )

        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
2810
2811 def _get_vnfd(self, vnfd_id: str, cached_vnfds: Dict[str, Any]):
2812 if vnfd_id not in cached_vnfds:
2813 cached_vnfds[vnfd_id] = self.db.get_one("vnfds", {"id": vnfd_id})
2814 return cached_vnfds[vnfd_id]
2815
2816 def _get_vnfr(self, nsr_id: str, vnf_profile_id: str, cached_vnfrs: Dict[str, Any]):
2817 if vnf_profile_id not in cached_vnfrs:
2818 cached_vnfrs[vnf_profile_id] = self.db.get_one(
2819 "vnfrs",
2820 {
2821 "member-vnf-index-ref": vnf_profile_id,
2822 "nsr-id-ref": nsr_id,
2823 },
2824 )
2825 return cached_vnfrs[vnf_profile_id]
2826
2827 def _is_deployed_vca_in_relation(
2828 self, vca: DeployedVCA, relation: Relation
2829 ) -> bool:
2830 found = False
2831 for endpoint in (relation.provider, relation.requirer):
2832 if endpoint["kdu-resource-profile-id"]:
2833 continue
2834 found = (
2835 vca.vnf_profile_id == endpoint.vnf_profile_id
2836 and vca.vdu_profile_id == endpoint.vdu_profile_id
2837 and vca.execution_environment_ref == endpoint.execution_environment_ref
2838 )
2839 if found:
2840 break
2841 return found
2842
2843 def _update_ee_relation_data_with_implicit_data(
2844 self, nsr_id, nsd, ee_relation_data, cached_vnfds, vnf_profile_id: str = None
2845 ):
2846 ee_relation_data = safe_get_ee_relation(
2847 nsr_id, ee_relation_data, vnf_profile_id=vnf_profile_id
2848 )
2849 ee_relation_level = EELevel.get_level(ee_relation_data)
2850 if (ee_relation_level in (EELevel.VNF, EELevel.VDU)) and not ee_relation_data[
2851 "execution-environment-ref"
2852 ]:
2853 vnf_profile = get_vnf_profile(nsd, ee_relation_data["vnf-profile-id"])
2854 vnfd_id = vnf_profile["vnfd-id"]
2855 db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
2856 entity_id = (
2857 vnfd_id
2858 if ee_relation_level == EELevel.VNF
2859 else ee_relation_data["vdu-profile-id"]
2860 )
2861 ee = get_juju_ee_ref(db_vnfd, entity_id)
2862 if not ee:
2863 raise Exception(
2864 f"not execution environments found for ee_relation {ee_relation_data}"
2865 )
2866 ee_relation_data["execution-environment-ref"] = ee["id"]
2867 return ee_relation_data
2868
    def _get_ns_relations(
        self,
        nsr_id: str,
        nsd: Dict[str, Any],
        vca: DeployedVCA,
        cached_vnfds: Dict[str, Any],
    ) -> List[Relation]:
        """Return the NS-level relations in which the given deployed VCA takes part.

        Two descriptor formats are accepted per relation: an explicit
        "provider"/"requirer" pair, or the two-element "entities" list where the
        first entity acts as provider and the second as requirer.

        :param nsr_id: NS record id
        :param nsd: NS descriptor content
        :param vca: deployed VCA whose relations are being collected
        :param cached_vnfds: cache of already-read VNFDs, indexed by vnfd id
        :return: relations (possibly empty) where vca is provider or requirer
        :raises Exception: if a relation declares neither supported format
        """
        relations = []
        db_ns_relations = get_ns_configuration_relation_list(nsd)
        for r in db_ns_relations:
            provider_dict = None
            requirer_dict = None
            if all(key in r for key in ("provider", "requirer")):
                provider_dict = r["provider"]
                requirer_dict = r["requirer"]
            elif "entities" in r:
                # entities[0] is the provider, entities[1] the requirer
                provider_id = r["entities"][0]["id"]
                provider_dict = {
                    "nsr-id": nsr_id,
                    "endpoint": r["entities"][0]["endpoint"],
                }
                # an entity id equal to the NSD id means the NS itself
                # (no vnf-profile-id on the endpoint)
                if provider_id != nsd["id"]:
                    provider_dict["vnf-profile-id"] = provider_id
                requirer_id = r["entities"][1]["id"]
                requirer_dict = {
                    "nsr-id": nsr_id,
                    "endpoint": r["entities"][1]["endpoint"],
                }
                if requirer_id != nsd["id"]:
                    requirer_dict["vnf-profile-id"] = requirer_id
            else:
                raise Exception(
                    "provider/requirer or entities must be included in the relation."
                )
            relation_provider = self._update_ee_relation_data_with_implicit_data(
                nsr_id, nsd, provider_dict, cached_vnfds
            )
            relation_requirer = self._update_ee_relation_data_with_implicit_data(
                nsr_id, nsd, requirer_dict, cached_vnfds
            )
            provider = EERelation(relation_provider)
            requirer = EERelation(relation_requirer)
            relation = Relation(r["name"], provider, requirer)
            # keep only relations this VCA is actually part of
            vca_in_relation = self._is_deployed_vca_in_relation(vca, relation)
            if vca_in_relation:
                relations.append(relation)
        return relations
2916
2917 def _get_vnf_relations(
2918 self,
2919 nsr_id: str,
2920 nsd: Dict[str, Any],
2921 vca: DeployedVCA,
2922 cached_vnfds: Dict[str, Any],
2923 ) -> List[Relation]:
2924 relations = []
2925 vnf_profile = get_vnf_profile(nsd, vca.vnf_profile_id)
2926 vnf_profile_id = vnf_profile["id"]
2927 vnfd_id = vnf_profile["vnfd-id"]
2928 db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
2929 db_vnf_relations = get_relation_list(db_vnfd, vnfd_id)
2930 for r in db_vnf_relations:
2931 provider_dict = None
2932 requirer_dict = None
2933 if all(key in r for key in ("provider", "requirer")):
2934 provider_dict = r["provider"]
2935 requirer_dict = r["requirer"]
2936 elif "entities" in r:
2937 provider_id = r["entities"][0]["id"]
2938 provider_dict = {
2939 "nsr-id": nsr_id,
2940 "vnf-profile-id": vnf_profile_id,
2941 "endpoint": r["entities"][0]["endpoint"],
2942 }
2943 if provider_id != vnfd_id:
2944 provider_dict["vdu-profile-id"] = provider_id
2945 requirer_id = r["entities"][1]["id"]
2946 requirer_dict = {
2947 "nsr-id": nsr_id,
2948 "vnf-profile-id": vnf_profile_id,
2949 "endpoint": r["entities"][1]["endpoint"],
2950 }
2951 if requirer_id != vnfd_id:
2952 requirer_dict["vdu-profile-id"] = requirer_id
2953 else:
2954 raise Exception(
2955 "provider/requirer or entities must be included in the relation."
2956 )
2957 relation_provider = self._update_ee_relation_data_with_implicit_data(
2958 nsr_id, nsd, provider_dict, cached_vnfds, vnf_profile_id=vnf_profile_id
2959 )
2960 relation_requirer = self._update_ee_relation_data_with_implicit_data(
2961 nsr_id, nsd, requirer_dict, cached_vnfds, vnf_profile_id=vnf_profile_id
2962 )
2963 provider = EERelation(relation_provider)
2964 requirer = EERelation(relation_requirer)
2965 relation = Relation(r["name"], provider, requirer)
2966 vca_in_relation = self._is_deployed_vca_in_relation(vca, relation)
2967 if vca_in_relation:
2968 relations.append(relation)
2969 return relations
2970
2971 def _get_kdu_resource_data(
2972 self,
2973 ee_relation: EERelation,
2974 db_nsr: Dict[str, Any],
2975 cached_vnfds: Dict[str, Any],
2976 ) -> DeployedK8sResource:
2977 nsd = get_nsd(db_nsr)
2978 vnf_profiles = get_vnf_profiles(nsd)
2979 vnfd_id = find_in_list(
2980 vnf_profiles,
2981 lambda vnf_profile: vnf_profile["id"] == ee_relation.vnf_profile_id,
2982 )["vnfd-id"]
2983 db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
2984 kdu_resource_profile = get_kdu_resource_profile(
2985 db_vnfd, ee_relation.kdu_resource_profile_id
2986 )
2987 kdu_name = kdu_resource_profile["kdu-name"]
2988 deployed_kdu, _ = get_deployed_kdu(
2989 db_nsr.get("_admin", ()).get("deployed", ()),
2990 kdu_name,
2991 ee_relation.vnf_profile_id,
2992 )
2993 deployed_kdu.update({"resource-name": kdu_resource_profile["resource-name"]})
2994 return deployed_kdu
2995
2996 def _get_deployed_component(
2997 self,
2998 ee_relation: EERelation,
2999 db_nsr: Dict[str, Any],
3000 cached_vnfds: Dict[str, Any],
3001 ) -> DeployedComponent:
3002 nsr_id = db_nsr["_id"]
3003 deployed_component = None
3004 ee_level = EELevel.get_level(ee_relation)
3005 if ee_level == EELevel.NS:
3006 vca = get_deployed_vca(db_nsr, {"vdu_id": None, "member-vnf-index": None})
3007 if vca:
3008 deployed_component = DeployedVCA(nsr_id, vca)
3009 elif ee_level == EELevel.VNF:
3010 vca = get_deployed_vca(
3011 db_nsr,
3012 {
3013 "vdu_id": None,
3014 "member-vnf-index": ee_relation.vnf_profile_id,
3015 "ee_descriptor_id": ee_relation.execution_environment_ref,
3016 },
3017 )
3018 if vca:
3019 deployed_component = DeployedVCA(nsr_id, vca)
3020 elif ee_level == EELevel.VDU:
3021 vca = get_deployed_vca(
3022 db_nsr,
3023 {
3024 "vdu_id": ee_relation.vdu_profile_id,
3025 "member-vnf-index": ee_relation.vnf_profile_id,
3026 "ee_descriptor_id": ee_relation.execution_environment_ref,
3027 },
3028 )
3029 if vca:
3030 deployed_component = DeployedVCA(nsr_id, vca)
3031 elif ee_level == EELevel.KDU:
3032 kdu_resource_data = self._get_kdu_resource_data(
3033 ee_relation, db_nsr, cached_vnfds
3034 )
3035 if kdu_resource_data:
3036 deployed_component = DeployedK8sResource(kdu_resource_data)
3037 return deployed_component
3038
3039 async def _add_relation(
3040 self,
3041 relation: Relation,
3042 vca_type: str,
3043 db_nsr: Dict[str, Any],
3044 cached_vnfds: Dict[str, Any],
3045 cached_vnfrs: Dict[str, Any],
3046 ) -> bool:
3047 deployed_provider = self._get_deployed_component(
3048 relation.provider, db_nsr, cached_vnfds
3049 )
3050 deployed_requirer = self._get_deployed_component(
3051 relation.requirer, db_nsr, cached_vnfds
3052 )
3053 if (
3054 deployed_provider
3055 and deployed_requirer
3056 and deployed_provider.config_sw_installed
3057 and deployed_requirer.config_sw_installed
3058 ):
3059 provider_db_vnfr = (
3060 self._get_vnfr(
3061 relation.provider.nsr_id,
3062 relation.provider.vnf_profile_id,
3063 cached_vnfrs,
3064 )
3065 if relation.provider.vnf_profile_id
3066 else None
3067 )
3068 requirer_db_vnfr = (
3069 self._get_vnfr(
3070 relation.requirer.nsr_id,
3071 relation.requirer.vnf_profile_id,
3072 cached_vnfrs,
3073 )
3074 if relation.requirer.vnf_profile_id
3075 else None
3076 )
3077 provider_vca_id = self.get_vca_id(provider_db_vnfr, db_nsr)
3078 requirer_vca_id = self.get_vca_id(requirer_db_vnfr, db_nsr)
3079 provider_relation_endpoint = RelationEndpoint(
3080 deployed_provider.ee_id,
3081 provider_vca_id,
3082 relation.provider.endpoint,
3083 )
3084 requirer_relation_endpoint = RelationEndpoint(
3085 deployed_requirer.ee_id,
3086 requirer_vca_id,
3087 relation.requirer.endpoint,
3088 )
3089 await self.vca_map[vca_type].add_relation(
3090 provider=provider_relation_endpoint,
3091 requirer=requirer_relation_endpoint,
3092 )
3093 # remove entry from relations list
3094 return True
3095 return False
3096
3097 async def _add_vca_relations(
3098 self,
3099 logging_text,
3100 nsr_id,
3101 vca_type: str,
3102 vca_index: int,
3103 timeout: int = 3600,
3104 ) -> bool:
3105
3106 # steps:
3107 # 1. find all relations for this VCA
3108 # 2. wait for other peers related
3109 # 3. add relations
3110
3111 try:
3112 # STEP 1: find all relations for this VCA
3113
3114 # read nsr record
3115 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3116 nsd = get_nsd(db_nsr)
3117
3118 # this VCA data
3119 deployed_vca_dict = get_deployed_vca_list(db_nsr)[vca_index]
3120 my_vca = DeployedVCA(nsr_id, deployed_vca_dict)
3121
3122 cached_vnfds = {}
3123 cached_vnfrs = {}
3124 relations = []
3125 relations.extend(self._get_ns_relations(nsr_id, nsd, my_vca, cached_vnfds))
3126 relations.extend(self._get_vnf_relations(nsr_id, nsd, my_vca, cached_vnfds))
3127
3128 # if no relations, terminate
3129 if not relations:
3130 self.logger.debug(logging_text + " No relations")
3131 return True
3132
3133 self.logger.debug(logging_text + " adding relations {}".format(relations))
3134
3135 # add all relations
3136 start = time()
3137 while True:
3138 # check timeout
3139 now = time()
3140 if now - start >= timeout:
3141 self.logger.error(logging_text + " : timeout adding relations")
3142 return False
3143
3144 # reload nsr from database (we need to update record: _admin.deployed.VCA)
3145 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3146
3147 # for each relation, find the VCA's related
3148 for relation in relations.copy():
3149 added = await self._add_relation(
3150 relation,
3151 vca_type,
3152 db_nsr,
3153 cached_vnfds,
3154 cached_vnfrs,
3155 )
3156 if added:
3157 relations.remove(relation)
3158
3159 if not relations:
3160 self.logger.debug("Relations added")
3161 break
3162 await asyncio.sleep(5.0)
3163
3164 return True
3165
3166 except Exception as e:
3167 self.logger.warn(logging_text + " ERROR adding relations: {}".format(e))
3168 return False
3169
3170 async def _install_kdu(
3171 self,
3172 nsr_id: str,
3173 nsr_db_path: str,
3174 vnfr_data: dict,
3175 kdu_index: int,
3176 kdud: dict,
3177 vnfd: dict,
3178 k8s_instance_info: dict,
3179 k8params: dict = None,
3180 timeout: int = 600,
3181 vca_id: str = None,
3182 ):
3183
3184 try:
3185 k8sclustertype = k8s_instance_info["k8scluster-type"]
3186 # Instantiate kdu
3187 db_dict_install = {
3188 "collection": "nsrs",
3189 "filter": {"_id": nsr_id},
3190 "path": nsr_db_path,
3191 }
3192
3193 if k8s_instance_info.get("kdu-deployment-name"):
3194 kdu_instance = k8s_instance_info.get("kdu-deployment-name")
3195 else:
3196 kdu_instance = self.k8scluster_map[
3197 k8sclustertype
3198 ].generate_kdu_instance_name(
3199 db_dict=db_dict_install,
3200 kdu_model=k8s_instance_info["kdu-model"],
3201 kdu_name=k8s_instance_info["kdu-name"],
3202 )
3203 self.update_db_2(
3204 "nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance}
3205 )
3206 await self.k8scluster_map[k8sclustertype].install(
3207 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
3208 kdu_model=k8s_instance_info["kdu-model"],
3209 atomic=True,
3210 params=k8params,
3211 db_dict=db_dict_install,
3212 timeout=timeout,
3213 kdu_name=k8s_instance_info["kdu-name"],
3214 namespace=k8s_instance_info["namespace"],
3215 kdu_instance=kdu_instance,
3216 vca_id=vca_id,
3217 )
3218 self.update_db_2(
3219 "nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance}
3220 )
3221
3222 # Obtain services to obtain management service ip
3223 services = await self.k8scluster_map[k8sclustertype].get_services(
3224 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
3225 kdu_instance=kdu_instance,
3226 namespace=k8s_instance_info["namespace"],
3227 )
3228
3229 # Obtain management service info (if exists)
3230 vnfr_update_dict = {}
3231 kdu_config = get_configuration(vnfd, kdud["name"])
3232 if kdu_config:
3233 target_ee_list = kdu_config.get("execution-environment-list", [])
3234 else:
3235 target_ee_list = []
3236
3237 if services:
3238 vnfr_update_dict["kdur.{}.services".format(kdu_index)] = services
3239 mgmt_services = [
3240 service
3241 for service in kdud.get("service", [])
3242 if service.get("mgmt-service")
3243 ]
3244 for mgmt_service in mgmt_services:
3245 for service in services:
3246 if service["name"].startswith(mgmt_service["name"]):
3247 # Mgmt service found, Obtain service ip
3248 ip = service.get("external_ip", service.get("cluster_ip"))
3249 if isinstance(ip, list) and len(ip) == 1:
3250 ip = ip[0]
3251
3252 vnfr_update_dict[
3253 "kdur.{}.ip-address".format(kdu_index)
3254 ] = ip
3255
3256 # Check if must update also mgmt ip at the vnf
3257 service_external_cp = mgmt_service.get(
3258 "external-connection-point-ref"
3259 )
3260 if service_external_cp:
3261 if (
3262 deep_get(vnfd, ("mgmt-interface", "cp"))
3263 == service_external_cp
3264 ):
3265 vnfr_update_dict["ip-address"] = ip
3266
3267 if find_in_list(
3268 target_ee_list,
3269 lambda ee: ee.get(
3270 "external-connection-point-ref", ""
3271 )
3272 == service_external_cp,
3273 ):
3274 vnfr_update_dict[
3275 "kdur.{}.ip-address".format(kdu_index)
3276 ] = ip
3277 break
3278 else:
3279 self.logger.warn(
3280 "Mgmt service name: {} not found".format(
3281 mgmt_service["name"]
3282 )
3283 )
3284
3285 vnfr_update_dict["kdur.{}.status".format(kdu_index)] = "READY"
3286 self.update_db_2("vnfrs", vnfr_data.get("_id"), vnfr_update_dict)
3287
3288 kdu_config = get_configuration(vnfd, k8s_instance_info["kdu-name"])
3289 if (
3290 kdu_config
3291 and kdu_config.get("initial-config-primitive")
3292 and get_juju_ee_ref(vnfd, k8s_instance_info["kdu-name"]) is None
3293 ):
3294 initial_config_primitive_list = kdu_config.get(
3295 "initial-config-primitive"
3296 )
3297 initial_config_primitive_list.sort(key=lambda val: int(val["seq"]))
3298
3299 for initial_config_primitive in initial_config_primitive_list:
3300 primitive_params_ = self._map_primitive_params(
3301 initial_config_primitive, {}, {}
3302 )
3303
3304 await asyncio.wait_for(
3305 self.k8scluster_map[k8sclustertype].exec_primitive(
3306 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
3307 kdu_instance=kdu_instance,
3308 primitive_name=initial_config_primitive["name"],
3309 params=primitive_params_,
3310 db_dict=db_dict_install,
3311 vca_id=vca_id,
3312 ),
3313 timeout=timeout,
3314 )
3315
3316 except Exception as e:
3317 # Prepare update db with error and raise exception
3318 try:
3319 self.update_db_2(
3320 "nsrs", nsr_id, {nsr_db_path + ".detailed-status": str(e)}
3321 )
3322 self.update_db_2(
3323 "vnfrs",
3324 vnfr_data.get("_id"),
3325 {"kdur.{}.status".format(kdu_index): "ERROR"},
3326 )
3327 except Exception:
3328 # ignore to keep original exception
3329 pass
3330 # reraise original error
3331 raise
3332
3333 return kdu_instance
3334
    async def deploy_kdus(
        self,
        logging_text,
        nsr_id,
        nslcmop_id,
        db_vnfrs,
        db_vnfds,
        task_instantiation_info,
    ):
        """Launch one _install_kdu task per KDU record found in the VNF records.

        Resolves the target k8s cluster (initializing helm-v3 on legacy
        clusters when needed), synchronizes helm repos once per cluster, fills
        _admin.deployed.K8s in the NS record, and registers each install as an
        asyncio task in task_instantiation_info.

        :param logging_text: prefix for log messages
        :param nsr_id: NS record id
        :param nslcmop_id: NS LCM operation id, used to register tasks
        :param db_vnfrs: VNF records indexed by member-vnf-index
        :param db_vnfds: list of VNF descriptor contents
        :param task_instantiation_info: dict updated in place (task -> description)
        :raises LcmException: on any deployment preparation failure
        """
        # Launch kdus if present in the descriptor

        # cache of cluster-id -> internal k8s id, kept per cluster type
        k8scluster_id_2_uuic = {
            "helm-chart-v3": {},
            "helm-chart": {},
            "juju-bundle": {},
        }

        async def _get_cluster_id(cluster_id, cluster_type):
            # Resolve (and cache) the internal id of a k8s cluster for the
            # given type, waiting for related k8scluster tasks and
            # initializing helm-v3 on demand for backward compatibility.
            nonlocal k8scluster_id_2_uuic
            if cluster_id in k8scluster_id_2_uuic[cluster_type]:
                return k8scluster_id_2_uuic[cluster_type][cluster_id]

            # check if K8scluster is creating and wait look if previous tasks in process
            task_name, task_dependency = self.lcm_tasks.lookfor_related(
                "k8scluster", cluster_id
            )
            if task_dependency:
                text = "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
                    task_name, cluster_id
                )
                self.logger.debug(logging_text + text)
                await asyncio.wait(task_dependency, timeout=3600)

            db_k8scluster = self.db.get_one(
                "k8sclusters", {"_id": cluster_id}, fail_on_empty=False
            )
            if not db_k8scluster:
                raise LcmException("K8s cluster {} cannot be found".format(cluster_id))

            k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
            if not k8s_id:
                if cluster_type == "helm-chart-v3":
                    try:
                        # backward compatibility for existing clusters that have not been initialized for helm v3
                        k8s_credentials = yaml.safe_dump(
                            db_k8scluster.get("credentials")
                        )
                        k8s_id, uninstall_sw = await self.k8sclusterhelm3.init_env(
                            k8s_credentials, reuse_cluster_uuid=cluster_id
                        )
                        db_k8scluster_update = {}
                        db_k8scluster_update["_admin.helm-chart-v3.error_msg"] = None
                        db_k8scluster_update["_admin.helm-chart-v3.id"] = k8s_id
                        db_k8scluster_update[
                            "_admin.helm-chart-v3.created"
                        ] = uninstall_sw
                        db_k8scluster_update[
                            "_admin.helm-chart-v3.operationalState"
                        ] = "ENABLED"
                        self.update_db_2(
                            "k8sclusters", cluster_id, db_k8scluster_update
                        )
                    except Exception as e:
                        self.logger.error(
                            logging_text
                            + "error initializing helm-v3 cluster: {}".format(str(e))
                        )
                        raise LcmException(
                            "K8s cluster '{}' has not been initialized for '{}'".format(
                                cluster_id, cluster_type
                            )
                        )
                else:
                    raise LcmException(
                        "K8s cluster '{}' has not been initialized for '{}'".format(
                            cluster_id, cluster_type
                        )
                    )
            k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
            return k8s_id

        logging_text += "Deploy kdus: "
        step = ""
        try:
            db_nsr_update = {"_admin.deployed.K8s": []}
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

            index = 0
            updated_cluster_list = []
            updated_v3_cluster_list = []

            for vnfr_data in db_vnfrs.values():
                vca_id = self.get_vca_id(vnfr_data, {})
                for kdu_index, kdur in enumerate(get_iterable(vnfr_data, "kdur")):
                    # Step 0: Prepare and set parameters
                    desc_params = parse_yaml_strings(kdur.get("additionalParams"))
                    vnfd_id = vnfr_data.get("vnfd-id")
                    # lambda captures vnfd_id but is invoked immediately, so the
                    # loop-variable capture is safe here
                    vnfd_with_id = find_in_list(
                        db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id
                    )
                    kdud = next(
                        kdud
                        for kdud in vnfd_with_id["kdu"]
                        if kdud["name"] == kdur["kdu-name"]
                    )
                    namespace = kdur.get("k8s-namespace")
                    kdu_deployment_name = kdur.get("kdu-deployment-name")
                    if kdur.get("helm-chart"):
                        kdumodel = kdur["helm-chart"]
                        # Default version: helm3, if helm-version is v2 assign v2
                        k8sclustertype = "helm-chart-v3"
                        self.logger.debug("kdur: {}".format(kdur))
                        if (
                            kdur.get("helm-version")
                            and kdur.get("helm-version") == "v2"
                        ):
                            k8sclustertype = "helm-chart"
                    elif kdur.get("juju-bundle"):
                        kdumodel = kdur["juju-bundle"]
                        k8sclustertype = "juju-bundle"
                    else:
                        raise LcmException(
                            "kdu type for kdu='{}.{}' is neither helm-chart nor "
                            "juju-bundle. Maybe an old NBI version is running".format(
                                vnfr_data["member-vnf-index-ref"], kdur["kdu-name"]
                            )
                        )
                    # check if kdumodel is a file and exists
                    try:
                        vnfd_with_id = find_in_list(
                            db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id
                        )
                        storage = deep_get(vnfd_with_id, ("_admin", "storage"))
                        if storage:  # may be not present if vnfd has not artifacts
                            # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
                            if storage["pkg-dir"]:
                                filename = "{}/{}/{}s/{}".format(
                                    storage["folder"],
                                    storage["pkg-dir"],
                                    k8sclustertype,
                                    kdumodel,
                                )
                            else:
                                filename = "{}/Scripts/{}s/{}".format(
                                    storage["folder"],
                                    k8sclustertype,
                                    kdumodel,
                                )
                            if self.fs.file_exists(
                                filename, mode="file"
                            ) or self.fs.file_exists(filename, mode="dir"):
                                kdumodel = self.fs.path + filename
                    except (asyncio.TimeoutError, asyncio.CancelledError):
                        raise
                    except Exception:  # it is not a file
                        pass

                    k8s_cluster_id = kdur["k8s-cluster"]["id"]
                    step = "Synchronize repos for k8s cluster '{}'".format(
                        k8s_cluster_id
                    )
                    cluster_uuid = await _get_cluster_id(k8s_cluster_id, k8sclustertype)

                    # Synchronize repos
                    # done only once per (cluster, helm version) pair
                    if (
                        k8sclustertype == "helm-chart"
                        and cluster_uuid not in updated_cluster_list
                    ) or (
                        k8sclustertype == "helm-chart-v3"
                        and cluster_uuid not in updated_v3_cluster_list
                    ):
                        del_repo_list, added_repo_dict = await asyncio.ensure_future(
                            self.k8scluster_map[k8sclustertype].synchronize_repos(
                                cluster_uuid=cluster_uuid
                            )
                        )
                        if del_repo_list or added_repo_dict:
                            if k8sclustertype == "helm-chart":
                                unset = {
                                    "_admin.helm_charts_added." + item: None
                                    for item in del_repo_list
                                }
                                updated = {
                                    "_admin.helm_charts_added." + item: name
                                    for item, name in added_repo_dict.items()
                                }
                                updated_cluster_list.append(cluster_uuid)
                            elif k8sclustertype == "helm-chart-v3":
                                unset = {
                                    "_admin.helm_charts_v3_added." + item: None
                                    for item in del_repo_list
                                }
                                updated = {
                                    "_admin.helm_charts_v3_added." + item: name
                                    for item, name in added_repo_dict.items()
                                }
                                updated_v3_cluster_list.append(cluster_uuid)
                            self.logger.debug(
                                logging_text + "repos synchronized on k8s cluster "
                                "'{}' to_delete: {}, to_add: {}".format(
                                    k8s_cluster_id, del_repo_list, added_repo_dict
                                )
                            )
                            self.db.set_one(
                                "k8sclusters",
                                {"_id": k8s_cluster_id},
                                updated,
                                unset=unset,
                            )

                    # Instantiate kdu
                    step = "Instantiating KDU {}.{} in k8s cluster {}".format(
                        vnfr_data["member-vnf-index-ref"],
                        kdur["kdu-name"],
                        k8s_cluster_id,
                    )
                    k8s_instance_info = {
                        "kdu-instance": None,
                        "k8scluster-uuid": cluster_uuid,
                        "k8scluster-type": k8sclustertype,
                        "member-vnf-index": vnfr_data["member-vnf-index-ref"],
                        "kdu-name": kdur["kdu-name"],
                        "kdu-model": kdumodel,
                        "namespace": namespace,
                        "kdu-deployment-name": kdu_deployment_name,
                    }
                    db_path = "_admin.deployed.K8s.{}".format(index)
                    db_nsr_update[db_path] = k8s_instance_info
                    self.update_db_2("nsrs", nsr_id, db_nsr_update)
                    vnfd_with_id = find_in_list(
                        db_vnfds, lambda vnf: vnf["_id"] == vnfd_id
                    )
                    task = asyncio.ensure_future(
                        self._install_kdu(
                            nsr_id,
                            db_path,
                            vnfr_data,
                            kdu_index,
                            kdud,
                            vnfd_with_id,
                            k8s_instance_info,
                            k8params=desc_params,
                            timeout=600,
                            vca_id=vca_id,
                        )
                    )
                    self.lcm_tasks.register(
                        "ns",
                        nsr_id,
                        nslcmop_id,
                        "instantiate_KDU-{}".format(index),
                        task,
                    )
                    task_instantiation_info[task] = "Deploying KDU {}".format(
                        kdur["kdu-name"]
                    )

                    index += 1

        except (LcmException, asyncio.CancelledError):
            raise
        except Exception as e:
            msg = "Exception {} while {}: {}".format(type(e).__name__, step, e)
            if isinstance(e, (N2VCException, DbException)):
                self.logger.error(logging_text + msg)
            else:
                self.logger.critical(logging_text + msg, exc_info=True)
            raise LcmException(msg)
        finally:
            # persist whatever was collected, even on failure
            if db_nsr_update:
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
3606
    def _deploy_n2vc(
        self,
        logging_text,
        db_nsr,
        db_vnfr,
        nslcmop_id,
        nsr_id,
        nsi_id,
        vnfd_id,
        vdu_id,
        kdu_name,
        member_vnf_index,
        vdu_index,
        vdu_name,
        deploy_params,
        descriptor_config,
        base_folder,
        task_instantiation_info,
        stage,
    ):
        """Register one instantiate_N2VC task per execution environment of the
        given configuration descriptor, reusing or creating the corresponding
        _admin.deployed.VCA entry in the NS record.

        :param descriptor_config: configuration section of the NSD/VNFD/VDU/KDU
        :param task_instantiation_info: dict updated in place (task -> description)
        """
        # launch instantiate_N2VC in a asyncio task and register task object
        # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
        # if not found, create one entry and update database
        # fill db_nsr._admin.deployed.VCA.<index>

        self.logger.debug(
            logging_text + "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id, vdu_id)
        )
        if "execution-environment-list" in descriptor_config:
            ee_list = descriptor_config.get("execution-environment-list", [])
        elif "juju" in descriptor_config:
            ee_list = [descriptor_config]  # ns charms
        else:  # other types as script are not supported
            ee_list = []

        for ee_item in ee_list:
            self.logger.debug(
                logging_text
                + "_deploy_n2vc ee_item juju={}, helm={}".format(
                    ee_item.get("juju"), ee_item.get("helm-chart")
                )
            )
            ee_descriptor_id = ee_item.get("id")
            if ee_item.get("juju"):
                vca_name = ee_item["juju"].get("charm")
                # proxy charm when a charm name is given, native charm otherwise;
                # "cloud: k8s" and "proxy: false" override the default choice
                vca_type = (
                    "lxc_proxy_charm"
                    if ee_item["juju"].get("charm") is not None
                    else "native_charm"
                )
                if ee_item["juju"].get("cloud") == "k8s":
                    vca_type = "k8s_proxy_charm"
                elif ee_item["juju"].get("proxy") is False:
                    vca_type = "native_charm"
            elif ee_item.get("helm-chart"):
                vca_name = ee_item["helm-chart"]
                if ee_item.get("helm-version") and ee_item.get("helm-version") == "v2":
                    vca_type = "helm"
                else:
                    vca_type = "helm-v3"
            else:
                self.logger.debug(
                    logging_text + "skipping non juju neither charm configuration"
                )
                continue

            vca_index = -1
            # for-else: reuse the existing VCA entry that matches this target;
            # the else branch runs only when no entry matched (no break)
            for vca_index, vca_deployed in enumerate(
                db_nsr["_admin"]["deployed"]["VCA"]
            ):
                if not vca_deployed:
                    continue
                if (
                    vca_deployed.get("member-vnf-index") == member_vnf_index
                    and vca_deployed.get("vdu_id") == vdu_id
                    and vca_deployed.get("kdu_name") == kdu_name
                    and vca_deployed.get("vdu_count_index", 0) == vdu_index
                    and vca_deployed.get("ee_descriptor_id") == ee_descriptor_id
                ):
                    break
            else:
                # not found, create one.
                target = (
                    "ns" if not member_vnf_index else "vnf/{}".format(member_vnf_index)
                )
                if vdu_id:
                    target += "/vdu/{}/{}".format(vdu_id, vdu_index or 0)
                elif kdu_name:
                    target += "/kdu/{}".format(kdu_name)
                vca_deployed = {
                    "target_element": target,
                    # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
                    "member-vnf-index": member_vnf_index,
                    "vdu_id": vdu_id,
                    "kdu_name": kdu_name,
                    "vdu_count_index": vdu_index,
                    "operational-status": "init",  # TODO revise
                    "detailed-status": "",  # TODO revise
                    "step": "initial-deploy",  # TODO revise
                    "vnfd_id": vnfd_id,
                    "vdu_name": vdu_name,
                    "type": vca_type,
                    "ee_descriptor_id": ee_descriptor_id,
                }
                # new entry goes right after the last enumerated index
                vca_index += 1

                # create VCA and configurationStatus in db
                db_dict = {
                    "_admin.deployed.VCA.{}".format(vca_index): vca_deployed,
                    "configurationStatus.{}".format(vca_index): dict(),
                }
                self.update_db_2("nsrs", nsr_id, db_dict)

                db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed)

            self.logger.debug("N2VC > NSR_ID > {}".format(nsr_id))
            self.logger.debug("N2VC > DB_NSR > {}".format(db_nsr))
            self.logger.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed))

            # Launch task
            task_n2vc = asyncio.ensure_future(
                self.instantiate_N2VC(
                    logging_text=logging_text,
                    vca_index=vca_index,
                    nsi_id=nsi_id,
                    db_nsr=db_nsr,
                    db_vnfr=db_vnfr,
                    vdu_id=vdu_id,
                    kdu_name=kdu_name,
                    vdu_index=vdu_index,
                    deploy_params=deploy_params,
                    config_descriptor=descriptor_config,
                    base_folder=base_folder,
                    nslcmop_id=nslcmop_id,
                    stage=stage,
                    vca_type=vca_type,
                    vca_name=vca_name,
                    ee_config_descriptor=ee_item,
                )
            )
            self.lcm_tasks.register(
                "ns",
                nsr_id,
                nslcmop_id,
                "instantiate_N2VC-{}".format(vca_index),
                task_n2vc,
            )
            task_instantiation_info[
                task_n2vc
            ] = self.task_name_deploy_vca + " {}.{}".format(
                member_vnf_index or "", vdu_id or ""
            )
3759
3760 @staticmethod
3761 def _create_nslcmop(nsr_id, operation, params):
3762 """
3763 Creates a ns-lcm-opp content to be stored at database.
3764 :param nsr_id: internal id of the instance
3765 :param operation: instantiate, terminate, scale, action, ...
3766 :param params: user parameters for the operation
3767 :return: dictionary following SOL005 format
3768 """
3769 # Raise exception if invalid arguments
3770 if not (nsr_id and operation and params):
3771 raise LcmException(
3772 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided"
3773 )
3774 now = time()
3775 _id = str(uuid4())
3776 nslcmop = {
3777 "id": _id,
3778 "_id": _id,
3779 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
3780 "operationState": "PROCESSING",
3781 "statusEnteredTime": now,
3782 "nsInstanceId": nsr_id,
3783 "lcmOperationType": operation,
3784 "startTime": now,
3785 "isAutomaticInvocation": False,
3786 "operationParams": params,
3787 "isCancelPending": False,
3788 "links": {
3789 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
3790 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
3791 },
3792 }
3793 return nslcmop
3794
3795 def _format_additional_params(self, params):
3796 params = params or {}
3797 for key, value in params.items():
3798 if str(value).startswith("!!yaml "):
3799 params[key] = yaml.safe_load(value[7:])
3800 return params
3801
3802 def _get_terminate_primitive_params(self, seq, vnf_index):
3803 primitive = seq.get("name")
3804 primitive_params = {}
3805 params = {
3806 "member_vnf_index": vnf_index,
3807 "primitive": primitive,
3808 "primitive_params": primitive_params,
3809 }
3810 desc_params = {}
3811 return self._map_primitive_params(seq, params, desc_params)
3812
3813 # sub-operations
3814
3815 def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
3816 op = deep_get(db_nslcmop, ("_admin", "operations"), [])[op_index]
3817 if op.get("operationState") == "COMPLETED":
3818 # b. Skip sub-operation
3819 # _ns_execute_primitive() or RO.create_action() will NOT be executed
3820 return self.SUBOPERATION_STATUS_SKIP
3821 else:
3822 # c. retry executing sub-operation
3823 # The sub-operation exists, and operationState != 'COMPLETED'
3824 # Update operationState = 'PROCESSING' to indicate a retry.
3825 operationState = "PROCESSING"
3826 detailed_status = "In progress"
3827 self._update_suboperation_status(
3828 db_nslcmop, op_index, operationState, detailed_status
3829 )
3830 # Return the sub-operation index
3831 # _ns_execute_primitive() or RO.create_action() will be called from scale()
3832 # with arguments extracted from the sub-operation
3833 return op_index
3834
3835 # Find a sub-operation where all keys in a matching dictionary must match
3836 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
3837 def _find_suboperation(self, db_nslcmop, match):
3838 if db_nslcmop and match:
3839 op_list = db_nslcmop.get("_admin", {}).get("operations", [])
3840 for i, op in enumerate(op_list):
3841 if all(op.get(k) == match[k] for k in match):
3842 return i
3843 return self.SUBOPERATION_STATUS_NOT_FOUND
3844
3845 # Update status for a sub-operation given its index
3846 def _update_suboperation_status(
3847 self, db_nslcmop, op_index, operationState, detailed_status
3848 ):
3849 # Update DB for HA tasks
3850 q_filter = {"_id": db_nslcmop["_id"]}
3851 update_dict = {
3852 "_admin.operations.{}.operationState".format(op_index): operationState,
3853 "_admin.operations.{}.detailed-status".format(op_index): detailed_status,
3854 }
3855 self.db.set_one(
3856 "nslcmops", q_filter=q_filter, update_dict=update_dict, fail_on_empty=False
3857 )
3858
3859 # Add sub-operation, return the index of the added sub-operation
3860 # Optionally, set operationState, detailed-status, and operationType
3861 # Status and type are currently set for 'scale' sub-operations:
3862 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
3863 # 'detailed-status' : status message
3864 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
3865 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
3866 def _add_suboperation(
3867 self,
3868 db_nslcmop,
3869 vnf_index,
3870 vdu_id,
3871 vdu_count_index,
3872 vdu_name,
3873 primitive,
3874 mapped_primitive_params,
3875 operationState=None,
3876 detailed_status=None,
3877 operationType=None,
3878 RO_nsr_id=None,
3879 RO_scaling_info=None,
3880 ):
3881 if not db_nslcmop:
3882 return self.SUBOPERATION_STATUS_NOT_FOUND
3883 # Get the "_admin.operations" list, if it exists
3884 db_nslcmop_admin = db_nslcmop.get("_admin", {})
3885 op_list = db_nslcmop_admin.get("operations")
3886 # Create or append to the "_admin.operations" list
3887 new_op = {
3888 "member_vnf_index": vnf_index,
3889 "vdu_id": vdu_id,
3890 "vdu_count_index": vdu_count_index,
3891 "primitive": primitive,
3892 "primitive_params": mapped_primitive_params,
3893 }
3894 if operationState:
3895 new_op["operationState"] = operationState
3896 if detailed_status:
3897 new_op["detailed-status"] = detailed_status
3898 if operationType:
3899 new_op["lcmOperationType"] = operationType
3900 if RO_nsr_id:
3901 new_op["RO_nsr_id"] = RO_nsr_id
3902 if RO_scaling_info:
3903 new_op["RO_scaling_info"] = RO_scaling_info
3904 if not op_list:
3905 # No existing operations, create key 'operations' with current operation as first list element
3906 db_nslcmop_admin.update({"operations": [new_op]})
3907 op_list = db_nslcmop_admin.get("operations")
3908 else:
3909 # Existing operations, append operation to list
3910 op_list.append(new_op)
3911
3912 db_nslcmop_update = {"_admin.operations": op_list}
3913 self.update_db_2("nslcmops", db_nslcmop["_id"], db_nslcmop_update)
3914 op_index = len(op_list) - 1
3915 return op_index
3916
3917 # Helper methods for scale() sub-operations
3918
3919 # pre-scale/post-scale:
3920 # Check for 3 different cases:
3921 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
3922 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
3923 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
3924 def _check_or_add_scale_suboperation(
3925 self,
3926 db_nslcmop,
3927 vnf_index,
3928 vnf_config_primitive,
3929 primitive_params,
3930 operationType,
3931 RO_nsr_id=None,
3932 RO_scaling_info=None,
3933 ):
3934 # Find this sub-operation
3935 if RO_nsr_id and RO_scaling_info:
3936 operationType = "SCALE-RO"
3937 match = {
3938 "member_vnf_index": vnf_index,
3939 "RO_nsr_id": RO_nsr_id,
3940 "RO_scaling_info": RO_scaling_info,
3941 }
3942 else:
3943 match = {
3944 "member_vnf_index": vnf_index,
3945 "primitive": vnf_config_primitive,
3946 "primitive_params": primitive_params,
3947 "lcmOperationType": operationType,
3948 }
3949 op_index = self._find_suboperation(db_nslcmop, match)
3950 if op_index == self.SUBOPERATION_STATUS_NOT_FOUND:
3951 # a. New sub-operation
3952 # The sub-operation does not exist, add it.
3953 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
3954 # The following parameters are set to None for all kind of scaling:
3955 vdu_id = None
3956 vdu_count_index = None
3957 vdu_name = None
3958 if RO_nsr_id and RO_scaling_info:
3959 vnf_config_primitive = None
3960 primitive_params = None
3961 else:
3962 RO_nsr_id = None
3963 RO_scaling_info = None
3964 # Initial status for sub-operation
3965 operationState = "PROCESSING"
3966 detailed_status = "In progress"
3967 # Add sub-operation for pre/post-scaling (zero or more operations)
3968 self._add_suboperation(
3969 db_nslcmop,
3970 vnf_index,
3971 vdu_id,
3972 vdu_count_index,
3973 vdu_name,
3974 vnf_config_primitive,
3975 primitive_params,
3976 operationState,
3977 detailed_status,
3978 operationType,
3979 RO_nsr_id,
3980 RO_scaling_info,
3981 )
3982 return self.SUBOPERATION_STATUS_NEW
3983 else:
3984 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
3985 # or op_index (operationState != 'COMPLETED')
3986 return self._retry_or_skip_suboperation(db_nslcmop, op_index)
3987
3988 # Function to return execution_environment id
3989
3990 def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list):
3991 # TODO vdu_index_count
3992 for vca in vca_deployed_list:
3993 if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id:
3994 return vca["ee_id"]
3995
    async def destroy_N2VC(
        self,
        logging_text,
        db_nslcmop,
        vca_deployed,
        config_descriptor,
        vca_index,
        destroy_ee=True,
        exec_primitives=True,
        scaling_in=False,
        vca_id: str = None,
    ):
        """
        Execute the terminate primitives and destroy the execution environment (if destroy_ee=True).
        :param logging_text: prefix prepended to every log line
        :param db_nslcmop: nslcmops database record, used to register sub-operations
        :param vca_deployed: Dictionary of deployment info at db_nsr._admin.deployed.VCA.<INDEX>
        :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
        :param vca_index: index in the database _admin.deployed.VCA
        :param destroy_ee: False to do not destroy, because it will be destroyed all of then at once
        :param exec_primitives: False to do not execute terminate primitives, because the config is not completed or has
            not executed properly
        :param scaling_in: True destroys the application, False destroys the model
        :param vca_id: VCA (juju controller) id; None selects the default one
        :return: None or exception
        """

        self.logger.debug(
            logging_text
            + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
                vca_index, vca_deployed, config_descriptor, destroy_ee
            )
        )

        # records created before the "type" field existed default to proxy charm
        vca_type = vca_deployed.get("type", "lxc_proxy_charm")

        # execute terminate_primitives
        if exec_primitives:
            terminate_primitives = get_ee_sorted_terminate_config_primitive_list(
                config_descriptor.get("terminate-config-primitive"),
                vca_deployed.get("ee_descriptor_id"),
            )
            vdu_id = vca_deployed.get("vdu_id")
            vdu_count_index = vca_deployed.get("vdu_count_index")
            vdu_name = vca_deployed.get("vdu_name")
            vnf_index = vca_deployed.get("member-vnf-index")
            # only run when the descriptor defines primitives AND this VCA was
            # marked as needing termination (i.e. it was actually configured)
            if terminate_primitives and vca_deployed.get("needed_terminate"):
                for seq in terminate_primitives:
                    # For each sequence in list, get primitive and call _ns_execute_primitive()
                    step = "Calling terminate action for vnf_member_index={} primitive={}".format(
                        vnf_index, seq.get("name")
                    )
                    self.logger.debug(logging_text + step)
                    # Create the primitive for each sequence, i.e. "primitive": "touch"
                    primitive = seq.get("name")
                    mapped_primitive_params = self._get_terminate_primitive_params(
                        seq, vnf_index
                    )

                    # Add sub-operation so the operation can be retried/tracked
                    self._add_suboperation(
                        db_nslcmop,
                        vnf_index,
                        vdu_id,
                        vdu_count_index,
                        vdu_name,
                        primitive,
                        mapped_primitive_params,
                    )
                    # Sub-operations: Call _ns_execute_primitive() instead of action()
                    try:
                        result, result_detail = await self._ns_execute_primitive(
                            vca_deployed["ee_id"],
                            primitive,
                            mapped_primitive_params,
                            vca_type=vca_type,
                            vca_id=vca_id,
                        )
                    except LcmException:
                        # this happens when VCA is not deployed. In this case it is not needed to terminate
                        continue
                    result_ok = ["COMPLETED", "PARTIALLY_COMPLETED"]
                    if result not in result_ok:
                        raise LcmException(
                            "terminate_primitive {} for vnf_member_index={} fails with "
                            "error {}".format(seq.get("name"), vnf_index, result_detail)
                        )
                # set that this VCA do not need terminated
                db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(
                    vca_index
                )
                self.update_db_2(
                    "nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False}
                )

        # Delete Prometheus Jobs if any
        # This uses NSR_ID, so it will destroy any jobs under this index
        self.db.del_list("prometheus_jobs", {"nsr_id": db_nslcmop["nsInstanceId"]})

        if destroy_ee:
            await self.vca_map[vca_type].delete_execution_environment(
                vca_deployed["ee_id"],
                scaling_in=scaling_in,
                vca_type=vca_type,
                vca_id=vca_id,
            )
4101
4102 async def _delete_all_N2VC(self, db_nsr: dict, vca_id: str = None):
4103 self._write_all_config_status(db_nsr=db_nsr, status="TERMINATING")
4104 namespace = "." + db_nsr["_id"]
4105 try:
4106 await self.n2vc.delete_namespace(
4107 namespace=namespace,
4108 total_timeout=self.timeout_charm_delete,
4109 vca_id=vca_id,
4110 )
4111 except N2VCNotFound: # already deleted. Skip
4112 pass
4113 self._write_all_config_status(db_nsr=db_nsr, status="DELETED")
4114
    async def _terminate_RO(
        self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
    ):
        """
        Terminates a deployment from RO (classic, non-NG flavor): deletes the ns
        from the VIM, waits for completion, then removes the nsd and vnfds from RO.
        :param logging_text: prefix prepended to every log line
        :param nsr_deployed: db_nsr._admin.deployed
        :param nsr_id: nsrs database record id
        :param nslcmop_id: nslcmops database record id
        :param stage: list of string with the content to write on db_nslcmop.detailed-status.
            this method will update only the index 2, but it will write on database the concatenated content of the list
        :return: None. Raises LcmException when any deletion step failed.
        """
        db_nsr_update = {}
        failed_detail = []  # accumulates non-fatal errors; raised together at the end
        ro_nsr_id = ro_delete_action = None
        if nsr_deployed and nsr_deployed.get("RO"):
            ro_nsr_id = nsr_deployed["RO"].get("nsr_id")
            # a pending delete action from a previous (interrupted) attempt is resumed
            ro_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
        try:
            if ro_nsr_id:
                stage[2] = "Deleting ns from VIM."
                db_nsr_update["detailed-status"] = " ".join(stage)
                self._write_op_status(nslcmop_id, stage)
                self.logger.debug(logging_text + stage[2])
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_op_status(nslcmop_id, stage)
                desc = await self.RO.delete("ns", ro_nsr_id)
                ro_delete_action = desc["action_id"]
                db_nsr_update[
                    "_admin.deployed.RO.nsr_delete_action_id"
                ] = ro_delete_action
                db_nsr_update["_admin.deployed.RO.nsr_id"] = None
                db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
            if ro_delete_action:
                # wait until NS is deleted from VIM
                stage[2] = "Waiting ns deleted from VIM."
                detailed_status_old = None
                self.logger.debug(
                    logging_text
                    + stage[2]
                    + " RO_id={} ro_delete_action={}".format(
                        ro_nsr_id, ro_delete_action
                    )
                )
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_op_status(nslcmop_id, stage)

                delete_timeout = 20 * 60  # 20 minutes
                # poll RO every 5 seconds until the delete action completes
                while delete_timeout > 0:
                    desc = await self.RO.show(
                        "ns",
                        item_id_name=ro_nsr_id,
                        extra_item="action",
                        extra_item_id=ro_delete_action,
                    )

                    # deploymentStatus
                    self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)

                    ns_status, ns_status_info = self.RO.check_action_status(desc)
                    if ns_status == "ERROR":
                        raise ROclient.ROClientException(ns_status_info)
                    elif ns_status == "BUILD":
                        stage[2] = "Deleting from VIM {}".format(ns_status_info)
                    elif ns_status == "ACTIVE":
                        # deletion finished: clear the pending action marker
                        db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
                        db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                        break
                    else:
                        assert (
                            False
                        ), "ROclient.check_action_status returns unknown {}".format(
                            ns_status
                        )
                    # only write to the database when the status text changed
                    if stage[2] != detailed_status_old:
                        detailed_status_old = stage[2]
                        db_nsr_update["detailed-status"] = " ".join(stage)
                        self._write_op_status(nslcmop_id, stage)
                        self.update_db_2("nsrs", nsr_id, db_nsr_update)
                    await asyncio.sleep(5, loop=self.loop)
                    delete_timeout -= 5
                else:  # delete_timeout <= 0:
                    raise ROclient.ROClientException(
                        "Timeout waiting ns deleted from VIM"
                    )

        except Exception as e:
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            if (
                isinstance(e, ROclient.ROClientException) and e.http_code == 404
            ):  # not found
                # already removed at RO side: treat as success
                db_nsr_update["_admin.deployed.RO.nsr_id"] = None
                db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
                self.logger.debug(
                    logging_text + "RO_ns_id={} already deleted".format(ro_nsr_id)
                )
            elif (
                isinstance(e, ROclient.ROClientException) and e.http_code == 409
            ):  # conflict
                failed_detail.append("delete conflict: {}".format(e))
                self.logger.debug(
                    logging_text
                    + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id, e)
                )
            else:
                failed_detail.append("delete error: {}".format(e))
                self.logger.error(
                    logging_text + "RO_ns_id={} delete error: {}".format(ro_nsr_id, e)
                )

        # Delete nsd (only when the ns deletion above fully succeeded)
        if not failed_detail and deep_get(nsr_deployed, ("RO", "nsd_id")):
            ro_nsd_id = nsr_deployed["RO"]["nsd_id"]
            try:
                stage[2] = "Deleting nsd from RO."
                db_nsr_update["detailed-status"] = " ".join(stage)
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_op_status(nslcmop_id, stage)
                await self.RO.delete("nsd", ro_nsd_id)
                self.logger.debug(
                    logging_text + "ro_nsd_id={} deleted".format(ro_nsd_id)
                )
                db_nsr_update["_admin.deployed.RO.nsd_id"] = None
            except Exception as e:
                if (
                    isinstance(e, ROclient.ROClientException) and e.http_code == 404
                ):  # not found
                    db_nsr_update["_admin.deployed.RO.nsd_id"] = None
                    self.logger.debug(
                        logging_text + "ro_nsd_id={} already deleted".format(ro_nsd_id)
                    )
                elif (
                    isinstance(e, ROclient.ROClientException) and e.http_code == 409
                ):  # conflict
                    failed_detail.append(
                        "ro_nsd_id={} delete conflict: {}".format(ro_nsd_id, e)
                    )
                    self.logger.debug(logging_text + failed_detail[-1])
                else:
                    failed_detail.append(
                        "ro_nsd_id={} delete error: {}".format(ro_nsd_id, e)
                    )
                    self.logger.error(logging_text + failed_detail[-1])

        # Delete each vnfd pushed to RO for this ns
        if not failed_detail and deep_get(nsr_deployed, ("RO", "vnfd")):
            for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
                if not vnf_deployed or not vnf_deployed["id"]:
                    continue
                try:
                    ro_vnfd_id = vnf_deployed["id"]
                    stage[
                        2
                    ] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
                        vnf_deployed["member-vnf-index"], ro_vnfd_id
                    )
                    db_nsr_update["detailed-status"] = " ".join(stage)
                    self.update_db_2("nsrs", nsr_id, db_nsr_update)
                    self._write_op_status(nslcmop_id, stage)
                    await self.RO.delete("vnfd", ro_vnfd_id)
                    self.logger.debug(
                        logging_text + "ro_vnfd_id={} deleted".format(ro_vnfd_id)
                    )
                    db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
                except Exception as e:
                    if (
                        isinstance(e, ROclient.ROClientException) and e.http_code == 404
                    ):  # not found
                        db_nsr_update[
                            "_admin.deployed.RO.vnfd.{}.id".format(index)
                        ] = None
                        self.logger.debug(
                            logging_text
                            + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id)
                        )
                    elif (
                        isinstance(e, ROclient.ROClientException) and e.http_code == 409
                    ):  # conflict
                        failed_detail.append(
                            "ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id, e)
                        )
                        self.logger.debug(logging_text + failed_detail[-1])
                    else:
                        failed_detail.append(
                            "ro_vnfd_id={} delete error: {}".format(ro_vnfd_id, e)
                        )
                        self.logger.error(logging_text + failed_detail[-1])

        if failed_detail:
            stage[2] = "Error deleting from VIM"
        else:
            stage[2] = "Deleted from VIM"
        db_nsr_update["detailed-status"] = " ".join(stage)
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        self._write_op_status(nslcmop_id, stage)

        if failed_detail:
            raise LcmException("; ".join(failed_detail))
4314
    async def terminate(self, nsr_id, nslcmop_id):
        """Terminate a NS instance.

        Stage 1 prepares the task, stage 2 runs per-VCA terminate primitives,
        stage 3 deletes execution environments, KDUs and the VIM deployment.
        The final result is always written to nsrs/nslcmops and announced on
        the "ns" kafka topic.

        :param nsr_id: nsrs database record id
        :param nslcmop_id: nslcmops database record id driving this task
        """
        # Try to lock HA task here
        task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
        if not task_is_locked_by_me:
            return

        logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
        self.logger.debug(logging_text + "Enter")
        timeout_ns_terminate = self.timeout_ns_terminate
        db_nsr = None
        db_nslcmop = None
        operation_params = None
        exc = None
        error_list = []  # annotates all failed error messages
        db_nslcmop_update = {}
        autoremove = False  # autoremove after terminated
        tasks_dict_info = {}  # task -> human readable description, consumed by _wait_for_tasks
        db_nsr_update = {}
        stage = [
            "Stage 1/3: Preparing task.",
            "Waiting for previous operations to terminate.",
            "",
        ]
        # ^ contains [stage, step, VIM-status]
        try:
            # wait for any previous tasks in process
            await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)

            stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
            db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
            operation_params = db_nslcmop.get("operationParams") or {}
            if operation_params.get("timeout_ns_terminate"):
                timeout_ns_terminate = operation_params["timeout_ns_terminate"]
            stage[1] = "Getting nsr={} from db.".format(nsr_id)
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

            db_nsr_update["operational-status"] = "terminating"
            db_nsr_update["config-status"] = "terminating"
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state="TERMINATING",
                current_operation="TERMINATING",
                current_operation_id=nslcmop_id,
                other_update=db_nsr_update,
            )
            self._write_op_status(op_id=nslcmop_id, queuePosition=0, stage=stage)
            # work on a copy: nsr_deployed is read while the db record keeps changing
            nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed")) or {}
            if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
                return

            stage[1] = "Getting vnf descriptors from db."
            db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
            db_vnfrs_dict = {
                db_vnfr["member-vnf-index-ref"]: db_vnfr for db_vnfr in db_vnfrs_list
            }
            db_vnfds_from_id = {}
            db_vnfds_from_member_index = {}
            # Loop over VNFRs; fetch each distinct vnfd only once
            for vnfr in db_vnfrs_list:
                vnfd_id = vnfr["vnfd-id"]
                if vnfd_id not in db_vnfds_from_id:
                    vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
                    db_vnfds_from_id[vnfd_id] = vnfd
                db_vnfds_from_member_index[
                    vnfr["member-vnf-index-ref"]
                ] = db_vnfds_from_id[vnfd_id]

            # Destroy individual execution environments when there are terminating primitives.
            # Rest of EE will be deleted at once
            # TODO - check before calling _destroy_N2VC
            # if not operation_params.get("skip_terminate_primitives"):#
            # or not vca.get("needed_terminate"):
            stage[0] = "Stage 2/3 execute terminating primitives."
            self.logger.debug(logging_text + stage[0])
            stage[1] = "Looking execution environment that needs terminate."
            self.logger.debug(logging_text + stage[1])

            for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")):
                config_descriptor = None
                vca_member_vnf_index = vca.get("member-vnf-index")
                vca_id = self.get_vca_id(
                    db_vnfrs_dict.get(vca_member_vnf_index)
                    if vca_member_vnf_index
                    else None,
                    db_nsr,
                )
                if not vca or not vca.get("ee_id"):
                    continue
                # locate the configuration descriptor: ns-level, vdu, kdu or vnf
                if not vca.get("member-vnf-index"):
                    # ns
                    config_descriptor = db_nsr.get("ns-configuration")
                elif vca.get("vdu_id"):
                    db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                    config_descriptor = get_configuration(db_vnfd, vca.get("vdu_id"))
                elif vca.get("kdu_name"):
                    db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                    config_descriptor = get_configuration(db_vnfd, vca.get("kdu_name"))
                else:
                    db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                    config_descriptor = get_configuration(db_vnfd, db_vnfd["id"])
                vca_type = vca.get("type")
                exec_terminate_primitives = not operation_params.get(
                    "skip_terminate_primitives"
                ) and vca.get("needed_terminate")
                # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
                # pending native charms
                destroy_ee = (
                    True if vca_type in ("helm", "helm-v3", "native_charm") else False
                )
                # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
                #     vca_index, vca.get("ee_id"), vca_type, destroy_ee))
                task = asyncio.ensure_future(
                    self.destroy_N2VC(
                        logging_text,
                        db_nslcmop,
                        vca,
                        config_descriptor,
                        vca_index,
                        destroy_ee,
                        exec_terminate_primitives,
                        vca_id=vca_id,
                    )
                )
                tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))

            # wait for pending tasks of terminate primitives
            if tasks_dict_info:
                self.logger.debug(
                    logging_text
                    + "Waiting for tasks {}".format(list(tasks_dict_info.keys()))
                )
                error_list = await self._wait_for_tasks(
                    logging_text,
                    tasks_dict_info,
                    min(self.timeout_charm_delete, timeout_ns_terminate),
                    stage,
                    nslcmop_id,
                )
                tasks_dict_info.clear()
                if error_list:
                    return  # raise LcmException("; ".join(error_list))

            # remove All execution environments at once
            stage[0] = "Stage 3/3 delete all."

            if nsr_deployed.get("VCA"):
                stage[1] = "Deleting all execution environments."
                self.logger.debug(logging_text + stage[1])
                vca_id = self.get_vca_id({}, db_nsr)
                task_delete_ee = asyncio.ensure_future(
                    asyncio.wait_for(
                        self._delete_all_N2VC(db_nsr=db_nsr, vca_id=vca_id),
                        timeout=self.timeout_charm_delete,
                    )
                )
                # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
                tasks_dict_info[task_delete_ee] = "Terminating all VCA"

            # Delete from k8scluster
            stage[1] = "Deleting KDUs."
            self.logger.debug(logging_text + stage[1])
            # print(nsr_deployed)
            for kdu in get_iterable(nsr_deployed, "K8s"):
                if not kdu or not kdu.get("kdu-instance"):
                    continue
                kdu_instance = kdu.get("kdu-instance")
                if kdu.get("k8scluster-type") in self.k8scluster_map:
                    # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
                    vca_id = self.get_vca_id({}, db_nsr)
                    task_delete_kdu_instance = asyncio.ensure_future(
                        self.k8scluster_map[kdu["k8scluster-type"]].uninstall(
                            cluster_uuid=kdu.get("k8scluster-uuid"),
                            kdu_instance=kdu_instance,
                            vca_id=vca_id,
                        )
                    )
                else:
                    self.logger.error(
                        logging_text
                        + "Unknown k8s deployment type {}".format(
                            kdu.get("k8scluster-type")
                        )
                    )
                    continue
                tasks_dict_info[
                    task_delete_kdu_instance
                ] = "Terminating KDU '{}'".format(kdu.get("kdu-name"))

            # remove from RO
            stage[1] = "Deleting ns from VIM."
            if self.ng_ro:
                task_delete_ro = asyncio.ensure_future(
                    self._terminate_ng_ro(
                        logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
                    )
                )
            else:
                task_delete_ro = asyncio.ensure_future(
                    self._terminate_RO(
                        logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
                    )
                )
            tasks_dict_info[task_delete_ro] = "Removing deployment from VIM"

            # rest of staff will be done at finally

        except (
            ROclient.ROClientException,
            DbException,
            LcmException,
            N2VCException,
        ) as e:
            self.logger.error(logging_text + "Exit Exception {}".format(e))
            exc = e
        except asyncio.CancelledError:
            self.logger.error(
                logging_text + "Cancelled Exception while '{}'".format(stage[1])
            )
            exc = "Operation was cancelled"
        except Exception as e:
            exc = traceback.format_exc()
            self.logger.critical(
                logging_text + "Exit Exception while '{}': {}".format(stage[1], e),
                exc_info=True,
            )
        finally:
            if exc:
                error_list.append(str(exc))
            try:
                # wait for pending tasks
                if tasks_dict_info:
                    stage[1] = "Waiting for terminate pending tasks."
                    self.logger.debug(logging_text + stage[1])
                    error_list += await self._wait_for_tasks(
                        logging_text,
                        tasks_dict_info,
                        timeout_ns_terminate,
                        stage,
                        nslcmop_id,
                    )
                stage[1] = stage[2] = ""
            except asyncio.CancelledError:
                error_list.append("Cancelled")
                # TODO cancell all tasks
            except Exception as exc:
                error_list.append(str(exc))
            # update status at database
            if error_list:
                error_detail = "; ".join(error_list)
                # self.logger.error(logging_text + error_detail)
                error_description_nslcmop = "{} Detail: {}".format(
                    stage[0], error_detail
                )
                error_description_nsr = "Operation: TERMINATING.{}, {}.".format(
                    nslcmop_id, stage[0]
                )

                db_nsr_update["operational-status"] = "failed"
                db_nsr_update["detailed-status"] = (
                    error_description_nsr + " Detail: " + error_detail
                )
                db_nslcmop_update["detailed-status"] = error_detail
                nslcmop_operation_state = "FAILED"
                ns_state = "BROKEN"
            else:
                error_detail = None
                error_description_nsr = error_description_nslcmop = None
                ns_state = "NOT_INSTANTIATED"
                db_nsr_update["operational-status"] = "terminated"
                db_nsr_update["detailed-status"] = "Done"
                db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
                db_nslcmop_update["detailed-status"] = "Done"
                nslcmop_operation_state = "COMPLETED"

            if db_nsr:
                self._write_ns_status(
                    nsr_id=nsr_id,
                    ns_state=ns_state,
                    current_operation="IDLE",
                    current_operation_id=None,
                    error_description=error_description_nsr,
                    error_detail=error_detail,
                    other_update=db_nsr_update,
                )
            self._write_op_status(
                op_id=nslcmop_id,
                stage="",
                error_message=error_description_nslcmop,
                operation_state=nslcmop_operation_state,
                other_update=db_nslcmop_update,
            )
            if ns_state == "NOT_INSTANTIATED":
                try:
                    self.db.set_list(
                        "vnfrs",
                        {"nsr-id-ref": nsr_id},
                        {"_admin.nsState": "NOT_INSTANTIATED"},
                    )
                except DbException as e:
                    self.logger.warn(
                        logging_text
                        + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
                            nsr_id, e
                        )
                    )
            if operation_params:
                autoremove = operation_params.get("autoremove", False)
            # notify NBI/other modules about the operation result
            if nslcmop_operation_state:
                try:
                    await self.msg.aiowrite(
                        "ns",
                        "terminated",
                        {
                            "nsr_id": nsr_id,
                            "nslcmop_id": nslcmop_id,
                            "operationState": nslcmop_operation_state,
                            "autoremove": autoremove,
                        },
                        loop=self.loop,
                    )
                except Exception as e:
                    self.logger.error(
                        logging_text + "kafka_write notification Exception {}".format(e)
                    )

            self.logger.debug(logging_text + "Exit")
            self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
4642
    async def _wait_for_tasks(
        self, logging_text, created_tasks_info, timeout, stage, nslcmop_id, nsr_id=None
    ):
        """Wait for a set of asyncio tasks, reporting progress on stage[1].

        :param logging_text: prefix prepended to every log line
        :param created_tasks_info: dict mapping task -> human readable description
        :param timeout: overall budget (seconds) shared by the whole task group
        :param stage: [stage, step, VIM-status] list; index 1 is updated here
        :param nslcmop_id: nslcmops record id used to persist the progress
        :param nsr_id: when provided, errors are also written to the nsr record
        :return: list of error detail strings (empty when all tasks succeeded)
        """
        time_start = time()
        error_detail_list = []
        error_list = []
        pending_tasks = list(created_tasks_info.keys())
        num_tasks = len(pending_tasks)
        num_done = 0
        stage[1] = "{}/{}.".format(num_done, num_tasks)
        self._write_op_status(nslcmop_id, stage)
        while pending_tasks:
            new_error = None
            # remaining budget; shrinks as tasks complete
            _timeout = timeout + time_start - time()
            done, pending_tasks = await asyncio.wait(
                pending_tasks, timeout=_timeout, return_when=asyncio.FIRST_COMPLETED
            )
            num_done += len(done)
            if not done:  # Timeout
                # mark every still-pending task as timed out and stop waiting
                for task in pending_tasks:
                    new_error = created_tasks_info[task] + ": Timeout"
                    error_detail_list.append(new_error)
                    error_list.append(new_error)
                break
            for task in done:
                if task.cancelled():
                    exc = "Cancelled"
                else:
                    exc = task.exception()
                if exc:
                    if isinstance(exc, asyncio.TimeoutError):
                        exc = "Timeout"
                    new_error = created_tasks_info[task] + ": {}".format(exc)
                    error_list.append(created_tasks_info[task])
                    error_detail_list.append(new_error)
                    # known/controlled exceptions are logged plainly; anything
                    # unexpected gets the full traceback
                    if isinstance(
                        exc,
                        (
                            str,
                            DbException,
                            N2VCException,
                            ROclient.ROClientException,
                            LcmException,
                            K8sException,
                            NgRoException,
                        ),
                    ):
                        self.logger.error(logging_text + new_error)
                    else:
                        exc_traceback = "".join(
                            traceback.format_exception(None, exc, exc.__traceback__)
                        )
                        self.logger.error(
                            logging_text
                            + created_tasks_info[task]
                            + " "
                            + exc_traceback
                        )
                else:
                    self.logger.debug(
                        logging_text + created_tasks_info[task] + ": Done"
                    )
            stage[1] = "{}/{}.".format(num_done, num_tasks)
            if new_error:
                stage[1] += " Errors: " + ". ".join(error_detail_list) + "."
                if nsr_id:  # update also nsr
                    self.update_db_2(
                        "nsrs",
                        nsr_id,
                        {
                            "errorDescription": "Error at: " + ", ".join(error_list),
                            "errorDetail": ". ".join(error_detail_list),
                        },
                    )
            self._write_op_status(nslcmop_id, stage)
        return error_detail_list
4719
4720 @staticmethod
4721 def _map_primitive_params(primitive_desc, params, instantiation_params):
4722 """
4723 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
4724 The default-value is used. If it is between < > it look for a value at instantiation_params
4725 :param primitive_desc: portion of VNFD/NSD that describes primitive
4726 :param params: Params provided by user
4727 :param instantiation_params: Instantiation params provided by user
4728 :return: a dictionary with the calculated params
4729 """
4730 calculated_params = {}
4731 for parameter in primitive_desc.get("parameter", ()):
4732 param_name = parameter["name"]
4733 if param_name in params:
4734 calculated_params[param_name] = params[param_name]
4735 elif "default-value" in parameter or "value" in parameter:
4736 if "value" in parameter:
4737 calculated_params[param_name] = parameter["value"]
4738 else:
4739 calculated_params[param_name] = parameter["default-value"]
4740 if (
4741 isinstance(calculated_params[param_name], str)
4742 and calculated_params[param_name].startswith("<")
4743 and calculated_params[param_name].endswith(">")
4744 ):
4745 if calculated_params[param_name][1:-1] in instantiation_params:
4746 calculated_params[param_name] = instantiation_params[
4747 calculated_params[param_name][1:-1]
4748 ]
4749 else:
4750 raise LcmException(
4751 "Parameter {} needed to execute primitive {} not provided".format(
4752 calculated_params[param_name], primitive_desc["name"]
4753 )
4754 )
4755 else:
4756 raise LcmException(
4757 "Parameter {} needed to execute primitive {} not provided".format(
4758 param_name, primitive_desc["name"]
4759 )
4760 )
4761
4762 if isinstance(calculated_params[param_name], (dict, list, tuple)):
4763 calculated_params[param_name] = yaml.safe_dump(
4764 calculated_params[param_name], default_flow_style=True, width=256
4765 )
4766 elif isinstance(calculated_params[param_name], str) and calculated_params[
4767 param_name
4768 ].startswith("!!yaml "):
4769 calculated_params[param_name] = calculated_params[param_name][7:]
4770 if parameter.get("data-type") == "INTEGER":
4771 try:
4772 calculated_params[param_name] = int(calculated_params[param_name])
4773 except ValueError: # error converting string to int
4774 raise LcmException(
4775 "Parameter {} of primitive {} must be integer".format(
4776 param_name, primitive_desc["name"]
4777 )
4778 )
4779 elif parameter.get("data-type") == "BOOLEAN":
4780 calculated_params[param_name] = not (
4781 (str(calculated_params[param_name])).lower() == "false"
4782 )
4783
4784 # add always ns_config_info if primitive name is config
4785 if primitive_desc["name"] == "config":
4786 if "ns_config_info" in instantiation_params:
4787 calculated_params["ns_config_info"] = instantiation_params[
4788 "ns_config_info"
4789 ]
4790 return calculated_params
4791
4792 def _look_for_deployed_vca(
4793 self,
4794 deployed_vca,
4795 member_vnf_index,
4796 vdu_id,
4797 vdu_count_index,
4798 kdu_name=None,
4799 ee_descriptor_id=None,
4800 ):
4801 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
4802 for vca in deployed_vca:
4803 if not vca:
4804 continue
4805 if member_vnf_index != vca["member-vnf-index"] or vdu_id != vca["vdu_id"]:
4806 continue
4807 if (
4808 vdu_count_index is not None
4809 and vdu_count_index != vca["vdu_count_index"]
4810 ):
4811 continue
4812 if kdu_name and kdu_name != vca["kdu_name"]:
4813 continue
4814 if ee_descriptor_id and ee_descriptor_id != vca["ee_descriptor_id"]:
4815 continue
4816 break
4817 else:
4818 # vca_deployed not found
4819 raise LcmException(
4820 "charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
4821 " is not deployed".format(
4822 member_vnf_index,
4823 vdu_id,
4824 vdu_count_index,
4825 kdu_name,
4826 ee_descriptor_id,
4827 )
4828 )
4829 # get ee_id
4830 ee_id = vca.get("ee_id")
4831 vca_type = vca.get(
4832 "type", "lxc_proxy_charm"
4833 ) # default value for backward compatibility - proxy charm
4834 if not ee_id:
4835 raise LcmException(
4836 "charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
4837 "execution environment".format(
4838 member_vnf_index, vdu_id, kdu_name, vdu_count_index
4839 )
4840 )
4841 return ee_id, vca_type
4842
    async def _ns_execute_primitive(
        self,
        ee_id,
        primitive,
        primitive_params,
        retries=0,
        retries_interval=30,
        timeout=None,
        vca_type=None,
        db_dict=None,
        vca_id: str = None,
    ) -> (str, str):
        """Execute a primitive (action) on an execution environment, retrying on failure.

        :param ee_id: execution environment id where the primitive runs
        :param primitive: primitive name; for "config" the params are wrapped as {"params": ...}
        :param primitive_params: parameters already mapped for the charm
        :param retries: number of extra attempts after the first failure
        :param retries_interval: seconds to sleep between attempts
        :param timeout: per-attempt timeout; defaults to self.timeout_primitive
        :param vca_type: key into self.vca_map; defaults to "lxc_proxy_charm"
        :param db_dict: database info forwarded to the VCA client for status updates
        :param vca_id: VCA (juju controller) id; None selects the default one
        :return: tuple (operation state, detail): ("COMPLETED", output) on
            success, ("FAILED"/"FAIL", error text) otherwise
        """
        try:
            if primitive == "config":
                primitive_params = {"params": primitive_params}

            vca_type = vca_type or "lxc_proxy_charm"

            while retries >= 0:
                try:
                    output = await asyncio.wait_for(
                        self.vca_map[vca_type].exec_primitive(
                            ee_id=ee_id,
                            primitive_name=primitive,
                            params_dict=primitive_params,
                            progress_timeout=self.timeout_progress_primitive,
                            total_timeout=self.timeout_primitive,
                            db_dict=db_dict,
                            vca_id=vca_id,
                            vca_type=vca_type,
                        ),
                        timeout=timeout or self.timeout_primitive,
                    )
                    # execution was OK
                    break
                except asyncio.CancelledError:
                    raise
                except Exception as e:  # asyncio.TimeoutError
                    if isinstance(e, asyncio.TimeoutError):
                        e = "Timeout"
                    retries -= 1
                    if retries >= 0:
                        self.logger.debug(
                            "Error executing action {} on {} -> {}".format(
                                primitive, ee_id, e
                            )
                        )
                        # wait and retry
                        await asyncio.sleep(retries_interval, loop=self.loop)
                    else:
                        # retry budget exhausted: report the last error
                        return "FAILED", str(e)

            return "COMPLETED", output

        except (LcmException, asyncio.CancelledError):
            raise
        except Exception as e:
            return "FAIL", "Error executing action {}: {}".format(primitive, e)
4901
4902 async def vca_status_refresh(self, nsr_id, nslcmop_id):
4903 """
4904 Updating the vca_status with latest juju information in nsrs record
4905 :param: nsr_id: Id of the nsr
4906 :param: nslcmop_id: Id of the nslcmop
4907 :return: None
4908 """
4909
4910 self.logger.debug("Task ns={} action={} Enter".format(nsr_id, nslcmop_id))
4911 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
4912 vca_id = self.get_vca_id({}, db_nsr)
4913 if db_nsr["_admin"]["deployed"]["K8s"]:
4914 for _, k8s in enumerate(db_nsr["_admin"]["deployed"]["K8s"]):
4915 cluster_uuid, kdu_instance, cluster_type = (
4916 k8s["k8scluster-uuid"],
4917 k8s["kdu-instance"],
4918 k8s["k8scluster-type"],
4919 )
4920 await self._on_update_k8s_db(
4921 cluster_uuid=cluster_uuid,
4922 kdu_instance=kdu_instance,
4923 filter={"_id": nsr_id},
4924 vca_id=vca_id,
4925 cluster_type=cluster_type,
4926 )
4927 else:
4928 for vca_index, _ in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
4929 table, filter = "nsrs", {"_id": nsr_id}
4930 path = "_admin.deployed.VCA.{}.".format(vca_index)
4931 await self._on_update_n2vc_db(table, filter, path, {})
4932
4933 self.logger.debug("Task ns={} action={} Exit".format(nsr_id, nslcmop_id))
4934 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_vca_status_refresh")
4935
    async def action(self, nsr_id, nslcmop_id):
        """Execute a config primitive (action) on a NS, VNF, VDU or KDU.

        Reads the action target and primitive parameters from the nslcmop
        record, locates the primitive descriptor in the corresponding
        [ns|vnf|vdu]-configuration, and dispatches the execution either to the
        K8s cluster client (KDU upgrade/rollback/status or native KDU actions)
        or to the deployed VCA execution environment. The operation outcome is
        persisted to the nslcmops/nsrs records and notified on kafka in the
        finally block.

        :param nsr_id: Id of the nsr record
        :param nslcmop_id: Id of the nslcmop record describing the action
        :return: (nslcmop_operation_state, detailed_status) tuple
        """
        # Try to lock HA task here
        task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
        if not task_is_locked_by_me:
            return

        logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
        self.logger.debug(logging_text + "Enter")
        # get all needed from database
        db_nsr = None
        db_nslcmop = None
        db_nsr_update = {}
        db_nslcmop_update = {}
        nslcmop_operation_state = None
        error_description_nslcmop = None
        exc = None
        try:
            # wait for any previous tasks in process
            step = "Waiting for previous operations to terminate"
            await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)

            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=None,
                current_operation="RUNNING ACTION",
                current_operation_id=nslcmop_id,
            )

            step = "Getting information from database"
            db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
            # primitive_params may arrive JSON-encoded; decode in place
            if db_nslcmop["operationParams"].get("primitive_params"):
                db_nslcmop["operationParams"]["primitive_params"] = json.loads(
                    db_nslcmop["operationParams"]["primitive_params"]
                )

            nsr_deployed = db_nsr["_admin"].get("deployed")
            vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
            vdu_id = db_nslcmop["operationParams"].get("vdu_id")
            kdu_name = db_nslcmop["operationParams"].get("kdu_name")
            vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
            primitive = db_nslcmop["operationParams"]["primitive"]
            primitive_params = db_nslcmop["operationParams"]["primitive_params"]
            timeout_ns_action = db_nslcmop["operationParams"].get(
                "timeout_ns_action", self.timeout_primitive
            )

            if vnf_index:
                step = "Getting vnfr from database"
                db_vnfr = self.db.get_one(
                    "vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}
                )
                if db_vnfr.get("kdur"):
                    # kdur additionalParams are stored JSON-encoded; decode each
                    kdur_list = []
                    for kdur in db_vnfr["kdur"]:
                        if kdur.get("additionalParams"):
                            kdur["additionalParams"] = json.loads(
                                kdur["additionalParams"]
                            )
                        kdur_list.append(kdur)
                    db_vnfr["kdur"] = kdur_list
                step = "Getting vnfd from database"
                db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})

                # Sync filesystem before running a primitive
                self.fs.sync(db_vnfr["vnfd-id"])
            else:
                step = "Getting nsd from database"
                db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})

            # NOTE(review): db_vnfr is only bound in the vnf_index branch above;
            # confirm the NS-level (no member_vnf_index) path cannot reach here
            # without it.
            vca_id = self.get_vca_id(db_vnfr, db_nsr)
            # for backward compatibility
            if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
                nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
                db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
                self.update_db_2("nsrs", nsr_id, db_nsr_update)

            # look for primitive
            config_primitive_desc = descriptor_configuration = None
            if vdu_id:
                descriptor_configuration = get_configuration(db_vnfd, vdu_id)
            elif kdu_name:
                descriptor_configuration = get_configuration(db_vnfd, kdu_name)
            elif vnf_index:
                descriptor_configuration = get_configuration(db_vnfd, db_vnfd["id"])
            else:
                descriptor_configuration = db_nsd.get("ns-configuration")

            if descriptor_configuration and descriptor_configuration.get(
                "config-primitive"
            ):
                for config_primitive in descriptor_configuration["config-primitive"]:
                    if config_primitive["name"] == primitive:
                        config_primitive_desc = config_primitive
                        break

            if not config_primitive_desc:
                # upgrade/rollback/status are built-in KDU operations that need
                # no descriptor entry; anything else must be declared
                if not (kdu_name and primitive in ("upgrade", "rollback", "status")):
                    raise LcmException(
                        "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
                            primitive
                        )
                    )
                primitive_name = primitive
                ee_descriptor_id = None
            else:
                primitive_name = config_primitive_desc.get(
                    "execution-environment-primitive", primitive
                )
                ee_descriptor_id = config_primitive_desc.get(
                    "execution-environment-ref"
                )

            # Resolve additionalParams at the chosen level (vdu > kdu > vnf > ns)
            if vnf_index:
                if vdu_id:
                    vdur = next(
                        (x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None
                    )
                    desc_params = parse_yaml_strings(vdur.get("additionalParams"))
                elif kdu_name:
                    kdur = next(
                        (x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None
                    )
                    desc_params = parse_yaml_strings(kdur.get("additionalParams"))
                else:
                    desc_params = parse_yaml_strings(
                        db_vnfr.get("additionalParamsForVnf")
                    )
            else:
                desc_params = parse_yaml_strings(db_nsr.get("additionalParamsForNs"))
            if kdu_name and get_configuration(db_vnfd, kdu_name):
                kdu_configuration = get_configuration(db_vnfd, kdu_name)
                actions = set()
                # NOTE(review): these loops rebind the local `primitive`; the
                # primitive name used from here on is `primitive_name`
                for primitive in kdu_configuration.get("initial-config-primitive", []):
                    actions.add(primitive["name"])
                for primitive in kdu_configuration.get("config-primitive", []):
                    actions.add(primitive["name"])
                kdu = find_in_list(
                    nsr_deployed["K8s"],
                    lambda kdu: kdu_name == kdu["kdu-name"]
                    and kdu["member-vnf-index"] == vnf_index,
                )
                kdu_action = (
                    True
                    if primitive_name in actions
                    and kdu["k8scluster-type"] not in ("helm-chart", "helm-chart-v3")
                    else False
                )

            # TODO check if ns is in a proper status
            # NOTE(review): kdu_action is only bound in the branch above; the
            # "or kdu_action" below presumably relies on short-circuiting for
            # upgrade/rollback/status — confirm for KDUs without configuration
            if kdu_name and (
                primitive_name in ("upgrade", "rollback", "status") or kdu_action
            ):
                # kdur and desc_params already set from before
                if primitive_params:
                    desc_params.update(primitive_params)
                # TODO Check if we will need something at vnf level
                for index, kdu in enumerate(get_iterable(nsr_deployed, "K8s")):
                    if (
                        kdu_name == kdu["kdu-name"]
                        and kdu["member-vnf-index"] == vnf_index
                    ):
                        break
                else:
                    raise LcmException(
                        "KDU '{}' for vnf '{}' not deployed".format(kdu_name, vnf_index)
                    )

                if kdu.get("k8scluster-type") not in self.k8scluster_map:
                    msg = "unknown k8scluster-type '{}'".format(
                        kdu.get("k8scluster-type")
                    )
                    raise LcmException(msg)

                db_dict = {
                    "collection": "nsrs",
                    "filter": {"_id": nsr_id},
                    "path": "_admin.deployed.K8s.{}".format(index),
                }
                self.logger.debug(
                    logging_text
                    + "Exec k8s {} on {}.{}".format(primitive_name, vnf_index, kdu_name)
                )
                step = "Executing kdu {}".format(primitive_name)
                if primitive_name == "upgrade":
                    if desc_params.get("kdu_model"):
                        kdu_model = desc_params.get("kdu_model")
                        del desc_params["kdu_model"]
                    else:
                        kdu_model = kdu.get("kdu-model")
                        # drop a ":version" suffix from the model reference
                        parts = kdu_model.split(sep=":")
                        if len(parts) == 2:
                            kdu_model = parts[0]

                    detailed_status = await asyncio.wait_for(
                        self.k8scluster_map[kdu["k8scluster-type"]].upgrade(
                            cluster_uuid=kdu.get("k8scluster-uuid"),
                            kdu_instance=kdu.get("kdu-instance"),
                            atomic=True,
                            kdu_model=kdu_model,
                            params=desc_params,
                            db_dict=db_dict,
                            timeout=timeout_ns_action,
                        ),
                        timeout=timeout_ns_action + 10,
                    )
                    self.logger.debug(
                        logging_text + " Upgrade of kdu {} done".format(detailed_status)
                    )
                elif primitive_name == "rollback":
                    detailed_status = await asyncio.wait_for(
                        self.k8scluster_map[kdu["k8scluster-type"]].rollback(
                            cluster_uuid=kdu.get("k8scluster-uuid"),
                            kdu_instance=kdu.get("kdu-instance"),
                            db_dict=db_dict,
                        ),
                        timeout=timeout_ns_action,
                    )
                elif primitive_name == "status":
                    detailed_status = await asyncio.wait_for(
                        self.k8scluster_map[kdu["k8scluster-type"]].status_kdu(
                            cluster_uuid=kdu.get("k8scluster-uuid"),
                            kdu_instance=kdu.get("kdu-instance"),
                            vca_id=vca_id,
                        ),
                        timeout=timeout_ns_action,
                    )
                else:
                    # native KDU action declared in the kdu configuration
                    kdu_instance = kdu.get("kdu-instance") or "{}-{}".format(
                        kdu["kdu-name"], nsr_id
                    )
                    params = self._map_primitive_params(
                        config_primitive_desc, primitive_params, desc_params
                    )

                    detailed_status = await asyncio.wait_for(
                        self.k8scluster_map[kdu["k8scluster-type"]].exec_primitive(
                            cluster_uuid=kdu.get("k8scluster-uuid"),
                            kdu_instance=kdu_instance,
                            primitive_name=primitive_name,
                            params=params,
                            db_dict=db_dict,
                            timeout=timeout_ns_action,
                            vca_id=vca_id,
                        ),
                        timeout=timeout_ns_action,
                    )

                if detailed_status:
                    nslcmop_operation_state = "COMPLETED"
                else:
                    detailed_status = ""
                    nslcmop_operation_state = "FAILED"
            else:
                # VCA path: run the primitive on the execution environment
                ee_id, vca_type = self._look_for_deployed_vca(
                    nsr_deployed["VCA"],
                    member_vnf_index=vnf_index,
                    vdu_id=vdu_id,
                    vdu_count_index=vdu_count_index,
                    ee_descriptor_id=ee_descriptor_id,
                )
                for vca_index, vca_deployed in enumerate(
                    db_nsr["_admin"]["deployed"]["VCA"]
                ):
                    if vca_deployed.get("member-vnf-index") == vnf_index:
                        db_dict = {
                            "collection": "nsrs",
                            "filter": {"_id": nsr_id},
                            "path": "_admin.deployed.VCA.{}.".format(vca_index),
                        }
                        break
                (
                    nslcmop_operation_state,
                    detailed_status,
                ) = await self._ns_execute_primitive(
                    ee_id,
                    primitive=primitive_name,
                    primitive_params=self._map_primitive_params(
                        config_primitive_desc, primitive_params, desc_params
                    ),
                    timeout=timeout_ns_action,
                    vca_type=vca_type,
                    db_dict=db_dict,
                    vca_id=vca_id,
                )

            db_nslcmop_update["detailed-status"] = detailed_status
            error_description_nslcmop = (
                detailed_status if nslcmop_operation_state == "FAILED" else ""
            )
            self.logger.debug(
                logging_text
                + " task Done with result {} {}".format(
                    nslcmop_operation_state, detailed_status
                )
            )
            return  # database update is called inside finally

        except (DbException, LcmException, N2VCException, K8sException) as e:
            self.logger.error(logging_text + "Exit Exception {}".format(e))
            exc = e
        except asyncio.CancelledError:
            self.logger.error(
                logging_text + "Cancelled Exception while '{}'".format(step)
            )
            exc = "Operation was cancelled"
        except asyncio.TimeoutError:
            self.logger.error(logging_text + "Timeout while '{}'".format(step))
            exc = "Timeout"
        except Exception as e:
            exc = traceback.format_exc()
            self.logger.critical(
                logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
                exc_info=True,
            )
        finally:
            # Persist operation outcome and notify, whatever happened above
            if exc:
                db_nslcmop_update[
                    "detailed-status"
                ] = (
                    detailed_status
                ) = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
                nslcmop_operation_state = "FAILED"
            if db_nsr:
                self._write_ns_status(
                    nsr_id=nsr_id,
                    ns_state=db_nsr[
                        "nsState"
                    ],  # TODO check if degraded. For the moment use previous status
                    current_operation="IDLE",
                    current_operation_id=None,
                    # error_description=error_description_nsr,
                    # error_detail=error_detail,
                    other_update=db_nsr_update,
                )

            self._write_op_status(
                op_id=nslcmop_id,
                stage="",
                error_message=error_description_nslcmop,
                operation_state=nslcmop_operation_state,
                other_update=db_nslcmop_update,
            )

            if nslcmop_operation_state:
                try:
                    await self.msg.aiowrite(
                        "ns",
                        "actioned",
                        {
                            "nsr_id": nsr_id,
                            "nslcmop_id": nslcmop_id,
                            "operationState": nslcmop_operation_state,
                        },
                        loop=self.loop,
                    )
                except Exception as e:
                    self.logger.error(
                        logging_text + "kafka_write notification Exception {}".format(e)
                    )
            self.logger.debug(logging_text + "Exit")
            self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
            return nslcmop_operation_state, detailed_status
5299
5300 async def _ns_charm_upgrade(
5301 self,
5302 ee_id,
5303 charm_id,
5304 charm_type,
5305 path,
5306 timeout: float = None,
5307 ) -> (str, str):
5308 """This method upgrade charms in VNF instances
5309
5310 Args:
5311 ee_id: Execution environment id
5312 path: Local path to the charm
5313 charm_id: charm-id
5314 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
5315 timeout: (Float) Timeout for the ns update operation
5316
5317 Returns:
5318 result: (str, str) COMPLETED/FAILED, details
5319 """
5320 try:
5321 charm_type = charm_type or "lxc_proxy_charm"
5322 output = await self.vca_map[charm_type].upgrade_charm(
5323 ee_id=ee_id,
5324 path=path,
5325 charm_id=charm_id,
5326 charm_type=charm_type,
5327 timeout=timeout or self.timeout_ns_update,
5328 )
5329
5330 if output:
5331 return "COMPLETED", output
5332
5333 except (LcmException, asyncio.CancelledError):
5334 raise
5335
5336 except Exception as e:
5337
5338 self.logger.debug("Error upgrading charm {}".format(path))
5339
5340 return "FAILED", "Error upgrading charm {}: {}".format(path, e)
5341
5342 async def update(self, nsr_id, nslcmop_id):
5343 """Update NS according to different update types
5344
5345 This method performs upgrade of VNF instances then updates the revision
5346 number in VNF record
5347
5348 Args:
5349 nsr_id: Network service will be updated
5350 nslcmop_id: ns lcm operation id
5351
5352 Returns:
5353 It may raise DbException, LcmException, N2VCException, K8sException
5354
5355 """
5356 # Try to lock HA task here
5357 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
5358 if not task_is_locked_by_me:
5359 return
5360
5361 logging_text = "Task ns={} update={} ".format(nsr_id, nslcmop_id)
5362 self.logger.debug(logging_text + "Enter")
5363
5364 # Set the required variables to be filled up later
5365 db_nsr = None
5366 db_nslcmop_update = {}
5367 vnfr_update = {}
5368 nslcmop_operation_state = None
5369 db_nsr_update = {}
5370 error_description_nslcmop = ""
5371 exc = None
5372 change_type = ""
5373 detailed_status = ""
5374
5375 try:
5376 # wait for any previous tasks in process
5377 step = "Waiting for previous operations to terminate"
5378 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
5379 self._write_ns_status(
5380 nsr_id=nsr_id,
5381 ns_state=None,
5382 current_operation="UPDATING",
5383 current_operation_id=nslcmop_id,
5384 )
5385
5386 step = "Getting nslcmop from database"
5387 db_nslcmop = self.db.get_one(
5388 "nslcmops", {"_id": nslcmop_id}, fail_on_empty=False
5389 )
5390 update_type = db_nslcmop["operationParams"]["updateType"]
5391
5392 step = "Getting nsr from database"
5393 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
5394 old_operational_status = db_nsr["operational-status"]
5395 db_nsr_update["operational-status"] = "updating"
5396 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5397 nsr_deployed = db_nsr["_admin"].get("deployed")
5398
5399 if update_type == "CHANGE_VNFPKG":
5400
5401 # Get the input parameters given through update request
5402 vnf_instance_id = db_nslcmop["operationParams"][
5403 "changeVnfPackageData"
5404 ].get("vnfInstanceId")
5405
5406 vnfd_id = db_nslcmop["operationParams"]["changeVnfPackageData"].get(
5407 "vnfdId"
5408 )
5409 timeout_seconds = db_nslcmop["operationParams"].get("timeout_ns_update")
5410
5411 step = "Getting vnfr from database"
5412 db_vnfr = self.db.get_one(
5413 "vnfrs", {"_id": vnf_instance_id}, fail_on_empty=False
5414 )
5415
5416 step = "Getting vnfds from database"
5417 # Latest VNFD
5418 latest_vnfd = self.db.get_one(
5419 "vnfds", {"_id": vnfd_id}, fail_on_empty=False
5420 )
5421 latest_vnfd_revision = latest_vnfd["_admin"].get("revision")
5422
5423 # Current VNFD
5424 current_vnf_revision = db_vnfr.get("revision", 1)
5425 current_vnfd = self.db.get_one(
5426 "vnfds_revisions",
5427 {"_id": vnfd_id + ":" + str(current_vnf_revision)},
5428 fail_on_empty=False,
5429 )
5430 # Charm artifact paths will be filled up later
5431 (
5432 current_charm_artifact_path,
5433 target_charm_artifact_path,
5434 charm_artifact_paths,
5435 ) = ([], [], [])
5436
5437 step = "Checking if revision has changed in VNFD"
5438 if current_vnf_revision != latest_vnfd_revision:
5439
5440 # There is new revision of VNFD, update operation is required
5441 current_vnfd_path = vnfd_id + ":" + str(current_vnf_revision)
5442 latest_vnfd_path = vnfd_id
5443
5444 step = "Removing the VNFD packages if they exist in the local path"
5445 shutil.rmtree(self.fs.path + current_vnfd_path, ignore_errors=True)
5446 shutil.rmtree(self.fs.path + latest_vnfd_path, ignore_errors=True)
5447
5448 step = "Get the VNFD packages from FSMongo"
5449 self.fs.sync(from_path=latest_vnfd_path)
5450 self.fs.sync(from_path=current_vnfd_path)
5451
5452 step = (
5453 "Get the charm-type, charm-id, ee-id if there is deployed VCA"
5454 )
5455 base_folder = latest_vnfd["_admin"]["storage"]
5456
5457 for charm_index, charm_deployed in enumerate(
5458 get_iterable(nsr_deployed, "VCA")
5459 ):
5460 vnf_index = db_vnfr.get("member-vnf-index-ref")
5461
5462 # Getting charm-id and charm-type
5463 if charm_deployed.get("member-vnf-index") == vnf_index:
5464 charm_id = self.get_vca_id(db_vnfr, db_nsr)
5465 charm_type = charm_deployed.get("type")
5466
5467 # Getting ee-id
5468 ee_id = charm_deployed.get("ee_id")
5469
5470 step = "Getting descriptor config"
5471 descriptor_config = get_configuration(
5472 current_vnfd, current_vnfd["id"]
5473 )
5474
5475 if "execution-environment-list" in descriptor_config:
5476 ee_list = descriptor_config.get(
5477 "execution-environment-list", []
5478 )
5479 else:
5480 ee_list = []
5481
5482 # There could be several charm used in the same VNF
5483 for ee_item in ee_list:
5484 if ee_item.get("juju"):
5485
5486 step = "Getting charm name"
5487 charm_name = ee_item["juju"].get("charm")
5488
5489 step = "Setting Charm artifact paths"
5490 current_charm_artifact_path.append(
5491 get_charm_artifact_path(
5492 base_folder,
5493 charm_name,
5494 charm_type,
5495 current_vnf_revision,
5496 )
5497 )
5498 target_charm_artifact_path.append(
5499 get_charm_artifact_path(
5500 base_folder,
5501 charm_name,
5502 charm_type,
5503 )
5504 )
5505
5506 charm_artifact_paths = zip(
5507 current_charm_artifact_path, target_charm_artifact_path
5508 )
5509
5510 step = "Checking if software version has changed in VNFD"
5511 if find_software_version(current_vnfd) != find_software_version(
5512 latest_vnfd
5513 ):
5514
5515 step = "Checking if existing VNF has charm"
5516 for current_charm_path, target_charm_path in list(
5517 charm_artifact_paths
5518 ):
5519 if current_charm_path:
5520 raise LcmException(
5521 "Software version change is not supported as VNF instance {} has charm.".format(
5522 vnf_instance_id
5523 )
5524 )
5525
5526 # There is no change in the charm package, then redeploy the VNF
5527 # based on new descriptor
5528 step = "Redeploying VNF"
5529 # This part is in https://osm.etsi.org/gerrit/11943
5530
5531 else:
5532 step = "Checking if any charm package has changed or not"
5533 for current_charm_path, target_charm_path in list(
5534 charm_artifact_paths
5535 ):
5536 if (
5537 current_charm_path
5538 and target_charm_path
5539 and self.check_charm_hash_changed(
5540 current_charm_path, target_charm_path
5541 )
5542 ):
5543
5544 step = "Checking whether VNF uses juju bundle"
5545 if check_juju_bundle_existence(current_vnfd):
5546
5547 raise LcmException(
5548 "Charm upgrade is not supported for the instance which"
5549 " uses juju-bundle: {}".format(
5550 check_juju_bundle_existence(current_vnfd)
5551 )
5552 )
5553
5554 step = "Upgrading Charm"
5555 (
5556 result,
5557 detailed_status,
5558 ) = await self._ns_charm_upgrade(
5559 ee_id=ee_id,
5560 charm_id=charm_id,
5561 charm_type=charm_type,
5562 path=self.fs.path + target_charm_path,
5563 timeout=timeout_seconds,
5564 )
5565
5566 if result == "FAILED":
5567 nslcmop_operation_state = result
5568 error_description_nslcmop = detailed_status
5569
5570 db_nslcmop_update["detailed-status"] = detailed_status
5571 self.logger.debug(
5572 logging_text
5573 + " step {} Done with result {} {}".format(
5574 step, nslcmop_operation_state, detailed_status
5575 )
5576 )
5577
5578 step = "Updating policies"
5579 # This part is in https://osm.etsi.org/gerrit/11943
5580
5581 # If nslcmop_operation_state is None, so any operation is not failed.
5582 if not nslcmop_operation_state:
5583 nslcmop_operation_state = "COMPLETED"
5584
5585 # If update CHANGE_VNFPKG nslcmop_operation is successful
5586 # vnf revision need to be updated
5587 vnfr_update["revision"] = latest_vnfd_revision
5588 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
5589
5590 self.logger.debug(
5591 logging_text
5592 + " task Done with result {} {}".format(
5593 nslcmop_operation_state, detailed_status
5594 )
5595 )
5596 elif update_type == "REMOVE_VNF":
5597 # This part is included in https://osm.etsi.org/gerrit/11876
5598 pass
5599
5600 # If nslcmop_operation_state is None, so any operation is not failed.
5601 # All operations are executed in overall.
5602 if not nslcmop_operation_state:
5603 nslcmop_operation_state = "COMPLETED"
5604 db_nsr_update["operational-status"] = old_operational_status
5605
5606 except (DbException, LcmException, N2VCException, K8sException) as e:
5607 self.logger.error(logging_text + "Exit Exception {}".format(e))
5608 exc = e
5609 except asyncio.CancelledError:
5610 self.logger.error(
5611 logging_text + "Cancelled Exception while '{}'".format(step)
5612 )
5613 exc = "Operation was cancelled"
5614 except asyncio.TimeoutError:
5615 self.logger.error(logging_text + "Timeout while '{}'".format(step))
5616 exc = "Timeout"
5617 except Exception as e:
5618 exc = traceback.format_exc()
5619 self.logger.critical(
5620 logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
5621 exc_info=True,
5622 )
5623 finally:
5624 if exc:
5625 db_nslcmop_update[
5626 "detailed-status"
5627 ] = (
5628 detailed_status
5629 ) = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
5630 nslcmop_operation_state = "FAILED"
5631 db_nsr_update["operational-status"] = old_operational_status
5632 if db_nsr:
5633 self._write_ns_status(
5634 nsr_id=nsr_id,
5635 ns_state=db_nsr["nsState"],
5636 current_operation="IDLE",
5637 current_operation_id=None,
5638 other_update=db_nsr_update,
5639 )
5640
5641 self._write_op_status(
5642 op_id=nslcmop_id,
5643 stage="",
5644 error_message=error_description_nslcmop,
5645 operation_state=nslcmop_operation_state,
5646 other_update=db_nslcmop_update,
5647 )
5648
5649 if nslcmop_operation_state:
5650 try:
5651 await self.msg.aiowrite(
5652 "ns",
5653 "updated",
5654 {
5655 "nsr_id": nsr_id,
5656 "nslcmop_id": nslcmop_id,
5657 "operationState": nslcmop_operation_state,
5658 },
5659 loop=self.loop,
5660 )
5661 except Exception as e:
5662 self.logger.error(
5663 logging_text + "kafka_write notification Exception {}".format(e)
5664 )
5665 self.logger.debug(logging_text + "Exit")
5666 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_update")
5667 return nslcmop_operation_state, detailed_status
5668
5669 async def scale(self, nsr_id, nslcmop_id):
5670 # Try to lock HA task here
5671 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
5672 if not task_is_locked_by_me:
5673 return
5674
5675 logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
5676 stage = ["", "", ""]
5677 tasks_dict_info = {}
5678 # ^ stage, step, VIM progress
5679 self.logger.debug(logging_text + "Enter")
5680 # get all needed from database
5681 db_nsr = None
5682 db_nslcmop_update = {}
5683 db_nsr_update = {}
5684 exc = None
5685 # in case of error, indicates what part of scale was failed to put nsr at error status
5686 scale_process = None
5687 old_operational_status = ""
5688 old_config_status = ""
5689 nsi_id = None
5690 try:
5691 # wait for any previous tasks in process
5692 step = "Waiting for previous operations to terminate"
5693 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
5694 self._write_ns_status(
5695 nsr_id=nsr_id,
5696 ns_state=None,
5697 current_operation="SCALING",
5698 current_operation_id=nslcmop_id,
5699 )
5700
5701 step = "Getting nslcmop from database"
5702 self.logger.debug(
5703 step + " after having waited for previous tasks to be completed"
5704 )
5705 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5706
5707 step = "Getting nsr from database"
5708 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
5709 old_operational_status = db_nsr["operational-status"]
5710 old_config_status = db_nsr["config-status"]
5711
5712 step = "Parsing scaling parameters"
5713 db_nsr_update["operational-status"] = "scaling"
5714 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5715 nsr_deployed = db_nsr["_admin"].get("deployed")
5716
5717 vnf_index = db_nslcmop["operationParams"]["scaleVnfData"][
5718 "scaleByStepData"
5719 ]["member-vnf-index"]
5720 scaling_group = db_nslcmop["operationParams"]["scaleVnfData"][
5721 "scaleByStepData"
5722 ]["scaling-group-descriptor"]
5723 scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
5724 # for backward compatibility
5725 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
5726 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
5727 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
5728 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5729
5730 step = "Getting vnfr from database"
5731 db_vnfr = self.db.get_one(
5732 "vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}
5733 )
5734
5735 vca_id = self.get_vca_id(db_vnfr, db_nsr)
5736
5737 step = "Getting vnfd from database"
5738 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
5739
5740 base_folder = db_vnfd["_admin"]["storage"]
5741
5742 step = "Getting scaling-group-descriptor"
5743 scaling_descriptor = find_in_list(
5744 get_scaling_aspect(db_vnfd),
5745 lambda scale_desc: scale_desc["name"] == scaling_group,
5746 )
5747 if not scaling_descriptor:
5748 raise LcmException(
5749 "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
5750 "at vnfd:scaling-group-descriptor".format(scaling_group)
5751 )
5752
5753 step = "Sending scale order to VIM"
5754 # TODO check if ns is in a proper status
5755 nb_scale_op = 0
5756 if not db_nsr["_admin"].get("scaling-group"):
5757 self.update_db_2(
5758 "nsrs",
5759 nsr_id,
5760 {
5761 "_admin.scaling-group": [
5762 {"name": scaling_group, "nb-scale-op": 0}
5763 ]
5764 },
5765 )
5766 admin_scale_index = 0
5767 else:
5768 for admin_scale_index, admin_scale_info in enumerate(
5769 db_nsr["_admin"]["scaling-group"]
5770 ):
5771 if admin_scale_info["name"] == scaling_group:
5772 nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
5773 break
5774 else: # not found, set index one plus last element and add new entry with the name
5775 admin_scale_index += 1
5776 db_nsr_update[
5777 "_admin.scaling-group.{}.name".format(admin_scale_index)
5778 ] = scaling_group
5779
5780 vca_scaling_info = []
5781 scaling_info = {"scaling_group_name": scaling_group, "vdu": [], "kdu": []}
5782 if scaling_type == "SCALE_OUT":
5783 if "aspect-delta-details" not in scaling_descriptor:
5784 raise LcmException(
5785 "Aspect delta details not fount in scaling descriptor {}".format(
5786 scaling_descriptor["name"]
5787 )
5788 )
5789 # count if max-instance-count is reached
5790 deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
5791
5792 scaling_info["scaling_direction"] = "OUT"
5793 scaling_info["vdu-create"] = {}
5794 scaling_info["kdu-create"] = {}
5795 for delta in deltas:
5796 for vdu_delta in delta.get("vdu-delta", {}):
5797 vdud = get_vdu(db_vnfd, vdu_delta["id"])
5798 # vdu_index also provides the number of instance of the targeted vdu
5799 vdu_count = vdu_index = get_vdur_index(db_vnfr, vdu_delta)
5800 cloud_init_text = self._get_vdu_cloud_init_content(
5801 vdud, db_vnfd
5802 )
5803 if cloud_init_text:
5804 additional_params = (
5805 self._get_vdu_additional_params(db_vnfr, vdud["id"])
5806 or {}
5807 )
5808 cloud_init_list = []
5809
5810 vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
5811 max_instance_count = 10
5812 if vdu_profile and "max-number-of-instances" in vdu_profile:
5813 max_instance_count = vdu_profile.get(
5814 "max-number-of-instances", 10
5815 )
5816
5817 default_instance_num = get_number_of_instances(
5818 db_vnfd, vdud["id"]
5819 )
5820 instances_number = vdu_delta.get("number-of-instances", 1)
5821 nb_scale_op += instances_number
5822
5823 new_instance_count = nb_scale_op + default_instance_num
5824 # Control if new count is over max and vdu count is less than max.
5825 # Then assign new instance count
5826 if new_instance_count > max_instance_count > vdu_count:
5827 instances_number = new_instance_count - max_instance_count
5828 else:
5829 instances_number = instances_number
5830
5831 if new_instance_count > max_instance_count:
5832 raise LcmException(
5833 "reached the limit of {} (max-instance-count) "
5834 "scaling-out operations for the "
5835 "scaling-group-descriptor '{}'".format(
5836 nb_scale_op, scaling_group
5837 )
5838 )
5839 for x in range(vdu_delta.get("number-of-instances", 1)):
5840 if cloud_init_text:
5841 # TODO Information of its own ip is not available because db_vnfr is not updated.
5842 additional_params["OSM"] = get_osm_params(
5843 db_vnfr, vdu_delta["id"], vdu_index + x
5844 )
5845 cloud_init_list.append(
5846 self._parse_cloud_init(
5847 cloud_init_text,
5848 additional_params,
5849 db_vnfd["id"],
5850 vdud["id"],
5851 )
5852 )
5853 vca_scaling_info.append(
5854 {
5855 "osm_vdu_id": vdu_delta["id"],
5856 "member-vnf-index": vnf_index,
5857 "type": "create",
5858 "vdu_index": vdu_index + x,
5859 }
5860 )
5861 scaling_info["vdu-create"][vdu_delta["id"]] = instances_number
5862 for kdu_delta in delta.get("kdu-resource-delta", {}):
5863 kdu_profile = get_kdu_resource_profile(db_vnfd, kdu_delta["id"])
5864 kdu_name = kdu_profile["kdu-name"]
5865 resource_name = kdu_profile.get("resource-name", "")
5866
5867 # Might have different kdus in the same delta
5868 # Should have list for each kdu
5869 if not scaling_info["kdu-create"].get(kdu_name, None):
5870 scaling_info["kdu-create"][kdu_name] = []
5871
5872 kdur = get_kdur(db_vnfr, kdu_name)
5873 if kdur.get("helm-chart"):
5874 k8s_cluster_type = "helm-chart-v3"
5875 self.logger.debug("kdur: {}".format(kdur))
5876 if (
5877 kdur.get("helm-version")
5878 and kdur.get("helm-version") == "v2"
5879 ):
5880 k8s_cluster_type = "helm-chart"
5881 elif kdur.get("juju-bundle"):
5882 k8s_cluster_type = "juju-bundle"
5883 else:
5884 raise LcmException(
5885 "kdu type for kdu='{}.{}' is neither helm-chart nor "
5886 "juju-bundle. Maybe an old NBI version is running".format(
5887 db_vnfr["member-vnf-index-ref"], kdu_name
5888 )
5889 )
5890
5891 max_instance_count = 10
5892 if kdu_profile and "max-number-of-instances" in kdu_profile:
5893 max_instance_count = kdu_profile.get(
5894 "max-number-of-instances", 10
5895 )
5896
5897 nb_scale_op += kdu_delta.get("number-of-instances", 1)
5898 deployed_kdu, _ = get_deployed_kdu(
5899 nsr_deployed, kdu_name, vnf_index
5900 )
5901 if deployed_kdu is None:
5902 raise LcmException(
5903 "KDU '{}' for vnf '{}' not deployed".format(
5904 kdu_name, vnf_index
5905 )
5906 )
5907 kdu_instance = deployed_kdu.get("kdu-instance")
5908 instance_num = await self.k8scluster_map[
5909 k8s_cluster_type
5910 ].get_scale_count(
5911 resource_name,
5912 kdu_instance,
5913 vca_id=vca_id,
5914 cluster_uuid=deployed_kdu.get("k8scluster-uuid"),
5915 kdu_model=deployed_kdu.get("kdu-model"),
5916 )
5917 kdu_replica_count = instance_num + kdu_delta.get(
5918 "number-of-instances", 1
5919 )
5920
5921 # Control if new count is over max and instance_num is less than max.
5922 # Then assign max instance number to kdu replica count
5923 if kdu_replica_count > max_instance_count > instance_num:
5924 kdu_replica_count = max_instance_count
5925 if kdu_replica_count > max_instance_count:
5926 raise LcmException(
5927 "reached the limit of {} (max-instance-count) "
5928 "scaling-out operations for the "
5929 "scaling-group-descriptor '{}'".format(
5930 instance_num, scaling_group
5931 )
5932 )
5933
5934 for x in range(kdu_delta.get("number-of-instances", 1)):
5935 vca_scaling_info.append(
5936 {
5937 "osm_kdu_id": kdu_name,
5938 "member-vnf-index": vnf_index,
5939 "type": "create",
5940 "kdu_index": instance_num + x - 1,
5941 }
5942 )
5943 scaling_info["kdu-create"][kdu_name].append(
5944 {
5945 "member-vnf-index": vnf_index,
5946 "type": "create",
5947 "k8s-cluster-type": k8s_cluster_type,
5948 "resource-name": resource_name,
5949 "scale": kdu_replica_count,
5950 }
5951 )
5952 elif scaling_type == "SCALE_IN":
5953 deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
5954
5955 scaling_info["scaling_direction"] = "IN"
5956 scaling_info["vdu-delete"] = {}
5957 scaling_info["kdu-delete"] = {}
5958
5959 for delta in deltas:
5960 for vdu_delta in delta.get("vdu-delta", {}):
5961 vdu_count = vdu_index = get_vdur_index(db_vnfr, vdu_delta)
5962 min_instance_count = 0
5963 vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
5964 if vdu_profile and "min-number-of-instances" in vdu_profile:
5965 min_instance_count = vdu_profile["min-number-of-instances"]
5966
5967 default_instance_num = get_number_of_instances(
5968 db_vnfd, vdu_delta["id"]
5969 )
5970 instance_num = vdu_delta.get("number-of-instances", 1)
5971 nb_scale_op -= instance_num
5972
5973 new_instance_count = nb_scale_op + default_instance_num
5974
5975 if new_instance_count < min_instance_count < vdu_count:
5976 instances_number = min_instance_count - new_instance_count
5977 else:
5978 instances_number = instance_num
5979
5980 if new_instance_count < min_instance_count:
5981 raise LcmException(
5982 "reached the limit of {} (min-instance-count) scaling-in operations for the "
5983 "scaling-group-descriptor '{}'".format(
5984 nb_scale_op, scaling_group
5985 )
5986 )
5987 for x in range(vdu_delta.get("number-of-instances", 1)):
5988 vca_scaling_info.append(
5989 {
5990 "osm_vdu_id": vdu_delta["id"],
5991 "member-vnf-index": vnf_index,
5992 "type": "delete",
5993 "vdu_index": vdu_index - 1 - x,
5994 }
5995 )
5996 scaling_info["vdu-delete"][vdu_delta["id"]] = instances_number
5997 for kdu_delta in delta.get("kdu-resource-delta", {}):
5998 kdu_profile = get_kdu_resource_profile(db_vnfd, kdu_delta["id"])
5999 kdu_name = kdu_profile["kdu-name"]
6000 resource_name = kdu_profile.get("resource-name", "")
6001
6002 if not scaling_info["kdu-delete"].get(kdu_name, None):
6003 scaling_info["kdu-delete"][kdu_name] = []
6004
6005 kdur = get_kdur(db_vnfr, kdu_name)
6006 if kdur.get("helm-chart"):
6007 k8s_cluster_type = "helm-chart-v3"
6008 self.logger.debug("kdur: {}".format(kdur))
6009 if (
6010 kdur.get("helm-version")
6011 and kdur.get("helm-version") == "v2"
6012 ):
6013 k8s_cluster_type = "helm-chart"
6014 elif kdur.get("juju-bundle"):
6015 k8s_cluster_type = "juju-bundle"
6016 else:
6017 raise LcmException(
6018 "kdu type for kdu='{}.{}' is neither helm-chart nor "
6019 "juju-bundle. Maybe an old NBI version is running".format(
6020 db_vnfr["member-vnf-index-ref"], kdur["kdu-name"]
6021 )
6022 )
6023
6024 min_instance_count = 0
6025 if kdu_profile and "min-number-of-instances" in kdu_profile:
6026 min_instance_count = kdu_profile["min-number-of-instances"]
6027
6028 nb_scale_op -= kdu_delta.get("number-of-instances", 1)
6029 deployed_kdu, _ = get_deployed_kdu(
6030 nsr_deployed, kdu_name, vnf_index
6031 )
6032 if deployed_kdu is None:
6033 raise LcmException(
6034 "KDU '{}' for vnf '{}' not deployed".format(
6035 kdu_name, vnf_index
6036 )
6037 )
6038 kdu_instance = deployed_kdu.get("kdu-instance")
6039 instance_num = await self.k8scluster_map[
6040 k8s_cluster_type
6041 ].get_scale_count(
6042 resource_name,
6043 kdu_instance,
6044 vca_id=vca_id,
6045 cluster_uuid=deployed_kdu.get("k8scluster-uuid"),
6046 kdu_model=deployed_kdu.get("kdu-model"),
6047 )
6048 kdu_replica_count = instance_num - kdu_delta.get(
6049 "number-of-instances", 1
6050 )
6051
6052 if kdu_replica_count < min_instance_count < instance_num:
6053 kdu_replica_count = min_instance_count
6054 if kdu_replica_count < min_instance_count:
6055 raise LcmException(
6056 "reached the limit of {} (min-instance-count) scaling-in operations for the "
6057 "scaling-group-descriptor '{}'".format(
6058 instance_num, scaling_group
6059 )
6060 )
6061
6062 for x in range(kdu_delta.get("number-of-instances", 1)):
6063 vca_scaling_info.append(
6064 {
6065 "osm_kdu_id": kdu_name,
6066 "member-vnf-index": vnf_index,
6067 "type": "delete",
6068 "kdu_index": instance_num - x - 1,
6069 }
6070 )
6071 scaling_info["kdu-delete"][kdu_name].append(
6072 {
6073 "member-vnf-index": vnf_index,
6074 "type": "delete",
6075 "k8s-cluster-type": k8s_cluster_type,
6076 "resource-name": resource_name,
6077 "scale": kdu_replica_count,
6078 }
6079 )
6080
6081 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
6082 vdu_delete = copy(scaling_info.get("vdu-delete"))
6083 if scaling_info["scaling_direction"] == "IN":
6084 for vdur in reversed(db_vnfr["vdur"]):
6085 if vdu_delete.get(vdur["vdu-id-ref"]):
6086 vdu_delete[vdur["vdu-id-ref"]] -= 1
6087 scaling_info["vdu"].append(
6088 {
6089 "name": vdur.get("name") or vdur.get("vdu-name"),
6090 "vdu_id": vdur["vdu-id-ref"],
6091 "interface": [],
6092 }
6093 )
6094 for interface in vdur["interfaces"]:
6095 scaling_info["vdu"][-1]["interface"].append(
6096 {
6097 "name": interface["name"],
6098 "ip_address": interface["ip-address"],
6099 "mac_address": interface.get("mac-address"),
6100 }
6101 )
6102 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
6103
6104 # PRE-SCALE BEGIN
6105 step = "Executing pre-scale vnf-config-primitive"
6106 if scaling_descriptor.get("scaling-config-action"):
6107 for scaling_config_action in scaling_descriptor[
6108 "scaling-config-action"
6109 ]:
6110 if (
6111 scaling_config_action.get("trigger") == "pre-scale-in"
6112 and scaling_type == "SCALE_IN"
6113 ) or (
6114 scaling_config_action.get("trigger") == "pre-scale-out"
6115 and scaling_type == "SCALE_OUT"
6116 ):
6117 vnf_config_primitive = scaling_config_action[
6118 "vnf-config-primitive-name-ref"
6119 ]
6120 step = db_nslcmop_update[
6121 "detailed-status"
6122 ] = "executing pre-scale scaling-config-action '{}'".format(
6123 vnf_config_primitive
6124 )
6125
6126 # look for primitive
6127 for config_primitive in (
6128 get_configuration(db_vnfd, db_vnfd["id"]) or {}
6129 ).get("config-primitive", ()):
6130 if config_primitive["name"] == vnf_config_primitive:
6131 break
6132 else:
6133 raise LcmException(
6134 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
6135 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
6136 "primitive".format(scaling_group, vnf_config_primitive)
6137 )
6138
6139 vnfr_params = {"VDU_SCALE_INFO": scaling_info}
6140 if db_vnfr.get("additionalParamsForVnf"):
6141 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
6142
6143 scale_process = "VCA"
6144 db_nsr_update["config-status"] = "configuring pre-scaling"
6145 primitive_params = self._map_primitive_params(
6146 config_primitive, {}, vnfr_params
6147 )
6148
6149 # Pre-scale retry check: Check if this sub-operation has been executed before
6150 op_index = self._check_or_add_scale_suboperation(
6151 db_nslcmop,
6152 vnf_index,
6153 vnf_config_primitive,
6154 primitive_params,
6155 "PRE-SCALE",
6156 )
6157 if op_index == self.SUBOPERATION_STATUS_SKIP:
6158 # Skip sub-operation
6159 result = "COMPLETED"
6160 result_detail = "Done"
6161 self.logger.debug(
6162 logging_text
6163 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6164 vnf_config_primitive, result, result_detail
6165 )
6166 )
6167 else:
6168 if op_index == self.SUBOPERATION_STATUS_NEW:
6169 # New sub-operation: Get index of this sub-operation
6170 op_index = (
6171 len(db_nslcmop.get("_admin", {}).get("operations"))
6172 - 1
6173 )
6174 self.logger.debug(
6175 logging_text
6176 + "vnf_config_primitive={} New sub-operation".format(
6177 vnf_config_primitive
6178 )
6179 )
6180 else:
6181 # retry: Get registered params for this existing sub-operation
6182 op = db_nslcmop.get("_admin", {}).get("operations", [])[
6183 op_index
6184 ]
6185 vnf_index = op.get("member_vnf_index")
6186 vnf_config_primitive = op.get("primitive")
6187 primitive_params = op.get("primitive_params")
6188 self.logger.debug(
6189 logging_text
6190 + "vnf_config_primitive={} Sub-operation retry".format(
6191 vnf_config_primitive
6192 )
6193 )
6194 # Execute the primitive, either with new (first-time) or registered (reintent) args
6195 ee_descriptor_id = config_primitive.get(
6196 "execution-environment-ref"
6197 )
6198 primitive_name = config_primitive.get(
6199 "execution-environment-primitive", vnf_config_primitive
6200 )
6201 ee_id, vca_type = self._look_for_deployed_vca(
6202 nsr_deployed["VCA"],
6203 member_vnf_index=vnf_index,
6204 vdu_id=None,
6205 vdu_count_index=None,
6206 ee_descriptor_id=ee_descriptor_id,
6207 )
6208 result, result_detail = await self._ns_execute_primitive(
6209 ee_id,
6210 primitive_name,
6211 primitive_params,
6212 vca_type=vca_type,
6213 vca_id=vca_id,
6214 )
6215 self.logger.debug(
6216 logging_text
6217 + "vnf_config_primitive={} Done with result {} {}".format(
6218 vnf_config_primitive, result, result_detail
6219 )
6220 )
6221 # Update operationState = COMPLETED | FAILED
6222 self._update_suboperation_status(
6223 db_nslcmop, op_index, result, result_detail
6224 )
6225
6226 if result == "FAILED":
6227 raise LcmException(result_detail)
6228 db_nsr_update["config-status"] = old_config_status
6229 scale_process = None
6230 # PRE-SCALE END
6231
6232 db_nsr_update[
6233 "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)
6234 ] = nb_scale_op
6235 db_nsr_update[
6236 "_admin.scaling-group.{}.time".format(admin_scale_index)
6237 ] = time()
6238
6239 # SCALE-IN VCA - BEGIN
6240 if vca_scaling_info:
6241 step = db_nslcmop_update[
6242 "detailed-status"
6243 ] = "Deleting the execution environments"
6244 scale_process = "VCA"
6245 for vca_info in vca_scaling_info:
6246 if vca_info["type"] == "delete" and not vca_info.get("osm_kdu_id"):
6247 member_vnf_index = str(vca_info["member-vnf-index"])
6248 self.logger.debug(
6249 logging_text + "vdu info: {}".format(vca_info)
6250 )
6251 if vca_info.get("osm_vdu_id"):
6252 vdu_id = vca_info["osm_vdu_id"]
6253 vdu_index = int(vca_info["vdu_index"])
6254 stage[
6255 1
6256 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6257 member_vnf_index, vdu_id, vdu_index
6258 )
6259 stage[2] = step = "Scaling in VCA"
6260 self._write_op_status(op_id=nslcmop_id, stage=stage)
6261 vca_update = db_nsr["_admin"]["deployed"]["VCA"]
6262 config_update = db_nsr["configurationStatus"]
6263 for vca_index, vca in enumerate(vca_update):
6264 if (
6265 (vca or vca.get("ee_id"))
6266 and vca["member-vnf-index"] == member_vnf_index
6267 and vca["vdu_count_index"] == vdu_index
6268 ):
6269 if vca.get("vdu_id"):
6270 config_descriptor = get_configuration(
6271 db_vnfd, vca.get("vdu_id")
6272 )
6273 elif vca.get("kdu_name"):
6274 config_descriptor = get_configuration(
6275 db_vnfd, vca.get("kdu_name")
6276 )
6277 else:
6278 config_descriptor = get_configuration(
6279 db_vnfd, db_vnfd["id"]
6280 )
6281 operation_params = (
6282 db_nslcmop.get("operationParams") or {}
6283 )
6284 exec_terminate_primitives = not operation_params.get(
6285 "skip_terminate_primitives"
6286 ) and vca.get("needed_terminate")
6287 task = asyncio.ensure_future(
6288 asyncio.wait_for(
6289 self.destroy_N2VC(
6290 logging_text,
6291 db_nslcmop,
6292 vca,
6293 config_descriptor,
6294 vca_index,
6295 destroy_ee=True,
6296 exec_primitives=exec_terminate_primitives,
6297 scaling_in=True,
6298 vca_id=vca_id,
6299 ),
6300 timeout=self.timeout_charm_delete,
6301 )
6302 )
6303 tasks_dict_info[task] = "Terminating VCA {}".format(
6304 vca.get("ee_id")
6305 )
6306 del vca_update[vca_index]
6307 del config_update[vca_index]
6308 # wait for pending tasks of terminate primitives
6309 if tasks_dict_info:
6310 self.logger.debug(
6311 logging_text
6312 + "Waiting for tasks {}".format(
6313 list(tasks_dict_info.keys())
6314 )
6315 )
6316 error_list = await self._wait_for_tasks(
6317 logging_text,
6318 tasks_dict_info,
6319 min(
6320 self.timeout_charm_delete, self.timeout_ns_terminate
6321 ),
6322 stage,
6323 nslcmop_id,
6324 )
6325 tasks_dict_info.clear()
6326 if error_list:
6327 raise LcmException("; ".join(error_list))
6328
6329 db_vca_and_config_update = {
6330 "_admin.deployed.VCA": vca_update,
6331 "configurationStatus": config_update,
6332 }
6333 self.update_db_2(
6334 "nsrs", db_nsr["_id"], db_vca_and_config_update
6335 )
6336 scale_process = None
6337 # SCALE-IN VCA - END
6338
6339 # SCALE RO - BEGIN
6340 if scaling_info.get("vdu-create") or scaling_info.get("vdu-delete"):
6341 scale_process = "RO"
6342 if self.ro_config.get("ng"):
6343 await self._scale_ng_ro(
6344 logging_text, db_nsr, db_nslcmop, db_vnfr, scaling_info, stage
6345 )
6346 scaling_info.pop("vdu-create", None)
6347 scaling_info.pop("vdu-delete", None)
6348
6349 scale_process = None
6350 # SCALE RO - END
6351
6352 # SCALE KDU - BEGIN
6353 if scaling_info.get("kdu-create") or scaling_info.get("kdu-delete"):
6354 scale_process = "KDU"
6355 await self._scale_kdu(
6356 logging_text, nsr_id, nsr_deployed, db_vnfd, vca_id, scaling_info
6357 )
6358 scaling_info.pop("kdu-create", None)
6359 scaling_info.pop("kdu-delete", None)
6360
6361 scale_process = None
6362 # SCALE KDU - END
6363
6364 if db_nsr_update:
6365 self.update_db_2("nsrs", nsr_id, db_nsr_update)
6366
6367 # SCALE-UP VCA - BEGIN
6368 if vca_scaling_info:
6369 step = db_nslcmop_update[
6370 "detailed-status"
6371 ] = "Creating new execution environments"
6372 scale_process = "VCA"
6373 for vca_info in vca_scaling_info:
6374 if vca_info["type"] == "create" and not vca_info.get("osm_kdu_id"):
6375 member_vnf_index = str(vca_info["member-vnf-index"])
6376 self.logger.debug(
6377 logging_text + "vdu info: {}".format(vca_info)
6378 )
6379 vnfd_id = db_vnfr["vnfd-ref"]
6380 if vca_info.get("osm_vdu_id"):
6381 vdu_index = int(vca_info["vdu_index"])
6382 deploy_params = {"OSM": get_osm_params(db_vnfr)}
6383 if db_vnfr.get("additionalParamsForVnf"):
6384 deploy_params.update(
6385 parse_yaml_strings(
6386 db_vnfr["additionalParamsForVnf"].copy()
6387 )
6388 )
6389 descriptor_config = get_configuration(
6390 db_vnfd, db_vnfd["id"]
6391 )
6392 if descriptor_config:
6393 vdu_id = None
6394 vdu_name = None
6395 kdu_name = None
6396 self._deploy_n2vc(
6397 logging_text=logging_text
6398 + "member_vnf_index={} ".format(member_vnf_index),
6399 db_nsr=db_nsr,
6400 db_vnfr=db_vnfr,
6401 nslcmop_id=nslcmop_id,
6402 nsr_id=nsr_id,
6403 nsi_id=nsi_id,
6404 vnfd_id=vnfd_id,
6405 vdu_id=vdu_id,
6406 kdu_name=kdu_name,
6407 member_vnf_index=member_vnf_index,
6408 vdu_index=vdu_index,
6409 vdu_name=vdu_name,
6410 deploy_params=deploy_params,
6411 descriptor_config=descriptor_config,
6412 base_folder=base_folder,
6413 task_instantiation_info=tasks_dict_info,
6414 stage=stage,
6415 )
6416 vdu_id = vca_info["osm_vdu_id"]
6417 vdur = find_in_list(
6418 db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id
6419 )
6420 descriptor_config = get_configuration(db_vnfd, vdu_id)
6421 if vdur.get("additionalParams"):
6422 deploy_params_vdu = parse_yaml_strings(
6423 vdur["additionalParams"]
6424 )
6425 else:
6426 deploy_params_vdu = deploy_params
6427 deploy_params_vdu["OSM"] = get_osm_params(
6428 db_vnfr, vdu_id, vdu_count_index=vdu_index
6429 )
6430 if descriptor_config:
6431 vdu_name = None
6432 kdu_name = None
6433 stage[
6434 1
6435 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6436 member_vnf_index, vdu_id, vdu_index
6437 )
6438 stage[2] = step = "Scaling out VCA"
6439 self._write_op_status(op_id=nslcmop_id, stage=stage)
6440 self._deploy_n2vc(
6441 logging_text=logging_text
6442 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6443 member_vnf_index, vdu_id, vdu_index
6444 ),
6445 db_nsr=db_nsr,
6446 db_vnfr=db_vnfr,
6447 nslcmop_id=nslcmop_id,
6448 nsr_id=nsr_id,
6449 nsi_id=nsi_id,
6450 vnfd_id=vnfd_id,
6451 vdu_id=vdu_id,
6452 kdu_name=kdu_name,
6453 member_vnf_index=member_vnf_index,
6454 vdu_index=vdu_index,
6455 vdu_name=vdu_name,
6456 deploy_params=deploy_params_vdu,
6457 descriptor_config=descriptor_config,
6458 base_folder=base_folder,
6459 task_instantiation_info=tasks_dict_info,
6460 stage=stage,
6461 )
6462 # SCALE-UP VCA - END
6463 scale_process = None
6464
6465 # POST-SCALE BEGIN
6466 # execute primitive service POST-SCALING
6467 step = "Executing post-scale vnf-config-primitive"
6468 if scaling_descriptor.get("scaling-config-action"):
6469 for scaling_config_action in scaling_descriptor[
6470 "scaling-config-action"
6471 ]:
6472 if (
6473 scaling_config_action.get("trigger") == "post-scale-in"
6474 and scaling_type == "SCALE_IN"
6475 ) or (
6476 scaling_config_action.get("trigger") == "post-scale-out"
6477 and scaling_type == "SCALE_OUT"
6478 ):
6479 vnf_config_primitive = scaling_config_action[
6480 "vnf-config-primitive-name-ref"
6481 ]
6482 step = db_nslcmop_update[
6483 "detailed-status"
6484 ] = "executing post-scale scaling-config-action '{}'".format(
6485 vnf_config_primitive
6486 )
6487
6488 vnfr_params = {"VDU_SCALE_INFO": scaling_info}
6489 if db_vnfr.get("additionalParamsForVnf"):
6490 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
6491
6492 # look for primitive
6493 for config_primitive in (
6494 get_configuration(db_vnfd, db_vnfd["id"]) or {}
6495 ).get("config-primitive", ()):
6496 if config_primitive["name"] == vnf_config_primitive:
6497 break
6498 else:
6499 raise LcmException(
6500 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
6501 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
6502 "config-primitive".format(
6503 scaling_group, vnf_config_primitive
6504 )
6505 )
6506 scale_process = "VCA"
6507 db_nsr_update["config-status"] = "configuring post-scaling"
6508 primitive_params = self._map_primitive_params(
6509 config_primitive, {}, vnfr_params
6510 )
6511
6512 # Post-scale retry check: Check if this sub-operation has been executed before
6513 op_index = self._check_or_add_scale_suboperation(
6514 db_nslcmop,
6515 vnf_index,
6516 vnf_config_primitive,
6517 primitive_params,
6518 "POST-SCALE",
6519 )
6520 if op_index == self.SUBOPERATION_STATUS_SKIP:
6521 # Skip sub-operation
6522 result = "COMPLETED"
6523 result_detail = "Done"
6524 self.logger.debug(
6525 logging_text
6526 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6527 vnf_config_primitive, result, result_detail
6528 )
6529 )
6530 else:
6531 if op_index == self.SUBOPERATION_STATUS_NEW:
6532 # New sub-operation: Get index of this sub-operation
6533 op_index = (
6534 len(db_nslcmop.get("_admin", {}).get("operations"))
6535 - 1
6536 )
6537 self.logger.debug(
6538 logging_text
6539 + "vnf_config_primitive={} New sub-operation".format(
6540 vnf_config_primitive
6541 )
6542 )
6543 else:
6544 # retry: Get registered params for this existing sub-operation
6545 op = db_nslcmop.get("_admin", {}).get("operations", [])[
6546 op_index
6547 ]
6548 vnf_index = op.get("member_vnf_index")
6549 vnf_config_primitive = op.get("primitive")
6550 primitive_params = op.get("primitive_params")
6551 self.logger.debug(
6552 logging_text
6553 + "vnf_config_primitive={} Sub-operation retry".format(
6554 vnf_config_primitive
6555 )
6556 )
6557 # Execute the primitive, either with new (first-time) or registered (reintent) args
6558 ee_descriptor_id = config_primitive.get(
6559 "execution-environment-ref"
6560 )
6561 primitive_name = config_primitive.get(
6562 "execution-environment-primitive", vnf_config_primitive
6563 )
6564 ee_id, vca_type = self._look_for_deployed_vca(
6565 nsr_deployed["VCA"],
6566 member_vnf_index=vnf_index,
6567 vdu_id=None,
6568 vdu_count_index=None,
6569 ee_descriptor_id=ee_descriptor_id,
6570 )
6571 result, result_detail = await self._ns_execute_primitive(
6572 ee_id,
6573 primitive_name,
6574 primitive_params,
6575 vca_type=vca_type,
6576 vca_id=vca_id,
6577 )
6578 self.logger.debug(
6579 logging_text
6580 + "vnf_config_primitive={} Done with result {} {}".format(
6581 vnf_config_primitive, result, result_detail
6582 )
6583 )
6584 # Update operationState = COMPLETED | FAILED
6585 self._update_suboperation_status(
6586 db_nslcmop, op_index, result, result_detail
6587 )
6588
6589 if result == "FAILED":
6590 raise LcmException(result_detail)
6591 db_nsr_update["config-status"] = old_config_status
6592 scale_process = None
6593 # POST-SCALE END
6594
6595 db_nsr_update[
6596 "detailed-status"
6597 ] = "" # "scaled {} {}".format(scaling_group, scaling_type)
6598 db_nsr_update["operational-status"] = (
6599 "running"
6600 if old_operational_status == "failed"
6601 else old_operational_status
6602 )
6603 db_nsr_update["config-status"] = old_config_status
6604 return
6605 except (
6606 ROclient.ROClientException,
6607 DbException,
6608 LcmException,
6609 NgRoException,
6610 ) as e:
6611 self.logger.error(logging_text + "Exit Exception {}".format(e))
6612 exc = e
6613 except asyncio.CancelledError:
6614 self.logger.error(
6615 logging_text + "Cancelled Exception while '{}'".format(step)
6616 )
6617 exc = "Operation was cancelled"
6618 except Exception as e:
6619 exc = traceback.format_exc()
6620 self.logger.critical(
6621 logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
6622 exc_info=True,
6623 )
6624 finally:
6625 self._write_ns_status(
6626 nsr_id=nsr_id,
6627 ns_state=None,
6628 current_operation="IDLE",
6629 current_operation_id=None,
6630 )
6631 if tasks_dict_info:
6632 stage[1] = "Waiting for instantiate pending tasks."
6633 self.logger.debug(logging_text + stage[1])
6634 exc = await self._wait_for_tasks(
6635 logging_text,
6636 tasks_dict_info,
6637 self.timeout_ns_deploy,
6638 stage,
6639 nslcmop_id,
6640 nsr_id=nsr_id,
6641 )
6642 if exc:
6643 db_nslcmop_update[
6644 "detailed-status"
6645 ] = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
6646 nslcmop_operation_state = "FAILED"
6647 if db_nsr:
6648 db_nsr_update["operational-status"] = old_operational_status
6649 db_nsr_update["config-status"] = old_config_status
6650 db_nsr_update["detailed-status"] = ""
6651 if scale_process:
6652 if "VCA" in scale_process:
6653 db_nsr_update["config-status"] = "failed"
6654 if "RO" in scale_process:
6655 db_nsr_update["operational-status"] = "failed"
6656 db_nsr_update[
6657 "detailed-status"
6658 ] = "FAILED scaling nslcmop={} {}: {}".format(
6659 nslcmop_id, step, exc
6660 )
6661 else:
6662 error_description_nslcmop = None
6663 nslcmop_operation_state = "COMPLETED"
6664 db_nslcmop_update["detailed-status"] = "Done"
6665
6666 self._write_op_status(
6667 op_id=nslcmop_id,
6668 stage="",
6669 error_message=error_description_nslcmop,
6670 operation_state=nslcmop_operation_state,
6671 other_update=db_nslcmop_update,
6672 )
6673 if db_nsr:
6674 self._write_ns_status(
6675 nsr_id=nsr_id,
6676 ns_state=None,
6677 current_operation="IDLE",
6678 current_operation_id=None,
6679 other_update=db_nsr_update,
6680 )
6681
6682 if nslcmop_operation_state:
6683 try:
6684 msg = {
6685 "nsr_id": nsr_id,
6686 "nslcmop_id": nslcmop_id,
6687 "operationState": nslcmop_operation_state,
6688 }
6689 await self.msg.aiowrite("ns", "scaled", msg, loop=self.loop)
6690 except Exception as e:
6691 self.logger.error(
6692 logging_text + "kafka_write notification Exception {}".format(e)
6693 )
6694 self.logger.debug(logging_text + "Exit")
6695 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")
6696
    async def _scale_kdu(
        self, logging_text, nsr_id, nsr_deployed, db_vnfd, vca_id, scaling_info
    ):
        """Scale the KDUs listed in scaling_info through the matching k8s connector.

        For every KDU entry in scaling_info["kdu-create"] or ["kdu-delete"]:
        run the descriptor's terminate-config-primitives (scale-in only),
        then call the connector's scale() with the target replica count, and
        finally run the initial-config-primitives (scale-out only).

        :param logging_text: prefix for every log line of this operation
        :param nsr_id: NS record id, used to build the db path for status updates
        :param nsr_deployed: nsr "_admin.deployed" content, to locate deployed KDUs
        :param db_vnfd: VNF descriptor, to look up kdu configuration primitives
        :param vca_id: VCA id forwarded to the k8s connector calls
        :param scaling_info: dict with "kdu-create" and/or "kdu-delete" maps of
            kdu_name -> list of per-instance scaling entries (member-vnf-index,
            type, k8s-cluster-type, resource-name, scale)
        :raises asyncio.TimeoutError: if a primitive (600s) or the scale
            operation (self.timeout_vca_on_error) does not finish in time
        """
        # Only one of the two keys is expected to be present for a given operation
        # (scale-out fills "kdu-create", scale-in fills "kdu-delete").
        _scaling_info = scaling_info.get("kdu-create") or scaling_info.get("kdu-delete")
        for kdu_name in _scaling_info:
            for kdu_scaling_info in _scaling_info[kdu_name]:
                deployed_kdu, index = get_deployed_kdu(
                    nsr_deployed, kdu_name, kdu_scaling_info["member-vnf-index"]
                )
                cluster_uuid = deployed_kdu["k8scluster-uuid"]
                kdu_instance = deployed_kdu["kdu-instance"]
                kdu_model = deployed_kdu.get("kdu-model")
                # "scale" is the absolute target replica count computed by the caller
                scale = int(kdu_scaling_info["scale"])
                k8s_cluster_type = kdu_scaling_info["k8s-cluster-type"]

                # db location where the connector writes progress/status updates
                db_dict = {
                    "collection": "nsrs",
                    "filter": {"_id": nsr_id},
                    "path": "_admin.deployed.K8s.{}".format(index),
                }

                step = "scaling application {}".format(
                    kdu_scaling_info["resource-name"]
                )
                self.logger.debug(logging_text + step)

                # Scale-in: execute terminate-config-primitives BEFORE scaling,
                # but only when the kdu has no juju execution environment
                # (otherwise the EE handles its own terminate primitives).
                if kdu_scaling_info["type"] == "delete":
                    kdu_config = get_configuration(db_vnfd, kdu_name)
                    if (
                        kdu_config
                        and kdu_config.get("terminate-config-primitive")
                        and get_juju_ee_ref(db_vnfd, kdu_name) is None
                    ):
                        terminate_config_primitive_list = kdu_config.get(
                            "terminate-config-primitive"
                        )
                        # primitives must run in ascending "seq" order
                        terminate_config_primitive_list.sort(
                            key=lambda val: int(val["seq"])
                        )

                        for (
                            terminate_config_primitive
                        ) in terminate_config_primitive_list:
                            primitive_params_ = self._map_primitive_params(
                                terminate_config_primitive, {}, {}
                            )
                            step = "execute terminate config primitive"
                            self.logger.debug(logging_text + step)
                            await asyncio.wait_for(
                                self.k8scluster_map[k8s_cluster_type].exec_primitive(
                                    cluster_uuid=cluster_uuid,
                                    kdu_instance=kdu_instance,
                                    primitive_name=terminate_config_primitive["name"],
                                    params=primitive_params_,
                                    db_dict=db_dict,
                                    vca_id=vca_id,
                                ),
                                timeout=600,
                            )

                # The actual scale operation (both directions go through here).
                await asyncio.wait_for(
                    self.k8scluster_map[k8s_cluster_type].scale(
                        kdu_instance,
                        scale,
                        kdu_scaling_info["resource-name"],
                        vca_id=vca_id,
                        cluster_uuid=cluster_uuid,
                        kdu_model=kdu_model,
                        atomic=True,
                        db_dict=db_dict,
                    ),
                    timeout=self.timeout_vca_on_error,
                )

                # Scale-out: execute initial-config-primitives AFTER scaling,
                # under the same "no juju EE" condition as above.
                if kdu_scaling_info["type"] == "create":
                    kdu_config = get_configuration(db_vnfd, kdu_name)
                    if (
                        kdu_config
                        and kdu_config.get("initial-config-primitive")
                        and get_juju_ee_ref(db_vnfd, kdu_name) is None
                    ):
                        initial_config_primitive_list = kdu_config.get(
                            "initial-config-primitive"
                        )
                        # primitives must run in ascending "seq" order
                        initial_config_primitive_list.sort(
                            key=lambda val: int(val["seq"])
                        )

                        for initial_config_primitive in initial_config_primitive_list:
                            primitive_params_ = self._map_primitive_params(
                                initial_config_primitive, {}, {}
                            )
                            step = "execute initial config primitive"
                            self.logger.debug(logging_text + step)
                            await asyncio.wait_for(
                                self.k8scluster_map[k8s_cluster_type].exec_primitive(
                                    cluster_uuid=cluster_uuid,
                                    kdu_instance=kdu_instance,
                                    primitive_name=initial_config_primitive["name"],
                                    params=primitive_params_,
                                    db_dict=db_dict,
                                    vca_id=vca_id,
                                ),
                                timeout=600,
                            )
6803 async def _scale_ng_ro(
6804 self, logging_text, db_nsr, db_nslcmop, db_vnfr, vdu_scaling_info, stage
6805 ):
6806 nsr_id = db_nslcmop["nsInstanceId"]
6807 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
6808 db_vnfrs = {}
6809
6810 # read from db: vnfd's for every vnf
6811 db_vnfds = []
6812
6813 # for each vnf in ns, read vnfd
6814 for vnfr in self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}):
6815 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
6816 vnfd_id = vnfr["vnfd-id"] # vnfd uuid for this vnf
6817 # if we haven't this vnfd, read it from db
6818 if not find_in_list(db_vnfds, lambda a_vnfd: a_vnfd["id"] == vnfd_id):
6819 # read from db
6820 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
6821 db_vnfds.append(vnfd)
6822 n2vc_key = self.n2vc.get_public_key()
6823 n2vc_key_list = [n2vc_key]
6824 self.scale_vnfr(
6825 db_vnfr,
6826 vdu_scaling_info.get("vdu-create"),
6827 vdu_scaling_info.get("vdu-delete"),
6828 mark_delete=True,
6829 )
6830 # db_vnfr has been updated, update db_vnfrs to use it
6831 db_vnfrs[db_vnfr["member-vnf-index-ref"]] = db_vnfr
6832 await self._instantiate_ng_ro(
6833 logging_text,
6834 nsr_id,
6835 db_nsd,
6836 db_nsr,
6837 db_nslcmop,
6838 db_vnfrs,
6839 db_vnfds,
6840 n2vc_key_list,
6841 stage=stage,
6842 start_deploy=time(),
6843 timeout_ns_deploy=self.timeout_ns_deploy,
6844 )
6845 if vdu_scaling_info.get("vdu-delete"):
6846 self.scale_vnfr(
6847 db_vnfr, None, vdu_scaling_info["vdu-delete"], mark_delete=False
6848 )
6849
6850 async def extract_prometheus_scrape_jobs(
6851 self, ee_id, artifact_path, ee_config_descriptor, vnfr_id, nsr_id, target_ip
6852 ):
6853 # look if exist a file called 'prometheus*.j2' and
6854 artifact_content = self.fs.dir_ls(artifact_path)
6855 job_file = next(
6856 (
6857 f
6858 for f in artifact_content
6859 if f.startswith("prometheus") and f.endswith(".j2")
6860 ),
6861 None,
6862 )
6863 if not job_file:
6864 return
6865 with self.fs.file_open((artifact_path, job_file), "r") as f:
6866 job_data = f.read()
6867
6868 # TODO get_service
6869 _, _, service = ee_id.partition(".") # remove prefix "namespace."
6870 host_name = "{}-{}".format(service, ee_config_descriptor["metric-service"])
6871 host_port = "80"
6872 vnfr_id = vnfr_id.replace("-", "")
6873 variables = {
6874 "JOB_NAME": vnfr_id,
6875 "TARGET_IP": target_ip,
6876 "EXPORTER_POD_IP": host_name,
6877 "EXPORTER_POD_PORT": host_port,
6878 }
6879 job_list = parse_job(job_data, variables)
6880 # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
6881 for job in job_list:
6882 if (
6883 not isinstance(job.get("job_name"), str)
6884 or vnfr_id not in job["job_name"]
6885 ):
6886 job["job_name"] = vnfr_id + "_" + str(randint(1, 10000))
6887 job["nsr_id"] = nsr_id
6888 job["vnfr_id"] = vnfr_id
6889 return job_list
6890
6891 def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
6892 """
6893 Get VCA Cloud and VCA Cloud Credentials for the VIM account
6894
6895 :param: vim_account_id: VIM Account ID
6896
6897 :return: (cloud_name, cloud_credential)
6898 """
6899 config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
6900 return config.get("vca_cloud"), config.get("vca_cloud_credential")
6901
6902 def get_vca_k8s_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
6903 """
6904 Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account
6905
6906 :param: vim_account_id: VIM Account ID
6907
6908 :return: (cloud_name, cloud_credential)
6909 """
6910 config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
6911 return config.get("vca_k8s_cloud"), config.get("vca_k8s_cloud_credential")