Bug 1958 fixed: added the functionality of obtaining the KDU status for KNFs based...
[osm/LCM.git] / osm_lcm / ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 from typing import Any, Dict, List
21 import yaml
22 import logging
23 import logging.handlers
24 import traceback
25 import json
26 from jinja2 import (
27 Environment,
28 TemplateError,
29 TemplateNotFound,
30 StrictUndefined,
31 UndefinedError,
32 )
33
34 from osm_lcm import ROclient
35 from osm_lcm.data_utils.nsr import (
36 get_deployed_kdu,
37 get_deployed_vca,
38 get_deployed_vca_list,
39 get_nsd,
40 )
41 from osm_lcm.data_utils.vca import (
42 DeployedComponent,
43 DeployedK8sResource,
44 DeployedVCA,
45 EELevel,
46 Relation,
47 EERelation,
48 safe_get_ee_relation,
49 )
50 from osm_lcm.ng_ro import NgRoClient, NgRoException
51 from osm_lcm.lcm_utils import (
52 LcmException,
53 LcmExceptionNoMgmtIP,
54 LcmBase,
55 deep_get,
56 get_iterable,
57 populate_dict,
58 )
59 from osm_lcm.data_utils.nsd import (
60 get_ns_configuration_relation_list,
61 get_vnf_profile,
62 get_vnf_profiles,
63 )
64 from osm_lcm.data_utils.vnfd import (
65 get_relation_list,
66 get_vdu_list,
67 get_vdu_profile,
68 get_ee_sorted_initial_config_primitive_list,
69 get_ee_sorted_terminate_config_primitive_list,
70 get_kdu_list,
71 get_virtual_link_profiles,
72 get_vdu,
73 get_configuration,
74 get_vdu_index,
75 get_scaling_aspect,
76 get_number_of_instances,
77 get_juju_ee_ref,
78 get_kdu_resource_profile,
79 )
80 from osm_lcm.data_utils.list_utils import find_in_list
81 from osm_lcm.data_utils.vnfr import get_osm_params, get_vdur_index, get_kdur
82 from osm_lcm.data_utils.dict_utils import parse_yaml_strings
83 from osm_lcm.data_utils.database.vim_account import VimAccountDB
84 from n2vc.definitions import RelationEndpoint
85 from n2vc.k8s_helm_conn import K8sHelmConnector
86 from n2vc.k8s_helm3_conn import K8sHelm3Connector
87 from n2vc.k8s_juju_conn import K8sJujuConnector
88
89 from osm_common.dbbase import DbException
90 from osm_common.fsbase import FsException
91
92 from osm_lcm.data_utils.database.database import Database
93 from osm_lcm.data_utils.filesystem.filesystem import Filesystem
94
95 from n2vc.n2vc_juju_conn import N2VCJujuConnector
96 from n2vc.exceptions import N2VCException, N2VCNotFound, K8sException
97
98 from osm_lcm.lcm_helm_conn import LCMHelmConn
99 from osm_lcm.prometheus import parse_job
100
101 from copy import copy, deepcopy
102 from time import time
103 from uuid import uuid4
104
105 from random import randint
106
107 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
108
109
class NsLcm(LcmBase):
    """Network Service lifecycle manager (instantiate/terminate/scale/action)."""

    timeout_vca_on_error = (
        5 * 60
    )  # Time for charm from first time at blocked,error status to mark as failed
    timeout_ns_deploy = 2 * 3600  # default global timeout for deployment a ns
    timeout_ns_terminate = 1800  # default global timeout for un deployment a ns
    timeout_charm_delete = 10 * 60
    timeout_primitive = 30 * 60  # timeout for primitive execution
    timeout_progress_primitive = (
        10 * 60
    )  # timeout for some progress in a primitive execution

    # sentinel result codes used when looking up sub-operations
    SUBOPERATION_STATUS_NOT_FOUND = -1
    SUBOPERATION_STATUS_NEW = -2
    SUBOPERATION_STATUS_SKIP = -3
    task_name_deploy_vca = "Deploying VCA"
def __init__(self, msg, lcm_tasks, config, loop):
    """
    Init, Connect to database, filesystem storage, and messaging
    :param msg: messaging client, forwarded to LcmBase
    :param lcm_tasks: shared task registry (stored for later use)
    :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
        plus the 'timeout', 'ro_config' and 'VCA' sections read below
    :param loop: asyncio event loop shared with the connectors
    :return: None
    """
    super().__init__(msg=msg, logger=logging.getLogger("lcm.ns"))

    # db/fs handles come from the process-wide singletons
    self.db = Database().instance.db
    self.fs = Filesystem().instance.fs
    self.loop = loop
    self.lcm_tasks = lcm_tasks
    self.timeout = config["timeout"]
    self.ro_config = config["ro_config"]
    self.ng_ro = config["ro_config"].get("ng")
    # copy so later local mutations do not leak into the shared config dict
    self.vca_config = config["VCA"].copy()

    # create N2VC connector (juju charms); pushes status via _on_update_n2vc_db
    self.n2vc = N2VCJujuConnector(
        log=self.logger,
        loop=self.loop,
        on_update_db=self._on_update_n2vc_db,
        fs=self.fs,
        db=self.db,
    )

    # helm-based execution environments also report through _on_update_n2vc_db
    self.conn_helm_ee = LCMHelmConn(
        log=self.logger,
        loop=self.loop,
        vca_config=self.vca_config,
        on_update_db=self._on_update_n2vc_db,
    )

    # helm v2 connector: no DB-update callback registered (on_update_db=None)
    self.k8sclusterhelm2 = K8sHelmConnector(
        kubectl_command=self.vca_config.get("kubectlpath"),
        helm_command=self.vca_config.get("helmpath"),
        log=self.logger,
        on_update_db=None,
        fs=self.fs,
        db=self.db,
    )

    # helm v3 connector: same, note the separate "helm3path" binary
    self.k8sclusterhelm3 = K8sHelm3Connector(
        kubectl_command=self.vca_config.get("kubectlpath"),
        helm_command=self.vca_config.get("helm3path"),
        fs=self.fs,
        log=self.logger,
        db=self.db,
        on_update_db=None,
    )

    # juju k8s connector pushes status updates through _on_update_k8s_db
    self.k8sclusterjuju = K8sJujuConnector(
        kubectl_command=self.vca_config.get("kubectlpath"),
        juju_command=self.vca_config.get("jujupath"),
        log=self.logger,
        loop=self.loop,
        on_update_db=self._on_update_k8s_db,
        fs=self.fs,
        db=self.db,
    )

    # deployment-type string -> k8s connector dispatch table
    self.k8scluster_map = {
        "helm-chart": self.k8sclusterhelm2,
        "helm-chart-v3": self.k8sclusterhelm3,
        "chart": self.k8sclusterhelm3,
        "juju-bundle": self.k8sclusterjuju,
        "juju": self.k8sclusterjuju,
    }

    # execution-environment type -> VCA connector dispatch table
    self.vca_map = {
        "lxc_proxy_charm": self.n2vc,
        "native_charm": self.n2vc,
        "k8s_proxy_charm": self.n2vc,
        "helm": self.conn_helm_ee,
        "helm-v3": self.conn_helm_ee,
    }

    # create RO client
    self.RO = NgRoClient(self.loop, **self.ro_config)
206
207 @staticmethod
208 def increment_ip_mac(ip_mac, vm_index=1):
209 if not isinstance(ip_mac, str):
210 return ip_mac
211 try:
212 # try with ipv4 look for last dot
213 i = ip_mac.rfind(".")
214 if i > 0:
215 i += 1
216 return "{}{}".format(ip_mac[:i], int(ip_mac[i:]) + vm_index)
217 # try with ipv6 or mac look for last colon. Operate in hex
218 i = ip_mac.rfind(":")
219 if i > 0:
220 i += 1
221 # format in hex, len can be 2 for mac or 4 for ipv6
222 return ("{}{:0" + str(len(ip_mac) - i) + "x}").format(
223 ip_mac[:i], int(ip_mac[i:], 16) + vm_index
224 )
225 except Exception:
226 pass
227 return None
228
229 def _on_update_ro_db(self, nsrs_id, ro_descriptor):
230
231 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
232
233 try:
234 # TODO filter RO descriptor fields...
235
236 # write to database
237 db_dict = dict()
238 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
239 db_dict["deploymentStatus"] = ro_descriptor
240 self.update_db_2("nsrs", nsrs_id, db_dict)
241
242 except Exception as e:
243 self.logger.warn(
244 "Cannot write database RO deployment for ns={} -> {}".format(nsrs_id, e)
245 )
246
247 async def _on_update_n2vc_db(self, table, filter, path, updated_data, vca_id=None):
248
249 # remove last dot from path (if exists)
250 if path.endswith("."):
251 path = path[:-1]
252
253 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
254 # .format(table, filter, path, updated_data))
255 try:
256
257 nsr_id = filter.get("_id")
258
259 # read ns record from database
260 nsr = self.db.get_one(table="nsrs", q_filter=filter)
261 current_ns_status = nsr.get("nsState")
262
263 # get vca status for NS
264 status_dict = await self.n2vc.get_status(
265 namespace="." + nsr_id, yaml_format=False, vca_id=vca_id
266 )
267
268 # vcaStatus
269 db_dict = dict()
270 db_dict["vcaStatus"] = status_dict
271 await self.n2vc.update_vca_status(db_dict["vcaStatus"], vca_id=vca_id)
272
273 # update configurationStatus for this VCA
274 try:
275 vca_index = int(path[path.rfind(".") + 1 :])
276
277 vca_list = deep_get(
278 target_dict=nsr, key_list=("_admin", "deployed", "VCA")
279 )
280 vca_status = vca_list[vca_index].get("status")
281
282 configuration_status_list = nsr.get("configurationStatus")
283 config_status = configuration_status_list[vca_index].get("status")
284
285 if config_status == "BROKEN" and vca_status != "failed":
286 db_dict["configurationStatus"][vca_index] = "READY"
287 elif config_status != "BROKEN" and vca_status == "failed":
288 db_dict["configurationStatus"][vca_index] = "BROKEN"
289 except Exception as e:
290 # not update configurationStatus
291 self.logger.debug("Error updating vca_index (ignore): {}".format(e))
292
293 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
294 # if nsState = 'DEGRADED' check if all is OK
295 is_degraded = False
296 if current_ns_status in ("READY", "DEGRADED"):
297 error_description = ""
298 # check machines
299 if status_dict.get("machines"):
300 for machine_id in status_dict.get("machines"):
301 machine = status_dict.get("machines").get(machine_id)
302 # check machine agent-status
303 if machine.get("agent-status"):
304 s = machine.get("agent-status").get("status")
305 if s != "started":
306 is_degraded = True
307 error_description += (
308 "machine {} agent-status={} ; ".format(
309 machine_id, s
310 )
311 )
312 # check machine instance status
313 if machine.get("instance-status"):
314 s = machine.get("instance-status").get("status")
315 if s != "running":
316 is_degraded = True
317 error_description += (
318 "machine {} instance-status={} ; ".format(
319 machine_id, s
320 )
321 )
322 # check applications
323 if status_dict.get("applications"):
324 for app_id in status_dict.get("applications"):
325 app = status_dict.get("applications").get(app_id)
326 # check application status
327 if app.get("status"):
328 s = app.get("status").get("status")
329 if s != "active":
330 is_degraded = True
331 error_description += (
332 "application {} status={} ; ".format(app_id, s)
333 )
334
335 if error_description:
336 db_dict["errorDescription"] = error_description
337 if current_ns_status == "READY" and is_degraded:
338 db_dict["nsState"] = "DEGRADED"
339 if current_ns_status == "DEGRADED" and not is_degraded:
340 db_dict["nsState"] = "READY"
341
342 # write to database
343 self.update_db_2("nsrs", nsr_id, db_dict)
344
345 except (asyncio.CancelledError, asyncio.TimeoutError):
346 raise
347 except Exception as e:
348 self.logger.warn("Error updating NS state for ns={}: {}".format(nsr_id, e))
349
async def _on_update_k8s_db(
    self, cluster_uuid, kdu_instance, filter=None, vca_id=None, cluster_type="juju"
):
    """
    Updating vca status in NSR record
    :param cluster_uuid: UUID of a k8s cluster
    :param kdu_instance: The unique name of the KDU instance
    :param filter: To get nsr_id; expected to always carry the nsr "_id"
        (a None filter would raise AttributeError below)
    :param vca_id: id of the VCA account to use -- NOTE(review): assumed, confirm
    :cluster_type: The cluster type (juju, k8s)
    :return: none
    """

    # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
    #     .format(cluster_uuid, kdu_instance, filter))

    nsr_id = filter.get("_id")
    try:
        # obtain the complete KDU status from the proper connector
        # (helm2/helm3/juju, selected by cluster_type via k8scluster_map)
        vca_status = await self.k8scluster_map[cluster_type].status_kdu(
            cluster_uuid=cluster_uuid,
            kdu_instance=kdu_instance,
            yaml_format=False,
            complete_status=True,
            vca_id=vca_id,
        )

        # vcaStatus: keyed by nsr_id (unlike the n2vc callback, which stores
        # the status dict directly)
        db_dict = dict()
        db_dict["vcaStatus"] = {nsr_id: vca_status}

        if cluster_type in ("juju-bundle", "juju"):
            # TODO -> this should be done in a more uniform way, I think in N2VC, in order to update the K8s VCA
            # status in a similar way between Juju Bundles and Helm Charts on this side
            await self.k8sclusterjuju.update_vca_status(
                db_dict["vcaStatus"],
                kdu_instance,
                vca_id=vca_id,
            )

        self.logger.debug(
            f"Obtained VCA status for cluster type '{cluster_type}': {vca_status}"
        )

        # write to database
        self.update_db_2("nsrs", nsr_id, db_dict)
    except (asyncio.CancelledError, asyncio.TimeoutError):
        # cancellation/timeout must propagate to the caller
        raise
    except Exception as e:
        # best effort: log and continue on any other failure
        self.logger.warn("Error updating NS state for ns={}: {}".format(nsr_id, e))
398
@staticmethod
def _parse_cloud_init(cloud_init_text, additional_params, vnfd_id, vdu_id):
    """Render cloud-init text as a Jinja2 template with additional_params.

    Undefined template variables are treated as errors (StrictUndefined),
    so missing instantiation parameters are reported instead of being
    silently rendered empty.

    :raises LcmException: on undefined variables or malformed templates
    """
    try:
        jinja_env = Environment(undefined=StrictUndefined)
        template = jinja_env.from_string(cloud_init_text)
        return template.render(additional_params or {})
    except UndefinedError as e:
        raise LcmException(
            "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
            "file, must be provided in the instantiation parameters inside the "
            "'additionalParamsForVnf/Vdu' block".format(e, vnfd_id, vdu_id)
        )
    except (TemplateError, TemplateNotFound) as e:
        raise LcmException(
            "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
                vnfd_id, vdu_id, e
            )
        )
417
418 def _get_vdu_cloud_init_content(self, vdu, vnfd):
419 cloud_init_content = cloud_init_file = None
420 try:
421 if vdu.get("cloud-init-file"):
422 base_folder = vnfd["_admin"]["storage"]
423 if base_folder["pkg-dir"]:
424 cloud_init_file = "{}/{}/cloud_init/{}".format(
425 base_folder["folder"],
426 base_folder["pkg-dir"],
427 vdu["cloud-init-file"],
428 )
429 else:
430 cloud_init_file = "{}/Scripts/cloud_init/{}".format(
431 base_folder["folder"],
432 vdu["cloud-init-file"],
433 )
434 with self.fs.file_open(cloud_init_file, "r") as ci_file:
435 cloud_init_content = ci_file.read()
436 elif vdu.get("cloud-init"):
437 cloud_init_content = vdu["cloud-init"]
438
439 return cloud_init_content
440 except FsException as e:
441 raise LcmException(
442 "Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".format(
443 vnfd["id"], vdu["id"], cloud_init_file, e
444 )
445 )
446
def _get_vdu_additional_params(self, db_vnfr, vdu_id):
    """Return the parsed "additionalParams" of the vdur matching vdu_id.

    Uses get_iterable so a vnfr without a "vdur" list behaves like an
    empty one (previously a missing/None "vdur" raised TypeError when the
    generator advanced); an unmatched vdu_id yields the parse of None.
    """
    vdur = next(
        (
            vdur
            for vdur in get_iterable(db_vnfr, "vdur")
            if vdu_id == vdur["vdu-id-ref"]
        ),
        {},
    )
    additional_params = vdur.get("additionalParams")
    return parse_yaml_strings(additional_params)
454
def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
    """Build a RO-compatible copy of an OSM IM vnfd descriptor.

    :param vnfd: input vnfd (left unmodified)
    :param new_id: overrides vnf id if provided
    :param additionalParams: Instantiation params for VNFs provided
    :param nsrId: Id of the NSR
    :return: deep copy of vnfd stripped of the keys RO does not use
    """
    vnfd_RO = deepcopy(vnfd)
    # remove unused by RO configuration, monitoring, scaling and internal keys
    for ro_unknown_key in (
        "_id",
        "_admin",
        "monitoring-param",
        "scaling-group-descriptor",
        "kdu",
        "k8s-cluster",
    ):
        vnfd_RO.pop(ro_unknown_key, None)
    if new_id:
        vnfd_RO["id"] = new_id

    # cloud-init / cloud-init-file are rendered by LCM (Jinja2), never sent to RO
    for vdu in get_iterable(vnfd_RO, "vdu"):
        vdu.pop("cloud-init-file", None)
        vdu.pop("cloud-init", None)
    return vnfd_RO
480
481 @staticmethod
482 def ip_profile_2_RO(ip_profile):
483 RO_ip_profile = deepcopy(ip_profile)
484 if "dns-server" in RO_ip_profile:
485 if isinstance(RO_ip_profile["dns-server"], list):
486 RO_ip_profile["dns-address"] = []
487 for ds in RO_ip_profile.pop("dns-server"):
488 RO_ip_profile["dns-address"].append(ds["address"])
489 else:
490 RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
491 if RO_ip_profile.get("ip-version") == "ipv4":
492 RO_ip_profile["ip-version"] = "IPv4"
493 if RO_ip_profile.get("ip-version") == "ipv6":
494 RO_ip_profile["ip-version"] = "IPv6"
495 if "dhcp-params" in RO_ip_profile:
496 RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
497 return RO_ip_profile
498
499 def _get_ro_vim_id_for_vim_account(self, vim_account):
500 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
501 if db_vim["_admin"]["operationalState"] != "ENABLED":
502 raise LcmException(
503 "VIM={} is not available. operationalState={}".format(
504 vim_account, db_vim["_admin"]["operationalState"]
505 )
506 )
507 RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
508 return RO_vim_id
509
def get_ro_wim_id_for_wim_account(self, wim_account):
    """Map an OSM wim_account id to the RO-side wim account id.

    Non-string inputs are considered already resolved and returned as-is.

    :raises LcmException: when the WIM record is not in ENABLED state
    """
    if not isinstance(wim_account, str):
        return wim_account
    db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
    operational_state = db_wim["_admin"]["operationalState"]
    if operational_state != "ENABLED":
        raise LcmException(
            "WIM={} is not available. operationalState={}".format(
                wim_account, operational_state
            )
        )
    return db_wim["_admin"]["deployed"]["RO-account"]
523
def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None, mark_delete=False):
    """Apply a scale-out/scale-in to the vnfr "vdur" list in the database.

    :param db_vnfr: vnfr content; its "vdur" list is refreshed from db on exit
    :param vdu_create: dict vdu-id -> number of new instances to create
    :param vdu_delete: dict vdu-id -> number of instances to remove
    :param mark_delete: when True, instances are only marked status=DELETING
        instead of being pulled from the database
    """
    db_vdu_push_list = []
    template_vdur = []
    db_update = {"_admin.modified": time()}
    if vdu_create:
        for vdu_id, vdu_count in vdu_create.items():
            # clone from the last existing vdur with this vdu-id-ref
            vdur = next(
                (
                    vdur
                    for vdur in reversed(db_vnfr["vdur"])
                    if vdur["vdu-id-ref"] == vdu_id
                ),
                None,
            )
            if not vdur:
                # Read the template saved in the db:
                self.logger.debug(f"No vdur in the database. Using the vdur-template to scale")
                vdur_template = db_vnfr.get("vdur-template")
                if not vdur_template:
                    raise LcmException(
                        "Error scaling OUT VNFR for {}. No vnfr or template exists".format(
                            vdu_id
                        )
                    )
                vdur = vdur_template[0]
                # Delete a template from the database after using it
                self.db.set_one("vnfrs",
                    {"_id": db_vnfr["_id"]},
                    None,
                    pull={"vdur-template": {"_id": vdur['_id']}}
                )
            for count in range(vdu_count):
                vdur_copy = deepcopy(vdur)
                # new instance starts building, with fresh id/count-index
                vdur_copy["status"] = "BUILD"
                vdur_copy["status-detailed"] = None
                vdur_copy["ip-address"] = None
                vdur_copy["_id"] = str(uuid4())
                vdur_copy["count-index"] += count + 1
                vdur_copy["id"] = "{}-{}".format(
                    vdur_copy["vdu-id-ref"], vdur_copy["count-index"]
                )
                vdur_copy.pop("vim_info", None)
                for iface in vdur_copy["interfaces"]:
                    # fixed addresses are incremented per replica; dynamic ones
                    # are cleared so the VIM assigns them
                    if iface.get("fixed-ip"):
                        iface["ip-address"] = self.increment_ip_mac(
                            iface["ip-address"], count + 1
                        )
                    else:
                        iface.pop("ip-address", None)
                    if iface.get("fixed-mac"):
                        iface["mac-address"] = self.increment_ip_mac(
                            iface["mac-address"], count + 1
                        )
                    else:
                        iface.pop("mac-address", None)
                    if db_vnfr["vdur"]:
                        iface.pop(
                            "mgmt_vnf", None
                        )  # only first vdu can be managment of vnf
                db_vdu_push_list.append(vdur_copy)
                # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
    if vdu_delete:
        if len(db_vnfr["vdur"]) == 1:
            # The scale will move to 0 instances
            self.logger.debug(f"Scaling to 0 !, creating the template with the last vdur")
            # keep the last vdur as template so a later scale-out can clone it
            template_vdur = [db_vnfr["vdur"][0]]
        for vdu_id, vdu_count in vdu_delete.items():
            if mark_delete:
                indexes_to_delete = [
                    iv[0]
                    for iv in enumerate(db_vnfr["vdur"])
                    if iv[1]["vdu-id-ref"] == vdu_id
                ]
                # mark the last vdu_count matching instances as DELETING
                db_update.update(
                    {
                        "vdur.{}.status".format(i): "DELETING"
                        for i in indexes_to_delete[-vdu_count:]
                    }
                )
            else:
                # it must be deleted one by one because common.db does not allow otherwise
                vdus_to_delete = [
                    v
                    for v in reversed(db_vnfr["vdur"])
                    if v["vdu-id-ref"] == vdu_id
                ]
                for vdu in vdus_to_delete[:vdu_count]:
                    self.db.set_one(
                        "vnfrs",
                        {"_id": db_vnfr["_id"]},
                        None,
                        pull={"vdur": {"_id": vdu["_id"]}},
                    )
    db_push = {}
    if db_vdu_push_list:
        db_push["vdur"] = db_vdu_push_list
    if template_vdur:
        db_push["vdur-template"] = template_vdur
    if not db_push:
        db_push = None
    db_vnfr["vdur-template"] = template_vdur
    self.db.set_one("vnfrs", {"_id": db_vnfr["_id"]}, db_update, push_list=db_push)
    # modify passed dictionary db_vnfr
    db_vnfr_ = self.db.get_one("vnfrs", {"_id": db_vnfr["_id"]})
    db_vnfr["vdur"] = db_vnfr_["vdur"]
630
def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
    """
    Updates database nsr with the RO info for the created vld
    :param ns_update_nsr: dictionary to be filled with the updated info
    :param db_nsr: content of db_nsr. This is also modified
    :param nsr_desc_RO: nsr descriptor from RO
    :return: Nothing, LcmException is raised on errors
    """
    for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
        # find the RO net whose ns_net_osm_id matches this vld
        ro_net = next(
            (
                net
                for net in get_iterable(nsr_desc_RO, "nets")
                if net.get("ns_net_osm_id") == vld["id"]
            ),
            None,
        )
        if ro_net is None:
            raise LcmException(
                "ns_update_nsr: Not found vld={} at RO info".format(vld["id"])
            )
        vld["vim-id"] = ro_net.get("vim_net_id")
        vld["name"] = ro_net.get("vim_name")
        vld["status"] = ro_net.get("status")
        vld["status-detailed"] = ro_net.get("error_msg")
        ns_update_nsr["vld.{}".format(vld_index)] = vld
654
def set_vnfr_at_error(self, db_vnfrs, error_text):
    """Mark every vnfr (and each of its vdurs without a status) as ERROR.

    Best effort: database errors are logged and never propagated.
    """
    try:
        for db_vnfr in db_vnfrs.values():
            vnfr_update = {"status": "ERROR"}
            for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
                # do not overwrite a status already set on the vdur
                if "status" in vdur:
                    continue
                vdur["status"] = "ERROR"
                vnfr_update["vdur.{}.status".format(vdu_index)] = "ERROR"
                if error_text:
                    vdur["status-detailed"] = str(error_text)
                    vnfr_update[
                        "vdur.{}.status-detailed".format(vdu_index)
                    ] = "ERROR"
            self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
    except DbException as e:
        self.logger.error("Cannot update vnf. {}".format(e))
671
def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
    """
    Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
    :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
    :param nsr_desc_RO: nsr descriptor from RO
    :return: Nothing, LcmException is raised on errors
    """
    for vnf_index, db_vnfr in db_vnfrs.items():
        # find the RO vnf matching this member index (for/else: raise if absent)
        for vnf_RO in nsr_desc_RO["vnfs"]:
            if vnf_RO["member_vnf_index"] != vnf_index:
                continue
            vnfr_update = {}
            if vnf_RO.get("ip_address"):
                # RO may report several addresses separated by ';' - keep the first
                db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO[
                    "ip_address"
                ].split(";")[0]
            elif not db_vnfr.get("ip-address"):
                if db_vnfr.get("vdur"):  # if not VDUs, there is not ip_address
                    raise LcmExceptionNoMgmtIP(
                        "ns member_vnf_index '{}' has no IP address".format(
                            vnf_index
                        )
                    )

            for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
                vdur_RO_count_index = 0
                if vdur.get("pdu-type"):
                    # PDUs are physical: nothing to update from VIM info
                    continue
                # find the RO vm matching vdu id AND replica count-index
                for vdur_RO in get_iterable(vnf_RO, "vms"):
                    if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
                        continue
                    if vdur["count-index"] != vdur_RO_count_index:
                        # same vdu id but another replica: advance and keep looking
                        vdur_RO_count_index += 1
                        continue
                    vdur["vim-id"] = vdur_RO.get("vim_vm_id")
                    if vdur_RO.get("ip_address"):
                        vdur["ip-address"] = vdur_RO["ip_address"].split(";")[0]
                    else:
                        vdur["ip-address"] = None
                    vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
                    vdur["name"] = vdur_RO.get("vim_name")
                    vdur["status"] = vdur_RO.get("status")
                    vdur["status-detailed"] = vdur_RO.get("error_msg")
                    # copy interface addresses, matching by RO internal_name
                    for ifacer in get_iterable(vdur, "interfaces"):
                        for interface_RO in get_iterable(vdur_RO, "interfaces"):
                            if ifacer["name"] == interface_RO.get("internal_name"):
                                ifacer["ip-address"] = interface_RO.get(
                                    "ip_address"
                                )
                                ifacer["mac-address"] = interface_RO.get(
                                    "mac_address"
                                )
                                break
                        else:
                            raise LcmException(
                                "ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
                                "from VIM info".format(
                                    vnf_index, vdur["vdu-id-ref"], ifacer["name"]
                                )
                            )
                    vnfr_update["vdur.{}".format(vdu_index)] = vdur
                    break
                else:
                    raise LcmException(
                        "ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
                        "VIM info".format(
                            vnf_index, vdur["vdu-id-ref"], vdur["count-index"]
                        )
                    )

            # copy the vnf-internal vld info reported by RO
            for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
                for net_RO in get_iterable(nsr_desc_RO, "nets"):
                    if vld["id"] != net_RO.get("vnf_net_osm_id"):
                        continue
                    vld["vim-id"] = net_RO.get("vim_net_id")
                    vld["name"] = net_RO.get("vim_name")
                    vld["status"] = net_RO.get("status")
                    vld["status-detailed"] = net_RO.get("error_msg")
                    vnfr_update["vld.{}".format(vld_index)] = vld
                    break
                else:
                    raise LcmException(
                        "ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
                            vnf_index, vld["id"]
                        )
                    )

            self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
            break

        else:
            raise LcmException(
                "ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(
                    vnf_index
                )
            )
768
769 def _get_ns_config_info(self, nsr_id):
770 """
771 Generates a mapping between vnf,vdu elements and the N2VC id
772 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
773 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
774 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
775 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
776 """
777 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
778 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
779 mapping = {}
780 ns_config_info = {"osm-config-mapping": mapping}
781 for vca in vca_deployed_list:
782 if not vca["member-vnf-index"]:
783 continue
784 if not vca["vdu_id"]:
785 mapping[vca["member-vnf-index"]] = vca["application"]
786 else:
787 mapping[
788 "{}.{}.{}".format(
789 vca["member-vnf-index"], vca["vdu_id"], vca["vdu_count_index"]
790 )
791 ] = vca["application"]
792 return ns_config_info
793
794 async def _instantiate_ng_ro(
795 self,
796 logging_text,
797 nsr_id,
798 nsd,
799 db_nsr,
800 db_nslcmop,
801 db_vnfrs,
802 db_vnfds,
803 n2vc_key_list,
804 stage,
805 start_deploy,
806 timeout_ns_deploy,
807 ):
808
809 db_vims = {}
810
811 def get_vim_account(vim_account_id):
812 nonlocal db_vims
813 if vim_account_id in db_vims:
814 return db_vims[vim_account_id]
815 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account_id})
816 db_vims[vim_account_id] = db_vim
817 return db_vim
818
819 # modify target_vld info with instantiation parameters
820 def parse_vld_instantiation_params(
821 target_vim, target_vld, vld_params, target_sdn
822 ):
823 if vld_params.get("ip-profile"):
824 target_vld["vim_info"][target_vim]["ip_profile"] = vld_params[
825 "ip-profile"
826 ]
827 if vld_params.get("provider-network"):
828 target_vld["vim_info"][target_vim]["provider_network"] = vld_params[
829 "provider-network"
830 ]
831 if "sdn-ports" in vld_params["provider-network"] and target_sdn:
832 target_vld["vim_info"][target_sdn]["sdn-ports"] = vld_params[
833 "provider-network"
834 ]["sdn-ports"]
835 if vld_params.get("wimAccountId"):
836 target_wim = "wim:{}".format(vld_params["wimAccountId"])
837 target_vld["vim_info"][target_wim] = {}
838 for param in ("vim-network-name", "vim-network-id"):
839 if vld_params.get(param):
840 if isinstance(vld_params[param], dict):
841 for vim, vim_net in vld_params[param].items():
842 other_target_vim = "vim:" + vim
843 populate_dict(
844 target_vld["vim_info"],
845 (other_target_vim, param.replace("-", "_")),
846 vim_net,
847 )
848 else: # isinstance str
849 target_vld["vim_info"][target_vim][
850 param.replace("-", "_")
851 ] = vld_params[param]
852 if vld_params.get("common_id"):
853 target_vld["common_id"] = vld_params.get("common_id")
854
855 # modify target["ns"]["vld"] with instantiation parameters to override vnf vim-account
856 def update_ns_vld_target(target, ns_params):
857 for vnf_params in ns_params.get("vnf", ()):
858 if vnf_params.get("vimAccountId"):
859 target_vnf = next(
860 (
861 vnfr
862 for vnfr in db_vnfrs.values()
863 if vnf_params["member-vnf-index"]
864 == vnfr["member-vnf-index-ref"]
865 ),
866 None,
867 )
868 vdur = next((vdur for vdur in target_vnf.get("vdur", ())), None)
869 for a_index, a_vld in enumerate(target["ns"]["vld"]):
870 target_vld = find_in_list(
871 get_iterable(vdur, "interfaces"),
872 lambda iface: iface.get("ns-vld-id") == a_vld["name"],
873 )
874 if target_vld:
875 if vnf_params.get("vimAccountId") not in a_vld.get(
876 "vim_info", {}
877 ):
878 target["ns"]["vld"][a_index].get("vim_info").update(
879 {
880 "vim:{}".format(vnf_params["vimAccountId"]): {
881 "vim_network_name": ""
882 }
883 }
884 )
885
886 nslcmop_id = db_nslcmop["_id"]
887 target = {
888 "name": db_nsr["name"],
889 "ns": {"vld": []},
890 "vnf": [],
891 "image": deepcopy(db_nsr["image"]),
892 "flavor": deepcopy(db_nsr["flavor"]),
893 "action_id": nslcmop_id,
894 "cloud_init_content": {},
895 }
896 for image in target["image"]:
897 image["vim_info"] = {}
898 for flavor in target["flavor"]:
899 flavor["vim_info"] = {}
900 if db_nsr.get("affinity-or-anti-affinity-group"):
901 target["affinity-or-anti-affinity-group"] = deepcopy(db_nsr["affinity-or-anti-affinity-group"])
902 for affinity_or_anti_affinity_group in target["affinity-or-anti-affinity-group"]:
903 affinity_or_anti_affinity_group["vim_info"] = {}
904
905 if db_nslcmop.get("lcmOperationType") != "instantiate":
906 # get parameters of instantiation:
907 db_nslcmop_instantiate = self.db.get_list(
908 "nslcmops",
909 {
910 "nsInstanceId": db_nslcmop["nsInstanceId"],
911 "lcmOperationType": "instantiate",
912 },
913 )[-1]
914 ns_params = db_nslcmop_instantiate.get("operationParams")
915 else:
916 ns_params = db_nslcmop.get("operationParams")
917 ssh_keys_instantiation = ns_params.get("ssh_keys") or []
918 ssh_keys_all = ssh_keys_instantiation + (n2vc_key_list or [])
919
920 cp2target = {}
921 for vld_index, vld in enumerate(db_nsr.get("vld")):
922 target_vim = "vim:{}".format(ns_params["vimAccountId"])
923 target_vld = {
924 "id": vld["id"],
925 "name": vld["name"],
926 "mgmt-network": vld.get("mgmt-network", False),
927 "type": vld.get("type"),
928 "vim_info": {
929 target_vim: {
930 "vim_network_name": vld.get("vim-network-name"),
931 "vim_account_id": ns_params["vimAccountId"],
932 }
933 },
934 }
935 # check if this network needs SDN assist
936 if vld.get("pci-interfaces"):
937 db_vim = get_vim_account(ns_params["vimAccountId"])
938 sdnc_id = db_vim["config"].get("sdn-controller")
939 if sdnc_id:
940 sdn_vld = "nsrs:{}:vld.{}".format(nsr_id, vld["id"])
941 target_sdn = "sdn:{}".format(sdnc_id)
942 target_vld["vim_info"][target_sdn] = {
943 "sdn": True,
944 "target_vim": target_vim,
945 "vlds": [sdn_vld],
946 "type": vld.get("type"),
947 }
948
949 nsd_vnf_profiles = get_vnf_profiles(nsd)
950 for nsd_vnf_profile in nsd_vnf_profiles:
951 for cp in nsd_vnf_profile["virtual-link-connectivity"]:
952 if cp["virtual-link-profile-id"] == vld["id"]:
953 cp2target[
954 "member_vnf:{}.{}".format(
955 cp["constituent-cpd-id"][0][
956 "constituent-base-element-id"
957 ],
958 cp["constituent-cpd-id"][0]["constituent-cpd-id"],
959 )
960 ] = "nsrs:{}:vld.{}".format(nsr_id, vld_index)
961
962 # check at nsd descriptor, if there is an ip-profile
963 vld_params = {}
964 nsd_vlp = find_in_list(
965 get_virtual_link_profiles(nsd),
966 lambda a_link_profile: a_link_profile["virtual-link-desc-id"]
967 == vld["id"],
968 )
969 if (
970 nsd_vlp
971 and nsd_vlp.get("virtual-link-protocol-data")
972 and nsd_vlp["virtual-link-protocol-data"].get("l3-protocol-data")
973 ):
974 ip_profile_source_data = nsd_vlp["virtual-link-protocol-data"][
975 "l3-protocol-data"
976 ]
977 ip_profile_dest_data = {}
978 if "ip-version" in ip_profile_source_data:
979 ip_profile_dest_data["ip-version"] = ip_profile_source_data[
980 "ip-version"
981 ]
982 if "cidr" in ip_profile_source_data:
983 ip_profile_dest_data["subnet-address"] = ip_profile_source_data[
984 "cidr"
985 ]
986 if "gateway-ip" in ip_profile_source_data:
987 ip_profile_dest_data["gateway-address"] = ip_profile_source_data[
988 "gateway-ip"
989 ]
990 if "dhcp-enabled" in ip_profile_source_data:
991 ip_profile_dest_data["dhcp-params"] = {
992 "enabled": ip_profile_source_data["dhcp-enabled"]
993 }
994 vld_params["ip-profile"] = ip_profile_dest_data
995
996 # update vld_params with instantiation params
997 vld_instantiation_params = find_in_list(
998 get_iterable(ns_params, "vld"),
999 lambda a_vld: a_vld["name"] in (vld["name"], vld["id"]),
1000 )
1001 if vld_instantiation_params:
1002 vld_params.update(vld_instantiation_params)
1003 parse_vld_instantiation_params(target_vim, target_vld, vld_params, None)
1004 target["ns"]["vld"].append(target_vld)
1005 # Update the target ns_vld if vnf vim_account is overriden by instantiation params
1006 update_ns_vld_target(target, ns_params)
1007
1008 for vnfr in db_vnfrs.values():
1009 vnfd = find_in_list(
1010 db_vnfds, lambda db_vnf: db_vnf["id"] == vnfr["vnfd-ref"]
1011 )
1012 vnf_params = find_in_list(
1013 get_iterable(ns_params, "vnf"),
1014 lambda a_vnf: a_vnf["member-vnf-index"] == vnfr["member-vnf-index-ref"],
1015 )
1016 target_vnf = deepcopy(vnfr)
1017 target_vim = "vim:{}".format(vnfr["vim-account-id"])
1018 for vld in target_vnf.get("vld", ()):
1019 # check if connected to a ns.vld, to fill target'
1020 vnf_cp = find_in_list(
1021 vnfd.get("int-virtual-link-desc", ()),
1022 lambda cpd: cpd.get("id") == vld["id"],
1023 )
1024 if vnf_cp:
1025 ns_cp = "member_vnf:{}.{}".format(
1026 vnfr["member-vnf-index-ref"], vnf_cp["id"]
1027 )
1028 if cp2target.get(ns_cp):
1029 vld["target"] = cp2target[ns_cp]
1030
1031 vld["vim_info"] = {
1032 target_vim: {"vim_network_name": vld.get("vim-network-name")}
1033 }
1034 # check if this network needs SDN assist
1035 target_sdn = None
1036 if vld.get("pci-interfaces"):
1037 db_vim = get_vim_account(vnfr["vim-account-id"])
1038 sdnc_id = db_vim["config"].get("sdn-controller")
1039 if sdnc_id:
1040 sdn_vld = "vnfrs:{}:vld.{}".format(target_vnf["_id"], vld["id"])
1041 target_sdn = "sdn:{}".format(sdnc_id)
1042 vld["vim_info"][target_sdn] = {
1043 "sdn": True,
1044 "target_vim": target_vim,
1045 "vlds": [sdn_vld],
1046 "type": vld.get("type"),
1047 }
1048
1049 # check at vnfd descriptor, if there is an ip-profile
1050 vld_params = {}
1051 vnfd_vlp = find_in_list(
1052 get_virtual_link_profiles(vnfd),
1053 lambda a_link_profile: a_link_profile["id"] == vld["id"],
1054 )
1055 if (
1056 vnfd_vlp
1057 and vnfd_vlp.get("virtual-link-protocol-data")
1058 and vnfd_vlp["virtual-link-protocol-data"].get("l3-protocol-data")
1059 ):
1060 ip_profile_source_data = vnfd_vlp["virtual-link-protocol-data"][
1061 "l3-protocol-data"
1062 ]
1063 ip_profile_dest_data = {}
1064 if "ip-version" in ip_profile_source_data:
1065 ip_profile_dest_data["ip-version"] = ip_profile_source_data[
1066 "ip-version"
1067 ]
1068 if "cidr" in ip_profile_source_data:
1069 ip_profile_dest_data["subnet-address"] = ip_profile_source_data[
1070 "cidr"
1071 ]
1072 if "gateway-ip" in ip_profile_source_data:
1073 ip_profile_dest_data[
1074 "gateway-address"
1075 ] = ip_profile_source_data["gateway-ip"]
1076 if "dhcp-enabled" in ip_profile_source_data:
1077 ip_profile_dest_data["dhcp-params"] = {
1078 "enabled": ip_profile_source_data["dhcp-enabled"]
1079 }
1080
1081 vld_params["ip-profile"] = ip_profile_dest_data
1082 # update vld_params with instantiation params
1083 if vnf_params:
1084 vld_instantiation_params = find_in_list(
1085 get_iterable(vnf_params, "internal-vld"),
1086 lambda i_vld: i_vld["name"] == vld["id"],
1087 )
1088 if vld_instantiation_params:
1089 vld_params.update(vld_instantiation_params)
1090 parse_vld_instantiation_params(target_vim, vld, vld_params, target_sdn)
1091
1092 vdur_list = []
1093 for vdur in target_vnf.get("vdur", ()):
1094 if vdur.get("status") == "DELETING" or vdur.get("pdu-type"):
1095 continue # This vdu must not be created
1096 vdur["vim_info"] = {"vim_account_id": vnfr["vim-account-id"]}
1097
1098 self.logger.debug("NS > ssh_keys > {}".format(ssh_keys_all))
1099
1100 if ssh_keys_all:
1101 vdu_configuration = get_configuration(vnfd, vdur["vdu-id-ref"])
1102 vnf_configuration = get_configuration(vnfd, vnfd["id"])
1103 if (
1104 vdu_configuration
1105 and vdu_configuration.get("config-access")
1106 and vdu_configuration.get("config-access").get("ssh-access")
1107 ):
1108 vdur["ssh-keys"] = ssh_keys_all
1109 vdur["ssh-access-required"] = vdu_configuration[
1110 "config-access"
1111 ]["ssh-access"]["required"]
1112 elif (
1113 vnf_configuration
1114 and vnf_configuration.get("config-access")
1115 and vnf_configuration.get("config-access").get("ssh-access")
1116 and any(iface.get("mgmt-vnf") for iface in vdur["interfaces"])
1117 ):
1118 vdur["ssh-keys"] = ssh_keys_all
1119 vdur["ssh-access-required"] = vnf_configuration[
1120 "config-access"
1121 ]["ssh-access"]["required"]
1122 elif ssh_keys_instantiation and find_in_list(
1123 vdur["interfaces"], lambda iface: iface.get("mgmt-vnf")
1124 ):
1125 vdur["ssh-keys"] = ssh_keys_instantiation
1126
1127 self.logger.debug("NS > vdur > {}".format(vdur))
1128
1129 vdud = get_vdu(vnfd, vdur["vdu-id-ref"])
1130 # cloud-init
1131 if vdud.get("cloud-init-file"):
1132 vdur["cloud-init"] = "{}:file:{}".format(
1133 vnfd["_id"], vdud.get("cloud-init-file")
1134 )
1135 # read file and put content at target.cloul_init_content. Avoid ng_ro to use shared package system
1136 if vdur["cloud-init"] not in target["cloud_init_content"]:
1137 base_folder = vnfd["_admin"]["storage"]
1138 if base_folder["pkg-dir"]:
1139 cloud_init_file = "{}/{}/cloud_init/{}".format(
1140 base_folder["folder"],
1141 base_folder["pkg-dir"],
1142 vdud.get("cloud-init-file"),
1143 )
1144 else:
1145 cloud_init_file = "{}/Scripts/cloud_init/{}".format(
1146 base_folder["folder"],
1147 vdud.get("cloud-init-file"),
1148 )
1149 with self.fs.file_open(cloud_init_file, "r") as ci_file:
1150 target["cloud_init_content"][
1151 vdur["cloud-init"]
1152 ] = ci_file.read()
1153 elif vdud.get("cloud-init"):
1154 vdur["cloud-init"] = "{}:vdu:{}".format(
1155 vnfd["_id"], get_vdu_index(vnfd, vdur["vdu-id-ref"])
1156 )
1157 # put content at target.cloul_init_content. Avoid ng_ro read vnfd descriptor
1158 target["cloud_init_content"][vdur["cloud-init"]] = vdud[
1159 "cloud-init"
1160 ]
1161 vdur["additionalParams"] = vdur.get("additionalParams") or {}
1162 deploy_params_vdu = self._format_additional_params(
1163 vdur.get("additionalParams") or {}
1164 )
1165 deploy_params_vdu["OSM"] = get_osm_params(
1166 vnfr, vdur["vdu-id-ref"], vdur["count-index"]
1167 )
1168 vdur["additionalParams"] = deploy_params_vdu
1169
1170 # flavor
1171 ns_flavor = target["flavor"][int(vdur["ns-flavor-id"])]
1172 if target_vim not in ns_flavor["vim_info"]:
1173 ns_flavor["vim_info"][target_vim] = {}
1174
1175 # deal with images
1176 # in case alternative images are provided we must check if they should be applied
1177 # for the vim_type, modify the vim_type taking into account
1178 ns_image_id = int(vdur["ns-image-id"])
1179 if vdur.get("alt-image-ids"):
1180 db_vim = get_vim_account(vnfr["vim-account-id"])
1181 vim_type = db_vim["vim_type"]
1182 for alt_image_id in vdur.get("alt-image-ids"):
1183 ns_alt_image = target["image"][int(alt_image_id)]
1184 if vim_type == ns_alt_image.get("vim-type"):
1185 # must use alternative image
1186 self.logger.debug(
1187 "use alternative image id: {}".format(alt_image_id)
1188 )
1189 ns_image_id = alt_image_id
1190 vdur["ns-image-id"] = ns_image_id
1191 break
1192 ns_image = target["image"][int(ns_image_id)]
1193 if target_vim not in ns_image["vim_info"]:
1194 ns_image["vim_info"][target_vim] = {}
1195
1196 # Affinity groups
1197 if vdur.get("affinity-or-anti-affinity-group-id"):
1198 for ags_id in vdur["affinity-or-anti-affinity-group-id"]:
1199 ns_ags = target["affinity-or-anti-affinity-group"][int(ags_id)]
1200 if target_vim not in ns_ags["vim_info"]:
1201 ns_ags["vim_info"][target_vim] = {}
1202
1203 vdur["vim_info"] = {target_vim: {}}
1204 # instantiation parameters
1205 # if vnf_params:
1206 # vdu_instantiation_params = next((v for v in get_iterable(vnf_params, "vdu") if v["id"] ==
1207 # vdud["id"]), None)
1208 vdur_list.append(vdur)
1209 target_vnf["vdur"] = vdur_list
1210 target["vnf"].append(target_vnf)
1211
1212 desc = await self.RO.deploy(nsr_id, target)
1213 self.logger.debug("RO return > {}".format(desc))
1214 action_id = desc["action_id"]
1215 await self._wait_ng_ro(
1216 nsr_id, action_id, nslcmop_id, start_deploy, timeout_ns_deploy, stage
1217 )
1218
1219 # Updating NSR
1220 db_nsr_update = {
1221 "_admin.deployed.RO.operational-status": "running",
1222 "detailed-status": " ".join(stage),
1223 }
1224 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1225 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1226 self._write_op_status(nslcmop_id, stage)
1227 self.logger.debug(
1228 logging_text + "ns deployed at RO. RO_id={}".format(action_id)
1229 )
1230 return
1231
1232 async def _wait_ng_ro(
1233 self,
1234 nsr_id,
1235 action_id,
1236 nslcmop_id=None,
1237 start_time=None,
1238 timeout=600,
1239 stage=None,
1240 ):
1241 detailed_status_old = None
1242 db_nsr_update = {}
1243 start_time = start_time or time()
1244 while time() <= start_time + timeout:
1245 desc_status = await self.RO.status(nsr_id, action_id)
1246 self.logger.debug("Wait NG RO > {}".format(desc_status))
1247 if desc_status["status"] == "FAILED":
1248 raise NgRoException(desc_status["details"])
1249 elif desc_status["status"] == "BUILD":
1250 if stage:
1251 stage[2] = "VIM: ({})".format(desc_status["details"])
1252 elif desc_status["status"] == "DONE":
1253 if stage:
1254 stage[2] = "Deployed at VIM"
1255 break
1256 else:
1257 assert False, "ROclient.check_ns_status returns unknown {}".format(
1258 desc_status["status"]
1259 )
1260 if stage and nslcmop_id and stage[2] != detailed_status_old:
1261 detailed_status_old = stage[2]
1262 db_nsr_update["detailed-status"] = " ".join(stage)
1263 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1264 self._write_op_status(nslcmop_id, stage)
1265 await asyncio.sleep(15, loop=self.loop)
1266 else: # timeout_ns_deploy
1267 raise NgRoException("Timeout waiting ns to deploy")
1268
1269 async def _terminate_ng_ro(
1270 self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
1271 ):
1272 db_nsr_update = {}
1273 failed_detail = []
1274 action_id = None
1275 start_deploy = time()
1276 try:
1277 target = {
1278 "ns": {"vld": []},
1279 "vnf": [],
1280 "image": [],
1281 "flavor": [],
1282 "action_id": nslcmop_id,
1283 }
1284 desc = await self.RO.deploy(nsr_id, target)
1285 action_id = desc["action_id"]
1286 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = action_id
1287 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETING"
1288 self.logger.debug(
1289 logging_text
1290 + "ns terminate action at RO. action_id={}".format(action_id)
1291 )
1292
1293 # wait until done
1294 delete_timeout = 20 * 60 # 20 minutes
1295 await self._wait_ng_ro(
1296 nsr_id, action_id, nslcmop_id, start_deploy, delete_timeout, stage
1297 )
1298
1299 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
1300 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
1301 # delete all nsr
1302 await self.RO.delete(nsr_id)
1303 except Exception as e:
1304 if isinstance(e, NgRoException) and e.http_code == 404: # not found
1305 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
1306 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
1307 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
1308 self.logger.debug(
1309 logging_text + "RO_action_id={} already deleted".format(action_id)
1310 )
1311 elif isinstance(e, NgRoException) and e.http_code == 409: # conflict
1312 failed_detail.append("delete conflict: {}".format(e))
1313 self.logger.debug(
1314 logging_text
1315 + "RO_action_id={} delete conflict: {}".format(action_id, e)
1316 )
1317 else:
1318 failed_detail.append("delete error: {}".format(e))
1319 self.logger.error(
1320 logging_text
1321 + "RO_action_id={} delete error: {}".format(action_id, e)
1322 )
1323
1324 if failed_detail:
1325 stage[2] = "Error deleting from VIM"
1326 else:
1327 stage[2] = "Deleted from VIM"
1328 db_nsr_update["detailed-status"] = " ".join(stage)
1329 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1330 self._write_op_status(nslcmop_id, stage)
1331
1332 if failed_detail:
1333 raise LcmException("; ".join(failed_detail))
1334 return
1335
1336 async def instantiate_RO(
1337 self,
1338 logging_text,
1339 nsr_id,
1340 nsd,
1341 db_nsr,
1342 db_nslcmop,
1343 db_vnfrs,
1344 db_vnfds,
1345 n2vc_key_list,
1346 stage,
1347 ):
1348 """
1349 Instantiate at RO
1350 :param logging_text: preffix text to use at logging
1351 :param nsr_id: nsr identity
1352 :param nsd: database content of ns descriptor
1353 :param db_nsr: database content of ns record
1354 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
1355 :param db_vnfrs:
1356 :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
1357 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
1358 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
1359 :return: None or exception
1360 """
1361 try:
1362 start_deploy = time()
1363 ns_params = db_nslcmop.get("operationParams")
1364 if ns_params and ns_params.get("timeout_ns_deploy"):
1365 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
1366 else:
1367 timeout_ns_deploy = self.timeout.get(
1368 "ns_deploy", self.timeout_ns_deploy
1369 )
1370
1371 # Check for and optionally request placement optimization. Database will be updated if placement activated
1372 stage[2] = "Waiting for Placement."
1373 if await self._do_placement(logging_text, db_nslcmop, db_vnfrs):
1374 # in case of placement change ns_params[vimAcountId) if not present at any vnfrs
1375 for vnfr in db_vnfrs.values():
1376 if ns_params["vimAccountId"] == vnfr["vim-account-id"]:
1377 break
1378 else:
1379 ns_params["vimAccountId"] == vnfr["vim-account-id"]
1380
1381 return await self._instantiate_ng_ro(
1382 logging_text,
1383 nsr_id,
1384 nsd,
1385 db_nsr,
1386 db_nslcmop,
1387 db_vnfrs,
1388 db_vnfds,
1389 n2vc_key_list,
1390 stage,
1391 start_deploy,
1392 timeout_ns_deploy,
1393 )
1394 except Exception as e:
1395 stage[2] = "ERROR deploying at VIM"
1396 self.set_vnfr_at_error(db_vnfrs, str(e))
1397 self.logger.error(
1398 "Error deploying at VIM {}".format(e),
1399 exc_info=not isinstance(
1400 e,
1401 (
1402 ROclient.ROClientException,
1403 LcmException,
1404 DbException,
1405 NgRoException,
1406 ),
1407 ),
1408 )
1409 raise
1410
1411 async def wait_kdu_up(self, logging_text, nsr_id, vnfr_id, kdu_name):
1412 """
1413 Wait for kdu to be up, get ip address
1414 :param logging_text: prefix use for logging
1415 :param nsr_id:
1416 :param vnfr_id:
1417 :param kdu_name:
1418 :return: IP address
1419 """
1420
1421 # self.logger.debug(logging_text + "Starting wait_kdu_up")
1422 nb_tries = 0
1423
1424 while nb_tries < 360:
1425 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
1426 kdur = next(
1427 (
1428 x
1429 for x in get_iterable(db_vnfr, "kdur")
1430 if x.get("kdu-name") == kdu_name
1431 ),
1432 None,
1433 )
1434 if not kdur:
1435 raise LcmException(
1436 "Not found vnfr_id={}, kdu_name={}".format(vnfr_id, kdu_name)
1437 )
1438 if kdur.get("status"):
1439 if kdur["status"] in ("READY", "ENABLED"):
1440 return kdur.get("ip-address")
1441 else:
1442 raise LcmException(
1443 "target KDU={} is in error state".format(kdu_name)
1444 )
1445
1446 await asyncio.sleep(10, loop=self.loop)
1447 nb_tries += 1
1448 raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name))
1449
    async def wait_vm_up_insert_key_ro(
        self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None
    ):
        """
        Wait for ip addres at RO, and optionally, insert public key in virtual machine
        :param logging_text: prefix use for logging
        :param nsr_id:
        :param vnfr_id:
        :param vdu_id: VDU to target; None means use the VNF mgmt ip address
        :param vdu_index: count-index of the VDU (only used when vdu_id is given)
        :param pub_key: public ssh key to inject, None to skip
        :param user: user to apply the public ssh key
        :return: IP address
        """

        self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
        ro_nsr_id = None
        ip_address = None
        nb_tries = 0  # retry counter for RO key-injection errors (classic RO path)
        target_vdu_id = None  # set once the target VM is ACTIVE; skips re-lookup
        ro_retries = 0  # overall poll counter; 360 polls x 10 s sleep ~= 1 hour

        while True:

            ro_retries += 1
            if ro_retries >= 360:  # 1 hour
                raise LcmException(
                    "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id)
                )

            await asyncio.sleep(10, loop=self.loop)

            # get ip address
            if not target_vdu_id:
                db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})

                if not vdu_id:  # for the VNF case
                    if db_vnfr.get("status") == "ERROR":
                        raise LcmException(
                            "Cannot inject ssh-key because target VNF is in error state"
                        )
                    ip_address = db_vnfr.get("ip-address")
                    if not ip_address:
                        continue  # mgmt ip not assigned yet; poll again
                    # locate the vdur that owns the VNF mgmt ip address
                    vdur = next(
                        (
                            x
                            for x in get_iterable(db_vnfr, "vdur")
                            if x.get("ip-address") == ip_address
                        ),
                        None,
                    )
                else:  # VDU case
                    vdur = next(
                        (
                            x
                            for x in get_iterable(db_vnfr, "vdur")
                            if x.get("vdu-id-ref") == vdu_id
                            and x.get("count-index") == vdu_index
                        ),
                        None,
                    )

                if (
                    not vdur and len(db_vnfr.get("vdur", ())) == 1
                ):  # If only one, this should be the target vdu
                    vdur = db_vnfr["vdur"][0]
                if not vdur:
                    raise LcmException(
                        "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
                            vnfr_id, vdu_id, vdu_index
                        )
                    )
                # New generation RO stores information at "vim_info"
                ng_ro_status = None
                target_vim = None
                if vdur.get("vim_info"):
                    target_vim = next(
                        t for t in vdur["vim_info"]
                    )  # there should be only one key
                    ng_ro_status = vdur["vim_info"][target_vim].get("vim_status")
                # a PDU is considered ready as-is; otherwise require ACTIVE
                # from either the classic status field or the NG-RO vim_status
                if (
                    vdur.get("pdu-type")
                    or vdur.get("status") == "ACTIVE"
                    or ng_ro_status == "ACTIVE"
                ):
                    ip_address = vdur.get("ip-address")
                    if not ip_address:
                        continue  # VM active but ip not reported yet
                    target_vdu_id = vdur["vdu-id-ref"]
                elif vdur.get("status") == "ERROR" or ng_ro_status == "ERROR":
                    raise LcmException(
                        "Cannot inject ssh-key because target VM is in error state"
                    )

            if not target_vdu_id:
                continue

            # inject public key into machine
            if pub_key and user:
                self.logger.debug(logging_text + "Inserting RO key")
                self.logger.debug("SSH > PubKey > {}".format(pub_key))
                if vdur.get("pdu-type"):
                    # key injection is not possible on physical deployment units
                    self.logger.error(logging_text + "Cannot inject ssh-ky to a PDU")
                    return ip_address
                try:
                    ro_vm_id = "{}-{}".format(
                        db_vnfr["member-vnf-index-ref"], target_vdu_id
                    )  # TODO add vdu_index
                    if self.ng_ro:
                        # NG-RO path: injection is requested as a deploy action
                        target = {
                            "action": {
                                "action": "inject_ssh_key",
                                "key": pub_key,
                                "user": user,
                            },
                            "vnf": [{"_id": vnfr_id, "vdur": [{"id": vdur["id"]}]}],
                        }
                        desc = await self.RO.deploy(nsr_id, target)
                        action_id = desc["action_id"]
                        await self._wait_ng_ro(nsr_id, action_id, timeout=600)
                        break
                    else:
                        # classic RO path
                        # wait until NS is deployed at RO
                        if not ro_nsr_id:
                            db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
                            ro_nsr_id = deep_get(
                                db_nsrs, ("_admin", "deployed", "RO", "nsr_id")
                            )
                        if not ro_nsr_id:
                            continue
                        result_dict = await self.RO.create_action(
                            item="ns",
                            item_id_name=ro_nsr_id,
                            descriptor={
                                "add_public_key": pub_key,
                                "vms": [ro_vm_id],
                                "user": user,
                            },
                        )
                        # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
                        if not result_dict or not isinstance(result_dict, dict):
                            raise LcmException(
                                "Unknown response from RO when injecting key"
                            )
                        # accept if ANY vm reported success; otherwise raise
                        # with the last description (for-else idiom)
                        for result in result_dict.values():
                            if result.get("vim_result") == 200:
                                break
                        else:
                            raise ROclient.ROClientException(
                                "error injecting key: {}".format(
                                    result.get("description")
                                )
                            )
                        break
                except NgRoException as e:
                    raise LcmException(
                        "Reaching max tries injecting key. Error: {}".format(e)
                    )
                except ROclient.ROClientException as e:
                    # classic RO errors are retried up to 20 times (one poll
                    # each); log only on the first failure
                    if not nb_tries:
                        self.logger.debug(
                            logging_text
                            + "error injecting key: {}. Retrying until {} seconds".format(
                                e, 20 * 10
                            )
                        )
                    nb_tries += 1
                    if nb_tries >= 20:
                        raise LcmException(
                            "Reaching max tries injecting key. Error: {}".format(e)
                        )
            else:
                # no key to inject: the VM is up, we are done
                break

        return ip_address
1626
    async def _wait_dependent_n2vc(self, nsr_id, vca_deployed_list, vca_index):
        """
        Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs

        :param nsr_id: nsr record identifier, re-read from the database on
            every poll to pick up fresh configuration statuses
        :param vca_deployed_list: list of deployed VCAs; entry at vca_index
            describes the VCA whose dependencies are awaited
        :param vca_index: index of "this" VCA inside the list
        :raises LcmException: a dependency reached BROKEN, or the wait timed out
        """
        my_vca = vca_deployed_list[vca_index]
        if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
            # vdu or kdu: no dependencies
            return
        # NOTE(review): timeout is decremented once per 10-second sleep, so
        # 300 means ~3000 seconds of waiting, not 300 — confirm intended
        timeout = 300
        while timeout >= 0:
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
            vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
            configuration_status_list = db_nsr["configurationStatus"]
            # scan all other VCAs; break out as soon as one relevant
            # dependency is not READY yet (for-else: else means all ready)
            for index, vca_deployed in enumerate(configuration_status_list):
                if index == vca_index:
                    # myself
                    continue
                # NS-level VCA (no member-vnf-index) depends on everything;
                # a VNF-level VCA only on entries of the same member-vnf-index
                if not my_vca.get("member-vnf-index") or (
                    vca_deployed.get("member-vnf-index")
                    == my_vca.get("member-vnf-index")
                ):
                    internal_status = configuration_status_list[index].get("status")
                    if internal_status == "READY":
                        continue
                    elif internal_status == "BROKEN":
                        raise LcmException(
                            "Configuration aborted because dependent charm/s has failed"
                        )
                    else:
                        break
            else:
                # no dependencies, return
                return
            await asyncio.sleep(10)
            timeout -= 1

        raise LcmException("Configuration aborted because dependent charm/s timeout")
1664
1665 def get_vca_id(self, db_vnfr: dict, db_nsr: dict):
1666 vca_id = None
1667 if db_vnfr:
1668 vca_id = deep_get(db_vnfr, ("vca-id",))
1669 elif db_nsr:
1670 vim_account_id = deep_get(db_nsr, ("instantiate_params", "vimAccountId"))
1671 vca_id = VimAccountDB.get_vim_account_with_id(vim_account_id).get("vca")
1672 return vca_id
1673
1674 async def instantiate_N2VC(
1675 self,
1676 logging_text,
1677 vca_index,
1678 nsi_id,
1679 db_nsr,
1680 db_vnfr,
1681 vdu_id,
1682 kdu_name,
1683 vdu_index,
1684 config_descriptor,
1685 deploy_params,
1686 base_folder,
1687 nslcmop_id,
1688 stage,
1689 vca_type,
1690 vca_name,
1691 ee_config_descriptor,
1692 ):
1693 nsr_id = db_nsr["_id"]
1694 db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
1695 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1696 vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
1697 osm_config = {"osm": {"ns_id": db_nsr["_id"]}}
1698 db_dict = {
1699 "collection": "nsrs",
1700 "filter": {"_id": nsr_id},
1701 "path": db_update_entry,
1702 }
1703 step = ""
1704 try:
1705
1706 element_type = "NS"
1707 element_under_configuration = nsr_id
1708
1709 vnfr_id = None
1710 if db_vnfr:
1711 vnfr_id = db_vnfr["_id"]
1712 osm_config["osm"]["vnf_id"] = vnfr_id
1713
1714 namespace = "{nsi}.{ns}".format(nsi=nsi_id if nsi_id else "", ns=nsr_id)
1715
1716 if vca_type == "native_charm":
1717 index_number = 0
1718 else:
1719 index_number = vdu_index or 0
1720
1721 if vnfr_id:
1722 element_type = "VNF"
1723 element_under_configuration = vnfr_id
1724 namespace += ".{}-{}".format(vnfr_id, index_number)
1725 if vdu_id:
1726 namespace += ".{}-{}".format(vdu_id, index_number)
1727 element_type = "VDU"
1728 element_under_configuration = "{}-{}".format(vdu_id, index_number)
1729 osm_config["osm"]["vdu_id"] = vdu_id
1730 elif kdu_name:
1731 namespace += ".{}".format(kdu_name)
1732 element_type = "KDU"
1733 element_under_configuration = kdu_name
1734 osm_config["osm"]["kdu_name"] = kdu_name
1735
1736 # Get artifact path
1737 if base_folder["pkg-dir"]:
1738 artifact_path = "{}/{}/{}/{}".format(
1739 base_folder["folder"],
1740 base_folder["pkg-dir"],
1741 "charms"
1742 if vca_type in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1743 else "helm-charts",
1744 vca_name,
1745 )
1746 else:
1747 artifact_path = "{}/Scripts/{}/{}/".format(
1748 base_folder["folder"],
1749 "charms"
1750 if vca_type in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1751 else "helm-charts",
1752 vca_name,
1753 )
1754
1755 self.logger.debug("Artifact path > {}".format(artifact_path))
1756
1757 # get initial_config_primitive_list that applies to this element
1758 initial_config_primitive_list = config_descriptor.get(
1759 "initial-config-primitive"
1760 )
1761
1762 self.logger.debug(
1763 "Initial config primitive list > {}".format(
1764 initial_config_primitive_list
1765 )
1766 )
1767
1768 # add config if not present for NS charm
1769 ee_descriptor_id = ee_config_descriptor.get("id")
1770 self.logger.debug("EE Descriptor > {}".format(ee_descriptor_id))
1771 initial_config_primitive_list = get_ee_sorted_initial_config_primitive_list(
1772 initial_config_primitive_list, vca_deployed, ee_descriptor_id
1773 )
1774
1775 self.logger.debug(
1776 "Initial config primitive list #2 > {}".format(
1777 initial_config_primitive_list
1778 )
1779 )
1780 # n2vc_redesign STEP 3.1
1781 # find old ee_id if exists
1782 ee_id = vca_deployed.get("ee_id")
1783
1784 vca_id = self.get_vca_id(db_vnfr, db_nsr)
1785 # create or register execution environment in VCA
1786 if vca_type in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1787
1788 self._write_configuration_status(
1789 nsr_id=nsr_id,
1790 vca_index=vca_index,
1791 status="CREATING",
1792 element_under_configuration=element_under_configuration,
1793 element_type=element_type,
1794 )
1795
1796 step = "create execution environment"
1797 self.logger.debug(logging_text + step)
1798
1799 ee_id = None
1800 credentials = None
1801 if vca_type == "k8s_proxy_charm":
1802 ee_id = await self.vca_map[vca_type].install_k8s_proxy_charm(
1803 charm_name=artifact_path[artifact_path.rfind("/") + 1 :],
1804 namespace=namespace,
1805 artifact_path=artifact_path,
1806 db_dict=db_dict,
1807 vca_id=vca_id,
1808 )
1809 elif vca_type == "helm" or vca_type == "helm-v3":
1810 ee_id, credentials = await self.vca_map[
1811 vca_type
1812 ].create_execution_environment(
1813 namespace=namespace,
1814 reuse_ee_id=ee_id,
1815 db_dict=db_dict,
1816 config=osm_config,
1817 artifact_path=artifact_path,
1818 vca_type=vca_type,
1819 )
1820 else:
1821 ee_id, credentials = await self.vca_map[
1822 vca_type
1823 ].create_execution_environment(
1824 namespace=namespace,
1825 reuse_ee_id=ee_id,
1826 db_dict=db_dict,
1827 vca_id=vca_id,
1828 )
1829
1830 elif vca_type == "native_charm":
1831 step = "Waiting to VM being up and getting IP address"
1832 self.logger.debug(logging_text + step)
1833 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
1834 logging_text,
1835 nsr_id,
1836 vnfr_id,
1837 vdu_id,
1838 vdu_index,
1839 user=None,
1840 pub_key=None,
1841 )
1842 credentials = {"hostname": rw_mgmt_ip}
1843 # get username
1844 username = deep_get(
1845 config_descriptor, ("config-access", "ssh-access", "default-user")
1846 )
1847 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1848 # merged. Meanwhile let's get username from initial-config-primitive
1849 if not username and initial_config_primitive_list:
1850 for config_primitive in initial_config_primitive_list:
1851 for param in config_primitive.get("parameter", ()):
1852 if param["name"] == "ssh-username":
1853 username = param["value"]
1854 break
1855 if not username:
1856 raise LcmException(
1857 "Cannot determine the username neither with 'initial-config-primitive' nor with "
1858 "'config-access.ssh-access.default-user'"
1859 )
1860 credentials["username"] = username
1861 # n2vc_redesign STEP 3.2
1862
1863 self._write_configuration_status(
1864 nsr_id=nsr_id,
1865 vca_index=vca_index,
1866 status="REGISTERING",
1867 element_under_configuration=element_under_configuration,
1868 element_type=element_type,
1869 )
1870
1871 step = "register execution environment {}".format(credentials)
1872 self.logger.debug(logging_text + step)
1873 ee_id = await self.vca_map[vca_type].register_execution_environment(
1874 credentials=credentials,
1875 namespace=namespace,
1876 db_dict=db_dict,
1877 vca_id=vca_id,
1878 )
1879
1880 # for compatibility with MON/POL modules, the need model and application name at database
1881 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
1882 ee_id_parts = ee_id.split(".")
1883 db_nsr_update = {db_update_entry + "ee_id": ee_id}
1884 if len(ee_id_parts) >= 2:
1885 model_name = ee_id_parts[0]
1886 application_name = ee_id_parts[1]
1887 db_nsr_update[db_update_entry + "model"] = model_name
1888 db_nsr_update[db_update_entry + "application"] = application_name
1889
1890 # n2vc_redesign STEP 3.3
1891 step = "Install configuration Software"
1892
1893 self._write_configuration_status(
1894 nsr_id=nsr_id,
1895 vca_index=vca_index,
1896 status="INSTALLING SW",
1897 element_under_configuration=element_under_configuration,
1898 element_type=element_type,
1899 other_update=db_nsr_update,
1900 )
1901
1902 # TODO check if already done
1903 self.logger.debug(logging_text + step)
1904 config = None
1905 if vca_type == "native_charm":
1906 config_primitive = next(
1907 (p for p in initial_config_primitive_list if p["name"] == "config"),
1908 None,
1909 )
1910 if config_primitive:
1911 config = self._map_primitive_params(
1912 config_primitive, {}, deploy_params
1913 )
1914 num_units = 1
1915 if vca_type == "lxc_proxy_charm":
1916 if element_type == "NS":
1917 num_units = db_nsr.get("config-units") or 1
1918 elif element_type == "VNF":
1919 num_units = db_vnfr.get("config-units") or 1
1920 elif element_type == "VDU":
1921 for v in db_vnfr["vdur"]:
1922 if vdu_id == v["vdu-id-ref"]:
1923 num_units = v.get("config-units") or 1
1924 break
1925 if vca_type != "k8s_proxy_charm":
1926 await self.vca_map[vca_type].install_configuration_sw(
1927 ee_id=ee_id,
1928 artifact_path=artifact_path,
1929 db_dict=db_dict,
1930 config=config,
1931 num_units=num_units,
1932 vca_id=vca_id,
1933 vca_type=vca_type,
1934 )
1935
1936 # write in db flag of configuration_sw already installed
1937 self.update_db_2(
1938 "nsrs", nsr_id, {db_update_entry + "config_sw_installed": True}
1939 )
1940
1941 # add relations for this VCA (wait for other peers related with this VCA)
1942 await self._add_vca_relations(
1943 logging_text=logging_text,
1944 nsr_id=nsr_id,
1945 vca_type=vca_type,
1946 vca_index=vca_index,
1947 )
1948
1949 # if SSH access is required, then get execution environment SSH public
1950 # if native charm we have waited already to VM be UP
1951 if vca_type in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
1952 pub_key = None
1953 user = None
1954 # self.logger.debug("get ssh key block")
1955 if deep_get(
1956 config_descriptor, ("config-access", "ssh-access", "required")
1957 ):
1958 # self.logger.debug("ssh key needed")
1959 # Needed to inject a ssh key
1960 user = deep_get(
1961 config_descriptor,
1962 ("config-access", "ssh-access", "default-user"),
1963 )
1964 step = "Install configuration Software, getting public ssh key"
1965 pub_key = await self.vca_map[vca_type].get_ee_ssh_public__key(
1966 ee_id=ee_id, db_dict=db_dict, vca_id=vca_id
1967 )
1968
1969 step = "Insert public key into VM user={} ssh_key={}".format(
1970 user, pub_key
1971 )
1972 else:
1973 # self.logger.debug("no need to get ssh key")
1974 step = "Waiting to VM being up and getting IP address"
1975 self.logger.debug(logging_text + step)
1976
1977 # n2vc_redesign STEP 5.1
1978 # wait for RO (ip-address) Insert pub_key into VM
1979 if vnfr_id:
1980 if kdu_name:
1981 rw_mgmt_ip = await self.wait_kdu_up(
1982 logging_text, nsr_id, vnfr_id, kdu_name
1983 )
1984 else:
1985 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
1986 logging_text,
1987 nsr_id,
1988 vnfr_id,
1989 vdu_id,
1990 vdu_index,
1991 user=user,
1992 pub_key=pub_key,
1993 )
1994 else:
1995 rw_mgmt_ip = None # This is for a NS configuration
1996
1997 self.logger.debug(logging_text + " VM_ip_address={}".format(rw_mgmt_ip))
1998
1999 # store rw_mgmt_ip in deploy params for later replacement
2000 deploy_params["rw_mgmt_ip"] = rw_mgmt_ip
2001
2002 # n2vc_redesign STEP 6 Execute initial config primitive
2003 step = "execute initial config primitive"
2004
2005 # wait for dependent primitives execution (NS -> VNF -> VDU)
2006 if initial_config_primitive_list:
2007 await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)
2008
2009 # stage, in function of element type: vdu, kdu, vnf or ns
2010 my_vca = vca_deployed_list[vca_index]
2011 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
2012 # VDU or KDU
2013 stage[0] = "Stage 3/5: running Day-1 primitives for VDU."
2014 elif my_vca.get("member-vnf-index"):
2015 # VNF
2016 stage[0] = "Stage 4/5: running Day-1 primitives for VNF."
2017 else:
2018 # NS
2019 stage[0] = "Stage 5/5: running Day-1 primitives for NS."
2020
2021 self._write_configuration_status(
2022 nsr_id=nsr_id, vca_index=vca_index, status="EXECUTING PRIMITIVE"
2023 )
2024
2025 self._write_op_status(op_id=nslcmop_id, stage=stage)
2026
2027 check_if_terminated_needed = True
2028 for initial_config_primitive in initial_config_primitive_list:
2029 # adding information on the vca_deployed if it is a NS execution environment
2030 if not vca_deployed["member-vnf-index"]:
2031 deploy_params["ns_config_info"] = json.dumps(
2032 self._get_ns_config_info(nsr_id)
2033 )
2034 # TODO check if already done
2035 primitive_params_ = self._map_primitive_params(
2036 initial_config_primitive, {}, deploy_params
2037 )
2038
2039 step = "execute primitive '{}' params '{}'".format(
2040 initial_config_primitive["name"], primitive_params_
2041 )
2042 self.logger.debug(logging_text + step)
2043 await self.vca_map[vca_type].exec_primitive(
2044 ee_id=ee_id,
2045 primitive_name=initial_config_primitive["name"],
2046 params_dict=primitive_params_,
2047 db_dict=db_dict,
2048 vca_id=vca_id,
2049 vca_type=vca_type,
2050 )
2051 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
2052 if check_if_terminated_needed:
2053 if config_descriptor.get("terminate-config-primitive"):
2054 self.update_db_2(
2055 "nsrs", nsr_id, {db_update_entry + "needed_terminate": True}
2056 )
2057 check_if_terminated_needed = False
2058
2059 # TODO register in database that primitive is done
2060
2061 # STEP 7 Configure metrics
2062 if vca_type == "helm" or vca_type == "helm-v3":
2063 prometheus_jobs = await self.extract_prometheus_scrape_jobs(
2064 ee_id=ee_id,
2065 artifact_path=artifact_path,
2066 ee_config_descriptor=ee_config_descriptor,
2067 vnfr_id=vnfr_id,
2068 nsr_id=nsr_id,
2069 target_ip=rw_mgmt_ip,
2070 )
2071 if prometheus_jobs:
2072 self.update_db_2(
2073 "nsrs",
2074 nsr_id,
2075 {db_update_entry + "prometheus_jobs": prometheus_jobs},
2076 )
2077
2078 for job in prometheus_jobs:
2079 self.db.set_one(
2080 "prometheus_jobs",
2081 {
2082 "job_name": job["job_name"]
2083 },
2084 job,
2085 upsert=True,
2086 fail_on_empty=False,
2087 )
2088
2089 step = "instantiated at VCA"
2090 self.logger.debug(logging_text + step)
2091
2092 self._write_configuration_status(
2093 nsr_id=nsr_id, vca_index=vca_index, status="READY"
2094 )
2095
2096 except Exception as e: # TODO not use Exception but N2VC exception
2097 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
2098 if not isinstance(
2099 e, (DbException, N2VCException, LcmException, asyncio.CancelledError)
2100 ):
2101 self.logger.error(
2102 "Exception while {} : {}".format(step, e), exc_info=True
2103 )
2104 self._write_configuration_status(
2105 nsr_id=nsr_id, vca_index=vca_index, status="BROKEN"
2106 )
2107 raise LcmException("{} {}".format(step, e)) from e
2108
2109 def _write_ns_status(
2110 self,
2111 nsr_id: str,
2112 ns_state: str,
2113 current_operation: str,
2114 current_operation_id: str,
2115 error_description: str = None,
2116 error_detail: str = None,
2117 other_update: dict = None,
2118 ):
2119 """
2120 Update db_nsr fields.
2121 :param nsr_id:
2122 :param ns_state:
2123 :param current_operation:
2124 :param current_operation_id:
2125 :param error_description:
2126 :param error_detail:
2127 :param other_update: Other required changes at database if provided, will be cleared
2128 :return:
2129 """
2130 try:
2131 db_dict = other_update or {}
2132 db_dict[
2133 "_admin.nslcmop"
2134 ] = current_operation_id # for backward compatibility
2135 db_dict["_admin.current-operation"] = current_operation_id
2136 db_dict["_admin.operation-type"] = (
2137 current_operation if current_operation != "IDLE" else None
2138 )
2139 db_dict["currentOperation"] = current_operation
2140 db_dict["currentOperationID"] = current_operation_id
2141 db_dict["errorDescription"] = error_description
2142 db_dict["errorDetail"] = error_detail
2143
2144 if ns_state:
2145 db_dict["nsState"] = ns_state
2146 self.update_db_2("nsrs", nsr_id, db_dict)
2147 except DbException as e:
2148 self.logger.warn("Error writing NS status, ns={}: {}".format(nsr_id, e))
2149
2150 def _write_op_status(
2151 self,
2152 op_id: str,
2153 stage: list = None,
2154 error_message: str = None,
2155 queuePosition: int = 0,
2156 operation_state: str = None,
2157 other_update: dict = None,
2158 ):
2159 try:
2160 db_dict = other_update or {}
2161 db_dict["queuePosition"] = queuePosition
2162 if isinstance(stage, list):
2163 db_dict["stage"] = stage[0]
2164 db_dict["detailed-status"] = " ".join(stage)
2165 elif stage is not None:
2166 db_dict["stage"] = str(stage)
2167
2168 if error_message is not None:
2169 db_dict["errorMessage"] = error_message
2170 if operation_state is not None:
2171 db_dict["operationState"] = operation_state
2172 db_dict["statusEnteredTime"] = time()
2173 self.update_db_2("nslcmops", op_id, db_dict)
2174 except DbException as e:
2175 self.logger.warn(
2176 "Error writing OPERATION status for op_id: {} -> {}".format(op_id, e)
2177 )
2178
2179 def _write_all_config_status(self, db_nsr: dict, status: str):
2180 try:
2181 nsr_id = db_nsr["_id"]
2182 # configurationStatus
2183 config_status = db_nsr.get("configurationStatus")
2184 if config_status:
2185 db_nsr_update = {
2186 "configurationStatus.{}.status".format(index): status
2187 for index, v in enumerate(config_status)
2188 if v
2189 }
2190 # update status
2191 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2192
2193 except DbException as e:
2194 self.logger.warn(
2195 "Error writing all configuration status, ns={}: {}".format(nsr_id, e)
2196 )
2197
2198 def _write_configuration_status(
2199 self,
2200 nsr_id: str,
2201 vca_index: int,
2202 status: str = None,
2203 element_under_configuration: str = None,
2204 element_type: str = None,
2205 other_update: dict = None,
2206 ):
2207
2208 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
2209 # .format(vca_index, status))
2210
2211 try:
2212 db_path = "configurationStatus.{}.".format(vca_index)
2213 db_dict = other_update or {}
2214 if status:
2215 db_dict[db_path + "status"] = status
2216 if element_under_configuration:
2217 db_dict[
2218 db_path + "elementUnderConfiguration"
2219 ] = element_under_configuration
2220 if element_type:
2221 db_dict[db_path + "elementType"] = element_type
2222 self.update_db_2("nsrs", nsr_id, db_dict)
2223 except DbException as e:
2224 self.logger.warn(
2225 "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
2226 status, nsr_id, vca_index, e
2227 )
2228 )
2229
2230 async def _do_placement(self, logging_text, db_nslcmop, db_vnfrs):
2231 """
2232 Check and computes the placement, (vim account where to deploy). If it is decided by an external tool, it
2233 sends the request via kafka and wait until the result is wrote at database (nslcmops _admin.plca).
2234 Database is used because the result can be obtained from a different LCM worker in case of HA.
2235 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
2236 :param db_nslcmop: database content of nslcmop
2237 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
2238 :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfr with the
2239 computed 'vim-account-id'
2240 """
2241 modified = False
2242 nslcmop_id = db_nslcmop["_id"]
2243 placement_engine = deep_get(db_nslcmop, ("operationParams", "placement-engine"))
2244 if placement_engine == "PLA":
2245 self.logger.debug(
2246 logging_text + "Invoke and wait for placement optimization"
2247 )
2248 await self.msg.aiowrite(
2249 "pla", "get_placement", {"nslcmopId": nslcmop_id}, loop=self.loop
2250 )
2251 db_poll_interval = 5
2252 wait = db_poll_interval * 10
2253 pla_result = None
2254 while not pla_result and wait >= 0:
2255 await asyncio.sleep(db_poll_interval)
2256 wait -= db_poll_interval
2257 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2258 pla_result = deep_get(db_nslcmop, ("_admin", "pla"))
2259
2260 if not pla_result:
2261 raise LcmException(
2262 "Placement timeout for nslcmopId={}".format(nslcmop_id)
2263 )
2264
2265 for pla_vnf in pla_result["vnf"]:
2266 vnfr = db_vnfrs.get(pla_vnf["member-vnf-index"])
2267 if not pla_vnf.get("vimAccountId") or not vnfr:
2268 continue
2269 modified = True
2270 self.db.set_one(
2271 "vnfrs",
2272 {"_id": vnfr["_id"]},
2273 {"vim-account-id": pla_vnf["vimAccountId"]},
2274 )
2275 # Modifies db_vnfrs
2276 vnfr["vim-account-id"] = pla_vnf["vimAccountId"]
2277 return modified
2278
2279 def update_nsrs_with_pla_result(self, params):
2280 try:
2281 nslcmop_id = deep_get(params, ("placement", "nslcmopId"))
2282 self.update_db_2(
2283 "nslcmops", nslcmop_id, {"_admin.pla": params.get("placement")}
2284 )
2285 except Exception as e:
2286 self.logger.warn("Update failed for nslcmop_id={}:{}".format(nslcmop_id, e))
2287
    async def instantiate(self, nsr_id, nslcmop_id):
        """
        Instantiate the NS identified by nsr_id, driven by the operation nslcmop_id.

        High level flow (the 5 stages written to the nslcmop record):
          1. lock the HA task and read everything needed from the database
             (nslcmop, nsr, nsd, vnfrs and their vnfds);
          2. deploy KDUs and launch the RO (VIM) deployment as a background task;
          3-5. deploy the N2VC execution environments (NS / VNF / VDU / KDU charms);
          finally: wait for all launched tasks, consolidate operational/config
             status and publish the result on kafka ("ns" / "instantiated").

        :param nsr_id: ns instance to deploy
        :param nslcmop_id: operation to run
        :return: None. The outcome is reflected in the database records and in the
            kafka notification.
        """

        # Try to lock HA task here
        task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
        if not task_is_locked_by_me:
            # another LCM worker owns this operation; nothing to do here
            self.logger.debug(
                "instantiate() task is not locked by me, ns={}".format(nsr_id)
            )
            return

        logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
        self.logger.debug(logging_text + "Enter")

        # get all needed from database

        # database nsrs record
        db_nsr = None

        # database nslcmops record
        db_nslcmop = None

        # update operation on nsrs
        db_nsr_update = {}
        # update operation on nslcmops
        db_nslcmop_update = {}

        nslcmop_operation_state = None
        db_vnfrs = {}  # vnf's info indexed by member-index
        # n2vc_info = {}
        tasks_dict_info = {}  # from task to info text
        exc = None
        error_list = []
        stage = [
            "Stage 1/5: preparation of the environment.",
            "Waiting for previous operations to terminate.",
            "",
        ]
        # ^ stage, step, VIM progress
        try:
            # wait for any previous tasks in process
            await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)

            # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
            stage[1] = "Reading from database."
            # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
            db_nsr_update["detailed-status"] = "creating"
            db_nsr_update["operational-status"] = "init"
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state="BUILDING",
                current_operation="INSTANTIATING",
                current_operation_id=nslcmop_id,
                other_update=db_nsr_update,
            )
            self._write_op_status(op_id=nslcmop_id, stage=stage, queuePosition=0)

            # read from db: operation
            stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
            db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
            # additionalParamsForVnf is stored as a JSON string; decode it in place
            if db_nslcmop["operationParams"].get("additionalParamsForVnf"):
                db_nslcmop["operationParams"]["additionalParamsForVnf"] = json.loads(
                    db_nslcmop["operationParams"]["additionalParamsForVnf"]
                )
            ns_params = db_nslcmop.get("operationParams")
            if ns_params and ns_params.get("timeout_ns_deploy"):
                timeout_ns_deploy = ns_params["timeout_ns_deploy"]
            else:
                timeout_ns_deploy = self.timeout.get(
                    "ns_deploy", self.timeout_ns_deploy
                )

            # read from db: ns
            stage[1] = "Getting nsr={} from db.".format(nsr_id)
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
            stage[1] = "Getting nsd={} from db.".format(db_nsr["nsd-id"])
            nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
            self.fs.sync(db_nsr["nsd-id"])
            db_nsr["nsd"] = nsd
            # nsr_name = db_nsr["name"] # TODO short-name??

            # read from db: vnf's of this ns
            stage[1] = "Getting vnfrs from db."
            self.logger.debug(logging_text + stage[1])
            db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})

            # read from db: vnfd's for every vnf
            db_vnfds = []  # every vnfd data

            # for each vnf in ns, read vnfd
            for vnfr in db_vnfrs_list:
                if vnfr.get("kdur"):
                    kdur_list = []
                    for kdur in vnfr["kdur"]:
                        # kdur additionalParams is stored JSON-encoded; decode it
                        if kdur.get("additionalParams"):
                            kdur["additionalParams"] = json.loads(
                                kdur["additionalParams"]
                            )
                        kdur_list.append(kdur)
                    vnfr["kdur"] = kdur_list

                db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
                vnfd_id = vnfr["vnfd-id"]
                vnfd_ref = vnfr["vnfd-ref"]
                self.fs.sync(vnfd_id)

                # if we haven't this vnfd, read it from db
                # NOTE(review): db_vnfds holds whole vnfd dicts, so this membership
                # test against the plain id never matches and the vnfd is re-read
                # for every vnfr that uses it — TODO confirm intended
                if vnfd_id not in db_vnfds:
                    # read from db
                    stage[1] = "Getting vnfd={} id='{}' from db.".format(
                        vnfd_id, vnfd_ref
                    )
                    self.logger.debug(logging_text + stage[1])
                    vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})

                    # store vnfd
                    db_vnfds.append(vnfd)

            # Get or generates the _admin.deployed.VCA list
            vca_deployed_list = None
            if db_nsr["_admin"].get("deployed"):
                vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
            if vca_deployed_list is None:
                vca_deployed_list = []
                configuration_status_list = []
                db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
                db_nsr_update["configurationStatus"] = configuration_status_list
                # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
                populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
            elif isinstance(vca_deployed_list, dict):
                # maintain backward compatibility. Change a dict to list at database
                vca_deployed_list = list(vca_deployed_list.values())
                db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
                populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)

            if not isinstance(
                deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list
            ):
                populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
                db_nsr_update["_admin.deployed.RO.vnfd"] = []

            # set state to INSTANTIATED. When instantiated NBI will not delete directly
            db_nsr_update["_admin.nsState"] = "INSTANTIATED"
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self.db.set_list(
                "vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "INSTANTIATED"}
            )

            # n2vc_redesign STEP 2 Deploy Network Scenario
            stage[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
            self._write_op_status(op_id=nslcmop_id, stage=stage)

            stage[1] = "Deploying KDUs."
            # self.logger.debug(logging_text + "Before deploy_kdus")
            # Call to deploy_kdus in case exists the "vdu:kdu" param
            await self.deploy_kdus(
                logging_text=logging_text,
                nsr_id=nsr_id,
                nslcmop_id=nslcmop_id,
                db_vnfrs=db_vnfrs,
                db_vnfds=db_vnfds,
                task_instantiation_info=tasks_dict_info,
            )

            stage[1] = "Getting VCA public key."
            # n2vc_redesign STEP 1 Get VCA public ssh-key
            # feature 1429. Add n2vc public key to needed VMs
            n2vc_key = self.n2vc.get_public_key()
            n2vc_key_list = [n2vc_key]
            if self.vca_config.get("public_key"):
                n2vc_key_list.append(self.vca_config["public_key"])

            stage[1] = "Deploying NS at VIM."
            # RO deployment runs concurrently with the N2VC deployments below;
            # it is awaited in the finally clause via _wait_for_tasks
            task_ro = asyncio.ensure_future(
                self.instantiate_RO(
                    logging_text=logging_text,
                    nsr_id=nsr_id,
                    nsd=nsd,
                    db_nsr=db_nsr,
                    db_nslcmop=db_nslcmop,
                    db_vnfrs=db_vnfrs,
                    db_vnfds=db_vnfds,
                    n2vc_key_list=n2vc_key_list,
                    stage=stage,
                )
            )
            self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
            tasks_dict_info[task_ro] = "Deploying at VIM"

            # n2vc_redesign STEP 3 to 6 Deploy N2VC
            stage[1] = "Deploying Execution Environments."
            self.logger.debug(logging_text + stage[1])

            nsi_id = None  # TODO put nsi_id when this nsr belongs to a NSI
            for vnf_profile in get_vnf_profiles(nsd):
                vnfd_id = vnf_profile["vnfd-id"]
                vnfd = find_in_list(db_vnfds, lambda a_vnf: a_vnf["id"] == vnfd_id)
                member_vnf_index = str(vnf_profile["id"])
                db_vnfr = db_vnfrs[member_vnf_index]
                base_folder = vnfd["_admin"]["storage"]
                vdu_id = None
                vdu_index = 0
                vdu_name = None
                kdu_name = None

                # Get additional parameters
                deploy_params = {"OSM": get_osm_params(db_vnfr)}
                if db_vnfr.get("additionalParamsForVnf"):
                    deploy_params.update(
                        parse_yaml_strings(db_vnfr["additionalParamsForVnf"].copy())
                    )

                # VNF-level charm, if the vnfd declares one
                descriptor_config = get_configuration(vnfd, vnfd["id"])
                if descriptor_config:
                    self._deploy_n2vc(
                        logging_text=logging_text
                        + "member_vnf_index={} ".format(member_vnf_index),
                        db_nsr=db_nsr,
                        db_vnfr=db_vnfr,
                        nslcmop_id=nslcmop_id,
                        nsr_id=nsr_id,
                        nsi_id=nsi_id,
                        vnfd_id=vnfd_id,
                        vdu_id=vdu_id,
                        kdu_name=kdu_name,
                        member_vnf_index=member_vnf_index,
                        vdu_index=vdu_index,
                        vdu_name=vdu_name,
                        deploy_params=deploy_params,
                        descriptor_config=descriptor_config,
                        base_folder=base_folder,
                        task_instantiation_info=tasks_dict_info,
                        stage=stage,
                    )

                # Deploy charms for each VDU that supports one.
                for vdud in get_vdu_list(vnfd):
                    vdu_id = vdud["id"]
                    descriptor_config = get_configuration(vnfd, vdu_id)
                    vdur = find_in_list(
                        db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id
                    )

                    if vdur.get("additionalParams"):
                        deploy_params_vdu = parse_yaml_strings(vdur["additionalParams"])
                    else:
                        deploy_params_vdu = deploy_params
                    deploy_params_vdu["OSM"] = get_osm_params(
                        db_vnfr, vdu_id, vdu_count_index=0
                    )
                    vdud_count = get_number_of_instances(vnfd, vdu_id)

                    self.logger.debug("VDUD > {}".format(vdud))
                    self.logger.debug(
                        "Descriptor config > {}".format(descriptor_config)
                    )
                    if descriptor_config:
                        vdu_name = None
                        kdu_name = None
                        # one N2VC deployment per VDU instance (count-index)
                        for vdu_index in range(vdud_count):
                            # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
                            self._deploy_n2vc(
                                logging_text=logging_text
                                + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
                                    member_vnf_index, vdu_id, vdu_index
                                ),
                                db_nsr=db_nsr,
                                db_vnfr=db_vnfr,
                                nslcmop_id=nslcmop_id,
                                nsr_id=nsr_id,
                                nsi_id=nsi_id,
                                vnfd_id=vnfd_id,
                                vdu_id=vdu_id,
                                kdu_name=kdu_name,
                                member_vnf_index=member_vnf_index,
                                vdu_index=vdu_index,
                                vdu_name=vdu_name,
                                deploy_params=deploy_params_vdu,
                                descriptor_config=descriptor_config,
                                base_folder=base_folder,
                                task_instantiation_info=tasks_dict_info,
                                stage=stage,
                            )
                # KDU-level charms, if any
                for kdud in get_kdu_list(vnfd):
                    kdu_name = kdud["name"]
                    descriptor_config = get_configuration(vnfd, kdu_name)
                    if descriptor_config:
                        vdu_id = None
                        vdu_index = 0
                        vdu_name = None
                        kdur = next(
                            x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name
                        )
                        deploy_params_kdu = {"OSM": get_osm_params(db_vnfr)}
                        if kdur.get("additionalParams"):
                            deploy_params_kdu.update(
                                parse_yaml_strings(kdur["additionalParams"].copy())
                            )

                        self._deploy_n2vc(
                            logging_text=logging_text,
                            db_nsr=db_nsr,
                            db_vnfr=db_vnfr,
                            nslcmop_id=nslcmop_id,
                            nsr_id=nsr_id,
                            nsi_id=nsi_id,
                            vnfd_id=vnfd_id,
                            vdu_id=vdu_id,
                            kdu_name=kdu_name,
                            member_vnf_index=member_vnf_index,
                            vdu_index=vdu_index,
                            vdu_name=vdu_name,
                            deploy_params=deploy_params_kdu,
                            descriptor_config=descriptor_config,
                            base_folder=base_folder,
                            task_instantiation_info=tasks_dict_info,
                            stage=stage,
                        )

            # Check if this NS has a charm configuration
            descriptor_config = nsd.get("ns-configuration")
            if descriptor_config and descriptor_config.get("juju"):
                vnfd_id = None
                db_vnfr = None
                member_vnf_index = None
                vdu_id = None
                kdu_name = None
                vdu_index = 0
                vdu_name = None

                # Get additional parameters
                deploy_params = {"OSM": {"vim_account_id": ns_params["vimAccountId"]}}
                if db_nsr.get("additionalParamsForNs"):
                    deploy_params.update(
                        parse_yaml_strings(db_nsr["additionalParamsForNs"].copy())
                    )
                base_folder = nsd["_admin"]["storage"]
                self._deploy_n2vc(
                    logging_text=logging_text,
                    db_nsr=db_nsr,
                    db_vnfr=db_vnfr,
                    nslcmop_id=nslcmop_id,
                    nsr_id=nsr_id,
                    nsi_id=nsi_id,
                    vnfd_id=vnfd_id,
                    vdu_id=vdu_id,
                    kdu_name=kdu_name,
                    member_vnf_index=member_vnf_index,
                    vdu_index=vdu_index,
                    vdu_name=vdu_name,
                    deploy_params=deploy_params,
                    descriptor_config=descriptor_config,
                    base_folder=base_folder,
                    task_instantiation_info=tasks_dict_info,
                    stage=stage,
                )

            # rest of staff will be done at finally

        except (
            ROclient.ROClientException,
            DbException,
            LcmException,
            N2VCException,
        ) as e:
            self.logger.error(
                logging_text + "Exit Exception while '{}': {}".format(stage[1], e)
            )
            exc = e
        except asyncio.CancelledError:
            self.logger.error(
                logging_text + "Cancelled Exception while '{}'".format(stage[1])
            )
            exc = "Operation was cancelled"
        except Exception as e:
            exc = traceback.format_exc()
            self.logger.critical(
                logging_text + "Exit Exception while '{}': {}".format(stage[1], e),
                exc_info=True,
            )
        finally:
            if exc:
                error_list.append(str(exc))
            try:
                # wait for pending tasks
                if tasks_dict_info:
                    stage[1] = "Waiting for instantiate pending tasks."
                    self.logger.debug(logging_text + stage[1])
                    error_list += await self._wait_for_tasks(
                        logging_text,
                        tasks_dict_info,
                        timeout_ns_deploy,
                        stage,
                        nslcmop_id,
                        nsr_id=nsr_id,
                    )
                stage[1] = stage[2] = ""
            except asyncio.CancelledError:
                error_list.append("Cancelled")
                # TODO cancel all tasks
            # NOTE(review): 'exc' is rebound here (and unbound when the clause
            # exits); harmless today because the outer 'exc' is not read below
            except Exception as exc:
                error_list.append(str(exc))

            # update operation-status
            db_nsr_update["operational-status"] = "running"
            # let's begin with VCA 'configured' status (later we can change it)
            db_nsr_update["config-status"] = "configured"
            for task, task_name in tasks_dict_info.items():
                if not task.done() or task.cancelled() or task.exception():
                    if task_name.startswith(self.task_name_deploy_vca):
                        # A N2VC task is pending
                        db_nsr_update["config-status"] = "failed"
                    else:
                        # RO or KDU task is pending
                        db_nsr_update["operational-status"] = "failed"

            # update status at database
            if error_list:
                error_detail = ". ".join(error_list)
                self.logger.error(logging_text + error_detail)
                error_description_nslcmop = "{} Detail: {}".format(
                    stage[0], error_detail
                )
                error_description_nsr = "Operation: INSTANTIATING.{}, {}".format(
                    nslcmop_id, stage[0]
                )

                db_nsr_update["detailed-status"] = (
                    error_description_nsr + " Detail: " + error_detail
                )
                db_nslcmop_update["detailed-status"] = error_detail
                nslcmop_operation_state = "FAILED"
                ns_state = "BROKEN"
            else:
                error_detail = None
                error_description_nsr = error_description_nslcmop = None
                ns_state = "READY"
                db_nsr_update["detailed-status"] = "Done"
                db_nslcmop_update["detailed-status"] = "Done"
                nslcmop_operation_state = "COMPLETED"

            if db_nsr:
                self._write_ns_status(
                    nsr_id=nsr_id,
                    ns_state=ns_state,
                    current_operation="IDLE",
                    current_operation_id=None,
                    error_description=error_description_nsr,
                    error_detail=error_detail,
                    other_update=db_nsr_update,
                )
                self._write_op_status(
                    op_id=nslcmop_id,
                    stage="",
                    error_message=error_description_nslcmop,
                    operation_state=nslcmop_operation_state,
                    other_update=db_nslcmop_update,
                )

            if nslcmop_operation_state:
                try:
                    await self.msg.aiowrite(
                        "ns",
                        "instantiated",
                        {
                            "nsr_id": nsr_id,
                            "nslcmop_id": nslcmop_id,
                            "operationState": nslcmop_operation_state,
                        },
                        loop=self.loop,
                    )
                except Exception as e:
                    self.logger.error(
                        logging_text + "kafka_write notification Exception {}".format(e)
                    )

            self.logger.debug(logging_text + "Exit")
            self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
2771
2772 def _get_vnfd(self, vnfd_id: str, cached_vnfds: Dict[str, Any]):
2773 if vnfd_id not in cached_vnfds:
2774 cached_vnfds[vnfd_id] = self.db.get_one("vnfds", {"id": vnfd_id})
2775 return cached_vnfds[vnfd_id]
2776
2777 def _get_vnfr(self, nsr_id: str, vnf_profile_id: str, cached_vnfrs: Dict[str, Any]):
2778 if vnf_profile_id not in cached_vnfrs:
2779 cached_vnfrs[vnf_profile_id] = self.db.get_one(
2780 "vnfrs",
2781 {
2782 "member-vnf-index-ref": vnf_profile_id,
2783 "nsr-id-ref": nsr_id,
2784 },
2785 )
2786 return cached_vnfrs[vnf_profile_id]
2787
2788 def _is_deployed_vca_in_relation(
2789 self, vca: DeployedVCA, relation: Relation
2790 ) -> bool:
2791 found = False
2792 for endpoint in (relation.provider, relation.requirer):
2793 if endpoint["kdu-resource-profile-id"]:
2794 continue
2795 found = (
2796 vca.vnf_profile_id == endpoint.vnf_profile_id
2797 and vca.vdu_profile_id == endpoint.vdu_profile_id
2798 and vca.execution_environment_ref == endpoint.execution_environment_ref
2799 )
2800 if found:
2801 break
2802 return found
2803
2804 def _update_ee_relation_data_with_implicit_data(
2805 self, nsr_id, nsd, ee_relation_data, cached_vnfds, vnf_profile_id: str = None
2806 ):
2807 ee_relation_data = safe_get_ee_relation(
2808 nsr_id, ee_relation_data, vnf_profile_id=vnf_profile_id
2809 )
2810 ee_relation_level = EELevel.get_level(ee_relation_data)
2811 if (ee_relation_level in (EELevel.VNF, EELevel.VDU)) and not ee_relation_data[
2812 "execution-environment-ref"
2813 ]:
2814 vnf_profile = get_vnf_profile(nsd, ee_relation_data["vnf-profile-id"])
2815 vnfd_id = vnf_profile["vnfd-id"]
2816 db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
2817 entity_id = (
2818 vnfd_id
2819 if ee_relation_level == EELevel.VNF
2820 else ee_relation_data["vdu-profile-id"]
2821 )
2822 ee = get_juju_ee_ref(db_vnfd, entity_id)
2823 if not ee:
2824 raise Exception(
2825 f"not execution environments found for ee_relation {ee_relation_data}"
2826 )
2827 ee_relation_data["execution-environment-ref"] = ee["id"]
2828 return ee_relation_data
2829
2830 def _get_ns_relations(
2831 self,
2832 nsr_id: str,
2833 nsd: Dict[str, Any],
2834 vca: DeployedVCA,
2835 cached_vnfds: Dict[str, Any],
2836 ) -> List[Relation]:
2837 relations = []
2838 db_ns_relations = get_ns_configuration_relation_list(nsd)
2839 for r in db_ns_relations:
2840 provider_dict = None
2841 requirer_dict = None
2842 if all(key in r for key in ("provider", "requirer")):
2843 provider_dict = r["provider"]
2844 requirer_dict = r["requirer"]
2845 elif "entities" in r:
2846 provider_id = r["entities"][0]["id"]
2847 provider_dict = {
2848 "nsr-id": nsr_id,
2849 "endpoint": r["entities"][0]["endpoint"],
2850 }
2851 if provider_id != nsd["id"]:
2852 provider_dict["vnf-profile-id"] = provider_id
2853 requirer_id = r["entities"][1]["id"]
2854 requirer_dict = {
2855 "nsr-id": nsr_id,
2856 "endpoint": r["entities"][1]["endpoint"],
2857 }
2858 if requirer_id != nsd["id"]:
2859 requirer_dict["vnf-profile-id"] = requirer_id
2860 else:
2861 raise Exception("provider/requirer or entities must be included in the relation.")
2862 relation_provider = self._update_ee_relation_data_with_implicit_data(
2863 nsr_id, nsd, provider_dict, cached_vnfds
2864 )
2865 relation_requirer = self._update_ee_relation_data_with_implicit_data(
2866 nsr_id, nsd, requirer_dict, cached_vnfds
2867 )
2868 provider = EERelation(relation_provider)
2869 requirer = EERelation(relation_requirer)
2870 relation = Relation(r["name"], provider, requirer)
2871 vca_in_relation = self._is_deployed_vca_in_relation(vca, relation)
2872 if vca_in_relation:
2873 relations.append(relation)
2874 return relations
2875
2876 def _get_vnf_relations(
2877 self,
2878 nsr_id: str,
2879 nsd: Dict[str, Any],
2880 vca: DeployedVCA,
2881 cached_vnfds: Dict[str, Any],
2882 ) -> List[Relation]:
2883 relations = []
2884 vnf_profile = get_vnf_profile(nsd, vca.vnf_profile_id)
2885 vnf_profile_id = vnf_profile["id"]
2886 vnfd_id = vnf_profile["vnfd-id"]
2887 db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
2888 db_vnf_relations = get_relation_list(db_vnfd, vnfd_id)
2889 for r in db_vnf_relations:
2890 provider_dict = None
2891 requirer_dict = None
2892 if all(key in r for key in ("provider", "requirer")):
2893 provider_dict = r["provider"]
2894 requirer_dict = r["requirer"]
2895 elif "entities" in r:
2896 provider_id = r["entities"][0]["id"]
2897 provider_dict = {
2898 "nsr-id": nsr_id,
2899 "vnf-profile-id": vnf_profile_id,
2900 "endpoint": r["entities"][0]["endpoint"],
2901 }
2902 if provider_id != vnfd_id:
2903 provider_dict["vdu-profile-id"] = provider_id
2904 requirer_id = r["entities"][1]["id"]
2905 requirer_dict = {
2906 "nsr-id": nsr_id,
2907 "vnf-profile-id": vnf_profile_id,
2908 "endpoint": r["entities"][1]["endpoint"],
2909 }
2910 if requirer_id != vnfd_id:
2911 requirer_dict["vdu-profile-id"] = requirer_id
2912 else:
2913 raise Exception("provider/requirer or entities must be included in the relation.")
2914 relation_provider = self._update_ee_relation_data_with_implicit_data(
2915 nsr_id, nsd, provider_dict, cached_vnfds, vnf_profile_id=vnf_profile_id
2916 )
2917 relation_requirer = self._update_ee_relation_data_with_implicit_data(
2918 nsr_id, nsd, requirer_dict, cached_vnfds, vnf_profile_id=vnf_profile_id
2919 )
2920 provider = EERelation(relation_provider)
2921 requirer = EERelation(relation_requirer)
2922 relation = Relation(r["name"], provider, requirer)
2923 vca_in_relation = self._is_deployed_vca_in_relation(vca, relation)
2924 if vca_in_relation:
2925 relations.append(relation)
2926 return relations
2927
2928 def _get_kdu_resource_data(
2929 self,
2930 ee_relation: EERelation,
2931 db_nsr: Dict[str, Any],
2932 cached_vnfds: Dict[str, Any],
2933 ) -> DeployedK8sResource:
2934 nsd = get_nsd(db_nsr)
2935 vnf_profiles = get_vnf_profiles(nsd)
2936 vnfd_id = find_in_list(
2937 vnf_profiles,
2938 lambda vnf_profile: vnf_profile["id"] == ee_relation.vnf_profile_id,
2939 )["vnfd-id"]
2940 db_vnfd = self._get_vnfd(vnfd_id, cached_vnfds)
2941 kdu_resource_profile = get_kdu_resource_profile(
2942 db_vnfd, ee_relation.kdu_resource_profile_id
2943 )
2944 kdu_name = kdu_resource_profile["kdu-name"]
2945 deployed_kdu, _ = get_deployed_kdu(
2946 db_nsr.get("_admin", ()).get("deployed", ()),
2947 kdu_name,
2948 ee_relation.vnf_profile_id,
2949 )
2950 deployed_kdu.update({"resource-name": kdu_resource_profile["resource-name"]})
2951 return deployed_kdu
2952
2953 def _get_deployed_component(
2954 self,
2955 ee_relation: EERelation,
2956 db_nsr: Dict[str, Any],
2957 cached_vnfds: Dict[str, Any],
2958 ) -> DeployedComponent:
2959 nsr_id = db_nsr["_id"]
2960 deployed_component = None
2961 ee_level = EELevel.get_level(ee_relation)
2962 if ee_level == EELevel.NS:
2963 vca = get_deployed_vca(db_nsr, {"vdu_id": None, "member-vnf-index": None})
2964 if vca:
2965 deployed_component = DeployedVCA(nsr_id, vca)
2966 elif ee_level == EELevel.VNF:
2967 vca = get_deployed_vca(
2968 db_nsr,
2969 {
2970 "vdu_id": None,
2971 "member-vnf-index": ee_relation.vnf_profile_id,
2972 "ee_descriptor_id": ee_relation.execution_environment_ref,
2973 },
2974 )
2975 if vca:
2976 deployed_component = DeployedVCA(nsr_id, vca)
2977 elif ee_level == EELevel.VDU:
2978 vca = get_deployed_vca(
2979 db_nsr,
2980 {
2981 "vdu_id": ee_relation.vdu_profile_id,
2982 "member-vnf-index": ee_relation.vnf_profile_id,
2983 "ee_descriptor_id": ee_relation.execution_environment_ref,
2984 },
2985 )
2986 if vca:
2987 deployed_component = DeployedVCA(nsr_id, vca)
2988 elif ee_level == EELevel.KDU:
2989 kdu_resource_data = self._get_kdu_resource_data(
2990 ee_relation, db_nsr, cached_vnfds
2991 )
2992 if kdu_resource_data:
2993 deployed_component = DeployedK8sResource(kdu_resource_data)
2994 return deployed_component
2995
2996 async def _add_relation(
2997 self,
2998 relation: Relation,
2999 vca_type: str,
3000 db_nsr: Dict[str, Any],
3001 cached_vnfds: Dict[str, Any],
3002 cached_vnfrs: Dict[str, Any],
3003 ) -> bool:
3004 deployed_provider = self._get_deployed_component(
3005 relation.provider, db_nsr, cached_vnfds
3006 )
3007 deployed_requirer = self._get_deployed_component(
3008 relation.requirer, db_nsr, cached_vnfds
3009 )
3010 if (
3011 deployed_provider
3012 and deployed_requirer
3013 and deployed_provider.config_sw_installed
3014 and deployed_requirer.config_sw_installed
3015 ):
3016 provider_db_vnfr = (
3017 self._get_vnfr(
3018 relation.provider.nsr_id,
3019 relation.provider.vnf_profile_id,
3020 cached_vnfrs,
3021 )
3022 if relation.provider.vnf_profile_id
3023 else None
3024 )
3025 requirer_db_vnfr = (
3026 self._get_vnfr(
3027 relation.requirer.nsr_id,
3028 relation.requirer.vnf_profile_id,
3029 cached_vnfrs,
3030 )
3031 if relation.requirer.vnf_profile_id
3032 else None
3033 )
3034 provider_vca_id = self.get_vca_id(provider_db_vnfr, db_nsr)
3035 requirer_vca_id = self.get_vca_id(requirer_db_vnfr, db_nsr)
3036 provider_relation_endpoint = RelationEndpoint(
3037 deployed_provider.ee_id,
3038 provider_vca_id,
3039 relation.provider.endpoint,
3040 )
3041 requirer_relation_endpoint = RelationEndpoint(
3042 deployed_requirer.ee_id,
3043 requirer_vca_id,
3044 relation.requirer.endpoint,
3045 )
3046 await self.vca_map[vca_type].add_relation(
3047 provider=provider_relation_endpoint,
3048 requirer=requirer_relation_endpoint,
3049 )
3050 # remove entry from relations list
3051 return True
3052 return False
3053
3054 async def _add_vca_relations(
3055 self,
3056 logging_text,
3057 nsr_id,
3058 vca_type: str,
3059 vca_index: int,
3060 timeout: int = 3600,
3061 ) -> bool:
3062
3063 # steps:
3064 # 1. find all relations for this VCA
3065 # 2. wait for other peers related
3066 # 3. add relations
3067
3068 try:
3069 # STEP 1: find all relations for this VCA
3070
3071 # read nsr record
3072 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3073 nsd = get_nsd(db_nsr)
3074
3075 # this VCA data
3076 deployed_vca_dict = get_deployed_vca_list(db_nsr)[vca_index]
3077 my_vca = DeployedVCA(nsr_id, deployed_vca_dict)
3078
3079 cached_vnfds = {}
3080 cached_vnfrs = {}
3081 relations = []
3082 relations.extend(self._get_ns_relations(nsr_id, nsd, my_vca, cached_vnfds))
3083 relations.extend(self._get_vnf_relations(nsr_id, nsd, my_vca, cached_vnfds))
3084
3085 # if no relations, terminate
3086 if not relations:
3087 self.logger.debug(logging_text + " No relations")
3088 return True
3089
3090 self.logger.debug(logging_text + " adding relations {}".format(relations))
3091
3092 # add all relations
3093 start = time()
3094 while True:
3095 # check timeout
3096 now = time()
3097 if now - start >= timeout:
3098 self.logger.error(logging_text + " : timeout adding relations")
3099 return False
3100
3101 # reload nsr from database (we need to update record: _admin.deployed.VCA)
3102 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3103
3104 # for each relation, find the VCA's related
3105 for relation in relations.copy():
3106 added = await self._add_relation(
3107 relation,
3108 vca_type,
3109 db_nsr,
3110 cached_vnfds,
3111 cached_vnfrs,
3112 )
3113 if added:
3114 relations.remove(relation)
3115
3116 if not relations:
3117 self.logger.debug("Relations added")
3118 break
3119 await asyncio.sleep(5.0)
3120
3121 return True
3122
3123 except Exception as e:
3124 self.logger.warn(logging_text + " ERROR adding relations: {}".format(e))
3125 return False
3126
    async def _install_kdu(
        self,
        nsr_id: str,
        nsr_db_path: str,
        vnfr_data: dict,
        kdu_index: int,
        kdud: dict,
        vnfd: dict,
        k8s_instance_info: dict,
        k8params: dict = None,
        timeout: int = 600,
        vca_id: str = None,
    ):
        """Install one KDU on its k8s cluster and record the outcome.

        Generates (or reuses) the kdu instance name, installs the kdu (helm
        chart or juju bundle), reads back its services to fill in the kdur
        (and possibly vnf) management IP, and runs the
        initial-config-primitives when they are not delegated to a juju
        execution environment.

        :param nsr_id: ns record id
        :param nsr_db_path: "_admin.deployed.K8s.<index>" path inside the nsr
        :param vnfr_data: vnfr record of the vnf owning this kdu
        :param kdu_index: index of this kdur inside vnfr_data["kdur"]
        :param kdud: kdu descriptor section of the vnfd
        :param vnfd: full vnf descriptor
        :param k8s_instance_info: cluster uuid/type, kdu model/name, namespace...
        :param k8params: kdu instantiation parameters
        :param timeout: seconds allowed for the install and for each primitive
        :param vca_id: VCA id propagated to the k8s connector (may be None)
        :return: the kdu instance name
        :raises: re-raises any failure after recording it in the nsr
            detailed-status and marking the kdur status as ERROR
        """

        try:
            k8sclustertype = k8s_instance_info["k8scluster-type"]
            # Instantiate kdu
            db_dict_install = {
                "collection": "nsrs",
                "filter": {"_id": nsr_id},
                "path": nsr_db_path,
            }

            # Reuse a descriptor-provided deployment name; otherwise ask the
            # k8s connector to generate one.
            if k8s_instance_info.get("kdu-deployment-name"):
                kdu_instance = k8s_instance_info.get("kdu-deployment-name")
            else:
                kdu_instance = self.k8scluster_map[
                    k8sclustertype
                ].generate_kdu_instance_name(
                    db_dict=db_dict_install,
                    kdu_model=k8s_instance_info["kdu-model"],
                    kdu_name=k8s_instance_info["kdu-name"],
                )
            # Persist the instance name BEFORE installing, so the KDU status
            # can be queried while the installation is still in progress.
            self.update_db_2(
                "nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance}
            )
            await self.k8scluster_map[k8sclustertype].install(
                cluster_uuid=k8s_instance_info["k8scluster-uuid"],
                kdu_model=k8s_instance_info["kdu-model"],
                atomic=True,
                params=k8params,
                db_dict=db_dict_install,
                timeout=timeout,
                kdu_name=k8s_instance_info["kdu-name"],
                namespace=k8s_instance_info["namespace"],
                kdu_instance=kdu_instance,
                vca_id=vca_id,
            )
            self.update_db_2(
                "nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance}
            )

            # Obtain services to obtain management service ip
            services = await self.k8scluster_map[k8sclustertype].get_services(
                cluster_uuid=k8s_instance_info["k8scluster-uuid"],
                kdu_instance=kdu_instance,
                namespace=k8s_instance_info["namespace"],
            )

            # Obtain management service info (if exists)
            vnfr_update_dict = {}
            kdu_config = get_configuration(vnfd, kdud["name"])
            if kdu_config:
                target_ee_list = kdu_config.get("execution-environment-list", [])
            else:
                target_ee_list = []

            if services:
                vnfr_update_dict["kdur.{}.services".format(kdu_index)] = services
                mgmt_services = [
                    service
                    for service in kdud.get("service", [])
                    if service.get("mgmt-service")
                ]
                for mgmt_service in mgmt_services:
                    for service in services:
                        # Match by prefix: deployed service names are derived
                        # from the descriptor's mgmt service name.
                        if service["name"].startswith(mgmt_service["name"]):
                            # Mgmt service found, Obtain service ip
                            ip = service.get("external_ip", service.get("cluster_ip"))
                            if isinstance(ip, list) and len(ip) == 1:
                                ip = ip[0]

                            vnfr_update_dict[
                                "kdur.{}.ip-address".format(kdu_index)
                            ] = ip

                            # Check if must update also mgmt ip at the vnf
                            service_external_cp = mgmt_service.get(
                                "external-connection-point-ref"
                            )
                            if service_external_cp:
                                if (
                                    deep_get(vnfd, ("mgmt-interface", "cp"))
                                    == service_external_cp
                                ):
                                    vnfr_update_dict["ip-address"] = ip

                                if find_in_list(
                                    target_ee_list,
                                    lambda ee: ee.get(
                                        "external-connection-point-ref", ""
                                    )
                                    == service_external_cp,
                                ):
                                    vnfr_update_dict[
                                        "kdur.{}.ip-address".format(kdu_index)
                                    ] = ip
                            break
                    else:
                        # for-else: no deployed service matched this name
                        self.logger.warn(
                            "Mgmt service name: {} not found".format(
                                mgmt_service["name"]
                            )
                        )

            vnfr_update_dict["kdur.{}.status".format(kdu_index)] = "READY"
            self.update_db_2("vnfrs", vnfr_data.get("_id"), vnfr_update_dict)

            # Run initial primitives only when no juju EE handles them
            kdu_config = get_configuration(vnfd, k8s_instance_info["kdu-name"])
            if (
                kdu_config
                and kdu_config.get("initial-config-primitive")
                and get_juju_ee_ref(vnfd, k8s_instance_info["kdu-name"]) is None
            ):
                initial_config_primitive_list = kdu_config.get(
                    "initial-config-primitive"
                )
                # primitives are executed in ascending "seq" order
                initial_config_primitive_list.sort(key=lambda val: int(val["seq"]))

                for initial_config_primitive in initial_config_primitive_list:
                    primitive_params_ = self._map_primitive_params(
                        initial_config_primitive, {}, {}
                    )

                    await asyncio.wait_for(
                        self.k8scluster_map[k8sclustertype].exec_primitive(
                            cluster_uuid=k8s_instance_info["k8scluster-uuid"],
                            kdu_instance=kdu_instance,
                            primitive_name=initial_config_primitive["name"],
                            params=primitive_params_,
                            db_dict=db_dict_install,
                            vca_id=vca_id,
                        ),
                        timeout=timeout,
                    )

        except Exception as e:
            # Prepare update db with error and raise exception
            try:
                self.update_db_2(
                    "nsrs", nsr_id, {nsr_db_path + ".detailed-status": str(e)}
                )
                self.update_db_2(
                    "vnfrs",
                    vnfr_data.get("_id"),
                    {"kdur.{}.status".format(kdu_index): "ERROR"},
                )
            except Exception:
                # ignore to keep original exception
                pass
            # reraise original error
            raise

        return kdu_instance
3291
    async def deploy_kdus(
        self,
        logging_text,
        nsr_id,
        nslcmop_id,
        db_vnfrs,
        db_vnfds,
        task_instantiation_info,
    ):
        """Deploy every KDU found in the vnf records as background tasks.

        For each kdur: resolve its model (helm chart or juju bundle, possibly
        a package artifact), ensure the target k8s cluster is initialized and
        its repos synchronized, record the deployment info under
        _admin.deployed.K8s, then launch _install_kdu as a registered task.

        :param logging_text: prefix for log messages
        :param nsr_id: ns record id
        :param nslcmop_id: operation id, used to register the tasks
        :param db_vnfrs: vnfr records, indexed by member-vnf-index
        :param db_vnfds: list of vnfd records
        :param task_instantiation_info: dict filled in with task -> description
        :raises LcmException: on any unrecoverable error
        """
        # Launch kdus if present in the descriptor

        # cache: cluster_type -> {cluster_id: connector uuid}
        k8scluster_id_2_uuic = {
            "helm-chart-v3": {},
            "helm-chart": {},
            "juju-bundle": {},
        }

        async def _get_cluster_id(cluster_id, cluster_type):
            # Resolve (and cache) the connector-level uuid for a k8s cluster,
            # initializing the cluster for helm-v3 on the fly if needed.
            nonlocal k8scluster_id_2_uuic
            if cluster_id in k8scluster_id_2_uuic[cluster_type]:
                return k8scluster_id_2_uuic[cluster_type][cluster_id]

            # check if K8scluster is creating and wait look if previous tasks in process
            task_name, task_dependency = self.lcm_tasks.lookfor_related(
                "k8scluster", cluster_id
            )
            if task_dependency:
                text = "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
                    task_name, cluster_id
                )
                self.logger.debug(logging_text + text)
                await asyncio.wait(task_dependency, timeout=3600)

            db_k8scluster = self.db.get_one(
                "k8sclusters", {"_id": cluster_id}, fail_on_empty=False
            )
            if not db_k8scluster:
                raise LcmException("K8s cluster {} cannot be found".format(cluster_id))

            k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
            if not k8s_id:
                if cluster_type == "helm-chart-v3":
                    try:
                        # backward compatibility for existing clusters that have not been initialized for helm v3
                        k8s_credentials = yaml.safe_dump(
                            db_k8scluster.get("credentials")
                        )
                        k8s_id, uninstall_sw = await self.k8sclusterhelm3.init_env(
                            k8s_credentials, reuse_cluster_uuid=cluster_id
                        )
                        db_k8scluster_update = {}
                        db_k8scluster_update["_admin.helm-chart-v3.error_msg"] = None
                        db_k8scluster_update["_admin.helm-chart-v3.id"] = k8s_id
                        db_k8scluster_update[
                            "_admin.helm-chart-v3.created"
                        ] = uninstall_sw
                        db_k8scluster_update[
                            "_admin.helm-chart-v3.operationalState"
                        ] = "ENABLED"
                        self.update_db_2(
                            "k8sclusters", cluster_id, db_k8scluster_update
                        )
                    except Exception as e:
                        self.logger.error(
                            logging_text
                            + "error initializing helm-v3 cluster: {}".format(str(e))
                        )
                        raise LcmException(
                            "K8s cluster '{}' has not been initialized for '{}'".format(
                                cluster_id, cluster_type
                            )
                        )
                else:
                    raise LcmException(
                        "K8s cluster '{}' has not been initialized for '{}'".format(
                            cluster_id, cluster_type
                        )
                    )
            k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
            return k8s_id

        logging_text += "Deploy kdus: "
        step = ""
        try:
            db_nsr_update = {"_admin.deployed.K8s": []}
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

            index = 0
            updated_cluster_list = []
            updated_v3_cluster_list = []

            for vnfr_data in db_vnfrs.values():
                vca_id = self.get_vca_id(vnfr_data, {})
                for kdu_index, kdur in enumerate(get_iterable(vnfr_data, "kdur")):
                    # Step 0: Prepare and set parameters
                    desc_params = parse_yaml_strings(kdur.get("additionalParams"))
                    vnfd_id = vnfr_data.get("vnfd-id")
                    vnfd_with_id = find_in_list(
                        db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id
                    )
                    kdud = next(
                        kdud
                        for kdud in vnfd_with_id["kdu"]
                        if kdud["name"] == kdur["kdu-name"]
                    )
                    namespace = kdur.get("k8s-namespace")
                    kdu_deployment_name = kdur.get("kdu-deployment-name")
                    # Determine the kdu model and cluster type from the kdur
                    if kdur.get("helm-chart"):
                        kdumodel = kdur["helm-chart"]
                        # Default version: helm3, if helm-version is v2 assign v2
                        k8sclustertype = "helm-chart-v3"
                        self.logger.debug("kdur: {}".format(kdur))
                        if (
                            kdur.get("helm-version")
                            and kdur.get("helm-version") == "v2"
                        ):
                            k8sclustertype = "helm-chart"
                    elif kdur.get("juju-bundle"):
                        kdumodel = kdur["juju-bundle"]
                        k8sclustertype = "juju-bundle"
                    else:
                        raise LcmException(
                            "kdu type for kdu='{}.{}' is neither helm-chart nor "
                            "juju-bundle. Maybe an old NBI version is running".format(
                                vnfr_data["member-vnf-index-ref"], kdur["kdu-name"]
                            )
                        )
                    # check if kdumodel is a file and exists
                    try:
                        vnfd_with_id = find_in_list(
                            db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id
                        )
                        storage = deep_get(vnfd_with_id, ("_admin", "storage"))
                        if storage: # may be not present if vnfd has not artifacts
                            # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
                            if storage["pkg-dir"]:
                                filename = "{}/{}/{}s/{}".format(
                                    storage["folder"],
                                    storage["pkg-dir"],
                                    k8sclustertype,
                                    kdumodel,
                                )
                            else:
                                filename = "{}/Scripts/{}s/{}".format(
                                    storage["folder"],
                                    k8sclustertype,
                                    kdumodel,
                                )
                            if self.fs.file_exists(
                                filename, mode="file"
                            ) or self.fs.file_exists(filename, mode="dir"):
                                kdumodel = self.fs.path + filename
                    except (asyncio.TimeoutError, asyncio.CancelledError):
                        raise
                    except Exception: # it is not a file
                        pass

                    k8s_cluster_id = kdur["k8s-cluster"]["id"]
                    step = "Synchronize repos for k8s cluster '{}'".format(
                        k8s_cluster_id
                    )
                    cluster_uuid = await _get_cluster_id(k8s_cluster_id, k8sclustertype)

                    # Synchronize repos (once per cluster and helm flavour)
                    if (
                        k8sclustertype == "helm-chart"
                        and cluster_uuid not in updated_cluster_list
                    ) or (
                        k8sclustertype == "helm-chart-v3"
                        and cluster_uuid not in updated_v3_cluster_list
                    ):
                        del_repo_list, added_repo_dict = await asyncio.ensure_future(
                            self.k8scluster_map[k8sclustertype].synchronize_repos(
                                cluster_uuid=cluster_uuid
                            )
                        )
                        if del_repo_list or added_repo_dict:
                            if k8sclustertype == "helm-chart":
                                unset = {
                                    "_admin.helm_charts_added." + item: None
                                    for item in del_repo_list
                                }
                                updated = {
                                    "_admin.helm_charts_added." + item: name
                                    for item, name in added_repo_dict.items()
                                }
                                updated_cluster_list.append(cluster_uuid)
                            elif k8sclustertype == "helm-chart-v3":
                                unset = {
                                    "_admin.helm_charts_v3_added." + item: None
                                    for item in del_repo_list
                                }
                                updated = {
                                    "_admin.helm_charts_v3_added." + item: name
                                    for item, name in added_repo_dict.items()
                                }
                                updated_v3_cluster_list.append(cluster_uuid)
                            self.logger.debug(
                                logging_text + "repos synchronized on k8s cluster "
                                "'{}' to_delete: {}, to_add: {}".format(
                                    k8s_cluster_id, del_repo_list, added_repo_dict
                                )
                            )
                            self.db.set_one(
                                "k8sclusters",
                                {"_id": k8s_cluster_id},
                                updated,
                                unset=unset,
                            )

                    # Instantiate kdu
                    step = "Instantiating KDU {}.{} in k8s cluster {}".format(
                        vnfr_data["member-vnf-index-ref"],
                        kdur["kdu-name"],
                        k8s_cluster_id,
                    )
                    k8s_instance_info = {
                        "kdu-instance": None,
                        "k8scluster-uuid": cluster_uuid,
                        "k8scluster-type": k8sclustertype,
                        "member-vnf-index": vnfr_data["member-vnf-index-ref"],
                        "kdu-name": kdur["kdu-name"],
                        "kdu-model": kdumodel,
                        "namespace": namespace,
                        "kdu-deployment-name": kdu_deployment_name,
                    }
                    db_path = "_admin.deployed.K8s.{}".format(index)
                    db_nsr_update[db_path] = k8s_instance_info
                    self.update_db_2("nsrs", nsr_id, db_nsr_update)
                    vnfd_with_id = find_in_list(
                        db_vnfds, lambda vnf: vnf["_id"] == vnfd_id
                    )
                    # Launch the actual install as a background, registered task
                    task = asyncio.ensure_future(
                        self._install_kdu(
                            nsr_id,
                            db_path,
                            vnfr_data,
                            kdu_index,
                            kdud,
                            vnfd_with_id,
                            k8s_instance_info,
                            k8params=desc_params,
                            timeout=600,
                            vca_id=vca_id,
                        )
                    )
                    self.lcm_tasks.register(
                        "ns",
                        nsr_id,
                        nslcmop_id,
                        "instantiate_KDU-{}".format(index),
                        task,
                    )
                    task_instantiation_info[task] = "Deploying KDU {}".format(
                        kdur["kdu-name"]
                    )

                    index += 1

        except (LcmException, asyncio.CancelledError):
            raise
        except Exception as e:
            msg = "Exception {} while {}: {}".format(type(e).__name__, step, e)
            if isinstance(e, (N2VCException, DbException)):
                self.logger.error(logging_text + msg)
            else:
                self.logger.critical(logging_text + msg, exc_info=True)
            raise LcmException(msg)
        finally:
            if db_nsr_update:
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
3563
    def _deploy_n2vc(
        self,
        logging_text,
        db_nsr,
        db_vnfr,
        nslcmop_id,
        nsr_id,
        nsi_id,
        vnfd_id,
        vdu_id,
        kdu_name,
        member_vnf_index,
        vdu_index,
        vdu_name,
        deploy_params,
        descriptor_config,
        base_folder,
        task_instantiation_info,
        stage,
    ):
        """Launch instantiate_N2VC as an asyncio task for every execution
        environment in the given configuration descriptor.

        For each ee item: determine the VCA type, locate (or create) its
        entry in db_nsr._admin.deployed.VCA, then start and register the
        task. Non-charm execution environments (e.g. scripts) are skipped.
        """
        # launch instantiate_N2VC in a asyncio task and register task object
        # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
        # if not found, create one entry and update database
        # fill db_nsr._admin.deployed.VCA.<index>

        self.logger.debug(
            logging_text + "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id, vdu_id)
        )
        if "execution-environment-list" in descriptor_config:
            ee_list = descriptor_config.get("execution-environment-list", [])
        elif "juju" in descriptor_config:
            ee_list = [descriptor_config] # ns charms
        else: # other types as script are not supported
            ee_list = []

        for ee_item in ee_list:
            self.logger.debug(
                logging_text
                + "_deploy_n2vc ee_item juju={}, helm={}".format(
                    ee_item.get("juju"), ee_item.get("helm-chart")
                )
            )
            ee_descriptor_id = ee_item.get("id")
            # Derive the VCA type from the ee item contents
            if ee_item.get("juju"):
                vca_name = ee_item["juju"].get("charm")
                vca_type = (
                    "lxc_proxy_charm"
                    if ee_item["juju"].get("charm") is not None
                    else "native_charm"
                )
                if ee_item["juju"].get("cloud") == "k8s":
                    vca_type = "k8s_proxy_charm"
                elif ee_item["juju"].get("proxy") is False:
                    vca_type = "native_charm"
            elif ee_item.get("helm-chart"):
                vca_name = ee_item["helm-chart"]
                if ee_item.get("helm-version") and ee_item.get("helm-version") == "v2":
                    vca_type = "helm"
                else:
                    vca_type = "helm-v3"
            else:
                self.logger.debug(
                    logging_text + "skipping non juju neither charm configuration"
                )
                continue

            vca_index = -1
            # for-else: the else branch runs only when no existing VCA entry
            # matched, and creates a new one at vca_index + 1
            for vca_index, vca_deployed in enumerate(
                db_nsr["_admin"]["deployed"]["VCA"]
            ):
                if not vca_deployed:
                    continue
                if (
                    vca_deployed.get("member-vnf-index") == member_vnf_index
                    and vca_deployed.get("vdu_id") == vdu_id
                    and vca_deployed.get("kdu_name") == kdu_name
                    and vca_deployed.get("vdu_count_index", 0) == vdu_index
                    and vca_deployed.get("ee_descriptor_id") == ee_descriptor_id
                ):
                    break
            else:
                # not found, create one.
                target = (
                    "ns" if not member_vnf_index else "vnf/{}".format(member_vnf_index)
                )
                if vdu_id:
                    target += "/vdu/{}/{}".format(vdu_id, vdu_index or 0)
                elif kdu_name:
                    target += "/kdu/{}".format(kdu_name)
                vca_deployed = {
                    "target_element": target,
                    # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
                    "member-vnf-index": member_vnf_index,
                    "vdu_id": vdu_id,
                    "kdu_name": kdu_name,
                    "vdu_count_index": vdu_index,
                    "operational-status": "init", # TODO revise
                    "detailed-status": "", # TODO revise
                    "step": "initial-deploy", # TODO revise
                    "vnfd_id": vnfd_id,
                    "vdu_name": vdu_name,
                    "type": vca_type,
                    "ee_descriptor_id": ee_descriptor_id,
                }
                vca_index += 1

                # create VCA and configurationStatus in db
                db_dict = {
                    "_admin.deployed.VCA.{}".format(vca_index): vca_deployed,
                    "configurationStatus.{}".format(vca_index): dict(),
                }
                self.update_db_2("nsrs", nsr_id, db_dict)

                db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed)

            self.logger.debug("N2VC > NSR_ID > {}".format(nsr_id))
            self.logger.debug("N2VC > DB_NSR > {}".format(db_nsr))
            self.logger.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed))

            # Launch task
            task_n2vc = asyncio.ensure_future(
                self.instantiate_N2VC(
                    logging_text=logging_text,
                    vca_index=vca_index,
                    nsi_id=nsi_id,
                    db_nsr=db_nsr,
                    db_vnfr=db_vnfr,
                    vdu_id=vdu_id,
                    kdu_name=kdu_name,
                    vdu_index=vdu_index,
                    deploy_params=deploy_params,
                    config_descriptor=descriptor_config,
                    base_folder=base_folder,
                    nslcmop_id=nslcmop_id,
                    stage=stage,
                    vca_type=vca_type,
                    vca_name=vca_name,
                    ee_config_descriptor=ee_item,
                )
            )
            self.lcm_tasks.register(
                "ns",
                nsr_id,
                nslcmop_id,
                "instantiate_N2VC-{}".format(vca_index),
                task_n2vc,
            )
            task_instantiation_info[
                task_n2vc
            ] = self.task_name_deploy_vca + " {}.{}".format(
                member_vnf_index or "", vdu_id or ""
            )
3716
3717 @staticmethod
3718 def _create_nslcmop(nsr_id, operation, params):
3719 """
3720 Creates a ns-lcm-opp content to be stored at database.
3721 :param nsr_id: internal id of the instance
3722 :param operation: instantiate, terminate, scale, action, ...
3723 :param params: user parameters for the operation
3724 :return: dictionary following SOL005 format
3725 """
3726 # Raise exception if invalid arguments
3727 if not (nsr_id and operation and params):
3728 raise LcmException(
3729 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided"
3730 )
3731 now = time()
3732 _id = str(uuid4())
3733 nslcmop = {
3734 "id": _id,
3735 "_id": _id,
3736 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
3737 "operationState": "PROCESSING",
3738 "statusEnteredTime": now,
3739 "nsInstanceId": nsr_id,
3740 "lcmOperationType": operation,
3741 "startTime": now,
3742 "isAutomaticInvocation": False,
3743 "operationParams": params,
3744 "isCancelPending": False,
3745 "links": {
3746 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
3747 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
3748 },
3749 }
3750 return nslcmop
3751
3752 def _format_additional_params(self, params):
3753 params = params or {}
3754 for key, value in params.items():
3755 if str(value).startswith("!!yaml "):
3756 params[key] = yaml.safe_load(value[7:])
3757 return params
3758
3759 def _get_terminate_primitive_params(self, seq, vnf_index):
3760 primitive = seq.get("name")
3761 primitive_params = {}
3762 params = {
3763 "member_vnf_index": vnf_index,
3764 "primitive": primitive,
3765 "primitive_params": primitive_params,
3766 }
3767 desc_params = {}
3768 return self._map_primitive_params(seq, params, desc_params)
3769
3770 # sub-operations
3771
3772 def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
3773 op = deep_get(db_nslcmop, ("_admin", "operations"), [])[op_index]
3774 if op.get("operationState") == "COMPLETED":
3775 # b. Skip sub-operation
3776 # _ns_execute_primitive() or RO.create_action() will NOT be executed
3777 return self.SUBOPERATION_STATUS_SKIP
3778 else:
3779 # c. retry executing sub-operation
3780 # The sub-operation exists, and operationState != 'COMPLETED'
3781 # Update operationState = 'PROCESSING' to indicate a retry.
3782 operationState = "PROCESSING"
3783 detailed_status = "In progress"
3784 self._update_suboperation_status(
3785 db_nslcmop, op_index, operationState, detailed_status
3786 )
3787 # Return the sub-operation index
3788 # _ns_execute_primitive() or RO.create_action() will be called from scale()
3789 # with arguments extracted from the sub-operation
3790 return op_index
3791
3792 # Find a sub-operation where all keys in a matching dictionary must match
3793 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
3794 def _find_suboperation(self, db_nslcmop, match):
3795 if db_nslcmop and match:
3796 op_list = db_nslcmop.get("_admin", {}).get("operations", [])
3797 for i, op in enumerate(op_list):
3798 if all(op.get(k) == match[k] for k in match):
3799 return i
3800 return self.SUBOPERATION_STATUS_NOT_FOUND
3801
3802 # Update status for a sub-operation given its index
3803 def _update_suboperation_status(
3804 self, db_nslcmop, op_index, operationState, detailed_status
3805 ):
3806 # Update DB for HA tasks
3807 q_filter = {"_id": db_nslcmop["_id"]}
3808 update_dict = {
3809 "_admin.operations.{}.operationState".format(op_index): operationState,
3810 "_admin.operations.{}.detailed-status".format(op_index): detailed_status,
3811 }
3812 self.db.set_one(
3813 "nslcmops", q_filter=q_filter, update_dict=update_dict, fail_on_empty=False
3814 )
3815
3816 # Add sub-operation, return the index of the added sub-operation
3817 # Optionally, set operationState, detailed-status, and operationType
3818 # Status and type are currently set for 'scale' sub-operations:
3819 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
3820 # 'detailed-status' : status message
3821 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
3822 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
3823 def _add_suboperation(
3824 self,
3825 db_nslcmop,
3826 vnf_index,
3827 vdu_id,
3828 vdu_count_index,
3829 vdu_name,
3830 primitive,
3831 mapped_primitive_params,
3832 operationState=None,
3833 detailed_status=None,
3834 operationType=None,
3835 RO_nsr_id=None,
3836 RO_scaling_info=None,
3837 ):
3838 if not db_nslcmop:
3839 return self.SUBOPERATION_STATUS_NOT_FOUND
3840 # Get the "_admin.operations" list, if it exists
3841 db_nslcmop_admin = db_nslcmop.get("_admin", {})
3842 op_list = db_nslcmop_admin.get("operations")
3843 # Create or append to the "_admin.operations" list
3844 new_op = {
3845 "member_vnf_index": vnf_index,
3846 "vdu_id": vdu_id,
3847 "vdu_count_index": vdu_count_index,
3848 "primitive": primitive,
3849 "primitive_params": mapped_primitive_params,
3850 }
3851 if operationState:
3852 new_op["operationState"] = operationState
3853 if detailed_status:
3854 new_op["detailed-status"] = detailed_status
3855 if operationType:
3856 new_op["lcmOperationType"] = operationType
3857 if RO_nsr_id:
3858 new_op["RO_nsr_id"] = RO_nsr_id
3859 if RO_scaling_info:
3860 new_op["RO_scaling_info"] = RO_scaling_info
3861 if not op_list:
3862 # No existing operations, create key 'operations' with current operation as first list element
3863 db_nslcmop_admin.update({"operations": [new_op]})
3864 op_list = db_nslcmop_admin.get("operations")
3865 else:
3866 # Existing operations, append operation to list
3867 op_list.append(new_op)
3868
3869 db_nslcmop_update = {"_admin.operations": op_list}
3870 self.update_db_2("nslcmops", db_nslcmop["_id"], db_nslcmop_update)
3871 op_index = len(op_list) - 1
3872 return op_index
3873
3874 # Helper methods for scale() sub-operations
3875
3876 # pre-scale/post-scale:
3877 # Check for 3 different cases:
3878 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
3879 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
3880 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
3881 def _check_or_add_scale_suboperation(
3882 self,
3883 db_nslcmop,
3884 vnf_index,
3885 vnf_config_primitive,
3886 primitive_params,
3887 operationType,
3888 RO_nsr_id=None,
3889 RO_scaling_info=None,
3890 ):
3891 # Find this sub-operation
3892 if RO_nsr_id and RO_scaling_info:
3893 operationType = "SCALE-RO"
3894 match = {
3895 "member_vnf_index": vnf_index,
3896 "RO_nsr_id": RO_nsr_id,
3897 "RO_scaling_info": RO_scaling_info,
3898 }
3899 else:
3900 match = {
3901 "member_vnf_index": vnf_index,
3902 "primitive": vnf_config_primitive,
3903 "primitive_params": primitive_params,
3904 "lcmOperationType": operationType,
3905 }
3906 op_index = self._find_suboperation(db_nslcmop, match)
3907 if op_index == self.SUBOPERATION_STATUS_NOT_FOUND:
3908 # a. New sub-operation
3909 # The sub-operation does not exist, add it.
3910 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
3911 # The following parameters are set to None for all kind of scaling:
3912 vdu_id = None
3913 vdu_count_index = None
3914 vdu_name = None
3915 if RO_nsr_id and RO_scaling_info:
3916 vnf_config_primitive = None
3917 primitive_params = None
3918 else:
3919 RO_nsr_id = None
3920 RO_scaling_info = None
3921 # Initial status for sub-operation
3922 operationState = "PROCESSING"
3923 detailed_status = "In progress"
3924 # Add sub-operation for pre/post-scaling (zero or more operations)
3925 self._add_suboperation(
3926 db_nslcmop,
3927 vnf_index,
3928 vdu_id,
3929 vdu_count_index,
3930 vdu_name,
3931 vnf_config_primitive,
3932 primitive_params,
3933 operationState,
3934 detailed_status,
3935 operationType,
3936 RO_nsr_id,
3937 RO_scaling_info,
3938 )
3939 return self.SUBOPERATION_STATUS_NEW
3940 else:
3941 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
3942 # or op_index (operationState != 'COMPLETED')
3943 return self._retry_or_skip_suboperation(db_nslcmop, op_index)
3944
3945 # Function to return execution_environment id
3946
3947 def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list):
3948 # TODO vdu_index_count
3949 for vca in vca_deployed_list:
3950 if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id:
3951 return vca["ee_id"]
3952
    async def destroy_N2VC(
        self,
        logging_text,
        db_nslcmop,
        vca_deployed,
        config_descriptor,
        vca_index,
        destroy_ee=True,
        exec_primitives=True,
        scaling_in=False,
        vca_id: str = None,
    ):
        """
        Execute the terminate primitives and destroy the execution environment (if destroy_ee=True).
        :param logging_text: prefix prepended to every log message
        :param db_nslcmop: database content of the current nslcmop operation
        :param vca_deployed: Dictionary of deployment info at db_nsr._admin.deployed.VCA.<INDEX>
        :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
        :param vca_index: index in the database _admin.deployed.VCA
        :param destroy_ee: False to not destroy, because all of them will be destroyed at once later
        :param exec_primitives: False to not execute terminate primitives, because the config is not completed or has
            not executed properly
        :param scaling_in: True destroys the application, False destroys the model
        :param vca_id: VCA (juju controller) identifier; None selects the default one
        :return: None or exception
        """

        self.logger.debug(
            logging_text
            + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
                vca_index, vca_deployed, config_descriptor, destroy_ee
            )
        )

        # default kept for backward compatibility with records created before "type" existed
        vca_type = vca_deployed.get("type", "lxc_proxy_charm")

        # execute terminate_primitives
        if exec_primitives:
            terminate_primitives = get_ee_sorted_terminate_config_primitive_list(
                config_descriptor.get("terminate-config-primitive"),
                vca_deployed.get("ee_descriptor_id"),
            )
            vdu_id = vca_deployed.get("vdu_id")
            vdu_count_index = vca_deployed.get("vdu_count_index")
            vdu_name = vca_deployed.get("vdu_name")
            vnf_index = vca_deployed.get("member-vnf-index")
            # only run the primitives when the VCA was flagged as needing them
            if terminate_primitives and vca_deployed.get("needed_terminate"):
                for seq in terminate_primitives:
                    # For each sequence in list, get primitive and call _ns_execute_primitive()
                    step = "Calling terminate action for vnf_member_index={} primitive={}".format(
                        vnf_index, seq.get("name")
                    )
                    self.logger.debug(logging_text + step)
                    # Create the primitive for each sequence, i.e. "primitive": "touch"
                    primitive = seq.get("name")
                    mapped_primitive_params = self._get_terminate_primitive_params(
                        seq, vnf_index
                    )

                    # Add sub-operation
                    self._add_suboperation(
                        db_nslcmop,
                        vnf_index,
                        vdu_id,
                        vdu_count_index,
                        vdu_name,
                        primitive,
                        mapped_primitive_params,
                    )
                    # Sub-operations: Call _ns_execute_primitive() instead of action()
                    try:
                        result, result_detail = await self._ns_execute_primitive(
                            vca_deployed["ee_id"],
                            primitive,
                            mapped_primitive_params,
                            vca_type=vca_type,
                            vca_id=vca_id,
                        )
                    except LcmException:
                        # this happens when VCA is not deployed. In this case it is not needed to terminate
                        continue
                    result_ok = ["COMPLETED", "PARTIALLY_COMPLETED"]
                    if result not in result_ok:
                        raise LcmException(
                            "terminate_primitive {}  for vnf_member_index={} fails with "
                            "error {}".format(seq.get("name"), vnf_index, result_detail)
                        )
                # set that this VCA do not need terminated
                db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(
                    vca_index
                )
                self.update_db_2(
                    "nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False}
                )

        # Delete Prometheus Jobs if any
        # This uses NSR_ID, so it will destroy any jobs under this index
        self.db.del_list("prometheus_jobs", {"nsr_id": db_nslcmop["nsInstanceId"]})

        if destroy_ee:
            # helm/native_charm EEs must be destroyed individually (see terminate())
            await self.vca_map[vca_type].delete_execution_environment(
                vca_deployed["ee_id"],
                scaling_in=scaling_in,
                vca_type=vca_type,
                vca_id=vca_id,
            )
4058
4059 async def _delete_all_N2VC(self, db_nsr: dict, vca_id: str = None):
4060 self._write_all_config_status(db_nsr=db_nsr, status="TERMINATING")
4061 namespace = "." + db_nsr["_id"]
4062 try:
4063 await self.n2vc.delete_namespace(
4064 namespace=namespace,
4065 total_timeout=self.timeout_charm_delete,
4066 vca_id=vca_id,
4067 )
4068 except N2VCNotFound: # already deleted. Skip
4069 pass
4070 self._write_all_config_status(db_nsr=db_nsr, status="DELETED")
4071
    async def _terminate_RO(
        self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
    ):
        """
        Terminates a deployment from RO: deletes the ns from the VIM, waits for the
        deletion to complete, then removes the nsd and every vnfd registered at RO.
        :param logging_text: prefix prepended to every log message
        :param nsr_deployed: db_nsr._admin.deployed
        :param nsr_id: network service record id
        :param nslcmop_id: operation id driving this termination
        :param stage: list of string with the content to write on db_nslcmop.detailed-status.
            this method will update only the index 2, but it will write on database the concatenated content of the list
        :return: None
        :raises LcmException: when any deletion fails (all collected failures concatenated)
        """
        db_nsr_update = {}
        failed_detail = []
        ro_nsr_id = ro_delete_action = None
        if nsr_deployed and nsr_deployed.get("RO"):
            ro_nsr_id = nsr_deployed["RO"].get("nsr_id")
            # a pending delete-action id may remain from a previous interrupted terminate
            ro_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
        try:
            if ro_nsr_id:
                stage[2] = "Deleting ns from VIM."
                db_nsr_update["detailed-status"] = " ".join(stage)
                self._write_op_status(nslcmop_id, stage)
                self.logger.debug(logging_text + stage[2])
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_op_status(nslcmop_id, stage)
                desc = await self.RO.delete("ns", ro_nsr_id)
                ro_delete_action = desc["action_id"]
                db_nsr_update[
                    "_admin.deployed.RO.nsr_delete_action_id"
                ] = ro_delete_action
                db_nsr_update["_admin.deployed.RO.nsr_id"] = None
                db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
            if ro_delete_action:
                # wait until NS is deleted from VIM
                stage[2] = "Waiting ns deleted from VIM."
                detailed_status_old = None
                self.logger.debug(
                    logging_text
                    + stage[2]
                    + " RO_id={} ro_delete_action={}".format(
                        ro_nsr_id, ro_delete_action
                    )
                )
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_op_status(nslcmop_id, stage)

                delete_timeout = 20 * 60  # 20 minutes
                # poll RO every 5 seconds until the delete action reaches a final state
                while delete_timeout > 0:
                    desc = await self.RO.show(
                        "ns",
                        item_id_name=ro_nsr_id,
                        extra_item="action",
                        extra_item_id=ro_delete_action,
                    )

                    # deploymentStatus
                    self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)

                    ns_status, ns_status_info = self.RO.check_action_status(desc)
                    if ns_status == "ERROR":
                        raise ROclient.ROClientException(ns_status_info)
                    elif ns_status == "BUILD":
                        stage[2] = "Deleting from VIM {}".format(ns_status_info)
                    elif ns_status == "ACTIVE":
                        # ACTIVE here means the delete action completed successfully
                        db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
                        db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                        break
                    else:
                        assert (
                            False
                        ), "ROclient.check_action_status returns unknown {}".format(
                            ns_status
                        )
                    # write status to db only when it changed, to avoid useless updates
                    if stage[2] != detailed_status_old:
                        detailed_status_old = stage[2]
                        db_nsr_update["detailed-status"] = " ".join(stage)
                        self._write_op_status(nslcmop_id, stage)
                        self.update_db_2("nsrs", nsr_id, db_nsr_update)
                    await asyncio.sleep(5, loop=self.loop)
                    delete_timeout -= 5
                else:  # delete_timeout <= 0:
                    # while/else: only reached when the loop exhausted the timeout budget
                    raise ROclient.ROClientException(
                        "Timeout waiting ns deleted from VIM"
                    )

        except Exception as e:
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            if (
                isinstance(e, ROclient.ROClientException) and e.http_code == 404
            ):  # not found
                # already deleted at RO: treat as success
                db_nsr_update["_admin.deployed.RO.nsr_id"] = None
                db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
                self.logger.debug(
                    logging_text + "RO_ns_id={} already deleted".format(ro_nsr_id)
                )
            elif (
                isinstance(e, ROclient.ROClientException) and e.http_code == 409
            ):  # conflict
                failed_detail.append("delete conflict: {}".format(e))
                self.logger.debug(
                    logging_text
                    + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id, e)
                )
            else:
                failed_detail.append("delete error: {}".format(e))
                self.logger.error(
                    logging_text + "RO_ns_id={} delete error: {}".format(ro_nsr_id, e)
                )

        # Delete nsd (only when the ns deletion above succeeded)
        if not failed_detail and deep_get(nsr_deployed, ("RO", "nsd_id")):
            ro_nsd_id = nsr_deployed["RO"]["nsd_id"]
            try:
                stage[2] = "Deleting nsd from RO."
                db_nsr_update["detailed-status"] = " ".join(stage)
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_op_status(nslcmop_id, stage)
                await self.RO.delete("nsd", ro_nsd_id)
                self.logger.debug(
                    logging_text + "ro_nsd_id={} deleted".format(ro_nsd_id)
                )
                db_nsr_update["_admin.deployed.RO.nsd_id"] = None
            except Exception as e:
                if (
                    isinstance(e, ROclient.ROClientException) and e.http_code == 404
                ):  # not found
                    db_nsr_update["_admin.deployed.RO.nsd_id"] = None
                    self.logger.debug(
                        logging_text + "ro_nsd_id={} already deleted".format(ro_nsd_id)
                    )
                elif (
                    isinstance(e, ROclient.ROClientException) and e.http_code == 409
                ):  # conflict
                    failed_detail.append(
                        "ro_nsd_id={} delete conflict: {}".format(ro_nsd_id, e)
                    )
                    self.logger.debug(logging_text + failed_detail[-1])
                else:
                    failed_detail.append(
                        "ro_nsd_id={} delete error: {}".format(ro_nsd_id, e)
                    )
                    self.logger.error(logging_text + failed_detail[-1])

        # Delete every vnfd registered at RO (only when everything above succeeded)
        if not failed_detail and deep_get(nsr_deployed, ("RO", "vnfd")):
            for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
                if not vnf_deployed or not vnf_deployed["id"]:
                    continue
                try:
                    ro_vnfd_id = vnf_deployed["id"]
                    stage[
                        2
                    ] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
                        vnf_deployed["member-vnf-index"], ro_vnfd_id
                    )
                    db_nsr_update["detailed-status"] = " ".join(stage)
                    self.update_db_2("nsrs", nsr_id, db_nsr_update)
                    self._write_op_status(nslcmop_id, stage)
                    await self.RO.delete("vnfd", ro_vnfd_id)
                    self.logger.debug(
                        logging_text + "ro_vnfd_id={} deleted".format(ro_vnfd_id)
                    )
                    db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
                except Exception as e:
                    if (
                        isinstance(e, ROclient.ROClientException) and e.http_code == 404
                    ):  # not found
                        db_nsr_update[
                            "_admin.deployed.RO.vnfd.{}.id".format(index)
                        ] = None
                        self.logger.debug(
                            logging_text
                            + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id)
                        )
                    elif (
                        isinstance(e, ROclient.ROClientException) and e.http_code == 409
                    ):  # conflict
                        failed_detail.append(
                            "ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id, e)
                        )
                        self.logger.debug(logging_text + failed_detail[-1])
                    else:
                        failed_detail.append(
                            "ro_vnfd_id={} delete error: {}".format(ro_vnfd_id, e)
                        )
                        self.logger.error(logging_text + failed_detail[-1])

        if failed_detail:
            stage[2] = "Error deleting from VIM"
        else:
            stage[2] = "Deleted from VIM"
        db_nsr_update["detailed-status"] = " ".join(stage)
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        self._write_op_status(nslcmop_id, stage)

        if failed_detail:
            raise LcmException("; ".join(failed_detail))
4271
    async def terminate(self, nsr_id, nslcmop_id):
        """
        Terminate a network service in three stages: (1) prepare and read records from db,
        (2) execute terminate primitives per VCA, (3) delete all execution environments,
        KDU instances and the RO/VIM deployment. The final status is always written to the
        database in the finally block and notified via kafka.
        :param nsr_id: network service record id
        :param nslcmop_id: operation id driving this termination
        :return: None; results are persisted in db and published on the "ns" topic
        """
        # Try to lock HA task here
        task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
        if not task_is_locked_by_me:
            return

        logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
        self.logger.debug(logging_text + "Enter")
        timeout_ns_terminate = self.timeout_ns_terminate
        db_nsr = None
        db_nslcmop = None
        operation_params = None
        exc = None
        error_list = []  # annotates all failed error messages
        db_nslcmop_update = {}
        autoremove = False  # autoremove after terminated
        tasks_dict_info = {}  # maps each pending asyncio task to its description
        db_nsr_update = {}
        stage = [
            "Stage 1/3: Preparing task.",
            "Waiting for previous operations to terminate.",
            "",
        ]
        # ^ contains [stage, step, VIM-status]
        try:
            # wait for any previous tasks in process
            await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)

            stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
            db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
            operation_params = db_nslcmop.get("operationParams") or {}
            if operation_params.get("timeout_ns_terminate"):
                timeout_ns_terminate = operation_params["timeout_ns_terminate"]
            stage[1] = "Getting nsr={} from db.".format(nsr_id)
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

            db_nsr_update["operational-status"] = "terminating"
            db_nsr_update["config-status"] = "terminating"
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state="TERMINATING",
                current_operation="TERMINATING",
                current_operation_id=nslcmop_id,
                other_update=db_nsr_update,
            )
            self._write_op_status(op_id=nslcmop_id, queuePosition=0, stage=stage)
            # deepcopy so that later mutations do not alter the cached db record
            nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed")) or {}
            if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
                # nothing was deployed; the finally block still records COMPLETED
                return

            stage[1] = "Getting vnf descriptors from db."
            db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
            db_vnfrs_dict = {
                db_vnfr["member-vnf-index-ref"]: db_vnfr for db_vnfr in db_vnfrs_list
            }
            db_vnfds_from_id = {}
            db_vnfds_from_member_index = {}
            # Loop over VNFRs
            for vnfr in db_vnfrs_list:
                vnfd_id = vnfr["vnfd-id"]
                if vnfd_id not in db_vnfds_from_id:
                    vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
                    db_vnfds_from_id[vnfd_id] = vnfd
                db_vnfds_from_member_index[
                    vnfr["member-vnf-index-ref"]
                ] = db_vnfds_from_id[vnfd_id]

            # Destroy individual execution environments when there are terminating primitives.
            # Rest of EE will be deleted at once
            # TODO - check before calling _destroy_N2VC
            # if not operation_params.get("skip_terminate_primitives"):#
            # or not vca.get("needed_terminate"):
            stage[0] = "Stage 2/3 execute terminating primitives."
            self.logger.debug(logging_text + stage[0])
            stage[1] = "Looking execution environment that needs terminate."
            self.logger.debug(logging_text + stage[1])

            for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")):
                config_descriptor = None
                vca_member_vnf_index = vca.get("member-vnf-index")
                vca_id = self.get_vca_id(
                    db_vnfrs_dict.get(vca_member_vnf_index)
                    if vca_member_vnf_index
                    else None,
                    db_nsr,
                )
                if not vca or not vca.get("ee_id"):
                    continue
                # pick the configuration descriptor matching this VCA's level (ns/vdu/kdu/vnf)
                if not vca.get("member-vnf-index"):
                    # ns
                    config_descriptor = db_nsr.get("ns-configuration")
                elif vca.get("vdu_id"):
                    db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                    config_descriptor = get_configuration(db_vnfd, vca.get("vdu_id"))
                elif vca.get("kdu_name"):
                    db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                    config_descriptor = get_configuration(db_vnfd, vca.get("kdu_name"))
                else:
                    db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                    config_descriptor = get_configuration(db_vnfd, db_vnfd["id"])
                vca_type = vca.get("type")
                exec_terminate_primitives = not operation_params.get(
                    "skip_terminate_primitives"
                ) and vca.get("needed_terminate")
                # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
                # pending native charms
                destroy_ee = (
                    True if vca_type in ("helm", "helm-v3", "native_charm") else False
                )
                # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
                #     vca_index, vca.get("ee_id"), vca_type, destroy_ee))
                task = asyncio.ensure_future(
                    self.destroy_N2VC(
                        logging_text,
                        db_nslcmop,
                        vca,
                        config_descriptor,
                        vca_index,
                        destroy_ee,
                        exec_terminate_primitives,
                        vca_id=vca_id,
                    )
                )
                tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))

            # wait for pending tasks of terminate primitives
            if tasks_dict_info:
                self.logger.debug(
                    logging_text
                    + "Waiting for tasks {}".format(list(tasks_dict_info.keys()))
                )
                error_list = await self._wait_for_tasks(
                    logging_text,
                    tasks_dict_info,
                    min(self.timeout_charm_delete, timeout_ns_terminate),
                    stage,
                    nslcmop_id,
                )
                tasks_dict_info.clear()
                if error_list:
                    # the finally block records the failure gathered in error_list
                    return  # raise LcmException("; ".join(error_list))

            # remove All execution environments at once
            stage[0] = "Stage 3/3 delete all."

            if nsr_deployed.get("VCA"):
                stage[1] = "Deleting all execution environments."
                self.logger.debug(logging_text + stage[1])
                vca_id = self.get_vca_id({}, db_nsr)
                task_delete_ee = asyncio.ensure_future(
                    asyncio.wait_for(
                        self._delete_all_N2VC(db_nsr=db_nsr, vca_id=vca_id),
                        timeout=self.timeout_charm_delete,
                    )
                )
                # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
                tasks_dict_info[task_delete_ee] = "Terminating all VCA"

            # Delete from k8scluster
            stage[1] = "Deleting KDUs."
            self.logger.debug(logging_text + stage[1])
            # print(nsr_deployed)
            for kdu in get_iterable(nsr_deployed, "K8s"):
                if not kdu or not kdu.get("kdu-instance"):
                    continue
                kdu_instance = kdu.get("kdu-instance")
                if kdu.get("k8scluster-type") in self.k8scluster_map:
                    # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
                    vca_id = self.get_vca_id({}, db_nsr)
                    task_delete_kdu_instance = asyncio.ensure_future(
                        self.k8scluster_map[kdu["k8scluster-type"]].uninstall(
                            cluster_uuid=kdu.get("k8scluster-uuid"),
                            kdu_instance=kdu_instance,
                            vca_id=vca_id,
                        )
                    )
                else:
                    self.logger.error(
                        logging_text
                        + "Unknown k8s deployment type {}".format(
                            kdu.get("k8scluster-type")
                        )
                    )
                    continue
                tasks_dict_info[
                    task_delete_kdu_instance
                ] = "Terminating KDU '{}'".format(kdu.get("kdu-name"))

            # remove from RO
            stage[1] = "Deleting ns from VIM."
            if self.ng_ro:
                task_delete_ro = asyncio.ensure_future(
                    self._terminate_ng_ro(
                        logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
                    )
                )
            else:
                task_delete_ro = asyncio.ensure_future(
                    self._terminate_RO(
                        logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
                    )
                )
            tasks_dict_info[task_delete_ro] = "Removing deployment from VIM"

            # rest of staff will be done at finally

        except (
            ROclient.ROClientException,
            DbException,
            LcmException,
            N2VCException,
        ) as e:
            self.logger.error(logging_text + "Exit Exception {}".format(e))
            exc = e
        except asyncio.CancelledError:
            self.logger.error(
                logging_text + "Cancelled Exception while '{}'".format(stage[1])
            )
            exc = "Operation was cancelled"
        except Exception as e:
            exc = traceback.format_exc()
            self.logger.critical(
                logging_text + "Exit Exception while '{}': {}".format(stage[1], e),
                exc_info=True,
            )
        finally:
            if exc:
                error_list.append(str(exc))
            try:
                # wait for pending tasks
                if tasks_dict_info:
                    stage[1] = "Waiting for terminate pending tasks."
                    self.logger.debug(logging_text + stage[1])
                    error_list += await self._wait_for_tasks(
                        logging_text,
                        tasks_dict_info,
                        timeout_ns_terminate,
                        stage,
                        nslcmop_id,
                    )
                stage[1] = stage[2] = ""
            except asyncio.CancelledError:
                error_list.append("Cancelled")
                # TODO cancell all tasks
            except Exception as exc:
                # NOTE(review): this "exc" shadows the outer variable; only its string is used here
                error_list.append(str(exc))
            # update status at database
            if error_list:
                error_detail = "; ".join(error_list)
                # self.logger.error(logging_text + error_detail)
                error_description_nslcmop = "{} Detail: {}".format(
                    stage[0], error_detail
                )
                error_description_nsr = "Operation: TERMINATING.{}, {}.".format(
                    nslcmop_id, stage[0]
                )

                db_nsr_update["operational-status"] = "failed"
                db_nsr_update["detailed-status"] = (
                    error_description_nsr + " Detail: " + error_detail
                )
                db_nslcmop_update["detailed-status"] = error_detail
                nslcmop_operation_state = "FAILED"
                ns_state = "BROKEN"
            else:
                error_detail = None
                error_description_nsr = error_description_nslcmop = None
                ns_state = "NOT_INSTANTIATED"
                db_nsr_update["operational-status"] = "terminated"
                db_nsr_update["detailed-status"] = "Done"
                db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
                db_nslcmop_update["detailed-status"] = "Done"
                nslcmop_operation_state = "COMPLETED"

            if db_nsr:
                self._write_ns_status(
                    nsr_id=nsr_id,
                    ns_state=ns_state,
                    current_operation="IDLE",
                    current_operation_id=None,
                    error_description=error_description_nsr,
                    error_detail=error_detail,
                    other_update=db_nsr_update,
                )
                self._write_op_status(
                    op_id=nslcmop_id,
                    stage="",
                    error_message=error_description_nslcmop,
                    operation_state=nslcmop_operation_state,
                    other_update=db_nslcmop_update,
                )
                if ns_state == "NOT_INSTANTIATED":
                    try:
                        # propagate the final state to all VNFRs of this NS
                        self.db.set_list(
                            "vnfrs",
                            {"nsr-id-ref": nsr_id},
                            {"_admin.nsState": "NOT_INSTANTIATED"},
                        )
                    except DbException as e:
                        self.logger.warn(
                            logging_text
                            + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
                                nsr_id, e
                            )
                        )
            if operation_params:
                autoremove = operation_params.get("autoremove", False)
            if nslcmop_operation_state:
                try:
                    # notify NBI so that it can autoremove the NS record if requested
                    await self.msg.aiowrite(
                        "ns",
                        "terminated",
                        {
                            "nsr_id": nsr_id,
                            "nslcmop_id": nslcmop_id,
                            "operationState": nslcmop_operation_state,
                            "autoremove": autoremove,
                        },
                        loop=self.loop,
                    )
                except Exception as e:
                    self.logger.error(
                        logging_text + "kafka_write notification Exception {}".format(e)
                    )

            self.logger.debug(logging_text + "Exit")
            self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
4599
    async def _wait_for_tasks(
        self, logging_text, created_tasks_info, timeout, stage, nslcmop_id, nsr_id=None
    ):
        """
        Wait for a group of asyncio tasks, collecting errors and updating the operation
        status as each task finishes.
        :param logging_text: prefix prepended to every log message
        :param created_tasks_info: dict mapping each task to a human-readable description
        :param timeout: overall budget in seconds shared by all tasks
        :param stage: 3-item status list; index 1 is updated with "done/total" progress
        :param nslcmop_id: operation whose status is refreshed after each batch of results
        :param nsr_id: when provided, error summaries are also written to the nsr record
        :return: list of detailed error strings (empty when every task succeeded)
        """
        time_start = time()
        error_detail_list = []
        error_list = []
        pending_tasks = list(created_tasks_info.keys())
        num_tasks = len(pending_tasks)
        num_done = 0
        stage[1] = "{}/{}.".format(num_done, num_tasks)
        self._write_op_status(nslcmop_id, stage)
        while pending_tasks:
            new_error = None
            # remaining budget; shrinks on every iteration
            _timeout = timeout + time_start - time()
            done, pending_tasks = await asyncio.wait(
                pending_tasks, timeout=_timeout, return_when=asyncio.FIRST_COMPLETED
            )
            num_done += len(done)
            if not done:  # Timeout
                # flag every still-pending task as timed out and stop waiting
                for task in pending_tasks:
                    new_error = created_tasks_info[task] + ": Timeout"
                    error_detail_list.append(new_error)
                    error_list.append(new_error)
                break
            for task in done:
                if task.cancelled():
                    exc = "Cancelled"
                else:
                    exc = task.exception()
                if exc:
                    if isinstance(exc, asyncio.TimeoutError):
                        exc = "Timeout"
                    new_error = created_tasks_info[task] + ": {}".format(exc)
                    error_list.append(created_tasks_info[task])
                    error_detail_list.append(new_error)
                    # known/controlled exception types get a one-line log; anything
                    # else gets the full traceback for debugging
                    if isinstance(
                        exc,
                        (
                            str,
                            DbException,
                            N2VCException,
                            ROclient.ROClientException,
                            LcmException,
                            K8sException,
                            NgRoException,
                        ),
                    ):
                        self.logger.error(logging_text + new_error)
                    else:
                        exc_traceback = "".join(
                            traceback.format_exception(None, exc, exc.__traceback__)
                        )
                        self.logger.error(
                            logging_text
                            + created_tasks_info[task]
                            + " "
                            + exc_traceback
                        )
                else:
                    self.logger.debug(
                        logging_text + created_tasks_info[task] + ": Done"
                    )
            stage[1] = "{}/{}.".format(num_done, num_tasks)
            if new_error:
                stage[1] += " Errors: " + ". ".join(error_detail_list) + "."
                if nsr_id:  # update also nsr
                    self.update_db_2(
                        "nsrs",
                        nsr_id,
                        {
                            "errorDescription": "Error at: " + ", ".join(error_list),
                            "errorDetail": ". ".join(error_detail_list),
                        },
                    )
            self._write_op_status(nslcmop_id, stage)
        return error_detail_list
4676
4677 @staticmethod
4678 def _map_primitive_params(primitive_desc, params, instantiation_params):
4679 """
4680 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
4681 The default-value is used. If it is between < > it look for a value at instantiation_params
4682 :param primitive_desc: portion of VNFD/NSD that describes primitive
4683 :param params: Params provided by user
4684 :param instantiation_params: Instantiation params provided by user
4685 :return: a dictionary with the calculated params
4686 """
4687 calculated_params = {}
4688 for parameter in primitive_desc.get("parameter", ()):
4689 param_name = parameter["name"]
4690 if param_name in params:
4691 calculated_params[param_name] = params[param_name]
4692 elif "default-value" in parameter or "value" in parameter:
4693 if "value" in parameter:
4694 calculated_params[param_name] = parameter["value"]
4695 else:
4696 calculated_params[param_name] = parameter["default-value"]
4697 if (
4698 isinstance(calculated_params[param_name], str)
4699 and calculated_params[param_name].startswith("<")
4700 and calculated_params[param_name].endswith(">")
4701 ):
4702 if calculated_params[param_name][1:-1] in instantiation_params:
4703 calculated_params[param_name] = instantiation_params[
4704 calculated_params[param_name][1:-1]
4705 ]
4706 else:
4707 raise LcmException(
4708 "Parameter {} needed to execute primitive {} not provided".format(
4709 calculated_params[param_name], primitive_desc["name"]
4710 )
4711 )
4712 else:
4713 raise LcmException(
4714 "Parameter {} needed to execute primitive {} not provided".format(
4715 param_name, primitive_desc["name"]
4716 )
4717 )
4718
4719 if isinstance(calculated_params[param_name], (dict, list, tuple)):
4720 calculated_params[param_name] = yaml.safe_dump(
4721 calculated_params[param_name], default_flow_style=True, width=256
4722 )
4723 elif isinstance(calculated_params[param_name], str) and calculated_params[
4724 param_name
4725 ].startswith("!!yaml "):
4726 calculated_params[param_name] = calculated_params[param_name][7:]
4727 if parameter.get("data-type") == "INTEGER":
4728 try:
4729 calculated_params[param_name] = int(calculated_params[param_name])
4730 except ValueError: # error converting string to int
4731 raise LcmException(
4732 "Parameter {} of primitive {} must be integer".format(
4733 param_name, primitive_desc["name"]
4734 )
4735 )
4736 elif parameter.get("data-type") == "BOOLEAN":
4737 calculated_params[param_name] = not (
4738 (str(calculated_params[param_name])).lower() == "false"
4739 )
4740
4741 # add always ns_config_info if primitive name is config
4742 if primitive_desc["name"] == "config":
4743 if "ns_config_info" in instantiation_params:
4744 calculated_params["ns_config_info"] = instantiation_params[
4745 "ns_config_info"
4746 ]
4747 return calculated_params
4748
4749 def _look_for_deployed_vca(
4750 self,
4751 deployed_vca,
4752 member_vnf_index,
4753 vdu_id,
4754 vdu_count_index,
4755 kdu_name=None,
4756 ee_descriptor_id=None,
4757 ):
4758 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
4759 for vca in deployed_vca:
4760 if not vca:
4761 continue
4762 if member_vnf_index != vca["member-vnf-index"] or vdu_id != vca["vdu_id"]:
4763 continue
4764 if (
4765 vdu_count_index is not None
4766 and vdu_count_index != vca["vdu_count_index"]
4767 ):
4768 continue
4769 if kdu_name and kdu_name != vca["kdu_name"]:
4770 continue
4771 if ee_descriptor_id and ee_descriptor_id != vca["ee_descriptor_id"]:
4772 continue
4773 break
4774 else:
4775 # vca_deployed not found
4776 raise LcmException(
4777 "charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
4778 " is not deployed".format(
4779 member_vnf_index,
4780 vdu_id,
4781 vdu_count_index,
4782 kdu_name,
4783 ee_descriptor_id,
4784 )
4785 )
4786 # get ee_id
4787 ee_id = vca.get("ee_id")
4788 vca_type = vca.get(
4789 "type", "lxc_proxy_charm"
4790 ) # default value for backward compatibility - proxy charm
4791 if not ee_id:
4792 raise LcmException(
4793 "charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
4794 "execution environment".format(
4795 member_vnf_index, vdu_id, kdu_name, vdu_count_index
4796 )
4797 )
4798 return ee_id, vca_type
4799
    async def _ns_execute_primitive(
        self,
        ee_id,
        primitive,
        primitive_params,
        retries=0,
        retries_interval=30,
        timeout=None,
        vca_type=None,
        db_dict=None,
        vca_id: str = None,
    ) -> (str, str):
        """
        Execute a primitive on an execution environment, retrying on failure.
        :param ee_id: execution environment id
        :param primitive: primitive name; "config" wraps the params under a "params" key
        :param primitive_params: dict with the parameters for the primitive
        :param retries: number of additional attempts after a failed first one
        :param retries_interval: seconds to sleep between attempts
        :param timeout: overall timeout per attempt; defaults to self.timeout_primitive
        :param vca_type: VCA connector selector in self.vca_map; defaults to "lxc_proxy_charm"
        :param db_dict: database info where to write the execution progress (passed to connector)
        :param vca_id: VCA (juju controller) identifier; None selects the default one
        :return: tuple (status, detail); status is "COMPLETED", "FAILED" on exhausted
            retries, or "FAIL" on unexpected errors
        """
        try:
            if primitive == "config":
                primitive_params = {"params": primitive_params}

            vca_type = vca_type or "lxc_proxy_charm"

            while retries >= 0:
                try:
                    output = await asyncio.wait_for(
                        self.vca_map[vca_type].exec_primitive(
                            ee_id=ee_id,
                            primitive_name=primitive,
                            params_dict=primitive_params,
                            progress_timeout=self.timeout_progress_primitive,
                            total_timeout=self.timeout_primitive,
                            db_dict=db_dict,
                            vca_id=vca_id,
                            vca_type=vca_type,
                        ),
                        timeout=timeout or self.timeout_primitive,
                    )
                    # execution was OK
                    break
                except asyncio.CancelledError:
                    raise
                except Exception as e:  # asyncio.TimeoutError
                    if isinstance(e, asyncio.TimeoutError):
                        e = "Timeout"
                    retries -= 1
                    if retries >= 0:
                        self.logger.debug(
                            "Error executing action {} on {} -> {}".format(
                                primitive, ee_id, e
                            )
                        )
                        # wait and retry
                        await asyncio.sleep(retries_interval, loop=self.loop)
                    else:
                        return "FAILED", str(e)

            return "COMPLETED", output

        except (LcmException, asyncio.CancelledError):
            raise
        except Exception as e:
            # NOTE(review): "FAIL" (not "FAILED") is returned here; callers only test
            # membership in the success set, so both are treated as failures
            return "FAIL", "Error executing action {}: {}".format(primitive, e)
4858
    async def vca_status_refresh(self, nsr_id, nslcmop_id):
        """
        Updating the vca_status with latest juju information in nsrs record
        :param: nsr_id: Id of the nsr
        :param: nslcmop_id: Id of the nslcmop
        :return: None
        """

        self.logger.debug("Task ns={} action={} Enter".format(nsr_id, nslcmop_id))
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        vca_id = self.get_vca_id({}, db_nsr)
        # NOTE(review): when a K8s deployment list is present only the KDU status is
        # refreshed and the VCA branch is skipped — confirm this either-or is intended
        if db_nsr["_admin"]["deployed"]["K8s"]:
            for _, k8s in enumerate(db_nsr["_admin"]["deployed"]["K8s"]):
                cluster_uuid, kdu_instance, cluster_type = (
                    k8s["k8scluster-uuid"],
                    k8s["kdu-instance"],
                    k8s["k8scluster-type"],
                )
                # delegate the status collection/db write to the k8s update callback
                await self._on_update_k8s_db(
                    cluster_uuid=cluster_uuid,
                    kdu_instance=kdu_instance,
                    filter={"_id": nsr_id},
                    vca_id=vca_id,
                    cluster_type=cluster_type,
                )
        else:
            for vca_index, _ in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
                table, filter = "nsrs", {"_id": nsr_id}
                path = "_admin.deployed.VCA.{}.".format(vca_index)
                # refresh each VCA entry through the n2vc update callback
                await self._on_update_n2vc_db(table, filter, path, {})

        self.logger.debug("Task ns={} action={} Exit".format(nsr_id, nslcmop_id))
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_vca_status_refresh")
4892
4893 async def action(self, nsr_id, nslcmop_id):
4894 # Try to lock HA task here
4895 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
4896 if not task_is_locked_by_me:
4897 return
4898
4899 logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
4900 self.logger.debug(logging_text + "Enter")
4901 # get all needed from database
4902 db_nsr = None
4903 db_nslcmop = None
4904 db_nsr_update = {}
4905 db_nslcmop_update = {}
4906 nslcmop_operation_state = None
4907 error_description_nslcmop = None
4908 exc = None
4909 try:
4910 # wait for any previous tasks in process
4911 step = "Waiting for previous operations to terminate"
4912 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
4913
4914 self._write_ns_status(
4915 nsr_id=nsr_id,
4916 ns_state=None,
4917 current_operation="RUNNING ACTION",
4918 current_operation_id=nslcmop_id,
4919 )
4920
4921 step = "Getting information from database"
4922 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
4923 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
4924 if db_nslcmop["operationParams"].get("primitive_params"):
4925 db_nslcmop["operationParams"]["primitive_params"] = json.loads(
4926 db_nslcmop["operationParams"]["primitive_params"]
4927 )
4928
4929 nsr_deployed = db_nsr["_admin"].get("deployed")
4930 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
4931 vdu_id = db_nslcmop["operationParams"].get("vdu_id")
4932 kdu_name = db_nslcmop["operationParams"].get("kdu_name")
4933 vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
4934 primitive = db_nslcmop["operationParams"]["primitive"]
4935 primitive_params = db_nslcmop["operationParams"]["primitive_params"]
4936 timeout_ns_action = db_nslcmop["operationParams"].get(
4937 "timeout_ns_action", self.timeout_primitive
4938 )
4939
4940 if vnf_index:
4941 step = "Getting vnfr from database"
4942 db_vnfr = self.db.get_one(
4943 "vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}
4944 )
4945 if db_vnfr.get("kdur"):
4946 kdur_list = []
4947 for kdur in db_vnfr["kdur"]:
4948 if kdur.get("additionalParams"):
4949 kdur["additionalParams"] = json.loads(
4950 kdur["additionalParams"]
4951 )
4952 kdur_list.append(kdur)
4953 db_vnfr["kdur"] = kdur_list
4954 step = "Getting vnfd from database"
4955 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
4956 else:
4957 step = "Getting nsd from database"
4958 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
4959
4960 vca_id = self.get_vca_id(db_vnfr, db_nsr)
4961 # for backward compatibility
4962 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
4963 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
4964 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
4965 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4966
4967 # look for primitive
4968 config_primitive_desc = descriptor_configuration = None
4969 if vdu_id:
4970 descriptor_configuration = get_configuration(db_vnfd, vdu_id)
4971 elif kdu_name:
4972 descriptor_configuration = get_configuration(db_vnfd, kdu_name)
4973 elif vnf_index:
4974 descriptor_configuration = get_configuration(db_vnfd, db_vnfd["id"])
4975 else:
4976 descriptor_configuration = db_nsd.get("ns-configuration")
4977
4978 if descriptor_configuration and descriptor_configuration.get(
4979 "config-primitive"
4980 ):
4981 for config_primitive in descriptor_configuration["config-primitive"]:
4982 if config_primitive["name"] == primitive:
4983 config_primitive_desc = config_primitive
4984 break
4985
4986 if not config_primitive_desc:
4987 if not (kdu_name and primitive in ("upgrade", "rollback", "status")):
4988 raise LcmException(
4989 "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
4990 primitive
4991 )
4992 )
4993 primitive_name = primitive
4994 ee_descriptor_id = None
4995 else:
4996 primitive_name = config_primitive_desc.get(
4997 "execution-environment-primitive", primitive
4998 )
4999 ee_descriptor_id = config_primitive_desc.get(
5000 "execution-environment-ref"
5001 )
5002
5003 if vnf_index:
5004 if vdu_id:
5005 vdur = next(
5006 (x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None
5007 )
5008 desc_params = parse_yaml_strings(vdur.get("additionalParams"))
5009 elif kdu_name:
5010 kdur = next(
5011 (x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None
5012 )
5013 desc_params = parse_yaml_strings(kdur.get("additionalParams"))
5014 else:
5015 desc_params = parse_yaml_strings(
5016 db_vnfr.get("additionalParamsForVnf")
5017 )
5018 else:
5019 desc_params = parse_yaml_strings(db_nsr.get("additionalParamsForNs"))
5020 if kdu_name and get_configuration(db_vnfd, kdu_name):
5021 kdu_configuration = get_configuration(db_vnfd, kdu_name)
5022 actions = set()
5023 for primitive in kdu_configuration.get("initial-config-primitive", []):
5024 actions.add(primitive["name"])
5025 for primitive in kdu_configuration.get("config-primitive", []):
5026 actions.add(primitive["name"])
5027 kdu_action = True if primitive_name in actions else False
5028
5029 # TODO check if ns is in a proper status
5030 if kdu_name and (
5031 primitive_name in ("upgrade", "rollback", "status") or kdu_action
5032 ):
5033 # kdur and desc_params already set from before
5034 if primitive_params:
5035 desc_params.update(primitive_params)
5036 # TODO Check if we will need something at vnf level
5037 for index, kdu in enumerate(get_iterable(nsr_deployed, "K8s")):
5038 if (
5039 kdu_name == kdu["kdu-name"]
5040 and kdu["member-vnf-index"] == vnf_index
5041 ):
5042 break
5043 else:
5044 raise LcmException(
5045 "KDU '{}' for vnf '{}' not deployed".format(kdu_name, vnf_index)
5046 )
5047
5048 if kdu.get("k8scluster-type") not in self.k8scluster_map:
5049 msg = "unknown k8scluster-type '{}'".format(
5050 kdu.get("k8scluster-type")
5051 )
5052 raise LcmException(msg)
5053
5054 db_dict = {
5055 "collection": "nsrs",
5056 "filter": {"_id": nsr_id},
5057 "path": "_admin.deployed.K8s.{}".format(index),
5058 }
5059 self.logger.debug(
5060 logging_text
5061 + "Exec k8s {} on {}.{}".format(primitive_name, vnf_index, kdu_name)
5062 )
5063 step = "Executing kdu {}".format(primitive_name)
5064 if primitive_name == "upgrade":
5065 if desc_params.get("kdu_model"):
5066 kdu_model = desc_params.get("kdu_model")
5067 del desc_params["kdu_model"]
5068 else:
5069 kdu_model = kdu.get("kdu-model")
5070 parts = kdu_model.split(sep=":")
5071 if len(parts) == 2:
5072 kdu_model = parts[0]
5073
5074 detailed_status = await asyncio.wait_for(
5075 self.k8scluster_map[kdu["k8scluster-type"]].upgrade(
5076 cluster_uuid=kdu.get("k8scluster-uuid"),
5077 kdu_instance=kdu.get("kdu-instance"),
5078 atomic=True,
5079 kdu_model=kdu_model,
5080 params=desc_params,
5081 db_dict=db_dict,
5082 timeout=timeout_ns_action,
5083 ),
5084 timeout=timeout_ns_action + 10,
5085 )
5086 self.logger.debug(
5087 logging_text + " Upgrade of kdu {} done".format(detailed_status)
5088 )
5089 elif primitive_name == "rollback":
5090 detailed_status = await asyncio.wait_for(
5091 self.k8scluster_map[kdu["k8scluster-type"]].rollback(
5092 cluster_uuid=kdu.get("k8scluster-uuid"),
5093 kdu_instance=kdu.get("kdu-instance"),
5094 db_dict=db_dict,
5095 ),
5096 timeout=timeout_ns_action,
5097 )
5098 elif primitive_name == "status":
5099 detailed_status = await asyncio.wait_for(
5100 self.k8scluster_map[kdu["k8scluster-type"]].status_kdu(
5101 cluster_uuid=kdu.get("k8scluster-uuid"),
5102 kdu_instance=kdu.get("kdu-instance"),
5103 vca_id=vca_id,
5104 ),
5105 timeout=timeout_ns_action,
5106 )
5107 else:
5108 kdu_instance = kdu.get("kdu-instance") or "{}-{}".format(
5109 kdu["kdu-name"], nsr_id
5110 )
5111 params = self._map_primitive_params(
5112 config_primitive_desc, primitive_params, desc_params
5113 )
5114
5115 detailed_status = await asyncio.wait_for(
5116 self.k8scluster_map[kdu["k8scluster-type"]].exec_primitive(
5117 cluster_uuid=kdu.get("k8scluster-uuid"),
5118 kdu_instance=kdu_instance,
5119 primitive_name=primitive_name,
5120 params=params,
5121 db_dict=db_dict,
5122 timeout=timeout_ns_action,
5123 vca_id=vca_id,
5124 ),
5125 timeout=timeout_ns_action,
5126 )
5127
5128 if detailed_status:
5129 nslcmop_operation_state = "COMPLETED"
5130 else:
5131 detailed_status = ""
5132 nslcmop_operation_state = "FAILED"
5133 else:
5134 ee_id, vca_type = self._look_for_deployed_vca(
5135 nsr_deployed["VCA"],
5136 member_vnf_index=vnf_index,
5137 vdu_id=vdu_id,
5138 vdu_count_index=vdu_count_index,
5139 ee_descriptor_id=ee_descriptor_id,
5140 )
5141 for vca_index, vca_deployed in enumerate(
5142 db_nsr["_admin"]["deployed"]["VCA"]
5143 ):
5144 if vca_deployed.get("member-vnf-index") == vnf_index:
5145 db_dict = {
5146 "collection": "nsrs",
5147 "filter": {"_id": nsr_id},
5148 "path": "_admin.deployed.VCA.{}.".format(vca_index),
5149 }
5150 break
5151 (
5152 nslcmop_operation_state,
5153 detailed_status,
5154 ) = await self._ns_execute_primitive(
5155 ee_id,
5156 primitive=primitive_name,
5157 primitive_params=self._map_primitive_params(
5158 config_primitive_desc, primitive_params, desc_params
5159 ),
5160 timeout=timeout_ns_action,
5161 vca_type=vca_type,
5162 db_dict=db_dict,
5163 vca_id=vca_id,
5164 )
5165
5166 db_nslcmop_update["detailed-status"] = detailed_status
5167 error_description_nslcmop = (
5168 detailed_status if nslcmop_operation_state == "FAILED" else ""
5169 )
5170 self.logger.debug(
5171 logging_text
5172 + " task Done with result {} {}".format(
5173 nslcmop_operation_state, detailed_status
5174 )
5175 )
5176 return # database update is called inside finally
5177
5178 except (DbException, LcmException, N2VCException, K8sException) as e:
5179 self.logger.error(logging_text + "Exit Exception {}".format(e))
5180 exc = e
5181 except asyncio.CancelledError:
5182 self.logger.error(
5183 logging_text + "Cancelled Exception while '{}'".format(step)
5184 )
5185 exc = "Operation was cancelled"
5186 except asyncio.TimeoutError:
5187 self.logger.error(logging_text + "Timeout while '{}'".format(step))
5188 exc = "Timeout"
5189 except Exception as e:
5190 exc = traceback.format_exc()
5191 self.logger.critical(
5192 logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
5193 exc_info=True,
5194 )
5195 finally:
5196 if exc:
5197 db_nslcmop_update[
5198 "detailed-status"
5199 ] = (
5200 detailed_status
5201 ) = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
5202 nslcmop_operation_state = "FAILED"
5203 if db_nsr:
5204 self._write_ns_status(
5205 nsr_id=nsr_id,
5206 ns_state=db_nsr[
5207 "nsState"
5208 ], # TODO check if degraded. For the moment use previous status
5209 current_operation="IDLE",
5210 current_operation_id=None,
5211 # error_description=error_description_nsr,
5212 # error_detail=error_detail,
5213 other_update=db_nsr_update,
5214 )
5215
5216 self._write_op_status(
5217 op_id=nslcmop_id,
5218 stage="",
5219 error_message=error_description_nslcmop,
5220 operation_state=nslcmop_operation_state,
5221 other_update=db_nslcmop_update,
5222 )
5223
5224 if nslcmop_operation_state:
5225 try:
5226 await self.msg.aiowrite(
5227 "ns",
5228 "actioned",
5229 {
5230 "nsr_id": nsr_id,
5231 "nslcmop_id": nslcmop_id,
5232 "operationState": nslcmop_operation_state,
5233 },
5234 loop=self.loop,
5235 )
5236 except Exception as e:
5237 self.logger.error(
5238 logging_text + "kafka_write notification Exception {}".format(e)
5239 )
5240 self.logger.debug(logging_text + "Exit")
5241 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
5242 return nslcmop_operation_state, detailed_status
5243
5244 async def scale(self, nsr_id, nslcmop_id):
5245 # Try to lock HA task here
5246 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
5247 if not task_is_locked_by_me:
5248 return
5249
5250 logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
5251 stage = ["", "", ""]
5252 tasks_dict_info = {}
5253 # ^ stage, step, VIM progress
5254 self.logger.debug(logging_text + "Enter")
5255 # get all needed from database
5256 db_nsr = None
5257 db_nslcmop_update = {}
5258 db_nsr_update = {}
5259 exc = None
5260 # in case of error, indicates what part of scale was failed to put nsr at error status
5261 scale_process = None
5262 old_operational_status = ""
5263 old_config_status = ""
5264 nsi_id = None
5265 try:
5266 # wait for any previous tasks in process
5267 step = "Waiting for previous operations to terminate"
5268 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
5269 self._write_ns_status(
5270 nsr_id=nsr_id,
5271 ns_state=None,
5272 current_operation="SCALING",
5273 current_operation_id=nslcmop_id,
5274 )
5275
5276 step = "Getting nslcmop from database"
5277 self.logger.debug(
5278 step + " after having waited for previous tasks to be completed"
5279 )
5280 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
5281
5282 step = "Getting nsr from database"
5283 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
5284 old_operational_status = db_nsr["operational-status"]
5285 old_config_status = db_nsr["config-status"]
5286
5287 step = "Parsing scaling parameters"
5288 db_nsr_update["operational-status"] = "scaling"
5289 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5290 nsr_deployed = db_nsr["_admin"].get("deployed")
5291
5292 vnf_index = db_nslcmop["operationParams"]["scaleVnfData"][
5293 "scaleByStepData"
5294 ]["member-vnf-index"]
5295 scaling_group = db_nslcmop["operationParams"]["scaleVnfData"][
5296 "scaleByStepData"
5297 ]["scaling-group-descriptor"]
5298 scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
5299 # for backward compatibility
5300 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
5301 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
5302 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
5303 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5304
5305 step = "Getting vnfr from database"
5306 db_vnfr = self.db.get_one(
5307 "vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}
5308 )
5309
5310 vca_id = self.get_vca_id(db_vnfr, db_nsr)
5311
5312 step = "Getting vnfd from database"
5313 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
5314
5315 base_folder = db_vnfd["_admin"]["storage"]
5316
5317 step = "Getting scaling-group-descriptor"
5318 scaling_descriptor = find_in_list(
5319 get_scaling_aspect(db_vnfd),
5320 lambda scale_desc: scale_desc["name"] == scaling_group,
5321 )
5322 if not scaling_descriptor:
5323 raise LcmException(
5324 "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
5325 "at vnfd:scaling-group-descriptor".format(scaling_group)
5326 )
5327
5328 step = "Sending scale order to VIM"
5329 # TODO check if ns is in a proper status
5330 nb_scale_op = 0
5331 if not db_nsr["_admin"].get("scaling-group"):
5332 self.update_db_2(
5333 "nsrs",
5334 nsr_id,
5335 {
5336 "_admin.scaling-group": [
5337 {"name": scaling_group, "nb-scale-op": 0}
5338 ]
5339 },
5340 )
5341 admin_scale_index = 0
5342 else:
5343 for admin_scale_index, admin_scale_info in enumerate(
5344 db_nsr["_admin"]["scaling-group"]
5345 ):
5346 if admin_scale_info["name"] == scaling_group:
5347 nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
5348 break
5349 else: # not found, set index one plus last element and add new entry with the name
5350 admin_scale_index += 1
5351 db_nsr_update[
5352 "_admin.scaling-group.{}.name".format(admin_scale_index)
5353 ] = scaling_group
5354
5355 vca_scaling_info = []
5356 scaling_info = {"scaling_group_name": scaling_group, "vdu": [], "kdu": []}
5357 if scaling_type == "SCALE_OUT":
5358 if "aspect-delta-details" not in scaling_descriptor:
5359 raise LcmException(
5360 "Aspect delta details not fount in scaling descriptor {}".format(
5361 scaling_descriptor["name"]
5362 )
5363 )
5364 # count if max-instance-count is reached
5365 deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
5366
5367 scaling_info["scaling_direction"] = "OUT"
5368 scaling_info["vdu-create"] = {}
5369 scaling_info["kdu-create"] = {}
5370 for delta in deltas:
5371 for vdu_delta in delta.get("vdu-delta", {}):
5372 vdud = get_vdu(db_vnfd, vdu_delta["id"])
5373 # vdu_index also provides the number of instance of the targeted vdu
5374 vdu_count = vdu_index = get_vdur_index(db_vnfr, vdu_delta)
5375 cloud_init_text = self._get_vdu_cloud_init_content(
5376 vdud, db_vnfd
5377 )
5378 if cloud_init_text:
5379 additional_params = (
5380 self._get_vdu_additional_params(db_vnfr, vdud["id"])
5381 or {}
5382 )
5383 cloud_init_list = []
5384
5385 vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
5386 max_instance_count = 10
5387 if vdu_profile and "max-number-of-instances" in vdu_profile:
5388 max_instance_count = vdu_profile.get(
5389 "max-number-of-instances", 10
5390 )
5391
5392 default_instance_num = get_number_of_instances(
5393 db_vnfd, vdud["id"]
5394 )
5395 instances_number = vdu_delta.get("number-of-instances", 1)
5396 nb_scale_op += instances_number
5397
5398 new_instance_count = nb_scale_op + default_instance_num
5399 # Control if new count is over max and vdu count is less than max.
5400 # Then assign new instance count
5401 if new_instance_count > max_instance_count > vdu_count:
5402 instances_number = new_instance_count - max_instance_count
5403 else:
5404 instances_number = instances_number
5405
5406 if new_instance_count > max_instance_count:
5407 raise LcmException(
5408 "reached the limit of {} (max-instance-count) "
5409 "scaling-out operations for the "
5410 "scaling-group-descriptor '{}'".format(
5411 nb_scale_op, scaling_group
5412 )
5413 )
5414 for x in range(vdu_delta.get("number-of-instances", 1)):
5415 if cloud_init_text:
5416 # TODO Information of its own ip is not available because db_vnfr is not updated.
5417 additional_params["OSM"] = get_osm_params(
5418 db_vnfr, vdu_delta["id"], vdu_index + x
5419 )
5420 cloud_init_list.append(
5421 self._parse_cloud_init(
5422 cloud_init_text,
5423 additional_params,
5424 db_vnfd["id"],
5425 vdud["id"],
5426 )
5427 )
5428 vca_scaling_info.append(
5429 {
5430 "osm_vdu_id": vdu_delta["id"],
5431 "member-vnf-index": vnf_index,
5432 "type": "create",
5433 "vdu_index": vdu_index + x,
5434 }
5435 )
5436 scaling_info["vdu-create"][vdu_delta["id"]] = instances_number
5437 for kdu_delta in delta.get("kdu-resource-delta", {}):
5438 kdu_profile = get_kdu_resource_profile(db_vnfd, kdu_delta["id"])
5439 kdu_name = kdu_profile["kdu-name"]
5440 resource_name = kdu_profile["resource-name"]
5441
5442 # Might have different kdus in the same delta
5443 # Should have list for each kdu
5444 if not scaling_info["kdu-create"].get(kdu_name, None):
5445 scaling_info["kdu-create"][kdu_name] = []
5446
5447 kdur = get_kdur(db_vnfr, kdu_name)
5448 if kdur.get("helm-chart"):
5449 k8s_cluster_type = "helm-chart-v3"
5450 self.logger.debug("kdur: {}".format(kdur))
5451 if (
5452 kdur.get("helm-version")
5453 and kdur.get("helm-version") == "v2"
5454 ):
5455 k8s_cluster_type = "helm-chart"
5456 raise NotImplementedError
5457 elif kdur.get("juju-bundle"):
5458 k8s_cluster_type = "juju-bundle"
5459 else:
5460 raise LcmException(
5461 "kdu type for kdu='{}.{}' is neither helm-chart nor "
5462 "juju-bundle. Maybe an old NBI version is running".format(
5463 db_vnfr["member-vnf-index-ref"], kdu_name
5464 )
5465 )
5466
5467 max_instance_count = 10
5468 if kdu_profile and "max-number-of-instances" in kdu_profile:
5469 max_instance_count = kdu_profile.get(
5470 "max-number-of-instances", 10
5471 )
5472
5473 nb_scale_op += kdu_delta.get("number-of-instances", 1)
5474 deployed_kdu, _ = get_deployed_kdu(
5475 nsr_deployed, kdu_name, vnf_index
5476 )
5477 if deployed_kdu is None:
5478 raise LcmException(
5479 "KDU '{}' for vnf '{}' not deployed".format(
5480 kdu_name, vnf_index
5481 )
5482 )
5483 kdu_instance = deployed_kdu.get("kdu-instance")
5484 instance_num = await self.k8scluster_map[
5485 k8s_cluster_type
5486 ].get_scale_count(resource_name, kdu_instance, vca_id=vca_id)
5487 kdu_replica_count = instance_num + kdu_delta.get(
5488 "number-of-instances", 1
5489 )
5490
5491 # Control if new count is over max and instance_num is less than max.
5492 # Then assign max instance number to kdu replica count
5493 if kdu_replica_count > max_instance_count > instance_num:
5494 kdu_replica_count = max_instance_count
5495 if kdu_replica_count > max_instance_count:
5496 raise LcmException(
5497 "reached the limit of {} (max-instance-count) "
5498 "scaling-out operations for the "
5499 "scaling-group-descriptor '{}'".format(
5500 instance_num, scaling_group
5501 )
5502 )
5503
5504 for x in range(kdu_delta.get("number-of-instances", 1)):
5505 vca_scaling_info.append(
5506 {
5507 "osm_kdu_id": kdu_name,
5508 "member-vnf-index": vnf_index,
5509 "type": "create",
5510 "kdu_index": instance_num + x - 1,
5511 }
5512 )
5513 scaling_info["kdu-create"][kdu_name].append(
5514 {
5515 "member-vnf-index": vnf_index,
5516 "type": "create",
5517 "k8s-cluster-type": k8s_cluster_type,
5518 "resource-name": resource_name,
5519 "scale": kdu_replica_count,
5520 }
5521 )
5522 elif scaling_type == "SCALE_IN":
5523 deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
5524
5525 scaling_info["scaling_direction"] = "IN"
5526 scaling_info["vdu-delete"] = {}
5527 scaling_info["kdu-delete"] = {}
5528
5529 for delta in deltas:
5530 for vdu_delta in delta.get("vdu-delta", {}):
5531 vdu_count = vdu_index = get_vdur_index(db_vnfr, vdu_delta)
5532 min_instance_count = 0
5533 vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
5534 if vdu_profile and "min-number-of-instances" in vdu_profile:
5535 min_instance_count = vdu_profile["min-number-of-instances"]
5536
5537 default_instance_num = get_number_of_instances(
5538 db_vnfd, vdu_delta["id"]
5539 )
5540 instance_num = vdu_delta.get("number-of-instances", 1)
5541 nb_scale_op -= instance_num
5542
5543 new_instance_count = nb_scale_op + default_instance_num
5544
5545 if new_instance_count < min_instance_count < vdu_count:
5546 instances_number = min_instance_count - new_instance_count
5547 else:
5548 instances_number = instance_num
5549
5550 if new_instance_count < min_instance_count:
5551 raise LcmException(
5552 "reached the limit of {} (min-instance-count) scaling-in operations for the "
5553 "scaling-group-descriptor '{}'".format(
5554 nb_scale_op, scaling_group
5555 )
5556 )
5557 for x in range(vdu_delta.get("number-of-instances", 1)):
5558 vca_scaling_info.append(
5559 {
5560 "osm_vdu_id": vdu_delta["id"],
5561 "member-vnf-index": vnf_index,
5562 "type": "delete",
5563 "vdu_index": vdu_index - 1 - x,
5564 }
5565 )
5566 scaling_info["vdu-delete"][vdu_delta["id"]] = instances_number
5567 for kdu_delta in delta.get("kdu-resource-delta", {}):
5568 kdu_profile = get_kdu_resource_profile(db_vnfd, kdu_delta["id"])
5569 kdu_name = kdu_profile["kdu-name"]
5570 resource_name = kdu_profile["resource-name"]
5571
5572 if not scaling_info["kdu-delete"].get(kdu_name, None):
5573 scaling_info["kdu-delete"][kdu_name] = []
5574
5575 kdur = get_kdur(db_vnfr, kdu_name)
5576 if kdur.get("helm-chart"):
5577 k8s_cluster_type = "helm-chart-v3"
5578 self.logger.debug("kdur: {}".format(kdur))
5579 if (
5580 kdur.get("helm-version")
5581 and kdur.get("helm-version") == "v2"
5582 ):
5583 k8s_cluster_type = "helm-chart"
5584 raise NotImplementedError
5585 elif kdur.get("juju-bundle"):
5586 k8s_cluster_type = "juju-bundle"
5587 else:
5588 raise LcmException(
5589 "kdu type for kdu='{}.{}' is neither helm-chart nor "
5590 "juju-bundle. Maybe an old NBI version is running".format(
5591 db_vnfr["member-vnf-index-ref"], kdur["kdu-name"]
5592 )
5593 )
5594
5595 min_instance_count = 0
5596 if kdu_profile and "min-number-of-instances" in kdu_profile:
5597 min_instance_count = kdu_profile["min-number-of-instances"]
5598
5599 nb_scale_op -= kdu_delta.get("number-of-instances", 1)
5600 deployed_kdu, _ = get_deployed_kdu(
5601 nsr_deployed, kdu_name, vnf_index
5602 )
5603 if deployed_kdu is None:
5604 raise LcmException(
5605 "KDU '{}' for vnf '{}' not deployed".format(
5606 kdu_name, vnf_index
5607 )
5608 )
5609 kdu_instance = deployed_kdu.get("kdu-instance")
5610 instance_num = await self.k8scluster_map[
5611 k8s_cluster_type
5612 ].get_scale_count(resource_name, kdu_instance, vca_id=vca_id)
5613 kdu_replica_count = instance_num - kdu_delta.get(
5614 "number-of-instances", 1
5615 )
5616
5617 if kdu_replica_count < min_instance_count < instance_num:
5618 kdu_replica_count = min_instance_count
5619 if kdu_replica_count < min_instance_count:
5620 raise LcmException(
5621 "reached the limit of {} (min-instance-count) scaling-in operations for the "
5622 "scaling-group-descriptor '{}'".format(
5623 instance_num, scaling_group
5624 )
5625 )
5626
5627 for x in range(kdu_delta.get("number-of-instances", 1)):
5628 vca_scaling_info.append(
5629 {
5630 "osm_kdu_id": kdu_name,
5631 "member-vnf-index": vnf_index,
5632 "type": "delete",
5633 "kdu_index": instance_num - x - 1,
5634 }
5635 )
5636 scaling_info["kdu-delete"][kdu_name].append(
5637 {
5638 "member-vnf-index": vnf_index,
5639 "type": "delete",
5640 "k8s-cluster-type": k8s_cluster_type,
5641 "resource-name": resource_name,
5642 "scale": kdu_replica_count,
5643 }
5644 )
5645
5646 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
5647 vdu_delete = copy(scaling_info.get("vdu-delete"))
5648 if scaling_info["scaling_direction"] == "IN":
5649 for vdur in reversed(db_vnfr["vdur"]):
5650 if vdu_delete.get(vdur["vdu-id-ref"]):
5651 vdu_delete[vdur["vdu-id-ref"]] -= 1
5652 scaling_info["vdu"].append(
5653 {
5654 "name": vdur.get("name") or vdur.get("vdu-name"),
5655 "vdu_id": vdur["vdu-id-ref"],
5656 "interface": [],
5657 }
5658 )
5659 for interface in vdur["interfaces"]:
5660 scaling_info["vdu"][-1]["interface"].append(
5661 {
5662 "name": interface["name"],
5663 "ip_address": interface["ip-address"],
5664 "mac_address": interface.get("mac-address"),
5665 }
5666 )
5667 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
5668
5669 # PRE-SCALE BEGIN
5670 step = "Executing pre-scale vnf-config-primitive"
5671 if scaling_descriptor.get("scaling-config-action"):
5672 for scaling_config_action in scaling_descriptor[
5673 "scaling-config-action"
5674 ]:
5675 if (
5676 scaling_config_action.get("trigger") == "pre-scale-in"
5677 and scaling_type == "SCALE_IN"
5678 ) or (
5679 scaling_config_action.get("trigger") == "pre-scale-out"
5680 and scaling_type == "SCALE_OUT"
5681 ):
5682 vnf_config_primitive = scaling_config_action[
5683 "vnf-config-primitive-name-ref"
5684 ]
5685 step = db_nslcmop_update[
5686 "detailed-status"
5687 ] = "executing pre-scale scaling-config-action '{}'".format(
5688 vnf_config_primitive
5689 )
5690
5691 # look for primitive
5692 for config_primitive in (
5693 get_configuration(db_vnfd, db_vnfd["id"]) or {}
5694 ).get("config-primitive", ()):
5695 if config_primitive["name"] == vnf_config_primitive:
5696 break
5697 else:
5698 raise LcmException(
5699 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
5700 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
5701 "primitive".format(scaling_group, vnf_config_primitive)
5702 )
5703
5704 vnfr_params = {"VDU_SCALE_INFO": scaling_info}
5705 if db_vnfr.get("additionalParamsForVnf"):
5706 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
5707
5708 scale_process = "VCA"
5709 db_nsr_update["config-status"] = "configuring pre-scaling"
5710 primitive_params = self._map_primitive_params(
5711 config_primitive, {}, vnfr_params
5712 )
5713
5714 # Pre-scale retry check: Check if this sub-operation has been executed before
5715 op_index = self._check_or_add_scale_suboperation(
5716 db_nslcmop,
5717 vnf_index,
5718 vnf_config_primitive,
5719 primitive_params,
5720 "PRE-SCALE",
5721 )
5722 if op_index == self.SUBOPERATION_STATUS_SKIP:
5723 # Skip sub-operation
5724 result = "COMPLETED"
5725 result_detail = "Done"
5726 self.logger.debug(
5727 logging_text
5728 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
5729 vnf_config_primitive, result, result_detail
5730 )
5731 )
5732 else:
5733 if op_index == self.SUBOPERATION_STATUS_NEW:
5734 # New sub-operation: Get index of this sub-operation
5735 op_index = (
5736 len(db_nslcmop.get("_admin", {}).get("operations"))
5737 - 1
5738 )
5739 self.logger.debug(
5740 logging_text
5741 + "vnf_config_primitive={} New sub-operation".format(
5742 vnf_config_primitive
5743 )
5744 )
5745 else:
5746 # retry: Get registered params for this existing sub-operation
5747 op = db_nslcmop.get("_admin", {}).get("operations", [])[
5748 op_index
5749 ]
5750 vnf_index = op.get("member_vnf_index")
5751 vnf_config_primitive = op.get("primitive")
5752 primitive_params = op.get("primitive_params")
5753 self.logger.debug(
5754 logging_text
5755 + "vnf_config_primitive={} Sub-operation retry".format(
5756 vnf_config_primitive
5757 )
5758 )
5759 # Execute the primitive, either with new (first-time) or registered (reintent) args
5760 ee_descriptor_id = config_primitive.get(
5761 "execution-environment-ref"
5762 )
5763 primitive_name = config_primitive.get(
5764 "execution-environment-primitive", vnf_config_primitive
5765 )
5766 ee_id, vca_type = self._look_for_deployed_vca(
5767 nsr_deployed["VCA"],
5768 member_vnf_index=vnf_index,
5769 vdu_id=None,
5770 vdu_count_index=None,
5771 ee_descriptor_id=ee_descriptor_id,
5772 )
5773 result, result_detail = await self._ns_execute_primitive(
5774 ee_id,
5775 primitive_name,
5776 primitive_params,
5777 vca_type=vca_type,
5778 vca_id=vca_id,
5779 )
5780 self.logger.debug(
5781 logging_text
5782 + "vnf_config_primitive={} Done with result {} {}".format(
5783 vnf_config_primitive, result, result_detail
5784 )
5785 )
5786 # Update operationState = COMPLETED | FAILED
5787 self._update_suboperation_status(
5788 db_nslcmop, op_index, result, result_detail
5789 )
5790
5791 if result == "FAILED":
5792 raise LcmException(result_detail)
5793 db_nsr_update["config-status"] = old_config_status
5794 scale_process = None
5795 # PRE-SCALE END
5796
5797 db_nsr_update[
5798 "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)
5799 ] = nb_scale_op
5800 db_nsr_update[
5801 "_admin.scaling-group.{}.time".format(admin_scale_index)
5802 ] = time()
5803
5804 # SCALE-IN VCA - BEGIN
5805 if vca_scaling_info:
5806 step = db_nslcmop_update[
5807 "detailed-status"
5808 ] = "Deleting the execution environments"
5809 scale_process = "VCA"
5810 for vca_info in vca_scaling_info:
5811 if vca_info["type"] == "delete":
5812 member_vnf_index = str(vca_info["member-vnf-index"])
5813 self.logger.debug(
5814 logging_text + "vdu info: {}".format(vca_info)
5815 )
5816 if vca_info.get("osm_vdu_id"):
5817 vdu_id = vca_info["osm_vdu_id"]
5818 vdu_index = int(vca_info["vdu_index"])
5819 stage[
5820 1
5821 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5822 member_vnf_index, vdu_id, vdu_index
5823 )
5824 else:
5825 vdu_index = 0
5826 kdu_id = vca_info["osm_kdu_id"]
5827 stage[
5828 1
5829 ] = "Scaling member_vnf_index={}, kdu_id={}, vdu_index={} ".format(
5830 member_vnf_index, kdu_id, vdu_index
5831 )
5832 stage[2] = step = "Scaling in VCA"
5833 self._write_op_status(op_id=nslcmop_id, stage=stage)
5834 vca_update = db_nsr["_admin"]["deployed"]["VCA"]
5835 config_update = db_nsr["configurationStatus"]
5836 for vca_index, vca in enumerate(vca_update):
5837 if (
5838 (vca or vca.get("ee_id"))
5839 and vca["member-vnf-index"] == member_vnf_index
5840 and vca["vdu_count_index"] == vdu_index
5841 ):
5842 if vca.get("vdu_id"):
5843 config_descriptor = get_configuration(
5844 db_vnfd, vca.get("vdu_id")
5845 )
5846 elif vca.get("kdu_name"):
5847 config_descriptor = get_configuration(
5848 db_vnfd, vca.get("kdu_name")
5849 )
5850 else:
5851 config_descriptor = get_configuration(
5852 db_vnfd, db_vnfd["id"]
5853 )
5854 operation_params = (
5855 db_nslcmop.get("operationParams") or {}
5856 )
5857 exec_terminate_primitives = not operation_params.get(
5858 "skip_terminate_primitives"
5859 ) and vca.get("needed_terminate")
5860 task = asyncio.ensure_future(
5861 asyncio.wait_for(
5862 self.destroy_N2VC(
5863 logging_text,
5864 db_nslcmop,
5865 vca,
5866 config_descriptor,
5867 vca_index,
5868 destroy_ee=True,
5869 exec_primitives=exec_terminate_primitives,
5870 scaling_in=True,
5871 vca_id=vca_id,
5872 ),
5873 timeout=self.timeout_charm_delete,
5874 )
5875 )
5876 tasks_dict_info[task] = "Terminating VCA {}".format(
5877 vca.get("ee_id")
5878 )
5879 del vca_update[vca_index]
5880 del config_update[vca_index]
5881 # wait for pending tasks of terminate primitives
5882 if tasks_dict_info:
5883 self.logger.debug(
5884 logging_text
5885 + "Waiting for tasks {}".format(
5886 list(tasks_dict_info.keys())
5887 )
5888 )
5889 error_list = await self._wait_for_tasks(
5890 logging_text,
5891 tasks_dict_info,
5892 min(
5893 self.timeout_charm_delete, self.timeout_ns_terminate
5894 ),
5895 stage,
5896 nslcmop_id,
5897 )
5898 tasks_dict_info.clear()
5899 if error_list:
5900 raise LcmException("; ".join(error_list))
5901
5902 db_vca_and_config_update = {
5903 "_admin.deployed.VCA": vca_update,
5904 "configurationStatus": config_update,
5905 }
5906 self.update_db_2(
5907 "nsrs", db_nsr["_id"], db_vca_and_config_update
5908 )
5909 scale_process = None
5910 # SCALE-IN VCA - END
5911
5912 # SCALE RO - BEGIN
5913 if scaling_info.get("vdu-create") or scaling_info.get("vdu-delete"):
5914 scale_process = "RO"
5915 if self.ro_config.get("ng"):
5916 await self._scale_ng_ro(
5917 logging_text, db_nsr, db_nslcmop, db_vnfr, scaling_info, stage
5918 )
5919 scaling_info.pop("vdu-create", None)
5920 scaling_info.pop("vdu-delete", None)
5921
5922 scale_process = None
5923 # SCALE RO - END
5924
5925 # SCALE KDU - BEGIN
5926 if scaling_info.get("kdu-create") or scaling_info.get("kdu-delete"):
5927 scale_process = "KDU"
5928 await self._scale_kdu(
5929 logging_text, nsr_id, nsr_deployed, db_vnfd, vca_id, scaling_info
5930 )
5931 scaling_info.pop("kdu-create", None)
5932 scaling_info.pop("kdu-delete", None)
5933
5934 scale_process = None
5935 # SCALE KDU - END
5936
5937 if db_nsr_update:
5938 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5939
5940 # SCALE-UP VCA - BEGIN
5941 if vca_scaling_info:
5942 step = db_nslcmop_update[
5943 "detailed-status"
5944 ] = "Creating new execution environments"
5945 scale_process = "VCA"
5946 for vca_info in vca_scaling_info:
5947 if vca_info["type"] == "create":
5948 member_vnf_index = str(vca_info["member-vnf-index"])
5949 self.logger.debug(
5950 logging_text + "vdu info: {}".format(vca_info)
5951 )
5952 vnfd_id = db_vnfr["vnfd-ref"]
5953 if vca_info.get("osm_vdu_id"):
5954 vdu_index = int(vca_info["vdu_index"])
5955 deploy_params = {"OSM": get_osm_params(db_vnfr)}
5956 if db_vnfr.get("additionalParamsForVnf"):
5957 deploy_params.update(
5958 parse_yaml_strings(
5959 db_vnfr["additionalParamsForVnf"].copy()
5960 )
5961 )
5962 descriptor_config = get_configuration(
5963 db_vnfd, db_vnfd["id"]
5964 )
5965 if descriptor_config:
5966 vdu_id = None
5967 vdu_name = None
5968 kdu_name = None
5969 self._deploy_n2vc(
5970 logging_text=logging_text
5971 + "member_vnf_index={} ".format(member_vnf_index),
5972 db_nsr=db_nsr,
5973 db_vnfr=db_vnfr,
5974 nslcmop_id=nslcmop_id,
5975 nsr_id=nsr_id,
5976 nsi_id=nsi_id,
5977 vnfd_id=vnfd_id,
5978 vdu_id=vdu_id,
5979 kdu_name=kdu_name,
5980 member_vnf_index=member_vnf_index,
5981 vdu_index=vdu_index,
5982 vdu_name=vdu_name,
5983 deploy_params=deploy_params,
5984 descriptor_config=descriptor_config,
5985 base_folder=base_folder,
5986 task_instantiation_info=tasks_dict_info,
5987 stage=stage,
5988 )
5989 vdu_id = vca_info["osm_vdu_id"]
5990 vdur = find_in_list(
5991 db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id
5992 )
5993 descriptor_config = get_configuration(db_vnfd, vdu_id)
5994 if vdur.get("additionalParams"):
5995 deploy_params_vdu = parse_yaml_strings(
5996 vdur["additionalParams"]
5997 )
5998 else:
5999 deploy_params_vdu = deploy_params
6000 deploy_params_vdu["OSM"] = get_osm_params(
6001 db_vnfr, vdu_id, vdu_count_index=vdu_index
6002 )
6003 if descriptor_config:
6004 vdu_name = None
6005 kdu_name = None
6006 stage[
6007 1
6008 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6009 member_vnf_index, vdu_id, vdu_index
6010 )
6011 stage[2] = step = "Scaling out VCA"
6012 self._write_op_status(op_id=nslcmop_id, stage=stage)
6013 self._deploy_n2vc(
6014 logging_text=logging_text
6015 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
6016 member_vnf_index, vdu_id, vdu_index
6017 ),
6018 db_nsr=db_nsr,
6019 db_vnfr=db_vnfr,
6020 nslcmop_id=nslcmop_id,
6021 nsr_id=nsr_id,
6022 nsi_id=nsi_id,
6023 vnfd_id=vnfd_id,
6024 vdu_id=vdu_id,
6025 kdu_name=kdu_name,
6026 member_vnf_index=member_vnf_index,
6027 vdu_index=vdu_index,
6028 vdu_name=vdu_name,
6029 deploy_params=deploy_params_vdu,
6030 descriptor_config=descriptor_config,
6031 base_folder=base_folder,
6032 task_instantiation_info=tasks_dict_info,
6033 stage=stage,
6034 )
6035 else:
6036 kdu_name = vca_info["osm_kdu_id"]
6037 descriptor_config = get_configuration(db_vnfd, kdu_name)
6038 if descriptor_config:
6039 vdu_id = None
6040 kdu_index = int(vca_info["kdu_index"])
6041 vdu_name = None
6042 kdur = next(
6043 x
6044 for x in db_vnfr["kdur"]
6045 if x["kdu-name"] == kdu_name
6046 )
6047 deploy_params_kdu = {"OSM": get_osm_params(db_vnfr)}
6048 if kdur.get("additionalParams"):
6049 deploy_params_kdu = parse_yaml_strings(
6050 kdur["additionalParams"]
6051 )
6052
6053 self._deploy_n2vc(
6054 logging_text=logging_text,
6055 db_nsr=db_nsr,
6056 db_vnfr=db_vnfr,
6057 nslcmop_id=nslcmop_id,
6058 nsr_id=nsr_id,
6059 nsi_id=nsi_id,
6060 vnfd_id=vnfd_id,
6061 vdu_id=vdu_id,
6062 kdu_name=kdu_name,
6063 member_vnf_index=member_vnf_index,
6064 vdu_index=kdu_index,
6065 vdu_name=vdu_name,
6066 deploy_params=deploy_params_kdu,
6067 descriptor_config=descriptor_config,
6068 base_folder=base_folder,
6069 task_instantiation_info=tasks_dict_info,
6070 stage=stage,
6071 )
6072 # SCALE-UP VCA - END
6073 scale_process = None
6074
6075 # POST-SCALE BEGIN
6076 # execute primitive service POST-SCALING
6077 step = "Executing post-scale vnf-config-primitive"
6078 if scaling_descriptor.get("scaling-config-action"):
6079 for scaling_config_action in scaling_descriptor[
6080 "scaling-config-action"
6081 ]:
6082 if (
6083 scaling_config_action.get("trigger") == "post-scale-in"
6084 and scaling_type == "SCALE_IN"
6085 ) or (
6086 scaling_config_action.get("trigger") == "post-scale-out"
6087 and scaling_type == "SCALE_OUT"
6088 ):
6089 vnf_config_primitive = scaling_config_action[
6090 "vnf-config-primitive-name-ref"
6091 ]
6092 step = db_nslcmop_update[
6093 "detailed-status"
6094 ] = "executing post-scale scaling-config-action '{}'".format(
6095 vnf_config_primitive
6096 )
6097
6098 vnfr_params = {"VDU_SCALE_INFO": scaling_info}
6099 if db_vnfr.get("additionalParamsForVnf"):
6100 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
6101
6102 # look for primitive
6103 for config_primitive in (
6104 get_configuration(db_vnfd, db_vnfd["id"]) or {}
6105 ).get("config-primitive", ()):
6106 if config_primitive["name"] == vnf_config_primitive:
6107 break
6108 else:
6109 raise LcmException(
6110 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
6111 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
6112 "config-primitive".format(
6113 scaling_group, vnf_config_primitive
6114 )
6115 )
6116 scale_process = "VCA"
6117 db_nsr_update["config-status"] = "configuring post-scaling"
6118 primitive_params = self._map_primitive_params(
6119 config_primitive, {}, vnfr_params
6120 )
6121
6122 # Post-scale retry check: Check if this sub-operation has been executed before
6123 op_index = self._check_or_add_scale_suboperation(
6124 db_nslcmop,
6125 vnf_index,
6126 vnf_config_primitive,
6127 primitive_params,
6128 "POST-SCALE",
6129 )
6130 if op_index == self.SUBOPERATION_STATUS_SKIP:
6131 # Skip sub-operation
6132 result = "COMPLETED"
6133 result_detail = "Done"
6134 self.logger.debug(
6135 logging_text
6136 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
6137 vnf_config_primitive, result, result_detail
6138 )
6139 )
6140 else:
6141 if op_index == self.SUBOPERATION_STATUS_NEW:
6142 # New sub-operation: Get index of this sub-operation
6143 op_index = (
6144 len(db_nslcmop.get("_admin", {}).get("operations"))
6145 - 1
6146 )
6147 self.logger.debug(
6148 logging_text
6149 + "vnf_config_primitive={} New sub-operation".format(
6150 vnf_config_primitive
6151 )
6152 )
6153 else:
6154 # retry: Get registered params for this existing sub-operation
6155 op = db_nslcmop.get("_admin", {}).get("operations", [])[
6156 op_index
6157 ]
6158 vnf_index = op.get("member_vnf_index")
6159 vnf_config_primitive = op.get("primitive")
6160 primitive_params = op.get("primitive_params")
6161 self.logger.debug(
6162 logging_text
6163 + "vnf_config_primitive={} Sub-operation retry".format(
6164 vnf_config_primitive
6165 )
6166 )
6167 # Execute the primitive, either with new (first-time) or registered (reintent) args
6168 ee_descriptor_id = config_primitive.get(
6169 "execution-environment-ref"
6170 )
6171 primitive_name = config_primitive.get(
6172 "execution-environment-primitive", vnf_config_primitive
6173 )
6174 ee_id, vca_type = self._look_for_deployed_vca(
6175 nsr_deployed["VCA"],
6176 member_vnf_index=vnf_index,
6177 vdu_id=None,
6178 vdu_count_index=None,
6179 ee_descriptor_id=ee_descriptor_id,
6180 )
6181 result, result_detail = await self._ns_execute_primitive(
6182 ee_id,
6183 primitive_name,
6184 primitive_params,
6185 vca_type=vca_type,
6186 vca_id=vca_id,
6187 )
6188 self.logger.debug(
6189 logging_text
6190 + "vnf_config_primitive={} Done with result {} {}".format(
6191 vnf_config_primitive, result, result_detail
6192 )
6193 )
6194 # Update operationState = COMPLETED | FAILED
6195 self._update_suboperation_status(
6196 db_nslcmop, op_index, result, result_detail
6197 )
6198
6199 if result == "FAILED":
6200 raise LcmException(result_detail)
6201 db_nsr_update["config-status"] = old_config_status
6202 scale_process = None
6203 # POST-SCALE END
6204
6205 db_nsr_update[
6206 "detailed-status"
6207 ] = "" # "scaled {} {}".format(scaling_group, scaling_type)
6208 db_nsr_update["operational-status"] = (
6209 "running"
6210 if old_operational_status == "failed"
6211 else old_operational_status
6212 )
6213 db_nsr_update["config-status"] = old_config_status
6214 return
6215 except (
6216 ROclient.ROClientException,
6217 DbException,
6218 LcmException,
6219 NgRoException,
6220 ) as e:
6221 self.logger.error(logging_text + "Exit Exception {}".format(e))
6222 exc = e
6223 except asyncio.CancelledError:
6224 self.logger.error(
6225 logging_text + "Cancelled Exception while '{}'".format(step)
6226 )
6227 exc = "Operation was cancelled"
6228 except Exception as e:
6229 exc = traceback.format_exc()
6230 self.logger.critical(
6231 logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
6232 exc_info=True,
6233 )
6234 finally:
6235 self._write_ns_status(
6236 nsr_id=nsr_id,
6237 ns_state=None,
6238 current_operation="IDLE",
6239 current_operation_id=None,
6240 )
6241 if tasks_dict_info:
6242 stage[1] = "Waiting for instantiate pending tasks."
6243 self.logger.debug(logging_text + stage[1])
6244 exc = await self._wait_for_tasks(
6245 logging_text,
6246 tasks_dict_info,
6247 self.timeout_ns_deploy,
6248 stage,
6249 nslcmop_id,
6250 nsr_id=nsr_id,
6251 )
6252 if exc:
6253 db_nslcmop_update[
6254 "detailed-status"
6255 ] = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
6256 nslcmop_operation_state = "FAILED"
6257 if db_nsr:
6258 db_nsr_update["operational-status"] = old_operational_status
6259 db_nsr_update["config-status"] = old_config_status
6260 db_nsr_update["detailed-status"] = ""
6261 if scale_process:
6262 if "VCA" in scale_process:
6263 db_nsr_update["config-status"] = "failed"
6264 if "RO" in scale_process:
6265 db_nsr_update["operational-status"] = "failed"
6266 db_nsr_update[
6267 "detailed-status"
6268 ] = "FAILED scaling nslcmop={} {}: {}".format(
6269 nslcmop_id, step, exc
6270 )
6271 else:
6272 error_description_nslcmop = None
6273 nslcmop_operation_state = "COMPLETED"
6274 db_nslcmop_update["detailed-status"] = "Done"
6275
6276 self._write_op_status(
6277 op_id=nslcmop_id,
6278 stage="",
6279 error_message=error_description_nslcmop,
6280 operation_state=nslcmop_operation_state,
6281 other_update=db_nslcmop_update,
6282 )
6283 if db_nsr:
6284 self._write_ns_status(
6285 nsr_id=nsr_id,
6286 ns_state=None,
6287 current_operation="IDLE",
6288 current_operation_id=None,
6289 other_update=db_nsr_update,
6290 )
6291
6292 if nslcmop_operation_state:
6293 try:
6294 msg = {
6295 "nsr_id": nsr_id,
6296 "nslcmop_id": nslcmop_id,
6297 "operationState": nslcmop_operation_state,
6298 }
6299 await self.msg.aiowrite("ns", "scaled", msg, loop=self.loop)
6300 except Exception as e:
6301 self.logger.error(
6302 logging_text + "kafka_write notification Exception {}".format(e)
6303 )
6304 self.logger.debug(logging_text + "Exit")
6305 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")
6306
6307 async def _scale_kdu(
6308 self, logging_text, nsr_id, nsr_deployed, db_vnfd, vca_id, scaling_info
6309 ):
6310 _scaling_info = scaling_info.get("kdu-create") or scaling_info.get("kdu-delete")
6311 for kdu_name in _scaling_info:
6312 for kdu_scaling_info in _scaling_info[kdu_name]:
6313 deployed_kdu, index = get_deployed_kdu(
6314 nsr_deployed, kdu_name, kdu_scaling_info["member-vnf-index"]
6315 )
6316 cluster_uuid = deployed_kdu["k8scluster-uuid"]
6317 kdu_instance = deployed_kdu["kdu-instance"]
6318 scale = int(kdu_scaling_info["scale"])
6319 k8s_cluster_type = kdu_scaling_info["k8s-cluster-type"]
6320
6321 db_dict = {
6322 "collection": "nsrs",
6323 "filter": {"_id": nsr_id},
6324 "path": "_admin.deployed.K8s.{}".format(index),
6325 }
6326
6327 step = "scaling application {}".format(
6328 kdu_scaling_info["resource-name"]
6329 )
6330 self.logger.debug(logging_text + step)
6331
6332 if kdu_scaling_info["type"] == "delete":
6333 kdu_config = get_configuration(db_vnfd, kdu_name)
6334 if (
6335 kdu_config
6336 and kdu_config.get("terminate-config-primitive")
6337 and get_juju_ee_ref(db_vnfd, kdu_name) is None
6338 ):
6339 terminate_config_primitive_list = kdu_config.get(
6340 "terminate-config-primitive"
6341 )
6342 terminate_config_primitive_list.sort(
6343 key=lambda val: int(val["seq"])
6344 )
6345
6346 for (
6347 terminate_config_primitive
6348 ) in terminate_config_primitive_list:
6349 primitive_params_ = self._map_primitive_params(
6350 terminate_config_primitive, {}, {}
6351 )
6352 step = "execute terminate config primitive"
6353 self.logger.debug(logging_text + step)
6354 await asyncio.wait_for(
6355 self.k8scluster_map[k8s_cluster_type].exec_primitive(
6356 cluster_uuid=cluster_uuid,
6357 kdu_instance=kdu_instance,
6358 primitive_name=terminate_config_primitive["name"],
6359 params=primitive_params_,
6360 db_dict=db_dict,
6361 vca_id=vca_id,
6362 ),
6363 timeout=600,
6364 )
6365
6366 await asyncio.wait_for(
6367 self.k8scluster_map[k8s_cluster_type].scale(
6368 kdu_instance,
6369 scale,
6370 kdu_scaling_info["resource-name"],
6371 vca_id=vca_id,
6372 ),
6373 timeout=self.timeout_vca_on_error,
6374 )
6375
6376 if kdu_scaling_info["type"] == "create":
6377 kdu_config = get_configuration(db_vnfd, kdu_name)
6378 if (
6379 kdu_config
6380 and kdu_config.get("initial-config-primitive")
6381 and get_juju_ee_ref(db_vnfd, kdu_name) is None
6382 ):
6383 initial_config_primitive_list = kdu_config.get(
6384 "initial-config-primitive"
6385 )
6386 initial_config_primitive_list.sort(
6387 key=lambda val: int(val["seq"])
6388 )
6389
6390 for initial_config_primitive in initial_config_primitive_list:
6391 primitive_params_ = self._map_primitive_params(
6392 initial_config_primitive, {}, {}
6393 )
6394 step = "execute initial config primitive"
6395 self.logger.debug(logging_text + step)
6396 await asyncio.wait_for(
6397 self.k8scluster_map[k8s_cluster_type].exec_primitive(
6398 cluster_uuid=cluster_uuid,
6399 kdu_instance=kdu_instance,
6400 primitive_name=initial_config_primitive["name"],
6401 params=primitive_params_,
6402 db_dict=db_dict,
6403 vca_id=vca_id,
6404 ),
6405 timeout=600,
6406 )
6407
6408 async def _scale_ng_ro(
6409 self, logging_text, db_nsr, db_nslcmop, db_vnfr, vdu_scaling_info, stage
6410 ):
6411 nsr_id = db_nslcmop["nsInstanceId"]
6412 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
6413 db_vnfrs = {}
6414
6415 # read from db: vnfd's for every vnf
6416 db_vnfds = []
6417
6418 # for each vnf in ns, read vnfd
6419 for vnfr in self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}):
6420 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
6421 vnfd_id = vnfr["vnfd-id"] # vnfd uuid for this vnf
6422 # if we haven't this vnfd, read it from db
6423 if not find_in_list(db_vnfds, lambda a_vnfd: a_vnfd["id"] == vnfd_id):
6424 # read from db
6425 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
6426 db_vnfds.append(vnfd)
6427 n2vc_key = self.n2vc.get_public_key()
6428 n2vc_key_list = [n2vc_key]
6429 self.scale_vnfr(
6430 db_vnfr,
6431 vdu_scaling_info.get("vdu-create"),
6432 vdu_scaling_info.get("vdu-delete"),
6433 mark_delete=True,
6434 )
6435 # db_vnfr has been updated, update db_vnfrs to use it
6436 db_vnfrs[db_vnfr["member-vnf-index-ref"]] = db_vnfr
6437 await self._instantiate_ng_ro(
6438 logging_text,
6439 nsr_id,
6440 db_nsd,
6441 db_nsr,
6442 db_nslcmop,
6443 db_vnfrs,
6444 db_vnfds,
6445 n2vc_key_list,
6446 stage=stage,
6447 start_deploy=time(),
6448 timeout_ns_deploy=self.timeout_ns_deploy,
6449 )
6450 if vdu_scaling_info.get("vdu-delete"):
6451 self.scale_vnfr(
6452 db_vnfr, None, vdu_scaling_info["vdu-delete"], mark_delete=False
6453 )
6454
6455 async def extract_prometheus_scrape_jobs(
6456 self, ee_id, artifact_path, ee_config_descriptor, vnfr_id, nsr_id, target_ip
6457 ):
6458 # look if exist a file called 'prometheus*.j2' and
6459 artifact_content = self.fs.dir_ls(artifact_path)
6460 job_file = next(
6461 (
6462 f
6463 for f in artifact_content
6464 if f.startswith("prometheus") and f.endswith(".j2")
6465 ),
6466 None,
6467 )
6468 if not job_file:
6469 return
6470 with self.fs.file_open((artifact_path, job_file), "r") as f:
6471 job_data = f.read()
6472
6473 # TODO get_service
6474 _, _, service = ee_id.partition(".") # remove prefix "namespace."
6475 host_name = "{}-{}".format(service, ee_config_descriptor["metric-service"])
6476 host_port = "80"
6477 vnfr_id = vnfr_id.replace("-", "")
6478 variables = {
6479 "JOB_NAME": vnfr_id,
6480 "TARGET_IP": target_ip,
6481 "EXPORTER_POD_IP": host_name,
6482 "EXPORTER_POD_PORT": host_port,
6483 }
6484 job_list = parse_job(job_data, variables)
6485 # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
6486 for job in job_list:
6487 if (
6488 not isinstance(job.get("job_name"), str)
6489 or vnfr_id not in job["job_name"]
6490 ):
6491 job["job_name"] = vnfr_id + "_" + str(randint(1, 10000))
6492 job["nsr_id"] = nsr_id
6493 job["vnfr_id"] = vnfr_id
6494 return job_list
6495
6496 def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
6497 """
6498 Get VCA Cloud and VCA Cloud Credentials for the VIM account
6499
6500 :param: vim_account_id: VIM Account ID
6501
6502 :return: (cloud_name, cloud_credential)
6503 """
6504 config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
6505 return config.get("vca_cloud"), config.get("vca_cloud_credential")
6506
6507 def get_vca_k8s_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
6508 """
6509 Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account
6510
6511 :param: vim_account_id: VIM Account ID
6512
6513 :return: (cloud_name, cloud_credential)
6514 """
6515 config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
6516 return config.get("vca_k8s_cloud"), config.get("vca_k8s_cloud_credential")