cbfb868db84dec91414ca5bcda5b2e8f36b8f0d7
[osm/LCM.git] / osm_lcm / ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 import yaml
21 import logging
22 import logging.handlers
23 import traceback
24 import json
25 from jinja2 import Environment, TemplateError, TemplateNotFound, StrictUndefined, UndefinedError
26
27 from osm_lcm import ROclient
28 from osm_lcm.ng_ro import NgRoClient, NgRoException
29 from osm_lcm.lcm_utils import LcmException, LcmExceptionNoMgmtIP, LcmBase, deep_get, get_iterable, populate_dict
30 from n2vc.k8s_helm_conn import K8sHelmConnector
31 from n2vc.k8s_juju_conn import K8sJujuConnector
32
33 from osm_common.dbbase import DbException
34 from osm_common.fsbase import FsException
35
36 from n2vc.n2vc_juju_conn import N2VCJujuConnector
37 from n2vc.exceptions import N2VCException, N2VCNotFound, K8sException
38
39 from osm_lcm.lcm_helm_conn import LCMHelmConn
40
41 from copy import copy, deepcopy
42 from http import HTTPStatus
43 from time import time
44 from uuid import uuid4
45
46 from random import randint
47
48 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
49
50
class N2VCJujuConnectorLCM(N2VCJujuConnector):
    """
    N2VC juju connector adapted to LCM: both methods accept the extra 'vca_type' argument (and
    'artifact_path' at creation time) so that "k8s_proxy_charm" can be handled differently.
    """

    async def create_execution_environment(self, namespace: str, db_dict: dict, reuse_ee_id: str = None,
                                           progress_timeout: float = None, total_timeout: float = None,
                                           config: dict = None, artifact_path: str = None,
                                           vca_type: str = None) -> (str, dict):
        """
        Creates the execution environment.
        For vca_type "k8s_proxy_charm" the charm is installed with install_k8s_proxy_charm; any other
        type is delegated to the parent class (without 'config', 'artifact_path' nor 'vca_type').
        :return: (ee_id, None) for "k8s_proxy_charm", otherwise whatever the parent class returns
        """
        if vca_type != "k8s_proxy_charm":
            return await super().create_execution_environment(
                namespace=namespace,
                db_dict=db_dict,
                reuse_ee_id=reuse_ee_id,
                progress_timeout=progress_timeout,
                total_timeout=total_timeout,
            )

        # the charm name is the last component of the artifact path
        charm_name = artifact_path.rsplit("/", 1)[-1]
        k8s_ee_id = await self.install_k8s_proxy_charm(
            charm_name=charm_name,
            namespace=namespace,
            artifact_path=artifact_path,
            db_dict=db_dict,
        )
        return k8s_ee_id, None

    async def install_configuration_sw(self, ee_id: str, artifact_path: str, db_dict: dict,
                                       progress_timeout: float = None, total_timeout: float = None,
                                       config: dict = None, num_units: int = 1, vca_type: str = "lxc_proxy_charm"):
        """
        Installs the configuration software by means of the parent class, except for vca_type
        "k8s_proxy_charm", where nothing is done and None is returned.
        """
        if vca_type != "k8s_proxy_charm":
            return await super().install_configuration_sw(
                ee_id=ee_id,
                artifact_path=artifact_path,
                db_dict=db_dict,
                progress_timeout=progress_timeout,
                total_timeout=total_timeout,
                config=config,
                num_units=num_units,
            )
        return None
78
79
class NsLcm(LcmBase):
    # Default timeouts.
    # NOTE(review): values look expressed in seconds (e.g. 2 * 3600 for the ns deployment) - confirm at usage
    timeout_vca_on_error = 5 * 60   # Time for charm from first time at blocked,error status to mark as failed
    timeout_ns_deploy = 2 * 3600   # default global timeout for deploying a ns
    timeout_ns_terminate = 1800   # default global timeout for undeploying a ns
    timeout_charm_delete = 10 * 60   # NOTE(review): presumably timeout for deleting a charm; used outside this section
    timeout_primitive = 30 * 60  # timeout for primitive execution
    timeout_progress_primitive = 10 * 60  # timeout for some progress in a primitive execution

    # Negative sentinel codes for sub-operations.
    # NOTE(review): presumably returned in place of a sub-operation index; the consumers are outside this section
    SUBOPERATION_STATUS_NOT_FOUND = -1
    SUBOPERATION_STATUS_NEW = -2
    SUBOPERATION_STATUS_SKIP = -3
    # NOTE(review): looks like the name given to the VCA deployment task - confirm at usage
    task_name_deploy_vca = "Deploying VCA"
92
def __init__(self, db, msg, fs, lcm_tasks, config, loop, prometheus=None):
    """
    Initializes the base class with database, messaging and filesystem storage (logger 'lcm.ns') and
    creates the VCA connectors (juju and helm), the k8s cluster connectors (helm and juju) and the RO client
    :param db: database object, handed over to the base class and to the connectors
    :param msg: messaging object, handed over to the base class
    :param fs: filesystem storage object, handed over to the base class and to the connectors
    :param lcm_tasks: stored at self.lcm_tasks
    :param config: two level dictionary with configuration. The keys 'timeout', 'ro_config' and 'VCA' are read
    :param loop: asyncio loop, stored and handed over to the VCA connectors and the RO client
    :param prometheus: optional, stored at self.prometheus
    :return: None
    """
    ns_logger = logging.getLogger('lcm.ns')
    super().__init__(db=db, msg=msg, fs=fs, logger=ns_logger)

    self.loop = loop
    self.lcm_tasks = lcm_tasks
    self.timeout = config["timeout"]
    self.ro_config = config["ro_config"]
    self.ng_ro = config["ro_config"].get("ng")
    # work over a copy of the VCA configuration
    self.vca_config = config["VCA"].copy()

    # arguments shared by the two VCA connectors
    vca_connector_kwargs = {
        "db": self.db,
        "fs": self.fs,
        "log": self.logger,
        "loop": self.loop,
        "vca_config": self.vca_config,
        "on_update_db": self._on_update_n2vc_db,
    }

    # create N2VC connector
    juju_url = '{}:{}'.format(self.vca_config['host'], self.vca_config['port'])
    self.n2vc = N2VCJujuConnectorLCM(
        url=juju_url,
        username=self.vca_config.get('user', None),
        **vca_connector_kwargs
    )

    # helm execution environments connector, neither url nor username are provided
    self.conn_helm_ee = LCMHelmConn(url=None, username=None, **vca_connector_kwargs)

    # arguments shared by the two k8s cluster connectors
    k8s_connector_kwargs = {
        "kubectl_command": self.vca_config.get("kubectlpath"),
        "fs": self.fs,
        "log": self.logger,
        "db": self.db,
        "on_update_db": None,
    }
    self.k8sclusterhelm = K8sHelmConnector(
        helm_command=self.vca_config.get("helmpath"),
        **k8s_connector_kwargs
    )
    self.k8sclusterjuju = K8sJujuConnector(
        juju_command=self.vca_config.get("jujupath"),
        **k8s_connector_kwargs
    )

    # k8s connector to use, indexed by type of kdu deployment
    self.k8scluster_map = dict.fromkeys(("helm-chart", "chart"), self.k8sclusterhelm)
    self.k8scluster_map.update(dict.fromkeys(("juju-bundle", "juju"), self.k8sclusterjuju))

    # VCA connector to use, indexed by vca type
    self.vca_map = dict.fromkeys(("lxc_proxy_charm", "native_charm", "k8s_proxy_charm"), self.n2vc)
    self.vca_map["helm"] = self.conn_helm_ee

    self.prometheus = prometheus

    # create RO client: the new generation one only if 'ng' is set at ro_config
    ro_client_class = NgRoClient if self.ng_ro else ROclient.ROClient
    self.RO = ro_client_class(self.loop, **self.ro_config)
175
def _on_update_ro_db(self, nsrs_id, ro_descriptor):
    """
    Writes the RO descriptor at database, under 'deploymentStatus' of the "nsrs" record.
    This is a best-effort operation: any error is logged as a warning and it is not propagated
    :param nsrs_id: _id of the nsrs record to update
    :param ro_descriptor: RO descriptor, stored as it is
    :return: None
    """
    try:
        # TODO filter RO descriptor fields...

        # write to database
        db_dict = {'deploymentStatus': ro_descriptor}
        self.update_db_2("nsrs", nsrs_id, db_dict)

    except Exception as e:
        # Logger.warn is a deprecated alias (removed at python 3.13), Logger.warning is the supported method
        self.logger.warning('Cannot write database RO deployment for ns={} -> {}'.format(nsrs_id, e))
191
async def _on_update_n2vc_db(self, table, filter, path, updated_data):
    """
    Callback provided to the VCA connectors as 'on_update_db'. It refreshes at the "nsrs" record:
    - 'vcaStatus': with the status obtained from self.n2vc.get_status for the whole NS
    - 'configurationStatus.<index>.status': set to READY/BROKEN when it disagrees with the status stored
      at _admin.deployed.VCA.<index>, where <index> is the last element of 'path'
    - 'nsState' and 'errorDescription': a READY ns is moved to DEGRADED if any machine or application
      reports an unexpected status, a DEGRADED ns is moved back to READY if none does
    Errors are logged and not propagated, with the exception of asyncio cancellation and timeout
    :param table: not used
    :param filter: database filter of the nsrs record. It must contain '_id'
    :param path: dotted database path of the updated VCA. A trailing dot is ignored
    :param updated_data: not used
    :return: None
    """
    # remove last dot from path (if exists)
    if path.endswith('.'):
        path = path[:-1]

    # defined before the try, because it is used at the error message of the final except
    nsr_id = None
    try:
        nsr_id = filter.get('_id')

        # read ns record from database
        nsr = self.db.get_one(table='nsrs', q_filter=filter)
        current_ns_status = nsr.get('nsState')

        # get vca status for NS
        status_dict = await self.n2vc.get_status(namespace='.' + nsr_id, yaml_format=False)

        # vcaStatus
        db_dict = {'vcaStatus': status_dict}

        # update configurationStatus for this VCA
        try:
            vca_index = int(path[path.rfind(".")+1:])

            vca_list = deep_get(target_dict=nsr, key_list=('_admin', 'deployed', 'VCA'))
            vca_status = vca_list[vca_index].get('status')

            configuration_status_list = nsr.get('configurationStatus')
            config_status = configuration_status_list[vca_index].get('status')

            # db_dict is a flat dictionary of dotted database paths. Indexing db_dict['configurationStatus']
            # would raise a KeyError that is silently ignored below, so the status would never be updated
            status_path = 'configurationStatus.{}.status'.format(vca_index)
            if config_status == 'BROKEN' and vca_status != 'failed':
                db_dict[status_path] = 'READY'
            elif config_status != 'BROKEN' and vca_status == 'failed':
                db_dict[status_path] = 'BROKEN'
        except Exception as e:
            # not update configurationStatus
            self.logger.debug('Error updating vca_index (ignore): {}'.format(e))

        # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
        # if nsState = 'DEGRADED' check if all is OK
        is_degraded = False
        if current_ns_status in ('READY', 'DEGRADED'):
            error_description = ''
            # check machines
            for machine_id, machine in (status_dict.get('machines') or {}).items():
                # check machine agent-status
                if machine.get('agent-status'):
                    s = machine.get('agent-status').get('status')
                    if s != 'started':
                        is_degraded = True
                        error_description += 'machine {} agent-status={} ; '.format(machine_id, s)
                # check machine instance status
                if machine.get('instance-status'):
                    s = machine.get('instance-status').get('status')
                    if s != 'running':
                        is_degraded = True
                        error_description += 'machine {} instance-status={} ; '.format(machine_id, s)
            # check applications
            for app_id, app in (status_dict.get('applications') or {}).items():
                # check application status
                if app.get('status'):
                    s = app.get('status').get('status')
                    if s != 'active':
                        is_degraded = True
                        error_description += 'application {} status={} ; '.format(app_id, s)

            if error_description:
                db_dict['errorDescription'] = error_description
            if current_ns_status == 'READY' and is_degraded:
                db_dict['nsState'] = 'DEGRADED'
            if current_ns_status == 'DEGRADED' and not is_degraded:
                db_dict['nsState'] = 'READY'

        # write to database
        self.update_db_2("nsrs", nsr_id, db_dict)

    except (asyncio.CancelledError, asyncio.TimeoutError):
        raise
    except Exception as e:
        # Logger.warn is a deprecated alias (removed at python 3.13), Logger.warning is the supported method
        self.logger.warning('Error updating NS state for ns={}: {}'.format(nsr_id, e))
279
@staticmethod
def _parse_cloud_init(cloud_init_text, additional_params, vnfd_id, vdu_id):
    """
    Renders a cloud-init text as a Jinja2 template, where undefined variables are not allowed
    :param cloud_init_text: cloud-init content, it can contain Jinja2 variables
    :param additional_params: dictionary with the variables for rendering. It can be None
    :param vnfd_id: id of the vnfd, only used at the error messages
    :param vdu_id: id of the vdu, only used at the error messages
    :return: the rendered text. LcmException is raised if a variable is missing or the template is wrong
    """
    try:
        strict_env = Environment(undefined=StrictUndefined)
        render_vars = additional_params or {}
        return strict_env.from_string(cloud_init_text).render(render_vars)
    except UndefinedError as e:
        # a variable of the template has not been provided
        missing_variable_msg = (
            "Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
            "file, must be provided in the instantiation parameters inside the "
            "'additionalParamsForVnf/Vdu' block"
        )
        raise LcmException(missing_variable_msg.format(e, vnfd_id, vdu_id))
    except (TemplateError, TemplateNotFound) as e:
        parsing_error_msg = "Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}"
        raise LcmException(parsing_error_msg.format(vnfd_id, vdu_id, e))
293
def _get_cloud_init(self, vdu, vnfd):
    """
    Obtains the cloud-init content of a vdu. 'cloud-init-file' takes precedence over 'cloud-init'
    :param vdu: vdu descriptor. It can contain 'cloud-init-file', a file name looked up at the 'cloud_init'
        folder of the vnfd package storage; or 'cloud-init', the content itself
    :param vnfd: vnfd descriptor. Its _admin.storage is read only when 'cloud-init-file' is used
    :return: the cloud-init text, or None if the vdu contains none of them.
        LcmException is raised if the file cannot be read
    """
    cloud_init_file = None
    try:
        if vdu.get("cloud-init-file"):
            storage = vnfd["_admin"]["storage"]
            cloud_init_file = "{}/{}/cloud_init/{}".format(
                storage["folder"], storage["pkg-dir"], vdu["cloud-init-file"])
            with self.fs.file_open(cloud_init_file, "r") as file_handler:
                file_content = file_handler.read()
            return file_content

        if vdu.get("cloud-init"):
            return vdu["cloud-init"]

        return None
    except FsException as e:
        raise LcmException("Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".
                           format(vnfd["id"], vdu["id"], cloud_init_file, e))
310
def _get_osm_params(self, db_vnfr, vdu_id=None, vdu_count_index=0):
    """
    Generates the dictionary of OSM parameters of a vnfr. Keys are written with '_' instead of '-'
    :param db_vnfr: vnfr database content
    :param vdu_id: if provided, 'vdu_id' and 'count_index' are added to the result
    :param vdu_count_index: value for 'count_index'. Only used when vdu_id is provided
    :return: dictionary with 'ns_id', 'vnf_id', 'member_vnf_index'; the vnfr 'ip_address', 'vim_account_id',
        'vnfd_id' and 'vnfd_ref' that are not None; and 'vdu', with one entry per vdur indexed by
        "<vdu-id-ref>-<count-index>", when the vnfr contains vdur
    """
    def underscored_subset(source, wanted_keys):
        # copy the wanted keys that are present and not None, replacing '-' by '_' at the key name
        return {key.replace("-", "_"): source[key] for key in wanted_keys if source.get(key) is not None}

    osm_params = underscored_subset(db_vnfr, ("ip-address", "vim-account-id", "vnfd-id", "vnfd-ref"))
    osm_params.update(
        ns_id=db_vnfr["nsr-id-ref"],
        vnf_id=db_vnfr["_id"],
        member_vnf_index=db_vnfr["member-vnf-index-ref"],
    )

    if db_vnfr.get("vdur"):
        vdu_map = osm_params["vdu"] = {}
        for vdur in db_vnfr["vdur"]:
            vdu_info = {
                "count_index": vdur["count-index"],
                "vdu_id": vdur["vdu-id-ref"],
                "interfaces": {},
            }
            if vdur.get("ip-address"):
                vdu_info["ip_address"] = vdur["ip-address"]
            for iface in vdur["interfaces"]:
                vdu_info["interfaces"][iface["name"]] = underscored_subset(
                    iface, ("mac-address", "ip-address", "vnf-vld-id", "name"))
            vdu_map["{}-{}".format(vdur["vdu-id-ref"], vdur["count-index"])] = vdu_info

    if vdu_id:
        osm_params.update(vdu_id=vdu_id, count_index=vdu_count_index)
    return osm_params
337
def _get_vdu_additional_params(self, db_vnfr, vdu_id):
    """
    Obtains the 'additionalParams' of the first vdur of the vnfr whose 'vdu-id-ref' is vdu_id,
    after processing them with self._format_additional_params
    :param db_vnfr: vnfr database content. It must contain 'vdur'
    :param vdu_id: id of the vdu to look for
    :return: what self._format_additional_params returns. StopIteration is raised if no vdur matches
    """
    candidates = (candidate for candidate in db_vnfr.get("vdur") if vdu_id == candidate["vdu-id-ref"])
    matching_vdur = next(candidates)
    return self._format_additional_params(matching_vdur.get("additionalParams"))
342
def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
    """
    Creates a new vnfd descriptor for RO based on the input OSM IM vnfd. The input is not modified
    :param vnfd: input vnfd
    :param new_id: overrides vnf id if provided
    :param additionalParams: Instantiation params for VNFs provided. Not used here
    :param nsrId: Id of the NSR. Not used here
    :return: deep copy of vnfd without the keys that RO does not use
    """
    # configuration, monitoring, scaling, kdu and internal keys are not used by RO
    keys_unused_by_RO = (
        "_id",
        "_admin",
        "vnf-configuration",
        "monitoring-param",
        "scaling-group-descriptor",
        "kdu",
        "k8s-cluster",
    )
    vnfd_RO = deepcopy(vnfd)
    for unused_key in keys_unused_by_RO:
        vnfd_RO.pop(unused_key, None)

    if new_id:
        vnfd_RO["id"] = new_id

    # cloud-init is not sent inside the descriptor, so both ways of providing it are removed from every vdu
    for vdu_RO in get_iterable(vnfd_RO, "vdu"):
        for cloud_init_key in ("cloud-init-file", "cloud-init"):
            vdu_RO.pop(cloud_init_key, None)
    return vnfd_RO
369
def _ns_params_2_RO(self, ns_params, nsd, vnfd_dict, db_vnfrs, n2vc_key_list):
    """
    Creates a RO ns descriptor from OSM ns_instantiate params
    :param ns_params: OSM instantiate params
    :param nsd: ns descriptor. Its 'constituent-vnfd' list is used to map member-vnf-index and vnfd-id-ref
    :param vnfd_dict: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
    :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index. {member-vnf-index: {vnfr_object}, ...}
    :param n2vc_key_list: keys set as 'mgmt_keys' of the vdus that require ssh access. None is taken as empty
    :return: The RO ns descriptor, or None if ns_params is empty. LcmException is raised if the VIM/WIM is not
        ENABLED or if an instantiation parameter references something not present at the descriptors
    """
    # caches of VIM/WIM account id => RO id, to read each account from database only once
    vim_2_RO = {}
    wim_2_RO = {}
    # TODO feature 1417: Check that no instantiation is set over PDU
    # check if PDU forces a concrete vim-network-id and add it
    # check if PDU contains a SDN-assist info (dpid, switch, port) and pass it to RO

    def vim_account_2_RO(vim_account):
        # returns the RO id (_admin.deployed.RO) of an OSM VIM account, that must be ENABLED
        if vim_account in vim_2_RO:
            return vim_2_RO[vim_account]

        db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
        if db_vim["_admin"]["operationalState"] != "ENABLED":
            raise LcmException("VIM={} is not available. operationalState={}".format(
                vim_account, db_vim["_admin"]["operationalState"]))
        RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
        vim_2_RO[vim_account] = RO_vim_id
        return RO_vim_id

    def wim_account_2_RO(wim_account):
        # returns the RO id (_admin.deployed.RO-account) of an OSM WIM account, that must be ENABLED.
        # Values that are not a string (e.g. None) are returned unchanged
        if isinstance(wim_account, str):
            if wim_account in wim_2_RO:
                return wim_2_RO[wim_account]

            db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
            if db_wim["_admin"]["operationalState"] != "ENABLED":
                raise LcmException("WIM={} is not available. operationalState={}".format(
                    wim_account, db_wim["_admin"]["operationalState"]))
            RO_wim_id = db_wim["_admin"]["deployed"]["RO-account"]
            wim_2_RO[wim_account] = RO_wim_id
            return RO_wim_id
        else:
            return wim_account

    def ip_profile_2_RO(ip_profile):
        # returns a copy of the ip-profile with the RO names: dns-server => dns-address (list of addresses),
        # ip-version ipv4/ipv6 => IPv4/IPv6, dhcp-params => dhcp
        RO_ip_profile = deepcopy((ip_profile))
        if "dns-server" in RO_ip_profile:
            if isinstance(RO_ip_profile["dns-server"], list):
                RO_ip_profile["dns-address"] = []
                for ds in RO_ip_profile.pop("dns-server"):
                    RO_ip_profile["dns-address"].append(ds['address'])
            else:
                RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
        if RO_ip_profile.get("ip-version") == "ipv4":
            RO_ip_profile["ip-version"] = "IPv4"
        if RO_ip_profile.get("ip-version") == "ipv6":
            RO_ip_profile["ip-version"] = "IPv6"
        if "dhcp-params" in RO_ip_profile:
            RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
        return RO_ip_profile

    if not ns_params:
        return None
    RO_ns_params = {
        # "name": ns_params["nsName"],
        # "description": ns_params.get("nsDescription"),
        "datacenter": vim_account_2_RO(ns_params["vimAccountId"]),
        "wim_account": wim_account_2_RO(ns_params.get("wimAccountId")),
        # "scenario": ns_params["nsdId"],
    }
    # set vim_account of each vnf if different from general vim_account.
    # Get this information from <vnfr> database content, key vim-account-id
    # Vim account can be set by placement_engine and it may be different from
    # the instantiate parameters (vnfs.member-vnf-index.datacenter).
    for vnf_index, vnfr in db_vnfrs.items():
        if vnfr.get("vim-account-id") and vnfr["vim-account-id"] != ns_params["vimAccountId"]:
            populate_dict(RO_ns_params, ("vnfs", vnf_index, "datacenter"), vim_account_2_RO(vnfr["vim-account-id"]))

    n2vc_key_list = n2vc_key_list or []
    for vnfd_ref, vnfd in vnfd_dict.items():
        # ids of the vdus of this vnfd where the keys of n2vc_key_list must be set
        vdu_needed_access = []
        # mgmt connection point, when the vdu that needs access has to be found by its interfaces
        mgmt_cp = None
        if vnfd.get("vnf-configuration"):
            ssh_required = deep_get(vnfd, ("vnf-configuration", "config-access", "ssh-access", "required"))
            if ssh_required and vnfd.get("mgmt-interface"):
                if vnfd["mgmt-interface"].get("vdu-id"):
                    vdu_needed_access.append(vnfd["mgmt-interface"]["vdu-id"])
                elif vnfd["mgmt-interface"].get("cp"):
                    mgmt_cp = vnfd["mgmt-interface"]["cp"]

        for vdu in vnfd.get("vdu", ()):
            if vdu.get("vdu-configuration"):
                ssh_required = deep_get(vdu, ("vdu-configuration", "config-access", "ssh-access", "required"))
                if ssh_required:
                    vdu_needed_access.append(vdu["id"])
            elif mgmt_cp:
                # only the first vdu with an interface connected to the mgmt connection point is added
                for vdu_interface in vdu.get("interface"):
                    if vdu_interface.get("external-connection-point-ref") and \
                            vdu_interface["external-connection-point-ref"] == mgmt_cp:
                        vdu_needed_access.append(vdu["id"])
                        mgmt_cp = None
                        break

        if vdu_needed_access:
            # the same vnfd can be used by several members of the ns
            for vnf_member in nsd.get("constituent-vnfd"):
                if vnf_member["vnfd-id-ref"] != vnfd_ref:
                    continue
                for vdu in vdu_needed_access:
                    populate_dict(RO_ns_params,
                                  ("vnfs", vnf_member["member-vnf-index"], "vdus", vdu, "mgmt_keys"),
                                  n2vc_key_list)
        # cloud init
        for vdu in get_iterable(vnfd, "vdu"):
            cloud_init_text = self._get_cloud_init(vdu, vnfd)
            if not cloud_init_text:
                continue
            for vnf_member in nsd.get("constituent-vnfd"):
                if vnf_member["vnfd-id-ref"] != vnfd_ref:
                    continue
                db_vnfr = db_vnfrs[vnf_member["member-vnf-index"]]
                additional_params = self._get_vdu_additional_params(db_vnfr, vdu["id"]) or {}

                # one rendered cloud-init per vdu replica, because the "OSM" variables contain the replica index
                cloud_init_list = []
                for vdu_index in range(0, int(vdu.get("count", 1))):
                    additional_params["OSM"] = self._get_osm_params(db_vnfr, vdu["id"], vdu_index)
                    cloud_init_list.append(self._parse_cloud_init(cloud_init_text, additional_params, vnfd["id"],
                                                                  vdu["id"]))
                populate_dict(RO_ns_params,
                              ("vnfs", vnf_member["member-vnf-index"], "vdus", vdu["id"], "cloud_init"),
                              cloud_init_list)

    if ns_params.get("vduImage"):
        RO_ns_params["vduImage"] = ns_params["vduImage"]

    if ns_params.get("ssh_keys"):
        RO_ns_params["cloud-config"] = {"key-pairs": ns_params["ssh_keys"]}
    # instantiation parameters provided per vnf
    for vnf_params in get_iterable(ns_params, "vnf"):
        for constituent_vnfd in nsd["constituent-vnfd"]:
            if constituent_vnfd["member-vnf-index"] == vnf_params["member-vnf-index"]:
                vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
                break
        else:
            raise LcmException("Invalid instantiate parameter vnf:member-vnf-index={} is not present at nsd:"
                               "constituent-vnfd".format(vnf_params["member-vnf-index"]))

        for vdu_params in get_iterable(vnf_params, "vdu"):
            # TODO feature 1417: check that this VDU exist and it is not a PDU
            if vdu_params.get("volume"):
                for volume_params in vdu_params["volume"]:
                    if volume_params.get("vim-volume-id"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "devices", volume_params["name"], "vim_id"),
                                      volume_params["vim-volume-id"])
            if vdu_params.get("interface"):
                for interface_params in vdu_params["interface"]:
                    if interface_params.get("ip-address"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "interfaces", interface_params["name"],
                                                     "ip_address"),
                                      interface_params["ip-address"])
                    if interface_params.get("mac-address"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "interfaces", interface_params["name"],
                                                     "mac_address"),
                                      interface_params["mac-address"])
                    if interface_params.get("floating-ip-required"):
                        populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                     vdu_params["id"], "interfaces", interface_params["name"],
                                                     "floating-ip"),
                                      interface_params["floating-ip-required"])

        for internal_vld_params in get_iterable(vnf_params, "internal-vld"):
            if internal_vld_params.get("vim-network-name"):
                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "vim-network-name"),
                              internal_vld_params["vim-network-name"])
            if internal_vld_params.get("vim-network-id"):
                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "vim-network-id"),
                              internal_vld_params["vim-network-id"])
            if internal_vld_params.get("ip-profile"):
                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "ip-profile"),
                              ip_profile_2_RO(internal_vld_params["ip-profile"]))
            if internal_vld_params.get("provider-network"):

                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
                                             internal_vld_params["name"], "provider-network"),
                              internal_vld_params["provider-network"].copy())

            for icp_params in get_iterable(internal_vld_params, "internal-connection-point"):
                # look for interface
                iface_found = False
                for vdu_descriptor in vnf_descriptor["vdu"]:
                    for vdu_interface in vdu_descriptor["interface"]:
                        if vdu_interface.get("internal-connection-point-ref") == icp_params["id-ref"]:
                            if icp_params.get("ip-address"):
                                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                             vdu_descriptor["id"], "interfaces",
                                                             vdu_interface["name"], "ip_address"),
                                              icp_params["ip-address"])

                            if icp_params.get("mac-address"):
                                populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
                                                             vdu_descriptor["id"], "interfaces",
                                                             vdu_interface["name"], "mac_address"),
                                              icp_params["mac-address"])
                            iface_found = True
                            break
                    if iface_found:
                        break
                else:
                    # the vdu loop ended without break: no interface uses this internal connection point
                    raise LcmException("Invalid instantiate parameter vnf:member-vnf-index[{}]:"
                                       "internal-vld:id-ref={} is not present at vnfd:internal-"
                                       "connection-point".format(vnf_params["member-vnf-index"],
                                                                 icp_params["id-ref"]))

    # instantiation parameters provided per ns vld
    for vld_params in get_iterable(ns_params, "vld"):
        if "ip-profile" in vld_params:
            populate_dict(RO_ns_params, ("networks", vld_params["name"], "ip-profile"),
                          ip_profile_2_RO(vld_params["ip-profile"]))

        if vld_params.get("provider-network"):

            populate_dict(RO_ns_params, ("networks", vld_params["name"], "provider-network"),
                          vld_params["provider-network"].copy())

        if "wimAccountId" in vld_params and vld_params["wimAccountId"] is not None:
            # NOTE(review): the trailing comma makes this statement a one-element tuple expression. It has no
            # effect, but it looks unintended
            populate_dict(RO_ns_params, ("networks", vld_params["name"], "wim_account"),
                          wim_account_2_RO(vld_params["wimAccountId"])),
        if vld_params.get("vim-network-name"):
            RO_vld_sites = []
            if isinstance(vld_params["vim-network-name"], dict):
                # dictionary of {vim_account: network name}
                for vim_account, vim_net in vld_params["vim-network-name"].items():
                    RO_vld_sites.append({
                        "netmap-use": vim_net,
                        "datacenter": vim_account_2_RO(vim_account)
                    })
            else:  # isinstance str
                RO_vld_sites.append({"netmap-use": vld_params["vim-network-name"]})
            if RO_vld_sites:
                populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)

        # it is written at the same "sites" key than vim-network-name
        if vld_params.get("vim-network-id"):
            RO_vld_sites = []
            if isinstance(vld_params["vim-network-id"], dict):
                # dictionary of {vim_account: network id}
                for vim_account, vim_net in vld_params["vim-network-id"].items():
                    RO_vld_sites.append({
                        "netmap-use": vim_net,
                        "datacenter": vim_account_2_RO(vim_account)
                    })
            else:  # isinstance str
                RO_vld_sites.append({"netmap-use": vld_params["vim-network-id"]})
            if RO_vld_sites:
                populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)
        if vld_params.get("ns-net"):
            if isinstance(vld_params["ns-net"], dict):
                # the same "use-network" key is written at every iteration
                for vld_id, instance_scenario_id in vld_params["ns-net"].items():
                    RO_vld_ns_net = {"instance_scenario_id": instance_scenario_id, "osm_id": vld_id}
                    populate_dict(RO_ns_params, ("networks", vld_params["name"], "use-network"), RO_vld_ns_net)
        if "vnfd-connection-point-ref" in vld_params:
            for cp_params in vld_params["vnfd-connection-point-ref"]:
                # look for interface
                for constituent_vnfd in nsd["constituent-vnfd"]:
                    if constituent_vnfd["member-vnf-index"] == cp_params["member-vnf-index-ref"]:
                        vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
                        break
                else:
                    raise LcmException(
                        "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={} "
                        "is not present at nsd:constituent-vnfd".format(cp_params["member-vnf-index-ref"]))
                # after this search, vdu_descriptor and interface_descriptor are the ones of the matching interface
                match_cp = False
                for vdu_descriptor in vnf_descriptor["vdu"]:
                    for interface_descriptor in vdu_descriptor["interface"]:
                        if interface_descriptor.get("external-connection-point-ref") == \
                                cp_params["vnfd-connection-point-ref"]:
                            match_cp = True
                            break
                    if match_cp:
                        break
                else:
                    raise LcmException(
                        "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={}:"
                        "vnfd-connection-point-ref={} is not present at vnfd={}".format(
                            cp_params["member-vnf-index-ref"],
                            cp_params["vnfd-connection-point-ref"],
                            vnf_descriptor["id"]))
                if cp_params.get("ip-address"):
                    populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
                                                 vdu_descriptor["id"], "interfaces",
                                                 interface_descriptor["name"], "ip_address"),
                                  cp_params["ip-address"])
                if cp_params.get("mac-address"):
                    populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
                                                 vdu_descriptor["id"], "interfaces",
                                                 interface_descriptor["name"], "mac_address"),
                                  cp_params["mac-address"])
    return RO_ns_params
665
def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None):
    """
    Adds and/or removes vdur entries of a vnfr because of a scaling operation, and writes the new vdur list
    at database. New entries are copies of the last existing vdur with the same vdu-id-ref, with a new _id,
    the next count-index, status BUILD and without addresses. PDUs are never scaled
    :param db_vnfr: vnfr database content. Its 'vdur' list is modified
    :param vdu_create: dictionary with {vdu-id-ref: number of vdur to add}. It is not modified
    :param vdu_delete: dictionary with {vdu-id-ref: number of vdur to remove}. It is not modified
    :return: None. LcmException is raised if there is no vdur for some requested vdu-id-ref
    """
    # make a copy to do not change
    vdu_create = copy(vdu_create)
    vdu_delete = copy(vdu_delete)

    vdurs = db_vnfr.get("vdur")
    if vdurs is None:
        vdurs = []
    # the list is walked backwards, so that insertions and deletions do not move the entries not visited yet
    vdu_index = len(vdurs)
    while vdu_index:
        vdu_index -= 1
        vdur = vdurs[vdu_index]
        if vdur.get("pdu-type"):
            continue
        vdu_id_ref = vdur["vdu-id-ref"]
        if vdu_create and vdu_create.get(vdu_id_ref):
            vdur_copy = deepcopy(vdur)
            vdur_copy["status"] = "BUILD"
            vdur_copy["status-detailed"] = None
            # the new vdu must not inherit the address of the copied one. (This was an annotation statement,
            # "vdur_copy[...]: None", that assigned nothing)
            vdur_copy["ip-address"] = None
            for iface in vdur_copy["interfaces"]:
                iface["ip-address"] = None
                iface["mac-address"] = None
                iface.pop("mgmt_vnf", None)  # only first vdu can be management of vnf   # TODO ALF
            for index in range(0, vdu_create[vdu_id_ref]):
                vdur_copy["_id"] = str(uuid4())
                vdur_copy["count-index"] += 1
                vdurs.insert(vdu_index+1+index, vdur_copy)
                self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
                # next replica starts from a fresh copy, that keeps the count-index already reached
                vdur_copy = deepcopy(vdur_copy)

            del vdu_create[vdu_id_ref]
        if vdu_delete and vdu_delete.get(vdu_id_ref):
            del vdurs[vdu_index]
            vdu_delete[vdu_id_ref] -= 1
            if not vdu_delete[vdu_id_ref]:
                del vdu_delete[vdu_id_ref]
    # check all operations are done. Each pending dictionary is reported with its own message
    if vdu_create:
        raise LcmException("Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
            vdu_create))
    if vdu_delete:
        raise LcmException("Error scaling IN VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
            vdu_delete))

    vnfr_update = {"vdur": vdurs}
    db_vnfr["vdur"] = vdurs
    self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
714
def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
    """
    Fills every vld of the nsr with the RO info (vim-id, name, status, status-detailed) of its network
    :param ns_update_nsr: dictionary to be filled with the updated info, with keys "vld.<index>"
    :param db_nsr: content of db_nsr. This is also modified
    :param nsr_desc_RO: nsr descriptor from RO. Its "nets" are matched by "ns_net_osm_id"
    :return: Nothing, LcmException is raised if a vld is not found at the RO info
    """
    for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
        # first RO net created for this vld, if any
        matching_net = next(
            (net for net in get_iterable(nsr_desc_RO, "nets") if vld["id"] == net.get("ns_net_osm_id")),
            None)
        if matching_net is None:
            raise LcmException("ns_update_nsr: Not found vld={} at RO info".format(vld["id"]))

        vld.update({
            "vim-id": matching_net.get("vim_net_id"),
            "name": matching_net.get("vim_name"),
            "status": matching_net.get("status"),
            "status-detailed": matching_net.get("error_msg"),
        })
        ns_update_nsr["vld.{}".format(vld_index)] = vld
736
def set_vnfr_at_error(self, db_vnfrs, error_text):
    """
    Marks at database every vnfr as ERROR, together with their vdur that do not have any status yet.
    Database errors are logged and not propagated
    :param db_vnfrs: dictionary with member-vnf-index: vnfr-content. The vdur without status are also modified
    :param error_text: if not empty, it is stored as 'status-detailed' of the vdur set at ERROR
    :return: None
    """
    try:
        for db_vnfr in db_vnfrs.values():
            vnfr_update = {"status": "ERROR"}
            for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
                if "status" not in vdur:
                    vdur["status"] = "ERROR"
                    vnfr_update["vdur.{}.status".format(vdu_index)] = "ERROR"
                    if error_text:
                        # database must store the same text than the in-memory record. (It was storing the
                        # literal "ERROR", so the real error description was lost)
                        status_detailed = str(error_text)
                        vdur["status-detailed"] = status_detailed
                        vnfr_update["vdur.{}.status-detailed".format(vdu_index)] = status_detailed
            self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
    except DbException as e:
        self.logger.error("Cannot update vnf. {}".format(e))
751
def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
    """
    Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
    :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
    :param nsr_desc_RO: nsr descriptor from RO. Its "vnfs" (matched by "member_vnf_index") and "nets" are read
    :return: Nothing, LcmException is raised on errors: a vnf, vdur, interface or vld that is not found at the
        RO info. LcmExceptionNoMgmtIP is raised if a vnf with vdur has no ip address
    """
    for vnf_index, db_vnfr in db_vnfrs.items():
        for vnf_RO in nsr_desc_RO["vnfs"]:
            if vnf_RO["member_vnf_index"] != vnf_index:
                continue
            # database changes of this vnfr, all of them are written at once at the end
            vnfr_update = {}
            if vnf_RO.get("ip_address"):
                # only the first address of a ';' separated list is taken
                db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO["ip_address"].split(";")[0]
            elif not db_vnfr.get("ip-address"):
                if db_vnfr.get("vdur"):  # if not VDUs, there is not ip_address
                    raise LcmExceptionNoMgmtIP("ns member_vnf_index '{}' has no IP address".format(vnf_index))

            for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
                # number of RO vms with the same vdu id skipped so far: the vdur with count-index N
                # is matched with the (N+1)-th RO vm of its vdu id
                vdur_RO_count_index = 0
                if vdur.get("pdu-type"):
                    continue
                for vdur_RO in get_iterable(vnf_RO, "vms"):
                    if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
                        continue
                    if vdur["count-index"] != vdur_RO_count_index:
                        vdur_RO_count_index += 1
                        continue
                    vdur["vim-id"] = vdur_RO.get("vim_vm_id")
                    if vdur_RO.get("ip_address"):
                        # only the first address of a ';' separated list is taken
                        vdur["ip-address"] = vdur_RO["ip_address"].split(";")[0]
                    else:
                        vdur["ip-address"] = None
                    vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
                    vdur["name"] = vdur_RO.get("vim_name")
                    vdur["status"] = vdur_RO.get("status")
                    vdur["status-detailed"] = vdur_RO.get("error_msg")
                    # interfaces are matched by name against the RO "internal_name"
                    for ifacer in get_iterable(vdur, "interfaces"):
                        for interface_RO in get_iterable(vdur_RO, "interfaces"):
                            if ifacer["name"] == interface_RO.get("internal_name"):
                                ifacer["ip-address"] = interface_RO.get("ip_address")
                                ifacer["mac-address"] = interface_RO.get("mac_address")
                                break
                        else:
                            raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
                                               "from VIM info"
                                               .format(vnf_index, vdur["vdu-id-ref"], ifacer["name"]))
                    vnfr_update["vdur.{}".format(vdu_index)] = vdur
                    break
                else:
                    # the vms loop ended without break: there is no RO vm for this vdur
                    raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
                                       "VIM info".format(vnf_index, vdur["vdu-id-ref"], vdur["count-index"]))

            # internal vlds of the vnf are matched against the RO nets by "vnf_net_osm_id"
            for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
                for net_RO in get_iterable(nsr_desc_RO, "nets"):
                    if vld["id"] != net_RO.get("vnf_net_osm_id"):
                        continue
                    vld["vim-id"] = net_RO.get("vim_net_id")
                    vld["name"] = net_RO.get("vim_name")
                    vld["status"] = net_RO.get("status")
                    vld["status-detailed"] = net_RO.get("error_msg")
                    vnfr_update["vld.{}".format(vld_index)] = vld
                    break
                else:
                    raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
                        vnf_index, vld["id"]))

            self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
            break

        else:
            # the vnfs loop ended without break: RO does not report this member-vnf-index
            raise LcmException("ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(vnf_index))
824
825 def _get_ns_config_info(self, nsr_id):
826 """
827 Generates a mapping between vnf,vdu elements and the N2VC id
828 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
829 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
830 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
831 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
832 """
833 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
834 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
835 mapping = {}
836 ns_config_info = {"osm-config-mapping": mapping}
837 for vca in vca_deployed_list:
838 if not vca["member-vnf-index"]:
839 continue
840 if not vca["vdu_id"]:
841 mapping[vca["member-vnf-index"]] = vca["application"]
842 else:
843 mapping["{}.{}.{}".format(vca["member-vnf-index"], vca["vdu_id"], vca["vdu_count_index"])] =\
844 vca["application"]
845 return ns_config_info
846
847 @staticmethod
848 def _get_initial_config_primitive_list(desc_primitive_list, vca_deployed, ee_descriptor_id):
849 """
850 Generates a list of initial-config-primitive based on the list provided by the descriptor. It includes internal
851 primitives as verify-ssh-credentials, or config when needed
852 :param desc_primitive_list: information of the descriptor
853 :param vca_deployed: information of the deployed, needed for known if it is related to an NS, VNF, VDU and if
854 this element contains a ssh public key
855 :param ee_descriptor_id: execution environment descriptor id. It is the value of
856 XXX_configuration.execution-environment-list.INDEX.id; it can be None
857 :return: The modified list. Can ba an empty list, but always a list
858 """
859
860 primitive_list = desc_primitive_list or []
861
862 # filter primitives by ee_id
863 primitive_list = [p for p in primitive_list if p.get("execution-environment-ref") == ee_descriptor_id]
864
865 # sort by 'seq'
866 if primitive_list:
867 primitive_list.sort(key=lambda val: int(val['seq']))
868
869 # look for primitive config, and get the position. None if not present
870 config_position = None
871 for index, primitive in enumerate(primitive_list):
872 if primitive["name"] == "config":
873 config_position = index
874 break
875
876 # for NS, add always a config primitive if not present (bug 874)
877 if not vca_deployed["member-vnf-index"] and config_position is None:
878 primitive_list.insert(0, {"name": "config", "parameter": []})
879 config_position = 0
880 # TODO revise if needed: for VNF/VDU add verify-ssh-credentials after config
881 if vca_deployed["member-vnf-index"] and config_position is not None and vca_deployed.get("ssh-public-key"):
882 primitive_list.insert(config_position + 1, {"name": "verify-ssh-credentials", "parameter": []})
883 return primitive_list
884
885 async def _instantiate_ng_ro(self, logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs, db_vnfds_ref,
886 n2vc_key_list, stage, start_deploy, timeout_ns_deploy):
887 nslcmop_id = db_nslcmop["_id"]
888 target = {
889 "name": db_nsr["name"],
890 "ns": {"vld": []},
891 "vnf": [],
892 "image": deepcopy(db_nsr["image"]),
893 "flavor": deepcopy(db_nsr["flavor"]),
894 "action_id": nslcmop_id,
895 }
896 for image in target["image"]:
897 image["vim_info"] = []
898 for flavor in target["flavor"]:
899 flavor["vim_info"] = []
900
901 ns_params = db_nslcmop.get("operationParams")
902 ssh_keys = []
903 if ns_params.get("ssh_keys"):
904 ssh_keys += ns_params.get("ssh_keys")
905 if n2vc_key_list:
906 ssh_keys += n2vc_key_list
907
908 cp2target = {}
909 for vld_index, vld in enumerate(nsd.get("vld")):
910 target_vld = {"id": vld["id"],
911 "name": vld["name"],
912 "mgmt-network": vld.get("mgmt-network", False),
913 "type": vld.get("type"),
914 "vim_info": [{"vim-network-name": vld.get("vim-network-name"),
915 "vim_account_id": ns_params["vimAccountId"]}],
916 }
917 for cp in vld["vnfd-connection-point-ref"]:
918 cp2target["member_vnf:{}.{}".format(cp["member-vnf-index-ref"], cp["vnfd-connection-point-ref"])] = \
919 "nsrs:{}:vld.{}".format(nsr_id, vld_index)
920 target["ns"]["vld"].append(target_vld)
921 for vnfr in db_vnfrs.values():
922 vnfd = db_vnfds_ref[vnfr["vnfd-ref"]]
923 target_vnf = deepcopy(vnfr)
924 for vld in target_vnf.get("vld", ()):
925 # check if connected to a ns.vld
926 vnf_cp = next((cp for cp in vnfd.get("connection-point", ()) if
927 cp.get("internal-vld-ref") == vld["id"]), None)
928 if vnf_cp:
929 ns_cp = "member_vnf:{}.{}".format(vnfr["member-vnf-index-ref"], vnf_cp["id"])
930 if cp2target.get(ns_cp):
931 vld["target"] = cp2target[ns_cp]
932 vld["vim_info"] = [{"vim-network-name": vld.get("vim-network-name"),
933 "vim_account_id": vnfr["vim-account-id"]}]
934
935 for vdur in target_vnf.get("vdur", ()):
936 vdur["vim_info"] = [{"vim_account_id": vnfr["vim-account-id"]}]
937 vdud_index, vdud = next(k for k in enumerate(vnfd["vdu"]) if k[1]["id"] == vdur["vdu-id-ref"])
938 # vdur["additionalParams"] = vnfr.get("additionalParamsForVnf") # TODO additional params for VDU
939
940 if ssh_keys:
941 if deep_get(vdud, ("vdu-configuration", "config-access", "ssh-access", "required")):
942 vdur["ssh-keys"] = ssh_keys
943 vdur["ssh-access-required"] = True
944 elif deep_get(vnfd, ("vnf-configuration", "config-access", "ssh-access", "required")) and \
945 any(iface.get("mgmt-vnf") for iface in vdur["interfaces"]):
946 vdur["ssh-keys"] = ssh_keys
947 vdur["ssh-access-required"] = True
948
949 # cloud-init
950 if vdud.get("cloud-init-file"):
951 vdur["cloud-init"] = "{}:file:{}".format(vnfd["_id"], vdud.get("cloud-init-file"))
952 elif vdud.get("cloud-init"):
953 vdur["cloud-init"] = "{}:vdu:{}".format(vnfd["_id"], vdud_index)
954
955 # flavor
956 ns_flavor = target["flavor"][int(vdur["ns-flavor-id"])]
957 if not next((vi for vi in ns_flavor["vim_info"] if
958 vi and vi.get("vim_account_id") == vnfr["vim-account-id"]), None):
959 ns_flavor["vim_info"].append({"vim_account_id": vnfr["vim-account-id"]})
960 # image
961 ns_image = target["image"][int(vdur["ns-image-id"])]
962 if not next((vi for vi in ns_image["vim_info"] if
963 vi and vi.get("vim_account_id") == vnfr["vim-account-id"]), None):
964 ns_image["vim_info"].append({"vim_account_id": vnfr["vim-account-id"]})
965
966 vdur["vim_info"] = [{"vim_account_id": vnfr["vim-account-id"]}]
967 target["vnf"].append(target_vnf)
968
969 desc = await self.RO.deploy(nsr_id, target)
970 action_id = desc["action_id"]
971 await self._wait_ng_ro(self, nsr_id, action_id, nslcmop_id, start_deploy, timeout_ns_deploy, stage)
972
973 # Updating NSR
974 db_nsr_update = {
975 "_admin.deployed.RO.operational-status": "running",
976 "detailed-status": " ".join(stage)
977 }
978 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
979 self.update_db_2("nsrs", nsr_id, db_nsr_update)
980 self._write_op_status(nslcmop_id, stage)
981 self.logger.debug(logging_text + "ns deployed at RO. RO_id={}".format(action_id))
982 return
983
984 async def _wait_ng_ro(self, nsr_id, action_id, nslcmop_id, start_time, timeout, stage):
985 detailed_status_old = None
986 db_nsr_update = {}
987 while time() <= start_time + timeout:
988 desc_status = await self.RO.status(nsr_id, action_id)
989 if desc_status["status"] == "FAILED":
990 raise NgRoException(desc_status["details"])
991 elif desc_status["status"] == "BUILD":
992 stage[2] = "VIM: ({})".format(desc_status["details"])
993 elif desc_status["status"] == "DONE":
994 stage[2] = "Deployed at VIM"
995 break
996 else:
997 assert False, "ROclient.check_ns_status returns unknown {}".format(desc_status["status"])
998 if stage[2] != detailed_status_old:
999 detailed_status_old = stage[2]
1000 db_nsr_update["detailed-status"] = " ".join(stage)
1001 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1002 self._write_op_status(nslcmop_id, stage)
1003 await asyncio.sleep(5, loop=self.loop)
1004 else: # timeout_ns_deploy
1005 raise NgRoException("Timeout waiting ns to deploy")
1006
1007 async def _terminate_ng_ro(self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage):
1008 db_nsr_update = {}
1009 failed_detail = []
1010 action_id = None
1011 start_deploy = time()
1012 try:
1013 target = {
1014 "ns": {"vld": []},
1015 "vnf": [],
1016 "image": [],
1017 "flavor": [],
1018 }
1019 desc = await self.RO.deploy(nsr_id, target)
1020 action_id = desc["action_id"]
1021 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = action_id
1022 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETING"
1023 self.logger.debug(logging_text + "ns terminate action at RO. action_id={}".format(action_id))
1024
1025 # wait until done
1026 delete_timeout = 20 * 60 # 20 minutes
1027 await self._wait_ng_ro(self, nsr_id, action_id, nslcmop_id, start_deploy, delete_timeout, stage)
1028
1029 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
1030 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
1031 # delete all nsr
1032 await self.RO.delete(nsr_id)
1033 except Exception as e:
1034 if isinstance(e, NgRoException) and e.http_code == 404: # not found
1035 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
1036 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
1037 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
1038 self.logger.debug(logging_text + "RO_action_id={} already deleted".format(action_id))
1039 elif isinstance(e, NgRoException) and e.http_code == 409: # conflict
1040 failed_detail.append("delete conflict: {}".format(e))
1041 self.logger.debug(logging_text + "RO_action_id={} delete conflict: {}".format(action_id, e))
1042 else:
1043 failed_detail.append("delete error: {}".format(e))
1044 self.logger.error(logging_text + "RO_action_id={} delete error: {}".format(action_id, e))
1045
1046 if failed_detail:
1047 stage[2] = "Error deleting from VIM"
1048 else:
1049 stage[2] = "Deleted from VIM"
1050 db_nsr_update["detailed-status"] = " ".join(stage)
1051 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1052 self._write_op_status(nslcmop_id, stage)
1053
1054 if failed_detail:
1055 raise LcmException("; ".join(failed_detail))
1056 return
1057
async def instantiate_RO(self, logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs, db_vnfds_ref,
                         n2vc_key_list, stage):
    """
    Instantiate at RO
    :param logging_text: prefix text to use at logging
    :param nsr_id: nsr identity
    :param nsd: database content of ns descriptor
    :param db_nsr: database content of ns record
    :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
    :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
    :param db_vnfds_ref: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
    :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
    :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
    :return: None or exception. On ROClientException, LcmException, DbException or NgRoException the vnfrs are
        set at error and the exception is raised again
    """
    try:
        db_nsr_update = {}
        RO_descriptor_number = 0   # number of descriptors created at RO
        vnf_index_2_RO_id = {}    # map between vnfd/nsd id to the id used at RO
        nslcmop_id = db_nslcmop["_id"]
        start_deploy = time()
        ns_params = db_nslcmop.get("operationParams")
        if ns_params and ns_params.get("timeout_ns_deploy"):
            timeout_ns_deploy = ns_params["timeout_ns_deploy"]
        else:
            timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy)

        # Check for and optionally request placement optimization. Database will be updated if placement activated
        stage[2] = "Waiting for Placement."
        if await self._do_placement(logging_text, db_nslcmop, db_vnfrs):
            # in case of placement, change ns_params[vimAccountId] if not present at any vnfrs.
            # The vim account of the last vnfr is taken. Nothing is done when there are no vnfrs
            vnfr_vim_accounts = [vnfr["vim-account-id"] for vnfr in db_vnfrs.values()]
            if vnfr_vim_accounts and ns_params["vimAccountId"] not in vnfr_vim_accounts:
                ns_params["vimAccountId"] = vnfr_vim_accounts[-1]

        if self.ng_ro:
            return await self._instantiate_ng_ro(logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs,
                                                 db_vnfds_ref, n2vc_key_list, stage, start_deploy,
                                                 timeout_ns_deploy)
        # deploy RO
        # get vnfds, instantiate at RO
        for c_vnf in nsd.get("constituent-vnfd", ()):
            member_vnf_index = c_vnf["member-vnf-index"]
            vnfd = db_vnfds_ref[c_vnf['vnfd-id-ref']]
            vnfd_ref = vnfd["id"]

            stage[2] = "Creating vnfd='{}' member_vnf_index='{}' at RO".format(vnfd_ref, member_vnf_index)
            db_nsr_update["detailed-status"] = " ".join(stage)
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)

            # self.logger.debug(logging_text + stage[2])
            vnfd_id_RO = "{}.{}.{}".format(nsr_id, RO_descriptor_number, member_vnf_index[:23])
            vnf_index_2_RO_id[member_vnf_index] = vnfd_id_RO
            RO_descriptor_number += 1

            # look position at deployed.RO.vnfd if not present it will be appended at the end
            for index, vnf_deployed in enumerate(db_nsr["_admin"]["deployed"]["RO"]["vnfd"]):
                if vnf_deployed["member-vnf-index"] == member_vnf_index:
                    break
            else:
                index = len(db_nsr["_admin"]["deployed"]["RO"]["vnfd"])
                db_nsr["_admin"]["deployed"]["RO"]["vnfd"].append(None)

            # look if present
            RO_update = {"member-vnf-index": member_vnf_index}
            vnfd_list = await self.RO.get_list("vnfd", filter_by={"osm_id": vnfd_id_RO})
            if vnfd_list:
                RO_update["id"] = vnfd_list[0]["uuid"]
                self.logger.debug(logging_text + "vnfd='{}' member_vnf_index='{}' exists at RO. Using RO_id={}".
                                  format(vnfd_ref, member_vnf_index, vnfd_list[0]["uuid"]))
            else:
                vnfd_RO = self.vnfd2RO(vnfd, vnfd_id_RO, db_vnfrs[c_vnf["member-vnf-index"]].
                                       get("additionalParamsForVnf"), nsr_id)
                desc = await self.RO.create("vnfd", descriptor=vnfd_RO)
                RO_update["id"] = desc["uuid"]
                self.logger.debug(logging_text + "vnfd='{}' member_vnf_index='{}' created at RO. RO_id={}".format(
                    vnfd_ref, member_vnf_index, desc["uuid"]))
            db_nsr_update["_admin.deployed.RO.vnfd.{}".format(index)] = RO_update
            db_nsr["_admin"]["deployed"]["RO"]["vnfd"][index] = RO_update

        # create nsd at RO
        nsd_ref = nsd["id"]

        stage[2] = "Creating nsd={} at RO".format(nsd_ref)
        db_nsr_update["detailed-status"] = " ".join(stage)
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        self._write_op_status(nslcmop_id, stage)

        # self.logger.debug(logging_text + stage[2])
        RO_osm_nsd_id = "{}.{}.{}".format(nsr_id, RO_descriptor_number, nsd_ref[:23])
        RO_descriptor_number += 1
        nsd_list = await self.RO.get_list("nsd", filter_by={"osm_id": RO_osm_nsd_id})
        if nsd_list:
            db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = nsd_list[0]["uuid"]
            self.logger.debug(logging_text + "nsd={} exists at RO. Using RO_id={}".format(
                nsd_ref, RO_nsd_uuid))
        else:
            # the nsd sent to RO is a copy where ids are replaced by the ones used at RO
            nsd_RO = deepcopy(nsd)
            nsd_RO["id"] = RO_osm_nsd_id
            nsd_RO.pop("_id", None)
            nsd_RO.pop("_admin", None)
            for c_vnf in nsd_RO.get("constituent-vnfd", ()):
                member_vnf_index = c_vnf["member-vnf-index"]
                c_vnf["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
            for c_vld in nsd_RO.get("vld", ()):
                for cp in c_vld.get("vnfd-connection-point-ref", ()):
                    member_vnf_index = cp["member-vnf-index-ref"]
                    cp["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]

            desc = await self.RO.create("nsd", descriptor=nsd_RO)
            db_nsr_update["_admin.nsState"] = "INSTANTIATED"
            db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = desc["uuid"]
            self.logger.debug(logging_text + "nsd={} created at RO. RO_id={}".format(nsd_ref, RO_nsd_uuid))
        self.update_db_2("nsrs", nsr_id, db_nsr_update)

        # Create ns at RO
        stage[2] = "Creating nsd={} at RO".format(nsd_ref)
        db_nsr_update["detailed-status"] = " ".join(stage)
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        self._write_op_status(nslcmop_id, stage)

        # if present use it unless in error status
        RO_nsr_id = deep_get(db_nsr, ("_admin", "deployed", "RO", "nsr_id"))
        if RO_nsr_id:
            try:
                stage[2] = "Looking for existing ns at RO"
                db_nsr_update["detailed-status"] = " ".join(stage)
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_op_status(nslcmop_id, stage)
                # self.logger.debug(logging_text + stage[2] + " RO_ns_id={}".format(RO_nsr_id))
                desc = await self.RO.show("ns", RO_nsr_id)

            except ROclient.ROClientException as e:
                if e.http_code != HTTPStatus.NOT_FOUND:
                    raise
                # not found at RO: forget it, so that it is created below
                RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
            if RO_nsr_id:
                ns_status, ns_status_info = self.RO.check_ns_status(desc)
                db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
                if ns_status == "ERROR":
                    stage[2] = "Deleting ns at RO. RO_ns_id={}".format(RO_nsr_id)
                    self.logger.debug(logging_text + stage[2])
                    await self.RO.delete("ns", RO_nsr_id)
                    RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
        if not RO_nsr_id:
            stage[2] = "Checking dependencies"
            db_nsr_update["detailed-status"] = " ".join(stage)
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)
            # self.logger.debug(logging_text + stage[2])

            # check if VIM is creating and wait look if previous tasks in process
            task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account", ns_params["vimAccountId"])
            if task_dependency:
                stage[2] = "Waiting for related tasks '{}' to be completed".format(task_name)
                self.logger.debug(logging_text + stage[2])
                await asyncio.wait(task_dependency, timeout=3600)
            if ns_params.get("vnf"):
                for vnf in ns_params["vnf"]:
                    if "vimAccountId" in vnf:
                        task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account",
                                                                                    vnf["vimAccountId"])
                        if task_dependency:
                            stage[2] = "Waiting for related tasks '{}' to be completed.".format(task_name)
                            self.logger.debug(logging_text + stage[2])
                            await asyncio.wait(task_dependency, timeout=3600)

            stage[2] = "Checking instantiation parameters."
            RO_ns_params = self._ns_params_2_RO(ns_params, nsd, db_vnfds_ref, db_vnfrs, n2vc_key_list)
            stage[2] = "Deploying ns at VIM."
            db_nsr_update["detailed-status"] = " ".join(stage)
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)

            desc = await self.RO.create("ns", descriptor=RO_ns_params, name=db_nsr["name"], scenario=RO_nsd_uuid)
            RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = desc["uuid"]
            db_nsr_update["_admin.nsState"] = "INSTANTIATED"
            db_nsr_update["_admin.deployed.RO.nsr_status"] = "BUILD"
            self.logger.debug(logging_text + "ns created at RO. RO_id={}".format(desc["uuid"]))

        # wait until NS is ready
        stage[2] = "Waiting VIM to deploy ns."
        db_nsr_update["detailed-status"] = " ".join(stage)
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        self._write_op_status(nslcmop_id, stage)
        detailed_status_old = None
        self.logger.debug(logging_text + stage[2] + " RO_ns_id={}".format(RO_nsr_id))

        old_desc = None
        while time() <= start_deploy + timeout_ns_deploy:
            desc = await self.RO.show("ns", RO_nsr_id)

            # deploymentStatus
            if desc != old_desc:
                # desc has changed => update db
                self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
                old_desc = desc

            ns_status, ns_status_info = self.RO.check_ns_status(desc)
            db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
            if ns_status == "ERROR":
                raise ROclient.ROClientException(ns_status_info)
            elif ns_status == "BUILD":
                stage[2] = "VIM: ({})".format(ns_status_info)
            elif ns_status == "ACTIVE":
                stage[2] = "Waiting for management IP address reported by the VIM. Updating VNFRs."
                try:
                    self.ns_update_vnfr(db_vnfrs, desc)
                    break
                except LcmExceptionNoMgmtIP:
                    # management IP not reported yet: keep polling
                    pass
            else:
                # an explicit exception instead of assert: it is not stripped with python -O and it is
                # handled below as the rest of RO errors
                raise ROclient.ROClientException(
                    "ROclient.check_ns_status returns unknown {}".format(ns_status))
            # write to database only when the text has changed
            if stage[2] != detailed_status_old:
                detailed_status_old = stage[2]
                db_nsr_update["detailed-status"] = " ".join(stage)
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_op_status(nslcmop_id, stage)
            # 'loop' argument is not passed: it was removed from asyncio.sleep at python 3.10
            await asyncio.sleep(5)
        else:  # timeout_ns_deploy
            raise ROclient.ROClientException("Timeout waiting ns to be ready")

        # Updating NSR
        self.ns_update_nsr(db_nsr_update, db_nsr, desc)

        db_nsr_update["_admin.deployed.RO.operational-status"] = "running"
        # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
        stage[2] = "Deployed at VIM"
        db_nsr_update["detailed-status"] = " ".join(stage)
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        self._write_op_status(nslcmop_id, stage)
        # await self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
        # self.logger.debug(logging_text + "Deployed at VIM")
    except (ROclient.ROClientException, LcmException, DbException, NgRoException) as e:
        stage[2] = "ERROR deploying at VIM"
        self.set_vnfr_at_error(db_vnfrs, str(e))
        raise
1298
async def wait_kdu_up(self, logging_text, nsr_id, vnfr_id, kdu_name):
    """
    Wait for kdu to be up, get ip address. The vnfr is read from database every 10 seconds, up to 360 times
    :param logging_text: prefix use for logging
    :param nsr_id: not used by this method
    :param vnfr_id: _id of the vnfr that contains the kdu
    :param kdu_name: name of the kdu; it is matched against the "kdu-name" of the vnfr "kdur" entries
    :return: IP address ("ip-address" of the kdur; it can be None)
        LcmException is raised if the kdur is not found, its status is set but it is neither READY nor
        ENABLED, or when the maximum number of tries is reached
    """

    # self.logger.debug(logging_text + "Starting wait_kdu_up")
    nb_tries = 0

    while nb_tries < 360:
        db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
        kdur = next((x for x in get_iterable(db_vnfr, "kdur") if x.get("kdu-name") == kdu_name), None)
        if not kdur:
            raise LcmException("Not found vnfr_id={}, kdu_name={}".format(vnfr_id, kdu_name))
        # without status the kdu is still being deployed: keep waiting
        if kdur.get("status"):
            if kdur["status"] in ("READY", "ENABLED"):
                return kdur.get("ip-address")
            else:
                raise LcmException("target KDU={} is in error state".format(kdu_name))

        # 'loop' argument is not passed: it was removed from asyncio.sleep at python 3.10
        await asyncio.sleep(10)
        nb_tries += 1
    raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name))
1326
async def wait_vm_up_insert_key_ro(self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None):
    """
    Wait for ip address at RO, and optionally, insert public key in virtual machine
    :param logging_text: prefix use for logging
    :param nsr_id: nsr identity
    :param vnfr_id: _id of the vnfr that contains the target machine
    :param vdu_id: vdu-id-ref of the target vdu. If empty, the vdu owning the vnf ip-address is the target
    :param vdu_index: count-index of the target vdu
    :param pub_key: public ssh key to inject, None to skip
    :param user: user to apply the public ssh key
    :return: IP address
        LcmException is raised if the target is in error state or not found, or on timeout
    """

    # self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
    ro_nsr_id = None
    ip_address = None
    nb_tries = 0
    target_vdu_id = None
    ro_retries = 0

    while True:

        ro_retries += 1
        if ro_retries >= 360:  # 1 hour
            raise LcmException("Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id))

        # 'loop' argument is not passed: it was removed from asyncio.sleep at python 3.10
        await asyncio.sleep(10)

        # get ip address
        if not target_vdu_id:
            db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})

            if not vdu_id:  # for the VNF case
                if db_vnfr.get("status") == "ERROR":
                    raise LcmException("Cannot inject ssh-key because target VNF is in error state")
                ip_address = db_vnfr.get("ip-address")
                if not ip_address:
                    continue
                vdur = next((x for x in get_iterable(db_vnfr, "vdur") if x.get("ip-address") == ip_address), None)
            else:  # VDU case
                vdur = next((x for x in get_iterable(db_vnfr, "vdur")
                             if x.get("vdu-id-ref") == vdu_id and x.get("count-index") == vdu_index), None)

            if not vdur and len(db_vnfr.get("vdur", ())) == 1:  # If only one, this should be the target vdu
                vdur = db_vnfr["vdur"][0]
            if not vdur:
                raise LcmException("Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(vnfr_id, vdu_id,
                                                                                          vdu_index))

            if vdur.get("pdu-type") or vdur.get("status") == "ACTIVE":
                ip_address = vdur.get("ip-address")
                if not ip_address:
                    continue
                target_vdu_id = vdur["vdu-id-ref"]
            elif vdur.get("status") == "ERROR":
                raise LcmException("Cannot inject ssh-key because target VM is in error state")

        if not target_vdu_id:
            continue

        # inject public key into machine
        if pub_key and user:
            # wait until NS is deployed at RO
            if not ro_nsr_id:
                db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
                ro_nsr_id = deep_get(db_nsrs, ("_admin", "deployed", "RO", "nsr_id"))
            if not ro_nsr_id:
                continue

            # self.logger.debug(logging_text + "Inserting RO key")
            if vdur.get("pdu-type"):
                self.logger.error(logging_text + "Cannot inject ssh-ky to a PDU")
                return ip_address
            try:
                ro_vm_id = "{}-{}".format(db_vnfr["member-vnf-index-ref"], target_vdu_id)  # TODO add vdu_index
                if self.ng_ro:
                    target = {"action": "inject_ssh_key", "key": pub_key, "user": user,
                              "vnf": [{"_id": vnfr_id, "vdur": [{"id": vdu_id}]}],
                              }
                    await self.RO.deploy(nsr_id, target)
                else:
                    result_dict = await self.RO.create_action(
                        item="ns",
                        item_id_name=ro_nsr_id,
                        descriptor={"add_public_key": pub_key, "vms": [ro_vm_id], "user": user}
                    )
                    # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
                    if not result_dict or not isinstance(result_dict, dict):
                        raise LcmException("Unknown response from RO when injecting key")
                    for result in result_dict.values():
                        if result.get("vim_result") == 200:
                            break
                        else:
                            raise ROclient.ROClientException("error injecting key: {}".format(
                                result.get("description")))
                # the request has succeeded at any of the RO flavours: stop polling
                break
            except NgRoException as e:
                raise LcmException("Reaching max tries injecting key. Error: {}".format(e))
            except ROclient.ROClientException as e:
                # legacy RO errors are retried up to 20 times, one per loop iteration (10 seconds)
                if not nb_tries:
                    self.logger.debug(logging_text + "error injecting key: {}. Retrying until {} seconds".
                                      format(e, 20*10))
                nb_tries += 1
                if nb_tries >= 20:
                    raise LcmException("Reaching max tries injecting key. Error: {}".format(e))
        else:
            # nothing to inject: the ip address is enough
            break

    return ip_address
1436
1437 async def _wait_dependent_n2vc(self, nsr_id, vca_deployed_list, vca_index):
1438 """
1439 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1440 """
1441 my_vca = vca_deployed_list[vca_index]
1442 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
1443 # vdu or kdu: no dependencies
1444 return
1445 timeout = 300
1446 while timeout >= 0:
1447 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1448 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1449 configuration_status_list = db_nsr["configurationStatus"]
1450 for index, vca_deployed in enumerate(configuration_status_list):
1451 if index == vca_index:
1452 # myself
1453 continue
1454 if not my_vca.get("member-vnf-index") or \
1455 (vca_deployed.get("member-vnf-index") == my_vca.get("member-vnf-index")):
1456 internal_status = configuration_status_list[index].get("status")
1457 if internal_status == 'READY':
1458 continue
1459 elif internal_status == 'BROKEN':
1460 raise LcmException("Configuration aborted because dependent charm/s has failed")
1461 else:
1462 break
1463 else:
1464 # no dependencies, return
1465 return
1466 await asyncio.sleep(10)
1467 timeout -= 1
1468
1469 raise LcmException("Configuration aborted because dependent charm/s timeout")
1470
1471 async def instantiate_N2VC(self, logging_text, vca_index, nsi_id, db_nsr, db_vnfr, vdu_id, kdu_name, vdu_index,
1472 config_descriptor, deploy_params, base_folder, nslcmop_id, stage, vca_type, vca_name,
1473 ee_config_descriptor):
1474 nsr_id = db_nsr["_id"]
1475 db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
1476 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1477 vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
1478 osm_config = {"osm": {"ns_id": db_nsr["_id"]}}
1479 db_dict = {
1480 'collection': 'nsrs',
1481 'filter': {'_id': nsr_id},
1482 'path': db_update_entry
1483 }
1484 step = ""
1485 try:
1486
1487 element_type = 'NS'
1488 element_under_configuration = nsr_id
1489
1490 vnfr_id = None
1491 if db_vnfr:
1492 vnfr_id = db_vnfr["_id"]
1493 osm_config["osm"]["vnf_id"] = vnfr_id
1494
1495 namespace = "{nsi}.{ns}".format(
1496 nsi=nsi_id if nsi_id else "",
1497 ns=nsr_id)
1498
1499 if vnfr_id:
1500 element_type = 'VNF'
1501 element_under_configuration = vnfr_id
1502 namespace += ".{}".format(vnfr_id)
1503 if vdu_id:
1504 namespace += ".{}-{}".format(vdu_id, vdu_index or 0)
1505 element_type = 'VDU'
1506 element_under_configuration = "{}-{}".format(vdu_id, vdu_index or 0)
1507 osm_config["osm"]["vdu_id"] = vdu_id
1508 elif kdu_name:
1509 namespace += ".{}".format(kdu_name)
1510 element_type = 'KDU'
1511 element_under_configuration = kdu_name
1512 osm_config["osm"]["kdu_name"] = kdu_name
1513
1514 # Get artifact path
1515 artifact_path = "{}/{}/{}/{}".format(
1516 base_folder["folder"],
1517 base_folder["pkg-dir"],
1518 "charms" if vca_type in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm") else "helm-charts",
1519 vca_name
1520 )
1521 # get initial_config_primitive_list that applies to this element
1522 initial_config_primitive_list = config_descriptor.get('initial-config-primitive')
1523
1524 # add config if not present for NS charm
1525 ee_descriptor_id = ee_config_descriptor.get("id")
1526 initial_config_primitive_list = self._get_initial_config_primitive_list(initial_config_primitive_list,
1527 vca_deployed, ee_descriptor_id)
1528
1529 # n2vc_redesign STEP 3.1
1530 # find old ee_id if exists
1531 ee_id = vca_deployed.get("ee_id")
1532
1533 # create or register execution environment in VCA
1534 if vca_type in ("lxc_proxy_charm", "k8s_proxy_charm", "helm"):
1535
1536 self._write_configuration_status(
1537 nsr_id=nsr_id,
1538 vca_index=vca_index,
1539 status='CREATING',
1540 element_under_configuration=element_under_configuration,
1541 element_type=element_type
1542 )
1543
1544 step = "create execution environment"
1545 self.logger.debug(logging_text + step)
1546 ee_id, credentials = await self.vca_map[vca_type].create_execution_environment(
1547 namespace=namespace,
1548 reuse_ee_id=ee_id,
1549 db_dict=db_dict,
1550 config=osm_config,
1551 artifact_path=artifact_path,
1552 vca_type=vca_type)
1553
1554 elif vca_type == "native_charm":
1555 step = "Waiting to VM being up and getting IP address"
1556 self.logger.debug(logging_text + step)
1557 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
1558 user=None, pub_key=None)
1559 credentials = {"hostname": rw_mgmt_ip}
1560 # get username
1561 username = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
1562 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1563 # merged. Meanwhile let's get username from initial-config-primitive
1564 if not username and initial_config_primitive_list:
1565 for config_primitive in initial_config_primitive_list:
1566 for param in config_primitive.get("parameter", ()):
1567 if param["name"] == "ssh-username":
1568 username = param["value"]
1569 break
1570 if not username:
1571 raise LcmException("Cannot determine the username neither with 'initial-config-primitive' nor with "
1572 "'config-access.ssh-access.default-user'")
1573 credentials["username"] = username
1574 # n2vc_redesign STEP 3.2
1575
1576 self._write_configuration_status(
1577 nsr_id=nsr_id,
1578 vca_index=vca_index,
1579 status='REGISTERING',
1580 element_under_configuration=element_under_configuration,
1581 element_type=element_type
1582 )
1583
1584 step = "register execution environment {}".format(credentials)
1585 self.logger.debug(logging_text + step)
1586 ee_id = await self.vca_map[vca_type].register_execution_environment(
1587 credentials=credentials, namespace=namespace, db_dict=db_dict)
1588
1589 # for compatibility with MON/POL modules, the need model and application name at database
1590 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
1591 ee_id_parts = ee_id.split('.')
1592 db_nsr_update = {db_update_entry + "ee_id": ee_id}
1593 if len(ee_id_parts) >= 2:
1594 model_name = ee_id_parts[0]
1595 application_name = ee_id_parts[1]
1596 db_nsr_update[db_update_entry + "model"] = model_name
1597 db_nsr_update[db_update_entry + "application"] = application_name
1598
1599 # n2vc_redesign STEP 3.3
1600 step = "Install configuration Software"
1601
1602 self._write_configuration_status(
1603 nsr_id=nsr_id,
1604 vca_index=vca_index,
1605 status='INSTALLING SW',
1606 element_under_configuration=element_under_configuration,
1607 element_type=element_type,
1608 other_update=db_nsr_update
1609 )
1610
1611 # TODO check if already done
1612 self.logger.debug(logging_text + step)
1613 config = None
1614 if vca_type == "native_charm":
1615 config_primitive = next((p for p in initial_config_primitive_list if p["name"] == "config"), None)
1616 if config_primitive:
1617 config = self._map_primitive_params(
1618 config_primitive,
1619 {},
1620 deploy_params
1621 )
1622 num_units = 1
1623 if vca_type == "lxc_proxy_charm":
1624 if element_type == "NS":
1625 num_units = db_nsr.get("config-units") or 1
1626 elif element_type == "VNF":
1627 num_units = db_vnfr.get("config-units") or 1
1628 elif element_type == "VDU":
1629 for v in db_vnfr["vdur"]:
1630 if vdu_id == v["vdu-id-ref"]:
1631 num_units = v.get("config-units") or 1
1632 break
1633
1634 await self.vca_map[vca_type].install_configuration_sw(
1635 ee_id=ee_id,
1636 artifact_path=artifact_path,
1637 db_dict=db_dict,
1638 config=config,
1639 num_units=num_units,
1640 vca_type=vca_type
1641 )
1642
1643 # write in db flag of configuration_sw already installed
1644 self.update_db_2("nsrs", nsr_id, {db_update_entry + "config_sw_installed": True})
1645
1646 # add relations for this VCA (wait for other peers related with this VCA)
1647 await self._add_vca_relations(logging_text=logging_text, nsr_id=nsr_id,
1648 vca_index=vca_index, vca_type=vca_type)
1649
1650 # if SSH access is required, then get execution environment SSH public
1651 # if native charm we have waited already to VM be UP
1652 if vca_type in ("k8s_proxy_charm", "lxc_proxy_charm", "helm"):
1653 pub_key = None
1654 user = None
1655 # self.logger.debug("get ssh key block")
1656 if deep_get(config_descriptor, ("config-access", "ssh-access", "required")):
1657 # self.logger.debug("ssh key needed")
1658 # Needed to inject a ssh key
1659 user = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
1660 step = "Install configuration Software, getting public ssh key"
1661 pub_key = await self.vca_map[vca_type].get_ee_ssh_public__key(ee_id=ee_id, db_dict=db_dict)
1662
1663 step = "Insert public key into VM user={} ssh_key={}".format(user, pub_key)
1664 else:
1665 # self.logger.debug("no need to get ssh key")
1666 step = "Waiting to VM being up and getting IP address"
1667 self.logger.debug(logging_text + step)
1668
1669 # n2vc_redesign STEP 5.1
1670 # wait for RO (ip-address) Insert pub_key into VM
1671 if vnfr_id:
1672 if kdu_name:
1673 rw_mgmt_ip = await self.wait_kdu_up(logging_text, nsr_id, vnfr_id, kdu_name)
1674 else:
1675 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id,
1676 vdu_index, user=user, pub_key=pub_key)
1677 else:
1678 rw_mgmt_ip = None # This is for a NS configuration
1679
1680 self.logger.debug(logging_text + ' VM_ip_address={}'.format(rw_mgmt_ip))
1681
1682 # store rw_mgmt_ip in deploy params for later replacement
1683 deploy_params["rw_mgmt_ip"] = rw_mgmt_ip
1684
1685 # n2vc_redesign STEP 6 Execute initial config primitive
1686 step = 'execute initial config primitive'
1687
1688 # wait for dependent primitives execution (NS -> VNF -> VDU)
1689 if initial_config_primitive_list:
1690 await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)
1691
1692 # stage, in function of element type: vdu, kdu, vnf or ns
1693 my_vca = vca_deployed_list[vca_index]
1694 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
1695 # VDU or KDU
1696 stage[0] = 'Stage 3/5: running Day-1 primitives for VDU.'
1697 elif my_vca.get("member-vnf-index"):
1698 # VNF
1699 stage[0] = 'Stage 4/5: running Day-1 primitives for VNF.'
1700 else:
1701 # NS
1702 stage[0] = 'Stage 5/5: running Day-1 primitives for NS.'
1703
1704 self._write_configuration_status(
1705 nsr_id=nsr_id,
1706 vca_index=vca_index,
1707 status='EXECUTING PRIMITIVE'
1708 )
1709
1710 self._write_op_status(
1711 op_id=nslcmop_id,
1712 stage=stage
1713 )
1714
1715 check_if_terminated_needed = True
1716 for initial_config_primitive in initial_config_primitive_list:
1717 # adding information on the vca_deployed if it is a NS execution environment
1718 if not vca_deployed["member-vnf-index"]:
1719 deploy_params["ns_config_info"] = json.dumps(self._get_ns_config_info(nsr_id))
1720 # TODO check if already done
1721 primitive_params_ = self._map_primitive_params(initial_config_primitive, {}, deploy_params)
1722
1723 step = "execute primitive '{}' params '{}'".format(initial_config_primitive["name"], primitive_params_)
1724 self.logger.debug(logging_text + step)
1725 await self.vca_map[vca_type].exec_primitive(
1726 ee_id=ee_id,
1727 primitive_name=initial_config_primitive["name"],
1728 params_dict=primitive_params_,
1729 db_dict=db_dict
1730 )
1731 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
1732 if check_if_terminated_needed:
1733 if config_descriptor.get('terminate-config-primitive'):
1734 self.update_db_2("nsrs", nsr_id, {db_update_entry + "needed_terminate": True})
1735 check_if_terminated_needed = False
1736
1737 # TODO register in database that primitive is done
1738
1739 # STEP 7 Configure metrics
1740 if vca_type == "helm":
1741 prometheus_jobs = await self.add_prometheus_metrics(
1742 ee_id=ee_id,
1743 artifact_path=artifact_path,
1744 ee_config_descriptor=ee_config_descriptor,
1745 vnfr_id=vnfr_id,
1746 nsr_id=nsr_id,
1747 target_ip=rw_mgmt_ip,
1748 )
1749 if prometheus_jobs:
1750 self.update_db_2("nsrs", nsr_id, {db_update_entry + "prometheus_jobs": prometheus_jobs})
1751
1752 step = "instantiated at VCA"
1753 self.logger.debug(logging_text + step)
1754
1755 self._write_configuration_status(
1756 nsr_id=nsr_id,
1757 vca_index=vca_index,
1758 status='READY'
1759 )
1760
1761 except Exception as e: # TODO not use Exception but N2VC exception
1762 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
1763 if not isinstance(e, (DbException, N2VCException, LcmException, asyncio.CancelledError)):
1764 self.logger.error("Exception while {} : {}".format(step, e), exc_info=True)
1765 self._write_configuration_status(
1766 nsr_id=nsr_id,
1767 vca_index=vca_index,
1768 status='BROKEN'
1769 )
1770 raise LcmException("{} {}".format(step, e)) from e
1771
1772 def _write_ns_status(self, nsr_id: str, ns_state: str, current_operation: str, current_operation_id: str,
1773 error_description: str = None, error_detail: str = None, other_update: dict = None):
1774 """
1775 Update db_nsr fields.
1776 :param nsr_id:
1777 :param ns_state:
1778 :param current_operation:
1779 :param current_operation_id:
1780 :param error_description:
1781 :param error_detail:
1782 :param other_update: Other required changes at database if provided, will be cleared
1783 :return:
1784 """
1785 try:
1786 db_dict = other_update or {}
1787 db_dict["_admin.nslcmop"] = current_operation_id # for backward compatibility
1788 db_dict["_admin.current-operation"] = current_operation_id
1789 db_dict["_admin.operation-type"] = current_operation if current_operation != "IDLE" else None
1790 db_dict["currentOperation"] = current_operation
1791 db_dict["currentOperationID"] = current_operation_id
1792 db_dict["errorDescription"] = error_description
1793 db_dict["errorDetail"] = error_detail
1794
1795 if ns_state:
1796 db_dict["nsState"] = ns_state
1797 self.update_db_2("nsrs", nsr_id, db_dict)
1798 except DbException as e:
1799 self.logger.warn('Error writing NS status, ns={}: {}'.format(nsr_id, e))
1800
1801 def _write_op_status(self, op_id: str, stage: list = None, error_message: str = None, queuePosition: int = 0,
1802 operation_state: str = None, other_update: dict = None):
1803 try:
1804 db_dict = other_update or {}
1805 db_dict['queuePosition'] = queuePosition
1806 if isinstance(stage, list):
1807 db_dict['stage'] = stage[0]
1808 db_dict['detailed-status'] = " ".join(stage)
1809 elif stage is not None:
1810 db_dict['stage'] = str(stage)
1811
1812 if error_message is not None:
1813 db_dict['errorMessage'] = error_message
1814 if operation_state is not None:
1815 db_dict['operationState'] = operation_state
1816 db_dict["statusEnteredTime"] = time()
1817 self.update_db_2("nslcmops", op_id, db_dict)
1818 except DbException as e:
1819 self.logger.warn('Error writing OPERATION status for op_id: {} -> {}'.format(op_id, e))
1820
1821 def _write_all_config_status(self, db_nsr: dict, status: str):
1822 try:
1823 nsr_id = db_nsr["_id"]
1824 # configurationStatus
1825 config_status = db_nsr.get('configurationStatus')
1826 if config_status:
1827 db_nsr_update = {"configurationStatus.{}.status".format(index): status for index, v in
1828 enumerate(config_status) if v}
1829 # update status
1830 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1831
1832 except DbException as e:
1833 self.logger.warn('Error writing all configuration status, ns={}: {}'.format(nsr_id, e))
1834
1835 def _write_configuration_status(self, nsr_id: str, vca_index: int, status: str = None,
1836 element_under_configuration: str = None, element_type: str = None,
1837 other_update: dict = None):
1838
1839 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
1840 # .format(vca_index, status))
1841
1842 try:
1843 db_path = 'configurationStatus.{}.'.format(vca_index)
1844 db_dict = other_update or {}
1845 if status:
1846 db_dict[db_path + 'status'] = status
1847 if element_under_configuration:
1848 db_dict[db_path + 'elementUnderConfiguration'] = element_under_configuration
1849 if element_type:
1850 db_dict[db_path + 'elementType'] = element_type
1851 self.update_db_2("nsrs", nsr_id, db_dict)
1852 except DbException as e:
1853 self.logger.warn('Error writing configuration status={}, ns={}, vca_index={}: {}'
1854 .format(status, nsr_id, vca_index, e))
1855
1856 async def _do_placement(self, logging_text, db_nslcmop, db_vnfrs):
1857 """
1858 Check and computes the placement, (vim account where to deploy). If it is decided by an external tool, it
1859 sends the request via kafka and wait until the result is wrote at database (nslcmops _admin.plca).
1860 Database is used because the result can be obtained from a different LCM worker in case of HA.
1861 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
1862 :param db_nslcmop: database content of nslcmop
1863 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
1864 :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfr with the
1865 computed 'vim-account-id'
1866 """
1867 modified = False
1868 nslcmop_id = db_nslcmop['_id']
1869 placement_engine = deep_get(db_nslcmop, ('operationParams', 'placement-engine'))
1870 if placement_engine == "PLA":
1871 self.logger.debug(logging_text + "Invoke and wait for placement optimization")
1872 await self.msg.aiowrite("pla", "get_placement", {'nslcmopId': nslcmop_id}, loop=self.loop)
1873 db_poll_interval = 5
1874 wait = db_poll_interval * 10
1875 pla_result = None
1876 while not pla_result and wait >= 0:
1877 await asyncio.sleep(db_poll_interval)
1878 wait -= db_poll_interval
1879 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
1880 pla_result = deep_get(db_nslcmop, ('_admin', 'pla'))
1881
1882 if not pla_result:
1883 raise LcmException("Placement timeout for nslcmopId={}".format(nslcmop_id))
1884
1885 for pla_vnf in pla_result['vnf']:
1886 vnfr = db_vnfrs.get(pla_vnf['member-vnf-index'])
1887 if not pla_vnf.get('vimAccountId') or not vnfr:
1888 continue
1889 modified = True
1890 self.db.set_one("vnfrs", {"_id": vnfr["_id"]}, {"vim-account-id": pla_vnf['vimAccountId']})
1891 # Modifies db_vnfrs
1892 vnfr["vim-account-id"] = pla_vnf['vimAccountId']
1893 return modified
1894
def update_nsrs_with_pla_result(self, params):
    """
    Store a placement result at the "_admin.pla" entry of the related nslcmops database record.

    :param params: dictionary whose 'placement' entry is stored. The record to update is identified by
        placement.nslcmopId
    :return: None. Any failure is only logged as a warning, it is never raised
    """
    # defined before the 'try' so that the message of the 'except' can be composed whatever fails
    nslcmop_id = None
    try:
        nslcmop_id = deep_get(params, ('placement', 'nslcmopId'))
        self.update_db_2("nslcmops", nslcmop_id, {"_admin.pla": params.get('placement')})
    except Exception as e:
        # Logger.warn is a deprecated alias of Logger.warning
        self.logger.warning('Update failed for nslcmop_id={}:{}'.format(nslcmop_id, e))
1901
async def instantiate(self, nsr_id, nslcmop_id):
    """
    Instantiate (deploy) a network service.

    Takes the HA lock of the operation, reads the needed records from database (nslcmops, nsrs, nsds, vnfrs,
    vnfds), marks the ns and its vnfs as INSTANTIATED and launches the deployment of KDUs, the deployment at
    VIM (a task running instantiate_RO) and the execution environments of every VNF, VDU, KDU and NS
    configuration found at the descriptors (by means of _deploy_n2vc). Finally, also in case of failure, waits
    for the launched tasks, writes the result at nsrs and nslcmops and sends the "instantiated" message to the
    "ns" topic.

    :param nsr_id: ns instance to deploy
    :param nslcmop_id: operation to run
    :return: None. Exceptions raised while deploying are captured and reported by means of the status
        written at database
    """

    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
    if not task_is_locked_by_me:
        self.logger.debug('instantiate() task is not locked by me, ns={}'.format(nsr_id))
        return

    logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")

    # get all needed from database

    # database nsrs record
    db_nsr = None

    # database nslcmops record
    db_nslcmop = None

    # update operation on nsrs
    db_nsr_update = {}
    # update operation on nslcmops
    db_nslcmop_update = {}

    nslcmop_operation_state = None
    db_vnfrs = {}     # vnf's info indexed by member-index
    # n2vc_info = {}
    tasks_dict_info = {}  # from task to info text
    exc = None
    error_list = []
    stage = ['Stage 1/5: preparation of the environment.', "Waiting for previous operations to terminate.", ""]
    # ^ stage, step, VIM progress
    try:
        # wait for any previous tasks in process
        await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)

        stage[1] = "Sync filesystem from database."
        self.fs.sync()  # TODO, make use of partial sync, only for the needed packages

        # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
        stage[1] = "Reading from database."
        # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
        db_nsr_update["detailed-status"] = "creating"
        db_nsr_update["operational-status"] = "init"
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state="BUILDING",
            current_operation="INSTANTIATING",
            current_operation_id=nslcmop_id,
            other_update=db_nsr_update
        )
        self._write_op_status(
            op_id=nslcmop_id,
            stage=stage,
            queuePosition=0
        )

        # read from db: operation
        stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        ns_params = db_nslcmop.get("operationParams")
        if ns_params and ns_params.get("timeout_ns_deploy"):
            timeout_ns_deploy = ns_params["timeout_ns_deploy"]
        else:
            timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy)

        # read from db: ns
        stage[1] = "Getting nsr={} from db.".format(nsr_id)
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        stage[1] = "Getting nsd={} from db.".format(db_nsr["nsd-id"])
        nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
        db_nsr["nsd"] = nsd
        # nsr_name = db_nsr["name"]   # TODO short-name??

        # read from db: vnf's of this ns
        stage[1] = "Getting vnfrs from db."
        self.logger.debug(logging_text + stage[1])
        db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})

        # read from db: vnfd's for every vnf
        db_vnfds_ref = {}     # every vnfd data indexed by vnf name
        db_vnfds = {}         # every vnfd data indexed by vnf id
        db_vnfds_index = {}   # every vnfd data indexed by vnf member-index

        # for each vnf in ns, read vnfd
        for vnfr in db_vnfrs_list:
            db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr   # vnf's dict indexed by member-index: '1', '2', etc
            vnfd_id = vnfr["vnfd-id"]                       # vnfd uuid for this vnf
            vnfd_ref = vnfr["vnfd-ref"]                     # vnfd name for this vnf

            # if we haven't this vnfd, read it from db
            if vnfd_id not in db_vnfds:
                # read from db
                stage[1] = "Getting vnfd={} id='{}' from db.".format(vnfd_id, vnfd_ref)
                self.logger.debug(logging_text + stage[1])
                vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})

                # store vnfd
                db_vnfds_ref[vnfd_ref] = vnfd     # vnfd's indexed by name
                db_vnfds[vnfd_id] = vnfd          # vnfd's indexed by id
            db_vnfds_index[vnfr["member-vnf-index-ref"]] = db_vnfds[vnfd_id]  # vnfd's indexed by member-index

        # Get or generates the _admin.deployed.VCA list
        vca_deployed_list = None
        if db_nsr["_admin"].get("deployed"):
            vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
        if vca_deployed_list is None:
            vca_deployed_list = []
            configuration_status_list = []
            db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
            db_nsr_update["configurationStatus"] = configuration_status_list
            # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
            populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
        elif isinstance(vca_deployed_list, dict):
            # maintain backward compatibility. Change a dict to list at database
            vca_deployed_list = list(vca_deployed_list.values())
            db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
            populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)

        if not isinstance(deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list):
            populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
            db_nsr_update["_admin.deployed.RO.vnfd"] = []

        # set state to INSTANTIATED. When instantiated NBI will not delete directly
        db_nsr_update["_admin.nsState"] = "INSTANTIATED"
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        self.db.set_list("vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "INSTANTIATED"})

        # n2vc_redesign STEP 2 Deploy Network Scenario
        stage[0] = 'Stage 2/5: deployment of KDUs, VMs and execution environments.'
        self._write_op_status(
            op_id=nslcmop_id,
            stage=stage
        )

        stage[1] = "Deploying KDUs."
        # self.logger.debug(logging_text + "Before deploy_kdus")
        # Call to deploy_kdus in case exists the "vdu:kdu" param
        await self.deploy_kdus(
            logging_text=logging_text,
            nsr_id=nsr_id,
            nslcmop_id=nslcmop_id,
            db_vnfrs=db_vnfrs,
            db_vnfds=db_vnfds,
            task_instantiation_info=tasks_dict_info,
        )

        stage[1] = "Getting VCA public key."
        # n2vc_redesign STEP 1 Get VCA public ssh-key
        # feature 1429. Add n2vc public key to needed VMs
        n2vc_key = self.n2vc.get_public_key()
        n2vc_key_list = [n2vc_key]
        if self.vca_config.get("public_key"):
            n2vc_key_list.append(self.vca_config["public_key"])

        stage[1] = "Deploying NS at VIM."
        task_ro = asyncio.ensure_future(
            self.instantiate_RO(
                logging_text=logging_text,
                nsr_id=nsr_id,
                nsd=nsd,
                db_nsr=db_nsr,
                db_nslcmop=db_nslcmop,
                db_vnfrs=db_vnfrs,
                db_vnfds_ref=db_vnfds_ref,
                n2vc_key_list=n2vc_key_list,
                stage=stage
            )
        )
        self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
        tasks_dict_info[task_ro] = "Deploying at VIM"

        # n2vc_redesign STEP 3 to 6 Deploy N2VC
        stage[1] = "Deploying Execution Environments."
        self.logger.debug(logging_text + stage[1])

        nsi_id = None  # TODO put nsi_id when this nsr belongs to a NSI
        # get_iterable() returns a value from a dict or empty tuple if key does not exist
        for c_vnf in get_iterable(nsd, "constituent-vnfd"):
            vnfd_id = c_vnf["vnfd-id-ref"]
            vnfd = db_vnfds_ref[vnfd_id]
            member_vnf_index = str(c_vnf["member-vnf-index"])
            db_vnfr = db_vnfrs[member_vnf_index]
            base_folder = vnfd["_admin"]["storage"]
            vdu_id = None
            vdu_index = 0
            vdu_name = None
            kdu_name = None

            # Get additional parameters
            deploy_params = {"OSM": self._get_osm_params(db_vnfr)}
            if db_vnfr.get("additionalParamsForVnf"):
                deploy_params.update(self._format_additional_params(db_vnfr["additionalParamsForVnf"].copy()))

            descriptor_config = vnfd.get("vnf-configuration")
            if descriptor_config:
                self._deploy_n2vc(
                    logging_text=logging_text + "member_vnf_index={} ".format(member_vnf_index),
                    db_nsr=db_nsr,
                    db_vnfr=db_vnfr,
                    nslcmop_id=nslcmop_id,
                    nsr_id=nsr_id,
                    nsi_id=nsi_id,
                    vnfd_id=vnfd_id,
                    vdu_id=vdu_id,
                    kdu_name=kdu_name,
                    member_vnf_index=member_vnf_index,
                    vdu_index=vdu_index,
                    vdu_name=vdu_name,
                    deploy_params=deploy_params,
                    descriptor_config=descriptor_config,
                    base_folder=base_folder,
                    task_instantiation_info=tasks_dict_info,
                    stage=stage
                )

            # Deploy charms for each VDU that supports one.
            for vdud in get_iterable(vnfd, 'vdu'):
                vdu_id = vdud["id"]
                descriptor_config = vdud.get('vdu-configuration')
                vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
                # vdur is None when the vnfr contains no record for this vdu
                if vdur and vdur.get("additionalParams"):
                    deploy_params_vdu = self._format_additional_params(vdur["additionalParams"])
                else:
                    # use a copy: the "OSM" entry set below is specific of this VDU and it must not replace the
                    # one of the VNF level parameters, that may have been already provided to _deploy_n2vc
                    deploy_params_vdu = deploy_params.copy()
                deploy_params_vdu["OSM"] = self._get_osm_params(db_vnfr, vdu_id, vdu_count_index=0)
                if descriptor_config:
                    vdu_name = None
                    kdu_name = None
                    for vdu_index in range(int(vdud.get("count", 1))):
                        # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
                        self._deploy_n2vc(
                            logging_text=logging_text + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
                                member_vnf_index, vdu_id, vdu_index),
                            db_nsr=db_nsr,
                            db_vnfr=db_vnfr,
                            nslcmop_id=nslcmop_id,
                            nsr_id=nsr_id,
                            nsi_id=nsi_id,
                            vnfd_id=vnfd_id,
                            vdu_id=vdu_id,
                            kdu_name=kdu_name,
                            member_vnf_index=member_vnf_index,
                            vdu_index=vdu_index,
                            vdu_name=vdu_name,
                            deploy_params=deploy_params_vdu,
                            descriptor_config=descriptor_config,
                            base_folder=base_folder,
                            task_instantiation_info=tasks_dict_info,
                            stage=stage
                        )
            for kdud in get_iterable(vnfd, 'kdu'):
                kdu_name = kdud["name"]
                descriptor_config = kdud.get('kdu-configuration')
                if descriptor_config:
                    vdu_id = None
                    vdu_index = 0
                    vdu_name = None
                    kdur = next(x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name)
                    deploy_params_kdu = {"OSM": self._get_osm_params(db_vnfr)}
                    if kdur.get("additionalParams"):
                        # merge (not replace), otherwise the "OSM" entry is lost
                        deploy_params_kdu.update(self._format_additional_params(kdur["additionalParams"]))

                    self._deploy_n2vc(
                        logging_text=logging_text,
                        db_nsr=db_nsr,
                        db_vnfr=db_vnfr,
                        nslcmop_id=nslcmop_id,
                        nsr_id=nsr_id,
                        nsi_id=nsi_id,
                        vnfd_id=vnfd_id,
                        vdu_id=vdu_id,
                        kdu_name=kdu_name,
                        member_vnf_index=member_vnf_index,
                        vdu_index=vdu_index,
                        vdu_name=vdu_name,
                        deploy_params=deploy_params_kdu,
                        descriptor_config=descriptor_config,
                        base_folder=base_folder,
                        task_instantiation_info=tasks_dict_info,
                        stage=stage
                    )

        # Check if this NS has a charm configuration
        descriptor_config = nsd.get("ns-configuration")
        if descriptor_config and descriptor_config.get("juju"):
            vnfd_id = None
            db_vnfr = None
            member_vnf_index = None
            vdu_id = None
            kdu_name = None
            vdu_index = 0
            vdu_name = None

            # Get additional parameters
            deploy_params = {"OSM": self._get_osm_params(db_vnfr)}
            if db_nsr.get("additionalParamsForNs"):
                deploy_params.update(self._format_additional_params(db_nsr["additionalParamsForNs"].copy()))
            base_folder = nsd["_admin"]["storage"]
            self._deploy_n2vc(
                logging_text=logging_text,
                db_nsr=db_nsr,
                db_vnfr=db_vnfr,
                nslcmop_id=nslcmop_id,
                nsr_id=nsr_id,
                nsi_id=nsi_id,
                vnfd_id=vnfd_id,
                vdu_id=vdu_id,
                kdu_name=kdu_name,
                member_vnf_index=member_vnf_index,
                vdu_index=vdu_index,
                vdu_name=vdu_name,
                deploy_params=deploy_params,
                descriptor_config=descriptor_config,
                base_folder=base_folder,
                task_instantiation_info=tasks_dict_info,
                stage=stage
            )

        # rest of staff will be done at finally

    except (ROclient.ROClientException, DbException, LcmException, N2VCException) as e:
        self.logger.error(logging_text + "Exit Exception while '{}': {}".format(stage[1], e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(stage[1]))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception while '{}': {}".format(stage[1], e), exc_info=True)
    finally:
        if exc:
            error_list.append(str(exc))
        try:
            # wait for pending tasks
            if tasks_dict_info:
                stage[1] = "Waiting for instantiate pending tasks."
                self.logger.debug(logging_text + stage[1])
                error_list += await self._wait_for_tasks(logging_text, tasks_dict_info, timeout_ns_deploy,
                                                         stage, nslcmop_id, nsr_id=nsr_id)
            stage[1] = stage[2] = ""
        except asyncio.CancelledError:
            error_list.append("Cancelled")
            # TODO cancel all tasks
        except Exception as e:
            # a name different from 'exc' is used: the target of 'except ... as' is deleted when the clause ends
            error_list.append(str(e))

        # update operation-status
        db_nsr_update["operational-status"] = "running"
        # let's begin with VCA 'configured' status (later we can change it)
        db_nsr_update["config-status"] = "configured"
        for task, task_name in tasks_dict_info.items():
            if not task.done() or task.cancelled() or task.exception():
                if task_name.startswith(self.task_name_deploy_vca):
                    # A N2VC task is pending
                    db_nsr_update["config-status"] = "failed"
                else:
                    # RO or KDU task is pending
                    db_nsr_update["operational-status"] = "failed"

        # update status at database
        if error_list:
            error_detail = ". ".join(error_list)
            self.logger.error(logging_text + error_detail)
            error_description_nslcmop = '{} Detail: {}'.format(stage[0], error_detail)
            error_description_nsr = 'Operation: INSTANTIATING.{}, {}'.format(nslcmop_id, stage[0])

            db_nsr_update["detailed-status"] = error_description_nsr + " Detail: " + error_detail
            db_nslcmop_update["detailed-status"] = error_detail
            nslcmop_operation_state = "FAILED"
            ns_state = "BROKEN"
        else:
            error_detail = None
            error_description_nsr = error_description_nslcmop = None
            ns_state = "READY"
            db_nsr_update["detailed-status"] = "Done"
            db_nslcmop_update["detailed-status"] = "Done"
            nslcmop_operation_state = "COMPLETED"

        # the nsrs record is only updated if it could be read before
        if db_nsr:
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=ns_state,
                current_operation="IDLE",
                current_operation_id=None,
                error_description=error_description_nsr,
                error_detail=error_detail,
                other_update=db_nsr_update
            )
        self._write_op_status(
            op_id=nslcmop_id,
            stage="",
            error_message=error_description_nslcmop,
            operation_state=nslcmop_operation_state,
            other_update=db_nslcmop_update,
        )

        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite("ns", "instantiated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                               "operationState": nslcmop_operation_state},
                                        loop=self.loop)
            except Exception as e:
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))

        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
2315
2316 async def _add_vca_relations(self, logging_text, nsr_id, vca_index: int,
2317 timeout: int = 3600, vca_type: str = None) -> bool:
2318
2319 # steps:
2320 # 1. find all relations for this VCA
2321 # 2. wait for other peers related
2322 # 3. add relations
2323
2324 try:
2325 vca_type = vca_type or "lxc_proxy_charm"
2326
2327 # STEP 1: find all relations for this VCA
2328
2329 # read nsr record
2330 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2331 nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
2332
2333 # this VCA data
2334 my_vca = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))[vca_index]
2335
2336 # read all ns-configuration relations
2337 ns_relations = list()
2338 db_ns_relations = deep_get(nsd, ('ns-configuration', 'relation'))
2339 if db_ns_relations:
2340 for r in db_ns_relations:
2341 # check if this VCA is in the relation
2342 if my_vca.get('member-vnf-index') in\
2343 (r.get('entities')[0].get('id'), r.get('entities')[1].get('id')):
2344 ns_relations.append(r)
2345
2346 # read all vnf-configuration relations
2347 vnf_relations = list()
2348 db_vnfd_list = db_nsr.get('vnfd-id')
2349 if db_vnfd_list:
2350 for vnfd in db_vnfd_list:
2351 db_vnfd = self.db.get_one("vnfds", {"_id": vnfd})
2352 db_vnf_relations = deep_get(db_vnfd, ('vnf-configuration', 'relation'))
2353 if db_vnf_relations:
2354 for r in db_vnf_relations:
2355 # check if this VCA is in the relation
2356 if my_vca.get('vdu_id') in (r.get('entities')[0].get('id'), r.get('entities')[1].get('id')):
2357 vnf_relations.append(r)
2358
2359 # if no relations, terminate
2360 if not ns_relations and not vnf_relations:
2361 self.logger.debug(logging_text + ' No relations')
2362 return True
2363
2364 self.logger.debug(logging_text + ' adding relations\n {}\n {}'.format(ns_relations, vnf_relations))
2365
2366 # add all relations
2367 start = time()
2368 while True:
2369 # check timeout
2370 now = time()
2371 if now - start >= timeout:
2372 self.logger.error(logging_text + ' : timeout adding relations')
2373 return False
2374
2375 # reload nsr from database (we need to update record: _admin.deloyed.VCA)
2376 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2377
2378 # for each defined NS relation, find the VCA's related
2379 for r in ns_relations.copy():
2380 from_vca_ee_id = None
2381 to_vca_ee_id = None
2382 from_vca_endpoint = None
2383 to_vca_endpoint = None
2384 vca_list = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))
2385 for vca in vca_list:
2386 if vca.get('member-vnf-index') == r.get('entities')[0].get('id') \
2387 and vca.get('config_sw_installed'):
2388 from_vca_ee_id = vca.get('ee_id')
2389 from_vca_endpoint = r.get('entities')[0].get('endpoint')
2390 if vca.get('member-vnf-index') == r.get('entities')[1].get('id') \
2391 and vca.get('config_sw_installed'):
2392 to_vca_ee_id = vca.get('ee_id')
2393 to_vca_endpoint = r.get('entities')[1].get('endpoint')
2394 if from_vca_ee_id and to_vca_ee_id:
2395 # add relation
2396 await self.vca_map[vca_type].add_relation(
2397 ee_id_1=from_vca_ee_id,
2398 ee_id_2=to_vca_ee_id,
2399 endpoint_1=from_vca_endpoint,
2400 endpoint_2=to_vca_endpoint)
2401 # remove entry from relations list
2402 ns_relations.remove(r)
2403 else:
2404 # check failed peers
2405 try:
2406 vca_status_list = db_nsr.get('configurationStatus')
2407 if vca_status_list:
2408 for i in range(len(vca_list)):
2409 vca = vca_list[i]
2410 vca_status = vca_status_list[i]
2411 if vca.get('member-vnf-index') == r.get('entities')[0].get('id'):
2412 if vca_status.get('status') == 'BROKEN':
2413 # peer broken: remove relation from list
2414 ns_relations.remove(r)
2415 if vca.get('member-vnf-index') == r.get('entities')[1].get('id'):
2416 if vca_status.get('status') == 'BROKEN':
2417 # peer broken: remove relation from list
2418 ns_relations.remove(r)
2419 except Exception:
2420 # ignore
2421 pass
2422
2423 # for each defined VNF relation, find the VCA's related
2424 for r in vnf_relations.copy():
2425 from_vca_ee_id = None
2426 to_vca_ee_id = None
2427 from_vca_endpoint = None
2428 to_vca_endpoint = None
2429 vca_list = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))
2430 for vca in vca_list:
2431 key_to_check = "vdu_id"
2432 if vca.get("vdu_id") is None:
2433 key_to_check = "vnfd_id"
2434 if vca.get(key_to_check) == r.get('entities')[0].get('id') and vca.get('config_sw_installed'):
2435 from_vca_ee_id = vca.get('ee_id')
2436 from_vca_endpoint = r.get('entities')[0].get('endpoint')
2437 if vca.get(key_to_check) == r.get('entities')[1].get('id') and vca.get('config_sw_installed'):
2438 to_vca_ee_id = vca.get('ee_id')
2439 to_vca_endpoint = r.get('entities')[1].get('endpoint')
2440 if from_vca_ee_id and to_vca_ee_id:
2441 # add relation
2442 await self.vca_map[vca_type].add_relation(
2443 ee_id_1=from_vca_ee_id,
2444 ee_id_2=to_vca_ee_id,
2445 endpoint_1=from_vca_endpoint,
2446 endpoint_2=to_vca_endpoint)
2447 # remove entry from relations list
2448 vnf_relations.remove(r)
2449 else:
2450 # check failed peers
2451 try:
2452 vca_status_list = db_nsr.get('configurationStatus')
2453 if vca_status_list:
2454 for i in range(len(vca_list)):
2455 vca = vca_list[i]
2456 vca_status = vca_status_list[i]
2457 if vca.get('vdu_id') == r.get('entities')[0].get('id'):
2458 if vca_status.get('status') == 'BROKEN':
2459 # peer broken: remove relation from list
2460 vnf_relations.remove(r)
2461 if vca.get('vdu_id') == r.get('entities')[1].get('id'):
2462 if vca_status.get('status') == 'BROKEN':
2463 # peer broken: remove relation from list
2464 vnf_relations.remove(r)
2465 except Exception:
2466 # ignore
2467 pass
2468
2469 # wait for next try
2470 await asyncio.sleep(5.0)
2471
2472 if not ns_relations and not vnf_relations:
2473 self.logger.debug('Relations added')
2474 break
2475
2476 return True
2477
2478 except Exception as e:
2479 self.logger.warn(logging_text + ' ERROR adding relations: {}'.format(e))
2480 return False
2481
async def _install_kdu(self, nsr_id: str, nsr_db_path: str, vnfr_data: dict, kdu_index: int, kdud: dict,
                       vnfd: dict, k8s_instance_info: dict, k8params: dict = None, timeout: int = 600):
    """
    Install a KDU in its K8s cluster, store the outcome at database and run its initial config primitives.

    :param nsr_id: _id of the "nsrs" record where the deployment info is written
    :param nsr_db_path: path inside that record for this KDU; ".kdu-instance" and ".detailed-status" are
        appended to it
    :param vnfr_data: vnfr content; only its "_id" is read, to update the "vnfrs" record
    :param kdu_index: index used to build the "kdur.<index>.*" keys written at the vnfr
    :param kdud: KDU descriptor; "service" and "kdu-configuration" are read. It is not modified
    :param vnfd: VNF descriptor; "mgmt-interface.cp" is compared against the mgmt service connection point
    :param k8s_instance_info: dictionary with "k8scluster-type", "k8scluster-uuid", "kdu-model", "kdu-name"
        and "namespace"
    :param k8params: parameters passed to the connector 'install'
    :param timeout: seconds; passed to 'install' and applied to each initial config primitive
    :return: the kdu instance returned by the connector 'install'
    :raises Exception: any failure is re-raised after trying to write the error at database
    """

    try:
        k8sclustertype = k8s_instance_info["k8scluster-type"]
        k8s_connector = self.k8scluster_map[k8sclustertype]
        # Instantiate kdu
        db_dict_install = {"collection": "nsrs",
                           "filter": {"_id": nsr_id},
                           "path": nsr_db_path}

        kdu_instance = await k8s_connector.install(
            cluster_uuid=k8s_instance_info["k8scluster-uuid"],
            kdu_model=k8s_instance_info["kdu-model"],
            atomic=True,
            params=k8params,
            db_dict=db_dict_install,
            timeout=timeout,
            kdu_name=k8s_instance_info["kdu-name"],
            namespace=k8s_instance_info["namespace"])
        self.update_db_2("nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance})

        # Obtain services to obtain management service ip
        services = await k8s_connector.get_services(
            cluster_uuid=k8s_instance_info["k8scluster-uuid"],
            kdu_instance=kdu_instance,
            namespace=k8s_instance_info["namespace"])

        # Obtain management service info (if exists)
        vnfr_update_dict = {}
        if services:
            vnfr_update_dict["kdur.{}.services".format(kdu_index)] = services
            mgmt_services = [service for service in kdud.get("service", []) if service.get("mgmt-service")]
            for mgmt_service in mgmt_services:
                for service in services:
                    if service["name"].startswith(mgmt_service["name"]):
                        # Mgmt service found, Obtain service ip
                        ip = service.get("external_ip", service.get("cluster_ip"))
                        if isinstance(ip, list) and len(ip) == 1:
                            ip = ip[0]

                        vnfr_update_dict["kdur.{}.ip-address".format(kdu_index)] = ip

                        # Check if must update also mgmt ip at the vnf
                        service_external_cp = mgmt_service.get("external-connection-point-ref")
                        if service_external_cp:
                            if deep_get(vnfd, ("mgmt-interface", "cp")) == service_external_cp:
                                vnfr_update_dict["ip-address"] = ip

                        break
                else:
                    # no deployed service matches this mgmt service name
                    self.logger.warning("Mgmt service name: {} not found".format(mgmt_service["name"]))

        vnfr_update_dict["kdur.{}.status".format(kdu_index)] = "READY"
        self.update_db_2("vnfrs", vnfr_data.get("_id"), vnfr_update_dict)

        kdu_config = kdud.get("kdu-configuration")
        if kdu_config and kdu_config.get("initial-config-primitive") and kdu_config.get("juju") is None:
            # sorted() builds a new list, so the descriptor received from the caller keeps its order
            initial_config_primitive_list = sorted(kdu_config.get("initial-config-primitive"),
                                                   key=lambda val: int(val["seq"]))

            for initial_config_primitive in initial_config_primitive_list:
                primitive_params_ = self._map_primitive_params(initial_config_primitive, {}, {})

                await asyncio.wait_for(
                    k8s_connector.exec_primitive(
                        cluster_uuid=k8s_instance_info["k8scluster-uuid"],
                        kdu_instance=kdu_instance,
                        primitive_name=initial_config_primitive["name"],
                        params=primitive_params_, db_dict={}),
                    timeout=timeout)

    except Exception as e:
        # Prepare update db with error and raise exception
        try:
            self.update_db_2("nsrs", nsr_id, {nsr_db_path + ".detailed-status": str(e)})
            self.update_db_2("vnfrs", vnfr_data.get("_id"), {"kdur.{}.status".format(kdu_index): "ERROR"})
        except Exception:
            # ignore to keep original exception
            pass
        # reraise original error
        raise

    return kdu_instance
2565
async def deploy_kdus(self, logging_text, nsr_id, nslcmop_id, db_vnfrs, db_vnfds, task_instantiation_info):
    """
    Launch one installation task per KDU found at the vnf records.

    For every "kdur" entry it resolves the K8s cluster, synchronizes the helm repos once per cluster, writes the
    entry "_admin.deployed.K8s.<index>" at the nsr record and schedules self._install_kdu as an asyncio task.

    :param logging_text: prefix for the log messages
    :param nsr_id: _id of the "nsrs" record
    :param nslcmop_id: operation id used to register the created tasks
    :param db_vnfrs: dictionary whose values are the vnf records; their "kdur" lists are iterated
    :param db_vnfds: dictionary of vnf descriptors indexed by the vnfr "vnfd-id"
    :param task_instantiation_info: dictionary filled with {task: description} for every created task
    :return: None
    :raises LcmException: on any error; unexpected exceptions are logged and wrapped, keeping them as cause
    """
    # Launch kdus if present in the descriptor

    # cache of cluster ids already resolved: {cluster_type: {k8scluster _id: id read from database}}
    k8scluster_id_2_uuic = {"helm-chart": {}, "juju-bundle": {}}

    async def _get_cluster_id(cluster_id, cluster_type):
        # the cache is only mutated, never rebound, so no 'nonlocal' declaration is required
        if cluster_id in k8scluster_id_2_uuic[cluster_type]:
            return k8scluster_id_2_uuic[cluster_type][cluster_id]

        # check if K8scluster is creating and wait look if previous tasks in process
        task_name, task_dependency = self.lcm_tasks.lookfor_related("k8scluster", cluster_id)
        if task_dependency:
            text = "Waiting for related tasks '{}' on k8scluster {} to be completed".format(task_name, cluster_id)
            self.logger.debug(logging_text + text)
            await asyncio.wait(task_dependency, timeout=3600)

        db_k8scluster = self.db.get_one("k8sclusters", {"_id": cluster_id}, fail_on_empty=False)
        if not db_k8scluster:
            raise LcmException("K8s cluster {} cannot be found".format(cluster_id))

        k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
        if not k8s_id:
            raise LcmException("K8s cluster '{}' has not been initialized for '{}'".format(cluster_id,
                                                                                           cluster_type))
        k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
        return k8s_id

    logging_text += "Deploy kdus: "
    step = ""
    # bound before the 'try' so that the 'finally' clause can always read it
    db_nsr_update = {"_admin.deployed.K8s": []}
    try:
        self.update_db_2("nsrs", nsr_id, db_nsr_update)

        index = 0
        updated_cluster_list = []

        for vnfr_data in db_vnfrs.values():
            for kdu_index, kdur in enumerate(get_iterable(vnfr_data, "kdur")):
                # Step 0: Prepare and set parameters
                desc_params = self._format_additional_params(kdur.get("additionalParams"))
                vnfd_id = vnfr_data.get('vnfd-id')
                kdud = next((kdud for kdud in db_vnfds[vnfd_id]["kdu"] if kdud["name"] == kdur["kdu-name"]), None)
                if kdud is None:
                    # explicit error instead of an opaque StopIteration with an empty step
                    raise LcmException("kdu '{}' not found at vnfd '{}'".format(kdur["kdu-name"], vnfd_id))
                namespace = kdur.get("k8s-namespace")
                if kdur.get("helm-chart"):
                    kdumodel = kdur["helm-chart"]
                    k8sclustertype = "helm-chart"
                elif kdur.get("juju-bundle"):
                    kdumodel = kdur["juju-bundle"]
                    k8sclustertype = "juju-bundle"
                else:
                    raise LcmException("kdu type for kdu='{}.{}' is neither helm-chart nor "
                                       "juju-bundle. Maybe an old NBI version is running".
                                       format(vnfr_data["member-vnf-index-ref"], kdur["kdu-name"]))
                # check if kdumodel is a file and exists
                try:
                    storage = deep_get(db_vnfds.get(vnfd_id), ('_admin', 'storage'))
                    if storage and storage.get('pkg-dir'):  # may be not present if vnfd has not artifacts
                        # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
                        filename = '{}/{}/{}s/{}'.format(storage["folder"], storage["pkg-dir"], k8sclustertype,
                                                         kdumodel)
                        if self.fs.file_exists(filename, mode='file') or self.fs.file_exists(filename, mode='dir'):
                            kdumodel = self.fs.path + filename
                except (asyncio.TimeoutError, asyncio.CancelledError):
                    raise
                except Exception:  # it is not a file
                    pass

                k8s_cluster_id = kdur["k8s-cluster"]["id"]
                step = "Synchronize repos for k8s cluster '{}'".format(k8s_cluster_id)
                cluster_uuid = await _get_cluster_id(k8s_cluster_id, k8sclustertype)

                # Synchronize repos
                if k8sclustertype == "helm-chart" and cluster_uuid not in updated_cluster_list:
                    del_repo_list, added_repo_dict = await self.k8sclusterhelm.synchronize_repos(
                        cluster_uuid=cluster_uuid)
                    if del_repo_list or added_repo_dict:
                        unset = {'_admin.helm_charts_added.' + item: None for item in del_repo_list}
                        updated = {'_admin.helm_charts_added.' +
                                   item: name for item, name in added_repo_dict.items()}
                        self.logger.debug(logging_text + "repos synchronized on k8s cluster '{}' to_delete: {}, "
                                                         "to_add: {}".format(k8s_cluster_id, del_repo_list,
                                                                             added_repo_dict))
                        self.db.set_one("k8sclusters", {"_id": k8s_cluster_id}, updated, unset=unset)
                    # remember it, so repos are synchronized only once per cluster
                    updated_cluster_list.append(cluster_uuid)

                # Instantiate kdu
                step = "Instantiating KDU {}.{} in k8s cluster {}".format(vnfr_data["member-vnf-index-ref"],
                                                                          kdur["kdu-name"], k8s_cluster_id)
                k8s_instance_info = {"kdu-instance": None,
                                     "k8scluster-uuid": cluster_uuid,
                                     "k8scluster-type": k8sclustertype,
                                     "member-vnf-index": vnfr_data["member-vnf-index-ref"],
                                     "kdu-name": kdur["kdu-name"],
                                     "kdu-model": kdumodel,
                                     "namespace": namespace}
                db_path = "_admin.deployed.K8s.{}".format(index)
                db_nsr_update[db_path] = k8s_instance_info
                self.update_db_2("nsrs", nsr_id, db_nsr_update)

                task = asyncio.ensure_future(
                    self._install_kdu(nsr_id, db_path, vnfr_data, kdu_index, kdud, db_vnfds[vnfd_id],
                                      k8s_instance_info, k8params=desc_params, timeout=600))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_KDU-{}".format(index), task)
                task_instantiation_info[task] = "Deploying KDU {}".format(kdur["kdu-name"])

                index += 1

    except (LcmException, asyncio.CancelledError):
        raise
    except Exception as e:
        msg = "Exception {} while {}: {}".format(type(e).__name__, step, e)
        if isinstance(e, (N2VCException, DbException)):
            self.logger.error(logging_text + msg)
        else:
            self.logger.critical(logging_text + msg, exc_info=True)
        # keep the original exception as the cause
        raise LcmException(msg) from e
    finally:
        if db_nsr_update:
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
2686
def _deploy_n2vc(self, logging_text, db_nsr, db_vnfr, nslcmop_id, nsr_id, nsi_id, vnfd_id, vdu_id,
                 kdu_name, member_vnf_index, vdu_index, vdu_name, deploy_params, descriptor_config,
                 base_folder, task_instantiation_info, stage):
    """
    Schedule one instantiate_N2VC task per execution environment declared at descriptor_config.

    For each execution environment it looks for the matching entry at db_nsr["_admin"]["deployed"]["VCA"].
    When there is none, a new entry is appended both at database and at the in-memory db_nsr. The created
    task is registered at self.lcm_tasks and annotated at task_instantiation_info.

    :param descriptor_config: configuration descriptor; either contains "juju" (single execution environment)
        or "execution-environment-list". Any other content is ignored
    :param task_instantiation_info: dictionary filled with {task: description}
    :return: None
    """
    self.logger.debug(logging_text + "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id, vdu_id))

    if descriptor_config.get("juju"):
        # the descriptor itself is the single (juju) execution environment
        ee_list = [descriptor_config]
    else:
        # other types as script are not supported, so an empty list is used for them
        ee_list = descriptor_config.get("execution-environment-list") or []

    for ee_item in ee_list:
        juju_section = ee_item.get('juju')
        helm_chart = ee_item.get("helm-chart")
        self.logger.debug(logging_text + "_deploy_n2vc ee_item juju={}, helm={}".format(juju_section,
                                                                                        helm_chart))
        ee_descriptor_id = ee_item.get("id")

        if juju_section:
            vca_name = juju_section.get('charm')
            if juju_section.get('cloud') == "k8s":
                vca_type = "k8s_proxy_charm"
            elif juju_section.get('proxy') is False or vca_name is None:
                vca_type = "native_charm"
            else:
                vca_type = "lxc_proxy_charm"
        elif helm_chart:
            vca_name = helm_chart
            vca_type = "helm"
        else:
            self.logger.debug(logging_text + "skipping non juju neither charm configuration")
            continue

        # look for the entry of this charm at <nsrs>._admin.deployed.VCA
        deployed_vcas = db_nsr["_admin"]["deployed"]["VCA"]
        vca_index = next(
            (position for position, deployed in enumerate(deployed_vcas)
             if deployed
             and deployed.get("member-vnf-index") == member_vnf_index
             and deployed.get("vdu_id") == vdu_id
             and deployed.get("kdu_name") == kdu_name
             and deployed.get("vdu_count_index", 0) == vdu_index
             and deployed.get("ee_descriptor_id") == ee_descriptor_id),
            None)

        if vca_index is None:
            # not found: the new entry goes right after the last existing one
            vca_index = len(deployed_vcas)

            if member_vnf_index:
                target = "vnf/{}".format(member_vnf_index)
            else:
                target = "ns"
            if vdu_id:
                target += "/vdu/{}/{}".format(vdu_id, vdu_index or 0)
            elif kdu_name:
                target += "/kdu/{}".format(kdu_name)

            vca_deployed = {
                # target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
                "target_element": target,
                "member-vnf-index": member_vnf_index,
                "vdu_id": vdu_id,
                "kdu_name": kdu_name,
                "vdu_count_index": vdu_index,
                "operational-status": "init",  # TODO revise
                "detailed-status": "",  # TODO revise
                "step": "initial-deploy",  # TODO revise
                "vnfd_id": vnfd_id,
                "vdu_name": vdu_name,
                "type": vca_type,
                "ee_descriptor_id": ee_descriptor_id
            }

            # create VCA and configurationStatus in db
            self.update_db_2("nsrs", nsr_id, {
                "_admin.deployed.VCA.{}".format(vca_index): vca_deployed,
                "configurationStatus.{}".format(vca_index): dict()
            })
            deployed_vcas.append(vca_deployed)

        # Launch task
        n2vc_coroutine = self.instantiate_N2VC(
            logging_text=logging_text,
            vca_index=vca_index,
            nsi_id=nsi_id,
            db_nsr=db_nsr,
            db_vnfr=db_vnfr,
            vdu_id=vdu_id,
            kdu_name=kdu_name,
            vdu_index=vdu_index,
            deploy_params=deploy_params,
            config_descriptor=descriptor_config,
            base_folder=base_folder,
            nslcmop_id=nslcmop_id,
            stage=stage,
            vca_type=vca_type,
            vca_name=vca_name,
            ee_config_descriptor=ee_item
        )
        task_n2vc = asyncio.ensure_future(n2vc_coroutine)
        self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_N2VC-{}".format(vca_index), task_n2vc)
        task_instantiation_info[task_n2vc] = self.task_name_deploy_vca + " {}.{}".format(
            member_vnf_index or "", vdu_id or "")
2788
2789 @staticmethod
2790 def _get_terminate_config_primitive(primitive_list, vca_deployed):
2791 """ Get a sorted terminate config primitive list. In case ee_descriptor_id is present at vca_deployed,
2792 it get only those primitives for this execution envirom"""
2793
2794 primitive_list = primitive_list or []
2795 # filter primitives by ee_descriptor_id
2796 ee_descriptor_id = vca_deployed.get("ee_descriptor_id")
2797 primitive_list = [p for p in primitive_list if p.get("execution-environment-ref") == ee_descriptor_id]
2798
2799 if primitive_list:
2800 primitive_list.sort(key=lambda val: int(val['seq']))
2801
2802 return primitive_list
2803
2804 @staticmethod
2805 def _create_nslcmop(nsr_id, operation, params):
2806 """
2807 Creates a ns-lcm-opp content to be stored at database.
2808 :param nsr_id: internal id of the instance
2809 :param operation: instantiate, terminate, scale, action, ...
2810 :param params: user parameters for the operation
2811 :return: dictionary following SOL005 format
2812 """
2813 # Raise exception if invalid arguments
2814 if not (nsr_id and operation and params):
2815 raise LcmException(
2816 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided")
2817 now = time()
2818 _id = str(uuid4())
2819 nslcmop = {
2820 "id": _id,
2821 "_id": _id,
2822 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
2823 "operationState": "PROCESSING",
2824 "statusEnteredTime": now,
2825 "nsInstanceId": nsr_id,
2826 "lcmOperationType": operation,
2827 "startTime": now,
2828 "isAutomaticInvocation": False,
2829 "operationParams": params,
2830 "isCancelPending": False,
2831 "links": {
2832 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
2833 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
2834 }
2835 }
2836 return nslcmop
2837
def _format_additional_params(self, params):
    """
    Decode the values of params that are yaml text, that is, those starting with "!!yaml ".

    The dictionary received is modified in place and returned; an empty dictionary is returned when params
    is empty or None.

    :param params: dictionary of additional params, or None
    :return: the dictionary with the yaml values loaded
    """
    if not params:
        params = {}
    yaml_mark = "!!yaml "
    # only values of existing keys are replaced, so iterating while assigning is safe
    for name in params:
        content = params[name]
        if str(content).startswith(yaml_mark):
            params[name] = yaml.safe_load(content[len(yaml_mark):])
    return params
2844
def _get_terminate_primitive_params(self, seq, vnf_index):
    """
    Obtain the mapped parameters of a terminate primitive.

    :param seq: primitive descriptor; its "name" is used as primitive name
    :param vnf_index: member vnf index the primitive applies to
    :return: whatever self._map_primitive_params returns for an action without user parameters
    """
    action_params = {
        "member_vnf_index": vnf_index,
        "primitive": seq.get('name'),
        "primitive_params": {},
    }
    # no instantiation (descriptor) params are provided
    return self._map_primitive_params(seq, action_params, {})
2855
2856 # sub-operations
2857
def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
    """
    Decide whether an already registered sub-operation must be skipped or executed again.

    :param db_nslcmop: operation content; the sub-operations are read from "_admin.operations"
    :param op_index: index of the sub-operation inside that list
    :return: self.SUBOPERATION_STATUS_SKIP when its operationState is 'COMPLETED'; otherwise op_index, after
        setting its status back to 'PROCESSING' to indicate a retry
    """
    suboperations = deep_get(db_nslcmop, ('_admin', 'operations'), [])
    suboperation = suboperations[op_index]

    if suboperation.get('operationState') != 'COMPLETED':
        # c. retry executing sub-operation
        # _ns_execute_primitive() or RO.create_action() will be called from scale()
        # with arguments extracted from the sub-operation
        self._update_suboperation_status(db_nslcmop, op_index, 'PROCESSING', 'In progress')
        return op_index

    # b. Skip sub-operation
    # _ns_execute_primitive() or RO.create_action() will NOT be executed
    return self.SUBOPERATION_STATUS_SKIP
2876
def _find_suboperation(self, db_nslcmop, match):
    """
    Find the first sub-operation where all keys of a matching dictionary have the same value.

    :param db_nslcmop: operation content; the sub-operations are read from "_admin"."operations"
    :param match: dictionary with the keys and values that the sub-operation must contain
    :return: index of the matching sub-operation, or self.SUBOPERATION_STATUS_NOT_FOUND if there is no match
        or any of the arguments is empty
    """
    if not (db_nslcmop and match):
        return self.SUBOPERATION_STATUS_NOT_FOUND

    suboperations = db_nslcmop.get('_admin', {}).get('operations', [])
    matching_positions = (
        position for position, suboperation in enumerate(suboperations)
        if all(suboperation.get(key) == wanted for key, wanted in match.items())
    )
    return next(matching_positions, self.SUBOPERATION_STATUS_NOT_FOUND)
2886
def _update_suboperation_status(self, db_nslcmop, op_index, operationState, detailed_status):
    """
    Write at database the status of a sub-operation given its index.

    :param db_nslcmop: operation content; only its "_id" is read
    :param op_index: index of the sub-operation at "_admin.operations"
    :param operationState: value stored at "operationState"
    :param detailed_status: value stored at "detailed-status"
    :return: None
    """
    # Update DB for HA tasks
    status_changes = {}
    status_changes['_admin.operations.{}.operationState'.format(op_index)] = operationState
    status_changes['_admin.operations.{}.detailed-status'.format(op_index)] = detailed_status
    self.db.set_one("nslcmops",
                    q_filter={'_id': db_nslcmop['_id']},
                    update_dict=status_changes,
                    fail_on_empty=False)
2897
def _add_suboperation(self, db_nslcmop, vnf_index, vdu_id, vdu_count_index, vdu_name, primitive,
                      mapped_primitive_params, operationState=None, detailed_status=None, operationType=None,
                      RO_nsr_id=None, RO_scaling_info=None):
    """
    Add a sub-operation to the "_admin.operations" list, both at db_nslcmop and at database.

    Status and type are currently set for 'scale' sub-operations, but NOT for 'terminate' ones.

    :param db_nslcmop: operation content. Its "_admin.operations" list is created if missing and updated
    :param vnf_index: stored as 'member_vnf_index'
    :param vdu_id: stored as 'vdu_id'
    :param vdu_count_index: stored as 'vdu_count_index'
    :param vdu_name: accepted for interface compatibility; it is not stored
    :param primitive: stored as 'primitive'
    :param mapped_primitive_params: stored as 'primitive_params'
    :param operationState: optional 'operationState': 'PROCESSING' | 'COMPLETED' | 'FAILED'
    :param detailed_status: optional 'detailed-status': status message
    :param operationType: optional 'lcmOperationType'; any type, for scaling 'PRE-SCALE' | 'POST-SCALE'
    :param RO_nsr_id: optional 'RO_nsr_id'
    :param RO_scaling_info: optional 'RO_scaling_info'
    :return: index of the added sub-operation, or self.SUBOPERATION_STATUS_NOT_FOUND if db_nslcmop is empty
    """
    if not db_nslcmop:
        return self.SUBOPERATION_STATUS_NOT_FOUND

    new_op = {'member_vnf_index': vnf_index,
              'vdu_id': vdu_id,
              'vdu_count_index': vdu_count_index,
              'primitive': primitive,
              'primitive_params': mapped_primitive_params}
    # optional entries are stored only when they have content
    optional_entries = (('operationState', operationState),
                        ('detailed-status', detailed_status),
                        ('lcmOperationType', operationType),
                        ('RO_nsr_id', RO_nsr_id),
                        ('RO_scaling_info', RO_scaling_info))
    new_op.update({key: value for key, value in optional_entries if value})

    # Get the "_admin" content, attaching it to db_nslcmop when missing. Otherwise the list would be kept at a
    # temporary dictionary, and a later call with the same db_nslcmop would overwrite this sub-operation
    db_nslcmop_admin = db_nslcmop.get('_admin')
    if db_nslcmop_admin is None:
        db_nslcmop_admin = db_nslcmop['_admin'] = {}

    # Create or append to the "_admin.operations" list
    op_list = db_nslcmop_admin.get('operations')
    if not op_list:
        # No existing operations, create key 'operations' with current operation as first list element
        op_list = db_nslcmop_admin['operations'] = [new_op]
    else:
        # Existing operations, append operation to list
        op_list.append(new_op)

    self.update_db_2("nslcmops", db_nslcmop['_id'], {'_admin.operations': op_list})
    return len(op_list) - 1
2941
2942 # Helper methods for scale() sub-operations
2943
def _check_or_add_scale_suboperation(self, db_nslcmop, vnf_index, vnf_config_primitive, primitive_params,
                                     operationType, RO_nsr_id=None, RO_scaling_info=None):
    """
    Register a pre-scale/post-scale/RO scaling sub-operation, or tell what to do with an existing one.

    Three different cases are checked:
        a. New: first time execution, the sub-operation is added with status 'PROCESSING'
        b. Skip: the sub-operation exists and its operationState == 'COMPLETED'
        c. Retry: the sub-operation exists and its operationState != 'COMPLETED'

    :param db_nslcmop: operation content holding the sub-operations
    :param vnf_index: member vnf index
    :param vnf_config_primitive: primitive name (ignored for RO scaling)
    :param primitive_params: primitive parameters (ignored for RO scaling)
    :param operationType: 'PRE-SCALE' | 'POST-SCALE'; replaced by 'SCALE-RO' for RO scaling
    :param RO_nsr_id: RO ns id; RO scaling is assumed only when this and RO_scaling_info are provided
    :param RO_scaling_info: RO scaling information
    :return: self.SUBOPERATION_STATUS_NEW for (a); for (b) and (c) what self._retry_or_skip_suboperation returns
    """
    is_ro_scaling = bool(RO_nsr_id and RO_scaling_info)

    # Find this sub-operation
    if is_ro_scaling:
        operationType = 'SCALE-RO'
        match = {
            'member_vnf_index': vnf_index,
            'RO_nsr_id': RO_nsr_id,
            'RO_scaling_info': RO_scaling_info,
        }
    else:
        match = {
            'member_vnf_index': vnf_index,
            'primitive': vnf_config_primitive,
            'primitive_params': primitive_params,
            'lcmOperationType': operationType
        }
    op_index = self._find_suboperation(db_nslcmop, match)

    if op_index != self.SUBOPERATION_STATUS_NOT_FOUND:
        # b. or c. the sub-operation already exists: skip it if completed, retry it otherwise
        return self._retry_or_skip_suboperation(db_nslcmop, op_index)

    # a. New sub-operation: it does not exist, add it.
    # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
    if is_ro_scaling:
        vnf_config_primitive = primitive_params = None
    else:
        RO_nsr_id = RO_scaling_info = None

    # vdu_id, vdu_count_index and vdu_name are None for all kind of scaling.
    # The sub-operation starts as 'PROCESSING' / 'In progress'
    self._add_suboperation(db_nslcmop,
                           vnf_index,
                           None,
                           None,
                           None,
                           vnf_config_primitive,
                           primitive_params,
                           'PROCESSING',
                           'In progress',
                           operationType,
                           RO_nsr_id,
                           RO_scaling_info)
    return self.SUBOPERATION_STATUS_NEW
3002
3003 # Function to return execution_environment id
3004
def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list):
    """
    Return the execution environment id of the deployed VCA that belongs to a vnf/vdu.

    :param vnf_index: value to match against the VCA "member-vnf-index"
    :param vdu_id: value to match against the VCA "vdu_id"
    :param vca_deployed_list: list of deployed VCA dictionaries; empty (None) entries are skipped
    :return: "ee_id" of the first matching VCA, or None if there is no match
    """
    # TODO vdu_index_count
    for vca in vca_deployed_list:
        if not vca:
            # the deployed list may contain empty entries, as _deploy_n2vc also assumes
            continue
        if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id:
            return vca["ee_id"]
    return None
3010
async def destroy_N2VC(self, logging_text, db_nslcmop, vca_deployed, config_descriptor,
                       vca_index, destroy_ee=True, exec_primitives=True):
    """
    Execute the terminate primitives of a VCA, remove its prometheus jobs and destroy its execution
    environment (the latter only if destroy_ee=True)
    :param logging_text: prefix for the log messages
    :param db_nslcmop: operation content; sub-operations are added to it and "nsInstanceId" is read
    :param vca_deployed: Dictionary of deployment info at db_nsr._admin.deployed.VCA.<INDEX>
    :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
    :param vca_index: index in the database _admin.deployed.VCA
    :param destroy_ee: False to not destroy it here, because all of them will be destroyed at once
    :param exec_primitives: False to not execute terminate primitives, because the config is not completed or
        has not executed properly
    :return: None or exception
    """

    self.logger.debug(
        logging_text + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
            vca_index, vca_deployed, config_descriptor, destroy_ee
        )
    )

    vca_type = vca_deployed.get("type", "lxc_proxy_charm")

    # execute terminate_primitives
    if exec_primitives:
        terminate_primitives = self._get_terminate_config_primitive(
            config_descriptor.get("terminate-config-primitive"), vca_deployed)

        if terminate_primitives and vca_deployed.get("needed_terminate"):
            vnf_index = vca_deployed.get("member-vnf-index")
            vdu_id = vca_deployed.get("vdu_id")
            vdu_count_index = vca_deployed.get("vdu_count_index")
            vdu_name = vca_deployed.get("vdu_name")
            accepted_results = ['COMPLETED', 'PARTIALLY_COMPLETED']

            for seq in terminate_primitives:
                # For each sequence in list, get primitive and call _ns_execute_primitive()
                primitive = seq.get('name')
                step = "Calling terminate action for vnf_member_index={} primitive={}".format(
                    vnf_index, primitive)
                self.logger.debug(logging_text + step)
                mapped_primitive_params = self._get_terminate_primitive_params(seq, vnf_index)

                # Add sub-operation
                self._add_suboperation(db_nslcmop, vnf_index, vdu_id, vdu_count_index, vdu_name, primitive,
                                       mapped_primitive_params)
                # Sub-operations: Call _ns_execute_primitive() instead of action()
                try:
                    result, result_detail = await self._ns_execute_primitive(
                        vca_deployed["ee_id"], primitive, mapped_primitive_params, vca_type=vca_type)
                except LcmException:
                    # this happens when VCA is not deployed. In this case it is not needed to terminate
                    continue
                if result not in accepted_results:
                    raise LcmException("terminate_primitive {} for vnf_member_index={} fails with "
                                       "error {}".format(seq.get("name"), vnf_index, result_detail))

            # set that this VCA do not need terminated
            needed_terminate_path = "_admin.deployed.VCA.{}.needed_terminate".format(vca_index)
            self.update_db_2("nsrs", db_nslcmop["nsInstanceId"], {needed_terminate_path: False})

    prometheus_jobs = vca_deployed.get("prometheus_jobs")
    if prometheus_jobs and self.prometheus:
        await self.prometheus.update(remove_jobs=vca_deployed["prometheus_jobs"])

    if destroy_ee:
        await self.vca_map[vca_type].delete_execution_environment(vca_deployed["ee_id"])
3081
async def _delete_all_N2VC(self, db_nsr: dict):
    """
    Delete at once the whole N2VC namespace of a ns instance, tracking it at the configuration status.

    The status of all the configurations is written as 'TERMINATING' before the deletion and as 'DELETED'
    afterwards. A namespace that is not found is considered already deleted.

    :param db_nsr: ns record; its "_id" preceded by a dot is the namespace to delete
    :return: None
    """
    self._write_all_config_status(db_nsr=db_nsr, status='TERMINATING')
    ns_namespace = "." + db_nsr["_id"]
    try:
        await self.n2vc.delete_namespace(
            namespace=ns_namespace,
            total_timeout=self.timeout_charm_delete,
        )
    except N2VCNotFound:
        # already deleted. Skip
        pass
    self._write_all_config_status(db_nsr=db_nsr, status='DELETED')
3090
3091 async def _terminate_RO(self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage):
3092 """
3093 Terminates a deployment from RO
3094 :param logging_text:
3095 :param nsr_deployed: db_nsr._admin.deployed
3096 :param nsr_id:
3097 :param nslcmop_id:
3098 :param stage: list of string with the content to write on db_nslcmop.detailed-status.
3099 this method will update only the index 2, but it will write on database the concatenated content of the list
3100 :return:
3101 """
3102 db_nsr_update = {}
3103 failed_detail = []
3104 ro_nsr_id = ro_delete_action = None
3105 if nsr_deployed and nsr_deployed.get("RO"):
3106 ro_nsr_id = nsr_deployed["RO"].get("nsr_id")
3107 ro_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
3108 try:
3109 if ro_nsr_id:
3110 stage[2] = "Deleting ns from VIM."
3111 db_nsr_update["detailed-status"] = " ".join(stage)
3112 self._write_op_status(nslcmop_id, stage)
3113 self.logger.debug(logging_text + stage[2])
3114 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3115 self._write_op_status(nslcmop_id, stage)
3116 desc = await self.RO.delete("ns", ro_nsr_id)
3117 ro_delete_action = desc["action_id"]
3118 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = ro_delete_action
3119 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
3120 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
3121 if ro_delete_action:
3122 # wait until NS is deleted from VIM
3123 stage[2] = "Waiting ns deleted from VIM."
3124 detailed_status_old = None
3125 self.logger.debug(logging_text + stage[2] + " RO_id={} ro_delete_action={}".format(ro_nsr_id,
3126 ro_delete_action))
3127 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3128 self._write_op_status(nslcmop_id, stage)
3129
3130 delete_timeout = 20 * 60 # 20 minutes
3131 while delete_timeout > 0:
3132 desc = await self.RO.show(
3133 "ns",
3134 item_id_name=ro_nsr_id,
3135 extra_item="action",
3136 extra_item_id=ro_delete_action)
3137
3138 # deploymentStatus
3139 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
3140
3141 ns_status, ns_status_info = self.RO.check_action_status(desc)
3142 if ns_status == "ERROR":
3143 raise ROclient.ROClientException(ns_status_info)
3144 elif ns_status == "BUILD":
3145 stage[2] = "Deleting from VIM {}".format(ns_status_info)
3146 elif ns_status == "ACTIVE":
3147 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
3148 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
3149 break
3150 else:
3151 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
3152 if stage[2] != detailed_status_old:
3153 detailed_status_old = stage[2]
3154 db_nsr_update["detailed-status"] = " ".join(stage)
3155 self._write_op_status(nslcmop_id, stage)
3156 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3157 await asyncio.sleep(5, loop=self.loop)
3158 delete_timeout -= 5
3159 else: # delete_timeout <= 0:
3160 raise ROclient.ROClientException("Timeout waiting ns deleted from VIM")
3161
3162 except Exception as e:
3163 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3164 if isinstance(e, ROclient.ROClientException) and e.http_code == 404: # not found
3165 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
3166 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
3167 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
3168 self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(ro_nsr_id))
3169 elif isinstance(e, ROclient.ROClientException) and e.http_code == 409: # conflict
3170 failed_detail.append("delete conflict: {}".format(e))
3171 self.logger.debug(logging_text + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id, e))
3172 else:
3173 failed_detail.append("delete error: {}".format(e))
3174 self.logger.error(logging_text + "RO_ns_id={} delete error: {}".format(ro_nsr_id, e))
3175
3176 # Delete nsd
3177 if not failed_detail and deep_get(nsr_deployed, ("RO", "nsd_id")):
3178 ro_nsd_id = nsr_deployed["RO"]["nsd_id"]
3179 try:
3180 stage[2] = "Deleting nsd from RO."
3181 db_nsr_update["detailed-status"] = " ".join(stage)
3182 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3183 self._write_op_status(nslcmop_id, stage)
3184 await self.RO.delete("nsd", ro_nsd_id)
3185 self.logger.debug(logging_text + "ro_nsd_id={} deleted".format(ro_nsd_id))
3186 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
3187 except Exception as e:
3188 if isinstance(e, ROclient.ROClientException) and e.http_code == 404: # not found
3189 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
3190 self.logger.debug(logging_text + "ro_nsd_id={} already deleted".format(ro_nsd_id))
3191 elif isinstance(e, ROclient.ROClientException) and e.http_code == 409: # conflict
3192 failed_detail.append("ro_nsd_id={} delete conflict: {}".format(ro_nsd_id, e))
3193 self.logger.debug(logging_text + failed_detail[-1])
3194 else:
3195 failed_detail.append("ro_nsd_id={} delete error: {}".format(ro_nsd_id, e))
3196 self.logger.error(logging_text + failed_detail[-1])
3197
3198 if not failed_detail and deep_get(nsr_deployed, ("RO", "vnfd")):
3199 for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
3200 if not vnf_deployed or not vnf_deployed["id"]:
3201 continue
3202 try:
3203 ro_vnfd_id = vnf_deployed["id"]
3204 stage[2] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
3205 vnf_deployed["member-vnf-index"], ro_vnfd_id)
3206 db_nsr_update["detailed-status"] = " ".join(stage)
3207 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3208 self._write_op_status(nslcmop_id, stage)
3209 await self.RO.delete("vnfd", ro_vnfd_id)
3210 self.logger.debug(logging_text + "ro_vnfd_id={} deleted".format(ro_vnfd_id))
3211 db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
3212 except Exception as e:
3213 if isinstance(e, ROclient.ROClientException) and e.http_code == 404: # not found
3214 db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
3215 self.logger.debug(logging_text + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id))
3216 elif isinstance(e, ROclient.ROClientException) and e.http_code == 409: # conflict
3217 failed_detail.append("ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id, e))
3218 self.logger.debug(logging_text + failed_detail[-1])
3219 else:
3220 failed_detail.append("ro_vnfd_id={} delete error: {}".format(ro_vnfd_id, e))
3221 self.logger.error(logging_text + failed_detail[-1])
3222
3223 if failed_detail:
3224 stage[2] = "Error deleting from VIM"
3225 else:
3226 stage[2] = "Deleted from VIM"
3227 db_nsr_update["detailed-status"] = " ".join(stage)
3228 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3229 self._write_op_status(nslcmop_id, stage)
3230
3231 if failed_detail:
3232 raise LcmException("; ".join(failed_detail))
3233
async def terminate(self, nsr_id, nslcmop_id):
    """
    Terminate a NS instance. It runs in three stages: preparation (read nslcmop/nsr/vnfds from database), execution
    of terminating primitives over each deployed VCA, and deletion of execution environments, KDUs and the
    deployment at RO/VIM. The deletion tasks of the third stage are launched inside the 'try' and awaited inside the
    'finally', where the final status is also written to database and notified with self.msg.aiowrite.
    :param nsr_id: _id of the "nsrs" record of the NS to terminate
    :param nslcmop_id: _id of the "nslcmops" record of this operation
    :return: None. Returns immediately, doing nothing, if the HA lock for this operation is not obtained
    """
    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
    if not task_is_locked_by_me:
        return

    logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    timeout_ns_terminate = self.timeout_ns_terminate
    db_nsr = None
    db_nslcmop = None
    operation_params = None
    exc = None
    error_list = []  # annotates all failed error messages
    db_nslcmop_update = {}
    autoremove = False  # autoremove after terminated
    tasks_dict_info = {}  # maps each launched asyncio task to a human readable description
    db_nsr_update = {}
    stage = ["Stage 1/3: Preparing task.", "Waiting for previous operations to terminate.", ""]
    # ^ contains [stage, step, VIM-status]
    try:
        # wait for any previous tasks in process
        await self.lcm_tasks.waitfor_related_HA("ns", 'nslcmops', nslcmop_id)

        stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        operation_params = db_nslcmop.get("operationParams") or {}
        # the operation may override the default terminate timeout
        if operation_params.get("timeout_ns_terminate"):
            timeout_ns_terminate = operation_params["timeout_ns_terminate"]
        stage[1] = "Getting nsr={} from db.".format(nsr_id)
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

        db_nsr_update["operational-status"] = "terminating"
        db_nsr_update["config-status"] = "terminating"
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state="TERMINATING",
            current_operation="TERMINATING",
            current_operation_id=nslcmop_id,
            other_update=db_nsr_update
        )
        self._write_op_status(
            op_id=nslcmop_id,
            queuePosition=0,
            stage=stage
        )
        # work over a copy, so that the record read from database is not modified
        nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed")) or {}
        if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
            # nothing to delete; the 'finally' block marks the operation as COMPLETED
            return

        stage[1] = "Getting vnf descriptors from db."
        db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
        db_vnfds_from_id = {}
        db_vnfds_from_member_index = {}
        # Loop over VNFRs. Each vnfd is read from database only once
        for vnfr in db_vnfrs_list:
            vnfd_id = vnfr["vnfd-id"]
            if vnfd_id not in db_vnfds_from_id:
                vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
                db_vnfds_from_id[vnfd_id] = vnfd
            db_vnfds_from_member_index[vnfr["member-vnf-index-ref"]] = db_vnfds_from_id[vnfd_id]

        # Destroy individual execution environments when there are terminating primitives.
        # Rest of EE will be deleted at once
        # TODO - check before calling _destroy_N2VC
        # if not operation_params.get("skip_terminate_primitives"):#
        # or not vca.get("needed_terminate"):
        stage[0] = "Stage 2/3 execute terminating primitives."
        self.logger.debug(logging_text + stage[0])
        stage[1] = "Looking execution environment that needs terminate."
        self.logger.debug(logging_text + stage[1])
        for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")):
            config_descriptor = None
            if not vca or not vca.get("ee_id"):
                continue
            # select the configuration descriptor (ns, vdu, kdu or vnf level) this VCA belongs to
            if not vca.get("member-vnf-index"):
                # ns
                config_descriptor = db_nsr.get("ns-configuration")
            elif vca.get("vdu_id"):
                db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                vdud = next((vdu for vdu in db_vnfd.get("vdu", ()) if vdu["id"] == vca.get("vdu_id")), None)
                if vdud:
                    config_descriptor = vdud.get("vdu-configuration")
            elif vca.get("kdu_name"):
                db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                kdud = next((kdu for kdu in db_vnfd.get("kdu", ()) if kdu["name"] == vca.get("kdu_name")), None)
                if kdud:
                    config_descriptor = kdud.get("kdu-configuration")
            else:
                config_descriptor = db_vnfds_from_member_index[vca["member-vnf-index"]].get("vnf-configuration")
            vca_type = vca.get("type")
            exec_terminate_primitives = (not operation_params.get("skip_terminate_primitives") and
                                         vca.get("needed_terminate"))
            # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
            # pending native charms
            destroy_ee = vca_type in ("helm", "native_charm")
            task = asyncio.ensure_future(
                self.destroy_N2VC(logging_text, db_nslcmop, vca, config_descriptor, vca_index,
                                  destroy_ee, exec_terminate_primitives))
            tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))

        # wait for pending tasks of terminate primitives
        if tasks_dict_info:
            self.logger.debug(logging_text + 'Waiting for tasks {}'.format(list(tasks_dict_info.keys())))
            error_list = await self._wait_for_tasks(logging_text, tasks_dict_info,
                                                    min(self.timeout_charm_delete, timeout_ns_terminate),
                                                    stage, nslcmop_id)
            tasks_dict_info.clear()
            if error_list:
                # stop here; the 'finally' block reports the operation as FAILED using error_list
                return  # raise LcmException("; ".join(error_list))

        # remove All execution environments at once
        stage[0] = "Stage 3/3 delete all."

        if nsr_deployed.get("VCA"):
            stage[1] = "Deleting all execution environments."
            self.logger.debug(logging_text + stage[1])
            task_delete_ee = asyncio.ensure_future(asyncio.wait_for(self._delete_all_N2VC(db_nsr=db_nsr),
                                                                    timeout=self.timeout_charm_delete))
            # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
            tasks_dict_info[task_delete_ee] = "Terminating all VCA"

        # Delete from k8scluster
        stage[1] = "Deleting KDUs."
        self.logger.debug(logging_text + stage[1])
        for kdu in get_iterable(nsr_deployed, "K8s"):
            if not kdu or not kdu.get("kdu-instance"):
                continue
            kdu_instance = kdu.get("kdu-instance")
            if kdu.get("k8scluster-type") in self.k8scluster_map:
                task_delete_kdu_instance = asyncio.ensure_future(
                    self.k8scluster_map[kdu["k8scluster-type"]].uninstall(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu_instance))
            else:
                self.logger.error(logging_text + "Unknown k8s deployment type {}".
                                  format(kdu.get("k8scluster-type")))
                continue
            tasks_dict_info[task_delete_kdu_instance] = "Terminating KDU '{}'".format(kdu.get("kdu-name"))

        # remove from RO
        stage[1] = "Deleting ns from VIM."
        if self.ng_ro:
            task_delete_ro = asyncio.ensure_future(
                self._terminate_ng_ro(logging_text, nsr_deployed, nsr_id, nslcmop_id, stage))
        else:
            task_delete_ro = asyncio.ensure_future(
                self._terminate_RO(logging_text, nsr_deployed, nsr_id, nslcmop_id, stage))
        tasks_dict_info[task_delete_ro] = "Removing deployment from VIM"

        # rest of stuff will be done at finally

    except (ROclient.ROClientException, DbException, LcmException, N2VCException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(stage[1]))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception while '{}': {}".format(stage[1], e), exc_info=True)
    finally:
        if exc:
            error_list.append(str(exc))
        try:
            # wait for pending tasks
            if tasks_dict_info:
                stage[1] = "Waiting for terminate pending tasks."
                self.logger.debug(logging_text + stage[1])
                error_list += await self._wait_for_tasks(logging_text, tasks_dict_info, timeout_ns_terminate,
                                                         stage, nslcmop_id)
            stage[1] = stage[2] = ""
        except asyncio.CancelledError:
            error_list.append("Cancelled")
            # TODO cancel all tasks
        except Exception as e:
            # a different name than 'exc' is used, because 'except ... as exc' would unbind the outer variable
            error_list.append(str(e))
        # update status at database
        if error_list:
            error_detail = "; ".join(error_list)
            # self.logger.error(logging_text + error_detail)
            error_description_nslcmop = '{} Detail: {}'.format(stage[0], error_detail)
            error_description_nsr = 'Operation: TERMINATING.{}, {}.'.format(nslcmop_id, stage[0])

            db_nsr_update["operational-status"] = "failed"
            db_nsr_update["detailed-status"] = error_description_nsr + " Detail: " + error_detail
            db_nslcmop_update["detailed-status"] = error_detail
            nslcmop_operation_state = "FAILED"
            ns_state = "BROKEN"
        else:
            error_detail = None
            error_description_nsr = error_description_nslcmop = None
            ns_state = "NOT_INSTANTIATED"
            db_nsr_update["operational-status"] = "terminated"
            db_nsr_update["detailed-status"] = "Done"
            db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
            db_nslcmop_update["detailed-status"] = "Done"
            nslcmop_operation_state = "COMPLETED"

        if db_nsr:
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=ns_state,
                current_operation="IDLE",
                current_operation_id=None,
                error_description=error_description_nsr,
                error_detail=error_detail,
                other_update=db_nsr_update
            )
        self._write_op_status(
            op_id=nslcmop_id,
            stage="",
            error_message=error_description_nslcmop,
            operation_state=nslcmop_operation_state,
            other_update=db_nslcmop_update,
        )
        if ns_state == "NOT_INSTANTIATED":
            # propagate the final state to the vnfrs of this ns; a failure here is only logged
            try:
                self.db.set_list("vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "NOT_INSTANTIATED"})
            except DbException as e:
                self.logger.warning(logging_text + 'Error writing VNFR status for nsr-id-ref: {} -> {}'.
                                    format(nsr_id, e))
        if operation_params:
            autoremove = operation_params.get("autoremove", False)
        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite("ns", "terminated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                             "operationState": nslcmop_operation_state,
                                                             "autoremove": autoremove},
                                        loop=self.loop)
            except Exception as e:
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))

        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
3473
3474 async def _wait_for_tasks(self, logging_text, created_tasks_info, timeout, stage, nslcmop_id, nsr_id=None):
3475 time_start = time()
3476 error_detail_list = []
3477 error_list = []
3478 pending_tasks = list(created_tasks_info.keys())
3479 num_tasks = len(pending_tasks)
3480 num_done = 0
3481 stage[1] = "{}/{}.".format(num_done, num_tasks)
3482 self._write_op_status(nslcmop_id, stage)
3483 while pending_tasks:
3484 new_error = None
3485 _timeout = timeout + time_start - time()
3486 done, pending_tasks = await asyncio.wait(pending_tasks, timeout=_timeout,
3487 return_when=asyncio.FIRST_COMPLETED)
3488 num_done += len(done)
3489 if not done: # Timeout
3490 for task in pending_tasks:
3491 new_error = created_tasks_info[task] + ": Timeout"
3492 error_detail_list.append(new_error)
3493 error_list.append(new_error)
3494 break
3495 for task in done:
3496 if task.cancelled():
3497 exc = "Cancelled"
3498 else:
3499 exc = task.exception()
3500 if exc:
3501 if isinstance(exc, asyncio.TimeoutError):
3502 exc = "Timeout"
3503 new_error = created_tasks_info[task] + ": {}".format(exc)
3504 error_list.append(created_tasks_info[task])
3505 error_detail_list.append(new_error)
3506 if isinstance(exc, (str, DbException, N2VCException, ROclient.ROClientException, LcmException,
3507 K8sException)):
3508 self.logger.error(logging_text + new_error)
3509 else:
3510 exc_traceback = "".join(traceback.format_exception(None, exc, exc.__traceback__))
3511 self.logger.error(logging_text + created_tasks_info[task] + exc_traceback)
3512 else:
3513 self.logger.debug(logging_text + created_tasks_info[task] + ": Done")
3514 stage[1] = "{}/{}.".format(num_done, num_tasks)
3515 if new_error:
3516 stage[1] += " Errors: " + ". ".join(error_detail_list) + "."
3517 if nsr_id: # update also nsr
3518 self.update_db_2("nsrs", nsr_id, {"errorDescription": "Error at: " + ", ".join(error_list),
3519 "errorDetail": ". ".join(error_detail_list)})
3520 self._write_op_status(nslcmop_id, stage)
3521 return error_detail_list
3522
3523 @staticmethod
3524 def _map_primitive_params(primitive_desc, params, instantiation_params):
3525 """
3526 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
3527 The default-value is used. If it is between < > it look for a value at instantiation_params
3528 :param primitive_desc: portion of VNFD/NSD that describes primitive
3529 :param params: Params provided by user
3530 :param instantiation_params: Instantiation params provided by user
3531 :return: a dictionary with the calculated params
3532 """
3533 calculated_params = {}
3534 for parameter in primitive_desc.get("parameter", ()):
3535 param_name = parameter["name"]
3536 if param_name in params:
3537 calculated_params[param_name] = params[param_name]
3538 elif "default-value" in parameter or "value" in parameter:
3539 if "value" in parameter:
3540 calculated_params[param_name] = parameter["value"]
3541 else:
3542 calculated_params[param_name] = parameter["default-value"]
3543 if isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("<") \
3544 and calculated_params[param_name].endswith(">"):
3545 if calculated_params[param_name][1:-1] in instantiation_params:
3546 calculated_params[param_name] = instantiation_params[calculated_params[param_name][1:-1]]
3547 else:
3548 raise LcmException("Parameter {} needed to execute primitive {} not provided".
3549 format(calculated_params[param_name], primitive_desc["name"]))
3550 else:
3551 raise LcmException("Parameter {} needed to execute primitive {} not provided".
3552 format(param_name, primitive_desc["name"]))
3553
3554 if isinstance(calculated_params[param_name], (dict, list, tuple)):
3555 calculated_params[param_name] = yaml.safe_dump(calculated_params[param_name], default_flow_style=True,
3556 width=256)
3557 elif isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("!!yaml "):
3558 calculated_params[param_name] = calculated_params[param_name][7:]
3559 if parameter.get("data-type") == "INTEGER":
3560 try:
3561 calculated_params[param_name] = int(calculated_params[param_name])
3562 except ValueError: # error converting string to int
3563 raise LcmException(
3564 "Parameter {} of primitive {} must be integer".format(param_name, primitive_desc["name"]))
3565 elif parameter.get("data-type") == "BOOLEAN":
3566 calculated_params[param_name] = not ((str(calculated_params[param_name])).lower() == 'false')
3567
3568 # add always ns_config_info if primitive name is config
3569 if primitive_desc["name"] == "config":
3570 if "ns_config_info" in instantiation_params:
3571 calculated_params["ns_config_info"] = instantiation_params["ns_config_info"]
3572 return calculated_params
3573
3574 def _look_for_deployed_vca(self, deployed_vca, member_vnf_index, vdu_id, vdu_count_index, kdu_name=None,
3575 ee_descriptor_id=None):
3576 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
3577 for vca in deployed_vca:
3578 if not vca:
3579 continue
3580 if member_vnf_index != vca["member-vnf-index"] or vdu_id != vca["vdu_id"]:
3581 continue
3582 if vdu_count_index is not None and vdu_count_index != vca["vdu_count_index"]:
3583 continue
3584 if kdu_name and kdu_name != vca["kdu_name"]:
3585 continue
3586 if ee_descriptor_id and ee_descriptor_id != vca["ee_descriptor_id"]:
3587 continue
3588 break
3589 else:
3590 # vca_deployed not found
3591 raise LcmException("charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
3592 " is not deployed".format(member_vnf_index, vdu_id, vdu_count_index, kdu_name,
3593 ee_descriptor_id))
3594
3595 # get ee_id
3596 ee_id = vca.get("ee_id")
3597 vca_type = vca.get("type", "lxc_proxy_charm") # default value for backward compatibility - proxy charm
3598 if not ee_id:
3599 raise LcmException("charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
3600 "execution environment"
3601 .format(member_vnf_index, vdu_id, kdu_name, vdu_count_index))
3602 return ee_id, vca_type
3603
3604 async def _ns_execute_primitive(self, ee_id, primitive, primitive_params, retries=0,
3605 retries_interval=30, timeout=None,
3606 vca_type=None, db_dict=None) -> (str, str):
3607 try:
3608 if primitive == "config":
3609 primitive_params = {"params": primitive_params}
3610
3611 vca_type = vca_type or "lxc_proxy_charm"
3612
3613 while retries >= 0:
3614 try:
3615 output = await asyncio.wait_for(
3616 self.vca_map[vca_type].exec_primitive(
3617 ee_id=ee_id,
3618 primitive_name=primitive,
3619 params_dict=primitive_params,
3620 progress_timeout=self.timeout_progress_primitive,
3621 total_timeout=self.timeout_primitive,
3622 db_dict=db_dict),
3623 timeout=timeout or self.timeout_primitive)
3624 # execution was OK
3625 break
3626 except asyncio.CancelledError:
3627 raise
3628 except Exception as e: # asyncio.TimeoutError
3629 if isinstance(e, asyncio.TimeoutError):
3630 e = "Timeout"
3631 retries -= 1
3632 if retries >= 0:
3633 self.logger.debug('Error executing action {} on {} -> {}'.format(primitive, ee_id, e))
3634 # wait and retry
3635 await asyncio.sleep(retries_interval, loop=self.loop)
3636 else:
3637 return 'FAILED', str(e)
3638
3639 return 'COMPLETED', output
3640
3641 except (LcmException, asyncio.CancelledError):
3642 raise
3643 except Exception as e:
3644 return 'FAIL', 'Error executing action {}: {}'.format(primitive, e)
3645
async def vca_status_refresh(self, nsr_id, nslcmop_id):
    """
    Updating the vca_status with latest juju information in nsrs record
    :param: nsr_id: Id of the nsr
    :param: nslcmop_id: Id of the nslcmop
    :return: None
    """

    self.logger.debug("Task ns={} action={} Enter".format(nsr_id, nslcmop_id))
    try:
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

        # trigger one update per deployed VCA, addressed by its position at _admin.deployed.VCA
        for vca_index, _ in enumerate(db_nsr['_admin']['deployed']['VCA']):
            table, db_filter, path = "nsrs", {"_id": nsr_id}, "_admin.deployed.VCA.{}.".format(vca_index)
            await self._on_update_n2vc_db(table, db_filter, path, {})
    finally:
        # done also when something fails (the exception is propagated), so that the task is always removed
        self.logger.debug("Task ns={} action={} Exit".format(nsr_id, nslcmop_id))
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_vca_status_refresh")
3663
async def action(self, nsr_id, nslcmop_id):
    """
    Execute the primitive requested at the operationParams of the nslcmop. The target is selected with
    member_vnf_index, vdu_id and kdu_name: when several are provided vdu_id has precedence over kdu_name; when none
    is provided the primitive is looked for at ns level. The actions "upgrade", "rollback" and "status" over a KDU,
    and the primitives of a KDU without juju configuration, are executed with the k8s cluster connector; any other
    primitive is executed over the deployed execution environment.
    The result is written to database and notified with self.msg.aiowrite inside the 'finally' block.
    :param nsr_id: _id of the "nsrs" record
    :param nslcmop_id: _id of the "nslcmops" record of this operation
    :return: tuple (nslcmop_operation_state, detailed_status); or None, doing nothing, if the HA lock for this
        operation is not obtained
    """

    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
    if not task_is_locked_by_me:
        return

    logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nsr = None
    db_nslcmop = None
    db_nsr_update = {}
    db_nslcmop_update = {}
    nslcmop_operation_state = None
    error_description_nslcmop = None
    exc = None
    try:
        # wait for any previous tasks in process
        step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)

        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="RUNNING ACTION",
            current_operation_id=nslcmop_id
        )

        step = "Getting information from database"
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

        nsr_deployed = db_nsr["_admin"].get("deployed")
        vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
        vdu_id = db_nslcmop["operationParams"].get("vdu_id")
        kdu_name = db_nslcmop["operationParams"].get("kdu_name")
        vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
        primitive = db_nslcmop["operationParams"]["primitive"]
        primitive_params = db_nslcmop["operationParams"]["primitive_params"]
        timeout_ns_action = db_nslcmop["operationParams"].get("timeout_ns_action", self.timeout_primitive)

        if vnf_index:
            step = "Getting vnfr from database"
            db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
            step = "Getting vnfd from database"
            db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
        else:
            step = "Getting nsd from database"
            db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})

        # for backward compatibility
        if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
            nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
            db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

        # look for primitive
        config_primitive_desc = descriptor_configuration = None
        # KDU entry of the vnfd that matches kdu_name. Only set when the primitive is looked for at kdu level
        kdu_descriptor = None
        if vdu_id:
            for vdu in get_iterable(db_vnfd, "vdu"):
                if vdu_id == vdu["id"]:
                    descriptor_configuration = vdu.get("vdu-configuration")
                    break
        elif kdu_name:
            for kdu_desc in get_iterable(db_vnfd, "kdu"):
                if kdu_name == kdu_desc["name"]:
                    kdu_descriptor = kdu_desc
                    descriptor_configuration = kdu_desc.get("kdu-configuration")
                    break
        elif vnf_index:
            descriptor_configuration = db_vnfd.get("vnf-configuration")
        else:
            descriptor_configuration = db_nsd.get("ns-configuration")

        if descriptor_configuration and descriptor_configuration.get("config-primitive"):
            for config_primitive in descriptor_configuration["config-primitive"]:
                if config_primitive["name"] == primitive:
                    config_primitive_desc = config_primitive
                    break

        if not config_primitive_desc:
            # upgrade, rollback and status over a KDU do not need to be declared at the descriptor
            if not (kdu_name and primitive in ("upgrade", "rollback", "status")):
                raise LcmException("Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".
                                   format(primitive))
            primitive_name = primitive
            ee_descriptor_id = None
        else:
            primitive_name = config_primitive_desc.get("execution-environment-primitive", primitive)
            ee_descriptor_id = config_primitive_desc.get("execution-environment-ref")

        if vnf_index:
            if vdu_id:
                vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
                if vdur is None:
                    raise LcmException("vdu_id '{}' not found at vnfr of member_vnf_index '{}'".format(
                        vdu_id, vnf_index))
                desc_params = self._format_additional_params(vdur.get("additionalParams"))
            elif kdu_name:
                kdur = next((x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None)
                if kdur is None:
                    raise LcmException("kdu_name '{}' not found at vnfr of member_vnf_index '{}'".format(
                        kdu_name, vnf_index))
                desc_params = self._format_additional_params(kdur.get("additionalParams"))
            else:
                desc_params = self._format_additional_params(db_vnfr.get("additionalParamsForVnf"))
        else:
            desc_params = self._format_additional_params(db_nsr.get("additionalParamsForNs"))

        if kdu_name:
            # a primitive of a KDU without juju configuration is executed by the k8s connector. Without a matched
            # KDU descriptor (e.g. vdu_id was also provided) the primitive is not considered a kdu action
            kdu_action = kdu_descriptor is not None and \
                not deep_get(kdu_descriptor, ("kdu-configuration", "juju"))

        # TODO check if ns is in a proper status
        if kdu_name and (primitive_name in ("upgrade", "rollback", "status") or kdu_action):
            # kdur and desc_params already set from before
            if primitive_params:
                desc_params.update(primitive_params)
            # TODO Check if we will need something at vnf level
            for index, kdu in enumerate(get_iterable(nsr_deployed, "K8s")):
                if kdu_name == kdu["kdu-name"] and kdu["member-vnf-index"] == vnf_index:
                    break
            else:
                raise LcmException("KDU '{}' for vnf '{}' not deployed".format(kdu_name, vnf_index))

            if kdu.get("k8scluster-type") not in self.k8scluster_map:
                msg = "unknown k8scluster-type '{}'".format(kdu.get("k8scluster-type"))
                raise LcmException(msg)

            db_dict = {"collection": "nsrs",
                       "filter": {"_id": nsr_id},
                       "path": "_admin.deployed.K8s.{}".format(index)}
            self.logger.debug(logging_text + "Exec k8s {} on {}.{}".format(primitive_name, vnf_index, kdu_name))
            step = "Executing kdu {}".format(primitive_name)
            if primitive_name == "upgrade":
                if desc_params.get("kdu_model"):
                    kdu_model = desc_params.get("kdu_model")
                    del desc_params["kdu_model"]
                else:
                    # use the deployed model, removing the part after ':' when it is present
                    kdu_model = kdu.get("kdu-model")
                    parts = kdu_model.split(sep=":")
                    if len(parts) == 2:
                        kdu_model = parts[0]

                detailed_status = await asyncio.wait_for(
                    self.k8scluster_map[kdu["k8scluster-type"]].upgrade(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu.get("kdu-instance"),
                        atomic=True, kdu_model=kdu_model,
                        params=desc_params, db_dict=db_dict,
                        timeout=timeout_ns_action),
                    timeout=timeout_ns_action + 10)
                self.logger.debug(logging_text + " Upgrade of kdu {} done".format(detailed_status))
            elif primitive_name == "rollback":
                detailed_status = await asyncio.wait_for(
                    self.k8scluster_map[kdu["k8scluster-type"]].rollback(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu.get("kdu-instance"),
                        db_dict=db_dict),
                    timeout=timeout_ns_action)
            elif primitive_name == "status":
                detailed_status = await asyncio.wait_for(
                    self.k8scluster_map[kdu["k8scluster-type"]].status_kdu(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu.get("kdu-instance")),
                    timeout=timeout_ns_action)
            else:
                kdu_instance = kdu.get("kdu-instance") or "{}-{}".format(kdu["kdu-name"], nsr_id)
                params = self._map_primitive_params(config_primitive_desc, primitive_params, desc_params)

                detailed_status = await asyncio.wait_for(
                    self.k8scluster_map[kdu["k8scluster-type"]].exec_primitive(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu_instance,
                        primitive_name=primitive_name,
                        params=params, db_dict=db_dict,
                        timeout=timeout_ns_action),
                    timeout=timeout_ns_action)

            # an empty result of the connector is considered a failure
            if detailed_status:
                nslcmop_operation_state = 'COMPLETED'
            else:
                detailed_status = ''
                nslcmop_operation_state = 'FAILED'
        else:
            ee_id, vca_type = self._look_for_deployed_vca(nsr_deployed["VCA"],
                                                          member_vnf_index=vnf_index,
                                                          vdu_id=vdu_id,
                                                          vdu_count_index=vdu_count_index,
                                                          ee_descriptor_id=ee_descriptor_id)
            db_nslcmop_notif = {"collection": "nslcmops",
                                "filter": {"_id": nslcmop_id},
                                "path": "admin.VCA"}
            nslcmop_operation_state, detailed_status = await self._ns_execute_primitive(
                ee_id,
                primitive=primitive_name,
                primitive_params=self._map_primitive_params(config_primitive_desc, primitive_params, desc_params),
                timeout=timeout_ns_action,
                vca_type=vca_type,
                db_dict=db_nslcmop_notif)

        db_nslcmop_update["detailed-status"] = detailed_status
        error_description_nslcmop = detailed_status if nslcmop_operation_state == "FAILED" else ""
        self.logger.debug(logging_text + " task Done with result {} {}".format(nslcmop_operation_state,
                                                                               detailed_status))
        return  # database update is called inside finally

    except (DbException, LcmException, N2VCException, K8sException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except asyncio.TimeoutError:
        self.logger.error(logging_text + "Timeout while '{}'".format(step))
        exc = "Timeout"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
    finally:
        if exc:
            db_nslcmop_update["detailed-status"] = detailed_status = error_description_nslcmop = \
                "FAILED {}: {}".format(step, exc)
            nslcmop_operation_state = "FAILED"
        if db_nsr:
            self._write_ns_status(
                nsr_id=nsr_id,
                # TODO check if degraded. For the moment use previous status.
                # get() is used because an exception here would skip the operation status write, the
                # notification and the task removal that follow
                ns_state=db_nsr.get("nsState"),
                current_operation="IDLE",
                current_operation_id=None,
                # error_description=error_description_nsr,
                # error_detail=error_detail,
                other_update=db_nsr_update
            )

        self._write_op_status(
            op_id=nslcmop_id,
            stage="",
            error_message=error_description_nslcmop,
            operation_state=nslcmop_operation_state,
            other_update=db_nslcmop_update,
        )

        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite("ns", "actioned", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                           "operationState": nslcmop_operation_state},
                                        loop=self.loop)
            except Exception as e:
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
        # this return replaces the bare 'return' of the try block
        return nslcmop_operation_state, detailed_status
3909
async def scale(self, nsr_id, nslcmop_id):
    """
    Execute a scale (SCALE_OUT / SCALE_IN) operation over one VNF of a network service.

    The method takes the HA lock for the operation, reads the nslcmop, nsr, vnfr and vnfd records from the
    database, and then runs three consecutive phases:
      1. PRE-SCALE: the 'pre-scale-in'/'pre-scale-out' scaling-config-action primitives of the selected
         scaling-group-descriptor;
      2. SCALE RO: a "vdu-scaling" action is sent to RO and polled every 5 seconds, for at most one hour;
      3. POST-SCALE: the 'post-scale-in'/'post-scale-out' scaling-config-action primitives.
    Every phase is registered as a sub-operation so that it can be skipped or retried.

    Errors are not propagated to the caller: they are caught, logged and written to the nslcmop (and nsr)
    status in the 'finally' section, and a "scaled" message is published at topic "ns".

    :param nsr_id: _id of the network service record being scaled
    :param nslcmop_id: _id of the ns lcm operation that requests the scaling
    :return: None
    """

    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
    if not task_is_locked_by_me:
        return

    logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nsr = None
    db_nslcmop = None
    db_nslcmop_update = {}
    nslcmop_operation_state = None
    db_nsr_update = {}
    exc = None
    # in case of error, indicates what part of scale was failed to put nsr at error status
    scale_process = None
    old_operational_status = ""
    old_config_status = ""
    # Bound up front: the "Sub-operation RO retry" log line reads it even when no pre-scale
    # scaling-config-action has matched (it used to fail with UnboundLocalError in that case)
    vnf_config_primitive = None
    try:
        # wait for any previous tasks in process
        step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)

        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="SCALING",
            current_operation_id=nslcmop_id
        )

        step = "Getting nslcmop from database"
        self.logger.debug(step + " after having waited for previous tasks to be completed")
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        step = "Getting nsr from database"
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

        # remember the previous status in order to restore it once the operation finishes
        old_operational_status = db_nsr["operational-status"]
        old_config_status = db_nsr["config-status"]
        step = "Parsing scaling parameters"
        # self.logger.debug(step)
        db_nsr_update["operational-status"] = "scaling"
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        nsr_deployed = db_nsr["_admin"].get("deployed")

        RO_nsr_id = nsr_deployed["RO"]["nsr_id"]
        vnf_index = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["member-vnf-index"]
        scaling_group = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["scaling-group-descriptor"]
        scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
        # scaling_policy = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"].get("scaling-policy")

        # for backward compatibility
        if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
            nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
            db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

        step = "Getting vnfr from database"
        db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
        step = "Getting vnfd from database"
        db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})

        step = "Getting scaling-group-descriptor"
        for scaling_descriptor in db_vnfd["scaling-group-descriptor"]:
            if scaling_descriptor["name"] == scaling_group:
                break
        else:
            raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
                               "at vnfd:scaling-group-descriptor".format(scaling_group))

        # cooldown_time = 0
        # for scaling_policy_descriptor in scaling_descriptor.get("scaling-policy", ()):
        #     cooldown_time = scaling_policy_descriptor.get("cooldown-time", 0)
        #     if scaling_policy and scaling_policy == scaling_policy_descriptor.get("name"):
        #         break

        # TODO check if ns is in a proper status
        step = "Sending scale order to VIM"
        # nb_scale_op: number of scale-out operations currently applied over this scaling group
        nb_scale_op = 0
        if not db_nsr["_admin"].get("scaling-group"):
            self.update_db_2("nsrs", nsr_id, {"_admin.scaling-group": [{"name": scaling_group, "nb-scale-op": 0}]})
            admin_scale_index = 0
        else:
            for admin_scale_index, admin_scale_info in enumerate(db_nsr["_admin"]["scaling-group"]):
                if admin_scale_info["name"] == scaling_group:
                    nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
                    break
            else:  # not found, set index one plus last element and add new entry with the name
                admin_scale_index += 1
                db_nsr_update["_admin.scaling-group.{}.name".format(admin_scale_index)] = scaling_group
        RO_scaling_info = []
        vdu_scaling_info = {"scaling_group_name": scaling_group, "vdu": []}
        if scaling_type == "SCALE_OUT":
            # count if max-instance-count is reached
            max_instance_count = scaling_descriptor.get("max-instance-count", 10)
            # self.logger.debug("MAX_INSTANCE_COUNT is {}".format(max_instance_count))
            if nb_scale_op >= max_instance_count:
                raise LcmException("reached the limit of {} (max-instance-count) "
                                   "scaling-out operations for the "
                                   "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))

            nb_scale_op += 1
            vdu_scaling_info["scaling_direction"] = "OUT"
            vdu_scaling_info["vdu-create"] = {}
            for vdu_scale_info in scaling_descriptor["vdu"]:
                vdud = next(vdu for vdu in db_vnfd.get("vdu") if vdu["id"] == vdu_scale_info["vdu-id-ref"])
                # index of the first new instance: number of vdur entries already present for this vdu
                vdu_index = len([x for x in db_vnfr.get("vdur", ())
                                 if x.get("vdu-id-ref") == vdu_scale_info["vdu-id-ref"] and
                                 x.get("member-vnf-index-ref") == vnf_index])
                cloud_init_text = self._get_cloud_init(vdud, db_vnfd)
                if cloud_init_text:
                    additional_params = self._get_vdu_additional_params(db_vnfr, vdud["id"]) or {}
                cloud_init_list = []
                for x in range(vdu_scale_info.get("count", 1)):
                    if cloud_init_text:
                        # TODO Information of its own ip is not available because db_vnfr is not updated.
                        additional_params["OSM"] = self._get_osm_params(db_vnfr, vdu_scale_info["vdu-id-ref"],
                                                                        vdu_index + x)
                        cloud_init_list.append(self._parse_cloud_init(cloud_init_text, additional_params,
                                                                      db_vnfd["id"], vdud["id"]))
                RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
                                        "type": "create", "count": vdu_scale_info.get("count", 1)})
                if cloud_init_list:
                    RO_scaling_info[-1]["cloud_init"] = cloud_init_list
                vdu_scaling_info["vdu-create"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)

        elif scaling_type == "SCALE_IN":
            # count if min-instance-count is reached
            min_instance_count = 0
            if "min-instance-count" in scaling_descriptor and scaling_descriptor["min-instance-count"] is not None:
                min_instance_count = int(scaling_descriptor["min-instance-count"])
            if nb_scale_op <= min_instance_count:
                raise LcmException("reached the limit of {} (min-instance-count) scaling-in operations for the "
                                   "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
            nb_scale_op -= 1
            vdu_scaling_info["scaling_direction"] = "IN"
            vdu_scaling_info["vdu-delete"] = {}
            for vdu_scale_info in scaling_descriptor["vdu"]:
                RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
                                        "type": "delete", "count": vdu_scale_info.get("count", 1)})
                vdu_scaling_info["vdu-delete"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)

        # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
        vdu_create = vdu_scaling_info.get("vdu-create")
        # work over a copy, because the counters are decremented while selecting the vdur entries
        vdu_delete = copy(vdu_scaling_info.get("vdu-delete"))
        if vdu_scaling_info["scaling_direction"] == "IN":
            for vdur in reversed(db_vnfr["vdur"]):
                if vdu_delete.get(vdur["vdu-id-ref"]):
                    vdu_delete[vdur["vdu-id-ref"]] -= 1
                    vdu_scaling_info["vdu"].append({
                        "name": vdur["name"],
                        "vdu_id": vdur["vdu-id-ref"],
                        "interface": []
                    })
                    for interface in vdur["interfaces"]:
                        vdu_scaling_info["vdu"][-1]["interface"].append({
                            "name": interface["name"],
                            "ip_address": interface["ip-address"],
                            "mac_address": interface.get("mac-address"),
                        })
            # recover the original (not decremented) counters, removing them from VDU_SCALE_INFO
            vdu_delete = vdu_scaling_info.pop("vdu-delete")

        # PRE-SCALE BEGIN
        step = "Executing pre-scale vnf-config-primitive"
        if scaling_descriptor.get("scaling-config-action"):
            for scaling_config_action in scaling_descriptor["scaling-config-action"]:
                if (scaling_config_action.get("trigger") == "pre-scale-in" and scaling_type == "SCALE_IN") \
                        or (scaling_config_action.get("trigger") == "pre-scale-out" and scaling_type == "SCALE_OUT"):
                    vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
                    step = db_nslcmop_update["detailed-status"] = \
                        "executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive)

                    # look for primitive
                    for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
                        if config_primitive["name"] == vnf_config_primitive:
                            break
                    else:
                        raise LcmException(
                            "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
                            "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
                            "primitive".format(scaling_group, vnf_config_primitive))

                    vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
                    if db_vnfr.get("additionalParamsForVnf"):
                        vnfr_params.update(db_vnfr["additionalParamsForVnf"])

                    scale_process = "VCA"
                    db_nsr_update["config-status"] = "configuring pre-scaling"
                    primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)

                    # Pre-scale retry check: Check if this sub-operation has been executed before
                    # NOTE(review): this call passes nslcmop_id as second positional argument whereas the
                    # SCALE-RO call below does not; confirm against _check_or_add_scale_suboperation signature
                    op_index = self._check_or_add_scale_suboperation(
                        db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'PRE-SCALE')
                    if op_index == self.SUBOPERATION_STATUS_SKIP:
                        # Skip sub-operation
                        result = 'COMPLETED'
                        result_detail = 'Done'
                        self.logger.debug(logging_text +
                                          "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
                                              vnf_config_primitive, result, result_detail))
                    else:
                        if op_index == self.SUBOPERATION_STATUS_NEW:
                            # New sub-operation: Get index of this sub-operation
                            op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
                            self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
                                              format(vnf_config_primitive))
                        else:
                            # retry: Get registered params for this existing sub-operation
                            op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
                            vnf_index = op.get('member_vnf_index')
                            vnf_config_primitive = op.get('primitive')
                            primitive_params = op.get('primitive_params')
                            self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation retry".
                                              format(vnf_config_primitive))
                        # Execute the primitive, either with new (first-time) or registered (reintent) args
                        ee_descriptor_id = config_primitive.get("execution-environment-ref")
                        primitive_name = config_primitive.get("execution-environment-primitive",
                                                              vnf_config_primitive)
                        ee_id, vca_type = self._look_for_deployed_vca(nsr_deployed["VCA"],
                                                                      member_vnf_index=vnf_index,
                                                                      vdu_id=None,
                                                                      vdu_count_index=None,
                                                                      ee_descriptor_id=ee_descriptor_id)
                        result, result_detail = await self._ns_execute_primitive(
                            ee_id, primitive_name, primitive_params, vca_type)
                        self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
                            vnf_config_primitive, result, result_detail))
                        # Update operationState = COMPLETED | FAILED
                        self._update_suboperation_status(
                            db_nslcmop, op_index, result, result_detail)

                    if result == "FAILED":
                        raise LcmException(result_detail)
                    db_nsr_update["config-status"] = old_config_status
                    scale_process = None
        # PRE-SCALE END

        # SCALE RO - BEGIN
        # Should this block be skipped if 'RO_nsr_id' == None ?
        # if (RO_nsr_id and RO_scaling_info):
        if RO_scaling_info:
            scale_process = "RO"
            # Scale RO retry check: Check if this sub-operation has been executed before
            op_index = self._check_or_add_scale_suboperation(
                db_nslcmop, vnf_index, None, None, 'SCALE-RO', RO_nsr_id, RO_scaling_info)
            if op_index == self.SUBOPERATION_STATUS_SKIP:
                # Skip sub-operation
                result = 'COMPLETED'
                result_detail = 'Done'
                self.logger.debug(logging_text + "Skipped sub-operation RO, result {} {}".format(
                    result, result_detail))
            else:
                if op_index == self.SUBOPERATION_STATUS_NEW:
                    # New sub-operation: Get index of this sub-operation
                    op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
                    self.logger.debug(logging_text + "New sub-operation RO")
                else:
                    # retry: Get registered params for this existing sub-operation
                    op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
                    RO_nsr_id = op.get('RO_nsr_id')
                    RO_scaling_info = op.get('RO_scaling_info')
                    self.logger.debug(logging_text + "Sub-operation RO retry for primitive {}".format(
                        vnf_config_primitive))

                RO_desc = await self.RO.create_action("ns", RO_nsr_id, {"vdu-scaling": RO_scaling_info})
                db_nsr_update["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)] = nb_scale_op
                db_nsr_update["_admin.scaling-group.{}.time".format(admin_scale_index)] = time()
                # wait until ready
                RO_nslcmop_id = RO_desc["instance_action_id"]
                db_nslcmop_update["_admin.deploy.RO"] = RO_nslcmop_id

                RO_task_done = False
                step = detailed_status = "Waiting for VIM to scale. RO_task_id={}.".format(RO_nslcmop_id)
                detailed_status_old = None
                self.logger.debug(logging_text + step)

                # Two polling stages share this budget: first the RO action, then the ns status
                deployment_timeout = 1 * 3600  # One hour
                while deployment_timeout > 0:
                    if not RO_task_done:
                        desc = await self.RO.show("ns", item_id_name=RO_nsr_id, extra_item="action",
                                                  extra_item_id=RO_nslcmop_id)

                        # deploymentStatus
                        self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)

                        ns_status, ns_status_info = self.RO.check_action_status(desc)
                        if ns_status == "ERROR":
                            raise ROclient.ROClientException(ns_status_info)
                        elif ns_status == "BUILD":
                            detailed_status = step + "; {}".format(ns_status_info)
                        elif ns_status == "ACTIVE":
                            RO_task_done = True
                            self.scale_vnfr(db_vnfr, vdu_create=vdu_create, vdu_delete=vdu_delete)
                            step = detailed_status = "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id)
                            self.logger.debug(logging_text + step)
                        else:
                            # explicit raise instead of 'assert False': asserts are stripped with python -O
                            raise AssertionError(
                                "ROclient.check_action_status returns unknown {}".format(ns_status))
                    else:
                        desc = await self.RO.show("ns", RO_nsr_id)
                        ns_status, ns_status_info = self.RO.check_ns_status(desc)
                        # deploymentStatus
                        self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)

                        if ns_status == "ERROR":
                            raise ROclient.ROClientException(ns_status_info)
                        elif ns_status == "BUILD":
                            detailed_status = step + "; {}".format(ns_status_info)
                        elif ns_status == "ACTIVE":
                            step = detailed_status = \
                                "Waiting for management IP address reported by the VIM. Updating VNFRs"
                            try:
                                # nsr_deployed["nsr_ip"] = RO.get_ns_vnf_info(desc)
                                self.ns_update_vnfr({db_vnfr["member-vnf-index-ref"]: db_vnfr}, desc)
                                break
                            except LcmExceptionNoMgmtIP:
                                # management IP not reported yet: keep polling
                                pass
                        else:
                            # explicit raise instead of 'assert False': asserts are stripped with python -O
                            raise AssertionError(
                                "ROclient.check_ns_status returns unknown {}".format(ns_status))
                    if detailed_status != detailed_status_old:
                        self._update_suboperation_status(
                            db_nslcmop, op_index, 'COMPLETED', detailed_status)
                        detailed_status_old = db_nslcmop_update["detailed-status"] = detailed_status
                        self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)

                    # 'loop' argument is not passed: it was removed from asyncio.sleep in python 3.10 and
                    # the running loop is used anyway
                    await asyncio.sleep(5)
                    deployment_timeout -= 5
                if deployment_timeout <= 0:
                    # same 4-argument form as every other _update_suboperation_status call of this method
                    # (a spurious extra 'nslcmop_id' argument was passed here before)
                    self._update_suboperation_status(
                        db_nslcmop, op_index, 'FAILED', "Timeout when waiting for ns to get ready")
                    raise ROclient.ROClientException("Timeout waiting ns to be ready")

                # update VDU_SCALING_INFO with the obtained ip_addresses
                if vdu_scaling_info["scaling_direction"] == "OUT":
                    for vdur in reversed(db_vnfr["vdur"]):
                        if vdu_scaling_info["vdu-create"].get(vdur["vdu-id-ref"]):
                            vdu_scaling_info["vdu-create"][vdur["vdu-id-ref"]] -= 1
                            vdu_scaling_info["vdu"].append({
                                "name": vdur["name"],
                                "vdu_id": vdur["vdu-id-ref"],
                                "interface": []
                            })
                            for interface in vdur["interfaces"]:
                                vdu_scaling_info["vdu"][-1]["interface"].append({
                                    "name": interface["name"],
                                    "ip_address": interface["ip-address"],
                                    "mac_address": interface.get("mac-address"),
                                })
                    del vdu_scaling_info["vdu-create"]

                self._update_suboperation_status(db_nslcmop, op_index, 'COMPLETED', 'Done')
        # SCALE RO - END

        scale_process = None
        if db_nsr_update:
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

        # POST-SCALE BEGIN
        # execute primitive service POST-SCALING
        step = "Executing post-scale vnf-config-primitive"
        if scaling_descriptor.get("scaling-config-action"):
            for scaling_config_action in scaling_descriptor["scaling-config-action"]:
                if (scaling_config_action.get("trigger") == "post-scale-in" and scaling_type == "SCALE_IN") \
                        or (scaling_config_action.get("trigger") == "post-scale-out" and scaling_type == "SCALE_OUT"):
                    vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
                    step = db_nslcmop_update["detailed-status"] = \
                        "executing post-scale scaling-config-action '{}'".format(vnf_config_primitive)

                    vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
                    if db_vnfr.get("additionalParamsForVnf"):
                        vnfr_params.update(db_vnfr["additionalParamsForVnf"])

                    # look for primitive
                    for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
                        if config_primitive["name"] == vnf_config_primitive:
                            break
                    else:
                        raise LcmException(
                            "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
                            "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
                            "config-primitive".format(scaling_group, vnf_config_primitive))
                    scale_process = "VCA"
                    db_nsr_update["config-status"] = "configuring post-scaling"
                    primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)

                    # Post-scale retry check: Check if this sub-operation has been executed before
                    # NOTE(review): this call passes nslcmop_id as second positional argument whereas the
                    # SCALE-RO call above does not; confirm against _check_or_add_scale_suboperation signature
                    op_index = self._check_or_add_scale_suboperation(
                        db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'POST-SCALE')
                    if op_index == self.SUBOPERATION_STATUS_SKIP:
                        # Skip sub-operation
                        result = 'COMPLETED'
                        result_detail = 'Done'
                        self.logger.debug(logging_text +
                                          "vnf_config_primitive={} Skipped sub-operation, result {} {}".
                                          format(vnf_config_primitive, result, result_detail))
                    else:
                        if op_index == self.SUBOPERATION_STATUS_NEW:
                            # New sub-operation: Get index of this sub-operation
                            op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
                            self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
                                              format(vnf_config_primitive))
                        else:
                            # retry: Get registered params for this existing sub-operation
                            op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
                            vnf_index = op.get('member_vnf_index')
                            vnf_config_primitive = op.get('primitive')
                            primitive_params = op.get('primitive_params')
                            self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation retry".
                                              format(vnf_config_primitive))
                        # Execute the primitive, either with new (first-time) or registered (reintent) args
                        ee_descriptor_id = config_primitive.get("execution-environment-ref")
                        primitive_name = config_primitive.get("execution-environment-primitive",
                                                              vnf_config_primitive)
                        ee_id, vca_type = self._look_for_deployed_vca(nsr_deployed["VCA"],
                                                                      member_vnf_index=vnf_index,
                                                                      vdu_id=None,
                                                                      vdu_count_index=None,
                                                                      ee_descriptor_id=ee_descriptor_id)
                        result, result_detail = await self._ns_execute_primitive(
                            ee_id, primitive_name, primitive_params, vca_type)
                        self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
                            vnf_config_primitive, result, result_detail))
                        # Update operationState = COMPLETED | FAILED
                        self._update_suboperation_status(
                            db_nslcmop, op_index, result, result_detail)

                    if result == "FAILED":
                        raise LcmException(result_detail)
                    db_nsr_update["config-status"] = old_config_status
                    scale_process = None
        # POST-SCALE END

        db_nsr_update["detailed-status"] = ""  # "scaled {} {}".format(scaling_group, scaling_type)
        db_nsr_update["operational-status"] = "running" if old_operational_status == "failed" \
            else old_operational_status
        db_nsr_update["config-status"] = old_config_status
        return
    except (ROclient.ROClientException, DbException, LcmException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
    finally:
        # the ns is set to IDLE even if the nsr could not be read from database
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="IDLE",
            current_operation_id=None
        )
        if exc:
            db_nslcmop_update["detailed-status"] = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
            nslcmop_operation_state = "FAILED"
            if db_nsr:
                db_nsr_update["operational-status"] = old_operational_status
                db_nsr_update["config-status"] = old_config_status
                db_nsr_update["detailed-status"] = ""
                # scale_process tells which phase was in progress when the error happened
                if scale_process:
                    if "VCA" in scale_process:
                        db_nsr_update["config-status"] = "failed"
                    if "RO" in scale_process:
                        db_nsr_update["operational-status"] = "failed"
                    db_nsr_update["detailed-status"] = "FAILED scaling nslcmop={} {}: {}".format(nslcmop_id, step,
                                                                                                 exc)
        else:
            error_description_nslcmop = None
            nslcmop_operation_state = "COMPLETED"
            db_nslcmop_update["detailed-status"] = "Done"

        self._write_op_status(
            op_id=nslcmop_id,
            stage="",
            error_message=error_description_nslcmop,
            operation_state=nslcmop_operation_state,
            other_update=db_nslcmop_update,
        )
        if db_nsr:
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=None,
                current_operation="IDLE",
                current_operation_id=None,
                other_update=db_nsr_update
            )

        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite("ns", "scaled", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                         "operationState": nslcmop_operation_state},
                                        loop=self.loop)
                # if cooldown_time:
                #     await asyncio.sleep(cooldown_time, loop=self.loop)
                # await self.msg.aiowrite("ns","scaled-cooldown-time", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id})
            except Exception as e:
                # the notification is best effort: a failure here must not change the operation result
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")
4418
async def add_prometheus_metrics(self, ee_id, artifact_path, ee_config_descriptor, vnfr_id, nsr_id, target_ip):
    """
    Register at prometheus the jobs described by the 'prometheus*.j2' template of an artifact.

    Nothing is done (None is returned) when self.prometheus is not set or when the artifact folder
    contains no file whose name starts with "prometheus" and ends with ".j2".

    :param ee_id: execution environment id; the text after its first "." is used as service name
    :param artifact_path: folder, at self.fs, where the template is looked for
    :param ee_config_descriptor: descriptor providing the "metric-service" name
    :param vnfr_id: vnfr identifier; without dashes it is used as JOB_NAME and inside the job names
    :param nsr_id: identifier stored at every job under key "nsr_id"
    :param target_ip: value for template variable TARGET_IP
    :return: list of job names if self.prometheus.update returns a true value, None otherwise
    """
    if not self.prometheus:
        return

    # look for the first file called 'prometheus*.j2'
    template_name = None
    for candidate in self.fs.dir_ls(artifact_path):
        if candidate.startswith("prometheus") and candidate.endswith(".j2"):
            template_name = candidate
            break
    if not template_name:
        return

    with self.fs.file_open((artifact_path, template_name), "r") as template_file:
        template_text = template_file.read()

    # TODO get_service
    service_name = ee_id.partition(".")[2]  # remove prefix "namespace."
    job_prefix = vnfr_id.replace("-", "")
    template_variables = {
        "JOB_NAME": job_prefix,
        "TARGET_IP": target_ip,
        "EXPORTER_POD_IP": "{}-{}".format(service_name, ee_config_descriptor["metric-service"]),
        "EXPORTER_POD_PORT": "80",
    }
    parsed_jobs = self.prometheus.parse_job(template_text, template_variables)

    # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
    jobs_by_name = {}
    for job in parsed_jobs:
        current_name = job.get("job_name")
        if not isinstance(current_name, str) or job_prefix not in current_name:
            job["job_name"] = job_prefix + "_" + str(randint(1, 10000))
        job["nsr_id"] = nsr_id
    for job in parsed_jobs:
        jobs_by_name[job["job_name"]] = job

    updated = await self.prometheus.update(jobs_by_name)
    if updated:
        return list(jobs_by_name.keys())