Reformat LCM to standardized format
[osm/LCM.git] / osm_lcm / ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 import yaml
21 import logging
22 import logging.handlers
23 import traceback
24 import json
25 from jinja2 import (
26 Environment,
27 TemplateError,
28 TemplateNotFound,
29 StrictUndefined,
30 UndefinedError,
31 )
32
33 from osm_lcm import ROclient
34 from osm_lcm.ng_ro import NgRoClient, NgRoException
35 from osm_lcm.lcm_utils import (
36 LcmException,
37 LcmExceptionNoMgmtIP,
38 LcmBase,
39 deep_get,
40 get_iterable,
41 populate_dict,
42 )
43 from osm_lcm.data_utils.nsd import get_vnf_profiles
44 from osm_lcm.data_utils.vnfd import (
45 get_vdu_list,
46 get_vdu_profile,
47 get_ee_sorted_initial_config_primitive_list,
48 get_ee_sorted_terminate_config_primitive_list,
49 get_kdu_list,
50 get_virtual_link_profiles,
51 get_vdu,
52 get_configuration,
53 get_vdu_index,
54 get_scaling_aspect,
55 get_number_of_instances,
56 get_juju_ee_ref,
57 )
58 from osm_lcm.data_utils.list_utils import find_in_list
59 from osm_lcm.data_utils.vnfr import get_osm_params, get_vdur_index
60 from osm_lcm.data_utils.dict_utils import parse_yaml_strings
61 from osm_lcm.data_utils.database.vim_account import VimAccountDB
62 from n2vc.k8s_helm_conn import K8sHelmConnector
63 from n2vc.k8s_helm3_conn import K8sHelm3Connector
64 from n2vc.k8s_juju_conn import K8sJujuConnector
65
66 from osm_common.dbbase import DbException
67 from osm_common.fsbase import FsException
68
69 from osm_lcm.data_utils.database.database import Database
70 from osm_lcm.data_utils.filesystem.filesystem import Filesystem
71
72 from n2vc.n2vc_juju_conn import N2VCJujuConnector
73 from n2vc.exceptions import N2VCException, N2VCNotFound, K8sException
74
75 from osm_lcm.lcm_helm_conn import LCMHelmConn
76
77 from copy import copy, deepcopy
78 from time import time
79 from uuid import uuid4
80
81 from random import randint
82
83 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
84
85
86 class NsLcm(LcmBase):
87 timeout_vca_on_error = (
88 5 * 60
89 ) # Time for charm from first time at blocked,error status to mark as failed
90 timeout_ns_deploy = 2 * 3600 # default global timeout for deployment of an ns
91 timeout_ns_terminate = 1800 # default global timeout for undeployment of an ns
92 timeout_charm_delete = 10 * 60
93 timeout_primitive = 30 * 60 # timeout for primitive execution
94 timeout_progress_primitive = (
95 10 * 60
96 ) # timeout for some progress in a primitive execution
97
98 SUBOPERATION_STATUS_NOT_FOUND = -1
99 SUBOPERATION_STATUS_NEW = -2
100 SUBOPERATION_STATUS_SKIP = -3
101 task_name_deploy_vca = "Deploying VCA"
102
103 def __init__(self, msg, lcm_tasks, config, loop, prometheus=None):
104 """
105 Init: connect to database, filesystem storage, and messaging
106 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
107 :return: None
108 """
109 super().__init__(msg=msg, logger=logging.getLogger("lcm.ns"))
110
111 self.db = Database().instance.db
112 self.fs = Filesystem().instance.fs
113 self.loop = loop
114 self.lcm_tasks = lcm_tasks
115 self.timeout = config["timeout"]
116 self.ro_config = config["ro_config"]
117 self.ng_ro = config["ro_config"].get("ng")
118 self.vca_config = config["VCA"].copy()
119
120 # create N2VC connector
121 self.n2vc = N2VCJujuConnector(
122 log=self.logger,
123 loop=self.loop,
124 on_update_db=self._on_update_n2vc_db,
125 fs=self.fs,
126 db=self.db,
127 )
128
129 self.conn_helm_ee = LCMHelmConn(
130 log=self.logger,
131 loop=self.loop,
132 vca_config=self.vca_config,
133 on_update_db=self._on_update_n2vc_db,
134 )
135
136 self.k8sclusterhelm2 = K8sHelmConnector(
137 kubectl_command=self.vca_config.get("kubectlpath"),
138 helm_command=self.vca_config.get("helmpath"),
139 log=self.logger,
140 on_update_db=None,
141 fs=self.fs,
142 db=self.db,
143 )
144
145 self.k8sclusterhelm3 = K8sHelm3Connector(
146 kubectl_command=self.vca_config.get("kubectlpath"),
147 helm_command=self.vca_config.get("helm3path"),
148 fs=self.fs,
149 log=self.logger,
150 db=self.db,
151 on_update_db=None,
152 )
153
154 self.k8sclusterjuju = K8sJujuConnector(
155 kubectl_command=self.vca_config.get("kubectlpath"),
156 juju_command=self.vca_config.get("jujupath"),
157 log=self.logger,
158 loop=self.loop,
159 on_update_db=self._on_update_k8s_db,
160 fs=self.fs,
161 db=self.db,
162 )
163
164 self.k8scluster_map = {
165 "helm-chart": self.k8sclusterhelm2,
166 "helm-chart-v3": self.k8sclusterhelm3,
167 "chart": self.k8sclusterhelm3,
168 "juju-bundle": self.k8sclusterjuju,
169 "juju": self.k8sclusterjuju,
170 }
171
172 self.vca_map = {
173 "lxc_proxy_charm": self.n2vc,
174 "native_charm": self.n2vc,
175 "k8s_proxy_charm": self.n2vc,
176 "helm": self.conn_helm_ee,
177 "helm-v3": self.conn_helm_ee,
178 }
179
180 self.prometheus = prometheus
181
182 # create RO client
183 self.RO = NgRoClient(self.loop, **self.ro_config)
184
185 @staticmethod
186 def increment_ip_mac(ip_mac, vm_index=1):
187 if not isinstance(ip_mac, str):
188 return ip_mac
189 try:
190 # try with ipv4: look for the last dot
191 i = ip_mac.rfind(".")
192 if i > 0:
193 i += 1
194 return "{}{}".format(ip_mac[:i], int(ip_mac[i:]) + vm_index)
195 # try with ipv6 or mac: look for the last colon; operate in hex
196 i = ip_mac.rfind(":")
197 if i > 0:
198 i += 1
199 # format in hex, len can be 2 for mac or 4 for ipv6
200 return ("{}{:0" + str(len(ip_mac) - i) + "x}").format(
201 ip_mac[:i], int(ip_mac[i:], 16) + vm_index
202 )
203 except Exception:
204 pass
205 return None
206
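# Editorial note: a minimal usage sketch of increment_ip_mac (illustrative only, not part of
# ns.py). It assumes the class is importable as osm_lcm.ns.NsLcm; the values simply follow the
# logic above (last IPv4 octet, or last hex group of a MAC/IPv6 address, incremented by vm_index).
#
#     >>> from osm_lcm.ns import NsLcm
#     >>> NsLcm.increment_ip_mac("10.0.0.5", vm_index=2)       # IPv4: last octet + 2
#     '10.0.0.7'
#     >>> NsLcm.increment_ip_mac("fa:16:3e:00:00:0a")          # MAC: last hex group + 1
#     'fa:16:3e:00:00:0b'
#     >>> NsLcm.increment_ip_mac(None) is None                 # non-strings are returned as-is
#     True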
207 def _on_update_ro_db(self, nsrs_id, ro_descriptor):
208
209 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
210
211 try:
212 # TODO filter RO descriptor fields...
213
214 # write to database
215 db_dict = dict()
216 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
217 db_dict["deploymentStatus"] = ro_descriptor
218 self.update_db_2("nsrs", nsrs_id, db_dict)
219
220 except Exception as e:
221 self.logger.warn(
222 "Cannot write database RO deployment for ns={} -> {}".format(nsrs_id, e)
223 )
224
225 async def _on_update_n2vc_db(self, table, filter, path, updated_data, vca_id=None):
226
227 # remove last dot from path (if exists)
228 if path.endswith("."):
229 path = path[:-1]
230
231 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
232 # .format(table, filter, path, updated_data))
233 try:
234
235 nsr_id = filter.get("_id")
236
237 # read ns record from database
238 nsr = self.db.get_one(table="nsrs", q_filter=filter)
239 current_ns_status = nsr.get("nsState")
240
241 # get vca status for NS
242 status_dict = await self.n2vc.get_status(
243 namespace="." + nsr_id, yaml_format=False, vca_id=vca_id
244 )
245
246 # vcaStatus
247 db_dict = dict()
248 db_dict["vcaStatus"] = status_dict
249 await self.n2vc.update_vca_status(db_dict["vcaStatus"], vca_id=vca_id)
250
251 # update configurationStatus for this VCA
252 try:
253 vca_index = int(path[path.rfind(".") + 1 :])
254
255 vca_list = deep_get(
256 target_dict=nsr, key_list=("_admin", "deployed", "VCA")
257 )
258 vca_status = vca_list[vca_index].get("status")
259
260 configuration_status_list = nsr.get("configurationStatus")
261 config_status = configuration_status_list[vca_index].get("status")
262
263 if config_status == "BROKEN" and vca_status != "failed":
264 db_dict["configurationStatus.{}.status".format(vca_index)] = "READY"
265 elif config_status != "BROKEN" and vca_status == "failed":
266 db_dict["configurationStatus.{}.status".format(vca_index)] = "BROKEN"
267 except Exception as e:
268 # do not update configurationStatus
269 self.logger.debug("Error updating vca_index (ignore): {}".format(e))
270
271 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
272 # if nsState = 'DEGRADED' check if all is OK
273 is_degraded = False
274 if current_ns_status in ("READY", "DEGRADED"):
275 error_description = ""
276 # check machines
277 if status_dict.get("machines"):
278 for machine_id in status_dict.get("machines"):
279 machine = status_dict.get("machines").get(machine_id)
280 # check machine agent-status
281 if machine.get("agent-status"):
282 s = machine.get("agent-status").get("status")
283 if s != "started":
284 is_degraded = True
285 error_description += (
286 "machine {} agent-status={} ; ".format(
287 machine_id, s
288 )
289 )
290 # check machine instance status
291 if machine.get("instance-status"):
292 s = machine.get("instance-status").get("status")
293 if s != "running":
294 is_degraded = True
295 error_description += (
296 "machine {} instance-status={} ; ".format(
297 machine_id, s
298 )
299 )
300 # check applications
301 if status_dict.get("applications"):
302 for app_id in status_dict.get("applications"):
303 app = status_dict.get("applications").get(app_id)
304 # check application status
305 if app.get("status"):
306 s = app.get("status").get("status")
307 if s != "active":
308 is_degraded = True
309 error_description += (
310 "application {} status={} ; ".format(app_id, s)
311 )
312
313 if error_description:
314 db_dict["errorDescription"] = error_description
315 if current_ns_status == "READY" and is_degraded:
316 db_dict["nsState"] = "DEGRADED"
317 if current_ns_status == "DEGRADED" and not is_degraded:
318 db_dict["nsState"] = "READY"
319
320 # write to database
321 self.update_db_2("nsrs", nsr_id, db_dict)
322
323 except (asyncio.CancelledError, asyncio.TimeoutError):
324 raise
325 except Exception as e:
326 self.logger.warn("Error updating NS state for ns={}: {}".format(nsr_id, e))
327
328 async def _on_update_k8s_db(
329 self, cluster_uuid, kdu_instance, filter=None, vca_id=None
330 ):
331 """
332 Updating vca status in NSR record
333 :param cluster_uuid: UUID of a k8s cluster
334 :param kdu_instance: The unique name of the KDU instance
335 :param filter: To get nsr_id
336 :return: none
337 """
338
339 # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
340 # .format(cluster_uuid, kdu_instance, filter))
341
342 try:
343 nsr_id = filter.get("_id")
344
345 # get vca status for NS
346 vca_status = await self.k8sclusterjuju.status_kdu(
347 cluster_uuid,
348 kdu_instance,
349 complete_status=True,
350 yaml_format=False,
351 vca_id=vca_id,
352 )
353 # vcaStatus
354 db_dict = dict()
355 db_dict["vcaStatus"] = {nsr_id: vca_status}
356
357 await self.k8sclusterjuju.update_vca_status(
358 db_dict["vcaStatus"],
359 kdu_instance,
360 vca_id=vca_id,
361 )
362
363 # write to database
364 self.update_db_2("nsrs", nsr_id, db_dict)
365
366 except (asyncio.CancelledError, asyncio.TimeoutError):
367 raise
368 except Exception as e:
369 self.logger.warn("Error updating NS state for ns={}: {}".format(nsr_id, e))
370
371 @staticmethod
372 def _parse_cloud_init(cloud_init_text, additional_params, vnfd_id, vdu_id):
373 try:
374 env = Environment(undefined=StrictUndefined)
375 template = env.from_string(cloud_init_text)
376 return template.render(additional_params or {})
377 except UndefinedError as e:
378 raise LcmException(
379 "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
380 "file, must be provided in the instantiation parameters inside the "
381 "'additionalParamsForVnf/Vdu' block".format(e, vnfd_id, vdu_id)
382 )
383 except (TemplateError, TemplateNotFound) as e:
384 raise LcmException(
385 "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".format(
386 vnfd_id, vdu_id, e
387 )
388 )
389
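# Editorial note: an illustrative sketch (not part of ns.py) of the Jinja2 behaviour that
# _parse_cloud_init relies on: with StrictUndefined, any variable missing from additional_params
# raises UndefinedError, which the method above re-raises as an LcmException pointing at the
# offending vnfd/vdu and the 'additionalParamsForVnf/Vdu' block.
#
#     >>> from jinja2 import Environment, StrictUndefined, UndefinedError
#     >>> env = Environment(undefined=StrictUndefined)
#     >>> env.from_string("hostname: {{ hostname }}").render({"hostname": "vdu-1"})
#     'hostname: vdu-1'
#     >>> try:
#     ...     env.from_string("hostname: {{ hostname }}").render({})
#     ... except UndefinedError:
#     ...     print("missing variable")
#     missing variable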
390 def _get_vdu_cloud_init_content(self, vdu, vnfd):
391 cloud_init_content = cloud_init_file = None
392 try:
393 if vdu.get("cloud-init-file"):
394 base_folder = vnfd["_admin"]["storage"]
395 cloud_init_file = "{}/{}/cloud_init/{}".format(
396 base_folder["folder"],
397 base_folder["pkg-dir"],
398 vdu["cloud-init-file"],
399 )
400 with self.fs.file_open(cloud_init_file, "r") as ci_file:
401 cloud_init_content = ci_file.read()
402 elif vdu.get("cloud-init"):
403 cloud_init_content = vdu["cloud-init"]
404
405 return cloud_init_content
406 except FsException as e:
407 raise LcmException(
408 "Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".format(
409 vnfd["id"], vdu["id"], cloud_init_file, e
410 )
411 )
412
413 def _get_vdu_additional_params(self, db_vnfr, vdu_id):
414 vdur = next(
415 vdur for vdur in db_vnfr.get("vdur") if vdu_id == vdur["vdu-id-ref"]
416 )
417 additional_params = vdur.get("additionalParams")
418 return parse_yaml_strings(additional_params)
419
420 def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
421 """
422 Creates a new vnfd descriptor for RO based on the input OSM IM vnfd
423 :param vnfd: input vnfd
424 :param new_id: overrides vnf id if provided
425 :param additionalParams: Instantiation params for VNFs provided
426 :param nsrId: Id of the NSR
427 :return: copy of vnfd
428 """
429 vnfd_RO = deepcopy(vnfd)
430 # remove keys unused by RO: configuration, monitoring, scaling and internal keys
431 vnfd_RO.pop("_id", None)
432 vnfd_RO.pop("_admin", None)
433 vnfd_RO.pop("monitoring-param", None)
434 vnfd_RO.pop("scaling-group-descriptor", None)
435 vnfd_RO.pop("kdu", None)
436 vnfd_RO.pop("k8s-cluster", None)
437 if new_id:
438 vnfd_RO["id"] = new_id
439
440 # cloud-init and cloud-init-file are handled by LCM (rendered with Jinja2), not by RO, so drop them
441 for vdu in get_iterable(vnfd_RO, "vdu"):
442 vdu.pop("cloud-init-file", None)
443 vdu.pop("cloud-init", None)
444 return vnfd_RO
445
446 @staticmethod
447 def ip_profile_2_RO(ip_profile):
448 RO_ip_profile = deepcopy(ip_profile)
449 if "dns-server" in RO_ip_profile:
450 if isinstance(RO_ip_profile["dns-server"], list):
451 RO_ip_profile["dns-address"] = []
452 for ds in RO_ip_profile.pop("dns-server"):
453 RO_ip_profile["dns-address"].append(ds["address"])
454 else:
455 RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
456 if RO_ip_profile.get("ip-version") == "ipv4":
457 RO_ip_profile["ip-version"] = "IPv4"
458 if RO_ip_profile.get("ip-version") == "ipv6":
459 RO_ip_profile["ip-version"] = "IPv6"
460 if "dhcp-params" in RO_ip_profile:
461 RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
462 return RO_ip_profile
463
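# Editorial note: a minimal before/after sketch of ip_profile_2_RO (illustrative values only,
# not from the source). The OSM-style keys dns-server/dhcp-params are mapped to the RO-style
# dns-address/dhcp keys and the ip-version literal is capitalised.
#
#     >>> from osm_lcm.ns import NsLcm
#     >>> NsLcm.ip_profile_2_RO({
#     ...     "ip-version": "ipv4",
#     ...     "dns-server": [{"address": "8.8.8.8"}],
#     ...     "dhcp-params": {"enabled": True},
#     ... })
#     {'ip-version': 'IPv4', 'dns-address': ['8.8.8.8'], 'dhcp': {'enabled': True}}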
464 def _get_ro_vim_id_for_vim_account(self, vim_account):
465 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
466 if db_vim["_admin"]["operationalState"] != "ENABLED":
467 raise LcmException(
468 "VIM={} is not available. operationalState={}".format(
469 vim_account, db_vim["_admin"]["operationalState"]
470 )
471 )
472 RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
473 return RO_vim_id
474
475 def get_ro_wim_id_for_wim_account(self, wim_account):
476 if isinstance(wim_account, str):
477 db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
478 if db_wim["_admin"]["operationalState"] != "ENABLED":
479 raise LcmException(
480 "WIM={} is not available. operationalState={}".format(
481 wim_account, db_wim["_admin"]["operationalState"]
482 )
483 )
484 RO_wim_id = db_wim["_admin"]["deployed"]["RO-account"]
485 return RO_wim_id
486 else:
487 return wim_account
488
489 def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None, mark_delete=False):
490
491 db_vdu_push_list = []
492 db_update = {"_admin.modified": time()}
493 if vdu_create:
494 for vdu_id, vdu_count in vdu_create.items():
495 vdur = next(
496 (
497 vdur
498 for vdur in reversed(db_vnfr["vdur"])
499 if vdur["vdu-id-ref"] == vdu_id
500 ),
501 None,
502 )
503 if not vdur:
504 raise LcmException(
505 "Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
506 vdu_id
507 )
508 )
509
510 for count in range(vdu_count):
511 vdur_copy = deepcopy(vdur)
512 vdur_copy["status"] = "BUILD"
513 vdur_copy["status-detailed"] = None
514 vdur_copy["ip-address"] = None
515 vdur_copy["_id"] = str(uuid4())
516 vdur_copy["count-index"] += count + 1
517 vdur_copy["id"] = "{}-{}".format(
518 vdur_copy["vdu-id-ref"], vdur_copy["count-index"]
519 )
520 vdur_copy.pop("vim_info", None)
521 for iface in vdur_copy["interfaces"]:
522 if iface.get("fixed-ip"):
523 iface["ip-address"] = self.increment_ip_mac(
524 iface["ip-address"], count + 1
525 )
526 else:
527 iface.pop("ip-address", None)
528 if iface.get("fixed-mac"):
529 iface["mac-address"] = self.increment_ip_mac(
530 iface["mac-address"], count + 1
531 )
532 else:
533 iface.pop("mac-address", None)
534 iface.pop(
535 "mgmt_vnf", None
536 ) # only the first vdu can be management of the vnf
537 db_vdu_push_list.append(vdur_copy)
538 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
539 if vdu_delete:
540 for vdu_id, vdu_count in vdu_delete.items():
541 if mark_delete:
542 indexes_to_delete = [
543 iv[0]
544 for iv in enumerate(db_vnfr["vdur"])
545 if iv[1]["vdu-id-ref"] == vdu_id
546 ]
547 db_update.update(
548 {
549 "vdur.{}.status".format(i): "DELETING"
550 for i in indexes_to_delete[-vdu_count:]
551 }
552 )
553 else:
554 # they must be deleted one by one because common.db does not allow otherwise
555 vdus_to_delete = [
556 v
557 for v in reversed(db_vnfr["vdur"])
558 if v["vdu-id-ref"] == vdu_id
559 ]
560 for vdu in vdus_to_delete[:vdu_count]:
561 self.db.set_one(
562 "vnfrs",
563 {"_id": db_vnfr["_id"]},
564 None,
565 pull={"vdur": {"_id": vdu["_id"]}},
566 )
567 db_push = {"vdur": db_vdu_push_list} if db_vdu_push_list else None
568 self.db.set_one("vnfrs", {"_id": db_vnfr["_id"]}, db_update, push_list=db_push)
569 # modify passed dictionary db_vnfr
570 db_vnfr_ = self.db.get_one("vnfrs", {"_id": db_vnfr["_id"]})
571 db_vnfr["vdur"] = db_vnfr_["vdur"]
572
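# Editorial note: an illustrative sketch of the scale_vnfr arguments (assumed values, not from
# the source). For a scale-out of two "dataVM" replicas followed by a scale-in of one:
#
#     vdu_create = {"dataVM": 2}   # vdu-id-ref -> number of extra instances to clone
#     vdu_delete = {"dataVM": 1}   # vdu-id-ref -> number of instances to remove
#     # self.scale_vnfr(db_vnfr, vdu_create=vdu_create, mark_delete=False)
#     # self.scale_vnfr(db_vnfr, vdu_delete=vdu_delete, mark_delete=True)
#
# Each clone gets a fresh "_id", an incremented "count-index", a BUILD status and, for
# interfaces flagged fixed-ip/fixed-mac, addresses recomputed with increment_ip_mac.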
573 def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
574 """
575 Updates database nsr with the RO info for the created vld
576 :param ns_update_nsr: dictionary to be filled with the updated info
577 :param db_nsr: content of db_nsr. This is also modified
578 :param nsr_desc_RO: nsr descriptor from RO
579 :return: Nothing, LcmException is raised on errors
580 """
581
582 for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
583 for net_RO in get_iterable(nsr_desc_RO, "nets"):
584 if vld["id"] != net_RO.get("ns_net_osm_id"):
585 continue
586 vld["vim-id"] = net_RO.get("vim_net_id")
587 vld["name"] = net_RO.get("vim_name")
588 vld["status"] = net_RO.get("status")
589 vld["status-detailed"] = net_RO.get("error_msg")
590 ns_update_nsr["vld.{}".format(vld_index)] = vld
591 break
592 else:
593 raise LcmException(
594 "ns_update_nsr: Not found vld={} at RO info".format(vld["id"])
595 )
596
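# Editorial note: an illustrative sketch (assumed values) of the positional update produced by
# ns_update_nsr above. Each matched vld is rewritten whole under a "vld.<index>" key, which
# update_db_2 then applies to the nsrs record:
#
#     ns_update_nsr = {}
#     # after ns_update_nsr(ns_update_nsr, db_nsr, nsr_desc_RO) it may look like:
#     # {"vld.0": {"id": "mgmtnet", "vim-id": "0a1b...", "name": "mgmt",
#     #            "status": "ACTIVE", "status-detailed": None}}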
597 def set_vnfr_at_error(self, db_vnfrs, error_text):
598 try:
599 for db_vnfr in db_vnfrs.values():
600 vnfr_update = {"status": "ERROR"}
601 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
602 if "status" not in vdur:
603 vdur["status"] = "ERROR"
604 vnfr_update["vdur.{}.status".format(vdu_index)] = "ERROR"
605 if error_text:
606 vdur["status-detailed"] = str(error_text)
607 vnfr_update[
608 "vdur.{}.status-detailed".format(vdu_index)
609 ] = "ERROR"
610 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
611 except DbException as e:
612 self.logger.error("Cannot update vnf. {}".format(e))
613
614 def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
615 """
616 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
617 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
618 :param nsr_desc_RO: nsr descriptor from RO
619 :return: Nothing, LcmException is raised on errors
620 """
621 for vnf_index, db_vnfr in db_vnfrs.items():
622 for vnf_RO in nsr_desc_RO["vnfs"]:
623 if vnf_RO["member_vnf_index"] != vnf_index:
624 continue
625 vnfr_update = {}
626 if vnf_RO.get("ip_address"):
627 db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO[
628 "ip_address"
629 ].split(";")[0]
630 elif not db_vnfr.get("ip-address"):
631 if db_vnfr.get("vdur"): # if there are no VDUs, there is no ip_address
632 raise LcmExceptionNoMgmtIP(
633 "ns member_vnf_index '{}' has no IP address".format(
634 vnf_index
635 )
636 )
637
638 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
639 vdur_RO_count_index = 0
640 if vdur.get("pdu-type"):
641 continue
642 for vdur_RO in get_iterable(vnf_RO, "vms"):
643 if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
644 continue
645 if vdur["count-index"] != vdur_RO_count_index:
646 vdur_RO_count_index += 1
647 continue
648 vdur["vim-id"] = vdur_RO.get("vim_vm_id")
649 if vdur_RO.get("ip_address"):
650 vdur["ip-address"] = vdur_RO["ip_address"].split(";")[0]
651 else:
652 vdur["ip-address"] = None
653 vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
654 vdur["name"] = vdur_RO.get("vim_name")
655 vdur["status"] = vdur_RO.get("status")
656 vdur["status-detailed"] = vdur_RO.get("error_msg")
657 for ifacer in get_iterable(vdur, "interfaces"):
658 for interface_RO in get_iterable(vdur_RO, "interfaces"):
659 if ifacer["name"] == interface_RO.get("internal_name"):
660 ifacer["ip-address"] = interface_RO.get(
661 "ip_address"
662 )
663 ifacer["mac-address"] = interface_RO.get(
664 "mac_address"
665 )
666 break
667 else:
668 raise LcmException(
669 "ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
670 "from VIM info".format(
671 vnf_index, vdur["vdu-id-ref"], ifacer["name"]
672 )
673 )
674 vnfr_update["vdur.{}".format(vdu_index)] = vdur
675 break
676 else:
677 raise LcmException(
678 "ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
679 "VIM info".format(
680 vnf_index, vdur["vdu-id-ref"], vdur["count-index"]
681 )
682 )
683
684 for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
685 for net_RO in get_iterable(nsr_desc_RO, "nets"):
686 if vld["id"] != net_RO.get("vnf_net_osm_id"):
687 continue
688 vld["vim-id"] = net_RO.get("vim_net_id")
689 vld["name"] = net_RO.get("vim_name")
690 vld["status"] = net_RO.get("status")
691 vld["status-detailed"] = net_RO.get("error_msg")
692 vnfr_update["vld.{}".format(vld_index)] = vld
693 break
694 else:
695 raise LcmException(
696 "ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
697 vnf_index, vld["id"]
698 )
699 )
700
701 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
702 break
703
704 else:
705 raise LcmException(
706 "ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(
707 vnf_index
708 )
709 )
710
711 def _get_ns_config_info(self, nsr_id):
712 """
713 Generates a mapping between vnf,vdu elements and the N2VC id
714 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
715 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
716 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
717 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
718 """
719 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
720 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
721 mapping = {}
722 ns_config_info = {"osm-config-mapping": mapping}
723 for vca in vca_deployed_list:
724 if not vca["member-vnf-index"]:
725 continue
726 if not vca["vdu_id"]:
727 mapping[vca["member-vnf-index"]] = vca["application"]
728 else:
729 mapping[
730 "{}.{}.{}".format(
731 vca["member-vnf-index"], vca["vdu_id"], vca["vdu_count_index"]
732 )
733 ] = vca["application"]
734 return ns_config_info
735
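# Editorial note: an illustrative sketch (assumed values) of the structure returned by
# _get_ns_config_info above, mapping configured elements to their N2VC application names:
#
#     {
#         "osm-config-mapping": {
#             "1": "app-vnf-1",            # <member-vnf-index>: vnf-level charm
#             "1.mgmtVM.0": "app-vdu-0",   # <member-vnf-index>.<vdu_id>.<vdu replica>: vdu-level charm
#         }
#     }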
736 async def _instantiate_ng_ro(
737 self,
738 logging_text,
739 nsr_id,
740 nsd,
741 db_nsr,
742 db_nslcmop,
743 db_vnfrs,
744 db_vnfds,
745 n2vc_key_list,
746 stage,
747 start_deploy,
748 timeout_ns_deploy,
749 ):
750
751 db_vims = {}
752
753 def get_vim_account(vim_account_id):
754 nonlocal db_vims
755 if vim_account_id in db_vims:
756 return db_vims[vim_account_id]
757 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account_id})
758 db_vims[vim_account_id] = db_vim
759 return db_vim
760
761 # modify target_vld info with instantiation parameters
762 def parse_vld_instantiation_params(
763 target_vim, target_vld, vld_params, target_sdn
764 ):
765 if vld_params.get("ip-profile"):
766 target_vld["vim_info"][target_vim]["ip_profile"] = vld_params[
767 "ip-profile"
768 ]
769 if vld_params.get("provider-network"):
770 target_vld["vim_info"][target_vim]["provider_network"] = vld_params[
771 "provider-network"
772 ]
773 if "sdn-ports" in vld_params["provider-network"] and target_sdn:
774 target_vld["vim_info"][target_sdn]["sdn-ports"] = vld_params[
775 "provider-network"
776 ]["sdn-ports"]
777 if vld_params.get("wimAccountId"):
778 target_wim = "wim:{}".format(vld_params["wimAccountId"])
779 target_vld["vim_info"][target_wim] = {}
780 for param in ("vim-network-name", "vim-network-id"):
781 if vld_params.get(param):
782 if isinstance(vld_params[param], dict):
783 for vim, vim_net in vld_params[param].items():
784 other_target_vim = "vim:" + vim
785 populate_dict(
786 target_vld["vim_info"],
787 (other_target_vim, param.replace("-", "_")),
788 vim_net,
789 )
790 else: # isinstance str
791 target_vld["vim_info"][target_vim][
792 param.replace("-", "_")
793 ] = vld_params[param]
794 if vld_params.get("common_id"):
795 target_vld["common_id"] = vld_params.get("common_id")
796
797 nslcmop_id = db_nslcmop["_id"]
798 target = {
799 "name": db_nsr["name"],
800 "ns": {"vld": []},
801 "vnf": [],
802 "image": deepcopy(db_nsr["image"]),
803 "flavor": deepcopy(db_nsr["flavor"]),
804 "action_id": nslcmop_id,
805 "cloud_init_content": {},
806 }
807 for image in target["image"]:
808 image["vim_info"] = {}
809 for flavor in target["flavor"]:
810 flavor["vim_info"] = {}
811
812 if db_nslcmop.get("lcmOperationType") != "instantiate":
813 # get parameters of instantiation:
814 db_nslcmop_instantiate = self.db.get_list(
815 "nslcmops",
816 {
817 "nsInstanceId": db_nslcmop["nsInstanceId"],
818 "lcmOperationType": "instantiate",
819 },
820 )[-1]
821 ns_params = db_nslcmop_instantiate.get("operationParams")
822 else:
823 ns_params = db_nslcmop.get("operationParams")
824 ssh_keys_instantiation = ns_params.get("ssh_keys") or []
825 ssh_keys_all = ssh_keys_instantiation + (n2vc_key_list or [])
826
827 cp2target = {}
828 for vld_index, vld in enumerate(db_nsr.get("vld")):
829 target_vim = "vim:{}".format(ns_params["vimAccountId"])
830 target_vld = {
831 "id": vld["id"],
832 "name": vld["name"],
833 "mgmt-network": vld.get("mgmt-network", False),
834 "type": vld.get("type"),
835 "vim_info": {
836 target_vim: {
837 "vim_network_name": vld.get("vim-network-name"),
838 "vim_account_id": ns_params["vimAccountId"],
839 }
840 },
841 }
842 # check if this network needs SDN assist
843 if vld.get("pci-interfaces"):
844 db_vim = get_vim_account(ns_params["vimAccountId"])
845 sdnc_id = db_vim["config"].get("sdn-controller")
846 if sdnc_id:
847 sdn_vld = "nsrs:{}:vld.{}".format(nsr_id, vld["id"])
848 target_sdn = "sdn:{}".format(sdnc_id)
849 target_vld["vim_info"][target_sdn] = {
850 "sdn": True,
851 "target_vim": target_vim,
852 "vlds": [sdn_vld],
853 "type": vld.get("type"),
854 }
855
856 nsd_vnf_profiles = get_vnf_profiles(nsd)
857 for nsd_vnf_profile in nsd_vnf_profiles:
858 for cp in nsd_vnf_profile["virtual-link-connectivity"]:
859 if cp["virtual-link-profile-id"] == vld["id"]:
860 cp2target[
861 "member_vnf:{}.{}".format(
862 cp["constituent-cpd-id"][0][
863 "constituent-base-element-id"
864 ],
865 cp["constituent-cpd-id"][0]["constituent-cpd-id"],
866 )
867 ] = "nsrs:{}:vld.{}".format(nsr_id, vld_index)
868
869 # check at nsd descriptor, if there is an ip-profile
870 vld_params = {}
871 nsd_vlp = find_in_list(
872 get_virtual_link_profiles(nsd),
873 lambda a_link_profile: a_link_profile["virtual-link-desc-id"]
874 == vld["id"],
875 )
876 if (
877 nsd_vlp
878 and nsd_vlp.get("virtual-link-protocol-data")
879 and nsd_vlp["virtual-link-protocol-data"].get("l3-protocol-data")
880 ):
881 ip_profile_source_data = nsd_vlp["virtual-link-protocol-data"][
882 "l3-protocol-data"
883 ]
884 ip_profile_dest_data = {}
885 if "ip-version" in ip_profile_source_data:
886 ip_profile_dest_data["ip-version"] = ip_profile_source_data[
887 "ip-version"
888 ]
889 if "cidr" in ip_profile_source_data:
890 ip_profile_dest_data["subnet-address"] = ip_profile_source_data[
891 "cidr"
892 ]
893 if "gateway-ip" in ip_profile_source_data:
894 ip_profile_dest_data["gateway-address"] = ip_profile_source_data[
895 "gateway-ip"
896 ]
897 if "dhcp-enabled" in ip_profile_source_data:
898 ip_profile_dest_data["dhcp-params"] = {
899 "enabled": ip_profile_source_data["dhcp-enabled"]
900 }
901 vld_params["ip-profile"] = ip_profile_dest_data
902
903 # update vld_params with instantiation params
904 vld_instantiation_params = find_in_list(
905 get_iterable(ns_params, "vld"),
906 lambda a_vld: a_vld["name"] in (vld["name"], vld["id"]),
907 )
908 if vld_instantiation_params:
909 vld_params.update(vld_instantiation_params)
910 parse_vld_instantiation_params(target_vim, target_vld, vld_params, None)
911 target["ns"]["vld"].append(target_vld)
912
913 for vnfr in db_vnfrs.values():
914 vnfd = find_in_list(
915 db_vnfds, lambda db_vnf: db_vnf["id"] == vnfr["vnfd-ref"]
916 )
917 vnf_params = find_in_list(
918 get_iterable(ns_params, "vnf"),
919 lambda a_vnf: a_vnf["member-vnf-index"] == vnfr["member-vnf-index-ref"],
920 )
921 target_vnf = deepcopy(vnfr)
922 target_vim = "vim:{}".format(vnfr["vim-account-id"])
923 for vld in target_vnf.get("vld", ()):
924 # check if connected to a ns.vld, to fill "target"
925 vnf_cp = find_in_list(
926 vnfd.get("int-virtual-link-desc", ()),
927 lambda cpd: cpd.get("id") == vld["id"],
928 )
929 if vnf_cp:
930 ns_cp = "member_vnf:{}.{}".format(
931 vnfr["member-vnf-index-ref"], vnf_cp["id"]
932 )
933 if cp2target.get(ns_cp):
934 vld["target"] = cp2target[ns_cp]
935
936 vld["vim_info"] = {
937 target_vim: {"vim_network_name": vld.get("vim-network-name")}
938 }
939 # check if this network needs SDN assist
940 target_sdn = None
941 if vld.get("pci-interfaces"):
942 db_vim = get_vim_account(vnfr["vim-account-id"])
943 sdnc_id = db_vim["config"].get("sdn-controller")
944 if sdnc_id:
945 sdn_vld = "vnfrs:{}:vld.{}".format(target_vnf["_id"], vld["id"])
946 target_sdn = "sdn:{}".format(sdnc_id)
947 vld["vim_info"][target_sdn] = {
948 "sdn": True,
949 "target_vim": target_vim,
950 "vlds": [sdn_vld],
951 "type": vld.get("type"),
952 }
953
954 # check at vnfd descriptor, if there is an ip-profile
955 vld_params = {}
956 vnfd_vlp = find_in_list(
957 get_virtual_link_profiles(vnfd),
958 lambda a_link_profile: a_link_profile["id"] == vld["id"],
959 )
960 if (
961 vnfd_vlp
962 and vnfd_vlp.get("virtual-link-protocol-data")
963 and vnfd_vlp["virtual-link-protocol-data"].get("l3-protocol-data")
964 ):
965 ip_profile_source_data = vnfd_vlp["virtual-link-protocol-data"][
966 "l3-protocol-data"
967 ]
968 ip_profile_dest_data = {}
969 if "ip-version" in ip_profile_source_data:
970 ip_profile_dest_data["ip-version"] = ip_profile_source_data[
971 "ip-version"
972 ]
973 if "cidr" in ip_profile_source_data:
974 ip_profile_dest_data["subnet-address"] = ip_profile_source_data[
975 "cidr"
976 ]
977 if "gateway-ip" in ip_profile_source_data:
978 ip_profile_dest_data[
979 "gateway-address"
980 ] = ip_profile_source_data["gateway-ip"]
981 if "dhcp-enabled" in ip_profile_source_data:
982 ip_profile_dest_data["dhcp-params"] = {
983 "enabled": ip_profile_source_data["dhcp-enabled"]
984 }
985
986 vld_params["ip-profile"] = ip_profile_dest_data
987 # update vld_params with instantiation params
988 if vnf_params:
989 vld_instantiation_params = find_in_list(
990 get_iterable(vnf_params, "internal-vld"),
991 lambda i_vld: i_vld["name"] == vld["id"],
992 )
993 if vld_instantiation_params:
994 vld_params.update(vld_instantiation_params)
995 parse_vld_instantiation_params(target_vim, vld, vld_params, target_sdn)
996
997 vdur_list = []
998 for vdur in target_vnf.get("vdur", ()):
999 if vdur.get("status") == "DELETING" or vdur.get("pdu-type"):
1000 continue # This vdu must not be created
1001 vdur["vim_info"] = {"vim_account_id": vnfr["vim-account-id"]}
1002
1003 self.logger.debug("NS > ssh_keys > {}".format(ssh_keys_all))
1004
1005 if ssh_keys_all:
1006 vdu_configuration = get_configuration(vnfd, vdur["vdu-id-ref"])
1007 vnf_configuration = get_configuration(vnfd, vnfd["id"])
1008 if (
1009 vdu_configuration
1010 and vdu_configuration.get("config-access")
1011 and vdu_configuration.get("config-access").get("ssh-access")
1012 ):
1013 vdur["ssh-keys"] = ssh_keys_all
1014 vdur["ssh-access-required"] = vdu_configuration[
1015 "config-access"
1016 ]["ssh-access"]["required"]
1017 elif (
1018 vnf_configuration
1019 and vnf_configuration.get("config-access")
1020 and vnf_configuration.get("config-access").get("ssh-access")
1021 and any(iface.get("mgmt-vnf") for iface in vdur["interfaces"])
1022 ):
1023 vdur["ssh-keys"] = ssh_keys_all
1024 vdur["ssh-access-required"] = vnf_configuration[
1025 "config-access"
1026 ]["ssh-access"]["required"]
1027 elif ssh_keys_instantiation and find_in_list(
1028 vdur["interfaces"], lambda iface: iface.get("mgmt-vnf")
1029 ):
1030 vdur["ssh-keys"] = ssh_keys_instantiation
1031
1032 self.logger.debug("NS > vdur > {}".format(vdur))
1033
1034 vdud = get_vdu(vnfd, vdur["vdu-id-ref"])
1035 # cloud-init
1036 if vdud.get("cloud-init-file"):
1037 vdur["cloud-init"] = "{}:file:{}".format(
1038 vnfd["_id"], vdud.get("cloud-init-file")
1039 )
1040 # read the file and put its content at target.cloud_init_content, so ng_ro does not need to use the shared package system
1041 if vdur["cloud-init"] not in target["cloud_init_content"]:
1042 base_folder = vnfd["_admin"]["storage"]
1043 cloud_init_file = "{}/{}/cloud_init/{}".format(
1044 base_folder["folder"],
1045 base_folder["pkg-dir"],
1046 vdud.get("cloud-init-file"),
1047 )
1048 with self.fs.file_open(cloud_init_file, "r") as ci_file:
1049 target["cloud_init_content"][
1050 vdur["cloud-init"]
1051 ] = ci_file.read()
1052 elif vdud.get("cloud-init"):
1053 vdur["cloud-init"] = "{}:vdu:{}".format(
1054 vnfd["_id"], get_vdu_index(vnfd, vdur["vdu-id-ref"])
1055 )
1056 # put the content at target.cloud_init_content, so ng_ro does not need to read the vnfd descriptor
1057 target["cloud_init_content"][vdur["cloud-init"]] = vdud[
1058 "cloud-init"
1059 ]
1060 vdur["additionalParams"] = vdur.get("additionalParams") or {}
1061 deploy_params_vdu = self._format_additional_params(
1062 vdur.get("additionalParams") or {}
1063 )
1064 deploy_params_vdu["OSM"] = get_osm_params(
1065 vnfr, vdur["vdu-id-ref"], vdur["count-index"]
1066 )
1067 vdur["additionalParams"] = deploy_params_vdu
1068
1069 # flavor
1070 ns_flavor = target["flavor"][int(vdur["ns-flavor-id"])]
1071 if target_vim not in ns_flavor["vim_info"]:
1072 ns_flavor["vim_info"][target_vim] = {}
1073
1074 # deal with images
1075 # in case alternative images are provided, check whether one should be applied for this
1076 # vim_type and, if so, switch ns-image-id to that alternative image
1077 ns_image_id = int(vdur["ns-image-id"])
1078 if vdur.get("alt-image-ids"):
1079 db_vim = get_vim_account(vnfr["vim-account-id"])
1080 vim_type = db_vim["vim_type"]
1081 for alt_image_id in vdur.get("alt-image-ids"):
1082 ns_alt_image = target["image"][int(alt_image_id)]
1083 if vim_type == ns_alt_image.get("vim-type"):
1084 # must use alternative image
1085 self.logger.debug(
1086 "use alternative image id: {}".format(alt_image_id)
1087 )
1088 ns_image_id = alt_image_id
1089 vdur["ns-image-id"] = ns_image_id
1090 break
1091 ns_image = target["image"][int(ns_image_id)]
1092 if target_vim not in ns_image["vim_info"]:
1093 ns_image["vim_info"][target_vim] = {}
1094
1095 vdur["vim_info"] = {target_vim: {}}
1096 # instantiation parameters
1097 # if vnf_params:
1098 # vdu_instantiation_params = next((v for v in get_iterable(vnf_params, "vdu") if v["id"] ==
1099 # vdud["id"]), None)
1100 vdur_list.append(vdur)
1101 target_vnf["vdur"] = vdur_list
1102 target["vnf"].append(target_vnf)
1103
1104 desc = await self.RO.deploy(nsr_id, target)
1105 self.logger.debug("RO return > {}".format(desc))
1106 action_id = desc["action_id"]
1107 await self._wait_ng_ro(
1108 nsr_id, action_id, nslcmop_id, start_deploy, timeout_ns_deploy, stage
1109 )
1110
1111 # Updating NSR
1112 db_nsr_update = {
1113 "_admin.deployed.RO.operational-status": "running",
1114 "detailed-status": " ".join(stage),
1115 }
1116 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1117 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1118 self._write_op_status(nslcmop_id, stage)
1119 self.logger.debug(
1120 logging_text + "ns deployed at RO. RO_id={}".format(action_id)
1121 )
1122 return
1123
1124 async def _wait_ng_ro(
1125 self,
1126 nsr_id,
1127 action_id,
1128 nslcmop_id=None,
1129 start_time=None,
1130 timeout=600,
1131 stage=None,
1132 ):
1133 detailed_status_old = None
1134 db_nsr_update = {}
1135 start_time = start_time or time()
1136 while time() <= start_time + timeout:
1137 desc_status = await self.RO.status(nsr_id, action_id)
1138 self.logger.debug("Wait NG RO > {}".format(desc_status))
1139 if desc_status["status"] == "FAILED":
1140 raise NgRoException(desc_status["details"])
1141 elif desc_status["status"] == "BUILD":
1142 if stage:
1143 stage[2] = "VIM: ({})".format(desc_status["details"])
1144 elif desc_status["status"] == "DONE":
1145 if stage:
1146 stage[2] = "Deployed at VIM"
1147 break
1148 else:
1149 assert False, "RO.status returns unknown {}".format(
1150 desc_status["status"]
1151 )
1152 if stage and nslcmop_id and stage[2] != detailed_status_old:
1153 detailed_status_old = stage[2]
1154 db_nsr_update["detailed-status"] = " ".join(stage)
1155 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1156 self._write_op_status(nslcmop_id, stage)
1157 await asyncio.sleep(15, loop=self.loop)
1158 else: # timeout_ns_deploy
1159 raise NgRoException("Timeout waiting ns to deploy")
1160
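# Editorial note: a minimal usage sketch of _wait_ng_ro (illustrative only). Callers pass the
# action_id returned by RO.deploy() and a 3-item stage list; the loop polls RO.status() every
# 15 s, mirrors progress into stage[2] and the nsrs/nslcmops records, and raises NgRoException
# on FAILED status or on timeout:
#
#     desc = await self.RO.deploy(nsr_id, target)
#     await self._wait_ng_ro(
#         nsr_id, desc["action_id"], nslcmop_id, start_deploy, timeout_ns_deploy, stage
#     )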
1161 async def _terminate_ng_ro(
1162 self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
1163 ):
1164 db_nsr_update = {}
1165 failed_detail = []
1166 action_id = None
1167 start_deploy = time()
1168 try:
1169 target = {
1170 "ns": {"vld": []},
1171 "vnf": [],
1172 "image": [],
1173 "flavor": [],
1174 "action_id": nslcmop_id,
1175 }
1176 desc = await self.RO.deploy(nsr_id, target)
1177 action_id = desc["action_id"]
1178 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = action_id
1179 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETING"
1180 self.logger.debug(
1181 logging_text
1182 + "ns terminate action at RO. action_id={}".format(action_id)
1183 )
1184
1185 # wait until done
1186 delete_timeout = 20 * 60 # 20 minutes
1187 await self._wait_ng_ro(
1188 nsr_id, action_id, nslcmop_id, start_deploy, delete_timeout, stage
1189 )
1190
1191 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
1192 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
1193 # delete all nsr
1194 await self.RO.delete(nsr_id)
1195 except Exception as e:
1196 if isinstance(e, NgRoException) and e.http_code == 404: # not found
1197 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
1198 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
1199 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
1200 self.logger.debug(
1201 logging_text + "RO_action_id={} already deleted".format(action_id)
1202 )
1203 elif isinstance(e, NgRoException) and e.http_code == 409: # conflict
1204 failed_detail.append("delete conflict: {}".format(e))
1205 self.logger.debug(
1206 logging_text
1207 + "RO_action_id={} delete conflict: {}".format(action_id, e)
1208 )
1209 else:
1210 failed_detail.append("delete error: {}".format(e))
1211 self.logger.error(
1212 logging_text
1213 + "RO_action_id={} delete error: {}".format(action_id, e)
1214 )
1215
1216 if failed_detail:
1217 stage[2] = "Error deleting from VIM"
1218 else:
1219 stage[2] = "Deleted from VIM"
1220 db_nsr_update["detailed-status"] = " ".join(stage)
1221 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1222 self._write_op_status(nslcmop_id, stage)
1223
1224 if failed_detail:
1225 raise LcmException("; ".join(failed_detail))
1226 return
1227
1228 async def instantiate_RO(
1229 self,
1230 logging_text,
1231 nsr_id,
1232 nsd,
1233 db_nsr,
1234 db_nslcmop,
1235 db_vnfrs,
1236 db_vnfds,
1237 n2vc_key_list,
1238 stage,
1239 ):
1240 """
1241 Instantiate at RO
1242 :param logging_text: prefix text to use at logging
1243 :param nsr_id: nsr identity
1244 :param nsd: database content of ns descriptor
1245 :param db_nsr: database content of ns record
1246 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
1247 :param db_vnfrs:
1248 :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
1249 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
1250 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
1251 :return: None or exception
1252 """
1253 try:
1254 start_deploy = time()
1255 ns_params = db_nslcmop.get("operationParams")
1256 if ns_params and ns_params.get("timeout_ns_deploy"):
1257 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
1258 else:
1259 timeout_ns_deploy = self.timeout.get(
1260 "ns_deploy", self.timeout_ns_deploy
1261 )
1262
1263 # Check for and optionally request placement optimization. Database will be updated if placement activated
1264 stage[2] = "Waiting for Placement."
1265 if await self._do_placement(logging_text, db_nslcmop, db_vnfrs):
1266 # in case of placement change, set ns_params["vimAccountId"] if it is not used by any vnfr
1267 for vnfr in db_vnfrs.values():
1268 if ns_params["vimAccountId"] == vnfr["vim-account-id"]:
1269 break
1270 else:
1271 ns_params["vimAccountId"] = vnfr["vim-account-id"]
1272
1273 return await self._instantiate_ng_ro(
1274 logging_text,
1275 nsr_id,
1276 nsd,
1277 db_nsr,
1278 db_nslcmop,
1279 db_vnfrs,
1280 db_vnfds,
1281 n2vc_key_list,
1282 stage,
1283 start_deploy,
1284 timeout_ns_deploy,
1285 )
1286 except Exception as e:
1287 stage[2] = "ERROR deploying at VIM"
1288 self.set_vnfr_at_error(db_vnfrs, str(e))
1289 self.logger.error(
1290 "Error deploying at VIM {}".format(e),
1291 exc_info=not isinstance(
1292 e,
1293 (
1294 ROclient.ROClientException,
1295 LcmException,
1296 DbException,
1297 NgRoException,
1298 ),
1299 ),
1300 )
1301 raise
1302
1303 async def wait_kdu_up(self, logging_text, nsr_id, vnfr_id, kdu_name):
1304 """
1305 Wait for kdu to be up, get ip address
1306 :param logging_text: prefix use for logging
1307 :param nsr_id:
1308 :param vnfr_id:
1309 :param kdu_name:
1310 :return: IP address
1311 """
1312
1313 # self.logger.debug(logging_text + "Starting wait_kdu_up")
1314 nb_tries = 0
1315
1316 while nb_tries < 360:
1317 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
1318 kdur = next(
1319 (
1320 x
1321 for x in get_iterable(db_vnfr, "kdur")
1322 if x.get("kdu-name") == kdu_name
1323 ),
1324 None,
1325 )
1326 if not kdur:
1327 raise LcmException(
1328 "Not found vnfr_id={}, kdu_name={}".format(vnfr_id, kdu_name)
1329 )
1330 if kdur.get("status"):
1331 if kdur["status"] in ("READY", "ENABLED"):
1332 return kdur.get("ip-address")
1333 else:
1334 raise LcmException(
1335 "target KDU={} is in error state".format(kdu_name)
1336 )
1337
1338 await asyncio.sleep(10, loop=self.loop)
1339 nb_tries += 1
1340 raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name))
1341
1342 async def wait_vm_up_insert_key_ro(
1343 self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None
1344 ):
1345 """
1346 Wait for the ip address at RO and, optionally, insert a public key into the virtual machine
1347 :param logging_text: prefix use for logging
1348 :param nsr_id:
1349 :param vnfr_id:
1350 :param vdu_id:
1351 :param vdu_index:
1352 :param pub_key: public ssh key to inject, None to skip
1353 :param user: user to apply the public ssh key
1354 :return: IP address
1355 """
1356
1357 self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
1358 ro_nsr_id = None
1359 ip_address = None
1360 nb_tries = 0
1361 target_vdu_id = None
1362 ro_retries = 0
1363
1364 while True:
1365
1366 ro_retries += 1
1367 if ro_retries >= 360: # 1 hour
1368 raise LcmException(
1369 "Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id)
1370 )
1371
1372 await asyncio.sleep(10, loop=self.loop)
1373
1374 # get ip address
1375 if not target_vdu_id:
1376 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
1377
1378 if not vdu_id: # for the VNF case
1379 if db_vnfr.get("status") == "ERROR":
1380 raise LcmException(
1381 "Cannot inject ssh-key because target VNF is in error state"
1382 )
1383 ip_address = db_vnfr.get("ip-address")
1384 if not ip_address:
1385 continue
1386 vdur = next(
1387 (
1388 x
1389 for x in get_iterable(db_vnfr, "vdur")
1390 if x.get("ip-address") == ip_address
1391 ),
1392 None,
1393 )
1394 else: # VDU case
1395 vdur = next(
1396 (
1397 x
1398 for x in get_iterable(db_vnfr, "vdur")
1399 if x.get("vdu-id-ref") == vdu_id
1400 and x.get("count-index") == vdu_index
1401 ),
1402 None,
1403 )
1404
1405 if (
1406 not vdur and len(db_vnfr.get("vdur", ())) == 1
1407 ): # If only one, this should be the target vdu
1408 vdur = db_vnfr["vdur"][0]
1409 if not vdur:
1410 raise LcmException(
1411 "Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(
1412 vnfr_id, vdu_id, vdu_index
1413 )
1414 )
1415 # New generation RO stores information at "vim_info"
1416 ng_ro_status = None
1417 target_vim = None
1418 if vdur.get("vim_info"):
1419 target_vim = next(
1420 t for t in vdur["vim_info"]
1421 ) # there should be only one key
1422 ng_ro_status = vdur["vim_info"][target_vim].get("vim_status")
1423 if (
1424 vdur.get("pdu-type")
1425 or vdur.get("status") == "ACTIVE"
1426 or ng_ro_status == "ACTIVE"
1427 ):
1428 ip_address = vdur.get("ip-address")
1429 if not ip_address:
1430 continue
1431 target_vdu_id = vdur["vdu-id-ref"]
1432 elif vdur.get("status") == "ERROR" or ng_ro_status == "ERROR":
1433 raise LcmException(
1434 "Cannot inject ssh-key because target VM is in error state"
1435 )
1436
1437 if not target_vdu_id:
1438 continue
1439
1440 # inject public key into machine
1441 if pub_key and user:
1442 self.logger.debug(logging_text + "Inserting RO key")
1443 self.logger.debug("SSH > PubKey > {}".format(pub_key))
1444 if vdur.get("pdu-type"):
1445 self.logger.error(logging_text + "Cannot inject ssh-key to a PDU")
1446 return ip_address
1447 try:
1448 ro_vm_id = "{}-{}".format(
1449 db_vnfr["member-vnf-index-ref"], target_vdu_id
1450 ) # TODO add vdu_index
1451 if self.ng_ro:
1452 target = {
1453 "action": {
1454 "action": "inject_ssh_key",
1455 "key": pub_key,
1456 "user": user,
1457 },
1458 "vnf": [{"_id": vnfr_id, "vdur": [{"id": vdur["id"]}]}],
1459 }
1460 desc = await self.RO.deploy(nsr_id, target)
1461 action_id = desc["action_id"]
1462 await self._wait_ng_ro(nsr_id, action_id, timeout=600)
1463 break
1464 else:
1465 # wait until NS is deployed at RO
1466 if not ro_nsr_id:
1467 db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
1468 ro_nsr_id = deep_get(
1469 db_nsrs, ("_admin", "deployed", "RO", "nsr_id")
1470 )
1471 if not ro_nsr_id:
1472 continue
1473 result_dict = await self.RO.create_action(
1474 item="ns",
1475 item_id_name=ro_nsr_id,
1476 descriptor={
1477 "add_public_key": pub_key,
1478 "vms": [ro_vm_id],
1479 "user": user,
1480 },
1481 )
1482 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
1483 if not result_dict or not isinstance(result_dict, dict):
1484 raise LcmException(
1485 "Unknown response from RO when injecting key"
1486 )
1487 for result in result_dict.values():
1488 if result.get("vim_result") == 200:
1489 break
1490 else:
1491 raise ROclient.ROClientException(
1492 "error injecting key: {}".format(
1493 result.get("description")
1494 )
1495 )
1496 break
1497 except NgRoException as e:
1498 raise LcmException(
1499 "Reaching max tries injecting key. Error: {}".format(e)
1500 )
1501 except ROclient.ROClientException as e:
1502 if not nb_tries:
1503 self.logger.debug(
1504 logging_text
1505 + "error injecting key: {}. Retrying until {} seconds".format(
1506 e, 20 * 10
1507 )
1508 )
1509 nb_tries += 1
1510 if nb_tries >= 20:
1511 raise LcmException(
1512 "Reaching max tries injecting key. Error: {}".format(e)
1513 )
1514 else:
1515 break
1516
1517 return ip_address
1518
1519 async def _wait_dependent_n2vc(self, nsr_id, vca_deployed_list, vca_index):
1520 """
1521 Wait until dependent VCA deployments have finished. NS waits for VNFs and VDUs; VNFs wait for VDUs
1522 """
1523 my_vca = vca_deployed_list[vca_index]
1524 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
1525 # vdu or kdu: no dependencies
1526 return
1527 timeout = 300
1528 while timeout >= 0:
1529 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1530 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1531 configuration_status_list = db_nsr["configurationStatus"]
1532 for index, vca_deployed in enumerate(configuration_status_list):
1533 if index == vca_index:
1534 # myself
1535 continue
1536 if not my_vca.get("member-vnf-index") or (
1537 vca_deployed.get("member-vnf-index")
1538 == my_vca.get("member-vnf-index")
1539 ):
1540 internal_status = configuration_status_list[index].get("status")
1541 if internal_status == "READY":
1542 continue
1543 elif internal_status == "BROKEN":
1544 raise LcmException(
1545 "Configuration aborted because dependent charm/s has failed"
1546 )
1547 else:
1548 break
1549 else:
1550 # no dependencies, return
1551 return
1552 await asyncio.sleep(10)
1553 timeout -= 1
1554
1555 raise LcmException("Configuration aborted because dependent charm/s timeout")
1556
1557 def get_vca_id(self, db_vnfr: dict, db_nsr: dict):
1558 return deep_get(db_vnfr, ("vca-id",)) or deep_get(
1559 db_nsr, ("instantiate_params", "vcaId")
1560 )
1561
1562 async def instantiate_N2VC(
1563 self,
1564 logging_text,
1565 vca_index,
1566 nsi_id,
1567 db_nsr,
1568 db_vnfr,
1569 vdu_id,
1570 kdu_name,
1571 vdu_index,
1572 config_descriptor,
1573 deploy_params,
1574 base_folder,
1575 nslcmop_id,
1576 stage,
1577 vca_type,
1578 vca_name,
1579 ee_config_descriptor,
1580 ):
1581 nsr_id = db_nsr["_id"]
1582 db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
1583 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1584 vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
1585 osm_config = {"osm": {"ns_id": db_nsr["_id"]}}
1586 db_dict = {
1587 "collection": "nsrs",
1588 "filter": {"_id": nsr_id},
1589 "path": db_update_entry,
1590 }
1591 step = ""
1592 try:
1593
1594 element_type = "NS"
1595 element_under_configuration = nsr_id
1596
1597 vnfr_id = None
1598 if db_vnfr:
1599 vnfr_id = db_vnfr["_id"]
1600 osm_config["osm"]["vnf_id"] = vnfr_id
1601
1602 namespace = "{nsi}.{ns}".format(nsi=nsi_id if nsi_id else "", ns=nsr_id)
1603
1604 if vnfr_id:
1605 element_type = "VNF"
1606 element_under_configuration = vnfr_id
1607 namespace += ".{}-{}".format(vnfr_id, vdu_index or 0)
1608 if vdu_id:
1609 namespace += ".{}-{}".format(vdu_id, vdu_index or 0)
1610 element_type = "VDU"
1611 element_under_configuration = "{}-{}".format(vdu_id, vdu_index or 0)
1612 osm_config["osm"]["vdu_id"] = vdu_id
1613 elif kdu_name:
1614 namespace += ".{}.{}".format(kdu_name, vdu_index or 0)
1615 element_type = "KDU"
1616 element_under_configuration = kdu_name
1617 osm_config["osm"]["kdu_name"] = kdu_name
1618
1619 # Get artifact path
1620 artifact_path = "{}/{}/{}/{}".format(
1621 base_folder["folder"],
1622 base_folder["pkg-dir"],
1623 "charms"
1624 if vca_type in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm")
1625 else "helm-charts",
1626 vca_name,
1627 )
1628
1629 self.logger.debug("Artifact path > {}".format(artifact_path))
1630
1631 # get initial_config_primitive_list that applies to this element
1632 initial_config_primitive_list = config_descriptor.get(
1633 "initial-config-primitive"
1634 )
1635
1636 self.logger.debug(
1637 "Initial config primitive list > {}".format(
1638 initial_config_primitive_list
1639 )
1640 )
1641
1642 # add config if not present for NS charm
1643 ee_descriptor_id = ee_config_descriptor.get("id")
1644 self.logger.debug("EE Descriptor > {}".format(ee_descriptor_id))
1645 initial_config_primitive_list = get_ee_sorted_initial_config_primitive_list(
1646 initial_config_primitive_list, vca_deployed, ee_descriptor_id
1647 )
1648
1649 self.logger.debug(
1650 "Initial config primitive list #2 > {}".format(
1651 initial_config_primitive_list
1652 )
1653 )
1654 # n2vc_redesign STEP 3.1
1655 # find old ee_id if exists
1656 ee_id = vca_deployed.get("ee_id")
1657
1658 vca_id = self.get_vca_id(db_vnfr, db_nsr)
1659 # create or register execution environment in VCA
1660 if vca_type in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1661
1662 self._write_configuration_status(
1663 nsr_id=nsr_id,
1664 vca_index=vca_index,
1665 status="CREATING",
1666 element_under_configuration=element_under_configuration,
1667 element_type=element_type,
1668 )
1669
1670 step = "create execution environment"
1671 self.logger.debug(logging_text + step)
1672
1673 ee_id = None
1674 credentials = None
1675 if vca_type == "k8s_proxy_charm":
1676 ee_id = await self.vca_map[vca_type].install_k8s_proxy_charm(
1677 charm_name=artifact_path[artifact_path.rfind("/") + 1 :],
1678 namespace=namespace,
1679 artifact_path=artifact_path,
1680 db_dict=db_dict,
1681 vca_id=vca_id,
1682 )
1683 elif vca_type == "helm" or vca_type == "helm-v3":
1684 ee_id, credentials = await self.vca_map[
1685 vca_type
1686 ].create_execution_environment(
1687 namespace=namespace,
1688 reuse_ee_id=ee_id,
1689 db_dict=db_dict,
1690 config=osm_config,
1691 artifact_path=artifact_path,
1692 vca_type=vca_type,
1693 )
1694 else:
1695 ee_id, credentials = await self.vca_map[
1696 vca_type
1697 ].create_execution_environment(
1698 namespace=namespace,
1699 reuse_ee_id=ee_id,
1700 db_dict=db_dict,
1701 vca_id=vca_id,
1702 )
1703
1704 elif vca_type == "native_charm":
1705 step = "Waiting for VM to be up and getting IP address"
1706 self.logger.debug(logging_text + step)
1707 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
1708 logging_text,
1709 nsr_id,
1710 vnfr_id,
1711 vdu_id,
1712 vdu_index,
1713 user=None,
1714 pub_key=None,
1715 )
1716 credentials = {"hostname": rw_mgmt_ip}
1717 # get username
1718 username = deep_get(
1719 config_descriptor, ("config-access", "ssh-access", "default-user")
1720 )
1721 # TODO remove this when the IM changes regarding config-access:ssh-access:default-user are
1722 # merged. Meanwhile, get the username from initial-config-primitive
1723 if not username and initial_config_primitive_list:
1724 for config_primitive in initial_config_primitive_list:
1725 for param in config_primitive.get("parameter", ()):
1726 if param["name"] == "ssh-username":
1727 username = param["value"]
1728 break
1729 if not username:
1730 raise LcmException(
1731 "Cannot determine the username neither with 'initial-config-primitive' nor with "
1732 "'config-access.ssh-access.default-user'"
1733 )
1734 credentials["username"] = username
1735 # n2vc_redesign STEP 3.2
1736
1737 self._write_configuration_status(
1738 nsr_id=nsr_id,
1739 vca_index=vca_index,
1740 status="REGISTERING",
1741 element_under_configuration=element_under_configuration,
1742 element_type=element_type,
1743 )
1744
1745 step = "register execution environment {}".format(credentials)
1746 self.logger.debug(logging_text + step)
1747 ee_id = await self.vca_map[vca_type].register_execution_environment(
1748 credentials=credentials,
1749 namespace=namespace,
1750 db_dict=db_dict,
1751 vca_id=vca_id,
1752 )
1753
1754 # for compatibility with MON/POL modules, they need the model and application name in the database
1755 # TODO ask MON/POL whether it is still necessary to assume the format "model_name.application_name"
1756 ee_id_parts = ee_id.split(".")
1757 db_nsr_update = {db_update_entry + "ee_id": ee_id}
1758 if len(ee_id_parts) >= 2:
1759 model_name = ee_id_parts[0]
1760 application_name = ee_id_parts[1]
1761 db_nsr_update[db_update_entry + "model"] = model_name
1762 db_nsr_update[db_update_entry + "application"] = application_name
1763
1764 # n2vc_redesign STEP 3.3
1765 step = "Install configuration Software"
1766
1767 self._write_configuration_status(
1768 nsr_id=nsr_id,
1769 vca_index=vca_index,
1770 status="INSTALLING SW",
1771 element_under_configuration=element_under_configuration,
1772 element_type=element_type,
1773 other_update=db_nsr_update,
1774 )
1775
1776 # TODO check if already done
1777 self.logger.debug(logging_text + step)
1778 config = None
1779 if vca_type == "native_charm":
1780 config_primitive = next(
1781 (p for p in initial_config_primitive_list if p["name"] == "config"),
1782 None,
1783 )
1784 if config_primitive:
1785 config = self._map_primitive_params(
1786 config_primitive, {}, deploy_params
1787 )
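# number of units to deploy with the charm: for lxc proxy charms, the optional 'config-units' field of the NS, VNF or VDU record overrides the default of 1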
1788 num_units = 1
1789 if vca_type == "lxc_proxy_charm":
1790 if element_type == "NS":
1791 num_units = db_nsr.get("config-units") or 1
1792 elif element_type == "VNF":
1793 num_units = db_vnfr.get("config-units") or 1
1794 elif element_type == "VDU":
1795 for v in db_vnfr["vdur"]:
1796 if vdu_id == v["vdu-id-ref"]:
1797 num_units = v.get("config-units") or 1
1798 break
1799 if vca_type != "k8s_proxy_charm":
1800 await self.vca_map[vca_type].install_configuration_sw(
1801 ee_id=ee_id,
1802 artifact_path=artifact_path,
1803 db_dict=db_dict,
1804 config=config,
1805 num_units=num_units,
1806 vca_id=vca_id,
1807 )
1808
1809 # write in db the flag indicating that the configuration sw is already installed
1810 self.update_db_2(
1811 "nsrs", nsr_id, {db_update_entry + "config_sw_installed": True}
1812 )
1813
1814 # add relations for this VCA (wait for other peers related with this VCA)
1815 await self._add_vca_relations(
1816 logging_text=logging_text,
1817 nsr_id=nsr_id,
1818 vca_index=vca_index,
1819 vca_id=vca_id,
1820 vca_type=vca_type,
1821 )
1822
1823 # if SSH access is required, then get the execution environment SSH public key
1824 # for native charms we have already waited for the VM to be up
1825 if vca_type in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
1826 pub_key = None
1827 user = None
1828 # self.logger.debug("get ssh key block")
1829 if deep_get(
1830 config_descriptor, ("config-access", "ssh-access", "required")
1831 ):
1832 # self.logger.debug("ssh key needed")
1833 # Need to inject an ssh key
1834 user = deep_get(
1835 config_descriptor,
1836 ("config-access", "ssh-access", "default-user"),
1837 )
1838 step = "Install configuration Software, getting public ssh key"
1839 pub_key = await self.vca_map[vca_type].get_ee_ssh_public__key(
1840 ee_id=ee_id, db_dict=db_dict, vca_id=vca_id
1841 )
1842
1843 step = "Insert public key into VM user={} ssh_key={}".format(
1844 user, pub_key
1845 )
1846 else:
1847 # self.logger.debug("no need to get ssh key")
1848 step = "Waiting for the VM to be up and getting its IP address"
1849 self.logger.debug(logging_text + step)
1850
1851 # n2vc_redesign STEP 5.1
1852 # wait for RO to report the ip-address and insert pub_key into the VM
1853 if vnfr_id:
1854 if kdu_name:
1855 rw_mgmt_ip = await self.wait_kdu_up(
1856 logging_text, nsr_id, vnfr_id, kdu_name
1857 )
1858 else:
1859 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(
1860 logging_text,
1861 nsr_id,
1862 vnfr_id,
1863 vdu_id,
1864 vdu_index,
1865 user=user,
1866 pub_key=pub_key,
1867 )
1868 else:
1869 rw_mgmt_ip = None  # This is for an NS configuration
1870
1871 self.logger.debug(logging_text + " VM_ip_address={}".format(rw_mgmt_ip))
1872
1873 # store rw_mgmt_ip in deploy params for later replacement
1874 deploy_params["rw_mgmt_ip"] = rw_mgmt_ip
1875
1876 # n2vc_redesign STEP 6 Execute initial config primitive
1877 step = "execute initial config primitive"
1878
1879 # wait for dependent primitives execution (NS -> VNF -> VDU)
1880 if initial_config_primitive_list:
1881 await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)
1882
1883 # stage, depending on the element type: vdu, kdu, vnf or ns
1884 my_vca = vca_deployed_list[vca_index]
1885 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
1886 # VDU or KDU
1887 stage[0] = "Stage 3/5: running Day-1 primitives for VDU."
1888 elif my_vca.get("member-vnf-index"):
1889 # VNF
1890 stage[0] = "Stage 4/5: running Day-1 primitives for VNF."
1891 else:
1892 # NS
1893 stage[0] = "Stage 5/5: running Day-1 primitives for NS."
1894
1895 self._write_configuration_status(
1896 nsr_id=nsr_id, vca_index=vca_index, status="EXECUTING PRIMITIVE"
1897 )
1898
1899 self._write_op_status(op_id=nslcmop_id, stage=stage)
1900
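# guard so that the 'needed_terminate' flag is written to the database only once, right after the first primitive has been executed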
1901 check_if_terminated_needed = True
1902 for initial_config_primitive in initial_config_primitive_list:
1903 # for NS-level execution environments (no member-vnf-index), add ns_config_info to the parameters
1904 if not vca_deployed["member-vnf-index"]:
1905 deploy_params["ns_config_info"] = json.dumps(
1906 self._get_ns_config_info(nsr_id)
1907 )
1908 # TODO check if already done
1909 primitive_params_ = self._map_primitive_params(
1910 initial_config_primitive, {}, deploy_params
1911 )
1912
1913 step = "execute primitive '{}' params '{}'".format(
1914 initial_config_primitive["name"], primitive_params_
1915 )
1916 self.logger.debug(logging_text + step)
1917 await self.vca_map[vca_type].exec_primitive(
1918 ee_id=ee_id,
1919 primitive_name=initial_config_primitive["name"],
1920 params_dict=primitive_params_,
1921 db_dict=db_dict,
1922 vca_id=vca_id,
1923 )
1924 # Once a primitive has been executed, check and record at db whether terminate primitives must be executed
1925 if check_if_terminated_needed:
1926 if config_descriptor.get("terminate-config-primitive"):
1927 self.update_db_2(
1928 "nsrs", nsr_id, {db_update_entry + "needed_terminate": True}
1929 )
1930 check_if_terminated_needed = False
1931
1932 # TODO register in database that primitive is done
1933
1934 # STEP 7 Configure metrics
1935 if vca_type == "helm" or vca_type == "helm-v3":
1936 prometheus_jobs = await self.add_prometheus_metrics(
1937 ee_id=ee_id,
1938 artifact_path=artifact_path,
1939 ee_config_descriptor=ee_config_descriptor,
1940 vnfr_id=vnfr_id,
1941 nsr_id=nsr_id,
1942 target_ip=rw_mgmt_ip,
1943 )
1944 if prometheus_jobs:
1945 self.update_db_2(
1946 "nsrs",
1947 nsr_id,
1948 {db_update_entry + "prometheus_jobs": prometheus_jobs},
1949 )
1950
1951 step = "instantiated at VCA"
1952 self.logger.debug(logging_text + step)
1953
1954 self._write_configuration_status(
1955 nsr_id=nsr_id, vca_index=vca_index, status="READY"
1956 )
1957
1958 except Exception as e: # TODO not use Exception but N2VC exception
1959 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
1960 if not isinstance(
1961 e, (DbException, N2VCException, LcmException, asyncio.CancelledError)
1962 ):
1963 self.logger.error(
1964 "Exception while {} : {}".format(step, e), exc_info=True
1965 )
1966 self._write_configuration_status(
1967 nsr_id=nsr_id, vca_index=vca_index, status="BROKEN"
1968 )
1969 raise LcmException("{} {}".format(step, e)) from e
1970
1971 def _write_ns_status(
1972 self,
1973 nsr_id: str,
1974 ns_state: str,
1975 current_operation: str,
1976 current_operation_id: str,
1977 error_description: str = None,
1978 error_detail: str = None,
1979 other_update: dict = None,
1980 ):
1981 """
1982 Update db_nsr fields.
1983 :param nsr_id:
1984 :param ns_state:
1985 :param current_operation:
1986 :param current_operation_id:
1987 :param error_description:
1988 :param error_detail:
1989 :param other_update: Other required database changes, if provided; the dict is reused and extended with the status fields
1990 :return:
1991 """
1992 try:
1993 db_dict = other_update or {}
1994 db_dict[
1995 "_admin.nslcmop"
1996 ] = current_operation_id # for backward compatibility
1997 db_dict["_admin.current-operation"] = current_operation_id
1998 db_dict["_admin.operation-type"] = (
1999 current_operation if current_operation != "IDLE" else None
2000 )
2001 db_dict["currentOperation"] = current_operation
2002 db_dict["currentOperationID"] = current_operation_id
2003 db_dict["errorDescription"] = error_description
2004 db_dict["errorDetail"] = error_detail
2005
2006 if ns_state:
2007 db_dict["nsState"] = ns_state
2008 self.update_db_2("nsrs", nsr_id, db_dict)
2009 except DbException as e:
2010 self.logger.warn("Error writing NS status, ns={}: {}".format(nsr_id, e))
2011
2012 def _write_op_status(
2013 self,
2014 op_id: str,
2015 stage: list = None,
2016 error_message: str = None,
2017 queuePosition: int = 0,
2018 operation_state: str = None,
2019 other_update: dict = None,
2020 ):
2021 try:
2022 db_dict = other_update or {}
2023 db_dict["queuePosition"] = queuePosition
2024 if isinstance(stage, list):
2025 db_dict["stage"] = stage[0]
2026 db_dict["detailed-status"] = " ".join(stage)
2027 elif stage is not None:
2028 db_dict["stage"] = str(stage)
2029
2030 if error_message is not None:
2031 db_dict["errorMessage"] = error_message
2032 if operation_state is not None:
2033 db_dict["operationState"] = operation_state
2034 db_dict["statusEnteredTime"] = time()
2035 self.update_db_2("nslcmops", op_id, db_dict)
2036 except DbException as e:
2037 self.logger.warn(
2038 "Error writing OPERATION status for op_id: {} -> {}".format(op_id, e)
2039 )
2040
2041 def _write_all_config_status(self, db_nsr: dict, status: str):
2042 try:
2043 nsr_id = db_nsr["_id"]
2044 # configurationStatus
2045 config_status = db_nsr.get("configurationStatus")
2046 if config_status:
2047 db_nsr_update = {
2048 "configurationStatus.{}.status".format(index): status
2049 for index, v in enumerate(config_status)
2050 if v
2051 }
2052 # update status
2053 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2054
2055 except DbException as e:
2056 self.logger.warn(
2057 "Error writing all configuration status, ns={}: {}".format(nsr_id, e)
2058 )
2059
2060 def _write_configuration_status(
2061 self,
2062 nsr_id: str,
2063 vca_index: int,
2064 status: str = None,
2065 element_under_configuration: str = None,
2066 element_type: str = None,
2067 other_update: dict = None,
2068 ):
2069
2070 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
2071 # .format(vca_index, status))
2072
2073 try:
2074 db_path = "configurationStatus.{}.".format(vca_index)
2075 db_dict = other_update or {}
2076 if status:
2077 db_dict[db_path + "status"] = status
2078 if element_under_configuration:
2079 db_dict[
2080 db_path + "elementUnderConfiguration"
2081 ] = element_under_configuration
2082 if element_type:
2083 db_dict[db_path + "elementType"] = element_type
2084 self.update_db_2("nsrs", nsr_id, db_dict)
2085 except DbException as e:
2086 self.logger.warn(
2087 "Error writing configuration status={}, ns={}, vca_index={}: {}".format(
2088 status, nsr_id, vca_index, e
2089 )
2090 )
2091
2092 async def _do_placement(self, logging_text, db_nslcmop, db_vnfrs):
2093 """
2094 Checks and computes the placement (vim account where to deploy). If it is decided by an external tool, it
2095 sends the request via kafka and waits until the result is written at database (nslcmops _admin.pla).
2096 Database is used because the result can be obtained from a different LCM worker in case of HA.
2097 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
2098 :param db_nslcmop: database content of nslcmop
2099 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
2100 :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfrs with the
2101 computed 'vim-account-id'
2102 """
2103 modified = False
2104 nslcmop_id = db_nslcmop["_id"]
2105 placement_engine = deep_get(db_nslcmop, ("operationParams", "placement-engine"))
2106 if placement_engine == "PLA":
2107 self.logger.debug(
2108 logging_text + "Invoke and wait for placement optimization"
2109 )
2110 await self.msg.aiowrite(
2111 "pla", "get_placement", {"nslcmopId": nslcmop_id}, loop=self.loop
2112 )
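# poll the database every db_poll_interval seconds (up to ~50 s in total) until PLA writes its placement result at _admin.pla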
2113 db_poll_interval = 5
2114 wait = db_poll_interval * 10
2115 pla_result = None
2116 while not pla_result and wait >= 0:
2117 await asyncio.sleep(db_poll_interval)
2118 wait -= db_poll_interval
2119 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2120 pla_result = deep_get(db_nslcmop, ("_admin", "pla"))
2121
2122 if not pla_result:
2123 raise LcmException(
2124 "Placement timeout for nslcmopId={}".format(nslcmop_id)
2125 )
2126
2127 for pla_vnf in pla_result["vnf"]:
2128 vnfr = db_vnfrs.get(pla_vnf["member-vnf-index"])
2129 if not pla_vnf.get("vimAccountId") or not vnfr:
2130 continue
2131 modified = True
2132 self.db.set_one(
2133 "vnfrs",
2134 {"_id": vnfr["_id"]},
2135 {"vim-account-id": pla_vnf["vimAccountId"]},
2136 )
2137 # Modifies db_vnfrs
2138 vnfr["vim-account-id"] = pla_vnf["vimAccountId"]
2139 return modified
2140
2141 def update_nsrs_with_pla_result(self, params):
2142 try:
2143 nslcmop_id = deep_get(params, ("placement", "nslcmopId"))
2144 self.update_db_2(
2145 "nslcmops", nslcmop_id, {"_admin.pla": params.get("placement")}
2146 )
2147 except Exception as e:
2148 self.logger.warn("Update failed for nslcmop_id={}:{}".format(nslcmop_id, e))
2149
2150 async def instantiate(self, nsr_id, nslcmop_id):
2151 """
2152 Instantiate an NS: deploy its KDUs, VMs and execution environments and run the Day-1 primitives.
2153 :param nsr_id: ns instance to deploy
2154 :param nslcmop_id: operation to run
2155 :return:
2156 """
2157
2158 # Try to lock HA task here
2159 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
2160 if not task_is_locked_by_me:
2161 self.logger.debug(
2162 "instantiate() task is not locked by me, ns={}".format(nsr_id)
2163 )
2164 return
2165
2166 logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
2167 self.logger.debug(logging_text + "Enter")
2168
2169 # get all needed from database
2170
2171 # database nsrs record
2172 db_nsr = None
2173
2174 # database nslcmops record
2175 db_nslcmop = None
2176
2177 # update operation on nsrs
2178 db_nsr_update = {}
2179 # update operation on nslcmops
2180 db_nslcmop_update = {}
2181
2182 nslcmop_operation_state = None
2183 db_vnfrs = {} # vnf's info indexed by member-index
2184 # n2vc_info = {}
2185 tasks_dict_info = {} # from task to info text
2186 exc = None
2187 error_list = []
2188 stage = [
2189 "Stage 1/5: preparation of the environment.",
2190 "Waiting for previous operations to terminate.",
2191 "",
2192 ]
2193 # ^ stage, step, VIM progress
2194 try:
2195 # wait for any previous tasks in process
2196 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
2197
2198 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
2199 stage[1] = "Reading from database."
2200 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
2201 db_nsr_update["detailed-status"] = "creating"
2202 db_nsr_update["operational-status"] = "init"
2203 self._write_ns_status(
2204 nsr_id=nsr_id,
2205 ns_state="BUILDING",
2206 current_operation="INSTANTIATING",
2207 current_operation_id=nslcmop_id,
2208 other_update=db_nsr_update,
2209 )
2210 self._write_op_status(op_id=nslcmop_id, stage=stage, queuePosition=0)
2211
2212 # read from db: operation
2213 stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
2214 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2215 ns_params = db_nslcmop.get("operationParams")
2216 if ns_params and ns_params.get("timeout_ns_deploy"):
2217 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
2218 else:
2219 timeout_ns_deploy = self.timeout.get(
2220 "ns_deploy", self.timeout_ns_deploy
2221 )
2222
2223 # read from db: ns
2224 stage[1] = "Getting nsr={} from db.".format(nsr_id)
2225 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2226 stage[1] = "Getting nsd={} from db.".format(db_nsr["nsd-id"])
2227 nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
2228 self.fs.sync(db_nsr["nsd-id"])
2229 db_nsr["nsd"] = nsd
2230 # nsr_name = db_nsr["name"] # TODO short-name??
2231
2232 # read from db: vnf's of this ns
2233 stage[1] = "Getting vnfrs from db."
2234 self.logger.debug(logging_text + stage[1])
2235 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
2236
2237 # read from db: vnfd's for every vnf
2238 db_vnfds = [] # every vnfd data
2239
2240 # for each vnf in ns, read vnfd
2241 for vnfr in db_vnfrs_list:
2242 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
2243 vnfd_id = vnfr["vnfd-id"]
2244 vnfd_ref = vnfr["vnfd-ref"]
2245 self.fs.sync(vnfd_id)
2246
2247 # if we do not have this vnfd yet, read it from db
2248 if not find_in_list(db_vnfds, lambda a_vnfd: a_vnfd["_id"] == vnfd_id):
2249 # read from db
2250 stage[1] = "Getting vnfd={} id='{}' from db.".format(
2251 vnfd_id, vnfd_ref
2252 )
2253 self.logger.debug(logging_text + stage[1])
2254 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
2255
2256 # store vnfd
2257 db_vnfds.append(vnfd)
2258
2259 # Get or generate the _admin.deployed.VCA list
2260 vca_deployed_list = None
2261 if db_nsr["_admin"].get("deployed"):
2262 vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
2263 if vca_deployed_list is None:
2264 vca_deployed_list = []
2265 configuration_status_list = []
2266 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
2267 db_nsr_update["configurationStatus"] = configuration_status_list
2268 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
2269 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
2270 elif isinstance(vca_deployed_list, dict):
2271 # maintain backward compatibility. Change a dict to list at database
2272 vca_deployed_list = list(vca_deployed_list.values())
2273 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
2274 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
2275
2276 if not isinstance(
2277 deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list
2278 ):
2279 populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
2280 db_nsr_update["_admin.deployed.RO.vnfd"] = []
2281
2282 # set state to INSTANTIATED. Once instantiated, the NBI will not delete it directly
2283 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
2284 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2285 self.db.set_list(
2286 "vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "INSTANTIATED"}
2287 )
2288
2289 # n2vc_redesign STEP 2 Deploy Network Scenario
2290 stage[0] = "Stage 2/5: deployment of KDUs, VMs and execution environments."
2291 self._write_op_status(op_id=nslcmop_id, stage=stage)
2292
2293 stage[1] = "Deploying KDUs."
2294 # self.logger.debug(logging_text + "Before deploy_kdus")
2295 # Call deploy_kdus in case the "vdu:kdu" param exists
2296 await self.deploy_kdus(
2297 logging_text=logging_text,
2298 nsr_id=nsr_id,
2299 nslcmop_id=nslcmop_id,
2300 db_vnfrs=db_vnfrs,
2301 db_vnfds=db_vnfds,
2302 task_instantiation_info=tasks_dict_info,
2303 )
2304
2305 stage[1] = "Getting VCA public key."
2306 # n2vc_redesign STEP 1 Get VCA public ssh-key
2307 # feature 1429. Add n2vc public key to needed VMs
2308 n2vc_key = self.n2vc.get_public_key()
2309 n2vc_key_list = [n2vc_key]
2310 if self.vca_config.get("public_key"):
2311 n2vc_key_list.append(self.vca_config["public_key"])
2312
2313 stage[1] = "Deploying NS at VIM."
2314 task_ro = asyncio.ensure_future(
2315 self.instantiate_RO(
2316 logging_text=logging_text,
2317 nsr_id=nsr_id,
2318 nsd=nsd,
2319 db_nsr=db_nsr,
2320 db_nslcmop=db_nslcmop,
2321 db_vnfrs=db_vnfrs,
2322 db_vnfds=db_vnfds,
2323 n2vc_key_list=n2vc_key_list,
2324 stage=stage,
2325 )
2326 )
2327 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
2328 tasks_dict_info[task_ro] = "Deploying at VIM"
2329
2330 # n2vc_redesign STEP 3 to 6 Deploy N2VC
2331 stage[1] = "Deploying Execution Environments."
2332 self.logger.debug(logging_text + stage[1])
2333
2334 nsi_id = None # TODO put nsi_id when this nsr belongs to a NSI
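# for each VNF of the NS, deploy the VNF-level, VDU-level and KDU-level execution environments declared in its VNFD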
2335 for vnf_profile in get_vnf_profiles(nsd):
2336 vnfd_id = vnf_profile["vnfd-id"]
2337 vnfd = find_in_list(db_vnfds, lambda a_vnf: a_vnf["id"] == vnfd_id)
2338 member_vnf_index = str(vnf_profile["id"])
2339 db_vnfr = db_vnfrs[member_vnf_index]
2340 base_folder = vnfd["_admin"]["storage"]
2341 vdu_id = None
2342 vdu_index = 0
2343 vdu_name = None
2344 kdu_name = None
2345
2346 # Get additional parameters
2347 deploy_params = {"OSM": get_osm_params(db_vnfr)}
2348 if db_vnfr.get("additionalParamsForVnf"):
2349 deploy_params.update(
2350 parse_yaml_strings(db_vnfr["additionalParamsForVnf"].copy())
2351 )
2352
2353 descriptor_config = get_configuration(vnfd, vnfd["id"])
2354 if descriptor_config:
2355 self._deploy_n2vc(
2356 logging_text=logging_text
2357 + "member_vnf_index={} ".format(member_vnf_index),
2358 db_nsr=db_nsr,
2359 db_vnfr=db_vnfr,
2360 nslcmop_id=nslcmop_id,
2361 nsr_id=nsr_id,
2362 nsi_id=nsi_id,
2363 vnfd_id=vnfd_id,
2364 vdu_id=vdu_id,
2365 kdu_name=kdu_name,
2366 member_vnf_index=member_vnf_index,
2367 vdu_index=vdu_index,
2368 vdu_name=vdu_name,
2369 deploy_params=deploy_params,
2370 descriptor_config=descriptor_config,
2371 base_folder=base_folder,
2372 task_instantiation_info=tasks_dict_info,
2373 stage=stage,
2374 )
2375
2376 # Deploy charms for each VDU that supports one.
2377 for vdud in get_vdu_list(vnfd):
2378 vdu_id = vdud["id"]
2379 descriptor_config = get_configuration(vnfd, vdu_id)
2380 vdur = find_in_list(
2381 db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id
2382 )
2383
2384 if vdur.get("additionalParams"):
2385 deploy_params_vdu = parse_yaml_strings(vdur["additionalParams"])
2386 else:
2387 deploy_params_vdu = deploy_params
2388 deploy_params_vdu["OSM"] = get_osm_params(
2389 db_vnfr, vdu_id, vdu_count_index=0
2390 )
2391 vdud_count = get_vdu_profile(vnfd, vdu_id).get(
2392 "max-number-of-instances", 1
2393 )
2394
2395 self.logger.debug("VDUD > {}".format(vdud))
2396 self.logger.debug(
2397 "Descriptor config > {}".format(descriptor_config)
2398 )
2399 if descriptor_config:
2400 vdu_name = None
2401 kdu_name = None
2402 for vdu_index in range(vdud_count):
2403 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
2404 self._deploy_n2vc(
2405 logging_text=logging_text
2406 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
2407 member_vnf_index, vdu_id, vdu_index
2408 ),
2409 db_nsr=db_nsr,
2410 db_vnfr=db_vnfr,
2411 nslcmop_id=nslcmop_id,
2412 nsr_id=nsr_id,
2413 nsi_id=nsi_id,
2414 vnfd_id=vnfd_id,
2415 vdu_id=vdu_id,
2416 kdu_name=kdu_name,
2417 member_vnf_index=member_vnf_index,
2418 vdu_index=vdu_index,
2419 vdu_name=vdu_name,
2420 deploy_params=deploy_params_vdu,
2421 descriptor_config=descriptor_config,
2422 base_folder=base_folder,
2423 task_instantiation_info=tasks_dict_info,
2424 stage=stage,
2425 )
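# Deploy charms for each KDU that declares a configuration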
2426 for kdud in get_kdu_list(vnfd):
2427 kdu_name = kdud["name"]
2428 descriptor_config = get_configuration(vnfd, kdu_name)
2429 if descriptor_config:
2430 vdu_id = None
2431 vdu_index = 0
2432 vdu_name = None
2433 kdur = next(
2434 x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name
2435 )
2436 deploy_params_kdu = {"OSM": get_osm_params(db_vnfr)}
2437 if kdur.get("additionalParams"):
2438 deploy_params_kdu = parse_yaml_strings(
2439 kdur["additionalParams"]
2440 )
2441
2442 self._deploy_n2vc(
2443 logging_text=logging_text,
2444 db_nsr=db_nsr,
2445 db_vnfr=db_vnfr,
2446 nslcmop_id=nslcmop_id,
2447 nsr_id=nsr_id,
2448 nsi_id=nsi_id,
2449 vnfd_id=vnfd_id,
2450 vdu_id=vdu_id,
2451 kdu_name=kdu_name,
2452 member_vnf_index=member_vnf_index,
2453 vdu_index=vdu_index,
2454 vdu_name=vdu_name,
2455 deploy_params=deploy_params_kdu,
2456 descriptor_config=descriptor_config,
2457 base_folder=base_folder,
2458 task_instantiation_info=tasks_dict_info,
2459 stage=stage,
2460 )
2461
2462 # Check if this NS has a charm configuration
2463 descriptor_config = nsd.get("ns-configuration")
2464 if descriptor_config and descriptor_config.get("juju"):
2465 vnfd_id = None
2466 db_vnfr = None
2467 member_vnf_index = None
2468 vdu_id = None
2469 kdu_name = None
2470 vdu_index = 0
2471 vdu_name = None
2472
2473 # Get additional parameters
2474 deploy_params = {"OSM": {"vim_account_id": ns_params["vimAccountId"]}}
2475 if db_nsr.get("additionalParamsForNs"):
2476 deploy_params.update(
2477 parse_yaml_strings(db_nsr["additionalParamsForNs"].copy())
2478 )
2479 base_folder = nsd["_admin"]["storage"]
2480 self._deploy_n2vc(
2481 logging_text=logging_text,
2482 db_nsr=db_nsr,
2483 db_vnfr=db_vnfr,
2484 nslcmop_id=nslcmop_id,
2485 nsr_id=nsr_id,
2486 nsi_id=nsi_id,
2487 vnfd_id=vnfd_id,
2488 vdu_id=vdu_id,
2489 kdu_name=kdu_name,
2490 member_vnf_index=member_vnf_index,
2491 vdu_index=vdu_index,
2492 vdu_name=vdu_name,
2493 deploy_params=deploy_params,
2494 descriptor_config=descriptor_config,
2495 base_folder=base_folder,
2496 task_instantiation_info=tasks_dict_info,
2497 stage=stage,
2498 )
2499
2500 # the rest of the work is done in the finally clause
2501
2502 except (
2503 ROclient.ROClientException,
2504 DbException,
2505 LcmException,
2506 N2VCException,
2507 ) as e:
2508 self.logger.error(
2509 logging_text + "Exit Exception while '{}': {}".format(stage[1], e)
2510 )
2511 exc = e
2512 except asyncio.CancelledError:
2513 self.logger.error(
2514 logging_text + "Cancelled Exception while '{}'".format(stage[1])
2515 )
2516 exc = "Operation was cancelled"
2517 except Exception as e:
2518 exc = traceback.format_exc()
2519 self.logger.critical(
2520 logging_text + "Exit Exception while '{}': {}".format(stage[1], e),
2521 exc_info=True,
2522 )
2523 finally:
2524 if exc:
2525 error_list.append(str(exc))
2526 try:
2527 # wait for pending tasks
2528 if tasks_dict_info:
2529 stage[1] = "Waiting for instantiate pending tasks."
2530 self.logger.debug(logging_text + stage[1])
2531 error_list += await self._wait_for_tasks(
2532 logging_text,
2533 tasks_dict_info,
2534 timeout_ns_deploy,
2535 stage,
2536 nslcmop_id,
2537 nsr_id=nsr_id,
2538 )
2539 stage[1] = stage[2] = ""
2540 except asyncio.CancelledError:
2541 error_list.append("Cancelled")
2542 # TODO cancel all tasks
2543 except Exception as exc:
2544 error_list.append(str(exc))
2545
2546 # update operation-status
2547 db_nsr_update["operational-status"] = "running"
2548 # let's begin with VCA 'configured' status (later we can change it)
2549 db_nsr_update["config-status"] = "configured"
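# downgrade the status if some deployment task did not finish successfully: pending or failed N2VC tasks mark config-status as failed, RO/KDU tasks mark operational-status as failed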
2550 for task, task_name in tasks_dict_info.items():
2551 if not task.done() or task.cancelled() or task.exception():
2552 if task_name.startswith(self.task_name_deploy_vca):
2553 # A N2VC task is pending
2554 db_nsr_update["config-status"] = "failed"
2555 else:
2556 # RO or KDU task is pending
2557 db_nsr_update["operational-status"] = "failed"
2558
2559 # update status at database
2560 if error_list:
2561 error_detail = ". ".join(error_list)
2562 self.logger.error(logging_text + error_detail)
2563 error_description_nslcmop = "{} Detail: {}".format(
2564 stage[0], error_detail
2565 )
2566 error_description_nsr = "Operation: INSTANTIATING.{}, {}".format(
2567 nslcmop_id, stage[0]
2568 )
2569
2570 db_nsr_update["detailed-status"] = (
2571 error_description_nsr + " Detail: " + error_detail
2572 )
2573 db_nslcmop_update["detailed-status"] = error_detail
2574 nslcmop_operation_state = "FAILED"
2575 ns_state = "BROKEN"
2576 else:
2577 error_detail = None
2578 error_description_nsr = error_description_nslcmop = None
2579 ns_state = "READY"
2580 db_nsr_update["detailed-status"] = "Done"
2581 db_nslcmop_update["detailed-status"] = "Done"
2582 nslcmop_operation_state = "COMPLETED"
2583
2584 if db_nsr:
2585 self._write_ns_status(
2586 nsr_id=nsr_id,
2587 ns_state=ns_state,
2588 current_operation="IDLE",
2589 current_operation_id=None,
2590 error_description=error_description_nsr,
2591 error_detail=error_detail,
2592 other_update=db_nsr_update,
2593 )
2594 self._write_op_status(
2595 op_id=nslcmop_id,
2596 stage="",
2597 error_message=error_description_nslcmop,
2598 operation_state=nslcmop_operation_state,
2599 other_update=db_nslcmop_update,
2600 )
2601
2602 if nslcmop_operation_state:
2603 try:
2604 await self.msg.aiowrite(
2605 "ns",
2606 "instantiated",
2607 {
2608 "nsr_id": nsr_id,
2609 "nslcmop_id": nslcmop_id,
2610 "operationState": nslcmop_operation_state,
2611 },
2612 loop=self.loop,
2613 )
2614 except Exception as e:
2615 self.logger.error(
2616 logging_text + "kafka_write notification Exception {}".format(e)
2617 )
2618
2619 self.logger.debug(logging_text + "Exit")
2620 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
2621
2622 async def _add_vca_relations(
2623 self,
2624 logging_text,
2625 nsr_id,
2626 vca_index: int,
2627 timeout: int = 3600,
2628 vca_type: str = None,
2629 vca_id: str = None,
2630 ) -> bool:
2631
2632 # steps:
2633 # 1. find all relations for this VCA
2634 # 2. wait for other peers related
2635 # 3. add relations
2636
2637 try:
2638 vca_type = vca_type or "lxc_proxy_charm"
2639
2640 # STEP 1: find all relations for this VCA
2641
2642 # read nsr record
2643 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2644 nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
2645
2646 # this VCA data
2647 my_vca = deep_get(db_nsr, ("_admin", "deployed", "VCA"))[vca_index]
2648
2649 # read all ns-configuration relations
2650 ns_relations = list()
2651 db_ns_relations = deep_get(nsd, ("ns-configuration", "relation"))
2652 if db_ns_relations:
2653 for r in db_ns_relations:
2654 # check if this VCA is in the relation
2655 if my_vca.get("member-vnf-index") in (
2656 r.get("entities")[0].get("id"),
2657 r.get("entities")[1].get("id"),
2658 ):
2659 ns_relations.append(r)
2660
2661 # read all vnf-configuration relations
2662 vnf_relations = list()
2663 db_vnfd_list = db_nsr.get("vnfd-id")
2664 if db_vnfd_list:
2665 for vnfd in db_vnfd_list:
2666 db_vnf_relations = None
2667 db_vnfd = self.db.get_one("vnfds", {"_id": vnfd})
2668 db_vnf_configuration = get_configuration(db_vnfd, db_vnfd["id"])
2669 if db_vnf_configuration:
2670 db_vnf_relations = db_vnf_configuration.get("relation", [])
2671 if db_vnf_relations:
2672 for r in db_vnf_relations:
2673 # check if this VCA is in the relation
2674 if my_vca.get("vdu_id") in (
2675 r.get("entities")[0].get("id"),
2676 r.get("entities")[1].get("id"),
2677 ):
2678 vnf_relations.append(r)
2679
2680 # if no relations, terminate
2681 if not ns_relations and not vnf_relations:
2682 self.logger.debug(logging_text + " No relations")
2683 return True
2684
2685 self.logger.debug(
2686 logging_text
2687 + " adding relations\n {}\n {}".format(
2688 ns_relations, vnf_relations
2689 )
2690 )
2691
2692 # add all relations
2693 start = time()
2694 while True:
2695 # check timeout
2696 now = time()
2697 if now - start >= timeout:
2698 self.logger.error(logging_text + " : timeout adding relations")
2699 return False
2700
2701 # reload nsr from database (we need an up-to-date record of _admin.deployed.VCA)
2702 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2703
2704 # for each defined NS relation, find the related VCAs
2705 for r in ns_relations.copy():
2706 from_vca_ee_id = None
2707 to_vca_ee_id = None
2708 from_vca_endpoint = None
2709 to_vca_endpoint = None
2710 vca_list = deep_get(db_nsr, ("_admin", "deployed", "VCA"))
2711 for vca in vca_list:
2712 if vca.get("member-vnf-index") == r.get("entities")[0].get(
2713 "id"
2714 ) and vca.get("config_sw_installed"):
2715 from_vca_ee_id = vca.get("ee_id")
2716 from_vca_endpoint = r.get("entities")[0].get("endpoint")
2717 if vca.get("member-vnf-index") == r.get("entities")[1].get(
2718 "id"
2719 ) and vca.get("config_sw_installed"):
2720 to_vca_ee_id = vca.get("ee_id")
2721 to_vca_endpoint = r.get("entities")[1].get("endpoint")
2722 if from_vca_ee_id and to_vca_ee_id:
2723 # add relation
2724 await self.vca_map[vca_type].add_relation(
2725 ee_id_1=from_vca_ee_id,
2726 ee_id_2=to_vca_ee_id,
2727 endpoint_1=from_vca_endpoint,
2728 endpoint_2=to_vca_endpoint,
2729 vca_id=vca_id,
2730 )
2731 # remove entry from relations list
2732 ns_relations.remove(r)
2733 else:
2734 # check failed peers
2735 try:
2736 vca_status_list = db_nsr.get("configurationStatus")
2737 if vca_status_list:
2738 for i in range(len(vca_list)):
2739 vca = vca_list[i]
2740 vca_status = vca_status_list[i]
2741 if vca.get("member-vnf-index") == r.get("entities")[
2742 0
2743 ].get("id"):
2744 if vca_status.get("status") == "BROKEN":
2745 # peer broken: remove relation from list
2746 ns_relations.remove(r)
2747 if vca.get("member-vnf-index") == r.get("entities")[
2748 1
2749 ].get("id"):
2750 if vca_status.get("status") == "BROKEN":
2751 # peer broken: remove relation from list
2752 ns_relations.remove(r)
2753 except Exception:
2754 # ignore
2755 pass
2756
2757 # for each defined VNF relation, find the related VCAs
2758 for r in vnf_relations.copy():
2759 from_vca_ee_id = None
2760 to_vca_ee_id = None
2761 from_vca_endpoint = None
2762 to_vca_endpoint = None
2763 vca_list = deep_get(db_nsr, ("_admin", "deployed", "VCA"))
2764 for vca in vca_list:
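# VDU-level charms are matched by vdu_id; VNF-level charms (vdu_id is None) are matched by vnfd_id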
2765 key_to_check = "vdu_id"
2766 if vca.get("vdu_id") is None:
2767 key_to_check = "vnfd_id"
2768 if vca.get(key_to_check) == r.get("entities")[0].get(
2769 "id"
2770 ) and vca.get("config_sw_installed"):
2771 from_vca_ee_id = vca.get("ee_id")
2772 from_vca_endpoint = r.get("entities")[0].get("endpoint")
2773 if vca.get(key_to_check) == r.get("entities")[1].get(
2774 "id"
2775 ) and vca.get("config_sw_installed"):
2776 to_vca_ee_id = vca.get("ee_id")
2777 to_vca_endpoint = r.get("entities")[1].get("endpoint")
2778 if from_vca_ee_id and to_vca_ee_id:
2779 # add relation
2780 await self.vca_map[vca_type].add_relation(
2781 ee_id_1=from_vca_ee_id,
2782 ee_id_2=to_vca_ee_id,
2783 endpoint_1=from_vca_endpoint,
2784 endpoint_2=to_vca_endpoint,
2785 vca_id=vca_id,
2786 )
2787 # remove entry from relations list
2788 vnf_relations.remove(r)
2789 else:
2790 # check failed peers
2791 try:
2792 vca_status_list = db_nsr.get("configurationStatus")
2793 if vca_status_list:
2794 for i in range(len(vca_list)):
2795 vca = vca_list[i]
2796 vca_status = vca_status_list[i]
2797 if vca.get("vdu_id") == r.get("entities")[0].get(
2798 "id"
2799 ):
2800 if vca_status.get("status") == "BROKEN":
2801 # peer broken: remove relation from list
2802 vnf_relations.remove(r)
2803 if vca.get("vdu_id") == r.get("entities")[1].get(
2804 "id"
2805 ):
2806 if vca_status.get("status") == "BROKEN":
2807 # peer broken: remove relation from list
2808 vnf_relations.remove(r)
2809 except Exception:
2810 # ignore
2811 pass
2812
2813 # wait for next try
2814 await asyncio.sleep(5.0)
2815
2816 if not ns_relations and not vnf_relations:
2817 self.logger.debug("Relations added")
2818 break
2819
2820 return True
2821
2822 except Exception as e:
2823 self.logger.warn(logging_text + " ERROR adding relations: {}".format(e))
2824 return False
2825
2826 async def _install_kdu(
2827 self,
2828 nsr_id: str,
2829 nsr_db_path: str,
2830 vnfr_data: dict,
2831 kdu_index: int,
2832 kdud: dict,
2833 vnfd: dict,
2834 k8s_instance_info: dict,
2835 k8params: dict = None,
2836 timeout: int = 600,
2837 vca_id: str = None,
2838 ):
2839
2840 try:
2841 k8sclustertype = k8s_instance_info["k8scluster-type"]
2842 # Instantiate kdu
2843 db_dict_install = {
2844 "collection": "nsrs",
2845 "filter": {"_id": nsr_id},
2846 "path": nsr_db_path,
2847 }
2848
2849 kdu_instance = self.k8scluster_map[
2850 k8sclustertype
2851 ].generate_kdu_instance_name(
2852 db_dict=db_dict_install,
2853 kdu_model=k8s_instance_info["kdu-model"],
2854 kdu_name=k8s_instance_info["kdu-name"],
2855 )
2856 self.update_db_2(
2857 "nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance}
2858 )
2859 await self.k8scluster_map[k8sclustertype].install(
2860 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
2861 kdu_model=k8s_instance_info["kdu-model"],
2862 atomic=True,
2863 params=k8params,
2864 db_dict=db_dict_install,
2865 timeout=timeout,
2866 kdu_name=k8s_instance_info["kdu-name"],
2867 namespace=k8s_instance_info["namespace"],
2868 kdu_instance=kdu_instance,
2869 vca_id=vca_id,
2870 )
2871 self.update_db_2(
2872 "nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance}
2873 )
2874
2875 # Obtain the deployed services in order to get the management service ip
2876 services = await self.k8scluster_map[k8sclustertype].get_services(
2877 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
2878 kdu_instance=kdu_instance,
2879 namespace=k8s_instance_info["namespace"],
2880 )
2881
2882 # Obtain management service info (if exists)
2883 vnfr_update_dict = {}
2884 kdu_config = get_configuration(vnfd, kdud["name"])
2885 if kdu_config:
2886 target_ee_list = kdu_config.get("execution-environment-list", [])
2887 else:
2888 target_ee_list = []
2889
2890 if services:
2891 vnfr_update_dict["kdur.{}.services".format(kdu_index)] = services
2892 mgmt_services = [
2893 service
2894 for service in kdud.get("service", [])
2895 if service.get("mgmt-service")
2896 ]
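# for every descriptor service flagged as mgmt-service, find the matching deployed service and record its ip at the kdur (and, when it maps to the mgmt connection point, also at the vnfr)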
2897 for mgmt_service in mgmt_services:
2898 for service in services:
2899 if service["name"].startswith(mgmt_service["name"]):
2900 # Mgmt service found, Obtain service ip
2901 ip = service.get("external_ip", service.get("cluster_ip"))
2902 if isinstance(ip, list) and len(ip) == 1:
2903 ip = ip[0]
2904
2905 vnfr_update_dict[
2906 "kdur.{}.ip-address".format(kdu_index)
2907 ] = ip
2908
2909 # Check whether the mgmt ip must also be updated at the vnf level
2910 service_external_cp = mgmt_service.get(
2911 "external-connection-point-ref"
2912 )
2913 if service_external_cp:
2914 if (
2915 deep_get(vnfd, ("mgmt-interface", "cp"))
2916 == service_external_cp
2917 ):
2918 vnfr_update_dict["ip-address"] = ip
2919
2920 if find_in_list(
2921 target_ee_list,
2922 lambda ee: ee.get(
2923 "external-connection-point-ref", ""
2924 )
2925 == service_external_cp,
2926 ):
2927 vnfr_update_dict[
2928 "kdur.{}.ip-address".format(kdu_index)
2929 ] = ip
2930 break
2931 else:
2932 self.logger.warn(
2933 "Mgmt service name: {} not found".format(
2934 mgmt_service["name"]
2935 )
2936 )
2937
2938 vnfr_update_dict["kdur.{}.status".format(kdu_index)] = "READY"
2939 self.update_db_2("vnfrs", vnfr_data.get("_id"), vnfr_update_dict)
2940
2941 kdu_config = get_configuration(vnfd, k8s_instance_info["kdu-name"])
2942 if (
2943 kdu_config
2944 and kdu_config.get("initial-config-primitive")
2945 and get_juju_ee_ref(vnfd, k8s_instance_info["kdu-name"]) is None
2946 ):
2947 initial_config_primitive_list = kdu_config.get(
2948 "initial-config-primitive"
2949 )
2950 initial_config_primitive_list.sort(key=lambda val: int(val["seq"]))
2951
2952 for initial_config_primitive in initial_config_primitive_list:
2953 primitive_params_ = self._map_primitive_params(
2954 initial_config_primitive, {}, {}
2955 )
2956
2957 await asyncio.wait_for(
2958 self.k8scluster_map[k8sclustertype].exec_primitive(
2959 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
2960 kdu_instance=kdu_instance,
2961 primitive_name=initial_config_primitive["name"],
2962 params=primitive_params_,
2963 db_dict=db_dict_install,
2964 vca_id=vca_id,
2965 ),
2966 timeout=timeout,
2967 )
2968
2969 except Exception as e:
2970 # Prepare update db with error and raise exception
2971 try:
2972 self.update_db_2(
2973 "nsrs", nsr_id, {nsr_db_path + ".detailed-status": str(e)}
2974 )
2975 self.update_db_2(
2976 "vnfrs",
2977 vnfr_data.get("_id"),
2978 {"kdur.{}.status".format(kdu_index): "ERROR"},
2979 )
2980 except Exception:
2981 # ignore to keep original exception
2982 pass
2983 # reraise original error
2984 raise
2985
2986 return kdu_instance
2987
2988 async def deploy_kdus(
2989 self,
2990 logging_text,
2991 nsr_id,
2992 nslcmop_id,
2993 db_vnfrs,
2994 db_vnfds,
2995 task_instantiation_info,
2996 ):
2997 # Launch kdus if present in the descriptor
2998
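# cache of K8s cluster internal ids already resolved in this call, indexed by cluster type and OSM cluster id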
2999 k8scluster_id_2_uuic = {
3000 "helm-chart-v3": {},
3001 "helm-chart": {},
3002 "juju-bundle": {},
3003 }
3004
3005 async def _get_cluster_id(cluster_id, cluster_type):
3006 nonlocal k8scluster_id_2_uuic
3007 if cluster_id in k8scluster_id_2_uuic[cluster_type]:
3008 return k8scluster_id_2_uuic[cluster_type][cluster_id]
3009
3010 # check if the K8s cluster is being created and wait for any previous tasks in process
3011 task_name, task_dependency = self.lcm_tasks.lookfor_related(
3012 "k8scluster", cluster_id
3013 )
3014 if task_dependency:
3015 text = "Waiting for related tasks '{}' on k8scluster {} to be completed".format(
3016 task_name, cluster_id
3017 )
3018 self.logger.debug(logging_text + text)
3019 await asyncio.wait(task_dependency, timeout=3600)
3020
3021 db_k8scluster = self.db.get_one(
3022 "k8sclusters", {"_id": cluster_id}, fail_on_empty=False
3023 )
3024 if not db_k8scluster:
3025 raise LcmException("K8s cluster {} cannot be found".format(cluster_id))
3026
3027 k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
3028 if not k8s_id:
3029 if cluster_type == "helm-chart-v3":
3030 try:
3031 # backward compatibility for existing clusters that have not been initialized for helm v3
3032 k8s_credentials = yaml.safe_dump(
3033 db_k8scluster.get("credentials")
3034 )
3035 k8s_id, uninstall_sw = await self.k8sclusterhelm3.init_env(
3036 k8s_credentials, reuse_cluster_uuid=cluster_id
3037 )
3038 db_k8scluster_update = {}
3039 db_k8scluster_update["_admin.helm-chart-v3.error_msg"] = None
3040 db_k8scluster_update["_admin.helm-chart-v3.id"] = k8s_id
3041 db_k8scluster_update[
3042 "_admin.helm-chart-v3.created"
3043 ] = uninstall_sw
3044 db_k8scluster_update[
3045 "_admin.helm-chart-v3.operationalState"
3046 ] = "ENABLED"
3047 self.update_db_2(
3048 "k8sclusters", cluster_id, db_k8scluster_update
3049 )
3050 except Exception as e:
3051 self.logger.error(
3052 logging_text
3053 + "error initializing helm-v3 cluster: {}".format(str(e))
3054 )
3055 raise LcmException(
3056 "K8s cluster '{}' has not been initialized for '{}'".format(
3057 cluster_id, cluster_type
3058 )
3059 )
3060 else:
3061 raise LcmException(
3062 "K8s cluster '{}' has not been initialized for '{}'".format(
3063 cluster_id, cluster_type
3064 )
3065 )
3066 k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
3067 return k8s_id
3068
3069 logging_text += "Deploy kdus: "
3070 step = ""
3071 try:
3072 db_nsr_update = {"_admin.deployed.K8s": []}
3073 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3074
3075 index = 0
3076 updated_cluster_list = []
3077 updated_v3_cluster_list = []
3078
3079 for vnfr_data in db_vnfrs.values():
3080 vca_id = self.get_vca_id(vnfr_data, {})
3081 for kdu_index, kdur in enumerate(get_iterable(vnfr_data, "kdur")):
3082 # Step 0: Prepare and set parameters
3083 desc_params = parse_yaml_strings(kdur.get("additionalParams"))
3084 vnfd_id = vnfr_data.get("vnfd-id")
3085 vnfd_with_id = find_in_list(
3086 db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id
3087 )
3088 kdud = next(
3089 kdud
3090 for kdud in vnfd_with_id["kdu"]
3091 if kdud["name"] == kdur["kdu-name"]
3092 )
3093 namespace = kdur.get("k8s-namespace")
3094 if kdur.get("helm-chart"):
3095 kdumodel = kdur["helm-chart"]
3096 # Default version: helm3, if helm-version is v2 assign v2
3097 k8sclustertype = "helm-chart-v3"
3098 self.logger.debug("kdur: {}".format(kdur))
3099 if (
3100 kdur.get("helm-version")
3101 and kdur.get("helm-version") == "v2"
3102 ):
3103 k8sclustertype = "helm-chart"
3104 elif kdur.get("juju-bundle"):
3105 kdumodel = kdur["juju-bundle"]
3106 k8sclustertype = "juju-bundle"
3107 else:
3108 raise LcmException(
3109 "kdu type for kdu='{}.{}' is neither helm-chart nor "
3110 "juju-bundle. Maybe an old NBI version is running".format(
3111 vnfr_data["member-vnf-index-ref"], kdur["kdu-name"]
3112 )
3113 )
3114 # check if kdumodel is a file and exists
3115 try:
3116 vnfd_with_id = find_in_list(
3117 db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id
3118 )
3119 storage = deep_get(vnfd_with_id, ("_admin", "storage"))
3120 if storage and storage.get(
3121 "pkg-dir"
3122 ):  # may not be present if the vnfd has no artifacts
3123 # path format: /vnfdid/pkgdir/helm-charts|juju-bundles/kdumodel
3124 filename = "{}/{}/{}s/{}".format(
3125 storage["folder"],
3126 storage["pkg-dir"],
3127 k8sclustertype,
3128 kdumodel,
3129 )
3130 if self.fs.file_exists(
3131 filename, mode="file"
3132 ) or self.fs.file_exists(filename, mode="dir"):
3133 kdumodel = self.fs.path + filename
3134 except (asyncio.TimeoutError, asyncio.CancelledError):
3135 raise
3136 except Exception: # it is not a file
3137 pass
3138
3139 k8s_cluster_id = kdur["k8s-cluster"]["id"]
3140 step = "Synchronize repos for k8s cluster '{}'".format(
3141 k8s_cluster_id
3142 )
3143 cluster_uuid = await _get_cluster_id(k8s_cluster_id, k8sclustertype)
3144
3145 # Synchronize repos
3146 if (
3147 k8sclustertype == "helm-chart"
3148 and cluster_uuid not in updated_cluster_list
3149 ) or (
3150 k8sclustertype == "helm-chart-v3"
3151 and cluster_uuid not in updated_v3_cluster_list
3152 ):
3153 del_repo_list, added_repo_dict = await asyncio.ensure_future(
3154 self.k8scluster_map[k8sclustertype].synchronize_repos(
3155 cluster_uuid=cluster_uuid
3156 )
3157 )
3158 if del_repo_list or added_repo_dict:
3159 if k8sclustertype == "helm-chart":
3160 unset = {
3161 "_admin.helm_charts_added." + item: None
3162 for item in del_repo_list
3163 }
3164 updated = {
3165 "_admin.helm_charts_added." + item: name
3166 for item, name in added_repo_dict.items()
3167 }
3168 updated_cluster_list.append(cluster_uuid)
3169 elif k8sclustertype == "helm-chart-v3":
3170 unset = {
3171 "_admin.helm_charts_v3_added." + item: None
3172 for item in del_repo_list
3173 }
3174 updated = {
3175 "_admin.helm_charts_v3_added." + item: name
3176 for item, name in added_repo_dict.items()
3177 }
3178 updated_v3_cluster_list.append(cluster_uuid)
3179 self.logger.debug(
3180 logging_text + "repos synchronized on k8s cluster "
3181 "'{}' to_delete: {}, to_add: {}".format(
3182 k8s_cluster_id, del_repo_list, added_repo_dict
3183 )
3184 )
3185 self.db.set_one(
3186 "k8sclusters",
3187 {"_id": k8s_cluster_id},
3188 updated,
3189 unset=unset,
3190 )
3191
3192 # Instantiate kdu
3193 step = "Instantiating KDU {}.{} in k8s cluster {}".format(
3194 vnfr_data["member-vnf-index-ref"],
3195 kdur["kdu-name"],
3196 k8s_cluster_id,
3197 )
3198 k8s_instance_info = {
3199 "kdu-instance": None,
3200 "k8scluster-uuid": cluster_uuid,
3201 "k8scluster-type": k8sclustertype,
3202 "member-vnf-index": vnfr_data["member-vnf-index-ref"],
3203 "kdu-name": kdur["kdu-name"],
3204 "kdu-model": kdumodel,
3205 "namespace": namespace,
3206 }
3207 db_path = "_admin.deployed.K8s.{}".format(index)
3208 db_nsr_update[db_path] = k8s_instance_info
3209 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3210 vnfd_with_id = find_in_list(
3211 db_vnfds, lambda vnf: vnf["_id"] == vnfd_id
3212 )
3213 task = asyncio.ensure_future(
3214 self._install_kdu(
3215 nsr_id,
3216 db_path,
3217 vnfr_data,
3218 kdu_index,
3219 kdud,
3220 vnfd_with_id,
3221 k8s_instance_info,
3222 k8params=desc_params,
3223 timeout=600,
3224 vca_id=vca_id,
3225 )
3226 )
3227 self.lcm_tasks.register(
3228 "ns",
3229 nsr_id,
3230 nslcmop_id,
3231 "instantiate_KDU-{}".format(index),
3232 task,
3233 )
3234 task_instantiation_info[task] = "Deploying KDU {}".format(
3235 kdur["kdu-name"]
3236 )
3237
3238 index += 1
3239
3240 except (LcmException, asyncio.CancelledError):
3241 raise
3242 except Exception as e:
3243 msg = "Exception {} while {}: {}".format(type(e).__name__, step, e)
3244 if isinstance(e, (N2VCException, DbException)):
3245 self.logger.error(logging_text + msg)
3246 else:
3247 self.logger.critical(logging_text + msg, exc_info=True)
3248 raise LcmException(msg)
3249 finally:
3250 if db_nsr_update:
3251 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3252
3253 def _deploy_n2vc(
3254 self,
3255 logging_text,
3256 db_nsr,
3257 db_vnfr,
3258 nslcmop_id,
3259 nsr_id,
3260 nsi_id,
3261 vnfd_id,
3262 vdu_id,
3263 kdu_name,
3264 member_vnf_index,
3265 vdu_index,
3266 vdu_name,
3267 deploy_params,
3268 descriptor_config,
3269 base_folder,
3270 task_instantiation_info,
3271 stage,
3272 ):
3273 # launch instantiate_N2VC in an asyncio task and register the task object
3274 # Look up where the information of this charm is stored at database <nsrs>._admin.deployed.VCA
3275 # if not found, create one entry and update database
3276 # fill db_nsr._admin.deployed.VCA.<index>
3277
3278 self.logger.debug(
3279 logging_text + "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id, vdu_id)
3280 )
3281 if "execution-environment-list" in descriptor_config:
3282 ee_list = descriptor_config.get("execution-environment-list", [])
3283 else:  # other types such as scripts are not supported
3284 ee_list = []
3285
3286 for ee_item in ee_list:
3287 self.logger.debug(
3288 logging_text
3289 + "_deploy_n2vc ee_item juju={}, helm={}".format(
3290 ee_item.get("juju"), ee_item.get("helm-chart")
3291 )
3292 )
3293 ee_descriptor_id = ee_item.get("id")
3294 if ee_item.get("juju"):
3295 vca_name = ee_item["juju"].get("charm")
3296 vca_type = (
3297 "lxc_proxy_charm"
3298 if ee_item["juju"].get("charm") is not None
3299 else "native_charm"
3300 )
3301 if ee_item["juju"].get("cloud") == "k8s":
3302 vca_type = "k8s_proxy_charm"
3303 elif ee_item["juju"].get("proxy") is False:
3304 vca_type = "native_charm"
3305 elif ee_item.get("helm-chart"):
3306 vca_name = ee_item["helm-chart"]
3307 if ee_item.get("helm-version") and ee_item.get("helm-version") == "v2":
3308 vca_type = "helm"
3309 else:
3310 vca_type = "helm-v3"
3311 else:
3312 self.logger.debug(
3313 logging_text + "skipping: configuration is neither a juju charm nor a helm chart"
3314 )
3315 continue
3316
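# look for an existing VCA record matching this element (member-vnf-index, vdu, kdu, count index and ee descriptor); the for/else below creates a new entry when none is found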
3317 vca_index = -1
3318 for vca_index, vca_deployed in enumerate(
3319 db_nsr["_admin"]["deployed"]["VCA"]
3320 ):
3321 if not vca_deployed:
3322 continue
3323 if (
3324 vca_deployed.get("member-vnf-index") == member_vnf_index
3325 and vca_deployed.get("vdu_id") == vdu_id
3326 and vca_deployed.get("kdu_name") == kdu_name
3327 and vca_deployed.get("vdu_count_index", 0) == vdu_index
3328 and vca_deployed.get("ee_descriptor_id") == ee_descriptor_id
3329 ):
3330 break
3331 else:
3332 # not found, create one.
3333 target = (
3334 "ns" if not member_vnf_index else "vnf/{}".format(member_vnf_index)
3335 )
3336 if vdu_id:
3337 target += "/vdu/{}/{}".format(vdu_id, vdu_index or 0)
3338 elif kdu_name:
3339 target += "/kdu/{}".format(kdu_name)
3340 vca_deployed = {
3341 "target_element": target,
3342 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
3343 "member-vnf-index": member_vnf_index,
3344 "vdu_id": vdu_id,
3345 "kdu_name": kdu_name,
3346 "vdu_count_index": vdu_index,
3347 "operational-status": "init", # TODO revise
3348 "detailed-status": "", # TODO revise
3349 "step": "initial-deploy", # TODO revise
3350 "vnfd_id": vnfd_id,
3351 "vdu_name": vdu_name,
3352 "type": vca_type,
3353 "ee_descriptor_id": ee_descriptor_id,
3354 }
3355 vca_index += 1
3356
3357 # create VCA and configurationStatus in db
3358 db_dict = {
3359 "_admin.deployed.VCA.{}".format(vca_index): vca_deployed,
3360 "configurationStatus.{}".format(vca_index): dict(),
3361 }
3362 self.update_db_2("nsrs", nsr_id, db_dict)
3363
3364 db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed)
3365
3366 self.logger.debug("N2VC > NSR_ID > {}".format(nsr_id))
3367 self.logger.debug("N2VC > DB_NSR > {}".format(db_nsr))
3368 self.logger.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed))
3369
3370 # Launch task
3371 task_n2vc = asyncio.ensure_future(
3372 self.instantiate_N2VC(
3373 logging_text=logging_text,
3374 vca_index=vca_index,
3375 nsi_id=nsi_id,
3376 db_nsr=db_nsr,
3377 db_vnfr=db_vnfr,
3378 vdu_id=vdu_id,
3379 kdu_name=kdu_name,
3380 vdu_index=vdu_index,
3381 deploy_params=deploy_params,
3382 config_descriptor=descriptor_config,
3383 base_folder=base_folder,
3384 nslcmop_id=nslcmop_id,
3385 stage=stage,
3386 vca_type=vca_type,
3387 vca_name=vca_name,
3388 ee_config_descriptor=ee_item,
3389 )
3390 )
3391 self.lcm_tasks.register(
3392 "ns",
3393 nsr_id,
3394 nslcmop_id,
3395 "instantiate_N2VC-{}".format(vca_index),
3396 task_n2vc,
3397 )
3398 task_instantiation_info[
3399 task_n2vc
3400 ] = self.task_name_deploy_vca + " {}.{}".format(
3401 member_vnf_index or "", vdu_id or ""
3402 )
3403
3404 @staticmethod
3405 def _create_nslcmop(nsr_id, operation, params):
3406 """
3407 Creates an ns-lcm-op content to be stored at database.
3408 :param nsr_id: internal id of the instance
3409 :param operation: instantiate, terminate, scale, action, ...
3410 :param params: user parameters for the operation
3411 :return: dictionary following SOL005 format
3412 """
3413 # Raise exception if invalid arguments
3414 if not (nsr_id and operation and params):
3415 raise LcmException(
3416 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided"
3417 )
3418 now = time()
3419 _id = str(uuid4())
3420 nslcmop = {
3421 "id": _id,
3422 "_id": _id,
3423 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
3424 "operationState": "PROCESSING",
3425 "statusEnteredTime": now,
3426 "nsInstanceId": nsr_id,
3427 "lcmOperationType": operation,
3428 "startTime": now,
3429 "isAutomaticInvocation": False,
3430 "operationParams": params,
3431 "isCancelPending": False,
3432 "links": {
3433 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
3434 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
3435 },
3436 }
3437 return nslcmop
3438
3439 def _format_additional_params(self, params):
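# values prefixed with "!!yaml " are parsed as YAML, e.g. (illustrative) {"ports": "!!yaml [80, 443]"} becomes {"ports": [80, 443]}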
3440 params = params or {}
3441 for key, value in params.items():
3442 if str(value).startswith("!!yaml "):
3443 params[key] = yaml.safe_load(value[7:])
3444 return params
3445
3446 def _get_terminate_primitive_params(self, seq, vnf_index):
3447 primitive = seq.get("name")
3448 primitive_params = {}
3449 params = {
3450 "member_vnf_index": vnf_index,
3451 "primitive": primitive,
3452 "primitive_params": primitive_params,
3453 }
3454 desc_params = {}
3455 return self._map_primitive_params(seq, params, desc_params)
3456
3457 # sub-operations
3458
3459 def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
3460 op = deep_get(db_nslcmop, ("_admin", "operations"), [])[op_index]
3461 if op.get("operationState") == "COMPLETED":
3462 # b. Skip sub-operation
3463 # _ns_execute_primitive() or RO.create_action() will NOT be executed
3464 return self.SUBOPERATION_STATUS_SKIP
3465 else:
3466 # c. retry executing sub-operation
3467 # The sub-operation exists, and operationState != 'COMPLETED'
3468 # Update operationState = 'PROCESSING' to indicate a retry.
3469 operationState = "PROCESSING"
3470 detailed_status = "In progress"
3471 self._update_suboperation_status(
3472 db_nslcmop, op_index, operationState, detailed_status
3473 )
3474 # Return the sub-operation index
3475 # _ns_execute_primitive() or RO.create_action() will be called from scale()
3476 # with arguments extracted from the sub-operation
3477 return op_index
3478
3479 # Find a sub-operation where all keys in a matching dictionary must match
3480 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
3481 def _find_suboperation(self, db_nslcmop, match):
3482 if db_nslcmop and match:
3483 op_list = db_nslcmop.get("_admin", {}).get("operations", [])
3484 for i, op in enumerate(op_list):
3485 if all(op.get(k) == match[k] for k in match):
3486 return i
3487 return self.SUBOPERATION_STATUS_NOT_FOUND
3488
3489 # Update status for a sub-operation given its index
3490 def _update_suboperation_status(
3491 self, db_nslcmop, op_index, operationState, detailed_status
3492 ):
3493 # Update DB for HA tasks
3494 q_filter = {"_id": db_nslcmop["_id"]}
3495 update_dict = {
3496 "_admin.operations.{}.operationState".format(op_index): operationState,
3497 "_admin.operations.{}.detailed-status".format(op_index): detailed_status,
3498 }
3499 self.db.set_one(
3500 "nslcmops", q_filter=q_filter, update_dict=update_dict, fail_on_empty=False
3501 )
3502
3503 # Add sub-operation, return the index of the added sub-operation
3504 # Optionally, set operationState, detailed-status, and operationType
3505 # Status and type are currently set for 'scale' sub-operations:
3506 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
3507 # 'detailed-status' : status message
3508 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
3509 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
3510 def _add_suboperation(
3511 self,
3512 db_nslcmop,
3513 vnf_index,
3514 vdu_id,
3515 vdu_count_index,
3516 vdu_name,
3517 primitive,
3518 mapped_primitive_params,
3519 operationState=None,
3520 detailed_status=None,
3521 operationType=None,
3522 RO_nsr_id=None,
3523 RO_scaling_info=None,
3524 ):
3525 if not db_nslcmop:
3526 return self.SUBOPERATION_STATUS_NOT_FOUND
3527 # Get the "_admin.operations" list, if it exists
3528 db_nslcmop_admin = db_nslcmop.get("_admin", {})
3529 op_list = db_nslcmop_admin.get("operations")
3530 # Create or append to the "_admin.operations" list
3531 new_op = {
3532 "member_vnf_index": vnf_index,
3533 "vdu_id": vdu_id,
3534 "vdu_count_index": vdu_count_index,
3535 "primitive": primitive,
3536 "primitive_params": mapped_primitive_params,
3537 }
3538 if operationState:
3539 new_op["operationState"] = operationState
3540 if detailed_status:
3541 new_op["detailed-status"] = detailed_status
3542 if operationType:
3543 new_op["lcmOperationType"] = operationType
3544 if RO_nsr_id:
3545 new_op["RO_nsr_id"] = RO_nsr_id
3546 if RO_scaling_info:
3547 new_op["RO_scaling_info"] = RO_scaling_info
3548 if not op_list:
3549 # No existing operations, create key 'operations' with current operation as first list element
3550 db_nslcmop_admin.update({"operations": [new_op]})
3551 op_list = db_nslcmop_admin.get("operations")
3552 else:
3553 # Existing operations, append operation to list
3554 op_list.append(new_op)
3555
3556 db_nslcmop_update = {"_admin.operations": op_list}
3557 self.update_db_2("nslcmops", db_nslcmop["_id"], db_nslcmop_update)
3558 op_index = len(op_list) - 1
3559 return op_index
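# Illustrative sketch of an entry appended to "_admin.operations" for a
# pre-scale primitive (all values hypothetical):
#
#   {
#       "member_vnf_index": "1",
#       "vdu_id": None,
#       "vdu_count_index": None,
#       "primitive": "touch",
#       "primitive_params": {"filename": "/home/ubuntu/touched"},
#       "operationState": "PROCESSING",
#       "detailed-status": "In progress",
#       "lcmOperationType": "PRE-SCALE",
#   }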
3560
3561 # Helper methods for scale() sub-operations
3562
3563 # pre-scale/post-scale:
3564 # Check for 3 different cases:
3565 # a. New: first-time execution, return SUBOPERATION_STATUS_NEW
3566 # b. Skip: a sub-operation already exists and operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
3567 # c. Retry: a sub-operation already exists and operationState != 'COMPLETED', return its op_index to re-execute
3568 def _check_or_add_scale_suboperation(
3569 self,
3570 db_nslcmop,
3571 vnf_index,
3572 vnf_config_primitive,
3573 primitive_params,
3574 operationType,
3575 RO_nsr_id=None,
3576 RO_scaling_info=None,
3577 ):
3578 # Find this sub-operation
3579 if RO_nsr_id and RO_scaling_info:
3580 operationType = "SCALE-RO"
3581 match = {
3582 "member_vnf_index": vnf_index,
3583 "RO_nsr_id": RO_nsr_id,
3584 "RO_scaling_info": RO_scaling_info,
3585 }
3586 else:
3587 match = {
3588 "member_vnf_index": vnf_index,
3589 "primitive": vnf_config_primitive,
3590 "primitive_params": primitive_params,
3591 "lcmOperationType": operationType,
3592 }
3593 op_index = self._find_suboperation(db_nslcmop, match)
3594 if op_index == self.SUBOPERATION_STATUS_NOT_FOUND:
3595 # a. New sub-operation
3596 # The sub-operation does not exist, add it.
3597 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
3598 # The following parameters are set to None for all kinds of scaling:
3599 vdu_id = None
3600 vdu_count_index = None
3601 vdu_name = None
3602 if RO_nsr_id and RO_scaling_info:
3603 vnf_config_primitive = None
3604 primitive_params = None
3605 else:
3606 RO_nsr_id = None
3607 RO_scaling_info = None
3608 # Initial status for sub-operation
3609 operationState = "PROCESSING"
3610 detailed_status = "In progress"
3611 # Add sub-operation for pre/post-scaling (zero or more operations)
3612 self._add_suboperation(
3613 db_nslcmop,
3614 vnf_index,
3615 vdu_id,
3616 vdu_count_index,
3617 vdu_name,
3618 vnf_config_primitive,
3619 primitive_params,
3620 operationState,
3621 detailed_status,
3622 operationType,
3623 RO_nsr_id,
3624 RO_scaling_info,
3625 )
3626 return self.SUBOPERATION_STATUS_NEW
3627 else:
3628 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
3629 # or op_index (operationState != 'COMPLETED')
3630 return self._retry_or_skip_suboperation(db_nslcmop, op_index)
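# Illustrative sketch of how a caller dispatches on the three possible return
# values (the primitive name is hypothetical):
#
#   op_index = self._check_or_add_scale_suboperation(
#       db_nslcmop, vnf_index, "touch", primitive_params, "PRE-SCALE"
#   )
#   if op_index == self.SUBOPERATION_STATUS_SKIP:
#       pass  # already COMPLETED in a previous attempt, do not re-execute
#   elif op_index == self.SUBOPERATION_STATUS_NEW:
#       pass  # first attempt, execute with the original arguments
#   else:
#       pass  # retry: re-read the registered arguments from op_index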
3631
3632 # Function to return execution_environment id
3633
3634 def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list):
3635 # TODO vdu_index_count
3636 for vca in vca_deployed_list:
3637 if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id:
3638 return vca["ee_id"]
3639
3640 async def destroy_N2VC(
3641 self,
3642 logging_text,
3643 db_nslcmop,
3644 vca_deployed,
3645 config_descriptor,
3646 vca_index,
3647 destroy_ee=True,
3648 exec_primitives=True,
3649 scaling_in=False,
3650 vca_id: str = None,
3651 ):
3652 """
3653 Execute the terminate primitives and destroy the execution environment (if destroy_ee=True)
3654 :param logging_text:
3655 :param db_nslcmop:
3656 :param vca_deployed: Dictionary of deployment info at db_nsr._admin.deployed.VCA.<INDEX>
3657 :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
3658 :param vca_index: index in the database _admin.deployed.VCA
3659 :param destroy_ee: False to skip destroying this execution environment, because all of them will be destroyed at once later
3660 :param exec_primitives: False to skip executing terminate primitives, because the configuration did not complete or did
3661 not execute properly
3662 :param scaling_in: True destroys the application, False destroys the model
3663 :return: None or exception
3664 """
3665
3666 self.logger.debug(
3667 logging_text
3668 + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
3669 vca_index, vca_deployed, config_descriptor, destroy_ee
3670 )
3671 )
3672
3673 vca_type = vca_deployed.get("type", "lxc_proxy_charm")
3674
3675 # execute terminate_primitives
3676 if exec_primitives:
3677 terminate_primitives = get_ee_sorted_terminate_config_primitive_list(
3678 config_descriptor.get("terminate-config-primitive"),
3679 vca_deployed.get("ee_descriptor_id"),
3680 )
3681 vdu_id = vca_deployed.get("vdu_id")
3682 vdu_count_index = vca_deployed.get("vdu_count_index")
3683 vdu_name = vca_deployed.get("vdu_name")
3684 vnf_index = vca_deployed.get("member-vnf-index")
3685 if terminate_primitives and vca_deployed.get("needed_terminate"):
3686 for seq in terminate_primitives:
3687 # For each sequence in list, get primitive and call _ns_execute_primitive()
3688 step = "Calling terminate action for vnf_member_index={} primitive={}".format(
3689 vnf_index, seq.get("name")
3690 )
3691 self.logger.debug(logging_text + step)
3692 # Create the primitive for each sequence, i.e. "primitive": "touch"
3693 primitive = seq.get("name")
3694 mapped_primitive_params = self._get_terminate_primitive_params(
3695 seq, vnf_index
3696 )
3697
3698 # Add sub-operation
3699 self._add_suboperation(
3700 db_nslcmop,
3701 vnf_index,
3702 vdu_id,
3703 vdu_count_index,
3704 vdu_name,
3705 primitive,
3706 mapped_primitive_params,
3707 )
3708 # Sub-operations: Call _ns_execute_primitive() instead of action()
3709 try:
3710 result, result_detail = await self._ns_execute_primitive(
3711 vca_deployed["ee_id"],
3712 primitive,
3713 mapped_primitive_params,
3714 vca_type=vca_type,
3715 vca_id=vca_id,
3716 )
3717 except LcmException:
3718 # this happens when the VCA is not deployed. In this case there is nothing to terminate
3719 continue
3720 result_ok = ["COMPLETED", "PARTIALLY_COMPLETED"]
3721 if result not in result_ok:
3722 raise LcmException(
3723 "terminate_primitive {} for vnf_member_index={} fails with "
3724 "error {}".format(seq.get("name"), vnf_index, result_detail)
3725 )
3726 # mark that this VCA does not need to be terminated
3727 db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(
3728 vca_index
3729 )
3730 self.update_db_2(
3731 "nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False}
3732 )
3733
3734 if vca_deployed.get("prometheus_jobs") and self.prometheus:
3735 await self.prometheus.update(remove_jobs=vca_deployed["prometheus_jobs"])
3736
3737 if destroy_ee:
3738 await self.vca_map[vca_type].delete_execution_environment(
3739 vca_deployed["ee_id"],
3740 scaling_in=scaling_in,
3741 vca_id=vca_id,
3742 )
3743
3744 async def _delete_all_N2VC(self, db_nsr: dict, vca_id: str = None):
3745 self._write_all_config_status(db_nsr=db_nsr, status="TERMINATING")
3746 namespace = "." + db_nsr["_id"]
3747 try:
3748 await self.n2vc.delete_namespace(
3749 namespace=namespace,
3750 total_timeout=self.timeout_charm_delete,
3751 vca_id=vca_id,
3752 )
3753 except N2VCNotFound: # already deleted. Skip
3754 pass
3755 self._write_all_config_status(db_nsr=db_nsr, status="DELETED")
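# Note: the N2VC namespace of an NS is "." + nsr_id, so a single
# delete_namespace() call removes every execution environment of the NS.
# Minimal usage sketch (vca_id may be None):
#
#   await self._delete_all_N2VC(db_nsr=db_nsr, vca_id=vca_id)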
3756
3757 async def _terminate_RO(
3758 self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
3759 ):
3760 """
3761 Terminates a deployment from RO
3762 :param logging_text:
3763 :param nsr_deployed: db_nsr._admin.deployed
3764 :param nsr_id:
3765 :param nslcmop_id:
3766 :param stage: list of string with the content to write on db_nslcmop.detailed-status.
3767 this method will update only the index 2, but it will write on database the concatenated content of the list
3768 :return:
3769 """
3770 db_nsr_update = {}
3771 failed_detail = []
3772 ro_nsr_id = ro_delete_action = None
3773 if nsr_deployed and nsr_deployed.get("RO"):
3774 ro_nsr_id = nsr_deployed["RO"].get("nsr_id")
3775 ro_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
3776 try:
3777 if ro_nsr_id:
3778 stage[2] = "Deleting ns from VIM."
3779 db_nsr_update["detailed-status"] = " ".join(stage)
3780 self._write_op_status(nslcmop_id, stage)
3781 self.logger.debug(logging_text + stage[2])
3782 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3783 self._write_op_status(nslcmop_id, stage)
3784 desc = await self.RO.delete("ns", ro_nsr_id)
3785 ro_delete_action = desc["action_id"]
3786 db_nsr_update[
3787 "_admin.deployed.RO.nsr_delete_action_id"
3788 ] = ro_delete_action
3789 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
3790 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
3791 if ro_delete_action:
3792 # wait until NS is deleted from VIM
3793 stage[2] = "Waiting ns deleted from VIM."
3794 detailed_status_old = None
3795 self.logger.debug(
3796 logging_text
3797 + stage[2]
3798 + " RO_id={} ro_delete_action={}".format(
3799 ro_nsr_id, ro_delete_action
3800 )
3801 )
3802 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3803 self._write_op_status(nslcmop_id, stage)
3804
3805 delete_timeout = 20 * 60 # 20 minutes
3806 while delete_timeout > 0:
3807 desc = await self.RO.show(
3808 "ns",
3809 item_id_name=ro_nsr_id,
3810 extra_item="action",
3811 extra_item_id=ro_delete_action,
3812 )
3813
3814 # deploymentStatus
3815 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
3816
3817 ns_status, ns_status_info = self.RO.check_action_status(desc)
3818 if ns_status == "ERROR":
3819 raise ROclient.ROClientException(ns_status_info)
3820 elif ns_status == "BUILD":
3821 stage[2] = "Deleting from VIM {}".format(ns_status_info)
3822 elif ns_status == "ACTIVE":
3823 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
3824 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
3825 break
3826 else:
3827 assert (
3828 False
3829 ), "ROclient.check_action_status returns unknown {}".format(
3830 ns_status
3831 )
3832 if stage[2] != detailed_status_old:
3833 detailed_status_old = stage[2]
3834 db_nsr_update["detailed-status"] = " ".join(stage)
3835 self._write_op_status(nslcmop_id, stage)
3836 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3837 await asyncio.sleep(5, loop=self.loop)
3838 delete_timeout -= 5
3839 else: # delete_timeout <= 0:
3840 raise ROclient.ROClientException(
3841 "Timeout waiting ns deleted from VIM"
3842 )
3843
3844 except Exception as e:
3845 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3846 if (
3847 isinstance(e, ROclient.ROClientException) and e.http_code == 404
3848 ): # not found
3849 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
3850 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
3851 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
3852 self.logger.debug(
3853 logging_text + "RO_ns_id={} already deleted".format(ro_nsr_id)
3854 )
3855 elif (
3856 isinstance(e, ROclient.ROClientException) and e.http_code == 409
3857 ): # conflict
3858 failed_detail.append("delete conflict: {}".format(e))
3859 self.logger.debug(
3860 logging_text
3861 + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id, e)
3862 )
3863 else:
3864 failed_detail.append("delete error: {}".format(e))
3865 self.logger.error(
3866 logging_text + "RO_ns_id={} delete error: {}".format(ro_nsr_id, e)
3867 )
3868
3869 # Delete nsd
3870 if not failed_detail and deep_get(nsr_deployed, ("RO", "nsd_id")):
3871 ro_nsd_id = nsr_deployed["RO"]["nsd_id"]
3872 try:
3873 stage[2] = "Deleting nsd from RO."
3874 db_nsr_update["detailed-status"] = " ".join(stage)
3875 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3876 self._write_op_status(nslcmop_id, stage)
3877 await self.RO.delete("nsd", ro_nsd_id)
3878 self.logger.debug(
3879 logging_text + "ro_nsd_id={} deleted".format(ro_nsd_id)
3880 )
3881 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
3882 except Exception as e:
3883 if (
3884 isinstance(e, ROclient.ROClientException) and e.http_code == 404
3885 ): # not found
3886 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
3887 self.logger.debug(
3888 logging_text + "ro_nsd_id={} already deleted".format(ro_nsd_id)
3889 )
3890 elif (
3891 isinstance(e, ROclient.ROClientException) and e.http_code == 409
3892 ): # conflict
3893 failed_detail.append(
3894 "ro_nsd_id={} delete conflict: {}".format(ro_nsd_id, e)
3895 )
3896 self.logger.debug(logging_text + failed_detail[-1])
3897 else:
3898 failed_detail.append(
3899 "ro_nsd_id={} delete error: {}".format(ro_nsd_id, e)
3900 )
3901 self.logger.error(logging_text + failed_detail[-1])
3902
3903 if not failed_detail and deep_get(nsr_deployed, ("RO", "vnfd")):
3904 for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
3905 if not vnf_deployed or not vnf_deployed["id"]:
3906 continue
3907 try:
3908 ro_vnfd_id = vnf_deployed["id"]
3909 stage[
3910 2
3911 ] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
3912 vnf_deployed["member-vnf-index"], ro_vnfd_id
3913 )
3914 db_nsr_update["detailed-status"] = " ".join(stage)
3915 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3916 self._write_op_status(nslcmop_id, stage)
3917 await self.RO.delete("vnfd", ro_vnfd_id)
3918 self.logger.debug(
3919 logging_text + "ro_vnfd_id={} deleted".format(ro_vnfd_id)
3920 )
3921 db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
3922 except Exception as e:
3923 if (
3924 isinstance(e, ROclient.ROClientException) and e.http_code == 404
3925 ): # not found
3926 db_nsr_update[
3927 "_admin.deployed.RO.vnfd.{}.id".format(index)
3928 ] = None
3929 self.logger.debug(
3930 logging_text
3931 + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id)
3932 )
3933 elif (
3934 isinstance(e, ROclient.ROClientException) and e.http_code == 409
3935 ): # conflict
3936 failed_detail.append(
3937 "ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id, e)
3938 )
3939 self.logger.debug(logging_text + failed_detail[-1])
3940 else:
3941 failed_detail.append(
3942 "ro_vnfd_id={} delete error: {}".format(ro_vnfd_id, e)
3943 )
3944 self.logger.error(logging_text + failed_detail[-1])
3945
3946 if failed_detail:
3947 stage[2] = "Error deleting from VIM"
3948 else:
3949 stage[2] = "Deleted from VIM"
3950 db_nsr_update["detailed-status"] = " ".join(stage)
3951 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3952 self._write_op_status(nslcmop_id, stage)
3953
3954 if failed_detail:
3955 raise LcmException("; ".join(failed_detail))
3956
3957 async def terminate(self, nsr_id, nslcmop_id):
3958 # Try to lock HA task here
3959 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
3960 if not task_is_locked_by_me:
3961 return
3962
3963 logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
3964 self.logger.debug(logging_text + "Enter")
3965 timeout_ns_terminate = self.timeout_ns_terminate
3966 db_nsr = None
3967 db_nslcmop = None
3968 operation_params = None
3969 exc = None
3970 error_list = []  # accumulates all error messages
3971 db_nslcmop_update = {}
3972 autoremove = False # autoremove after terminated
3973 tasks_dict_info = {}
3974 db_nsr_update = {}
3975 stage = [
3976 "Stage 1/3: Preparing task.",
3977 "Waiting for previous operations to terminate.",
3978 "",
3979 ]
3980 # ^ contains [stage, step, VIM-status]
3981 try:
3982 # wait for any previous tasks in process
3983 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
3984
3985 stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
3986 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
3987 operation_params = db_nslcmop.get("operationParams") or {}
3988 if operation_params.get("timeout_ns_terminate"):
3989 timeout_ns_terminate = operation_params["timeout_ns_terminate"]
3990 stage[1] = "Getting nsr={} from db.".format(nsr_id)
3991 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3992
3993 db_nsr_update["operational-status"] = "terminating"
3994 db_nsr_update["config-status"] = "terminating"
3995 self._write_ns_status(
3996 nsr_id=nsr_id,
3997 ns_state="TERMINATING",
3998 current_operation="TERMINATING",
3999 current_operation_id=nslcmop_id,
4000 other_update=db_nsr_update,
4001 )
4002 self._write_op_status(op_id=nslcmop_id, queuePosition=0, stage=stage)
4003 nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed")) or {}
4004 if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
4005 return
4006
4007 stage[1] = "Getting vnf descriptors from db."
4008 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
4009 db_vnfrs_dict = {
4010 db_vnfr["member-vnf-index-ref"]: db_vnfr for db_vnfr in db_vnfrs_list
4011 }
4012 db_vnfds_from_id = {}
4013 db_vnfds_from_member_index = {}
4014 # Loop over VNFRs
4015 for vnfr in db_vnfrs_list:
4016 vnfd_id = vnfr["vnfd-id"]
4017 if vnfd_id not in db_vnfds_from_id:
4018 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
4019 db_vnfds_from_id[vnfd_id] = vnfd
4020 db_vnfds_from_member_index[
4021 vnfr["member-vnf-index-ref"]
4022 ] = db_vnfds_from_id[vnfd_id]
4023
4024 # Destroy individual execution environments when there are terminating primitives.
4025 # The rest of the EEs will be deleted at once
4026 # TODO - check before calling _destroy_N2VC
4027 # if not operation_params.get("skip_terminate_primitives"):#
4028 # or not vca.get("needed_terminate"):
4029 stage[0] = "Stage 2/3 execute terminating primitives."
4030 self.logger.debug(logging_text + stage[0])
4031 stage[1] = "Looking execution environment that needs terminate."
4032 self.logger.debug(logging_text + stage[1])
4033
4034 for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")):
4035 config_descriptor = None
4036
4037 vca_id = self.get_vca_id(db_vnfrs_dict[vca["member-vnf-index"]], db_nsr)
4038 if not vca or not vca.get("ee_id"):
4039 continue
4040 if not vca.get("member-vnf-index"):
4041 # ns
4042 config_descriptor = db_nsr.get("ns-configuration")
4043 elif vca.get("vdu_id"):
4044 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
4045 config_descriptor = get_configuration(db_vnfd, vca.get("vdu_id"))
4046 elif vca.get("kdu_name"):
4047 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
4048 config_descriptor = get_configuration(db_vnfd, vca.get("kdu_name"))
4049 else:
4050 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
4051 config_descriptor = get_configuration(db_vnfd, db_vnfd["id"])
4052 vca_type = vca.get("type")
4053 exec_terminate_primitives = not operation_params.get(
4054 "skip_terminate_primitives"
4055 ) and vca.get("needed_terminate")
4056 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
4057 # pending native charms
4058 destroy_ee = (
4059 True if vca_type in ("helm", "helm-v3", "native_charm") else False
4060 )
4061 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
4062 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
4063 task = asyncio.ensure_future(
4064 self.destroy_N2VC(
4065 logging_text,
4066 db_nslcmop,
4067 vca,
4068 config_descriptor,
4069 vca_index,
4070 destroy_ee,
4071 exec_terminate_primitives,
4072 vca_id=vca_id,
4073 )
4074 )
4075 tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))
4076
4077 # wait for pending tasks of terminate primitives
4078 if tasks_dict_info:
4079 self.logger.debug(
4080 logging_text
4081 + "Waiting for tasks {}".format(list(tasks_dict_info.keys()))
4082 )
4083 error_list = await self._wait_for_tasks(
4084 logging_text,
4085 tasks_dict_info,
4086 min(self.timeout_charm_delete, timeout_ns_terminate),
4087 stage,
4088 nslcmop_id,
4089 )
4090 tasks_dict_info.clear()
4091 if error_list:
4092 return # raise LcmException("; ".join(error_list))
4093
4094 # remove all execution environments at once
4095 stage[0] = "Stage 3/3 delete all."
4096
4097 if nsr_deployed.get("VCA"):
4098 stage[1] = "Deleting all execution environments."
4099 self.logger.debug(logging_text + stage[1])
4100 vca_id = self.get_vca_id({}, db_nsr)
4101 task_delete_ee = asyncio.ensure_future(
4102 asyncio.wait_for(
4103 self._delete_all_N2VC(db_nsr=db_nsr, vca_id=vca_id),
4104 timeout=self.timeout_charm_delete,
4105 )
4106 )
4107 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
4108 tasks_dict_info[task_delete_ee] = "Terminating all VCA"
4109
4110 # Delete from k8scluster
4111 stage[1] = "Deleting KDUs."
4112 self.logger.debug(logging_text + stage[1])
4113 # print(nsr_deployed)
4114 for kdu in get_iterable(nsr_deployed, "K8s"):
4115 if not kdu or not kdu.get("kdu-instance"):
4116 continue
4117 kdu_instance = kdu.get("kdu-instance")
4118 if kdu.get("k8scluster-type") in self.k8scluster_map:
4119 # TODO: Uninstall kdu instances taking into account they could be deployed in different VIMs
4120 vca_id = self.get_vca_id({}, db_nsr)
4121 task_delete_kdu_instance = asyncio.ensure_future(
4122 self.k8scluster_map[kdu["k8scluster-type"]].uninstall(
4123 cluster_uuid=kdu.get("k8scluster-uuid"),
4124 kdu_instance=kdu_instance,
4125 vca_id=vca_id,
4126 )
4127 )
4128 else:
4129 self.logger.error(
4130 logging_text
4131 + "Unknown k8s deployment type {}".format(
4132 kdu.get("k8scluster-type")
4133 )
4134 )
4135 continue
4136 tasks_dict_info[
4137 task_delete_kdu_instance
4138 ] = "Terminating KDU '{}'".format(kdu.get("kdu-name"))
4139
4140 # remove from RO
4141 stage[1] = "Deleting ns from VIM."
4142 if self.ng_ro:
4143 task_delete_ro = asyncio.ensure_future(
4144 self._terminate_ng_ro(
4145 logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
4146 )
4147 )
4148 else:
4149 task_delete_ro = asyncio.ensure_future(
4150 self._terminate_RO(
4151 logging_text, nsr_deployed, nsr_id, nslcmop_id, stage
4152 )
4153 )
4154 tasks_dict_info[task_delete_ro] = "Removing deployment from VIM"
4155
4156 # the rest of the cleanup is done in the finally block
4157
4158 except (
4159 ROclient.ROClientException,
4160 DbException,
4161 LcmException,
4162 N2VCException,
4163 ) as e:
4164 self.logger.error(logging_text + "Exit Exception {}".format(e))
4165 exc = e
4166 except asyncio.CancelledError:
4167 self.logger.error(
4168 logging_text + "Cancelled Exception while '{}'".format(stage[1])
4169 )
4170 exc = "Operation was cancelled"
4171 except Exception as e:
4172 exc = traceback.format_exc()
4173 self.logger.critical(
4174 logging_text + "Exit Exception while '{}': {}".format(stage[1], e),
4175 exc_info=True,
4176 )
4177 finally:
4178 if exc:
4179 error_list.append(str(exc))
4180 try:
4181 # wait for pending tasks
4182 if tasks_dict_info:
4183 stage[1] = "Waiting for terminate pending tasks."
4184 self.logger.debug(logging_text + stage[1])
4185 error_list += await self._wait_for_tasks(
4186 logging_text,
4187 tasks_dict_info,
4188 timeout_ns_terminate,
4189 stage,
4190 nslcmop_id,
4191 )
4192 stage[1] = stage[2] = ""
4193 except asyncio.CancelledError:
4194 error_list.append("Cancelled")
4195 # TODO cancel all tasks
4196 except Exception as exc:
4197 error_list.append(str(exc))
4198 # update status at database
4199 if error_list:
4200 error_detail = "; ".join(error_list)
4201 # self.logger.error(logging_text + error_detail)
4202 error_description_nslcmop = "{} Detail: {}".format(
4203 stage[0], error_detail
4204 )
4205 error_description_nsr = "Operation: TERMINATING.{}, {}.".format(
4206 nslcmop_id, stage[0]
4207 )
4208
4209 db_nsr_update["operational-status"] = "failed"
4210 db_nsr_update["detailed-status"] = (
4211 error_description_nsr + " Detail: " + error_detail
4212 )
4213 db_nslcmop_update["detailed-status"] = error_detail
4214 nslcmop_operation_state = "FAILED"
4215 ns_state = "BROKEN"
4216 else:
4217 error_detail = None
4218 error_description_nsr = error_description_nslcmop = None
4219 ns_state = "NOT_INSTANTIATED"
4220 db_nsr_update["operational-status"] = "terminated"
4221 db_nsr_update["detailed-status"] = "Done"
4222 db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
4223 db_nslcmop_update["detailed-status"] = "Done"
4224 nslcmop_operation_state = "COMPLETED"
4225
4226 if db_nsr:
4227 self._write_ns_status(
4228 nsr_id=nsr_id,
4229 ns_state=ns_state,
4230 current_operation="IDLE",
4231 current_operation_id=None,
4232 error_description=error_description_nsr,
4233 error_detail=error_detail,
4234 other_update=db_nsr_update,
4235 )
4236 self._write_op_status(
4237 op_id=nslcmop_id,
4238 stage="",
4239 error_message=error_description_nslcmop,
4240 operation_state=nslcmop_operation_state,
4241 other_update=db_nslcmop_update,
4242 )
4243 if ns_state == "NOT_INSTANTIATED":
4244 try:
4245 self.db.set_list(
4246 "vnfrs",
4247 {"nsr-id-ref": nsr_id},
4248 {"_admin.nsState": "NOT_INSTANTIATED"},
4249 )
4250 except DbException as e:
4251 self.logger.warn(
4252 logging_text
4253 + "Error writing VNFR status for nsr-id-ref: {} -> {}".format(
4254 nsr_id, e
4255 )
4256 )
4257 if operation_params:
4258 autoremove = operation_params.get("autoremove", False)
4259 if nslcmop_operation_state:
4260 try:
4261 await self.msg.aiowrite(
4262 "ns",
4263 "terminated",
4264 {
4265 "nsr_id": nsr_id,
4266 "nslcmop_id": nslcmop_id,
4267 "operationState": nslcmop_operation_state,
4268 "autoremove": autoremove,
4269 },
4270 loop=self.loop,
4271 )
4272 except Exception as e:
4273 self.logger.error(
4274 logging_text + "kafka_write notification Exception {}".format(e)
4275 )
4276
4277 self.logger.debug(logging_text + "Exit")
4278 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
4279
4280 async def _wait_for_tasks(
4281 self, logging_text, created_tasks_info, timeout, stage, nslcmop_id, nsr_id=None
4282 ):
4283 time_start = time()
4284 error_detail_list = []
4285 error_list = []
4286 pending_tasks = list(created_tasks_info.keys())
4287 num_tasks = len(pending_tasks)
4288 num_done = 0
4289 stage[1] = "{}/{}.".format(num_done, num_tasks)
4290 self._write_op_status(nslcmop_id, stage)
4291 while pending_tasks:
4292 new_error = None
4293 _timeout = timeout + time_start - time()
4294 done, pending_tasks = await asyncio.wait(
4295 pending_tasks, timeout=_timeout, return_when=asyncio.FIRST_COMPLETED
4296 )
4297 num_done += len(done)
4298 if not done: # Timeout
4299 for task in pending_tasks:
4300 new_error = created_tasks_info[task] + ": Timeout"
4301 error_detail_list.append(new_error)
4302 error_list.append(new_error)
4303 break
4304 for task in done:
4305 if task.cancelled():
4306 exc = "Cancelled"
4307 else:
4308 exc = task.exception()
4309 if exc:
4310 if isinstance(exc, asyncio.TimeoutError):
4311 exc = "Timeout"
4312 new_error = created_tasks_info[task] + ": {}".format(exc)
4313 error_list.append(created_tasks_info[task])
4314 error_detail_list.append(new_error)
4315 if isinstance(
4316 exc,
4317 (
4318 str,
4319 DbException,
4320 N2VCException,
4321 ROclient.ROClientException,
4322 LcmException,
4323 K8sException,
4324 NgRoException,
4325 ),
4326 ):
4327 self.logger.error(logging_text + new_error)
4328 else:
4329 exc_traceback = "".join(
4330 traceback.format_exception(None, exc, exc.__traceback__)
4331 )
4332 self.logger.error(
4333 logging_text
4334 + created_tasks_info[task]
4335 + " "
4336 + exc_traceback
4337 )
4338 else:
4339 self.logger.debug(
4340 logging_text + created_tasks_info[task] + ": Done"
4341 )
4342 stage[1] = "{}/{}.".format(num_done, num_tasks)
4343 if new_error:
4344 stage[1] += " Errors: " + ". ".join(error_detail_list) + "."
4345 if nsr_id: # update also nsr
4346 self.update_db_2(
4347 "nsrs",
4348 nsr_id,
4349 {
4350 "errorDescription": "Error at: " + ", ".join(error_list),
4351 "errorDetail": ". ".join(error_detail_list),
4352 },
4353 )
4354 self._write_op_status(nslcmop_id, stage)
4355 return error_detail_list
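# Illustrative sketch: callers map each asyncio task to a short description and
# receive back the list of error details (empty when every task finished
# cleanly). Variable names below are taken as an example, not prescriptive.
#
#   tasks_dict_info = {task_delete_ro: "Removing deployment from VIM"}
#   error_list = await self._wait_for_tasks(
#       logging_text, tasks_dict_info, timeout_ns_terminate, stage, nslcmop_id
#   )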
4356
4357 @staticmethod
4358 def _map_primitive_params(primitive_desc, params, instantiation_params):
4359 """
4360 Generates the params to be provided to the charm before executing the primitive. If the user does not provide a
4361 parameter, the default-value is used. If the value is enclosed in < >, it is looked up in instantiation_params
4362 :param primitive_desc: portion of VNFD/NSD that describes primitive
4363 :param params: Params provided by user
4364 :param instantiation_params: Instantiation params provided by user
4365 :return: a dictionary with the calculated params
4366 """
4367 calculated_params = {}
4368 for parameter in primitive_desc.get("parameter", ()):
4369 param_name = parameter["name"]
4370 if param_name in params:
4371 calculated_params[param_name] = params[param_name]
4372 elif "default-value" in parameter or "value" in parameter:
4373 if "value" in parameter:
4374 calculated_params[param_name] = parameter["value"]
4375 else:
4376 calculated_params[param_name] = parameter["default-value"]
4377 if (
4378 isinstance(calculated_params[param_name], str)
4379 and calculated_params[param_name].startswith("<")
4380 and calculated_params[param_name].endswith(">")
4381 ):
4382 if calculated_params[param_name][1:-1] in instantiation_params:
4383 calculated_params[param_name] = instantiation_params[
4384 calculated_params[param_name][1:-1]
4385 ]
4386 else:
4387 raise LcmException(
4388 "Parameter {} needed to execute primitive {} not provided".format(
4389 calculated_params[param_name], primitive_desc["name"]
4390 )
4391 )
4392 else:
4393 raise LcmException(
4394 "Parameter {} needed to execute primitive {} not provided".format(
4395 param_name, primitive_desc["name"]
4396 )
4397 )
4398
4399 if isinstance(calculated_params[param_name], (dict, list, tuple)):
4400 calculated_params[param_name] = yaml.safe_dump(
4401 calculated_params[param_name], default_flow_style=True, width=256
4402 )
4403 elif isinstance(calculated_params[param_name], str) and calculated_params[
4404 param_name
4405 ].startswith("!!yaml "):
4406 calculated_params[param_name] = calculated_params[param_name][7:]
4407 if parameter.get("data-type") == "INTEGER":
4408 try:
4409 calculated_params[param_name] = int(calculated_params[param_name])
4410 except ValueError: # error converting string to int
4411 raise LcmException(
4412 "Parameter {} of primitive {} must be integer".format(
4413 param_name, primitive_desc["name"]
4414 )
4415 )
4416 elif parameter.get("data-type") == "BOOLEAN":
4417 calculated_params[param_name] = not (
4418 (str(calculated_params[param_name])).lower() == "false"
4419 )
4420
4421 # always add ns_config_info if the primitive name is config
4422 if primitive_desc["name"] == "config":
4423 if "ns_config_info" in instantiation_params:
4424 calculated_params["ns_config_info"] = instantiation_params[
4425 "ns_config_info"
4426 ]
4427 return calculated_params
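# Illustrative sketch of the resolution rules (descriptor and values are
# hypothetical): a default-value wrapped in < > is looked up in the
# instantiation params, and dict/list values are serialized as YAML.
#
#   primitive_desc = {
#       "name": "touch",
#       "parameter": [{"name": "filename", "default-value": "<touch_filename>"}],
#   }
#   params = {}
#   instantiation_params = {"touch_filename": "/home/ubuntu/touched"}
#   # -> {"filename": "/home/ubuntu/touched"}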
4428
4429 def _look_for_deployed_vca(
4430 self,
4431 deployed_vca,
4432 member_vnf_index,
4433 vdu_id,
4434 vdu_count_index,
4435 kdu_name=None,
4436 ee_descriptor_id=None,
4437 ):
4438 # find the vca_deployed record for this action. Raise LcmException if not found or it has no ee_id.
4439 for vca in deployed_vca:
4440 if not vca:
4441 continue
4442 if member_vnf_index != vca["member-vnf-index"] or vdu_id != vca["vdu_id"]:
4443 continue
4444 if (
4445 vdu_count_index is not None
4446 and vdu_count_index != vca["vdu_count_index"]
4447 ):
4448 continue
4449 if kdu_name and kdu_name != vca["kdu_name"]:
4450 continue
4451 if ee_descriptor_id and ee_descriptor_id != vca["ee_descriptor_id"]:
4452 continue
4453 break
4454 else:
4455 # vca_deployed not found
4456 raise LcmException(
4457 "charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
4458 " is not deployed".format(
4459 member_vnf_index,
4460 vdu_id,
4461 vdu_count_index,
4462 kdu_name,
4463 ee_descriptor_id,
4464 )
4465 )
4466 # get ee_id
4467 ee_id = vca.get("ee_id")
4468 vca_type = vca.get(
4469 "type", "lxc_proxy_charm"
4470 ) # default value for backward compatibility - proxy charm
4471 if not ee_id:
4472 raise LcmException(
4473 "charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
4474 "execution environment".format(
4475 member_vnf_index, vdu_id, kdu_name, vdu_count_index
4476 )
4477 )
4478 return ee_id, vca_type
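# Illustrative sketch (values hypothetical): resolving the execution
# environment of the VNF-level charm of member-vnf-index "1". ee_id comes from
# vca["ee_id"]; vca_type falls back to "lxc_proxy_charm" when not stored.
#
#   ee_id, vca_type = self._look_for_deployed_vca(
#       nsr_deployed["VCA"],
#       member_vnf_index="1",
#       vdu_id=None,
#       vdu_count_index=None,
#   )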
4479
4480 async def _ns_execute_primitive(
4481 self,
4482 ee_id,
4483 primitive,
4484 primitive_params,
4485 retries=0,
4486 retries_interval=30,
4487 timeout=None,
4488 vca_type=None,
4489 db_dict=None,
4490 vca_id: str = None,
4491 ) -> (str, str):
4492 try:
4493 if primitive == "config":
4494 primitive_params = {"params": primitive_params}
4495
4496 vca_type = vca_type or "lxc_proxy_charm"
4497
4498 while retries >= 0:
4499 try:
4500 output = await asyncio.wait_for(
4501 self.vca_map[vca_type].exec_primitive(
4502 ee_id=ee_id,
4503 primitive_name=primitive,
4504 params_dict=primitive_params,
4505 progress_timeout=self.timeout_progress_primitive,
4506 total_timeout=self.timeout_primitive,
4507 db_dict=db_dict,
4508 vca_id=vca_id,
4509 ),
4510 timeout=timeout or self.timeout_primitive,
4511 )
4512 # execution was OK
4513 break
4514 except asyncio.CancelledError:
4515 raise
4516 except Exception as e: # asyncio.TimeoutError
4517 if isinstance(e, asyncio.TimeoutError):
4518 e = "Timeout"
4519 retries -= 1
4520 if retries >= 0:
4521 self.logger.debug(
4522 "Error executing action {} on {} -> {}".format(
4523 primitive, ee_id, e
4524 )
4525 )
4526 # wait and retry
4527 await asyncio.sleep(retries_interval, loop=self.loop)
4528 else:
4529 return "FAILED", str(e)
4530
4531 return "COMPLETED", output
4532
4533 except (LcmException, asyncio.CancelledError):
4534 raise
4535 except Exception as e:
4536 return "FAIL", "Error executing action {}: {}".format(primitive, e)
4537
4538 async def vca_status_refresh(self, nsr_id, nslcmop_id):
4539 """
4540 Updating the vca_status with latest juju information in nsrs record
4541 :param: nsr_id: Id of the nsr
4542 :param: nslcmop_id: Id of the nslcmop
4543 :return: None
4544 """
4545
4546 self.logger.debug("Task ns={} action={} Enter".format(nsr_id, nslcmop_id))
4547 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
4548 vca_id = self.get_vca_id({}, db_nsr)
4549 if db_nsr["_admin"]["deployed"]["K8s"]:
4550 for k8s_index, k8s in enumerate(db_nsr["_admin"]["deployed"]["K8s"]):
4551 cluster_uuid, kdu_instance = k8s["k8scluster-uuid"], k8s["kdu-instance"]
4552 await self._on_update_k8s_db(
4553 cluster_uuid, kdu_instance, filter={"_id": nsr_id}, vca_id=vca_id
4554 )
4555 else:
4556 for vca_index, _ in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
4557 table, filter = "nsrs", {"_id": nsr_id}
4558 path = "_admin.deployed.VCA.{}.".format(vca_index)
4559 await self._on_update_n2vc_db(table, filter, path, {})
4560
4561 self.logger.debug("Task ns={} action={} Exit".format(nsr_id, nslcmop_id))
4562 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_vca_status_refresh")
4563
4564 async def action(self, nsr_id, nslcmop_id):
4565 # Try to lock HA task here
4566 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
4567 if not task_is_locked_by_me:
4568 return
4569
4570 logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
4571 self.logger.debug(logging_text + "Enter")
4572 # get all needed from database
4573 db_nsr = None
4574 db_nslcmop = None
4575 db_nsr_update = {}
4576 db_nslcmop_update = {}
4577 nslcmop_operation_state = None
4578 error_description_nslcmop = None
4579 exc = None
4580 try:
4581 # wait for any previous tasks in process
4582 step = "Waiting for previous operations to terminate"
4583 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
4584
4585 self._write_ns_status(
4586 nsr_id=nsr_id,
4587 ns_state=None,
4588 current_operation="RUNNING ACTION",
4589 current_operation_id=nslcmop_id,
4590 )
4591
4592 step = "Getting information from database"
4593 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
4594 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
4595
4596 nsr_deployed = db_nsr["_admin"].get("deployed")
4597 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
4598 vdu_id = db_nslcmop["operationParams"].get("vdu_id")
4599 kdu_name = db_nslcmop["operationParams"].get("kdu_name")
4600 vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
4601 primitive = db_nslcmop["operationParams"]["primitive"]
4602 primitive_params = db_nslcmop["operationParams"]["primitive_params"]
4603 timeout_ns_action = db_nslcmop["operationParams"].get(
4604 "timeout_ns_action", self.timeout_primitive
4605 )
4606
4607 if vnf_index:
4608 step = "Getting vnfr from database"
4609 db_vnfr = self.db.get_one(
4610 "vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}
4611 )
4612 step = "Getting vnfd from database"
4613 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
4614 else:
4615 step = "Getting nsd from database"
4616 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
4617
4618 vca_id = self.get_vca_id(db_vnfr, db_nsr)
4619 # for backward compatibility
4620 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
4621 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
4622 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
4623 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4624
4625 # look for primitive
4626 config_primitive_desc = descriptor_configuration = None
4627 if vdu_id:
4628 descriptor_configuration = get_configuration(db_vnfd, vdu_id)
4629 elif kdu_name:
4630 descriptor_configuration = get_configuration(db_vnfd, kdu_name)
4631 elif vnf_index:
4632 descriptor_configuration = get_configuration(db_vnfd, db_vnfd["id"])
4633 else:
4634 descriptor_configuration = db_nsd.get("ns-configuration")
4635
4636 if descriptor_configuration and descriptor_configuration.get(
4637 "config-primitive"
4638 ):
4639 for config_primitive in descriptor_configuration["config-primitive"]:
4640 if config_primitive["name"] == primitive:
4641 config_primitive_desc = config_primitive
4642 break
4643
4644 if not config_primitive_desc:
4645 if not (kdu_name and primitive in ("upgrade", "rollback", "status")):
4646 raise LcmException(
4647 "Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".format(
4648 primitive
4649 )
4650 )
4651 primitive_name = primitive
4652 ee_descriptor_id = None
4653 else:
4654 primitive_name = config_primitive_desc.get(
4655 "execution-environment-primitive", primitive
4656 )
4657 ee_descriptor_id = config_primitive_desc.get(
4658 "execution-environment-ref"
4659 )
4660
4661 if vnf_index:
4662 if vdu_id:
4663 vdur = next(
4664 (x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None
4665 )
4666 desc_params = parse_yaml_strings(vdur.get("additionalParams"))
4667 elif kdu_name:
4668 kdur = next(
4669 (x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None
4670 )
4671 desc_params = parse_yaml_strings(kdur.get("additionalParams"))
4672 else:
4673 desc_params = parse_yaml_strings(
4674 db_vnfr.get("additionalParamsForVnf")
4675 )
4676 else:
4677 desc_params = parse_yaml_strings(db_nsr.get("additionalParamsForNs"))
4678 if kdu_name and get_configuration(db_vnfd, kdu_name):
4679 kdu_configuration = get_configuration(db_vnfd, kdu_name)
4680 actions = set()
4681 for primitive in kdu_configuration.get("initial-config-primitive", []):
4682 actions.add(primitive["name"])
4683 for primitive in kdu_configuration.get("config-primitive", []):
4684 actions.add(primitive["name"])
4685 kdu_action = primitive_name in actions
4686
4687 # TODO check if ns is in a proper status
4688 if kdu_name and (
4689 primitive_name in ("upgrade", "rollback", "status") or kdu_action
4690 ):
4691 # kdur and desc_params already set from before
4692 if primitive_params:
4693 desc_params.update(primitive_params)
4694 # TODO Check if we will need something at vnf level
4695 for index, kdu in enumerate(get_iterable(nsr_deployed, "K8s")):
4696 if (
4697 kdu_name == kdu["kdu-name"]
4698 and kdu["member-vnf-index"] == vnf_index
4699 ):
4700 break
4701 else:
4702 raise LcmException(
4703 "KDU '{}' for vnf '{}' not deployed".format(kdu_name, vnf_index)
4704 )
4705
4706 if kdu.get("k8scluster-type") not in self.k8scluster_map:
4707 msg = "unknown k8scluster-type '{}'".format(
4708 kdu.get("k8scluster-type")
4709 )
4710 raise LcmException(msg)
4711
4712 db_dict = {
4713 "collection": "nsrs",
4714 "filter": {"_id": nsr_id},
4715 "path": "_admin.deployed.K8s.{}".format(index),
4716 }
4717 self.logger.debug(
4718 logging_text
4719 + "Exec k8s {} on {}.{}".format(primitive_name, vnf_index, kdu_name)
4720 )
4721 step = "Executing kdu {}".format(primitive_name)
4722 if primitive_name == "upgrade":
4723 if desc_params.get("kdu_model"):
4724 kdu_model = desc_params.get("kdu_model")
4725 del desc_params["kdu_model"]
4726 else:
4727 kdu_model = kdu.get("kdu-model")
4728 parts = kdu_model.split(sep=":")
4729 if len(parts) == 2:
4730 kdu_model = parts[0]
4731
4732 detailed_status = await asyncio.wait_for(
4733 self.k8scluster_map[kdu["k8scluster-type"]].upgrade(
4734 cluster_uuid=kdu.get("k8scluster-uuid"),
4735 kdu_instance=kdu.get("kdu-instance"),
4736 atomic=True,
4737 kdu_model=kdu_model,
4738 params=desc_params,
4739 db_dict=db_dict,
4740 timeout=timeout_ns_action,
4741 ),
4742 timeout=timeout_ns_action + 10,
4743 )
4744 self.logger.debug(
4745 logging_text + " Upgrade of kdu {} done".format(detailed_status)
4746 )
4747 elif primitive_name == "rollback":
4748 detailed_status = await asyncio.wait_for(
4749 self.k8scluster_map[kdu["k8scluster-type"]].rollback(
4750 cluster_uuid=kdu.get("k8scluster-uuid"),
4751 kdu_instance=kdu.get("kdu-instance"),
4752 db_dict=db_dict,
4753 ),
4754 timeout=timeout_ns_action,
4755 )
4756 elif primitive_name == "status":
4757 detailed_status = await asyncio.wait_for(
4758 self.k8scluster_map[kdu["k8scluster-type"]].status_kdu(
4759 cluster_uuid=kdu.get("k8scluster-uuid"),
4760 kdu_instance=kdu.get("kdu-instance"),
4761 vca_id=vca_id,
4762 ),
4763 timeout=timeout_ns_action,
4764 )
4765 else:
4766 kdu_instance = kdu.get("kdu-instance") or "{}-{}".format(
4767 kdu["kdu-name"], nsr_id
4768 )
4769 params = self._map_primitive_params(
4770 config_primitive_desc, primitive_params, desc_params
4771 )
4772
4773 detailed_status = await asyncio.wait_for(
4774 self.k8scluster_map[kdu["k8scluster-type"]].exec_primitive(
4775 cluster_uuid=kdu.get("k8scluster-uuid"),
4776 kdu_instance=kdu_instance,
4777 primitive_name=primitive_name,
4778 params=params,
4779 db_dict=db_dict,
4780 timeout=timeout_ns_action,
4781 vca_id=vca_id,
4782 ),
4783 timeout=timeout_ns_action,
4784 )
4785
4786 if detailed_status:
4787 nslcmop_operation_state = "COMPLETED"
4788 else:
4789 detailed_status = ""
4790 nslcmop_operation_state = "FAILED"
4791 else:
4792 ee_id, vca_type = self._look_for_deployed_vca(
4793 nsr_deployed["VCA"],
4794 member_vnf_index=vnf_index,
4795 vdu_id=vdu_id,
4796 vdu_count_index=vdu_count_index,
4797 ee_descriptor_id=ee_descriptor_id,
4798 )
4799 for vca_index, vca_deployed in enumerate(
4800 db_nsr["_admin"]["deployed"]["VCA"]
4801 ):
4802 if vca_deployed.get("member-vnf-index") == vnf_index:
4803 db_dict = {
4804 "collection": "nsrs",
4805 "filter": {"_id": nsr_id},
4806 "path": "_admin.deployed.VCA.{}.".format(vca_index),
4807 }
4808 break
4809 (
4810 nslcmop_operation_state,
4811 detailed_status,
4812 ) = await self._ns_execute_primitive(
4813 ee_id,
4814 primitive=primitive_name,
4815 primitive_params=self._map_primitive_params(
4816 config_primitive_desc, primitive_params, desc_params
4817 ),
4818 timeout=timeout_ns_action,
4819 vca_type=vca_type,
4820 db_dict=db_dict,
4821 vca_id=vca_id,
4822 )
4823
4824 db_nslcmop_update["detailed-status"] = detailed_status
4825 error_description_nslcmop = (
4826 detailed_status if nslcmop_operation_state == "FAILED" else ""
4827 )
4828 self.logger.debug(
4829 logging_text
4830 + " task Done with result {} {}".format(
4831 nslcmop_operation_state, detailed_status
4832 )
4833 )
4834 return # database update is called inside finally
4835
4836 except (DbException, LcmException, N2VCException, K8sException) as e:
4837 self.logger.error(logging_text + "Exit Exception {}".format(e))
4838 exc = e
4839 except asyncio.CancelledError:
4840 self.logger.error(
4841 logging_text + "Cancelled Exception while '{}'".format(step)
4842 )
4843 exc = "Operation was cancelled"
4844 except asyncio.TimeoutError:
4845 self.logger.error(logging_text + "Timeout while '{}'".format(step))
4846 exc = "Timeout"
4847 except Exception as e:
4848 exc = traceback.format_exc()
4849 self.logger.critical(
4850 logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
4851 exc_info=True,
4852 )
4853 finally:
4854 if exc:
4855 db_nslcmop_update[
4856 "detailed-status"
4857 ] = (
4858 detailed_status
4859 ) = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
4860 nslcmop_operation_state = "FAILED"
4861 if db_nsr:
4862 self._write_ns_status(
4863 nsr_id=nsr_id,
4864 ns_state=db_nsr[
4865 "nsState"
4866 ], # TODO check if degraded. For the moment use previous status
4867 current_operation="IDLE",
4868 current_operation_id=None,
4869 # error_description=error_description_nsr,
4870 # error_detail=error_detail,
4871 other_update=db_nsr_update,
4872 )
4873
4874 self._write_op_status(
4875 op_id=nslcmop_id,
4876 stage="",
4877 error_message=error_description_nslcmop,
4878 operation_state=nslcmop_operation_state,
4879 other_update=db_nslcmop_update,
4880 )
4881
4882 if nslcmop_operation_state:
4883 try:
4884 await self.msg.aiowrite(
4885 "ns",
4886 "actioned",
4887 {
4888 "nsr_id": nsr_id,
4889 "nslcmop_id": nslcmop_id,
4890 "operationState": nslcmop_operation_state,
4891 },
4892 loop=self.loop,
4893 )
4894 except Exception as e:
4895 self.logger.error(
4896 logging_text + "kafka_write notification Exception {}".format(e)
4897 )
4898 self.logger.debug(logging_text + "Exit")
4899 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
4900 return nslcmop_operation_state, detailed_status
4901
4902 async def scale(self, nsr_id, nslcmop_id):
4903 # Try to lock HA task here
4904 task_is_locked_by_me = self.lcm_tasks.lock_HA("ns", "nslcmops", nslcmop_id)
4905 if not task_is_locked_by_me:
4906 return
4907
4908 logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
4909 stage = ["", "", ""]
4910 tasks_dict_info = {}
4911 # ^ stage, step, VIM progress
4912 self.logger.debug(logging_text + "Enter")
4913 # get all needed from database
4914 db_nsr = None
4915 db_nslcmop_update = {}
4916 db_nsr_update = {}
4917 exc = None
4918 # in case of error, indicates which part of the scale operation failed, so the nsr can be set to error status
4919 scale_process = None
4920 old_operational_status = ""
4921 old_config_status = ""
4922 nsi_id = None
4923 try:
4924 # wait for any previous tasks in process
4925 step = "Waiting for previous operations to terminate"
4926 await self.lcm_tasks.waitfor_related_HA("ns", "nslcmops", nslcmop_id)
4927 self._write_ns_status(
4928 nsr_id=nsr_id,
4929 ns_state=None,
4930 current_operation="SCALING",
4931 current_operation_id=nslcmop_id,
4932 )
4933
4934 step = "Getting nslcmop from database"
4935 self.logger.debug(
4936 step + " after having waited for previous tasks to be completed"
4937 )
4938 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
4939
4940 step = "Getting nsr from database"
4941 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
4942 old_operational_status = db_nsr["operational-status"]
4943 old_config_status = db_nsr["config-status"]
4944
4945 step = "Parsing scaling parameters"
4946 db_nsr_update["operational-status"] = "scaling"
4947 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4948 nsr_deployed = db_nsr["_admin"].get("deployed")
4949
4950 #######
4951 nsr_deployed = db_nsr["_admin"].get("deployed")
4952 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
4953 # vdu_id = db_nslcmop["operationParams"].get("vdu_id")
4954 # vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
4955 # vdu_name = db_nslcmop["operationParams"].get("vdu_name")
4956 #######
4957
4958 vnf_index = db_nslcmop["operationParams"]["scaleVnfData"][
4959 "scaleByStepData"
4960 ]["member-vnf-index"]
4961 scaling_group = db_nslcmop["operationParams"]["scaleVnfData"][
4962 "scaleByStepData"
4963 ]["scaling-group-descriptor"]
4964 scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
4965 # for backward compatibility
4966 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
4967 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
4968 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
4969 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4970
4971 step = "Getting vnfr from database"
4972 db_vnfr = self.db.get_one(
4973 "vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id}
4974 )
4975
4976 vca_id = self.get_vca_id(db_vnfr, db_nsr)
4977
4978 step = "Getting vnfd from database"
4979 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
4980
4981 base_folder = db_vnfd["_admin"]["storage"]
4982
4983 step = "Getting scaling-group-descriptor"
4984 scaling_descriptor = find_in_list(
4985 get_scaling_aspect(db_vnfd),
4986 lambda scale_desc: scale_desc["name"] == scaling_group,
4987 )
4988 if not scaling_descriptor:
4989 raise LcmException(
4990 "input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
4991 "at vnfd:scaling-group-descriptor".format(scaling_group)
4992 )
4993
4994 step = "Sending scale order to VIM"
4995 # TODO check if ns is in a proper status
4996 nb_scale_op = 0
4997 if not db_nsr["_admin"].get("scaling-group"):
4998 self.update_db_2(
4999 "nsrs",
5000 nsr_id,
5001 {
5002 "_admin.scaling-group": [
5003 {"name": scaling_group, "nb-scale-op": 0}
5004 ]
5005 },
5006 )
5007 admin_scale_index = 0
5008 else:
5009 for admin_scale_index, admin_scale_info in enumerate(
5010 db_nsr["_admin"]["scaling-group"]
5011 ):
5012 if admin_scale_info["name"] == scaling_group:
5013 nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
5014 break
5015 else:  # not found: set index to one past the last element and add a new entry with this name
5016 admin_scale_index += 1
5017 db_nsr_update[
5018 "_admin.scaling-group.{}.name".format(admin_scale_index)
5019 ] = scaling_group
5020 RO_scaling_info = []
5021 VCA_scaling_info = []
5022 vdu_scaling_info = {"scaling_group_name": scaling_group, "vdu": []}
5023 if scaling_type == "SCALE_OUT":
5024 if "aspect-delta-details" not in scaling_descriptor:
5025 raise LcmException(
5026 "Aspect delta details not fount in scaling descriptor {}".format(
5027 scaling_descriptor["name"]
5028 )
5029 )
5030 # count if max-instance-count is reached
5031 deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
5032
5033 vdu_scaling_info["scaling_direction"] = "OUT"
5034 vdu_scaling_info["vdu-create"] = {}
5035 for delta in deltas:
5036 for vdu_delta in delta["vdu-delta"]:
5037 vdud = get_vdu(db_vnfd, vdu_delta["id"])
5038 vdu_index = get_vdur_index(db_vnfr, vdu_delta)
5039 cloud_init_text = self._get_vdu_cloud_init_content(
5040 vdud, db_vnfd
5041 )
5042 if cloud_init_text:
5043 additional_params = (
5044 self._get_vdu_additional_params(db_vnfr, vdud["id"])
5045 or {}
5046 )
5047 cloud_init_list = []
5048
5049 vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
5050 max_instance_count = 10
5051 if vdu_profile and "max-number-of-instances" in vdu_profile:
5052 max_instance_count = vdu_profile.get(
5053 "max-number-of-instances", 10
5054 )
5055
5056 default_instance_num = get_number_of_instances(
5057 db_vnfd, vdud["id"]
5058 )
5059
5060 nb_scale_op += vdu_delta.get("number-of-instances", 1)
5061
5062 if nb_scale_op + default_instance_num > max_instance_count:
5063 raise LcmException(
5064 "reached the limit of {} (max-instance-count) "
5065 "scaling-out operations for the "
5066 "scaling-group-descriptor '{}'".format(
5067 nb_scale_op, scaling_group
5068 )
5069 )
5070 for x in range(vdu_delta.get("number-of-instances", 1)):
5071 if cloud_init_text:
5072 # TODO Information about its own IP is not available because db_vnfr is not updated.
5073 additional_params["OSM"] = get_osm_params(
5074 db_vnfr, vdu_delta["id"], vdu_index + x
5075 )
5076 cloud_init_list.append(
5077 self._parse_cloud_init(
5078 cloud_init_text,
5079 additional_params,
5080 db_vnfd["id"],
5081 vdud["id"],
5082 )
5083 )
5084 VCA_scaling_info.append(
5085 {
5086 "osm_vdu_id": vdu_delta["id"],
5087 "member-vnf-index": vnf_index,
5088 "type": "create",
5089 "vdu_index": vdu_index + x,
5090 }
5091 )
5092 RO_scaling_info.append(
5093 {
5094 "osm_vdu_id": vdu_delta["id"],
5095 "member-vnf-index": vnf_index,
5096 "type": "create",
5097 "count": vdu_delta.get("number-of-instances", 1),
5098 }
5099 )
5100 if cloud_init_list:
5101 RO_scaling_info[-1]["cloud_init"] = cloud_init_list
5102 vdu_scaling_info["vdu-create"][vdu_delta["id"]] = vdu_delta.get(
5103 "number-of-instances", 1
5104 )
5105
5106 elif scaling_type == "SCALE_IN":
5107 if (
5108 "min-instance-count" in scaling_descriptor
5109 and scaling_descriptor["min-instance-count"] is not None
5110 ):
5111 min_instance_count = int(scaling_descriptor["min-instance-count"])
5112
5113 vdu_scaling_info["scaling_direction"] = "IN"
5114 vdu_scaling_info["vdu-delete"] = {}
5115 deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
5116 for delta in deltas:
5117 for vdu_delta in delta["vdu-delta"]:
5118 vdu_index = get_vdur_index(db_vnfr, vdu_delta)
5119 min_instance_count = 0
5120 vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
5121 if vdu_profile and "min-number-of-instances" in vdu_profile:
5122 min_instance_count = vdu_profile["min-number-of-instances"]
5123
5124 default_instance_num = get_number_of_instances(
5125 db_vnfd, vdu_delta["id"]
5126 )
5127
5128 nb_scale_op -= vdu_delta.get("number-of-instances", 1)
5129 if nb_scale_op + default_instance_num < min_instance_count:
5130 raise LcmException(
5131 "reached the limit of {} (min-instance-count) scaling-in operations for the "
5132 "scaling-group-descriptor '{}'".format(
5133 nb_scale_op, scaling_group
5134 )
5135 )
5136 RO_scaling_info.append(
5137 {
5138 "osm_vdu_id": vdu_delta["id"],
5139 "member-vnf-index": vnf_index,
5140 "type": "delete",
5141 "count": vdu_delta.get("number-of-instances", 1),
5142 "vdu_index": vdu_index - 1,
5143 }
5144 )
5145 for x in range(vdu_delta.get("number-of-instances", 1)):
5146 VCA_scaling_info.append(
5147 {
5148 "osm_vdu_id": vdu_delta["id"],
5149 "member-vnf-index": vnf_index,
5150 "type": "delete",
5151 "vdu_index": vdu_index - 1 - x,
5152 }
5153 )
5154 vdu_scaling_info["vdu-delete"][vdu_delta["id"]] = vdu_delta.get(
5155 "number-of-instances", 1
5156 )
5157
5158 # update vdu_scaling_info with the IP addresses of the VDUs to be deleted
5159 vdu_delete = copy(vdu_scaling_info.get("vdu-delete"))
5160 if vdu_scaling_info["scaling_direction"] == "IN":
5161 for vdur in reversed(db_vnfr["vdur"]):
5162 if vdu_delete.get(vdur["vdu-id-ref"]):
5163 vdu_delete[vdur["vdu-id-ref"]] -= 1
5164 vdu_scaling_info["vdu"].append(
5165 {
5166 "name": vdur.get("name") or vdur.get("vdu-name"),
5167 "vdu_id": vdur["vdu-id-ref"],
5168 "interface": [],
5169 }
5170 )
5171 for interface in vdur["interfaces"]:
5172 vdu_scaling_info["vdu"][-1]["interface"].append(
5173 {
5174 "name": interface["name"],
5175 "ip_address": interface["ip-address"],
5176 "mac_address": interface.get("mac-address"),
5177 }
5178 )
5179 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
5180
5181 # PRE-SCALE BEGIN
5182 step = "Executing pre-scale vnf-config-primitive"
5183 if scaling_descriptor.get("scaling-config-action"):
5184 for scaling_config_action in scaling_descriptor[
5185 "scaling-config-action"
5186 ]:
5187 if (
5188 scaling_config_action.get("trigger") == "pre-scale-in"
5189 and scaling_type == "SCALE_IN"
5190 ) or (
5191 scaling_config_action.get("trigger") == "pre-scale-out"
5192 and scaling_type == "SCALE_OUT"
5193 ):
5194 vnf_config_primitive = scaling_config_action[
5195 "vnf-config-primitive-name-ref"
5196 ]
5197 step = db_nslcmop_update[
5198 "detailed-status"
5199 ] = "executing pre-scale scaling-config-action '{}'".format(
5200 vnf_config_primitive
5201 )
5202
5203 # look for primitive
5204 for config_primitive in (
5205 get_configuration(db_vnfd, db_vnfd["id"]) or {}
5206 ).get("config-primitive", ()):
5207 if config_primitive["name"] == vnf_config_primitive:
5208 break
5209 else:
5210 raise LcmException(
5211 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
5212 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
5213 "primitive".format(scaling_group, vnf_config_primitive)
5214 )
5215
5216 vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
5217 if db_vnfr.get("additionalParamsForVnf"):
5218 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
5219
5220 scale_process = "VCA"
5221 db_nsr_update["config-status"] = "configuring pre-scaling"
5222 primitive_params = self._map_primitive_params(
5223 config_primitive, {}, vnfr_params
5224 )
5225
5226 # Pre-scale retry check: Check if this sub-operation has been executed before
5227 op_index = self._check_or_add_scale_suboperation(
5228 db_nslcmop,
5230 vnf_index,
5231 vnf_config_primitive,
5232 primitive_params,
5233 "PRE-SCALE",
5234 )
5235 if op_index == self.SUBOPERATION_STATUS_SKIP:
5236 # Skip sub-operation
5237 result = "COMPLETED"
5238 result_detail = "Done"
5239 self.logger.debug(
5240 logging_text
5241 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
5242 vnf_config_primitive, result, result_detail
5243 )
5244 )
5245 else:
5246 if op_index == self.SUBOPERATION_STATUS_NEW:
5247 # New sub-operation: Get index of this sub-operation
5248 op_index = (
5249 len(db_nslcmop.get("_admin", {}).get("operations"))
5250 - 1
5251 )
5252 self.logger.debug(
5253 logging_text
5254 + "vnf_config_primitive={} New sub-operation".format(
5255 vnf_config_primitive
5256 )
5257 )
5258 else:
5259 # retry: Get registered params for this existing sub-operation
5260 op = db_nslcmop.get("_admin", {}).get("operations", [])[
5261 op_index
5262 ]
5263 vnf_index = op.get("member_vnf_index")
5264 vnf_config_primitive = op.get("primitive")
5265 primitive_params = op.get("primitive_params")
5266 self.logger.debug(
5267 logging_text
5268 + "vnf_config_primitive={} Sub-operation retry".format(
5269 vnf_config_primitive
5270 )
5271 )
5272 # Execute the primitive, either with new (first-time) or registered (retry) args
5273 ee_descriptor_id = config_primitive.get(
5274 "execution-environment-ref"
5275 )
5276 primitive_name = config_primitive.get(
5277 "execution-environment-primitive", vnf_config_primitive
5278 )
5279 ee_id, vca_type = self._look_for_deployed_vca(
5280 nsr_deployed["VCA"],
5281 member_vnf_index=vnf_index,
5282 vdu_id=None,
5283 vdu_count_index=None,
5284 ee_descriptor_id=ee_descriptor_id,
5285 )
5286 result, result_detail = await self._ns_execute_primitive(
5287 ee_id,
5288 primitive_name,
5289 primitive_params,
5290 vca_type=vca_type,
5291 vca_id=vca_id,
5292 )
5293 self.logger.debug(
5294 logging_text
5295 + "vnf_config_primitive={} Done with result {} {}".format(
5296 vnf_config_primitive, result, result_detail
5297 )
5298 )
5299 # Update operationState = COMPLETED | FAILED
5300 self._update_suboperation_status(
5301 db_nslcmop, op_index, result, result_detail
5302 )
5303
5304 if result == "FAILED":
5305 raise LcmException(result_detail)
5306 db_nsr_update["config-status"] = old_config_status
5307 scale_process = None
5308 # PRE-SCALE END
5309
5310 db_nsr_update[
5311 "_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)
5312 ] = nb_scale_op
5313 db_nsr_update[
5314 "_admin.scaling-group.{}.time".format(admin_scale_index)
5315 ] = time()
5316
5317 # SCALE-IN VCA - BEGIN
5318 if VCA_scaling_info:
5319 step = db_nslcmop_update[
5320 "detailed-status"
5321 ] = "Deleting the execution environments"
5322 scale_process = "VCA"
5323 for vdu_info in VCA_scaling_info:
5324 if vdu_info["type"] == "delete":
5325 member_vnf_index = str(vdu_info["member-vnf-index"])
5326 self.logger.debug(
5327 logging_text + "vdu info: {}".format(vdu_info)
5328 )
5329 vdu_id = vdu_info["osm_vdu_id"]
5330 vdu_index = int(vdu_info["vdu_index"])
5331 stage[
5332 1
5333 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5334 member_vnf_index, vdu_id, vdu_index
5335 )
5336 stage[2] = step = "Scaling in VCA"
5337 self._write_op_status(op_id=nslcmop_id, stage=stage)
5338 vca_update = db_nsr["_admin"]["deployed"]["VCA"]
5339 config_update = db_nsr["configurationStatus"]
5340 for vca_index, vca in enumerate(vca_update):
5341 if (
5342 (vca and vca.get("ee_id"))
5343 and vca["member-vnf-index"] == member_vnf_index
5344 and vca["vdu_count_index"] == vdu_index
5345 ):
5346 if vca.get("vdu_id"):
5347 config_descriptor = get_configuration(
5348 db_vnfd, vca.get("vdu_id")
5349 )
5350 elif vca.get("kdu_name"):
5351 config_descriptor = get_configuration(
5352 db_vnfd, vca.get("kdu_name")
5353 )
5354 else:
5355 config_descriptor = get_configuration(
5356 db_vnfd, db_vnfd["id"]
5357 )
5358 operation_params = (
5359 db_nslcmop.get("operationParams") or {}
5360 )
5361 exec_terminate_primitives = not operation_params.get(
5362 "skip_terminate_primitives"
5363 ) and vca.get("needed_terminate")
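# Terminate primitives run only when the operation did not request
# "skip_terminate_primitives" and the charm registered a terminate action
# ("needed_terminate"); otherwise the execution environment is destroyed without
# running them.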
5364 task = asyncio.ensure_future(
5365 asyncio.wait_for(
5366 self.destroy_N2VC(
5367 logging_text,
5368 db_nslcmop,
5369 vca,
5370 config_descriptor,
5371 vca_index,
5372 destroy_ee=True,
5373 exec_primitives=exec_terminate_primitives,
5374 scaling_in=True,
5375 vca_id=vca_id,
5376 ),
5377 timeout=self.timeout_charm_delete,
5378 )
5379 )
5380 tasks_dict_info[task] = "Terminating VCA {}".format(
5381 vca.get("ee_id")
5382 )
5383 del vca_update[vca_index]
5384 del config_update[vca_index]
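# The matching VCA record and its configurationStatus entry are removed from the
# in-memory copies; the trimmed lists are written back to the "nsrs" collection
# below, once the destroy tasks have finished.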
5385 # wait for pending tasks of terminate primitives
5386 if tasks_dict_info:
5387 self.logger.debug(
5388 logging_text
5389 + "Waiting for tasks {}".format(
5390 list(tasks_dict_info.keys())
5391 )
5392 )
5393 error_list = await self._wait_for_tasks(
5394 logging_text,
5395 tasks_dict_info,
5396 min(
5397 self.timeout_charm_delete, self.timeout_ns_terminate
5398 ),
5399 stage,
5400 nslcmop_id,
5401 )
5402 tasks_dict_info.clear()
5403 if error_list:
5404 raise LcmException("; ".join(error_list))
5405
5406 db_vca_and_config_update = {
5407 "_admin.deployed.VCA": vca_update,
5408 "configurationStatus": config_update,
5409 }
5410 self.update_db_2(
5411 "nsrs", db_nsr["_id"], db_vca_and_config_update
5412 )
5413 scale_process = None
5414 # SCALE-IN VCA - END
5415
5416 # SCALE RO - BEGIN
5417 if RO_scaling_info:
5418 scale_process = "RO"
5419 if self.ro_config.get("ng"):
5420 await self._scale_ng_ro(
5421 logging_text,
5422 db_nsr,
5423 db_nslcmop,
5424 db_vnfr,
5425 vdu_scaling_info,
5426 stage,
5427 )
5428 vdu_scaling_info.pop("vdu-create", None)
5429 vdu_scaling_info.pop("vdu-delete", None)
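# Once RO has processed the change, the "vdu-create"/"vdu-delete" counters are
# dropped so that only the descriptive "vdu" entries (names and interface addresses)
# remain in VDU_SCALE_INFO for the post-scale primitives.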
5430
5431 scale_process = None
5432 if db_nsr_update:
5433 self.update_db_2("nsrs", nsr_id, db_nsr_update)
5434 # SCALE RO - END
5435
5436 # SCALE-UP VCA - BEGIN
5437 if VCA_scaling_info:
5438 step = db_nslcmop_update[
5439 "detailed-status"
5440 ] = "Creating new execution environments"
5441 scale_process = "VCA"
5442 for vdu_info in VCA_scaling_info:
5443 if vdu_info["type"] == "create":
5444 member_vnf_index = str(vdu_info["member-vnf-index"])
5445 self.logger.debug(
5446 logging_text + "vdu info: {}".format(vdu_info)
5447 )
5448 vnfd_id = db_vnfr["vnfd-ref"]
5449 vdu_index = int(vdu_info["vdu_index"])
5450 deploy_params = {"OSM": get_osm_params(db_vnfr)}
5451 if db_vnfr.get("additionalParamsForVnf"):
5452 deploy_params.update(
5453 parse_yaml_strings(
5454 db_vnfr["additionalParamsForVnf"].copy()
5455 )
5456 )
5457 descriptor_config = get_configuration(db_vnfd, db_vnfd["id"])
5458 if descriptor_config:
5459 vdu_id = None
5460 vdu_name = None
5461 kdu_name = None
5462 self._deploy_n2vc(
5463 logging_text=logging_text
5464 + "member_vnf_index={} ".format(member_vnf_index),
5465 db_nsr=db_nsr,
5466 db_vnfr=db_vnfr,
5467 nslcmop_id=nslcmop_id,
5468 nsr_id=nsr_id,
5469 nsi_id=nsi_id,
5470 vnfd_id=vnfd_id,
5471 vdu_id=vdu_id,
5472 kdu_name=kdu_name,
5473 member_vnf_index=member_vnf_index,
5474 vdu_index=vdu_index,
5475 vdu_name=vdu_name,
5476 deploy_params=deploy_params,
5477 descriptor_config=descriptor_config,
5478 base_folder=base_folder,
5479 task_instantiation_info=tasks_dict_info,
5480 stage=stage,
5481 )
5482 vdu_id = vdu_info["osm_vdu_id"]
5483 vdur = find_in_list(
5484 db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id
5485 )
5486 descriptor_config = get_configuration(db_vnfd, vdu_id)
5487 if vdur.get("additionalParams"):
5488 deploy_params_vdu = parse_yaml_strings(
5489 vdur["additionalParams"]
5490 )
5491 else:
5492 deploy_params_vdu = deploy_params
5493 deploy_params_vdu["OSM"] = get_osm_params(
5494 db_vnfr, vdu_id, vdu_count_index=vdu_index
5495 )
5496 if descriptor_config:
5497 vdu_name = None
5498 kdu_name = None
5499 stage[
5500 1
5501 ] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5502 member_vnf_index, vdu_id, vdu_index
5503 )
5504 stage[2] = step = "Scaling out VCA"
5505 self._write_op_status(op_id=nslcmop_id, stage=stage)
5506 self._deploy_n2vc(
5507 logging_text=logging_text
5508 + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
5509 member_vnf_index, vdu_id, vdu_index
5510 ),
5511 db_nsr=db_nsr,
5512 db_vnfr=db_vnfr,
5513 nslcmop_id=nslcmop_id,
5514 nsr_id=nsr_id,
5515 nsi_id=nsi_id,
5516 vnfd_id=vnfd_id,
5517 vdu_id=vdu_id,
5518 kdu_name=kdu_name,
5519 member_vnf_index=member_vnf_index,
5520 vdu_index=vdu_index,
5521 vdu_name=vdu_name,
5522 deploy_params=deploy_params_vdu,
5523 descriptor_config=descriptor_config,
5524 base_folder=base_folder,
5525 task_instantiation_info=tasks_dict_info,
5526 stage=stage,
5527 )
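# For each newly created VDU instance, _deploy_n2vc can be invoked twice: once for
# the VNF-level configuration (if present) and once for the VDU-level configuration
# (if present). Both register their tasks in tasks_dict_info, which is awaited in
# the finally block of this method.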
5528 # SCALE-UP VCA - END
5529 scale_process = None
5530
5531 # POST-SCALE BEGIN
5532 # execute post-scale config primitives
5533 step = "Executing post-scale vnf-config-primitive"
5534 if scaling_descriptor.get("scaling-config-action"):
5535 for scaling_config_action in scaling_descriptor[
5536 "scaling-config-action"
5537 ]:
5538 if (
5539 scaling_config_action.get("trigger") == "post-scale-in"
5540 and scaling_type == "SCALE_IN"
5541 ) or (
5542 scaling_config_action.get("trigger") == "post-scale-out"
5543 and scaling_type == "SCALE_OUT"
5544 ):
5545 vnf_config_primitive = scaling_config_action[
5546 "vnf-config-primitive-name-ref"
5547 ]
5548 step = db_nslcmop_update[
5549 "detailed-status"
5550 ] = "executing post-scale scaling-config-action '{}'".format(
5551 vnf_config_primitive
5552 )
5553
5554 vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
5555 if db_vnfr.get("additionalParamsForVnf"):
5556 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
5557
5558 # look for primitive
5559 for config_primitive in (
5560 get_configuration(db_vnfd, db_vnfd["id"]) or {}
5561 ).get("config-primitive", ()):
5562 if config_primitive["name"] == vnf_config_primitive:
5563 break
5564 else:
5565 raise LcmException(
5566 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
5567 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
5568 "config-primitive".format(
5569 scaling_group, vnf_config_primitive
5570 )
5571 )
5572 scale_process = "VCA"
5573 db_nsr_update["config-status"] = "configuring post-scaling"
5574 primitive_params = self._map_primitive_params(
5575 config_primitive, {}, vnfr_params
5576 )
5577
5578 # Post-scale retry check: Check if this sub-operation has been executed before
5579 op_index = self._check_or_add_scale_suboperation(
5580 db_nslcmop,
5581 nslcmop_id,
5582 vnf_index,
5583 vnf_config_primitive,
5584 primitive_params,
5585 "POST-SCALE",
5586 )
5587 if op_index == self.SUBOPERATION_STATUS_SKIP:
5588 # Skip sub-operation
5589 result = "COMPLETED"
5590 result_detail = "Done"
5591 self.logger.debug(
5592 logging_text
5593 + "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
5594 vnf_config_primitive, result, result_detail
5595 )
5596 )
5597 else:
5598 if op_index == self.SUBOPERATION_STATUS_NEW:
5599 # New sub-operation: Get index of this sub-operation
5600 op_index = (
5601 len(db_nslcmop.get("_admin", {}).get("operations"))
5602 - 1
5603 )
5604 self.logger.debug(
5605 logging_text
5606 + "vnf_config_primitive={} New sub-operation".format(
5607 vnf_config_primitive
5608 )
5609 )
5610 else:
5611 # retry: Get registered params for this existing sub-operation
5612 op = db_nslcmop.get("_admin", {}).get("operations", [])[
5613 op_index
5614 ]
5615 vnf_index = op.get("member_vnf_index")
5616 vnf_config_primitive = op.get("primitive")
5617 primitive_params = op.get("primitive_params")
5618 self.logger.debug(
5619 logging_text
5620 + "vnf_config_primitive={} Sub-operation retry".format(
5621 vnf_config_primitive
5622 )
5623 )
5624 # Execute the primitive, either with new (first-time) or registered (retry) args
5625 ee_descriptor_id = config_primitive.get(
5626 "execution-environment-ref"
5627 )
5628 primitive_name = config_primitive.get(
5629 "execution-environment-primitive", vnf_config_primitive
5630 )
5631 ee_id, vca_type = self._look_for_deployed_vca(
5632 nsr_deployed["VCA"],
5633 member_vnf_index=vnf_index,
5634 vdu_id=None,
5635 vdu_count_index=None,
5636 ee_descriptor_id=ee_descriptor_id,
5637 )
5638 result, result_detail = await self._ns_execute_primitive(
5639 ee_id,
5640 primitive_name,
5641 primitive_params,
5642 vca_type=vca_type,
5643 vca_id=vca_id,
5644 )
5645 self.logger.debug(
5646 logging_text
5647 + "vnf_config_primitive={} Done with result {} {}".format(
5648 vnf_config_primitive, result, result_detail
5649 )
5650 )
5651 # Update operationState = COMPLETED | FAILED
5652 self._update_suboperation_status(
5653 db_nslcmop, op_index, result, result_detail
5654 )
5655
5656 if result == "FAILED":
5657 raise LcmException(result_detail)
5658 db_nsr_update["config-status"] = old_config_status
5659 scale_process = None
5660 # POST-SCALE END
5661
5662 db_nsr_update[
5663 "detailed-status"
5664 ] = "" # "scaled {} {}".format(scaling_group, scaling_type)
5665 db_nsr_update["operational-status"] = (
5666 "running"
5667 if old_operational_status == "failed"
5668 else old_operational_status
5669 )
5670 db_nsr_update["config-status"] = old_config_status
5671 return
5672 except (
5673 ROclient.ROClientException,
5674 DbException,
5675 LcmException,
5676 NgRoException,
5677 ) as e:
5678 self.logger.error(logging_text + "Exit Exception {}".format(e))
5679 exc = e
5680 except asyncio.CancelledError:
5681 self.logger.error(
5682 logging_text + "Cancelled Exception while '{}'".format(step)
5683 )
5684 exc = "Operation was cancelled"
5685 except Exception as e:
5686 exc = traceback.format_exc()
5687 self.logger.critical(
5688 logging_text + "Exit Exception {} {}".format(type(e).__name__, e),
5689 exc_info=True,
5690 )
5691 finally:
5692 self._write_ns_status(
5693 nsr_id=nsr_id,
5694 ns_state=None,
5695 current_operation="IDLE",
5696 current_operation_id=None,
5697 )
5698 if tasks_dict_info:
5699 stage[1] = "Waiting for instantiate pending tasks."
5700 self.logger.debug(logging_text + stage[1])
5701 exc = await self._wait_for_tasks(
5702 logging_text,
5703 tasks_dict_info,
5704 self.timeout_ns_deploy,
5705 stage,
5706 nslcmop_id,
5707 nsr_id=nsr_id,
5708 )
5709 if exc:
5710 db_nslcmop_update[
5711 "detailed-status"
5712 ] = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
5713 nslcmop_operation_state = "FAILED"
5714 if db_nsr:
5715 db_nsr_update["operational-status"] = old_operational_status
5716 db_nsr_update["config-status"] = old_config_status
5717 db_nsr_update["detailed-status"] = ""
5718 if scale_process:
5719 if "VCA" in scale_process:
5720 db_nsr_update["config-status"] = "failed"
5721 if "RO" in scale_process:
5722 db_nsr_update["operational-status"] = "failed"
5723 db_nsr_update[
5724 "detailed-status"
5725 ] = "FAILED scaling nslcmop={} {}: {}".format(
5726 nslcmop_id, step, exc
5727 )
5728 else:
5729 error_description_nslcmop = None
5730 nslcmop_operation_state = "COMPLETED"
5731 db_nslcmop_update["detailed-status"] = "Done"
5732
5733 self._write_op_status(
5734 op_id=nslcmop_id,
5735 stage="",
5736 error_message=error_description_nslcmop,
5737 operation_state=nslcmop_operation_state,
5738 other_update=db_nslcmop_update,
5739 )
5740 if db_nsr:
5741 self._write_ns_status(
5742 nsr_id=nsr_id,
5743 ns_state=None,
5744 current_operation="IDLE",
5745 current_operation_id=None,
5746 other_update=db_nsr_update,
5747 )
5748
5749 if nslcmop_operation_state:
5750 try:
5751 msg = {
5752 "nsr_id": nsr_id,
5753 "nslcmop_id": nslcmop_id,
5754 "operationState": nslcmop_operation_state,
5755 }
5756 await self.msg.aiowrite("ns", "scaled", msg, loop=self.loop)
5757 except Exception as e:
5758 self.logger.error(
5759 logging_text + "kafka_write notification Exception {}".format(e)
5760 )
5761 self.logger.debug(logging_text + "Exit")
5762 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")
5763
5764 async def _scale_ng_ro(
5765 self, logging_text, db_nsr, db_nslcmop, db_vnfr, vdu_scaling_info, stage
5766 ):
5767 nsr_id = db_nslcmop["nsInstanceId"]
5768 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
5769 db_vnfrs = {}
5770
5771 # read from db: vnfds for every vnf
5772 db_vnfds = []
5773
5774 # for each vnf in ns, read vnfd
5775 for vnfr in self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}):
5776 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
5777 vnfd_id = vnfr["vnfd-id"] # vnfd uuid for this vnf
5778 # if we do not have this vnfd yet, read it from db
5779 if not find_in_list(db_vnfds, lambda a_vnfd: a_vnfd["id"] == vnfd_id):
5780 # read from db
5781 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
5782 db_vnfds.append(vnfd)
5783 n2vc_key = self.n2vc.get_public_key()
5784 n2vc_key_list = [n2vc_key]
5785 self.scale_vnfr(
5786 db_vnfr,
5787 vdu_scaling_info.get("vdu-create"),
5788 vdu_scaling_info.get("vdu-delete"),
5789 mark_delete=True,
5790 )
5791 # db_vnfr has been updated, update db_vnfrs to use it
5792 db_vnfrs[db_vnfr["member-vnf-index-ref"]] = db_vnfr
5793 await self._instantiate_ng_ro(
5794 logging_text,
5795 nsr_id,
5796 db_nsd,
5797 db_nsr,
5798 db_nslcmop,
5799 db_vnfrs,
5800 db_vnfds,
5801 n2vc_key_list,
5802 stage=stage,
5803 start_deploy=time(),
5804 timeout_ns_deploy=self.timeout_ns_deploy,
5805 )
5806 if vdu_scaling_info.get("vdu-delete"):
5807 self.scale_vnfr(
5808 db_vnfr, None, vdu_scaling_info["vdu-delete"], mark_delete=False
5809 )
5810
5811 async def add_prometheus_metrics(
5812 self, ee_id, artifact_path, ee_config_descriptor, vnfr_id, nsr_id, target_ip
5813 ):
5814 if not self.prometheus:
5815 return
5816 # look for a file called 'prometheus*.j2' and read its content
5817 artifact_content = self.fs.dir_ls(artifact_path)
5818 job_file = next(
5819 (
5820 f
5821 for f in artifact_content
5822 if f.startswith("prometheus") and f.endswith(".j2")
5823 ),
5824 None,
5825 )
5826 if not job_file:
5827 return
5828 with self.fs.file_open((artifact_path, job_file), "r") as f:
5829 job_data = f.read()
5830
5831 # TODO get_service
5832 _, _, service = ee_id.partition(".") # remove prefix "namespace."
5833 host_name = "{}-{}".format(service, ee_config_descriptor["metric-service"])
5834 host_port = "80"
5835 vnfr_id = vnfr_id.replace("-", "")
5836 variables = {
5837 "JOB_NAME": vnfr_id,
5838 "TARGET_IP": target_ip,
5839 "EXPORTER_POD_IP": host_name,
5840 "EXPORTER_POD_PORT": host_port,
5841 }
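# Illustrative only (the real template ships inside the VNF package, not in this
# repo): a prometheus_job.j2 rendered with the variables above could look roughly like
#   - job_name: "{{ JOB_NAME }}"
#     static_configs:
#       - targets: ["{{ EXPORTER_POD_IP }}:{{ EXPORTER_POD_PORT }}"]
#         labels:
#           target_ip: "{{ TARGET_IP }}"
# parse_job() substitutes the variables and returns the resulting list of job dicts.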
5842 job_list = self.prometheus.parse_job(job_data, variables)
5843 # ensure job_name uses the vnfr_id and add the nsr_id as metadata
5844 for job in job_list:
5845 if (
5846 not isinstance(job.get("job_name"), str)
5847 or vnfr_id not in job["job_name"]
5848 ):
5849 job["job_name"] = vnfr_id + "_" + str(randint(1, 10000))
5850 job["nsr_id"] = nsr_id
5851 job_dict = {jl["job_name"]: jl for jl in job_list}
5852 if await self.prometheus.update(job_dict):
5853 return list(job_dict.keys())
5854
5855 def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
5856 """
5857 Get VCA Cloud and VCA Cloud Credentials for the VIM account
5858
5859 :param vim_account_id: VIM Account ID
5860
5861 :return: (cloud_name, cloud_credential)
5862 """
5863 config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
5864 return config.get("vca_cloud"), config.get("vca_cloud_credential")
5865
5866 def get_vca_k8s_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
5867 """
5868 Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account
5869
5870 :param vim_account_id: VIM Account ID
5871
5872 :return: (cloud_name, cloud_credential)
5873 """
5874 config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
5875 return config.get("vca_k8s_cloud"), config.get("vca_k8s_cloud_credential")
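# Illustrative only: for a VIM account whose "config" section contains
#   {"vca_cloud": "lxd-cloud", "vca_cloud_credential": "lxd-cred",
#    "vca_k8s_cloud": "k8s-cloud", "vca_k8s_cloud_credential": "k8s-cred"}
# (hypothetical values), these helpers return ("lxd-cloud", "lxd-cred") and
# ("k8s-cloud", "k8s-cred") respectively; missing keys simply yield None.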