Bug 1422 - NSR record contain stale vcaStatus after successful completion of day...
[osm/LCM.git] / osm_lcm / ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 import yaml
21 import logging
22 import logging.handlers
23 import traceback
24 import json
25 from jinja2 import Environment, TemplateError, TemplateNotFound, StrictUndefined, UndefinedError
26
27 from osm_lcm import ROclient
28 from osm_lcm.ng_ro import NgRoClient, NgRoException
29 from osm_lcm.lcm_utils import LcmException, LcmExceptionNoMgmtIP, LcmBase, deep_get, get_iterable, populate_dict
30 from osm_lcm.data_utils.nsd import get_vnf_profiles
31 from osm_lcm.data_utils.vnfd import get_vdu_list, get_vdu_profile, \
32 get_ee_sorted_initial_config_primitive_list, get_ee_sorted_terminate_config_primitive_list, \
33 get_kdu_list, get_virtual_link_profiles, get_vdu, get_configuration, \
34 get_vdu_index, get_scaling_aspect, get_number_of_instances, get_juju_ee_ref
35 from osm_lcm.data_utils.list_utils import find_in_list
36 from osm_lcm.data_utils.vnfr import get_osm_params, get_vdur_index
37 from osm_lcm.data_utils.dict_utils import parse_yaml_strings
38 from osm_lcm.data_utils.database.vim_account import VimAccountDB
39 from n2vc.k8s_helm_conn import K8sHelmConnector
40 from n2vc.k8s_helm3_conn import K8sHelm3Connector
41 from n2vc.k8s_juju_conn import K8sJujuConnector
42
43 from osm_common.dbbase import DbException
44 from osm_common.fsbase import FsException
45
46 from osm_lcm.data_utils.database.database import Database
47 from osm_lcm.data_utils.filesystem.filesystem import Filesystem
48
49 from n2vc.n2vc_juju_conn import N2VCJujuConnector
50 from n2vc.exceptions import N2VCException, N2VCNotFound, K8sException
51
52 from osm_lcm.lcm_helm_conn import LCMHelmConn
53
54 from copy import copy, deepcopy
55 from time import time
56 from uuid import uuid4
57
58 from random import randint
59
60 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
61
62
class NsLcm(LcmBase):
    """Network Service lifecycle manager: deploys, configures, scales and terminates NS instances."""

    timeout_vca_on_error = 5 * 60  # Time for charm from first time at blocked,error status to mark as failed
    timeout_ns_deploy = 2 * 3600  # default global timeout for deployment a ns
    timeout_ns_terminate = 1800  # default global timeout for un deployment a ns
    timeout_charm_delete = 10 * 60  # timeout for deleting a deployed charm
    timeout_primitive = 30 * 60  # timeout for primitive execution
    timeout_progress_primitive = 10 * 60  # timeout for some progress in a primitive execution

    # sentinel return values used when looking up sub-operations
    SUBOPERATION_STATUS_NOT_FOUND = -1
    SUBOPERATION_STATUS_NEW = -2
    SUBOPERATION_STATUS_SKIP = -3
    # human-readable label used when registering the VCA deployment task
    task_name_deploy_vca = "Deploying VCA"
75
    def __init__(self, msg, lcm_tasks, config, loop, prometheus=None):
        """
        Init, Connect to database, filesystem storage, and messaging
        :param msg: message bus handler (passed to LcmBase)
        :param lcm_tasks: registry of asyncio tasks per nsr/operation
        :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
        :param loop: asyncio event loop shared by every connector created here
        :param prometheus: optional metrics exporter handle
        :return: None
        """
        super().__init__(
            msg=msg,
            logger=logging.getLogger('lcm.ns')
        )

        # database and filesystem handles come from process-wide singletons
        self.db = Database().instance.db
        self.fs = Filesystem().instance.fs
        self.loop = loop
        self.lcm_tasks = lcm_tasks
        self.timeout = config["timeout"]
        self.ro_config = config["ro_config"]
        self.ng_ro = config["ro_config"].get("ng")  # truthy when next-generation RO is used
        self.vca_config = config["VCA"].copy()

        # create N2VC connector (juju controller used for proxy/native/k8s-proxy charms)
        self.n2vc = N2VCJujuConnector(
            log=self.logger,
            loop=self.loop,
            url='{}:{}'.format(self.vca_config['host'], self.vca_config['port']),
            username=self.vca_config.get('user', None),
            vca_config=self.vca_config,
            on_update_db=self._on_update_n2vc_db,
            fs=self.fs,
            db=self.db
        )

        # connector for helm-based execution environments (no juju controller behind it)
        self.conn_helm_ee = LCMHelmConn(
            log=self.logger,
            loop=self.loop,
            url=None,
            username=None,
            vca_config=self.vca_config,
            on_update_db=self._on_update_n2vc_db
        )

        # helm v2 client for KDU deployment
        self.k8sclusterhelm2 = K8sHelmConnector(
            kubectl_command=self.vca_config.get("kubectlpath"),
            helm_command=self.vca_config.get("helmpath"),
            log=self.logger,
            on_update_db=None,
            fs=self.fs,
            db=self.db
        )

        # helm v3 client for KDU deployment
        self.k8sclusterhelm3 = K8sHelm3Connector(
            kubectl_command=self.vca_config.get("kubectlpath"),
            helm_command=self.vca_config.get("helm3path"),
            fs=self.fs,
            log=self.logger,
            db=self.db,
            on_update_db=None,
        )

        # juju-bundle client for KDU deployment; DB updates flow through _on_update_k8s_db
        self.k8sclusterjuju = K8sJujuConnector(
            kubectl_command=self.vca_config.get("kubectlpath"),
            juju_command=self.vca_config.get("jujupath"),
            log=self.logger,
            loop=self.loop,
            on_update_db=self._on_update_k8s_db,
            vca_config=self.vca_config,
            fs=self.fs,
            db=self.db
        )

        # KDU deployment-type -> k8s connector
        # NOTE(review): plain "chart" maps to helm v3, not v2 — confirm this is intended
        self.k8scluster_map = {
            "helm-chart": self.k8sclusterhelm2,
            "helm-chart-v3": self.k8sclusterhelm3,
            "chart": self.k8sclusterhelm3,
            "juju-bundle": self.k8sclusterjuju,
            "juju": self.k8sclusterjuju,
        }

        # charm/EE type -> VCA connector
        self.vca_map = {
            "lxc_proxy_charm": self.n2vc,
            "native_charm": self.n2vc,
            "k8s_proxy_charm": self.n2vc,
            "helm": self.conn_helm_ee,
            "helm-v3": self.conn_helm_ee
        }

        self.prometheus = prometheus

        # create RO client
        self.RO = NgRoClient(self.loop, **self.ro_config)
166
167 @staticmethod
168 def increment_ip_mac(ip_mac, vm_index=1):
169 if not isinstance(ip_mac, str):
170 return ip_mac
171 try:
172 # try with ipv4 look for last dot
173 i = ip_mac.rfind(".")
174 if i > 0:
175 i += 1
176 return "{}{}".format(ip_mac[:i], int(ip_mac[i:]) + vm_index)
177 # try with ipv6 or mac look for last colon. Operate in hex
178 i = ip_mac.rfind(":")
179 if i > 0:
180 i += 1
181 # format in hex, len can be 2 for mac or 4 for ipv6
182 return ("{}{:0" + str(len(ip_mac) - i) + "x}").format(ip_mac[:i], int(ip_mac[i:], 16) + vm_index)
183 except Exception:
184 pass
185 return None
186
187 def _on_update_ro_db(self, nsrs_id, ro_descriptor):
188
189 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
190
191 try:
192 # TODO filter RO descriptor fields...
193
194 # write to database
195 db_dict = dict()
196 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
197 db_dict['deploymentStatus'] = ro_descriptor
198 self.update_db_2("nsrs", nsrs_id, db_dict)
199
200 except Exception as e:
201 self.logger.warn('Cannot write database RO deployment for ns={} -> {}'.format(nsrs_id, e))
202
203 async def _on_update_n2vc_db(self, table, filter, path, updated_data):
204
205 # remove last dot from path (if exists)
206 if path.endswith('.'):
207 path = path[:-1]
208
209 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
210 # .format(table, filter, path, updated_data))
211 try:
212
213 nsr_id = filter.get('_id')
214
215 # read ns record from database
216 nsr = self.db.get_one(table='nsrs', q_filter=filter)
217 current_ns_status = nsr.get('nsState')
218
219 # get vca status for NS
220 status_dict = await self.n2vc.get_status(namespace='.' + nsr_id, yaml_format=False)
221
222 # vcaStatus
223 db_dict = dict()
224 db_dict['vcaStatus'] = status_dict
225 await self.n2vc.update_vca_status(db_dict['vcaStatus'])
226
227 # update configurationStatus for this VCA
228 try:
229 vca_index = int(path[path.rfind(".")+1:])
230
231 vca_list = deep_get(target_dict=nsr, key_list=('_admin', 'deployed', 'VCA'))
232 vca_status = vca_list[vca_index].get('status')
233
234 configuration_status_list = nsr.get('configurationStatus')
235 config_status = configuration_status_list[vca_index].get('status')
236
237 if config_status == 'BROKEN' and vca_status != 'failed':
238 db_dict['configurationStatus'][vca_index] = 'READY'
239 elif config_status != 'BROKEN' and vca_status == 'failed':
240 db_dict['configurationStatus'][vca_index] = 'BROKEN'
241 except Exception as e:
242 # not update configurationStatus
243 self.logger.debug('Error updating vca_index (ignore): {}'.format(e))
244
245 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
246 # if nsState = 'DEGRADED' check if all is OK
247 is_degraded = False
248 if current_ns_status in ('READY', 'DEGRADED'):
249 error_description = ''
250 # check machines
251 if status_dict.get('machines'):
252 for machine_id in status_dict.get('machines'):
253 machine = status_dict.get('machines').get(machine_id)
254 # check machine agent-status
255 if machine.get('agent-status'):
256 s = machine.get('agent-status').get('status')
257 if s != 'started':
258 is_degraded = True
259 error_description += 'machine {} agent-status={} ; '.format(machine_id, s)
260 # check machine instance status
261 if machine.get('instance-status'):
262 s = machine.get('instance-status').get('status')
263 if s != 'running':
264 is_degraded = True
265 error_description += 'machine {} instance-status={} ; '.format(machine_id, s)
266 # check applications
267 if status_dict.get('applications'):
268 for app_id in status_dict.get('applications'):
269 app = status_dict.get('applications').get(app_id)
270 # check application status
271 if app.get('status'):
272 s = app.get('status').get('status')
273 if s != 'active':
274 is_degraded = True
275 error_description += 'application {} status={} ; '.format(app_id, s)
276
277 if error_description:
278 db_dict['errorDescription'] = error_description
279 if current_ns_status == 'READY' and is_degraded:
280 db_dict['nsState'] = 'DEGRADED'
281 if current_ns_status == 'DEGRADED' and not is_degraded:
282 db_dict['nsState'] = 'READY'
283
284 # write to database
285 self.update_db_2("nsrs", nsr_id, db_dict)
286
287 except (asyncio.CancelledError, asyncio.TimeoutError):
288 raise
289 except Exception as e:
290 self.logger.warn('Error updating NS state for ns={}: {}'.format(nsr_id, e))
291
    async def _on_update_k8s_db(self, cluster_uuid, kdu_instance, filter=None):
        """
        Updating vca status in NSR record
        :param cluster_uuid: UUID of a k8s cluster
        :param kdu_instance: The unique name of the KDU instance
        :param filter: To get nsr_id
        :return: none
        """

        # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
        #                   .format(cluster_uuid, kdu_instance, filter))

        try:
            nsr_id = filter.get('_id')

            # get vca status for NS: re-read the full status from juju so it is never stale
            vca_status = await self.k8sclusterjuju.status_kdu(cluster_uuid,
                                                              kdu_instance,
                                                              complete_status=True,
                                                              yaml_format=False)
            # vcaStatus keyed by nsr_id: the fresh status replaces the previous entry
            db_dict = dict()
            db_dict['vcaStatus'] = {nsr_id: vca_status}

            # let the connector enrich the status dict (e.g. entity details) before writing
            await self.k8sclusterjuju.update_vca_status(db_dict['vcaStatus'], kdu_instance)

            # write to database
            self.update_db_2("nsrs", nsr_id, db_dict)

        except (asyncio.CancelledError, asyncio.TimeoutError):
            # cooperative cancellation must propagate
            raise
        except Exception as e:
            self.logger.warn('Error updating NS state for ns={}: {}'.format(nsr_id, e))
325
326 @staticmethod
327 def _parse_cloud_init(cloud_init_text, additional_params, vnfd_id, vdu_id):
328 try:
329 env = Environment(undefined=StrictUndefined)
330 template = env.from_string(cloud_init_text)
331 return template.render(additional_params or {})
332 except UndefinedError as e:
333 raise LcmException("Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
334 "file, must be provided in the instantiation parameters inside the "
335 "'additionalParamsForVnf/Vdu' block".format(e, vnfd_id, vdu_id))
336 except (TemplateError, TemplateNotFound) as e:
337 raise LcmException("Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".
338 format(vnfd_id, vdu_id, e))
339
340 def _get_vdu_cloud_init_content(self, vdu, vnfd):
341 cloud_init_content = cloud_init_file = None
342 try:
343 if vdu.get("cloud-init-file"):
344 base_folder = vnfd["_admin"]["storage"]
345 cloud_init_file = "{}/{}/cloud_init/{}".format(base_folder["folder"], base_folder["pkg-dir"],
346 vdu["cloud-init-file"])
347 with self.fs.file_open(cloud_init_file, "r") as ci_file:
348 cloud_init_content = ci_file.read()
349 elif vdu.get("cloud-init"):
350 cloud_init_content = vdu["cloud-init"]
351
352 return cloud_init_content
353 except FsException as e:
354 raise LcmException("Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".
355 format(vnfd["id"], vdu["id"], cloud_init_file, e))
356
357 def _get_vdu_additional_params(self, db_vnfr, vdu_id):
358 vdur = next(vdur for vdur in db_vnfr.get("vdur") if vdu_id == vdur["vdu-id-ref"])
359 additional_params = vdur.get("additionalParams")
360 return parse_yaml_strings(additional_params)
361
362 def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
363 """
364 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
365 :param vnfd: input vnfd
366 :param new_id: overrides vnf id if provided
367 :param additionalParams: Instantiation params for VNFs provided
368 :param nsrId: Id of the NSR
369 :return: copy of vnfd
370 """
371 vnfd_RO = deepcopy(vnfd)
372 # remove unused by RO configuration, monitoring, scaling and internal keys
373 vnfd_RO.pop("_id", None)
374 vnfd_RO.pop("_admin", None)
375 vnfd_RO.pop("monitoring-param", None)
376 vnfd_RO.pop("scaling-group-descriptor", None)
377 vnfd_RO.pop("kdu", None)
378 vnfd_RO.pop("k8s-cluster", None)
379 if new_id:
380 vnfd_RO["id"] = new_id
381
382 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
383 for vdu in get_iterable(vnfd_RO, "vdu"):
384 vdu.pop("cloud-init-file", None)
385 vdu.pop("cloud-init", None)
386 return vnfd_RO
387
388 @staticmethod
389 def ip_profile_2_RO(ip_profile):
390 RO_ip_profile = deepcopy(ip_profile)
391 if "dns-server" in RO_ip_profile:
392 if isinstance(RO_ip_profile["dns-server"], list):
393 RO_ip_profile["dns-address"] = []
394 for ds in RO_ip_profile.pop("dns-server"):
395 RO_ip_profile["dns-address"].append(ds['address'])
396 else:
397 RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
398 if RO_ip_profile.get("ip-version") == "ipv4":
399 RO_ip_profile["ip-version"] = "IPv4"
400 if RO_ip_profile.get("ip-version") == "ipv6":
401 RO_ip_profile["ip-version"] = "IPv6"
402 if "dhcp-params" in RO_ip_profile:
403 RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
404 return RO_ip_profile
405
406 def _get_ro_vim_id_for_vim_account(self, vim_account):
407 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
408 if db_vim["_admin"]["operationalState"] != "ENABLED":
409 raise LcmException("VIM={} is not available. operationalState={}".format(
410 vim_account, db_vim["_admin"]["operationalState"]))
411 RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
412 return RO_vim_id
413
414 def get_ro_wim_id_for_wim_account(self, wim_account):
415 if isinstance(wim_account, str):
416 db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
417 if db_wim["_admin"]["operationalState"] != "ENABLED":
418 raise LcmException("WIM={} is not available. operationalState={}".format(
419 wim_account, db_wim["_admin"]["operationalState"]))
420 RO_wim_id = db_wim["_admin"]["deployed"]["RO-account"]
421 return RO_wim_id
422 else:
423 return wim_account
424
425 def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None, mark_delete=False):
426
427 db_vdu_push_list = []
428 db_update = {"_admin.modified": time()}
429 if vdu_create:
430 for vdu_id, vdu_count in vdu_create.items():
431 vdur = next((vdur for vdur in reversed(db_vnfr["vdur"]) if vdur["vdu-id-ref"] == vdu_id), None)
432 if not vdur:
433 raise LcmException("Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".
434 format(vdu_id))
435
436 for count in range(vdu_count):
437 vdur_copy = deepcopy(vdur)
438 vdur_copy["status"] = "BUILD"
439 vdur_copy["status-detailed"] = None
440 vdur_copy["ip-address"]: None
441 vdur_copy["_id"] = str(uuid4())
442 vdur_copy["count-index"] += count + 1
443 vdur_copy["id"] = "{}-{}".format(vdur_copy["vdu-id-ref"], vdur_copy["count-index"])
444 vdur_copy.pop("vim_info", None)
445 for iface in vdur_copy["interfaces"]:
446 if iface.get("fixed-ip"):
447 iface["ip-address"] = self.increment_ip_mac(iface["ip-address"], count+1)
448 else:
449 iface.pop("ip-address", None)
450 if iface.get("fixed-mac"):
451 iface["mac-address"] = self.increment_ip_mac(iface["mac-address"], count+1)
452 else:
453 iface.pop("mac-address", None)
454 iface.pop("mgmt_vnf", None) # only first vdu can be managment of vnf
455 db_vdu_push_list.append(vdur_copy)
456 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
457 if vdu_delete:
458 for vdu_id, vdu_count in vdu_delete.items():
459 if mark_delete:
460 indexes_to_delete = [iv[0] for iv in enumerate(db_vnfr["vdur"]) if iv[1]["vdu-id-ref"] == vdu_id]
461 db_update.update({"vdur.{}.status".format(i): "DELETING" for i in indexes_to_delete[-vdu_count:]})
462 else:
463 # it must be deleted one by one because common.db does not allow otherwise
464 vdus_to_delete = [v for v in reversed(db_vnfr["vdur"]) if v["vdu-id-ref"] == vdu_id]
465 for vdu in vdus_to_delete[:vdu_count]:
466 self.db.set_one("vnfrs", {"_id": db_vnfr["_id"]}, None, pull={"vdur": {"_id": vdu["_id"]}})
467 db_push = {"vdur": db_vdu_push_list} if db_vdu_push_list else None
468 self.db.set_one("vnfrs", {"_id": db_vnfr["_id"]}, db_update, push_list=db_push)
469 # modify passed dictionary db_vnfr
470 db_vnfr_ = self.db.get_one("vnfrs", {"_id": db_vnfr["_id"]})
471 db_vnfr["vdur"] = db_vnfr_["vdur"]
472
473 def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
474 """
475 Updates database nsr with the RO info for the created vld
476 :param ns_update_nsr: dictionary to be filled with the updated info
477 :param db_nsr: content of db_nsr. This is also modified
478 :param nsr_desc_RO: nsr descriptor from RO
479 :return: Nothing, LcmException is raised on errors
480 """
481
482 for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
483 for net_RO in get_iterable(nsr_desc_RO, "nets"):
484 if vld["id"] != net_RO.get("ns_net_osm_id"):
485 continue
486 vld["vim-id"] = net_RO.get("vim_net_id")
487 vld["name"] = net_RO.get("vim_name")
488 vld["status"] = net_RO.get("status")
489 vld["status-detailed"] = net_RO.get("error_msg")
490 ns_update_nsr["vld.{}".format(vld_index)] = vld
491 break
492 else:
493 raise LcmException("ns_update_nsr: Not found vld={} at RO info".format(vld["id"]))
494
    def set_vnfr_at_error(self, db_vnfrs, error_text):
        """
        Mark every vnfr (and its vdur without a status) as ERROR in database.
        :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
        :param error_text: detail stored at the in-memory vdur 'status-detailed'
        :return: None; DbException is caught and logged
        """
        try:
            for db_vnfr in db_vnfrs.values():
                vnfr_update = {"status": "ERROR"}
                for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
                    # only vdur that never got a status are forced to ERROR
                    if "status" not in vdur:
                        vdur["status"] = "ERROR"
                        vnfr_update["vdur.{}.status".format(vdu_index)] = "ERROR"
                        if error_text:
                            vdur["status-detailed"] = str(error_text)
                            # NOTE(review): database gets the literal "ERROR" while the in-memory
                            # vdur gets the error text — confirm this asymmetry is intended
                            vnfr_update["vdur.{}.status-detailed".format(vdu_index)] = "ERROR"
                self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
        except DbException as e:
            self.logger.error("Cannot update vnf. {}".format(e))
509
    def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
        """
        Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
        :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
        :param nsr_desc_RO: nsr descriptor from RO
        :return: Nothing, LcmException is raised on errors
        """
        for vnf_index, db_vnfr in db_vnfrs.items():
            # find the RO vnf entry matching this member index (for-else raises if none)
            for vnf_RO in nsr_desc_RO["vnfs"]:
                if vnf_RO["member_vnf_index"] != vnf_index:
                    continue
                vnfr_update = {}
                if vnf_RO.get("ip_address"):
                    # RO may report several addresses separated by ';'; keep the first
                    db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO["ip_address"].split(";")[0]
                elif not db_vnfr.get("ip-address"):
                    if db_vnfr.get("vdur"):  # if not VDUs, there is not ip_address
                        raise LcmExceptionNoMgmtIP("ns member_vnf_index '{}' has no IP address".format(vnf_index))

                for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
                    vdur_RO_count_index = 0
                    if vdur.get("pdu-type"):
                        # physical deployment units are not deployed through RO
                        continue
                    for vdur_RO in get_iterable(vnf_RO, "vms"):
                        if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
                            continue
                        # same vdu id but earlier replica: advance the per-vdu replica
                        # counter until it matches this vdur's count-index
                        if vdur["count-index"] != vdur_RO_count_index:
                            vdur_RO_count_index += 1
                            continue
                        vdur["vim-id"] = vdur_RO.get("vim_vm_id")
                        if vdur_RO.get("ip_address"):
                            vdur["ip-address"] = vdur_RO["ip_address"].split(";")[0]
                        else:
                            vdur["ip-address"] = None
                        vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
                        vdur["name"] = vdur_RO.get("vim_name")
                        vdur["status"] = vdur_RO.get("status")
                        vdur["status-detailed"] = vdur_RO.get("error_msg")
                        # copy per-interface ip/mac, matched by the VIM internal name
                        for ifacer in get_iterable(vdur, "interfaces"):
                            for interface_RO in get_iterable(vdur_RO, "interfaces"):
                                if ifacer["name"] == interface_RO.get("internal_name"):
                                    ifacer["ip-address"] = interface_RO.get("ip_address")
                                    ifacer["mac-address"] = interface_RO.get("mac_address")
                                    break
                            else:
                                raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
                                                   "from VIM info"
                                                   .format(vnf_index, vdur["vdu-id-ref"], ifacer["name"]))
                        vnfr_update["vdur.{}".format(vdu_index)] = vdur
                        break
                    else:
                        raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
                                           "VIM info".format(vnf_index, vdur["vdu-id-ref"], vdur["count-index"]))

                # refresh the vnf internal vld info from the RO nets
                for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
                    for net_RO in get_iterable(nsr_desc_RO, "nets"):
                        if vld["id"] != net_RO.get("vnf_net_osm_id"):
                            continue
                        vld["vim-id"] = net_RO.get("vim_net_id")
                        vld["name"] = net_RO.get("vim_name")
                        vld["status"] = net_RO.get("status")
                        vld["status-detailed"] = net_RO.get("error_msg")
                        vnfr_update["vld.{}".format(vld_index)] = vld
                        break
                    else:
                        raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
                            vnf_index, vld["id"]))

                self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
                break

            else:
                raise LcmException("ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(vnf_index))
582
583 def _get_ns_config_info(self, nsr_id):
584 """
585 Generates a mapping between vnf,vdu elements and the N2VC id
586 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
587 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
588 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
589 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
590 """
591 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
592 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
593 mapping = {}
594 ns_config_info = {"osm-config-mapping": mapping}
595 for vca in vca_deployed_list:
596 if not vca["member-vnf-index"]:
597 continue
598 if not vca["vdu_id"]:
599 mapping[vca["member-vnf-index"]] = vca["application"]
600 else:
601 mapping["{}.{}.{}".format(vca["member-vnf-index"], vca["vdu_id"], vca["vdu_count_index"])] =\
602 vca["application"]
603 return ns_config_info
604
605 async def _instantiate_ng_ro(self, logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs, db_vnfds,
606 n2vc_key_list, stage, start_deploy, timeout_ns_deploy):
607
608 db_vims = {}
609
610 def get_vim_account(vim_account_id):
611 nonlocal db_vims
612 if vim_account_id in db_vims:
613 return db_vims[vim_account_id]
614 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account_id})
615 db_vims[vim_account_id] = db_vim
616 return db_vim
617
618 # modify target_vld info with instantiation parameters
619 def parse_vld_instantiation_params(target_vim, target_vld, vld_params, target_sdn):
620 if vld_params.get("ip-profile"):
621 target_vld["vim_info"][target_vim]["ip_profile"] = vld_params["ip-profile"]
622 if vld_params.get("provider-network"):
623 target_vld["vim_info"][target_vim]["provider_network"] = vld_params["provider-network"]
624 if "sdn-ports" in vld_params["provider-network"] and target_sdn:
625 target_vld["vim_info"][target_sdn]["sdn-ports"] = vld_params["provider-network"]["sdn-ports"]
626 if vld_params.get("wimAccountId"):
627 target_wim = "wim:{}".format(vld_params["wimAccountId"])
628 target_vld["vim_info"][target_wim] = {}
629 for param in ("vim-network-name", "vim-network-id"):
630 if vld_params.get(param):
631 if isinstance(vld_params[param], dict):
632 for vim, vim_net in vld_params[param].items():
633 other_target_vim = "vim:" + vim
634 populate_dict(target_vld["vim_info"], (other_target_vim, param.replace("-", "_")), vim_net)
635 else: # isinstance str
636 target_vld["vim_info"][target_vim][param.replace("-", "_")] = vld_params[param]
637 if vld_params.get("common_id"):
638 target_vld["common_id"] = vld_params.get("common_id")
639
640 nslcmop_id = db_nslcmop["_id"]
641 target = {
642 "name": db_nsr["name"],
643 "ns": {"vld": []},
644 "vnf": [],
645 "image": deepcopy(db_nsr["image"]),
646 "flavor": deepcopy(db_nsr["flavor"]),
647 "action_id": nslcmop_id,
648 "cloud_init_content": {},
649 }
650 for image in target["image"]:
651 image["vim_info"] = {}
652 for flavor in target["flavor"]:
653 flavor["vim_info"] = {}
654
655 if db_nslcmop.get("lcmOperationType") != "instantiate":
656 # get parameters of instantiation:
657 db_nslcmop_instantiate = self.db.get_list("nslcmops", {"nsInstanceId": db_nslcmop["nsInstanceId"],
658 "lcmOperationType": "instantiate"})[-1]
659 ns_params = db_nslcmop_instantiate.get("operationParams")
660 else:
661 ns_params = db_nslcmop.get("operationParams")
662 ssh_keys_instantiation = ns_params.get("ssh_keys") or []
663 ssh_keys_all = ssh_keys_instantiation + (n2vc_key_list or [])
664
665 cp2target = {}
666 for vld_index, vld in enumerate(db_nsr.get("vld")):
667 target_vim = "vim:{}".format(ns_params["vimAccountId"])
668 target_vld = {
669 "id": vld["id"],
670 "name": vld["name"],
671 "mgmt-network": vld.get("mgmt-network", False),
672 "type": vld.get("type"),
673 "vim_info": {
674 target_vim: {
675 "vim_network_name": vld.get("vim-network-name"),
676 "vim_account_id": ns_params["vimAccountId"]
677 }
678 }
679 }
680 # check if this network needs SDN assist
681 if vld.get("pci-interfaces"):
682 db_vim = get_vim_account(ns_params["vimAccountId"])
683 sdnc_id = db_vim["config"].get("sdn-controller")
684 if sdnc_id:
685 sdn_vld = "nsrs:{}:vld.{}".format(nsr_id, vld["id"])
686 target_sdn = "sdn:{}".format(sdnc_id)
687 target_vld["vim_info"][target_sdn] = {
688 "sdn": True, "target_vim": target_vim, "vlds": [sdn_vld], "type": vld.get("type")}
689
690 nsd_vnf_profiles = get_vnf_profiles(nsd)
691 for nsd_vnf_profile in nsd_vnf_profiles:
692 for cp in nsd_vnf_profile["virtual-link-connectivity"]:
693 if cp["virtual-link-profile-id"] == vld["id"]:
694 cp2target["member_vnf:{}.{}".format(
695 cp["constituent-cpd-id"][0]["constituent-base-element-id"],
696 cp["constituent-cpd-id"][0]["constituent-cpd-id"]
697 )] = "nsrs:{}:vld.{}".format(nsr_id, vld_index)
698
699 # check at nsd descriptor, if there is an ip-profile
700 vld_params = {}
701 virtual_link_profiles = get_virtual_link_profiles(nsd)
702
703 for vlp in virtual_link_profiles:
704 ip_profile = find_in_list(nsd["ip-profiles"],
705 lambda profile: profile["name"] == vlp["ip-profile-ref"])
706 vld_params["ip-profile"] = ip_profile["ip-profile-params"]
707 # update vld_params with instantiation params
708 vld_instantiation_params = find_in_list(get_iterable(ns_params, "vld"),
709 lambda a_vld: a_vld["name"] in (vld["name"], vld["id"]))
710 if vld_instantiation_params:
711 vld_params.update(vld_instantiation_params)
712 parse_vld_instantiation_params(target_vim, target_vld, vld_params, None)
713 target["ns"]["vld"].append(target_vld)
714
715 for vnfr in db_vnfrs.values():
716 vnfd = find_in_list(db_vnfds, lambda db_vnf: db_vnf["id"] == vnfr["vnfd-ref"])
717 vnf_params = find_in_list(get_iterable(ns_params, "vnf"),
718 lambda a_vnf: a_vnf["member-vnf-index"] == vnfr["member-vnf-index-ref"])
719 target_vnf = deepcopy(vnfr)
720 target_vim = "vim:{}".format(vnfr["vim-account-id"])
721 for vld in target_vnf.get("vld", ()):
722 # check if connected to a ns.vld, to fill target'
723 vnf_cp = find_in_list(vnfd.get("int-virtual-link-desc", ()),
724 lambda cpd: cpd.get("id") == vld["id"])
725 if vnf_cp:
726 ns_cp = "member_vnf:{}.{}".format(vnfr["member-vnf-index-ref"], vnf_cp["id"])
727 if cp2target.get(ns_cp):
728 vld["target"] = cp2target[ns_cp]
729
730 vld["vim_info"] = {target_vim: {"vim_network_name": vld.get("vim-network-name")}}
731 # check if this network needs SDN assist
732 target_sdn = None
733 if vld.get("pci-interfaces"):
734 db_vim = get_vim_account(vnfr["vim-account-id"])
735 sdnc_id = db_vim["config"].get("sdn-controller")
736 if sdnc_id:
737 sdn_vld = "vnfrs:{}:vld.{}".format(target_vnf["_id"], vld["id"])
738 target_sdn = "sdn:{}".format(sdnc_id)
739 vld["vim_info"][target_sdn] = {
740 "sdn": True, "target_vim": target_vim, "vlds": [sdn_vld], "type": vld.get("type")}
741
742 # check at vnfd descriptor, if there is an ip-profile
743 vld_params = {}
744 vnfd_vlp = find_in_list(
745 get_virtual_link_profiles(vnfd),
746 lambda a_link_profile: a_link_profile["id"] == vld["id"]
747 )
748 if vnfd_vlp and vnfd_vlp.get("virtual-link-protocol-data") and \
749 vnfd_vlp["virtual-link-protocol-data"].get("l3-protocol-data"):
750 ip_profile_source_data = vnfd_vlp["virtual-link-protocol-data"]["l3-protocol-data"]
751 ip_profile_dest_data = {}
752 if "ip-version" in ip_profile_source_data:
753 ip_profile_dest_data["ip-version"] = ip_profile_source_data["ip-version"]
754 if "cidr" in ip_profile_source_data:
755 ip_profile_dest_data["subnet-address"] = ip_profile_source_data["cidr"]
756 if "gateway-ip" in ip_profile_source_data:
757 ip_profile_dest_data["gateway-address"] = ip_profile_source_data["gateway-ip"]
758 if "dhcp-enabled" in ip_profile_source_data:
759 ip_profile_dest_data["dhcp-params"] = {
760 "enabled": ip_profile_source_data["dhcp-enabled"]
761 }
762
763 vld_params["ip-profile"] = ip_profile_dest_data
764 # update vld_params with instantiation params
765 if vnf_params:
766 vld_instantiation_params = find_in_list(get_iterable(vnf_params, "internal-vld"),
767 lambda i_vld: i_vld["name"] == vld["id"])
768 if vld_instantiation_params:
769 vld_params.update(vld_instantiation_params)
770 parse_vld_instantiation_params(target_vim, vld, vld_params, target_sdn)
771
772 vdur_list = []
773 for vdur in target_vnf.get("vdur", ()):
774 if vdur.get("status") == "DELETING" or vdur.get("pdu-type"):
775 continue # This vdu must not be created
776 vdur["vim_info"] = {"vim_account_id": vnfr["vim-account-id"]}
777
778 self.logger.debug("NS > ssh_keys > {}".format(ssh_keys_all))
779
780 if ssh_keys_all:
781 vdu_configuration = get_configuration(vnfd, vdur["vdu-id-ref"])
782 vnf_configuration = get_configuration(vnfd, vnfd["id"])
783 if vdu_configuration and vdu_configuration.get("config-access") and \
784 vdu_configuration.get("config-access").get("ssh-access"):
785 vdur["ssh-keys"] = ssh_keys_all
786 vdur["ssh-access-required"] = vdu_configuration["config-access"]["ssh-access"]["required"]
787 elif vnf_configuration and vnf_configuration.get("config-access") and \
788 vnf_configuration.get("config-access").get("ssh-access") and \
789 any(iface.get("mgmt-vnf") for iface in vdur["interfaces"]):
790 vdur["ssh-keys"] = ssh_keys_all
791 vdur["ssh-access-required"] = vnf_configuration["config-access"]["ssh-access"]["required"]
792 elif ssh_keys_instantiation and \
793 find_in_list(vdur["interfaces"], lambda iface: iface.get("mgmt-vnf")):
794 vdur["ssh-keys"] = ssh_keys_instantiation
795
796 self.logger.debug("NS > vdur > {}".format(vdur))
797
798 vdud = get_vdu(vnfd, vdur["vdu-id-ref"])
799 # cloud-init
800 if vdud.get("cloud-init-file"):
801 vdur["cloud-init"] = "{}:file:{}".format(vnfd["_id"], vdud.get("cloud-init-file"))
802 # read file and put content at target.cloul_init_content. Avoid ng_ro to use shared package system
803 if vdur["cloud-init"] not in target["cloud_init_content"]:
804 base_folder = vnfd["_admin"]["storage"]
805 cloud_init_file = "{}/{}/cloud_init/{}".format(base_folder["folder"], base_folder["pkg-dir"],
806 vdud.get("cloud-init-file"))
807 with self.fs.file_open(cloud_init_file, "r") as ci_file:
808 target["cloud_init_content"][vdur["cloud-init"]] = ci_file.read()
809 elif vdud.get("cloud-init"):
810 vdur["cloud-init"] = "{}:vdu:{}".format(vnfd["_id"], get_vdu_index(vnfd, vdur["vdu-id-ref"]))
811 # put content at target.cloul_init_content. Avoid ng_ro read vnfd descriptor
812 target["cloud_init_content"][vdur["cloud-init"]] = vdud["cloud-init"]
813 vdur["additionalParams"] = vdur.get("additionalParams") or {}
814 deploy_params_vdu = self._format_additional_params(vdur.get("additionalParams") or {})
815 deploy_params_vdu["OSM"] = get_osm_params(vnfr, vdur["vdu-id-ref"], vdur["count-index"])
816 vdur["additionalParams"] = deploy_params_vdu
817
818 # flavor
819 ns_flavor = target["flavor"][int(vdur["ns-flavor-id"])]
820 if target_vim not in ns_flavor["vim_info"]:
821 ns_flavor["vim_info"][target_vim] = {}
822
823 # deal with images
824 # in case alternative images are provided we must check if they should be applied
825 # for the vim_type, modify the vim_type taking into account
826 ns_image_id = int(vdur["ns-image-id"])
827 if vdur.get("alt-image-ids"):
828 db_vim = get_vim_account(vnfr["vim-account-id"])
829 vim_type = db_vim["vim_type"]
830 for alt_image_id in vdur.get("alt-image-ids"):
831 ns_alt_image = target["image"][int(alt_image_id)]
832 if vim_type == ns_alt_image.get("vim-type"):
833 # must use alternative image
834 self.logger.debug("use alternative image id: {}".format(alt_image_id))
835 ns_image_id = alt_image_id
836 vdur["ns-image-id"] = ns_image_id
837 break
838 ns_image = target["image"][int(ns_image_id)]
839 if target_vim not in ns_image["vim_info"]:
840 ns_image["vim_info"][target_vim] = {}
841
842 vdur["vim_info"] = {target_vim: {}}
843 # instantiation parameters
844 # if vnf_params:
845 # vdu_instantiation_params = next((v for v in get_iterable(vnf_params, "vdu") if v["id"] ==
846 # vdud["id"]), None)
847 vdur_list.append(vdur)
848 target_vnf["vdur"] = vdur_list
849 target["vnf"].append(target_vnf)
850
851 desc = await self.RO.deploy(nsr_id, target)
852 self.logger.debug("RO return > {}".format(desc))
853 action_id = desc["action_id"]
854 await self._wait_ng_ro(nsr_id, action_id, nslcmop_id, start_deploy, timeout_ns_deploy, stage)
855
856 # Updating NSR
857 db_nsr_update = {
858 "_admin.deployed.RO.operational-status": "running",
859 "detailed-status": " ".join(stage)
860 }
861 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
862 self.update_db_2("nsrs", nsr_id, db_nsr_update)
863 self._write_op_status(nslcmop_id, stage)
864 self.logger.debug(logging_text + "ns deployed at RO. RO_id={}".format(action_id))
865 return
866
867 async def _wait_ng_ro(self, nsr_id, action_id, nslcmop_id=None, start_time=None, timeout=600, stage=None):
868 detailed_status_old = None
869 db_nsr_update = {}
870 start_time = start_time or time()
871 while time() <= start_time + timeout:
872 desc_status = await self.RO.status(nsr_id, action_id)
873 self.logger.debug("Wait NG RO > {}".format(desc_status))
874 if desc_status["status"] == "FAILED":
875 raise NgRoException(desc_status["details"])
876 elif desc_status["status"] == "BUILD":
877 if stage:
878 stage[2] = "VIM: ({})".format(desc_status["details"])
879 elif desc_status["status"] == "DONE":
880 if stage:
881 stage[2] = "Deployed at VIM"
882 break
883 else:
884 assert False, "ROclient.check_ns_status returns unknown {}".format(desc_status["status"])
885 if stage and nslcmop_id and stage[2] != detailed_status_old:
886 detailed_status_old = stage[2]
887 db_nsr_update["detailed-status"] = " ".join(stage)
888 self.update_db_2("nsrs", nsr_id, db_nsr_update)
889 self._write_op_status(nslcmop_id, stage)
890 await asyncio.sleep(15, loop=self.loop)
891 else: # timeout_ns_deploy
892 raise NgRoException("Timeout waiting ns to deploy")
893
    async def _terminate_ng_ro(self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage):
        """
        Remove the NS deployment from NG-RO: send an empty desired state (all of vld/vnf/image/flavor
        empty) so RO undeploys everything, wait for completion and finally delete the ns record at RO.
        :param logging_text: prefix for logging
        :param nsr_deployed: content of db_nsr["_admin"]["deployed"]; not referenced in this method
        :param nsr_id: ns record id at database
        :param nslcmop_id: operation id; also sent to RO as the action_id of the termination request
        :param stage: list with 3 items; item 2 is overwritten with the VIM progress text
        :return: None. Raises LcmException if RO reports errors (a 404 not-found is treated as success)
        """
        db_nsr_update = {}
        failed_detail = []
        action_id = None
        start_deploy = time()
        try:
            # an empty target makes NG-RO delete every deployed item of this ns
            target = {
                "ns": {"vld": []},
                "vnf": [],
                "image": [],
                "flavor": [],
                "action_id": nslcmop_id
            }
            desc = await self.RO.deploy(nsr_id, target)
            action_id = desc["action_id"]
            # record the in-progress delete action so it can be recovered/cleaned on restart
            db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = action_id
            db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETING"
            self.logger.debug(logging_text + "ns terminate action at RO. action_id={}".format(action_id))

            # wait until done
            delete_timeout = 20 * 60  # 20 minutes
            await self._wait_ng_ro(nsr_id, action_id, nslcmop_id, start_deploy, delete_timeout, stage)

            db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
            db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
            # delete all nsr
            await self.RO.delete(nsr_id)
        except Exception as e:
            if isinstance(e, NgRoException) and e.http_code == 404:  # not found
                # already removed at RO: treat as a successful termination
                db_nsr_update["_admin.deployed.RO.nsr_id"] = None
                db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
                self.logger.debug(logging_text + "RO_action_id={} already deleted".format(action_id))
            elif isinstance(e, NgRoException) and e.http_code == 409:  # conflict
                failed_detail.append("delete conflict: {}".format(e))
                self.logger.debug(logging_text + "RO_action_id={} delete conflict: {}".format(action_id, e))
            else:
                failed_detail.append("delete error: {}".format(e))
                self.logger.error(logging_text + "RO_action_id={} delete error: {}".format(action_id, e))

        # always persist the outcome (stage + accumulated _admin updates) before raising
        if failed_detail:
            stage[2] = "Error deleting from VIM"
        else:
            stage[2] = "Deleted from VIM"
        db_nsr_update["detailed-status"] = " ".join(stage)
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        self._write_op_status(nslcmop_id, stage)

        if failed_detail:
            raise LcmException("; ".join(failed_detail))
        return
945
946 async def instantiate_RO(self, logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs, db_vnfds,
947 n2vc_key_list, stage):
948 """
949 Instantiate at RO
950 :param logging_text: preffix text to use at logging
951 :param nsr_id: nsr identity
952 :param nsd: database content of ns descriptor
953 :param db_nsr: database content of ns record
954 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
955 :param db_vnfrs:
956 :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
957 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
958 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
959 :return: None or exception
960 """
961 try:
962 start_deploy = time()
963 ns_params = db_nslcmop.get("operationParams")
964 if ns_params and ns_params.get("timeout_ns_deploy"):
965 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
966 else:
967 timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy)
968
969 # Check for and optionally request placement optimization. Database will be updated if placement activated
970 stage[2] = "Waiting for Placement."
971 if await self._do_placement(logging_text, db_nslcmop, db_vnfrs):
972 # in case of placement change ns_params[vimAcountId) if not present at any vnfrs
973 for vnfr in db_vnfrs.values():
974 if ns_params["vimAccountId"] == vnfr["vim-account-id"]:
975 break
976 else:
977 ns_params["vimAccountId"] == vnfr["vim-account-id"]
978
979 return await self._instantiate_ng_ro(logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs,
980 db_vnfds, n2vc_key_list, stage, start_deploy, timeout_ns_deploy)
981 except Exception as e:
982 stage[2] = "ERROR deploying at VIM"
983 self.set_vnfr_at_error(db_vnfrs, str(e))
984 self.logger.error("Error deploying at VIM {}".format(e),
985 exc_info=not isinstance(e, (ROclient.ROClientException, LcmException, DbException,
986 NgRoException)))
987 raise
988
989 async def wait_kdu_up(self, logging_text, nsr_id, vnfr_id, kdu_name):
990 """
991 Wait for kdu to be up, get ip address
992 :param logging_text: prefix use for logging
993 :param nsr_id:
994 :param vnfr_id:
995 :param kdu_name:
996 :return: IP address
997 """
998
999 # self.logger.debug(logging_text + "Starting wait_kdu_up")
1000 nb_tries = 0
1001
1002 while nb_tries < 360:
1003 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
1004 kdur = next((x for x in get_iterable(db_vnfr, "kdur") if x.get("kdu-name") == kdu_name), None)
1005 if not kdur:
1006 raise LcmException("Not found vnfr_id={}, kdu_name={}".format(vnfr_id, kdu_name))
1007 if kdur.get("status"):
1008 if kdur["status"] in ("READY", "ENABLED"):
1009 return kdur.get("ip-address")
1010 else:
1011 raise LcmException("target KDU={} is in error state".format(kdu_name))
1012
1013 await asyncio.sleep(10, loop=self.loop)
1014 nb_tries += 1
1015 raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name))
1016
    async def wait_vm_up_insert_key_ro(self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None):
        """
        Wait for ip addres at RO, and optionally, insert public key in virtual machine
        :param logging_text: prefix use for logging
        :param nsr_id:
        :param vnfr_id:
        :param vdu_id: target vdu; None means the VNF management VDU (selected by the vnf ip-address)
        :param vdu_index: count-index of the target vdu (only used together with vdu_id)
        :param pub_key: public ssh key to inject, None to skip
        :param user: user to apply the public ssh key
        :return: IP address
        """

        self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
        ro_nsr_id = None
        ip_address = None
        nb_tries = 0            # counts key-injection retries against legacy RO
        target_vdu_id = None    # set once the target vdu is found ACTIVE with an ip-address
        ro_retries = 0          # counts polling iterations (10s each)

        while True:

            ro_retries += 1
            if ro_retries >= 360:  # 1 hour
                raise LcmException("Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id))

            await asyncio.sleep(10, loop=self.loop)

            # get ip address
            if not target_vdu_id:
                db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})

                if not vdu_id:  # for the VNF case
                    if db_vnfr.get("status") == "ERROR":
                        raise LcmException("Cannot inject ssh-key because target VNF is in error state")
                    ip_address = db_vnfr.get("ip-address")
                    if not ip_address:
                        continue
                    # locate the vdur holding the vnf management ip
                    vdur = next((x for x in get_iterable(db_vnfr, "vdur") if x.get("ip-address") == ip_address), None)
                else:  # VDU case
                    vdur = next((x for x in get_iterable(db_vnfr, "vdur")
                                 if x.get("vdu-id-ref") == vdu_id and x.get("count-index") == vdu_index), None)

                if not vdur and len(db_vnfr.get("vdur", ())) == 1:  # If only one, this should be the target vdu
                    vdur = db_vnfr["vdur"][0]
                if not vdur:
                    raise LcmException("Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(vnfr_id, vdu_id,
                                                                                              vdu_index))
                # New generation RO stores information at "vim_info"
                ng_ro_status = None
                target_vim = None
                if vdur.get("vim_info"):
                    target_vim = next(t for t in vdur["vim_info"])  # there should be only one key
                    ng_ro_status = vdur["vim_info"][target_vim].get("vim_status")
                # PDUs are considered up by definition; otherwise require ACTIVE from either RO flavour
                if vdur.get("pdu-type") or vdur.get("status") == "ACTIVE" or ng_ro_status == "ACTIVE":
                    ip_address = vdur.get("ip-address")
                    if not ip_address:
                        continue
                    target_vdu_id = vdur["vdu-id-ref"]
                elif vdur.get("status") == "ERROR" or ng_ro_status == "ERROR":
                    raise LcmException("Cannot inject ssh-key because target VM is in error state")

            if not target_vdu_id:
                continue

            # inject public key into machine
            if pub_key and user:
                self.logger.debug(logging_text + "Inserting RO key")
                self.logger.debug("SSH > PubKey > {}".format(pub_key))
                if vdur.get("pdu-type"):
                    # PDUs are not managed by RO, nothing can be injected
                    self.logger.error(logging_text + "Cannot inject ssh-ky to a PDU")
                    return ip_address
                try:
                    ro_vm_id = "{}-{}".format(db_vnfr["member-vnf-index-ref"], target_vdu_id)  # TODO add vdu_index
                    if self.ng_ro:
                        # NG-RO path: key injection is requested as a deploy action and awaited
                        target = {"action": {"action": "inject_ssh_key", "key": pub_key, "user": user},
                                  "vnf": [{"_id": vnfr_id, "vdur": [{"id": vdur["id"]}]}],
                                  }
                        desc = await self.RO.deploy(nsr_id, target)
                        action_id = desc["action_id"]
                        await self._wait_ng_ro(nsr_id, action_id, timeout=600)
                        break
                    else:
                        # legacy RO path
                        # wait until NS is deployed at RO
                        if not ro_nsr_id:
                            db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
                            ro_nsr_id = deep_get(db_nsrs, ("_admin", "deployed", "RO", "nsr_id"))
                        if not ro_nsr_id:
                            continue
                        result_dict = await self.RO.create_action(
                            item="ns",
                            item_id_name=ro_nsr_id,
                            descriptor={"add_public_key": pub_key, "vms": [ro_vm_id], "user": user}
                        )
                        # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
                        if not result_dict or not isinstance(result_dict, dict):
                            raise LcmException("Unknown response from RO when injecting key")
                        for result in result_dict.values():
                            if result.get("vim_result") == 200:
                                break
                            else:
                                raise ROclient.ROClientException("error injecting key: {}".format(
                                    result.get("description")))
                        break
                except NgRoException as e:
                    raise LcmException("Reaching max tries injecting key. Error: {}".format(e))
                except ROclient.ROClientException as e:
                    # legacy RO may transiently fail: retry up to 20 times (10s apart)
                    if not nb_tries:
                        self.logger.debug(logging_text + "error injecting key: {}. Retrying until {} seconds".
                                          format(e, 20*10))
                    nb_tries += 1
                    if nb_tries >= 20:
                        raise LcmException("Reaching max tries injecting key. Error: {}".format(e))
            else:
                # no key to inject: the ip address is all that was needed
                break

        return ip_address
1134
1135 async def _wait_dependent_n2vc(self, nsr_id, vca_deployed_list, vca_index):
1136 """
1137 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1138 """
1139 my_vca = vca_deployed_list[vca_index]
1140 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
1141 # vdu or kdu: no dependencies
1142 return
1143 timeout = 300
1144 while timeout >= 0:
1145 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1146 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1147 configuration_status_list = db_nsr["configurationStatus"]
1148 for index, vca_deployed in enumerate(configuration_status_list):
1149 if index == vca_index:
1150 # myself
1151 continue
1152 if not my_vca.get("member-vnf-index") or \
1153 (vca_deployed.get("member-vnf-index") == my_vca.get("member-vnf-index")):
1154 internal_status = configuration_status_list[index].get("status")
1155 if internal_status == 'READY':
1156 continue
1157 elif internal_status == 'BROKEN':
1158 raise LcmException("Configuration aborted because dependent charm/s has failed")
1159 else:
1160 break
1161 else:
1162 # no dependencies, return
1163 return
1164 await asyncio.sleep(10)
1165 timeout -= 1
1166
1167 raise LcmException("Configuration aborted because dependent charm/s timeout")
1168
    async def instantiate_N2VC(self, logging_text, vca_index, nsi_id, db_nsr, db_vnfr, vdu_id, kdu_name, vdu_index,
                               config_descriptor, deploy_params, base_folder, nslcmop_id, stage, vca_type, vca_name,
                               ee_config_descriptor):
        """
        Create or register one VCA execution environment (proxy charm, native charm, k8s proxy charm or
        helm chart) for a NS/VNF/VDU/KDU element, install its software, wait for the managed element to
        be reachable and execute its Day-1 (initial-config-primitive) primitives.
        :param logging_text: prefix for logging
        :param vca_index: index of this VCA inside db_nsr["_admin"]["deployed"]["VCA"]
        :param nsi_id: network slice instance id or None; only used to build the VCA namespace
        :param db_nsr: database content of the ns record
        :param db_vnfr: database content of the vnf record, or None for a NS-level charm
        :param vdu_id: target vdu id when configuring a VDU, else None
        :param kdu_name: target kdu name when configuring a KDU, else None
        :param vdu_index: count-index of the target vdu/kdu (None treated as 0)
        :param config_descriptor: descriptor section holding primitives and config-access
        :param deploy_params: params used to render primitives; "rw_mgmt_ip" is added here
        :param base_folder: _admin.storage content used to locate the charm/helm artifact
        :param nslcmop_id: current operation id, for progress reporting
        :param stage: 3-item progress list; item 0 is overwritten with the Day-1 stage text
        :param vca_type: one of "lxc_proxy_charm", "native_charm", "k8s_proxy_charm", "helm", "helm-v3"
        :param vca_name: name of the charm/chart artifact
        :param ee_config_descriptor: execution environment descriptor; its id filters the primitives
        :return: None. On failure writes configurationStatus BROKEN and raises LcmException
        """
        nsr_id = db_nsr["_id"]
        db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
        vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
        vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
        osm_config = {"osm": {"ns_id": db_nsr["_id"]}}
        db_dict = {
            'collection': 'nsrs',
            'filter': {'_id': nsr_id},
            'path': db_update_entry
        }
        # 'step' tracks the current action so the except clause can report where it failed
        step = ""
        try:

            element_type = 'NS'
            element_under_configuration = nsr_id

            vnfr_id = None
            if db_vnfr:
                vnfr_id = db_vnfr["_id"]
                osm_config["osm"]["vnf_id"] = vnfr_id

            # namespace encodes nsi/ns/vnf/vdu hierarchy for the VCA
            namespace = "{nsi}.{ns}".format(
                nsi=nsi_id if nsi_id else "",
                ns=nsr_id)

            if vnfr_id:
                element_type = 'VNF'
                element_under_configuration = vnfr_id
                namespace += ".{}-{}".format(vnfr_id, vdu_index or 0)
                if vdu_id:
                    namespace += ".{}-{}".format(vdu_id, vdu_index or 0)
                    element_type = 'VDU'
                    element_under_configuration = "{}-{}".format(vdu_id, vdu_index or 0)
                    osm_config["osm"]["vdu_id"] = vdu_id
                elif kdu_name:
                    namespace += ".{}.{}".format(kdu_name, vdu_index or 0)
                    element_type = 'KDU'
                    element_under_configuration = kdu_name
                    osm_config["osm"]["kdu_name"] = kdu_name

            # Get artifact path
            artifact_path = "{}/{}/{}/{}".format(
                base_folder["folder"],
                base_folder["pkg-dir"],
                "charms" if vca_type in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm") else "helm-charts",
                vca_name
            )

            self.logger.debug("Artifact path > {}".format(artifact_path))

            # get initial_config_primitive_list that applies to this element
            initial_config_primitive_list = config_descriptor.get('initial-config-primitive')

            self.logger.debug("Initial config primitive list > {}".format(initial_config_primitive_list))

            # add config if not present for NS charm
            ee_descriptor_id = ee_config_descriptor.get("id")
            self.logger.debug("EE Descriptor > {}".format(ee_descriptor_id))
            initial_config_primitive_list = get_ee_sorted_initial_config_primitive_list(initial_config_primitive_list,
                                                                                        vca_deployed, ee_descriptor_id)

            self.logger.debug("Initial config primitive list #2 > {}".format(initial_config_primitive_list))
            # n2vc_redesign STEP 3.1
            # find old ee_id if exists
            ee_id = vca_deployed.get("ee_id")

            vim_account_id = (
                deep_get(db_vnfr, ("vim-account-id",)) or
                deep_get(deploy_params, ("OSM", "vim_account_id"))
            )
            vca_cloud, vca_cloud_credential = self.get_vca_cloud_and_credentials(vim_account_id)
            vca_k8s_cloud, vca_k8s_cloud_credential = self.get_vca_k8s_cloud_and_credentials(vim_account_id)
            # create or register execution environment in VCA
            if vca_type in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):

                self._write_configuration_status(
                    nsr_id=nsr_id,
                    vca_index=vca_index,
                    status='CREATING',
                    element_under_configuration=element_under_configuration,
                    element_type=element_type
                )

                step = "create execution environment"
                self.logger.debug(logging_text + step)

                ee_id = None
                credentials = None
                if vca_type == "k8s_proxy_charm":
                    ee_id = await self.vca_map[vca_type].install_k8s_proxy_charm(
                        charm_name=artifact_path[artifact_path.rfind("/") + 1:],
                        namespace=namespace,
                        artifact_path=artifact_path,
                        db_dict=db_dict,
                        cloud_name=vca_k8s_cloud,
                        credential_name=vca_k8s_cloud_credential,
                    )
                elif vca_type == "helm" or vca_type == "helm-v3":
                    ee_id, credentials = await self.vca_map[vca_type].create_execution_environment(
                        namespace=namespace,
                        reuse_ee_id=ee_id,
                        db_dict=db_dict,
                        config=osm_config,
                        artifact_path=artifact_path,
                        vca_type=vca_type
                    )
                else:
                    ee_id, credentials = await self.vca_map[vca_type].create_execution_environment(
                        namespace=namespace,
                        reuse_ee_id=ee_id,
                        db_dict=db_dict,
                        cloud_name=vca_cloud,
                        credential_name=vca_cloud_credential,
                    )

            elif vca_type == "native_charm":
                # the charm runs inside the VM itself, so the VM must be up before registering
                step = "Waiting to VM being up and getting IP address"
                self.logger.debug(logging_text + step)
                rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
                                                                 user=None, pub_key=None)
                credentials = {"hostname": rw_mgmt_ip}
                # get username
                username = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
                # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
                #  merged. Meanwhile let's get username from initial-config-primitive
                if not username and initial_config_primitive_list:
                    for config_primitive in initial_config_primitive_list:
                        for param in config_primitive.get("parameter", ()):
                            if param["name"] == "ssh-username":
                                username = param["value"]
                                break
                if not username:
                    raise LcmException("Cannot determine the username neither with 'initial-config-primitive' nor with "
                                       "'config-access.ssh-access.default-user'")
                credentials["username"] = username
                # n2vc_redesign STEP 3.2

                self._write_configuration_status(
                    nsr_id=nsr_id,
                    vca_index=vca_index,
                    status='REGISTERING',
                    element_under_configuration=element_under_configuration,
                    element_type=element_type
                )

                step = "register execution environment {}".format(credentials)
                self.logger.debug(logging_text + step)
                ee_id = await self.vca_map[vca_type].register_execution_environment(
                    credentials=credentials,
                    namespace=namespace,
                    db_dict=db_dict,
                    cloud_name=vca_cloud,
                    credential_name=vca_cloud_credential,
                )

            # for compatibility with MON/POL modules, they need model and application name at database
            # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
            ee_id_parts = ee_id.split('.')
            db_nsr_update = {db_update_entry + "ee_id": ee_id}
            if len(ee_id_parts) >= 2:
                model_name = ee_id_parts[0]
                application_name = ee_id_parts[1]
                db_nsr_update[db_update_entry + "model"] = model_name
                db_nsr_update[db_update_entry + "application"] = application_name

            # n2vc_redesign STEP 3.3
            step = "Install configuration Software"

            self._write_configuration_status(
                nsr_id=nsr_id,
                vca_index=vca_index,
                status='INSTALLING SW',
                element_under_configuration=element_under_configuration,
                element_type=element_type,
                other_update=db_nsr_update
            )

            # TODO check if already done
            self.logger.debug(logging_text + step)
            config = None
            if vca_type == "native_charm":
                # native charms may take an initial 'config' primitive as install-time configuration
                config_primitive = next((p for p in initial_config_primitive_list if p["name"] == "config"), None)
                if config_primitive:
                    config = self._map_primitive_params(
                        config_primitive,
                        {},
                        deploy_params
                    )
            num_units = 1
            if vca_type == "lxc_proxy_charm":
                # number of charm units may be overridden per element via "config-units"
                if element_type == "NS":
                    num_units = db_nsr.get("config-units") or 1
                elif element_type == "VNF":
                    num_units = db_vnfr.get("config-units") or 1
                elif element_type == "VDU":
                    for v in db_vnfr["vdur"]:
                        if vdu_id == v["vdu-id-ref"]:
                            num_units = v.get("config-units") or 1
                            break
            if vca_type != "k8s_proxy_charm":
                await self.vca_map[vca_type].install_configuration_sw(
                    ee_id=ee_id,
                    artifact_path=artifact_path,
                    db_dict=db_dict,
                    config=config,
                    num_units=num_units,
                )

            # write in db flag of configuration_sw already installed
            self.update_db_2("nsrs", nsr_id, {db_update_entry + "config_sw_installed": True})

            # add relations for this VCA (wait for other peers related with this VCA)
            await self._add_vca_relations(logging_text=logging_text, nsr_id=nsr_id,
                                          vca_index=vca_index, vca_type=vca_type)

            # if SSH access is required, then get execution environment SSH public
            # if native charm we have waited already to VM be UP
            if vca_type in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
                pub_key = None
                user = None
                # self.logger.debug("get ssh key block")
                if deep_get(config_descriptor, ("config-access", "ssh-access", "required")):
                    # self.logger.debug("ssh key needed")
                    # Needed to inject a ssh key
                    user = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
                    step = "Install configuration Software, getting public ssh key"
                    pub_key = await self.vca_map[vca_type].get_ee_ssh_public__key(ee_id=ee_id, db_dict=db_dict)

                    step = "Insert public key into VM user={} ssh_key={}".format(user, pub_key)
                else:
                    # self.logger.debug("no need to get ssh key")
                    step = "Waiting to VM being up and getting IP address"
                self.logger.debug(logging_text + step)

                # n2vc_redesign STEP 5.1
                # wait for RO (ip-address) Insert pub_key into VM
                if vnfr_id:
                    if kdu_name:
                        rw_mgmt_ip = await self.wait_kdu_up(logging_text, nsr_id, vnfr_id, kdu_name)
                    else:
                        rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id,
                                                                         vdu_index, user=user, pub_key=pub_key)
                else:
                    rw_mgmt_ip = None  # This is for a NS configuration

                self.logger.debug(logging_text + ' VM_ip_address={}'.format(rw_mgmt_ip))

            # store rw_mgmt_ip in deploy params for later replacement
            deploy_params["rw_mgmt_ip"] = rw_mgmt_ip

            # n2vc_redesign STEP 6  Execute initial config primitive
            step = 'execute initial config primitive'

            # wait for dependent primitives execution (NS -> VNF -> VDU)
            if initial_config_primitive_list:
                await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)

            # stage, in function of element type: vdu, kdu, vnf or ns
            my_vca = vca_deployed_list[vca_index]
            if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
                # VDU or KDU
                stage[0] = 'Stage 3/5: running Day-1 primitives for VDU.'
            elif my_vca.get("member-vnf-index"):
                # VNF
                stage[0] = 'Stage 4/5: running Day-1 primitives for VNF.'
            else:
                # NS
                stage[0] = 'Stage 5/5: running Day-1 primitives for NS.'

            self._write_configuration_status(
                nsr_id=nsr_id,
                vca_index=vca_index,
                status='EXECUTING PRIMITIVE'
            )

            self._write_op_status(
                op_id=nslcmop_id,
                stage=stage
            )

            check_if_terminated_needed = True
            for initial_config_primitive in initial_config_primitive_list:
                # adding information on the vca_deployed if it is a NS execution environment
                if not vca_deployed["member-vnf-index"]:
                    deploy_params["ns_config_info"] = json.dumps(self._get_ns_config_info(nsr_id))
                # TODO check if already done
                primitive_params_ = self._map_primitive_params(initial_config_primitive, {}, deploy_params)

                step = "execute primitive '{}' params '{}'".format(initial_config_primitive["name"], primitive_params_)
                self.logger.debug(logging_text + step)
                await self.vca_map[vca_type].exec_primitive(
                    ee_id=ee_id,
                    primitive_name=initial_config_primitive["name"],
                    params_dict=primitive_params_,
                    db_dict=db_dict
                )
                # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
                if check_if_terminated_needed:
                    if config_descriptor.get('terminate-config-primitive'):
                        self.update_db_2("nsrs", nsr_id, {db_update_entry + "needed_terminate": True})
                    check_if_terminated_needed = False

                # TODO register in database that primitive is done

            # STEP 7 Configure metrics
            if vca_type == "helm" or vca_type == "helm-v3":
                prometheus_jobs = await self.add_prometheus_metrics(
                    ee_id=ee_id,
                    artifact_path=artifact_path,
                    ee_config_descriptor=ee_config_descriptor,
                    vnfr_id=vnfr_id,
                    nsr_id=nsr_id,
                    target_ip=rw_mgmt_ip,
                )
                if prometheus_jobs:
                    self.update_db_2("nsrs", nsr_id, {db_update_entry + "prometheus_jobs": prometheus_jobs})

            step = "instantiated at VCA"
            self.logger.debug(logging_text + step)

            self._write_configuration_status(
                nsr_id=nsr_id,
                vca_index=vca_index,
                status='READY'
            )

        except Exception as e:  # TODO not use Exception but N2VC exception
            # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
            if not isinstance(e, (DbException, N2VCException, LcmException, asyncio.CancelledError)):
                self.logger.error("Exception while {} : {}".format(step, e), exc_info=True)
            self._write_configuration_status(
                nsr_id=nsr_id,
                vca_index=vca_index,
                status='BROKEN'
            )
            raise LcmException("{} {}".format(step, e)) from e
1508
1509 def _write_ns_status(self, nsr_id: str, ns_state: str, current_operation: str, current_operation_id: str,
1510 error_description: str = None, error_detail: str = None, other_update: dict = None):
1511 """
1512 Update db_nsr fields.
1513 :param nsr_id:
1514 :param ns_state:
1515 :param current_operation:
1516 :param current_operation_id:
1517 :param error_description:
1518 :param error_detail:
1519 :param other_update: Other required changes at database if provided, will be cleared
1520 :return:
1521 """
1522 try:
1523 db_dict = other_update or {}
1524 db_dict["_admin.nslcmop"] = current_operation_id # for backward compatibility
1525 db_dict["_admin.current-operation"] = current_operation_id
1526 db_dict["_admin.operation-type"] = current_operation if current_operation != "IDLE" else None
1527 db_dict["currentOperation"] = current_operation
1528 db_dict["currentOperationID"] = current_operation_id
1529 db_dict["errorDescription"] = error_description
1530 db_dict["errorDetail"] = error_detail
1531
1532 if ns_state:
1533 db_dict["nsState"] = ns_state
1534 self.update_db_2("nsrs", nsr_id, db_dict)
1535 except DbException as e:
1536 self.logger.warn('Error writing NS status, ns={}: {}'.format(nsr_id, e))
1537
1538 def _write_op_status(self, op_id: str, stage: list = None, error_message: str = None, queuePosition: int = 0,
1539 operation_state: str = None, other_update: dict = None):
1540 try:
1541 db_dict = other_update or {}
1542 db_dict['queuePosition'] = queuePosition
1543 if isinstance(stage, list):
1544 db_dict['stage'] = stage[0]
1545 db_dict['detailed-status'] = " ".join(stage)
1546 elif stage is not None:
1547 db_dict['stage'] = str(stage)
1548
1549 if error_message is not None:
1550 db_dict['errorMessage'] = error_message
1551 if operation_state is not None:
1552 db_dict['operationState'] = operation_state
1553 db_dict["statusEnteredTime"] = time()
1554 self.update_db_2("nslcmops", op_id, db_dict)
1555 except DbException as e:
1556 self.logger.warn('Error writing OPERATION status for op_id: {} -> {}'.format(op_id, e))
1557
1558 def _write_all_config_status(self, db_nsr: dict, status: str):
1559 try:
1560 nsr_id = db_nsr["_id"]
1561 # configurationStatus
1562 config_status = db_nsr.get('configurationStatus')
1563 if config_status:
1564 db_nsr_update = {"configurationStatus.{}.status".format(index): status for index, v in
1565 enumerate(config_status) if v}
1566 # update status
1567 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1568
1569 except DbException as e:
1570 self.logger.warn('Error writing all configuration status, ns={}: {}'.format(nsr_id, e))
1571
1572 def _write_configuration_status(self, nsr_id: str, vca_index: int, status: str = None,
1573 element_under_configuration: str = None, element_type: str = None,
1574 other_update: dict = None):
1575
1576 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
1577 # .format(vca_index, status))
1578
1579 try:
1580 db_path = 'configurationStatus.{}.'.format(vca_index)
1581 db_dict = other_update or {}
1582 if status:
1583 db_dict[db_path + 'status'] = status
1584 if element_under_configuration:
1585 db_dict[db_path + 'elementUnderConfiguration'] = element_under_configuration
1586 if element_type:
1587 db_dict[db_path + 'elementType'] = element_type
1588 self.update_db_2("nsrs", nsr_id, db_dict)
1589 except DbException as e:
1590 self.logger.warn('Error writing configuration status={}, ns={}, vca_index={}: {}'
1591 .format(status, nsr_id, vca_index, e))
1592
1593 async def _do_placement(self, logging_text, db_nslcmop, db_vnfrs):
1594 """
1595 Check and computes the placement, (vim account where to deploy). If it is decided by an external tool, it
1596 sends the request via kafka and wait until the result is wrote at database (nslcmops _admin.plca).
1597 Database is used because the result can be obtained from a different LCM worker in case of HA.
1598 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
1599 :param db_nslcmop: database content of nslcmop
1600 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
1601 :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfr with the
1602 computed 'vim-account-id'
1603 """
1604 modified = False
1605 nslcmop_id = db_nslcmop['_id']
1606 placement_engine = deep_get(db_nslcmop, ('operationParams', 'placement-engine'))
1607 if placement_engine == "PLA":
1608 self.logger.debug(logging_text + "Invoke and wait for placement optimization")
1609 await self.msg.aiowrite("pla", "get_placement", {'nslcmopId': nslcmop_id}, loop=self.loop)
1610 db_poll_interval = 5
1611 wait = db_poll_interval * 10
1612 pla_result = None
1613 while not pla_result and wait >= 0:
1614 await asyncio.sleep(db_poll_interval)
1615 wait -= db_poll_interval
1616 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
1617 pla_result = deep_get(db_nslcmop, ('_admin', 'pla'))
1618
1619 if not pla_result:
1620 raise LcmException("Placement timeout for nslcmopId={}".format(nslcmop_id))
1621
1622 for pla_vnf in pla_result['vnf']:
1623 vnfr = db_vnfrs.get(pla_vnf['member-vnf-index'])
1624 if not pla_vnf.get('vimAccountId') or not vnfr:
1625 continue
1626 modified = True
1627 self.db.set_one("vnfrs", {"_id": vnfr["_id"]}, {"vim-account-id": pla_vnf['vimAccountId']})
1628 # Modifies db_vnfrs
1629 vnfr["vim-account-id"] = pla_vnf['vimAccountId']
1630 return modified
1631
1632 def update_nsrs_with_pla_result(self, params):
1633 try:
1634 nslcmop_id = deep_get(params, ('placement', 'nslcmopId'))
1635 self.update_db_2("nslcmops", nslcmop_id, {"_admin.pla": params.get('placement')})
1636 except Exception as e:
1637 self.logger.warn('Update failed for nslcmop_id={}:{}'.format(nslcmop_id, e))
1638
    async def instantiate(self, nsr_id, nslcmop_id):
        """
        Instantiate a Network Service: read descriptors/records from the database, deploy
        KDUs, deploy VMs at the VIM (via instantiate_RO) and execution environments (via
        _deploy_n2vc), then reconcile the final nsrs/nslcmops status in the finally block
        and notify the result over kafka.

        :param nsr_id: ns instance to deploy
        :param nslcmop_id: operation to run
        :return: None. Results are written to the database and published on kafka.
        """

        # Try to lock HA task here
        task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
        if not task_is_locked_by_me:
            # another LCM worker owns this operation
            self.logger.debug('instantiate() task is not locked by me, ns={}'.format(nsr_id))
            return

        logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
        self.logger.debug(logging_text + "Enter")

        # get all needed from database

        # database nsrs record
        db_nsr = None

        # database nslcmops record
        db_nslcmop = None

        # update operation on nsrs
        db_nsr_update = {}
        # update operation on nslcmops
        db_nslcmop_update = {}

        nslcmop_operation_state = None
        db_vnfrs = {}  # vnf's info indexed by member-index
        # n2vc_info = {}
        tasks_dict_info = {}  # from task to info text
        exc = None
        error_list = []
        stage = ['Stage 1/5: preparation of the environment.', "Waiting for previous operations to terminate.", ""]
        # ^ stage, step, VIM progress
        try:
            # wait for any previous tasks in process
            await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)

            stage[1] = "Sync filesystem from database."
            self.fs.sync()  # TODO, make use of partial sync, only for the needed packages

            # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
            stage[1] = "Reading from database."
            # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
            db_nsr_update["detailed-status"] = "creating"
            db_nsr_update["operational-status"] = "init"
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state="BUILDING",
                current_operation="INSTANTIATING",
                current_operation_id=nslcmop_id,
                other_update=db_nsr_update
            )
            self._write_op_status(
                op_id=nslcmop_id,
                stage=stage,
                queuePosition=0
            )

            # read from db: operation
            stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
            db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
            ns_params = db_nslcmop.get("operationParams")
            # per-operation timeout overrides the configured/default one
            if ns_params and ns_params.get("timeout_ns_deploy"):
                timeout_ns_deploy = ns_params["timeout_ns_deploy"]
            else:
                timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy)

            # read from db: ns
            stage[1] = "Getting nsr={} from db.".format(nsr_id)
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
            stage[1] = "Getting nsd={} from db.".format(db_nsr["nsd-id"])
            nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
            db_nsr["nsd"] = nsd
            # nsr_name = db_nsr["name"]   # TODO short-name??

            # read from db: vnf's of this ns
            stage[1] = "Getting vnfrs from db."
            self.logger.debug(logging_text + stage[1])
            db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})

            # read from db: vnfd's for every vnf
            db_vnfds = []  # every vnfd data

            # for each vnf in ns, read vnfd
            for vnfr in db_vnfrs_list:
                db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
                vnfd_id = vnfr["vnfd-id"]
                vnfd_ref = vnfr["vnfd-ref"]

                # if we haven't this vnfd, read it from db
                # NOTE(review): db_vnfds holds vnfd dicts, so 'vnfd_id not in db_vnfds' is
                # always True and every vnfr triggers a DB read; looks like it should
                # compare against the stored vnfd ids — confirm before changing
                if vnfd_id not in db_vnfds:
                    # read from db
                    stage[1] = "Getting vnfd={} id='{}' from db.".format(vnfd_id, vnfd_ref)
                    self.logger.debug(logging_text + stage[1])
                    vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})

                    # store vnfd
                    db_vnfds.append(vnfd)

            # Get or generates the _admin.deployed.VCA list
            vca_deployed_list = None
            if db_nsr["_admin"].get("deployed"):
                vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
            if vca_deployed_list is None:
                vca_deployed_list = []
                configuration_status_list = []
                db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
                db_nsr_update["configurationStatus"] = configuration_status_list
                # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
                populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
            elif isinstance(vca_deployed_list, dict):
                # maintain backward compatibility. Change a dict to list at database
                vca_deployed_list = list(vca_deployed_list.values())
                db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
                populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)

            if not isinstance(deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list):
                populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
                db_nsr_update["_admin.deployed.RO.vnfd"] = []

            # set state to INSTANTIATED. When instantiated NBI will not delete directly
            db_nsr_update["_admin.nsState"] = "INSTANTIATED"
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self.db.set_list("vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "INSTANTIATED"})

            # n2vc_redesign STEP 2 Deploy Network Scenario
            stage[0] = 'Stage 2/5: deployment of KDUs, VMs and execution environments.'
            self._write_op_status(
                op_id=nslcmop_id,
                stage=stage
            )

            stage[1] = "Deploying KDUs."
            # self.logger.debug(logging_text + "Before deploy_kdus")
            # Call to deploy_kdus in case exists the "vdu:kdu" param
            await self.deploy_kdus(
                logging_text=logging_text,
                nsr_id=nsr_id,
                nslcmop_id=nslcmop_id,
                db_vnfrs=db_vnfrs,
                db_vnfds=db_vnfds,
                task_instantiation_info=tasks_dict_info,
            )

            stage[1] = "Getting VCA public key."
            # n2vc_redesign STEP 1 Get VCA public ssh-key
            # feature 1429. Add n2vc public key to needed VMs
            n2vc_key = self.n2vc.get_public_key()
            n2vc_key_list = [n2vc_key]
            if self.vca_config.get("public_key"):
                n2vc_key_list.append(self.vca_config["public_key"])

            # VIM deployment runs concurrently with EE deployment; collected at finally
            stage[1] = "Deploying NS at VIM."
            task_ro = asyncio.ensure_future(
                self.instantiate_RO(
                    logging_text=logging_text,
                    nsr_id=nsr_id,
                    nsd=nsd,
                    db_nsr=db_nsr,
                    db_nslcmop=db_nslcmop,
                    db_vnfrs=db_vnfrs,
                    db_vnfds=db_vnfds,
                    n2vc_key_list=n2vc_key_list,
                    stage=stage
                )
            )
            self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
            tasks_dict_info[task_ro] = "Deploying at VIM"

            # n2vc_redesign STEP 3 to 6 Deploy N2VC
            stage[1] = "Deploying Execution Environments."
            self.logger.debug(logging_text + stage[1])

            nsi_id = None  # TODO put nsi_id when this nsr belongs to a NSI
            for vnf_profile in get_vnf_profiles(nsd):
                vnfd_id = vnf_profile["vnfd-id"]
                vnfd = find_in_list(db_vnfds, lambda a_vnf: a_vnf["id"] == vnfd_id)
                member_vnf_index = str(vnf_profile["id"])
                db_vnfr = db_vnfrs[member_vnf_index]
                base_folder = vnfd["_admin"]["storage"]
                vdu_id = None
                vdu_index = 0
                vdu_name = None
                kdu_name = None

                # Get additional parameters
                deploy_params = {"OSM": get_osm_params(db_vnfr)}
                if db_vnfr.get("additionalParamsForVnf"):
                    deploy_params.update(parse_yaml_strings(db_vnfr["additionalParamsForVnf"].copy()))

                # VNF-level charm, if the descriptor declares one
                descriptor_config = get_configuration(vnfd, vnfd["id"])
                if descriptor_config:
                    self._deploy_n2vc(
                        logging_text=logging_text + "member_vnf_index={} ".format(member_vnf_index),
                        db_nsr=db_nsr,
                        db_vnfr=db_vnfr,
                        nslcmop_id=nslcmop_id,
                        nsr_id=nsr_id,
                        nsi_id=nsi_id,
                        vnfd_id=vnfd_id,
                        vdu_id=vdu_id,
                        kdu_name=kdu_name,
                        member_vnf_index=member_vnf_index,
                        vdu_index=vdu_index,
                        vdu_name=vdu_name,
                        deploy_params=deploy_params,
                        descriptor_config=descriptor_config,
                        base_folder=base_folder,
                        task_instantiation_info=tasks_dict_info,
                        stage=stage
                    )

                # Deploy charms for each VDU that supports one.
                for vdud in get_vdu_list(vnfd):
                    vdu_id = vdud["id"]
                    descriptor_config = get_configuration(vnfd, vdu_id)
                    vdur = find_in_list(db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id)

                    if vdur.get("additionalParams"):
                        deploy_params_vdu = parse_yaml_strings(vdur["additionalParams"])
                    else:
                        deploy_params_vdu = deploy_params
                    deploy_params_vdu["OSM"] = get_osm_params(db_vnfr, vdu_id, vdu_count_index=0)
                    # one charm instance per VDU replica, up to max-number-of-instances
                    vdud_count = get_vdu_profile(vnfd, vdu_id).get("max-number-of-instances", 1)

                    self.logger.debug("VDUD > {}".format(vdud))
                    self.logger.debug("Descriptor config > {}".format(descriptor_config))
                    if descriptor_config:
                        vdu_name = None
                        kdu_name = None
                        for vdu_index in range(vdud_count):
                            # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
                            self._deploy_n2vc(
                                logging_text=logging_text + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
                                    member_vnf_index, vdu_id, vdu_index),
                                db_nsr=db_nsr,
                                db_vnfr=db_vnfr,
                                nslcmop_id=nslcmop_id,
                                nsr_id=nsr_id,
                                nsi_id=nsi_id,
                                vnfd_id=vnfd_id,
                                vdu_id=vdu_id,
                                kdu_name=kdu_name,
                                member_vnf_index=member_vnf_index,
                                vdu_index=vdu_index,
                                vdu_name=vdu_name,
                                deploy_params=deploy_params_vdu,
                                descriptor_config=descriptor_config,
                                base_folder=base_folder,
                                task_instantiation_info=tasks_dict_info,
                                stage=stage
                            )
                # KDU-level charms
                for kdud in get_kdu_list(vnfd):
                    kdu_name = kdud["name"]
                    descriptor_config = get_configuration(vnfd, kdu_name)
                    if descriptor_config:
                        vdu_id = None
                        vdu_index = 0
                        vdu_name = None
                        kdur = next(x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name)
                        deploy_params_kdu = {"OSM": get_osm_params(db_vnfr)}
                        if kdur.get("additionalParams"):
                            deploy_params_kdu = parse_yaml_strings(kdur["additionalParams"])

                        self._deploy_n2vc(
                            logging_text=logging_text,
                            db_nsr=db_nsr,
                            db_vnfr=db_vnfr,
                            nslcmop_id=nslcmop_id,
                            nsr_id=nsr_id,
                            nsi_id=nsi_id,
                            vnfd_id=vnfd_id,
                            vdu_id=vdu_id,
                            kdu_name=kdu_name,
                            member_vnf_index=member_vnf_index,
                            vdu_index=vdu_index,
                            vdu_name=vdu_name,
                            deploy_params=deploy_params_kdu,
                            descriptor_config=descriptor_config,
                            base_folder=base_folder,
                            task_instantiation_info=tasks_dict_info,
                            stage=stage
                        )

            # Check if this NS has a charm configuration
            descriptor_config = nsd.get("ns-configuration")
            if descriptor_config and descriptor_config.get("juju"):
                vnfd_id = None
                db_vnfr = None
                member_vnf_index = None
                vdu_id = None
                kdu_name = None
                vdu_index = 0
                vdu_name = None

                # Get additional parameters
                deploy_params = {"OSM": {"vim_account_id": ns_params["vimAccountId"]}}
                if db_nsr.get("additionalParamsForNs"):
                    deploy_params.update(parse_yaml_strings(db_nsr["additionalParamsForNs"].copy()))
                base_folder = nsd["_admin"]["storage"]
                self._deploy_n2vc(
                    logging_text=logging_text,
                    db_nsr=db_nsr,
                    db_vnfr=db_vnfr,
                    nslcmop_id=nslcmop_id,
                    nsr_id=nsr_id,
                    nsi_id=nsi_id,
                    vnfd_id=vnfd_id,
                    vdu_id=vdu_id,
                    kdu_name=kdu_name,
                    member_vnf_index=member_vnf_index,
                    vdu_index=vdu_index,
                    vdu_name=vdu_name,
                    deploy_params=deploy_params,
                    descriptor_config=descriptor_config,
                    base_folder=base_folder,
                    task_instantiation_info=tasks_dict_info,
                    stage=stage
                )

            # rest of staff will be done at finally

        except (ROclient.ROClientException, DbException, LcmException, N2VCException) as e:
            self.logger.error(logging_text + "Exit Exception while '{}': {}".format(stage[1], e))
            exc = e
        except asyncio.CancelledError:
            self.logger.error(logging_text + "Cancelled Exception while '{}'".format(stage[1]))
            exc = "Operation was cancelled"
        except Exception as e:
            exc = traceback.format_exc()
            self.logger.critical(logging_text + "Exit Exception while '{}': {}".format(stage[1], e), exc_info=True)
        finally:
            if exc:
                error_list.append(str(exc))
            try:
                # wait for pending tasks
                if tasks_dict_info:
                    stage[1] = "Waiting for instantiate pending tasks."
                    self.logger.debug(logging_text + stage[1])
                    error_list += await self._wait_for_tasks(logging_text, tasks_dict_info, timeout_ns_deploy,
                                                             stage, nslcmop_id, nsr_id=nsr_id)
                stage[1] = stage[2] = ""
            except asyncio.CancelledError:
                error_list.append("Cancelled")
                # TODO cancel all tasks
            except Exception as exc:
                error_list.append(str(exc))

            # update operation-status
            db_nsr_update["operational-status"] = "running"
            # let's begin with VCA 'configured' status (later we can change it)
            db_nsr_update["config-status"] = "configured"
            for task, task_name in tasks_dict_info.items():
                if not task.done() or task.cancelled() or task.exception():
                    if task_name.startswith(self.task_name_deploy_vca):
                        # A N2VC task is pending
                        db_nsr_update["config-status"] = "failed"
                    else:
                        # RO or KDU task is pending
                        db_nsr_update["operational-status"] = "failed"

            # update status at database
            if error_list:
                error_detail = ". ".join(error_list)
                self.logger.error(logging_text + error_detail)
                error_description_nslcmop = '{} Detail: {}'.format(stage[0], error_detail)
                error_description_nsr = 'Operation: INSTANTIATING.{}, {}'.format(nslcmop_id, stage[0])

                db_nsr_update["detailed-status"] = error_description_nsr + " Detail: " + error_detail
                db_nslcmop_update["detailed-status"] = error_detail
                nslcmop_operation_state = "FAILED"
                ns_state = "BROKEN"
            else:
                error_detail = None
                error_description_nsr = error_description_nslcmop = None
                ns_state = "READY"
                db_nsr_update["detailed-status"] = "Done"
                db_nslcmop_update["detailed-status"] = "Done"
                nslcmop_operation_state = "COMPLETED"

            if db_nsr:
                self._write_ns_status(
                    nsr_id=nsr_id,
                    ns_state=ns_state,
                    current_operation="IDLE",
                    current_operation_id=None,
                    error_description=error_description_nsr,
                    error_detail=error_detail,
                    other_update=db_nsr_update
                )
                self._write_op_status(
                    op_id=nslcmop_id,
                    stage="",
                    error_message=error_description_nslcmop,
                    operation_state=nslcmop_operation_state,
                    other_update=db_nslcmop_update,
                )

            if nslcmop_operation_state:
                try:
                    # notify operation result on the kafka bus (NBI and others listen)
                    await self.msg.aiowrite("ns", "instantiated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                                   "operationState": nslcmop_operation_state},
                                            loop=self.loop)
                except Exception as e:
                    self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))

            self.logger.debug(logging_text + "Exit")
            self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
2052
    async def _add_vca_relations(self, logging_text, nsr_id, vca_index: int,
                                 timeout: int = 3600, vca_type: str = None) -> bool:
        """
        Add the juju relations that involve the VCA at the given index: collect the
        ns-configuration and vnf-configuration relations this VCA takes part in, then
        poll the database until both peers of each relation are deployed and add the
        relation through the VCA connector.

        :param logging_text: logging prefix
        :param nsr_id: NSR id
        :param vca_index: index of this VCA inside _admin.deployed.VCA
        :param timeout: seconds to keep retrying before giving up
        :param vca_type: key into self.vca_map; defaults to "lxc_proxy_charm"
        :return: True when all pending relations were added (or there were none);
            False on timeout or any unexpected error
        """

        # steps:
        # 1. find all relations for this VCA
        # 2. wait for other peers related
        # 3. add relations

        try:
            vca_type = vca_type or "lxc_proxy_charm"

            # STEP 1: find all relations for this VCA

            # read nsr record
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
            nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})

            # this VCA data
            my_vca = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))[vca_index]

            # read all ns-configuration relations
            ns_relations = list()
            db_ns_relations = deep_get(nsd, ('ns-configuration', 'relation'))
            if db_ns_relations:
                for r in db_ns_relations:
                    # check if this VCA is in the relation
                    if my_vca.get('member-vnf-index') in\
                            (r.get('entities')[0].get('id'), r.get('entities')[1].get('id')):
                        ns_relations.append(r)

            # read all vnf-configuration relations
            vnf_relations = list()
            db_vnfd_list = db_nsr.get('vnfd-id')
            if db_vnfd_list:
                for vnfd in db_vnfd_list:
                    db_vnfd = self.db.get_one("vnfds", {"_id": vnfd})
                    db_vnf_relations = get_configuration(db_vnfd, db_vnfd["id"]).get("relation", [])
                    if db_vnf_relations:
                        for r in db_vnf_relations:
                            # check if this VCA is in the relation
                            if my_vca.get('vdu_id') in (r.get('entities')[0].get('id'), r.get('entities')[1].get('id')):
                                vnf_relations.append(r)

            # if no relations, terminate
            if not ns_relations and not vnf_relations:
                self.logger.debug(logging_text + ' No relations')
                return True

            self.logger.debug(logging_text + ' adding relations\n {}\n {}'.format(ns_relations, vnf_relations))

            # add all relations
            start = time()
            while True:
                # check timeout
                now = time()
                if now - start >= timeout:
                    self.logger.error(logging_text + ' : timeout adding relations')
                    return False

                # reload nsr from database (we need to update record: _admin.deloyed.VCA)
                db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

                # for each defined NS relation, find the VCA's related
                for r in ns_relations.copy():
                    from_vca_ee_id = None
                    to_vca_ee_id = None
                    from_vca_endpoint = None
                    to_vca_endpoint = None
                    vca_list = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))
                    for vca in vca_list:
                        # a peer is usable once its charm software is installed
                        if vca.get('member-vnf-index') == r.get('entities')[0].get('id') \
                                and vca.get('config_sw_installed'):
                            from_vca_ee_id = vca.get('ee_id')
                            from_vca_endpoint = r.get('entities')[0].get('endpoint')
                        if vca.get('member-vnf-index') == r.get('entities')[1].get('id') \
                                and vca.get('config_sw_installed'):
                            to_vca_ee_id = vca.get('ee_id')
                            to_vca_endpoint = r.get('entities')[1].get('endpoint')
                    if from_vca_ee_id and to_vca_ee_id:
                        # add relation
                        await self.vca_map[vca_type].add_relation(
                            ee_id_1=from_vca_ee_id,
                            ee_id_2=to_vca_ee_id,
                            endpoint_1=from_vca_endpoint,
                            endpoint_2=to_vca_endpoint)
                        # remove entry from relations list
                        ns_relations.remove(r)
                    else:
                        # check failed peers
                        # NOTE(review): if both entities match a BROKEN vca the second
                        # remove(r) raises ValueError, silently absorbed by the except below
                        try:
                            vca_status_list = db_nsr.get('configurationStatus')
                            if vca_status_list:
                                for i in range(len(vca_list)):
                                    vca = vca_list[i]
                                    vca_status = vca_status_list[i]
                                    if vca.get('member-vnf-index') == r.get('entities')[0].get('id'):
                                        if vca_status.get('status') == 'BROKEN':
                                            # peer broken: remove relation from list
                                            ns_relations.remove(r)
                                    if vca.get('member-vnf-index') == r.get('entities')[1].get('id'):
                                        if vca_status.get('status') == 'BROKEN':
                                            # peer broken: remove relation from list
                                            ns_relations.remove(r)
                        except Exception:
                            # ignore
                            pass

                # for each defined VNF relation, find the VCA's related
                for r in vnf_relations.copy():
                    from_vca_ee_id = None
                    to_vca_ee_id = None
                    from_vca_endpoint = None
                    to_vca_endpoint = None
                    vca_list = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))
                    for vca in vca_list:
                        # VNF-level charms have no vdu_id; match on vnfd_id instead
                        key_to_check = "vdu_id"
                        if vca.get("vdu_id") is None:
                            key_to_check = "vnfd_id"
                        if vca.get(key_to_check) == r.get('entities')[0].get('id') and vca.get('config_sw_installed'):
                            from_vca_ee_id = vca.get('ee_id')
                            from_vca_endpoint = r.get('entities')[0].get('endpoint')
                        if vca.get(key_to_check) == r.get('entities')[1].get('id') and vca.get('config_sw_installed'):
                            to_vca_ee_id = vca.get('ee_id')
                            to_vca_endpoint = r.get('entities')[1].get('endpoint')
                    if from_vca_ee_id and to_vca_ee_id:
                        # add relation
                        await self.vca_map[vca_type].add_relation(
                            ee_id_1=from_vca_ee_id,
                            ee_id_2=to_vca_ee_id,
                            endpoint_1=from_vca_endpoint,
                            endpoint_2=to_vca_endpoint)
                        # remove entry from relations list
                        vnf_relations.remove(r)
                    else:
                        # check failed peers
                        try:
                            vca_status_list = db_nsr.get('configurationStatus')
                            if vca_status_list:
                                for i in range(len(vca_list)):
                                    vca = vca_list[i]
                                    vca_status = vca_status_list[i]
                                    if vca.get('vdu_id') == r.get('entities')[0].get('id'):
                                        if vca_status.get('status') == 'BROKEN':
                                            # peer broken: remove relation from list
                                            vnf_relations.remove(r)
                                    if vca.get('vdu_id') == r.get('entities')[1].get('id'):
                                        if vca_status.get('status') == 'BROKEN':
                                            # peer broken: remove relation from list
                                            vnf_relations.remove(r)
                        except Exception:
                            # ignore
                            pass

                # wait for next try
                await asyncio.sleep(5.0)

                if not ns_relations and not vnf_relations:
                    self.logger.debug('Relations added')
                    break

            return True

        except Exception as e:
            self.logger.warn(logging_text + ' ERROR adding relations: {}'.format(e))
            return False
2218
    async def _install_kdu(self, nsr_id: str, nsr_db_path: str, vnfr_data: dict, kdu_index: int, kdud: dict,
                           vnfd: dict, k8s_instance_info: dict, k8params: dict = None, timeout: int = 600):
        """
        Install one KDU through the matching k8s connector, publish its services and
        management ip at the vnfr, and run its initial-config-primitives (when declared
        and no juju execution environment handles them).

        :param nsr_id: NSR id
        :param nsr_db_path: dotted path of this KDU entry inside the nsrs record
        :param vnfr_data: vnfrs database record of the owning vnf
        :param kdu_index: position of this kdu inside vnfr 'kdur' list
        :param kdud: kdu descriptor from the vnfd
        :param vnfd: vnfd database record
        :param k8s_instance_info: deployment info dict (cluster uuid/type, kdu model/name, namespace)
        :param k8params: parameters for the kdu instantiation
        :param timeout: seconds for the install and for each primitive execution
        :return: the generated kdu_instance name
        :raises: re-raises any failure after recording ERROR status in nsrs/vnfrs
        """

        try:
            k8sclustertype = k8s_instance_info["k8scluster-type"]
            # Instantiate kdu
            db_dict_install = {"collection": "nsrs",
                               "filter": {"_id": nsr_id},
                               "path": nsr_db_path}

            kdu_instance = self.k8scluster_map[k8sclustertype].generate_kdu_instance_name(
                db_dict=db_dict_install,
                kdu_model=k8s_instance_info["kdu-model"],
                kdu_name=k8s_instance_info["kdu-name"],
            )
            # store the instance name before the (long) install so it is visible early;
            # it is written again after install completes (harmless duplicate write)
            self.update_db_2("nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance})
            await self.k8scluster_map[k8sclustertype].install(
                cluster_uuid=k8s_instance_info["k8scluster-uuid"],
                kdu_model=k8s_instance_info["kdu-model"],
                atomic=True,
                params=k8params,
                db_dict=db_dict_install,
                timeout=timeout,
                kdu_name=k8s_instance_info["kdu-name"],
                namespace=k8s_instance_info["namespace"],
                kdu_instance=kdu_instance,
            )
            self.update_db_2("nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance})

            # Obtain services to obtain management service ip
            services = await self.k8scluster_map[k8sclustertype].get_services(
                cluster_uuid=k8s_instance_info["k8scluster-uuid"],
                kdu_instance=kdu_instance,
                namespace=k8s_instance_info["namespace"])

            # Obtain management service info (if exists)
            vnfr_update_dict = {}
            if services:
                vnfr_update_dict["kdur.{}.services".format(kdu_index)] = services
                mgmt_services = [service for service in kdud.get("service", []) if service.get("mgmt-service")]
                for mgmt_service in mgmt_services:
                    for service in services:
                        if service["name"].startswith(mgmt_service["name"]):
                            # Mgmt service found, Obtain service ip
                            ip = service.get("external_ip", service.get("cluster_ip"))
                            if isinstance(ip, list) and len(ip) == 1:
                                ip = ip[0]

                            vnfr_update_dict["kdur.{}.ip-address".format(kdu_index)] = ip

                            # Check if must update also mgmt ip at the vnf
                            service_external_cp = mgmt_service.get("external-connection-point-ref")
                            if service_external_cp:
                                if deep_get(vnfd, ("mgmt-interface", "cp")) == service_external_cp:
                                    vnfr_update_dict["ip-address"] = ip

                            break
                    else:
                        # for-else: no deployed service matched this mgmt-service name
                        self.logger.warn("Mgmt service name: {} not found".format(mgmt_service["name"]))

            vnfr_update_dict["kdur.{}.status".format(kdu_index)] = "READY"
            self.update_db_2("vnfrs", vnfr_data.get("_id"), vnfr_update_dict)

            kdu_config = get_configuration(vnfd, k8s_instance_info["kdu-name"])
            # run primitives directly only when no juju EE is in charge of them
            if kdu_config and kdu_config.get("initial-config-primitive") and \
                    get_juju_ee_ref(vnfd, k8s_instance_info["kdu-name"]) is None:
                initial_config_primitive_list = kdu_config.get("initial-config-primitive")
                initial_config_primitive_list.sort(key=lambda val: int(val["seq"]))

                for initial_config_primitive in initial_config_primitive_list:
                    primitive_params_ = self._map_primitive_params(initial_config_primitive, {}, {})

                    await asyncio.wait_for(
                        self.k8scluster_map[k8sclustertype].exec_primitive(
                            cluster_uuid=k8s_instance_info["k8scluster-uuid"],
                            kdu_instance=kdu_instance,
                            primitive_name=initial_config_primitive["name"],
                            params=primitive_params_, db_dict=db_dict_install),
                        timeout=timeout)

        except Exception as e:
            # Prepare update db with error and raise exception
            try:
                self.update_db_2("nsrs", nsr_id, {nsr_db_path + ".detailed-status": str(e)})
                self.update_db_2("vnfrs", vnfr_data.get("_id"), {"kdur.{}.status".format(kdu_index): "ERROR"})
            except Exception:
                # ignore to keep original exception
                pass
            # reraise original error
            raise

        return kdu_instance
2311
    async def deploy_kdus(self, logging_text, nsr_id, nslcmop_id, db_vnfrs, db_vnfds, task_instantiation_info):
        """
        Launch an _install_kdu task for every kdu declared in the vnfrs, resolving the
        target k8s cluster (helm v2/v3 or juju-bundle) and synchronizing its repos first.
        Created tasks are registered in self.lcm_tasks and task_instantiation_info so the
        caller can wait for them.

        :param logging_text: logging prefix
        :param nsr_id: NSR id
        :param nslcmop_id: operation id used to register the tasks
        :param db_vnfrs: vnfrs records indexed by member-vnf-index
        :param db_vnfds: list of vnfd records of this NS
        :param task_instantiation_info: dict task -> description, filled with the new tasks
        :raises LcmException: wrong kdu type, missing/uninitialized k8s cluster, or any
            other failure (wrapped)
        """
        # Launch kdus if present in the descriptor

        # cache: cluster_type -> {cluster_id -> connector uuid}
        k8scluster_id_2_uuic = {"helm-chart-v3": {}, "helm-chart": {}, "juju-bundle": {}}

        async def _get_cluster_id(cluster_id, cluster_type):
            # resolve (and cache) the connector uuid of a k8s cluster for the given type
            nonlocal k8scluster_id_2_uuic
            if cluster_id in k8scluster_id_2_uuic[cluster_type]:
                return k8scluster_id_2_uuic[cluster_type][cluster_id]

            # check if K8scluster is creating and wait look if previous tasks in process
            task_name, task_dependency = self.lcm_tasks.lookfor_related("k8scluster", cluster_id)
            if task_dependency:
                text = "Waiting for related tasks '{}' on k8scluster {} to be completed".format(task_name, cluster_id)
                self.logger.debug(logging_text + text)
                await asyncio.wait(task_dependency, timeout=3600)

            db_k8scluster = self.db.get_one("k8sclusters", {"_id": cluster_id}, fail_on_empty=False)
            if not db_k8scluster:
                raise LcmException("K8s cluster {} cannot be found".format(cluster_id))

            k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
            if not k8s_id:
                if cluster_type == "helm-chart-v3":
                    try:
                        # backward compatibility for existing clusters that have not been initialized for helm v3
                        k8s_credentials = yaml.safe_dump(db_k8scluster.get("credentials"))
                        k8s_id, uninstall_sw = await self.k8sclusterhelm3.init_env(k8s_credentials,
                                                                                   reuse_cluster_uuid=cluster_id)
                        db_k8scluster_update = {}
                        db_k8scluster_update["_admin.helm-chart-v3.error_msg"] = None
                        db_k8scluster_update["_admin.helm-chart-v3.id"] = k8s_id
                        db_k8scluster_update["_admin.helm-chart-v3.created"] = uninstall_sw
                        db_k8scluster_update["_admin.helm-chart-v3.operationalState"] = "ENABLED"
                        self.update_db_2("k8sclusters", cluster_id, db_k8scluster_update)
                    except Exception as e:
                        self.logger.error(logging_text + "error initializing helm-v3 cluster: {}".format(str(e)))
                        raise LcmException("K8s cluster '{}' has not been initialized for '{}'".format(cluster_id,
                                                                                                       cluster_type))
                else:
                    raise LcmException("K8s cluster '{}' has not been initialized for '{}'".
                                       format(cluster_id, cluster_type))
            k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
            return k8s_id

        logging_text += "Deploy kdus: "
        step = ""
        try:
            db_nsr_update = {"_admin.deployed.K8s": []}
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

            index = 0
            updated_cluster_list = []
            updated_v3_cluster_list = []

            for vnfr_data in db_vnfrs.values():
                for kdu_index, kdur in enumerate(get_iterable(vnfr_data, "kdur")):
                    # Step 0: Prepare and set parameters
                    desc_params = parse_yaml_strings(kdur.get("additionalParams"))
                    vnfd_id = vnfr_data.get('vnfd-id')
                    vnfd_with_id = find_in_list(db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id)
                    kdud = next(kdud for kdud in vnfd_with_id["kdu"] if kdud["name"] == kdur["kdu-name"])
                    namespace = kdur.get("k8s-namespace")
                    if kdur.get("helm-chart"):
                        kdumodel = kdur["helm-chart"]
                        # Default version: helm3, if helm-version is v2 assign v2
                        k8sclustertype = "helm-chart-v3"
                        self.logger.debug("kdur: {}".format(kdur))
                        if kdur.get("helm-version") and kdur.get("helm-version") == "v2":
                            k8sclustertype = "helm-chart"
                    elif kdur.get("juju-bundle"):
                        kdumodel = kdur["juju-bundle"]
                        k8sclustertype = "juju-bundle"
                    else:
                        raise LcmException("kdu type for kdu='{}.{}' is neither helm-chart nor "
                                           "juju-bundle. Maybe an old NBI version is running".
                                           format(vnfr_data["member-vnf-index-ref"], kdur["kdu-name"]))
                    # check if kdumodel is a file and exists
                    try:
                        vnfd_with_id = find_in_list(db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id)
                        storage = deep_get(vnfd_with_id, ('_admin', 'storage'))
                        if storage and storage.get('pkg-dir'):  # may be not present if vnfd has not artifacts
                            # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
                            filename = '{}/{}/{}s/{}'.format(storage["folder"], storage["pkg-dir"], k8sclustertype,
                                                             kdumodel)
                            if self.fs.file_exists(filename, mode='file') or self.fs.file_exists(filename, mode='dir'):
                                kdumodel = self.fs.path + filename
                    except (asyncio.TimeoutError, asyncio.CancelledError):
                        raise
                    except Exception:  # it is not a file
                        pass

                    k8s_cluster_id = kdur["k8s-cluster"]["id"]
                    step = "Synchronize repos for k8s cluster '{}'".format(k8s_cluster_id)
                    cluster_uuid = await _get_cluster_id(k8s_cluster_id, k8sclustertype)

                    # Synchronize repos (once per cluster and helm flavour)
                    if (k8sclustertype == "helm-chart" and cluster_uuid not in updated_cluster_list)\
                            or (k8sclustertype == "helm-chart-v3" and cluster_uuid not in updated_v3_cluster_list):
                        del_repo_list, added_repo_dict = await asyncio.ensure_future(
                            self.k8scluster_map[k8sclustertype].synchronize_repos(cluster_uuid=cluster_uuid))
                        if del_repo_list or added_repo_dict:
                            if k8sclustertype == "helm-chart":
                                unset = {'_admin.helm_charts_added.' + item: None for item in del_repo_list}
                                updated = {'_admin.helm_charts_added.' +
                                           item: name for item, name in added_repo_dict.items()}
                                updated_cluster_list.append(cluster_uuid)
                            elif k8sclustertype == "helm-chart-v3":
                                unset = {'_admin.helm_charts_v3_added.' + item: None for item in del_repo_list}
                                updated = {'_admin.helm_charts_v3_added.' +
                                           item: name for item, name in added_repo_dict.items()}
                                updated_v3_cluster_list.append(cluster_uuid)
                            self.logger.debug(logging_text + "repos synchronized on k8s cluster "
                                                             "'{}' to_delete: {}, to_add: {}".
                                              format(k8s_cluster_id, del_repo_list, added_repo_dict))
                            self.db.set_one("k8sclusters", {"_id": k8s_cluster_id}, updated, unset=unset)

                    # Instantiate kdu
                    step = "Instantiating KDU {}.{} in k8s cluster {}".format(vnfr_data["member-vnf-index-ref"],
                                                                              kdur["kdu-name"], k8s_cluster_id)
                    k8s_instance_info = {"kdu-instance": None,
                                         "k8scluster-uuid": cluster_uuid,
                                         "k8scluster-type": k8sclustertype,
                                         "member-vnf-index": vnfr_data["member-vnf-index-ref"],
                                         "kdu-name": kdur["kdu-name"],
                                         "kdu-model": kdumodel,
                                         "namespace": namespace}
                    db_path = "_admin.deployed.K8s.{}".format(index)
                    db_nsr_update[db_path] = k8s_instance_info
                    self.update_db_2("nsrs", nsr_id, db_nsr_update)
                    vnfd_with_id = find_in_list(db_vnfds, lambda vnf: vnf["_id"] == vnfd_id)
                    task = asyncio.ensure_future(
                        self._install_kdu(nsr_id, db_path, vnfr_data, kdu_index, kdud, vnfd_with_id,
                                          k8s_instance_info, k8params=desc_params, timeout=600))
                    self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_KDU-{}".format(index), task)
                    task_instantiation_info[task] = "Deploying KDU {}".format(kdur["kdu-name"])

                    index += 1

        except (LcmException, asyncio.CancelledError):
            raise
        except Exception as e:
            msg = "Exception {} while {}: {}".format(type(e).__name__, step, e)
            if isinstance(e, (N2VCException, DbException)):
                self.logger.error(logging_text + msg)
            else:
                self.logger.critical(logging_text + msg, exc_info=True)
            raise LcmException(msg)
        finally:
            if db_nsr_update:
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
2463
    def _deploy_n2vc(self, logging_text, db_nsr, db_vnfr, nslcmop_id, nsr_id, nsi_id, vnfd_id, vdu_id,
                     kdu_name, member_vnf_index, vdu_index, vdu_name, deploy_params, descriptor_config,
                     base_folder, task_instantiation_info, stage):
        """
        Launch one instantiate_N2VC asyncio task per execution environment found in
        descriptor_config and register each task with the LCM task registry.

        For every execution environment (juju charm or helm chart) the method looks up the
        matching entry of db_nsr._admin.deployed.VCA; if not found, a new entry is appended
        both in memory (db_nsr) and in the database before the task is launched.

        :param logging_text: prefix for log messages
        :param db_nsr: nsr database record; its _admin.deployed.VCA list is read and may be extended
        :param db_vnfr: vnfr database record, passed through to instantiate_N2VC
        :param nslcmop_id: current operation id, used to register the created tasks
        :param nsr_id: ns instance id
        :param nsi_id: network slice instance id (passed through)
        :param vnfd_id: id of the descriptor this configuration belongs to
        :param vdu_id: vdu id for vdu-level charms, None otherwise
        :param kdu_name: kdu name for kdu-level charms, None otherwise
        :param member_vnf_index: member-vnf-index, None (falsy) for ns-level charms
        :param vdu_index: vdu count index
        :param vdu_name: vdu name
        :param deploy_params: parameters for the configuration primitives (passed through)
        :param descriptor_config: configuration descriptor; only its "execution-environment-list"
            is processed here, other configuration types are ignored
        :param base_folder: package folder where charm artifacts are stored (passed through)
        :param task_instantiation_info: dict of task -> human description, updated with new tasks
        :param stage: progress-reporting list (passed through to instantiate_N2VC)
        """
        # launch instantiate_N2VC in a asyncio task and register task object
        # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
        # if not found, create one entry and update database
        # fill db_nsr._admin.deployed.VCA.<index>

        self.logger.debug(logging_text + "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id, vdu_id))
        if "execution-environment-list" in descriptor_config:
            ee_list = descriptor_config.get("execution-environment-list", [])
        else:  # other types as script are not supported
            ee_list = []

        for ee_item in ee_list:
            self.logger.debug(logging_text + "_deploy_n2vc ee_item juju={}, helm={}".format(ee_item.get('juju'),
                                                                                            ee_item.get("helm-chart")))
            ee_descriptor_id = ee_item.get("id")
            # Determine vca_type/vca_name from the ee descriptor:
            # juju with charm -> lxc_proxy_charm (or k8s_proxy_charm / native_charm per cloud/proxy)
            # helm-chart -> helm (v2) or helm-v3
            if ee_item.get("juju"):
                vca_name = ee_item['juju'].get('charm')
                vca_type = "lxc_proxy_charm" if ee_item['juju'].get('charm') is not None else "native_charm"
                if ee_item['juju'].get('cloud') == "k8s":
                    vca_type = "k8s_proxy_charm"
                elif ee_item['juju'].get('proxy') is False:
                    vca_type = "native_charm"
            elif ee_item.get("helm-chart"):
                vca_name = ee_item['helm-chart']
                if ee_item.get("helm-version") and ee_item.get("helm-version") == "v2":
                    vca_type = "helm"
                else:
                    vca_type = "helm-v3"
            else:
                self.logger.debug(logging_text + "skipping non juju neither charm configuration")
                continue

            vca_index = -1
            # Look for an already-deployed VCA entry matching this target; the for/else
            # "else" branch runs only when NO entry matched and a new one must be created
            for vca_index, vca_deployed in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
                if not vca_deployed:
                    continue
                if vca_deployed.get("member-vnf-index") == member_vnf_index and \
                        vca_deployed.get("vdu_id") == vdu_id and \
                        vca_deployed.get("kdu_name") == kdu_name and \
                        vca_deployed.get("vdu_count_index", 0) == vdu_index and \
                        vca_deployed.get("ee_descriptor_id") == ee_descriptor_id:
                    break
            else:
                # not found, create one.
                target = "ns" if not member_vnf_index else "vnf/{}".format(member_vnf_index)
                if vdu_id:
                    target += "/vdu/{}/{}".format(vdu_id, vdu_index or 0)
                elif kdu_name:
                    target += "/kdu/{}".format(kdu_name)
                vca_deployed = {
                    "target_element": target,
                    # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
                    "member-vnf-index": member_vnf_index,
                    "vdu_id": vdu_id,
                    "kdu_name": kdu_name,
                    "vdu_count_index": vdu_index,
                    "operational-status": "init",  # TODO revise
                    "detailed-status": "",  # TODO revise
                    "step": "initial-deploy",  # TODO revise
                    "vnfd_id": vnfd_id,
                    "vdu_name": vdu_name,
                    "type": vca_type,
                    "ee_descriptor_id": ee_descriptor_id
                }
                # vca_index ended on the last list position (or -1 if empty); new entry goes after it
                vca_index += 1

                # create VCA and configurationStatus in db
                db_dict = {
                    "_admin.deployed.VCA.{}".format(vca_index): vca_deployed,
                    "configurationStatus.{}".format(vca_index): dict()
                }
                self.update_db_2("nsrs", nsr_id, db_dict)

                # keep the in-memory copy consistent with what was just written to the database
                db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed)

            self.logger.debug("N2VC > NSR_ID > {}".format(nsr_id))
            self.logger.debug("N2VC > DB_NSR > {}".format(db_nsr))
            self.logger.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed))

            # Launch task
            task_n2vc = asyncio.ensure_future(
                self.instantiate_N2VC(
                    logging_text=logging_text,
                    vca_index=vca_index,
                    nsi_id=nsi_id,
                    db_nsr=db_nsr,
                    db_vnfr=db_vnfr,
                    vdu_id=vdu_id,
                    kdu_name=kdu_name,
                    vdu_index=vdu_index,
                    deploy_params=deploy_params,
                    config_descriptor=descriptor_config,
                    base_folder=base_folder,
                    nslcmop_id=nslcmop_id,
                    stage=stage,
                    vca_type=vca_type,
                    vca_name=vca_name,
                    ee_config_descriptor=ee_item
                )
            )
            self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_N2VC-{}".format(vca_index), task_n2vc)
            task_instantiation_info[task_n2vc] = self.task_name_deploy_vca + " {}.{}".format(
                member_vnf_index or "", vdu_id or "")
2570
2571 @staticmethod
2572 def _create_nslcmop(nsr_id, operation, params):
2573 """
2574 Creates a ns-lcm-opp content to be stored at database.
2575 :param nsr_id: internal id of the instance
2576 :param operation: instantiate, terminate, scale, action, ...
2577 :param params: user parameters for the operation
2578 :return: dictionary following SOL005 format
2579 """
2580 # Raise exception if invalid arguments
2581 if not (nsr_id and operation and params):
2582 raise LcmException(
2583 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided")
2584 now = time()
2585 _id = str(uuid4())
2586 nslcmop = {
2587 "id": _id,
2588 "_id": _id,
2589 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
2590 "operationState": "PROCESSING",
2591 "statusEnteredTime": now,
2592 "nsInstanceId": nsr_id,
2593 "lcmOperationType": operation,
2594 "startTime": now,
2595 "isAutomaticInvocation": False,
2596 "operationParams": params,
2597 "isCancelPending": False,
2598 "links": {
2599 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
2600 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
2601 }
2602 }
2603 return nslcmop
2604
2605 def _format_additional_params(self, params):
2606 params = params or {}
2607 for key, value in params.items():
2608 if str(value).startswith("!!yaml "):
2609 params[key] = yaml.safe_load(value[7:])
2610 return params
2611
2612 def _get_terminate_primitive_params(self, seq, vnf_index):
2613 primitive = seq.get('name')
2614 primitive_params = {}
2615 params = {
2616 "member_vnf_index": vnf_index,
2617 "primitive": primitive,
2618 "primitive_params": primitive_params,
2619 }
2620 desc_params = {}
2621 return self._map_primitive_params(seq, params, desc_params)
2622
2623 # sub-operations
2624
2625 def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
2626 op = deep_get(db_nslcmop, ('_admin', 'operations'), [])[op_index]
2627 if op.get('operationState') == 'COMPLETED':
2628 # b. Skip sub-operation
2629 # _ns_execute_primitive() or RO.create_action() will NOT be executed
2630 return self.SUBOPERATION_STATUS_SKIP
2631 else:
2632 # c. retry executing sub-operation
2633 # The sub-operation exists, and operationState != 'COMPLETED'
2634 # Update operationState = 'PROCESSING' to indicate a retry.
2635 operationState = 'PROCESSING'
2636 detailed_status = 'In progress'
2637 self._update_suboperation_status(
2638 db_nslcmop, op_index, operationState, detailed_status)
2639 # Return the sub-operation index
2640 # _ns_execute_primitive() or RO.create_action() will be called from scale()
2641 # with arguments extracted from the sub-operation
2642 return op_index
2643
2644 # Find a sub-operation where all keys in a matching dictionary must match
2645 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
2646 def _find_suboperation(self, db_nslcmop, match):
2647 if db_nslcmop and match:
2648 op_list = db_nslcmop.get('_admin', {}).get('operations', [])
2649 for i, op in enumerate(op_list):
2650 if all(op.get(k) == match[k] for k in match):
2651 return i
2652 return self.SUBOPERATION_STATUS_NOT_FOUND
2653
2654 # Update status for a sub-operation given its index
2655 def _update_suboperation_status(self, db_nslcmop, op_index, operationState, detailed_status):
2656 # Update DB for HA tasks
2657 q_filter = {'_id': db_nslcmop['_id']}
2658 update_dict = {'_admin.operations.{}.operationState'.format(op_index): operationState,
2659 '_admin.operations.{}.detailed-status'.format(op_index): detailed_status}
2660 self.db.set_one("nslcmops",
2661 q_filter=q_filter,
2662 update_dict=update_dict,
2663 fail_on_empty=False)
2664
2665 # Add sub-operation, return the index of the added sub-operation
2666 # Optionally, set operationState, detailed-status, and operationType
2667 # Status and type are currently set for 'scale' sub-operations:
2668 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
2669 # 'detailed-status' : status message
2670 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
2671 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
2672 def _add_suboperation(self, db_nslcmop, vnf_index, vdu_id, vdu_count_index, vdu_name, primitive,
2673 mapped_primitive_params, operationState=None, detailed_status=None, operationType=None,
2674 RO_nsr_id=None, RO_scaling_info=None):
2675 if not db_nslcmop:
2676 return self.SUBOPERATION_STATUS_NOT_FOUND
2677 # Get the "_admin.operations" list, if it exists
2678 db_nslcmop_admin = db_nslcmop.get('_admin', {})
2679 op_list = db_nslcmop_admin.get('operations')
2680 # Create or append to the "_admin.operations" list
2681 new_op = {'member_vnf_index': vnf_index,
2682 'vdu_id': vdu_id,
2683 'vdu_count_index': vdu_count_index,
2684 'primitive': primitive,
2685 'primitive_params': mapped_primitive_params}
2686 if operationState:
2687 new_op['operationState'] = operationState
2688 if detailed_status:
2689 new_op['detailed-status'] = detailed_status
2690 if operationType:
2691 new_op['lcmOperationType'] = operationType
2692 if RO_nsr_id:
2693 new_op['RO_nsr_id'] = RO_nsr_id
2694 if RO_scaling_info:
2695 new_op['RO_scaling_info'] = RO_scaling_info
2696 if not op_list:
2697 # No existing operations, create key 'operations' with current operation as first list element
2698 db_nslcmop_admin.update({'operations': [new_op]})
2699 op_list = db_nslcmop_admin.get('operations')
2700 else:
2701 # Existing operations, append operation to list
2702 op_list.append(new_op)
2703
2704 db_nslcmop_update = {'_admin.operations': op_list}
2705 self.update_db_2("nslcmops", db_nslcmop['_id'], db_nslcmop_update)
2706 op_index = len(op_list) - 1
2707 return op_index
2708
2709 # Helper methods for scale() sub-operations
2710
2711 # pre-scale/post-scale:
2712 # Check for 3 different cases:
2713 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
2714 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
2715 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
2716 def _check_or_add_scale_suboperation(self, db_nslcmop, vnf_index, vnf_config_primitive, primitive_params,
2717 operationType, RO_nsr_id=None, RO_scaling_info=None):
2718 # Find this sub-operation
2719 if RO_nsr_id and RO_scaling_info:
2720 operationType = 'SCALE-RO'
2721 match = {
2722 'member_vnf_index': vnf_index,
2723 'RO_nsr_id': RO_nsr_id,
2724 'RO_scaling_info': RO_scaling_info,
2725 }
2726 else:
2727 match = {
2728 'member_vnf_index': vnf_index,
2729 'primitive': vnf_config_primitive,
2730 'primitive_params': primitive_params,
2731 'lcmOperationType': operationType
2732 }
2733 op_index = self._find_suboperation(db_nslcmop, match)
2734 if op_index == self.SUBOPERATION_STATUS_NOT_FOUND:
2735 # a. New sub-operation
2736 # The sub-operation does not exist, add it.
2737 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
2738 # The following parameters are set to None for all kind of scaling:
2739 vdu_id = None
2740 vdu_count_index = None
2741 vdu_name = None
2742 if RO_nsr_id and RO_scaling_info:
2743 vnf_config_primitive = None
2744 primitive_params = None
2745 else:
2746 RO_nsr_id = None
2747 RO_scaling_info = None
2748 # Initial status for sub-operation
2749 operationState = 'PROCESSING'
2750 detailed_status = 'In progress'
2751 # Add sub-operation for pre/post-scaling (zero or more operations)
2752 self._add_suboperation(db_nslcmop,
2753 vnf_index,
2754 vdu_id,
2755 vdu_count_index,
2756 vdu_name,
2757 vnf_config_primitive,
2758 primitive_params,
2759 operationState,
2760 detailed_status,
2761 operationType,
2762 RO_nsr_id,
2763 RO_scaling_info)
2764 return self.SUBOPERATION_STATUS_NEW
2765 else:
2766 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
2767 # or op_index (operationState != 'COMPLETED')
2768 return self._retry_or_skip_suboperation(db_nslcmop, op_index)
2769
2770 # Function to return execution_environment id
2771
2772 def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list):
2773 # TODO vdu_index_count
2774 for vca in vca_deployed_list:
2775 if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id:
2776 return vca["ee_id"]
2777
    async def destroy_N2VC(self, logging_text, db_nslcmop, vca_deployed, config_descriptor,
                           vca_index, destroy_ee=True, exec_primitives=True, scaling_in=False):
        """
        Execute the terminate primitives of one deployed VCA and optionally destroy its
        execution environment.
        :param logging_text: prefix for log messages
        :param db_nslcmop: nslcmop database record, used to register terminate sub-operations
        :param vca_deployed: Dictionary of deployment info at db_nsr._admin.depoloyed.VCA.<INDEX>
        :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
        :param vca_index: index in the database _admin.deployed.VCA
        :param destroy_ee: False to do not destroy, because it will be destroyed all of then at once
        :param exec_primitives: False to do not execute terminate primitives, because the config is not completed or has
            not executed properly
        :param scaling_in: True destroys the application, False destroys the model
        :return: None or exception
        """

        self.logger.debug(
            logging_text + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
                vca_index, vca_deployed, config_descriptor, destroy_ee
            )
        )

        # default for entries recorded before "type" was stored
        vca_type = vca_deployed.get("type", "lxc_proxy_charm")

        # execute terminate_primitives
        if exec_primitives:
            # primitives sorted for this execution environment (by ee_descriptor_id)
            terminate_primitives = get_ee_sorted_terminate_config_primitive_list(
                config_descriptor.get("terminate-config-primitive"), vca_deployed.get("ee_descriptor_id"))
            vdu_id = vca_deployed.get("vdu_id")
            vdu_count_index = vca_deployed.get("vdu_count_index")
            vdu_name = vca_deployed.get("vdu_name")
            vnf_index = vca_deployed.get("member-vnf-index")
            # only run them when the VCA was flagged as needing termination
            if terminate_primitives and vca_deployed.get("needed_terminate"):
                for seq in terminate_primitives:
                    # For each sequence in list, get primitive and call _ns_execute_primitive()
                    step = "Calling terminate action for vnf_member_index={} primitive={}".format(
                        vnf_index, seq.get("name"))
                    self.logger.debug(logging_text + step)
                    # Create the primitive for each sequence, i.e. "primitive": "touch"
                    primitive = seq.get('name')
                    mapped_primitive_params = self._get_terminate_primitive_params(seq, vnf_index)

                    # Add sub-operation
                    self._add_suboperation(db_nslcmop,
                                           vnf_index,
                                           vdu_id,
                                           vdu_count_index,
                                           vdu_name,
                                           primitive,
                                           mapped_primitive_params)
                    # Sub-operations: Call _ns_execute_primitive() instead of action()
                    try:
                        result, result_detail = await self._ns_execute_primitive(vca_deployed["ee_id"], primitive,
                                                                                 mapped_primitive_params,
                                                                                 vca_type=vca_type)
                    except LcmException:
                        # this happens when VCA is not deployed. In this case it is not needed to terminate
                        continue
                    result_ok = ['COMPLETED', 'PARTIALLY_COMPLETED']
                    if result not in result_ok:
                        raise LcmException("terminate_primitive {} for vnf_member_index={} fails with "
                                           "error {}".format(seq.get("name"), vnf_index, result_detail))
                # set that this VCA do not need terminated
                db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(vca_index)
                self.update_db_2("nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False})

        # remove the prometheus scrape jobs this VCA registered, if any
        if vca_deployed.get("prometheus_jobs") and self.prometheus:
            await self.prometheus.update(remove_jobs=vca_deployed["prometheus_jobs"])

        if destroy_ee:
            await self.vca_map[vca_type].delete_execution_environment(vca_deployed["ee_id"], scaling_in=scaling_in)
2849
2850 async def _delete_all_N2VC(self, db_nsr: dict):
2851 self._write_all_config_status(db_nsr=db_nsr, status='TERMINATING')
2852 namespace = "." + db_nsr["_id"]
2853 try:
2854 await self.n2vc.delete_namespace(namespace=namespace, total_timeout=self.timeout_charm_delete)
2855 except N2VCNotFound: # already deleted. Skip
2856 pass
2857 self._write_all_config_status(db_nsr=db_nsr, status='DELETED')
2858
    async def _terminate_RO(self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage):
        """
        Terminates a deployment from RO
        :param logging_text: prefix for log messages
        :param nsr_deployed: db_nsr._admin.deployed
        :param nsr_id: ns instance id
        :param nslcmop_id: current operation id, used to write progress on db_nslcmop
        :param stage: list of string with the content to write on db_nslcmop.detailed-status.
        this method will update only the index 2, but it will write on database the concatenated content of the list
        :return: None. Raises LcmException when any of the RO deletions failed
        """
        db_nsr_update = {}
        failed_detail = []  # accumulates error strings; a final LcmException is raised if non-empty
        ro_nsr_id = ro_delete_action = None
        if nsr_deployed and nsr_deployed.get("RO"):
            ro_nsr_id = nsr_deployed["RO"].get("nsr_id")
            # a delete action id may remain from a previous interrupted termination
            ro_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
        try:
            if ro_nsr_id:
                stage[2] = "Deleting ns from VIM."
                db_nsr_update["detailed-status"] = " ".join(stage)
                self._write_op_status(nslcmop_id, stage)
                self.logger.debug(logging_text + stage[2])
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_op_status(nslcmop_id, stage)  # NOTE(review): written twice; looks redundant but harmless
                desc = await self.RO.delete("ns", ro_nsr_id)
                ro_delete_action = desc["action_id"]
                db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = ro_delete_action
                db_nsr_update["_admin.deployed.RO.nsr_id"] = None
                db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
            if ro_delete_action:
                # wait until NS is deleted from VIM
                stage[2] = "Waiting ns deleted from VIM."
                detailed_status_old = None
                self.logger.debug(logging_text + stage[2] + " RO_id={} ro_delete_action={}".format(ro_nsr_id,
                                                                                                   ro_delete_action))
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_op_status(nslcmop_id, stage)

                delete_timeout = 20 * 60  # 20 minutes
                # poll RO every 5 seconds until the delete action finishes or times out
                while delete_timeout > 0:
                    desc = await self.RO.show(
                        "ns",
                        item_id_name=ro_nsr_id,
                        extra_item="action",
                        extra_item_id=ro_delete_action)

                    # deploymentStatus
                    self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)

                    ns_status, ns_status_info = self.RO.check_action_status(desc)
                    if ns_status == "ERROR":
                        raise ROclient.ROClientException(ns_status_info)
                    elif ns_status == "BUILD":
                        stage[2] = "Deleting from VIM {}".format(ns_status_info)
                    elif ns_status == "ACTIVE":
                        # deletion finished at VIM side
                        db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
                        db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                        break
                    else:
                        assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
                    # only write the status on database when it changes, to avoid needless updates
                    if stage[2] != detailed_status_old:
                        detailed_status_old = stage[2]
                        db_nsr_update["detailed-status"] = " ".join(stage)
                        self._write_op_status(nslcmop_id, stage)
                        self.update_db_2("nsrs", nsr_id, db_nsr_update)
                    await asyncio.sleep(5, loop=self.loop)
                    delete_timeout -= 5
                else:  # delete_timeout <= 0:
                    # while/else: reached only when the loop exhausted without break
                    raise ROclient.ROClientException("Timeout waiting ns deleted from VIM")

        except Exception as e:
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            if isinstance(e, ROclient.ROClientException) and e.http_code == 404:  # not found
                # "not found" is treated as already deleted, not as a failure
                db_nsr_update["_admin.deployed.RO.nsr_id"] = None
                db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
                self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(ro_nsr_id))
            elif isinstance(e, ROclient.ROClientException) and e.http_code == 409:  # conflict
                failed_detail.append("delete conflict: {}".format(e))
                self.logger.debug(logging_text + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id, e))
            else:
                failed_detail.append("delete error: {}".format(e))
                self.logger.error(logging_text + "RO_ns_id={} delete error: {}".format(ro_nsr_id, e))

        # Delete nsd (only attempted when the ns deletion above succeeded)
        if not failed_detail and deep_get(nsr_deployed, ("RO", "nsd_id")):
            ro_nsd_id = nsr_deployed["RO"]["nsd_id"]
            try:
                stage[2] = "Deleting nsd from RO."
                db_nsr_update["detailed-status"] = " ".join(stage)
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_op_status(nslcmop_id, stage)
                await self.RO.delete("nsd", ro_nsd_id)
                self.logger.debug(logging_text + "ro_nsd_id={} deleted".format(ro_nsd_id))
                db_nsr_update["_admin.deployed.RO.nsd_id"] = None
            except Exception as e:
                if isinstance(e, ROclient.ROClientException) and e.http_code == 404:  # not found
                    db_nsr_update["_admin.deployed.RO.nsd_id"] = None
                    self.logger.debug(logging_text + "ro_nsd_id={} already deleted".format(ro_nsd_id))
                elif isinstance(e, ROclient.ROClientException) and e.http_code == 409:  # conflict
                    failed_detail.append("ro_nsd_id={} delete conflict: {}".format(ro_nsd_id, e))
                    self.logger.debug(logging_text + failed_detail[-1])
                else:
                    failed_detail.append("ro_nsd_id={} delete error: {}".format(ro_nsd_id, e))
                    self.logger.error(logging_text + failed_detail[-1])

        # Delete each vnfd deployed at RO (only attempted when everything above succeeded)
        if not failed_detail and deep_get(nsr_deployed, ("RO", "vnfd")):
            for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
                if not vnf_deployed or not vnf_deployed["id"]:
                    continue
                try:
                    ro_vnfd_id = vnf_deployed["id"]
                    stage[2] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
                        vnf_deployed["member-vnf-index"], ro_vnfd_id)
                    db_nsr_update["detailed-status"] = " ".join(stage)
                    self.update_db_2("nsrs", nsr_id, db_nsr_update)
                    self._write_op_status(nslcmop_id, stage)
                    await self.RO.delete("vnfd", ro_vnfd_id)
                    self.logger.debug(logging_text + "ro_vnfd_id={} deleted".format(ro_vnfd_id))
                    db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
                except Exception as e:
                    if isinstance(e, ROclient.ROClientException) and e.http_code == 404:  # not found
                        db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
                        self.logger.debug(logging_text + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id))
                    elif isinstance(e, ROclient.ROClientException) and e.http_code == 409:  # conflict
                        failed_detail.append("ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id, e))
                        self.logger.debug(logging_text + failed_detail[-1])
                    else:
                        failed_detail.append("ro_vnfd_id={} delete error: {}".format(ro_vnfd_id, e))
                        self.logger.error(logging_text + failed_detail[-1])

        # final stage text and database update; raise if anything failed
        if failed_detail:
            stage[2] = "Error deleting from VIM"
        else:
            stage[2] = "Deleted from VIM"
        db_nsr_update["detailed-status"] = " ".join(stage)
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        self._write_op_status(nslcmop_id, stage)

        if failed_detail:
            raise LcmException("; ".join(failed_detail))
3001
3002 async def terminate(self, nsr_id, nslcmop_id):
3003 # Try to lock HA task here
3004 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
3005 if not task_is_locked_by_me:
3006 return
3007
3008 logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
3009 self.logger.debug(logging_text + "Enter")
3010 timeout_ns_terminate = self.timeout_ns_terminate
3011 db_nsr = None
3012 db_nslcmop = None
3013 operation_params = None
3014 exc = None
3015 error_list = [] # annotates all failed error messages
3016 db_nslcmop_update = {}
3017 autoremove = False # autoremove after terminated
3018 tasks_dict_info = {}
3019 db_nsr_update = {}
3020 stage = ["Stage 1/3: Preparing task.", "Waiting for previous operations to terminate.", ""]
3021 # ^ contains [stage, step, VIM-status]
3022 try:
3023 # wait for any previous tasks in process
3024 await self.lcm_tasks.waitfor_related_HA("ns", 'nslcmops', nslcmop_id)
3025
3026 stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
3027 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
3028 operation_params = db_nslcmop.get("operationParams") or {}
3029 if operation_params.get("timeout_ns_terminate"):
3030 timeout_ns_terminate = operation_params["timeout_ns_terminate"]
3031 stage[1] = "Getting nsr={} from db.".format(nsr_id)
3032 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3033
3034 db_nsr_update["operational-status"] = "terminating"
3035 db_nsr_update["config-status"] = "terminating"
3036 self._write_ns_status(
3037 nsr_id=nsr_id,
3038 ns_state="TERMINATING",
3039 current_operation="TERMINATING",
3040 current_operation_id=nslcmop_id,
3041 other_update=db_nsr_update
3042 )
3043 self._write_op_status(
3044 op_id=nslcmop_id,
3045 queuePosition=0,
3046 stage=stage
3047 )
3048 nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed")) or {}
3049 if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
3050 return
3051
3052 stage[1] = "Getting vnf descriptors from db."
3053 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
3054 db_vnfds_from_id = {}
3055 db_vnfds_from_member_index = {}
3056 # Loop over VNFRs
3057 for vnfr in db_vnfrs_list:
3058 vnfd_id = vnfr["vnfd-id"]
3059 if vnfd_id not in db_vnfds_from_id:
3060 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
3061 db_vnfds_from_id[vnfd_id] = vnfd
3062 db_vnfds_from_member_index[vnfr["member-vnf-index-ref"]] = db_vnfds_from_id[vnfd_id]
3063
3064 # Destroy individual execution environments when there are terminating primitives.
3065 # Rest of EE will be deleted at once
3066 # TODO - check before calling _destroy_N2VC
3067 # if not operation_params.get("skip_terminate_primitives"):#
3068 # or not vca.get("needed_terminate"):
3069 stage[0] = "Stage 2/3 execute terminating primitives."
3070 self.logger.debug(logging_text + stage[0])
3071 stage[1] = "Looking execution environment that needs terminate."
3072 self.logger.debug(logging_text + stage[1])
3073
3074 for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")):
3075 config_descriptor = None
3076 if not vca or not vca.get("ee_id"):
3077 continue
3078 if not vca.get("member-vnf-index"):
3079 # ns
3080 config_descriptor = db_nsr.get("ns-configuration")
3081 elif vca.get("vdu_id"):
3082 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
3083 config_descriptor = get_configuration(db_vnfd, vca.get("vdu_id"))
3084 elif vca.get("kdu_name"):
3085 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
3086 config_descriptor = get_configuration(db_vnfd, vca.get("kdu_name"))
3087 else:
3088 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
3089 config_descriptor = get_configuration(db_vnfd, db_vnfd["id"])
3090 vca_type = vca.get("type")
3091 exec_terminate_primitives = (not operation_params.get("skip_terminate_primitives") and
3092 vca.get("needed_terminate"))
3093 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
3094 # pending native charms
3095 destroy_ee = True if vca_type in ("helm", "helm-v3", "native_charm") else False
3096 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
3097 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
3098 task = asyncio.ensure_future(
3099 self.destroy_N2VC(logging_text, db_nslcmop, vca, config_descriptor, vca_index,
3100 destroy_ee, exec_terminate_primitives))
3101 tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))
3102
3103 # wait for pending tasks of terminate primitives
3104 if tasks_dict_info:
3105 self.logger.debug(logging_text + 'Waiting for tasks {}'.format(list(tasks_dict_info.keys())))
3106 error_list = await self._wait_for_tasks(logging_text, tasks_dict_info,
3107 min(self.timeout_charm_delete, timeout_ns_terminate),
3108 stage, nslcmop_id)
3109 tasks_dict_info.clear()
3110 if error_list:
3111 return # raise LcmException("; ".join(error_list))
3112
3113 # remove All execution environments at once
3114 stage[0] = "Stage 3/3 delete all."
3115
3116 if nsr_deployed.get("VCA"):
3117 stage[1] = "Deleting all execution environments."
3118 self.logger.debug(logging_text + stage[1])
3119 task_delete_ee = asyncio.ensure_future(asyncio.wait_for(self._delete_all_N2VC(db_nsr=db_nsr),
3120 timeout=self.timeout_charm_delete))
3121 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
3122 tasks_dict_info[task_delete_ee] = "Terminating all VCA"
3123
3124 # Delete from k8scluster
3125 stage[1] = "Deleting KDUs."
3126 self.logger.debug(logging_text + stage[1])
3127 # print(nsr_deployed)
3128 for kdu in get_iterable(nsr_deployed, "K8s"):
3129 if not kdu or not kdu.get("kdu-instance"):
3130 continue
3131 kdu_instance = kdu.get("kdu-instance")
3132 if kdu.get("k8scluster-type") in self.k8scluster_map:
3133 task_delete_kdu_instance = asyncio.ensure_future(
3134 self.k8scluster_map[kdu["k8scluster-type"]].uninstall(
3135 cluster_uuid=kdu.get("k8scluster-uuid"),
3136 kdu_instance=kdu_instance))
3137 else:
3138 self.logger.error(logging_text + "Unknown k8s deployment type {}".
3139 format(kdu.get("k8scluster-type")))
3140 continue
3141 tasks_dict_info[task_delete_kdu_instance] = "Terminating KDU '{}'".format(kdu.get("kdu-name"))
3142
3143 # remove from RO
3144 stage[1] = "Deleting ns from VIM."
3145 if self.ng_ro:
3146 task_delete_ro = asyncio.ensure_future(
3147 self._terminate_ng_ro(logging_text, nsr_deployed, nsr_id, nslcmop_id, stage))
3148 else:
3149 task_delete_ro = asyncio.ensure_future(
3150 self._terminate_RO(logging_text, nsr_deployed, nsr_id, nslcmop_id, stage))
3151 tasks_dict_info[task_delete_ro] = "Removing deployment from VIM"
3152
3153 # rest of staff will be done at finally
3154
3155 except (ROclient.ROClientException, DbException, LcmException, N2VCException) as e:
3156 self.logger.error(logging_text + "Exit Exception {}".format(e))
3157 exc = e
3158 except asyncio.CancelledError:
3159 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(stage[1]))
3160 exc = "Operation was cancelled"
3161 except Exception as e:
3162 exc = traceback.format_exc()
3163 self.logger.critical(logging_text + "Exit Exception while '{}': {}".format(stage[1], e), exc_info=True)
3164 finally:
3165 if exc:
3166 error_list.append(str(exc))
3167 try:
3168 # wait for pending tasks
3169 if tasks_dict_info:
3170 stage[1] = "Waiting for terminate pending tasks."
3171 self.logger.debug(logging_text + stage[1])
3172 error_list += await self._wait_for_tasks(logging_text, tasks_dict_info, timeout_ns_terminate,
3173 stage, nslcmop_id)
3174 stage[1] = stage[2] = ""
3175 except asyncio.CancelledError:
3176 error_list.append("Cancelled")
3177 # TODO cancell all tasks
3178 except Exception as exc:
3179 error_list.append(str(exc))
3180 # update status at database
3181 if error_list:
3182 error_detail = "; ".join(error_list)
3183 # self.logger.error(logging_text + error_detail)
3184 error_description_nslcmop = '{} Detail: {}'.format(stage[0], error_detail)
3185 error_description_nsr = 'Operation: TERMINATING.{}, {}.'.format(nslcmop_id, stage[0])
3186
3187 db_nsr_update["operational-status"] = "failed"
3188 db_nsr_update["detailed-status"] = error_description_nsr + " Detail: " + error_detail
3189 db_nslcmop_update["detailed-status"] = error_detail
3190 nslcmop_operation_state = "FAILED"
3191 ns_state = "BROKEN"
3192 else:
3193 error_detail = None
3194 error_description_nsr = error_description_nslcmop = None
3195 ns_state = "NOT_INSTANTIATED"
3196 db_nsr_update["operational-status"] = "terminated"
3197 db_nsr_update["detailed-status"] = "Done"
3198 db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
3199 db_nslcmop_update["detailed-status"] = "Done"
3200 nslcmop_operation_state = "COMPLETED"
3201
3202 if db_nsr:
3203 self._write_ns_status(
3204 nsr_id=nsr_id,
3205 ns_state=ns_state,
3206 current_operation="IDLE",
3207 current_operation_id=None,
3208 error_description=error_description_nsr,
3209 error_detail=error_detail,
3210 other_update=db_nsr_update
3211 )
3212 self._write_op_status(
3213 op_id=nslcmop_id,
3214 stage="",
3215 error_message=error_description_nslcmop,
3216 operation_state=nslcmop_operation_state,
3217 other_update=db_nslcmop_update,
3218 )
3219 if ns_state == "NOT_INSTANTIATED":
3220 try:
3221 self.db.set_list("vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "NOT_INSTANTIATED"})
3222 except DbException as e:
3223 self.logger.warn(logging_text + 'Error writing VNFR status for nsr-id-ref: {} -> {}'.
3224 format(nsr_id, e))
3225 if operation_params:
3226 autoremove = operation_params.get("autoremove", False)
3227 if nslcmop_operation_state:
3228 try:
3229 await self.msg.aiowrite("ns", "terminated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
3230 "operationState": nslcmop_operation_state,
3231 "autoremove": autoremove},
3232 loop=self.loop)
3233 except Exception as e:
3234 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
3235
3236 self.logger.debug(logging_text + "Exit")
3237 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
3238
    async def _wait_for_tasks(self, logging_text, created_tasks_info, timeout, stage, nslcmop_id, nsr_id=None):
        """
        Wait for a set of already-launched asyncio tasks to finish, reporting progress and
        collecting per-task errors.
        :param logging_text: prefix used for every log line
        :param created_tasks_info: dict mapping each asyncio task -> human readable description
        :param timeout: global timeout in seconds for the whole set of tasks
        :param stage: 3-item list of stage texts; stage[1] is updated with progress and errors
        :param nslcmop_id: operation id used to persist progress at the nslcmops record
        :param nsr_id: if provided, errors are also persisted at the nsrs record
        :return: list of error detail strings (empty if every task finished successfully)
        """
        time_start = time()
        error_detail_list = []
        error_list = []
        pending_tasks = list(created_tasks_info.keys())
        num_tasks = len(pending_tasks)
        num_done = 0
        stage[1] = "{}/{}.".format(num_done, num_tasks)
        self._write_op_status(nslcmop_id, stage)
        while pending_tasks:
            new_error = None
            # remaining share of the global timeout for this wait iteration
            _timeout = timeout + time_start - time()
            done, pending_tasks = await asyncio.wait(pending_tasks, timeout=_timeout,
                                                     return_when=asyncio.FIRST_COMPLETED)
            num_done += len(done)
            if not done:  # Timeout
                # global timeout expired: mark every still-pending task as timed out and stop waiting
                for task in pending_tasks:
                    new_error = created_tasks_info[task] + ": Timeout"
                    error_detail_list.append(new_error)
                    error_list.append(new_error)
                break
            for task in done:
                if task.cancelled():
                    exc = "Cancelled"
                else:
                    exc = task.exception()
                if exc:
                    if isinstance(exc, asyncio.TimeoutError):
                        exc = "Timeout"
                    new_error = created_tasks_info[task] + ": {}".format(exc)
                    error_list.append(created_tasks_info[task])
                    error_detail_list.append(new_error)
                    # known/expected exception types are logged without traceback
                    if isinstance(exc, (str, DbException, N2VCException, ROclient.ROClientException, LcmException,
                                        K8sException, NgRoException)):
                        self.logger.error(logging_text + new_error)
                    else:
                        # unexpected exception type: log the full traceback for debugging
                        exc_traceback = "".join(traceback.format_exception(None, exc, exc.__traceback__))
                        self.logger.error(logging_text + created_tasks_info[task] + " " + exc_traceback)
                else:
                    self.logger.debug(logging_text + created_tasks_info[task] + ": Done")
            stage[1] = "{}/{}.".format(num_done, num_tasks)
            if new_error:
                stage[1] += " Errors: " + ". ".join(error_detail_list) + "."
                if nsr_id:  # update also nsr
                    self.update_db_2("nsrs", nsr_id, {"errorDescription": "Error at: " + ", ".join(error_list),
                                                      "errorDetail": ". ".join(error_detail_list)})
            self._write_op_status(nslcmop_id, stage)
        return error_detail_list
3287
3288 @staticmethod
3289 def _map_primitive_params(primitive_desc, params, instantiation_params):
3290 """
3291 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
3292 The default-value is used. If it is between < > it look for a value at instantiation_params
3293 :param primitive_desc: portion of VNFD/NSD that describes primitive
3294 :param params: Params provided by user
3295 :param instantiation_params: Instantiation params provided by user
3296 :return: a dictionary with the calculated params
3297 """
3298 calculated_params = {}
3299 for parameter in primitive_desc.get("parameter", ()):
3300 param_name = parameter["name"]
3301 if param_name in params:
3302 calculated_params[param_name] = params[param_name]
3303 elif "default-value" in parameter or "value" in parameter:
3304 if "value" in parameter:
3305 calculated_params[param_name] = parameter["value"]
3306 else:
3307 calculated_params[param_name] = parameter["default-value"]
3308 if isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("<") \
3309 and calculated_params[param_name].endswith(">"):
3310 if calculated_params[param_name][1:-1] in instantiation_params:
3311 calculated_params[param_name] = instantiation_params[calculated_params[param_name][1:-1]]
3312 else:
3313 raise LcmException("Parameter {} needed to execute primitive {} not provided".
3314 format(calculated_params[param_name], primitive_desc["name"]))
3315 else:
3316 raise LcmException("Parameter {} needed to execute primitive {} not provided".
3317 format(param_name, primitive_desc["name"]))
3318
3319 if isinstance(calculated_params[param_name], (dict, list, tuple)):
3320 calculated_params[param_name] = yaml.safe_dump(calculated_params[param_name],
3321 default_flow_style=True, width=256)
3322 elif isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("!!yaml "):
3323 calculated_params[param_name] = calculated_params[param_name][7:]
3324 if parameter.get("data-type") == "INTEGER":
3325 try:
3326 calculated_params[param_name] = int(calculated_params[param_name])
3327 except ValueError: # error converting string to int
3328 raise LcmException(
3329 "Parameter {} of primitive {} must be integer".format(param_name, primitive_desc["name"]))
3330 elif parameter.get("data-type") == "BOOLEAN":
3331 calculated_params[param_name] = not ((str(calculated_params[param_name])).lower() == 'false')
3332
3333 # add always ns_config_info if primitive name is config
3334 if primitive_desc["name"] == "config":
3335 if "ns_config_info" in instantiation_params:
3336 calculated_params["ns_config_info"] = instantiation_params["ns_config_info"]
3337 return calculated_params
3338
3339 def _look_for_deployed_vca(self, deployed_vca, member_vnf_index, vdu_id, vdu_count_index, kdu_name=None,
3340 ee_descriptor_id=None):
3341 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
3342 for vca in deployed_vca:
3343 if not vca:
3344 continue
3345 if member_vnf_index != vca["member-vnf-index"] or vdu_id != vca["vdu_id"]:
3346 continue
3347 if vdu_count_index is not None and vdu_count_index != vca["vdu_count_index"]:
3348 continue
3349 if kdu_name and kdu_name != vca["kdu_name"]:
3350 continue
3351 if ee_descriptor_id and ee_descriptor_id != vca["ee_descriptor_id"]:
3352 continue
3353 break
3354 else:
3355 # vca_deployed not found
3356 raise LcmException("charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
3357 " is not deployed".format(member_vnf_index, vdu_id, vdu_count_index, kdu_name,
3358 ee_descriptor_id))
3359 # get ee_id
3360 ee_id = vca.get("ee_id")
3361 vca_type = vca.get("type", "lxc_proxy_charm") # default value for backward compatibility - proxy charm
3362 if not ee_id:
3363 raise LcmException("charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
3364 "execution environment"
3365 .format(member_vnf_index, vdu_id, kdu_name, vdu_count_index))
3366 return ee_id, vca_type
3367
3368 async def _ns_execute_primitive(self, ee_id, primitive, primitive_params, retries=0, retries_interval=30,
3369 timeout=None, vca_type=None, db_dict=None) -> (str, str):
3370 try:
3371 if primitive == "config":
3372 primitive_params = {"params": primitive_params}
3373
3374 vca_type = vca_type or "lxc_proxy_charm"
3375
3376 while retries >= 0:
3377 try:
3378 output = await asyncio.wait_for(
3379 self.vca_map[vca_type].exec_primitive(
3380 ee_id=ee_id,
3381 primitive_name=primitive,
3382 params_dict=primitive_params,
3383 progress_timeout=self.timeout_progress_primitive,
3384 total_timeout=self.timeout_primitive,
3385 db_dict=db_dict),
3386 timeout=timeout or self.timeout_primitive)
3387 # execution was OK
3388 break
3389 except asyncio.CancelledError:
3390 raise
3391 except Exception as e: # asyncio.TimeoutError
3392 if isinstance(e, asyncio.TimeoutError):
3393 e = "Timeout"
3394 retries -= 1
3395 if retries >= 0:
3396 self.logger.debug('Error executing action {} on {} -> {}'.format(primitive, ee_id, e))
3397 # wait and retry
3398 await asyncio.sleep(retries_interval, loop=self.loop)
3399 else:
3400 return 'FAILED', str(e)
3401
3402 return 'COMPLETED', output
3403
3404 except (LcmException, asyncio.CancelledError):
3405 raise
3406 except Exception as e:
3407 return 'FAIL', 'Error executing action {}: {}'.format(primitive, e)
3408
3409 async def vca_status_refresh(self, nsr_id, nslcmop_id):
3410 """
3411 Updating the vca_status with latest juju information in nsrs record
3412 :param: nsr_id: Id of the nsr
3413 :param: nslcmop_id: Id of the nslcmop
3414 :return: None
3415 """
3416
3417 self.logger.debug("Task ns={} action={} Enter".format(nsr_id, nslcmop_id))
3418 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3419 if db_nsr['_admin']['deployed']['K8s']:
3420 for k8s_index, k8s in enumerate(db_nsr['_admin']['deployed']['K8s']):
3421 cluster_uuid, kdu_instance = k8s["k8scluster-uuid"], k8s["kdu-instance"]
3422 await self._on_update_k8s_db(cluster_uuid, kdu_instance, filter={'_id': nsr_id})
3423 else:
3424 for vca_index, _ in enumerate(db_nsr['_admin']['deployed']['VCA']):
3425 table, filter = "nsrs", {"_id": nsr_id}
3426 path = "_admin.deployed.VCA.{}.".format(vca_index)
3427 await self._on_update_n2vc_db(table, filter, path, {})
3428
3429 self.logger.debug("Task ns={} action={} Exit".format(nsr_id, nslcmop_id))
3430 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_vca_status_refresh")
3431
    async def action(self, nsr_id, nslcmop_id):
        """
        Execute a primitive (action) on a deployed NS, as described at the nslcmop
        operationParams (member_vnf_index/vdu_id/kdu_name select the target; "primitive" and
        "primitive_params" select what to run). KDU primitives ("upgrade", "rollback", "status"
        or actions declared at the KDU configuration) run through the k8s connectors; any other
        primitive runs on the deployed VCA execution environment. The result is persisted at the
        nslcmops/nsrs records and an "actioned" kafka message is published at the end.
        :param nsr_id: NS record _id
        :param nslcmop_id: NS LCM operation record _id
        :return: (nslcmop_operation_state, detailed_status), or None when the HA lock is not ours
        """
        # Try to lock HA task here
        task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
        if not task_is_locked_by_me:
            return

        logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
        self.logger.debug(logging_text + "Enter")
        # get all needed from database
        db_nsr = None
        db_nslcmop = None
        db_nsr_update = {}
        db_nslcmop_update = {}
        nslcmop_operation_state = None
        error_description_nslcmop = None
        exc = None
        try:
            # wait for any previous tasks in process
            step = "Waiting for previous operations to terminate"
            await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)

            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=None,
                current_operation="RUNNING ACTION",
                current_operation_id=nslcmop_id
            )

            step = "Getting information from database"
            db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

            nsr_deployed = db_nsr["_admin"].get("deployed")
            vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
            vdu_id = db_nslcmop["operationParams"].get("vdu_id")
            kdu_name = db_nslcmop["operationParams"].get("kdu_name")
            vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
            primitive = db_nslcmop["operationParams"]["primitive"]
            primitive_params = db_nslcmop["operationParams"]["primitive_params"]
            timeout_ns_action = db_nslcmop["operationParams"].get("timeout_ns_action", self.timeout_primitive)

            # vnf-level action needs the vnfr/vnfd; ns-level action needs the nsd
            if vnf_index:
                step = "Getting vnfr from database"
                db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
                step = "Getting vnfd from database"
                db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
            else:
                step = "Getting nsd from database"
                db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})

            # for backward compatibility
            if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
                nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
                db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
                self.update_db_2("nsrs", nsr_id, db_nsr_update)

            # look for primitive
            config_primitive_desc = descriptor_configuration = None
            if vdu_id:
                descriptor_configuration = get_configuration(db_vnfd, vdu_id)
            elif kdu_name:
                descriptor_configuration = get_configuration(db_vnfd, kdu_name)
            elif vnf_index:
                descriptor_configuration = get_configuration(db_vnfd, db_vnfd["id"])
            else:
                descriptor_configuration = db_nsd.get("ns-configuration")

            if descriptor_configuration and descriptor_configuration.get("config-primitive"):
                for config_primitive in descriptor_configuration["config-primitive"]:
                    if config_primitive["name"] == primitive:
                        config_primitive_desc = config_primitive
                        break

            if not config_primitive_desc:
                # kdu-level "upgrade"/"rollback"/"status" do not need to be declared at the descriptor
                if not (kdu_name and primitive in ("upgrade", "rollback", "status")):
                    raise LcmException("Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".
                                       format(primitive))
                primitive_name = primitive
                ee_descriptor_id = None
            else:
                primitive_name = config_primitive_desc.get("execution-environment-primitive", primitive)
                ee_descriptor_id = config_primitive_desc.get("execution-environment-ref")

            # gather the additionalParams of the selected target to resolve primitive params
            if vnf_index:
                if vdu_id:
                    vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
                    desc_params = parse_yaml_strings(vdur.get("additionalParams"))
                elif kdu_name:
                    kdur = next((x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None)
                    desc_params = parse_yaml_strings(kdur.get("additionalParams"))
                else:
                    desc_params = parse_yaml_strings(db_vnfr.get("additionalParamsForVnf"))
            else:
                desc_params = parse_yaml_strings(db_nsr.get("additionalParamsForNs"))
            if kdu_name and get_configuration(db_vnfd, kdu_name):
                kdu_configuration = get_configuration(db_vnfd, kdu_name)
                actions = set()
                # NOTE(review): these loops re-bind the "primitive" variable read above; this looks
                # safe only because "primitive_name" is what is used from here on -- confirm
                for primitive in kdu_configuration.get("initial-config-primitive", []):
                    actions.add(primitive["name"])
                for primitive in kdu_configuration.get("config-primitive", []):
                    actions.add(primitive["name"])
                kdu_action = True if primitive_name in actions else False

            # TODO check if ns is in a proper status
            if kdu_name and (primitive_name in ("upgrade", "rollback", "status") or kdu_action):
                # kdur and desc_params already set from before
                if primitive_params:
                    desc_params.update(primitive_params)
                # TODO Check if we will need something at vnf level
                # locate the deployed KDU entry matching the requested kdu_name/vnf_index
                for index, kdu in enumerate(get_iterable(nsr_deployed, "K8s")):
                    if kdu_name == kdu["kdu-name"] and kdu["member-vnf-index"] == vnf_index:
                        break
                else:
                    raise LcmException("KDU '{}' for vnf '{}' not deployed".format(kdu_name, vnf_index))

                if kdu.get("k8scluster-type") not in self.k8scluster_map:
                    msg = "unknown k8scluster-type '{}'".format(kdu.get("k8scluster-type"))
                    raise LcmException(msg)

                db_dict = {"collection": "nsrs",
                           "filter": {"_id": nsr_id},
                           "path": "_admin.deployed.K8s.{}".format(index)}
                self.logger.debug(logging_text + "Exec k8s {} on {}.{}".format(primitive_name, vnf_index, kdu_name))
                step = "Executing kdu {}".format(primitive_name)
                if primitive_name == "upgrade":
                    # an explicit "kdu_model" param overrides the deployed model
                    if desc_params.get("kdu_model"):
                        kdu_model = desc_params.get("kdu_model")
                        del desc_params["kdu_model"]
                    else:
                        kdu_model = kdu.get("kdu-model")
                        # drop a ":version" suffix from the deployed model reference, if present
                        parts = kdu_model.split(sep=":")
                        if len(parts) == 2:
                            kdu_model = parts[0]

                    detailed_status = await asyncio.wait_for(
                        self.k8scluster_map[kdu["k8scluster-type"]].upgrade(
                            cluster_uuid=kdu.get("k8scluster-uuid"),
                            kdu_instance=kdu.get("kdu-instance"),
                            atomic=True, kdu_model=kdu_model,
                            params=desc_params, db_dict=db_dict,
                            timeout=timeout_ns_action),
                        timeout=timeout_ns_action + 10)
                    self.logger.debug(logging_text + " Upgrade of kdu {} done".format(detailed_status))
                elif primitive_name == "rollback":
                    detailed_status = await asyncio.wait_for(
                        self.k8scluster_map[kdu["k8scluster-type"]].rollback(
                            cluster_uuid=kdu.get("k8scluster-uuid"),
                            kdu_instance=kdu.get("kdu-instance"),
                            db_dict=db_dict),
                        timeout=timeout_ns_action)
                elif primitive_name == "status":
                    detailed_status = await asyncio.wait_for(
                        self.k8scluster_map[kdu["k8scluster-type"]].status_kdu(
                            cluster_uuid=kdu.get("k8scluster-uuid"),
                            kdu_instance=kdu.get("kdu-instance")),
                        timeout=timeout_ns_action)
                else:
                    # any other KDU-declared action is executed as a generic k8s primitive
                    kdu_instance = kdu.get("kdu-instance") or "{}-{}".format(kdu["kdu-name"], nsr_id)
                    params = self._map_primitive_params(config_primitive_desc, primitive_params, desc_params)

                    detailed_status = await asyncio.wait_for(
                        self.k8scluster_map[kdu["k8scluster-type"]].exec_primitive(
                            cluster_uuid=kdu.get("k8scluster-uuid"),
                            kdu_instance=kdu_instance,
                            primitive_name=primitive_name,
                            params=params, db_dict=db_dict,
                            timeout=timeout_ns_action),
                        timeout=timeout_ns_action)

                # k8s connectors signal failure with an empty/falsy detailed_status
                if detailed_status:
                    nslcmop_operation_state = 'COMPLETED'
                else:
                    detailed_status = ''
                    nslcmop_operation_state = 'FAILED'
            else:
                # non-KDU primitive: execute on the VCA execution environment
                ee_id, vca_type = self._look_for_deployed_vca(nsr_deployed["VCA"], member_vnf_index=vnf_index,
                                                              vdu_id=vdu_id, vdu_count_index=vdu_count_index,
                                                              ee_descriptor_id=ee_descriptor_id)
                # NOTE(review): if no VCA entry matches vnf_index, db_dict stays unbound and the
                # _ns_execute_primitive call below raises NameError (caught by the generic except)
                # -- confirm a matching entry is guaranteed here
                for vca_index, vca_deployed in enumerate(db_nsr['_admin']['deployed']['VCA']):
                    if vca_deployed.get("member-vnf-index") == vnf_index:
                        db_dict = {"collection": "nsrs",
                                   "filter": {"_id": nsr_id},
                                   "path": "_admin.deployed.VCA.{}.".format(vca_index)}
                        break
                nslcmop_operation_state, detailed_status = await self._ns_execute_primitive(
                    ee_id,
                    primitive=primitive_name,
                    primitive_params=self._map_primitive_params(config_primitive_desc, primitive_params, desc_params),
                    timeout=timeout_ns_action,
                    vca_type=vca_type,
                    db_dict=db_dict)

            db_nslcmop_update["detailed-status"] = detailed_status
            error_description_nslcmop = detailed_status if nslcmop_operation_state == "FAILED" else ""
            self.logger.debug(logging_text + " task Done with result {} {}".format(nslcmop_operation_state,
                                                                                   detailed_status))
            return  # database update is called inside finally

        except (DbException, LcmException, N2VCException, K8sException) as e:
            self.logger.error(logging_text + "Exit Exception {}".format(e))
            exc = e
        except asyncio.CancelledError:
            self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
            exc = "Operation was cancelled"
        except asyncio.TimeoutError:
            self.logger.error(logging_text + "Timeout while '{}'".format(step))
            exc = "Timeout"
        except Exception as e:
            exc = traceback.format_exc()
            self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
        finally:
            # persist the final NS/operation status and notify through kafka, on success or failure
            if exc:
                db_nslcmop_update["detailed-status"] = detailed_status = error_description_nslcmop = \
                    "FAILED {}: {}".format(step, exc)
                nslcmop_operation_state = "FAILED"
            if db_nsr:
                self._write_ns_status(
                    nsr_id=nsr_id,
                    ns_state=db_nsr["nsState"],   # TODO check if degraded. For the moment use previous status
                    current_operation="IDLE",
                    current_operation_id=None,
                    # error_description=error_description_nsr,
                    # error_detail=error_detail,
                    other_update=db_nsr_update
                )

            self._write_op_status(op_id=nslcmop_id, stage="", error_message=error_description_nslcmop,
                                  operation_state=nslcmop_operation_state, other_update=db_nslcmop_update)

            if nslcmop_operation_state:
                try:
                    await self.msg.aiowrite("ns", "actioned", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                               "operationState": nslcmop_operation_state},
                                            loop=self.loop)
                except Exception as e:
                    self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
            self.logger.debug(logging_text + "Exit")
            self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
            return nslcmop_operation_state, detailed_status
3671
3672 async def scale(self, nsr_id, nslcmop_id):
3673 # Try to lock HA task here
3674 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
3675 if not task_is_locked_by_me:
3676 return
3677
3678 logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
3679 stage = ['', '', '']
3680 tasks_dict_info = {}
3681 # ^ stage, step, VIM progress
3682 self.logger.debug(logging_text + "Enter")
3683 # get all needed from database
3684 db_nsr = None
3685 db_nslcmop_update = {}
3686 db_nsr_update = {}
3687 exc = None
3688 # in case of error, indicates what part of scale was failed to put nsr at error status
3689 scale_process = None
3690 old_operational_status = ""
3691 old_config_status = ""
3692 nsi_id = None
3693 try:
3694 # wait for any previous tasks in process
3695 step = "Waiting for previous operations to terminate"
3696 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
3697 self._write_ns_status(nsr_id=nsr_id, ns_state=None,
3698 current_operation="SCALING", current_operation_id=nslcmop_id)
3699
3700 step = "Getting nslcmop from database"
3701 self.logger.debug(step + " after having waited for previous tasks to be completed")
3702 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
3703
3704 step = "Getting nsr from database"
3705 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3706 old_operational_status = db_nsr["operational-status"]
3707 old_config_status = db_nsr["config-status"]
3708
3709 step = "Parsing scaling parameters"
3710 db_nsr_update["operational-status"] = "scaling"
3711 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3712 nsr_deployed = db_nsr["_admin"].get("deployed")
3713
3714 #######
3715 nsr_deployed = db_nsr["_admin"].get("deployed")
3716 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
3717 # vdu_id = db_nslcmop["operationParams"].get("vdu_id")
3718 # vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
3719 # vdu_name = db_nslcmop["operationParams"].get("vdu_name")
3720 #######
3721
3722 vnf_index = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["member-vnf-index"]
3723 scaling_group = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["scaling-group-descriptor"]
3724 scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
3725 # for backward compatibility
3726 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
3727 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
3728 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
3729 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3730
3731 step = "Getting vnfr from database"
3732 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
3733
3734 step = "Getting vnfd from database"
3735 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
3736
3737 base_folder = db_vnfd["_admin"]["storage"]
3738
3739 step = "Getting scaling-group-descriptor"
3740 scaling_descriptor = find_in_list(
3741 get_scaling_aspect(
3742 db_vnfd
3743 ),
3744 lambda scale_desc: scale_desc["name"] == scaling_group
3745 )
3746 if not scaling_descriptor:
3747 raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
3748 "at vnfd:scaling-group-descriptor".format(scaling_group))
3749
3750 step = "Sending scale order to VIM"
3751 # TODO check if ns is in a proper status
3752 nb_scale_op = 0
3753 if not db_nsr["_admin"].get("scaling-group"):
3754 self.update_db_2("nsrs", nsr_id, {"_admin.scaling-group": [{"name": scaling_group, "nb-scale-op": 0}]})
3755 admin_scale_index = 0
3756 else:
3757 for admin_scale_index, admin_scale_info in enumerate(db_nsr["_admin"]["scaling-group"]):
3758 if admin_scale_info["name"] == scaling_group:
3759 nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
3760 break
3761 else: # not found, set index one plus last element and add new entry with the name
3762 admin_scale_index += 1
3763 db_nsr_update["_admin.scaling-group.{}.name".format(admin_scale_index)] = scaling_group
3764 RO_scaling_info = []
3765 VCA_scaling_info = []
3766 vdu_scaling_info = {"scaling_group_name": scaling_group, "vdu": []}
3767 if scaling_type == "SCALE_OUT":
3768 if "aspect-delta-details" not in scaling_descriptor:
3769 raise LcmException(
3770 "Aspect delta details not fount in scaling descriptor {}".format(
3771 scaling_descriptor["name"]
3772 )
3773 )
3774 # count if max-instance-count is reached
3775 deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
3776
3777 vdu_scaling_info["scaling_direction"] = "OUT"
3778 vdu_scaling_info["vdu-create"] = {}
3779 for delta in deltas:
3780 for vdu_delta in delta["vdu-delta"]:
3781 vdud = get_vdu(db_vnfd, vdu_delta["id"])
3782 vdu_index = get_vdur_index(db_vnfr, vdu_delta)
3783 cloud_init_text = self._get_vdu_cloud_init_content(vdud, db_vnfd)
3784 if cloud_init_text:
3785 additional_params = self._get_vdu_additional_params(db_vnfr, vdud["id"]) or {}
3786 cloud_init_list = []
3787
3788 vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
3789 max_instance_count = 10
3790 if vdu_profile and "max-number-of-instances" in vdu_profile:
3791 max_instance_count = vdu_profile.get("max-number-of-instances", 10)
3792
3793 default_instance_num = get_number_of_instances(db_vnfd, vdud["id"])
3794
3795 nb_scale_op += vdu_delta.get("number-of-instances", 1)
3796
3797 if nb_scale_op + default_instance_num > max_instance_count:
3798 raise LcmException(
3799 "reached the limit of {} (max-instance-count) "
3800 "scaling-out operations for the "
3801 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group)
3802 )
3803 for x in range(vdu_delta.get("number-of-instances", 1)):
3804 if cloud_init_text:
3805 # TODO Information of its own ip is not available because db_vnfr is not updated.
3806 additional_params["OSM"] = get_osm_params(
3807 db_vnfr,
3808 vdu_delta["id"],
3809 vdu_index + x
3810 )
3811 cloud_init_list.append(
3812 self._parse_cloud_init(
3813 cloud_init_text,
3814 additional_params,
3815 db_vnfd["id"],
3816 vdud["id"]
3817 )
3818 )
3819 VCA_scaling_info.append(
3820 {
3821 "osm_vdu_id": vdu_delta["id"],
3822 "member-vnf-index": vnf_index,
3823 "type": "create",
3824 "vdu_index": vdu_index + x
3825 }
3826 )
3827 RO_scaling_info.append(
3828 {
3829 "osm_vdu_id": vdu_delta["id"],
3830 "member-vnf-index": vnf_index,
3831 "type": "create",
3832 "count": vdu_delta.get("number-of-instances", 1)
3833 }
3834 )
3835 if cloud_init_list:
3836 RO_scaling_info[-1]["cloud_init"] = cloud_init_list
3837 vdu_scaling_info["vdu-create"][vdu_delta["id"]] = vdu_delta.get("number-of-instances", 1)
3838
3839 elif scaling_type == "SCALE_IN":
3840 if "min-instance-count" in scaling_descriptor and scaling_descriptor["min-instance-count"] is not None:
3841 min_instance_count = int(scaling_descriptor["min-instance-count"])
3842
3843 vdu_scaling_info["scaling_direction"] = "IN"
3844 vdu_scaling_info["vdu-delete"] = {}
3845 deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
3846 for delta in deltas:
3847 for vdu_delta in delta["vdu-delta"]:
3848 vdu_index = get_vdur_index(db_vnfr, vdu_delta)
3849 min_instance_count = 0
3850 vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
3851 if vdu_profile and "min-number-of-instances" in vdu_profile:
3852 min_instance_count = vdu_profile["min-number-of-instances"]
3853
3854 default_instance_num = get_number_of_instances(db_vnfd, vdu_delta["id"])
3855
3856 nb_scale_op -= vdu_delta.get("number-of-instances", 1)
3857 if nb_scale_op + default_instance_num < min_instance_count:
3858 raise LcmException(
3859 "reached the limit of {} (min-instance-count) scaling-in operations for the "
3860 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group)
3861 )
3862 RO_scaling_info.append({"osm_vdu_id": vdu_delta["id"], "member-vnf-index": vnf_index,
3863 "type": "delete", "count": vdu_delta.get("number-of-instances", 1),
3864 "vdu_index": vdu_index - 1})
3865 for x in range(vdu_delta.get("number-of-instances", 1)):
3866 VCA_scaling_info.append(
3867 {
3868 "osm_vdu_id": vdu_delta["id"],
3869 "member-vnf-index": vnf_index,
3870 "type": "delete",
3871 "vdu_index": vdu_index - 1 - x
3872 }
3873 )
3874 vdu_scaling_info["vdu-delete"][vdu_delta["id"]] = vdu_delta.get("number-of-instances", 1)
3875
3876 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
3877 vdu_delete = copy(vdu_scaling_info.get("vdu-delete"))
3878 if vdu_scaling_info["scaling_direction"] == "IN":
3879 for vdur in reversed(db_vnfr["vdur"]):
3880 if vdu_delete.get(vdur["vdu-id-ref"]):
3881 vdu_delete[vdur["vdu-id-ref"]] -= 1
3882 vdu_scaling_info["vdu"].append({
3883 "name": vdur.get("name") or vdur.get("vdu-name"),
3884 "vdu_id": vdur["vdu-id-ref"],
3885 "interface": []
3886 })
3887 for interface in vdur["interfaces"]:
3888 vdu_scaling_info["vdu"][-1]["interface"].append({
3889 "name": interface["name"],
3890 "ip_address": interface["ip-address"],
3891 "mac_address": interface.get("mac-address"),
3892 })
3893 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
3894
3895 # PRE-SCALE BEGIN
3896 step = "Executing pre-scale vnf-config-primitive"
3897 if scaling_descriptor.get("scaling-config-action"):
3898 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
3899 if (scaling_config_action.get("trigger") == "pre-scale-in" and scaling_type == "SCALE_IN") \
3900 or (scaling_config_action.get("trigger") == "pre-scale-out" and scaling_type == "SCALE_OUT"):
3901 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
3902 step = db_nslcmop_update["detailed-status"] = \
3903 "executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive)
3904
3905 # look for primitive
3906 for config_primitive in (get_configuration(
3907 db_vnfd, db_vnfd["id"]
3908 ) or {}).get("config-primitive", ()):
3909 if config_primitive["name"] == vnf_config_primitive:
3910 break
3911 else:
3912 raise LcmException(
3913 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
3914 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
3915 "primitive".format(scaling_group, vnf_config_primitive))
3916
3917 vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
3918 if db_vnfr.get("additionalParamsForVnf"):
3919 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
3920
3921 scale_process = "VCA"
3922 db_nsr_update["config-status"] = "configuring pre-scaling"
3923 primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)
3924
3925 # Pre-scale retry check: Check if this sub-operation has been executed before
3926 op_index = self._check_or_add_scale_suboperation(
3927 db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'PRE-SCALE')
3928 if op_index == self.SUBOPERATION_STATUS_SKIP:
3929 # Skip sub-operation
3930 result = 'COMPLETED'
3931 result_detail = 'Done'
3932 self.logger.debug(logging_text +
3933 "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
3934 vnf_config_primitive, result, result_detail))
3935 else:
3936 if op_index == self.SUBOPERATION_STATUS_NEW:
3937 # New sub-operation: Get index of this sub-operation
3938 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
3939 self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
3940 format(vnf_config_primitive))
3941 else:
3942 # retry: Get registered params for this existing sub-operation
3943 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
3944 vnf_index = op.get('member_vnf_index')
3945 vnf_config_primitive = op.get('primitive')
3946 primitive_params = op.get('primitive_params')
3947 self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation retry".
3948 format(vnf_config_primitive))
3949 # Execute the primitive, either with new (first-time) or registered (reintent) args
3950 ee_descriptor_id = config_primitive.get("execution-environment-ref")
3951 primitive_name = config_primitive.get("execution-environment-primitive",
3952 vnf_config_primitive)
3953 ee_id, vca_type = self._look_for_deployed_vca(nsr_deployed["VCA"],
3954 member_vnf_index=vnf_index,
3955 vdu_id=None,
3956 vdu_count_index=None,
3957 ee_descriptor_id=ee_descriptor_id)
3958 result, result_detail = await self._ns_execute_primitive(
3959 ee_id, primitive_name, primitive_params, vca_type=vca_type)
3960 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
3961 vnf_config_primitive, result, result_detail))
3962 # Update operationState = COMPLETED | FAILED
3963 self._update_suboperation_status(
3964 db_nslcmop, op_index, result, result_detail)
3965
3966 if result == "FAILED":
3967 raise LcmException(result_detail)
3968 db_nsr_update["config-status"] = old_config_status
3969 scale_process = None
3970 # PRE-SCALE END
3971
3972 db_nsr_update["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)] = nb_scale_op
3973 db_nsr_update["_admin.scaling-group.{}.time".format(admin_scale_index)] = time()
3974
3975 # SCALE-IN VCA - BEGIN
3976 if VCA_scaling_info:
3977 step = db_nslcmop_update["detailed-status"] = \
3978 "Deleting the execution environments"
3979 scale_process = "VCA"
3980 for vdu_info in VCA_scaling_info:
3981 if vdu_info["type"] == "delete":
3982 member_vnf_index = str(vdu_info["member-vnf-index"])
3983 self.logger.debug(logging_text + "vdu info: {}".format(vdu_info))
3984 vdu_id = vdu_info["osm_vdu_id"]
3985 vdu_index = int(vdu_info["vdu_index"])
3986 stage[1] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
3987 member_vnf_index, vdu_id, vdu_index)
3988 stage[2] = step = "Scaling in VCA"
3989 self._write_op_status(
3990 op_id=nslcmop_id,
3991 stage=stage
3992 )
3993 vca_update = db_nsr["_admin"]["deployed"]["VCA"]
3994 config_update = db_nsr["configurationStatus"]
3995 for vca_index, vca in enumerate(vca_update):
3996 if (vca or vca.get("ee_id")) and vca["member-vnf-index"] == member_vnf_index and \
3997 vca["vdu_count_index"] == vdu_index:
3998 if vca.get("vdu_id"):
3999 config_descriptor = get_configuration(db_vnfd, vca.get("vdu_id"))
4000 elif vca.get("kdu_name"):
4001 config_descriptor = get_configuration(db_vnfd, vca.get("kdu_name"))
4002 else:
4003 config_descriptor = get_configuration(db_vnfd, db_vnfd["id"])
4004 operation_params = db_nslcmop.get("operationParams") or {}
4005 exec_terminate_primitives = (not operation_params.get("skip_terminate_primitives") and
4006 vca.get("needed_terminate"))
4007 task = asyncio.ensure_future(asyncio.wait_for(
4008 self.destroy_N2VC(logging_text, db_nslcmop, vca, config_descriptor,
4009 vca_index, destroy_ee=True,
4010 exec_primitives=exec_terminate_primitives,
4011 scaling_in=True), timeout=self.timeout_charm_delete))
4012 # wait before next removal
4013 await asyncio.sleep(30)
4014 tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))
4015 del vca_update[vca_index]
4016 del config_update[vca_index]
4017 # wait for pending tasks of terminate primitives
4018 if tasks_dict_info:
4019 self.logger.debug(logging_text +
4020 'Waiting for tasks {}'.format(list(tasks_dict_info.keys())))
4021 error_list = await self._wait_for_tasks(logging_text, tasks_dict_info,
4022 min(self.timeout_charm_delete,
4023 self.timeout_ns_terminate),
4024 stage, nslcmop_id)
4025 tasks_dict_info.clear()
4026 if error_list:
4027 raise LcmException("; ".join(error_list))
4028
4029 db_vca_and_config_update = {
4030 "_admin.deployed.VCA": vca_update,
4031 "configurationStatus": config_update
4032 }
4033 self.update_db_2("nsrs", db_nsr["_id"], db_vca_and_config_update)
4034 scale_process = None
4035 # SCALE-IN VCA - END
4036
4037 # SCALE RO - BEGIN
4038 if RO_scaling_info:
4039 scale_process = "RO"
4040 if self.ro_config.get("ng"):
4041 await self._scale_ng_ro(logging_text, db_nsr, db_nslcmop, db_vnfr, vdu_scaling_info, stage)
4042 vdu_scaling_info.pop("vdu-create", None)
4043 vdu_scaling_info.pop("vdu-delete", None)
4044
4045 scale_process = None
4046 if db_nsr_update:
4047 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4048 # SCALE RO - END
4049
4050 # SCALE-UP VCA - BEGIN
4051 if VCA_scaling_info:
4052 step = db_nslcmop_update["detailed-status"] = \
4053 "Creating new execution environments"
4054 scale_process = "VCA"
4055 for vdu_info in VCA_scaling_info:
4056 if vdu_info["type"] == "create":
4057 member_vnf_index = str(vdu_info["member-vnf-index"])
4058 self.logger.debug(logging_text + "vdu info: {}".format(vdu_info))
4059 vnfd_id = db_vnfr["vnfd-ref"]
4060 vdu_index = int(vdu_info["vdu_index"])
4061 deploy_params = {"OSM": get_osm_params(db_vnfr)}
4062 if db_vnfr.get("additionalParamsForVnf"):
4063 deploy_params.update(parse_yaml_strings(db_vnfr["additionalParamsForVnf"].copy()))
4064 descriptor_config = get_configuration(db_vnfd, db_vnfd["id"])
4065 if descriptor_config:
4066 vdu_id = None
4067 vdu_name = None
4068 kdu_name = None
4069 self._deploy_n2vc(
4070 logging_text=logging_text + "member_vnf_index={} ".format(member_vnf_index),
4071 db_nsr=db_nsr,
4072 db_vnfr=db_vnfr,
4073 nslcmop_id=nslcmop_id,
4074 nsr_id=nsr_id,
4075 nsi_id=nsi_id,
4076 vnfd_id=vnfd_id,
4077 vdu_id=vdu_id,
4078 kdu_name=kdu_name,
4079 member_vnf_index=member_vnf_index,
4080 vdu_index=vdu_index,
4081 vdu_name=vdu_name,
4082 deploy_params=deploy_params,
4083 descriptor_config=descriptor_config,
4084 base_folder=base_folder,
4085 task_instantiation_info=tasks_dict_info,
4086 stage=stage
4087 )
4088 vdu_id = vdu_info["osm_vdu_id"]
4089 vdur = find_in_list(db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id)
4090 descriptor_config = get_configuration(db_vnfd, vdu_id)
4091 if vdur.get("additionalParams"):
4092 deploy_params_vdu = parse_yaml_strings(vdur["additionalParams"])
4093 else:
4094 deploy_params_vdu = deploy_params
4095 deploy_params_vdu["OSM"] = get_osm_params(db_vnfr, vdu_id, vdu_count_index=vdu_index)
4096 if descriptor_config:
4097 vdu_name = None
4098 kdu_name = None
4099 stage[1] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
4100 member_vnf_index, vdu_id, vdu_index)
4101 stage[2] = step = "Scaling out VCA"
4102 self._write_op_status(
4103 op_id=nslcmop_id,
4104 stage=stage
4105 )
4106 self._deploy_n2vc(
4107 logging_text=logging_text + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
4108 member_vnf_index, vdu_id, vdu_index),
4109 db_nsr=db_nsr,
4110 db_vnfr=db_vnfr,
4111 nslcmop_id=nslcmop_id,
4112 nsr_id=nsr_id,
4113 nsi_id=nsi_id,
4114 vnfd_id=vnfd_id,
4115 vdu_id=vdu_id,
4116 kdu_name=kdu_name,
4117 member_vnf_index=member_vnf_index,
4118 vdu_index=vdu_index,
4119 vdu_name=vdu_name,
4120 deploy_params=deploy_params_vdu,
4121 descriptor_config=descriptor_config,
4122 base_folder=base_folder,
4123 task_instantiation_info=tasks_dict_info,
4124 stage=stage
4125 )
4126 # TODO: scaling for kdu is not implemented yet.
4127 kdu_name = vdu_info["osm_vdu_id"]
4128 descriptor_config = get_configuration(db_vnfd, kdu_name)
4129 if descriptor_config:
4130 vdu_id = None
4131 vdu_index = vdu_index
4132 vdu_name = None
4133 kdur = next(x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name)
4134 deploy_params_kdu = {"OSM": get_osm_params(db_vnfr)}
4135 if kdur.get("additionalParams"):
4136 deploy_params_kdu = parse_yaml_strings(kdur["additionalParams"])
4137
4138 self._deploy_n2vc(
4139 logging_text=logging_text,
4140 db_nsr=db_nsr,
4141 db_vnfr=db_vnfr,
4142 nslcmop_id=nslcmop_id,
4143 nsr_id=nsr_id,
4144 nsi_id=nsi_id,
4145 vnfd_id=vnfd_id,
4146 vdu_id=vdu_id,
4147 kdu_name=kdu_name,
4148 member_vnf_index=member_vnf_index,
4149 vdu_index=vdu_index,
4150 vdu_name=vdu_name,
4151 deploy_params=deploy_params_kdu,
4152 descriptor_config=descriptor_config,
4153 base_folder=base_folder,
4154 task_instantiation_info=tasks_dict_info,
4155 stage=stage
4156 )
4157 # SCALE-UP VCA - END
4158 scale_process = None
4159
4160 # POST-SCALE BEGIN
4161 # execute primitive service POST-SCALING
4162 step = "Executing post-scale vnf-config-primitive"
4163 if scaling_descriptor.get("scaling-config-action"):
4164 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
4165 if (scaling_config_action.get("trigger") == "post-scale-in" and scaling_type == "SCALE_IN") \
4166 or (scaling_config_action.get("trigger") == "post-scale-out" and scaling_type == "SCALE_OUT"):
4167 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
4168 step = db_nslcmop_update["detailed-status"] = \
4169 "executing post-scale scaling-config-action '{}'".format(vnf_config_primitive)
4170
4171 vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
4172 if db_vnfr.get("additionalParamsForVnf"):
4173 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
4174
4175 # look for primitive
4176 for config_primitive in (
4177 get_configuration(db_vnfd, db_vnfd["id"]) or {}
4178 ).get("config-primitive", ()):
4179 if config_primitive["name"] == vnf_config_primitive:
4180 break
4181 else:
4182 raise LcmException(
4183 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
4184 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
4185 "config-primitive".format(scaling_group, vnf_config_primitive))
4186 scale_process = "VCA"
4187 db_nsr_update["config-status"] = "configuring post-scaling"
4188 primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)
4189
4190 # Post-scale retry check: Check if this sub-operation has been executed before
4191 op_index = self._check_or_add_scale_suboperation(
4192 db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'POST-SCALE')
4193 if op_index == self.SUBOPERATION_STATUS_SKIP:
4194 # Skip sub-operation
4195 result = 'COMPLETED'
4196 result_detail = 'Done'
4197 self.logger.debug(logging_text +
4198 "vnf_config_primitive={} Skipped sub-operation, result {} {}".
4199 format(vnf_config_primitive, result, result_detail))
4200 else:
4201 if op_index == self.SUBOPERATION_STATUS_NEW:
4202 # New sub-operation: Get index of this sub-operation
4203 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
4204 self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
4205 format(vnf_config_primitive))
4206 else:
4207 # retry: Get registered params for this existing sub-operation
4208 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
4209 vnf_index = op.get('member_vnf_index')
4210 vnf_config_primitive = op.get('primitive')
4211 primitive_params = op.get('primitive_params')
4212 self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation retry".
4213 format(vnf_config_primitive))
4214 # Execute the primitive, either with new (first-time) or registered (reintent) args
4215 ee_descriptor_id = config_primitive.get("execution-environment-ref")
4216 primitive_name = config_primitive.get("execution-environment-primitive",
4217 vnf_config_primitive)
4218 ee_id, vca_type = self._look_for_deployed_vca(nsr_deployed["VCA"],
4219 member_vnf_index=vnf_index,
4220 vdu_id=None,
4221 vdu_count_index=None,
4222 ee_descriptor_id=ee_descriptor_id)
4223 result, result_detail = await self._ns_execute_primitive(
4224 ee_id, primitive_name, primitive_params, vca_type=vca_type)
4225 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
4226 vnf_config_primitive, result, result_detail))
4227 # Update operationState = COMPLETED | FAILED
4228 self._update_suboperation_status(
4229 db_nslcmop, op_index, result, result_detail)
4230
4231 if result == "FAILED":
4232 raise LcmException(result_detail)
4233 db_nsr_update["config-status"] = old_config_status
4234 scale_process = None
4235 # POST-SCALE END
4236
4237 db_nsr_update["detailed-status"] = "" # "scaled {} {}".format(scaling_group, scaling_type)
4238 db_nsr_update["operational-status"] = "running" if old_operational_status == "failed" \
4239 else old_operational_status
4240 db_nsr_update["config-status"] = old_config_status
4241 return
4242 except (ROclient.ROClientException, DbException, LcmException, NgRoException) as e:
4243 self.logger.error(logging_text + "Exit Exception {}".format(e))
4244 exc = e
4245 except asyncio.CancelledError:
4246 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
4247 exc = "Operation was cancelled"
4248 except Exception as e:
4249 exc = traceback.format_exc()
4250 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
4251 finally:
4252 self._write_ns_status(nsr_id=nsr_id, ns_state=None, current_operation="IDLE", current_operation_id=None)
4253 if tasks_dict_info:
4254 stage[1] = "Waiting for instantiate pending tasks."
4255 self.logger.debug(logging_text + stage[1])
4256 exc = await self._wait_for_tasks(logging_text, tasks_dict_info, self.timeout_ns_deploy,
4257 stage, nslcmop_id, nsr_id=nsr_id)
4258 if exc:
4259 db_nslcmop_update["detailed-status"] = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
4260 nslcmop_operation_state = "FAILED"
4261 if db_nsr:
4262 db_nsr_update["operational-status"] = old_operational_status
4263 db_nsr_update["config-status"] = old_config_status
4264 db_nsr_update["detailed-status"] = ""
4265 if scale_process:
4266 if "VCA" in scale_process:
4267 db_nsr_update["config-status"] = "failed"
4268 if "RO" in scale_process:
4269 db_nsr_update["operational-status"] = "failed"
4270 db_nsr_update["detailed-status"] = "FAILED scaling nslcmop={} {}: {}".format(nslcmop_id, step,
4271 exc)
4272 else:
4273 error_description_nslcmop = None
4274 nslcmop_operation_state = "COMPLETED"
4275 db_nslcmop_update["detailed-status"] = "Done"
4276
4277 self._write_op_status(op_id=nslcmop_id, stage="", error_message=error_description_nslcmop,
4278 operation_state=nslcmop_operation_state, other_update=db_nslcmop_update)
4279 if db_nsr:
4280 self._write_ns_status(nsr_id=nsr_id, ns_state=None, current_operation="IDLE",
4281 current_operation_id=None, other_update=db_nsr_update)
4282
4283 if nslcmop_operation_state:
4284 try:
4285 msg = {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id, "operationState": nslcmop_operation_state}
4286 await self.msg.aiowrite("ns", "scaled", msg, loop=self.loop)
4287 except Exception as e:
4288 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
4289 self.logger.debug(logging_text + "Exit")
4290 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")
4291
4292 async def _scale_ng_ro(self, logging_text, db_nsr, db_nslcmop, db_vnfr, vdu_scaling_info, stage):
4293 nsr_id = db_nslcmop["nsInstanceId"]
4294 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
4295 db_vnfrs = {}
4296
4297 # read from db: vnfd's for every vnf
4298 db_vnfds = []
4299
4300 # for each vnf in ns, read vnfd
4301 for vnfr in self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}):
4302 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
4303 vnfd_id = vnfr["vnfd-id"] # vnfd uuid for this vnf
4304 # if we haven't this vnfd, read it from db
4305 if not find_in_list(db_vnfds, lambda a_vnfd: a_vnfd["id"] == vnfd_id):
4306 # read from db
4307 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
4308 db_vnfds.append(vnfd)
4309 n2vc_key = self.n2vc.get_public_key()
4310 n2vc_key_list = [n2vc_key]
4311 self.scale_vnfr(db_vnfr, vdu_scaling_info.get("vdu-create"), vdu_scaling_info.get("vdu-delete"),
4312 mark_delete=True)
4313 # db_vnfr has been updated, update db_vnfrs to use it
4314 db_vnfrs[db_vnfr["member-vnf-index-ref"]] = db_vnfr
4315 await self._instantiate_ng_ro(logging_text, nsr_id, db_nsd, db_nsr, db_nslcmop, db_vnfrs,
4316 db_vnfds, n2vc_key_list, stage=stage, start_deploy=time(),
4317 timeout_ns_deploy=self.timeout_ns_deploy)
4318 if vdu_scaling_info.get("vdu-delete"):
4319 self.scale_vnfr(db_vnfr, None, vdu_scaling_info["vdu-delete"], mark_delete=False)
4320
4321 async def add_prometheus_metrics(self, ee_id, artifact_path, ee_config_descriptor, vnfr_id, nsr_id, target_ip):
4322 if not self.prometheus:
4323 return
4324 # look if exist a file called 'prometheus*.j2' and
4325 artifact_content = self.fs.dir_ls(artifact_path)
4326 job_file = next((f for f in artifact_content if f.startswith("prometheus") and f.endswith(".j2")), None)
4327 if not job_file:
4328 return
4329 with self.fs.file_open((artifact_path, job_file), "r") as f:
4330 job_data = f.read()
4331
4332 # TODO get_service
4333 _, _, service = ee_id.partition(".") # remove prefix "namespace."
4334 host_name = "{}-{}".format(service, ee_config_descriptor["metric-service"])
4335 host_port = "80"
4336 vnfr_id = vnfr_id.replace("-", "")
4337 variables = {
4338 "JOB_NAME": vnfr_id,
4339 "TARGET_IP": target_ip,
4340 "EXPORTER_POD_IP": host_name,
4341 "EXPORTER_POD_PORT": host_port,
4342 }
4343 job_list = self.prometheus.parse_job(job_data, variables)
4344 # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
4345 for job in job_list:
4346 if not isinstance(job.get("job_name"), str) or vnfr_id not in job["job_name"]:
4347 job["job_name"] = vnfr_id + "_" + str(randint(1, 10000))
4348 job["nsr_id"] = nsr_id
4349 job_dict = {jl["job_name"]: jl for jl in job_list}
4350 if await self.prometheus.update(job_dict):
4351 return list(job_dict.keys())
4352
4353 def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
4354 """
4355 Get VCA Cloud and VCA Cloud Credentials for the VIM account
4356
4357 :param: vim_account_id: VIM Account ID
4358
4359 :return: (cloud_name, cloud_credential)
4360 """
4361 config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
4362 return config.get("vca_cloud"), config.get("vca_cloud_credential")
4363
4364 def get_vca_k8s_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
4365 """
4366 Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account
4367
4368 :param: vim_account_id: VIM Account ID
4369
4370 :return: (cloud_name, cloud_credential)
4371 """
4372 config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
4373 return config.get("vca_k8s_cloud"), config.get("vca_k8s_cloud_credential")