Feature-9904: Enhancing NG-UI to enable Juju operational view dashboard
[osm/LCM.git] / osm_lcm / ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 import yaml
21 import logging
22 import logging.handlers
23 import traceback
24 import json
25 from jinja2 import Environment, TemplateError, TemplateNotFound, StrictUndefined, UndefinedError
26
27 from osm_lcm import ROclient
28 from osm_lcm.ng_ro import NgRoClient, NgRoException
29 from osm_lcm.lcm_utils import LcmException, LcmExceptionNoMgmtIP, LcmBase, deep_get, get_iterable, populate_dict
30 from n2vc.k8s_helm_conn import K8sHelmConnector
31 from n2vc.k8s_juju_conn import K8sJujuConnector
32
33 from osm_common.dbbase import DbException
34 from osm_common.fsbase import FsException
35
36 from n2vc.n2vc_juju_conn import N2VCJujuConnector
37 from n2vc.exceptions import N2VCException, N2VCNotFound, K8sException
38
39 from osm_lcm.lcm_helm_conn import LCMHelmConn
40
41 from copy import copy, deepcopy
42 from http import HTTPStatus
43 from time import time
44 from uuid import uuid4
45
46 from random import randint
47
48 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
49
50
51 class N2VCJujuConnectorLCM(N2VCJujuConnector):
52
53 async def create_execution_environment(self, namespace: str, db_dict: dict, reuse_ee_id: str = None,
54 progress_timeout: float = None, total_timeout: float = None,
55 config: dict = None, artifact_path: str = None,
56 vca_type: str = None) -> (str, dict):
57 # admit two new parameters, artifact_path and vca_type
58 if vca_type == "k8s_proxy_charm":
59 ee_id = await self.install_k8s_proxy_charm(
60 charm_name=artifact_path[artifact_path.rfind("/") + 1:],
61 namespace=namespace,
62 artifact_path=artifact_path,
63 db_dict=db_dict)
64 return ee_id, None
65 else:
66 return await super().create_execution_environment(
67 namespace=namespace, db_dict=db_dict, reuse_ee_id=reuse_ee_id,
68 progress_timeout=progress_timeout, total_timeout=total_timeout)
69
70 async def install_configuration_sw(self, ee_id: str, artifact_path: str, db_dict: dict,
71 progress_timeout: float = None, total_timeout: float = None,
72 config: dict = None, num_units: int = 1, vca_type: str = "lxc_proxy_charm"):
73 if vca_type == "k8s_proxy_charm":
74 return
75 return await super().install_configuration_sw(
76 ee_id=ee_id, artifact_path=artifact_path, db_dict=db_dict, progress_timeout=progress_timeout,
77 total_timeout=total_timeout, config=config, num_units=num_units)
78
79
80 class NsLcm(LcmBase):
81 timeout_vca_on_error = 5 * 60 # Time for charm from first time at blocked,error status to mark as failed
82 timeout_ns_deploy = 2 * 3600 # default global timeout for deployment a ns
83 timeout_ns_terminate = 1800 # default global timeout for un deployment a ns
84 timeout_charm_delete = 10 * 60
85 timeout_primitive = 30 * 60 # timeout for primitive execution
86 timeout_progress_primitive = 10 * 60 # timeout for some progress in a primitive execution
87
88 SUBOPERATION_STATUS_NOT_FOUND = -1
89 SUBOPERATION_STATUS_NEW = -2
90 SUBOPERATION_STATUS_SKIP = -3
91 task_name_deploy_vca = "Deploying VCA"
92
93 def __init__(self, db, msg, fs, lcm_tasks, config, loop, prometheus=None):
94 """
95 Init, Connect to database, filesystem storage, and messaging
96 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
97 :return: None
98 """
99 super().__init__(
100 db=db,
101 msg=msg,
102 fs=fs,
103 logger=logging.getLogger('lcm.ns')
104 )
105
106 self.loop = loop
107 self.lcm_tasks = lcm_tasks
108 self.timeout = config["timeout"]
109 self.ro_config = config["ro_config"]
110 self.ng_ro = config["ro_config"].get("ng")
111 self.vca_config = config["VCA"].copy()
112
113 # create N2VC connector
114 self.n2vc = N2VCJujuConnectorLCM(
115 db=self.db,
116 fs=self.fs,
117 log=self.logger,
118 loop=self.loop,
119 url='{}:{}'.format(self.vca_config['host'], self.vca_config['port']),
120 username=self.vca_config.get('user', None),
121 vca_config=self.vca_config,
122 on_update_db=self._on_update_n2vc_db
123 )
124
125 self.conn_helm_ee = LCMHelmConn(
126 db=self.db,
127 fs=self.fs,
128 log=self.logger,
129 loop=self.loop,
130 url=None,
131 username=None,
132 vca_config=self.vca_config,
133 on_update_db=self._on_update_n2vc_db
134 )
135
136 self.k8sclusterhelm = K8sHelmConnector(
137 kubectl_command=self.vca_config.get("kubectlpath"),
138 helm_command=self.vca_config.get("helmpath"),
139 fs=self.fs,
140 log=self.logger,
141 db=self.db,
142 on_update_db=None,
143 )
144
145 self.k8sclusterjuju = K8sJujuConnector(
146 kubectl_command=self.vca_config.get("kubectlpath"),
147 juju_command=self.vca_config.get("jujupath"),
148 fs=self.fs,
149 log=self.logger,
150 db=self.db,
151 on_update_db=self._on_update_k8s_db,
152 )
153
154 self.k8scluster_map = {
155 "helm-chart": self.k8sclusterhelm,
156 "chart": self.k8sclusterhelm,
157 "juju-bundle": self.k8sclusterjuju,
158 "juju": self.k8sclusterjuju,
159 }
160
161 self.vca_map = {
162 "lxc_proxy_charm": self.n2vc,
163 "native_charm": self.n2vc,
164 "k8s_proxy_charm": self.n2vc,
165 "helm": self.conn_helm_ee
166 }
167
168 self.prometheus = prometheus
169
170 # create RO client
171 if self.ng_ro:
172 self.RO = NgRoClient(self.loop, **self.ro_config)
173 else:
174 self.RO = ROclient.ROClient(self.loop, **self.ro_config)
175
176 def _on_update_ro_db(self, nsrs_id, ro_descriptor):
177
178 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
179
180 try:
181 # TODO filter RO descriptor fields...
182
183 # write to database
184 db_dict = dict()
185 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
186 db_dict['deploymentStatus'] = ro_descriptor
187 self.update_db_2("nsrs", nsrs_id, db_dict)
188
189 except Exception as e:
190 self.logger.warn('Cannot write database RO deployment for ns={} -> {}'.format(nsrs_id, e))
191
192 async def _on_update_n2vc_db(self, table, filter, path, updated_data):
193
194 # remove last dot from path (if exists)
195 if path.endswith('.'):
196 path = path[:-1]
197
198 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
199 # .format(table, filter, path, updated_data))
200 try:
201
202 nsr_id = filter.get('_id')
203
204 # read ns record from database
205 nsr = self.db.get_one(table='nsrs', q_filter=filter)
206 current_ns_status = nsr.get('nsState')
207
208 # get vca status for NS
209 status_dict = await self.n2vc.get_status(namespace='.' + nsr_id, yaml_format=False)
210
211 # vcaStatus
212 db_dict = dict()
213 db_dict['vcaStatus'] = status_dict
214 await self.n2vc.update_vca_status(db_dict['vcaStatus'])
215
216 # update configurationStatus for this VCA
217 try:
218 vca_index = int(path[path.rfind(".")+1:])
219
220 vca_list = deep_get(target_dict=nsr, key_list=('_admin', 'deployed', 'VCA'))
221 vca_status = vca_list[vca_index].get('status')
222
223 configuration_status_list = nsr.get('configurationStatus')
224 config_status = configuration_status_list[vca_index].get('status')
225
226 if config_status == 'BROKEN' and vca_status != 'failed':
227 db_dict['configurationStatus'][vca_index] = 'READY'
228 elif config_status != 'BROKEN' and vca_status == 'failed':
229 db_dict['configurationStatus'][vca_index] = 'BROKEN'
230 except Exception as e:
231 # not update configurationStatus
232 self.logger.debug('Error updating vca_index (ignore): {}'.format(e))
233
234 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
235 # if nsState = 'DEGRADED' check if all is OK
236 is_degraded = False
237 if current_ns_status in ('READY', 'DEGRADED'):
238 error_description = ''
239 # check machines
240 if status_dict.get('machines'):
241 for machine_id in status_dict.get('machines'):
242 machine = status_dict.get('machines').get(machine_id)
243 # check machine agent-status
244 if machine.get('agent-status'):
245 s = machine.get('agent-status').get('status')
246 if s != 'started':
247 is_degraded = True
248 error_description += 'machine {} agent-status={} ; '.format(machine_id, s)
249 # check machine instance status
250 if machine.get('instance-status'):
251 s = machine.get('instance-status').get('status')
252 if s != 'running':
253 is_degraded = True
254 error_description += 'machine {} instance-status={} ; '.format(machine_id, s)
255 # check applications
256 if status_dict.get('applications'):
257 for app_id in status_dict.get('applications'):
258 app = status_dict.get('applications').get(app_id)
259 # check application status
260 if app.get('status'):
261 s = app.get('status').get('status')
262 if s != 'active':
263 is_degraded = True
264 error_description += 'application {} status={} ; '.format(app_id, s)
265
266 if error_description:
267 db_dict['errorDescription'] = error_description
268 if current_ns_status == 'READY' and is_degraded:
269 db_dict['nsState'] = 'DEGRADED'
270 if current_ns_status == 'DEGRADED' and not is_degraded:
271 db_dict['nsState'] = 'READY'
272
273 # write to database
274 self.update_db_2("nsrs", nsr_id, db_dict)
275
276 except (asyncio.CancelledError, asyncio.TimeoutError):
277 raise
278 except Exception as e:
279 self.logger.warn('Error updating NS state for ns={}: {}'.format(nsr_id, e))
280
281 async def _on_update_k8s_db(self, cluster_uuid, kdu_instance, filter=None):
282 """
283 Updating vca status in NSR record
284 :param cluster_uuid: UUID of a k8s cluster
285 :param kdu_instance: The unique name of the KDU instance
286 :param filter: To get nsr_id
287 :return: none
288 """
289
290 # self.logger.debug("_on_update_k8s_db(cluster_uuid={}, kdu_instance={}, filter={}"
291 # .format(cluster_uuid, kdu_instance, filter))
292
293 try:
294 nsr_id = filter.get('_id')
295
296 # get vca status for NS
297 vca_status = await self.k8sclusterjuju.status_kdu(cluster_uuid,
298 kdu_instance,
299 complete_status=True,
300 yaml_format=False)
301 # vcaStatus
302 db_dict = dict()
303 db_dict['vcaStatus'] = {nsr_id: vca_status}
304
305 await self.k8sclusterjuju.update_vcaStatus(db_dict['vcaStatus'], cluster_uuid,
306 kdu_instance)
307
308 # write to database
309 self.update_db_2("nsrs", nsr_id, db_dict)
310
311 except (asyncio.CancelledError, asyncio.TimeoutError):
312 raise
313 except Exception as e:
314 self.logger.warn('Error updating NS state for ns={}: {}'.format(nsr_id, e))
315
316 @staticmethod
317 def _parse_cloud_init(cloud_init_text, additional_params, vnfd_id, vdu_id):
318 try:
319 env = Environment(undefined=StrictUndefined)
320 template = env.from_string(cloud_init_text)
321 return template.render(additional_params or {})
322 except UndefinedError as e:
323 raise LcmException("Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
324 "file, must be provided in the instantiation parameters inside the "
325 "'additionalParamsForVnf/Vdu' block".format(e, vnfd_id, vdu_id))
326 except (TemplateError, TemplateNotFound) as e:
327 raise LcmException("Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".
328 format(vnfd_id, vdu_id, e))
329
330 def _get_cloud_init(self, vdu, vnfd):
331 try:
332 cloud_init_content = cloud_init_file = None
333 if vdu.get("cloud-init-file"):
334 base_folder = vnfd["_admin"]["storage"]
335 cloud_init_file = "{}/{}/cloud_init/{}".format(base_folder["folder"], base_folder["pkg-dir"],
336 vdu["cloud-init-file"])
337 with self.fs.file_open(cloud_init_file, "r") as ci_file:
338 cloud_init_content = ci_file.read()
339 elif vdu.get("cloud-init"):
340 cloud_init_content = vdu["cloud-init"]
341
342 return cloud_init_content
343 except FsException as e:
344 raise LcmException("Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".
345 format(vnfd["id"], vdu["id"], cloud_init_file, e))
346
347 def _get_osm_params(self, db_vnfr, vdu_id=None, vdu_count_index=0):
348 osm_params = {x.replace("-", "_"): db_vnfr[x] for x in ("ip-address", "vim-account-id", "vnfd-id", "vnfd-ref")
349 if db_vnfr.get(x) is not None}
350 osm_params["ns_id"] = db_vnfr["nsr-id-ref"]
351 osm_params["vnf_id"] = db_vnfr["_id"]
352 osm_params["member_vnf_index"] = db_vnfr["member-vnf-index-ref"]
353 if db_vnfr.get("vdur"):
354 osm_params["vdu"] = {}
355 for vdur in db_vnfr["vdur"]:
356 vdu = {
357 "count_index": vdur["count-index"],
358 "vdu_id": vdur["vdu-id-ref"],
359 "interfaces": {}
360 }
361 if vdur.get("ip-address"):
362 vdu["ip_address"] = vdur["ip-address"]
363 for iface in vdur["interfaces"]:
364 vdu["interfaces"][iface["name"]] = \
365 {x.replace("-", "_"): iface[x] for x in ("mac-address", "ip-address", "vnf-vld-id", "name")
366 if iface.get(x) is not None}
367 vdu_id_index = "{}-{}".format(vdur["vdu-id-ref"], vdur["count-index"])
368 osm_params["vdu"][vdu_id_index] = vdu
369 if vdu_id:
370 osm_params["vdu_id"] = vdu_id
371 osm_params["count_index"] = vdu_count_index
372 return osm_params
373
374 def _get_vdu_additional_params(self, db_vnfr, vdu_id):
375 vdur = next(vdur for vdur in db_vnfr.get("vdur") if vdu_id == vdur["vdu-id-ref"])
376 additional_params = vdur.get("additionalParams")
377 return self._format_additional_params(additional_params)
378
379 def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
380 """
381 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
382 :param vnfd: input vnfd
383 :param new_id: overrides vnf id if provided
384 :param additionalParams: Instantiation params for VNFs provided
385 :param nsrId: Id of the NSR
386 :return: copy of vnfd
387 """
388 vnfd_RO = deepcopy(vnfd)
389 # remove unused by RO configuration, monitoring, scaling and internal keys
390 vnfd_RO.pop("_id", None)
391 vnfd_RO.pop("_admin", None)
392 vnfd_RO.pop("vnf-configuration", None)
393 vnfd_RO.pop("monitoring-param", None)
394 vnfd_RO.pop("scaling-group-descriptor", None)
395 vnfd_RO.pop("kdu", None)
396 vnfd_RO.pop("k8s-cluster", None)
397 if new_id:
398 vnfd_RO["id"] = new_id
399
400 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
401 for vdu in get_iterable(vnfd_RO, "vdu"):
402 vdu.pop("cloud-init-file", None)
403 vdu.pop("cloud-init", None)
404 return vnfd_RO
405
406 def _ns_params_2_RO(self, ns_params, nsd, vnfd_dict, db_vnfrs, n2vc_key_list):
407 """
408 Creates a RO ns descriptor from OSM ns_instantiate params
409 :param ns_params: OSM instantiate params
410 :param vnfd_dict: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
411 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index. {member-vnf-index: {vnfr_object}, ...}
412 :return: The RO ns descriptor
413 """
414 vim_2_RO = {}
415 wim_2_RO = {}
416 # TODO feature 1417: Check that no instantiation is set over PDU
417 # check if PDU forces a concrete vim-network-id and add it
418 # check if PDU contains a SDN-assist info (dpid, switch, port) and pass it to RO
419
420 def vim_account_2_RO(vim_account):
421 if vim_account in vim_2_RO:
422 return vim_2_RO[vim_account]
423
424 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
425 if db_vim["_admin"]["operationalState"] != "ENABLED":
426 raise LcmException("VIM={} is not available. operationalState={}".format(
427 vim_account, db_vim["_admin"]["operationalState"]))
428 RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
429 vim_2_RO[vim_account] = RO_vim_id
430 return RO_vim_id
431
432 def wim_account_2_RO(wim_account):
433 if isinstance(wim_account, str):
434 if wim_account in wim_2_RO:
435 return wim_2_RO[wim_account]
436
437 db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
438 if db_wim["_admin"]["operationalState"] != "ENABLED":
439 raise LcmException("WIM={} is not available. operationalState={}".format(
440 wim_account, db_wim["_admin"]["operationalState"]))
441 RO_wim_id = db_wim["_admin"]["deployed"]["RO-account"]
442 wim_2_RO[wim_account] = RO_wim_id
443 return RO_wim_id
444 else:
445 return wim_account
446
447 def ip_profile_2_RO(ip_profile):
448 RO_ip_profile = deepcopy((ip_profile))
449 if "dns-server" in RO_ip_profile:
450 if isinstance(RO_ip_profile["dns-server"], list):
451 RO_ip_profile["dns-address"] = []
452 for ds in RO_ip_profile.pop("dns-server"):
453 RO_ip_profile["dns-address"].append(ds['address'])
454 else:
455 RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
456 if RO_ip_profile.get("ip-version") == "ipv4":
457 RO_ip_profile["ip-version"] = "IPv4"
458 if RO_ip_profile.get("ip-version") == "ipv6":
459 RO_ip_profile["ip-version"] = "IPv6"
460 if "dhcp-params" in RO_ip_profile:
461 RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
462 return RO_ip_profile
463
464 if not ns_params:
465 return None
466 RO_ns_params = {
467 # "name": ns_params["nsName"],
468 # "description": ns_params.get("nsDescription"),
469 "datacenter": vim_account_2_RO(ns_params["vimAccountId"]),
470 "wim_account": wim_account_2_RO(ns_params.get("wimAccountId")),
471 # "scenario": ns_params["nsdId"],
472 }
473 # set vim_account of each vnf if different from general vim_account.
474 # Get this information from <vnfr> database content, key vim-account-id
475 # Vim account can be set by placement_engine and it may be different from
476 # the instantiate parameters (vnfs.member-vnf-index.datacenter).
477 for vnf_index, vnfr in db_vnfrs.items():
478 if vnfr.get("vim-account-id") and vnfr["vim-account-id"] != ns_params["vimAccountId"]:
479 populate_dict(RO_ns_params, ("vnfs", vnf_index, "datacenter"), vim_account_2_RO(vnfr["vim-account-id"]))
480
481 n2vc_key_list = n2vc_key_list or []
482 for vnfd_ref, vnfd in vnfd_dict.items():
483 vdu_needed_access = []
484 mgmt_cp = None
485 if vnfd.get("vnf-configuration"):
486 ssh_required = deep_get(vnfd, ("vnf-configuration", "config-access", "ssh-access", "required"))
487 if ssh_required and vnfd.get("mgmt-interface"):
488 if vnfd["mgmt-interface"].get("vdu-id"):
489 vdu_needed_access.append(vnfd["mgmt-interface"]["vdu-id"])
490 elif vnfd["mgmt-interface"].get("cp"):
491 mgmt_cp = vnfd["mgmt-interface"]["cp"]
492
493 for vdu in vnfd.get("vdu", ()):
494 if vdu.get("vdu-configuration"):
495 ssh_required = deep_get(vdu, ("vdu-configuration", "config-access", "ssh-access", "required"))
496 if ssh_required:
497 vdu_needed_access.append(vdu["id"])
498 elif mgmt_cp:
499 for vdu_interface in vdu.get("interface"):
500 if vdu_interface.get("external-connection-point-ref") and \
501 vdu_interface["external-connection-point-ref"] == mgmt_cp:
502 vdu_needed_access.append(vdu["id"])
503 mgmt_cp = None
504 break
505
506 if vdu_needed_access:
507 for vnf_member in nsd.get("constituent-vnfd"):
508 if vnf_member["vnfd-id-ref"] != vnfd_ref:
509 continue
510 for vdu in vdu_needed_access:
511 populate_dict(RO_ns_params,
512 ("vnfs", vnf_member["member-vnf-index"], "vdus", vdu, "mgmt_keys"),
513 n2vc_key_list)
514 # cloud init
515 for vdu in get_iterable(vnfd, "vdu"):
516 cloud_init_text = self._get_cloud_init(vdu, vnfd)
517 if not cloud_init_text:
518 continue
519 for vnf_member in nsd.get("constituent-vnfd"):
520 if vnf_member["vnfd-id-ref"] != vnfd_ref:
521 continue
522 db_vnfr = db_vnfrs[vnf_member["member-vnf-index"]]
523 additional_params = self._get_vdu_additional_params(db_vnfr, vdu["id"]) or {}
524
525 cloud_init_list = []
526 for vdu_index in range(0, int(vdu.get("count", 1))):
527 additional_params["OSM"] = self._get_osm_params(db_vnfr, vdu["id"], vdu_index)
528 cloud_init_list.append(self._parse_cloud_init(cloud_init_text, additional_params, vnfd["id"],
529 vdu["id"]))
530 populate_dict(RO_ns_params,
531 ("vnfs", vnf_member["member-vnf-index"], "vdus", vdu["id"], "cloud_init"),
532 cloud_init_list)
533
534 if ns_params.get("vduImage"):
535 RO_ns_params["vduImage"] = ns_params["vduImage"]
536
537 if ns_params.get("ssh_keys"):
538 RO_ns_params["cloud-config"] = {"key-pairs": ns_params["ssh_keys"]}
539 for vnf_params in get_iterable(ns_params, "vnf"):
540 for constituent_vnfd in nsd["constituent-vnfd"]:
541 if constituent_vnfd["member-vnf-index"] == vnf_params["member-vnf-index"]:
542 vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
543 break
544 else:
545 raise LcmException("Invalid instantiate parameter vnf:member-vnf-index={} is not present at nsd:"
546 "constituent-vnfd".format(vnf_params["member-vnf-index"]))
547
548 for vdu_params in get_iterable(vnf_params, "vdu"):
549 # TODO feature 1417: check that this VDU exist and it is not a PDU
550 if vdu_params.get("volume"):
551 for volume_params in vdu_params["volume"]:
552 if volume_params.get("vim-volume-id"):
553 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
554 vdu_params["id"], "devices", volume_params["name"], "vim_id"),
555 volume_params["vim-volume-id"])
556 if vdu_params.get("interface"):
557 for interface_params in vdu_params["interface"]:
558 if interface_params.get("ip-address"):
559 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
560 vdu_params["id"], "interfaces", interface_params["name"],
561 "ip_address"),
562 interface_params["ip-address"])
563 if interface_params.get("mac-address"):
564 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
565 vdu_params["id"], "interfaces", interface_params["name"],
566 "mac_address"),
567 interface_params["mac-address"])
568 if interface_params.get("floating-ip-required"):
569 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
570 vdu_params["id"], "interfaces", interface_params["name"],
571 "floating-ip"),
572 interface_params["floating-ip-required"])
573
574 for internal_vld_params in get_iterable(vnf_params, "internal-vld"):
575 if internal_vld_params.get("vim-network-name"):
576 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
577 internal_vld_params["name"], "vim-network-name"),
578 internal_vld_params["vim-network-name"])
579 if internal_vld_params.get("vim-network-id"):
580 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
581 internal_vld_params["name"], "vim-network-id"),
582 internal_vld_params["vim-network-id"])
583 if internal_vld_params.get("ip-profile"):
584 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
585 internal_vld_params["name"], "ip-profile"),
586 ip_profile_2_RO(internal_vld_params["ip-profile"]))
587 if internal_vld_params.get("provider-network"):
588
589 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
590 internal_vld_params["name"], "provider-network"),
591 internal_vld_params["provider-network"].copy())
592
593 for icp_params in get_iterable(internal_vld_params, "internal-connection-point"):
594 # look for interface
595 iface_found = False
596 for vdu_descriptor in vnf_descriptor["vdu"]:
597 for vdu_interface in vdu_descriptor["interface"]:
598 if vdu_interface.get("internal-connection-point-ref") == icp_params["id-ref"]:
599 if icp_params.get("ip-address"):
600 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
601 vdu_descriptor["id"], "interfaces",
602 vdu_interface["name"], "ip_address"),
603 icp_params["ip-address"])
604
605 if icp_params.get("mac-address"):
606 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
607 vdu_descriptor["id"], "interfaces",
608 vdu_interface["name"], "mac_address"),
609 icp_params["mac-address"])
610 iface_found = True
611 break
612 if iface_found:
613 break
614 else:
615 raise LcmException("Invalid instantiate parameter vnf:member-vnf-index[{}]:"
616 "internal-vld:id-ref={} is not present at vnfd:internal-"
617 "connection-point".format(vnf_params["member-vnf-index"],
618 icp_params["id-ref"]))
619
620 for vld_params in get_iterable(ns_params, "vld"):
621 if "ip-profile" in vld_params:
622 populate_dict(RO_ns_params, ("networks", vld_params["name"], "ip-profile"),
623 ip_profile_2_RO(vld_params["ip-profile"]))
624
625 if vld_params.get("provider-network"):
626
627 populate_dict(RO_ns_params, ("networks", vld_params["name"], "provider-network"),
628 vld_params["provider-network"].copy())
629
630 if "wimAccountId" in vld_params and vld_params["wimAccountId"] is not None:
631 populate_dict(RO_ns_params, ("networks", vld_params["name"], "wim_account"),
632 wim_account_2_RO(vld_params["wimAccountId"])),
633 if vld_params.get("vim-network-name"):
634 RO_vld_sites = []
635 if isinstance(vld_params["vim-network-name"], dict):
636 for vim_account, vim_net in vld_params["vim-network-name"].items():
637 RO_vld_sites.append({
638 "netmap-use": vim_net,
639 "datacenter": vim_account_2_RO(vim_account)
640 })
641 else: # isinstance str
642 RO_vld_sites.append({"netmap-use": vld_params["vim-network-name"]})
643 if RO_vld_sites:
644 populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)
645
646 if vld_params.get("vim-network-id"):
647 RO_vld_sites = []
648 if isinstance(vld_params["vim-network-id"], dict):
649 for vim_account, vim_net in vld_params["vim-network-id"].items():
650 RO_vld_sites.append({
651 "netmap-use": vim_net,
652 "datacenter": vim_account_2_RO(vim_account)
653 })
654 else: # isinstance str
655 RO_vld_sites.append({"netmap-use": vld_params["vim-network-id"]})
656 if RO_vld_sites:
657 populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)
658 if vld_params.get("ns-net"):
659 if isinstance(vld_params["ns-net"], dict):
660 for vld_id, instance_scenario_id in vld_params["ns-net"].items():
661 RO_vld_ns_net = {"instance_scenario_id": instance_scenario_id, "osm_id": vld_id}
662 populate_dict(RO_ns_params, ("networks", vld_params["name"], "use-network"), RO_vld_ns_net)
663 if "vnfd-connection-point-ref" in vld_params:
664 for cp_params in vld_params["vnfd-connection-point-ref"]:
665 # look for interface
666 for constituent_vnfd in nsd["constituent-vnfd"]:
667 if constituent_vnfd["member-vnf-index"] == cp_params["member-vnf-index-ref"]:
668 vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
669 break
670 else:
671 raise LcmException(
672 "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={} "
673 "is not present at nsd:constituent-vnfd".format(cp_params["member-vnf-index-ref"]))
674 match_cp = False
675 for vdu_descriptor in vnf_descriptor["vdu"]:
676 for interface_descriptor in vdu_descriptor["interface"]:
677 if interface_descriptor.get("external-connection-point-ref") == \
678 cp_params["vnfd-connection-point-ref"]:
679 match_cp = True
680 break
681 if match_cp:
682 break
683 else:
684 raise LcmException(
685 "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={}:"
686 "vnfd-connection-point-ref={} is not present at vnfd={}".format(
687 cp_params["member-vnf-index-ref"],
688 cp_params["vnfd-connection-point-ref"],
689 vnf_descriptor["id"]))
690 if cp_params.get("ip-address"):
691 populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
692 vdu_descriptor["id"], "interfaces",
693 interface_descriptor["name"], "ip_address"),
694 cp_params["ip-address"])
695 if cp_params.get("mac-address"):
696 populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
697 vdu_descriptor["id"], "interfaces",
698 interface_descriptor["name"], "mac_address"),
699 cp_params["mac-address"])
700 return RO_ns_params
701
702 def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None):
703 # make a copy to do not change
704 vdu_create = copy(vdu_create)
705 vdu_delete = copy(vdu_delete)
706
707 vdurs = db_vnfr.get("vdur")
708 if vdurs is None:
709 vdurs = []
710 vdu_index = len(vdurs)
711 while vdu_index:
712 vdu_index -= 1
713 vdur = vdurs[vdu_index]
714 if vdur.get("pdu-type"):
715 continue
716 vdu_id_ref = vdur["vdu-id-ref"]
717 if vdu_create and vdu_create.get(vdu_id_ref):
718 vdur_copy = deepcopy(vdur)
719 vdur_copy["status"] = "BUILD"
720 vdur_copy["status-detailed"] = None
721 vdur_copy["ip_address"]: None
722 for iface in vdur_copy["interfaces"]:
723 iface["ip-address"] = None
724 iface["mac-address"] = None
725 iface.pop("mgmt_vnf", None) # only first vdu can be managment of vnf # TODO ALF
726 for index in range(0, vdu_create[vdu_id_ref]):
727 vdur_copy["_id"] = str(uuid4())
728 vdur_copy["count-index"] += 1
729 vdurs.insert(vdu_index+1+index, vdur_copy)
730 self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
731 vdur_copy = deepcopy(vdur_copy)
732
733 del vdu_create[vdu_id_ref]
734 if vdu_delete and vdu_delete.get(vdu_id_ref):
735 del vdurs[vdu_index]
736 vdu_delete[vdu_id_ref] -= 1
737 if not vdu_delete[vdu_id_ref]:
738 del vdu_delete[vdu_id_ref]
739 # check all operations are done
740 if vdu_create or vdu_delete:
741 raise LcmException("Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
742 vdu_create))
743 if vdu_delete:
744 raise LcmException("Error scaling IN VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
745 vdu_delete))
746
747 vnfr_update = {"vdur": vdurs}
748 db_vnfr["vdur"] = vdurs
749 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
750
751 def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
752 """
753 Updates database nsr with the RO info for the created vld
754 :param ns_update_nsr: dictionary to be filled with the updated info
755 :param db_nsr: content of db_nsr. This is also modified
756 :param nsr_desc_RO: nsr descriptor from RO
757 :return: Nothing, LcmException is raised on errors
758 """
759
760 for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
761 for net_RO in get_iterable(nsr_desc_RO, "nets"):
762 if vld["id"] != net_RO.get("ns_net_osm_id"):
763 continue
764 vld["vim-id"] = net_RO.get("vim_net_id")
765 vld["name"] = net_RO.get("vim_name")
766 vld["status"] = net_RO.get("status")
767 vld["status-detailed"] = net_RO.get("error_msg")
768 ns_update_nsr["vld.{}".format(vld_index)] = vld
769 break
770 else:
771 raise LcmException("ns_update_nsr: Not found vld={} at RO info".format(vld["id"]))
772
773 def set_vnfr_at_error(self, db_vnfrs, error_text):
774 try:
775 for db_vnfr in db_vnfrs.values():
776 vnfr_update = {"status": "ERROR"}
777 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
778 if "status" not in vdur:
779 vdur["status"] = "ERROR"
780 vnfr_update["vdur.{}.status".format(vdu_index)] = "ERROR"
781 if error_text:
782 vdur["status-detailed"] = str(error_text)
783 vnfr_update["vdur.{}.status-detailed".format(vdu_index)] = "ERROR"
784 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
785 except DbException as e:
786 self.logger.error("Cannot update vnf. {}".format(e))
787
788 def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
789 """
790 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
791 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
792 :param nsr_desc_RO: nsr descriptor from RO
793 :return: Nothing, LcmException is raised on errors
794 """
795 for vnf_index, db_vnfr in db_vnfrs.items():
796 for vnf_RO in nsr_desc_RO["vnfs"]:
797 if vnf_RO["member_vnf_index"] != vnf_index:
798 continue
799 vnfr_update = {}
800 if vnf_RO.get("ip_address"):
801 db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO["ip_address"].split(";")[0]
802 elif not db_vnfr.get("ip-address"):
803 if db_vnfr.get("vdur"): # if not VDUs, there is not ip_address
804 raise LcmExceptionNoMgmtIP("ns member_vnf_index '{}' has no IP address".format(vnf_index))
805
806 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
807 vdur_RO_count_index = 0
808 if vdur.get("pdu-type"):
809 continue
810 for vdur_RO in get_iterable(vnf_RO, "vms"):
811 if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
812 continue
813 if vdur["count-index"] != vdur_RO_count_index:
814 vdur_RO_count_index += 1
815 continue
816 vdur["vim-id"] = vdur_RO.get("vim_vm_id")
817 if vdur_RO.get("ip_address"):
818 vdur["ip-address"] = vdur_RO["ip_address"].split(";")[0]
819 else:
820 vdur["ip-address"] = None
821 vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
822 vdur["name"] = vdur_RO.get("vim_name")
823 vdur["status"] = vdur_RO.get("status")
824 vdur["status-detailed"] = vdur_RO.get("error_msg")
825 for ifacer in get_iterable(vdur, "interfaces"):
826 for interface_RO in get_iterable(vdur_RO, "interfaces"):
827 if ifacer["name"] == interface_RO.get("internal_name"):
828 ifacer["ip-address"] = interface_RO.get("ip_address")
829 ifacer["mac-address"] = interface_RO.get("mac_address")
830 break
831 else:
832 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
833 "from VIM info"
834 .format(vnf_index, vdur["vdu-id-ref"], ifacer["name"]))
835 vnfr_update["vdur.{}".format(vdu_index)] = vdur
836 break
837 else:
838 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
839 "VIM info".format(vnf_index, vdur["vdu-id-ref"], vdur["count-index"]))
840
841 for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
842 for net_RO in get_iterable(nsr_desc_RO, "nets"):
843 if vld["id"] != net_RO.get("vnf_net_osm_id"):
844 continue
845 vld["vim-id"] = net_RO.get("vim_net_id")
846 vld["name"] = net_RO.get("vim_name")
847 vld["status"] = net_RO.get("status")
848 vld["status-detailed"] = net_RO.get("error_msg")
849 vnfr_update["vld.{}".format(vld_index)] = vld
850 break
851 else:
852 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
853 vnf_index, vld["id"]))
854
855 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
856 break
857
858 else:
859 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(vnf_index))
860
861 def _get_ns_config_info(self, nsr_id):
862 """
863 Generates a mapping between vnf,vdu elements and the N2VC id
864 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
865 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
866 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
867 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
868 """
869 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
870 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
871 mapping = {}
872 ns_config_info = {"osm-config-mapping": mapping}
873 for vca in vca_deployed_list:
874 if not vca["member-vnf-index"]:
875 continue
876 if not vca["vdu_id"]:
877 mapping[vca["member-vnf-index"]] = vca["application"]
878 else:
879 mapping["{}.{}.{}".format(vca["member-vnf-index"], vca["vdu_id"], vca["vdu_count_index"])] =\
880 vca["application"]
881 return ns_config_info
882
883 @staticmethod
884 def _get_initial_config_primitive_list(desc_primitive_list, vca_deployed, ee_descriptor_id):
885 """
886 Generates a list of initial-config-primitive based on the list provided by the descriptor. It includes internal
887 primitives as verify-ssh-credentials, or config when needed
888 :param desc_primitive_list: information of the descriptor
889 :param vca_deployed: information of the deployed, needed for known if it is related to an NS, VNF, VDU and if
890 this element contains a ssh public key
891 :param ee_descriptor_id: execution environment descriptor id. It is the value of
892 XXX_configuration.execution-environment-list.INDEX.id; it can be None
893 :return: The modified list. Can ba an empty list, but always a list
894 """
895
896 primitive_list = desc_primitive_list or []
897
898 # filter primitives by ee_id
899 primitive_list = [p for p in primitive_list if p.get("execution-environment-ref") == ee_descriptor_id]
900
901 # sort by 'seq'
902 if primitive_list:
903 primitive_list.sort(key=lambda val: int(val['seq']))
904
905 # look for primitive config, and get the position. None if not present
906 config_position = None
907 for index, primitive in enumerate(primitive_list):
908 if primitive["name"] == "config":
909 config_position = index
910 break
911
912 # for NS, add always a config primitive if not present (bug 874)
913 if not vca_deployed["member-vnf-index"] and config_position is None:
914 primitive_list.insert(0, {"name": "config", "parameter": []})
915 config_position = 0
916 # TODO revise if needed: for VNF/VDU add verify-ssh-credentials after config
917 if vca_deployed["member-vnf-index"] and config_position is not None and vca_deployed.get("ssh-public-key"):
918 primitive_list.insert(config_position + 1, {"name": "verify-ssh-credentials", "parameter": []})
919 return primitive_list
920
921 async def _instantiate_ng_ro(self, logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs, db_vnfds_ref,
922 n2vc_key_list, stage, start_deploy, timeout_ns_deploy):
923 nslcmop_id = db_nslcmop["_id"]
924 target = {
925 "name": db_nsr["name"],
926 "ns": {"vld": []},
927 "vnf": [],
928 "image": deepcopy(db_nsr["image"]),
929 "flavor": deepcopy(db_nsr["flavor"]),
930 "action_id": nslcmop_id,
931 }
932 for image in target["image"]:
933 image["vim_info"] = []
934 for flavor in target["flavor"]:
935 flavor["vim_info"] = []
936
937 ns_params = db_nslcmop.get("operationParams")
938 ssh_keys = []
939 if ns_params.get("ssh_keys"):
940 ssh_keys += ns_params.get("ssh_keys")
941 if n2vc_key_list:
942 ssh_keys += n2vc_key_list
943
944 cp2target = {}
945 for vld_index, vld in enumerate(nsd.get("vld")):
946 target_vld = {"id": vld["id"],
947 "name": vld["name"],
948 "mgmt-network": vld.get("mgmt-network", False),
949 "type": vld.get("type"),
950 "vim_info": [{"vim-network-name": vld.get("vim-network-name"),
951 "vim_account_id": ns_params["vimAccountId"]}],
952 }
953 for cp in vld["vnfd-connection-point-ref"]:
954 cp2target["member_vnf:{}.{}".format(cp["member-vnf-index-ref"], cp["vnfd-connection-point-ref"])] = \
955 "nsrs:{}:vld.{}".format(nsr_id, vld_index)
956 target["ns"]["vld"].append(target_vld)
957 for vnfr in db_vnfrs.values():
958 vnfd = db_vnfds_ref[vnfr["vnfd-ref"]]
959 target_vnf = deepcopy(vnfr)
960 for vld in target_vnf.get("vld", ()):
961 # check if connected to a ns.vld
962 vnf_cp = next((cp for cp in vnfd.get("connection-point", ()) if
963 cp.get("internal-vld-ref") == vld["id"]), None)
964 if vnf_cp:
965 ns_cp = "member_vnf:{}.{}".format(vnfr["member-vnf-index-ref"], vnf_cp["id"])
966 if cp2target.get(ns_cp):
967 vld["target"] = cp2target[ns_cp]
968 vld["vim_info"] = [{"vim-network-name": vld.get("vim-network-name"),
969 "vim_account_id": vnfr["vim-account-id"]}]
970
971 for vdur in target_vnf.get("vdur", ()):
972 vdur["vim_info"] = [{"vim_account_id": vnfr["vim-account-id"]}]
973 vdud_index, vdud = next(k for k in enumerate(vnfd["vdu"]) if k[1]["id"] == vdur["vdu-id-ref"])
974 # vdur["additionalParams"] = vnfr.get("additionalParamsForVnf") # TODO additional params for VDU
975
976 if ssh_keys:
977 if deep_get(vdud, ("vdu-configuration", "config-access", "ssh-access", "required")):
978 vdur["ssh-keys"] = ssh_keys
979 vdur["ssh-access-required"] = True
980 elif deep_get(vnfd, ("vnf-configuration", "config-access", "ssh-access", "required")) and \
981 any(iface.get("mgmt-vnf") for iface in vdur["interfaces"]):
982 vdur["ssh-keys"] = ssh_keys
983 vdur["ssh-access-required"] = True
984
985 # cloud-init
986 if vdud.get("cloud-init-file"):
987 vdur["cloud-init"] = "{}:file:{}".format(vnfd["_id"], vdud.get("cloud-init-file"))
988 elif vdud.get("cloud-init"):
989 vdur["cloud-init"] = "{}:vdu:{}".format(vnfd["_id"], vdud_index)
990
991 # flavor
992 ns_flavor = target["flavor"][int(vdur["ns-flavor-id"])]
993 if not next((vi for vi in ns_flavor["vim_info"] if
994 vi and vi.get("vim_account_id") == vnfr["vim-account-id"]), None):
995 ns_flavor["vim_info"].append({"vim_account_id": vnfr["vim-account-id"]})
996 # image
997 ns_image = target["image"][int(vdur["ns-image-id"])]
998 if not next((vi for vi in ns_image["vim_info"] if
999 vi and vi.get("vim_account_id") == vnfr["vim-account-id"]), None):
1000 ns_image["vim_info"].append({"vim_account_id": vnfr["vim-account-id"]})
1001
1002 vdur["vim_info"] = [{"vim_account_id": vnfr["vim-account-id"]}]
1003 target["vnf"].append(target_vnf)
1004
1005 desc = await self.RO.deploy(nsr_id, target)
1006 action_id = desc["action_id"]
1007 await self._wait_ng_ro(self, nsr_id, action_id, nslcmop_id, start_deploy, timeout_ns_deploy, stage)
1008
1009 # Updating NSR
1010 db_nsr_update = {
1011 "_admin.deployed.RO.operational-status": "running",
1012 "detailed-status": " ".join(stage)
1013 }
1014 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1015 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1016 self._write_op_status(nslcmop_id, stage)
1017 self.logger.debug(logging_text + "ns deployed at RO. RO_id={}".format(action_id))
1018 return
1019
1020 async def _wait_ng_ro(self, nsr_id, action_id, nslcmop_id, start_time, timeout, stage):
1021 detailed_status_old = None
1022 db_nsr_update = {}
1023 while time() <= start_time + timeout:
1024 desc_status = await self.RO.status(nsr_id, action_id)
1025 if desc_status["status"] == "FAILED":
1026 raise NgRoException(desc_status["details"])
1027 elif desc_status["status"] == "BUILD":
1028 stage[2] = "VIM: ({})".format(desc_status["details"])
1029 elif desc_status["status"] == "DONE":
1030 stage[2] = "Deployed at VIM"
1031 break
1032 else:
1033 assert False, "ROclient.check_ns_status returns unknown {}".format(desc_status["status"])
1034 if stage[2] != detailed_status_old:
1035 detailed_status_old = stage[2]
1036 db_nsr_update["detailed-status"] = " ".join(stage)
1037 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1038 self._write_op_status(nslcmop_id, stage)
1039 await asyncio.sleep(5, loop=self.loop)
1040 else: # timeout_ns_deploy
1041 raise NgRoException("Timeout waiting ns to deploy")
1042
1043 async def _terminate_ng_ro(self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage):
1044 db_nsr_update = {}
1045 failed_detail = []
1046 action_id = None
1047 start_deploy = time()
1048 try:
1049 target = {
1050 "ns": {"vld": []},
1051 "vnf": [],
1052 "image": [],
1053 "flavor": [],
1054 }
1055 desc = await self.RO.deploy(nsr_id, target)
1056 action_id = desc["action_id"]
1057 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = action_id
1058 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETING"
1059 self.logger.debug(logging_text + "ns terminate action at RO. action_id={}".format(action_id))
1060
1061 # wait until done
1062 delete_timeout = 20 * 60 # 20 minutes
1063 await self._wait_ng_ro(self, nsr_id, action_id, nslcmop_id, start_deploy, delete_timeout, stage)
1064
1065 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
1066 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
1067 # delete all nsr
1068 await self.RO.delete(nsr_id)
1069 except Exception as e:
1070 if isinstance(e, NgRoException) and e.http_code == 404: # not found
1071 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
1072 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
1073 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
1074 self.logger.debug(logging_text + "RO_action_id={} already deleted".format(action_id))
1075 elif isinstance(e, NgRoException) and e.http_code == 409: # conflict
1076 failed_detail.append("delete conflict: {}".format(e))
1077 self.logger.debug(logging_text + "RO_action_id={} delete conflict: {}".format(action_id, e))
1078 else:
1079 failed_detail.append("delete error: {}".format(e))
1080 self.logger.error(logging_text + "RO_action_id={} delete error: {}".format(action_id, e))
1081
1082 if failed_detail:
1083 stage[2] = "Error deleting from VIM"
1084 else:
1085 stage[2] = "Deleted from VIM"
1086 db_nsr_update["detailed-status"] = " ".join(stage)
1087 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1088 self._write_op_status(nslcmop_id, stage)
1089
1090 if failed_detail:
1091 raise LcmException("; ".join(failed_detail))
1092 return
1093
1094 async def instantiate_RO(self, logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs, db_vnfds_ref,
1095 n2vc_key_list, stage):
1096 """
1097 Instantiate at RO
1098 :param logging_text: preffix text to use at logging
1099 :param nsr_id: nsr identity
1100 :param nsd: database content of ns descriptor
1101 :param db_nsr: database content of ns record
1102 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
1103 :param db_vnfrs:
1104 :param db_vnfds_ref: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
1105 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
1106 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
1107 :return: None or exception
1108 """
1109 try:
1110 db_nsr_update = {}
1111 RO_descriptor_number = 0 # number of descriptors created at RO
1112 vnf_index_2_RO_id = {} # map between vnfd/nsd id to the id used at RO
1113 nslcmop_id = db_nslcmop["_id"]
1114 start_deploy = time()
1115 ns_params = db_nslcmop.get("operationParams")
1116 if ns_params and ns_params.get("timeout_ns_deploy"):
1117 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
1118 else:
1119 timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy)
1120
1121 # Check for and optionally request placement optimization. Database will be updated if placement activated
1122 stage[2] = "Waiting for Placement."
1123 if await self._do_placement(logging_text, db_nslcmop, db_vnfrs):
1124 # in case of placement change ns_params[vimAcountId) if not present at any vnfrs
1125 for vnfr in db_vnfrs.values():
1126 if ns_params["vimAccountId"] == vnfr["vim-account-id"]:
1127 break
1128 else:
1129 ns_params["vimAccountId"] == vnfr["vim-account-id"]
1130
1131 if self.ng_ro:
1132 return await self._instantiate_ng_ro(logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs,
1133 db_vnfds_ref, n2vc_key_list, stage, start_deploy,
1134 timeout_ns_deploy)
1135 # deploy RO
1136 # get vnfds, instantiate at RO
1137 for c_vnf in nsd.get("constituent-vnfd", ()):
1138 member_vnf_index = c_vnf["member-vnf-index"]
1139 vnfd = db_vnfds_ref[c_vnf['vnfd-id-ref']]
1140 vnfd_ref = vnfd["id"]
1141
1142 stage[2] = "Creating vnfd='{}' member_vnf_index='{}' at RO".format(vnfd_ref, member_vnf_index)
1143 db_nsr_update["detailed-status"] = " ".join(stage)
1144 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1145 self._write_op_status(nslcmop_id, stage)
1146
1147 # self.logger.debug(logging_text + stage[2])
1148 vnfd_id_RO = "{}.{}.{}".format(nsr_id, RO_descriptor_number, member_vnf_index[:23])
1149 vnf_index_2_RO_id[member_vnf_index] = vnfd_id_RO
1150 RO_descriptor_number += 1
1151
1152 # look position at deployed.RO.vnfd if not present it will be appended at the end
1153 for index, vnf_deployed in enumerate(db_nsr["_admin"]["deployed"]["RO"]["vnfd"]):
1154 if vnf_deployed["member-vnf-index"] == member_vnf_index:
1155 break
1156 else:
1157 index = len(db_nsr["_admin"]["deployed"]["RO"]["vnfd"])
1158 db_nsr["_admin"]["deployed"]["RO"]["vnfd"].append(None)
1159
1160 # look if present
1161 RO_update = {"member-vnf-index": member_vnf_index}
1162 vnfd_list = await self.RO.get_list("vnfd", filter_by={"osm_id": vnfd_id_RO})
1163 if vnfd_list:
1164 RO_update["id"] = vnfd_list[0]["uuid"]
1165 self.logger.debug(logging_text + "vnfd='{}' member_vnf_index='{}' exists at RO. Using RO_id={}".
1166 format(vnfd_ref, member_vnf_index, vnfd_list[0]["uuid"]))
1167 else:
1168 vnfd_RO = self.vnfd2RO(vnfd, vnfd_id_RO, db_vnfrs[c_vnf["member-vnf-index"]].
1169 get("additionalParamsForVnf"), nsr_id)
1170 desc = await self.RO.create("vnfd", descriptor=vnfd_RO)
1171 RO_update["id"] = desc["uuid"]
1172 self.logger.debug(logging_text + "vnfd='{}' member_vnf_index='{}' created at RO. RO_id={}".format(
1173 vnfd_ref, member_vnf_index, desc["uuid"]))
1174 db_nsr_update["_admin.deployed.RO.vnfd.{}".format(index)] = RO_update
1175 db_nsr["_admin"]["deployed"]["RO"]["vnfd"][index] = RO_update
1176
1177 # create nsd at RO
1178 nsd_ref = nsd["id"]
1179
1180 stage[2] = "Creating nsd={} at RO".format(nsd_ref)
1181 db_nsr_update["detailed-status"] = " ".join(stage)
1182 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1183 self._write_op_status(nslcmop_id, stage)
1184
1185 # self.logger.debug(logging_text + stage[2])
1186 RO_osm_nsd_id = "{}.{}.{}".format(nsr_id, RO_descriptor_number, nsd_ref[:23])
1187 RO_descriptor_number += 1
1188 nsd_list = await self.RO.get_list("nsd", filter_by={"osm_id": RO_osm_nsd_id})
1189 if nsd_list:
1190 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = nsd_list[0]["uuid"]
1191 self.logger.debug(logging_text + "nsd={} exists at RO. Using RO_id={}".format(
1192 nsd_ref, RO_nsd_uuid))
1193 else:
1194 nsd_RO = deepcopy(nsd)
1195 nsd_RO["id"] = RO_osm_nsd_id
1196 nsd_RO.pop("_id", None)
1197 nsd_RO.pop("_admin", None)
1198 for c_vnf in nsd_RO.get("constituent-vnfd", ()):
1199 member_vnf_index = c_vnf["member-vnf-index"]
1200 c_vnf["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
1201 for c_vld in nsd_RO.get("vld", ()):
1202 for cp in c_vld.get("vnfd-connection-point-ref", ()):
1203 member_vnf_index = cp["member-vnf-index-ref"]
1204 cp["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
1205
1206 desc = await self.RO.create("nsd", descriptor=nsd_RO)
1207 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
1208 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = desc["uuid"]
1209 self.logger.debug(logging_text + "nsd={} created at RO. RO_id={}".format(nsd_ref, RO_nsd_uuid))
1210 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1211
1212 # Crate ns at RO
1213 stage[2] = "Creating nsd={} at RO".format(nsd_ref)
1214 db_nsr_update["detailed-status"] = " ".join(stage)
1215 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1216 self._write_op_status(nslcmop_id, stage)
1217
1218 # if present use it unless in error status
1219 RO_nsr_id = deep_get(db_nsr, ("_admin", "deployed", "RO", "nsr_id"))
1220 if RO_nsr_id:
1221 try:
1222 stage[2] = "Looking for existing ns at RO"
1223 db_nsr_update["detailed-status"] = " ".join(stage)
1224 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1225 self._write_op_status(nslcmop_id, stage)
1226 # self.logger.debug(logging_text + stage[2] + " RO_ns_id={}".format(RO_nsr_id))
1227 desc = await self.RO.show("ns", RO_nsr_id)
1228
1229 except ROclient.ROClientException as e:
1230 if e.http_code != HTTPStatus.NOT_FOUND:
1231 raise
1232 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
1233 if RO_nsr_id:
1234 ns_status, ns_status_info = self.RO.check_ns_status(desc)
1235 db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
1236 if ns_status == "ERROR":
1237 stage[2] = "Deleting ns at RO. RO_ns_id={}".format(RO_nsr_id)
1238 self.logger.debug(logging_text + stage[2])
1239 await self.RO.delete("ns", RO_nsr_id)
1240 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
1241 if not RO_nsr_id:
1242 stage[2] = "Checking dependencies"
1243 db_nsr_update["detailed-status"] = " ".join(stage)
1244 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1245 self._write_op_status(nslcmop_id, stage)
1246 # self.logger.debug(logging_text + stage[2])
1247
1248 # check if VIM is creating and wait look if previous tasks in process
1249 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account", ns_params["vimAccountId"])
1250 if task_dependency:
1251 stage[2] = "Waiting for related tasks '{}' to be completed".format(task_name)
1252 self.logger.debug(logging_text + stage[2])
1253 await asyncio.wait(task_dependency, timeout=3600)
1254 if ns_params.get("vnf"):
1255 for vnf in ns_params["vnf"]:
1256 if "vimAccountId" in vnf:
1257 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account",
1258 vnf["vimAccountId"])
1259 if task_dependency:
1260 stage[2] = "Waiting for related tasks '{}' to be completed.".format(task_name)
1261 self.logger.debug(logging_text + stage[2])
1262 await asyncio.wait(task_dependency, timeout=3600)
1263
1264 stage[2] = "Checking instantiation parameters."
1265 RO_ns_params = self._ns_params_2_RO(ns_params, nsd, db_vnfds_ref, db_vnfrs, n2vc_key_list)
1266 stage[2] = "Deploying ns at VIM."
1267 db_nsr_update["detailed-status"] = " ".join(stage)
1268 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1269 self._write_op_status(nslcmop_id, stage)
1270
1271 desc = await self.RO.create("ns", descriptor=RO_ns_params, name=db_nsr["name"], scenario=RO_nsd_uuid)
1272 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = desc["uuid"]
1273 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
1274 db_nsr_update["_admin.deployed.RO.nsr_status"] = "BUILD"
1275 self.logger.debug(logging_text + "ns created at RO. RO_id={}".format(desc["uuid"]))
1276
1277 # wait until NS is ready
1278 stage[2] = "Waiting VIM to deploy ns."
1279 db_nsr_update["detailed-status"] = " ".join(stage)
1280 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1281 self._write_op_status(nslcmop_id, stage)
1282 detailed_status_old = None
1283 self.logger.debug(logging_text + stage[2] + " RO_ns_id={}".format(RO_nsr_id))
1284
1285 old_desc = None
1286 while time() <= start_deploy + timeout_ns_deploy:
1287 desc = await self.RO.show("ns", RO_nsr_id)
1288
1289 # deploymentStatus
1290 if desc != old_desc:
1291 # desc has changed => update db
1292 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
1293 old_desc = desc
1294
1295 ns_status, ns_status_info = self.RO.check_ns_status(desc)
1296 db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
1297 if ns_status == "ERROR":
1298 raise ROclient.ROClientException(ns_status_info)
1299 elif ns_status == "BUILD":
1300 stage[2] = "VIM: ({})".format(ns_status_info)
1301 elif ns_status == "ACTIVE":
1302 stage[2] = "Waiting for management IP address reported by the VIM. Updating VNFRs."
1303 try:
1304 self.ns_update_vnfr(db_vnfrs, desc)
1305 break
1306 except LcmExceptionNoMgmtIP:
1307 pass
1308 else:
1309 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
1310 if stage[2] != detailed_status_old:
1311 detailed_status_old = stage[2]
1312 db_nsr_update["detailed-status"] = " ".join(stage)
1313 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1314 self._write_op_status(nslcmop_id, stage)
1315 await asyncio.sleep(5, loop=self.loop)
1316 else: # timeout_ns_deploy
1317 raise ROclient.ROClientException("Timeout waiting ns to be ready")
1318
1319 # Updating NSR
1320 self.ns_update_nsr(db_nsr_update, db_nsr, desc)
1321
1322 db_nsr_update["_admin.deployed.RO.operational-status"] = "running"
1323 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1324 stage[2] = "Deployed at VIM"
1325 db_nsr_update["detailed-status"] = " ".join(stage)
1326 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1327 self._write_op_status(nslcmop_id, stage)
1328 # await self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
1329 # self.logger.debug(logging_text + "Deployed at VIM")
1330 except (ROclient.ROClientException, LcmException, DbException, NgRoException) as e:
1331 stage[2] = "ERROR deploying at VIM"
1332 self.set_vnfr_at_error(db_vnfrs, str(e))
1333 raise
1334
1335 async def wait_kdu_up(self, logging_text, nsr_id, vnfr_id, kdu_name):
1336 """
1337 Wait for kdu to be up, get ip address
1338 :param logging_text: prefix use for logging
1339 :param nsr_id:
1340 :param vnfr_id:
1341 :param kdu_name:
1342 :return: IP address
1343 """
1344
1345 # self.logger.debug(logging_text + "Starting wait_kdu_up")
1346 nb_tries = 0
1347
1348 while nb_tries < 360:
1349 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
1350 kdur = next((x for x in get_iterable(db_vnfr, "kdur") if x.get("kdu-name") == kdu_name), None)
1351 if not kdur:
1352 raise LcmException("Not found vnfr_id={}, kdu_name={}".format(vnfr_id, kdu_name))
1353 if kdur.get("status"):
1354 if kdur["status"] in ("READY", "ENABLED"):
1355 return kdur.get("ip-address")
1356 else:
1357 raise LcmException("target KDU={} is in error state".format(kdu_name))
1358
1359 await asyncio.sleep(10, loop=self.loop)
1360 nb_tries += 1
1361 raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name))
1362
1363 async def wait_vm_up_insert_key_ro(self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None):
1364 """
1365 Wait for ip addres at RO, and optionally, insert public key in virtual machine
1366 :param logging_text: prefix use for logging
1367 :param nsr_id:
1368 :param vnfr_id:
1369 :param vdu_id:
1370 :param vdu_index:
1371 :param pub_key: public ssh key to inject, None to skip
1372 :param user: user to apply the public ssh key
1373 :return: IP address
1374 """
1375
1376 # self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
1377 ro_nsr_id = None
1378 ip_address = None
1379 nb_tries = 0
1380 target_vdu_id = None
1381 ro_retries = 0
1382
1383 while True:
1384
1385 ro_retries += 1
1386 if ro_retries >= 360: # 1 hour
1387 raise LcmException("Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id))
1388
1389 await asyncio.sleep(10, loop=self.loop)
1390
1391 # get ip address
1392 if not target_vdu_id:
1393 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
1394
1395 if not vdu_id: # for the VNF case
1396 if db_vnfr.get("status") == "ERROR":
1397 raise LcmException("Cannot inject ssh-key because target VNF is in error state")
1398 ip_address = db_vnfr.get("ip-address")
1399 if not ip_address:
1400 continue
1401 vdur = next((x for x in get_iterable(db_vnfr, "vdur") if x.get("ip-address") == ip_address), None)
1402 else: # VDU case
1403 vdur = next((x for x in get_iterable(db_vnfr, "vdur")
1404 if x.get("vdu-id-ref") == vdu_id and x.get("count-index") == vdu_index), None)
1405
1406 if not vdur and len(db_vnfr.get("vdur", ())) == 1: # If only one, this should be the target vdu
1407 vdur = db_vnfr["vdur"][0]
1408 if not vdur:
1409 raise LcmException("Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(vnfr_id, vdu_id,
1410 vdu_index))
1411
1412 if vdur.get("pdu-type") or vdur.get("status") == "ACTIVE":
1413 ip_address = vdur.get("ip-address")
1414 if not ip_address:
1415 continue
1416 target_vdu_id = vdur["vdu-id-ref"]
1417 elif vdur.get("status") == "ERROR":
1418 raise LcmException("Cannot inject ssh-key because target VM is in error state")
1419
1420 if not target_vdu_id:
1421 continue
1422
1423 # inject public key into machine
1424 if pub_key and user:
1425 # wait until NS is deployed at RO
1426 if not ro_nsr_id:
1427 db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
1428 ro_nsr_id = deep_get(db_nsrs, ("_admin", "deployed", "RO", "nsr_id"))
1429 if not ro_nsr_id:
1430 continue
1431
1432 # self.logger.debug(logging_text + "Inserting RO key")
1433 if vdur.get("pdu-type"):
1434 self.logger.error(logging_text + "Cannot inject ssh-ky to a PDU")
1435 return ip_address
1436 try:
1437 ro_vm_id = "{}-{}".format(db_vnfr["member-vnf-index-ref"], target_vdu_id) # TODO add vdu_index
1438 if self.ng_ro:
1439 target = {"action": "inject_ssh_key", "key": pub_key, "user": user,
1440 "vnf": [{"_id": vnfr_id, "vdur": [{"id": vdu_id}]}],
1441 }
1442 await self.RO.deploy(nsr_id, target)
1443 else:
1444 result_dict = await self.RO.create_action(
1445 item="ns",
1446 item_id_name=ro_nsr_id,
1447 descriptor={"add_public_key": pub_key, "vms": [ro_vm_id], "user": user}
1448 )
1449 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
1450 if not result_dict or not isinstance(result_dict, dict):
1451 raise LcmException("Unknown response from RO when injecting key")
1452 for result in result_dict.values():
1453 if result.get("vim_result") == 200:
1454 break
1455 else:
1456 raise ROclient.ROClientException("error injecting key: {}".format(
1457 result.get("description")))
1458 break
1459 except NgRoException as e:
1460 raise LcmException("Reaching max tries injecting key. Error: {}".format(e))
1461 except ROclient.ROClientException as e:
1462 if not nb_tries:
1463 self.logger.debug(logging_text + "error injecting key: {}. Retrying until {} seconds".
1464 format(e, 20*10))
1465 nb_tries += 1
1466 if nb_tries >= 20:
1467 raise LcmException("Reaching max tries injecting key. Error: {}".format(e))
1468 else:
1469 break
1470
1471 return ip_address
1472
1473 async def _wait_dependent_n2vc(self, nsr_id, vca_deployed_list, vca_index):
1474 """
1475 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1476 """
1477 my_vca = vca_deployed_list[vca_index]
1478 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
1479 # vdu or kdu: no dependencies
1480 return
1481 timeout = 300
1482 while timeout >= 0:
1483 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1484 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1485 configuration_status_list = db_nsr["configurationStatus"]
1486 for index, vca_deployed in enumerate(configuration_status_list):
1487 if index == vca_index:
1488 # myself
1489 continue
1490 if not my_vca.get("member-vnf-index") or \
1491 (vca_deployed.get("member-vnf-index") == my_vca.get("member-vnf-index")):
1492 internal_status = configuration_status_list[index].get("status")
1493 if internal_status == 'READY':
1494 continue
1495 elif internal_status == 'BROKEN':
1496 raise LcmException("Configuration aborted because dependent charm/s has failed")
1497 else:
1498 break
1499 else:
1500 # no dependencies, return
1501 return
1502 await asyncio.sleep(10)
1503 timeout -= 1
1504
1505 raise LcmException("Configuration aborted because dependent charm/s timeout")
1506
1507 async def instantiate_N2VC(self, logging_text, vca_index, nsi_id, db_nsr, db_vnfr, vdu_id, kdu_name, vdu_index,
1508 config_descriptor, deploy_params, base_folder, nslcmop_id, stage, vca_type, vca_name,
1509 ee_config_descriptor):
1510 nsr_id = db_nsr["_id"]
1511 db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
1512 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1513 vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
1514 osm_config = {"osm": {"ns_id": db_nsr["_id"]}}
1515 db_dict = {
1516 'collection': 'nsrs',
1517 'filter': {'_id': nsr_id},
1518 'path': db_update_entry
1519 }
1520 step = ""
1521 try:
1522
1523 element_type = 'NS'
1524 element_under_configuration = nsr_id
1525
1526 vnfr_id = None
1527 if db_vnfr:
1528 vnfr_id = db_vnfr["_id"]
1529 osm_config["osm"]["vnf_id"] = vnfr_id
1530
1531 namespace = "{nsi}.{ns}".format(
1532 nsi=nsi_id if nsi_id else "",
1533 ns=nsr_id)
1534
1535 if vnfr_id:
1536 element_type = 'VNF'
1537 element_under_configuration = vnfr_id
1538 namespace += ".{}".format(vnfr_id)
1539 if vdu_id:
1540 namespace += ".{}-{}".format(vdu_id, vdu_index or 0)
1541 element_type = 'VDU'
1542 element_under_configuration = "{}-{}".format(vdu_id, vdu_index or 0)
1543 osm_config["osm"]["vdu_id"] = vdu_id
1544 elif kdu_name:
1545 namespace += ".{}".format(kdu_name)
1546 element_type = 'KDU'
1547 element_under_configuration = kdu_name
1548 osm_config["osm"]["kdu_name"] = kdu_name
1549
1550 # Get artifact path
1551 artifact_path = "{}/{}/{}/{}".format(
1552 base_folder["folder"],
1553 base_folder["pkg-dir"],
1554 "charms" if vca_type in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm") else "helm-charts",
1555 vca_name
1556 )
1557 # get initial_config_primitive_list that applies to this element
1558 initial_config_primitive_list = config_descriptor.get('initial-config-primitive')
1559
1560 # add config if not present for NS charm
1561 ee_descriptor_id = ee_config_descriptor.get("id")
1562 initial_config_primitive_list = self._get_initial_config_primitive_list(initial_config_primitive_list,
1563 vca_deployed, ee_descriptor_id)
1564
1565 # n2vc_redesign STEP 3.1
1566 # find old ee_id if exists
1567 ee_id = vca_deployed.get("ee_id")
1568
1569 # create or register execution environment in VCA
1570 if vca_type in ("lxc_proxy_charm", "k8s_proxy_charm", "helm"):
1571
1572 self._write_configuration_status(
1573 nsr_id=nsr_id,
1574 vca_index=vca_index,
1575 status='CREATING',
1576 element_under_configuration=element_under_configuration,
1577 element_type=element_type
1578 )
1579
1580 step = "create execution environment"
1581 self.logger.debug(logging_text + step)
1582 ee_id, credentials = await self.vca_map[vca_type].create_execution_environment(
1583 namespace=namespace,
1584 reuse_ee_id=ee_id,
1585 db_dict=db_dict,
1586 config=osm_config,
1587 artifact_path=artifact_path,
1588 vca_type=vca_type)
1589
1590 elif vca_type == "native_charm":
1591 step = "Waiting to VM being up and getting IP address"
1592 self.logger.debug(logging_text + step)
1593 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
1594 user=None, pub_key=None)
1595 credentials = {"hostname": rw_mgmt_ip}
1596 # get username
1597 username = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
1598 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1599 # merged. Meanwhile let's get username from initial-config-primitive
1600 if not username and initial_config_primitive_list:
1601 for config_primitive in initial_config_primitive_list:
1602 for param in config_primitive.get("parameter", ()):
1603 if param["name"] == "ssh-username":
1604 username = param["value"]
1605 break
1606 if not username:
1607 raise LcmException("Cannot determine the username neither with 'initial-config-primitive' nor with "
1608 "'config-access.ssh-access.default-user'")
1609 credentials["username"] = username
1610 # n2vc_redesign STEP 3.2
1611
1612 self._write_configuration_status(
1613 nsr_id=nsr_id,
1614 vca_index=vca_index,
1615 status='REGISTERING',
1616 element_under_configuration=element_under_configuration,
1617 element_type=element_type
1618 )
1619
1620 step = "register execution environment {}".format(credentials)
1621 self.logger.debug(logging_text + step)
1622 ee_id = await self.vca_map[vca_type].register_execution_environment(
1623 credentials=credentials, namespace=namespace, db_dict=db_dict)
1624
1625 # for compatibility with MON/POL modules, the need model and application name at database
1626 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
1627 ee_id_parts = ee_id.split('.')
1628 db_nsr_update = {db_update_entry + "ee_id": ee_id}
1629 if len(ee_id_parts) >= 2:
1630 model_name = ee_id_parts[0]
1631 application_name = ee_id_parts[1]
1632 db_nsr_update[db_update_entry + "model"] = model_name
1633 db_nsr_update[db_update_entry + "application"] = application_name
1634
1635 # n2vc_redesign STEP 3.3
1636 step = "Install configuration Software"
1637
1638 self._write_configuration_status(
1639 nsr_id=nsr_id,
1640 vca_index=vca_index,
1641 status='INSTALLING SW',
1642 element_under_configuration=element_under_configuration,
1643 element_type=element_type,
1644 other_update=db_nsr_update
1645 )
1646
1647 # TODO check if already done
1648 self.logger.debug(logging_text + step)
1649 config = None
1650 if vca_type == "native_charm":
1651 config_primitive = next((p for p in initial_config_primitive_list if p["name"] == "config"), None)
1652 if config_primitive:
1653 config = self._map_primitive_params(
1654 config_primitive,
1655 {},
1656 deploy_params
1657 )
1658 num_units = 1
1659 if vca_type == "lxc_proxy_charm":
1660 if element_type == "NS":
1661 num_units = db_nsr.get("config-units") or 1
1662 elif element_type == "VNF":
1663 num_units = db_vnfr.get("config-units") or 1
1664 elif element_type == "VDU":
1665 for v in db_vnfr["vdur"]:
1666 if vdu_id == v["vdu-id-ref"]:
1667 num_units = v.get("config-units") or 1
1668 break
1669
1670 await self.vca_map[vca_type].install_configuration_sw(
1671 ee_id=ee_id,
1672 artifact_path=artifact_path,
1673 db_dict=db_dict,
1674 config=config,
1675 num_units=num_units,
1676 vca_type=vca_type
1677 )
1678
1679 # write in db flag of configuration_sw already installed
1680 self.update_db_2("nsrs", nsr_id, {db_update_entry + "config_sw_installed": True})
1681
1682 # add relations for this VCA (wait for other peers related with this VCA)
1683 await self._add_vca_relations(logging_text=logging_text, nsr_id=nsr_id,
1684 vca_index=vca_index, vca_type=vca_type)
1685
1686 # if SSH access is required, then get execution environment SSH public
1687 # if native charm we have waited already to VM be UP
1688 if vca_type in ("k8s_proxy_charm", "lxc_proxy_charm", "helm"):
1689 pub_key = None
1690 user = None
1691 # self.logger.debug("get ssh key block")
1692 if deep_get(config_descriptor, ("config-access", "ssh-access", "required")):
1693 # self.logger.debug("ssh key needed")
1694 # Needed to inject a ssh key
1695 user = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
1696 step = "Install configuration Software, getting public ssh key"
1697 pub_key = await self.vca_map[vca_type].get_ee_ssh_public__key(ee_id=ee_id, db_dict=db_dict)
1698
1699 step = "Insert public key into VM user={} ssh_key={}".format(user, pub_key)
1700 else:
1701 # self.logger.debug("no need to get ssh key")
1702 step = "Waiting to VM being up and getting IP address"
1703 self.logger.debug(logging_text + step)
1704
1705 # n2vc_redesign STEP 5.1
1706 # wait for RO (ip-address) Insert pub_key into VM
1707 if vnfr_id:
1708 if kdu_name:
1709 rw_mgmt_ip = await self.wait_kdu_up(logging_text, nsr_id, vnfr_id, kdu_name)
1710 else:
1711 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id,
1712 vdu_index, user=user, pub_key=pub_key)
1713 else:
1714 rw_mgmt_ip = None # This is for a NS configuration
1715
1716 self.logger.debug(logging_text + ' VM_ip_address={}'.format(rw_mgmt_ip))
1717
1718 # store rw_mgmt_ip in deploy params for later replacement
1719 deploy_params["rw_mgmt_ip"] = rw_mgmt_ip
1720
1721 # n2vc_redesign STEP 6 Execute initial config primitive
1722 step = 'execute initial config primitive'
1723
1724 # wait for dependent primitives execution (NS -> VNF -> VDU)
1725 if initial_config_primitive_list:
1726 await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)
1727
1728 # stage, in function of element type: vdu, kdu, vnf or ns
1729 my_vca = vca_deployed_list[vca_index]
1730 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
1731 # VDU or KDU
1732 stage[0] = 'Stage 3/5: running Day-1 primitives for VDU.'
1733 elif my_vca.get("member-vnf-index"):
1734 # VNF
1735 stage[0] = 'Stage 4/5: running Day-1 primitives for VNF.'
1736 else:
1737 # NS
1738 stage[0] = 'Stage 5/5: running Day-1 primitives for NS.'
1739
1740 self._write_configuration_status(
1741 nsr_id=nsr_id,
1742 vca_index=vca_index,
1743 status='EXECUTING PRIMITIVE'
1744 )
1745
1746 self._write_op_status(
1747 op_id=nslcmop_id,
1748 stage=stage
1749 )
1750
1751 check_if_terminated_needed = True
1752 for initial_config_primitive in initial_config_primitive_list:
1753 # adding information on the vca_deployed if it is a NS execution environment
1754 if not vca_deployed["member-vnf-index"]:
1755 deploy_params["ns_config_info"] = json.dumps(self._get_ns_config_info(nsr_id))
1756 # TODO check if already done
1757 primitive_params_ = self._map_primitive_params(initial_config_primitive, {}, deploy_params)
1758
1759 step = "execute primitive '{}' params '{}'".format(initial_config_primitive["name"], primitive_params_)
1760 self.logger.debug(logging_text + step)
1761 await self.vca_map[vca_type].exec_primitive(
1762 ee_id=ee_id,
1763 primitive_name=initial_config_primitive["name"],
1764 params_dict=primitive_params_,
1765 db_dict=db_dict
1766 )
1767 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
1768 if check_if_terminated_needed:
1769 if config_descriptor.get('terminate-config-primitive'):
1770 self.update_db_2("nsrs", nsr_id, {db_update_entry + "needed_terminate": True})
1771 check_if_terminated_needed = False
1772
1773 # TODO register in database that primitive is done
1774
1775 # STEP 7 Configure metrics
1776 if vca_type == "helm":
1777 prometheus_jobs = await self.add_prometheus_metrics(
1778 ee_id=ee_id,
1779 artifact_path=artifact_path,
1780 ee_config_descriptor=ee_config_descriptor,
1781 vnfr_id=vnfr_id,
1782 nsr_id=nsr_id,
1783 target_ip=rw_mgmt_ip,
1784 )
1785 if prometheus_jobs:
1786 self.update_db_2("nsrs", nsr_id, {db_update_entry + "prometheus_jobs": prometheus_jobs})
1787
1788 step = "instantiated at VCA"
1789 self.logger.debug(logging_text + step)
1790
1791 self._write_configuration_status(
1792 nsr_id=nsr_id,
1793 vca_index=vca_index,
1794 status='READY'
1795 )
1796
1797 except Exception as e: # TODO not use Exception but N2VC exception
1798 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
1799 if not isinstance(e, (DbException, N2VCException, LcmException, asyncio.CancelledError)):
1800 self.logger.error("Exception while {} : {}".format(step, e), exc_info=True)
1801 self._write_configuration_status(
1802 nsr_id=nsr_id,
1803 vca_index=vca_index,
1804 status='BROKEN'
1805 )
1806 raise LcmException("{} {}".format(step, e)) from e
1807
1808 def _write_ns_status(self, nsr_id: str, ns_state: str, current_operation: str, current_operation_id: str,
1809 error_description: str = None, error_detail: str = None, other_update: dict = None):
1810 """
1811 Update db_nsr fields.
1812 :param nsr_id:
1813 :param ns_state:
1814 :param current_operation:
1815 :param current_operation_id:
1816 :param error_description:
1817 :param error_detail:
1818 :param other_update: Other required changes at database if provided, will be cleared
1819 :return:
1820 """
1821 try:
1822 db_dict = other_update or {}
1823 db_dict["_admin.nslcmop"] = current_operation_id # for backward compatibility
1824 db_dict["_admin.current-operation"] = current_operation_id
1825 db_dict["_admin.operation-type"] = current_operation if current_operation != "IDLE" else None
1826 db_dict["currentOperation"] = current_operation
1827 db_dict["currentOperationID"] = current_operation_id
1828 db_dict["errorDescription"] = error_description
1829 db_dict["errorDetail"] = error_detail
1830
1831 if ns_state:
1832 db_dict["nsState"] = ns_state
1833 self.update_db_2("nsrs", nsr_id, db_dict)
1834 except DbException as e:
1835 self.logger.warn('Error writing NS status, ns={}: {}'.format(nsr_id, e))
1836
1837 def _write_op_status(self, op_id: str, stage: list = None, error_message: str = None, queuePosition: int = 0,
1838 operation_state: str = None, other_update: dict = None):
1839 try:
1840 db_dict = other_update or {}
1841 db_dict['queuePosition'] = queuePosition
1842 if isinstance(stage, list):
1843 db_dict['stage'] = stage[0]
1844 db_dict['detailed-status'] = " ".join(stage)
1845 elif stage is not None:
1846 db_dict['stage'] = str(stage)
1847
1848 if error_message is not None:
1849 db_dict['errorMessage'] = error_message
1850 if operation_state is not None:
1851 db_dict['operationState'] = operation_state
1852 db_dict["statusEnteredTime"] = time()
1853 self.update_db_2("nslcmops", op_id, db_dict)
1854 except DbException as e:
1855 self.logger.warn('Error writing OPERATION status for op_id: {} -> {}'.format(op_id, e))
1856
1857 def _write_all_config_status(self, db_nsr: dict, status: str):
1858 try:
1859 nsr_id = db_nsr["_id"]
1860 # configurationStatus
1861 config_status = db_nsr.get('configurationStatus')
1862 if config_status:
1863 db_nsr_update = {"configurationStatus.{}.status".format(index): status for index, v in
1864 enumerate(config_status) if v}
1865 # update status
1866 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1867
1868 except DbException as e:
1869 self.logger.warn('Error writing all configuration status, ns={}: {}'.format(nsr_id, e))
1870
1871 def _write_configuration_status(self, nsr_id: str, vca_index: int, status: str = None,
1872 element_under_configuration: str = None, element_type: str = None,
1873 other_update: dict = None):
1874
1875 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
1876 # .format(vca_index, status))
1877
1878 try:
1879 db_path = 'configurationStatus.{}.'.format(vca_index)
1880 db_dict = other_update or {}
1881 if status:
1882 db_dict[db_path + 'status'] = status
1883 if element_under_configuration:
1884 db_dict[db_path + 'elementUnderConfiguration'] = element_under_configuration
1885 if element_type:
1886 db_dict[db_path + 'elementType'] = element_type
1887 self.update_db_2("nsrs", nsr_id, db_dict)
1888 except DbException as e:
1889 self.logger.warn('Error writing configuration status={}, ns={}, vca_index={}: {}'
1890 .format(status, nsr_id, vca_index, e))
1891
1892 async def _do_placement(self, logging_text, db_nslcmop, db_vnfrs):
1893 """
1894 Check and computes the placement, (vim account where to deploy). If it is decided by an external tool, it
1895 sends the request via kafka and wait until the result is wrote at database (nslcmops _admin.plca).
1896 Database is used because the result can be obtained from a different LCM worker in case of HA.
1897 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
1898 :param db_nslcmop: database content of nslcmop
1899 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
1900 :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfr with the
1901 computed 'vim-account-id'
1902 """
1903 modified = False
1904 nslcmop_id = db_nslcmop['_id']
1905 placement_engine = deep_get(db_nslcmop, ('operationParams', 'placement-engine'))
1906 if placement_engine == "PLA":
1907 self.logger.debug(logging_text + "Invoke and wait for placement optimization")
1908 await self.msg.aiowrite("pla", "get_placement", {'nslcmopId': nslcmop_id}, loop=self.loop)
1909 db_poll_interval = 5
1910 wait = db_poll_interval * 10
1911 pla_result = None
1912 while not pla_result and wait >= 0:
1913 await asyncio.sleep(db_poll_interval)
1914 wait -= db_poll_interval
1915 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
1916 pla_result = deep_get(db_nslcmop, ('_admin', 'pla'))
1917
1918 if not pla_result:
1919 raise LcmException("Placement timeout for nslcmopId={}".format(nslcmop_id))
1920
1921 for pla_vnf in pla_result['vnf']:
1922 vnfr = db_vnfrs.get(pla_vnf['member-vnf-index'])
1923 if not pla_vnf.get('vimAccountId') or not vnfr:
1924 continue
1925 modified = True
1926 self.db.set_one("vnfrs", {"_id": vnfr["_id"]}, {"vim-account-id": pla_vnf['vimAccountId']})
1927 # Modifies db_vnfrs
1928 vnfr["vim-account-id"] = pla_vnf['vimAccountId']
1929 return modified
1930
1931 def update_nsrs_with_pla_result(self, params):
1932 try:
1933 nslcmop_id = deep_get(params, ('placement', 'nslcmopId'))
1934 self.update_db_2("nslcmops", nslcmop_id, {"_admin.pla": params.get('placement')})
1935 except Exception as e:
1936 self.logger.warn('Update failed for nslcmop_id={}:{}'.format(nslcmop_id, e))
1937
1938 async def instantiate(self, nsr_id, nslcmop_id):
1939 """
1940
1941 :param nsr_id: ns instance to deploy
1942 :param nslcmop_id: operation to run
1943 :return:
1944 """
1945
1946 # Try to lock HA task here
1947 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
1948 if not task_is_locked_by_me:
1949 self.logger.debug('instantiate() task is not locked by me, ns={}'.format(nsr_id))
1950 return
1951
1952 logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
1953 self.logger.debug(logging_text + "Enter")
1954
1955 # get all needed from database
1956
1957 # database nsrs record
1958 db_nsr = None
1959
1960 # database nslcmops record
1961 db_nslcmop = None
1962
1963 # update operation on nsrs
1964 db_nsr_update = {}
1965 # update operation on nslcmops
1966 db_nslcmop_update = {}
1967
1968 nslcmop_operation_state = None
1969 db_vnfrs = {} # vnf's info indexed by member-index
1970 # n2vc_info = {}
1971 tasks_dict_info = {} # from task to info text
1972 exc = None
1973 error_list = []
1974 stage = ['Stage 1/5: preparation of the environment.', "Waiting for previous operations to terminate.", ""]
1975 # ^ stage, step, VIM progress
1976 try:
1977 # wait for any previous tasks in process
1978 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
1979
1980 stage[1] = "Sync filesystem from database."
1981 self.fs.sync() # TODO, make use of partial sync, only for the needed packages
1982
1983 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
1984 stage[1] = "Reading from database."
1985 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
1986 db_nsr_update["detailed-status"] = "creating"
1987 db_nsr_update["operational-status"] = "init"
1988 self._write_ns_status(
1989 nsr_id=nsr_id,
1990 ns_state="BUILDING",
1991 current_operation="INSTANTIATING",
1992 current_operation_id=nslcmop_id,
1993 other_update=db_nsr_update
1994 )
1995 self._write_op_status(
1996 op_id=nslcmop_id,
1997 stage=stage,
1998 queuePosition=0
1999 )
2000
2001 # read from db: operation
2002 stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
2003 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2004 ns_params = db_nslcmop.get("operationParams")
2005 if ns_params and ns_params.get("timeout_ns_deploy"):
2006 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
2007 else:
2008 timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy)
2009
2010 # read from db: ns
2011 stage[1] = "Getting nsr={} from db.".format(nsr_id)
2012 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2013 stage[1] = "Getting nsd={} from db.".format(db_nsr["nsd-id"])
2014 nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
2015 db_nsr["nsd"] = nsd
2016 # nsr_name = db_nsr["name"] # TODO short-name??
2017
2018 # read from db: vnf's of this ns
2019 stage[1] = "Getting vnfrs from db."
2020 self.logger.debug(logging_text + stage[1])
2021 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
2022
2023 # read from db: vnfd's for every vnf
2024 db_vnfds_ref = {} # every vnfd data indexed by vnf name
2025 db_vnfds = {} # every vnfd data indexed by vnf id
2026 db_vnfds_index = {} # every vnfd data indexed by vnf member-index
2027
2028 # for each vnf in ns, read vnfd
2029 for vnfr in db_vnfrs_list:
2030 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr # vnf's dict indexed by member-index: '1', '2', etc
2031 vnfd_id = vnfr["vnfd-id"] # vnfd uuid for this vnf
2032 vnfd_ref = vnfr["vnfd-ref"] # vnfd name for this vnf
2033
2034 # if we haven't this vnfd, read it from db
2035 if vnfd_id not in db_vnfds:
2036 # read from db
2037 stage[1] = "Getting vnfd={} id='{}' from db.".format(vnfd_id, vnfd_ref)
2038 self.logger.debug(logging_text + stage[1])
2039 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
2040
2041 # store vnfd
2042 db_vnfds_ref[vnfd_ref] = vnfd # vnfd's indexed by name
2043 db_vnfds[vnfd_id] = vnfd # vnfd's indexed by id
2044 db_vnfds_index[vnfr["member-vnf-index-ref"]] = db_vnfds[vnfd_id] # vnfd's indexed by member-index
2045
2046 # Get or generates the _admin.deployed.VCA list
2047 vca_deployed_list = None
2048 if db_nsr["_admin"].get("deployed"):
2049 vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
2050 if vca_deployed_list is None:
2051 vca_deployed_list = []
2052 configuration_status_list = []
2053 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
2054 db_nsr_update["configurationStatus"] = configuration_status_list
2055 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
2056 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
2057 elif isinstance(vca_deployed_list, dict):
2058 # maintain backward compatibility. Change a dict to list at database
2059 vca_deployed_list = list(vca_deployed_list.values())
2060 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
2061 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
2062
2063 if not isinstance(deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list):
2064 populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
2065 db_nsr_update["_admin.deployed.RO.vnfd"] = []
2066
2067 # set state to INSTANTIATED. When instantiated NBI will not delete directly
2068 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
2069 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2070 self.db.set_list("vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "INSTANTIATED"})
2071
2072 # n2vc_redesign STEP 2 Deploy Network Scenario
2073 stage[0] = 'Stage 2/5: deployment of KDUs, VMs and execution environments.'
2074 self._write_op_status(
2075 op_id=nslcmop_id,
2076 stage=stage
2077 )
2078
2079 stage[1] = "Deploying KDUs."
2080 # self.logger.debug(logging_text + "Before deploy_kdus")
2081 # Call to deploy_kdus in case exists the "vdu:kdu" param
2082 await self.deploy_kdus(
2083 logging_text=logging_text,
2084 nsr_id=nsr_id,
2085 nslcmop_id=nslcmop_id,
2086 db_vnfrs=db_vnfrs,
2087 db_vnfds=db_vnfds,
2088 task_instantiation_info=tasks_dict_info,
2089 )
2090
2091 stage[1] = "Getting VCA public key."
2092 # n2vc_redesign STEP 1 Get VCA public ssh-key
2093 # feature 1429. Add n2vc public key to needed VMs
2094 n2vc_key = self.n2vc.get_public_key()
2095 n2vc_key_list = [n2vc_key]
2096 if self.vca_config.get("public_key"):
2097 n2vc_key_list.append(self.vca_config["public_key"])
2098
2099 stage[1] = "Deploying NS at VIM."
2100 task_ro = asyncio.ensure_future(
2101 self.instantiate_RO(
2102 logging_text=logging_text,
2103 nsr_id=nsr_id,
2104 nsd=nsd,
2105 db_nsr=db_nsr,
2106 db_nslcmop=db_nslcmop,
2107 db_vnfrs=db_vnfrs,
2108 db_vnfds_ref=db_vnfds_ref,
2109 n2vc_key_list=n2vc_key_list,
2110 stage=stage
2111 )
2112 )
2113 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
2114 tasks_dict_info[task_ro] = "Deploying at VIM"
2115
2116 # n2vc_redesign STEP 3 to 6 Deploy N2VC
2117 stage[1] = "Deploying Execution Environments."
2118 self.logger.debug(logging_text + stage[1])
2119
2120 nsi_id = None # TODO put nsi_id when this nsr belongs to a NSI
2121 # get_iterable() returns a value from a dict or empty tuple if key does not exist
2122 for c_vnf in get_iterable(nsd, "constituent-vnfd"):
2123 vnfd_id = c_vnf["vnfd-id-ref"]
2124 vnfd = db_vnfds_ref[vnfd_id]
2125 member_vnf_index = str(c_vnf["member-vnf-index"])
2126 db_vnfr = db_vnfrs[member_vnf_index]
2127 base_folder = vnfd["_admin"]["storage"]
2128 vdu_id = None
2129 vdu_index = 0
2130 vdu_name = None
2131 kdu_name = None
2132
2133 # Get additional parameters
2134 deploy_params = {"OSM": self._get_osm_params(db_vnfr)}
2135 if db_vnfr.get("additionalParamsForVnf"):
2136 deploy_params.update(self._format_additional_params(db_vnfr["additionalParamsForVnf"].copy()))
2137
2138 descriptor_config = vnfd.get("vnf-configuration")
2139 if descriptor_config:
2140 self._deploy_n2vc(
2141 logging_text=logging_text + "member_vnf_index={} ".format(member_vnf_index),
2142 db_nsr=db_nsr,
2143 db_vnfr=db_vnfr,
2144 nslcmop_id=nslcmop_id,
2145 nsr_id=nsr_id,
2146 nsi_id=nsi_id,
2147 vnfd_id=vnfd_id,
2148 vdu_id=vdu_id,
2149 kdu_name=kdu_name,
2150 member_vnf_index=member_vnf_index,
2151 vdu_index=vdu_index,
2152 vdu_name=vdu_name,
2153 deploy_params=deploy_params,
2154 descriptor_config=descriptor_config,
2155 base_folder=base_folder,
2156 task_instantiation_info=tasks_dict_info,
2157 stage=stage
2158 )
2159
2160 # Deploy charms for each VDU that supports one.
2161 for vdud in get_iterable(vnfd, 'vdu'):
2162 vdu_id = vdud["id"]
2163 descriptor_config = vdud.get('vdu-configuration')
2164 vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
2165 if vdur.get("additionalParams"):
2166 deploy_params_vdu = self._format_additional_params(vdur["additionalParams"])
2167 else:
2168 deploy_params_vdu = deploy_params
2169 deploy_params_vdu["OSM"] = self._get_osm_params(db_vnfr, vdu_id, vdu_count_index=0)
2170 if descriptor_config:
2171 vdu_name = None
2172 kdu_name = None
2173 for vdu_index in range(int(vdud.get("count", 1))):
2174 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
2175 self._deploy_n2vc(
2176 logging_text=logging_text + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
2177 member_vnf_index, vdu_id, vdu_index),
2178 db_nsr=db_nsr,
2179 db_vnfr=db_vnfr,
2180 nslcmop_id=nslcmop_id,
2181 nsr_id=nsr_id,
2182 nsi_id=nsi_id,
2183 vnfd_id=vnfd_id,
2184 vdu_id=vdu_id,
2185 kdu_name=kdu_name,
2186 member_vnf_index=member_vnf_index,
2187 vdu_index=vdu_index,
2188 vdu_name=vdu_name,
2189 deploy_params=deploy_params_vdu,
2190 descriptor_config=descriptor_config,
2191 base_folder=base_folder,
2192 task_instantiation_info=tasks_dict_info,
2193 stage=stage
2194 )
2195 for kdud in get_iterable(vnfd, 'kdu'):
2196 kdu_name = kdud["name"]
2197 descriptor_config = kdud.get('kdu-configuration')
2198 if descriptor_config:
2199 vdu_id = None
2200 vdu_index = 0
2201 vdu_name = None
2202 kdur = next(x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name)
2203 deploy_params_kdu = {"OSM": self._get_osm_params(db_vnfr)}
2204 if kdur.get("additionalParams"):
2205 deploy_params_kdu = self._format_additional_params(kdur["additionalParams"])
2206
2207 self._deploy_n2vc(
2208 logging_text=logging_text,
2209 db_nsr=db_nsr,
2210 db_vnfr=db_vnfr,
2211 nslcmop_id=nslcmop_id,
2212 nsr_id=nsr_id,
2213 nsi_id=nsi_id,
2214 vnfd_id=vnfd_id,
2215 vdu_id=vdu_id,
2216 kdu_name=kdu_name,
2217 member_vnf_index=member_vnf_index,
2218 vdu_index=vdu_index,
2219 vdu_name=vdu_name,
2220 deploy_params=deploy_params_kdu,
2221 descriptor_config=descriptor_config,
2222 base_folder=base_folder,
2223 task_instantiation_info=tasks_dict_info,
2224 stage=stage
2225 )
2226
2227 # Check if this NS has a charm configuration
2228 descriptor_config = nsd.get("ns-configuration")
2229 if descriptor_config and descriptor_config.get("juju"):
2230 vnfd_id = None
2231 db_vnfr = None
2232 member_vnf_index = None
2233 vdu_id = None
2234 kdu_name = None
2235 vdu_index = 0
2236 vdu_name = None
2237
2238 # Get additional parameters
2239 deploy_params = {"OSM": self._get_osm_params(db_vnfr)}
2240 if db_nsr.get("additionalParamsForNs"):
2241 deploy_params.update(self._format_additional_params(db_nsr["additionalParamsForNs"].copy()))
2242 base_folder = nsd["_admin"]["storage"]
2243 self._deploy_n2vc(
2244 logging_text=logging_text,
2245 db_nsr=db_nsr,
2246 db_vnfr=db_vnfr,
2247 nslcmop_id=nslcmop_id,
2248 nsr_id=nsr_id,
2249 nsi_id=nsi_id,
2250 vnfd_id=vnfd_id,
2251 vdu_id=vdu_id,
2252 kdu_name=kdu_name,
2253 member_vnf_index=member_vnf_index,
2254 vdu_index=vdu_index,
2255 vdu_name=vdu_name,
2256 deploy_params=deploy_params,
2257 descriptor_config=descriptor_config,
2258 base_folder=base_folder,
2259 task_instantiation_info=tasks_dict_info,
2260 stage=stage
2261 )
2262
2263 # rest of staff will be done at finally
2264
2265 except (ROclient.ROClientException, DbException, LcmException, N2VCException) as e:
2266 self.logger.error(logging_text + "Exit Exception while '{}': {}".format(stage[1], e))
2267 exc = e
2268 except asyncio.CancelledError:
2269 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(stage[1]))
2270 exc = "Operation was cancelled"
2271 except Exception as e:
2272 exc = traceback.format_exc()
2273 self.logger.critical(logging_text + "Exit Exception while '{}': {}".format(stage[1], e), exc_info=True)
2274 finally:
2275 if exc:
2276 error_list.append(str(exc))
2277 try:
2278 # wait for pending tasks
2279 if tasks_dict_info:
2280 stage[1] = "Waiting for instantiate pending tasks."
2281 self.logger.debug(logging_text + stage[1])
2282 error_list += await self._wait_for_tasks(logging_text, tasks_dict_info, timeout_ns_deploy,
2283 stage, nslcmop_id, nsr_id=nsr_id)
2284 stage[1] = stage[2] = ""
2285 except asyncio.CancelledError:
2286 error_list.append("Cancelled")
2287 # TODO cancel all tasks
2288 except Exception as exc:
2289 error_list.append(str(exc))
2290
2291 # update operation-status
2292 db_nsr_update["operational-status"] = "running"
2293 # let's begin with VCA 'configured' status (later we can change it)
2294 db_nsr_update["config-status"] = "configured"
2295 for task, task_name in tasks_dict_info.items():
2296 if not task.done() or task.cancelled() or task.exception():
2297 if task_name.startswith(self.task_name_deploy_vca):
2298 # A N2VC task is pending
2299 db_nsr_update["config-status"] = "failed"
2300 else:
2301 # RO or KDU task is pending
2302 db_nsr_update["operational-status"] = "failed"
2303
2304 # update status at database
2305 if error_list:
2306 error_detail = ". ".join(error_list)
2307 self.logger.error(logging_text + error_detail)
2308 error_description_nslcmop = '{} Detail: {}'.format(stage[0], error_detail)
2309 error_description_nsr = 'Operation: INSTANTIATING.{}, {}'.format(nslcmop_id, stage[0])
2310
2311 db_nsr_update["detailed-status"] = error_description_nsr + " Detail: " + error_detail
2312 db_nslcmop_update["detailed-status"] = error_detail
2313 nslcmop_operation_state = "FAILED"
2314 ns_state = "BROKEN"
2315 else:
2316 error_detail = None
2317 error_description_nsr = error_description_nslcmop = None
2318 ns_state = "READY"
2319 db_nsr_update["detailed-status"] = "Done"
2320 db_nslcmop_update["detailed-status"] = "Done"
2321 nslcmop_operation_state = "COMPLETED"
2322
2323 if db_nsr:
2324 self._write_ns_status(
2325 nsr_id=nsr_id,
2326 ns_state=ns_state,
2327 current_operation="IDLE",
2328 current_operation_id=None,
2329 error_description=error_description_nsr,
2330 error_detail=error_detail,
2331 other_update=db_nsr_update
2332 )
2333 self._write_op_status(
2334 op_id=nslcmop_id,
2335 stage="",
2336 error_message=error_description_nslcmop,
2337 operation_state=nslcmop_operation_state,
2338 other_update=db_nslcmop_update,
2339 )
2340
2341 if nslcmop_operation_state:
2342 try:
2343 await self.msg.aiowrite("ns", "instantiated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
2344 "operationState": nslcmop_operation_state},
2345 loop=self.loop)
2346 except Exception as e:
2347 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
2348
2349 self.logger.debug(logging_text + "Exit")
2350 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
2351
2352 async def _add_vca_relations(self, logging_text, nsr_id, vca_index: int,
2353 timeout: int = 3600, vca_type: str = None) -> bool:
2354
2355 # steps:
2356 # 1. find all relations for this VCA
2357 # 2. wait for other peers related
2358 # 3. add relations
2359
2360 try:
2361 vca_type = vca_type or "lxc_proxy_charm"
2362
2363 # STEP 1: find all relations for this VCA
2364
2365 # read nsr record
2366 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2367 nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
2368
2369 # this VCA data
2370 my_vca = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))[vca_index]
2371
2372 # read all ns-configuration relations
2373 ns_relations = list()
2374 db_ns_relations = deep_get(nsd, ('ns-configuration', 'relation'))
2375 if db_ns_relations:
2376 for r in db_ns_relations:
2377 # check if this VCA is in the relation
2378 if my_vca.get('member-vnf-index') in\
2379 (r.get('entities')[0].get('id'), r.get('entities')[1].get('id')):
2380 ns_relations.append(r)
2381
2382 # read all vnf-configuration relations
2383 vnf_relations = list()
2384 db_vnfd_list = db_nsr.get('vnfd-id')
2385 if db_vnfd_list:
2386 for vnfd in db_vnfd_list:
2387 db_vnfd = self.db.get_one("vnfds", {"_id": vnfd})
2388 db_vnf_relations = deep_get(db_vnfd, ('vnf-configuration', 'relation'))
2389 if db_vnf_relations:
2390 for r in db_vnf_relations:
2391 # check if this VCA is in the relation
2392 if my_vca.get('vdu_id') in (r.get('entities')[0].get('id'), r.get('entities')[1].get('id')):
2393 vnf_relations.append(r)
2394
2395 # if no relations, terminate
2396 if not ns_relations and not vnf_relations:
2397 self.logger.debug(logging_text + ' No relations')
2398 return True
2399
2400 self.logger.debug(logging_text + ' adding relations\n {}\n {}'.format(ns_relations, vnf_relations))
2401
2402 # add all relations
2403 start = time()
2404 while True:
2405 # check timeout
2406 now = time()
2407 if now - start >= timeout:
2408 self.logger.error(logging_text + ' : timeout adding relations')
2409 return False
2410
2411 # reload nsr from database (we need to update record: _admin.deloyed.VCA)
2412 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2413
2414 # for each defined NS relation, find the VCA's related
2415 for r in ns_relations.copy():
2416 from_vca_ee_id = None
2417 to_vca_ee_id = None
2418 from_vca_endpoint = None
2419 to_vca_endpoint = None
2420 vca_list = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))
2421 for vca in vca_list:
2422 if vca.get('member-vnf-index') == r.get('entities')[0].get('id') \
2423 and vca.get('config_sw_installed'):
2424 from_vca_ee_id = vca.get('ee_id')
2425 from_vca_endpoint = r.get('entities')[0].get('endpoint')
2426 if vca.get('member-vnf-index') == r.get('entities')[1].get('id') \
2427 and vca.get('config_sw_installed'):
2428 to_vca_ee_id = vca.get('ee_id')
2429 to_vca_endpoint = r.get('entities')[1].get('endpoint')
2430 if from_vca_ee_id and to_vca_ee_id:
2431 # add relation
2432 await self.vca_map[vca_type].add_relation(
2433 ee_id_1=from_vca_ee_id,
2434 ee_id_2=to_vca_ee_id,
2435 endpoint_1=from_vca_endpoint,
2436 endpoint_2=to_vca_endpoint)
2437 # remove entry from relations list
2438 ns_relations.remove(r)
2439 else:
2440 # check failed peers
2441 try:
2442 vca_status_list = db_nsr.get('configurationStatus')
2443 if vca_status_list:
2444 for i in range(len(vca_list)):
2445 vca = vca_list[i]
2446 vca_status = vca_status_list[i]
2447 if vca.get('member-vnf-index') == r.get('entities')[0].get('id'):
2448 if vca_status.get('status') == 'BROKEN':
2449 # peer broken: remove relation from list
2450 ns_relations.remove(r)
2451 if vca.get('member-vnf-index') == r.get('entities')[1].get('id'):
2452 if vca_status.get('status') == 'BROKEN':
2453 # peer broken: remove relation from list
2454 ns_relations.remove(r)
2455 except Exception:
2456 # ignore
2457 pass
2458
2459 # for each defined VNF relation, find the VCA's related
2460 for r in vnf_relations.copy():
2461 from_vca_ee_id = None
2462 to_vca_ee_id = None
2463 from_vca_endpoint = None
2464 to_vca_endpoint = None
2465 vca_list = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))
2466 for vca in vca_list:
2467 key_to_check = "vdu_id"
2468 if vca.get("vdu_id") is None:
2469 key_to_check = "vnfd_id"
2470 if vca.get(key_to_check) == r.get('entities')[0].get('id') and vca.get('config_sw_installed'):
2471 from_vca_ee_id = vca.get('ee_id')
2472 from_vca_endpoint = r.get('entities')[0].get('endpoint')
2473 if vca.get(key_to_check) == r.get('entities')[1].get('id') and vca.get('config_sw_installed'):
2474 to_vca_ee_id = vca.get('ee_id')
2475 to_vca_endpoint = r.get('entities')[1].get('endpoint')
2476 if from_vca_ee_id and to_vca_ee_id:
2477 # add relation
2478 await self.vca_map[vca_type].add_relation(
2479 ee_id_1=from_vca_ee_id,
2480 ee_id_2=to_vca_ee_id,
2481 endpoint_1=from_vca_endpoint,
2482 endpoint_2=to_vca_endpoint)
2483 # remove entry from relations list
2484 vnf_relations.remove(r)
2485 else:
2486 # check failed peers
2487 try:
2488 vca_status_list = db_nsr.get('configurationStatus')
2489 if vca_status_list:
2490 for i in range(len(vca_list)):
2491 vca = vca_list[i]
2492 vca_status = vca_status_list[i]
2493 if vca.get('vdu_id') == r.get('entities')[0].get('id'):
2494 if vca_status.get('status') == 'BROKEN':
2495 # peer broken: remove relation from list
2496 vnf_relations.remove(r)
2497 if vca.get('vdu_id') == r.get('entities')[1].get('id'):
2498 if vca_status.get('status') == 'BROKEN':
2499 # peer broken: remove relation from list
2500 vnf_relations.remove(r)
2501 except Exception:
2502 # ignore
2503 pass
2504
2505 # wait for next try
2506 await asyncio.sleep(5.0)
2507
2508 if not ns_relations and not vnf_relations:
2509 self.logger.debug('Relations added')
2510 break
2511
2512 return True
2513
2514 except Exception as e:
2515 self.logger.warn(logging_text + ' ERROR adding relations: {}'.format(e))
2516 return False
2517
2518 async def _install_kdu(self, nsr_id: str, nsr_db_path: str, vnfr_data: dict, kdu_index: int, kdud: dict,
2519 vnfd: dict, k8s_instance_info: dict, k8params: dict = None, timeout: int = 600):
2520
2521 try:
2522 k8sclustertype = k8s_instance_info["k8scluster-type"]
2523 # Instantiate kdu
2524 db_dict_install = {"collection": "nsrs",
2525 "filter": {"_id": nsr_id},
2526 "path": nsr_db_path}
2527
2528 kdu_instance = await self.k8scluster_map[k8sclustertype].install(
2529 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
2530 kdu_model=k8s_instance_info["kdu-model"],
2531 atomic=True,
2532 params=k8params,
2533 db_dict=db_dict_install,
2534 timeout=timeout,
2535 kdu_name=k8s_instance_info["kdu-name"],
2536 namespace=k8s_instance_info["namespace"])
2537 self.update_db_2("nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance})
2538
2539 # Obtain services to obtain management service ip
2540 services = await self.k8scluster_map[k8sclustertype].get_services(
2541 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
2542 kdu_instance=kdu_instance,
2543 namespace=k8s_instance_info["namespace"])
2544
2545 # Obtain management service info (if exists)
2546 vnfr_update_dict = {}
2547 if services:
2548 vnfr_update_dict["kdur.{}.services".format(kdu_index)] = services
2549 mgmt_services = [service for service in kdud.get("service", []) if service.get("mgmt-service")]
2550 for mgmt_service in mgmt_services:
2551 for service in services:
2552 if service["name"].startswith(mgmt_service["name"]):
2553 # Mgmt service found, Obtain service ip
2554 ip = service.get("external_ip", service.get("cluster_ip"))
2555 if isinstance(ip, list) and len(ip) == 1:
2556 ip = ip[0]
2557
2558 vnfr_update_dict["kdur.{}.ip-address".format(kdu_index)] = ip
2559
2560 # Check if must update also mgmt ip at the vnf
2561 service_external_cp = mgmt_service.get("external-connection-point-ref")
2562 if service_external_cp:
2563 if deep_get(vnfd, ("mgmt-interface", "cp")) == service_external_cp:
2564 vnfr_update_dict["ip-address"] = ip
2565
2566 break
2567 else:
2568 self.logger.warn("Mgmt service name: {} not found".format(mgmt_service["name"]))
2569
2570 vnfr_update_dict["kdur.{}.status".format(kdu_index)] = "READY"
2571 self.update_db_2("vnfrs", vnfr_data.get("_id"), vnfr_update_dict)
2572
2573 kdu_config = kdud.get("kdu-configuration")
2574 if kdu_config and kdu_config.get("initial-config-primitive") and kdu_config.get("juju") is None:
2575 initial_config_primitive_list = kdu_config.get("initial-config-primitive")
2576 initial_config_primitive_list.sort(key=lambda val: int(val["seq"]))
2577
2578 for initial_config_primitive in initial_config_primitive_list:
2579 primitive_params_ = self._map_primitive_params(initial_config_primitive, {}, {})
2580
2581 await asyncio.wait_for(
2582 self.k8scluster_map[k8sclustertype].exec_primitive(
2583 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
2584 kdu_instance=kdu_instance,
2585 primitive_name=initial_config_primitive["name"],
2586 params=primitive_params_, db_dict=db_dict_install),
2587 timeout=timeout)
2588
2589 except Exception as e:
2590 # Prepare update db with error and raise exception
2591 try:
2592 self.update_db_2("nsrs", nsr_id, {nsr_db_path + ".detailed-status": str(e)})
2593 self.update_db_2("vnfrs", vnfr_data.get("_id"), {"kdur.{}.status".format(kdu_index): "ERROR"})
2594 except Exception:
2595 # ignore to keep original exception
2596 pass
2597 # reraise original error
2598 raise
2599
2600 return kdu_instance
2601
2602 async def deploy_kdus(self, logging_text, nsr_id, nslcmop_id, db_vnfrs, db_vnfds, task_instantiation_info):
2603 # Launch kdus if present in the descriptor
2604
2605 k8scluster_id_2_uuic = {"helm-chart": {}, "juju-bundle": {}}
2606
2607 async def _get_cluster_id(cluster_id, cluster_type):
2608 nonlocal k8scluster_id_2_uuic
2609 if cluster_id in k8scluster_id_2_uuic[cluster_type]:
2610 return k8scluster_id_2_uuic[cluster_type][cluster_id]
2611
2612 # check if K8scluster is creating and wait look if previous tasks in process
2613 task_name, task_dependency = self.lcm_tasks.lookfor_related("k8scluster", cluster_id)
2614 if task_dependency:
2615 text = "Waiting for related tasks '{}' on k8scluster {} to be completed".format(task_name, cluster_id)
2616 self.logger.debug(logging_text + text)
2617 await asyncio.wait(task_dependency, timeout=3600)
2618
2619 db_k8scluster = self.db.get_one("k8sclusters", {"_id": cluster_id}, fail_on_empty=False)
2620 if not db_k8scluster:
2621 raise LcmException("K8s cluster {} cannot be found".format(cluster_id))
2622
2623 k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
2624 if not k8s_id:
2625 raise LcmException("K8s cluster '{}' has not been initialized for '{}'".format(cluster_id,
2626 cluster_type))
2627 k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
2628 return k8s_id
2629
2630 logging_text += "Deploy kdus: "
2631 step = ""
2632 try:
2633 db_nsr_update = {"_admin.deployed.K8s": []}
2634 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2635
2636 index = 0
2637 updated_cluster_list = []
2638
2639 for vnfr_data in db_vnfrs.values():
2640 for kdu_index, kdur in enumerate(get_iterable(vnfr_data, "kdur")):
2641 # Step 0: Prepare and set parameters
2642 desc_params = self._format_additional_params(kdur.get("additionalParams"))
2643 vnfd_id = vnfr_data.get('vnfd-id')
2644 kdud = next(kdud for kdud in db_vnfds[vnfd_id]["kdu"] if kdud["name"] == kdur["kdu-name"])
2645 namespace = kdur.get("k8s-namespace")
2646 if kdur.get("helm-chart"):
2647 kdumodel = kdur["helm-chart"]
2648 k8sclustertype = "helm-chart"
2649 elif kdur.get("juju-bundle"):
2650 kdumodel = kdur["juju-bundle"]
2651 k8sclustertype = "juju-bundle"
2652 else:
2653 raise LcmException("kdu type for kdu='{}.{}' is neither helm-chart nor "
2654 "juju-bundle. Maybe an old NBI version is running".
2655 format(vnfr_data["member-vnf-index-ref"], kdur["kdu-name"]))
2656 # check if kdumodel is a file and exists
2657 try:
2658 storage = deep_get(db_vnfds.get(vnfd_id), ('_admin', 'storage'))
2659 if storage and storage.get('pkg-dir'): # may be not present if vnfd has not artifacts
2660 # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
2661 filename = '{}/{}/{}s/{}'.format(storage["folder"], storage["pkg-dir"], k8sclustertype,
2662 kdumodel)
2663 if self.fs.file_exists(filename, mode='file') or self.fs.file_exists(filename, mode='dir'):
2664 kdumodel = self.fs.path + filename
2665 except (asyncio.TimeoutError, asyncio.CancelledError):
2666 raise
2667 except Exception: # it is not a file
2668 pass
2669
2670 k8s_cluster_id = kdur["k8s-cluster"]["id"]
2671 step = "Synchronize repos for k8s cluster '{}'".format(k8s_cluster_id)
2672 cluster_uuid = await _get_cluster_id(k8s_cluster_id, k8sclustertype)
2673
2674 # Synchronize repos
2675 if k8sclustertype == "helm-chart" and cluster_uuid not in updated_cluster_list:
2676 del_repo_list, added_repo_dict = await asyncio.ensure_future(
2677 self.k8sclusterhelm.synchronize_repos(cluster_uuid=cluster_uuid))
2678 if del_repo_list or added_repo_dict:
2679 unset = {'_admin.helm_charts_added.' + item: None for item in del_repo_list}
2680 updated = {'_admin.helm_charts_added.' +
2681 item: name for item, name in added_repo_dict.items()}
2682 self.logger.debug(logging_text + "repos synchronized on k8s cluster '{}' to_delete: {}, "
2683 "to_add: {}".format(k8s_cluster_id, del_repo_list,
2684 added_repo_dict))
2685 self.db.set_one("k8sclusters", {"_id": k8s_cluster_id}, updated, unset=unset)
2686 updated_cluster_list.append(cluster_uuid)
2687
2688 # Instantiate kdu
2689 step = "Instantiating KDU {}.{} in k8s cluster {}".format(vnfr_data["member-vnf-index-ref"],
2690 kdur["kdu-name"], k8s_cluster_id)
2691 k8s_instance_info = {"kdu-instance": None,
2692 "k8scluster-uuid": cluster_uuid,
2693 "k8scluster-type": k8sclustertype,
2694 "member-vnf-index": vnfr_data["member-vnf-index-ref"],
2695 "kdu-name": kdur["kdu-name"],
2696 "kdu-model": kdumodel,
2697 "namespace": namespace}
2698 db_path = "_admin.deployed.K8s.{}".format(index)
2699 db_nsr_update[db_path] = k8s_instance_info
2700 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2701
2702 task = asyncio.ensure_future(
2703 self._install_kdu(nsr_id, db_path, vnfr_data, kdu_index, kdud, db_vnfds[vnfd_id],
2704 k8s_instance_info, k8params=desc_params, timeout=600))
2705 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_KDU-{}".format(index), task)
2706 task_instantiation_info[task] = "Deploying KDU {}".format(kdur["kdu-name"])
2707
2708 index += 1
2709
2710 except (LcmException, asyncio.CancelledError):
2711 raise
2712 except Exception as e:
2713 msg = "Exception {} while {}: {}".format(type(e).__name__, step, e)
2714 if isinstance(e, (N2VCException, DbException)):
2715 self.logger.error(logging_text + msg)
2716 else:
2717 self.logger.critical(logging_text + msg, exc_info=True)
2718 raise LcmException(msg)
2719 finally:
2720 if db_nsr_update:
2721 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2722
2723 def _deploy_n2vc(self, logging_text, db_nsr, db_vnfr, nslcmop_id, nsr_id, nsi_id, vnfd_id, vdu_id,
2724 kdu_name, member_vnf_index, vdu_index, vdu_name, deploy_params, descriptor_config,
2725 base_folder, task_instantiation_info, stage):
2726 # launch instantiate_N2VC in a asyncio task and register task object
2727 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
2728 # if not found, create one entry and update database
2729 # fill db_nsr._admin.deployed.VCA.<index>
2730
2731 self.logger.debug(logging_text + "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id, vdu_id))
2732 if descriptor_config.get("juju"): # There is one execution envioronment of type juju
2733 ee_list = [descriptor_config]
2734 elif descriptor_config.get("execution-environment-list"):
2735 ee_list = descriptor_config.get("execution-environment-list")
2736 else: # other types as script are not supported
2737 ee_list = []
2738
2739 for ee_item in ee_list:
2740 self.logger.debug(logging_text + "_deploy_n2vc ee_item juju={}, helm={}".format(ee_item.get('juju'),
2741 ee_item.get("helm-chart")))
2742 ee_descriptor_id = ee_item.get("id")
2743 if ee_item.get("juju"):
2744 vca_name = ee_item['juju'].get('charm')
2745 vca_type = "lxc_proxy_charm" if ee_item['juju'].get('charm') is not None else "native_charm"
2746 if ee_item['juju'].get('cloud') == "k8s":
2747 vca_type = "k8s_proxy_charm"
2748 elif ee_item['juju'].get('proxy') is False:
2749 vca_type = "native_charm"
2750 elif ee_item.get("helm-chart"):
2751 vca_name = ee_item['helm-chart']
2752 vca_type = "helm"
2753 else:
2754 self.logger.debug(logging_text + "skipping non juju neither charm configuration")
2755 continue
2756
2757 vca_index = -1
2758 for vca_index, vca_deployed in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
2759 if not vca_deployed:
2760 continue
2761 if vca_deployed.get("member-vnf-index") == member_vnf_index and \
2762 vca_deployed.get("vdu_id") == vdu_id and \
2763 vca_deployed.get("kdu_name") == kdu_name and \
2764 vca_deployed.get("vdu_count_index", 0) == vdu_index and \
2765 vca_deployed.get("ee_descriptor_id") == ee_descriptor_id:
2766 break
2767 else:
2768 # not found, create one.
2769 target = "ns" if not member_vnf_index else "vnf/{}".format(member_vnf_index)
2770 if vdu_id:
2771 target += "/vdu/{}/{}".format(vdu_id, vdu_index or 0)
2772 elif kdu_name:
2773 target += "/kdu/{}".format(kdu_name)
2774 vca_deployed = {
2775 "target_element": target,
2776 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
2777 "member-vnf-index": member_vnf_index,
2778 "vdu_id": vdu_id,
2779 "kdu_name": kdu_name,
2780 "vdu_count_index": vdu_index,
2781 "operational-status": "init", # TODO revise
2782 "detailed-status": "", # TODO revise
2783 "step": "initial-deploy", # TODO revise
2784 "vnfd_id": vnfd_id,
2785 "vdu_name": vdu_name,
2786 "type": vca_type,
2787 "ee_descriptor_id": ee_descriptor_id
2788 }
2789 vca_index += 1
2790
2791 # create VCA and configurationStatus in db
2792 db_dict = {
2793 "_admin.deployed.VCA.{}".format(vca_index): vca_deployed,
2794 "configurationStatus.{}".format(vca_index): dict()
2795 }
2796 self.update_db_2("nsrs", nsr_id, db_dict)
2797
2798 db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed)
2799
2800 # Launch task
2801 task_n2vc = asyncio.ensure_future(
2802 self.instantiate_N2VC(
2803 logging_text=logging_text,
2804 vca_index=vca_index,
2805 nsi_id=nsi_id,
2806 db_nsr=db_nsr,
2807 db_vnfr=db_vnfr,
2808 vdu_id=vdu_id,
2809 kdu_name=kdu_name,
2810 vdu_index=vdu_index,
2811 deploy_params=deploy_params,
2812 config_descriptor=descriptor_config,
2813 base_folder=base_folder,
2814 nslcmop_id=nslcmop_id,
2815 stage=stage,
2816 vca_type=vca_type,
2817 vca_name=vca_name,
2818 ee_config_descriptor=ee_item
2819 )
2820 )
2821 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_N2VC-{}".format(vca_index), task_n2vc)
2822 task_instantiation_info[task_n2vc] = self.task_name_deploy_vca + " {}.{}".format(
2823 member_vnf_index or "", vdu_id or "")
2824
2825 @staticmethod
2826 def _get_terminate_config_primitive(primitive_list, vca_deployed):
2827 """ Get a sorted terminate config primitive list. In case ee_descriptor_id is present at vca_deployed,
2828 it get only those primitives for this execution envirom"""
2829
2830 primitive_list = primitive_list or []
2831 # filter primitives by ee_descriptor_id
2832 ee_descriptor_id = vca_deployed.get("ee_descriptor_id")
2833 primitive_list = [p for p in primitive_list if p.get("execution-environment-ref") == ee_descriptor_id]
2834
2835 if primitive_list:
2836 primitive_list.sort(key=lambda val: int(val['seq']))
2837
2838 return primitive_list
2839
2840 @staticmethod
2841 def _create_nslcmop(nsr_id, operation, params):
2842 """
2843 Creates a ns-lcm-opp content to be stored at database.
2844 :param nsr_id: internal id of the instance
2845 :param operation: instantiate, terminate, scale, action, ...
2846 :param params: user parameters for the operation
2847 :return: dictionary following SOL005 format
2848 """
2849 # Raise exception if invalid arguments
2850 if not (nsr_id and operation and params):
2851 raise LcmException(
2852 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided")
2853 now = time()
2854 _id = str(uuid4())
2855 nslcmop = {
2856 "id": _id,
2857 "_id": _id,
2858 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
2859 "operationState": "PROCESSING",
2860 "statusEnteredTime": now,
2861 "nsInstanceId": nsr_id,
2862 "lcmOperationType": operation,
2863 "startTime": now,
2864 "isAutomaticInvocation": False,
2865 "operationParams": params,
2866 "isCancelPending": False,
2867 "links": {
2868 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
2869 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
2870 }
2871 }
2872 return nslcmop
2873
2874 def _format_additional_params(self, params):
2875 params = params or {}
2876 for key, value in params.items():
2877 if str(value).startswith("!!yaml "):
2878 params[key] = yaml.safe_load(value[7:])
2879 return params
2880
2881 def _get_terminate_primitive_params(self, seq, vnf_index):
2882 primitive = seq.get('name')
2883 primitive_params = {}
2884 params = {
2885 "member_vnf_index": vnf_index,
2886 "primitive": primitive,
2887 "primitive_params": primitive_params,
2888 }
2889 desc_params = {}
2890 return self._map_primitive_params(seq, params, desc_params)
2891
2892 # sub-operations
2893
2894 def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
2895 op = deep_get(db_nslcmop, ('_admin', 'operations'), [])[op_index]
2896 if op.get('operationState') == 'COMPLETED':
2897 # b. Skip sub-operation
2898 # _ns_execute_primitive() or RO.create_action() will NOT be executed
2899 return self.SUBOPERATION_STATUS_SKIP
2900 else:
2901 # c. retry executing sub-operation
2902 # The sub-operation exists, and operationState != 'COMPLETED'
2903 # Update operationState = 'PROCESSING' to indicate a retry.
2904 operationState = 'PROCESSING'
2905 detailed_status = 'In progress'
2906 self._update_suboperation_status(
2907 db_nslcmop, op_index, operationState, detailed_status)
2908 # Return the sub-operation index
2909 # _ns_execute_primitive() or RO.create_action() will be called from scale()
2910 # with arguments extracted from the sub-operation
2911 return op_index
2912
2913 # Find a sub-operation where all keys in a matching dictionary must match
2914 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
2915 def _find_suboperation(self, db_nslcmop, match):
2916 if db_nslcmop and match:
2917 op_list = db_nslcmop.get('_admin', {}).get('operations', [])
2918 for i, op in enumerate(op_list):
2919 if all(op.get(k) == match[k] for k in match):
2920 return i
2921 return self.SUBOPERATION_STATUS_NOT_FOUND
2922
2923 # Update status for a sub-operation given its index
2924 def _update_suboperation_status(self, db_nslcmop, op_index, operationState, detailed_status):
2925 # Update DB for HA tasks
2926 q_filter = {'_id': db_nslcmop['_id']}
2927 update_dict = {'_admin.operations.{}.operationState'.format(op_index): operationState,
2928 '_admin.operations.{}.detailed-status'.format(op_index): detailed_status}
2929 self.db.set_one("nslcmops",
2930 q_filter=q_filter,
2931 update_dict=update_dict,
2932 fail_on_empty=False)
2933
2934 # Add sub-operation, return the index of the added sub-operation
2935 # Optionally, set operationState, detailed-status, and operationType
2936 # Status and type are currently set for 'scale' sub-operations:
2937 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
2938 # 'detailed-status' : status message
2939 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
2940 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
2941 def _add_suboperation(self, db_nslcmop, vnf_index, vdu_id, vdu_count_index, vdu_name, primitive,
2942 mapped_primitive_params, operationState=None, detailed_status=None, operationType=None,
2943 RO_nsr_id=None, RO_scaling_info=None):
2944 if not db_nslcmop:
2945 return self.SUBOPERATION_STATUS_NOT_FOUND
2946 # Get the "_admin.operations" list, if it exists
2947 db_nslcmop_admin = db_nslcmop.get('_admin', {})
2948 op_list = db_nslcmop_admin.get('operations')
2949 # Create or append to the "_admin.operations" list
2950 new_op = {'member_vnf_index': vnf_index,
2951 'vdu_id': vdu_id,
2952 'vdu_count_index': vdu_count_index,
2953 'primitive': primitive,
2954 'primitive_params': mapped_primitive_params}
2955 if operationState:
2956 new_op['operationState'] = operationState
2957 if detailed_status:
2958 new_op['detailed-status'] = detailed_status
2959 if operationType:
2960 new_op['lcmOperationType'] = operationType
2961 if RO_nsr_id:
2962 new_op['RO_nsr_id'] = RO_nsr_id
2963 if RO_scaling_info:
2964 new_op['RO_scaling_info'] = RO_scaling_info
2965 if not op_list:
2966 # No existing operations, create key 'operations' with current operation as first list element
2967 db_nslcmop_admin.update({'operations': [new_op]})
2968 op_list = db_nslcmop_admin.get('operations')
2969 else:
2970 # Existing operations, append operation to list
2971 op_list.append(new_op)
2972
2973 db_nslcmop_update = {'_admin.operations': op_list}
2974 self.update_db_2("nslcmops", db_nslcmop['_id'], db_nslcmop_update)
2975 op_index = len(op_list) - 1
2976 return op_index
2977
2978 # Helper methods for scale() sub-operations
2979
2980 # pre-scale/post-scale:
2981 # Check for 3 different cases:
2982 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
2983 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
2984 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
2985 def _check_or_add_scale_suboperation(self, db_nslcmop, vnf_index, vnf_config_primitive, primitive_params,
2986 operationType, RO_nsr_id=None, RO_scaling_info=None):
2987 # Find this sub-operation
2988 if RO_nsr_id and RO_scaling_info:
2989 operationType = 'SCALE-RO'
2990 match = {
2991 'member_vnf_index': vnf_index,
2992 'RO_nsr_id': RO_nsr_id,
2993 'RO_scaling_info': RO_scaling_info,
2994 }
2995 else:
2996 match = {
2997 'member_vnf_index': vnf_index,
2998 'primitive': vnf_config_primitive,
2999 'primitive_params': primitive_params,
3000 'lcmOperationType': operationType
3001 }
3002 op_index = self._find_suboperation(db_nslcmop, match)
3003 if op_index == self.SUBOPERATION_STATUS_NOT_FOUND:
3004 # a. New sub-operation
3005 # The sub-operation does not exist, add it.
3006 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
3007 # The following parameters are set to None for all kind of scaling:
3008 vdu_id = None
3009 vdu_count_index = None
3010 vdu_name = None
3011 if RO_nsr_id and RO_scaling_info:
3012 vnf_config_primitive = None
3013 primitive_params = None
3014 else:
3015 RO_nsr_id = None
3016 RO_scaling_info = None
3017 # Initial status for sub-operation
3018 operationState = 'PROCESSING'
3019 detailed_status = 'In progress'
3020 # Add sub-operation for pre/post-scaling (zero or more operations)
3021 self._add_suboperation(db_nslcmop,
3022 vnf_index,
3023 vdu_id,
3024 vdu_count_index,
3025 vdu_name,
3026 vnf_config_primitive,
3027 primitive_params,
3028 operationState,
3029 detailed_status,
3030 operationType,
3031 RO_nsr_id,
3032 RO_scaling_info)
3033 return self.SUBOPERATION_STATUS_NEW
3034 else:
3035 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
3036 # or op_index (operationState != 'COMPLETED')
3037 return self._retry_or_skip_suboperation(db_nslcmop, op_index)
3038
3039 # Function to return execution_environment id
3040
3041 def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list):
3042 # TODO vdu_index_count
3043 for vca in vca_deployed_list:
3044 if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id:
3045 return vca["ee_id"]
3046
3047 async def destroy_N2VC(self, logging_text, db_nslcmop, vca_deployed, config_descriptor,
3048 vca_index, destroy_ee=True, exec_primitives=True):
3049 """
3050 Execute the terminate primitives and destroy the execution environment (if destroy_ee=False
3051 :param logging_text:
3052 :param db_nslcmop:
3053 :param vca_deployed: Dictionary of deployment info at db_nsr._admin.depoloyed.VCA.<INDEX>
3054 :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
3055 :param vca_index: index in the database _admin.deployed.VCA
3056 :param destroy_ee: False to do not destroy, because it will be destroyed all of then at once
3057 :param exec_primitives: False to do not execute terminate primitives, because the config is not completed or has
3058 not executed properly
3059 :return: None or exception
3060 """
3061
3062 self.logger.debug(
3063 logging_text + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
3064 vca_index, vca_deployed, config_descriptor, destroy_ee
3065 )
3066 )
3067
3068 vca_type = vca_deployed.get("type", "lxc_proxy_charm")
3069
3070 # execute terminate_primitives
3071 if exec_primitives:
3072 terminate_primitives = self._get_terminate_config_primitive(
3073 config_descriptor.get("terminate-config-primitive"), vca_deployed)
3074 vdu_id = vca_deployed.get("vdu_id")
3075 vdu_count_index = vca_deployed.get("vdu_count_index")
3076 vdu_name = vca_deployed.get("vdu_name")
3077 vnf_index = vca_deployed.get("member-vnf-index")
3078 if terminate_primitives and vca_deployed.get("needed_terminate"):
3079 for seq in terminate_primitives:
3080 # For each sequence in list, get primitive and call _ns_execute_primitive()
3081 step = "Calling terminate action for vnf_member_index={} primitive={}".format(
3082 vnf_index, seq.get("name"))
3083 self.logger.debug(logging_text + step)
3084 # Create the primitive for each sequence, i.e. "primitive": "touch"
3085 primitive = seq.get('name')
3086 mapped_primitive_params = self._get_terminate_primitive_params(seq, vnf_index)
3087
3088 # Add sub-operation
3089 self._add_suboperation(db_nslcmop,
3090 vnf_index,
3091 vdu_id,
3092 vdu_count_index,
3093 vdu_name,
3094 primitive,
3095 mapped_primitive_params)
3096 # Sub-operations: Call _ns_execute_primitive() instead of action()
3097 try:
3098 result, result_detail = await self._ns_execute_primitive(vca_deployed["ee_id"], primitive,
3099 mapped_primitive_params,
3100 vca_type=vca_type)
3101 except LcmException:
3102 # this happens when VCA is not deployed. In this case it is not needed to terminate
3103 continue
3104 result_ok = ['COMPLETED', 'PARTIALLY_COMPLETED']
3105 if result not in result_ok:
3106 raise LcmException("terminate_primitive {} for vnf_member_index={} fails with "
3107 "error {}".format(seq.get("name"), vnf_index, result_detail))
3108 # set that this VCA do not need terminated
3109 db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(vca_index)
3110 self.update_db_2("nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False})
3111
3112 if vca_deployed.get("prometheus_jobs") and self.prometheus:
3113 await self.prometheus.update(remove_jobs=vca_deployed["prometheus_jobs"])
3114
3115 if destroy_ee:
3116 await self.vca_map[vca_type].delete_execution_environment(vca_deployed["ee_id"])
3117
3118 async def _delete_all_N2VC(self, db_nsr: dict):
3119 self._write_all_config_status(db_nsr=db_nsr, status='TERMINATING')
3120 namespace = "." + db_nsr["_id"]
3121 try:
3122 await self.n2vc.delete_namespace(namespace=namespace, total_timeout=self.timeout_charm_delete)
3123 except N2VCNotFound: # already deleted. Skip
3124 pass
3125 self._write_all_config_status(db_nsr=db_nsr, status='DELETED')
3126
3127 async def _terminate_RO(self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage):
3128 """
3129 Terminates a deployment from RO
3130 :param logging_text:
3131 :param nsr_deployed: db_nsr._admin.deployed
3132 :param nsr_id:
3133 :param nslcmop_id:
3134 :param stage: list of string with the content to write on db_nslcmop.detailed-status.
3135 this method will update only the index 2, but it will write on database the concatenated content of the list
3136 :return:
3137 """
3138 db_nsr_update = {}
3139 failed_detail = []
3140 ro_nsr_id = ro_delete_action = None
3141 if nsr_deployed and nsr_deployed.get("RO"):
3142 ro_nsr_id = nsr_deployed["RO"].get("nsr_id")
3143 ro_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
3144 try:
3145 if ro_nsr_id:
3146 stage[2] = "Deleting ns from VIM."
3147 db_nsr_update["detailed-status"] = " ".join(stage)
3148 self._write_op_status(nslcmop_id, stage)
3149 self.logger.debug(logging_text + stage[2])
3150 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3151 self._write_op_status(nslcmop_id, stage)
3152 desc = await self.RO.delete("ns", ro_nsr_id)
3153 ro_delete_action = desc["action_id"]
3154 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = ro_delete_action
3155 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
3156 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
3157 if ro_delete_action:
3158 # wait until NS is deleted from VIM
3159 stage[2] = "Waiting ns deleted from VIM."
3160 detailed_status_old = None
3161 self.logger.debug(logging_text + stage[2] + " RO_id={} ro_delete_action={}".format(ro_nsr_id,
3162 ro_delete_action))
3163 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3164 self._write_op_status(nslcmop_id, stage)
3165
3166 delete_timeout = 20 * 60 # 20 minutes
3167 while delete_timeout > 0:
3168 desc = await self.RO.show(
3169 "ns",
3170 item_id_name=ro_nsr_id,
3171 extra_item="action",
3172 extra_item_id=ro_delete_action)
3173
3174 # deploymentStatus
3175 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
3176
3177 ns_status, ns_status_info = self.RO.check_action_status(desc)
3178 if ns_status == "ERROR":
3179 raise ROclient.ROClientException(ns_status_info)
3180 elif ns_status == "BUILD":
3181 stage[2] = "Deleting from VIM {}".format(ns_status_info)
3182 elif ns_status == "ACTIVE":
3183 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
3184 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
3185 break
3186 else:
3187 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
3188 if stage[2] != detailed_status_old:
3189 detailed_status_old = stage[2]
3190 db_nsr_update["detailed-status"] = " ".join(stage)
3191 self._write_op_status(nslcmop_id, stage)
3192 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3193 await asyncio.sleep(5, loop=self.loop)
3194 delete_timeout -= 5
3195 else: # delete_timeout <= 0:
3196 raise ROclient.ROClientException("Timeout waiting ns deleted from VIM")
3197
3198 except Exception as e:
3199 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3200 if isinstance(e, ROclient.ROClientException) and e.http_code == 404: # not found
3201 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
3202 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
3203 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
3204 self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(ro_nsr_id))
3205 elif isinstance(e, ROclient.ROClientException) and e.http_code == 409: # conflict
3206 failed_detail.append("delete conflict: {}".format(e))
3207 self.logger.debug(logging_text + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id, e))
3208 else:
3209 failed_detail.append("delete error: {}".format(e))
3210 self.logger.error(logging_text + "RO_ns_id={} delete error: {}".format(ro_nsr_id, e))
3211
3212 # Delete nsd
3213 if not failed_detail and deep_get(nsr_deployed, ("RO", "nsd_id")):
3214 ro_nsd_id = nsr_deployed["RO"]["nsd_id"]
3215 try:
3216 stage[2] = "Deleting nsd from RO."
3217 db_nsr_update["detailed-status"] = " ".join(stage)
3218 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3219 self._write_op_status(nslcmop_id, stage)
3220 await self.RO.delete("nsd", ro_nsd_id)
3221 self.logger.debug(logging_text + "ro_nsd_id={} deleted".format(ro_nsd_id))
3222 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
3223 except Exception as e:
3224 if isinstance(e, ROclient.ROClientException) and e.http_code == 404: # not found
3225 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
3226 self.logger.debug(logging_text + "ro_nsd_id={} already deleted".format(ro_nsd_id))
3227 elif isinstance(e, ROclient.ROClientException) and e.http_code == 409: # conflict
3228 failed_detail.append("ro_nsd_id={} delete conflict: {}".format(ro_nsd_id, e))
3229 self.logger.debug(logging_text + failed_detail[-1])
3230 else:
3231 failed_detail.append("ro_nsd_id={} delete error: {}".format(ro_nsd_id, e))
3232 self.logger.error(logging_text + failed_detail[-1])
3233
3234 if not failed_detail and deep_get(nsr_deployed, ("RO", "vnfd")):
3235 for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
3236 if not vnf_deployed or not vnf_deployed["id"]:
3237 continue
3238 try:
3239 ro_vnfd_id = vnf_deployed["id"]
3240 stage[2] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
3241 vnf_deployed["member-vnf-index"], ro_vnfd_id)
3242 db_nsr_update["detailed-status"] = " ".join(stage)
3243 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3244 self._write_op_status(nslcmop_id, stage)
3245 await self.RO.delete("vnfd", ro_vnfd_id)
3246 self.logger.debug(logging_text + "ro_vnfd_id={} deleted".format(ro_vnfd_id))
3247 db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
3248 except Exception as e:
3249 if isinstance(e, ROclient.ROClientException) and e.http_code == 404: # not found
3250 db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
3251 self.logger.debug(logging_text + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id))
3252 elif isinstance(e, ROclient.ROClientException) and e.http_code == 409: # conflict
3253 failed_detail.append("ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id, e))
3254 self.logger.debug(logging_text + failed_detail[-1])
3255 else:
3256 failed_detail.append("ro_vnfd_id={} delete error: {}".format(ro_vnfd_id, e))
3257 self.logger.error(logging_text + failed_detail[-1])
3258
3259 if failed_detail:
3260 stage[2] = "Error deleting from VIM"
3261 else:
3262 stage[2] = "Deleted from VIM"
3263 db_nsr_update["detailed-status"] = " ".join(stage)
3264 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3265 self._write_op_status(nslcmop_id, stage)
3266
3267 if failed_detail:
3268 raise LcmException("; ".join(failed_detail))
3269
3270 async def terminate(self, nsr_id, nslcmop_id):
3271 # Try to lock HA task here
3272 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
3273 if not task_is_locked_by_me:
3274 return
3275
3276 logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
3277 self.logger.debug(logging_text + "Enter")
3278 timeout_ns_terminate = self.timeout_ns_terminate
3279 db_nsr = None
3280 db_nslcmop = None
3281 operation_params = None
3282 exc = None
3283 error_list = [] # annotates all failed error messages
3284 db_nslcmop_update = {}
3285 autoremove = False # autoremove after terminated
3286 tasks_dict_info = {}
3287 db_nsr_update = {}
3288 stage = ["Stage 1/3: Preparing task.", "Waiting for previous operations to terminate.", ""]
3289 # ^ contains [stage, step, VIM-status]
3290 try:
3291 # wait for any previous tasks in process
3292 await self.lcm_tasks.waitfor_related_HA("ns", 'nslcmops', nslcmop_id)
3293
3294 stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
3295 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
3296 operation_params = db_nslcmop.get("operationParams") or {}
3297 if operation_params.get("timeout_ns_terminate"):
3298 timeout_ns_terminate = operation_params["timeout_ns_terminate"]
3299 stage[1] = "Getting nsr={} from db.".format(nsr_id)
3300 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3301
3302 db_nsr_update["operational-status"] = "terminating"
3303 db_nsr_update["config-status"] = "terminating"
3304 self._write_ns_status(
3305 nsr_id=nsr_id,
3306 ns_state="TERMINATING",
3307 current_operation="TERMINATING",
3308 current_operation_id=nslcmop_id,
3309 other_update=db_nsr_update
3310 )
3311 self._write_op_status(
3312 op_id=nslcmop_id,
3313 queuePosition=0,
3314 stage=stage
3315 )
3316 nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed")) or {}
3317 if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
3318 return
3319
3320 stage[1] = "Getting vnf descriptors from db."
3321 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
3322 db_vnfds_from_id = {}
3323 db_vnfds_from_member_index = {}
3324 # Loop over VNFRs
3325 for vnfr in db_vnfrs_list:
3326 vnfd_id = vnfr["vnfd-id"]
3327 if vnfd_id not in db_vnfds_from_id:
3328 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
3329 db_vnfds_from_id[vnfd_id] = vnfd
3330 db_vnfds_from_member_index[vnfr["member-vnf-index-ref"]] = db_vnfds_from_id[vnfd_id]
3331
3332 # Destroy individual execution environments when there are terminating primitives.
3333 # Rest of EE will be deleted at once
3334 # TODO - check before calling _destroy_N2VC
3335 # if not operation_params.get("skip_terminate_primitives"):#
3336 # or not vca.get("needed_terminate"):
3337 stage[0] = "Stage 2/3 execute terminating primitives."
3338 self.logger.debug(logging_text + stage[0])
3339 stage[1] = "Looking execution environment that needs terminate."
3340 self.logger.debug(logging_text + stage[1])
3341 # self.logger.debug("nsr_deployed: {}".format(nsr_deployed))
3342 for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")):
3343 config_descriptor = None
3344 if not vca or not vca.get("ee_id"):
3345 continue
3346 if not vca.get("member-vnf-index"):
3347 # ns
3348 config_descriptor = db_nsr.get("ns-configuration")
3349 elif vca.get("vdu_id"):
3350 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
3351 vdud = next((vdu for vdu in db_vnfd.get("vdu", ()) if vdu["id"] == vca.get("vdu_id")), None)
3352 if vdud:
3353 config_descriptor = vdud.get("vdu-configuration")
3354 elif vca.get("kdu_name"):
3355 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
3356 kdud = next((kdu for kdu in db_vnfd.get("kdu", ()) if kdu["name"] == vca.get("kdu_name")), None)
3357 if kdud:
3358 config_descriptor = kdud.get("kdu-configuration")
3359 else:
3360 config_descriptor = db_vnfds_from_member_index[vca["member-vnf-index"]].get("vnf-configuration")
3361 vca_type = vca.get("type")
3362 exec_terminate_primitives = (not operation_params.get("skip_terminate_primitives") and
3363 vca.get("needed_terminate"))
3364 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
3365 # pending native charms
3366 destroy_ee = True if vca_type in ("helm", "native_charm") else False
3367 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
3368 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
3369 task = asyncio.ensure_future(
3370 self.destroy_N2VC(logging_text, db_nslcmop, vca, config_descriptor, vca_index,
3371 destroy_ee, exec_terminate_primitives))
3372 tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))
3373
3374 # wait for pending tasks of terminate primitives
3375 if tasks_dict_info:
3376 self.logger.debug(logging_text + 'Waiting for tasks {}'.format(list(tasks_dict_info.keys())))
3377 error_list = await self._wait_for_tasks(logging_text, tasks_dict_info,
3378 min(self.timeout_charm_delete, timeout_ns_terminate),
3379 stage, nslcmop_id)
3380 tasks_dict_info.clear()
3381 if error_list:
3382 return # raise LcmException("; ".join(error_list))
3383
3384 # remove All execution environments at once
3385 stage[0] = "Stage 3/3 delete all."
3386
3387 if nsr_deployed.get("VCA"):
3388 stage[1] = "Deleting all execution environments."
3389 self.logger.debug(logging_text + stage[1])
3390 task_delete_ee = asyncio.ensure_future(asyncio.wait_for(self._delete_all_N2VC(db_nsr=db_nsr),
3391 timeout=self.timeout_charm_delete))
3392 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
3393 tasks_dict_info[task_delete_ee] = "Terminating all VCA"
3394
3395 # Delete from k8scluster
3396 stage[1] = "Deleting KDUs."
3397 self.logger.debug(logging_text + stage[1])
3398 # print(nsr_deployed)
3399 for kdu in get_iterable(nsr_deployed, "K8s"):
3400 if not kdu or not kdu.get("kdu-instance"):
3401 continue
3402 kdu_instance = kdu.get("kdu-instance")
3403 if kdu.get("k8scluster-type") in self.k8scluster_map:
3404 task_delete_kdu_instance = asyncio.ensure_future(
3405 self.k8scluster_map[kdu["k8scluster-type"]].uninstall(
3406 cluster_uuid=kdu.get("k8scluster-uuid"),
3407 kdu_instance=kdu_instance))
3408 else:
3409 self.logger.error(logging_text + "Unknown k8s deployment type {}".
3410 format(kdu.get("k8scluster-type")))
3411 continue
3412 tasks_dict_info[task_delete_kdu_instance] = "Terminating KDU '{}'".format(kdu.get("kdu-name"))
3413
3414 # remove from RO
3415 stage[1] = "Deleting ns from VIM."
3416 if self.ng_ro:
3417 task_delete_ro = asyncio.ensure_future(
3418 self._terminate_ng_ro(logging_text, nsr_deployed, nsr_id, nslcmop_id, stage))
3419 else:
3420 task_delete_ro = asyncio.ensure_future(
3421 self._terminate_RO(logging_text, nsr_deployed, nsr_id, nslcmop_id, stage))
3422 tasks_dict_info[task_delete_ro] = "Removing deployment from VIM"
3423
3424 # rest of staff will be done at finally
3425
3426 except (ROclient.ROClientException, DbException, LcmException, N2VCException) as e:
3427 self.logger.error(logging_text + "Exit Exception {}".format(e))
3428 exc = e
3429 except asyncio.CancelledError:
3430 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(stage[1]))
3431 exc = "Operation was cancelled"
3432 except Exception as e:
3433 exc = traceback.format_exc()
3434 self.logger.critical(logging_text + "Exit Exception while '{}': {}".format(stage[1], e), exc_info=True)
3435 finally:
3436 if exc:
3437 error_list.append(str(exc))
3438 try:
3439 # wait for pending tasks
3440 if tasks_dict_info:
3441 stage[1] = "Waiting for terminate pending tasks."
3442 self.logger.debug(logging_text + stage[1])
3443 error_list += await self._wait_for_tasks(logging_text, tasks_dict_info, timeout_ns_terminate,
3444 stage, nslcmop_id)
3445 stage[1] = stage[2] = ""
3446 except asyncio.CancelledError:
3447 error_list.append("Cancelled")
3448 # TODO cancell all tasks
3449 except Exception as exc:
3450 error_list.append(str(exc))
3451 # update status at database
3452 if error_list:
3453 error_detail = "; ".join(error_list)
3454 # self.logger.error(logging_text + error_detail)
3455 error_description_nslcmop = '{} Detail: {}'.format(stage[0], error_detail)
3456 error_description_nsr = 'Operation: TERMINATING.{}, {}.'.format(nslcmop_id, stage[0])
3457
3458 db_nsr_update["operational-status"] = "failed"
3459 db_nsr_update["detailed-status"] = error_description_nsr + " Detail: " + error_detail
3460 db_nslcmop_update["detailed-status"] = error_detail
3461 nslcmop_operation_state = "FAILED"
3462 ns_state = "BROKEN"
3463 else:
3464 error_detail = None
3465 error_description_nsr = error_description_nslcmop = None
3466 ns_state = "NOT_INSTANTIATED"
3467 db_nsr_update["operational-status"] = "terminated"
3468 db_nsr_update["detailed-status"] = "Done"
3469 db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
3470 db_nslcmop_update["detailed-status"] = "Done"
3471 nslcmop_operation_state = "COMPLETED"
3472
3473 if db_nsr:
3474 self._write_ns_status(
3475 nsr_id=nsr_id,
3476 ns_state=ns_state,
3477 current_operation="IDLE",
3478 current_operation_id=None,
3479 error_description=error_description_nsr,
3480 error_detail=error_detail,
3481 other_update=db_nsr_update
3482 )
3483 self._write_op_status(
3484 op_id=nslcmop_id,
3485 stage="",
3486 error_message=error_description_nslcmop,
3487 operation_state=nslcmop_operation_state,
3488 other_update=db_nslcmop_update,
3489 )
3490 if ns_state == "NOT_INSTANTIATED":
3491 try:
3492 self.db.set_list("vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "NOT_INSTANTIATED"})
3493 except DbException as e:
3494 self.logger.warn(logging_text + 'Error writing VNFR status for nsr-id-ref: {} -> {}'.
3495 format(nsr_id, e))
3496 if operation_params:
3497 autoremove = operation_params.get("autoremove", False)
3498 if nslcmop_operation_state:
3499 try:
3500 await self.msg.aiowrite("ns", "terminated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
3501 "operationState": nslcmop_operation_state,
3502 "autoremove": autoremove},
3503 loop=self.loop)
3504 except Exception as e:
3505 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
3506
3507 self.logger.debug(logging_text + "Exit")
3508 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
3509
3510 async def _wait_for_tasks(self, logging_text, created_tasks_info, timeout, stage, nslcmop_id, nsr_id=None):
3511 time_start = time()
3512 error_detail_list = []
3513 error_list = []
3514 pending_tasks = list(created_tasks_info.keys())
3515 num_tasks = len(pending_tasks)
3516 num_done = 0
3517 stage[1] = "{}/{}.".format(num_done, num_tasks)
3518 self._write_op_status(nslcmop_id, stage)
3519 while pending_tasks:
3520 new_error = None
3521 _timeout = timeout + time_start - time()
3522 done, pending_tasks = await asyncio.wait(pending_tasks, timeout=_timeout,
3523 return_when=asyncio.FIRST_COMPLETED)
3524 num_done += len(done)
3525 if not done: # Timeout
3526 for task in pending_tasks:
3527 new_error = created_tasks_info[task] + ": Timeout"
3528 error_detail_list.append(new_error)
3529 error_list.append(new_error)
3530 break
3531 for task in done:
3532 if task.cancelled():
3533 exc = "Cancelled"
3534 else:
3535 exc = task.exception()
3536 if exc:
3537 if isinstance(exc, asyncio.TimeoutError):
3538 exc = "Timeout"
3539 new_error = created_tasks_info[task] + ": {}".format(exc)
3540 error_list.append(created_tasks_info[task])
3541 error_detail_list.append(new_error)
3542 if isinstance(exc, (str, DbException, N2VCException, ROclient.ROClientException, LcmException,
3543 K8sException)):
3544 self.logger.error(logging_text + new_error)
3545 else:
3546 exc_traceback = "".join(traceback.format_exception(None, exc, exc.__traceback__))
3547 self.logger.error(logging_text + created_tasks_info[task] + exc_traceback)
3548 else:
3549 self.logger.debug(logging_text + created_tasks_info[task] + ": Done")
3550 stage[1] = "{}/{}.".format(num_done, num_tasks)
3551 if new_error:
3552 stage[1] += " Errors: " + ". ".join(error_detail_list) + "."
3553 if nsr_id: # update also nsr
3554 self.update_db_2("nsrs", nsr_id, {"errorDescription": "Error at: " + ", ".join(error_list),
3555 "errorDetail": ". ".join(error_detail_list)})
3556 self._write_op_status(nslcmop_id, stage)
3557 return error_detail_list
3558
3559 @staticmethod
3560 def _map_primitive_params(primitive_desc, params, instantiation_params):
3561 """
3562 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
3563 The default-value is used. If it is between < > it look for a value at instantiation_params
3564 :param primitive_desc: portion of VNFD/NSD that describes primitive
3565 :param params: Params provided by user
3566 :param instantiation_params: Instantiation params provided by user
3567 :return: a dictionary with the calculated params
3568 """
3569 calculated_params = {}
3570 for parameter in primitive_desc.get("parameter", ()):
3571 param_name = parameter["name"]
3572 if param_name in params:
3573 calculated_params[param_name] = params[param_name]
3574 elif "default-value" in parameter or "value" in parameter:
3575 if "value" in parameter:
3576 calculated_params[param_name] = parameter["value"]
3577 else:
3578 calculated_params[param_name] = parameter["default-value"]
3579 if isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("<") \
3580 and calculated_params[param_name].endswith(">"):
3581 if calculated_params[param_name][1:-1] in instantiation_params:
3582 calculated_params[param_name] = instantiation_params[calculated_params[param_name][1:-1]]
3583 else:
3584 raise LcmException("Parameter {} needed to execute primitive {} not provided".
3585 format(calculated_params[param_name], primitive_desc["name"]))
3586 else:
3587 raise LcmException("Parameter {} needed to execute primitive {} not provided".
3588 format(param_name, primitive_desc["name"]))
3589
3590 if isinstance(calculated_params[param_name], (dict, list, tuple)):
3591 calculated_params[param_name] = yaml.safe_dump(calculated_params[param_name], default_flow_style=True,
3592 width=256)
3593 elif isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("!!yaml "):
3594 calculated_params[param_name] = calculated_params[param_name][7:]
3595 if parameter.get("data-type") == "INTEGER":
3596 try:
3597 calculated_params[param_name] = int(calculated_params[param_name])
3598 except ValueError: # error converting string to int
3599 raise LcmException(
3600 "Parameter {} of primitive {} must be integer".format(param_name, primitive_desc["name"]))
3601 elif parameter.get("data-type") == "BOOLEAN":
3602 calculated_params[param_name] = not ((str(calculated_params[param_name])).lower() == 'false')
3603
3604 # add always ns_config_info if primitive name is config
3605 if primitive_desc["name"] == "config":
3606 if "ns_config_info" in instantiation_params:
3607 calculated_params["ns_config_info"] = instantiation_params["ns_config_info"]
3608 return calculated_params
3609
3610 def _look_for_deployed_vca(self, deployed_vca, member_vnf_index, vdu_id, vdu_count_index, kdu_name=None,
3611 ee_descriptor_id=None):
3612 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
3613 for vca in deployed_vca:
3614 if not vca:
3615 continue
3616 if member_vnf_index != vca["member-vnf-index"] or vdu_id != vca["vdu_id"]:
3617 continue
3618 if vdu_count_index is not None and vdu_count_index != vca["vdu_count_index"]:
3619 continue
3620 if kdu_name and kdu_name != vca["kdu_name"]:
3621 continue
3622 if ee_descriptor_id and ee_descriptor_id != vca["ee_descriptor_id"]:
3623 continue
3624 break
3625 else:
3626 # vca_deployed not found
3627 raise LcmException("charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
3628 " is not deployed".format(member_vnf_index, vdu_id, vdu_count_index, kdu_name,
3629 ee_descriptor_id))
3630
3631 # get ee_id
3632 ee_id = vca.get("ee_id")
3633 vca_type = vca.get("type", "lxc_proxy_charm") # default value for backward compatibility - proxy charm
3634 if not ee_id:
3635 raise LcmException("charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
3636 "execution environment"
3637 .format(member_vnf_index, vdu_id, kdu_name, vdu_count_index))
3638 return ee_id, vca_type
3639
3640 async def _ns_execute_primitive(self, ee_id, primitive, primitive_params, retries=0,
3641 retries_interval=30, timeout=None,
3642 vca_type=None, db_dict=None) -> (str, str):
3643 try:
3644 if primitive == "config":
3645 primitive_params = {"params": primitive_params}
3646
3647 vca_type = vca_type or "lxc_proxy_charm"
3648
3649 while retries >= 0:
3650 try:
3651 output = await asyncio.wait_for(
3652 self.vca_map[vca_type].exec_primitive(
3653 ee_id=ee_id,
3654 primitive_name=primitive,
3655 params_dict=primitive_params,
3656 progress_timeout=self.timeout_progress_primitive,
3657 total_timeout=self.timeout_primitive,
3658 db_dict=db_dict),
3659 timeout=timeout or self.timeout_primitive)
3660 # execution was OK
3661 break
3662 except asyncio.CancelledError:
3663 raise
3664 except Exception as e: # asyncio.TimeoutError
3665 if isinstance(e, asyncio.TimeoutError):
3666 e = "Timeout"
3667 retries -= 1
3668 if retries >= 0:
3669 self.logger.debug('Error executing action {} on {} -> {}'.format(primitive, ee_id, e))
3670 # wait and retry
3671 await asyncio.sleep(retries_interval, loop=self.loop)
3672 else:
3673 return 'FAILED', str(e)
3674
3675 return 'COMPLETED', output
3676
3677 except (LcmException, asyncio.CancelledError):
3678 raise
3679 except Exception as e:
3680 return 'FAIL', 'Error executing action {}: {}'.format(primitive, e)
3681
3682 async def vca_status_refresh(self, nsr_id, nslcmop_id):
3683 """
3684 Updating the vca_status with latest juju information in nsrs record
3685 :param: nsr_id: Id of the nsr
3686 :param: nslcmop_id: Id of the nslcmop
3687 :return: None
3688 """
3689
3690 self.logger.debug("Task ns={} action={} Enter".format(nsr_id, nslcmop_id))
3691 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3692 if db_nsr['_admin']['deployed']['K8s']:
3693 for k8s_index, k8s in enumerate(db_nsr['_admin']['deployed']['K8s']):
3694 cluster_uuid, kdu_instance = k8s["k8scluster-uuid"], k8s["kdu-instance"]
3695 await self._on_update_k8s_db(cluster_uuid, kdu_instance, filter={'_id': nsr_id})
3696 else:
3697 for vca_index, _ in enumerate(db_nsr['_admin']['deployed']['VCA']):
3698 table, filter = "nsrs", {"_id": nsr_id}
3699 path = "_admin.deployed.VCA.{}.".format(vca_index)
3700 await self._on_update_n2vc_db(table, filter, path, {})
3701
3702 self.logger.debug("Task ns={} action={} Exit".format(nsr_id, nslcmop_id))
3703 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_vca_status_refresh")
3704
3705 async def action(self, nsr_id, nslcmop_id):
3706
3707 # Try to lock HA task here
3708 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
3709 if not task_is_locked_by_me:
3710 return
3711
3712 logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
3713 self.logger.debug(logging_text + "Enter")
3714 # get all needed from database
3715 db_nsr = None
3716 db_nslcmop = None
3717 db_nsr_update = {}
3718 db_nslcmop_update = {}
3719 nslcmop_operation_state = None
3720 error_description_nslcmop = None
3721 exc = None
3722 try:
3723 # wait for any previous tasks in process
3724 step = "Waiting for previous operations to terminate"
3725 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
3726
3727 self._write_ns_status(
3728 nsr_id=nsr_id,
3729 ns_state=None,
3730 current_operation="RUNNING ACTION",
3731 current_operation_id=nslcmop_id
3732 )
3733
3734 step = "Getting information from database"
3735 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
3736 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3737
3738 nsr_deployed = db_nsr["_admin"].get("deployed")
3739 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
3740 vdu_id = db_nslcmop["operationParams"].get("vdu_id")
3741 kdu_name = db_nslcmop["operationParams"].get("kdu_name")
3742 vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
3743 primitive = db_nslcmop["operationParams"]["primitive"]
3744 primitive_params = db_nslcmop["operationParams"]["primitive_params"]
3745 timeout_ns_action = db_nslcmop["operationParams"].get("timeout_ns_action", self.timeout_primitive)
3746
3747 if vnf_index:
3748 step = "Getting vnfr from database"
3749 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
3750 step = "Getting vnfd from database"
3751 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
3752 else:
3753 step = "Getting nsd from database"
3754 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
3755
3756 # for backward compatibility
3757 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
3758 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
3759 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
3760 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3761
3762 # look for primitive
3763 config_primitive_desc = descriptor_configuration = None
3764 if vdu_id:
3765 for vdu in get_iterable(db_vnfd, "vdu"):
3766 if vdu_id == vdu["id"]:
3767 descriptor_configuration = vdu.get("vdu-configuration")
3768 break
3769 elif kdu_name:
3770 for kdu in get_iterable(db_vnfd, "kdu"):
3771 if kdu_name == kdu["name"]:
3772 descriptor_configuration = kdu.get("kdu-configuration")
3773 break
3774 elif vnf_index:
3775 descriptor_configuration = db_vnfd.get("vnf-configuration")
3776 else:
3777 descriptor_configuration = db_nsd.get("ns-configuration")
3778
3779 if descriptor_configuration and descriptor_configuration.get("config-primitive"):
3780 for config_primitive in descriptor_configuration["config-primitive"]:
3781 if config_primitive["name"] == primitive:
3782 config_primitive_desc = config_primitive
3783 break
3784
3785 if not config_primitive_desc:
3786 if not (kdu_name and primitive in ("upgrade", "rollback", "status")):
3787 raise LcmException("Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".
3788 format(primitive))
3789 primitive_name = primitive
3790 ee_descriptor_id = None
3791 else:
3792 primitive_name = config_primitive_desc.get("execution-environment-primitive", primitive)
3793 ee_descriptor_id = config_primitive_desc.get("execution-environment-ref")
3794
3795 if vnf_index:
3796 if vdu_id:
3797 vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
3798 desc_params = self._format_additional_params(vdur.get("additionalParams"))
3799 elif kdu_name:
3800 kdur = next((x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None)
3801 desc_params = self._format_additional_params(kdur.get("additionalParams"))
3802 else:
3803 desc_params = self._format_additional_params(db_vnfr.get("additionalParamsForVnf"))
3804 else:
3805 desc_params = self._format_additional_params(db_nsr.get("additionalParamsForNs"))
3806
3807 if kdu_name:
3808 kdu_action = True if not deep_get(kdu, ("kdu-configuration", "juju")) else False
3809
3810 # TODO check if ns is in a proper status
3811 if kdu_name and (primitive_name in ("upgrade", "rollback", "status") or kdu_action):
3812 # kdur and desc_params already set from before
3813 if primitive_params:
3814 desc_params.update(primitive_params)
3815 # TODO Check if we will need something at vnf level
3816 for index, kdu in enumerate(get_iterable(nsr_deployed, "K8s")):
3817 if kdu_name == kdu["kdu-name"] and kdu["member-vnf-index"] == vnf_index:
3818 break
3819 else:
3820 raise LcmException("KDU '{}' for vnf '{}' not deployed".format(kdu_name, vnf_index))
3821
3822 if kdu.get("k8scluster-type") not in self.k8scluster_map:
3823 msg = "unknown k8scluster-type '{}'".format(kdu.get("k8scluster-type"))
3824 raise LcmException(msg)
3825
3826 db_dict = {"collection": "nsrs",
3827 "filter": {"_id": nsr_id},
3828 "path": "_admin.deployed.K8s.{}".format(index)}
3829 self.logger.debug(logging_text + "Exec k8s {} on {}.{}".format(primitive_name, vnf_index, kdu_name))
3830 step = "Executing kdu {}".format(primitive_name)
3831 if primitive_name == "upgrade":
3832 if desc_params.get("kdu_model"):
3833 kdu_model = desc_params.get("kdu_model")
3834 del desc_params["kdu_model"]
3835 else:
3836 kdu_model = kdu.get("kdu-model")
3837 parts = kdu_model.split(sep=":")
3838 if len(parts) == 2:
3839 kdu_model = parts[0]
3840
3841 detailed_status = await asyncio.wait_for(
3842 self.k8scluster_map[kdu["k8scluster-type"]].upgrade(
3843 cluster_uuid=kdu.get("k8scluster-uuid"),
3844 kdu_instance=kdu.get("kdu-instance"),
3845 atomic=True, kdu_model=kdu_model,
3846 params=desc_params, db_dict=db_dict,
3847 timeout=timeout_ns_action),
3848 timeout=timeout_ns_action + 10)
3849 self.logger.debug(logging_text + " Upgrade of kdu {} done".format(detailed_status))
3850 elif primitive_name == "rollback":
3851 detailed_status = await asyncio.wait_for(
3852 self.k8scluster_map[kdu["k8scluster-type"]].rollback(
3853 cluster_uuid=kdu.get("k8scluster-uuid"),
3854 kdu_instance=kdu.get("kdu-instance"),
3855 db_dict=db_dict),
3856 timeout=timeout_ns_action)
3857 elif primitive_name == "status":
3858 detailed_status = await asyncio.wait_for(
3859 self.k8scluster_map[kdu["k8scluster-type"]].status_kdu(
3860 cluster_uuid=kdu.get("k8scluster-uuid"),
3861 kdu_instance=kdu.get("kdu-instance")),
3862 timeout=timeout_ns_action)
3863 else:
3864 kdu_instance = kdu.get("kdu-instance") or "{}-{}".format(kdu["kdu-name"], nsr_id)
3865 params = self._map_primitive_params(config_primitive_desc, primitive_params, desc_params)
3866
3867 detailed_status = await asyncio.wait_for(
3868 self.k8scluster_map[kdu["k8scluster-type"]].exec_primitive(
3869 cluster_uuid=kdu.get("k8scluster-uuid"),
3870 kdu_instance=kdu_instance,
3871 primitive_name=primitive_name,
3872 params=params, db_dict=db_dict,
3873 timeout=timeout_ns_action),
3874 timeout=timeout_ns_action)
3875
3876 if detailed_status:
3877 nslcmop_operation_state = 'COMPLETED'
3878 else:
3879 detailed_status = ''
3880 nslcmop_operation_state = 'FAILED'
3881 else:
3882 ee_id, vca_type = self._look_for_deployed_vca(nsr_deployed["VCA"],
3883 member_vnf_index=vnf_index,
3884 vdu_id=vdu_id,
3885 vdu_count_index=vdu_count_index,
3886 ee_descriptor_id=ee_descriptor_id)
3887 db_nslcmop_notif = {"collection": "nslcmops",
3888 "filter": {"_id": nslcmop_id},
3889 "path": "admin.VCA"}
3890 nslcmop_operation_state, detailed_status = await self._ns_execute_primitive(
3891 ee_id,
3892 primitive=primitive_name,
3893 primitive_params=self._map_primitive_params(config_primitive_desc, primitive_params, desc_params),
3894 timeout=timeout_ns_action,
3895 vca_type=vca_type,
3896 db_dict=db_nslcmop_notif)
3897
3898 db_nslcmop_update["detailed-status"] = detailed_status
3899 error_description_nslcmop = detailed_status if nslcmop_operation_state == "FAILED" else ""
3900 self.logger.debug(logging_text + " task Done with result {} {}".format(nslcmop_operation_state,
3901 detailed_status))
3902 return # database update is called inside finally
3903
3904 except (DbException, LcmException, N2VCException, K8sException) as e:
3905 self.logger.error(logging_text + "Exit Exception {}".format(e))
3906 exc = e
3907 except asyncio.CancelledError:
3908 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
3909 exc = "Operation was cancelled"
3910 except asyncio.TimeoutError:
3911 self.logger.error(logging_text + "Timeout while '{}'".format(step))
3912 exc = "Timeout"
3913 except Exception as e:
3914 exc = traceback.format_exc()
3915 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
3916 finally:
3917 if exc:
3918 db_nslcmop_update["detailed-status"] = detailed_status = error_description_nslcmop = \
3919 "FAILED {}: {}".format(step, exc)
3920 nslcmop_operation_state = "FAILED"
3921 if db_nsr:
3922 self._write_ns_status(
3923 nsr_id=nsr_id,
3924 ns_state=db_nsr["nsState"], # TODO check if degraded. For the moment use previous status
3925 current_operation="IDLE",
3926 current_operation_id=None,
3927 # error_description=error_description_nsr,
3928 # error_detail=error_detail,
3929 other_update=db_nsr_update
3930 )
3931
3932 self._write_op_status(
3933 op_id=nslcmop_id,
3934 stage="",
3935 error_message=error_description_nslcmop,
3936 operation_state=nslcmop_operation_state,
3937 other_update=db_nslcmop_update,
3938 )
3939
3940 if nslcmop_operation_state:
3941 try:
3942 await self.msg.aiowrite("ns", "actioned", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
3943 "operationState": nslcmop_operation_state},
3944 loop=self.loop)
3945 except Exception as e:
3946 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
3947 self.logger.debug(logging_text + "Exit")
3948 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
3949 return nslcmop_operation_state, detailed_status
3950
3951 async def scale(self, nsr_id, nslcmop_id):
3952
3953 # Try to lock HA task here
3954 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
3955 if not task_is_locked_by_me:
3956 return
3957
3958 logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
3959 self.logger.debug(logging_text + "Enter")
3960 # get all needed from database
3961 db_nsr = None
3962 db_nslcmop = None
3963 db_nslcmop_update = {}
3964 nslcmop_operation_state = None
3965 db_nsr_update = {}
3966 exc = None
3967 # in case of error, indicates what part of scale was failed to put nsr at error status
3968 scale_process = None
3969 old_operational_status = ""
3970 old_config_status = ""
3971 try:
3972 # wait for any previous tasks in process
3973 step = "Waiting for previous operations to terminate"
3974 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
3975
3976 self._write_ns_status(
3977 nsr_id=nsr_id,
3978 ns_state=None,
3979 current_operation="SCALING",
3980 current_operation_id=nslcmop_id
3981 )
3982
3983 step = "Getting nslcmop from database"
3984 self.logger.debug(step + " after having waited for previous tasks to be completed")
3985 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
3986 step = "Getting nsr from database"
3987 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3988
3989 old_operational_status = db_nsr["operational-status"]
3990 old_config_status = db_nsr["config-status"]
3991 step = "Parsing scaling parameters"
3992 # self.logger.debug(step)
3993 db_nsr_update["operational-status"] = "scaling"
3994 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3995 nsr_deployed = db_nsr["_admin"].get("deployed")
3996
3997 #######
3998 nsr_deployed = db_nsr["_admin"].get("deployed")
3999 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
4000 # vdu_id = db_nslcmop["operationParams"].get("vdu_id")
4001 # vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
4002 # vdu_name = db_nslcmop["operationParams"].get("vdu_name")
4003 #######
4004
4005 RO_nsr_id = nsr_deployed["RO"]["nsr_id"]
4006 vnf_index = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["member-vnf-index"]
4007 scaling_group = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["scaling-group-descriptor"]
4008 scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
4009 # scaling_policy = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"].get("scaling-policy")
4010
4011 # for backward compatibility
4012 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
4013 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
4014 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
4015 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4016
4017 step = "Getting vnfr from database"
4018 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
4019 step = "Getting vnfd from database"
4020 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
4021
4022 step = "Getting scaling-group-descriptor"
4023 for scaling_descriptor in db_vnfd["scaling-group-descriptor"]:
4024 if scaling_descriptor["name"] == scaling_group:
4025 break
4026 else:
4027 raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
4028 "at vnfd:scaling-group-descriptor".format(scaling_group))
4029
4030 # cooldown_time = 0
4031 # for scaling_policy_descriptor in scaling_descriptor.get("scaling-policy", ()):
4032 # cooldown_time = scaling_policy_descriptor.get("cooldown-time", 0)
4033 # if scaling_policy and scaling_policy == scaling_policy_descriptor.get("name"):
4034 # break
4035
4036 # TODO check if ns is in a proper status
4037 step = "Sending scale order to VIM"
4038 nb_scale_op = 0
4039 if not db_nsr["_admin"].get("scaling-group"):
4040 self.update_db_2("nsrs", nsr_id, {"_admin.scaling-group": [{"name": scaling_group, "nb-scale-op": 0}]})
4041 admin_scale_index = 0
4042 else:
4043 for admin_scale_index, admin_scale_info in enumerate(db_nsr["_admin"]["scaling-group"]):
4044 if admin_scale_info["name"] == scaling_group:
4045 nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
4046 break
4047 else: # not found, set index one plus last element and add new entry with the name
4048 admin_scale_index += 1
4049 db_nsr_update["_admin.scaling-group.{}.name".format(admin_scale_index)] = scaling_group
4050 RO_scaling_info = []
4051 vdu_scaling_info = {"scaling_group_name": scaling_group, "vdu": []}
4052 if scaling_type == "SCALE_OUT":
4053 # count if max-instance-count is reached
4054 max_instance_count = scaling_descriptor.get("max-instance-count", 10)
4055 # self.logger.debug("MAX_INSTANCE_COUNT is {}".format(max_instance_count))
4056 if nb_scale_op >= max_instance_count:
4057 raise LcmException("reached the limit of {} (max-instance-count) "
4058 "scaling-out operations for the "
4059 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
4060
4061 nb_scale_op += 1
4062 vdu_scaling_info["scaling_direction"] = "OUT"
4063 vdu_scaling_info["vdu-create"] = {}
4064 for vdu_scale_info in scaling_descriptor["vdu"]:
4065 vdud = next(vdu for vdu in db_vnfd.get("vdu") if vdu["id"] == vdu_scale_info["vdu-id-ref"])
4066 vdu_index = len([x for x in db_vnfr.get("vdur", ())
4067 if x.get("vdu-id-ref") == vdu_scale_info["vdu-id-ref"] and
4068 x.get("member-vnf-index-ref") == vnf_index])
4069 cloud_init_text = self._get_cloud_init(vdud, db_vnfd)
4070 if cloud_init_text:
4071 additional_params = self._get_vdu_additional_params(db_vnfr, vdud["id"]) or {}
4072 cloud_init_list = []
4073 for x in range(vdu_scale_info.get("count", 1)):
4074 if cloud_init_text:
4075 # TODO Information of its own ip is not available because db_vnfr is not updated.
4076 additional_params["OSM"] = self._get_osm_params(db_vnfr, vdu_scale_info["vdu-id-ref"],
4077 vdu_index + x)
4078 cloud_init_list.append(self._parse_cloud_init(cloud_init_text, additional_params,
4079 db_vnfd["id"], vdud["id"]))
4080 RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
4081 "type": "create", "count": vdu_scale_info.get("count", 1)})
4082 if cloud_init_list:
4083 RO_scaling_info[-1]["cloud_init"] = cloud_init_list
4084 vdu_scaling_info["vdu-create"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
4085
4086 elif scaling_type == "SCALE_IN":
4087 # count if min-instance-count is reached
4088 min_instance_count = 0
4089 if "min-instance-count" in scaling_descriptor and scaling_descriptor["min-instance-count"] is not None:
4090 min_instance_count = int(scaling_descriptor["min-instance-count"])
4091 if nb_scale_op <= min_instance_count:
4092 raise LcmException("reached the limit of {} (min-instance-count) scaling-in operations for the "
4093 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
4094 nb_scale_op -= 1
4095 vdu_scaling_info["scaling_direction"] = "IN"
4096 vdu_scaling_info["vdu-delete"] = {}
4097 for vdu_scale_info in scaling_descriptor["vdu"]:
4098 RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
4099 "type": "delete", "count": vdu_scale_info.get("count", 1)})
4100 vdu_scaling_info["vdu-delete"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
4101
4102 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
4103 vdu_create = vdu_scaling_info.get("vdu-create")
4104 vdu_delete = copy(vdu_scaling_info.get("vdu-delete"))
4105 if vdu_scaling_info["scaling_direction"] == "IN":
4106 for vdur in reversed(db_vnfr["vdur"]):
4107 if vdu_delete.get(vdur["vdu-id-ref"]):
4108 vdu_delete[vdur["vdu-id-ref"]] -= 1
4109 vdu_scaling_info["vdu"].append({
4110 "name": vdur["name"],
4111 "vdu_id": vdur["vdu-id-ref"],
4112 "interface": []
4113 })
4114 for interface in vdur["interfaces"]:
4115 vdu_scaling_info["vdu"][-1]["interface"].append({
4116 "name": interface["name"],
4117 "ip_address": interface["ip-address"],
4118 "mac_address": interface.get("mac-address"),
4119 })
4120 vdu_delete = vdu_scaling_info.pop("vdu-delete")
4121
4122 # PRE-SCALE BEGIN
4123 step = "Executing pre-scale vnf-config-primitive"
4124 if scaling_descriptor.get("scaling-config-action"):
4125 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
4126 if (scaling_config_action.get("trigger") == "pre-scale-in" and scaling_type == "SCALE_IN") \
4127 or (scaling_config_action.get("trigger") == "pre-scale-out" and scaling_type == "SCALE_OUT"):
4128 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
4129 step = db_nslcmop_update["detailed-status"] = \
4130 "executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive)
4131
4132 # look for primitive
4133 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
4134 if config_primitive["name"] == vnf_config_primitive:
4135 break
4136 else:
4137 raise LcmException(
4138 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
4139 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
4140 "primitive".format(scaling_group, vnf_config_primitive))
4141
4142 vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
4143 if db_vnfr.get("additionalParamsForVnf"):
4144 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
4145
4146 scale_process = "VCA"
4147 db_nsr_update["config-status"] = "configuring pre-scaling"
4148 primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)
4149
4150 # Pre-scale retry check: Check if this sub-operation has been executed before
4151 op_index = self._check_or_add_scale_suboperation(
4152 db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'PRE-SCALE')
4153 if op_index == self.SUBOPERATION_STATUS_SKIP:
4154 # Skip sub-operation
4155 result = 'COMPLETED'
4156 result_detail = 'Done'
4157 self.logger.debug(logging_text +
4158 "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
4159 vnf_config_primitive, result, result_detail))
4160 else:
4161 if op_index == self.SUBOPERATION_STATUS_NEW:
4162 # New sub-operation: Get index of this sub-operation
4163 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
4164 self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
4165 format(vnf_config_primitive))
4166 else:
4167 # retry: Get registered params for this existing sub-operation
4168 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
4169 vnf_index = op.get('member_vnf_index')
4170 vnf_config_primitive = op.get('primitive')
4171 primitive_params = op.get('primitive_params')
4172 self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation retry".
4173 format(vnf_config_primitive))
4174 # Execute the primitive, either with new (first-time) or registered (reintent) args
4175 ee_descriptor_id = config_primitive.get("execution-environment-ref")
4176 primitive_name = config_primitive.get("execution-environment-primitive",
4177 vnf_config_primitive)
4178 ee_id, vca_type = self._look_for_deployed_vca(nsr_deployed["VCA"],
4179 member_vnf_index=vnf_index,
4180 vdu_id=None,
4181 vdu_count_index=None,
4182 ee_descriptor_id=ee_descriptor_id)
4183 result, result_detail = await self._ns_execute_primitive(
4184 ee_id, primitive_name, primitive_params, vca_type)
4185 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
4186 vnf_config_primitive, result, result_detail))
4187 # Update operationState = COMPLETED | FAILED
4188 self._update_suboperation_status(
4189 db_nslcmop, op_index, result, result_detail)
4190
4191 if result == "FAILED":
4192 raise LcmException(result_detail)
4193 db_nsr_update["config-status"] = old_config_status
4194 scale_process = None
4195 # PRE-SCALE END
4196
4197 # SCALE RO - BEGIN
4198 # Should this block be skipped if 'RO_nsr_id' == None ?
4199 # if (RO_nsr_id and RO_scaling_info):
4200 if RO_scaling_info:
4201 scale_process = "RO"
4202 # Scale RO retry check: Check if this sub-operation has been executed before
4203 op_index = self._check_or_add_scale_suboperation(
4204 db_nslcmop, vnf_index, None, None, 'SCALE-RO', RO_nsr_id, RO_scaling_info)
4205 if op_index == self.SUBOPERATION_STATUS_SKIP:
4206 # Skip sub-operation
4207 result = 'COMPLETED'
4208 result_detail = 'Done'
4209 self.logger.debug(logging_text + "Skipped sub-operation RO, result {} {}".format(
4210 result, result_detail))
4211 else:
4212 if op_index == self.SUBOPERATION_STATUS_NEW:
4213 # New sub-operation: Get index of this sub-operation
4214 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
4215 self.logger.debug(logging_text + "New sub-operation RO")
4216 else:
4217 # retry: Get registered params for this existing sub-operation
4218 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
4219 RO_nsr_id = op.get('RO_nsr_id')
4220 RO_scaling_info = op.get('RO_scaling_info')
4221 self.logger.debug(logging_text + "Sub-operation RO retry for primitive {}".format(
4222 vnf_config_primitive))
4223
4224 RO_desc = await self.RO.create_action("ns", RO_nsr_id, {"vdu-scaling": RO_scaling_info})
4225 db_nsr_update["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)] = nb_scale_op
4226 db_nsr_update["_admin.scaling-group.{}.time".format(admin_scale_index)] = time()
4227 # wait until ready
4228 RO_nslcmop_id = RO_desc["instance_action_id"]
4229 db_nslcmop_update["_admin.deploy.RO"] = RO_nslcmop_id
4230
4231 RO_task_done = False
4232 step = detailed_status = "Waiting for VIM to scale. RO_task_id={}.".format(RO_nslcmop_id)
4233 detailed_status_old = None
4234 self.logger.debug(logging_text + step)
4235
4236 deployment_timeout = 1 * 3600 # One hour
4237 while deployment_timeout > 0:
4238 if not RO_task_done:
4239 desc = await self.RO.show("ns", item_id_name=RO_nsr_id, extra_item="action",
4240 extra_item_id=RO_nslcmop_id)
4241
4242 # deploymentStatus
4243 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
4244
4245 ns_status, ns_status_info = self.RO.check_action_status(desc)
4246 if ns_status == "ERROR":
4247 raise ROclient.ROClientException(ns_status_info)
4248 elif ns_status == "BUILD":
4249 detailed_status = step + "; {}".format(ns_status_info)
4250 elif ns_status == "ACTIVE":
4251 RO_task_done = True
4252 self.scale_vnfr(db_vnfr, vdu_create=vdu_create, vdu_delete=vdu_delete)
4253 step = detailed_status = "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id)
4254 self.logger.debug(logging_text + step)
4255 else:
4256 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
4257 else:
4258 desc = await self.RO.show("ns", RO_nsr_id)
4259 ns_status, ns_status_info = self.RO.check_ns_status(desc)
4260 # deploymentStatus
4261 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
4262
4263 if ns_status == "ERROR":
4264 raise ROclient.ROClientException(ns_status_info)
4265 elif ns_status == "BUILD":
4266 detailed_status = step + "; {}".format(ns_status_info)
4267 elif ns_status == "ACTIVE":
4268 step = detailed_status = \
4269 "Waiting for management IP address reported by the VIM. Updating VNFRs"
4270 try:
4271 # nsr_deployed["nsr_ip"] = RO.get_ns_vnf_info(desc)
4272 self.ns_update_vnfr({db_vnfr["member-vnf-index-ref"]: db_vnfr}, desc)
4273 break
4274 except LcmExceptionNoMgmtIP:
4275 pass
4276 else:
4277 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
4278 if detailed_status != detailed_status_old:
4279 self._update_suboperation_status(
4280 db_nslcmop, op_index, 'COMPLETED', detailed_status)
4281 detailed_status_old = db_nslcmop_update["detailed-status"] = detailed_status
4282 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
4283
4284 await asyncio.sleep(5, loop=self.loop)
4285 deployment_timeout -= 5
4286 if deployment_timeout <= 0:
4287 self._update_suboperation_status(
4288 db_nslcmop, nslcmop_id, op_index, 'FAILED', "Timeout when waiting for ns to get ready")
4289 raise ROclient.ROClientException("Timeout waiting ns to be ready")
4290
4291 # update VDU_SCALING_INFO with the obtained ip_addresses
4292 if vdu_scaling_info["scaling_direction"] == "OUT":
4293 for vdur in reversed(db_vnfr["vdur"]):
4294 if vdu_scaling_info["vdu-create"].get(vdur["vdu-id-ref"]):
4295 vdu_scaling_info["vdu-create"][vdur["vdu-id-ref"]] -= 1
4296 vdu_scaling_info["vdu"].append({
4297 "name": vdur["name"],
4298 "vdu_id": vdur["vdu-id-ref"],
4299 "interface": []
4300 })
4301 for interface in vdur["interfaces"]:
4302 vdu_scaling_info["vdu"][-1]["interface"].append({
4303 "name": interface["name"],
4304 "ip_address": interface["ip-address"],
4305 "mac_address": interface.get("mac-address"),
4306 })
4307 del vdu_scaling_info["vdu-create"]
4308
4309 self._update_suboperation_status(db_nslcmop, op_index, 'COMPLETED', 'Done')
4310 # SCALE RO - END
4311
4312 scale_process = None
4313 if db_nsr_update:
4314 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4315
4316 # POST-SCALE BEGIN
4317 # execute primitive service POST-SCALING
4318 step = "Executing post-scale vnf-config-primitive"
4319 if scaling_descriptor.get("scaling-config-action"):
4320 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
4321 if (scaling_config_action.get("trigger") == "post-scale-in" and scaling_type == "SCALE_IN") \
4322 or (scaling_config_action.get("trigger") == "post-scale-out" and scaling_type == "SCALE_OUT"):
4323 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
4324 step = db_nslcmop_update["detailed-status"] = \
4325 "executing post-scale scaling-config-action '{}'".format(vnf_config_primitive)
4326
4327 vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
4328 if db_vnfr.get("additionalParamsForVnf"):
4329 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
4330
4331 # look for primitive
4332 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
4333 if config_primitive["name"] == vnf_config_primitive:
4334 break
4335 else:
4336 raise LcmException(
4337 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
4338 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
4339 "config-primitive".format(scaling_group, vnf_config_primitive))
4340 scale_process = "VCA"
4341 db_nsr_update["config-status"] = "configuring post-scaling"
4342 primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)
4343
4344 # Post-scale retry check: Check if this sub-operation has been executed before
4345 op_index = self._check_or_add_scale_suboperation(
4346 db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'POST-SCALE')
4347 if op_index == self.SUBOPERATION_STATUS_SKIP:
4348 # Skip sub-operation
4349 result = 'COMPLETED'
4350 result_detail = 'Done'
4351 self.logger.debug(logging_text +
4352 "vnf_config_primitive={} Skipped sub-operation, result {} {}".
4353 format(vnf_config_primitive, result, result_detail))
4354 else:
4355 if op_index == self.SUBOPERATION_STATUS_NEW:
4356 # New sub-operation: Get index of this sub-operation
4357 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
4358 self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
4359 format(vnf_config_primitive))
4360 else:
4361 # retry: Get registered params for this existing sub-operation
4362 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
4363 vnf_index = op.get('member_vnf_index')
4364 vnf_config_primitive = op.get('primitive')
4365 primitive_params = op.get('primitive_params')
4366 self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation retry".
4367 format(vnf_config_primitive))
4368 # Execute the primitive, either with new (first-time) or registered (reintent) args
4369 ee_descriptor_id = config_primitive.get("execution-environment-ref")
4370 primitive_name = config_primitive.get("execution-environment-primitive",
4371 vnf_config_primitive)
4372 ee_id, vca_type = self._look_for_deployed_vca(nsr_deployed["VCA"],
4373 member_vnf_index=vnf_index,
4374 vdu_id=None,
4375 vdu_count_index=None,
4376 ee_descriptor_id=ee_descriptor_id)
4377 result, result_detail = await self._ns_execute_primitive(
4378 ee_id, primitive_name, primitive_params, vca_type)
4379 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
4380 vnf_config_primitive, result, result_detail))
4381 # Update operationState = COMPLETED | FAILED
4382 self._update_suboperation_status(
4383 db_nslcmop, op_index, result, result_detail)
4384
4385 if result == "FAILED":
4386 raise LcmException(result_detail)
4387 db_nsr_update["config-status"] = old_config_status
4388 scale_process = None
4389 # POST-SCALE END
4390
4391 db_nsr_update["detailed-status"] = "" # "scaled {} {}".format(scaling_group, scaling_type)
4392 db_nsr_update["operational-status"] = "running" if old_operational_status == "failed" \
4393 else old_operational_status
4394 db_nsr_update["config-status"] = old_config_status
4395 return
4396 except (ROclient.ROClientException, DbException, LcmException) as e:
4397 self.logger.error(logging_text + "Exit Exception {}".format(e))
4398 exc = e
4399 except asyncio.CancelledError:
4400 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
4401 exc = "Operation was cancelled"
4402 except Exception as e:
4403 exc = traceback.format_exc()
4404 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
4405 finally:
4406 self._write_ns_status(
4407 nsr_id=nsr_id,
4408 ns_state=None,
4409 current_operation="IDLE",
4410 current_operation_id=None
4411 )
4412 if exc:
4413 db_nslcmop_update["detailed-status"] = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
4414 nslcmop_operation_state = "FAILED"
4415 if db_nsr:
4416 db_nsr_update["operational-status"] = old_operational_status
4417 db_nsr_update["config-status"] = old_config_status
4418 db_nsr_update["detailed-status"] = ""
4419 if scale_process:
4420 if "VCA" in scale_process:
4421 db_nsr_update["config-status"] = "failed"
4422 if "RO" in scale_process:
4423 db_nsr_update["operational-status"] = "failed"
4424 db_nsr_update["detailed-status"] = "FAILED scaling nslcmop={} {}: {}".format(nslcmop_id, step,
4425 exc)
4426 else:
4427 error_description_nslcmop = None
4428 nslcmop_operation_state = "COMPLETED"
4429 db_nslcmop_update["detailed-status"] = "Done"
4430
4431 self._write_op_status(
4432 op_id=nslcmop_id,
4433 stage="",
4434 error_message=error_description_nslcmop,
4435 operation_state=nslcmop_operation_state,
4436 other_update=db_nslcmop_update,
4437 )
4438 if db_nsr:
4439 self._write_ns_status(
4440 nsr_id=nsr_id,
4441 ns_state=None,
4442 current_operation="IDLE",
4443 current_operation_id=None,
4444 other_update=db_nsr_update
4445 )
4446
4447 if nslcmop_operation_state:
4448 try:
4449 await self.msg.aiowrite("ns", "scaled", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
4450 "operationState": nslcmop_operation_state},
4451 loop=self.loop)
4452 # if cooldown_time:
4453 # await asyncio.sleep(cooldown_time, loop=self.loop)
4454 # await self.msg.aiowrite("ns","scaled-cooldown-time", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id})
4455 except Exception as e:
4456 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
4457 self.logger.debug(logging_text + "Exit")
4458 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")
4459
4460 async def add_prometheus_metrics(self, ee_id, artifact_path, ee_config_descriptor, vnfr_id, nsr_id, target_ip):
4461 if not self.prometheus:
4462 return
4463 # look if exist a file called 'prometheus*.j2' and
4464 artifact_content = self.fs.dir_ls(artifact_path)
4465 job_file = next((f for f in artifact_content if f.startswith("prometheus") and f.endswith(".j2")), None)
4466 if not job_file:
4467 return
4468 with self.fs.file_open((artifact_path, job_file), "r") as f:
4469 job_data = f.read()
4470
4471 # TODO get_service
4472 _, _, service = ee_id.partition(".") # remove prefix "namespace."
4473 host_name = "{}-{}".format(service, ee_config_descriptor["metric-service"])
4474 host_port = "80"
4475 vnfr_id = vnfr_id.replace("-", "")
4476 variables = {
4477 "JOB_NAME": vnfr_id,
4478 "TARGET_IP": target_ip,
4479 "EXPORTER_POD_IP": host_name,
4480 "EXPORTER_POD_PORT": host_port,
4481 }
4482 job_list = self.prometheus.parse_job(job_data, variables)
4483 # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
4484 for job in job_list:
4485 if not isinstance(job.get("job_name"), str) or vnfr_id not in job["job_name"]:
4486 job["job_name"] = vnfr_id + "_" + str(randint(1, 10000))
4487 job["nsr_id"] = nsr_id
4488 job_dict = {jl["job_name"]: jl for jl in job_list}
4489 if await self.prometheus.update(job_dict):
4490 return list(job_dict.keys())