Fix Bug 2012 use existing volumes as instantiation parameters
[osm/LCM.git] / osm_lcm / ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 import shutil
21 from typing import Any, Dict, List
22 import yaml
23 import logging
24 import logging.handlers
25 import traceback
26 import json
27 from jinja2 import (
28 Environment,
29 TemplateError,
30 TemplateNotFound,
31 StrictUndefined,
32 UndefinedError,
33 )
34
35 from osm_lcm import ROclient
36 from osm_lcm.data_utils.nsr import (
37 get_deployed_kdu,
38 get_deployed_vca,
39 get_deployed_vca_list,
40 get_nsd,
41 )
42 from osm_lcm.data_utils.vca import (
43 DeployedComponent,
44 DeployedK8sResource,
45 DeployedVCA,
46 EELevel,
47 Relation,
48 EERelation,
49 safe_get_ee_relation,
50 )
51 from osm_lcm.ng_ro import NgRoClient, NgRoException
52 from osm_lcm.lcm_utils import (
53 LcmException,
54 LcmExceptionNoMgmtIP,
55 LcmBase,
56 deep_get,
57 get_iterable,
58 populate_dict,
59 check_juju_bundle_existence,
60 get_charm_artifact_path,
61 )
62 from osm_lcm.data_utils.nsd import (
63 get_ns_configuration_relation_list,
64 get_vnf_profile,
65 get_vnf_profiles,
66 )
67 from osm_lcm.data_utils.vnfd import (
68 get_kdu,
69 get_kdu_services,
70 get_relation_list,
71 get_vdu_list,
72 get_vdu_profile,
73 get_ee_sorted_initial_config_primitive_list,
74 get_ee_sorted_terminate_config_primitive_list,
75 get_kdu_list,
76 get_virtual_link_profiles,
77 get_vdu,
78 get_configuration,
79 get_vdu_index,
80 get_scaling_aspect,
81 get_number_of_instances,
82 get_juju_ee_ref,
83 get_kdu_resource_profile,
84 find_software_version,
85 )
86 from osm_lcm.data_utils.list_utils import find_in_list
87 from osm_lcm.data_utils.vnfr import (
88 get_osm_params,
89 get_vdur_index,
90 get_kdur,
91 get_volumes_from_instantiation_params,
92 )
93 from osm_lcm.data_utils.dict_utils import parse_yaml_strings
94 from osm_lcm.data_utils.database.vim_account import VimAccountDB
95 from n2vc.definitions import RelationEndpoint
96 from n2vc.k8s_helm_conn import K8sHelmConnector
97 from n2vc.k8s_helm3_conn import K8sHelm3Connector
98 from n2vc.k8s_juju_conn import K8sJujuConnector
99
100 from osm_common.dbbase import DbException
101 from osm_common.fsbase import FsException
102
103 from osm_lcm.data_utils.database.database import Database
104 from osm_lcm.data_utils.filesystem.filesystem import Filesystem
105
106 from n2vc.n2vc_juju_conn import N2VCJujuConnector
107 from n2vc.exceptions import N2VCException, N2VCNotFound, K8sException
108
109 from osm_lcm.lcm_helm_conn import LCMHelmConn
110 from osm_lcm.osm_config import OsmConfigBuilder
111 from osm_lcm.prometheus import parse_job
112
113 from copy import copy, deepcopy
114 from time import time
115 from uuid import uuid4
116
117 from random import randint
118
119 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
120
121
122 class NsLcm(LcmBase):
123 timeout_vca_on_error = (
124 5 * 60
125 ) # Time for charm from first time at blocked,error status to mark as failed
126 timeout_ns_deploy = 2 * 3600 # default global timeout for deploying a ns
127 timeout_ns_terminate = 1800 # default global timeout for undeploying a ns
128 timeout_charm_delete = 10 * 60
129 timeout_primitive = 30 * 60 # timeout for primitive execution
130 timeout_ns_update = 30 * 60 # timeout for ns update
131 timeout_progress_primitive = (
132 10 * 60
133 ) # timeout for some progress in a primitive execution
134 timeout_migrate = 1800 # default global timeout for migrating vnfs
135
136 SUBOPERATION_STATUS_NOT_FOUND = -1
137 SUBOPERATION_STATUS_NEW = -2
138 SUBOPERATION_STATUS_SKIP = -3
139 task_name_deploy_vca = "Deploying VCA"
140
141 def __init__(self, msg, lcm_tasks, config, loop):
142 """
143 Init: connect to database, filesystem storage, and messaging
144 :param config: two-level dictionary with configuration. Top level should contain 'database', 'storage',
145 :return: None
146 """
147 super().__init__(msg=msg, logger=logging.getLogger("lcm.ns"))
148
149 self.db = Database().instance.db
150 self.fs = Filesystem().instance.fs
151 self.loop = loop
152 self.lcm_tasks = lcm_tasks
153 self.timeout = config["timeout"]
154 self.ro_config = config["ro_config"]
155 self.ng_ro = config["ro_config"].get("ng")
156 self.vca_config = config["VCA"].copy()
157
158 # create N2VC connector
159 self.n2vc = N2VCJujuConnector(
160 log=self.logger,
161 loop=self.loop,
162 on_update_db=self._on_update_n2vc_db,
163 fs=self.fs,
164 db=self.db,
165 )
166
167 self.conn_helm_ee = LCMHelmConn(
168 log=self.logger,
169 loop=self.loop,
170 vca_config=self.vca_config,
171 on_update_db=self._on_update_n2vc_db,
172 )
173
174 self.k8sclusterhelm2 = K8sHelmConnector(
175 kubectl_command=self.vca_config.get("kubectlpath"),
176 helm_command=self.vca_config.get("helmpath"),
177 log=self.logger,
178 on_update_db=None,
179 fs=self.fs,
180 db=self.db,
181 )
182
183 self.k8sclusterhelm3 = K8sHelm3Connector(
184 kubectl_command=self.vca_config.get("kubectlpath"),
185 helm_command=self.vca_config.get("helm3path"),
186 fs=self.fs,
187 log=self.logger,
188 db=self.db,
189 on_update_db=None,
190 )
191
192 self.k8sclusterjuju = K8sJujuConnector(
193 kubectl_command=self.vca_config.get("kubectlpath"),
194 juju_command=self.vca_config.get("jujupath"),
195 log=self.logger,
196 loop=self.loop,
197 on_update_db=self._on_update_k8s_db,
198 fs=self.fs,
199 db=self.db,
200 )
201
202 self.k8scluster_map = {
203 "helm-chart": self.k8sclusterhelm2,
204 "helm-chart-v3": self.k8sclusterhelm3,
205 "chart": self.k8sclusterhelm3,
206 "juju-bundle": self.k8sclusterjuju,
207 "juju": self.k8sclusterjuju,
208 }
209
210 self.vca_map = {
211 "lxc_proxy_charm": self.n2vc,
212 "native_charm": self.n2vc,
213 "k8s_proxy_charm": self.n2vc,
214 "helm": self.conn_helm_ee,
215 "helm-v3": self.conn_helm_ee,
216 }
217
218 # create RO client
219 self.RO = NgRoClient(self.loop, **self.ro_config)
220
221 @staticmethod
222 def increment_ip_mac(ip_mac, vm_index=1):
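# Examples of the behaviour implemented below:
#   increment_ip_mac("10.0.0.10", 2)         -> "10.0.0.12"      (IPv4: last octet incremented)
#   increment_ip_mac("fa:16:3e:00:00:0a", 1) -> "fa:16:3e:00:00:0b"  (MAC/IPv6: last hex group incremented)
#   unparseable input                        -> None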
223 if not isinstance(ip_mac, str):
224 return ip_mac
225 try:
226 # try with ipv4: look for the last dot
227 i = ip_mac.rfind(".")
228 if i > 0:
229 i += 1
230 return "{}{}".format(ip_mac[:i], int(ip_mac[i:]) + vm_index)
231 # try with ipv6 or mac: look for the last colon. Operate in hex
232 i = ip_mac.rfind(":")
233 if i > 0:
234 i += 1
235 # format in hex, len can be 2 for mac or 4 for ipv6
236 return ("{}{:0" + str(len(ip_mac) - i) + "x}").format(
237 ip_mac[:i], int(ip_mac[i:], 16) + vm_index
238 )
239 except Exception:
240 pass
241 return None
242
243 def _on_update_ro_db(self, nsrs_id, ro_descriptor):
244
245 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
246
247 try:
248 # TODO filter RO descriptor fields...
249
250 # write to database
251 db_dict = dict()
252 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
253 db_dict["deploymentStatus"] = ro_descriptor
254 self.update_db_2("nsrs", nsrs_id, db_dict)
255
256 except Exception as e:
257 self.logger.warn(
258 "Cannot write database RO deployment for ns={} -> {}".format(nsrs_id, e)
259 )
260
261 async def _on_update_n2vc_db(self, table, filter, path, updated_data, vca_id=None):
262
263 # remove last dot from path (if exists)
264 if path.endswith("."):
265 path = path[:-1]
266
267 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
268 # .format(table, filter, path, updated_data))
269 try:
270
271 nsr_id = filter.get("_id")
272
273 # read ns record from database
274 nsr = self.db.get_one(table="nsrs", q_filter=filter)
275 current_ns_status = nsr.get("nsState")
276
277 # get vca status for NS
278 status_dict = await self.n2vc.get_status(
279 namespace="." + nsr_id, yaml_format=False, vca_id=vca_id
280 )
281
282 # vcaStatus
283 db_dict = dict()
284 db_dict["vcaStatus"] = status_dict
285 await self.n2vc.update_vca_status(db_dict["vcaStatus"], vca_id=vca_id)
286
287 # update configurationStatus for this VCA
288 try:
289 vca_index = int(path[path.rfind(".") + 1 :])
290
291 vca_list = deep_get(
292 target_dict=nsr, key_list=("_admin", "deployed", "VCA")
293 )
294 vca_status = vca_list[vca_index].get("status")
295
296 configuration_status_list = nsr.get("configurationStatus")
297 config_status = configuration_status_list[vca_index].get("status")
298
299 if config_status == "BROKEN" and vca_status != "failed":
300 db_dict["configurationStatus"][vca_index] = "READY"
301 elif config_status != "BROKEN" and vca_status == "failed":
302 db_dict["configurationStatus"][vca_index] = "BROKEN"
303 except Exception as e:
304 # do not update configurationStatus
305 self.logger.debug("Error updating vca_index (ignore): {}".format(e))
306
307 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
308 # if nsState = 'DEGRADED' check if all is OK
309 is_degraded = False
310 if current_ns_status in ("READY", "DEGRADED"):
311 error_description = ""
312 # check machines
313 if status_dict.get("machines"):
314 for machine_id in status_dict.get("machines"):
315 machine = status_dict.get("machines").get(machine_id)
316 # check machine agent-status
317 if machine.get("agent-status"):
318 s = machine.get("agent-status").get("status")
319 if s != "started":
320 is_degraded = True
321 error_description += (
322 "machine {} agent-status={} ; ".format(
323 machine_id, s
324 )
325 )
326 # check machine instance status
327 if machine.get("instance-status"):
328 s = machine.get("instance-status").get("status")
329 if s != "running":
330 is_degraded = True
331 error_description += (
332 "machine {} instance-status={} ; ".format(
333 machine_id, s
334 )
335 )
336 # check applications
337 if status_dict.get("applications"):
338 for app_id in status_dict.get("applications"):
339 app = status_dict.get("applications").get(app_id)
340 # check application status
341 if app.get("status"):
342 s = app.get("status").get("status")
343 if s != "active":
344 is_degraded = True
345 error_description += (
346 "application {} status={} ; ".format(app_id, s)
347 )
348
349 if error_description:
350 db_dict["errorDescription"] = error_description
351 if current_ns_status == "READY" and is_degraded:
352 db_dict["nsState"] = "DEGRADED"
353 if current_ns_status == "DEGRADED" and not is_degraded:
354 db_dict["nsState"] = "READY"
355
356 # write to database
357 self.update_db_2("nsrs", nsr_id, db_dict)
358
359 except (asyncio.CancelledError, asyncio.TimeoutError):
360 raise
361 except Exception as e:
362 self.logger.warn("Error updating NS state for ns={}: {}".format(nsr_id, e))
363
364 async def _on_update_k8s_db(
365 self, cluster_uuid, kdu_instance, filter=None, vca_id=None, cluster_type="juju"
366 ):
367 """
368 Updating vca status in NSR record
369 :param cluster_uuid: UUID of a k8s cluster
370 :param kdu_instance: The unique name of the KDU instance
371 :param filter: To get nsr_id
372 :param cluster_type: The cluster type (juju, k8s)
373 :return: none
374 """
375
376 # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
377 # .format(cluster_uuid, kdu_instance, filter))
378
379 nsr_id = filter.get("_id")
380 try:
381 vca_status = await self.k8scluster_map[cluster_type].status_kdu(
382 cluster_uuid=cluster_uuid,
383 kdu_instance=kdu_instance,
384 yaml_format=False,
385 complete_status=True,
386 vca_id=vca_id,
387 )
388
389 # vcaStatus
390 db_dict = dict()
391 db_dict["vcaStatus"] = {nsr_id: vca_status}
392
393 if cluster_type in ("juju-bundle", "juju"):
394 # TODO -> this should be done in a more uniform way, I think in N2VC, in order to update the K8s VCA
395 # status in a similar way between Juju Bundles and Helm Charts on this side
396 await self.k8sclusterjuju.update_vca_status(
397 db_dict["vcaStatus"],
398 kdu_instance,
399 vca_id=vca_id,
400 )
401
402 self.logger.debug(
403 f"Obtained VCA status for cluster type '{cluster_type}': {vca_status}"
404 )
405
406 # write to database
407 self.update_db_2("nsrs", nsr_id, db_dict)
408 except (asyncio.CancelledError, asyncio.TimeoutError):
409 raise
410 except Exception as e:
411 self.logger.warn("Error updating NS state for ns={}: {}".format(nsr_id, e))
412
413 @staticmethod
414 def _parse_cloud_init(cloud_init_text, additional_params, vnfd_id, vdu_id):
415 try:
416 env = Environment(undefined=StrictUndefined)
417 template = env.from_string(cloud_init_text)
418 return template.render(additional_params or {})
419 except UndefinedError as e:
420 raise LcmException(
421 "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
422 "file, must be provided in the instantiation parameters inside the "
423 "'additionalParamsForVnf/Vdu' block".format(e, vnfd_id, vdu_id)
424 )
425 except (TemplateError, TemplateNotFound) as e:
426 raise LcmException(
427 "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
428 vnfd_id, vdu_id, e
429 )
430 )
431
432 def _get_vdu_cloud_init_content(self, vdu, vnfd):
433 cloud_init_content = cloud_init_file = None
434 try:
435 if vdu.get("cloud-init-file"):
436 base_folder = vnfd["_admin"]["storage"]
437 if base_folder["pkg-dir"]:
438 cloud_init_file = "{}/{}/cloud_init/{}".format(
439 base_folder["folder"],
440 base_folder["pkg-dir"],
441 vdu["cloud-init-file"],
442 )
443 else:
444 cloud_init_file = "{}/Scripts/cloud_init/{}".format(
445 base_folder["folder"],
446 vdu["cloud-init-file"],
447 )
448 with self.fs.file_open(cloud_init_file, "r") as ci_file:
449 cloud_init_content = ci_file.read()
450 elif vdu.get("cloud-init"):
451 cloud_init_content = vdu["cloud-init"]
452
453 return cloud_init_content
454 except FsException as e:
455 raise LcmException(
456 "Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".format(
457 vnfd["id"], vdu["id"], cloud_init_file, e
458 )
459 )
460
461 def _get_vdu_additional_params(self, db_vnfr, vdu_id):
462 vdur = next(
463 (vdur for vdur in db_vnfr.get("vdur") if vdu_id == vdur["vdu-id-ref"]), {}
464 )
465 additional_params = vdur.get("additionalParams")
466 return parse_yaml_strings(additional_params)
467
468 def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
469 """
470 Creates a new vnfd descriptor for RO based on the input OSM IM vnfd
471 :param vnfd: input vnfd
472 :param new_id: overrides vnf id if provided
473 :param additionalParams: Instantiation params for VNFs provided
474 :param nsrId: Id of the NSR
475 :return: copy of vnfd
476 """
477 vnfd_RO = deepcopy(vnfd)
478 # remove configuration, monitoring, scaling and internal keys that are not used by RO
479 vnfd_RO.pop("_id", None)
480 vnfd_RO.pop("_admin", None)
481 vnfd_RO.pop("monitoring-param", None)
482 vnfd_RO.pop("scaling-group-descriptor", None)
483 vnfd_RO.pop("kdu", None)
484 vnfd_RO.pop("k8s-cluster", None)
485 if new_id:
486 vnfd_RO["id"] = new_id
487
488 # remove cloud-init and cloud-init-file from each vdu; they are not passed to RO
489 for vdu in get_iterable(vnfd_RO, "vdu"):
490 vdu.pop("cloud-init-file", None)
491 vdu.pop("cloud-init", None)
492 return vnfd_RO
493
494 @staticmethod
495 def ip_profile_2_RO(ip_profile):
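# Illustrative mapping performed below (OSM IM profile on the left, RO profile on the right):
#   {"ip-version": "ipv4", "dns-server": [{"address": "8.8.8.8"}], "dhcp-params": {"enabled": True}}
#   -> {"ip-version": "IPv4", "dns-address": ["8.8.8.8"], "dhcp": {"enabled": True}}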
496 RO_ip_profile = deepcopy(ip_profile)
497 if "dns-server" in RO_ip_profile:
498 if isinstance(RO_ip_profile["dns-server"], list):
499 RO_ip_profile["dns-address"] = []
500 for ds in RO_ip_profile.pop("dns-server"):
501 RO_ip_profile["dns-address"].append(ds["address"])
502 else:
503 RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
504 if RO_ip_profile.get("ip-version") == "ipv4":
505 RO_ip_profile["ip-version"] = "IPv4"
506 if RO_ip_profile.get("ip-version") == "ipv6":
507 RO_ip_profile["ip-version"] = "IPv6"
508 if "dhcp-params" in RO_ip_profile:
509 RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
510 return RO_ip_profile
511
512 def _get_ro_vim_id_for_vim_account(self, vim_account):
513 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
514 if db_vim["_admin"]["operationalState"] != "ENABLED":
515 raise LcmException(
516 "VIM={} is not available. operationalState={}".format(
517 vim_account, db_vim["_admin"]["operationalState"]
518 )
519 )
520 RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
521 return RO_vim_id
522
523 def get_ro_wim_id_for_wim_account(self, wim_account):
524 if isinstance(wim_account, str):
525 db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
526 if db_wim["_admin"]["operationalState"] != "ENABLED":
527 raise LcmException(
528 "WIM={} is not available. operationalState={}".format(
529 wim_account, db_wim["_admin"]["operationalState"]
530 )
531 )
532 RO_wim_id = db_wim["_admin"]["deployed"]["RO-account"]
533 return RO_wim_id
534 else:
535 return wim_account
536
537 def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None, mark_delete=False):
538
539 db_vdu_push_list = []
540 template_vdur = []
541 db_update = {"_admin.modified": time()}
542 if vdu_create:
543 for vdu_id, vdu_count in vdu_create.items():
544 vdur = next(
545 (
546 vdur
547 for vdur in reversed(db_vnfr["vdur"])
548 if vdur["vdu-id-ref"] == vdu_id
549 ),
550 None,
551 )
552 if not vdur:
553 # Read the template saved in the db:
554 self.logger.debug(
555 "No vdur in the database. Using the vdur-template to scale"
556 )
557 vdur_template = db_vnfr.get("vdur-template")
558 if not vdur_template:
559 raise LcmException(
560 "Error scaling OUT VNFR for {}. No vnfr or template exists".format(
561 vdu_id
562 )
563 )
564 vdur = vdur_template[0]
565 # Delete a template from the database after using it
566 self.db.set_one(
567 "vnfrs",
568 {"_id": db_vnfr["_id"]},
569 None,
570 pull={"vdur-template": {"_id": vdur["_id"]}},
571 )
572 for count in range(vdu_count):
573 vdur_copy = deepcopy(vdur)
574 vdur_copy["status"] = "BUILD"
575 vdur_copy["status-detailed"] = None
576 vdur_copy["ip-address"] = None
577 vdur_copy["_id"] = str(uuid4())
578 vdur_copy["count-index"] += count + 1
579 vdur_copy["id"] = "{}-{}".format(
580 vdur_copy["vdu-id-ref"], vdur_copy["count-index"]
581 )
582 vdur_copy.pop("vim_info", None)
583 for iface in vdur_copy["interfaces"]:
584 if iface.get("fixed-ip"):
585 iface["ip-address"] = self.increment_ip_mac(
586 iface["ip-address"], count + 1
587 )
588 else:
589 iface.pop("ip-address", None)
590 if iface.get("fixed-mac"):
591 iface["mac-address"] = self.increment_ip_mac(
592 iface["mac-address"], count + 1
593 )
594 else:
595 iface.pop("mac-address", None)
596 if db_vnfr["vdur"]:
597 iface.pop(
598 "mgmt_vnf", None
599 ) # only the first vdu can be the management of the vnf
600 db_vdu_push_list.append(vdur_copy)
601 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
602 if vdu_delete:
603 if len(db_vnfr["vdur"]) == 1:
604 # The scale will move to 0 instances
605 self.logger.debug(
606 "Scaling to 0 !, creating the template with the last vdur"
607 )
608 template_vdur = [db_vnfr["vdur"][0]]
609 for vdu_id, vdu_count in vdu_delete.items():
610 if mark_delete:
611 indexes_to_delete = [
612 iv[0]
613 for iv in enumerate(db_vnfr["vdur"])
614 if iv[1]["vdu-id-ref"] == vdu_id
615 ]
616 db_update.update(
617 {
618 "vdur.{}.status".format(i): "DELETING"
619 for i in indexes_to_delete[-vdu_count:]
620 }
621 )
622 else:
623 # it must be deleted one by one because common.db does not allow otherwise
624 vdus_to_delete = [
625 v
626 for v in reversed(db_vnfr["vdur"])
627 if v["vdu-id-ref"] == vdu_id
628 ]
629 for vdu in vdus_to_delete[:vdu_count]:
630 self.db.set_one(
631 "vnfrs",
632 {"_id": db_vnfr["_id"]},
633 None,
634 pull={"vdur": {"_id": vdu["_id"]}},
635 )
636 db_push = {}
637 if db_vdu_push_list:
638 db_push["vdur"] = db_vdu_push_list
639 if template_vdur:
640 db_push["vdur-template"] = template_vdur
641 if not db_push:
642 db_push = None
643 db_vnfr["vdur-template"] = template_vdur
644 self.db.set_one("vnfrs", {"_id": db_vnfr["_id"]}, db_update, push_list=db_push)
645 # modify passed dictionary db_vnfr
646 db_vnfr_ = self.db.get_one("vnfrs", {"_id": db_vnfr["_id"]})
647 db_vnfr["vdur"] = db_vnfr_["vdur"]
648
649 def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
650 """
651 Updates database nsr with the RO info for the created vld
652 :param ns_update_nsr: dictionary to be filled with the updated info
653 :param db_nsr: content of db_nsr. This is also modified
654 :param nsr_desc_RO: nsr descriptor from RO
655 :return: Nothing, LcmException is raised on errors
656 """
657
658 for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
659 for net_RO in get_iterable(nsr_desc_RO, "nets"):
660 if vld["id"] != net_RO.get("ns_net_osm_id"):
661 continue
662 vld["vim-id"] = net_RO.get("vim_net_id")
663 vld["name"] = net_RO.get("vim_name")
664 vld["status"] = net_RO.get("status")
665 vld["status-detailed"] = net_RO.get("error_msg")
666 ns_update_nsr["vld.{}".format(vld_index)] = vld
667 break
668 else:
669 raise LcmException(
670 "ns_update_nsr: Not found vld={} at RO info".format(vld["id"])
671 )
672
673 def set_vnfr_at_error(self, db_vnfrs, error_text):
674 try:
675 for db_vnfr in db_vnfrs.values():
676 vnfr_update = {"status": "ERROR"}
677 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
678 if "status" not in vdur:
679 vdur["status"] = "ERROR"
680 vnfr_update["vdur.{}.status".format(vdu_index)] = "ERROR"
681 if error_text:
682 vdur["status-detailed"] = str(error_text)
683 vnfr_update[
684 "vdur.{}.status-detailed".format(vdu_index)
685 ] = "ERROR"
686 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
687 except DbException as e:
688 self.logger.error("Cannot update vnf. {}".format(e))
689
690 def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
691 """
692 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
693 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
694 :param nsr_desc_RO: nsr descriptor from RO
695 :return: Nothing, LcmException is raised on errors
696 """
697 for vnf_index, db_vnfr in db_vnfrs.items():
698 for vnf_RO in nsr_desc_RO["vnfs"]:
699 if vnf_RO["member_vnf_index"] != vnf_index:
700 continue
701 vnfr_update = {}
702 if vnf_RO.get("ip_address"):
703 db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO[
704 "ip_address"
705 ].split(";")[0]
706 elif not db_vnfr.get("ip-address"):
707 if db_vnfr.get("vdur"): # if there are no VDUs, there is no ip_address
708 raise LcmExceptionNoMgmtIP(
709 "ns member_vnf_index '{}' has no IP address".format(
710 vnf_index
711 )
712 )
713
714 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
715 vdur_RO_count_index = 0
716 if vdur.get("pdu-type"):
717 continue
718 for vdur_RO in get_iterable(vnf_RO, "vms"):
719 if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
720 continue
721 if vdur["count-index"] != vdur_RO_count_index:
722 vdur_RO_count_index += 1
723 continue
724 vdur["vim-id"] = vdur_RO.get("vim_vm_id")
725 if vdur_RO.get("ip_address"):
726 vdur["ip-address"] = vdur_RO["ip_address"].split(";")[0]
727 else:
728 vdur["ip-address"] = None
729 vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
730 vdur["name"] = vdur_RO.get("vim_name")
731 vdur["status"] = vdur_RO.get("status")
732 vdur["status-detailed"] = vdur_RO.get("error_msg")
733 for ifacer in get_iterable(vdur, "interfaces"):
734 for interface_RO in get_iterable(vdur_RO, "interfaces"):
735 if ifacer["name"] == interface_RO.get("internal_name"):
736 ifacer["ip-address"] = interface_RO.get(
737 "ip_address"
738 )
739 ifacer["mac-address"] = interface_RO.get(
740 "mac_address"
741 )
742 break
743 else:
744 raise LcmException(
745 "ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
746 "from VIM info".format(
747 vnf_index, vdur["vdu-id-ref"], ifacer["name"]
748 )
749 )
750 vnfr_update["vdur.{}".format(vdu_index)] = vdur
751 break
752 else:
753 raise LcmException(
754 "ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
755 "VIM info".format(
756 vnf_index, vdur["vdu-id-ref"], vdur["count-index"]
757 )
758 )
759
760 for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
761 for net_RO in get_iterable(nsr_desc_RO, "nets"):
762 if vld["id"] != net_RO.get("vnf_net_osm_id"):
763 continue
764 vld["vim-id"] = net_RO.get("vim_net_id")
765 vld["name"] = net_RO.get("vim_name")
766 vld["status"] = net_RO.get("status")
767 vld["status-detailed"] = net_RO.get("error_msg")
768 vnfr_update["vld.{}".format(vld_index)] = vld
769 break
770 else:
771 raise LcmException(
772 "ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
773 vnf_index, vld["id"]
774 )
775 )
776
777 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
778 break
779
780 else:
781 raise LcmException(
782 "ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(
783 vnf_index
784 )
785 )
786
787 def _get_ns_config_info(self, nsr_id):
788 """
789 Generates a mapping between vnf,vdu elements and the N2VC id
790 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
791 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
792 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
793 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
794 """
795 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
796 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
797 mapping = {}
798 ns_config_info = {"osm-config-mapping": mapping}
799 for vca in vca_deployed_list:
800 if not vca["member-vnf-index"]:
801 continue
802 if not vca["vdu_id"]:
803 mapping[vca["member-vnf-index"]] = vca["application"]
804 else:
805 mapping[
806 "{}.{}.{}".format(
807 vca["member-vnf-index"], vca["vdu_id"], vca["vdu_count_index"]
808 )
809 ] = vca["application"]
810 return ns_config_info
811
812 async def _instantiate_ng_ro(
813 self,
814 logging_text,
815 nsr_id,
816 nsd,
817 db_nsr,
818 db_nslcmop,
819 db_vnfrs,
820 db_vnfds,
821 n2vc_key_list,
822 stage,
823 start_deploy,
824 timeout_ns_deploy,
825 ):
826
827 db_vims = {}
828
829 def get_vim_account(vim_account_id):
830 nonlocal db_vims
831 if vim_account_id in db_vims:
832 return db_vims[vim_account_id]
833 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account_id})
834 db_vims[vim_account_id] = db_vim
835 return db_vim
836
837 # modify target_vld info with instantiation parameters
838 def parse_vld_instantiation_params(
839 target_vim, target_vld, vld_params, target_sdn
840 ):
841 if vld_params.get("ip-profile"):
842 target_vld["vim_info"][target_vim]["ip_profile"] = vld_params[
843 "ip-profile"
844 ]
845 if vld_params.get("provider-network"):
846 target_vld["vim_info"][target_vim]["provider_network"] = vld_params[
847 "provider-network"
848 ]
849 if "sdn-ports" in vld_params["provider-network"] and target_sdn:
850 target_vld["vim_info"][target_sdn]["sdn-ports"] = vld_params[
851 "provider-network"
852 ]["sdn-ports"]
853 if vld_params.get("wimAccountId"):
854 target_wim = "wim:{}".format(vld_params["wimAccountId"])
855 target_vld["vim_info"][target_wim] = {}
856 for param in ("vim-network-name", "vim-network-id"):
857 if vld_params.get(param):
858 if isinstance(vld_params[param], dict):
859 for vim, vim_net in vld_params[param].items():
860 other_target_vim = "vim:" + vim
861 populate_dict(
862 target_vld["vim_info"],
863 (other_target_vim, param.replace("-", "_")),
864 vim_net,
865 )
866 else: # isinstance str
867 target_vld["vim_info"][target_vim][
868 param.replace("-", "_")
869 ] = vld_params[param]
870 if vld_params.get("common_id"):
871 target_vld["common_id"] = vld_params.get("common_id")
872
873 # modify target["ns"]["vld"] with instantiation parameters to override vnf vim-account
874 def update_ns_vld_target(target, ns_params):
875 for vnf_params in ns_params.get("vnf", ()):
876 if vnf_params.get("vimAccountId"):
877 target_vnf = next(
878 (
879 vnfr
880 for vnfr in db_vnfrs.values()
881 if vnf_params["member-vnf-index"]
882 == vnfr["member-vnf-index-ref"]
883 ),
884 None,
885 )
886 vdur = next((vdur for vdur in target_vnf.get("vdur", ())), None)
887 for a_index, a_vld in enumerate(target["ns"]["vld"]):
888 target_vld = find_in_list(
889 get_iterable(vdur, "interfaces"),
890 lambda iface: iface.get("ns-vld-id") == a_vld["name"],
891 )
892 if target_vld:
893 if vnf_params.get("vimAccountId") not in a_vld.get(
894 "vim_info", {}
895 ):
896 target["ns"]["vld"][a_index].get("vim_info").update(
897 {
898 "vim:{}".format(vnf_params["vimAccountId"]): {
899 "vim_network_name": ""
900 }
901 }
902 )
903
904 nslcmop_id = db_nslcmop["_id"]
905 target = {
906 "name": db_nsr["name"],
907 "ns": {"vld": []},
908 "vnf": [],
909 "image": deepcopy(db_nsr["image"]),
910 "flavor": deepcopy(db_nsr["flavor"]),
911 "action_id": nslcmop_id,
912 "cloud_init_content": {},
913 }
914 for image in target["image"]:
915 image["vim_info"] = {}
916 for flavor in target["flavor"]:
917 flavor["vim_info"] = {}
918 if db_nsr.get("affinity-or-anti-affinity-group"):
919 target["affinity-or-anti-affinity-group"] = deepcopy(
920 db_nsr["affinity-or-anti-affinity-group"]
921 )
922 for affinity_or_anti_affinity_group in target[
923 "affinity-or-anti-affinity-group"
924 ]:
925 affinity_or_anti_affinity_group["vim_info"] = {}
926
927 if db_nslcmop.get("lcmOperationType") != "instantiate":
928 # get parameters of instantiation:
929 db_nslcmop_instantiate = self.db.get_list(
930 "nslcmops",
931 {
932 "nsInstanceId": db_nslcmop["nsInstanceId"],
933 "lcmOperationType": "instantiate",
934 },
935 )[-1]
936 ns_params = db_nslcmop_instantiate.get("operationParams")
937 else:
938 ns_params = db_nslcmop.get("operationParams")
939 ssh_keys_instantiation = ns_params.get("ssh_keys") or []
940 ssh_keys_all = ssh_keys_instantiation + (n2vc_key_list or [])
941
942 cp2target = {}
943 for vld_index, vld in enumerate(db_nsr.get("vld")):
944 target_vim = "vim:{}".format(ns_params["vimAccountId"])
945 target_vld = {
946 "id": vld["id"],
947 "name": vld["name"],
948 "mgmt-network": vld.get("mgmt-network", False),
949 "type": vld.get("type"),
950 "vim_info": {
951 target_vim: {
952 "vim_network_name": vld.get("vim-network-name"),
953 "vim_account_id": ns_params["vimAccountId"],
954 }
955 },
956 }
957 # check if this network needs SDN assist
958 if vld.get("pci-interfaces"):
959 db_vim = get_vim_account(ns_params["vimAccountId"])
960 sdnc_id = db_vim["config"].get("sdn-controller")
961 if sdnc_id:
962 sdn_vld = "nsrs:{}:vld.{}".format(nsr_id, vld["id"])
963 target_sdn = "sdn:{}".format(sdnc_id)
964 target_vld["vim_info"][target_sdn] = {
965 "sdn": True,
966 "target_vim": target_vim,
967 "vlds": [sdn_vld],
968 "type": vld.get("type"),
969 }
970
971 nsd_vnf_profiles = get_vnf_profiles(nsd)
972 for nsd_vnf_profile in nsd_vnf_profiles:
973 for cp in nsd_vnf_profile["virtual-link-connectivity"]:
974 if cp["virtual-link-profile-id"] == vld["id"]:
975 cp2target[
976 "member_vnf:{}.{}".format(
977 cp["constituent-cpd-id"][0][
978 "constituent-base-element-id"
979 ],
980 cp["constituent-cpd-id"][0]["constituent-cpd-id"],
981 )
982 ] = "nsrs:{}:vld.{}".format(nsr_id, vld_index)
983
984 # check at nsd descriptor, if there is an ip-profile
985 vld_params = {}
986 nsd_vlp = find_in_list(
987 get_virtual_link_profiles(nsd),
988 lambda a_link_profile: a_link_profile["virtual-link-desc-id"]
989 == vld["id"],
990 )
991 if (
992 nsd_vlp
993 and nsd_vlp.get("virtual-link-protocol-data")
994 and nsd_vlp["virtual-link-protocol-data"].get("l3-protocol-data")
995 ):
996 ip_profile_source_data = nsd_vlp["virtual-link-protocol-data"][
997 "l3-protocol-data"
998 ]
999 ip_profile_dest_data = {}
1000 if "ip-version" in ip_profile_source_data:
1001 ip_profile_dest_data["ip-version"] = ip_profile_source_data[
1002 "ip-version"
1003 ]
1004 if "cidr" in ip_profile_source_data:
1005 ip_profile_dest_data["subnet-address"] = ip_profile_source_data[
1006 "cidr"
1007 ]
1008 if "gateway-ip" in ip_profile_source_data:
1009 ip_profile_dest_data["gateway-address"] = ip_profile_source_data[
1010 "gateway-ip"
1011 ]
1012 if "dhcp-enabled" in ip_profile_source_data:
1013 ip_profile_dest_data["dhcp-params"] = {
1014 "enabled": ip_profile_source_data["dhcp-enabled"]
1015 }
1016 vld_params["ip-profile"] = ip_profile_dest_data
1017
1018 # update vld_params with instantiation params
1019 vld_instantiation_params = find_in_list(
1020 get_iterable(ns_params, "vld"),
1021 lambda a_vld: a_vld["name"] in (vld["name"], vld["id"]),
1022 )
1023 if vld_instantiation_params:
1024 vld_params.update(vld_instantiation_params)
1025 parse_vld_instantiation_params(target_vim, target_vld, vld_params, None)
1026 target["ns"]["vld"].append(target_vld)
1027 # Update the target ns_vld if the vnf vim_account is overridden by instantiation params
1028 update_ns_vld_target(target, ns_params)
1029
1030 for vnfr in db_vnfrs.values():
1031 vnfd = find_in_list(
1032 db_vnfds, lambda db_vnf: db_vnf["id"] == vnfr["vnfd-ref"]
1033 )
1034 vnf_params = find_in_list(
1035 get_iterable(ns_params, "vnf"),
1036 lambda a_vnf: a_vnf["member-vnf-index"] == vnfr["member-vnf-index-ref"],
1037 )
1038 target_vnf = deepcopy(vnfr)
1039 target_vim = "vim:{}".format(vnfr["vim-account-id"])
1040 for vld in target_vnf.get("vld", ()):
1041 # check if connected to a ns.vld, to fill target
1042 vnf_cp = find_in_list(
1043 vnfd.get("int-virtual-link-desc", ()),
1044 lambda cpd: cpd.get("id") == vld["id"],
1045 )
1046 if vnf_cp:
1047 ns_cp = "member_vnf:{}.{}".format(
1048 vnfr["member-vnf-index-ref"], vnf_cp["id"]
1049 )
1050 if cp2target.get(ns_cp):
1051 vld["target"] = cp2target[ns_cp]
1052
1053 vld["vim_info"] = {
1054 target_vim: {"vim_network_name": vld.get("vim-network-name")}
1055 }
1056 # check if this network needs SDN assist
1057 target_sdn = None
1058 if vld.get("pci-interfaces"):
1059 db_vim = get_vim_account(vnfr["vim-account-id"])
1060 sdnc_id = db_vim["config"].get("sdn-controller")
1061 if sdnc_id:
1062 sdn_vld = "vnfrs:{}:vld.{}".format(target_vnf["_id"], vld["id"])
1063 target_sdn = "sdn:{}".format(sdnc_id)
1064 vld["vim_info"][target_sdn] = {
1065 "sdn": True,
1066 "target_vim": target_vim,
1067 "vlds": [sdn_vld],
1068 "type": vld.get("type"),
1069 }
1070
1071 # check at vnfd descriptor, if there is an ip-profile
1072 vld_params = {}
1073 vnfd_vlp = find_in_list(
1074 get_virtual_link_profiles(vnfd),
1075 lambda a_link_profile: a_link_profile["id"] == vld["id"],
1076 )
1077 if (
1078 vnfd_vlp
1079 and vnfd_vlp.get("virtual-link-protocol-data")
1080 and vnfd_vlp["virtual-link-protocol-data"].get("l3-protocol-data")
1081 ):
1082 ip_profile_source_data = vnfd_vlp["virtual-link-protocol-data"][
1083 "l3-protocol-data"
1084 ]
1085 ip_profile_dest_data = {}
1086 if "ip-version" in ip_profile_source_data:
1087 ip_profile_dest_data["ip-version"] = ip_profile_source_data[
1088 "ip-version"
1089 ]
1090 if "cidr" in ip_profile_source_data:
1091 ip_profile_dest_data["subnet-address"] = ip_profile_source_data[
1092 "cidr"
1093 ]
1094 if "gateway-ip" in ip_profile_source_data:
1095 ip_profile_dest_data[
1096 "gateway-address"
1097 ] = ip_profile_source_data["gateway-ip"]
1098 if "dhcp-enabled" in ip_profile_source_data:
1099 ip_profile_dest_data["dhcp-params"] = {
1100 "enabled": ip_profile_source_data["dhcp-enabled"]
1101 }
1102
1103 vld_params["ip-profile"] = ip_profile_dest_data
1104 # update vld_params with instantiation params
1105 if vnf_params:
1106 vld_instantiation_params = find_in_list(
1107 get_iterable(vnf_params, "internal-vld"),
1108 lambda i_vld: i_vld["name"] == vld["id"],
1109 )
1110 if vld_instantiation_params:
1111 vld_params.update(vld_instantiation_params)
1112 parse_vld_instantiation_params(target_vim, vld, vld_params, target_sdn)
1113
1114 vdur_list = []
1115 for vdur in target_vnf.get("vdur", ()):
1116 if vdur.get("status") == "DELETING" or vdur.get("pdu-type"):
1117 continue # This vdu must not be created
1118 vdur["vim_info"] = {"vim_account_id": vnfr["vim-account-id"]}
1119
1120 self.logger.debug("NS > ssh_keys > {}".format(ssh_keys_all))
1121
1122 if ssh_keys_all:
1123 vdu_configuration = get_configuration(vnfd, vdur["vdu-id-ref"])
1124 vnf_configuration = get_configuration(vnfd, vnfd["id"])
1125 if (
1126 vdu_configuration
1127 and vdu_configuration.get("config-access")
1128 and vdu_configuration.get("config-access").get("ssh-access")
1129 ):
1130 vdur["ssh-keys"] = ssh_keys_all
1131 vdur["ssh-access-required"] = vdu_configuration[
1132 "config-access"
1133 ]["ssh-access"]["required"]
1134 elif (
1135 vnf_configuration
1136 and vnf_configuration.get("config-access")
1137 and vnf_configuration.get("config-access").get("ssh-access")
1138 and any(iface.get("mgmt-vnf") for iface in vdur["interfaces"])
1139 ):
1140 vdur["ssh-keys"] = ssh_keys_all
1141 vdur["ssh-access-required"] = vnf_configuration[
1142 "config-access"
1143 ]["ssh-access"]["required"]
1144 elif ssh_keys_instantiation and find_in_list(
1145 vdur["interfaces"], lambda iface: iface.get("mgmt-vnf")
1146 ):
1147 vdur["ssh-keys"] = ssh_keys_instantiation
1148
1149 self.logger.debug("NS > vdur > {}".format(vdur))
1150
1151 vdud = get_vdu(vnfd, vdur["vdu-id-ref"])
1152 # cloud-init
1153 if vdud.get("cloud-init-file"):
1154 vdur["cloud-init"] = "{}:file:{}".format(
1155 vnfd["_id"], vdud.get("cloud-init-file")
1156 )
1157 # read the file and put its content at target.cloud_init_content, so that ng_ro does not need the shared package system
1158 if vdur["cloud-init"] not in target["cloud_init_content"]:
1159 base_folder = vnfd["_admin"]["storage"]
1160 if base_folder["pkg-dir"]:
1161 cloud_init_file = "{}/{}/cloud_init/{}".format(
1162 base_folder["folder"],
1163 base_folder["pkg-dir"],
1164 vdud.get("cloud-init-file"),
1165 )
1166 else:
1167 cloud_init_file = "{}/Scripts/cloud_init/{}".format(
1168 base_folder["folder"],
1169 vdud.get("cloud-init-file"),
1170 )
1171 with self.fs.file_open(cloud_init_file, "r") as ci_file:
1172 target["cloud_init_content"][
1173 vdur["cloud-init"]
1174 ] = ci_file.read()
1175 elif vdud.get("cloud-init"):
1176 vdur["cloud-init"] = "{}:vdu:{}".format(
1177 vnfd["_id"], get_vdu_index(vnfd, vdur["vdu-id-ref"])
1178 )
1179 # put content at target.cloud_init_content, so that ng_ro does not need to read the vnfd descriptor
1180 target["cloud_init_content"][vdur["cloud-init"]] = vdud[
1181 "cloud-init"
1182 ]
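# cloud-init content is keyed as "<vnfd-id>:file:<filename>" or "<vnfd-id>:vdu:<vdu-index>",
# matching the vdur["cloud-init"] reference built above.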
1183 vdur["additionalParams"] = vdur.get("additionalParams") or {}
1184 deploy_params_vdu = self._format_additional_params(
1185 vdur.get("additionalParams") or {}
1186 )
1187 deploy_params_vdu["OSM"] = get_osm_params(
1188 vnfr, vdur["vdu-id-ref"], vdur["count-index"]
1189 )
1190 vdur["additionalParams"] = deploy_params_vdu
1191
1192 # flavor
1193 ns_flavor = target["flavor"][int(vdur["ns-flavor-id"])]
1194 if target_vim not in ns_flavor["vim_info"]:
1195 ns_flavor["vim_info"][target_vim] = {}
1196
1197 # deal with images
1198 # in case alternative images are provided, check whether one of them should be applied
1199 # for this vim_type; if so, use that alternative image id instead
1200 ns_image_id = int(vdur["ns-image-id"])
1201 if vdur.get("alt-image-ids"):
1202 db_vim = get_vim_account(vnfr["vim-account-id"])
1203 vim_type = db_vim["vim_type"]
1204 for alt_image_id in vdur.get("alt-image-ids"):
1205 ns_alt_image = target["image"][int(alt_image_id)]
1206 if vim_type == ns_alt_image.get("vim-type"):
1207 # must use alternative image
1208 self.logger.debug(
1209 "use alternative image id: {}".format(alt_image_id)
1210 )
1211 ns_image_id = alt_image_id
1212 vdur["ns-image-id"] = ns_image_id
1213 break
1214 ns_image = target["image"][int(ns_image_id)]
1215 if target_vim not in ns_image["vim_info"]:
1216 ns_image["vim_info"][target_vim] = {}
1217
1218 # Affinity groups
1219 if vdur.get("affinity-or-anti-affinity-group-id"):
1220 for ags_id in vdur["affinity-or-anti-affinity-group-id"]:
1221 ns_ags = target["affinity-or-anti-affinity-group"][int(ags_id)]
1222 if target_vim not in ns_ags["vim_info"]:
1223 ns_ags["vim_info"][target_vim] = {}
1224
1225 vdur["vim_info"] = {target_vim: {}}
1226 # instantiation parameters
1227 if vnf_params:
1228 vdu_instantiation_params = find_in_list(
1229 get_iterable(vnf_params, "vdu"),
1230 lambda i_vdu: i_vdu["id"] == vdud["id"],
1231 )
1232 if vdu_instantiation_params:
1233 # Parse the vdu_volumes from the instantiation params
1234 vdu_volumes = get_volumes_from_instantiation_params(
1235 vdu_instantiation_params, vdud
1236 )
1237 vdur["additionalParams"]["OSM"]["vdu_volumes"] = vdu_volumes
1238 vdur_list.append(vdur)
1239 target_vnf["vdur"] = vdur_list
1240 target["vnf"].append(target_vnf)
1241
1242 desc = await self.RO.deploy(nsr_id, target)
1243 self.logger.debug("RO return > {}".format(desc))
1244 action_id = desc["action_id"]
1245 await self._wait_ng_ro(
1246 nsr_id, action_id, nslcmop_id, start_deploy, timeout_ns_deploy, stage
1247 )
1248
1249 # Updating NSR
1250 db_nsr_update = {
1251 "_admin.deployed.RO.operational-status": "running",
1252 "detailed-status": " ".join(stage),
1253 }
1254 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1255 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1256 self._write_op_status(nslcmop_id, stage)
1257 self.logger.debug(
1258 logging_text + "ns deployed at RO. RO_id={}".format(action_id)
1259 )
1260 return
1261
1262 async def _wait_ng_ro(
1263 self,
1264 nsr_id,
1265 action_id,
1266 nslcmop_id=None,
1267 start_time=None,
1268 timeout=600,
1269 stage=None,
1270 ):
1271 detailed_status_old = None
1272 db_nsr_update = {}
1273 start_time = start_time or time()
1274 while time() <= start_time + timeout:
1275 desc_status = await self.RO.status(nsr_id, action_id)
1276 self.logger.debug("Wait NG RO > {}".format(desc_status))
1277 if desc_status["status"] == "FAILED":
1278 raise NgRoException(desc_status["details"])
1279 elif desc_status["status"] == "BUILD":
1280 if stage:
1281 stage[2] = "VIM: ({})".format(desc_status["details"])
1282 elif desc_status["status"] == "DONE":
1283 if stage:
1284 stage[2] = "Deployed at VIM"
1285 break
1286 else:
1287 assert False, "RO.status returns unknown {}".format(
1288 desc_status["status"]
1289 )
1290 if stage and nslcmop_id and stage[2] != detailed_status_old:
1291 detailed_status_old = stage[2]
1292 db_nsr_update["detailed-status"] = " ".join(stage)
1293 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1294 self._write_op_status(nslcmop_id, stage)
1295 await asyncio.sleep(15, loop=self.loop)
1296 else: # timeout_ns_deploy
1297 raise NgRoException("Timeout waiting ns to deploy")
1298
1299 async def _terminate_ng_ro(
1300 self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
1301 ):
1302 db_nsr_update = {}
1303 failed_detail = []
1304 action_id = None
1305 start_deploy = time()
1306 try:
1307 target = {
1308 "ns": {"vld": []},
1309 "vnf": [],
1310 "image": [],
1311 "flavor": [],
1312 "action_id": nslcmop_id,
1313 }
1314 desc = await self.RO.deploy(nsr_id, target)
1315 action_id = desc["action_id"]
1316 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = action_id
1317 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETING"
1318 self.logger.debug(
1319 logging_text
1320 + "ns terminate action at RO. action_id={}".format(action_id)
1321 )
1322
1323 # wait until done
1324 delete_timeout = 20 * 60 # 20 minutes
1325 await self._wait_ng_ro(
1326 nsr_id, action_id, nslcmop_id, start_deploy, delete_timeout, stage
1327 )
1328
1329 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
1330 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
1331 # delete all nsr
1332 await self.RO.delete(nsr_id)
1333 except Exception as e:
1334 if isinstance(e, NgRoException) and e.http_code == 404: # not found
1335 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
1336 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
1337 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
1338 self.logger.debug(
1339 logging_text + "RO_action_id={} already deleted".format(action_id)
1340 )
1341 elif isinstance(e, NgRoException) and e.http_code == 409: # conflict
1342 failed_detail.append("delete conflict: {}".format(e))
1343 self.logger.debug(
1344 logging_text
1345 + "RO_action_id={} delete conflict: {}".format(action_id, e)
1346 )
1347 else:
1348 failed_detail.append("delete error: {}".format(e))
1349 self.logger.error(
1350 logging_text
1351 + "RO_action_id={} delete error: {}".format(action_id, e)
1352 )
1353
1354 if failed_detail:
1355 stage[2] = "Error deleting from VIM"
1356 else:
1357 stage[2] = "Deleted from VIM"
1358 db_nsr_update["detailed-status"] = " ".join(stage)
1359 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1360 self._write_op_status(nslcmop_id, stage)
1361
1362 if failed_detail:
1363 raise LcmException("; ".join(failed_detail))
1364 return
1365
1366 async def instantiate_RO(
1367 self,
1368 logging_text,
1369 nsr_id,
1370 nsd,
1371 db_nsr,
1372 db_nslcmop,
1373 db_vnfrs,
1374 db_vnfds,
1375 n2vc_key_list,
1376 stage,
1377 ):
1378 """
1379 Instantiate at RO
1380 :param logging_text: prefix text to use for logging
1381 :param nsr_id: nsr identity
1382 :param nsd: database content of ns descriptor
1383 :param db_nsr: database content of ns record
1384 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
1385 :param db_vnfrs:
1386 :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
1387 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
1388 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
1389 :return: None or exception
1390 """
1391 try:
1392 start_deploy = time()
1393 ns_params = db_nslcmop.get("operationParams")
1394 if ns_params and ns_params.get("timeout_ns_deploy"):
1395 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
1396 else:
1397 timeout_ns_deploy = self.timeout.get(
1398 "ns_deploy", self.timeout_ns_deploy
1399 )
1400
1401 # Check for and optionally request placement optimization. Database will be updated if placement activated
1402 stage[2] = "Waiting for Placement."
1403 if await self._do_placement(logging_text, db_nslcmop, db_vnfrs):
1404 # in case of placement, change ns_params["vimAccountId"] if it is not present in any vnfr
1405 for vnfr in db_vnfrs.values():
1406 if ns_params["vimAccountId"] == vnfr["vim-account-id"]:
1407 break
1408 else:
1409 ns_params["vimAccountId"] == vnfr["vim-account-id"]
1410
1411 return await self._instantiate_ng_ro(
1412 logging_text,
1413 nsr_id,
1414 nsd,
1415 db_nsr,
1416 db_nslcmop,
1417 db_vnfrs,
1418 db_vnfds,
1419 n2vc_key_list,
1420 stage,
1421 start_deploy,
1422 timeout_ns_deploy,
1423 )
1424 except Exception as e:
1425 stage[2] = "ERROR deploying at VIM"
1426 self.set_vnfr_at_error(db_vnfrs, str(e))
1427 self.logger.error(
1428 "Error deploying at VIM {}".format(e),
1429 exc_info=not isinstance(
1430 e,
1431 (
1432 ROclient.ROClientException,
1433 LcmException,
1434 DbException,
1435 NgRoException,
1436 ),
1437 ),
1438 )
1439 raise
1440
1441 async def wait_kdu_up(self, logging_text, nsr_id, vnfr_id, kdu_name):
1442 """
1443 Wait for kdu to be up, get ip address
1444 :param logging_text: prefix use for logging
1445 :param nsr_id:
1446 :param vnfr_id:
1447 :param kdu_name:
1448 :return: IP address, K8s services
1449 """
1450
1451 # self.logger.debug(logging_text + "Starting wait_kdu_up")
1452 nb_tries = 0
1453
1454 while nb_tries < 360:
1455 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
1456 kdur = next(
1457 (
1458 x
1459 for x in get_iterable(db_vnfr, "kdur")
1460 if x.get("kdu-name") == kdu_name
1461 ),
1462 None,
1463 )
1464 if not kdur:
1465 raise LcmException(
1466 "Not found vnfr_id={}, kdu_name={}".format(vnfr_id, kdu_name)
1467 )
1468 if kdur.get("status"):
1469 if kdur["status"] in ("READY", "ENABLED"):
1470 return kdur.get("ip-address"), kdur.get("services")
1471 else:
1472 raise LcmException(
1473 "target KDU={} is in error state".format(kdu_name)
1474 )
1475
1476 await asyncio.sleep(10, loop=self.loop)
1477 nb_tries += 1
1478 raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name))
1479
1480 async def wait_vm_up_insert_key_ro(
1481 self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None
1482 ):
1483 """
1484 Wait for ip address at RO and, optionally, insert a public key in the virtual machine
1485 :param logging_text: prefix use for logging
1486 :param nsr_id:
1487 :param vnfr_id:
1488 :param vdu_id:
1489 :param vdu_index:
1490 :param pub_key: public ssh key to inject, None to skip
1491 :param user: user to apply the public ssh key
1492 :return: IP address
1493 """
1494
1495 self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
1496 ro_nsr_id = None
1497 ip_address = None
1498 nb_tries = 0
1499 target_vdu_id = None
1500 ro_retries = 0
1501
1502 while True:
1503
1504 ro_retries += 1
1505 if ro_retries >= 360: # 1 hour
1506 raise LcmException(
1507 "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id)
1508 )
1509
1510 await asyncio.sleep(10, loop=self.loop)
1511
1512 # get ip address
1513 if not target_vdu_id:
1514 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
1515
1516 if not vdu_id: # for the VNF case
1517 if db_vnfr.get("status") == "ERROR":
1518 raise LcmException(
1519 "Cannot inject ssh-key because target VNF is in error state"
1520 )
1521 ip_address = db_vnfr.get("ip-address")
1522 if not ip_address:
1523 continue
1524 vdur = next(
1525 (
1526 x
1527 for x in get_iterable(db_vnfr, "vdur")
1528 if x.get("ip-address") == ip_address
1529 ),
1530 None,
1531 )
1532 else: # VDU case
1533 vdur = next(
1534 (
1535 x
1536 for x in get_iterable(db_vnfr, "vdur")
1537 if x.get("vdu-id-ref") == vdu_id
1538 and x.get("count-index") == vdu_index
1539 ),
1540 None,
1541 )
1542
1543 if (
1544 not vdur and len(db_vnfr.get("vdur", ())) == 1
1545 ): # If only one, this should be the target vdu
1546 vdur = db_vnfr["vdur"][0]
1547 if not vdur:
1548 raise LcmException(
1549 "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
1550 vnfr_id, vdu_id, vdu_index
1551 )
1552 )
1553 # New generation RO stores information at "vim_info"
1554 ng_ro_status = None
1555 target_vim = None
1556 if vdur.get("vim_info"):
1557 target_vim = next(
1558 t for t in vdur["vim_info"]
1559 ) # there should be only one key
1560 ng_ro_status = vdur["vim_info"][target_vim].get("vim_status")
1561 if (
1562 vdur.get("pdu-type")
1563 or vdur.get("status") == "ACTIVE"
1564 or ng_ro_status == "ACTIVE"
1565 ):
1566 ip_address = vdur.get("ip-address")
1567 if not ip_address:
1568 continue
1569 target_vdu_id = vdur["vdu-id-ref"]
1570 elif vdur.get("status") == "ERROR" or ng_ro_status == "ERROR":
1571 raise LcmException(
1572 "Cannot inject ssh-key because target VM is in error state"
1573 )
1574
1575 if not target_vdu_id:
1576 continue
1577
1578 # inject public key into machine
1579 if pub_key and user:
1580 self.logger.debug(logging_text + "Inserting RO key")
1581 self.logger.debug("SSH > PubKey > {}".format(pub_key))
1582 if vdur.get("pdu-type"):
1583 self.logger.error(logging_text + "Cannot inject ssh-key into a PDU")
1584 return ip_address
1585 try:
1586 ro_vm_id = "{}-{}".format(
1587 db_vnfr["member-vnf-index-ref"], target_vdu_id
1588 ) # TODO add vdu_index
1589 if self.ng_ro:
1590 target = {
1591 "action": {
1592 "action": "inject_ssh_key",
1593 "key": pub_key,
1594 "user": user,
1595 },
1596 "vnf": [{"_id": vnfr_id, "vdur": [{"id": vdur["id"]}]}],
1597 }
1598 desc = await self.RO.deploy(nsr_id, target)
1599 action_id = desc["action_id"]
1600 await self._wait_ng_ro(nsr_id, action_id, timeout=600)
1601 break
1602 else:
1603 # wait until NS is deployed at RO
1604 if not ro_nsr_id:
1605 db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
1606 ro_nsr_id = deep_get(
1607 db_nsrs, ("_admin", "deployed", "RO", "nsr_id")
1608 )
1609 if not ro_nsr_id:
1610 continue
1611 result_dict = await self.RO.create_action(
1612 item="ns",
1613 item_id_name=ro_nsr_id,
1614 descriptor={
1615 "add_public_key": pub_key,
1616 "vms": [ro_vm_id],
1617 "user": user,
1618 },
1619 )
1620 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
1621 if not result_dict or not isinstance(result_dict, dict):
1622 raise LcmException(
1623 "Unknown response from RO when injecting key"
1624 )
1625 for result in result_dict.values():
1626 if result.get("vim_result") == 200:
1627 break
1628 else:
1629 raise ROclient.ROClientException(
1630 "error injecting key: {}".format(
1631 result.get("description")
1632 )
1633 )
1634 break
1635 except NgRoException as e:
1636 raise LcmException(
1637 "Reaching max tries injecting key. Error: {}".format(e)
1638 )
1639 except ROclient.ROClientException as e:
1640 if not nb_tries:
1641 self.logger.debug(
1642 logging_text
1643 + "error injecting key: {}. Retrying until {} seconds".format(
1644 e, 20 * 10
1645 )
1646 )
1647 nb_tries += 1
1648 if nb_tries >= 20:
1649 raise LcmException(
1650 "Reaching max tries injecting key. Error: {}".format(e)
1651 )
1652 else:
1653 break
1654
1655 return ip_address
1656
1657 async def _wait_dependent_n2vc(self, nsr_id, vca_deployed_list, vca_index):
1658 """
1659 Wait until dependent VCA deployments have finished. NS waits for VNFs and VDUs; VNFs wait for VDUs
1660 """
1661 my_vca = vca_deployed_list[vca_index]
1662 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
1663 # vdu or kdu: no dependencies
1664 return
1665 timeout = 300
1666 while timeout >= 0:
1667 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1668 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1669 configuration_status_list = db_nsr["configurationStatus"]
1670 for index, vca_deployed in enumerate(configuration_status_list):
1671 if index == vca_index:
1672 # myself
1673 continue
1674 if not my_vca.get("member-vnf-index") or (
1675 vca_deployed.get("member-vnf-index")
1676 == my_vca.get("member-vnf-index")
1677 ):
1678 internal_status = configuration_status_list[index].get("status")
1679 if internal_status == "READY":
1680 continue
1681 elif internal_status == "BROKEN":
1682 raise LcmException(
1683 "Configuration aborted because dependent charm/s has failed"
1684 )
1685 else:
1686 break
1687 else:
1688 # no dependencies, return
1689 return
1690 await asyncio.sleep(10)
1691 timeout -= 1
1692
1693 raise LcmException("Configuration aborted because dependent charm/s timeout")
1694
1695 def get_vca_id(self, db_vnfr: dict, db_nsr: dict):
1696 vca_id = None
1697 if db_vnfr:
1698 vca_id = deep_get(db_vnfr, ("vca-id",))
1699 elif db_nsr:
1700 vim_account_id = deep_get(db_nsr, ("instantiate_params", "vimAccountId"))
1701 vca_id = VimAccountDB.get_vim_account_with_id(vim_account_id).get("vca")
1702 return vca_id
1703
1704 async def instantiate_N2VC(
1705 self,
1706 logging_text,
1707 vca_index,
1708 nsi_id,
1709 db_nsr,
1710 db_vnfr,
1711 vdu_id,
1712 kdu_name,
1713 vdu_index,
1714 config_descriptor,
1715 deploy_params,
1716 base_folder,
1717 nslcmop_id,
1718 stage,
1719 vca_type,
1720 vca_name,
1721 ee_config_descriptor,
1722 ):
1723 nsr_id = db_nsr["_id"]
1724 db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
1725 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1726 vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
1727 osm_config = {"osm": {"ns_id": db_nsr["_id"]}}
1728 db_dict = {
1729 "collection": "nsrs",
1730 "filter": {"_id": nsr_id},
1731 "path": db_update_entry,
1732 }
1733 step = ""
1734 try:
1735
1736 element_type = "NS"
1737 element_under_configuration = nsr_id
1738
1739 vnfr_id = None
1740 if db_vnfr:
1741 vnfr_id = db_vnfr["_id"]
1742 osm_config["osm"]["vnf_id"] = vnfr_id
1743
1744 namespace = "{nsi}.{ns}".format(nsi=nsi_id if nsi_id else "", ns=nsr_id)
1745
1746 if vca_type == "native_charm":
1747 index_number = 0
1748 else:
1749 index_number = vdu_index or 0
1750
1751 if vnfr_id:
1752 element_type = "VNF"
1753 element_under_configuration = vnfr_id
1754 namespace += ".{}-{}".format(vnfr_id, index_number)
1755 if vdu_id:
1756 namespace += ".{}-{}".format(vdu_id, index_number)
1757 element_type = "VDU"
1758 element_under_configuration = "{}-{}".format(vdu_id, index_number)
1759 osm_config["osm"]["vdu_id"] = vdu_id
1760 elif kdu_name:
1761 namespace += ".{}".format(kdu_name)
1762 element_type = "KDU"
1763 element_under_configuration = kdu_name
1764 osm_config["osm"]["kdu_name"] = kdu_name
1765
1766 # Get artifact path
1767 if base_folder["pkg-dir"]:
1768 artifact_path = "{}/{}/{}/{}".format(
1769 base_folder["folder"],
1770 base_folder["pkg-dir"],
1771 "charms"
1772 if vca_type
1773 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1774 else "helm-charts",
1775 vca_name,
1776 )
1777 else:
1778 artifact_path = "{}/Scripts/{}/{}/".format(
1779 base_folder["folder"],
1780 "charms"
1781 if vca_type
1782 in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1783 else "helm-charts",
1784 vca_name,
1785 )
1786
1787 self.logger.debug("Artifact path > {}".format(artifact_path))
1788
1789 # get initial_config_primitive_list that applies to this element
1790 initial_config_primitive_list = config_descriptor.get(
1791 "initial-config-primitive"
1792 )
1793
1794 self.logger.debug(
1795 "Initial config primitive list > {}".format(
1796 initial_config_primitive_list
1797 )
1798 )
1799
1800 # add config if not present for NS charm
1801 ee_descriptor_id = ee_config_descriptor.get("id")
1802 self.logger.debug("EE Descriptor > {}".format(ee_descriptor_id))
1803 initial_config_primitive_list = get_ee_sorted_initial_config_primitive_list(
1804 initial_config_primitive_list, vca_deployed, ee_descriptor_id
1805 )
1806
1807 self.logger.debug(
1808 "Initial config primitive list #2 > {}".format(
1809 initial_config_primitive_list
1810 )
1811 )
1812 # n2vc_redesign STEP 3.1
1813 # find old ee_id if exists
1814 ee_id = vca_deployed.get("ee_id")
1815
1816 vca_id = self.get_vca_id(db_vnfr, db_nsr)
1817 # create or register execution environment in VCA
1818 if vca_type in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1819
1820 self._write_configuration_status(
1821 nsr_id=nsr_id,
1822 vca_index=vca_index,
1823 status="CREATING",
1824 element_under_configuration=element_under_configuration,
1825 element_type=element_type,
1826 )
1827
1828 step = "create execution environment"
1829 self.logger.debug(logging_text + step)
1830
1831 ee_id = None
1832 credentials = None
1833 if vca_type == "k8s_proxy_charm":
1834 ee_id = await self.vca_map[vca_type].install_k8s_proxy_charm(
1835 charm_name=artifact_path[artifact_path.rfind("/") + 1 :],
1836 namespace=namespace,
1837 artifact_path=artifact_path,
1838 db_dict=db_dict,
1839 vca_id=vca_id,
1840 )
1841 elif vca_type == "helm" or vca_type == "helm-v3":
1842 ee_id, credentials = await self.vca_map[
1843 vca_type
1844 ].create_execution_environment(
1845 namespace=namespace,
1846 reuse_ee_id=ee_id,
1847 db_dict=db_dict,
1848 config=osm_config,
1849 artifact_path=artifact_path,
1850 vca_type=vca_type,
1851 )
1852 else:
1853 ee_id, credentials = await self.vca_map[
1854 vca_type
1855 ].create_execution_environment(
1856 namespace=namespace,
1857 reuse_ee_id=ee_id,
1858 db_dict=db_dict,
1859 vca_id=vca_id,
1860 )
1861
1862 elif vca_type == "native_charm":
1863                 step = "Waiting for the VM to be up and getting its IP address"
1864 self.logger.debug(logging_text + step)
1865 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
1866 logging_text,
1867 nsr_id,
1868 vnfr_id,
1869 vdu_id,
1870 vdu_index,
1871 user=None,
1872 pub_key=None,
1873 )
1874 credentials = {"hostname": rw_mgmt_ip}
1875 # get username
1876 username = deep_get(
1877 config_descriptor, ("config-access", "ssh-access", "default-user")
1878 )
1879                 # TODO remove this when the IM changes regarding config-access:ssh-access:default-user are
1880                 # merged. Meanwhile, get the username from the initial-config-primitive
1881 if not username and initial_config_primitive_list:
1882 for config_primitive in initial_config_primitive_list:
1883 for param in config_primitive.get("parameter", ()):
1884 if param["name"] == "ssh-username":
1885 username = param["value"]
1886 break
1887 if not username:
1888 raise LcmException(
1889                             "Cannot determine the username either with 'initial-config-primitive' or with "
1890 "'config-access.ssh-access.default-user'"
1891 )
1892 credentials["username"] = username
1893 # n2vc_redesign STEP 3.2
1894
1895 self._write_configuration_status(
1896 nsr_id=nsr_id,
1897 vca_index=vca_index,
1898 status="REGISTERING",
1899 element_under_configuration=element_under_configuration,
1900 element_type=element_type,
1901 )
1902
1903 step = "register execution environment {}".format(credentials)
1904 self.logger.debug(logging_text + step)
1905 ee_id = await self.vca_map[vca_type].register_execution_environment(
1906 credentials=credentials,
1907 namespace=namespace,
1908 db_dict=db_dict,
1909 vca_id=vca_id,
1910 )
1911
1912             # for compatibility with MON/POL modules, they need the model and application name in the database
1913             # TODO ask MON/POL whether it is still needed to assume the format "model_name.application_name"
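             # ee_id is typically of the form "<model_name>.<application_name>.<machine_id>"
             # (illustrative); only the first two parts are stored below.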
1914 ee_id_parts = ee_id.split(".")
1915 db_nsr_update = {db_update_entry + "ee_id": ee_id}
1916 if len(ee_id_parts) >= 2:
1917 model_name = ee_id_parts[0]
1918 application_name = ee_id_parts[1]
1919 db_nsr_update[db_update_entry + "model"] = model_name
1920 db_nsr_update[db_update_entry + "application"] = application_name
1921
1922 # n2vc_redesign STEP 3.3
1923 step = "Install configuration Software"
1924
1925 self._write_configuration_status(
1926 nsr_id=nsr_id,
1927 vca_index=vca_index,
1928 status="INSTALLING SW",
1929 element_under_configuration=element_under_configuration,
1930 element_type=element_type,
1931 other_update=db_nsr_update,
1932 )
1933
1934 # TODO check if already done
1935 self.logger.debug(logging_text + step)
1936 config = None
1937 if vca_type == "native_charm":
1938 config_primitive = next(
1939 (p for p in initial_config_primitive_list if p["name"] == "config"),
1940 None,
1941 )
1942 if config_primitive:
1943 config = self._map_primitive_params(
1944 config_primitive, {}, deploy_params
1945 )
1946 num_units = 1
1947 if vca_type == "lxc_proxy_charm":
1948 if element_type == "NS":
1949 num_units = db_nsr.get("config-units") or 1
1950 elif element_type == "VNF":
1951 num_units = db_vnfr.get("config-units") or 1
1952 elif element_type == "VDU":
1953 for v in db_vnfr["vdur"]:
1954 if vdu_id == v["vdu-id-ref"]:
1955 num_units = v.get("config-units") or 1
1956 break
1957 if vca_type != "k8s_proxy_charm":
1958 await self.vca_map[vca_type].install_configuration_sw(
1959 ee_id=ee_id,
1960 artifact_path=artifact_path,
1961 db_dict=db_dict,
1962 config=config,
1963 num_units=num_units,
1964 vca_id=vca_id,
1965 vca_type=vca_type,
1966 )
1967
1968 # write in db flag of configuration_sw already installed
1969 self.update_db_2(
1970 "nsrs", nsr_id, {db_update_entry + "config_sw_installed": True}
1971 )
1972
1973 # add relations for this VCA (wait for other peers related with this VCA)
1974 await self._add_vca_relations(
1975 logging_text=logging_text,
1976 nsr_id=nsr_id,
1977 vca_type=vca_type,
1978 vca_index=vca_index,
1979 )
1980
1981             # if SSH access is required, then get the execution environment SSH public key
1982             # (for native charms we have already waited for the VM to be up)
1983 if vca_type in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
1984 pub_key = None
1985 user = None
1986 # self.logger.debug("get ssh key block")
1987 if deep_get(
1988 config_descriptor, ("config-access", "ssh-access", "required")
1989 ):
1990 # self.logger.debug("ssh key needed")
1991 # Needed to inject a ssh key
1992 user = deep_get(
1993 config_descriptor,
1994 ("config-access", "ssh-access", "default-user"),
1995 )
1996 step = "Install configuration Software, getting public ssh key"
1997 pub_key = await self.vca_map[vca_type].get_ee_ssh_public__key(
1998 ee_id=ee_id, db_dict=db_dict, vca_id=vca_id
1999 )
2000
2001 step = "Insert public key into VM user={} ssh_key={}".format(
2002 user, pub_key
2003 )
2004 else:
2005 # self.logger.debug("no need to get ssh key")
2006                     step = "Waiting for the VM to be up and getting its IP address"
2007 self.logger.debug(logging_text + step)
2008
2009                 # default rw_mgmt_ip to None, avoiding the variable being undefined
2010 rw_mgmt_ip = None
2011
2012 # n2vc_redesign STEP 5.1
2013 # wait for RO (ip-address) Insert pub_key into VM
2014 if vnfr_id:
2015 if kdu_name:
2016 rw_mgmt_ip, services = await self.wait_kdu_up(
2017 logging_text, nsr_id, vnfr_id, kdu_name
2018 )
2019 vnfd = self.db.get_one(
2020 "vnfds_revisions",
2021 {"_id": f'{db_vnfr["vnfd-id"]}:{db_vnfr["revision"]}'},
2022 )
2023 kdu = get_kdu(vnfd, kdu_name)
2024 kdu_services = [
2025 service["name"] for service in get_kdu_services(kdu)
2026 ]
2027 exposed_services = []
2028 for service in services:
2029 if any(s in service["name"] for s in kdu_services):
2030 exposed_services.append(service)
2031 await self.vca_map[vca_type].exec_primitive(
2032 ee_id=ee_id,
2033 primitive_name="config",
2034 params_dict={
2035 "osm-config": json.dumps(
2036 OsmConfigBuilder(
2037 k8s={"services": exposed_services}
2038 ).build()
2039 )
2040 },
2041 vca_id=vca_id,
2042 )
2043
2044                 # This verification is needed in order to avoid trying to add a public key
2045                 # to a VM when the VNF is a KNF (in the edge case where the user creates a VCA
2046                 # for a KNF and not for its KDUs, the previous verification is False and the code
2047                 # reaches this block, so it must be verified whether the VNF is actually a VNF
2048                 # or a KNF)
2049                 elif db_vnfr.get("vdur"):
2050 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
2051 logging_text,
2052 nsr_id,
2053 vnfr_id,
2054 vdu_id,
2055 vdu_index,
2056 user=user,
2057 pub_key=pub_key,
2058 )
2059
2060 self.logger.debug(logging_text + " VM_ip_address={}".format(rw_mgmt_ip))
2061
2062 # store rw_mgmt_ip in deploy params for later replacement
2063 deploy_params["rw_mgmt_ip"] = rw_mgmt_ip
2064
2065 # n2vc_redesign STEP 6 Execute initial config primitive
2066 step = "execute initial config primitive"
2067
2068 # wait for dependent primitives execution (NS -> VNF -> VDU)
2069 if initial_config_primitive_list:
2070 await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)
2071
2072             # stage, depending on the element type: vdu, kdu, vnf or ns
2073 my_vca = vca_deployed_list[vca_index]
2074 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
2075 # VDU or KDU
2076 stage[0] = "Stage 3/5: running Day-1 primitives for VDU."
2077 elif my_vca.get("member-vnf-index"):
2078 # VNF
2079 stage[0] = "Stage 4/5: running Day-1 primitives for VNF."
2080 else:
2081 # NS
2082 stage[0] = "Stage 5/5: running Day-1 primitives for NS."
2083
2084 self._write_configuration_status(
2085 nsr_id=nsr_id, vca_index=vca_index, status="EXECUTING PRIMITIVE"
2086 )
2087
2088 self._write_op_status(op_id=nslcmop_id, stage=stage)
2089
2090 check_if_terminated_needed = True
2091 for initial_config_primitive in initial_config_primitive_list:
2092                 # adding information to the vca_deployed if it is an NS execution environment
2093 if not vca_deployed["member-vnf-index"]:
2094 deploy_params["ns_config_info"] = json.dumps(
2095 self._get_ns_config_info(nsr_id)
2096 )
2097 # TODO check if already done
2098 primitive_params_ = self._map_primitive_params(
2099 initial_config_primitive, {}, deploy_params
2100 )
2101
2102 step = "execute primitive '{}' params '{}'".format(
2103 initial_config_primitive["name"], primitive_params_
2104 )
2105 self.logger.debug(logging_text + step)
2106 await self.vca_map[vca_type].exec_primitive(
2107 ee_id=ee_id,
2108 primitive_name=initial_config_primitive["name"],
2109 params_dict=primitive_params_,
2110 db_dict=db_dict,
2111 vca_id=vca_id,
2112 vca_type=vca_type,
2113 )
2114                 # Once some primitive has been executed, check and record in the db whether terminate primitives need to be executed
2115 if check_if_terminated_needed:
2116 if config_descriptor.get("terminate-config-primitive"):
2117 self.update_db_2(
2118 "nsrs", nsr_id, {db_update_entry + "needed_terminate": True}
2119 )
2120 check_if_terminated_needed = False
2121
2122 # TODO register in database that primitive is done
2123
2124 # STEP 7 Configure metrics
2125 if vca_type == "helm" or vca_type == "helm-v3":
2126 prometheus_jobs = await self.extract_prometheus_scrape_jobs(
2127 ee_id=ee_id,
2128 artifact_path=artifact_path,
2129 ee_config_descriptor=ee_config_descriptor,
2130 vnfr_id=vnfr_id,
2131 nsr_id=nsr_id,
2132 target_ip=rw_mgmt_ip,
2133 )
2134 if prometheus_jobs:
2135 self.update_db_2(
2136 "nsrs",
2137 nsr_id,
2138 {db_update_entry + "prometheus_jobs": prometheus_jobs},
2139 )
2140
2141 for job in prometheus_jobs:
2142 self.db.set_one(
2143 "prometheus_jobs",
2144 {"job_name": job["job_name"]},
2145 job,
2146 upsert=True,
2147 fail_on_empty=False,
2148 )
2149
2150 step = "instantiated at VCA"
2151 self.logger.debug(logging_text + step)
2152
2153 self._write_configuration_status(
2154 nsr_id=nsr_id, vca_index=vca_index, status="READY"
2155 )
2156
2157 except Exception as e: # TODO not use Exception but N2VC exception
2158 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
2159 if not isinstance(
2160 e, (DbException, N2VCException, LcmException, asyncio.CancelledError)
2161 ):
2162 self.logger.error(
2163 "Exception while {} : {}".format(step, e), exc_info=True
2164 )
2165 self._write_configuration_status(
2166 nsr_id=nsr_id, vca_index=vca_index, status="BROKEN"
2167 )
2168 raise LcmException("{} {}".format(step, e)) from e
2169
2170 def _write_ns_status(
2171 self,
2172 nsr_id: str,
2173 ns_state: str,
2174 current_operation: str,
2175 current_operation_id: str,
2176 error_description: str = None,
2177 error_detail: str = None,
2178 other_update: dict = None,
2179 ):
2180 """
2181 Update db_nsr fields.
2182 :param nsr_id:
2183 :param ns_state:
2184 :param current_operation:
2185 :param current_operation_id:
2186 :param error_description:
2187 :param error_detail:
2188 :param other_update: Other required changes at database if provided, will be cleared
2189 :return:
2190 """
2191 try:
2192 db_dict = other_update or {}
2193 db_dict[
2194 "_admin.nslcmop"
2195 ] = current_operation_id # for backward compatibility
2196 db_dict["_admin.current-operation"] = current_operation_id
2197 db_dict["_admin.operation-type"] = (
2198 current_operation if current_operation != "IDLE" else None
2199 )
2200 db_dict["currentOperation"] = current_operation
2201 db_dict["currentOperationID"] = current_operation_id
2202 db_dict["errorDescription"] = error_description
2203 db_dict["errorDetail"] = error_detail
2204
2205 if ns_state:
2206 db_dict["nsState"] = ns_state
2207 self.update_db_2("nsrs", nsr_id, db_dict)
2208 except DbException as e:
2209 self.logger.warn("Error writing NS status, ns={}: {}".format(nsr_id, e))
2210
2211 def _write_op_status(
2212 self,
2213 op_id: str,
2214 stage: list = None,
2215 error_message: str = None,
2216 queuePosition: int = 0,
2217 operation_state: str = None,
2218 other_update: dict = None,
2219 ):
2220 try:
2221 db_dict = other_update or {}
2222 db_dict["queuePosition"] = queuePosition
2223 if isinstance(stage, list):
2224 db_dict["stage"] = stage[0]
2225 db_dict["detailed-status"] = " ".join(stage)
2226 elif stage is not None:
2227 db_dict["stage"] = str(stage)
2228
2229 if error_message is not None:
2230 db_dict["errorMessage"] = error_message
2231 if operation_state is not None:
2232 db_dict["operationState"] = operation_state
2233 db_dict["statusEnteredTime"] = time()
2234 self.update_db_2("nslcmops", op_id, db_dict)
2235 except DbException as e:
2236 self.logger.warn(
2237 "Error writing OPERATION status for op_id: {} -> {}".format(op_id, e)
2238 )
2239
2240 def _write_all_config_status(self, db_nsr: dict, status: str):
2241 try:
2242 nsr_id = db_nsr["_id"]
2243 # configurationStatus
2244 config_status = db_nsr.get("configurationStatus")
2245 if config_status:
2246 db_nsr_update = {
2247 "configurationStatus.{}.status".format(index): status
2248 for index, v in enumerate(config_status)
2249 if v
2250 }
2251 # update status
2252 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2253
2254 except DbException as e:
2255 self.logger.warn(
2256 "Error writing all configuration status, ns={}: {}".format(nsr_id, e)
2257 )
2258
2259 def _write_configuration_status(
2260 self,
2261 nsr_id: str,
2262 vca_index: int,
2263 status: str = None,
2264 element_under_configuration: str = None,
2265 element_type: str = None,
2266 other_update: dict = None,
2267 ):
2268
2269 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
2270 # .format(vca_index, status))
2271
2272 try:
2273 db_path = "configurationStatus.{}.".format(vca_index)
2274 db_dict = other_update or {}
2275 if status:
2276 db_dict[db_path + "status"] = status
2277 if element_under_configuration:
2278 db_dict[
2279 db_path + "elementUnderConfiguration"
2280 ] = element_under_configuration
2281 if element_type:
2282 db_dict[db_path + "elementType"] = element_type
2283 self.update_db_2("nsrs", nsr_id, db_dict)
2284 except DbException as e:
2285 self.logger.warn(
2286 "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
2287 status, nsr_id, vca_index, e
2288 )
2289 )
2290
2291 async def _do_placement(self, logging_text, db_nslcmop, db_vnfrs):
2292 """
2293         Checks and computes the placement (the VIM account where to deploy). If it is decided by an external tool, it
2294         sends the request via Kafka and waits until the result is written to the database (nslcmops _admin.pla).
2295 Database is used because the result can be obtained from a different LCM worker in case of HA.
2296 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
2297 :param db_nslcmop: database content of nslcmop
2298 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
2299         :return: True if some modification is done. Modifies database vnfrs and the db_vnfrs parameter with the
2300 computed 'vim-account-id'
2301 """
2302 modified = False
2303 nslcmop_id = db_nslcmop["_id"]
2304 placement_engine = deep_get(db_nslcmop, ("operationParams", "placement-engine"))
2305 if placement_engine == "PLA":
2306 self.logger.debug(
2307 logging_text + "Invoke and wait for placement optimization"
2308 )
2309 await self.msg.aiowrite(
2310 "pla", "get_placement", {"nslcmopId": nslcmop_id}, loop=self.loop
2311 )
2312 db_poll_interval = 5
2313 wait = db_poll_interval * 10
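             # poll the nslcmop record every db_poll_interval (5) seconds, for up to ~50 seconds,
             # waiting for PLA to write the placement result into _admin.pla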
2314 pla_result = None
2315 while not pla_result and wait >= 0:
2316 await asyncio.sleep(db_poll_interval)
2317 wait -= db_poll_interval
2318 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2319 pla_result = deep_get(db_nslcmop, ("_admin", "pla"))
2320
2321 if not pla_result:
2322 raise LcmException(
2323 "Placement timeout for nslcmopId={}".format(nslcmop_id)
2324 )
2325
2326 for pla_vnf in pla_result["vnf"]:
2327 vnfr = db_vnfrs.get(pla_vnf["member-vnf-index"])
2328 if not pla_vnf.get("vimAccountId") or not vnfr:
2329 continue
2330 modified = True
2331 self.db.set_one(
2332 "vnfrs",
2333 {"_id": vnfr["_id"]},
2334 {"vim-account-id": pla_vnf["vimAccountId"]},
2335 )
2336 # Modifies db_vnfrs
2337 vnfr["vim-account-id"] = pla_vnf["vimAccountId"]
2338 return modified
2339
2340 def update_nsrs_with_pla_result(self, params):
2341 try:
2342 nslcmop_id = deep_get(params, ("placement", "nslcmopId"))
2343 self.update_db_2(
2344 "nslcmops", nslcmop_id, {"_admin.pla": params.get("placement")}
2345 )
2346 except Exception as e:
2347 self.logger.warn("Update failed for nslcmop_id={}:{}".format(nslcmop_id, e))
2348
2349 async def instantiate(self, nsr_id, nslcmop_id):
2350 """
2351         Instantiate the NS identified by nsr_id, running the operation nslcmop_id.

2352 :param nsr_id: ns instance to deploy
2353 :param nslcmop_id: operation to run
2354 :return:
2355 """
2356
2357 # Try to lock HA task here
2358 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
2359 if not task_is_locked_by_me:
2360 self.logger.debug(
2361 "instantiate() task is not locked by me, ns={}".format(nsr_id)
2362 )
2363 return
2364
2365 logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
2366 self.logger.debug(logging_text + "Enter")
2367
2368 # get all needed from database
2369
2370 # database nsrs record
2371 db_nsr = None
2372
2373 # database nslcmops record
2374 db_nslcmop = None
2375
2376 # update operation on nsrs
2377 db_nsr_update = {}
2378 # update operation on nslcmops
2379 db_nslcmop_update = {}
2380
2381 nslcmop_operation_state = None
2382 db_vnfrs = {} # vnf's info indexed by member-index
2383 # n2vc_info = {}
2384 tasks_dict_info = {} # from task to info text
2385 exc = None
2386 error_list = []
2387 stage = [
2388 "Stage 1/5: preparation of the environment.",
2389 "Waiting for previous operations to terminate.",
2390 "",
2391 ]
2392 # ^ stage, step, VIM progress
2393 try:
2394 # wait for any previous tasks in process
2395 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
2396
2397 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
2398 stage[1] = "Reading from database."
2399 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
2400 db_nsr_update["detailed-status"] = "creating"
2401 db_nsr_update["operational-status"] = "init"
2402 self._write_ns_status(
2403 nsr_id=nsr_id,
2404 ns_state="BUILDING",
2405 current_operation="INSTANTIATING",
2406 current_operation_id=nslcmop_id,
2407 other_update=db_nsr_update,
2408 )
2409 self._write_op_status(op_id=nslcmop_id, stage=stage, queuePosition=0)
2410
2411 # read from db: operation
2412 stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
2413 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2414 if db_nslcmop["operationParams"].get("additionalParamsForVnf"):
2415 db_nslcmop["operationParams"]["additionalParamsForVnf"] = json.loads(
2416 db_nslcmop["operationParams"]["additionalParamsForVnf"]
2417 )
2418 ns_params = db_nslcmop.get("operationParams")
2419 if ns_params and ns_params.get("timeout_ns_deploy"):
2420 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
2421 else:
2422 timeout_ns_deploy = self.timeout.get(
2423 "ns_deploy", self.timeout_ns_deploy
2424 )
2425
2426 # read from db: ns
2427 stage[1] = "Getting nsr={} from db.".format(nsr_id)
2428 self.logger.debug(logging_text + stage[1])
2429 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2430 stage[1] = "Getting nsd={} from db.".format(db_nsr["nsd-id"])
2431 self.logger.debug(logging_text + stage[1])
2432 nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
2433 self.fs.sync(db_nsr["nsd-id"])
2434 db_nsr["nsd"] = nsd
2435 # nsr_name = db_nsr["name"] # TODO short-name??
2436
2437 # read from db: vnf's of this ns
2438 stage[1] = "Getting vnfrs from db."
2439 self.logger.debug(logging_text + stage[1])
2440 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
2441
2442 # read from db: vnfd's for every vnf
2443 db_vnfds = [] # every vnfd data
2444
2445 # for each vnf in ns, read vnfd
2446 for vnfr in db_vnfrs_list:
2447 if vnfr.get("kdur"):
2448 kdur_list = []
2449 for kdur in vnfr["kdur"]:
2450 if kdur.get("additionalParams"):
2451 kdur["additionalParams"] = json.loads(
2452 kdur["additionalParams"]
2453 )
2454 kdur_list.append(kdur)
2455 vnfr["kdur"] = kdur_list
2456
2457 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
2458 vnfd_id = vnfr["vnfd-id"]
2459 vnfd_ref = vnfr["vnfd-ref"]
2460 self.fs.sync(vnfd_id)
2461
2462                 # if we do not have this vnfd yet, read it from db
2463                 if not find_in_list(db_vnfds, lambda a_vnfd: a_vnfd["_id"] == vnfd_id):
2464 # read from db
2465 stage[1] = "Getting vnfd={} id='{}' from db.".format(
2466 vnfd_id, vnfd_ref
2467 )
2468 self.logger.debug(logging_text + stage[1])
2469 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
2470
2471 # store vnfd
2472 db_vnfds.append(vnfd)
2473
2474             # Get or generate the _admin.deployed.VCA list
2475 vca_deployed_list = None
2476 if db_nsr["_admin"].get("deployed"):
2477 vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
2478 if vca_deployed_list is None:
2479 vca_deployed_list = []
2480 configuration_status_list = []
2481 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
2482 db_nsr_update["configurationStatus"] = configuration_status_list
2483 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
2484 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
2485 elif isinstance(vca_deployed_list, dict):
2486 # maintain backward compatibility. Change a dict to list at database
2487 vca_deployed_list = list(vca_deployed_list.values())
2488 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
2489 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
2490
2491 if not isinstance(
2492 deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list
2493 ):
2494 populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
2495 db_nsr_update["_admin.deployed.RO.vnfd"] = []
2496
2497             # set state to INSTANTIATED. When instantiated, NBI will not delete it directly
2498 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
2499 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2500 self.db.set_list(
2501 "vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "INSTANTIATED"}
2502 )
2503
2504 # n2vc_redesign STEP 2 Deploy Network Scenario
2505 stage[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
2506 self._write_op_status(op_id=nslcmop_id, stage=stage)
2507
2508 stage[1] = "Deploying KDUs."
2509 # self.logger.debug(logging_text + "Before deploy_kdus")
2510             # Call deploy_kdus in case the "vdu:kdu" param exists
2511 await self.deploy_kdus(
2512 logging_text=logging_text,
2513 nsr_id=nsr_id,
2514 nslcmop_id=nslcmop_id,
2515 db_vnfrs=db_vnfrs,
2516 db_vnfds=db_vnfds,
2517 task_instantiation_info=tasks_dict_info,
2518 )
2519
2520 stage[1] = "Getting VCA public key."
2521 # n2vc_redesign STEP 1 Get VCA public ssh-key
2522 # feature 1429. Add n2vc public key to needed VMs
2523 n2vc_key = self.n2vc.get_public_key()
2524 n2vc_key_list = [n2vc_key]
2525 if self.vca_config.get("public_key"):
2526 n2vc_key_list.append(self.vca_config["public_key"])
2527
2528 stage[1] = "Deploying NS at VIM."
2529 task_ro = asyncio.ensure_future(
2530 self.instantiate_RO(
2531 logging_text=logging_text,
2532 nsr_id=nsr_id,
2533 nsd=nsd,
2534 db_nsr=db_nsr,
2535 db_nslcmop=db_nslcmop,
2536 db_vnfrs=db_vnfrs,
2537 db_vnfds=db_vnfds,
2538 n2vc_key_list=n2vc_key_list,
2539 stage=stage,
2540 )
2541 )
2542 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
2543 tasks_dict_info[task_ro] = "Deploying at VIM"
2544
2545 # n2vc_redesign STEP 3 to 6 Deploy N2VC
2546 stage[1] = "Deploying Execution Environments."
2547 self.logger.debug(logging_text + stage[1])
2548
2549 nsi_id = None # TODO put nsi_id when this nsr belongs to a NSI
2550 for vnf_profile in get_vnf_profiles(nsd):
2551 vnfd_id = vnf_profile["vnfd-id"]
2552 vnfd = find_in_list(db_vnfds, lambda a_vnf: a_vnf["id"] == vnfd_id)
2553 member_vnf_index = str(vnf_profile["id"])
2554 db_vnfr = db_vnfrs[member_vnf_index]
2555 base_folder = vnfd["_admin"]["storage"]
2556 vdu_id = None
2557 vdu_index = 0
2558 vdu_name = None
2559 kdu_name = None
2560
2561 # Get additional parameters
2562 deploy_params = {"OSM": get_osm_params(db_vnfr)}
2563 if db_vnfr.get("additionalParamsForVnf"):
2564 deploy_params.update(
2565 parse_yaml_strings(db_vnfr["additionalParamsForVnf"].copy())
2566 )
2567
2568 descriptor_config = get_configuration(vnfd, vnfd["id"])
2569 if descriptor_config:
2570 self._deploy_n2vc(
2571 logging_text=logging_text
2572 + "member_vnf_index={} ".format(member_vnf_index),
2573 db_nsr=db_nsr,
2574 db_vnfr=db_vnfr,
2575 nslcmop_id=nslcmop_id,
2576 nsr_id=nsr_id,
2577 nsi_id=nsi_id,
2578 vnfd_id=vnfd_id,
2579 vdu_id=vdu_id,
2580 kdu_name=kdu_name,
2581 member_vnf_index=member_vnf_index,
2582 vdu_index=vdu_index,
2583 vdu_name=vdu_name,
2584 deploy_params=deploy_params,
2585 descriptor_config=descriptor_config,
2586 base_folder=base_folder,
2587 task_instantiation_info=tasks_dict_info,
2588 stage=stage,
2589 )
2590
2591 # Deploy charms for each VDU that supports one.
2592 for vdud in get_vdu_list(vnfd):
2593 vdu_id = vdud["id"]
2594 descriptor_config = get_configuration(vnfd, vdu_id)
2595 vdur = find_in_list(
2596 db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id
2597 )
2598
2599 if vdur.get("additionalParams"):
2600 deploy_params_vdu = parse_yaml_strings(vdur["additionalParams"])
2601 else:
2602 deploy_params_vdu = deploy_params
2603 deploy_params_vdu["OSM"] = get_osm_params(
2604 db_vnfr, vdu_id, vdu_count_index=0
2605 )
2606 vdud_count = get_number_of_instances(vnfd, vdu_id)
2607
2608 self.logger.debug("VDUD > {}".format(vdud))
2609 self.logger.debug(
2610 "Descriptor config > {}".format(descriptor_config)
2611 )
2612 if descriptor_config:
2613 vdu_name = None
2614 kdu_name = None
2615 for vdu_index in range(vdud_count):
2616 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
2617 self._deploy_n2vc(
2618 logging_text=logging_text
2619 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
2620 member_vnf_index, vdu_id, vdu_index
2621 ),
2622 db_nsr=db_nsr,
2623 db_vnfr=db_vnfr,
2624 nslcmop_id=nslcmop_id,
2625 nsr_id=nsr_id,
2626 nsi_id=nsi_id,
2627 vnfd_id=vnfd_id,
2628 vdu_id=vdu_id,
2629 kdu_name=kdu_name,
2630 member_vnf_index=member_vnf_index,
2631 vdu_index=vdu_index,
2632 vdu_name=vdu_name,
2633 deploy_params=deploy_params_vdu,
2634 descriptor_config=descriptor_config,
2635 base_folder=base_folder,
2636 task_instantiation_info=tasks_dict_info,
2637 stage=stage,
2638 )
2639 for kdud in get_kdu_list(vnfd):
2640 kdu_name = kdud["name"]
2641 descriptor_config = get_configuration(vnfd, kdu_name)
2642 if descriptor_config:
2643 vdu_id = None
2644 vdu_index = 0
2645 vdu_name = None
2646 kdur = next(
2647 x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name
2648 )
2649 deploy_params_kdu = {"OSM": get_osm_params(db_vnfr)}
2650 if kdur.get("additionalParams"):
2651 deploy_params_kdu.update(
2652 parse_yaml_strings(kdur["additionalParams"].copy())
2653 )
2654
2655 self._deploy_n2vc(
2656 logging_text=logging_text,
2657 db_nsr=db_nsr,
2658 db_vnfr=db_vnfr,
2659 nslcmop_id=nslcmop_id,
2660 nsr_id=nsr_id,
2661 nsi_id=nsi_id,
2662 vnfd_id=vnfd_id,
2663 vdu_id=vdu_id,
2664 kdu_name=kdu_name,
2665 member_vnf_index=member_vnf_index,
2666 vdu_index=vdu_index,
2667 vdu_name=vdu_name,
2668 deploy_params=deploy_params_kdu,
2669 descriptor_config=descriptor_config,
2670 base_folder=base_folder,
2671 task_instantiation_info=tasks_dict_info,
2672 stage=stage,
2673 )
2674
2675 # Check if this NS has a charm configuration
2676 descriptor_config = nsd.get("ns-configuration")
2677 if descriptor_config and descriptor_config.get("juju"):
2678 vnfd_id = None
2679 db_vnfr = None
2680 member_vnf_index = None
2681 vdu_id = None
2682 kdu_name = None
2683 vdu_index = 0
2684 vdu_name = None
2685
2686 # Get additional parameters
2687 deploy_params = {"OSM": {"vim_account_id": ns_params["vimAccountId"]}}
2688 if db_nsr.get("additionalParamsForNs"):
2689 deploy_params.update(
2690 parse_yaml_strings(db_nsr["additionalParamsForNs"].copy())
2691 )
2692 base_folder = nsd["_admin"]["storage"]
2693 self._deploy_n2vc(
2694 logging_text=logging_text,
2695 db_nsr=db_nsr,
2696 db_vnfr=db_vnfr,
2697 nslcmop_id=nslcmop_id,
2698 nsr_id=nsr_id,
2699 nsi_id=nsi_id,
2700 vnfd_id=vnfd_id,
2701 vdu_id=vdu_id,
2702 kdu_name=kdu_name,
2703 member_vnf_index=member_vnf_index,
2704 vdu_index=vdu_index,
2705 vdu_name=vdu_name,
2706 deploy_params=deploy_params,
2707 descriptor_config=descriptor_config,
2708 base_folder=base_folder,
2709 task_instantiation_info=tasks_dict_info,
2710 stage=stage,
2711 )
2712
2713             # the rest of the work is done in the finally block
2714
2715 except (
2716 ROclient.ROClientException,
2717 DbException,
2718 LcmException,
2719 N2VCException,
2720 ) as e:
2721 self.logger.error(
2722 logging_text + "Exit Exception while '{}': {}".format(stage[1], e)
2723 )
2724 exc = e
2725 except asyncio.CancelledError:
2726 self.logger.error(
2727 logging_text + "Cancelled Exception while '{}'".format(stage[1])
2728 )
2729 exc = "Operation was cancelled"
2730 except Exception as e:
2731 exc = traceback.format_exc()
2732 self.logger.critical(
2733 logging_text + "Exit Exception while '{}': {}".format(stage[1], e),
2734 exc_info=True,
2735 )
2736 finally:
2737 if exc:
2738 error_list.append(str(exc))
2739 try:
2740 # wait for pending tasks
2741 if tasks_dict_info:
2742 stage[1] = "Waiting for instantiate pending tasks."
2743 self.logger.debug(logging_text + stage[1])
2744 error_list += await self._wait_for_tasks(
2745 logging_text,
2746 tasks_dict_info,
2747 timeout_ns_deploy,
2748 stage,
2749 nslcmop_id,
2750 nsr_id=nsr_id,
2751 )
2752 stage[1] = stage[2] = ""
2753 except asyncio.CancelledError:
2754 error_list.append("Cancelled")
2755 # TODO cancel all tasks
2756 except Exception as exc:
2757 error_list.append(str(exc))
2758
2759 # update operation-status
2760 db_nsr_update["operational-status"] = "running"
2761 # let's begin with VCA 'configured' status (later we can change it)
2762 db_nsr_update["config-status"] = "configured"
2763 for task, task_name in tasks_dict_info.items():
2764 if not task.done() or task.cancelled() or task.exception():
2765 if task_name.startswith(self.task_name_deploy_vca):
2766 # A N2VC task is pending
2767 db_nsr_update["config-status"] = "failed"
2768 else:
2769 # RO or KDU task is pending
2770 db_nsr_update["operational-status"] = "failed"
2771
2772 # update status at database
2773 if error_list:
2774 error_detail = ". ".join(error_list)
2775 self.logger.error(logging_text + error_detail)
2776 error_description_nslcmop = "{} Detail: {}".format(
2777 stage[0], error_detail
2778 )
2779 error_description_nsr = "Operation: INSTANTIATING.{}, {}".format(
2780 nslcmop_id, stage[0]
2781 )
2782
2783 db_nsr_update["detailed-status"] = (
2784 error_description_nsr + " Detail: " + error_detail
2785 )
2786 db_nslcmop_update["detailed-status"] = error_detail
2787 nslcmop_operation_state = "FAILED"
2788 ns_state = "BROKEN"
2789 else:
2790 error_detail = None
2791 error_description_nsr = error_description_nslcmop = None
2792 ns_state = "READY"
2793 db_nsr_update["detailed-status"] = "Done"
2794 db_nslcmop_update["detailed-status"] = "Done"
2795 nslcmop_operation_state = "COMPLETED"
2796
2797 if db_nsr:
2798 self._write_ns_status(
2799 nsr_id=nsr_id,
2800 ns_state=ns_state,
2801 current_operation="IDLE",
2802 current_operation_id=None,
2803 error_description=error_description_nsr,
2804 error_detail=error_detail,
2805 other_update=db_nsr_update,
2806 )
2807 self._write_op_status(
2808 op_id=nslcmop_id,
2809 stage="",
2810 error_message=error_description_nslcmop,
2811 operation_state=nslcmop_operation_state,
2812 other_update=db_nslcmop_update,
2813 )
2814
2815 if nslcmop_operation_state:
2816 try:
2817 await self.msg.aiowrite(
2818 "ns",
2819 "instantiated",
2820 {
2821 "nsr_id": nsr_id,
2822 "nslcmop_id": nslcmop_id,
2823 "operationState": nslcmop_operation_state,
2824 },
2825 loop=self.loop,
2826 )
2827 except Exception as e:
2828 self.logger.error(
2829 logging_text + "kafka_write notification Exception {}".format(e)
2830 )
2831
2832 self.logger.debug(logging_text + "Exit")
2833 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
2834
2835 def _get_vnfd(self, vnfd_id: str, cached_vnfds: Dict[str, Any]):
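         """Return the VNFD with the given descriptor id, reading it from the database at most once
         and caching it in cached_vnfds."""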
2836 if vnfd_id not in cached_vnfds:
2837 cached_vnfds[vnfd_id] = self.db.get_one("vnfds", {"id": vnfd_id})
2838 return cached_vnfds[vnfd_id]
2839
2840 def _get_vnfr(self, nsr_id: str, vnf_profile_id: str, cached_vnfrs: Dict[str, Any]):
2841 if vnf_profile_id not in cached_vnfrs:
2842 cached_vnfrs[vnf_profile_id] = self.db.get_one(
2843 "vnfrs",
2844 {
2845 "member-vnf-index-ref": vnf_profile_id,
2846 "nsr-id-ref": nsr_id,
2847 },
2848 )
2849 return cached_vnfrs[vnf_profile_id]
2850
2851 def _is_deployed_vca_in_relation(
2852 self, vca: DeployedVCA, relation: Relation
2853 ) -> bool:
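         """Return True if the deployed VCA is the provider or requirer endpoint of the relation;
         KDU resource endpoints are ignored here."""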
2854 found = False
2855 for endpoint in (relation.provider, relation.requirer):
2856 if endpoint["kdu-resource-profile-id"]:
2857 continue
2858 found = (
2859 vca.vnf_profile_id == endpoint.vnf_profile_id
2860 and vca.vdu_profile_id == endpoint.vdu_profile_id
2861 and vca.execution_environment_ref == endpoint.execution_environment_ref
2862 )
2863 if found:
2864 break
2865 return found
2866
2867 def _update_ee_relation_data_with_implicit_data(
2868 self, nsr_id, nsd, ee_relation_data, cached_vnfds, vnf_profile_id: str = None
2869 ):
2870 ee_relation_data = safe_get_ee_relation(
2871 nsr_id, ee_relation_data, vnf_profile_id=vnf_profile_id
2872 )
2873 ee_relation_level = EELevel.get_level(ee_relation_data)
2874 if (ee_relation_level in (EELevel.VNF, EELevel.VDU)) and not ee_relation_data[
2875 "execution-environment-ref"
2876 ]:
2877 vnf_profile = get_vnf_profile(nsd, ee_relation_data["vnf-profile-id"])
2878 vnfd_id = vnf_profile["vnfd-id"]
2879 db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
2880 entity_id = (
2881 vnfd_id
2882 if ee_relation_level == EELevel.VNF
2883 else ee_relation_data["vdu-profile-id"]
2884 )
2885 ee = get_juju_ee_ref(db_vnfd, entity_id)
2886 if not ee:
2887 raise Exception(
2888                     f"no execution environments found for ee_relation {ee_relation_data}"
2889 )
2890 ee_relation_data["execution-environment-ref"] = ee["id"]
2891 return ee_relation_data
2892
2893 def _get_ns_relations(
2894 self,
2895 nsr_id: str,
2896 nsd: Dict[str, Any],
2897 vca: DeployedVCA,
2898 cached_vnfds: Dict[str, Any],
2899 ) -> List[Relation]:
2900 relations = []
2901 db_ns_relations = get_ns_configuration_relation_list(nsd)
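         # Relations may be declared either with explicit "provider"/"requirer" dicts or with a
         # two-element "entities" list; both forms are normalized below into EERelation endpoints.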
2902 for r in db_ns_relations:
2903 provider_dict = None
2904 requirer_dict = None
2905 if all(key in r for key in ("provider", "requirer")):
2906 provider_dict = r["provider"]
2907 requirer_dict = r["requirer"]
2908 elif "entities" in r:
2909 provider_id = r["entities"][0]["id"]
2910 provider_dict = {
2911 "nsr-id": nsr_id,
2912 "endpoint": r["entities"][0]["endpoint"],
2913 }
2914 if provider_id != nsd["id"]:
2915 provider_dict["vnf-profile-id"] = provider_id
2916 requirer_id = r["entities"][1]["id"]
2917 requirer_dict = {
2918 "nsr-id": nsr_id,
2919 "endpoint": r["entities"][1]["endpoint"],
2920 }
2921 if requirer_id != nsd["id"]:
2922 requirer_dict["vnf-profile-id"] = requirer_id
2923 else:
2924 raise Exception(
2925 "provider/requirer or entities must be included in the relation."
2926 )
2927 relation_provider = self._update_ee_relation_data_with_implicit_data(
2928 nsr_id, nsd, provider_dict, cached_vnfds
2929 )
2930 relation_requirer = self._update_ee_relation_data_with_implicit_data(
2931 nsr_id, nsd, requirer_dict, cached_vnfds
2932 )
2933 provider = EERelation(relation_provider)
2934 requirer = EERelation(relation_requirer)
2935 relation = Relation(r["name"], provider, requirer)
2936 vca_in_relation = self._is_deployed_vca_in_relation(vca, relation)
2937 if vca_in_relation:
2938 relations.append(relation)
2939 return relations
2940
2941 def _get_vnf_relations(
2942 self,
2943 nsr_id: str,
2944 nsd: Dict[str, Any],
2945 vca: DeployedVCA,
2946 cached_vnfds: Dict[str, Any],
2947 ) -> List[Relation]:
2948 relations = []
2949 vnf_profile = get_vnf_profile(nsd, vca.vnf_profile_id)
2950 vnf_profile_id = vnf_profile["id"]
2951 vnfd_id = vnf_profile["vnfd-id"]
2952 db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
2953 db_vnf_relations = get_relation_list(db_vnfd, vnfd_id)
2954 for r in db_vnf_relations:
2955 provider_dict = None
2956 requirer_dict = None
2957 if all(key in r for key in ("provider", "requirer")):
2958 provider_dict = r["provider"]
2959 requirer_dict = r["requirer"]
2960 elif "entities" in r:
2961 provider_id = r["entities"][0]["id"]
2962 provider_dict = {
2963 "nsr-id": nsr_id,
2964 "vnf-profile-id": vnf_profile_id,
2965 "endpoint": r["entities"][0]["endpoint"],
2966 }
2967 if provider_id != vnfd_id:
2968 provider_dict["vdu-profile-id"] = provider_id
2969 requirer_id = r["entities"][1]["id"]
2970 requirer_dict = {
2971 "nsr-id": nsr_id,
2972 "vnf-profile-id": vnf_profile_id,
2973 "endpoint": r["entities"][1]["endpoint"],
2974 }
2975 if requirer_id != vnfd_id:
2976 requirer_dict["vdu-profile-id"] = requirer_id
2977 else:
2978 raise Exception(
2979 "provider/requirer or entities must be included in the relation."
2980 )
2981 relation_provider = self._update_ee_relation_data_with_implicit_data(
2982 nsr_id, nsd, provider_dict, cached_vnfds, vnf_profile_id=vnf_profile_id
2983 )
2984 relation_requirer = self._update_ee_relation_data_with_implicit_data(
2985 nsr_id, nsd, requirer_dict, cached_vnfds, vnf_profile_id=vnf_profile_id
2986 )
2987 provider = EERelation(relation_provider)
2988 requirer = EERelation(relation_requirer)
2989 relation = Relation(r["name"], provider, requirer)
2990 vca_in_relation = self._is_deployed_vca_in_relation(vca, relation)
2991 if vca_in_relation:
2992 relations.append(relation)
2993 return relations
2994
2995 def _get_kdu_resource_data(
2996 self,
2997 ee_relation: EERelation,
2998 db_nsr: Dict[str, Any],
2999 cached_vnfds: Dict[str, Any],
3000 ) -> DeployedK8sResource:
3001 nsd = get_nsd(db_nsr)
3002 vnf_profiles = get_vnf_profiles(nsd)
3003 vnfd_id = find_in_list(
3004 vnf_profiles,
3005 lambda vnf_profile: vnf_profile["id"] == ee_relation.vnf_profile_id,
3006 )["vnfd-id"]
3007 db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
3008 kdu_resource_profile = get_kdu_resource_profile(
3009 db_vnfd, ee_relation.kdu_resource_profile_id
3010 )
3011 kdu_name = kdu_resource_profile["kdu-name"]
3012 deployed_kdu, _ = get_deployed_kdu(
3013 db_nsr.get("_admin", ()).get("deployed", ()),
3014 kdu_name,
3015 ee_relation.vnf_profile_id,
3016 )
3017 deployed_kdu.update({"resource-name": kdu_resource_profile["resource-name"]})
3018 return deployed_kdu
3019
3020 def _get_deployed_component(
3021 self,
3022 ee_relation: EERelation,
3023 db_nsr: Dict[str, Any],
3024 cached_vnfds: Dict[str, Any],
3025 ) -> DeployedComponent:
3026 nsr_id = db_nsr["_id"]
3027 deployed_component = None
3028 ee_level = EELevel.get_level(ee_relation)
3029 if ee_level == EELevel.NS:
3030 vca = get_deployed_vca(db_nsr, {"vdu_id": None, "member-vnf-index": None})
3031 if vca:
3032 deployed_component = DeployedVCA(nsr_id, vca)
3033 elif ee_level == EELevel.VNF:
3034 vca = get_deployed_vca(
3035 db_nsr,
3036 {
3037 "vdu_id": None,
3038 "member-vnf-index": ee_relation.vnf_profile_id,
3039 "ee_descriptor_id": ee_relation.execution_environment_ref,
3040 },
3041 )
3042 if vca:
3043 deployed_component = DeployedVCA(nsr_id, vca)
3044 elif ee_level == EELevel.VDU:
3045 vca = get_deployed_vca(
3046 db_nsr,
3047 {
3048 "vdu_id": ee_relation.vdu_profile_id,
3049 "member-vnf-index": ee_relation.vnf_profile_id,
3050 "ee_descriptor_id": ee_relation.execution_environment_ref,
3051 },
3052 )
3053 if vca:
3054 deployed_component = DeployedVCA(nsr_id, vca)
3055 elif ee_level == EELevel.KDU:
3056 kdu_resource_data = self._get_kdu_resource_data(
3057 ee_relation, db_nsr, cached_vnfds
3058 )
3059 if kdu_resource_data:
3060 deployed_component = DeployedK8sResource(kdu_resource_data)
3061 return deployed_component
3062
3063 async def _add_relation(
3064 self,
3065 relation: Relation,
3066 vca_type: str,
3067 db_nsr: Dict[str, Any],
3068 cached_vnfds: Dict[str, Any],
3069 cached_vnfrs: Dict[str, Any],
3070 ) -> bool:
3071 deployed_provider = self._get_deployed_component(
3072 relation.provider, db_nsr, cached_vnfds
3073 )
3074 deployed_requirer = self._get_deployed_component(
3075 relation.requirer, db_nsr, cached_vnfds
3076 )
3077 if (
3078 deployed_provider
3079 and deployed_requirer
3080 and deployed_provider.config_sw_installed
3081 and deployed_requirer.config_sw_installed
3082 ):
3083 provider_db_vnfr = (
3084 self._get_vnfr(
3085 relation.provider.nsr_id,
3086 relation.provider.vnf_profile_id,
3087 cached_vnfrs,
3088 )
3089 if relation.provider.vnf_profile_id
3090 else None
3091 )
3092 requirer_db_vnfr = (
3093 self._get_vnfr(
3094 relation.requirer.nsr_id,
3095 relation.requirer.vnf_profile_id,
3096 cached_vnfrs,
3097 )
3098 if relation.requirer.vnf_profile_id
3099 else None
3100 )
3101 provider_vca_id = self.get_vca_id(provider_db_vnfr, db_nsr)
3102 requirer_vca_id = self.get_vca_id(requirer_db_vnfr, db_nsr)
3103 provider_relation_endpoint = RelationEndpoint(
3104 deployed_provider.ee_id,
3105 provider_vca_id,
3106 relation.provider.endpoint,
3107 )
3108 requirer_relation_endpoint = RelationEndpoint(
3109 deployed_requirer.ee_id,
3110 requirer_vca_id,
3111 relation.requirer.endpoint,
3112 )
3113 await self.vca_map[vca_type].add_relation(
3114 provider=provider_relation_endpoint,
3115 requirer=requirer_relation_endpoint,
3116 )
3117 # remove entry from relations list
3118 return True
3119 return False
3120
3121 async def _add_vca_relations(
3122 self,
3123 logging_text,
3124 nsr_id,
3125 vca_type: str,
3126 vca_index: int,
3127 timeout: int = 3600,
3128 ) -> bool:
3129
3130 # steps:
3131 # 1. find all relations for this VCA
3132 # 2. wait for other peers related
3133 # 3. add relations
3134
3135 try:
3136 # STEP 1: find all relations for this VCA
3137
3138 # read nsr record
3139 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3140 nsd = get_nsd(db_nsr)
3141
3142 # this VCA data
3143 deployed_vca_dict = get_deployed_vca_list(db_nsr)[vca_index]
3144 my_vca = DeployedVCA(nsr_id, deployed_vca_dict)
3145
3146 cached_vnfds = {}
3147 cached_vnfrs = {}
3148 relations = []
3149 relations.extend(self._get_ns_relations(nsr_id, nsd, my_vca, cached_vnfds))
3150 relations.extend(self._get_vnf_relations(nsr_id, nsd, my_vca, cached_vnfds))
3151
3152 # if no relations, terminate
3153 if not relations:
3154 self.logger.debug(logging_text + " No relations")
3155 return True
3156
3157 self.logger.debug(logging_text + " adding relations {}".format(relations))
3158
3159 # add all relations
3160 start = time()
3161 while True:
3162 # check timeout
3163 now = time()
3164 if now - start >= timeout:
3165 self.logger.error(logging_text + " : timeout adding relations")
3166 return False
3167
3168 # reload nsr from database (we need to update record: _admin.deployed.VCA)
3169 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3170
3171 # for each relation, find the VCA's related
3172 for relation in relations.copy():
3173 added = await self._add_relation(
3174 relation,
3175 vca_type,
3176 db_nsr,
3177 cached_vnfds,
3178 cached_vnfrs,
3179 )
3180 if added:
3181 relations.remove(relation)
3182
3183 if not relations:
3184 self.logger.debug("Relations added")
3185 break
3186 await asyncio.sleep(5.0)
3187
3188 return True
3189
3190 except Exception as e:
3191 self.logger.warn(logging_text + " ERROR adding relations: {}".format(e))
3192 return False
3193
3194 async def _install_kdu(
3195 self,
3196 nsr_id: str,
3197 nsr_db_path: str,
3198 vnfr_data: dict,
3199 kdu_index: int,
3200 kdud: dict,
3201 vnfd: dict,
3202 k8s_instance_info: dict,
3203 k8params: dict = None,
3204 timeout: int = 600,
3205 vca_id: str = None,
3206 ):
3207
3208 try:
3209 k8sclustertype = k8s_instance_info["k8scluster-type"]
3210 # Instantiate kdu
3211 db_dict_install = {
3212 "collection": "nsrs",
3213 "filter": {"_id": nsr_id},
3214 "path": nsr_db_path,
3215 }
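             # db_dict_install is used by the K8s connector to write detailed-status updates into
             # the nsrs record at nsr_db_path while the KDU is being installed.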
3216
3217 if k8s_instance_info.get("kdu-deployment-name"):
3218 kdu_instance = k8s_instance_info.get("kdu-deployment-name")
3219 else:
3220 kdu_instance = self.k8scluster_map[
3221 k8sclustertype
3222 ].generate_kdu_instance_name(
3223 db_dict=db_dict_install,
3224 kdu_model=k8s_instance_info["kdu-model"],
3225 kdu_name=k8s_instance_info["kdu-name"],
3226 )
3227
3228 # Update the nsrs table with the kdu-instance value
3229 self.update_db_2(
3230 item="nsrs",
3231 _id=nsr_id,
3232 _desc={nsr_db_path + ".kdu-instance": kdu_instance},
3233 )
3234
3235 # Update the nsrs table with the actual namespace being used, if the k8scluster-type is `juju` or
3236             # `juju-bundle`. This verification is needed because there is no standard/homogeneous namespace
3237             # between the Helm Chart and Juju Bundle-based KNFs. If we find a way of having a homogeneous
3238             # namespace, this first verification could be removed, and the next step would be done for any kind
3239 # of KNF.
3240 # TODO -> find a way to have an homogeneous namespace between the Helm Charts and Juju Bundles-based
3241 # KNFs (Bug 2027: https://osm.etsi.org/bugzilla/show_bug.cgi?id=2027)
3242 if k8sclustertype in ("juju", "juju-bundle"):
3243                 # First, verify whether the current namespace is present in `_admin.projects_read` (if not, it means
3244                 # that the user passed a namespace in which they want their KDU to be deployed)
3245 if (
3246 self.db.count(
3247 table="nsrs",
3248 q_filter={
3249 "_id": nsr_id,
3250 "_admin.projects_write": k8s_instance_info["namespace"],
3251 "_admin.projects_read": k8s_instance_info["namespace"],
3252 },
3253 )
3254 > 0
3255 ):
3256 self.logger.debug(
3257 f"Updating namespace/model for Juju Bundle from {k8s_instance_info['namespace']} to {kdu_instance}"
3258 )
3259 self.update_db_2(
3260 item="nsrs",
3261 _id=nsr_id,
3262 _desc={f"{nsr_db_path}.namespace": kdu_instance},
3263 )
3264 k8s_instance_info["namespace"] = kdu_instance
3265
3266 await self.k8scluster_map[k8sclustertype].install(
3267 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
3268 kdu_model=k8s_instance_info["kdu-model"],
3269 atomic=True,
3270 params=k8params,
3271 db_dict=db_dict_install,
3272 timeout=timeout,
3273 kdu_name=k8s_instance_info["kdu-name"],
3274 namespace=k8s_instance_info["namespace"],
3275 kdu_instance=kdu_instance,
3276 vca_id=vca_id,
3277 )
3278
3279             # Obtain services to get the management service IP
3280 services = await self.k8scluster_map[k8sclustertype].get_services(
3281 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
3282 kdu_instance=kdu_instance,
3283 namespace=k8s_instance_info["namespace"],
3284 )
3285
3286             # Obtain management service info (if it exists)
3287 vnfr_update_dict = {}
3288 kdu_config = get_configuration(vnfd, kdud["name"])
3289 if kdu_config:
3290 target_ee_list = kdu_config.get("execution-environment-list", [])
3291 else:
3292 target_ee_list = []
3293
3294 if services:
3295 vnfr_update_dict["kdur.{}.services".format(kdu_index)] = services
3296 mgmt_services = [
3297 service
3298 for service in kdud.get("service", [])
3299 if service.get("mgmt-service")
3300 ]
3301 for mgmt_service in mgmt_services:
3302 for service in services:
3303 if service["name"].startswith(mgmt_service["name"]):
3304                         # Mgmt service found, obtain the service IP
3305 ip = service.get("external_ip", service.get("cluster_ip"))
3306 if isinstance(ip, list) and len(ip) == 1:
3307 ip = ip[0]
3308
3309 vnfr_update_dict[
3310 "kdur.{}.ip-address".format(kdu_index)
3311 ] = ip
3312
3313                             # Check if the mgmt IP at the VNF must also be updated
3314 service_external_cp = mgmt_service.get(
3315 "external-connection-point-ref"
3316 )
3317 if service_external_cp:
3318 if (
3319 deep_get(vnfd, ("mgmt-interface", "cp"))
3320 == service_external_cp
3321 ):
3322 vnfr_update_dict["ip-address"] = ip
3323
3324 if find_in_list(
3325 target_ee_list,
3326 lambda ee: ee.get(
3327 "external-connection-point-ref", ""
3328 )
3329 == service_external_cp,
3330 ):
3331 vnfr_update_dict[
3332 "kdur.{}.ip-address".format(kdu_index)
3333 ] = ip
3334 break
3335 else:
3336 self.logger.warn(
3337 "Mgmt service name: {} not found".format(
3338 mgmt_service["name"]
3339 )
3340 )
3341
3342 vnfr_update_dict["kdur.{}.status".format(kdu_index)] = "READY"
3343 self.update_db_2("vnfrs", vnfr_data.get("_id"), vnfr_update_dict)
3344
3345 kdu_config = get_configuration(vnfd, k8s_instance_info["kdu-name"])
3346 if (
3347 kdu_config
3348 and kdu_config.get("initial-config-primitive")
3349 and get_juju_ee_ref(vnfd, k8s_instance_info["kdu-name"]) is None
3350 ):
3351 initial_config_primitive_list = kdu_config.get(
3352 "initial-config-primitive"
3353 )
3354 initial_config_primitive_list.sort(key=lambda val: int(val["seq"]))
3355
3356 for initial_config_primitive in initial_config_primitive_list:
3357 primitive_params_ = self._map_primitive_params(
3358 initial_config_primitive, {}, {}
3359 )
3360
3361 await asyncio.wait_for(
3362 self.k8scluster_map[k8sclustertype].exec_primitive(
3363 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
3364 kdu_instance=kdu_instance,
3365 primitive_name=initial_config_primitive["name"],
3366 params=primitive_params_,
3367 db_dict=db_dict_install,
3368 vca_id=vca_id,
3369 ),
3370 timeout=timeout,
3371 )
3372
3373 except Exception as e:
3374 # Prepare update db with error and raise exception
3375 try:
3376 self.update_db_2(
3377 "nsrs", nsr_id, {nsr_db_path + ".detailed-status": str(e)}
3378 )
3379 self.update_db_2(
3380 "vnfrs",
3381 vnfr_data.get("_id"),
3382 {"kdur.{}.status".format(kdu_index): "ERROR"},
3383 )
3384 except Exception:
3385 # ignore to keep original exception
3386 pass
3387 # reraise original error
3388 raise
3389
3390 return kdu_instance
3391
3392 async def deploy_kdus(
3393 self,
3394 logging_text,
3395 nsr_id,
3396 nslcmop_id,
3397 db_vnfrs,
3398 db_vnfds,
3399 task_instantiation_info,
3400 ):
3401 # Launch kdus if present in the descriptor
3402
3403 k8scluster_id_2_uuic = {
3404 "helm-chart-v3": {},
3405 "helm-chart": {},
3406 "juju-bundle": {},
3407 }
3408
3409 async def _get_cluster_id(cluster_id, cluster_type):
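             # Resolve an OSM k8s cluster id into the connector-level cluster uuid for the given
             # cluster type, caching results; for helm-chart-v3 the cluster is initialized on
             # demand for backward compatibility with clusters registered before helm v3 support.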
3410 nonlocal k8scluster_id_2_uuic
3411 if cluster_id in k8scluster_id_2_uuic[cluster_type]:
3412 return k8scluster_id_2_uuic[cluster_type][cluster_id]
3413
3414             # check if the K8s cluster is being created and wait for any related previous tasks to finish
3415 task_name, task_dependency = self.lcm_tasks.lookfor_related(
3416 "k8scluster", cluster_id
3417 )
3418 if task_dependency:
3419 text = "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
3420 task_name, cluster_id
3421 )
3422 self.logger.debug(logging_text + text)
3423 await asyncio.wait(task_dependency, timeout=3600)
3424
3425 db_k8scluster = self.db.get_one(
3426 "k8sclusters", {"_id": cluster_id}, fail_on_empty=False
3427 )
3428 if not db_k8scluster:
3429 raise LcmException("K8s cluster {} cannot be found".format(cluster_id))
3430
3431 k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
3432 if not k8s_id:
3433 if cluster_type == "helm-chart-v3":
3434 try:
3435 # backward compatibility for existing clusters that have not been initialized for helm v3
3436 k8s_credentials = yaml.safe_dump(
3437 db_k8scluster.get("credentials")
3438 )
3439 k8s_id, uninstall_sw = await self.k8sclusterhelm3.init_env(
3440 k8s_credentials, reuse_cluster_uuid=cluster_id
3441 )
3442 db_k8scluster_update = {}
3443 db_k8scluster_update["_admin.helm-chart-v3.error_msg"] = None
3444 db_k8scluster_update["_admin.helm-chart-v3.id"] = k8s_id
3445 db_k8scluster_update[
3446 "_admin.helm-chart-v3.created"
3447 ] = uninstall_sw
3448 db_k8scluster_update[
3449 "_admin.helm-chart-v3.operationalState"
3450 ] = "ENABLED"
3451 self.update_db_2(
3452 "k8sclusters", cluster_id, db_k8scluster_update
3453 )
3454 except Exception as e:
3455 self.logger.error(
3456 logging_text
3457 + "error initializing helm-v3 cluster: {}".format(str(e))
3458 )
3459 raise LcmException(
3460 "K8s cluster '{}' has not been initialized for '{}'".format(
3461 cluster_id, cluster_type
3462 )
3463 )
3464 else:
3465 raise LcmException(
3466 "K8s cluster '{}' has not been initialized for '{}'".format(
3467 cluster_id, cluster_type
3468 )
3469 )
3470 k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
3471 return k8s_id
3472
3473 logging_text += "Deploy kdus: "
3474 step = ""
3475 try:
3476 db_nsr_update = {"_admin.deployed.K8s": []}
3477 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3478
3479 index = 0
3480 updated_cluster_list = []
3481 updated_v3_cluster_list = []
3482
3483 for vnfr_data in db_vnfrs.values():
3484 vca_id = self.get_vca_id(vnfr_data, {})
3485 for kdu_index, kdur in enumerate(get_iterable(vnfr_data, "kdur")):
3486 # Step 0: Prepare and set parameters
3487 desc_params = parse_yaml_strings(kdur.get("additionalParams"))
3488 vnfd_id = vnfr_data.get("vnfd-id")
3489 vnfd_with_id = find_in_list(
3490 db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id
3491 )
3492 kdud = next(
3493 kdud
3494 for kdud in vnfd_with_id["kdu"]
3495 if kdud["name"] == kdur["kdu-name"]
3496 )
3497 namespace = kdur.get("k8s-namespace")
3498 kdu_deployment_name = kdur.get("kdu-deployment-name")
3499 if kdur.get("helm-chart"):
3500 kdumodel = kdur["helm-chart"]
3501                         # Default version: helm v3; if helm-version is v2, use v2
3502 k8sclustertype = "helm-chart-v3"
3503 self.logger.debug("kdur: {}".format(kdur))
3504 if (
3505 kdur.get("helm-version")
3506 and kdur.get("helm-version") == "v2"
3507 ):
3508 k8sclustertype = "helm-chart"
3509 elif kdur.get("juju-bundle"):
3510 kdumodel = kdur["juju-bundle"]
3511 k8sclustertype = "juju-bundle"
3512 else:
3513 raise LcmException(
3514 "kdu type for kdu='{}.{}' is neither helm-chart nor "
3515 "juju-bundle. Maybe an old NBI version is running".format(
3516 vnfr_data["member-vnf-index-ref"], kdur["kdu-name"]
3517 )
3518 )
3519 # check if kdumodel is a file and exists
3520 try:
3521 vnfd_with_id = find_in_list(
3522 db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id
3523 )
3524 storage = deep_get(vnfd_with_id, ("_admin", "storage"))
3525                         if storage:  # may not be present if the vnfd has no artifacts
3526                             # path format: /vnfdid/pkgdir/helm-charts|juju-bundles/kdumodel
3527 if storage["pkg-dir"]:
3528 filename = "{}/{}/{}s/{}".format(
3529 storage["folder"],
3530 storage["pkg-dir"],
3531 k8sclustertype,
3532 kdumodel,
3533 )
3534 else:
3535 filename = "{}/Scripts/{}s/{}".format(
3536 storage["folder"],
3537 k8sclustertype,
3538 kdumodel,
3539 )
3540 if self.fs.file_exists(
3541 filename, mode="file"
3542 ) or self.fs.file_exists(filename, mode="dir"):
3543 kdumodel = self.fs.path + filename
3544 except (asyncio.TimeoutError, asyncio.CancelledError):
3545 raise
3546 except Exception: # it is not a file
3547 pass
3548
3549 k8s_cluster_id = kdur["k8s-cluster"]["id"]
3550 step = "Synchronize repos for k8s cluster '{}'".format(
3551 k8s_cluster_id
3552 )
3553 cluster_uuid = await _get_cluster_id(k8s_cluster_id, k8sclustertype)
3554
3555 # Synchronize repos
3556 if (
3557 k8sclustertype == "helm-chart"
3558 and cluster_uuid not in updated_cluster_list
3559 ) or (
3560 k8sclustertype == "helm-chart-v3"
3561 and cluster_uuid not in updated_v3_cluster_list
3562 ):
3563 del_repo_list, added_repo_dict = await asyncio.ensure_future(
3564 self.k8scluster_map[k8sclustertype].synchronize_repos(
3565 cluster_uuid=cluster_uuid
3566 )
3567 )
3568 if del_repo_list or added_repo_dict:
3569 if k8sclustertype == "helm-chart":
3570 unset = {
3571 "_admin.helm_charts_added." + item: None
3572 for item in del_repo_list
3573 }
3574 updated = {
3575 "_admin.helm_charts_added." + item: name
3576 for item, name in added_repo_dict.items()
3577 }
3578 updated_cluster_list.append(cluster_uuid)
3579 elif k8sclustertype == "helm-chart-v3":
3580 unset = {
3581 "_admin.helm_charts_v3_added." + item: None
3582 for item in del_repo_list
3583 }
3584 updated = {
3585 "_admin.helm_charts_v3_added." + item: name
3586 for item, name in added_repo_dict.items()
3587 }
3588 updated_v3_cluster_list.append(cluster_uuid)
3589 self.logger.debug(
3590 logging_text + "repos synchronized on k8s cluster "
3591 "'{}' to_delete: {}, to_add: {}".format(
3592 k8s_cluster_id, del_repo_list, added_repo_dict
3593 )
3594 )
3595 self.db.set_one(
3596 "k8sclusters",
3597 {"_id": k8s_cluster_id},
3598 updated,
3599 unset=unset,
3600 )
3601
3602 # Instantiate kdu
3603 step = "Instantiating KDU {}.{} in k8s cluster {}".format(
3604 vnfr_data["member-vnf-index-ref"],
3605 kdur["kdu-name"],
3606 k8s_cluster_id,
3607 )
3608 k8s_instance_info = {
3609 "kdu-instance": None,
3610 "k8scluster-uuid": cluster_uuid,
3611 "k8scluster-type": k8sclustertype,
3612 "member-vnf-index": vnfr_data["member-vnf-index-ref"],
3613 "kdu-name": kdur["kdu-name"],
3614 "kdu-model": kdumodel,
3615 "namespace": namespace,
3616 "kdu-deployment-name": kdu_deployment_name,
3617 }
3618 db_path = "_admin.deployed.K8s.{}".format(index)
3619 db_nsr_update[db_path] = k8s_instance_info
3620 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3621 vnfd_with_id = find_in_list(
3622 db_vnfds, lambda vnf: vnf["_id"] == vnfd_id
3623 )
3624 task = asyncio.ensure_future(
3625 self._install_kdu(
3626 nsr_id,
3627 db_path,
3628 vnfr_data,
3629 kdu_index,
3630 kdud,
3631 vnfd_with_id,
3632 k8s_instance_info,
3633 k8params=desc_params,
3634 timeout=1800,
3635 vca_id=vca_id,
3636 )
3637 )
3638 self.lcm_tasks.register(
3639 "ns",
3640 nsr_id,
3641 nslcmop_id,
3642 "instantiate_KDU-{}".format(index),
3643 task,
3644 )
3645 task_instantiation_info[task] = "Deploying KDU {}".format(
3646 kdur["kdu-name"]
3647 )
3648
3649 index += 1
3650
3651 except (LcmException, asyncio.CancelledError):
3652 raise
3653 except Exception as e:
3654 msg = "Exception {} while {}: {}".format(type(e).__name__, step, e)
3655 if isinstance(e, (N2VCException, DbException)):
3656 self.logger.error(logging_text + msg)
3657 else:
3658 self.logger.critical(logging_text + msg, exc_info=True)
3659 raise LcmException(msg)
3660 finally:
3661 if db_nsr_update:
3662 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3663
3664 def _deploy_n2vc(
3665 self,
3666 logging_text,
3667 db_nsr,
3668 db_vnfr,
3669 nslcmop_id,
3670 nsr_id,
3671 nsi_id,
3672 vnfd_id,
3673 vdu_id,
3674 kdu_name,
3675 member_vnf_index,
3676 vdu_index,
3677 vdu_name,
3678 deploy_params,
3679 descriptor_config,
3680 base_folder,
3681 task_instantiation_info,
3682 stage,
3683 ):
3684 # launch instantiate_N2VC in an asyncio task and register the task object
3685 # Look up the information of this charm in the database at <nsrs>._admin.deployed.VCA
3686 # if not found, create one entry and update database
3687 # fill db_nsr._admin.deployed.VCA.<index>
3688
3689 self.logger.debug(
3690 logging_text + "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id, vdu_id)
3691 )
3692 if "execution-environment-list" in descriptor_config:
3693 ee_list = descriptor_config.get("execution-environment-list", [])
3694 elif "juju" in descriptor_config:
3695 ee_list = [descriptor_config] # ns charms
3696 else: # other types, such as script, are not supported
3697 ee_list = []
3698
3699 for ee_item in ee_list:
3700 self.logger.debug(
3701 logging_text
3702 + "_deploy_n2vc ee_item juju={}, helm={}".format(
3703 ee_item.get("juju"), ee_item.get("helm-chart")
3704 )
3705 )
3706 ee_descriptor_id = ee_item.get("id")
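# Select the VCA type from the EE item: a juju entry maps to a proxy charm
# (LXC by default, k8s when cloud is "k8s") or to a native charm when no charm
# name is given or proxy is False; a helm-chart entry maps to "helm" (v2) or
# "helm-v3" (the default).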
3707 if ee_item.get("juju"):
3708 vca_name = ee_item["juju"].get("charm")
3709 vca_type = (
3710 "lxc_proxy_charm"
3711 if ee_item["juju"].get("charm") is not None
3712 else "native_charm"
3713 )
3714 if ee_item["juju"].get("cloud") == "k8s":
3715 vca_type = "k8s_proxy_charm"
3716 elif ee_item["juju"].get("proxy") is False:
3717 vca_type = "native_charm"
3718 elif ee_item.get("helm-chart"):
3719 vca_name = ee_item["helm-chart"]
3720 if ee_item.get("helm-version") and ee_item.get("helm-version") == "v2":
3721 vca_type = "helm"
3722 else:
3723 vca_type = "helm-v3"
3724 else:
3725 self.logger.debug(
3726 logging_text + "skipping non juju neither charm configuration"
3727 )
3728 continue
3729
3730 vca_index = -1
3731 for vca_index, vca_deployed in enumerate(
3732 db_nsr["_admin"]["deployed"]["VCA"]
3733 ):
3734 if not vca_deployed:
3735 continue
3736 if (
3737 vca_deployed.get("member-vnf-index") == member_vnf_index
3738 and vca_deployed.get("vdu_id") == vdu_id
3739 and vca_deployed.get("kdu_name") == kdu_name
3740 and vca_deployed.get("vdu_count_index", 0) == vdu_index
3741 and vca_deployed.get("ee_descriptor_id") == ee_descriptor_id
3742 ):
3743 break
3744 else:
3745 # not found, create one.
3746 target = (
3747 "ns" if not member_vnf_index else "vnf/{}".format(member_vnf_index)
3748 )
3749 if vdu_id:
3750 target += "/vdu/{}/{}".format(vdu_id, vdu_index or 0)
3751 elif kdu_name:
3752 target += "/kdu/{}".format(kdu_name)
3753 vca_deployed = {
3754 "target_element": target,
3755 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
3756 "member-vnf-index": member_vnf_index,
3757 "vdu_id": vdu_id,
3758 "kdu_name": kdu_name,
3759 "vdu_count_index": vdu_index,
3760 "operational-status": "init", # TODO revise
3761 "detailed-status": "", # TODO revise
3762 "step": "initial-deploy", # TODO revise
3763 "vnfd_id": vnfd_id,
3764 "vdu_name": vdu_name,
3765 "type": vca_type,
3766 "ee_descriptor_id": ee_descriptor_id,
3767 }
3768 vca_index += 1
3769
3770 # create VCA and configurationStatus in db
3771 db_dict = {
3772 "_admin.deployed.VCA.{}".format(vca_index): vca_deployed,
3773 "configurationStatus.{}".format(vca_index): dict(),
3774 }
3775 self.update_db_2("nsrs", nsr_id, db_dict)
3776
3777 db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed)
3778
3779 self.logger.debug("N2VC > NSR_ID > {}".format(nsr_id))
3780 self.logger.debug("N2VC > DB_NSR > {}".format(db_nsr))
3781 self.logger.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed))
3782
3783 # Launch task
3784 task_n2vc = asyncio.ensure_future(
3785 self.instantiate_N2VC(
3786 logging_text=logging_text,
3787 vca_index=vca_index,
3788 nsi_id=nsi_id,
3789 db_nsr=db_nsr,
3790 db_vnfr=db_vnfr,
3791 vdu_id=vdu_id,
3792 kdu_name=kdu_name,
3793 vdu_index=vdu_index,
3794 deploy_params=deploy_params,
3795 config_descriptor=descriptor_config,
3796 base_folder=base_folder,
3797 nslcmop_id=nslcmop_id,
3798 stage=stage,
3799 vca_type=vca_type,
3800 vca_name=vca_name,
3801 ee_config_descriptor=ee_item,
3802 )
3803 )
3804 self.lcm_tasks.register(
3805 "ns",
3806 nsr_id,
3807 nslcmop_id,
3808 "instantiate_N2VC-{}".format(vca_index),
3809 task_n2vc,
3810 )
3811 task_instantiation_info[
3812 task_n2vc
3813 ] = self.task_name_deploy_vca + " {}.{}".format(
3814 member_vnf_index or "", vdu_id or ""
3815 )
3816
3817 @staticmethod
3818 def _create_nslcmop(nsr_id, operation, params):
3819 """
3820 Creates an nslcmop content to be stored in the database.
3821 :param nsr_id: internal id of the instance
3822 :param operation: instantiate, terminate, scale, action, ...
3823 :param params: user parameters for the operation
3824 :return: dictionary following SOL005 format
3825 """
3826 # Raise exception if invalid arguments
3827 if not (nsr_id and operation and params):
3828 raise LcmException(
3829 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided"
3830 )
3831 now = time()
3832 _id = str(uuid4())
3833 nslcmop = {
3834 "id": _id,
3835 "_id": _id,
3836 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
3837 "operationState": "PROCESSING",
3838 "statusEnteredTime": now,
3839 "nsInstanceId": nsr_id,
3840 "lcmOperationType": operation,
3841 "startTime": now,
3842 "isAutomaticInvocation": False,
3843 "operationParams": params,
3844 "isCancelPending": False,
3845 "links": {
3846 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
3847 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
3848 },
3849 }
3850 return nslcmop
3851
3852 def _format_additional_params(self, params):
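# Values prefixed with "!!yaml " are parsed as YAML (the prefix is stripped),
# so complex structures can be passed as additional params.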
3853 params = params or {}
3854 for key, value in params.items():
3855 if str(value).startswith("!!yaml "):
3856 params[key] = yaml.safe_load(value[7:])
3857 return params
3858
3859 def _get_terminate_primitive_params(self, seq, vnf_index):
3860 primitive = seq.get("name")
3861 primitive_params = {}
3862 params = {
3863 "member_vnf_index": vnf_index,
3864 "primitive": primitive,
3865 "primitive_params": primitive_params,
3866 }
3867 desc_params = {}
3868 return self._map_primitive_params(seq, params, desc_params)
3869
3870 # sub-operations
3871
3872 def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
3873 op = deep_get(db_nslcmop, ("_admin", "operations"), [])[op_index]
3874 if op.get("operationState") == "COMPLETED":
3875 # b. Skip sub-operation
3876 # _ns_execute_primitive() or RO.create_action() will NOT be executed
3877 return self.SUBOPERATION_STATUS_SKIP
3878 else:
3879 # c. retry executing sub-operation
3880 # The sub-operation exists, and operationState != 'COMPLETED'
3881 # Update operationState = 'PROCESSING' to indicate a retry.
3882 operationState = "PROCESSING"
3883 detailed_status = "In progress"
3884 self._update_suboperation_status(
3885 db_nslcmop, op_index, operationState, detailed_status
3886 )
3887 # Return the sub-operation index
3888 # _ns_execute_primitive() or RO.create_action() will be called from scale()
3889 # with arguments extracted from the sub-operation
3890 return op_index
3891
3892 # Find a sub-operation where all keys in a matching dictionary must match
3893 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
3894 def _find_suboperation(self, db_nslcmop, match):
3895 if db_nslcmop and match:
3896 op_list = db_nslcmop.get("_admin", {}).get("operations", [])
3897 for i, op in enumerate(op_list):
3898 if all(op.get(k) == match[k] for k in match):
3899 return i
3900 return self.SUBOPERATION_STATUS_NOT_FOUND
3901
3902 # Update status for a sub-operation given its index
3903 def _update_suboperation_status(
3904 self, db_nslcmop, op_index, operationState, detailed_status
3905 ):
3906 # Update DB for HA tasks
3907 q_filter = {"_id": db_nslcmop["_id"]}
3908 update_dict = {
3909 "_admin.operations.{}.operationState".format(op_index): operationState,
3910 "_admin.operations.{}.detailed-status".format(op_index): detailed_status,
3911 }
3912 self.db.set_one(
3913 "nslcmops", q_filter=q_filter, update_dict=update_dict, fail_on_empty=False
3914 )
3915
3916 # Add sub-operation, return the index of the added sub-operation
3917 # Optionally, set operationState, detailed-status, and operationType
3918 # Status and type are currently set for 'scale' sub-operations:
3919 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
3920 # 'detailed-status' : status message
3921 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
3922 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
3923 def _add_suboperation(
3924 self,
3925 db_nslcmop,
3926 vnf_index,
3927 vdu_id,
3928 vdu_count_index,
3929 vdu_name,
3930 primitive,
3931 mapped_primitive_params,
3932 operationState=None,
3933 detailed_status=None,
3934 operationType=None,
3935 RO_nsr_id=None,
3936 RO_scaling_info=None,
3937 ):
3938 if not db_nslcmop:
3939 return self.SUBOPERATION_STATUS_NOT_FOUND
3940 # Get the "_admin.operations" list, if it exists
3941 db_nslcmop_admin = db_nslcmop.get("_admin", {})
3942 op_list = db_nslcmop_admin.get("operations")
3943 # Create or append to the "_admin.operations" list
3944 new_op = {
3945 "member_vnf_index": vnf_index,
3946 "vdu_id": vdu_id,
3947 "vdu_count_index": vdu_count_index,
3948 "primitive": primitive,
3949 "primitive_params": mapped_primitive_params,
3950 }
3951 if operationState:
3952 new_op["operationState"] = operationState
3953 if detailed_status:
3954 new_op["detailed-status"] = detailed_status
3955 if operationType:
3956 new_op["lcmOperationType"] = operationType
3957 if RO_nsr_id:
3958 new_op["RO_nsr_id"] = RO_nsr_id
3959 if RO_scaling_info:
3960 new_op["RO_scaling_info"] = RO_scaling_info
3961 if not op_list:
3962 # No existing operations, create key 'operations' with current operation as first list element
3963 db_nslcmop_admin.update({"operations": [new_op]})
3964 op_list = db_nslcmop_admin.get("operations")
3965 else:
3966 # Existing operations, append operation to list
3967 op_list.append(new_op)
3968
3969 db_nslcmop_update = {"_admin.operations": op_list}
3970 self.update_db_2("nslcmops", db_nslcmop["_id"], db_nslcmop_update)
3971 op_index = len(op_list) - 1
3972 return op_index
3973
3974 # Helper methods for scale() sub-operations
3975
3976 # pre-scale/post-scale:
3977 # Check for 3 different cases:
3978 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
3979 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
3980 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
3981 def _check_or_add_scale_suboperation(
3982 self,
3983 db_nslcmop,
3984 vnf_index,
3985 vnf_config_primitive,
3986 primitive_params,
3987 operationType,
3988 RO_nsr_id=None,
3989 RO_scaling_info=None,
3990 ):
3991 # Find this sub-operation
3992 if RO_nsr_id and RO_scaling_info:
3993 operationType = "SCALE-RO"
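# RO scaling sub-operations are matched by RO ids rather than by primitive name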
3994 match = {
3995 "member_vnf_index": vnf_index,
3996 "RO_nsr_id": RO_nsr_id,
3997 "RO_scaling_info": RO_scaling_info,
3998 }
3999 else:
4000 match = {
4001 "member_vnf_index": vnf_index,
4002 "primitive": vnf_config_primitive,
4003 "primitive_params": primitive_params,
4004 "lcmOperationType": operationType,
4005 }
4006 op_index = self._find_suboperation(db_nslcmop, match)
4007 if op_index == self.SUBOPERATION_STATUS_NOT_FOUND:
4008 # a. New sub-operation
4009 # The sub-operation does not exist, add it.
4010 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
4011 # The following parameters are set to None for all kinds of scaling:
4012 vdu_id = None
4013 vdu_count_index = None
4014 vdu_name = None
4015 if RO_nsr_id and RO_scaling_info:
4016 vnf_config_primitive = None
4017 primitive_params = None
4018 else:
4019 RO_nsr_id = None
4020 RO_scaling_info = None
4021 # Initial status for sub-operation
4022 operationState = "PROCESSING"
4023 detailed_status = "In progress"
4024 # Add sub-operation for pre/post-scaling (zero or more operations)
4025 self._add_suboperation(
4026 db_nslcmop,
4027 vnf_index,
4028 vdu_id,
4029 vdu_count_index,
4030 vdu_name,
4031 vnf_config_primitive,
4032 primitive_params,
4033 operationState,
4034 detailed_status,
4035 operationType,
4036 RO_nsr_id,
4037 RO_scaling_info,
4038 )
4039 return self.SUBOPERATION_STATUS_NEW
4040 else:
4041 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
4042 # or op_index (operationState != 'COMPLETED')
4043 return self._retry_or_skip_suboperation(db_nslcmop, op_index)
4044
4045 # Function to return execution_environment id
4046
4047 def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list):
4048 # TODO vdu_index_count
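# Returns the ee_id of the first deployed VCA matching vnf_index and vdu_id
# (implicitly None if there is no match)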
4049 for vca in vca_deployed_list:
4050 if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id:
4051 return vca["ee_id"]
4052
4053 async def destroy_N2VC(
4054 self,
4055 logging_text,
4056 db_nslcmop,
4057 vca_deployed,
4058 config_descriptor,
4059 vca_index,
4060 destroy_ee=True,
4061 exec_primitives=True,
4062 scaling_in=False,
4063 vca_id: str = None,
4064 ):
4065 """
4066 Execute the terminate primitives and destroy the execution environment (if destroy_ee=True)
4067 :param logging_text:
4068 :param db_nslcmop:
4069 :param vca_deployed: Dictionary of deployment info at db_nsr._admin.deployed.VCA.<INDEX>
4070 :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
4071 :param vca_index: index in the database _admin.deployed.VCA
4072 :param destroy_ee: False to skip destroying it here, because all of the execution environments will be destroyed at once later
4073 :param exec_primitives: False to skip executing terminate primitives, because the config is not completed or has
4074 not executed properly
4075 :param scaling_in: True destroys the application, False destroys the model
4076 :return: None or exception
4077 """
4078
4079 self.logger.debug(
4080 logging_text
4081 + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
4082 vca_index, vca_deployed, config_descriptor, destroy_ee
4083 )
4084 )
4085
4086 vca_type = vca_deployed.get("type", "lxc_proxy_charm")
4087
4088 # execute terminate_primitives
4089 if exec_primitives:
4090 terminate_primitives = get_ee_sorted_terminate_config_primitive_list(
4091 config_descriptor.get("terminate-config-primitive"),
4092 vca_deployed.get("ee_descriptor_id"),
4093 )
4094 vdu_id = vca_deployed.get("vdu_id")
4095 vdu_count_index = vca_deployed.get("vdu_count_index")
4096 vdu_name = vca_deployed.get("vdu_name")
4097 vnf_index = vca_deployed.get("member-vnf-index")
4098 if terminate_primitives and vca_deployed.get("needed_terminate"):
4099 for seq in terminate_primitives:
4100 # For each sequence in list, get primitive and call _ns_execute_primitive()
4101 step = "Calling terminate action for vnf_member_index={} primitive={}".format(
4102 vnf_index, seq.get("name")
4103 )
4104 self.logger.debug(logging_text + step)
4105 # Create the primitive for each sequence, e.g. "primitive": "touch"
4106 primitive = seq.get("name")
4107 mapped_primitive_params = self._get_terminate_primitive_params(
4108 seq, vnf_index
4109 )
4110
4111 # Add sub-operation
4112 self._add_suboperation(
4113 db_nslcmop,
4114 vnf_index,
4115 vdu_id,
4116 vdu_count_index,
4117 vdu_name,
4118 primitive,
4119 mapped_primitive_params,
4120 )
4121 # Sub-operations: Call _ns_execute_primitive() instead of action()
4122 try:
4123 result, result_detail = await self._ns_execute_primitive(
4124 vca_deployed["ee_id"],
4125 primitive,
4126 mapped_primitive_params,
4127 vca_type=vca_type,
4128 vca_id=vca_id,
4129 )
4130 except LcmException:
4131 # this happens when the VCA is not deployed. In that case there is nothing to terminate
4132 continue
4133 result_ok = ["COMPLETED", "PARTIALLY_COMPLETED"]
4134 if result not in result_ok:
4135 raise LcmException(
4136 "terminate_primitive {} for vnf_member_index={} fails with "
4137 "error {}".format(seq.get("name"), vnf_index, result_detail)
4138 )
4139 # record that this VCA does not need to be terminated
4140 db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(
4141 vca_index
4142 )
4143 self.update_db_2(
4144 "nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False}
4145 )
4146
4147 # Delete Prometheus Jobs if any
4148 # This uses NSR_ID, so it will destroy any jobs under this index
4149 self.db.del_list("prometheus_jobs", {"nsr_id": db_nslcmop["nsInstanceId"]})
4150
4151 if destroy_ee:
4152 await self.vca_map[vca_type].delete_execution_environment(
4153 vca_deployed["ee_id"],
4154 scaling_in=scaling_in,
4155 vca_type=vca_type,
4156 vca_id=vca_id,
4157 )
4158
4159 async def _delete_all_N2VC(self, db_nsr: dict, vca_id: str = None):
4160 self._write_all_config_status(db_nsr=db_nsr, status="TERMINATING")
4161 namespace = "." + db_nsr["_id"]
4162 try:
4163 await self.n2vc.delete_namespace(
4164 namespace=namespace,
4165 total_timeout=self.timeout_charm_delete,
4166 vca_id=vca_id,
4167 )
4168 except N2VCNotFound: # already deleted. Skip
4169 pass
4170 self._write_all_config_status(db_nsr=db_nsr, status="DELETED")
4171
4172 async def _terminate_RO(
4173 self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
4174 ):
4175 """
4176 Terminates a deployment from RO
4177 :param logging_text:
4178 :param nsr_deployed: db_nsr._admin.deployed
4179 :param nsr_id:
4180 :param nslcmop_id:
4181 :param stage: list of string with the content to write on db_nslcmop.detailed-status.
4182 this method updates only index 2, but it writes the concatenated content of the list to the database
4183 :return:
4184 """
4185 db_nsr_update = {}
4186 failed_detail = []
4187 ro_nsr_id = ro_delete_action = None
4188 if nsr_deployed and nsr_deployed.get("RO"):
4189 ro_nsr_id = nsr_deployed["RO"].get("nsr_id")
4190 ro_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
4191 try:
4192 if ro_nsr_id:
4193 stage[2] = "Deleting ns from VIM."
4194 db_nsr_update["detailed-status"] = " ".join(stage)
4195 self._write_op_status(nslcmop_id, stage)
4196 self.logger.debug(logging_text + stage[2])
4197 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4198 self._write_op_status(nslcmop_id, stage)
4199 desc = await self.RO.delete("ns", ro_nsr_id)
4200 ro_delete_action = desc["action_id"]
4201 db_nsr_update[
4202 "_admin.deployed.RO.nsr_delete_action_id"
4203 ] = ro_delete_action
4204 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
4205 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
4206 if ro_delete_action:
4207 # wait until NS is deleted from VIM
4208 stage[2] = "Waiting ns deleted from VIM."
4209 detailed_status_old = None
4210 self.logger.debug(
4211 logging_text
4212 + stage[2]
4213 + " RO_id={} ro_delete_action={}".format(
4214 ro_nsr_id, ro_delete_action
4215 )
4216 )
4217 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4218 self._write_op_status(nslcmop_id, stage)
4219
4220 delete_timeout = 20 * 60 # 20 minutes
4221 while delete_timeout > 0:
4222 desc = await self.RO.show(
4223 "ns",
4224 item_id_name=ro_nsr_id,
4225 extra_item="action",
4226 extra_item_id=ro_delete_action,
4227 )
4228
4229 # deploymentStatus
4230 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
4231
4232 ns_status, ns_status_info = self.RO.check_action_status(desc)
4233 if ns_status == "ERROR":
4234 raise ROclient.ROClientException(ns_status_info)
4235 elif ns_status == "BUILD":
4236 stage[2] = "Deleting from VIM {}".format(ns_status_info)
4237 elif ns_status == "ACTIVE":
4238 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
4239 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
4240 break
4241 else:
4242 assert (
4243 False
4244 ), "ROclient.check_action_status returns unknown {}".format(
4245 ns_status
4246 )
4247 if stage[2] != detailed_status_old:
4248 detailed_status_old = stage[2]
4249 db_nsr_update["detailed-status"] = " ".join(stage)
4250 self._write_op_status(nslcmop_id, stage)
4251 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4252 await asyncio.sleep(5, loop=self.loop)
4253 delete_timeout -= 5
4254 else: # delete_timeout <= 0:
4255 raise ROclient.ROClientException(
4256 "Timeout waiting ns deleted from VIM"
4257 )
4258
4259 except Exception as e:
4260 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4261 if (
4262 isinstance(e, ROclient.ROClientException) and e.http_code == 404
4263 ): # not found
4264 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
4265 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
4266 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
4267 self.logger.debug(
4268 logging_text + "RO_ns_id={} already deleted".format(ro_nsr_id)
4269 )
4270 elif (
4271 isinstance(e, ROclient.ROClientException) and e.http_code == 409
4272 ): # conflict
4273 failed_detail.append("delete conflict: {}".format(e))
4274 self.logger.debug(
4275 logging_text
4276 + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id, e)
4277 )
4278 else:
4279 failed_detail.append("delete error: {}".format(e))
4280 self.logger.error(
4281 logging_text + "RO_ns_id={} delete error: {}".format(ro_nsr_id, e)
4282 )
4283
4284 # Delete nsd
4285 if not failed_detail and deep_get(nsr_deployed, ("RO", "nsd_id")):
4286 ro_nsd_id = nsr_deployed["RO"]["nsd_id"]
4287 try:
4288 stage[2] = "Deleting nsd from RO."
4289 db_nsr_update["detailed-status"] = " ".join(stage)
4290 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4291 self._write_op_status(nslcmop_id, stage)
4292 await self.RO.delete("nsd", ro_nsd_id)
4293 self.logger.debug(
4294 logging_text + "ro_nsd_id={} deleted".format(ro_nsd_id)
4295 )
4296 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
4297 except Exception as e:
4298 if (
4299 isinstance(e, ROclient.ROClientException) and e.http_code == 404
4300 ): # not found
4301 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
4302 self.logger.debug(
4303 logging_text + "ro_nsd_id={} already deleted".format(ro_nsd_id)
4304 )
4305 elif (
4306 isinstance(e, ROclient.ROClientException) and e.http_code == 409
4307 ): # conflict
4308 failed_detail.append(
4309 "ro_nsd_id={} delete conflict: {}".format(ro_nsd_id, e)
4310 )
4311 self.logger.debug(logging_text + failed_detail[-1])
4312 else:
4313 failed_detail.append(
4314 "ro_nsd_id={} delete error: {}".format(ro_nsd_id, e)
4315 )
4316 self.logger.error(logging_text + failed_detail[-1])
4317
4318 if not failed_detail and deep_get(nsr_deployed, ("RO", "vnfd")):
4319 for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
4320 if not vnf_deployed or not vnf_deployed["id"]:
4321 continue
4322 try:
4323 ro_vnfd_id = vnf_deployed["id"]
4324 stage[
4325 2
4326 ] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
4327 vnf_deployed["member-vnf-index"], ro_vnfd_id
4328 )
4329 db_nsr_update["detailed-status"] = " ".join(stage)
4330 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4331 self._write_op_status(nslcmop_id, stage)
4332 await self.RO.delete("vnfd", ro_vnfd_id)
4333 self.logger.debug(
4334 logging_text + "ro_vnfd_id={} deleted".format(ro_vnfd_id)
4335 )
4336 db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
4337 except Exception as e:
4338 if (
4339 isinstance(e, ROclient.ROClientException) and e.http_code == 404
4340 ): # not found
4341 db_nsr_update[
4342 "_admin.deployed.RO.vnfd.{}.id".format(index)
4343 ] = None
4344 self.logger.debug(
4345 logging_text
4346 + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id)
4347 )
4348 elif (
4349 isinstance(e, ROclient.ROClientException) and e.http_code == 409
4350 ): # conflict
4351 failed_detail.append(
4352 "ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id, e)
4353 )
4354 self.logger.debug(logging_text + failed_detail[-1])
4355 else:
4356 failed_detail.append(
4357 "ro_vnfd_id={} delete error: {}".format(ro_vnfd_id, e)
4358 )
4359 self.logger.error(logging_text + failed_detail[-1])
4360
4361 if failed_detail:
4362 stage[2] = "Error deleting from VIM"
4363 else:
4364 stage[2] = "Deleted from VIM"
4365 db_nsr_update["detailed-status"] = " ".join(stage)
4366 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4367 self._write_op_status(nslcmop_id, stage)
4368
4369 if failed_detail:
4370 raise LcmException("; ".join(failed_detail))
4371
4372 async def terminate(self, nsr_id, nslcmop_id):
4373 # Try to lock HA task here
4374 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
4375 if not task_is_locked_by_me:
4376 return
4377
4378 logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
4379 self.logger.debug(logging_text + "Enter")
4380 timeout_ns_terminate = self.timeout_ns_terminate
4381 db_nsr = None
4382 db_nslcmop = None
4383 operation_params = None
4384 exc = None
4385 error_list = [] # collects all error messages from failed steps
4386 db_nslcmop_update = {}
4387 autoremove = False # autoremove after terminated
4388 tasks_dict_info = {}
4389 db_nsr_update = {}
4390 stage = [
4391 "Stage 1/3: Preparing task.",
4392 "Waiting for previous operations to terminate.",
4393 "",
4394 ]
4395 # ^ contains [stage, step, VIM-status]
4396 try:
4397 # wait for any previous tasks in process
4398 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
4399
4400 stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
4401 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
4402 operation_params = db_nslcmop.get("operationParams") or {}
4403 if operation_params.get("timeout_ns_terminate"):
4404 timeout_ns_terminate = operation_params["timeout_ns_terminate"]
4405 stage[1] = "Getting nsr={} from db.".format(nsr_id)
4406 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
4407
4408 db_nsr_update["operational-status"] = "terminating"
4409 db_nsr_update["config-status"] = "terminating"
4410 self._write_ns_status(
4411 nsr_id=nsr_id,
4412 ns_state="TERMINATING",
4413 current_operation="TERMINATING",
4414 current_operation_id=nslcmop_id,
4415 other_update=db_nsr_update,
4416 )
4417 self._write_op_status(op_id=nslcmop_id, queuePosition=0, stage=stage)
4418 nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed")) or {}
4419 if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
4420 return
4421
4422 stage[1] = "Getting vnf descriptors from db."
4423 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
4424 db_vnfrs_dict = {
4425 db_vnfr["member-vnf-index-ref"]: db_vnfr for db_vnfr in db_vnfrs_list
4426 }
4427 db_vnfds_from_id = {}
4428 db_vnfds_from_member_index = {}
4429 # Loop over VNFRs
4430 for vnfr in db_vnfrs_list:
4431 vnfd_id = vnfr["vnfd-id"]
4432 if vnfd_id not in db_vnfds_from_id:
4433 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
4434 db_vnfds_from_id[vnfd_id] = vnfd
4435 db_vnfds_from_member_index[
4436 vnfr["member-vnf-index-ref"]
4437 ] = db_vnfds_from_id[vnfd_id]
4438
4439 # Destroy individual execution environments when there are terminate primitives.
4440 # The rest of the execution environments will be deleted at once
4441 # TODO - check before calling _destroy_N2VC
4442 # if not operation_params.get("skip_terminate_primitives"):#
4443 # or not vca.get("needed_terminate"):
4444 stage[0] = "Stage 2/3 execute terminating primitives."
4445 self.logger.debug(logging_text + stage[0])
4446 stage[1] = "Looking execution environment that needs terminate."
4447 self.logger.debug(logging_text + stage[1])
4448
4449 for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")):
4450 config_descriptor = None
4451 vca_member_vnf_index = vca.get("member-vnf-index")
4452 vca_id = self.get_vca_id(
4453 db_vnfrs_dict.get(vca_member_vnf_index)
4454 if vca_member_vnf_index
4455 else None,
4456 db_nsr,
4457 )
4458 if not vca or not vca.get("ee_id"):
4459 continue
4460 if not vca.get("member-vnf-index"):
4461 # ns
4462 config_descriptor = db_nsr.get("ns-configuration")
4463 elif vca.get("vdu_id"):
4464 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
4465 config_descriptor = get_configuration(db_vnfd, vca.get("vdu_id"))
4466 elif vca.get("kdu_name"):
4467 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
4468 config_descriptor = get_configuration(db_vnfd, vca.get("kdu_name"))
4469 else:
4470 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
4471 config_descriptor = get_configuration(db_vnfd, db_vnfd["id"])
4472 vca_type = vca.get("type")
4473 exec_terminate_primitives = not operation_params.get(
4474 "skip_terminate_primitives"
4475 ) and vca.get("needed_terminate")
4476 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
4477 # pending native charms
4478 destroy_ee = (
4479 True if vca_type in ("helm", "helm-v3", "native_charm") else False
4480 )
4481 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
4482 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
4483 task = asyncio.ensure_future(
4484 self.destroy_N2VC(
4485 logging_text,
4486 db_nslcmop,
4487 vca,
4488 config_descriptor,
4489 vca_index,
4490 destroy_ee,
4491 exec_terminate_primitives,
4492 vca_id=vca_id,
4493 )
4494 )
4495 tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))
4496
4497 # wait for pending tasks of terminate primitives
4498 if tasks_dict_info:
4499 self.logger.debug(
4500 logging_text
4501 + "Waiting for tasks {}".format(list(tasks_dict_info.keys()))
4502 )
4503 error_list = await self._wait_for_tasks(
4504 logging_text,
4505 tasks_dict_info,
4506 min(self.timeout_charm_delete, timeout_ns_terminate),
4507 stage,
4508 nslcmop_id,
4509 )
4510 tasks_dict_info.clear()
4511 if error_list:
4512 return # raise LcmException("; ".join(error_list))
4513
4514 # remove all execution environments at once
4515 stage[0] = "Stage 3/3 delete all."
4516
4517 if nsr_deployed.get("VCA"):
4518 stage[1] = "Deleting all execution environments."
4519 self.logger.debug(logging_text + stage[1])
4520 vca_id = self.get_vca_id({}, db_nsr)
4521 task_delete_ee = asyncio.ensure_future(
4522 asyncio.wait_for(
4523 self._delete_all_N2VC(db_nsr=db_nsr, vca_id=vca_id),
4524 timeout=self.timeout_charm_delete,
4525 )
4526 )
4527 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
4528 tasks_dict_info[task_delete_ee] = "Terminating all VCA"
4529
4530 # Delete from k8scluster
4531 stage[1] = "Deleting KDUs."
4532 self.logger.debug(logging_text + stage[1])
4533 # print(nsr_deployed)
4534 for kdu in get_iterable(nsr_deployed, "K8s"):
4535 if not kdu or not kdu.get("kdu-instance"):
4536 continue
4537 kdu_instance = kdu.get("kdu-instance")
4538 if kdu.get("k8scluster-type") in self.k8scluster_map:
4539 # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
4540 vca_id = self.get_vca_id({}, db_nsr)
4541 task_delete_kdu_instance = asyncio.ensure_future(
4542 self.k8scluster_map[kdu["k8scluster-type"]].uninstall(
4543 cluster_uuid=kdu.get("k8scluster-uuid"),
4544 kdu_instance=kdu_instance,
4545 vca_id=vca_id,
4546 namespace=kdu.get("namespace"),
4547 )
4548 )
4549 else:
4550 self.logger.error(
4551 logging_text
4552 + "Unknown k8s deployment type {}".format(
4553 kdu.get("k8scluster-type")
4554 )
4555 )
4556 continue
4557 tasks_dict_info[
4558 task_delete_kdu_instance
4559 ] = "Terminating KDU '{}'".format(kdu.get("kdu-name"))
4560
4561 # remove from RO
4562 stage[1] = "Deleting ns from VIM."
4563 if self.ng_ro:
4564 task_delete_ro = asyncio.ensure_future(
4565 self._terminate_ng_ro(
4566 logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
4567 )
4568 )
4569 else:
4570 task_delete_ro = asyncio.ensure_future(
4571 self._terminate_RO(
4572 logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
4573 )
4574 )
4575 tasks_dict_info[task_delete_ro] = "Removing deployment from VIM"
4576
4577 # the rest of the work is done in the finally block
4578
4579 except (
4580 ROclient.ROClientException,
4581 DbException,
4582 LcmException,
4583 N2VCException,
4584 ) as e:
4585 self.logger.error(logging_text + "Exit Exception {}".format(e))
4586 exc = e
4587 except asyncio.CancelledError:
4588 self.logger.error(
4589 logging_text + "Cancelled Exception while '{}'".format(stage[1])
4590 )
4591 exc = "Operation was cancelled"
4592 except Exception as e:
4593 exc = traceback.format_exc()
4594 self.logger.critical(
4595 logging_text + "Exit Exception while '{}': {}".format(stage[1], e),
4596 exc_info=True,
4597 )
4598 finally:
4599 if exc:
4600 error_list.append(str(exc))
4601 try:
4602 # wait for pending tasks
4603 if tasks_dict_info:
4604 stage[1] = "Waiting for terminate pending tasks."
4605 self.logger.debug(logging_text + stage[1])
4606 error_list += await self._wait_for_tasks(
4607 logging_text,
4608 tasks_dict_info,
4609 timeout_ns_terminate,
4610 stage,
4611 nslcmop_id,
4612 )
4613 stage[1] = stage[2] = ""
4614 except asyncio.CancelledError:
4615 error_list.append("Cancelled")
4616 # TODO cancel all tasks
4617 except Exception as exc:
4618 error_list.append(str(exc))
4619 # update status in the database
4620 if error_list:
4621 error_detail = "; ".join(error_list)
4622 # self.logger.error(logging_text + error_detail)
4623 error_description_nslcmop = "{} Detail: {}".format(
4624 stage[0], error_detail
4625 )
4626 error_description_nsr = "Operation: TERMINATING.{}, {}.".format(
4627 nslcmop_id, stage[0]
4628 )
4629
4630 db_nsr_update["operational-status"] = "failed"
4631 db_nsr_update["detailed-status"] = (
4632 error_description_nsr + " Detail: " + error_detail
4633 )
4634 db_nslcmop_update["detailed-status"] = error_detail
4635 nslcmop_operation_state = "FAILED"
4636 ns_state = "BROKEN"
4637 else:
4638 error_detail = None
4639 error_description_nsr = error_description_nslcmop = None
4640 ns_state = "NOT_INSTANTIATED"
4641 db_nsr_update["operational-status"] = "terminated"
4642 db_nsr_update["detailed-status"] = "Done"
4643 db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
4644 db_nslcmop_update["detailed-status"] = "Done"
4645 nslcmop_operation_state = "COMPLETED"
4646
4647 if db_nsr:
4648 self._write_ns_status(
4649 nsr_id=nsr_id,
4650 ns_state=ns_state,
4651 current_operation="IDLE",
4652 current_operation_id=None,
4653 error_description=error_description_nsr,
4654 error_detail=error_detail,
4655 other_update=db_nsr_update,
4656 )
4657 self._write_op_status(
4658 op_id=nslcmop_id,
4659 stage="",
4660 error_message=error_description_nslcmop,
4661 operation_state=nslcmop_operation_state,
4662 other_update=db_nslcmop_update,
4663 )
4664 if ns_state == "NOT_INSTANTIATED":
4665 try:
4666 self.db.set_list(
4667 "vnfrs",
4668 {"nsr-id-ref": nsr_id},
4669 {"_admin.nsState": "NOT_INSTANTIATED"},
4670 )
4671 except DbException as e:
4672 self.logger.warn(
4673 logging_text
4674 + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
4675 nsr_id, e
4676 )
4677 )
4678 if operation_params:
4679 autoremove = operation_params.get("autoremove", False)
4680 if nslcmop_operation_state:
4681 try:
4682 await self.msg.aiowrite(
4683 "ns",
4684 "terminated",
4685 {
4686 "nsr_id": nsr_id,
4687 "nslcmop_id": nslcmop_id,
4688 "operationState": nslcmop_operation_state,
4689 "autoremove": autoremove,
4690 },
4691 loop=self.loop,
4692 )
4693 except Exception as e:
4694 self.logger.error(
4695 logging_text + "kafka_write notification Exception {}".format(e)
4696 )
4697
4698 self.logger.debug(logging_text + "Exit")
4699 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
4700
4701 async def _wait_for_tasks(
4702 self, logging_text, created_tasks_info, timeout, stage, nslcmop_id, nsr_id=None
4703 ):
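# Waits for the given tasks to finish, updating stage[1] with done/total
# progress and writing the operation status to the database as tasks complete.
# Returns the list of detailed error messages (empty if every task succeeded).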
4704 time_start = time()
4705 error_detail_list = []
4706 error_list = []
4707 pending_tasks = list(created_tasks_info.keys())
4708 num_tasks = len(pending_tasks)
4709 num_done = 0
4710 stage[1] = "{}/{}.".format(num_done, num_tasks)
4711 self._write_op_status(nslcmop_id, stage)
4712 while pending_tasks:
4713 new_error = None
4714 _timeout = timeout + time_start - time()
4715 done, pending_tasks = await asyncio.wait(
4716 pending_tasks, timeout=_timeout, return_when=asyncio.FIRST_COMPLETED
4717 )
4718 num_done += len(done)
4719 if not done: # Timeout
4720 for task in pending_tasks:
4721 new_error = created_tasks_info[task] + ": Timeout"
4722 error_detail_list.append(new_error)
4723 error_list.append(new_error)
4724 break
4725 for task in done:
4726 if task.cancelled():
4727 exc = "Cancelled"
4728 else:
4729 exc = task.exception()
4730 if exc:
4731 if isinstance(exc, asyncio.TimeoutError):
4732 exc = "Timeout"
4733 new_error = created_tasks_info[task] + ": {}".format(exc)
4734 error_list.append(created_tasks_info[task])
4735 error_detail_list.append(new_error)
4736 if isinstance(
4737 exc,
4738 (
4739 str,
4740 DbException,
4741 N2VCException,
4742 ROclient.ROClientException,
4743 LcmException,
4744 K8sException,
4745 NgRoException,
4746 ),
4747 ):
4748 self.logger.error(logging_text + new_error)
4749 else:
4750 exc_traceback = "".join(
4751 traceback.format_exception(None, exc, exc.__traceback__)
4752 )
4753 self.logger.error(
4754 logging_text
4755 + created_tasks_info[task]
4756 + " "
4757 + exc_traceback
4758 )
4759 else:
4760 self.logger.debug(
4761 logging_text + created_tasks_info[task] + ": Done"
4762 )
4763 stage[1] = "{}/{}.".format(num_done, num_tasks)
4764 if new_error:
4765 stage[1] += " Errors: " + ". ".join(error_detail_list) + "."
4766 if nsr_id: # update also nsr
4767 self.update_db_2(
4768 "nsrs",
4769 nsr_id,
4770 {
4771 "errorDescription": "Error at: " + ", ".join(error_list),
4772 "errorDetail": ". ".join(error_detail_list),
4773 },
4774 )
4775 self._write_op_status(nslcmop_id, stage)
4776 return error_detail_list
4777
4778 @staticmethod
4779 def _map_primitive_params(primitive_desc, params, instantiation_params):
4780 """
4781 Generates the params to be provided to the charm before executing a primitive. If the user does not provide a parameter,
4782 the default-value is used. If the value is enclosed in < >, it is looked up in instantiation_params
4783 :param primitive_desc: portion of VNFD/NSD that describes primitive
4784 :param params: Params provided by user
4785 :param instantiation_params: Instantiation params provided by user
4786 :return: a dictionary with the calculated params
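Example (illustrative): a parameter whose default-value is "<ns_id>" is
resolved from instantiation_params["ns_id"]; if that key is missing, an
LcmException is raised.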
4787 """
4788 calculated_params = {}
4789 for parameter in primitive_desc.get("parameter", ()):
4790 param_name = parameter["name"]
4791 if param_name in params:
4792 calculated_params[param_name] = params[param_name]
4793 elif "default-value" in parameter or "value" in parameter:
4794 if "value" in parameter:
4795 calculated_params[param_name] = parameter["value"]
4796 else:
4797 calculated_params[param_name] = parameter["default-value"]
4798 if (
4799 isinstance(calculated_params[param_name], str)
4800 and calculated_params[param_name].startswith("<")
4801 and calculated_params[param_name].endswith(">")
4802 ):
4803 if calculated_params[param_name][1:-1] in instantiation_params:
4804 calculated_params[param_name] = instantiation_params[
4805 calculated_params[param_name][1:-1]
4806 ]
4807 else:
4808 raise LcmException(
4809 "Parameter {} needed to execute primitive {} not provided".format(
4810 calculated_params[param_name], primitive_desc["name"]
4811 )
4812 )
4813 else:
4814 raise LcmException(
4815 "Parameter {} needed to execute primitive {} not provided".format(
4816 param_name, primitive_desc["name"]
4817 )
4818 )
4819
4820 if isinstance(calculated_params[param_name], (dict, list, tuple)):
4821 calculated_params[param_name] = yaml.safe_dump(
4822 calculated_params[param_name], default_flow_style=True, width=256
4823 )
4824 elif isinstance(calculated_params[param_name], str) and calculated_params[
4825 param_name
4826 ].startswith("!!yaml "):
4827 calculated_params[param_name] = calculated_params[param_name][7:]
4828 if parameter.get("data-type") == "INTEGER":
4829 try:
4830 calculated_params[param_name] = int(calculated_params[param_name])
4831 except ValueError: # error converting string to int
4832 raise LcmException(
4833 "Parameter {} of primitive {} must be integer".format(
4834 param_name, primitive_desc["name"]
4835 )
4836 )
4837 elif parameter.get("data-type") == "BOOLEAN":
4838 calculated_params[param_name] = not (
4839 (str(calculated_params[param_name])).lower() == "false"
4840 )
4841
4842 # always add ns_config_info if the primitive name is config
4843 if primitive_desc["name"] == "config":
4844 if "ns_config_info" in instantiation_params:
4845 calculated_params["ns_config_info"] = instantiation_params[
4846 "ns_config_info"
4847 ]
4848 return calculated_params
4849
4850 def _look_for_deployed_vca(
4851 self,
4852 deployed_vca,
4853 member_vnf_index,
4854 vdu_id,
4855 vdu_count_index,
4856 kdu_name=None,
4857 ee_descriptor_id=None,
4858 ):
4859 # find the vca_deployed record for this action. Raise LcmException if it is not found or it has no ee_id.
4860 for vca in deployed_vca:
4861 if not vca:
4862 continue
4863 if member_vnf_index != vca["member-vnf-index"] or vdu_id != vca["vdu_id"]:
4864 continue
4865 if (
4866 vdu_count_index is not None
4867 and vdu_count_index != vca["vdu_count_index"]
4868 ):
4869 continue
4870 if kdu_name and kdu_name != vca["kdu_name"]:
4871 continue
4872 if ee_descriptor_id and ee_descriptor_id != vca["ee_descriptor_id"]:
4873 continue
4874 break
4875 else:
4876 # vca_deployed not found
4877 raise LcmException(
4878 "charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
4879 " is not deployed".format(
4880 member_vnf_index,
4881 vdu_id,
4882 vdu_count_index,
4883 kdu_name,
4884 ee_descriptor_id,
4885 )
4886 )
4887 # get ee_id
4888 ee_id = vca.get("ee_id")
4889 vca_type = vca.get(
4890 "type", "lxc_proxy_charm"
4891 ) # default value for backward compatibility - proxy charm
4892 if not ee_id:
4893 raise LcmException(
4894 "charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
4895 "execution environment".format(
4896 member_vnf_index, vdu_id, kdu_name, vdu_count_index
4897 )
4898 )
4899 return ee_id, vca_type
4900
4901 async def _ns_execute_primitive(
4902 self,
4903 ee_id,
4904 primitive,
4905 primitive_params,
4906 retries=0,
4907 retries_interval=30,
4908 timeout=None,
4909 vca_type=None,
4910 db_dict=None,
4911 vca_id: str = None,
4912 ) -> (str, str):
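# Runs the primitive on the execution environment, retrying up to "retries"
# times with "retries_interval" seconds between attempts. Returns a tuple of
# ("COMPLETED", output) on success or ("FAILED"/"FAIL", error detail) on error.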
4913 try:
4914 if primitive == "config":
4915 primitive_params = {"params": primitive_params}
4916
4917 vca_type = vca_type or "lxc_proxy_charm"
4918
4919 while retries >= 0:
4920 try:
4921 output = await asyncio.wait_for(
4922 self.vca_map[vca_type].exec_primitive(
4923 ee_id=ee_id,
4924 primitive_name=primitive,
4925 params_dict=primitive_params,
4926 progress_timeout=self.timeout_progress_primitive,
4927 total_timeout=self.timeout_primitive,
4928 db_dict=db_dict,
4929 vca_id=vca_id,
4930 vca_type=vca_type,
4931 ),
4932 timeout=timeout or self.timeout_primitive,
4933 )
4934 # execution was OK
4935 break
4936 except asyncio.CancelledError:
4937 raise
4938 except Exception as e: # asyncio.TimeoutError
4939 if isinstance(e, asyncio.TimeoutError):
4940 e = "Timeout"
4941 retries -= 1
4942 if retries >= 0:
4943 self.logger.debug(
4944 "Error executing action {} on {} -> {}".format(
4945 primitive, ee_id, e
4946 )
4947 )
4948 # wait and retry
4949 await asyncio.sleep(retries_interval, loop=self.loop)
4950 else:
4951 return "FAILED", str(e)
4952
4953 return "COMPLETED", output
4954
4955 except (LcmException, asyncio.CancelledError):
4956 raise
4957 except Exception as e:
4958 return "FAIL", "Error executing action {}: {}".format(primitive, e)
4959
4960 async def vca_status_refresh(self, nsr_id, nslcmop_id):
4961 """
4962 Updates the vca_status in the nsrs record with the latest juju information
4963 :param nsr_id: Id of the nsr
4964 :param nslcmop_id: Id of the nslcmop
4965 :return: None
4966 """
4967
4968 self.logger.debug("Task ns={} action={} Enter".format(nsr_id, nslcmop_id))
4969 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
4970 vca_id = self.get_vca_id({}, db_nsr)
4971 if db_nsr["_admin"]["deployed"]["K8s"]:
4972 for _, k8s in enumerate(db_nsr["_admin"]["deployed"]["K8s"]):
4973 cluster_uuid, kdu_instance, cluster_type = (
4974 k8s["k8scluster-uuid"],
4975 k8s["kdu-instance"],
4976 k8s["k8scluster-type"],
4977 )
4978 await self._on_update_k8s_db(
4979 cluster_uuid=cluster_uuid,
4980 kdu_instance=kdu_instance,
4981 filter={"_id": nsr_id},
4982 vca_id=vca_id,
4983 cluster_type=cluster_type,
4984 )
4985 else:
4986 for vca_index, _ in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
4987 table, filter = "nsrs", {"_id": nsr_id}
4988 path = "_admin.deployed.VCA.{}.".format(vca_index)
4989 await self._on_update_n2vc_db(table, filter, path, {})
4990
4991 self.logger.debug("Task ns={} action={} Exit".format(nsr_id, nslcmop_id))
4992 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_vca_status_refresh")
4993
4994 async def action(self, nsr_id, nslcmop_id):
4995 # Try to lock HA task here
4996 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
4997 if not task_is_locked_by_me:
4998 return
4999
5000 logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
5001 self.logger.debug(logging_text + "Enter")
5002 # get all needed from database
5003 db_nsr = None
5004 db_nslcmop = None
5005 db_nsr_update = {}
5006 db_nslcmop_update = {}
5007 nslcmop_operation_state = None
5008 error_description_nslcmop = None
5009 exc = None
5010 try:
5011 # wait for any previous tasks in process
5012 step = "Waiting for previous operations to terminate"
5013 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
5014
5015 self._write_ns_status(
5016 nsr_id=nsr_id,
5017 ns_state=None,
5018 current_operation="RUNNING ACTION",
5019 current_operation_id=nslcmop_id,
5020 )
5021
5022 step = "Getting information from database"
5023 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5024 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
5025 if db_nslcmop["operationParams"].get("primitive_params"):
5026 db_nslcmop["operationParams"]["primitive_params"] = json.loads(
5027 db_nslcmop["operationParams"]["primitive_params"]
5028 )
5029
5030 nsr_deployed = db_nsr["_admin"].get("deployed")
5031 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
5032 vdu_id = db_nslcmop["operationParams"].get("vdu_id")
5033 kdu_name = db_nslcmop["operationParams"].get("kdu_name")
5034 vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
5035 primitive = db_nslcmop["operationParams"]["primitive"]
5036 primitive_params = db_nslcmop["operationParams"]["primitive_params"]
5037 timeout_ns_action = db_nslcmop["operationParams"].get(
5038 "timeout_ns_action", self.timeout_primitive
5039 )
5040
5041 if vnf_index:
5042 step = "Getting vnfr from database"
5043 db_vnfr = self.db.get_one(
5044 "vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}
5045 )
5046 if db_vnfr.get("kdur"):
5047 kdur_list = []
5048 for kdur in db_vnfr["kdur"]:
5049 if kdur.get("additionalParams"):
5050 kdur["additionalParams"] = json.loads(
5051 kdur["additionalParams"]
5052 )
5053 kdur_list.append(kdur)
5054 db_vnfr["kdur"] = kdur_list
5055 step = "Getting vnfd from database"
5056 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
5057
5058 # Sync filesystem before running a primitive
5059 self.fs.sync(db_vnfr["vnfd-id"])
5060 else:
5061 step = "Getting nsd from database"
5062 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
5063
5064 vca_id = self.get_vca_id(db_vnfr, db_nsr)
5065 # for backward compatibility
5066 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
5067 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
5068 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
5069 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5070
5071 # look for primitive
5072 config_primitive_desc = descriptor_configuration = None
5073 if vdu_id:
5074 descriptor_configuration = get_configuration(db_vnfd, vdu_id)
5075 elif kdu_name:
5076 descriptor_configuration = get_configuration(db_vnfd, kdu_name)
5077 elif vnf_index:
5078 descriptor_configuration = get_configuration(db_vnfd, db_vnfd["id"])
5079 else:
5080 descriptor_configuration = db_nsd.get("ns-configuration")
5081
5082 if descriptor_configuration and descriptor_configuration.get(
5083 "config-primitive"
5084 ):
5085 for config_primitive in descriptor_configuration["config-primitive"]:
5086 if config_primitive["name"] == primitive:
5087 config_primitive_desc = config_primitive
5088 break
5089
5090 if not config_primitive_desc:
5091 if not (kdu_name and primitive in ("upgrade", "rollback", "status")):
5092 raise LcmException(
5093 "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
5094 primitive
5095 )
5096 )
5097 primitive_name = primitive
5098 ee_descriptor_id = None
5099 else:
5100 primitive_name = config_primitive_desc.get(
5101 "execution-environment-primitive", primitive
5102 )
5103 ee_descriptor_id = config_primitive_desc.get(
5104 "execution-environment-ref"
5105 )
5106
5107 if vnf_index:
5108 if vdu_id:
5109 vdur = next(
5110 (x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None
5111 )
5112 desc_params = parse_yaml_strings(vdur.get("additionalParams"))
5113 elif kdu_name:
5114 kdur = next(
5115 (x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None
5116 )
5117 desc_params = parse_yaml_strings(kdur.get("additionalParams"))
5118 else:
5119 desc_params = parse_yaml_strings(
5120 db_vnfr.get("additionalParamsForVnf")
5121 )
5122 else:
5123 desc_params = parse_yaml_strings(db_nsr.get("additionalParamsForNs"))
5124 if kdu_name and get_configuration(db_vnfd, kdu_name):
5125 kdu_configuration = get_configuration(db_vnfd, kdu_name)
5126 actions = set()
5127 for primitive in kdu_configuration.get("initial-config-primitive", []):
5128 actions.add(primitive["name"])
5129 for primitive in kdu_configuration.get("config-primitive", []):
5130 actions.add(primitive["name"])
5131 kdu = find_in_list(
5132 nsr_deployed["K8s"],
5133 lambda kdu: kdu_name == kdu["kdu-name"]
5134 and kdu["member-vnf-index"] == vnf_index,
5135 )
5136 kdu_action = (
5137 True
5138 if primitive_name in actions
5139 and kdu["k8scluster-type"] not in ("helm-chart", "helm-chart-v3")
5140 else False
5141 )
5142
5143 # TODO check if ns is in a proper status
5144 if kdu_name and (
5145 primitive_name in ("upgrade", "rollback", "status") or kdu_action
5146 ):
5147 # kdur and desc_params already set from before
5148 if primitive_params:
5149 desc_params.update(primitive_params)
5150 # TODO Check if we will need something at vnf level
5151 for index, kdu in enumerate(get_iterable(nsr_deployed, "K8s")):
5152 if (
5153 kdu_name == kdu["kdu-name"]
5154 and kdu["member-vnf-index"] == vnf_index
5155 ):
5156 break
5157 else:
5158 raise LcmException(
5159 "KDU '{}' for vnf '{}' not deployed".format(kdu_name, vnf_index)
5160 )
5161
5162 if kdu.get("k8scluster-type") not in self.k8scluster_map:
5163 msg = "unknown k8scluster-type '{}'".format(
5164 kdu.get("k8scluster-type")
5165 )
5166 raise LcmException(msg)
5167
5168 db_dict = {
5169 "collection": "nsrs",
5170 "filter": {"_id": nsr_id},
5171 "path": "_admin.deployed.K8s.{}".format(index),
5172 }
5173 self.logger.debug(
5174 logging_text
5175 + "Exec k8s {} on {}.{}".format(primitive_name, vnf_index, kdu_name)
5176 )
5177 step = "Executing kdu {}".format(primitive_name)
5178 if primitive_name == "upgrade":
5179 if desc_params.get("kdu_model"):
5180 kdu_model = desc_params.get("kdu_model")
5181 del desc_params["kdu_model"]
5182 else:
5183 kdu_model = kdu.get("kdu-model")
5184 parts = kdu_model.split(sep=":")
5185 if len(parts) == 2:
5186 kdu_model = parts[0]
5187
5188 detailed_status = await asyncio.wait_for(
5189 self.k8scluster_map[kdu["k8scluster-type"]].upgrade(
5190 cluster_uuid=kdu.get("k8scluster-uuid"),
5191 kdu_instance=kdu.get("kdu-instance"),
5192 atomic=True,
5193 kdu_model=kdu_model,
5194 params=desc_params,
5195 db_dict=db_dict,
5196 timeout=timeout_ns_action,
5197 ),
5198 timeout=timeout_ns_action + 10,
5199 )
5200 self.logger.debug(
5201 logging_text + " Upgrade of kdu {} done".format(detailed_status)
5202 )
5203 elif primitive_name == "rollback":
5204 detailed_status = await asyncio.wait_for(
5205 self.k8scluster_map[kdu["k8scluster-type"]].rollback(
5206 cluster_uuid=kdu.get("k8scluster-uuid"),
5207 kdu_instance=kdu.get("kdu-instance"),
5208 db_dict=db_dict,
5209 ),
5210 timeout=timeout_ns_action,
5211 )
5212 elif primitive_name == "status":
5213 detailed_status = await asyncio.wait_for(
5214 self.k8scluster_map[kdu["k8scluster-type"]].status_kdu(
5215 cluster_uuid=kdu.get("k8scluster-uuid"),
5216 kdu_instance=kdu.get("kdu-instance"),
5217 vca_id=vca_id,
5218 ),
5219 timeout=timeout_ns_action,
5220 )
5221 else:
5222 kdu_instance = kdu.get("kdu-instance") or "{}-{}".format(
5223 kdu["kdu-name"], nsr_id
5224 )
5225 params = self._map_primitive_params(
5226 config_primitive_desc, primitive_params, desc_params
5227 )
5228
5229 detailed_status = await asyncio.wait_for(
5230 self.k8scluster_map[kdu["k8scluster-type"]].exec_primitive(
5231 cluster_uuid=kdu.get("k8scluster-uuid"),
5232 kdu_instance=kdu_instance,
5233 primitive_name=primitive_name,
5234 params=params,
5235 db_dict=db_dict,
5236 timeout=timeout_ns_action,
5237 vca_id=vca_id,
5238 ),
5239 timeout=timeout_ns_action,
5240 )
5241
5242 if detailed_status:
5243 nslcmop_operation_state = "COMPLETED"
5244 else:
5245 detailed_status = ""
5246 nslcmop_operation_state = "FAILED"
5247 else:
5248 ee_id, vca_type = self._look_for_deployed_vca(
5249 nsr_deployed["VCA"],
5250 member_vnf_index=vnf_index,
5251 vdu_id=vdu_id,
5252 vdu_count_index=vdu_count_index,
5253 ee_descriptor_id=ee_descriptor_id,
5254 )
5255 for vca_index, vca_deployed in enumerate(
5256 db_nsr["_admin"]["deployed"]["VCA"]
5257 ):
5258 if vca_deployed.get("member-vnf-index") == vnf_index:
5259 db_dict = {
5260 "collection": "nsrs",
5261 "filter": {"_id": nsr_id},
5262 "path": "_admin.deployed.VCA.{}.".format(vca_index),
5263 }
5264 break
5265 (
5266 nslcmop_operation_state,
5267 detailed_status,
5268 ) = await self._ns_execute_primitive(
5269 ee_id,
5270 primitive=primitive_name,
5271 primitive_params=self._map_primitive_params(
5272 config_primitive_desc, primitive_params, desc_params
5273 ),
5274 timeout=timeout_ns_action,
5275 vca_type=vca_type,
5276 db_dict=db_dict,
5277 vca_id=vca_id,
5278 )
5279
5280 db_nslcmop_update["detailed-status"] = detailed_status
5281 error_description_nslcmop = (
5282 detailed_status if nslcmop_operation_state == "FAILED" else ""
5283 )
5284 self.logger.debug(
5285 logging_text
5286 + " task Done with result {} {}".format(
5287 nslcmop_operation_state, detailed_status
5288 )
5289 )
5290 return # database update is called inside finally
5291
5292 except (DbException, LcmException, N2VCException, K8sException) as e:
5293 self.logger.error(logging_text + "Exit Exception {}".format(e))
5294 exc = e
5295 except asyncio.CancelledError:
5296 self.logger.error(
5297 logging_text + "Cancelled Exception while '{}'".format(step)
5298 )
5299 exc = "Operation was cancelled"
5300 except asyncio.TimeoutError:
5301 self.logger.error(logging_text + "Timeout while '{}'".format(step))
5302 exc = "Timeout"
5303 except Exception as e:
5304 exc = traceback.format_exc()
5305 self.logger.critical(
5306 logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
5307 exc_info=True,
5308 )
5309 finally:
5310 if exc:
5311 db_nslcmop_update[
5312 "detailed-status"
5313 ] = (
5314 detailed_status
5315 ) = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
5316 nslcmop_operation_state = "FAILED"
5317 if db_nsr:
5318 self._write_ns_status(
5319 nsr_id=nsr_id,
5320 ns_state=db_nsr[
5321 "nsState"
5322 ], # TODO check if degraded. For the moment use previous status
5323 current_operation="IDLE",
5324 current_operation_id=None,
5325 # error_description=error_description_nsr,
5326 # error_detail=error_detail,
5327 other_update=db_nsr_update,
5328 )
5329
5330 self._write_op_status(
5331 op_id=nslcmop_id,
5332 stage="",
5333 error_message=error_description_nslcmop,
5334 operation_state=nslcmop_operation_state,
5335 other_update=db_nslcmop_update,
5336 )
5337
5338 if nslcmop_operation_state:
5339 try:
5340 await self.msg.aiowrite(
5341 "ns",
5342 "actioned",
5343 {
5344 "nsr_id": nsr_id,
5345 "nslcmop_id": nslcmop_id,
5346 "operationState": nslcmop_operation_state,
5347 },
5348 loop=self.loop,
5349 )
5350 except Exception as e:
5351 self.logger.error(
5352 logging_text + "kafka_write notification Exception {}".format(e)
5353 )
5354 self.logger.debug(logging_text + "Exit")
5355 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
5356 return nslcmop_operation_state, detailed_status
5357
5358 async def terminate_vdus(
5359 self, db_vnfr, member_vnf_index, db_nsr, update_db_nslcmops, stage, logging_text
5360 ):
5361 """This method terminates VDUs
5362
5363 Args:
5364 db_vnfr: VNF instance record
5365 member_vnf_index: VNF index to identify the VDUs to be removed
5366 db_nsr: NS instance record
5367 update_db_nslcmops: Nslcmop update record
5368 """
5369 vca_scaling_info = []
5370 scaling_info = {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []}
5371 scaling_info["scaling_direction"] = "IN"
5372 scaling_info["vdu-delete"] = {}
5373 scaling_info["kdu-delete"] = {}
5374 db_vdur = db_vnfr.get("vdur")
5375 vdur_list = copy(db_vdur)
5376 count_index = 0
5377 for index, vdu in enumerate(vdur_list):
5378 vca_scaling_info.append(
5379 {
5380 "osm_vdu_id": vdu["vdu-id-ref"],
5381 "member-vnf-index": member_vnf_index,
5382 "type": "delete",
5383 "vdu_index": count_index,
5384 })
5385 scaling_info["vdu-delete"][vdu["vdu-id-ref"]] = count_index
5386 scaling_info["vdu"].append(
5387 {
5388 "name": vdu.get("name") or vdu.get("vdu-name"),
5389 "vdu_id": vdu["vdu-id-ref"],
5390 "interface": [],
5391 })
5392 for interface in vdu["interfaces"]:
5393 scaling_info["vdu"][index]["interface"].append(
5394 {
5395 "name": interface["name"],
5396 "ip_address": interface["ip-address"],
5397 "mac_address": interface.get("mac-address"),
5398 })
5399 self.logger.info("NS update scaling info: {}".format(scaling_info))
5400 stage[2] = "Terminating VDUs"
5401 if scaling_info.get("vdu-delete"):
5402 # scale_process = "RO"
5403 if self.ro_config.get("ng"):
5404 await self._scale_ng_ro(
5405 logging_text, db_nsr, update_db_nslcmops, db_vnfr, scaling_info, stage
5406 )
5407
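# Illustrative sketch, not part of the original code: terminate_vdus builds a
# scaling_info structure with direction "IN" that is handed to _scale_ng_ro. For a
# hypothetical VNF with a single VDU "mgmtVM" it would look roughly like:
#
#     scaling_info = {
#         "scaling_group_name": "vdu_autoscale",
#         "scaling_direction": "IN",
#         "vdu-delete": {"mgmtVM": 0},
#         "kdu-delete": {},
#         "kdu": [],
#         "vdu": [
#             {
#                 "name": "mgmtVM-0",
#                 "vdu_id": "mgmtVM",
#                 "interface": [
#                     {"name": "eth0", "ip_address": "10.0.0.5", "mac_address": None}
#                 ],
#             }
#         ],
#     }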
5408 async def remove_vnf(
5409 self, nsr_id, nslcmop_id, vnf_instance_id
5410 ):
5411 """This method is to Remove VNF instances from NS.
5412
5413 Args:
5414 nsr_id: NS instance id
5415 nslcmop_id: nslcmop id of update
5416 vnf_instance_id: id of the VNF instance to be removed
5417
5418 Returns:
5419 result: (str, str) COMPLETED/FAILED, details
5420 """
5421 try:
5422 db_nsr_update = {}
5423 logging_text = "Task ns={} update ".format(nsr_id)
5424 check_vnfr_count = len(self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}))
5425 self.logger.info("check_vnfr_count {}".format(check_vnfr_count))
5426 if check_vnfr_count > 1:
5427 stage = ["", "", ""]
5428 step = "Getting nslcmop from database"
5429 self.logger.debug(step + " after having waited for previous tasks to be completed")
5430 # db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5431 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
5432 db_vnfr = self.db.get_one("vnfrs", {"_id": vnf_instance_id})
5433 member_vnf_index = db_vnfr["member-vnf-index-ref"]
5434 """ db_vnfr = self.db.get_one(
5435 "vnfrs", {"member-vnf-index-ref": member_vnf_index, "nsr-id-ref": nsr_id}) """
5436
5437 update_db_nslcmops = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5438 await self.terminate_vdus(db_vnfr, member_vnf_index, db_nsr, update_db_nslcmops, stage, logging_text)
5439
5440 constituent_vnfr = db_nsr.get("constituent-vnfr-ref")
5441 constituent_vnfr.remove(db_vnfr.get("_id"))
5442 db_nsr_update["constituent-vnfr-ref"] = db_nsr.get("constituent-vnfr-ref")
5443 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5444 self.db.del_one("vnfrs", {"_id": db_vnfr.get("_id")})
5445 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5446 return "COMPLETED", "Done"
5447 else:
5448 step = "Terminate VNF Failed with"
5449 raise LcmException("{} Cannot terminate the last VNF in this NS.".format(
5450 vnf_instance_id))
5451 except (LcmException, asyncio.CancelledError):
5452 raise
5453 except Exception as e:
5454 self.logger.debug("Error removing VNF {}".format(e))
5455 return "FAILED", "Error removing VNF {}".format(e)
5456
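# Illustrative usage sketch (call site restated, not new code): remove_vnf is invoked
# from update() when the operation carries a REMOVE_VNF request, e.g.
#
#     result, detail = await self.remove_vnf(nsr_id, nslcmop_id, vnf_instance_id)
#     # result is "COMPLETED" or "FAILED"; the last VNF of an NS cannot be removed.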
5457 async def _ns_redeploy_vnf(
5458 self, nsr_id, nslcmop_id, db_vnfd, db_vnfr, db_nsr,
5459 ):
5460 """This method updates and redeploys VNF instances
5461
5462 Args:
5463 nsr_id: NS instance id
5464 nslcmop_id: nslcmop id
5465 db_vnfd: VNF descriptor
5466 db_vnfr: VNF instance record
5467 db_nsr: NS instance record
5468
5469 Returns:
5470 result: (str, str) COMPLETED/FAILED, details
5471 """
5472 try:
5473 count_index = 0
5474 stage = ["", "", ""]
5475 logging_text = "Task ns={} update ".format(nsr_id)
5476 latest_vnfd_revision = db_vnfd["_admin"].get("revision")
5477 member_vnf_index = db_vnfr["member-vnf-index-ref"]
5478
5479 # Terminate old VNF resources
5480 update_db_nslcmops = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5481 await self.terminate_vdus(db_vnfr, member_vnf_index, db_nsr, update_db_nslcmops, stage, logging_text)
5482
5483 # old_vnfd_id = db_vnfr["vnfd-id"]
5484 # new_db_vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
5485 new_db_vnfd = db_vnfd
5486 # new_vnfd_ref = new_db_vnfd["id"]
5487 # new_vnfd_id = vnfd_id
5488
5489 # Create VDUR
5490 new_vnfr_cp = []
5491 for cp in new_db_vnfd.get("ext-cpd", ()):
5492 vnf_cp = {
5493 "name": cp.get("id"),
5494 "connection-point-id": cp.get("int-cpd", {}).get("cpd"),
5495 "connection-point-vdu-id": cp.get("int-cpd", {}).get("vdu-id"),
5496 "id": cp.get("id"),
5497 }
5498 new_vnfr_cp.append(vnf_cp)
5499 new_vdur = update_db_nslcmops["operationParams"]["newVdur"]
5500 # new_vdur = self._create_vdur_descriptor_from_vnfd(db_nsd, db_vnfd, old_db_vnfd, vnfd_id, db_nsr, member_vnf_index)
5501 # new_vnfr_update = {"vnfd-ref": new_vnfd_ref, "vnfd-id": new_vnfd_id, "connection-point": new_vnfr_cp, "vdur": new_vdur, "ip-address": ""}
5502 new_vnfr_update = {"revision": latest_vnfd_revision, "connection-point": new_vnfr_cp, "vdur": new_vdur, "ip-address": ""}
5503 self.update_db_2("vnfrs", db_vnfr["_id"], new_vnfr_update)
5504 updated_db_vnfr = self.db.get_one(
5505 "vnfrs", {"member-vnf-index-ref": member_vnf_index, "nsr-id-ref": nsr_id}
5506 )
5507
5508 # Instantiate new VNF resources
5509 # update_db_nslcmops = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5510 vca_scaling_info = []
5511 scaling_info = {"scaling_group_name": "vdu_autoscale", "vdu": [], "kdu": []}
5512 scaling_info["scaling_direction"] = "OUT"
5513 scaling_info["vdu-create"] = {}
5514 scaling_info["kdu-create"] = {}
5515 vdud_instantiate_list = db_vnfd["vdu"]
5516 for index, vdud in enumerate(vdud_instantiate_list):
5517 cloud_init_text = self._get_vdu_cloud_init_content(
5518 vdud, db_vnfd
5519 )
5520 if cloud_init_text:
5521 additional_params = (
5522 self._get_vdu_additional_params(updated_db_vnfr, vdud["id"])
5523 or {}
5524 )
5525 cloud_init_list = []
5526 if cloud_init_text:
5527 # TODO The VDU's own IP information is not available because db_vnfr is not updated.
5528 additional_params["OSM"] = get_osm_params(
5529 updated_db_vnfr, vdud["id"], 1
5530 )
5531 cloud_init_list.append(
5532 self._parse_cloud_init(
5533 cloud_init_text,
5534 additional_params,
5535 db_vnfd["id"],
5536 vdud["id"],
5537 )
5538 )
5539 vca_scaling_info.append(
5540 {
5541 "osm_vdu_id": vdud["id"],
5542 "member-vnf-index": member_vnf_index,
5543 "type": "create",
5544 "vdu_index": count_index,
5545 }
5546 )
5547 scaling_info["vdu-create"][vdud["id"]] = count_index
5548 if self.ro_config.get("ng"):
5549 self.logger.debug(
5550 "New Resources to be deployed: {}".format(scaling_info))
5551 await self._scale_ng_ro(
5552 logging_text, db_nsr, update_db_nslcmops, updated_db_vnfr, scaling_info, stage
5553 )
5554 return "COMPLETED", "Done"
5555 except (LcmException, asyncio.CancelledError):
5556 raise
5557 except Exception as e:
5558 self.logger.debug("Error updating VNF {}".format(e))
5559 return "FAILED", "Error updating VNF {}".format(e)
5560
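# Illustrative sketch (field names inside the records are assumptions, not from the
# original code): _ns_redeploy_vnf expects the nslcmop operationParams to already carry
# the new VDU records under "newVdur", e.g.
#
#     db_nslcmop["operationParams"]["newVdur"] = [
#         {"vdu-id-ref": "mgmtVM", "count-index": 0, "interfaces": []},
#     ]
#
# These records replace the "vdur" list of the VNF record before the new resources are
# instantiated through _scale_ng_ro with scaling_direction "OUT".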
5561 async def _ns_charm_upgrade(
5562 self,
5563 ee_id,
5564 charm_id,
5565 charm_type,
5566 path,
5567 timeout: float = None,
5568 ) -> (str, str):
5569 """This method upgrade charms in VNF instances
5570
5571 Args:
5572 ee_id: Execution environment id
5573 path: Local path to the charm
5574 charm_id: charm-id
5575 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
5576 timeout: (Float) Timeout for the ns update operation
5577
5578 Returns:
5579 result: (str, str) COMPLETED/FAILED, details
5580 """
5581 try:
5582 charm_type = charm_type or "lxc_proxy_charm"
5583 output = await self.vca_map[charm_type].upgrade_charm(
5584 ee_id=ee_id,
5585 path=path,
5586 charm_id=charm_id,
5587 charm_type=charm_type,
5588 timeout=timeout or self.timeout_ns_update,
5589 )
5590
5591 if output:
5592 return "COMPLETED", output
5593
5594 except (LcmException, asyncio.CancelledError):
5595 raise
5596
5597 except Exception as e:
5598
5599 self.logger.debug("Error upgrading charm {}".format(path))
5600
5601 return "FAILED", "Error upgrading charm {}: {}".format(path, e)
5602
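# Illustrative sketch (restating the call above, not a new API): the charm_type string
# selects the N2VC connector used for the upgrade, roughly
#
#     connector = self.vca_map[charm_type or "lxc_proxy_charm"]
#     output = await connector.upgrade_charm(
#         ee_id=ee_id, path=path, charm_id=charm_id, charm_type=charm_type,
#         timeout=timeout or self.timeout_ns_update,
#     )
#
# so lxc-proxy, native and k8s-proxy charms are all upgraded through the same call.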
5603 async def update(self, nsr_id, nslcmop_id):
5604 """Update NS according to different update types
5605
5606 This method performs an upgrade of VNF instances and then updates the revision
5607 number in the VNF record
5608
5609 Args:
5610 nsr_id: id of the network service to be updated
5611 nslcmop_id: ns lcm operation id
5612
5613 Returns:
5614 (nslcmop_operation_state, detailed_status). May raise DbException, LcmException, N2VCException, K8sException
5615
5616 """
5617 # Try to lock HA task here
5618 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
5619 if not task_is_locked_by_me:
5620 return
5621
5622 logging_text = "Task ns={} update={} ".format(nsr_id, nslcmop_id)
5623 self.logger.debug(logging_text + "Enter")
5624
5625 # Set the required variables to be filled up later
5626 db_nsr = None
5627 db_nslcmop_update = {}
5628 vnfr_update = {}
5629 nslcmop_operation_state = None
5630 db_nsr_update = {}
5631 error_description_nslcmop = ""
5632 exc = None
5633 change_type = "updated"
5634 detailed_status = ""
5635
5636 try:
5637 # wait for any previous tasks in process
5638 step = "Waiting for previous operations to terminate"
5639 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
5640 self._write_ns_status(
5641 nsr_id=nsr_id,
5642 ns_state=None,
5643 current_operation="UPDATING",
5644 current_operation_id=nslcmop_id,
5645 )
5646
5647 step = "Getting nslcmop from database"
5648 db_nslcmop = self.db.get_one(
5649 "nslcmops", {"_id": nslcmop_id}, fail_on_empty=False
5650 )
5651 update_type = db_nslcmop["operationParams"]["updateType"]
5652
5653 step = "Getting nsr from database"
5654 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
5655 old_operational_status = db_nsr["operational-status"]
5656 db_nsr_update["operational-status"] = "updating"
5657 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5658 nsr_deployed = db_nsr["_admin"].get("deployed")
5659
5660 if update_type == "CHANGE_VNFPKG":
5661
5662 # Get the input parameters given through update request
5663 vnf_instance_id = db_nslcmop["operationParams"][
5664 "changeVnfPackageData"
5665 ].get("vnfInstanceId")
5666
5667 vnfd_id = db_nslcmop["operationParams"]["changeVnfPackageData"].get(
5668 "vnfdId"
5669 )
5670 timeout_seconds = db_nslcmop["operationParams"].get("timeout_ns_update")
5671
5672 step = "Getting vnfr from database"
5673 db_vnfr = self.db.get_one(
5674 "vnfrs", {"_id": vnf_instance_id}, fail_on_empty=False
5675 )
5676
5677 step = "Getting vnfds from database"
5678 # Latest VNFD
5679 latest_vnfd = self.db.get_one(
5680 "vnfds", {"_id": vnfd_id}, fail_on_empty=False
5681 )
5682 latest_vnfd_revision = latest_vnfd["_admin"].get("revision")
5683
5684 # Current VNFD
5685 current_vnf_revision = db_vnfr.get("revision", 1)
5686 current_vnfd = self.db.get_one(
5687 "vnfds_revisions",
5688 {"_id": vnfd_id + ":" + str(current_vnf_revision)},
5689 fail_on_empty=False,
5690 )
5691 # Charm artifact paths will be filled up later
5692 (
5693 current_charm_artifact_path,
5694 target_charm_artifact_path,
5695 charm_artifact_paths,
5696 ) = ([], [], [])
5697
5698 step = "Checking if revision has changed in VNFD"
5699 if current_vnf_revision != latest_vnfd_revision:
5700
5701 change_type = "policy_updated"
5702
5703 # There is new revision of VNFD, update operation is required
5704 current_vnfd_path = vnfd_id + ":" + str(current_vnf_revision)
5705 latest_vnfd_path = vnfd_id + ":" + str(latest_vnfd_revision)
5706
5707 step = "Removing the VNFD packages if they exist in the local path"
5708 shutil.rmtree(self.fs.path + current_vnfd_path, ignore_errors=True)
5709 shutil.rmtree(self.fs.path + latest_vnfd_path, ignore_errors=True)
5710
5711 step = "Get the VNFD packages from FSMongo"
5712 self.fs.sync(from_path=latest_vnfd_path)
5713 self.fs.sync(from_path=current_vnfd_path)
5714
5715 step = (
5716 "Get the charm-type, charm-id, ee-id if there is deployed VCA"
5717 )
5718 base_folder = latest_vnfd["_admin"]["storage"]
5719
5720 for charm_index, charm_deployed in enumerate(
5721 get_iterable(nsr_deployed, "VCA")
5722 ):
5723 vnf_index = db_vnfr.get("member-vnf-index-ref")
5724
5725 # Getting charm-id and charm-type
5726 if charm_deployed.get("member-vnf-index") == vnf_index:
5727 charm_id = self.get_vca_id(db_vnfr, db_nsr)
5728 charm_type = charm_deployed.get("type")
5729
5730 # Getting ee-id
5731 ee_id = charm_deployed.get("ee_id")
5732
5733 step = "Getting descriptor config"
5734 descriptor_config = get_configuration(
5735 current_vnfd, current_vnfd["id"]
5736 )
5737
5738 if "execution-environment-list" in descriptor_config:
5739 ee_list = descriptor_config.get(
5740 "execution-environment-list", []
5741 )
5742 else:
5743 ee_list = []
5744
5745 # There could be several charms used in the same VNF
5746 for ee_item in ee_list:
5747 if ee_item.get("juju"):
5748
5749 step = "Getting charm name"
5750 charm_name = ee_item["juju"].get("charm")
5751
5752 step = "Setting Charm artifact paths"
5753 current_charm_artifact_path.append(
5754 get_charm_artifact_path(
5755 base_folder,
5756 charm_name,
5757 charm_type,
5758 current_vnf_revision,
5759 )
5760 )
5761 target_charm_artifact_path.append(
5762 get_charm_artifact_path(
5763 base_folder,
5764 charm_name,
5765 charm_type,
5766 latest_vnfd_revision,
5767 )
5768 )
5769
5770 charm_artifact_paths = zip(
5771 current_charm_artifact_path, target_charm_artifact_path
5772 )
5773
5774 step = "Checking if software version has changed in VNFD"
5775 if find_software_version(current_vnfd) != find_software_version(
5776 latest_vnfd
5777 ):
5778
5779 step = "Checking if existing VNF has charm"
5780 for current_charm_path, target_charm_path in list(
5781 charm_artifact_paths
5782 ):
5783 if current_charm_path:
5784 raise LcmException(
5785 "Software version change is not supported as VNF instance {} has charm.".format(
5786 vnf_instance_id
5787 )
5788 )
5789
5790 # There is no change in the charm package, so redeploy the VNF
5791 # based on the new descriptor
5792 step = "Redeploying VNF"
5793 member_vnf_index = db_vnfr["member-vnf-index-ref"]
5794 (
5795 result,
5796 detailed_status
5797 ) = await self._ns_redeploy_vnf(
5798 nsr_id,
5799 nslcmop_id,
5800 latest_vnfd,
5801 db_vnfr,
5802 db_nsr
5803 )
5804 if result == "FAILED":
5805 nslcmop_operation_state = result
5806 error_description_nslcmop = detailed_status
5807 db_nslcmop_update["detailed-status"] = detailed_status
5808 self.logger.debug(
5809 logging_text
5810 + " step {} Done with result {} {}".format(
5811 step, nslcmop_operation_state, detailed_status
5812 )
5813 )
5814
5815 else:
5816 step = "Checking if any charm package has changed or not"
5817 for current_charm_path, target_charm_path in list(
5818 charm_artifact_paths
5819 ):
5820 if (
5821 current_charm_path
5822 and target_charm_path
5823 and self.check_charm_hash_changed(
5824 current_charm_path, target_charm_path
5825 )
5826 ):
5827
5828 step = "Checking whether VNF uses juju bundle"
5829 if check_juju_bundle_existence(current_vnfd):
5830
5831 raise LcmException(
5832 "Charm upgrade is not supported for the instance which"
5833 " uses juju-bundle: {}".format(
5834 check_juju_bundle_existence(current_vnfd)
5835 )
5836 )
5837
5838 step = "Upgrading Charm"
5839 (
5840 result,
5841 detailed_status,
5842 ) = await self._ns_charm_upgrade(
5843 ee_id=ee_id,
5844 charm_id=charm_id,
5845 charm_type=charm_type,
5846 path=self.fs.path + target_charm_path,
5847 timeout=timeout_seconds,
5848 )
5849
5850 if result == "FAILED":
5851 nslcmop_operation_state = result
5852 error_description_nslcmop = detailed_status
5853
5854 db_nslcmop_update["detailed-status"] = detailed_status
5855 self.logger.debug(
5856 logging_text
5857 + " step {} Done with result {} {}".format(
5858 step, nslcmop_operation_state, detailed_status
5859 )
5860 )
5861
5862 step = "Updating policies"
5863 member_vnf_index = db_vnfr["member-vnf-index-ref"]
5864 result = "COMPLETED"
5865 detailed_status = "Done"
5866 db_nslcmop_update["detailed-status"] = "Done"
5867
5868 # If nslcmop_operation_state is None, no operation has failed.
5869 if not nslcmop_operation_state:
5870 nslcmop_operation_state = "COMPLETED"
5871
5872 # If the CHANGE_VNFPKG operation is successful,
5873 # the VNF revision needs to be updated
5874 vnfr_update["revision"] = latest_vnfd_revision
5875 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
5876
5877 self.logger.debug(
5878 logging_text
5879 + " task Done with result {} {}".format(
5880 nslcmop_operation_state, detailed_status
5881 )
5882 )
5883 elif update_type == "REMOVE_VNF":
5884 # This part is included in https://osm.etsi.org/gerrit/11876
5885 vnf_instance_id = db_nslcmop["operationParams"]["removeVnfInstanceId"]
5886 db_vnfr = self.db.get_one("vnfrs", {"_id": vnf_instance_id})
5887 member_vnf_index = db_vnfr["member-vnf-index-ref"]
5888 step = "Removing VNF"
5889 (result, detailed_status) = await self.remove_vnf(nsr_id, nslcmop_id, vnf_instance_id)
5890 if result == "FAILED":
5891 nslcmop_operation_state = result
5892 error_description_nslcmop = detailed_status
5893 db_nslcmop_update["detailed-status"] = detailed_status
5894 change_type = "vnf_terminated"
5895 if not nslcmop_operation_state:
5896 nslcmop_operation_state = "COMPLETED"
5897 self.logger.debug(
5898 logging_text
5899 + " task Done with result {} {}".format(
5900 nslcmop_operation_state, detailed_status
5901 )
5902 )
5903
5904 # If nslcmop_operation_state is None, no operation has failed.
5905 # All operations have been executed.
5906 if not nslcmop_operation_state:
5907 nslcmop_operation_state = "COMPLETED"
5908 db_nsr_update["operational-status"] = old_operational_status
5909
5910 except (DbException, LcmException, N2VCException, K8sException) as e:
5911 self.logger.error(logging_text + "Exit Exception {}".format(e))
5912 exc = e
5913 except asyncio.CancelledError:
5914 self.logger.error(
5915 logging_text + "Cancelled Exception while '{}'".format(step)
5916 )
5917 exc = "Operation was cancelled"
5918 except asyncio.TimeoutError:
5919 self.logger.error(logging_text + "Timeout while '{}'".format(step))
5920 exc = "Timeout"
5921 except Exception as e:
5922 exc = traceback.format_exc()
5923 self.logger.critical(
5924 logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
5925 exc_info=True,
5926 )
5927 finally:
5928 if exc:
5929 db_nslcmop_update[
5930 "detailed-status"
5931 ] = (
5932 detailed_status
5933 ) = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
5934 nslcmop_operation_state = "FAILED"
5935 db_nsr_update["operational-status"] = old_operational_status
5936 if db_nsr:
5937 self._write_ns_status(
5938 nsr_id=nsr_id,
5939 ns_state=db_nsr["nsState"],
5940 current_operation="IDLE",
5941 current_operation_id=None,
5942 other_update=db_nsr_update,
5943 )
5944
5945 self._write_op_status(
5946 op_id=nslcmop_id,
5947 stage="",
5948 error_message=error_description_nslcmop,
5949 operation_state=nslcmop_operation_state,
5950 other_update=db_nslcmop_update,
5951 )
5952
5953 if nslcmop_operation_state:
5954 try:
5955 msg = {
5956 "nsr_id": nsr_id,
5957 "nslcmop_id": nslcmop_id,
5958 "operationState": nslcmop_operation_state,
5959 }
5960 if change_type in ("vnf_terminated", "policy_updated"):
5961 msg.update({"vnf_member_index": member_vnf_index})
5962 await self.msg.aiowrite("ns", change_type, msg, loop=self.loop)
5963 except Exception as e:
5964 self.logger.error(
5965 logging_text + "kafka_write notification Exception {}".format(e)
5966 )
5967 self.logger.debug(logging_text + "Exit")
5968 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_update")
5969 return nslcmop_operation_state, detailed_status
5970
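# Illustrative sketch (placeholder ids, assumed values): update() currently handles two
# updateType values, with operationParams shaped roughly like
#
#     {"updateType": "CHANGE_VNFPKG",
#      "changeVnfPackageData": {"vnfInstanceId": "<vnfr-id>", "vnfdId": "<vnfd-id>"},
#      "timeout_ns_update": 1800}
#
#     {"updateType": "REMOVE_VNF", "removeVnfInstanceId": "<vnfr-id>"}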
5971 async def scale(self, nsr_id, nslcmop_id):
5972 # Try to lock HA task here
5973 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
5974 if not task_is_locked_by_me:
5975 return
5976
5977 logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
5978 stage = ["", "", ""]
5979 tasks_dict_info = {}
5980 # ^ stage, step, VIM progress
5981 self.logger.debug(logging_text + "Enter")
5982 # get all needed from database
5983 db_nsr = None
5984 db_nslcmop_update = {}
5985 db_nsr_update = {}
5986 exc = None
5987 # in case of error, indicates which part of the scale operation failed, so the nsr can be put in error status
5988 scale_process = None
5989 old_operational_status = ""
5990 old_config_status = ""
5991 nsi_id = None
5992 try:
5993 # wait for any previous tasks in process
5994 step = "Waiting for previous operations to terminate"
5995 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
5996 self._write_ns_status(
5997 nsr_id=nsr_id,
5998 ns_state=None,
5999 current_operation="SCALING",
6000 current_operation_id=nslcmop_id,
6001 )
6002
6003 step = "Getting nslcmop from database"
6004 self.logger.debug(
6005 step + " after having waited for previous tasks to be completed"
6006 )
6007 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
6008
6009 step = "Getting nsr from database"
6010 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
6011 old_operational_status = db_nsr["operational-status"]
6012 old_config_status = db_nsr["config-status"]
6013
6014 step = "Parsing scaling parameters"
6015 db_nsr_update["operational-status"] = "scaling"
6016 self.update_db_2("nsrs", nsr_id, db_nsr_update)
6017 nsr_deployed = db_nsr["_admin"].get("deployed")
6018
6019 vnf_index = db_nslcmop["operationParams"]["scaleVnfData"][
6020 "scaleByStepData"
6021 ]["member-vnf-index"]
6022 scaling_group = db_nslcmop["operationParams"]["scaleVnfData"][
6023 "scaleByStepData"
6024 ]["scaling-group-descriptor"]
6025 scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
6026 # for backward compatibility
6027 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
6028 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
6029 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
6030 self.update_db_2("nsrs", nsr_id, db_nsr_update)
6031
6032 step = "Getting vnfr from database"
6033 db_vnfr = self.db.get_one(
6034 "vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}
6035 )
6036
6037 vca_id = self.get_vca_id(db_vnfr, db_nsr)
6038
6039 step = "Getting vnfd from database"
6040 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
6041
6042 base_folder = db_vnfd["_admin"]["storage"]
6043
6044 step = "Getting scaling-group-descriptor"
6045 scaling_descriptor = find_in_list(
6046 get_scaling_aspect(db_vnfd),
6047 lambda scale_desc: scale_desc["name"] == scaling_group,
6048 )
6049 if not scaling_descriptor:
6050 raise LcmException(
6051 "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
6052 "at vnfd:scaling-group-descriptor".format(scaling_group)
6053 )
6054
6055 step = "Sending scale order to VIM"
6056 # TODO check if ns is in a proper status
6057 nb_scale_op = 0
6058 if not db_nsr["_admin"].get("scaling-group"):
6059 self.update_db_2(
6060 "nsrs",
6061 nsr_id,
6062 {
6063 "_admin.scaling-group": [
6064 {"name": scaling_group, "nb-scale-op": 0}
6065 ]
6066 },
6067 )
6068 admin_scale_index = 0
6069 else:
6070 for admin_scale_index, admin_scale_info in enumerate(
6071 db_nsr["_admin"]["scaling-group"]
6072 ):
6073 if admin_scale_info["name"] == scaling_group:
6074 nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
6075 break
6076 else: # not found: set index to one plus the last element and add a new entry with the name
6077 admin_scale_index += 1
6078 db_nsr_update[
6079 "_admin.scaling-group.{}.name".format(admin_scale_index)
6080 ] = scaling_group
6081
6082 vca_scaling_info = []
6083 scaling_info = {"scaling_group_name": scaling_group, "vdu": [], "kdu": []}
6084 if scaling_type == "SCALE_OUT":
6085 if "aspect-delta-details" not in scaling_descriptor:
6086 raise LcmException(
6087 "Aspect delta details not fount in scaling descriptor {}".format(
6088 scaling_descriptor["name"]
6089 )
6090 )
6091 # count if max-instance-count is reached
6092 deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
6093
6094 scaling_info["scaling_direction"] = "OUT"
6095 scaling_info["vdu-create"] = {}
6096 scaling_info["kdu-create"] = {}
6097 for delta in deltas:
6098 for vdu_delta in delta.get("vdu-delta", {}):
6099 vdud = get_vdu(db_vnfd, vdu_delta["id"])
6100 # vdu_index also provides the number of instances of the targeted vdu
6101 vdu_count = vdu_index = get_vdur_index(db_vnfr, vdu_delta)
6102 cloud_init_text = self._get_vdu_cloud_init_content(
6103 vdud, db_vnfd
6104 )
6105 if cloud_init_text:
6106 additional_params = (
6107 self._get_vdu_additional_params(db_vnfr, vdud["id"])
6108 or {}
6109 )
6110 cloud_init_list = []
6111
6112 vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
6113 max_instance_count = 10
6114 if vdu_profile and "max-number-of-instances" in vdu_profile:
6115 max_instance_count = vdu_profile.get(
6116 "max-number-of-instances", 10
6117 )
6118
6119 default_instance_num = get_number_of_instances(
6120 db_vnfd, vdud["id"]
6121 )
6122 instances_number = vdu_delta.get("number-of-instances", 1)
6123 nb_scale_op += instances_number
6124
6125 new_instance_count = nb_scale_op + default_instance_num
6126 # Check if the new count exceeds the maximum while the current vdu count is below it.
6127 # In that case only the excess over the maximum is assigned as the instance count
6128 if new_instance_count > max_instance_count > vdu_count:
6129 instances_number = new_instance_count - max_instance_count
6130 else:
6131 instances_number = instances_number
6132
6133 if new_instance_count > max_instance_count:
6134 raise LcmException(
6135 "reached the limit of {} (max-instance-count) "
6136 "scaling-out operations for the "
6137 "scaling-group-descriptor '{}'".format(
6138 nb_scale_op, scaling_group
6139 )
6140 )
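# Worked example (illustrative, assumed values): with default_instance_num = 1,
# max-number-of-instances = 3 and two previous scale-out operations of one instance
# each, this delta makes nb_scale_op = 3, so new_instance_count = 4 > 3 and the
# LcmException above is raised before any new VDU is requested.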
6141 for x in range(vdu_delta.get("number-of-instances", 1)):
6142 if cloud_init_text:
6143 # TODO The VDU's own IP information is not available because db_vnfr is not updated.
6144 additional_params["OSM"] = get_osm_params(
6145 db_vnfr, vdu_delta["id"], vdu_index + x
6146 )
6147 cloud_init_list.append(
6148 self._parse_cloud_init(
6149 cloud_init_text,
6150 additional_params,
6151 db_vnfd["id"],
6152 vdud["id"],
6153 )
6154 )
6155 vca_scaling_info.append(
6156 {
6157 "osm_vdu_id": vdu_delta["id"],
6158 "member-vnf-index": vnf_index,
6159 "type": "create",
6160 "vdu_index": vdu_index + x,
6161 }
6162 )
6163 scaling_info["vdu-create"][vdu_delta["id"]] = instances_number
6164 for kdu_delta in delta.get("kdu-resource-delta", {}):
6165 kdu_profile = get_kdu_resource_profile(db_vnfd, kdu_delta["id"])
6166 kdu_name = kdu_profile["kdu-name"]
6167 resource_name = kdu_profile.get("resource-name", "")
6168
6169 # There might be different KDUs in the same delta,
6170 # so keep a list of scaling entries per KDU
6171 if not scaling_info["kdu-create"].get(kdu_name, None):
6172 scaling_info["kdu-create"][kdu_name] = []
6173
6174 kdur = get_kdur(db_vnfr, kdu_name)
6175 if kdur.get("helm-chart"):
6176 k8s_cluster_type = "helm-chart-v3"
6177 self.logger.debug("kdur: {}".format(kdur))
6178 if (
6179 kdur.get("helm-version")
6180 and kdur.get("helm-version") == "v2"
6181 ):
6182 k8s_cluster_type = "helm-chart"
6183 elif kdur.get("juju-bundle"):
6184 k8s_cluster_type = "juju-bundle"
6185 else:
6186 raise LcmException(
6187 "kdu type for kdu='{}.{}' is neither helm-chart nor "
6188 "juju-bundle. Maybe an old NBI version is running".format(
6189 db_vnfr["member-vnf-index-ref"], kdu_name
6190 )
6191 )
6192
6193 max_instance_count = 10
6194 if kdu_profile and "max-number-of-instances" in kdu_profile:
6195 max_instance_count = kdu_profile.get(
6196 "max-number-of-instances", 10
6197 )
6198
6199 nb_scale_op += kdu_delta.get("number-of-instances", 1)
6200 deployed_kdu, _ = get_deployed_kdu(
6201 nsr_deployed, kdu_name, vnf_index
6202 )
6203 if deployed_kdu is None:
6204 raise LcmException(
6205 "KDU '{}' for vnf '{}' not deployed".format(
6206 kdu_name, vnf_index
6207 )
6208 )
6209 kdu_instance = deployed_kdu.get("kdu-instance")
6210 instance_num = await self.k8scluster_map[
6211 k8s_cluster_type
6212 ].get_scale_count(
6213 resource_name,
6214 kdu_instance,
6215 vca_id=vca_id,
6216 cluster_uuid=deployed_kdu.get("k8scluster-uuid"),
6217 kdu_model=deployed_kdu.get("kdu-model"),
6218 )
6219 kdu_replica_count = instance_num + kdu_delta.get(
6220 "number-of-instances", 1
6221 )
6222
6223 # Check if the new count exceeds the maximum while the current instance number is below it.
6224 # In that case cap the kdu replica count at the maximum instance number
6225 if kdu_replica_count > max_instance_count > instance_num:
6226 kdu_replica_count = max_instance_count
6227 if kdu_replica_count > max_instance_count:
6228 raise LcmException(
6229 "reached the limit of {} (max-instance-count) "
6230 "scaling-out operations for the "
6231 "scaling-group-descriptor '{}'".format(
6232 instance_num, scaling_group
6233 )
6234 )
6235
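# Worked example (illustrative, assumed values): if the KDU currently runs
# instance_num = 2 replicas, the delta requests two more and
# max-number-of-instances = 3, kdu_replica_count is first 4 and is then capped to 3 by
# the check above, so the scaling entry below asks for 3 replicas in total.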
6236 for x in range(kdu_delta.get("number-of-instances", 1)):
6237 vca_scaling_info.append(
6238 {
6239 "osm_kdu_id": kdu_name,
6240 "member-vnf-index": vnf_index,
6241 "type": "create",
6242 "kdu_index": instance_num + x - 1,
6243 }
6244 )
6245 scaling_info["kdu-create"][kdu_name].append(
6246 {
6247 "member-vnf-index": vnf_index,
6248 "type": "create",
6249 "k8s-cluster-type": k8s_cluster_type,
6250 "resource-name": resource_name,
6251 "scale": kdu_replica_count,
6252 }
6253 )
6254 elif scaling_type == "SCALE_IN":
6255 deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
6256
6257 scaling_info["scaling_direction"] = "IN"
6258 scaling_info["vdu-delete"] = {}
6259 scaling_info["kdu-delete"] = {}
6260
6261 for delta in deltas:
6262 for vdu_delta in delta.get("vdu-delta", {}):
6263 vdu_count = vdu_index = get_vdur_index(db_vnfr, vdu_delta)
6264 min_instance_count = 0
6265 vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
6266 if vdu_profile and "min-number-of-instances" in vdu_profile:
6267 min_instance_count = vdu_profile["min-number-of-instances"]
6268
6269 default_instance_num = get_number_of_instances(
6270 db_vnfd, vdu_delta["id"]
6271 )
6272 instance_num = vdu_delta.get("number-of-instances", 1)
6273 nb_scale_op -= instance_num
6274
6275 new_instance_count = nb_scale_op + default_instance_num
6276
6277 if new_instance_count < min_instance_count < vdu_count:
6278 instances_number = min_instance_count - new_instance_count
6279 else:
6280 instances_number = instance_num
6281
6282 if new_instance_count < min_instance_count:
6283 raise LcmException(
6284 "reached the limit of {} (min-instance-count) scaling-in operations for the "
6285 "scaling-group-descriptor '{}'".format(
6286 nb_scale_op, scaling_group
6287 )
6288 )
6289 for x in range(vdu_delta.get("number-of-instances", 1)):
6290 vca_scaling_info.append(
6291 {
6292 "osm_vdu_id": vdu_delta["id"],
6293 "member-vnf-index": vnf_index,
6294 "type": "delete",
6295 "vdu_index": vdu_index - 1 - x,
6296 }
6297 )
6298 scaling_info["vdu-delete"][vdu_delta["id"]] = instances_number
6299 for kdu_delta in delta.get("kdu-resource-delta", {}):
6300 kdu_profile = get_kdu_resource_profile(db_vnfd, kdu_delta["id"])
6301 kdu_name = kdu_profile["kdu-name"]
6302 resource_name = kdu_profile.get("resource-name", "")
6303
6304 if not scaling_info["kdu-delete"].get(kdu_name, None):
6305 scaling_info["kdu-delete"][kdu_name] = []
6306
6307 kdur = get_kdur(db_vnfr, kdu_name)
6308 if kdur.get("helm-chart"):
6309 k8s_cluster_type = "helm-chart-v3"
6310 self.logger.debug("kdur: {}".format(kdur))
6311 if (
6312 kdur.get("helm-version")
6313 and kdur.get("helm-version") == "v2"
6314 ):
6315 k8s_cluster_type = "helm-chart"
6316 elif kdur.get("juju-bundle"):
6317 k8s_cluster_type = "juju-bundle"
6318 else:
6319 raise LcmException(
6320 "kdu type for kdu='{}.{}' is neither helm-chart nor "
6321 "juju-bundle. Maybe an old NBI version is running".format(
6322 db_vnfr["member-vnf-index-ref"], kdur["kdu-name"]
6323 )
6324 )
6325
6326 min_instance_count = 0
6327 if kdu_profile and "min-number-of-instances" in kdu_profile:
6328 min_instance_count = kdu_profile["min-number-of-instances"]
6329
6330 nb_scale_op -= kdu_delta.get("number-of-instances", 1)
6331 deployed_kdu, _ = get_deployed_kdu(
6332 nsr_deployed, kdu_name, vnf_index
6333 )
6334 if deployed_kdu is None:
6335 raise LcmException(
6336 "KDU '{}' for vnf '{}' not deployed".format(
6337 kdu_name, vnf_index
6338 )
6339 )
6340 kdu_instance = deployed_kdu.get("kdu-instance")
6341 instance_num = await self.k8scluster_map[
6342 k8s_cluster_type
6343 ].get_scale_count(
6344 resource_name,
6345 kdu_instance,
6346 vca_id=vca_id,
6347 cluster_uuid=deployed_kdu.get("k8scluster-uuid"),
6348 kdu_model=deployed_kdu.get("kdu-model"),
6349 )
6350 kdu_replica_count = instance_num - kdu_delta.get(
6351 "number-of-instances", 1
6352 )
6353
6354 if kdu_replica_count < min_instance_count < instance_num:
6355 kdu_replica_count = min_instance_count
6356 if kdu_replica_count < min_instance_count:
6357 raise LcmException(
6358 "reached the limit of {} (min-instance-count) scaling-in operations for the "
6359 "scaling-group-descriptor '{}'".format(
6360 instance_num, scaling_group
6361 )
6362 )
6363
6364 for x in range(kdu_delta.get("number-of-instances", 1)):
6365 vca_scaling_info.append(
6366 {
6367 "osm_kdu_id": kdu_name,
6368 "member-vnf-index": vnf_index,
6369 "type": "delete",
6370 "kdu_index": instance_num - x - 1,
6371 }
6372 )
6373 scaling_info["kdu-delete"][kdu_name].append(
6374 {
6375 "member-vnf-index": vnf_index,
6376 "type": "delete",
6377 "k8s-cluster-type": k8s_cluster_type,
6378 "resource-name": resource_name,
6379 "scale": kdu_replica_count,
6380 }
6381 )
6382
6383 # update VDU_SCALING_INFO with the ip addresses of the VDUs to delete
6384 vdu_delete = copy(scaling_info.get("vdu-delete"))
6385 if scaling_info["scaling_direction"] == "IN":
6386 for vdur in reversed(db_vnfr["vdur"]):
6387 if vdu_delete.get(vdur["vdu-id-ref"]):
6388 vdu_delete[vdur["vdu-id-ref"]] -= 1
6389 scaling_info["vdu"].append(
6390 {
6391 "name": vdur.get("name") or vdur.get("vdu-name"),
6392 "vdu_id": vdur["vdu-id-ref"],
6393 "interface": [],
6394 }
6395 )
6396 for interface in vdur["interfaces"]:
6397 scaling_info["vdu"][-1]["interface"].append(
6398 {
6399 "name": interface["name"],
6400 "ip_address": interface["ip-address"],
6401 "mac_address": interface.get("mac-address"),
6402 }
6403 )
6404 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
6405
6406 # PRE-SCALE BEGIN
6407 step = "Executing pre-scale vnf-config-primitive"
6408 if scaling_descriptor.get("scaling-config-action"):
6409 for scaling_config_action in scaling_descriptor[
6410 "scaling-config-action"
6411 ]:
6412 if (
6413 scaling_config_action.get("trigger") == "pre-scale-in"
6414 and scaling_type == "SCALE_IN"
6415 ) or (
6416 scaling_config_action.get("trigger") == "pre-scale-out"
6417 and scaling_type == "SCALE_OUT"
6418 ):
6419 vnf_config_primitive = scaling_config_action[
6420 "vnf-config-primitive-name-ref"
6421 ]
6422 step = db_nslcmop_update[
6423 "detailed-status"
6424 ] = "executing pre-scale scaling-config-action '{}'".format(
6425 vnf_config_primitive
6426 )
6427
6428 # look for primitive
6429 for config_primitive in (
6430 get_configuration(db_vnfd, db_vnfd["id"]) or {}
6431 ).get("config-primitive", ()):
6432 if config_primitive["name"] == vnf_config_primitive:
6433 break
6434 else:
6435 raise LcmException(
6436 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
6437 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
6438 "primitive".format(scaling_group, vnf_config_primitive)
6439 )
6440
6441 vnfr_params = {"VDU_SCALE_INFO": scaling_info}
6442 if db_vnfr.get("additionalParamsForVnf"):
6443 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
6444
6445 scale_process = "VCA"
6446 db_nsr_update["config-status"] = "configuring pre-scaling"
6447 primitive_params = self._map_primitive_params(
6448 config_primitive, {}, vnfr_params
6449 )
6450
6451 # Pre-scale retry check: Check if this sub-operation has been executed before
6452 op_index = self._check_or_add_scale_suboperation(
6453 db_nslcmop,
6454 vnf_index,
6455 vnf_config_primitive,
6456 primitive_params,
6457 "PRE-SCALE",
6458 )
6459 if op_index == self.SUBOPERATION_STATUS_SKIP:
6460 # Skip sub-operation
6461 result = "COMPLETED"
6462 result_detail = "Done"
6463 self.logger.debug(
6464 logging_text
6465 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6466 vnf_config_primitive, result, result_detail
6467 )
6468 )
6469 else:
6470 if op_index == self.SUBOPERATION_STATUS_NEW:
6471 # New sub-operation: Get index of this sub-operation
6472 op_index = (
6473 len(db_nslcmop.get("_admin", {}).get("operations"))
6474 - 1
6475 )
6476 self.logger.debug(
6477 logging_text
6478 + "vnf_config_primitive={} New sub-operation".format(
6479 vnf_config_primitive
6480 )
6481 )
6482 else:
6483 # retry: Get registered params for this existing sub-operation
6484 op = db_nslcmop.get("_admin", {}).get("operations", [])[
6485 op_index
6486 ]
6487 vnf_index = op.get("member_vnf_index")
6488 vnf_config_primitive = op.get("primitive")
6489 primitive_params = op.get("primitive_params")
6490 self.logger.debug(
6491 logging_text
6492 + "vnf_config_primitive={} Sub-operation retry".format(
6493 vnf_config_primitive
6494 )
6495 )
6496 # Execute the primitive, either with new (first-time) or registered (retry) args
6497 ee_descriptor_id = config_primitive.get(
6498 "execution-environment-ref"
6499 )
6500 primitive_name = config_primitive.get(
6501 "execution-environment-primitive", vnf_config_primitive
6502 )
6503 ee_id, vca_type = self._look_for_deployed_vca(
6504 nsr_deployed["VCA"],
6505 member_vnf_index=vnf_index,
6506 vdu_id=None,
6507 vdu_count_index=None,
6508 ee_descriptor_id=ee_descriptor_id,
6509 )
6510 result, result_detail = await self._ns_execute_primitive(
6511 ee_id,
6512 primitive_name,
6513 primitive_params,
6514 vca_type=vca_type,
6515 vca_id=vca_id,
6516 )
6517 self.logger.debug(
6518 logging_text
6519 + "vnf_config_primitive={} Done with result {} {}".format(
6520 vnf_config_primitive, result, result_detail
6521 )
6522 )
6523 # Update operationState = COMPLETED | FAILED
6524 self._update_suboperation_status(
6525 db_nslcmop, op_index, result, result_detail
6526 )
6527
6528 if result == "FAILED":
6529 raise LcmException(result_detail)
6530 db_nsr_update["config-status"] = old_config_status
6531 scale_process = None
6532 # PRE-SCALE END
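# Illustrative descriptor sketch (the primitive name is an assumption): the pre/post-scale
# actions are read from the scaling aspect of the VNFD, which reaches this code as a
# dict roughly like
#
#     scaling_descriptor = {
#         "name": "vdu_autoscale",
#         "aspect-delta-details": {"deltas": [...]},
#         "scaling-config-action": [
#             {"trigger": "pre-scale-out",
#              "vnf-config-primitive-name-ref": "prepare-scale"}
#         ],
#     }
#
# where "prepare-scale" must match a config-primitive name of the vnf-configuration.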
6533
6534 db_nsr_update[
6535 "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)
6536 ] = nb_scale_op
6537 db_nsr_update[
6538 "_admin.scaling-group.{}.time".format(admin_scale_index)
6539 ] = time()
6540
6541 # SCALE-IN VCA - BEGIN
6542 if vca_scaling_info:
6543 step = db_nslcmop_update[
6544 "detailed-status"
6545 ] = "Deleting the execution environments"
6546 scale_process = "VCA"
6547 for vca_info in vca_scaling_info:
6548 if vca_info["type"] == "delete" and not vca_info.get("osm_kdu_id"):
6549 member_vnf_index = str(vca_info["member-vnf-index"])
6550 self.logger.debug(
6551 logging_text + "vdu info: {}".format(vca_info)
6552 )
6553 if vca_info.get("osm_vdu_id"):
6554 vdu_id = vca_info["osm_vdu_id"]
6555 vdu_index = int(vca_info["vdu_index"])
6556 stage[
6557 1
6558 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6559 member_vnf_index, vdu_id, vdu_index
6560 )
6561 stage[2] = step = "Scaling in VCA"
6562 self._write_op_status(op_id=nslcmop_id, stage=stage)
6563 vca_update = db_nsr["_admin"]["deployed"]["VCA"]
6564 config_update = db_nsr["configurationStatus"]
6565 for vca_index, vca in enumerate(vca_update):
6566 if (
6567 (vca or vca.get("ee_id"))
6568 and vca["member-vnf-index"] == member_vnf_index
6569 and vca["vdu_count_index"] == vdu_index
6570 ):
6571 if vca.get("vdu_id"):
6572 config_descriptor = get_configuration(
6573 db_vnfd, vca.get("vdu_id")
6574 )
6575 elif vca.get("kdu_name"):
6576 config_descriptor = get_configuration(
6577 db_vnfd, vca.get("kdu_name")
6578 )
6579 else:
6580 config_descriptor = get_configuration(
6581 db_vnfd, db_vnfd["id"]
6582 )
6583 operation_params = (
6584 db_nslcmop.get("operationParams") or {}
6585 )
6586 exec_terminate_primitives = not operation_params.get(
6587 "skip_terminate_primitives"
6588 ) and vca.get("needed_terminate")
6589 task = asyncio.ensure_future(
6590 asyncio.wait_for(
6591 self.destroy_N2VC(
6592 logging_text,
6593 db_nslcmop,
6594 vca,
6595 config_descriptor,
6596 vca_index,
6597 destroy_ee=True,
6598 exec_primitives=exec_terminate_primitives,
6599 scaling_in=True,
6600 vca_id=vca_id,
6601 ),
6602 timeout=self.timeout_charm_delete,
6603 )
6604 )
6605 tasks_dict_info[task] = "Terminating VCA {}".format(
6606 vca.get("ee_id")
6607 )
6608 del vca_update[vca_index]
6609 del config_update[vca_index]
6610 # wait for pending tasks of terminate primitives
6611 if tasks_dict_info:
6612 self.logger.debug(
6613 logging_text
6614 + "Waiting for tasks {}".format(
6615 list(tasks_dict_info.keys())
6616 )
6617 )
6618 error_list = await self._wait_for_tasks(
6619 logging_text,
6620 tasks_dict_info,
6621 min(
6622 self.timeout_charm_delete, self.timeout_ns_terminate
6623 ),
6624 stage,
6625 nslcmop_id,
6626 )
6627 tasks_dict_info.clear()
6628 if error_list:
6629 raise LcmException("; ".join(error_list))
6630
6631 db_vca_and_config_update = {
6632 "_admin.deployed.VCA": vca_update,
6633 "configurationStatus": config_update,
6634 }
6635 self.update_db_2(
6636 "nsrs", db_nsr["_id"], db_vca_and_config_update
6637 )
6638 scale_process = None
6639 # SCALE-IN VCA - END
6640
6641 # SCALE RO - BEGIN
6642 if scaling_info.get("vdu-create") or scaling_info.get("vdu-delete"):
6643 scale_process = "RO"
6644 if self.ro_config.get("ng"):
6645 await self._scale_ng_ro(
6646 logging_text, db_nsr, db_nslcmop, db_vnfr, scaling_info, stage
6647 )
6648 scaling_info.pop("vdu-create", None)
6649 scaling_info.pop("vdu-delete", None)
6650
6651 scale_process = None
6652 # SCALE RO - END
6653
6654 # SCALE KDU - BEGIN
6655 if scaling_info.get("kdu-create") or scaling_info.get("kdu-delete"):
6656 scale_process = "KDU"
6657 await self._scale_kdu(
6658 logging_text, nsr_id, nsr_deployed, db_vnfd, vca_id, scaling_info
6659 )
6660 scaling_info.pop("kdu-create", None)
6661 scaling_info.pop("kdu-delete", None)
6662
6663 scale_process = None
6664 # SCALE KDU - END
6665
6666 if db_nsr_update:
6667 self.update_db_2("nsrs", nsr_id, db_nsr_update)
6668
6669 # SCALE-UP VCA - BEGIN
6670 if vca_scaling_info:
6671 step = db_nslcmop_update[
6672 "detailed-status"
6673 ] = "Creating new execution environments"
6674 scale_process = "VCA"
6675 for vca_info in vca_scaling_info:
6676 if vca_info["type"] == "create" and not vca_info.get("osm_kdu_id"):
6677 member_vnf_index = str(vca_info["member-vnf-index"])
6678 self.logger.debug(
6679 logging_text + "vdu info: {}".format(vca_info)
6680 )
6681 vnfd_id = db_vnfr["vnfd-ref"]
6682 if vca_info.get("osm_vdu_id"):
6683 vdu_index = int(vca_info["vdu_index"])
6684 deploy_params = {"OSM": get_osm_params(db_vnfr)}
6685 if db_vnfr.get("additionalParamsForVnf"):
6686 deploy_params.update(
6687 parse_yaml_strings(
6688 db_vnfr["additionalParamsForVnf"].copy()
6689 )
6690 )
6691 descriptor_config = get_configuration(
6692 db_vnfd, db_vnfd["id"]
6693 )
6694 if descriptor_config:
6695 vdu_id = None
6696 vdu_name = None
6697 kdu_name = None
6698 self._deploy_n2vc(
6699 logging_text=logging_text
6700 + "member_vnf_index={} ".format(member_vnf_index),
6701 db_nsr=db_nsr,
6702 db_vnfr=db_vnfr,
6703 nslcmop_id=nslcmop_id,
6704 nsr_id=nsr_id,
6705 nsi_id=nsi_id,
6706 vnfd_id=vnfd_id,
6707 vdu_id=vdu_id,
6708 kdu_name=kdu_name,
6709 member_vnf_index=member_vnf_index,
6710 vdu_index=vdu_index,
6711 vdu_name=vdu_name,
6712 deploy_params=deploy_params,
6713 descriptor_config=descriptor_config,
6714 base_folder=base_folder,
6715 task_instantiation_info=tasks_dict_info,
6716 stage=stage,
6717 )
6718 vdu_id = vca_info["osm_vdu_id"]
6719 vdur = find_in_list(
6720 db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id
6721 )
6722 descriptor_config = get_configuration(db_vnfd, vdu_id)
6723 if vdur.get("additionalParams"):
6724 deploy_params_vdu = parse_yaml_strings(
6725 vdur["additionalParams"]
6726 )
6727 else:
6728 deploy_params_vdu = deploy_params
6729 deploy_params_vdu["OSM"] = get_osm_params(
6730 db_vnfr, vdu_id, vdu_count_index=vdu_index
6731 )
6732 if descriptor_config:
6733 vdu_name = None
6734 kdu_name = None
6735 stage[
6736 1
6737 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6738 member_vnf_index, vdu_id, vdu_index
6739 )
6740 stage[2] = step = "Scaling out VCA"
6741 self._write_op_status(op_id=nslcmop_id, stage=stage)
6742 self._deploy_n2vc(
6743 logging_text=logging_text
6744 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6745 member_vnf_index, vdu_id, vdu_index
6746 ),
6747 db_nsr=db_nsr,
6748 db_vnfr=db_vnfr,
6749 nslcmop_id=nslcmop_id,
6750 nsr_id=nsr_id,
6751 nsi_id=nsi_id,
6752 vnfd_id=vnfd_id,
6753 vdu_id=vdu_id,
6754 kdu_name=kdu_name,
6755 member_vnf_index=member_vnf_index,
6756 vdu_index=vdu_index,
6757 vdu_name=vdu_name,
6758 deploy_params=deploy_params_vdu,
6759 descriptor_config=descriptor_config,
6760 base_folder=base_folder,
6761 task_instantiation_info=tasks_dict_info,
6762 stage=stage,
6763 )
6764 # SCALE-UP VCA - END
6765 scale_process = None
6766
6767 # POST-SCALE BEGIN
6768 # execute primitive service POST-SCALING
6769 step = "Executing post-scale vnf-config-primitive"
6770 if scaling_descriptor.get("scaling-config-action"):
6771 for scaling_config_action in scaling_descriptor[
6772 "scaling-config-action"
6773 ]:
6774 if (
6775 scaling_config_action.get("trigger") == "post-scale-in"
6776 and scaling_type == "SCALE_IN"
6777 ) or (
6778 scaling_config_action.get("trigger") == "post-scale-out"
6779 and scaling_type == "SCALE_OUT"
6780 ):
6781 vnf_config_primitive = scaling_config_action[
6782 "vnf-config-primitive-name-ref"
6783 ]
6784 step = db_nslcmop_update[
6785 "detailed-status"
6786 ] = "executing post-scale scaling-config-action '{}'".format(
6787 vnf_config_primitive
6788 )
6789
6790 vnfr_params = {"VDU_SCALE_INFO": scaling_info}
6791 if db_vnfr.get("additionalParamsForVnf"):
6792 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
6793
6794 # look for primitive
6795 for config_primitive in (
6796 get_configuration(db_vnfd, db_vnfd["id"]) or {}
6797 ).get("config-primitive", ()):
6798 if config_primitive["name"] == vnf_config_primitive:
6799 break
6800 else:
6801 raise LcmException(
6802 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
6803 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
6804 "config-primitive".format(
6805 scaling_group, vnf_config_primitive
6806 )
6807 )
6808 scale_process = "VCA"
6809 db_nsr_update["config-status"] = "configuring post-scaling"
6810 primitive_params = self._map_primitive_params(
6811 config_primitive, {}, vnfr_params
6812 )
6813
6814 # Post-scale retry check: Check if this sub-operation has been executed before
6815 op_index = self._check_or_add_scale_suboperation(
6816 db_nslcmop,
6817 vnf_index,
6818 vnf_config_primitive,
6819 primitive_params,
6820 "POST-SCALE",
6821 )
6822 if op_index == self.SUBOPERATION_STATUS_SKIP:
6823 # Skip sub-operation
6824 result = "COMPLETED"
6825 result_detail = "Done"
6826 self.logger.debug(
6827 logging_text
6828 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6829 vnf_config_primitive, result, result_detail
6830 )
6831 )
6832 else:
6833 if op_index == self.SUBOPERATION_STATUS_NEW:
6834 # New sub-operation: Get index of this sub-operation
6835 op_index = (
6836 len(db_nslcmop.get("_admin", {}).get("operations"))
6837 - 1
6838 )
6839 self.logger.debug(
6840 logging_text
6841 + "vnf_config_primitive={} New sub-operation".format(
6842 vnf_config_primitive
6843 )
6844 )
6845 else:
6846 # retry: Get registered params for this existing sub-operation
6847 op = db_nslcmop.get("_admin", {}).get("operations", [])[
6848 op_index
6849 ]
6850 vnf_index = op.get("member_vnf_index")
6851 vnf_config_primitive = op.get("primitive")
6852 primitive_params = op.get("primitive_params")
6853 self.logger.debug(
6854 logging_text
6855 + "vnf_config_primitive={} Sub-operation retry".format(
6856 vnf_config_primitive
6857 )
6858 )
6859 # Execute the primitive, either with new (first-time) or registered (retry) args
6860 ee_descriptor_id = config_primitive.get(
6861 "execution-environment-ref"
6862 )
6863 primitive_name = config_primitive.get(
6864 "execution-environment-primitive", vnf_config_primitive
6865 )
6866 ee_id, vca_type = self._look_for_deployed_vca(
6867 nsr_deployed["VCA"],
6868 member_vnf_index=vnf_index,
6869 vdu_id=None,
6870 vdu_count_index=None,
6871 ee_descriptor_id=ee_descriptor_id,
6872 )
6873 result, result_detail = await self._ns_execute_primitive(
6874 ee_id,
6875 primitive_name,
6876 primitive_params,
6877 vca_type=vca_type,
6878 vca_id=vca_id,
6879 )
6880 self.logger.debug(
6881 logging_text
6882 + "vnf_config_primitive={} Done with result {} {}".format(
6883 vnf_config_primitive, result, result_detail
6884 )
6885 )
6886 # Update operationState = COMPLETED | FAILED
6887 self._update_suboperation_status(
6888 db_nslcmop, op_index, result, result_detail
6889 )
6890
6891 if result == "FAILED":
6892 raise LcmException(result_detail)
6893 db_nsr_update["config-status"] = old_config_status
6894 scale_process = None
6895 # POST-SCALE END
6896
6897 db_nsr_update[
6898 "detailed-status"
6899 ] = "" # "scaled {} {}".format(scaling_group, scaling_type)
6900 db_nsr_update["operational-status"] = (
6901 "running"
6902 if old_operational_status == "failed"
6903 else old_operational_status
6904 )
6905 db_nsr_update["config-status"] = old_config_status
6906 return
6907 except (
6908 ROclient.ROClientException,
6909 DbException,
6910 LcmException,
6911 NgRoException,
6912 ) as e:
6913 self.logger.error(logging_text + "Exit Exception {}".format(e))
6914 exc = e
6915 except asyncio.CancelledError:
6916 self.logger.error(
6917 logging_text + "Cancelled Exception while '{}'".format(step)
6918 )
6919 exc = "Operation was cancelled"
6920 except Exception as e:
6921 exc = traceback.format_exc()
6922 self.logger.critical(
6923 logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
6924 exc_info=True,
6925 )
6926 finally:
6927 self._write_ns_status(
6928 nsr_id=nsr_id,
6929 ns_state=None,
6930 current_operation="IDLE",
6931 current_operation_id=None,
6932 )
6933 if tasks_dict_info:
6934 stage[1] = "Waiting for instantiate pending tasks."
6935 self.logger.debug(logging_text + stage[1])
6936 exc = await self._wait_for_tasks(
6937 logging_text,
6938 tasks_dict_info,
6939 self.timeout_ns_deploy,
6940 stage,
6941 nslcmop_id,
6942 nsr_id=nsr_id,
6943 )
6944 if exc:
6945 db_nslcmop_update[
6946 "detailed-status"
6947 ] = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
6948 nslcmop_operation_state = "FAILED"
6949 if db_nsr:
6950 db_nsr_update["operational-status"] = old_operational_status
6951 db_nsr_update["config-status"] = old_config_status
6952 db_nsr_update["detailed-status"] = ""
6953 if scale_process:
6954 if "VCA" in scale_process:
6955 db_nsr_update["config-status"] = "failed"
6956 if "RO" in scale_process:
6957 db_nsr_update["operational-status"] = "failed"
6958 db_nsr_update[
6959 "detailed-status"
6960 ] = "FAILED scaling nslcmop={} {}: {}".format(
6961 nslcmop_id, step, exc
6962 )
6963 else:
6964 error_description_nslcmop = None
6965 nslcmop_operation_state = "COMPLETED"
6966 db_nslcmop_update["detailed-status"] = "Done"
6967
6968 self._write_op_status(
6969 op_id=nslcmop_id,
6970 stage="",
6971 error_message=error_description_nslcmop,
6972 operation_state=nslcmop_operation_state,
6973 other_update=db_nslcmop_update,
6974 )
6975 if db_nsr:
6976 self._write_ns_status(
6977 nsr_id=nsr_id,
6978 ns_state=None,
6979 current_operation="IDLE",
6980 current_operation_id=None,
6981 other_update=db_nsr_update,
6982 )
6983
6984 if nslcmop_operation_state:
6985 try:
6986 msg = {
6987 "nsr_id": nsr_id,
6988 "nslcmop_id": nslcmop_id,
6989 "operationState": nslcmop_operation_state,
6990 }
6991 await self.msg.aiowrite("ns", "scaled", msg, loop=self.loop)
6992 except Exception as e:
6993 self.logger.error(
6994 logging_text + "kafka_write notification Exception {}".format(e)
6995 )
6996 self.logger.debug(logging_text + "Exit")
6997 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")
6998
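# Illustrative sketch (the kdu name and values are assumptions): _scale_kdu consumes
# the kdu-create / kdu-delete part of scaling_info built by scale(), e.g.
#
#     scaling_info["kdu-create"] = {
#         "ldap": [
#             {"member-vnf-index": "1", "type": "create",
#              "k8s-cluster-type": "helm-chart-v3",
#              "resource-name": "ldap", "scale": 3}
#         ]
#     }
#
# Each entry leads to a k8scluster_map[...].scale() call with the requested replica count.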
6999 async def _scale_kdu(
7000 self, logging_text, nsr_id, nsr_deployed, db_vnfd, vca_id, scaling_info
7001 ):
7002 _scaling_info = scaling_info.get("kdu-create") or scaling_info.get("kdu-delete")
7003 for kdu_name in _scaling_info:
7004 for kdu_scaling_info in _scaling_info[kdu_name]:
7005 deployed_kdu, index = get_deployed_kdu(
7006 nsr_deployed, kdu_name, kdu_scaling_info["member-vnf-index"]
7007 )
7008 cluster_uuid = deployed_kdu["k8scluster-uuid"]
7009 kdu_instance = deployed_kdu["kdu-instance"]
7010 kdu_model = deployed_kdu.get("kdu-model")
7011 scale = int(kdu_scaling_info["scale"])
7012 k8s_cluster_type = kdu_scaling_info["k8s-cluster-type"]
7013
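# database path where the K8s connector reports the progress of this KDU operation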
7014 db_dict = {
7015 "collection": "nsrs",
7016 "filter": {"_id": nsr_id},
7017 "path": "_admin.deployed.K8s.{}".format(index),
7018 }
7019
7020 step = "scaling application {}".format(
7021 kdu_scaling_info["resource-name"]
7022 )
7023 self.logger.debug(logging_text + step)
7024
7025 if kdu_scaling_info["type"] == "delete":
7026 kdu_config = get_configuration(db_vnfd, kdu_name)
7027 if (
7028 kdu_config
7029 and kdu_config.get("terminate-config-primitive")
7030 and get_juju_ee_ref(db_vnfd, kdu_name) is None
7031 ):
7032 terminate_config_primitive_list = kdu_config.get(
7033 "terminate-config-primitive"
7034 )
7035 terminate_config_primitive_list.sort(
7036 key=lambda val: int(val["seq"])
7037 )
7038
7039 for (
7040 terminate_config_primitive
7041 ) in terminate_config_primitive_list:
7042 primitive_params_ = self._map_primitive_params(
7043 terminate_config_primitive, {}, {}
7044 )
7045 step = "execute terminate config primitive"
7046 self.logger.debug(logging_text + step)
7047 await asyncio.wait_for(
7048 self.k8scluster_map[k8s_cluster_type].exec_primitive(
7049 cluster_uuid=cluster_uuid,
7050 kdu_instance=kdu_instance,
7051 primitive_name=terminate_config_primitive["name"],
7052 params=primitive_params_,
7053 db_dict=db_dict,
7054 vca_id=vca_id,
7055 ),
7056 timeout=600,
7057 )
7058
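# apply the scale (in or out) atomically through the proper K8s connector,
# bounded by the timeout_vca_on_error timeout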
7059 await asyncio.wait_for(
7060 self.k8scluster_map[k8s_cluster_type].scale(
7061 kdu_instance,
7062 scale,
7063 kdu_scaling_info["resource-name"],
7064 vca_id=vca_id,
7065 cluster_uuid=cluster_uuid,
7066 kdu_model=kdu_model,
7067 atomic=True,
7068 db_dict=db_dict,
7069 ),
7070 timeout=self.timeout_vca_on_error,
7071 )
7072
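# after a scale-out, run the KDU initial-config-primitives, if any, once the scale has been applied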
7073 if kdu_scaling_info["type"] == "create":
7074 kdu_config = get_configuration(db_vnfd, kdu_name)
7075 if (
7076 kdu_config
7077 and kdu_config.get("initial-config-primitive")
7078 and get_juju_ee_ref(db_vnfd, kdu_name) is None
7079 ):
7080 initial_config_primitive_list = kdu_config.get(
7081 "initial-config-primitive"
7082 )
7083 initial_config_primitive_list.sort(
7084 key=lambda val: int(val["seq"])
7085 )
7086
7087 for initial_config_primitive in initial_config_primitive_list:
7088 primitive_params_ = self._map_primitive_params(
7089 initial_config_primitive, {}, {}
7090 )
7091 step = "execute initial config primitive"
7092 self.logger.debug(logging_text + step)
7093 await asyncio.wait_for(
7094 self.k8scluster_map[k8s_cluster_type].exec_primitive(
7095 cluster_uuid=cluster_uuid,
7096 kdu_instance=kdu_instance,
7097 primitive_name=initial_config_primitive["name"],
7098 params=primitive_params_,
7099 db_dict=db_dict,
7100 vca_id=vca_id,
7101 ),
7102 timeout=600,
7103 )
7104
7105 async def _scale_ng_ro(
7106 self, logging_text, db_nsr, db_nslcmop, db_vnfr, vdu_scaling_info, stage
7107 ):
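"""Propagate a VDU scaling operation to NG-RO.

Reads the NSD and every VNFR/VNFD of the NS from the database, marks the
VDUs to be created or deleted in the VNF record, re-runs the NG-RO
instantiation so that the change is applied at the VIM, and finally drops
the vdur entries that were marked for deletion.
"""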
7108 nsr_id = db_nslcmop["nsInstanceId"]
7109 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
7110 db_vnfrs = {}
7111
7112 # read from db: the vnfd of every vnf in the ns
7113 db_vnfds = []
7114
7115 # for each vnf in ns, read vnfd
7116 for vnfr in self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}):
7117 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
7118 vnfd_id = vnfr["vnfd-id"] # vnfd uuid for this vnf
7119 # if we do not have this vnfd yet, read it from db
7120 if not find_in_list(db_vnfds, lambda a_vnfd: a_vnfd["_id"] == vnfd_id):
7121 # read from db
7122 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
7123 db_vnfds.append(vnfd)
7124 n2vc_key = self.n2vc.get_public_key()
7125 n2vc_key_list = [n2vc_key]
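# mark in the VNF record the VDUs to be created or deleted before calling NG-RO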
7126 self.scale_vnfr(
7127 db_vnfr,
7128 vdu_scaling_info.get("vdu-create"),
7129 vdu_scaling_info.get("vdu-delete"),
7130 mark_delete=True,
7131 )
7132 # db_vnfr has been updated, update db_vnfrs to use it
7133 db_vnfrs[db_vnfr["member-vnf-index-ref"]] = db_vnfr
7134 await self._instantiate_ng_ro(
7135 logging_text,
7136 nsr_id,
7137 db_nsd,
7138 db_nsr,
7139 db_nslcmop,
7140 db_vnfrs,
7141 db_vnfds,
7142 n2vc_key_list,
7143 stage=stage,
7144 start_deploy=time(),
7145 timeout_ns_deploy=self.timeout_ns_deploy,
7146 )
7147 if vdu_scaling_info.get("vdu-delete"):
7148 self.scale_vnfr(
7149 db_vnfr, None, vdu_scaling_info["vdu-delete"], mark_delete=False
7150 )
7151
7152 async def extract_prometheus_scrape_jobs(
7153 self, ee_id, artifact_path, ee_config_descriptor, vnfr_id, nsr_id, target_ip
7154 ):
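"""Build Prometheus scrape jobs from a 'prometheus*.j2' artifact of the execution environment.

Renders the Jinja2 template found in the artifact path with the job name,
target IP and exporter service endpoint, makes sure every resulting job
name contains the vnfr_id, tags each job with nsr_id and vnfr_id, and
returns the list of jobs (or None when no template is shipped).
"""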
7155 # look for a file called 'prometheus*.j2' among the artifacts in artifact_path
7156 artifact_content = self.fs.dir_ls(artifact_path)
7157 job_file = next(
7158 (
7159 f
7160 for f in artifact_content
7161 if f.startswith("prometheus") and f.endswith(".j2")
7162 ),
7163 None,
7164 )
7165 if not job_file:
7166 return
7167 with self.fs.file_open((artifact_path, job_file), "r") as f:
7168 job_data = f.read()
7169
7170 # TODO get_service
7171 _, _, service = ee_id.partition(".") # remove prefix "namespace."
7172 host_name = "{}-{}".format(service, ee_config_descriptor["metric-service"])
7173 host_port = "80"
7174 vnfr_id = vnfr_id.replace("-", "")
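# variables made available to the Jinja2 scrape-job template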
7175 variables = {
7176 "JOB_NAME": vnfr_id,
7177 "TARGET_IP": target_ip,
7178 "EXPORTER_POD_IP": host_name,
7179 "EXPORTER_POD_PORT": host_port,
7180 }
7181 job_list = parse_job(job_data, variables)
7182 # ensure job_name contains the vnfr_id and add nsr_id/vnfr_id as metadata to every job
7183 for job in job_list:
7184 if (
7185 not isinstance(job.get("job_name"), str)
7186 or vnfr_id not in job["job_name"]
7187 ):
7188 job["job_name"] = vnfr_id + "_" + str(randint(1, 10000))
7189 job["nsr_id"] = nsr_id
7190 job["vnfr_id"] = vnfr_id
7191 return job_list
7192
7193 def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
7194 """
7195 Get VCA Cloud and VCA Cloud Credentials for the VIM account
7196
7197 :param: vim_account_id: VIM Account ID
7198
7199 :return: (cloud_name, cloud_credential)
7200 """
7201 config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
7202 return config.get("vca_cloud"), config.get("vca_cloud_credential")
7203
7204 def get_vca_k8s_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
7205 """
7206 Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account
7207
7208 :param: vim_account_id: VIM Account ID
7209
7210 :return: (cloud_name, cloud_credential)
7211 """
7212 config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
7213 return config.get("vca_k8s_cloud"), config.get("vca_k8s_cloud_credential")
7214
7215 async def migrate(self, nsr_id, nslcmop_id):
7216 """
7217 Migrate VNF and VDU instances in a NS
7218
7219 :param: nsr_id: NS Instance ID
7220 :param: nslcmop_id: nslcmop ID of migrate
7221
7222 """
7223 # Try to lock HA task here
7224 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
7225 if not task_is_locked_by_me:
7226 return
7227 logging_text = "Task ns={} migrate ".format(nsr_id)
7228 self.logger.debug(logging_text + "Enter")
7229 # get all needed from database
7230 db_nslcmop = None
7231 db_nslcmop_update = {}
7232 nslcmop_operation_state = None
7233 db_nsr_update = {}
7234 target = {}
7235 exc = None
7236 # record the operation start time, used to bound the wait for the NG-RO migrate action
7237 start_deploy = time()
7238
7239 try:
7240 # wait for any previous tasks in process
7241 step = "Waiting for previous operations to terminate"
7242 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
7243
7244 self._write_ns_status(
7245 nsr_id=nsr_id,
7246 ns_state=None,
7247 current_operation="MIGRATING",
7248 current_operation_id=nslcmop_id,
7249 )
7250 step = "Getting nslcmop from database"
7251 self.logger.debug(
7252 step + " after having waited for previous tasks to be completed"
7253 )
7254 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
7255 migrate_params = db_nslcmop.get("operationParams")
7256
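# the NG-RO migrate target is built directly from the operation parameters of the nslcmop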
7257 target = {}
7258 target.update(migrate_params)
7259 desc = await self.RO.migrate(nsr_id, target)
7260 self.logger.debug("RO return > {}".format(desc))
7261 action_id = desc["action_id"]
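# wait until NG-RO reports the migrate action as finished, bounded by timeout_migrate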
7262 await self._wait_ng_ro(
7263 nsr_id, action_id, nslcmop_id, start_deploy, self.timeout_migrate
7264 )
7265 except (ROclient.ROClientException, DbException, LcmException) as e:
7266 self.logger.error(logging_text + "Exit Exception {}".format(e))
7267 exc = e
7268 except asyncio.CancelledError:
7269 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
7270 exc = "Operation was cancelled"
7271 except Exception as e:
7272 exc = traceback.format_exc()
7273 self.logger.critical(
7274 "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True
7275 )
7276 finally:
7277 self._write_ns_status(
7278 nsr_id=nsr_id,
7279 ns_state=None,
7280 current_operation="IDLE",
7281 current_operation_id=None,
7282 )
7283 if exc:
7284 db_nslcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
7285 nslcmop_operation_state = "FAILED"
7286 else:
7287 nslcmop_operation_state = "COMPLETED"
7288 db_nslcmop_update["detailed-status"] = "Done"
7289 db_nsr_update["detailed-status"] = "Done"
7290
7291 self._write_op_status(
7292 op_id=nslcmop_id,
7293 stage="",
7294 error_message=db_nslcmop_update["detailed-status"] if exc else "",
7295 operation_state=nslcmop_operation_state,
7296 other_update=db_nslcmop_update,
7297 )
7298 if nslcmop_operation_state:
7299 try:
7300 msg = {
7301 "nsr_id": nsr_id,
7302 "nslcmop_id": nslcmop_id,
7303 "operationState": nslcmop_operation_state,
7304 }
7305 await self.msg.aiowrite("ns", "migrated", msg, loop=self.loop)
7306 except Exception as e:
7307 self.logger.error(
7308 logging_text + "kafka_write notification Exception {}".format(e)
7309 )
7310 self.logger.debug(logging_text + "Exit")
7311 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_migrate")