fixing prometheus metric exporter issues
[osm/LCM.git] / osm_lcm / ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 import yaml
21 import logging
22 import logging.handlers
23 import traceback
24 import json
25 from jinja2 import Environment, Template, meta, TemplateError, TemplateNotFound, TemplateSyntaxError
26
27 from osm_lcm import ROclient
28 from osm_lcm.ng_ro import NgRoClient, NgRoException
29 from osm_lcm.lcm_utils import LcmException, LcmExceptionNoMgmtIP, LcmBase, deep_get, get_iterable, populate_dict
30 from n2vc.k8s_helm_conn import K8sHelmConnector
31 from n2vc.k8s_juju_conn import K8sJujuConnector
32
33 from osm_common.dbbase import DbException
34 from osm_common.fsbase import FsException
35
36 from n2vc.n2vc_juju_conn import N2VCJujuConnector
37 from n2vc.exceptions import N2VCException, N2VCNotFound, K8sException
38
39 from osm_lcm.lcm_helm_conn import LCMHelmConn
40
41 from copy import copy, deepcopy
42 from http import HTTPStatus
43 from time import time
44 from uuid import uuid4
45 from functools import partial
46 from random import randint
47
48 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
49
50
51 class N2VCJujuConnectorLCM(N2VCJujuConnector):
52
53 async def create_execution_environment(self, namespace: str, db_dict: dict, reuse_ee_id: str = None,
54 progress_timeout: float = None, total_timeout: float = None,
55 config: dict = None, artifact_path: str = None,
56 vca_type: str = None) -> (str, dict):
57 # accepts two new parameters, artifact_path and vca_type
58 if vca_type == "k8s_proxy_charm":
59 ee_id = await self.n2vc.install_k8s_proxy_charm(
60 charm_name=artifact_path[artifact_path.rfind("/") + 1:],
61 namespace=namespace,
62 artifact_path=artifact_path,
63 db_dict=db_dict)
64 return ee_id, None
65 else:
66 return await super().create_execution_environment(
67 namespace=namespace, db_dict=db_dict, reuse_ee_id=reuse_ee_id,
68 progress_timeout=progress_timeout, total_timeout=total_timeout)
69
70 async def install_configuration_sw(self, ee_id: str, artifact_path: str, db_dict: dict,
71 progress_timeout: float = None, total_timeout: float = None,
72 config: dict = None, num_units: int = 1, vca_type: str = "lxc_proxy_charm"):
73 if vca_type == "k8s_proxy_charm":
74 return
75 return await super().install_configuration_sw(
76 ee_id=ee_id, artifact_path=artifact_path, db_dict=db_dict, progress_timeout=progress_timeout,
77 total_timeout=total_timeout, config=config, num_units=num_units)
78
79
80 class NsLcm(LcmBase):
81 timeout_vca_on_error = 5 * 60 # Time since a charm first reaches blocked or error status until it is marked as failed
82 timeout_ns_deploy = 2 * 3600 # default global timeout for deploying an ns
83 timeout_ns_terminate = 1800 # default global timeout for undeploying an ns
84 timeout_charm_delete = 10 * 60
85 timeout_primitive = 30 * 60 # timeout for primitive execution
86 timeout_progress_primitive = 10 * 60 # timeout for some progress in a primitive execution
87
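# Sentinel values returned when looking up an existing sub-operation (the lookup helpers are not shown in this excerpt)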
88 SUBOPERATION_STATUS_NOT_FOUND = -1
89 SUBOPERATION_STATUS_NEW = -2
90 SUBOPERATION_STATUS_SKIP = -3
91 task_name_deploy_vca = "Deploying VCA"
92
93 def __init__(self, db, msg, fs, lcm_tasks, config, loop, prometheus=None):
94 """
95 Init: connect to database, filesystem storage and messaging
96 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
97 :return: None
98 """
99 super().__init__(
100 db=db,
101 msg=msg,
102 fs=fs,
103 logger=logging.getLogger('lcm.ns')
104 )
105
106 self.loop = loop
107 self.lcm_tasks = lcm_tasks
108 self.timeout = config["timeout"]
109 self.ro_config = config["ro_config"]
110 self.ng_ro = config["ro_config"].get("ng")
111 self.vca_config = config["VCA"].copy()
112
113 # create N2VC connector
114 self.n2vc = N2VCJujuConnectorLCM(
115 db=self.db,
116 fs=self.fs,
117 log=self.logger,
118 loop=self.loop,
119 url='{}:{}'.format(self.vca_config['host'], self.vca_config['port']),
120 username=self.vca_config.get('user', None),
121 vca_config=self.vca_config,
122 on_update_db=self._on_update_n2vc_db
123 )
124
125 self.conn_helm_ee = LCMHelmConn(
126 db=self.db,
127 fs=self.fs,
128 log=self.logger,
129 loop=self.loop,
130 url=None,
131 username=None,
132 vca_config=self.vca_config,
133 on_update_db=self._on_update_n2vc_db
134 )
135
136 self.k8sclusterhelm = K8sHelmConnector(
137 kubectl_command=self.vca_config.get("kubectlpath"),
138 helm_command=self.vca_config.get("helmpath"),
139 fs=self.fs,
140 log=self.logger,
141 db=self.db,
142 on_update_db=None,
143 )
144
145 self.k8sclusterjuju = K8sJujuConnector(
146 kubectl_command=self.vca_config.get("kubectlpath"),
147 juju_command=self.vca_config.get("jujupath"),
148 fs=self.fs,
149 log=self.logger,
150 db=self.db,
151 on_update_db=None,
152 )
153
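# map the K8s deployment type declared in the descriptor to the corresponding connector instance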
154 self.k8scluster_map = {
155 "helm-chart": self.k8sclusterhelm,
156 "chart": self.k8sclusterhelm,
157 "juju-bundle": self.k8sclusterjuju,
158 "juju": self.k8sclusterjuju,
159 }
160
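# map each VCA (execution environment) type to the connector that handles it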
161 self.vca_map = {
162 "lxc_proxy_charm": self.n2vc,
163 "native_charm": self.n2vc,
164 "k8s_proxy_charm": self.n2vc,
165 "helm": self.conn_helm_ee
166 }
167
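# Prometheus handler, kept optional; presumably used elsewhere in this class to register metric-exporter scrape jobs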
168 self.prometheus = prometheus
169
170 # create RO client
171 if self.ng_ro:
172 self.RO = NgRoClient(self.loop, **self.ro_config)
173 else:
174 self.RO = ROclient.ROClient(self.loop, **self.ro_config)
175
176 def _on_update_ro_db(self, nsrs_id, ro_descriptor):
177
178 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
179
180 try:
181 # TODO filter RO descriptor fields...
182
183 # write to database
184 db_dict = dict()
185 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
186 db_dict['deploymentStatus'] = ro_descriptor
187 self.update_db_2("nsrs", nsrs_id, db_dict)
188
189 except Exception as e:
190 self.logger.warn('Cannot write database RO deployment for ns={} -> {}'.format(nsrs_id, e))
191
192 async def _on_update_n2vc_db(self, table, filter, path, updated_data):
193
194 # remove last dot from path (if exists)
195 if path.endswith('.'):
196 path = path[:-1]
197
198 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
199 # .format(table, filter, path, updated_data))
200
201 try:
202
203 nsr_id = filter.get('_id')
204
205 # read ns record from database
206 nsr = self.db.get_one(table='nsrs', q_filter=filter)
207 current_ns_status = nsr.get('nsState')
208
209 # get vca status for NS
210 status_dict = await self.n2vc.get_status(namespace='.' + nsr_id, yaml_format=False)
211
212 # vcaStatus
213 db_dict = dict()
214 db_dict['vcaStatus'] = status_dict
215
216 # update configurationStatus for this VCA
217 try:
218 vca_index = int(path[path.rfind(".")+1:])
219
220 vca_list = deep_get(target_dict=nsr, key_list=('_admin', 'deployed', 'VCA'))
221 vca_status = vca_list[vca_index].get('status')
222
223 configuration_status_list = nsr.get('configurationStatus')
224 config_status = configuration_status_list[vca_index].get('status')
225
226 if config_status == 'BROKEN' and vca_status != 'failed':
227 db_dict['configurationStatus.{}.status'.format(vca_index)] = 'READY'
228 elif config_status != 'BROKEN' and vca_status == 'failed':
229 db_dict['configurationStatus.{}.status'.format(vca_index)] = 'BROKEN'
230 except Exception as e:
231 # do not update configurationStatus
232 self.logger.debug('Error updating vca_index (ignore): {}'.format(e))
233
234 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
235 # if nsState = 'DEGRADED' check if all is OK
236 is_degraded = False
237 if current_ns_status in ('READY', 'DEGRADED'):
238 error_description = ''
239 # check machines
240 if status_dict.get('machines'):
241 for machine_id in status_dict.get('machines'):
242 machine = status_dict.get('machines').get(machine_id)
243 # check machine agent-status
244 if machine.get('agent-status'):
245 s = machine.get('agent-status').get('status')
246 if s != 'started':
247 is_degraded = True
248 error_description += 'machine {} agent-status={} ; '.format(machine_id, s)
249 # check machine instance status
250 if machine.get('instance-status'):
251 s = machine.get('instance-status').get('status')
252 if s != 'running':
253 is_degraded = True
254 error_description += 'machine {} instance-status={} ; '.format(machine_id, s)
255 # check applications
256 if status_dict.get('applications'):
257 for app_id in status_dict.get('applications'):
258 app = status_dict.get('applications').get(app_id)
259 # check application status
260 if app.get('status'):
261 s = app.get('status').get('status')
262 if s != 'active':
263 is_degraded = True
264 error_description += 'application {} status={} ; '.format(app_id, s)
265
266 if error_description:
267 db_dict['errorDescription'] = error_description
268 if current_ns_status == 'READY' and is_degraded:
269 db_dict['nsState'] = 'DEGRADED'
270 if current_ns_status == 'DEGRADED' and not is_degraded:
271 db_dict['nsState'] = 'READY'
272
273 # write to database
274 self.update_db_2("nsrs", nsr_id, db_dict)
275
276 except (asyncio.CancelledError, asyncio.TimeoutError):
277 raise
278 except Exception as e:
279 self.logger.warn('Error updating NS state for ns={}: {}'.format(nsr_id, e))
280
281 def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
282 """
283 Creates a new vnfd descriptor for RO based on the input OSM IM vnfd
284 :param vnfd: input vnfd
285 :param new_id: overrides vnf id if provided
286 :param additionalParams: Instantiation params for VNFs provided
287 :param nsrId: Id of the NSR
288 :return: copy of vnfd
289 """
290 try:
291 vnfd_RO = deepcopy(vnfd)
292 # remove configuration, monitoring, scaling and internal keys not used by RO
293 vnfd_RO.pop("_id", None)
294 vnfd_RO.pop("_admin", None)
295 vnfd_RO.pop("vnf-configuration", None)
296 vnfd_RO.pop("monitoring-param", None)
297 vnfd_RO.pop("scaling-group-descriptor", None)
298 vnfd_RO.pop("kdu", None)
299 vnfd_RO.pop("k8s-cluster", None)
300 if new_id:
301 vnfd_RO["id"] = new_id
302
303 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
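# any Jinja2 variable left undeclared in the template must be supplied through additionalParamsForVnf, otherwise an LcmException is raised below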
304 for vdu in get_iterable(vnfd_RO, "vdu"):
305 cloud_init_file = None
306 if vdu.get("cloud-init-file"):
307 base_folder = vnfd["_admin"]["storage"]
308 cloud_init_file = "{}/{}/cloud_init/{}".format(base_folder["folder"], base_folder["pkg-dir"],
309 vdu["cloud-init-file"])
310 with self.fs.file_open(cloud_init_file, "r") as ci_file:
311 cloud_init_content = ci_file.read()
312 vdu.pop("cloud-init-file", None)
313 elif vdu.get("cloud-init"):
314 cloud_init_content = vdu["cloud-init"]
315 else:
316 continue
317
318 env = Environment()
319 ast = env.parse(cloud_init_content)
320 mandatory_vars = meta.find_undeclared_variables(ast)
321 if mandatory_vars:
322 for var in mandatory_vars:
323 if not additionalParams or var not in additionalParams.keys():
324 raise LcmException("Variable '{}' defined at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
325 "file, must be provided in the instantiation parameters inside the "
326 "'additionalParamsForVnf' block".format(var, vnfd["id"], vdu["id"]))
327 template = Template(cloud_init_content)
328 cloud_init_content = template.render(additionalParams or {})
329 vdu["cloud-init"] = cloud_init_content
330
331 return vnfd_RO
332 except FsException as e:
333 raise LcmException("Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".
334 format(vnfd["id"], vdu["id"], cloud_init_file, e))
335 except (TemplateError, TemplateNotFound, TemplateSyntaxError) as e:
336 raise LcmException("Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".
337 format(vnfd["id"], vdu["id"], e))
338
339 def _ns_params_2_RO(self, ns_params, nsd, vnfd_dict, db_vnfrs, n2vc_key_list):
340 """
341 Creates a RO ns descriptor from OSM ns_instantiate params
342 :param ns_params: OSM instantiate params
343 :param vnfd_dict: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
344 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index. {member-vnf-index: {vnfr_object}, ...}
345 :return: The RO ns descriptor
346 """
347 vim_2_RO = {}
348 wim_2_RO = {}
349 # TODO feature 1417: Check that no instantiation is set over PDU
350 # check if PDU forces a concrete vim-network-id and add it
351 # check if PDU contains a SDN-assist info (dpid, switch, port) and pass it to RO
352
353 def vim_account_2_RO(vim_account):
354 if vim_account in vim_2_RO:
355 return vim_2_RO[vim_account]
356
357 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
358 if db_vim["_admin"]["operationalState"] != "ENABLED":
359 raise LcmException("VIM={} is not available. operationalState={}".format(
360 vim_account, db_vim["_admin"]["operationalState"]))
361 RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
362 vim_2_RO[vim_account] = RO_vim_id
363 return RO_vim_id
364
365 def wim_account_2_RO(wim_account):
366 if isinstance(wim_account, str):
367 if wim_account in wim_2_RO:
368 return wim_2_RO[wim_account]
369
370 db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
371 if db_wim["_admin"]["operationalState"] != "ENABLED":
372 raise LcmException("WIM={} is not available. operationalState={}".format(
373 wim_account, db_wim["_admin"]["operationalState"]))
374 RO_wim_id = db_wim["_admin"]["deployed"]["RO-account"]
375 wim_2_RO[wim_account] = RO_wim_id
376 return RO_wim_id
377 else:
378 return wim_account
379
380 def ip_profile_2_RO(ip_profile):
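# converts an OSM ip-profile into the RO format, e.g. (illustrative values) {"ip-version": "ipv4", "dns-server": [{"address": "8.8.8.8"}]} -> {"ip-version": "IPv4", "dns-address": ["8.8.8.8"]}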
381 RO_ip_profile = deepcopy(ip_profile)
382 if "dns-server" in RO_ip_profile:
383 if isinstance(RO_ip_profile["dns-server"], list):
384 RO_ip_profile["dns-address"] = []
385 for ds in RO_ip_profile.pop("dns-server"):
386 RO_ip_profile["dns-address"].append(ds['address'])
387 else:
388 RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
389 if RO_ip_profile.get("ip-version") == "ipv4":
390 RO_ip_profile["ip-version"] = "IPv4"
391 if RO_ip_profile.get("ip-version") == "ipv6":
392 RO_ip_profile["ip-version"] = "IPv6"
393 if "dhcp-params" in RO_ip_profile:
394 RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
395 return RO_ip_profile
396
397 if not ns_params:
398 return None
399 RO_ns_params = {
400 # "name": ns_params["nsName"],
401 # "description": ns_params.get("nsDescription"),
402 "datacenter": vim_account_2_RO(ns_params["vimAccountId"]),
403 "wim_account": wim_account_2_RO(ns_params.get("wimAccountId")),
404 # "scenario": ns_params["nsdId"],
405 }
406 # set vim_account of each vnf if different from general vim_account.
407 # Get this information from <vnfr> database content, key vim-account-id
408 # Vim account can be set by placement_engine and it may be different from
409 # the instantiate parameters (vnfs.member-vnf-index.datacenter).
410 for vnf_index, vnfr in db_vnfrs.items():
411 if vnfr.get("vim-account-id") and vnfr["vim-account-id"] != ns_params["vimAccountId"]:
412 populate_dict(RO_ns_params, ("vnfs", vnf_index, "datacenter"), vim_account_2_RO(vnfr["vim-account-id"]))
413
414 n2vc_key_list = n2vc_key_list or []
415 for vnfd_ref, vnfd in vnfd_dict.items():
416 vdu_needed_access = []
417 mgmt_cp = None
418 if vnfd.get("vnf-configuration"):
419 ssh_required = deep_get(vnfd, ("vnf-configuration", "config-access", "ssh-access", "required"))
420 if ssh_required and vnfd.get("mgmt-interface"):
421 if vnfd["mgmt-interface"].get("vdu-id"):
422 vdu_needed_access.append(vnfd["mgmt-interface"]["vdu-id"])
423 elif vnfd["mgmt-interface"].get("cp"):
424 mgmt_cp = vnfd["mgmt-interface"]["cp"]
425
426 for vdu in vnfd.get("vdu", ()):
427 if vdu.get("vdu-configuration"):
428 ssh_required = deep_get(vdu, ("vdu-configuration", "config-access", "ssh-access", "required"))
429 if ssh_required:
430 vdu_needed_access.append(vdu["id"])
431 elif mgmt_cp:
432 for vdu_interface in vdu.get("interface"):
433 if vdu_interface.get("external-connection-point-ref") and \
434 vdu_interface["external-connection-point-ref"] == mgmt_cp:
435 vdu_needed_access.append(vdu["id"])
436 mgmt_cp = None
437 break
438
439 if vdu_needed_access:
440 for vnf_member in nsd.get("constituent-vnfd"):
441 if vnf_member["vnfd-id-ref"] != vnfd_ref:
442 continue
443 for vdu in vdu_needed_access:
444 populate_dict(RO_ns_params,
445 ("vnfs", vnf_member["member-vnf-index"], "vdus", vdu, "mgmt_keys"),
446 n2vc_key_list)
447
448 if ns_params.get("vduImage"):
449 RO_ns_params["vduImage"] = ns_params["vduImage"]
450
451 if ns_params.get("ssh_keys"):
452 RO_ns_params["cloud-config"] = {"key-pairs": ns_params["ssh_keys"]}
453 for vnf_params in get_iterable(ns_params, "vnf"):
454 for constituent_vnfd in nsd["constituent-vnfd"]:
455 if constituent_vnfd["member-vnf-index"] == vnf_params["member-vnf-index"]:
456 vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
457 break
458 else:
459 raise LcmException("Invalid instantiate parameter vnf:member-vnf-index={} is not present at nsd:"
460 "constituent-vnfd".format(vnf_params["member-vnf-index"]))
461
462 for vdu_params in get_iterable(vnf_params, "vdu"):
463 # TODO feature 1417: check that this VDU exist and it is not a PDU
464 if vdu_params.get("volume"):
465 for volume_params in vdu_params["volume"]:
466 if volume_params.get("vim-volume-id"):
467 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
468 vdu_params["id"], "devices", volume_params["name"], "vim_id"),
469 volume_params["vim-volume-id"])
470 if vdu_params.get("interface"):
471 for interface_params in vdu_params["interface"]:
472 if interface_params.get("ip-address"):
473 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
474 vdu_params["id"], "interfaces", interface_params["name"],
475 "ip_address"),
476 interface_params["ip-address"])
477 if interface_params.get("mac-address"):
478 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
479 vdu_params["id"], "interfaces", interface_params["name"],
480 "mac_address"),
481 interface_params["mac-address"])
482 if interface_params.get("floating-ip-required"):
483 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
484 vdu_params["id"], "interfaces", interface_params["name"],
485 "floating-ip"),
486 interface_params["floating-ip-required"])
487
488 for internal_vld_params in get_iterable(vnf_params, "internal-vld"):
489 if internal_vld_params.get("vim-network-name"):
490 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
491 internal_vld_params["name"], "vim-network-name"),
492 internal_vld_params["vim-network-name"])
493 if internal_vld_params.get("vim-network-id"):
494 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
495 internal_vld_params["name"], "vim-network-id"),
496 internal_vld_params["vim-network-id"])
497 if internal_vld_params.get("ip-profile"):
498 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
499 internal_vld_params["name"], "ip-profile"),
500 ip_profile_2_RO(internal_vld_params["ip-profile"]))
501 if internal_vld_params.get("provider-network"):
502
503 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
504 internal_vld_params["name"], "provider-network"),
505 internal_vld_params["provider-network"].copy())
506
507 for icp_params in get_iterable(internal_vld_params, "internal-connection-point"):
508 # look for interface
509 iface_found = False
510 for vdu_descriptor in vnf_descriptor["vdu"]:
511 for vdu_interface in vdu_descriptor["interface"]:
512 if vdu_interface.get("internal-connection-point-ref") == icp_params["id-ref"]:
513 if icp_params.get("ip-address"):
514 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
515 vdu_descriptor["id"], "interfaces",
516 vdu_interface["name"], "ip_address"),
517 icp_params["ip-address"])
518
519 if icp_params.get("mac-address"):
520 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
521 vdu_descriptor["id"], "interfaces",
522 vdu_interface["name"], "mac_address"),
523 icp_params["mac-address"])
524 iface_found = True
525 break
526 if iface_found:
527 break
528 else:
529 raise LcmException("Invalid instantiate parameter vnf:member-vnf-index[{}]:"
530 "internal-vld:id-ref={} is not present at vnfd:internal-"
531 "connection-point".format(vnf_params["member-vnf-index"],
532 icp_params["id-ref"]))
533
534 for vld_params in get_iterable(ns_params, "vld"):
535 if "ip-profile" in vld_params:
536 populate_dict(RO_ns_params, ("networks", vld_params["name"], "ip-profile"),
537 ip_profile_2_RO(vld_params["ip-profile"]))
538
539 if vld_params.get("provider-network"):
540
541 populate_dict(RO_ns_params, ("networks", vld_params["name"], "provider-network"),
542 vld_params["provider-network"].copy())
543
544 if "wimAccountId" in vld_params and vld_params["wimAccountId"] is not None:
545 populate_dict(RO_ns_params, ("networks", vld_params["name"], "wim_account"),
546 wim_account_2_RO(vld_params["wimAccountId"]))
547 if vld_params.get("vim-network-name"):
548 RO_vld_sites = []
549 if isinstance(vld_params["vim-network-name"], dict):
550 for vim_account, vim_net in vld_params["vim-network-name"].items():
551 RO_vld_sites.append({
552 "netmap-use": vim_net,
553 "datacenter": vim_account_2_RO(vim_account)
554 })
555 else: # isinstance str
556 RO_vld_sites.append({"netmap-use": vld_params["vim-network-name"]})
557 if RO_vld_sites:
558 populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)
559
560 if vld_params.get("vim-network-id"):
561 RO_vld_sites = []
562 if isinstance(vld_params["vim-network-id"], dict):
563 for vim_account, vim_net in vld_params["vim-network-id"].items():
564 RO_vld_sites.append({
565 "netmap-use": vim_net,
566 "datacenter": vim_account_2_RO(vim_account)
567 })
568 else: # isinstance str
569 RO_vld_sites.append({"netmap-use": vld_params["vim-network-id"]})
570 if RO_vld_sites:
571 populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)
572 if vld_params.get("ns-net"):
573 if isinstance(vld_params["ns-net"], dict):
574 for vld_id, instance_scenario_id in vld_params["ns-net"].items():
575 RO_vld_ns_net = {"instance_scenario_id": instance_scenario_id, "osm_id": vld_id}
576 populate_dict(RO_ns_params, ("networks", vld_params["name"], "use-network"), RO_vld_ns_net)
577 if "vnfd-connection-point-ref" in vld_params:
578 for cp_params in vld_params["vnfd-connection-point-ref"]:
579 # look for interface
580 for constituent_vnfd in nsd["constituent-vnfd"]:
581 if constituent_vnfd["member-vnf-index"] == cp_params["member-vnf-index-ref"]:
582 vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
583 break
584 else:
585 raise LcmException(
586 "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={} "
587 "is not present at nsd:constituent-vnfd".format(cp_params["member-vnf-index-ref"]))
588 match_cp = False
589 for vdu_descriptor in vnf_descriptor["vdu"]:
590 for interface_descriptor in vdu_descriptor["interface"]:
591 if interface_descriptor.get("external-connection-point-ref") == \
592 cp_params["vnfd-connection-point-ref"]:
593 match_cp = True
594 break
595 if match_cp:
596 break
597 else:
598 raise LcmException(
599 "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={}:"
600 "vnfd-connection-point-ref={} is not present at vnfd={}".format(
601 cp_params["member-vnf-index-ref"],
602 cp_params["vnfd-connection-point-ref"],
603 vnf_descriptor["id"]))
604 if cp_params.get("ip-address"):
605 populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
606 vdu_descriptor["id"], "interfaces",
607 interface_descriptor["name"], "ip_address"),
608 cp_params["ip-address"])
609 if cp_params.get("mac-address"):
610 populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
611 vdu_descriptor["id"], "interfaces",
612 interface_descriptor["name"], "mac_address"),
613 cp_params["mac-address"])
614 return RO_ns_params
615
616 def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None):
617 # make a copy so that the input arguments are not changed
618 vdu_create = copy(vdu_create)
619 vdu_delete = copy(vdu_delete)
620
621 vdurs = db_vnfr.get("vdur")
622 if vdurs is None:
623 vdurs = []
624 vdu_index = len(vdurs)
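# walk the vdur list backwards so that inserting or deleting entries does not disturb the indexes still to be visited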
625 while vdu_index:
626 vdu_index -= 1
627 vdur = vdurs[vdu_index]
628 if vdur.get("pdu-type"):
629 continue
630 vdu_id_ref = vdur["vdu-id-ref"]
631 if vdu_create and vdu_create.get(vdu_id_ref):
632 for index in range(0, vdu_create[vdu_id_ref]):
633 vdur = deepcopy(vdur)
634 vdur["_id"] = str(uuid4())
635 vdur["count-index"] += 1
636 vdurs.insert(vdu_index+1+index, vdur)
637 del vdu_create[vdu_id_ref]
638 if vdu_delete and vdu_delete.get(vdu_id_ref):
639 del vdurs[vdu_index]
640 vdu_delete[vdu_id_ref] -= 1
641 if not vdu_delete[vdu_id_ref]:
642 del vdu_delete[vdu_id_ref]
643 # check all operations are done
644 if vdu_create:
645 raise LcmException("Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
646 vdu_create))
647 if vdu_delete:
648 raise LcmException("Error scaling IN VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
649 vdu_delete))
650
651 vnfr_update = {"vdur": vdurs}
652 db_vnfr["vdur"] = vdurs
653 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
654
655 def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
656 """
657 Updates database nsr with the RO info for the created vld
658 :param ns_update_nsr: dictionary to be filled with the updated info
659 :param db_nsr: content of db_nsr. This is also modified
660 :param nsr_desc_RO: nsr descriptor from RO
661 :return: Nothing, LcmException is raised on errors
662 """
663
664 for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
665 for net_RO in get_iterable(nsr_desc_RO, "nets"):
666 if vld["id"] != net_RO.get("ns_net_osm_id"):
667 continue
668 vld["vim-id"] = net_RO.get("vim_net_id")
669 vld["name"] = net_RO.get("vim_name")
670 vld["status"] = net_RO.get("status")
671 vld["status-detailed"] = net_RO.get("error_msg")
672 ns_update_nsr["vld.{}".format(vld_index)] = vld
673 break
674 else:
675 raise LcmException("ns_update_nsr: Not found vld={} at RO info".format(vld["id"]))
676
677 def set_vnfr_at_error(self, db_vnfrs, error_text):
678 try:
679 for db_vnfr in db_vnfrs.values():
680 vnfr_update = {"status": "ERROR"}
681 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
682 if "status" not in vdur:
683 vdur["status"] = "ERROR"
684 vnfr_update["vdur.{}.status".format(vdu_index)] = "ERROR"
685 if error_text:
686 vdur["status-detailed"] = str(error_text)
687 vnfr_update["vdur.{}.status-detailed".format(vdu_index)] = "ERROR"
688 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
689 except DbException as e:
690 self.logger.error("Cannot update vnf. {}".format(e))
691
692 def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
693 """
694 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
695 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
696 :param nsr_desc_RO: nsr descriptor from RO
697 :return: Nothing, LcmException is raised on errors
698 """
699 for vnf_index, db_vnfr in db_vnfrs.items():
700 for vnf_RO in nsr_desc_RO["vnfs"]:
701 if vnf_RO["member_vnf_index"] != vnf_index:
702 continue
703 vnfr_update = {}
704 if vnf_RO.get("ip_address"):
705 db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO["ip_address"].split(";")[0]
706 elif not db_vnfr.get("ip-address"):
707 if db_vnfr.get("vdur"): # if not VDUs, there is not ip_address
708 raise LcmExceptionNoMgmtIP("ns member_vnf_index '{}' has no IP address".format(vnf_index))
709
710 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
711 vdur_RO_count_index = 0
712 if vdur.get("pdu-type"):
713 continue
714 for vdur_RO in get_iterable(vnf_RO, "vms"):
715 if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
716 continue
717 if vdur["count-index"] != vdur_RO_count_index:
718 vdur_RO_count_index += 1
719 continue
720 vdur["vim-id"] = vdur_RO.get("vim_vm_id")
721 if vdur_RO.get("ip_address"):
722 vdur["ip-address"] = vdur_RO["ip_address"].split(";")[0]
723 else:
724 vdur["ip-address"] = None
725 vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
726 vdur["name"] = vdur_RO.get("vim_name")
727 vdur["status"] = vdur_RO.get("status")
728 vdur["status-detailed"] = vdur_RO.get("error_msg")
729 for ifacer in get_iterable(vdur, "interfaces"):
730 for interface_RO in get_iterable(vdur_RO, "interfaces"):
731 if ifacer["name"] == interface_RO.get("internal_name"):
732 ifacer["ip-address"] = interface_RO.get("ip_address")
733 ifacer["mac-address"] = interface_RO.get("mac_address")
734 break
735 else:
736 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
737 "from VIM info"
738 .format(vnf_index, vdur["vdu-id-ref"], ifacer["name"]))
739 vnfr_update["vdur.{}".format(vdu_index)] = vdur
740 break
741 else:
742 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
743 "VIM info".format(vnf_index, vdur["vdu-id-ref"], vdur["count-index"]))
744
745 for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
746 for net_RO in get_iterable(nsr_desc_RO, "nets"):
747 if vld["id"] != net_RO.get("vnf_net_osm_id"):
748 continue
749 vld["vim-id"] = net_RO.get("vim_net_id")
750 vld["name"] = net_RO.get("vim_name")
751 vld["status"] = net_RO.get("status")
752 vld["status-detailed"] = net_RO.get("error_msg")
753 vnfr_update["vld.{}".format(vld_index)] = vld
754 break
755 else:
756 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
757 vnf_index, vld["id"]))
758
759 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
760 break
761
762 else:
763 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(vnf_index))
764
765 def _get_ns_config_info(self, nsr_id):
766 """
767 Generates a mapping between vnf,vdu elements and the N2VC id
768 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
769 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
770 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
771 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
772 """
773 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
774 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
775 mapping = {}
776 ns_config_info = {"osm-config-mapping": mapping}
777 for vca in vca_deployed_list:
778 if not vca["member-vnf-index"]:
779 continue
780 if not vca["vdu_id"]:
781 mapping[vca["member-vnf-index"]] = vca["application"]
782 else:
783 mapping["{}.{}.{}".format(vca["member-vnf-index"], vca["vdu_id"], vca["vdu_count_index"])] =\
784 vca["application"]
785 return ns_config_info
786
787 @staticmethod
788 def _get_initial_config_primitive_list(desc_primitive_list, vca_deployed):
789 """
790 Generates a list of initial-config-primitive based on the list provided by the descriptor. It includes internal
791 primitives such as verify-ssh-credentials, or config when needed
792 :param desc_primitive_list: information of the descriptor
793 :param vca_deployed: information of the deployment, needed to know whether it is related to an NS, VNF or VDU, and
794 whether this element contains an ssh public key
795 :return: The modified list. Can be an empty list, but always a list
796 """
797 if desc_primitive_list:
798 primitive_list = desc_primitive_list.copy()
799 else:
800 primitive_list = []
801 # look for primitive config, and get the position. None if not present
802 config_position = None
803 for index, primitive in enumerate(primitive_list):
804 if primitive["name"] == "config":
805 config_position = index
806 break
807
808 # for NS, always add a config primitive if not present (bug 874)
809 if not vca_deployed["member-vnf-index"] and config_position is None:
810 primitive_list.insert(0, {"name": "config", "parameter": []})
811 config_position = 0
812 # for VNF/VDU add verify-ssh-credentials after config
813 if vca_deployed["member-vnf-index"] and config_position is not None and vca_deployed.get("ssh-public-key"):
814 primitive_list.insert(config_position + 1, {"name": "verify-ssh-credentials", "parameter": []})
815 return primitive_list
816
817 async def _instantiate_ng_ro(self, logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs, db_vnfds_ref,
818 n2vc_key_list, stage, start_deploy, timeout_ns_deploy):
819 nslcmop_id = db_nslcmop["_id"]
820 target = {
821 "name": db_nsr["name"],
822 "ns": {"vld": []},
823 "vnf": [],
824 "image": deepcopy(db_nsr["image"]),
825 "flavor": deepcopy(db_nsr["flavor"]),
826 "action_id": nslcmop_id,
827 }
828 for image in target["image"]:
829 image["vim_info"] = []
830 for flavor in target["flavor"]:
831 flavor["vim_info"] = []
832
833 ns_params = db_nslcmop.get("operationParams")
834 ssh_keys = []
835 if ns_params.get("ssh_keys"):
836 ssh_keys += ns_params.get("ssh_keys")
837 if n2vc_key_list:
838 ssh_keys += n2vc_key_list
839
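# map each NS VLD connection point ("member_vnf:<index>.<cp>") to its target vld entry in the nsr record; used below to link VNF VLDs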
840 cp2target = {}
841 for vld_index, vld in enumerate(nsd.get("vld")):
842 target_vld = {"id": vld["id"],
843 "name": vld["name"],
844 "mgmt-network": vld.get("mgmt-network", False),
845 "type": vld.get("type"),
846 "vim_info": [{"vim-network-name": vld.get("vim-network-name"),
847 "vim_account_id": ns_params["vimAccountId"]}],
848 }
849 for cp in vld["vnfd-connection-point-ref"]:
850 cp2target["member_vnf:{}.{}".format(cp["member-vnf-index-ref"], cp["vnfd-connection-point-ref"])] = \
851 "nsrs:{}:vld.{}".format(nsr_id, vld_index)
852 target["ns"]["vld"].append(target_vld)
853 for vnfr in db_vnfrs.values():
854 vnfd = db_vnfds_ref[vnfr["vnfd-ref"]]
855 target_vnf = deepcopy(vnfr)
856 for vld in target_vnf.get("vld", ()):
857 # check if connected to a ns.vld
858 vnf_cp = next((cp for cp in vnfd.get("connection-point", ()) if
859 cp.get("internal-vld-ref") == vld["id"]), None)
860 if vnf_cp:
861 ns_cp = "member_vnf:{}.{}".format(vnfr["member-vnf-index-ref"], vnf_cp["id"])
862 if cp2target.get(ns_cp):
863 vld["target"] = cp2target[ns_cp]
864 vld["vim_info"] = [{"vim-network-name": vld.get("vim-network-name"),
865 "vim_account_id": vnfr["vim-account-id"]}]
866
867 for vdur in target_vnf.get("vdur", ()):
868 vdur["vim_info"] = [{"vim_account_id": vnfr["vim-account-id"]}]
869 vdud_index, vdud = next(k for k in enumerate(vnfd["vdu"]) if k[1]["id"] == vdur["vdu-id-ref"])
870 # vdur["additionalParams"] = vnfr.get("additionalParamsForVnf") # TODO additional params for VDU
871
872 if ssh_keys:
873 if deep_get(vdud, ("vdu-configuration", "config-access", "ssh-access", "required")):
874 vdur["ssh-keys"] = ssh_keys
875 vdur["ssh-access-required"] = True
876 elif deep_get(vnfd, ("vnf-configuration", "config-access", "ssh-access", "required")) and \
877 any(iface.get("mgmt-vnf") for iface in vdur["interfaces"]):
878 vdur["ssh-keys"] = ssh_keys
879 vdur["ssh-access-required"] = True
880
881 # cloud-init
882 if vdud.get("cloud-init-file"):
883 vdur["cloud-init"] = "{}:file:{}".format(vnfd["_id"], vdud.get("cloud-init-file"))
884 elif vdud.get("cloud-init"):
885 vdur["cloud-init"] = "{}:vdu:{}".format(vnfd["_id"], vdud_index)
886
887 # flavor
888 ns_flavor = target["flavor"][int(vdur["ns-flavor-id"])]
889 if not next((vi for vi in ns_flavor["vim_info"] if
890 vi and vi.get("vim_account_id") == vnfr["vim-account-id"]), None):
891 ns_flavor["vim_info"].append({"vim_account_id": vnfr["vim-account-id"]})
892 # image
893 ns_image = target["image"][int(vdur["ns-image-id"])]
894 if not next((vi for vi in ns_image["vim_info"] if
895 vi and vi.get("vim_account_id") == vnfr["vim-account-id"]), None):
896 ns_image["vim_info"].append({"vim_account_id": vnfr["vim-account-id"]})
897
898 vdur["vim_info"] = [{"vim_account_id": vnfr["vim-account-id"]}]
899 target["vnf"].append(target_vnf)
900
901 desc = await self.RO.deploy(nsr_id, target)
902 action_id = desc["action_id"]
903 await self._wait_ng_ro(nsr_id, action_id, nslcmop_id, start_deploy, timeout_ns_deploy, stage)
904
905 # Updating NSR
906 db_nsr_update = {
907 "_admin.deployed.RO.operational-status": "running",
908 "detailed-status": " ".join(stage)
909 }
910 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
911 self.update_db_2("nsrs", nsr_id, db_nsr_update)
912 self._write_op_status(nslcmop_id, stage)
913 self.logger.debug(logging_text + "ns deployed at RO. RO_id={}".format(action_id))
914 return
915
916 async def _wait_ng_ro(self, nsr_id, action_id, nslcmop_id, start_time, timeout, stage):
917 detailed_status_old = None
918 db_nsr_update = {}
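# poll the RO action status every 5 seconds until DONE, FAILED or the given timeout expires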
919 while time() <= start_time + timeout:
920 desc_status = await self.RO.status(nsr_id, action_id)
921 if desc_status["status"] == "FAILED":
922 raise NgRoException(desc_status["details"])
923 elif desc_status["status"] == "BUILD":
924 stage[2] = "VIM: ({})".format(desc_status["details"])
925 elif desc_status["status"] == "DONE":
926 stage[2] = "Deployed at VIM"
927 break
928 else:
929 assert False, "RO.status returns unknown {}".format(desc_status["status"])
930 if stage[2] != detailed_status_old:
931 detailed_status_old = stage[2]
932 db_nsr_update["detailed-status"] = " ".join(stage)
933 self.update_db_2("nsrs", nsr_id, db_nsr_update)
934 self._write_op_status(nslcmop_id, stage)
935 await asyncio.sleep(5, loop=self.loop)
936 else: # timeout_ns_deploy
937 raise NgRoException("Timeout waiting ns to deploy")
938
939 async def _terminate_ng_ro(self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage):
940 db_nsr_update = {}
941 failed_detail = []
942 action_id = None
943 start_deploy = time()
944 try:
945 target = {
946 "ns": {"vld": []},
947 "vnf": [],
948 "image": [],
949 "flavor": [],
950 }
951 desc = await self.RO.deploy(nsr_id, target)
952 action_id = desc["action_id"]
953 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = action_id
954 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETING"
955 self.logger.debug(logging_text + "ns terminate action at RO. action_id={}".format(action_id))
956
957 # wait until done
958 delete_timeout = 20 * 60 # 20 minutes
959 await self._wait_ng_ro(nsr_id, action_id, nslcmop_id, start_deploy, delete_timeout, stage)
960
961 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
962 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
963 # delete all nsr
964 await self.RO.delete(nsr_id)
965 except Exception as e:
966 if isinstance(e, NgRoException) and e.http_code == 404: # not found
967 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
968 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
969 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
970 self.logger.debug(logging_text + "RO_action_id={} already deleted".format(action_id))
971 elif isinstance(e, NgRoException) and e.http_code == 409: # conflict
972 failed_detail.append("delete conflict: {}".format(e))
973 self.logger.debug(logging_text + "RO_action_id={} delete conflict: {}".format(action_id, e))
974 else:
975 failed_detail.append("delete error: {}".format(e))
976 self.logger.error(logging_text + "RO_action_id={} delete error: {}".format(action_id, e))
977
978 if failed_detail:
979 stage[2] = "Error deleting from VIM"
980 else:
981 stage[2] = "Deleted from VIM"
982 db_nsr_update["detailed-status"] = " ".join(stage)
983 self.update_db_2("nsrs", nsr_id, db_nsr_update)
984 self._write_op_status(nslcmop_id, stage)
985
986 if failed_detail:
987 raise LcmException("; ".join(failed_detail))
988 return
989
990 async def instantiate_RO(self, logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs, db_vnfds_ref,
991 n2vc_key_list, stage):
992 """
993 Instantiate at RO
994 :param logging_text: prefix text to use for logging
995 :param nsr_id: nsr identity
996 :param nsd: database content of ns descriptor
997 :param db_nsr: database content of ns record
998 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
999 :param db_vnfrs:
1000 :param db_vnfds_ref: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
1001 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
1002 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
1003 :return: None or exception
1004 """
1005 try:
1006 db_nsr_update = {}
1007 RO_descriptor_number = 0 # number of descriptors created at RO
1008 vnf_index_2_RO_id = {} # map between vnfd/nsd id to the id used at RO
1009 nslcmop_id = db_nslcmop["_id"]
1010 start_deploy = time()
1011 ns_params = db_nslcmop.get("operationParams")
1012 if ns_params and ns_params.get("timeout_ns_deploy"):
1013 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
1014 else:
1015 timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy)
1016
1017 # Check for and optionally request placement optimization. Database will be updated if placement activated
1018 stage[2] = "Waiting for Placement."
1019 if await self._do_placement(logging_text, db_nslcmop, db_vnfrs):
1020 # in case of placement, set ns_params["vimAccountId"] to one of the vnfrs' VIM accounts if it is not used by any of them
1021 for vnfr in db_vnfrs.values():
1022 if ns_params["vimAccountId"] == vnfr["vim-account-id"]:
1023 break
1024 else:
1025 ns_params["vimAccountId"] = vnfr["vim-account-id"]
1026
1027 if self.ng_ro:
1028 return await self._instantiate_ng_ro(logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs,
1029 db_vnfds_ref, n2vc_key_list, stage, start_deploy,
1030 timeout_ns_deploy)
1031 # deploy RO
1032 # get vnfds, instantiate at RO
1033 for c_vnf in nsd.get("constituent-vnfd", ()):
1034 member_vnf_index = c_vnf["member-vnf-index"]
1035 vnfd = db_vnfds_ref[c_vnf['vnfd-id-ref']]
1036 vnfd_ref = vnfd["id"]
1037
1038 stage[2] = "Creating vnfd='{}' member_vnf_index='{}' at RO".format(vnfd_ref, member_vnf_index)
1039 db_nsr_update["detailed-status"] = " ".join(stage)
1040 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1041 self._write_op_status(nslcmop_id, stage)
1042
1043 # self.logger.debug(logging_text + stage[2])
1044 vnfd_id_RO = "{}.{}.{}".format(nsr_id, RO_descriptor_number, member_vnf_index[:23])
1045 vnf_index_2_RO_id[member_vnf_index] = vnfd_id_RO
1046 RO_descriptor_number += 1
1047
1048 # look for the position at deployed.RO.vnfd; if not present it will be appended at the end
1049 for index, vnf_deployed in enumerate(db_nsr["_admin"]["deployed"]["RO"]["vnfd"]):
1050 if vnf_deployed["member-vnf-index"] == member_vnf_index:
1051 break
1052 else:
1053 index = len(db_nsr["_admin"]["deployed"]["RO"]["vnfd"])
1054 db_nsr["_admin"]["deployed"]["RO"]["vnfd"].append(None)
1055
1056 # look if present
1057 RO_update = {"member-vnf-index": member_vnf_index}
1058 vnfd_list = await self.RO.get_list("vnfd", filter_by={"osm_id": vnfd_id_RO})
1059 if vnfd_list:
1060 RO_update["id"] = vnfd_list[0]["uuid"]
1061 self.logger.debug(logging_text + "vnfd='{}' member_vnf_index='{}' exists at RO. Using RO_id={}".
1062 format(vnfd_ref, member_vnf_index, vnfd_list[0]["uuid"]))
1063 else:
1064 vnfd_RO = self.vnfd2RO(vnfd, vnfd_id_RO, db_vnfrs[c_vnf["member-vnf-index"]].
1065 get("additionalParamsForVnf"), nsr_id)
1066 desc = await self.RO.create("vnfd", descriptor=vnfd_RO)
1067 RO_update["id"] = desc["uuid"]
1068 self.logger.debug(logging_text + "vnfd='{}' member_vnf_index='{}' created at RO. RO_id={}".format(
1069 vnfd_ref, member_vnf_index, desc["uuid"]))
1070 db_nsr_update["_admin.deployed.RO.vnfd.{}".format(index)] = RO_update
1071 db_nsr["_admin"]["deployed"]["RO"]["vnfd"][index] = RO_update
1072
1073 # create nsd at RO
1074 nsd_ref = nsd["id"]
1075
1076 stage[2] = "Creating nsd={} at RO".format(nsd_ref)
1077 db_nsr_update["detailed-status"] = " ".join(stage)
1078 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1079 self._write_op_status(nslcmop_id, stage)
1080
1081 # self.logger.debug(logging_text + stage[2])
1082 RO_osm_nsd_id = "{}.{}.{}".format(nsr_id, RO_descriptor_number, nsd_ref[:23])
1083 RO_descriptor_number += 1
1084 nsd_list = await self.RO.get_list("nsd", filter_by={"osm_id": RO_osm_nsd_id})
1085 if nsd_list:
1086 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = nsd_list[0]["uuid"]
1087 self.logger.debug(logging_text + "nsd={} exists at RO. Using RO_id={}".format(
1088 nsd_ref, RO_nsd_uuid))
1089 else:
1090 nsd_RO = deepcopy(nsd)
1091 nsd_RO["id"] = RO_osm_nsd_id
1092 nsd_RO.pop("_id", None)
1093 nsd_RO.pop("_admin", None)
1094 for c_vnf in nsd_RO.get("constituent-vnfd", ()):
1095 member_vnf_index = c_vnf["member-vnf-index"]
1096 c_vnf["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
1097 for c_vld in nsd_RO.get("vld", ()):
1098 for cp in c_vld.get("vnfd-connection-point-ref", ()):
1099 member_vnf_index = cp["member-vnf-index-ref"]
1100 cp["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
1101
1102 desc = await self.RO.create("nsd", descriptor=nsd_RO)
1103 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
1104 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = desc["uuid"]
1105 self.logger.debug(logging_text + "nsd={} created at RO. RO_id={}".format(nsd_ref, RO_nsd_uuid))
1106 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1107
1108 # Create ns at RO
1109 stage[2] = "Creating ns at RO from nsd={}".format(nsd_ref)
1110 db_nsr_update["detailed-status"] = " ".join(stage)
1111 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1112 self._write_op_status(nslcmop_id, stage)
1113
1114 # if present use it unless in error status
1115 RO_nsr_id = deep_get(db_nsr, ("_admin", "deployed", "RO", "nsr_id"))
1116 if RO_nsr_id:
1117 try:
1118 stage[2] = "Looking for existing ns at RO"
1119 db_nsr_update["detailed-status"] = " ".join(stage)
1120 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1121 self._write_op_status(nslcmop_id, stage)
1122 # self.logger.debug(logging_text + stage[2] + " RO_ns_id={}".format(RO_nsr_id))
1123 desc = await self.RO.show("ns", RO_nsr_id)
1124
1125 except ROclient.ROClientException as e:
1126 if e.http_code != HTTPStatus.NOT_FOUND:
1127 raise
1128 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
1129 if RO_nsr_id:
1130 ns_status, ns_status_info = self.RO.check_ns_status(desc)
1131 db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
1132 if ns_status == "ERROR":
1133 stage[2] = "Deleting ns at RO. RO_ns_id={}".format(RO_nsr_id)
1134 self.logger.debug(logging_text + stage[2])
1135 await self.RO.delete("ns", RO_nsr_id)
1136 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
1137 if not RO_nsr_id:
1138 stage[2] = "Checking dependencies"
1139 db_nsr_update["detailed-status"] = " ".join(stage)
1140 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1141 self._write_op_status(nslcmop_id, stage)
1142 # self.logger.debug(logging_text + stage[2])
1143
1144 # check if the VIM is still being created and wait; look if previous tasks are in process
1145 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account", ns_params["vimAccountId"])
1146 if task_dependency:
1147 stage[2] = "Waiting for related tasks '{}' to be completed".format(task_name)
1148 self.logger.debug(logging_text + stage[2])
1149 await asyncio.wait(task_dependency, timeout=3600)
1150 if ns_params.get("vnf"):
1151 for vnf in ns_params["vnf"]:
1152 if "vimAccountId" in vnf:
1153 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account",
1154 vnf["vimAccountId"])
1155 if task_dependency:
1156 stage[2] = "Waiting for related tasks '{}' to be completed.".format(task_name)
1157 self.logger.debug(logging_text + stage[2])
1158 await asyncio.wait(task_dependency, timeout=3600)
1159
1160 stage[2] = "Checking instantiation parameters."
1161 RO_ns_params = self._ns_params_2_RO(ns_params, nsd, db_vnfds_ref, db_vnfrs, n2vc_key_list)
1162 stage[2] = "Deploying ns at VIM."
1163 db_nsr_update["detailed-status"] = " ".join(stage)
1164 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1165 self._write_op_status(nslcmop_id, stage)
1166
1167 desc = await self.RO.create("ns", descriptor=RO_ns_params, name=db_nsr["name"], scenario=RO_nsd_uuid)
1168 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = desc["uuid"]
1169 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
1170 db_nsr_update["_admin.deployed.RO.nsr_status"] = "BUILD"
1171 self.logger.debug(logging_text + "ns created at RO. RO_id={}".format(desc["uuid"]))
1172
1173 # wait until NS is ready
1174 stage[2] = "Waiting VIM to deploy ns."
1175 db_nsr_update["detailed-status"] = " ".join(stage)
1176 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1177 self._write_op_status(nslcmop_id, stage)
1178 detailed_status_old = None
1179 self.logger.debug(logging_text + stage[2] + " RO_ns_id={}".format(RO_nsr_id))
1180
1181 old_desc = None
1182 while time() <= start_deploy + timeout_ns_deploy:
1183 desc = await self.RO.show("ns", RO_nsr_id)
1184
1185 # deploymentStatus
1186 if desc != old_desc:
1187 # desc has changed => update db
1188 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
1189 old_desc = desc
1190
1191 ns_status, ns_status_info = self.RO.check_ns_status(desc)
1192 db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
1193 if ns_status == "ERROR":
1194 raise ROclient.ROClientException(ns_status_info)
1195 elif ns_status == "BUILD":
1196 stage[2] = "VIM: ({})".format(ns_status_info)
1197 elif ns_status == "ACTIVE":
1198 stage[2] = "Waiting for management IP address reported by the VIM. Updating VNFRs."
1199 try:
1200 self.ns_update_vnfr(db_vnfrs, desc)
1201 break
1202 except LcmExceptionNoMgmtIP:
1203 pass
1204 else:
1205 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
1206 if stage[2] != detailed_status_old:
1207 detailed_status_old = stage[2]
1208 db_nsr_update["detailed-status"] = " ".join(stage)
1209 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1210 self._write_op_status(nslcmop_id, stage)
1211 await asyncio.sleep(5, loop=self.loop)
1212 else: # timeout_ns_deploy
1213 raise ROclient.ROClientException("Timeout waiting ns to be ready")
1214
1215 # Updating NSR
1216 self.ns_update_nsr(db_nsr_update, db_nsr, desc)
1217
1218 db_nsr_update["_admin.deployed.RO.operational-status"] = "running"
1219 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1220 stage[2] = "Deployed at VIM"
1221 db_nsr_update["detailed-status"] = " ".join(stage)
1222 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1223 self._write_op_status(nslcmop_id, stage)
1224 # await self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
1225 # self.logger.debug(logging_text + "Deployed at VIM")
1226 except (ROclient.ROClientException, LcmException, DbException, NgRoException) as e:
1227 stage[2] = "ERROR deploying at VIM"
1228 self.set_vnfr_at_error(db_vnfrs, str(e))
1229 raise
1230
1231 async def wait_vm_up_insert_key_ro(self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None):
1232 """
1233 Wait for IP address at RO, and optionally insert a public key in the virtual machine
1234 :param logging_text: prefix use for logging
1235 :param nsr_id:
1236 :param vnfr_id:
1237 :param vdu_id:
1238 :param vdu_index:
1239 :param pub_key: public ssh key to inject, None to skip
1240 :param user: user to apply the public ssh key
1241 :return: IP address
1242 """
1243
1244 # self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
1245 ro_nsr_id = None
1246 ip_address = None
1247 nb_tries = 0
1248 target_vdu_id = None
1249 ro_retries = 0
1250
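# poll the VNFR every 10 seconds until the target VDU reports an IP address (up to ~1 hour), then optionally inject the key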
1251 while True:
1252
1253 ro_retries += 1
1254 if ro_retries >= 360: # 1 hour
1255 raise LcmException("Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id))
1256
1257 await asyncio.sleep(10, loop=self.loop)
1258
1259 # get ip address
1260 if not target_vdu_id:
1261 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
1262
1263 if not vdu_id: # for the VNF case
1264 if db_vnfr.get("status") == "ERROR":
1265 raise LcmException("Cannot inject ssh-key because target VNF is in error state")
1266 ip_address = db_vnfr.get("ip-address")
1267 if not ip_address:
1268 continue
1269 vdur = next((x for x in get_iterable(db_vnfr, "vdur") if x.get("ip-address") == ip_address), None)
1270 else: # VDU case
1271 vdur = next((x for x in get_iterable(db_vnfr, "vdur")
1272 if x.get("vdu-id-ref") == vdu_id and x.get("count-index") == vdu_index), None)
1273
1274 if not vdur and len(db_vnfr.get("vdur", ())) == 1: # If only one, this should be the target vdu
1275 vdur = db_vnfr["vdur"][0]
1276 if not vdur:
1277 raise LcmException("Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(vnfr_id, vdu_id,
1278 vdu_index))
1279
1280 if vdur.get("pdu-type") or vdur.get("status") == "ACTIVE":
1281 ip_address = vdur.get("ip-address")
1282 if not ip_address:
1283 continue
1284 target_vdu_id = vdur["vdu-id-ref"]
1285 elif vdur.get("status") == "ERROR":
1286 raise LcmException("Cannot inject ssh-key because target VM is in error state")
1287
1288 if not target_vdu_id:
1289 continue
1290
1291 # inject public key into machine
1292 if pub_key and user:
1293 # wait until NS is deployed at RO
1294 if not ro_nsr_id:
1295 db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
1296 ro_nsr_id = deep_get(db_nsrs, ("_admin", "deployed", "RO", "nsr_id"))
1297 if not ro_nsr_id:
1298 continue
1299
1300 # self.logger.debug(logging_text + "Inserting RO key")
1301 if vdur.get("pdu-type"):
1302 self.logger.error(logging_text + "Cannot inject ssh-key to a PDU")
1303 return ip_address
1304 try:
1305 ro_vm_id = "{}-{}".format(db_vnfr["member-vnf-index-ref"], target_vdu_id) # TODO add vdu_index
1306 if self.ng_ro:
1307 target = {"action": "inject_ssh_key", "key": pub_key, "user": user,
1308 "vnf": [{"_id": vnfr_id, "vdur": [{"id": vdu_id}]}],
1309 }
1310 await self.RO.deploy(nsr_id, target)
1311 else:
1312 result_dict = await self.RO.create_action(
1313 item="ns",
1314 item_id_name=ro_nsr_id,
1315 descriptor={"add_public_key": pub_key, "vms": [ro_vm_id], "user": user}
1316 )
1317 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
1318 if not result_dict or not isinstance(result_dict, dict):
1319 raise LcmException("Unknown response from RO when injecting key")
1320 for result in result_dict.values():
1321 if result.get("vim_result") == 200:
1322 break
1323 else:
1324 raise ROclient.ROClientException("error injecting key: {}".format(
1325 result.get("description")))
1326 break
1327 except NgRoException as e:
1328 raise LcmException("Reaching max tries injecting key. Error: {}".format(e))
1329 except ROclient.ROClientException as e:
1330 if not nb_tries:
1331 self.logger.debug(logging_text + "error injecting key: {}. Retrying until {} seconds".
1332 format(e, 20*10))
1333 nb_tries += 1
1334 if nb_tries >= 20:
1335 raise LcmException("Reaching max tries injecting key. Error: {}".format(e))
1336 else:
1337 break
1338
1339 return ip_address
1340
1341 async def _wait_dependent_n2vc(self, nsr_id, vca_deployed_list, vca_index):
1342 """
1343 Wait until dependent VCA deployments have finished. The NS waits for VNFs and VDUs; VNFs wait for VDUs
1344 """
1345 my_vca = vca_deployed_list[vca_index]
1346 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
1347 # vdu or kdu: no dependencies
1348 return
1349 timeout = 300
1350 while timeout >= 0:
1351 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1352 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1353 configuration_status_list = db_nsr["configurationStatus"]
1354 for index, vca_deployed in enumerate(configuration_status_list):
1355 if index == vca_index:
1356 # myself
1357 continue
1358 if not my_vca.get("member-vnf-index") or \
1359 (vca_deployed.get("member-vnf-index") == my_vca.get("member-vnf-index")):
1360 internal_status = configuration_status_list[index].get("status")
1361 if internal_status == 'READY':
1362 continue
1363 elif internal_status == 'BROKEN':
1364 raise LcmException("Configuration aborted because dependent charm/s has failed")
1365 else:
1366 break
1367 else:
1368 # no dependencies, return
1369 return
1370 await asyncio.sleep(10)
1371 timeout -= 1
1372
1373 raise LcmException("Configuration aborted because dependent charm/s timeout")
1374
1375 async def instantiate_N2VC(self, logging_text, vca_index, nsi_id, db_nsr, db_vnfr, vdu_id, kdu_name, vdu_index,
1376 config_descriptor, deploy_params, base_folder, nslcmop_id, stage, vca_type, vca_name,
1377 ee_config_descriptor):
1378 nsr_id = db_nsr["_id"]
1379 db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
1380 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1381 vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
1382 osm_config = {"osm": {"ns_id": db_nsr["_id"]}}
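# db_dict points the VCA connector at the nsrs record and the path (_admin.deployed.VCA.<index>) where it reports progress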
1383 db_dict = {
1384 'collection': 'nsrs',
1385 'filter': {'_id': nsr_id},
1386 'path': db_update_entry
1387 }
1388 step = ""
1389 try:
1390
1391 element_type = 'NS'
1392 element_under_configuration = nsr_id
1393
1394 vnfr_id = None
1395 if db_vnfr:
1396 vnfr_id = db_vnfr["_id"]
1397 osm_config["osm"]["vnf_id"] = vnfr_id
1398
1399 namespace = "{nsi}.{ns}".format(
1400 nsi=nsi_id if nsi_id else "",
1401 ns=nsr_id)
1402
1403 if vnfr_id:
1404 element_type = 'VNF'
1405 element_under_configuration = vnfr_id
1406 namespace += ".{}".format(vnfr_id)
1407 if vdu_id:
1408 namespace += ".{}-{}".format(vdu_id, vdu_index or 0)
1409 element_type = 'VDU'
1410 element_under_configuration = "{}-{}".format(vdu_id, vdu_index or 0)
1411 osm_config["osm"]["vdu_id"] = vdu_id
1412 elif kdu_name:
1413 namespace += ".{}".format(kdu_name)
1414 element_type = 'KDU'
1415 element_under_configuration = kdu_name
1416 osm_config["osm"]["kdu_name"] = kdu_name
1417
1418 # Get artifact path
1419 artifact_path = "{}/{}/{}/{}".format(
1420 base_folder["folder"],
1421 base_folder["pkg-dir"],
1422 "charms" if vca_type in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm") else "helm-charts",
1423 vca_name
1424 )
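# resulting path: <folder>/<pkg-dir>/charms/<vca_name> for charm types, or <folder>/<pkg-dir>/helm-charts/<vca_name> for helm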
1425
1426 # n2vc_redesign STEP 3.1
1427
1428 # find old ee_id if exists
1429 ee_id = vca_deployed.get("ee_id")
1430
1431 # create or register execution environment in VCA
1432 if vca_type in ("lxc_proxy_charm", "k8s_proxy_charm", "helm"):
1433
1434 self._write_configuration_status(
1435 nsr_id=nsr_id,
1436 vca_index=vca_index,
1437 status='CREATING',
1438 element_under_configuration=element_under_configuration,
1439 element_type=element_type
1440 )
1441
1442 step = "create execution environment"
1443 self.logger.debug(logging_text + step)
1444 ee_id, credentials = await self.vca_map[vca_type].create_execution_environment(
1445 namespace=namespace,
1446 reuse_ee_id=ee_id,
1447 db_dict=db_dict,
1448 config=osm_config,
1449 artifact_path=artifact_path,
1450 vca_type=vca_type)
1451
1452 elif vca_type == "native_charm":
1453 step = "Waiting to VM being up and getting IP address"
1454 self.logger.debug(logging_text + step)
1455 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
1456 user=None, pub_key=None)
1457 credentials = {"hostname": rw_mgmt_ip}
1458 # get username
1459 username = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
1460 # TODO remove this when the IM changes regarding config-access:ssh-access:default-user are
1461 # merged. Meanwhile, get the username from initial-config-primitive
1462 if not username and config_descriptor.get("initial-config-primitive"):
1463 for config_primitive in config_descriptor["initial-config-primitive"]:
1464 for param in config_primitive.get("parameter", ()):
1465 if param["name"] == "ssh-username":
1466 username = param["value"]
1467 break
1468 if not username:
1469 raise LcmException("Cannot determine the username neither with 'initial-config-promitive' nor with "
1470 "'config-access.ssh-access.default-user'")
1471 credentials["username"] = username
1472 # n2vc_redesign STEP 3.2
1473
1474 self._write_configuration_status(
1475 nsr_id=nsr_id,
1476 vca_index=vca_index,
1477 status='REGISTERING',
1478 element_under_configuration=element_under_configuration,
1479 element_type=element_type
1480 )
1481
1482 step = "register execution environment {}".format(credentials)
1483 self.logger.debug(logging_text + step)
1484 ee_id = await self.vca_map[vca_type].register_execution_environment(
1485 credentials=credentials, namespace=namespace, db_dict=db_dict)
1486
1487 # for compatibility with MON/POL modules, they need the model and application name at database
1488 # TODO ask MON/POL whether it is still needed to assume the format "model_name.application_name"
1489 ee_id_parts = ee_id.split('.')
1490 db_nsr_update = {db_update_entry + "ee_id": ee_id}
1491 if len(ee_id_parts) >= 2:
1492 model_name = ee_id_parts[0]
1493 application_name = ee_id_parts[1]
1494 db_nsr_update[db_update_entry + "model"] = model_name
1495 db_nsr_update[db_update_entry + "application"] = application_name
1496
1497 # n2vc_redesign STEP 3.3
1498 step = "Install configuration Software"
1499
1500 self._write_configuration_status(
1501 nsr_id=nsr_id,
1502 vca_index=vca_index,
1503 status='INSTALLING SW',
1504 element_under_configuration=element_under_configuration,
1505 element_type=element_type,
1506 other_update=db_nsr_update
1507 )
1508
1509 # TODO check if already done
1510 self.logger.debug(logging_text + step)
1511 config = None
1512 if vca_type == "native_charm":
1513 initial_config_primitive_list = config_descriptor.get('initial-config-primitive')
1514 if initial_config_primitive_list:
1515 for primitive in initial_config_primitive_list:
1516 if primitive["name"] == "config":
1517 config = self._map_primitive_params(
1518 primitive,
1519 {},
1520 deploy_params
1521 )
1522 break
1523 num_units = 1
1524 if vca_type == "lxc_proxy_charm":
1525 if element_type == "NS":
1526 num_units = db_nsr.get("config-units") or 1
1527 elif element_type == "VNF":
1528 num_units = db_vnfr.get("config-units") or 1
1529 elif element_type == "VDU":
1530 for v in db_vnfr["vdur"]:
1531 if vdu_id == v["vdu-id-ref"]:
1532 num_units = v.get("config-units") or 1
1533 break
1534
1535 await self.vca_map[vca_type].install_configuration_sw(
1536 ee_id=ee_id,
1537 artifact_path=artifact_path,
1538 db_dict=db_dict,
1539 config=config,
1540 num_units=num_units,
1541 vca_type=vca_type
1542 )
1543
1544 # write in db flag of configuration_sw already installed
1545 self.update_db_2("nsrs", nsr_id, {db_update_entry + "config_sw_installed": True})
1546
1547 # add relations for this VCA (wait for other peers related with this VCA)
1548 await self._add_vca_relations(logging_text=logging_text, nsr_id=nsr_id,
1549 vca_index=vca_index, vca_type=vca_type)
1550
1551 # if SSH access is required, then get execution environment SSH public
1552 if vca_type in ("lxc_proxy_charm", "helm"): # if native charm, we have already waited for the VM to be up
1553 pub_key = None
1554 user = None
1555 # self.logger.debug("get ssh key block")
1556 if deep_get(config_descriptor, ("config-access", "ssh-access", "required")):
1557 # self.logger.debug("ssh key needed")
1558 # Need to inject an ssh key
1559 user = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
1560 step = "Install configuration Software, getting public ssh key"
1561 pub_key = await self.vca_map[vca_type].get_ee_ssh_public__key(ee_id=ee_id, db_dict=db_dict)
1562
1563 step = "Insert public key into VM user={} ssh_key={}".format(user, pub_key)
1564 else:
1565 # self.logger.debug("no need to get ssh key")
1566 step = "Waiting to VM being up and getting IP address"
1567 self.logger.debug(logging_text + step)
1568
1569 # n2vc_redesign STEP 5.1
1570 # wait for RO (ip-address) and insert pub_key into VM
1571 if vnfr_id:
1572 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
1573 user=user, pub_key=pub_key)
1574 else:
1575 rw_mgmt_ip = None # This is for a NS configuration
1576
1577 self.logger.debug(logging_text + ' VM_ip_address={}'.format(rw_mgmt_ip))
1578
1579 # store rw_mgmt_ip in deploy params for later replacement
1580 deploy_params["rw_mgmt_ip"] = rw_mgmt_ip
1581
1582 # n2vc_redesign STEP 6 Execute initial config primitive
1583 step = 'execute initial config primitive'
1584 initial_config_primitive_list = config_descriptor.get('initial-config-primitive')
1585
1586 # sort initial config primitives by 'seq'
1587 if initial_config_primitive_list:
1588 try:
1589 initial_config_primitive_list.sort(key=lambda val: int(val['seq']))
1590 except Exception as e:
1591 self.logger.error(logging_text + step + ": " + str(e))
1592 else:
1593 self.logger.debug(logging_text + step + ": No initial-config-primitive")
1594
1595 # add config if not present for NS charm
1596 initial_config_primitive_list = self._get_initial_config_primitive_list(initial_config_primitive_list,
1597 vca_deployed)
1598
1599 # wait for dependent primitives execution (NS -> VNF -> VDU)
1600 if initial_config_primitive_list:
1601 await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)
1602
1603 # stage, in function of element type: vdu, kdu, vnf or ns
1604 my_vca = vca_deployed_list[vca_index]
1605 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
1606 # VDU or KDU
1607 stage[0] = 'Stage 3/5: running Day-1 primitives for VDU.'
1608 elif my_vca.get("member-vnf-index"):
1609 # VNF
1610 stage[0] = 'Stage 4/5: running Day-1 primitives for VNF.'
1611 else:
1612 # NS
1613 stage[0] = 'Stage 5/5: running Day-1 primitives for NS.'
1614
1615 self._write_configuration_status(
1616 nsr_id=nsr_id,
1617 vca_index=vca_index,
1618 status='EXECUTING PRIMITIVE'
1619 )
1620
1621 self._write_op_status(
1622 op_id=nslcmop_id,
1623 stage=stage
1624 )
1625
1626 check_if_terminated_needed = True
1627 for initial_config_primitive in initial_config_primitive_list:
1628 # adding information on the vca_deployed if it is a NS execution environment
1629 if not vca_deployed["member-vnf-index"]:
1630 deploy_params["ns_config_info"] = json.dumps(self._get_ns_config_info(nsr_id))
1631 # TODO check if already done
1632 primitive_params_ = self._map_primitive_params(initial_config_primitive, {}, deploy_params)
1633
1634 step = "execute primitive '{}' params '{}'".format(initial_config_primitive["name"], primitive_params_)
1635 self.logger.debug(logging_text + step)
1636 await self.vca_map[vca_type].exec_primitive(
1637 ee_id=ee_id,
1638 primitive_name=initial_config_primitive["name"],
1639 params_dict=primitive_params_,
1640 db_dict=db_dict
1641 )
1642 # Once some primitive has been executed, check and write at db whether terminate primitives need to be executed
1643 if check_if_terminated_needed:
1644 if config_descriptor.get('terminate-config-primitive'):
1645 self.update_db_2("nsrs", nsr_id, {db_update_entry + "needed_terminate": True})
1646 check_if_terminated_needed = False
1647
1648 # TODO register in database that primitive is done
1649
1650 # STEP 7 Configure metrics
1651 if vca_type == "helm":
1652 prometheus_jobs = await self.add_prometheus_metrics(
1653 ee_id=ee_id,
1654 artifact_path=artifact_path,
1655 ee_config_descriptor=ee_config_descriptor,
1656 vnfr_id=vnfr_id,
1657 nsr_id=nsr_id,
1658 target_ip=rw_mgmt_ip,
1659 )
1660 if prometheus_jobs:
1661 self.update_db_2("nsrs", nsr_id, {db_update_entry + "prometheus_jobs": prometheus_jobs})
1662
1663 step = "instantiated at VCA"
1664 self.logger.debug(logging_text + step)
1665
1666 self._write_configuration_status(
1667 nsr_id=nsr_id,
1668 vca_index=vca_index,
1669 status='READY'
1670 )
1671
1672 except Exception as e: # TODO not use Exception but N2VC exception
1673 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
1674 if not isinstance(e, (DbException, N2VCException, LcmException, asyncio.CancelledError)):
1675 self.logger.error("Exception while {} : {}".format(step, e), exc_info=True)
1676 self._write_configuration_status(
1677 nsr_id=nsr_id,
1678 vca_index=vca_index,
1679 status='BROKEN'
1680 )
1681 raise LcmException("{} {}".format(step, e)) from e
1682
1683 def _write_ns_status(self, nsr_id: str, ns_state: str, current_operation: str, current_operation_id: str,
1684 error_description: str = None, error_detail: str = None, other_update: dict = None):
1685 """
1686 Update db_nsr fields.
1687 :param nsr_id:
1688 :param ns_state:
1689 :param current_operation:
1690 :param current_operation_id:
1691 :param error_description:
1692 :param error_detail:
1693 :param other_update: Other required changes at database, if provided; it will be cleared
1694 :return:
1695 """
1696 try:
1697 db_dict = other_update or {}
1698 db_dict["_admin.nslcmop"] = current_operation_id # for backward compatibility
1699 db_dict["_admin.current-operation"] = current_operation_id
1700 db_dict["_admin.operation-type"] = current_operation if current_operation != "IDLE" else None
1701 db_dict["currentOperation"] = current_operation
1702 db_dict["currentOperationID"] = current_operation_id
1703 db_dict["errorDescription"] = error_description
1704 db_dict["errorDetail"] = error_detail
1705
1706 if ns_state:
1707 db_dict["nsState"] = ns_state
1708 self.update_db_2("nsrs", nsr_id, db_dict)
1709 except DbException as e:
1710 self.logger.warn('Error writing NS status, ns={}: {}'.format(nsr_id, e))
1711
1712 def _write_op_status(self, op_id: str, stage: list = None, error_message: str = None, queuePosition: int = 0,
1713 operation_state: str = None, other_update: dict = None):
1714 try:
1715 db_dict = other_update or {}
1716 db_dict['queuePosition'] = queuePosition
1717 if isinstance(stage, list):
1718 db_dict['stage'] = stage[0]
1719 db_dict['detailed-status'] = " ".join(stage)
1720 elif stage is not None:
1721 db_dict['stage'] = str(stage)
1722
1723 if error_message is not None:
1724 db_dict['errorMessage'] = error_message
1725 if operation_state is not None:
1726 db_dict['operationState'] = operation_state
1727 db_dict["statusEnteredTime"] = time()
1728 self.update_db_2("nslcmops", op_id, db_dict)
1729 except DbException as e:
1730 self.logger.warn('Error writing OPERATION status for op_id: {} -> {}'.format(op_id, e))
1731
1732 def _write_all_config_status(self, db_nsr: dict, status: str):
1733 try:
1734 nsr_id = db_nsr["_id"]
1735 # configurationStatus
1736 config_status = db_nsr.get('configurationStatus')
1737 if config_status:
1738 db_nsr_update = {"configurationStatus.{}.status".format(index): status for index, v in
1739 enumerate(config_status) if v}
1740 # update status
1741 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1742
1743 except DbException as e:
1744 self.logger.warn('Error writing all configuration status, ns={}: {}'.format(nsr_id, e))
1745
1746 def _write_configuration_status(self, nsr_id: str, vca_index: int, status: str = None,
1747 element_under_configuration: str = None, element_type: str = None,
1748 other_update: dict = None):
1749
1750 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
1751 # .format(vca_index, status))
1752
1753 try:
1754 db_path = 'configurationStatus.{}.'.format(vca_index)
1755 db_dict = other_update or {}
1756 if status:
1757 db_dict[db_path + 'status'] = status
1758 if element_under_configuration:
1759 db_dict[db_path + 'elementUnderConfiguration'] = element_under_configuration
1760 if element_type:
1761 db_dict[db_path + 'elementType'] = element_type
1762 self.update_db_2("nsrs", nsr_id, db_dict)
1763 except DbException as e:
1764 self.logger.warn('Error writing configuration status={}, ns={}, vca_index={}: {}'
1765 .format(status, nsr_id, vca_index, e))
1766
1767 async def _do_placement(self, logging_text, db_nslcmop, db_vnfrs):
1768 """
1769 Checks and computes the placement (vim account where to deploy). If it is decided by an external tool, it
1770 sends the request via kafka and waits until the result is written at database (nslcmops _admin.pla).
1771 Database is used because the result can be obtained from a different LCM worker in case of HA.
1772 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
1773 :param db_nslcmop: database content of nslcmop
1774 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
1775 :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfrs with the
1776 computed 'vim-account-id'
1777 """
1778 modified = False
1779 nslcmop_id = db_nslcmop['_id']
1780 placement_engine = deep_get(db_nslcmop, ('operationParams', 'placement-engine'))
1781 if placement_engine == "PLA":
1782 self.logger.debug(logging_text + "Invoke and wait for placement optimization")
1783 await self.msg.aiowrite("pla", "get_placement", {'nslcmopId': nslcmop_id}, loop=self.loop)
1784 db_poll_interval = 5
1785 wait = db_poll_interval * 10
1786 pla_result = None
1787 while not pla_result and wait >= 0:
1788 await asyncio.sleep(db_poll_interval)
1789 wait -= db_poll_interval
1790 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
1791 pla_result = deep_get(db_nslcmop, ('_admin', 'pla'))
1792
1793 if not pla_result:
1794 raise LcmException("Placement timeout for nslcmopId={}".format(nslcmop_id))
1795
1796 for pla_vnf in pla_result['vnf']:
1797 vnfr = db_vnfrs.get(pla_vnf['member-vnf-index'])
1798 if not pla_vnf.get('vimAccountId') or not vnfr:
1799 continue
1800 modified = True
1801 self.db.set_one("vnfrs", {"_id": vnfr["_id"]}, {"vim-account-id": pla_vnf['vimAccountId']})
1802 # Modifies db_vnfrs
1803 vnfr["vim-account-id"] = pla_vnf['vimAccountId']
1804 return modified
1805
1806 def update_nsrs_with_pla_result(self, params):
1807 try:
1808 nslcmop_id = deep_get(params, ('placement', 'nslcmopId'))
1809 self.update_db_2("nslcmops", nslcmop_id, {"_admin.pla": params.get('placement')})
1810 except Exception as e:
1811 self.logger.warn('Update failed for nslcmop_id={}: {}'.format(nslcmop_id, e))
1812
1813 async def instantiate(self, nsr_id, nslcmop_id):
1814 """
1815
1816 :param nsr_id: ns instance to deploy
1817 :param nslcmop_id: operation to run
1818 :return:
1819 """
1820
1821 # Try to lock HA task here
1822 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
1823 if not task_is_locked_by_me:
1824 self.logger.debug('instantiate() task is not locked by me, ns={}'.format(nsr_id))
1825 return
1826
1827 logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
1828 self.logger.debug(logging_text + "Enter")
1829
1830 # Sync from FSMongo
1831 self.fs.sync()
1832
1833 # get all needed from database
1834
1835 # database nsrs record
1836 db_nsr = None
1837
1838 # database nslcmops record
1839 db_nslcmop = None
1840
1841 # update operation on nsrs
1842 db_nsr_update = {}
1843 # update operation on nslcmops
1844 db_nslcmop_update = {}
1845
1846 nslcmop_operation_state = None
1847 db_vnfrs = {} # vnf's info indexed by member-index
1848 # n2vc_info = {}
1849 tasks_dict_info = {} # from task to info text
1850 exc = None
1851 error_list = []
1852 stage = ['Stage 1/5: preparation of the environment.', "Waiting for previous operations to terminate.", ""]
1853 # ^ stage, step, VIM progress
1854 try:
1855 # wait for any previous tasks in process
1856 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
1857
1858 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
1859 stage[1] = "Reading from database,"
1860 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
1861 db_nsr_update["detailed-status"] = "creating"
1862 db_nsr_update["operational-status"] = "init"
1863 self._write_ns_status(
1864 nsr_id=nsr_id,
1865 ns_state="BUILDING",
1866 current_operation="INSTANTIATING",
1867 current_operation_id=nslcmop_id,
1868 other_update=db_nsr_update
1869 )
1870 self._write_op_status(
1871 op_id=nslcmop_id,
1872 stage=stage,
1873 queuePosition=0
1874 )
1875
1876 # read from db: operation
1877 stage[1] = "Getting nslcmop={} from db".format(nslcmop_id)
1878 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
1879 ns_params = db_nslcmop.get("operationParams")
1880 if ns_params and ns_params.get("timeout_ns_deploy"):
1881 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
1882 else:
1883 timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy)
1884
1885 # read from db: ns
1886 stage[1] = "Getting nsr={} from db".format(nsr_id)
1887 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1888 stage[1] = "Getting nsd={} from db".format(db_nsr["nsd-id"])
1889 nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
1890 db_nsr["nsd"] = nsd
1891 # nsr_name = db_nsr["name"] # TODO short-name??
1892
1893 # read from db: vnf's of this ns
1894 stage[1] = "Getting vnfrs from db"
1895 self.logger.debug(logging_text + stage[1])
1896 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
1897
1898 # read from db: vnfd's for every vnf
1899 db_vnfds_ref = {} # every vnfd data indexed by vnf name
1900 db_vnfds = {} # every vnfd data indexed by vnf id
1901 db_vnfds_index = {} # every vnfd data indexed by vnf member-index
1902
1903 # for each vnf in ns, read vnfd
1904 for vnfr in db_vnfrs_list:
1905 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr # vnf's dict indexed by member-index: '1', '2', etc
1906 vnfd_id = vnfr["vnfd-id"] # vnfd uuid for this vnf
1907 vnfd_ref = vnfr["vnfd-ref"] # vnfd name for this vnf
1908 # if we do not have this vnfd yet, read it from db
1909 if vnfd_id not in db_vnfds:
1910 # read from db
1911 stage[1] = "Getting vnfd={} id='{}' from db".format(vnfd_id, vnfd_ref)
1912 self.logger.debug(logging_text + stage[1])
1913 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
1914
1915 # store vnfd
1916 db_vnfds_ref[vnfd_ref] = vnfd # vnfd's indexed by name
1917 db_vnfds[vnfd_id] = vnfd # vnfd's indexed by id
1918 db_vnfds_index[vnfr["member-vnf-index-ref"]] = db_vnfds[vnfd_id] # vnfd's indexed by member-index
1919
1920 # Get or generates the _admin.deployed.VCA list
1921 vca_deployed_list = None
1922 if db_nsr["_admin"].get("deployed"):
1923 vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
1924 if vca_deployed_list is None:
1925 vca_deployed_list = []
1926 configuration_status_list = []
1927 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
1928 db_nsr_update["configurationStatus"] = configuration_status_list
1929 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
1930 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
1931 elif isinstance(vca_deployed_list, dict):
1932 # maintain backward compatibility. Change a dict to list at database
1933 vca_deployed_list = list(vca_deployed_list.values())
1934 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
1935 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
1936
1937 if not isinstance(deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list):
1938 populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
1939 db_nsr_update["_admin.deployed.RO.vnfd"] = []
1940
1941 # set state to INSTANTIATED. Once instantiated, NBI will not delete it directly
1942 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
1943 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1944
1945 # n2vc_redesign STEP 2 Deploy Network Scenario
1946 stage[0] = 'Stage 2/5: deployment of KDUs, VMs and execution environments.'
1947 self._write_op_status(
1948 op_id=nslcmop_id,
1949 stage=stage
1950 )
1951
1952 stage[1] = "Deploying KDUs,"
1953 # self.logger.debug(logging_text + "Before deploy_kdus")
1954 # Call deploy_kdus in case the "vdu:kdu" param exists
1955 await self.deploy_kdus(
1956 logging_text=logging_text,
1957 nsr_id=nsr_id,
1958 nslcmop_id=nslcmop_id,
1959 db_vnfrs=db_vnfrs,
1960 db_vnfds=db_vnfds,
1961 task_instantiation_info=tasks_dict_info,
1962 )
1963
1964 stage[1] = "Getting VCA public key."
1965 # n2vc_redesign STEP 1 Get VCA public ssh-key
1966 # feature 1429. Add n2vc public key to needed VMs
1967 n2vc_key = self.n2vc.get_public_key()
1968 n2vc_key_list = [n2vc_key]
1969 if self.vca_config.get("public_key"):
1970 n2vc_key_list.append(self.vca_config["public_key"])
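# every key in n2vc_key_list is passed to instantiate_RO so it can be injected into the management VMs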
1971
1972 stage[1] = "Deploying NS at VIM."
1973 task_ro = asyncio.ensure_future(
1974 self.instantiate_RO(
1975 logging_text=logging_text,
1976 nsr_id=nsr_id,
1977 nsd=nsd,
1978 db_nsr=db_nsr,
1979 db_nslcmop=db_nslcmop,
1980 db_vnfrs=db_vnfrs,
1981 db_vnfds_ref=db_vnfds_ref,
1982 n2vc_key_list=n2vc_key_list,
1983 stage=stage
1984 )
1985 )
1986 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
1987 tasks_dict_info[task_ro] = "Deploying at VIM"
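# the RO deployment runs as a background task; execution environments are deployed below in parallel and all tasks are awaited in the finally block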
1988
1989 # n2vc_redesign STEP 3 to 6 Deploy N2VC
1990 stage[1] = "Deploying Execution Environments."
1991 self.logger.debug(logging_text + stage[1])
1992
1993 nsi_id = None # TODO put nsi_id when this nsr belongs to a NSI
1994 # get_iterable() returns a value from a dict or empty tuple if key does not exist
1995 for c_vnf in get_iterable(nsd, "constituent-vnfd"):
1996 vnfd_id = c_vnf["vnfd-id-ref"]
1997 vnfd = db_vnfds_ref[vnfd_id]
1998 member_vnf_index = str(c_vnf["member-vnf-index"])
1999 db_vnfr = db_vnfrs[member_vnf_index]
2000 base_folder = vnfd["_admin"]["storage"]
2001 vdu_id = None
2002 vdu_index = 0
2003 vdu_name = None
2004 kdu_name = None
2005
2006 # Get additional parameters
2007 deploy_params = {}
2008 if db_vnfr.get("additionalParamsForVnf"):
2009 deploy_params = self._format_additional_params(db_vnfr["additionalParamsForVnf"].copy())
2010
2011 descriptor_config = vnfd.get("vnf-configuration")
2012 if descriptor_config:
2013 self._deploy_n2vc(
2014 logging_text=logging_text + "member_vnf_index={} ".format(member_vnf_index),
2015 db_nsr=db_nsr,
2016 db_vnfr=db_vnfr,
2017 nslcmop_id=nslcmop_id,
2018 nsr_id=nsr_id,
2019 nsi_id=nsi_id,
2020 vnfd_id=vnfd_id,
2021 vdu_id=vdu_id,
2022 kdu_name=kdu_name,
2023 member_vnf_index=member_vnf_index,
2024 vdu_index=vdu_index,
2025 vdu_name=vdu_name,
2026 deploy_params=deploy_params,
2027 descriptor_config=descriptor_config,
2028 base_folder=base_folder,
2029 task_instantiation_info=tasks_dict_info,
2030 stage=stage
2031 )
2032
2033 # Deploy charms for each VDU that supports one.
2034 for vdud in get_iterable(vnfd, 'vdu'):
2035 vdu_id = vdud["id"]
2036 descriptor_config = vdud.get('vdu-configuration')
2037 vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
2038 if vdur.get("additionalParams"):
2039 deploy_params_vdu = self._format_additional_params(vdur["additionalParams"])
2040 else:
2041 deploy_params_vdu = deploy_params
2042 if descriptor_config:
2043 # look for vdu index in the db_vnfr["vdu"] section
2044 # for vdur_index, vdur in enumerate(db_vnfr["vdur"]):
2045 # if vdur["vdu-id-ref"] == vdu_id:
2046 # break
2047 # else:
2048 # raise LcmException("Mismatch vdu_id={} not found in the vnfr['vdur'] list for "
2049 # "member_vnf_index={}".format(vdu_id, member_vnf_index))
2050 # vdu_name = vdur.get("name")
2051 vdu_name = None
2052 kdu_name = None
2053 for vdu_index in range(int(vdud.get("count", 1))):
2054 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
2055 self._deploy_n2vc(
2056 logging_text=logging_text + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
2057 member_vnf_index, vdu_id, vdu_index),
2058 db_nsr=db_nsr,
2059 db_vnfr=db_vnfr,
2060 nslcmop_id=nslcmop_id,
2061 nsr_id=nsr_id,
2062 nsi_id=nsi_id,
2063 vnfd_id=vnfd_id,
2064 vdu_id=vdu_id,
2065 kdu_name=kdu_name,
2066 member_vnf_index=member_vnf_index,
2067 vdu_index=vdu_index,
2068 vdu_name=vdu_name,
2069 deploy_params=deploy_params_vdu,
2070 descriptor_config=descriptor_config,
2071 base_folder=base_folder,
2072 task_instantiation_info=tasks_dict_info,
2073 stage=stage
2074 )
2075 for kdud in get_iterable(vnfd, 'kdu'):
2076 kdu_name = kdud["name"]
2077 descriptor_config = kdud.get('kdu-configuration')
2078 if descriptor_config:
2079 vdu_id = None
2080 vdu_index = 0
2081 vdu_name = None
2082 # look for vdu index in the db_vnfr["vdu"] section
2083 # for vdur_index, vdur in enumerate(db_vnfr["vdur"]):
2084 # if vdur["vdu-id-ref"] == vdu_id:
2085 # break
2086 # else:
2087 # raise LcmException("Mismatch vdu_id={} not found in the vnfr['vdur'] list for "
2088 # "member_vnf_index={}".format(vdu_id, member_vnf_index))
2089 # vdu_name = vdur.get("name")
2090 # vdu_name = None
2091
2092 self._deploy_n2vc(
2093 logging_text=logging_text,
2094 db_nsr=db_nsr,
2095 db_vnfr=db_vnfr,
2096 nslcmop_id=nslcmop_id,
2097 nsr_id=nsr_id,
2098 nsi_id=nsi_id,
2099 vnfd_id=vnfd_id,
2100 vdu_id=vdu_id,
2101 kdu_name=kdu_name,
2102 member_vnf_index=member_vnf_index,
2103 vdu_index=vdu_index,
2104 vdu_name=vdu_name,
2105 deploy_params=deploy_params,
2106 descriptor_config=descriptor_config,
2107 base_folder=base_folder,
2108 task_instantiation_info=tasks_dict_info,
2109 stage=stage
2110 )
2111
2112 # Check if this NS has a charm configuration
2113 descriptor_config = nsd.get("ns-configuration")
2114 if descriptor_config and descriptor_config.get("juju"):
2115 vnfd_id = None
2116 db_vnfr = None
2117 member_vnf_index = None
2118 vdu_id = None
2119 kdu_name = None
2120 vdu_index = 0
2121 vdu_name = None
2122
2123 # Get additional parameters
2124 deploy_params = {}
2125 if db_nsr.get("additionalParamsForNs"):
2126 deploy_params = self._format_additional_params(db_nsr["additionalParamsForNs"].copy())
2127 base_folder = nsd["_admin"]["storage"]
2128 self._deploy_n2vc(
2129 logging_text=logging_text,
2130 db_nsr=db_nsr,
2131 db_vnfr=db_vnfr,
2132 nslcmop_id=nslcmop_id,
2133 nsr_id=nsr_id,
2134 nsi_id=nsi_id,
2135 vnfd_id=vnfd_id,
2136 vdu_id=vdu_id,
2137 kdu_name=kdu_name,
2138 member_vnf_index=member_vnf_index,
2139 vdu_index=vdu_index,
2140 vdu_name=vdu_name,
2141 deploy_params=deploy_params,
2142 descriptor_config=descriptor_config,
2143 base_folder=base_folder,
2144 task_instantiation_info=tasks_dict_info,
2145 stage=stage
2146 )
2147
2148 # the rest of the work will be done at the finally block
2149
2150 except (ROclient.ROClientException, DbException, LcmException, N2VCException) as e:
2151 self.logger.error(logging_text + "Exit Exception while '{}': {}".format(stage[1], e))
2152 exc = e
2153 except asyncio.CancelledError:
2154 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(stage[1]))
2155 exc = "Operation was cancelled"
2156 except Exception as e:
2157 exc = traceback.format_exc()
2158 self.logger.critical(logging_text + "Exit Exception while '{}': {}".format(stage[1], e), exc_info=True)
2159 finally:
2160 if exc:
2161 error_list.append(str(exc))
2162 try:
2163 # wait for pending tasks
2164 if tasks_dict_info:
2165 stage[1] = "Waiting for instantiate pending tasks."
2166 self.logger.debug(logging_text + stage[1])
2167 error_list += await self._wait_for_tasks(logging_text, tasks_dict_info, timeout_ns_deploy,
2168 stage, nslcmop_id, nsr_id=nsr_id)
2169 stage[1] = stage[2] = ""
2170 except asyncio.CancelledError:
2171 error_list.append("Cancelled")
2172 # TODO cancel all tasks
2173 except Exception as exc:
2174 error_list.append(str(exc))
2175
2176 # update operation-status
2177 db_nsr_update["operational-status"] = "running"
2178 # let's begin with VCA 'configured' status (later we can change it)
2179 db_nsr_update["config-status"] = "configured"
2180 for task, task_name in tasks_dict_info.items():
2181 if not task.done() or task.cancelled() or task.exception():
2182 if task_name.startswith(self.task_name_deploy_vca):
2183 # A N2VC task is pending
2184 db_nsr_update["config-status"] = "failed"
2185 else:
2186 # RO or KDU task is pending
2187 db_nsr_update["operational-status"] = "failed"
2188
2189 # update status at database
2190 if error_list:
2191 error_detail = ". ".join(error_list)
2192 self.logger.error(logging_text + error_detail)
2193 error_description_nslcmop = 'Stage: {}. Detail: {}'.format(stage[0], error_detail)
2194 error_description_nsr = 'Operation: INSTANTIATING.{}, Stage {}'.format(nslcmop_id, stage[0])
2195
2196 db_nsr_update["detailed-status"] = error_description_nsr + " Detail: " + error_detail
2197 db_nslcmop_update["detailed-status"] = error_detail
2198 nslcmop_operation_state = "FAILED"
2199 ns_state = "BROKEN"
2200 else:
2201 error_detail = None
2202 error_description_nsr = error_description_nslcmop = None
2203 ns_state = "READY"
2204 db_nsr_update["detailed-status"] = "Done"
2205 db_nslcmop_update["detailed-status"] = "Done"
2206 nslcmop_operation_state = "COMPLETED"
2207
2208 if db_nsr:
2209 self._write_ns_status(
2210 nsr_id=nsr_id,
2211 ns_state=ns_state,
2212 current_operation="IDLE",
2213 current_operation_id=None,
2214 error_description=error_description_nsr,
2215 error_detail=error_detail,
2216 other_update=db_nsr_update
2217 )
2218 self._write_op_status(
2219 op_id=nslcmop_id,
2220 stage="",
2221 error_message=error_description_nslcmop,
2222 operation_state=nslcmop_operation_state,
2223 other_update=db_nslcmop_update,
2224 )
2225
2226 if nslcmop_operation_state:
2227 try:
2228 await self.msg.aiowrite("ns", "instantiated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
2229 "operationState": nslcmop_operation_state},
2230 loop=self.loop)
2231 except Exception as e:
2232 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
2233
2234 self.logger.debug(logging_text + "Exit")
2235 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
2236
2237 async def _add_vca_relations(self, logging_text, nsr_id, vca_index: int,
2238 timeout: int = 3600, vca_type: str = None) -> bool:
2239
2240 # steps:
2241 # 1. find all relations for this VCA
2242 # 2. wait for other peers related
2243 # 3. add relations
2244
2245 try:
2246 vca_type = vca_type or "lxc_proxy_charm"
2247
2248 # STEP 1: find all relations for this VCA
2249
2250 # read nsr record
2251 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2252 nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
2253
2254 # this VCA data
2255 my_vca = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))[vca_index]
2256
2257 # read all ns-configuration relations
2258 ns_relations = list()
2259 db_ns_relations = deep_get(nsd, ('ns-configuration', 'relation'))
2260 if db_ns_relations:
2261 for r in db_ns_relations:
2262 # check if this VCA is in the relation
2263 if my_vca.get('member-vnf-index') in\
2264 (r.get('entities')[0].get('id'), r.get('entities')[1].get('id')):
2265 ns_relations.append(r)
2266
2267 # read all vnf-configuration relations
2268 vnf_relations = list()
2269 db_vnfd_list = db_nsr.get('vnfd-id')
2270 if db_vnfd_list:
2271 for vnfd in db_vnfd_list:
2272 db_vnfd = self.db.get_one("vnfds", {"_id": vnfd})
2273 db_vnf_relations = deep_get(db_vnfd, ('vnf-configuration', 'relation'))
2274 if db_vnf_relations:
2275 for r in db_vnf_relations:
2276 # check if this VCA is in the relation
2277 if my_vca.get('vdu_id') in (r.get('entities')[0].get('id'), r.get('entities')[1].get('id')):
2278 vnf_relations.append(r)
2279
2280 # if no relations, terminate
2281 if not ns_relations and not vnf_relations:
2282 self.logger.debug(logging_text + ' No relations')
2283 return True
2284
2285 self.logger.debug(logging_text + ' adding relations\n {}\n {}'.format(ns_relations, vnf_relations))
2286
2287 # add all relations
2288 start = time()
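# loop until every pending relation has been added, removing relations whose peers are BROKEN, or until the timeout expires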
2289 while True:
2290 # check timeout
2291 now = time()
2292 if now - start >= timeout:
2293 self.logger.error(logging_text + ' : timeout adding relations')
2294 return False
2295
2296 # reload nsr from database (we need the updated record: _admin.deployed.VCA)
2297 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2298
2299 # for each defined NS relation, find the VCA's related
2300 for r in ns_relations:
2301 from_vca_ee_id = None
2302 to_vca_ee_id = None
2303 from_vca_endpoint = None
2304 to_vca_endpoint = None
2305 vca_list = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))
2306 for vca in vca_list:
2307 if vca.get('member-vnf-index') == r.get('entities')[0].get('id') \
2308 and vca.get('config_sw_installed'):
2309 from_vca_ee_id = vca.get('ee_id')
2310 from_vca_endpoint = r.get('entities')[0].get('endpoint')
2311 if vca.get('member-vnf-index') == r.get('entities')[1].get('id') \
2312 and vca.get('config_sw_installed'):
2313 to_vca_ee_id = vca.get('ee_id')
2314 to_vca_endpoint = r.get('entities')[1].get('endpoint')
2315 if from_vca_ee_id and to_vca_ee_id:
2316 # add relation
2317 await self.vca_map[vca_type].add_relation(
2318 ee_id_1=from_vca_ee_id,
2319 ee_id_2=to_vca_ee_id,
2320 endpoint_1=from_vca_endpoint,
2321 endpoint_2=to_vca_endpoint)
2322 # remove entry from relations list
2323 ns_relations.remove(r)
2324 else:
2325 # check failed peers
2326 try:
2327 vca_status_list = db_nsr.get('configurationStatus')
2328 if vca_status_list:
2329 for i in range(len(vca_list)):
2330 vca = vca_list[i]
2331 vca_status = vca_status_list[i]
2332 if vca.get('member-vnf-index') == r.get('entities')[0].get('id'):
2333 if vca_status.get('status') == 'BROKEN':
2334 # peer broken: remove relation from list
2335 ns_relations.remove(r)
2336 if vca.get('member-vnf-index') == r.get('entities')[1].get('id'):
2337 if vca_status.get('status') == 'BROKEN':
2338 # peer broken: remove relation from list
2339 ns_relations.remove(r)
2340 except Exception:
2341 # ignore
2342 pass
2343
2344 # for each defined VNF relation, find the VCA's related
2345 for r in vnf_relations:
2346 from_vca_ee_id = None
2347 to_vca_ee_id = None
2348 from_vca_endpoint = None
2349 to_vca_endpoint = None
2350 vca_list = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))
2351 for vca in vca_list:
2352 if vca.get('vdu_id') == r.get('entities')[0].get('id') and vca.get('config_sw_installed'):
2353 from_vca_ee_id = vca.get('ee_id')
2354 from_vca_endpoint = r.get('entities')[0].get('endpoint')
2355 if vca.get('vdu_id') == r.get('entities')[1].get('id') and vca.get('config_sw_installed'):
2356 to_vca_ee_id = vca.get('ee_id')
2357 to_vca_endpoint = r.get('entities')[1].get('endpoint')
2358 if from_vca_ee_id and to_vca_ee_id:
2359 # add relation
2360 await self.vca_map[vca_type].add_relation(
2361 ee_id_1=from_vca_ee_id,
2362 ee_id_2=to_vca_ee_id,
2363 endpoint_1=from_vca_endpoint,
2364 endpoint_2=to_vca_endpoint)
2365 # remove entry from relations list
2366 vnf_relations.remove(r)
2367 else:
2368 # check failed peers
2369 try:
2370 vca_status_list = db_nsr.get('configurationStatus')
2371 if vca_status_list:
2372 for i in range(len(vca_list)):
2373 vca = vca_list[i]
2374 vca_status = vca_status_list[i]
2375 if vca.get('vdu_id') == r.get('entities')[0].get('id'):
2376 if vca_status.get('status') == 'BROKEN':
2377 # peer broken: remove relation from list
2378 vnf_relations.remove(r)
2379 if vca.get('vdu_id') == r.get('entities')[1].get('id'):
2380 if vca_status.get('status') == 'BROKEN':
2381 # peer broken: remove relation from list
2382 vnf_relations.remove(r)
2383 except Exception:
2384 # ignore
2385 pass
2386
2387 # wait for next try
2388 await asyncio.sleep(5.0)
2389
2390 if not ns_relations and not vnf_relations:
2391 self.logger.debug('Relations added')
2392 break
2393
2394 return True
2395
2396 except Exception as e:
2397 self.logger.warn(logging_text + ' ERROR adding relations: {}'.format(e))
2398 return False
2399
2400 def _write_db_callback(self, task, item, _id, on_done=None, on_exc=None):
2401 """
2402 callback for kdu install intended to store the returned kdu_instance at database
2403 :return: None
2404 """
2405 db_update = {}
2406 try:
2407 result = task.result()
2408 if on_done:
2409 db_update[on_done] = str(result)
2410 except Exception as e:
2411 if on_exc:
2412 db_update[on_exc] = str(e)
2413 if db_update:
2414 try:
2415 self.update_db_2(item, _id, db_update)
2416 except Exception:
2417 pass
2418
2419 async def deploy_kdus(self, logging_text, nsr_id, nslcmop_id, db_vnfrs, db_vnfds, task_instantiation_info):
2420 # Launch kdus if present in the descriptor
2421
2422 k8scluster_id_2_uuic = {"helm-chart": {}, "juju-bundle": {}}
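# cache of k8s cluster id -> internal cluster uuid, one map per cluster type, filled lazily by _get_cluster_id()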
2423
2424 def _get_cluster_id(cluster_id, cluster_type):
2425 nonlocal k8scluster_id_2_uuic
2426 if cluster_id in k8scluster_id_2_uuic[cluster_type]:
2427 return k8scluster_id_2_uuic[cluster_type][cluster_id]
2428
2429 db_k8scluster = self.db.get_one("k8sclusters", {"_id": cluster_id}, fail_on_empty=False)
2430 if not db_k8scluster:
2431 raise LcmException("K8s cluster {} cannot be found".format(cluster_id))
2432 k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
2433 if not k8s_id:
2434 raise LcmException("K8s cluster '{}' has not been initilized for '{}'".format(cluster_id, cluster_type))
2435 k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
2436 return k8s_id
2437
2438 logging_text += "Deploy kdus: "
2439 step = ""
2440 try:
2441 db_nsr_update = {"_admin.deployed.K8s": []}
2442 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2443
2444 index = 0
2445 updated_cluster_list = []
2446
2447 for vnfr_data in db_vnfrs.values():
2448 for kdur in get_iterable(vnfr_data, "kdur"):
2449 desc_params = self._format_additional_params(kdur.get("additionalParams"))
2450 vnfd_id = vnfr_data.get('vnfd-id')
2451 namespace = kdur.get("k8s-namespace")
2452 if kdur.get("helm-chart"):
2453 kdumodel = kdur["helm-chart"]
2454 k8sclustertype = "helm-chart"
2455 elif kdur.get("juju-bundle"):
2456 kdumodel = kdur["juju-bundle"]
2457 k8sclustertype = "juju-bundle"
2458 else:
2459 raise LcmException("kdu type for kdu='{}.{}' is neither helm-chart nor "
2460 "juju-bundle. Maybe an old NBI version is running".
2461 format(vnfr_data["member-vnf-index-ref"], kdur["kdu-name"]))
2462 # check if kdumodel is a file and exists
2463 try:
2464 storage = deep_get(db_vnfds.get(vnfd_id), ('_admin', 'storage'))
2465 if storage and storage.get('pkg-dir'): # may not be present if vnfd has no artifacts
2466 # path format: /vnfdid/pkgdir/helm-charts|juju-bundles/kdumodel
2467 filename = '{}/{}/{}s/{}'.format(storage["folder"], storage["pkg-dir"], k8sclustertype,
2468 kdumodel)
2469 if self.fs.file_exists(filename, mode='file') or self.fs.file_exists(filename, mode='dir'):
2470 kdumodel = self.fs.path + filename
2471 except (asyncio.TimeoutError, asyncio.CancelledError):
2472 raise
2473 except Exception: # it is not a file
2474 pass
2475
2476 k8s_cluster_id = kdur["k8s-cluster"]["id"]
2477 step = "Synchronize repos for k8s cluster '{}'".format(k8s_cluster_id)
2478 cluster_uuid = _get_cluster_id(k8s_cluster_id, k8sclustertype)
2479
2480 if k8sclustertype == "helm-chart" and cluster_uuid not in updated_cluster_list:
2481 del_repo_list, added_repo_dict = await asyncio.ensure_future(
2482 self.k8sclusterhelm.synchronize_repos(cluster_uuid=cluster_uuid))
2483 if del_repo_list or added_repo_dict:
2484 unset = {'_admin.helm_charts_added.' + item: None for item in del_repo_list}
2485 updated = {'_admin.helm_charts_added.' +
2486 item: name for item, name in added_repo_dict.items()}
2487 self.logger.debug(logging_text + "repos synchronized on k8s cluster '{}' to_delete: {}, "
2488 "to_add: {}".format(k8s_cluster_id, del_repo_list,
2489 added_repo_dict))
2490 self.db.set_one("k8sclusters", {"_id": k8s_cluster_id}, updated, unset=unset)
2491 updated_cluster_list.append(cluster_uuid)
2492
2493 step = "Instantiating KDU {}.{} in k8s cluster {}".format(vnfr_data["member-vnf-index-ref"],
2494 kdur["kdu-name"], k8s_cluster_id)
2495
2496 k8s_instance_info = {"kdu-instance": None,
2497 "k8scluster-uuid": cluster_uuid,
2498 "k8scluster-type": k8sclustertype,
2499 "member-vnf-index": vnfr_data["member-vnf-index-ref"],
2500 "kdu-name": kdur["kdu-name"],
2501 "kdu-model": kdumodel,
2502 "namespace": namespace}
2503 db_path = "_admin.deployed.K8s.{}".format(index)
2504 db_nsr_update[db_path] = k8s_instance_info
2505 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2506
2507 db_dict = {"collection": "nsrs",
2508 "filter": {"_id": nsr_id},
2509 "path": db_path}
2510
2511 task = asyncio.ensure_future(
2512 self.k8scluster_map[k8sclustertype].install(cluster_uuid=cluster_uuid, kdu_model=kdumodel,
2513 atomic=True, params=desc_params,
2514 db_dict=db_dict, timeout=600,
2515 kdu_name=kdur["kdu-name"], namespace=namespace))
2516
2517 task.add_done_callback(partial(self._write_db_callback, item="nsrs", _id=nsr_id,
2518 on_done=db_path + ".kdu-instance",
2519 on_exc=db_path + ".detailed-status"))
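# when the install task finishes, _write_db_callback stores the kdu-instance id (or the error text) at the nsr record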
2520 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_KDU-{}".format(index), task)
2521 task_instantiation_info[task] = "Deploying KDU {}".format(kdur["kdu-name"])
2522
2523 index += 1
2524
2525 except (LcmException, asyncio.CancelledError):
2526 raise
2527 except Exception as e:
2528 msg = "Exception {} while {}: {}".format(type(e).__name__, step, e)
2529 if isinstance(e, (N2VCException, DbException)):
2530 self.logger.error(logging_text + msg)
2531 else:
2532 self.logger.critical(logging_text + msg, exc_info=True)
2533 raise LcmException(msg)
2534 finally:
2535 if db_nsr_update:
2536 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2537
2538 def _deploy_n2vc(self, logging_text, db_nsr, db_vnfr, nslcmop_id, nsr_id, nsi_id, vnfd_id, vdu_id,
2539 kdu_name, member_vnf_index, vdu_index, vdu_name, deploy_params, descriptor_config,
2540 base_folder, task_instantiation_info, stage):
2541 # launch instantiate_N2VC in an asyncio task and register task object
2542 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
2543 # if not found, create one entry and update database
2544 # fill db_nsr._admin.deployed.VCA.<index>
2545
2546 self.logger.debug(logging_text + "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id, vdu_id))
2547 if descriptor_config.get("juju"): # There is one execution environment of type juju
2548 ee_list = [descriptor_config]
2549 elif descriptor_config.get("execution-environment-list"):
2550 ee_list = descriptor_config.get("execution-environment-list")
2551 else: # other types such as script are not supported
2552 ee_list = []
2553
2554 for ee_item in ee_list:
2555 self.logger.debug(logging_text + "_deploy_n2vc ee_item juju={}, helm={}".format(ee_item.get('juju'),
2556 ee_item.get("helm-chart")))
2557 if ee_item.get("juju"):
2558 vca_name = ee_item['juju'].get('charm')
2559 vca_type = "lxc_proxy_charm" if ee_item['juju'].get('charm') is not None else "native_charm"
2560 if ee_item['juju'].get('cloud') == "k8s":
2561 vca_type = "k8s_proxy_charm"
2562 elif ee_item['juju'].get('proxy') is False:
2563 vca_type = "native_charm"
2564 elif ee_item.get("helm-chart"):
2565 vca_name = ee_item['helm-chart']
2566 vca_type = "helm"
2567 else:
2568 self.logger.debug(logging_text + "skipping non juju neither charm configuration")
2569 continue
2570
2571 vca_index = -1
2572 for vca_index, vca_deployed in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
2573 if not vca_deployed:
2574 continue
2575 if vca_deployed.get("member-vnf-index") == member_vnf_index and \
2576 vca_deployed.get("vdu_id") == vdu_id and \
2577 vca_deployed.get("kdu_name") == kdu_name and \
2578 vca_deployed.get("vdu_count_index", 0) == vdu_index:
2579 break
2580 else:
2581 # not found, create one.
2582 vca_deployed = {
2583 "member-vnf-index": member_vnf_index,
2584 "vdu_id": vdu_id,
2585 "kdu_name": kdu_name,
2586 "vdu_count_index": vdu_index,
2587 "operational-status": "init", # TODO revise
2588 "detailed-status": "", # TODO revise
2589 "step": "initial-deploy", # TODO revise
2590 "vnfd_id": vnfd_id,
2591 "vdu_name": vdu_name,
2592 "type": vca_type
2593 }
2594 vca_index += 1
2595
2596 # create VCA and configurationStatus in db
2597 db_dict = {
2598 "_admin.deployed.VCA.{}".format(vca_index): vca_deployed,
2599 "configurationStatus.{}".format(vca_index): dict()
2600 }
2601 self.update_db_2("nsrs", nsr_id, db_dict)
2602
2603 db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed)
2604
2605 # Launch task
2606 task_n2vc = asyncio.ensure_future(
2607 self.instantiate_N2VC(
2608 logging_text=logging_text,
2609 vca_index=vca_index,
2610 nsi_id=nsi_id,
2611 db_nsr=db_nsr,
2612 db_vnfr=db_vnfr,
2613 vdu_id=vdu_id,
2614 kdu_name=kdu_name,
2615 vdu_index=vdu_index,
2616 deploy_params=deploy_params,
2617 config_descriptor=descriptor_config,
2618 base_folder=base_folder,
2619 nslcmop_id=nslcmop_id,
2620 stage=stage,
2621 vca_type=vca_type,
2622 vca_name=vca_name,
2623 ee_config_descriptor=ee_item
2624 )
2625 )
2626 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_N2VC-{}".format(vca_index), task_n2vc)
2627 task_instantiation_info[task_n2vc] = self.task_name_deploy_vca + " {}.{}".format(
2628 member_vnf_index or "", vdu_id or "")
2629
2630 # Check if this VNFD has a configured terminate action
2631 def _has_terminate_config_primitive(self, vnfd):
2632 vnf_config = vnfd.get("vnf-configuration")
2633 if vnf_config and vnf_config.get("terminate-config-primitive"):
2634 return True
2635 else:
2636 return False
2637
2638 @staticmethod
2639 def _get_terminate_config_primitive_seq_list(vnfd):
2640 """ Get a numerically sorted list of the sequences for this VNFD's terminate action """
2641 # No need to check for existing primitive twice, already done before
2642 vnf_config = vnfd.get("vnf-configuration")
2643 seq_list = vnf_config.get("terminate-config-primitive")
2644 # Get all 'seq' tags in seq_list, order sequences numerically, ascending.
2645 seq_list_sorted = sorted(seq_list, key=lambda x: int(x['seq']))
2646 return seq_list_sorted
2647
2648 @staticmethod
2649 def _create_nslcmop(nsr_id, operation, params):
2650 """
2651 Creates an nslcmop content to be stored at database.
2652 :param nsr_id: internal id of the instance
2653 :param operation: instantiate, terminate, scale, action, ...
2654 :param params: user parameters for the operation
2655 :return: dictionary following SOL005 format
2656 """
2657 # Raise exception if invalid arguments
2658 if not (nsr_id and operation and params):
2659 raise LcmException(
2660 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided")
2661 now = time()
2662 _id = str(uuid4())
2663 nslcmop = {
2664 "id": _id,
2665 "_id": _id,
2666 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
2667 "operationState": "PROCESSING",
2668 "statusEnteredTime": now,
2669 "nsInstanceId": nsr_id,
2670 "lcmOperationType": operation,
2671 "startTime": now,
2672 "isAutomaticInvocation": False,
2673 "operationParams": params,
2674 "isCancelPending": False,
2675 "links": {
2676 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
2677 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
2678 }
2679 }
2680 return nslcmop
2681
2682 def _format_additional_params(self, params):
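# values of the form "!!yaml <text>" are deserialized, e.g. "!!yaml [1, 2]" becomes the list [1, 2]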
2683 params = params or {}
2684 for key, value in params.items():
2685 if str(value).startswith("!!yaml "):
2686 params[key] = yaml.safe_load(value[7:])
2687 return params
2688
2689 def _get_terminate_primitive_params(self, seq, vnf_index):
2690 primitive = seq.get('name')
2691 primitive_params = {}
2692 params = {
2693 "member_vnf_index": vnf_index,
2694 "primitive": primitive,
2695 "primitive_params": primitive_params,
2696 }
2697 desc_params = {}
2698 return self._map_primitive_params(seq, params, desc_params)
2699
2700 # sub-operations
2701
2702 def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
2703 op = deep_get(db_nslcmop, ('_admin', 'operations'), [])[op_index]
2704 if op.get('operationState') == 'COMPLETED':
2705 # b. Skip sub-operation
2706 # _ns_execute_primitive() or RO.create_action() will NOT be executed
2707 return self.SUBOPERATION_STATUS_SKIP
2708 else:
2709 # c. retry executing sub-operation
2710 # The sub-operation exists, and operationState != 'COMPLETED'
2711 # Update operationState = 'PROCESSING' to indicate a retry.
2712 operationState = 'PROCESSING'
2713 detailed_status = 'In progress'
2714 self._update_suboperation_status(
2715 db_nslcmop, op_index, operationState, detailed_status)
2716 # Return the sub-operation index
2717 # _ns_execute_primitive() or RO.create_action() will be called from scale()
2718 # with arguments extracted from the sub-operation
2719 return op_index
2720
2721 # Find a sub-operation where all keys in a matching dictionary must match
2722 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
2723 def _find_suboperation(self, db_nslcmop, match):
2724 if db_nslcmop and match:
2725 op_list = db_nslcmop.get('_admin', {}).get('operations', [])
2726 for i, op in enumerate(op_list):
2727 if all(op.get(k) == match[k] for k in match):
2728 return i
2729 return self.SUBOPERATION_STATUS_NOT_FOUND
2730
2731 # Update status for a sub-operation given its index
2732 def _update_suboperation_status(self, db_nslcmop, op_index, operationState, detailed_status):
2733 # Update DB for HA tasks
2734 q_filter = {'_id': db_nslcmop['_id']}
2735 update_dict = {'_admin.operations.{}.operationState'.format(op_index): operationState,
2736 '_admin.operations.{}.detailed-status'.format(op_index): detailed_status}
2737 self.db.set_one("nslcmops",
2738 q_filter=q_filter,
2739 update_dict=update_dict,
2740 fail_on_empty=False)
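# fail_on_empty=False: do not raise if the nslcmop record no longer exists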
2741
2742 # Add sub-operation, return the index of the added sub-operation
2743 # Optionally, set operationState, detailed-status, and operationType
2744 # Status and type are currently set for 'scale' sub-operations:
2745 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
2746 # 'detailed-status' : status message
2747 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
2748 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
2749 def _add_suboperation(self, db_nslcmop, vnf_index, vdu_id, vdu_count_index, vdu_name, primitive,
2750 mapped_primitive_params, operationState=None, detailed_status=None, operationType=None,
2751 RO_nsr_id=None, RO_scaling_info=None):
2752 if not db_nslcmop:
2753 return self.SUBOPERATION_STATUS_NOT_FOUND
2754 # Get the "_admin.operations" list, if it exists
2755 db_nslcmop_admin = db_nslcmop.get('_admin', {})
2756 op_list = db_nslcmop_admin.get('operations')
2757 # Create or append to the "_admin.operations" list
2758 new_op = {'member_vnf_index': vnf_index,
2759 'vdu_id': vdu_id,
2760 'vdu_count_index': vdu_count_index,
2761 'primitive': primitive,
2762 'primitive_params': mapped_primitive_params}
2763 if operationState:
2764 new_op['operationState'] = operationState
2765 if detailed_status:
2766 new_op['detailed-status'] = detailed_status
2767 if operationType:
2768 new_op['lcmOperationType'] = operationType
2769 if RO_nsr_id:
2770 new_op['RO_nsr_id'] = RO_nsr_id
2771 if RO_scaling_info:
2772 new_op['RO_scaling_info'] = RO_scaling_info
2773 if not op_list:
2774 # No existing operations, create key 'operations' with current operation as first list element
2775 db_nslcmop_admin.update({'operations': [new_op]})
2776 op_list = db_nslcmop_admin.get('operations')
2777 else:
2778 # Existing operations, append operation to list
2779 op_list.append(new_op)
2780
2781 db_nslcmop_update = {'_admin.operations': op_list}
2782 self.update_db_2("nslcmops", db_nslcmop['_id'], db_nslcmop_update)
2783 op_index = len(op_list) - 1
2784 return op_index
2785
2786 # Helper methods for scale() sub-operations
2787
2788 # pre-scale/post-scale:
2789 # Check for 3 different cases:
2790 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
2791 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
2792 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
2793 def _check_or_add_scale_suboperation(self, db_nslcmop, vnf_index, vnf_config_primitive, primitive_params,
2794 operationType, RO_nsr_id=None, RO_scaling_info=None):
2795 # Find this sub-operation
2796 if RO_nsr_id and RO_scaling_info:
2797 operationType = 'SCALE-RO'
2798 match = {
2799 'member_vnf_index': vnf_index,
2800 'RO_nsr_id': RO_nsr_id,
2801 'RO_scaling_info': RO_scaling_info,
2802 }
2803 else:
2804 match = {
2805 'member_vnf_index': vnf_index,
2806 'primitive': vnf_config_primitive,
2807 'primitive_params': primitive_params,
2808 'lcmOperationType': operationType
2809 }
2810 op_index = self._find_suboperation(db_nslcmop, match)
2811 if op_index == self.SUBOPERATION_STATUS_NOT_FOUND:
2812 # a. New sub-operation
2813 # The sub-operation does not exist, add it.
2814 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
2815 # The following parameters are set to None for all kind of scaling:
2816 vdu_id = None
2817 vdu_count_index = None
2818 vdu_name = None
2819 if RO_nsr_id and RO_scaling_info:
2820 vnf_config_primitive = None
2821 primitive_params = None
2822 else:
2823 RO_nsr_id = None
2824 RO_scaling_info = None
2825 # Initial status for sub-operation
2826 operationState = 'PROCESSING'
2827 detailed_status = 'In progress'
2828 # Add sub-operation for pre/post-scaling (zero or more operations)
2829 self._add_suboperation(db_nslcmop,
2830 vnf_index,
2831 vdu_id,
2832 vdu_count_index,
2833 vdu_name,
2834 vnf_config_primitive,
2835 primitive_params,
2836 operationState,
2837 detailed_status,
2838 operationType,
2839 RO_nsr_id,
2840 RO_scaling_info)
2841 return self.SUBOPERATION_STATUS_NEW
2842 else:
2843 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
2844 # or op_index (operationState != 'COMPLETED')
2845 return self._retry_or_skip_suboperation(db_nslcmop, op_index)
2846
2847 # Function to return execution_environment id
2848
2849 def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list):
2850 # TODO vdu_index_count
2851 for vca in vca_deployed_list:
2852 if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id:
2853 return vca["ee_id"]
2854
2855 async def destroy_N2VC(self, logging_text, db_nslcmop, vca_deployed, config_descriptor,
2856 vca_index, destroy_ee=True, exec_primitives=True):
2857 """
2858 Execute the terminate primitives and destroy the execution environment (if destroy_ee=True)
2859 :param logging_text:
2860 :param db_nslcmop:
2861 :param vca_deployed: Dictionary of deployment info at db_nsr._admin.deployed.VCA.<INDEX>
2862 :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
2863 :param vca_index: index in the database _admin.deployed.VCA
2864 :param destroy_ee: False to skip destroying the execution environment, because all of them will be destroyed at once
2865 :param exec_primitives: False to skip executing the terminate primitives, because the configuration was not
2866 completed or did not execute properly
2867 :return: None or exception
2868 """
2869
2870 self.logger.debug(
2871 logging_text + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
2872 vca_index, vca_deployed, config_descriptor, destroy_ee
2873 )
2874 )
2875
2876 vca_type = vca_deployed.get("type", "lxc_proxy_charm")
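# older VCA records may lack "type"; default to lxc_proxy_charm for backward compatibility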
2877
2878 # execute terminate_primitives
2879 if exec_primitives:
2880 terminate_primitives = config_descriptor.get("terminate-config-primitive")
2881 vdu_id = vca_deployed.get("vdu_id")
2882 vdu_count_index = vca_deployed.get("vdu_count_index")
2883 vdu_name = vca_deployed.get("vdu_name")
2884 vnf_index = vca_deployed.get("member-vnf-index")
2885 if terminate_primitives and vca_deployed.get("needed_terminate"):
2886 # Sort the terminate primitives by their 'seq' tag, numerically ascending.
2887 terminate_primitives = sorted(terminate_primitives, key=lambda x: int(x['seq']))
2888 for seq in terminate_primitives:
2889 # For each sequence in list, get primitive and call _ns_execute_primitive()
2890 step = "Calling terminate action for vnf_member_index={} primitive={}".format(
2891 vnf_index, seq.get("name"))
2892 self.logger.debug(logging_text + step)
2893 # Create the primitive for each sequence, i.e. "primitive": "touch"
2894 primitive = seq.get('name')
2895 mapped_primitive_params = self._get_terminate_primitive_params(seq, vnf_index)
2896 # The following 3 parameters are currently set to None for 'terminate':
2897 # vdu_id, vdu_count_index, vdu_name
2898
2899 # Add sub-operation
2900 self._add_suboperation(db_nslcmop,
2901 vnf_index,
2902 vdu_id,
2903 vdu_count_index,
2904 vdu_name,
2905 primitive,
2906 mapped_primitive_params)
2907 # Sub-operations: Call _ns_execute_primitive() instead of action()
2908 try:
2909 result, result_detail = await self._ns_execute_primitive(vca_deployed["ee_id"], primitive,
2910 mapped_primitive_params,
2911 vca_type=vca_type)
2912 except LcmException:
2913 # this happens when VCA is not deployed. In this case it is not needed to terminate
2914 continue
2915 result_ok = ['COMPLETED', 'PARTIALLY_COMPLETED']
2916 if result not in result_ok:
2917 raise LcmException("terminate_primitive {} for vnf_member_index={} fails with "
2918 "error {}".format(seq.get("name"), vnf_index, result_detail))
2919 # mark that this VCA no longer needs to be terminated
2920 db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(vca_index)
2921 self.update_db_2("nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False})
2922
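# remove the Prometheus scrape jobs that were registered for this VCA's metrics exporter, if any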
2923 if vca_deployed.get("prometheus_jobs") and self.prometheus:
2924 await self.prometheus.update(remove_jobs=vca_deployed["prometheus_jobs"])
2925
2926 if destroy_ee:
2927 await self.vca_map[vca_type].delete_execution_environment(vca_deployed["ee_id"])
2928
2929 async def _delete_all_N2VC(self, db_nsr: dict):
2930 self._write_all_config_status(db_nsr=db_nsr, status='TERMINATING')
2931 namespace = "." + db_nsr["_id"]
2932 try:
2933 await self.n2vc.delete_namespace(namespace=namespace, total_timeout=self.timeout_charm_delete)
2934 except N2VCNotFound: # already deleted. Skip
2935 pass
2936 self._write_all_config_status(db_nsr=db_nsr, status='DELETED')
2937
2938 async def _terminate_RO(self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage):
2939 """
2940 Terminates a deployment from RO
2941 :param logging_text:
2942 :param nsr_deployed: db_nsr._admin.deployed
2943 :param nsr_id:
2944 :param nslcmop_id:
2945 :param stage: list of strings with the content to write to db_nslcmop.detailed-status.
2946 this method updates only index 2, but writes the concatenated content of the whole list to the database
2947 :return:
2948 """
2949 db_nsr_update = {}
2950 failed_detail = []
2951 ro_nsr_id = ro_delete_action = None
2952 if nsr_deployed and nsr_deployed.get("RO"):
2953 ro_nsr_id = nsr_deployed["RO"].get("nsr_id")
2954 ro_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
2955 try:
2956 if ro_nsr_id:
2957 stage[2] = "Deleting ns from VIM."
2958 db_nsr_update["detailed-status"] = " ".join(stage)
2959 self._write_op_status(nslcmop_id, stage)
2960 self.logger.debug(logging_text + stage[2])
2961 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2962 self._write_op_status(nslcmop_id, stage)
2963 desc = await self.RO.delete("ns", ro_nsr_id)
2964 ro_delete_action = desc["action_id"]
2965 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = ro_delete_action
2966 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
2967 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
2968 if ro_delete_action:
2969 # wait until NS is deleted from VIM
2970 stage[2] = "Waiting ns deleted from VIM."
2971 detailed_status_old = None
2972 self.logger.debug(logging_text + stage[2] + " RO_id={} ro_delete_action={}".format(ro_nsr_id,
2973 ro_delete_action))
2974 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2975 self._write_op_status(nslcmop_id, stage)
2976
2977 delete_timeout = 20 * 60 # 20 minutes
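# poll the RO delete action every 5 seconds until it completes or the timeout expires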
2978 while delete_timeout > 0:
2979 desc = await self.RO.show(
2980 "ns",
2981 item_id_name=ro_nsr_id,
2982 extra_item="action",
2983 extra_item_id=ro_delete_action)
2984
2985 # deploymentStatus
2986 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
2987
2988 ns_status, ns_status_info = self.RO.check_action_status(desc)
2989 if ns_status == "ERROR":
2990 raise ROclient.ROClientException(ns_status_info)
2991 elif ns_status == "BUILD":
2992 stage[2] = "Deleting from VIM {}".format(ns_status_info)
2993 elif ns_status == "ACTIVE":
2994 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
2995 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
2996 break
2997 else:
2998 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
2999 if stage[2] != detailed_status_old:
3000 detailed_status_old = stage[2]
3001 db_nsr_update["detailed-status"] = " ".join(stage)
3002 self._write_op_status(nslcmop_id, stage)
3003 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3004 await asyncio.sleep(5, loop=self.loop)
3005 delete_timeout -= 5
3006 else: # delete_timeout <= 0:
3007 raise ROclient.ROClientException("Timeout waiting ns deleted from VIM")
3008
3009 except Exception as e:
3010 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3011 if isinstance(e, ROclient.ROClientException) and e.http_code == 404: # not found
3012 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
3013 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
3014 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
3015 self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(ro_nsr_id))
3016 elif isinstance(e, ROclient.ROClientException) and e.http_code == 409: # conflict
3017 failed_detail.append("delete conflict: {}".format(e))
3018 self.logger.debug(logging_text + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id, e))
3019 else:
3020 failed_detail.append("delete error: {}".format(e))
3021 self.logger.error(logging_text + "RO_ns_id={} delete error: {}".format(ro_nsr_id, e))
3022
3023 # Delete nsd
3024 if not failed_detail and deep_get(nsr_deployed, ("RO", "nsd_id")):
3025 ro_nsd_id = nsr_deployed["RO"]["nsd_id"]
3026 try:
3027 stage[2] = "Deleting nsd from RO."
3028 db_nsr_update["detailed-status"] = " ".join(stage)
3029 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3030 self._write_op_status(nslcmop_id, stage)
3031 await self.RO.delete("nsd", ro_nsd_id)
3032 self.logger.debug(logging_text + "ro_nsd_id={} deleted".format(ro_nsd_id))
3033 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
3034 except Exception as e:
3035 if isinstance(e, ROclient.ROClientException) and e.http_code == 404: # not found
3036 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
3037 self.logger.debug(logging_text + "ro_nsd_id={} already deleted".format(ro_nsd_id))
3038 elif isinstance(e, ROclient.ROClientException) and e.http_code == 409: # conflict
3039 failed_detail.append("ro_nsd_id={} delete conflict: {}".format(ro_nsd_id, e))
3040 self.logger.debug(logging_text + failed_detail[-1])
3041 else:
3042 failed_detail.append("ro_nsd_id={} delete error: {}".format(ro_nsd_id, e))
3043 self.logger.error(logging_text + failed_detail[-1])
3044
3045 if not failed_detail and deep_get(nsr_deployed, ("RO", "vnfd")):
3046 for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
3047 if not vnf_deployed or not vnf_deployed["id"]:
3048 continue
3049 try:
3050 ro_vnfd_id = vnf_deployed["id"]
3051 stage[2] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
3052 vnf_deployed["member-vnf-index"], ro_vnfd_id)
3053 db_nsr_update["detailed-status"] = " ".join(stage)
3054 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3055 self._write_op_status(nslcmop_id, stage)
3056 await self.RO.delete("vnfd", ro_vnfd_id)
3057 self.logger.debug(logging_text + "ro_vnfd_id={} deleted".format(ro_vnfd_id))
3058 db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
3059 except Exception as e:
3060 if isinstance(e, ROclient.ROClientException) and e.http_code == 404: # not found
3061 db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
3062 self.logger.debug(logging_text + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id))
3063 elif isinstance(e, ROclient.ROClientException) and e.http_code == 409: # conflict
3064 failed_detail.append("ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id, e))
3065 self.logger.debug(logging_text + failed_detail[-1])
3066 else:
3067 failed_detail.append("ro_vnfd_id={} delete error: {}".format(ro_vnfd_id, e))
3068 self.logger.error(logging_text + failed_detail[-1])
3069
3070 if failed_detail:
3071 stage[2] = "Error deleting from VIM"
3072 else:
3073 stage[2] = "Deleted from VIM"
3074 db_nsr_update["detailed-status"] = " ".join(stage)
3075 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3076 self._write_op_status(nslcmop_id, stage)
3077
3078 if failed_detail:
3079 raise LcmException("; ".join(failed_detail))
3080
3081 async def terminate(self, nsr_id, nslcmop_id):
3082 # Try to lock HA task here
3083 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
3084 if not task_is_locked_by_me:
3085 return
3086
3087 logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
3088 self.logger.debug(logging_text + "Enter")
3089 timeout_ns_terminate = self.timeout_ns_terminate
3090 db_nsr = None
3091 db_nslcmop = None
3092 operation_params = None
3093 exc = None
3094 error_list = [] # annotates all failed error messages
3095 db_nslcmop_update = {}
3096 autoremove = False # autoremove after terminated
3097 tasks_dict_info = {}
3098 db_nsr_update = {}
3099 stage = ["Stage 1/3: Preparing task.", "Waiting for previous operations to terminate.", ""]
3100 # ^ contains [stage, step, VIM-status]
3101 try:
3102 # wait for any previous tasks in process
3103 await self.lcm_tasks.waitfor_related_HA("ns", 'nslcmops', nslcmop_id)
3104
3105 stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
3106 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
3107 operation_params = db_nslcmop.get("operationParams") or {}
3108 if operation_params.get("timeout_ns_terminate"):
3109 timeout_ns_terminate = operation_params["timeout_ns_terminate"]
3110 stage[1] = "Getting nsr={} from db.".format(nsr_id)
3111 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3112
3113 db_nsr_update["operational-status"] = "terminating"
3114 db_nsr_update["config-status"] = "terminating"
3115 self._write_ns_status(
3116 nsr_id=nsr_id,
3117 ns_state="TERMINATING",
3118 current_operation="TERMINATING",
3119 current_operation_id=nslcmop_id,
3120 other_update=db_nsr_update
3121 )
3122 self._write_op_status(
3123 op_id=nslcmop_id,
3124 queuePosition=0,
3125 stage=stage
3126 )
3127 nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed")) or {}
3128 if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
3129 return
3130
3131 stage[1] = "Getting vnf descriptors from db."
3132 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
3133 db_vnfds_from_id = {}
3134 db_vnfds_from_member_index = {}
3135 # Loop over VNFRs
3136 for vnfr in db_vnfrs_list:
3137 vnfd_id = vnfr["vnfd-id"]
3138 if vnfd_id not in db_vnfds_from_id:
3139 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
3140 db_vnfds_from_id[vnfd_id] = vnfd
3141 db_vnfds_from_member_index[vnfr["member-vnf-index-ref"]] = db_vnfds_from_id[vnfd_id]
3142
3143 # Destroy individual execution environments when there are terminating primitives.
3144 # The rest of the EEs will be deleted at once
3145 # TODO - check before calling _destroy_N2VC
3146 # if not operation_params.get("skip_terminate_primitives"):#
3147 # or not vca.get("needed_terminate"):
3148 stage[0] = "Stage 2/3 execute terminating primitives."
3149 self.logger.debug(logging_text + stage[0])
3150 stage[1] = "Looking execution environment that needs terminate."
3151 self.logger.debug(logging_text + stage[1])
3152 # self.logger.debug("nsr_deployed: {}".format(nsr_deployed))
3153 for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")):
3154 self.logger.debug("vca_index: {}, vca: {}".format(vca_index, vca))
3155 config_descriptor = None
3156 if not vca or not vca.get("ee_id"):
3157 continue
3158 if not vca.get("member-vnf-index"):
3159 # ns
3160 config_descriptor = db_nsr.get("ns-configuration")
3161 elif vca.get("vdu_id"):
3162 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
3163 vdud = next((vdu for vdu in db_vnfd.get("vdu", ()) if vdu["id"] == vca.get("vdu_id")), None)
3164 if vdud:
3165 config_descriptor = vdud.get("vdu-configuration")
3166 elif vca.get("kdu_name"):
3167 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
3168 kdud = next((kdu for kdu in db_vnfd.get("kdu", ()) if kdu["name"] == vca.get("kdu_name")), None)
3169 if kdud:
3170 config_descriptor = kdud.get("kdu-configuration")
3171 else:
3172 config_descriptor = db_vnfds_from_member_index[vca["member-vnf-index"]].get("vnf-configuration")
3173 vca_type = vca.get("type")
3174 exec_terminate_primitives = (not operation_params.get("skip_terminate_primitives") and
3175 vca.get("needed_terminate"))
3176 # For helm we must destroy_ee
3177 destroy_ee = True if vca_type == "helm" else False
3178 task = asyncio.ensure_future(
3179 self.destroy_N2VC(logging_text, db_nslcmop, vca, config_descriptor, vca_index,
3180 destroy_ee, exec_terminate_primitives))
3181 tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))
3182
3183 # wait for pending tasks of terminate primitives
3184 if tasks_dict_info:
3185 self.logger.debug(logging_text + 'Waiting for terminate primitive pending tasks...')
3186 error_list = await self._wait_for_tasks(logging_text, tasks_dict_info,
3187 min(self.timeout_charm_delete, timeout_ns_terminate),
3188 stage, nslcmop_id)
3189 if error_list:
3190 return # raise LcmException("; ".join(error_list))
3191 tasks_dict_info.clear()
3192
3193 # remove All execution environments at once
3194 stage[0] = "Stage 3/3 delete all."
3195
3196 if nsr_deployed.get("VCA"):
3197 stage[1] = "Deleting all execution environments."
3198 self.logger.debug(logging_text + stage[1])
3199 task_delete_ee = asyncio.ensure_future(asyncio.wait_for(self._delete_all_N2VC(db_nsr=db_nsr),
3200 timeout=self.timeout_charm_delete))
3201 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
3202 tasks_dict_info[task_delete_ee] = "Terminating all VCA"
3203
3204 # Delete from k8scluster
3205 stage[1] = "Deleting KDUs."
3206 self.logger.debug(logging_text + stage[1])
3207 # print(nsr_deployed)
3208 for kdu in get_iterable(nsr_deployed, "K8s"):
3209 if not kdu or not kdu.get("kdu-instance"):
3210 continue
3211 kdu_instance = kdu.get("kdu-instance")
3212 if kdu.get("k8scluster-type") in self.k8scluster_map:
3213 task_delete_kdu_instance = asyncio.ensure_future(
3214 self.k8scluster_map[kdu["k8scluster-type"]].uninstall(
3215 cluster_uuid=kdu.get("k8scluster-uuid"),
3216 kdu_instance=kdu_instance))
3217 else:
3218 self.logger.error(logging_text + "Unknown k8s deployment type {}".
3219 format(kdu.get("k8scluster-type")))
3220 continue
3221 tasks_dict_info[task_delete_kdu_instance] = "Terminating KDU '{}'".format(kdu.get("kdu-name"))
3222
3223 # remove from RO
3224 stage[1] = "Deleting ns from VIM."
3225 if self.ng_ro:
3226 task_delete_ro = asyncio.ensure_future(
3227 self._terminate_ng_ro(logging_text, nsr_deployed, nsr_id, nslcmop_id, stage))
3228 else:
3229 task_delete_ro = asyncio.ensure_future(
3230 self._terminate_RO(logging_text, nsr_deployed, nsr_id, nslcmop_id, stage))
3231 tasks_dict_info[task_delete_ro] = "Removing deployment from VIM"
3232
3233 # the rest of the cleanup will be done in the finally block
3234
3235 except (ROclient.ROClientException, DbException, LcmException, N2VCException) as e:
3236 self.logger.error(logging_text + "Exit Exception {}".format(e))
3237 exc = e
3238 except asyncio.CancelledError:
3239 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(stage[1]))
3240 exc = "Operation was cancelled"
3241 except Exception as e:
3242 exc = traceback.format_exc()
3243 self.logger.critical(logging_text + "Exit Exception while '{}': {}".format(stage[1], e), exc_info=True)
3244 finally:
3245 if exc:
3246 error_list.append(str(exc))
3247 try:
3248 # wait for pending tasks
3249 if tasks_dict_info:
3250 stage[1] = "Waiting for terminate pending tasks."
3251 self.logger.debug(logging_text + stage[1])
3252 error_list += await self._wait_for_tasks(logging_text, tasks_dict_info, timeout_ns_terminate,
3253 stage, nslcmop_id)
3254 stage[1] = stage[2] = ""
3255 except asyncio.CancelledError:
3256 error_list.append("Cancelled")
3257 # TODO cancel all tasks
3258 except Exception as exc:
3259 error_list.append(str(exc))
3260 # update status at database
3261 if error_list:
3262 error_detail = "; ".join(error_list)
3263 # self.logger.error(logging_text + error_detail)
3264 error_description_nslcmop = 'Stage: {}. Detail: {}'.format(stage[0], error_detail)
3265 error_description_nsr = 'Operation: TERMINATING.{}, Stage {}.'.format(nslcmop_id, stage[0])
3266
3267 db_nsr_update["operational-status"] = "failed"
3268 db_nsr_update["detailed-status"] = error_description_nsr + " Detail: " + error_detail
3269 db_nslcmop_update["detailed-status"] = error_detail
3270 nslcmop_operation_state = "FAILED"
3271 ns_state = "BROKEN"
3272 else:
3273 error_detail = None
3274 error_description_nsr = error_description_nslcmop = None
3275 ns_state = "NOT_INSTANTIATED"
3276 db_nsr_update["operational-status"] = "terminated"
3277 db_nsr_update["detailed-status"] = "Done"
3278 db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
3279 db_nslcmop_update["detailed-status"] = "Done"
3280 nslcmop_operation_state = "COMPLETED"
3281
3282 if db_nsr:
3283 self._write_ns_status(
3284 nsr_id=nsr_id,
3285 ns_state=ns_state,
3286 current_operation="IDLE",
3287 current_operation_id=None,
3288 error_description=error_description_nsr,
3289 error_detail=error_detail,
3290 other_update=db_nsr_update
3291 )
3292 self._write_op_status(
3293 op_id=nslcmop_id,
3294 stage="",
3295 error_message=error_description_nslcmop,
3296 operation_state=nslcmop_operation_state,
3297 other_update=db_nslcmop_update,
3298 )
3299 if operation_params:
3300 autoremove = operation_params.get("autoremove", False)
3301 if nslcmop_operation_state:
3302 try:
3303 await self.msg.aiowrite("ns", "terminated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
3304 "operationState": nslcmop_operation_state,
3305 "autoremove": autoremove},
3306 loop=self.loop)
3307 except Exception as e:
3308 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
3309
3310 self.logger.debug(logging_text + "Exit")
3311 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
3312
3313 async def _wait_for_tasks(self, logging_text, created_tasks_info, timeout, stage, nslcmop_id, nsr_id=None):
3314 time_start = time()
3315 error_detail_list = []
3316 error_list = []
3317 pending_tasks = list(created_tasks_info.keys())
3318 num_tasks = len(pending_tasks)
3319 num_done = 0
3320 stage[1] = "{}/{}.".format(num_done, num_tasks)
3321 self._write_op_status(nslcmop_id, stage)
3322 while pending_tasks:
3323 new_error = None
3324 _timeout = timeout + time_start - time()
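# remaining time budget: overall timeout minus the time already elapsed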
3325 done, pending_tasks = await asyncio.wait(pending_tasks, timeout=_timeout,
3326 return_when=asyncio.FIRST_COMPLETED)
3327 num_done += len(done)
3328 if not done: # Timeout
3329 for task in pending_tasks:
3330 new_error = created_tasks_info[task] + ": Timeout"
3331 error_detail_list.append(new_error)
3332 error_list.append(new_error)
3333 break
3334 for task in done:
3335 if task.cancelled():
3336 exc = "Cancelled"
3337 else:
3338 exc = task.exception()
3339 if exc:
3340 if isinstance(exc, asyncio.TimeoutError):
3341 exc = "Timeout"
3342 new_error = created_tasks_info[task] + ": {}".format(exc)
3343 error_list.append(created_tasks_info[task])
3344 error_detail_list.append(new_error)
3345 if isinstance(exc, (str, DbException, N2VCException, ROclient.ROClientException, LcmException,
3346 K8sException)):
3347 self.logger.error(logging_text + new_error)
3348 else:
3349 exc_traceback = "".join(traceback.format_exception(None, exc, exc.__traceback__))
3350 self.logger.error(logging_text + created_tasks_info[task] + exc_traceback)
3351 else:
3352 self.logger.debug(logging_text + created_tasks_info[task] + ": Done")
3353 stage[1] = "{}/{}.".format(num_done, num_tasks)
3354 if new_error:
3355 stage[1] += " Errors: " + ". ".join(error_detail_list) + "."
3356 if nsr_id: # update also nsr
3357 self.update_db_2("nsrs", nsr_id, {"errorDescription": "Error at: " + ", ".join(error_list),
3358 "errorDetail": ". ".join(error_detail_list)})
3359 self._write_op_status(nslcmop_id, stage)
3360 return error_detail_list
3361
3362 @staticmethod
3363 def _map_primitive_params(primitive_desc, params, instantiation_params):
3364 """
3365 Generates the params to be provided to the charm before executing a primitive. If the user does not provide a parameter,
3366 the default-value is used. If the value is enclosed in < >, it is looked up in instantiation_params
3367 :param primitive_desc: portion of VNFD/NSD that describes primitive
3368 :param params: Params provided by user
3369 :param instantiation_params: Instantiation params provided by user
3370 :return: a dictionary with the calculated params
3371 """
3372 calculated_params = {}
3373 for parameter in primitive_desc.get("parameter", ()):
3374 param_name = parameter["name"]
3375 if param_name in params:
3376 calculated_params[param_name] = params[param_name]
3377 elif "default-value" in parameter or "value" in parameter:
3378 if "value" in parameter:
3379 calculated_params[param_name] = parameter["value"]
3380 else:
3381 calculated_params[param_name] = parameter["default-value"]
3382 if isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("<") \
3383 and calculated_params[param_name].endswith(">"):
3384 if calculated_params[param_name][1:-1] in instantiation_params:
3385 calculated_params[param_name] = instantiation_params[calculated_params[param_name][1:-1]]
3386 else:
3387 raise LcmException("Parameter {} needed to execute primitive {} not provided".
3388 format(calculated_params[param_name], primitive_desc["name"]))
3389 else:
3390 raise LcmException("Parameter {} needed to execute primitive {} not provided".
3391 format(param_name, primitive_desc["name"]))
3392
3393 if isinstance(calculated_params[param_name], (dict, list, tuple)):
3394 calculated_params[param_name] = yaml.safe_dump(calculated_params[param_name], default_flow_style=True,
3395 width=256)
3396 elif isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("!!yaml "):
3397 calculated_params[param_name] = calculated_params[param_name][7:]
3398
3399 # always add ns_config_info if the primitive name is config
3400 if primitive_desc["name"] == "config":
3401 if "ns_config_info" in instantiation_params:
3402 calculated_params["ns_config_info"] = instantiation_params["ns_config_info"]
3403 return calculated_params
3404
3405 def _look_for_deployed_vca(self, deployed_vca, member_vnf_index, vdu_id, vdu_count_index, kdu_name=None):
3406 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
3407 for vca in deployed_vca:
3408 if not vca:
3409 continue
3410 if member_vnf_index != vca["member-vnf-index"] or vdu_id != vca["vdu_id"]:
3411 continue
3412 if vdu_count_index is not None and vdu_count_index != vca["vdu_count_index"]:
3413 continue
3414 if kdu_name and kdu_name != vca["kdu_name"]:
3415 continue
3416 break
3417 else:
3418 # vca_deployed not found
3419 raise LcmException("charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} is not "
3420 "deployed".format(member_vnf_index, vdu_id, kdu_name, vdu_count_index))
3421
3422 # get ee_id
3423 ee_id = vca.get("ee_id")
3424 vca_type = vca.get("type", "lxc_proxy_charm") # default value for backward compatibility - proxy charm
3425 if not ee_id:
3426 raise LcmException("charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
3427 "execution environment"
3428 .format(member_vnf_index, vdu_id, kdu_name, vdu_count_index))
3429 return ee_id, vca_type
3430
3431 async def _ns_execute_primitive(self, ee_id, primitive, primitive_params, retries=0,
3432 retries_interval=30, timeout=None,
3433 vca_type=None, db_dict=None) -> (str, str):
3434 try:
3435 if primitive == "config":
3436 primitive_params = {"params": primitive_params}
3437
3438 vca_type = vca_type or "lxc_proxy_charm"
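# default to the lxc_proxy_charm connector when the caller does not specify the VCA type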
3439
3440 while retries >= 0:
3441 try:
3442 output = await asyncio.wait_for(
3443 self.vca_map[vca_type].exec_primitive(
3444 ee_id=ee_id,
3445 primitive_name=primitive,
3446 params_dict=primitive_params,
3447 progress_timeout=self.timeout_progress_primitive,
3448 total_timeout=self.timeout_primitive,
3449 db_dict=db_dict),
3450 timeout=timeout or self.timeout_primitive)
3451 # execution was OK
3452 break
3453 except asyncio.CancelledError:
3454 raise
3455 except Exception as e: # asyncio.TimeoutError
3456 if isinstance(e, asyncio.TimeoutError):
3457 e = "Timeout"
3458 retries -= 1
3459 if retries >= 0:
3460 self.logger.debug('Error executing action {} on {} -> {}'.format(primitive, ee_id, e))
3461 # wait and retry
3462 await asyncio.sleep(retries_interval, loop=self.loop)
3463 else:
3464 return 'FAILED', str(e)
3465
3466 return 'COMPLETED', output
3467
3468 except (LcmException, asyncio.CancelledError):
3469 raise
3470 except Exception as e:
3471 return 'FAIL', 'Error executing action {}: {}'.format(primitive, e)
3472
3473 async def action(self, nsr_id, nslcmop_id):
3474
3475 # Try to lock HA task here
3476 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
3477 if not task_is_locked_by_me:
3478 return
3479
3480 logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
3481 self.logger.debug(logging_text + "Enter")
3482 # get all needed from database
3483 db_nsr = None
3484 db_nslcmop = None
3485 db_nsr_update = {}
3486 db_nslcmop_update = {}
3487 nslcmop_operation_state = None
3488 error_description_nslcmop = None
3489 exc = None
3490 try:
3491 # wait for any previous tasks in process
3492 step = "Waiting for previous operations to terminate"
3493 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
3494
3495 self._write_ns_status(
3496 nsr_id=nsr_id,
3497 ns_state=None,
3498 current_operation="RUNNING ACTION",
3499 current_operation_id=nslcmop_id
3500 )
3501
3502 step = "Getting information from database"
3503 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
3504 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3505
3506 nsr_deployed = db_nsr["_admin"].get("deployed")
3507 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
3508 vdu_id = db_nslcmop["operationParams"].get("vdu_id")
3509 kdu_name = db_nslcmop["operationParams"].get("kdu_name")
3510 vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
3511 primitive = db_nslcmop["operationParams"]["primitive"]
3512 primitive_params = db_nslcmop["operationParams"]["primitive_params"]
3513 timeout_ns_action = db_nslcmop["operationParams"].get("timeout_ns_action", self.timeout_primitive)
3514
3515 if vnf_index:
3516 step = "Getting vnfr from database"
3517 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
3518 step = "Getting vnfd from database"
3519 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
3520 else:
3521 step = "Getting nsd from database"
3522 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
3523
3524 # for backward compatibility
3525 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
3526 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
3527 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
3528 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3529
3530 # look for primitive
3531 config_primitive_desc = None
3532 if vdu_id:
3533 for vdu in get_iterable(db_vnfd, "vdu"):
3534 if vdu_id == vdu["id"]:
3535 for config_primitive in deep_get(vdu, ("vdu-configuration", "config-primitive"), ()):
3536 if config_primitive["name"] == primitive:
3537 config_primitive_desc = config_primitive
3538 break
3539 break
3540 elif kdu_name:
3541 for kdu in get_iterable(db_vnfd, "kdu"):
3542 if kdu_name == kdu["name"]:
3543 for config_primitive in deep_get(kdu, ("kdu-configuration", "config-primitive"), ()):
3544 if config_primitive["name"] == primitive:
3545 config_primitive_desc = config_primitive
3546 break
3547 break
3548 elif vnf_index:
3549 for config_primitive in deep_get(db_vnfd, ("vnf-configuration", "config-primitive"), ()):
3550 if config_primitive["name"] == primitive:
3551 config_primitive_desc = config_primitive
3552 break
3553 else:
3554 for config_primitive in deep_get(db_nsd, ("ns-configuration", "config-primitive"), ()):
3555 if config_primitive["name"] == primitive:
3556 config_primitive_desc = config_primitive
3557 break
3558
3559 if not config_primitive_desc and not (kdu_name and primitive in ("upgrade", "rollback", "status")):
3560 raise LcmException("Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".
3561 format(primitive))
3562
3563 if vnf_index:
3564 if vdu_id:
3565 vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
3566 desc_params = self._format_additional_params(vdur.get("additionalParams"))
3567 elif kdu_name:
3568 kdur = next((x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None)
3569 desc_params = self._format_additional_params(kdur.get("additionalParams"))
3570 else:
3571 desc_params = self._format_additional_params(db_vnfr.get("additionalParamsForVnf"))
3572 else:
3573 desc_params = self._format_additional_params(db_nsr.get("additionalParamsForNs"))
3574
3575 if kdu_name:
3576 kdu_action = True if not deep_get(kdu, ("kdu-configuration", "juju")) else False
3577
3578 # TODO check if ns is in a proper status
3579 if kdu_name and (primitive in ("upgrade", "rollback", "status") or kdu_action):
3580 # kdur and desc_params already set from before
3581 if primitive_params:
3582 desc_params.update(primitive_params)
3583 # TODO Check if we will need something at vnf level
3584 for index, kdu in enumerate(get_iterable(nsr_deployed, "K8s")):
3585 if kdu_name == kdu["kdu-name"] and kdu["member-vnf-index"] == vnf_index:
3586 break
3587 else:
3588 raise LcmException("KDU '{}' for vnf '{}' not deployed".format(kdu_name, vnf_index))
3589
3590 if kdu.get("k8scluster-type") not in self.k8scluster_map:
3591 msg = "unknown k8scluster-type '{}'".format(kdu.get("k8scluster-type"))
3592 raise LcmException(msg)
3593
3594 db_dict = {"collection": "nsrs",
3595 "filter": {"_id": nsr_id},
3596 "path": "_admin.deployed.K8s.{}".format(index)}
3597 self.logger.debug(logging_text + "Exec k8s {} on {}.{}".format(primitive, vnf_index, kdu_name))
3598 step = "Executing kdu {}".format(primitive)
3599 if primitive == "upgrade":
3600 if desc_params.get("kdu_model"):
3601 kdu_model = desc_params.get("kdu_model")
3602 del desc_params["kdu_model"]
3603 else:
3604 kdu_model = kdu.get("kdu-model")
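# the stored kdu-model may carry a ":<suffix>" (presumably a version tag); keep only the part before the colon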
3605 parts = kdu_model.split(sep=":")
3606 if len(parts) == 2:
3607 kdu_model = parts[0]
3608
3609 detailed_status = await asyncio.wait_for(
3610 self.k8scluster_map[kdu["k8scluster-type"]].upgrade(
3611 cluster_uuid=kdu.get("k8scluster-uuid"),
3612 kdu_instance=kdu.get("kdu-instance"),
3613 atomic=True, kdu_model=kdu_model,
3614 params=desc_params, db_dict=db_dict,
3615 timeout=timeout_ns_action),
3616 timeout=timeout_ns_action + 10)
3617 self.logger.debug(logging_text + " Upgrade of kdu {} done".format(detailed_status))
3618 elif primitive == "rollback":
3619 detailed_status = await asyncio.wait_for(
3620 self.k8scluster_map[kdu["k8scluster-type"]].rollback(
3621 cluster_uuid=kdu.get("k8scluster-uuid"),
3622 kdu_instance=kdu.get("kdu-instance"),
3623 db_dict=db_dict),
3624 timeout=timeout_ns_action)
3625 elif primitive == "status":
3626 detailed_status = await asyncio.wait_for(
3627 self.k8scluster_map[kdu["k8scluster-type"]].status_kdu(
3628 cluster_uuid=kdu.get("k8scluster-uuid"),
3629 kdu_instance=kdu.get("kdu-instance")),
3630 timeout=timeout_ns_action)
3631 else:
3632 kdu_instance = kdu.get("kdu-instance") or "{}-{}".format(kdu["kdu-name"], nsr_id)
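# if no kdu-instance was stored, fall back to "<kdu-name>-<nsr_id>" (presumably the instance name used at deployment)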
3633 params = self._map_primitive_params(config_primitive_desc, primitive_params, desc_params)
3634
3635 detailed_status = await asyncio.wait_for(
3636 self.k8scluster_map[kdu["k8scluster-type"]].exec_primitive(
3637 cluster_uuid=kdu.get("k8scluster-uuid"),
3638 kdu_instance=kdu_instance,
3639 primitive_name=primitive,
3640 params=params, db_dict=db_dict,
3641 timeout=timeout_ns_action),
3642 timeout=timeout_ns_action)
3643
3644 if detailed_status:
3645 nslcmop_operation_state = 'COMPLETED'
3646 else:
3647 detailed_status = ''
3648 nslcmop_operation_state = 'FAILED'
3649 else:
3650 ee_id, vca_type = self._look_for_deployed_vca(nsr_deployed["VCA"],
3651 member_vnf_index=vnf_index,
3652 vdu_id=vdu_id,
3653 vdu_count_index=vdu_count_index)
3654 db_nslcmop_notif = {"collection": "nslcmops",
3655 "filter": {"_id": nslcmop_id},
3656 "path": "admin.VCA"}
3657 nslcmop_operation_state, detailed_status = await self._ns_execute_primitive(
3658 ee_id,
3659 primitive=primitive,
3660 primitive_params=self._map_primitive_params(config_primitive_desc, primitive_params, desc_params),
3661 timeout=timeout_ns_action,
3662 vca_type=vca_type,
3663 db_dict=db_nslcmop_notif)
3664
3665 db_nslcmop_update["detailed-status"] = detailed_status
3666 error_description_nslcmop = detailed_status if nslcmop_operation_state == "FAILED" else ""
3667 self.logger.debug(logging_text + " task Done with result {} {}".format(nslcmop_operation_state,
3668 detailed_status))
3669 return # database update is called inside finally
3670
3671 except (DbException, LcmException, N2VCException, K8sException) as e:
3672 self.logger.error(logging_text + "Exit Exception {}".format(e))
3673 exc = e
3674 except asyncio.CancelledError:
3675 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
3676 exc = "Operation was cancelled"
3677 except asyncio.TimeoutError:
3678 self.logger.error(logging_text + "Timeout while '{}'".format(step))
3679 exc = "Timeout"
3680 except Exception as e:
3681 exc = traceback.format_exc()
3682 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
3683 finally:
3684 if exc:
3685 db_nslcmop_update["detailed-status"] = detailed_status = error_description_nslcmop = \
3686 "FAILED {}: {}".format(step, exc)
3687 nslcmop_operation_state = "FAILED"
3688 if db_nsr:
3689 self._write_ns_status(
3690 nsr_id=nsr_id,
3691 ns_state=db_nsr["nsState"], # TODO check if degraded. For the moment use previous status
3692 current_operation="IDLE",
3693 current_operation_id=None,
3694 # error_description=error_description_nsr,
3695 # error_detail=error_detail,
3696 other_update=db_nsr_update
3697 )
3698
3699 self._write_op_status(
3700 op_id=nslcmop_id,
3701 stage="",
3702 error_message=error_description_nslcmop,
3703 operation_state=nslcmop_operation_state,
3704 other_update=db_nslcmop_update,
3705 )
3706
3707 if nslcmop_operation_state:
3708 try:
3709 await self.msg.aiowrite("ns", "actioned", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
3710 "operationState": nslcmop_operation_state},
3711 loop=self.loop)
3712 except Exception as e:
3713 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
3714 self.logger.debug(logging_text + "Exit")
3715 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
3716 return nslcmop_operation_state, detailed_status
3717
3718 async def scale(self, nsr_id, nslcmop_id):
3719
3720 # Try to lock HA task here
3721 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
3722 if not task_is_locked_by_me:
3723 return
3724
3725 logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
3726 self.logger.debug(logging_text + "Enter")
3727 # get all needed from database
3728 db_nsr = None
3729 db_nslcmop = None
3730 db_nslcmop_update = {}
3731 nslcmop_operation_state = None
3732 db_nsr_update = {}
3733 exc = None
3734 # in case of error, indicates what part of scale was failed to put nsr at error status
3735 scale_process = None
3736 old_operational_status = ""
3737 old_config_status = ""
3738 vnfr_scaled = False
3739 try:
3740 # wait for any previous tasks in process
3741 step = "Waiting for previous operations to terminate"
3742 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
3743
3744 self._write_ns_status(
3745 nsr_id=nsr_id,
3746 ns_state=None,
3747 current_operation="SCALING",
3748 current_operation_id=nslcmop_id
3749 )
3750
3751 step = "Getting nslcmop from database"
3752 self.logger.debug(step + " after having waited for previous tasks to be completed")
3753 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
3754 step = "Getting nsr from database"
3755 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3756
3757 old_operational_status = db_nsr["operational-status"]
3758 old_config_status = db_nsr["config-status"]
3759 step = "Parsing scaling parameters"
3760 # self.logger.debug(step)
3761 db_nsr_update["operational-status"] = "scaling"
3762 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3763 nsr_deployed = db_nsr["_admin"].get("deployed")
3764
3765 #######
3766 nsr_deployed = db_nsr["_admin"].get("deployed")
3767 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
3768 # vdu_id = db_nslcmop["operationParams"].get("vdu_id")
3769 # vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
3770 # vdu_name = db_nslcmop["operationParams"].get("vdu_name")
3771 #######
3772
3773 RO_nsr_id = nsr_deployed["RO"]["nsr_id"]
3774 vnf_index = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["member-vnf-index"]
3775 scaling_group = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["scaling-group-descriptor"]
3776 scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
3777 # scaling_policy = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"].get("scaling-policy")
3778
3779 # for backward compatibility
3780 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
3781 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
3782 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
3783 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3784
3785 step = "Getting vnfr from database"
3786 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
3787 step = "Getting vnfd from database"
3788 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
3789
3790 step = "Getting scaling-group-descriptor"
3791 for scaling_descriptor in db_vnfd["scaling-group-descriptor"]:
3792 if scaling_descriptor["name"] == scaling_group:
3793 break
3794 else:
3795 raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
3796 "at vnfd:scaling-group-descriptor".format(scaling_group))
3797
3798 # cooldown_time = 0
3799 # for scaling_policy_descriptor in scaling_descriptor.get("scaling-policy", ()):
3800 # cooldown_time = scaling_policy_descriptor.get("cooldown-time", 0)
3801 # if scaling_policy and scaling_policy == scaling_policy_descriptor.get("name"):
3802 # break
3803
3804 # TODO check if ns is in a proper status
3805 step = "Sending scale order to VIM"
3806 nb_scale_op = 0
3807 if not db_nsr["_admin"].get("scaling-group"):
3808 self.update_db_2("nsrs", nsr_id, {"_admin.scaling-group": [{"name": scaling_group, "nb-scale-op": 0}]})
3809 admin_scale_index = 0
3810 else:
3811 for admin_scale_index, admin_scale_info in enumerate(db_nsr["_admin"]["scaling-group"]):
3812 if admin_scale_info["name"] == scaling_group:
3813 nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
3814 break
3815 else: # not found: set index to one past the last element and add a new entry with this name
3816 admin_scale_index += 1
3817 db_nsr_update["_admin.scaling-group.{}.name".format(admin_scale_index)] = scaling_group
3818 RO_scaling_info = []
3819 vdu_scaling_info = {"scaling_group_name": scaling_group, "vdu": []}
3820 if scaling_type == "SCALE_OUT":
3821 # count if max-instance-count is reached
3822 max_instance_count = scaling_descriptor.get("max-instance-count", 10)
3823 # self.logger.debug("MAX_INSTANCE_COUNT is {}".format(max_instance_count))
3824 if nb_scale_op >= max_instance_count:
3825 raise LcmException("reached the limit of {} (max-instance-count) "
3826 "scaling-out operations for the "
3827 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
3828
3829 nb_scale_op += 1
3830 vdu_scaling_info["scaling_direction"] = "OUT"
3831 vdu_scaling_info["vdu-create"] = {}
3832 for vdu_scale_info in scaling_descriptor["vdu"]:
3833 RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
3834 "type": "create", "count": vdu_scale_info.get("count", 1)})
3835 vdu_scaling_info["vdu-create"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
3836
3837 elif scaling_type == "SCALE_IN":
3838 # count if min-instance-count is reached
3839 min_instance_count = 0
3840 if "min-instance-count" in scaling_descriptor and scaling_descriptor["min-instance-count"] is not None:
3841 min_instance_count = int(scaling_descriptor["min-instance-count"])
3842 if nb_scale_op <= min_instance_count:
3843 raise LcmException("reached the limit of {} (min-instance-count) scaling-in operations for the "
3844 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
3845 nb_scale_op -= 1
3846 vdu_scaling_info["scaling_direction"] = "IN"
3847 vdu_scaling_info["vdu-delete"] = {}
3848 for vdu_scale_info in scaling_descriptor["vdu"]:
3849 RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
3850 "type": "delete", "count": vdu_scale_info.get("count", 1)})
3851 vdu_scaling_info["vdu-delete"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
3852
3853 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
3854 vdu_create = vdu_scaling_info.get("vdu-create")
3855 vdu_delete = copy(vdu_scaling_info.get("vdu-delete"))
3856 if vdu_scaling_info["scaling_direction"] == "IN":
3857 for vdur in reversed(db_vnfr["vdur"]):
3858 if vdu_delete.get(vdur["vdu-id-ref"]):
3859 vdu_delete[vdur["vdu-id-ref"]] -= 1
3860 vdu_scaling_info["vdu"].append({
3861 "name": vdur["name"],
3862 "vdu_id": vdur["vdu-id-ref"],
3863 "interface": []
3864 })
3865 for interface in vdur["interfaces"]:
3866 vdu_scaling_info["vdu"][-1]["interface"].append({
3867 "name": interface["name"],
3868 "ip_address": interface["ip-address"],
3869 "mac_address": interface.get("mac-address"),
3870 })
3871 vdu_delete = vdu_scaling_info.pop("vdu-delete")
3872
3873 # PRE-SCALE BEGIN
3874 step = "Executing pre-scale vnf-config-primitive"
3875 if scaling_descriptor.get("scaling-config-action"):
3876 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
3877 if (scaling_config_action.get("trigger") == "pre-scale-in" and scaling_type == "SCALE_IN") \
3878 or (scaling_config_action.get("trigger") == "pre-scale-out" and scaling_type == "SCALE_OUT"):
3879 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
3880 step = db_nslcmop_update["detailed-status"] = \
3881 "executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive)
3882
3883 # look for primitive
3884 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
3885 if config_primitive["name"] == vnf_config_primitive:
3886 break
3887 else:
3888 raise LcmException(
3889 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
3890 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
3891 "primitive".format(scaling_group, config_primitive))
3892
3893 vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
3894 if db_vnfr.get("additionalParamsForVnf"):
3895 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
3896
3897 scale_process = "VCA"
3898 db_nsr_update["config-status"] = "configuring pre-scaling"
3899 primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)
3900
3901 # Pre-scale retry check: Check if this sub-operation has been executed before
3902 op_index = self._check_or_add_scale_suboperation(
3903 db_nslcmop, vnf_index, vnf_config_primitive, primitive_params, 'PRE-SCALE')
3904 if op_index == self.SUBOPERATION_STATUS_SKIP:
3905 # Skip sub-operation
3906 result = 'COMPLETED'
3907 result_detail = 'Done'
3908 self.logger.debug(logging_text +
3909 "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
3910 vnf_config_primitive, result, result_detail))
3911 else:
3912 if op_index == self.SUBOPERATION_STATUS_NEW:
3913 # New sub-operation: Get index of this sub-operation
3914 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
3915 self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
3916 format(vnf_config_primitive))
3917 else:
3918 # retry: Get registered params for this existing sub-operation
3919 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
3920 vnf_index = op.get('member_vnf_index')
3921 vnf_config_primitive = op.get('primitive')
3922 primitive_params = op.get('primitive_params')
3923 self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation retry".
3924 format(vnf_config_primitive))
3925 # Execute the primitive, either with new (first-time) or registered (retry) args
3926 ee_id, vca_type = self._look_for_deployed_vca(nsr_deployed["VCA"],
3927 member_vnf_index=vnf_index,
3928 vdu_id=None,
3929 vdu_count_index=None)
3930 result, result_detail = await self._ns_execute_primitive(
3931 ee_id, vnf_config_primitive, primitive_params, vca_type=vca_type)
3932 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
3933 vnf_config_primitive, result, result_detail))
3934 # Update operationState = COMPLETED | FAILED
3935 self._update_suboperation_status(
3936 db_nslcmop, op_index, result, result_detail)
3937
3938 if result == "FAILED":
3939 raise LcmException(result_detail)
3940 db_nsr_update["config-status"] = old_config_status
3941 scale_process = None
3942 # PRE-SCALE END
3943
3944 # SCALE RO - BEGIN
3945 # Should this block be skipped if 'RO_nsr_id' == None ?
3946 # if (RO_nsr_id and RO_scaling_info):
3947 if RO_scaling_info:
3948 scale_process = "RO"
3949 # Scale RO retry check: Check if this sub-operation has been executed before
3950 op_index = self._check_or_add_scale_suboperation(
3951 db_nslcmop, vnf_index, None, None, 'SCALE-RO', RO_nsr_id, RO_scaling_info)
3952 if op_index == self.SUBOPERATION_STATUS_SKIP:
3953 # Skip sub-operation
3954 result = 'COMPLETED'
3955 result_detail = 'Done'
3956 self.logger.debug(logging_text + "Skipped sub-operation RO, result {} {}".format(
3957 result, result_detail))
3958 else:
3959 if op_index == self.SUBOPERATION_STATUS_NEW:
3960 # New sub-operation: Get index of this sub-operation
3961 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
3962 self.logger.debug(logging_text + "New sub-operation RO")
3963 else:
3964 # retry: Get registered params for this existing sub-operation
3965 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
3966 RO_nsr_id = op.get('RO_nsr_id')
3967 RO_scaling_info = op.get('RO_scaling_info')
3968 self.logger.debug(logging_text + "Sub-operation RO retry for primitive {}".format(
3969 vnf_config_primitive))
3970
3971 RO_desc = await self.RO.create_action("ns", RO_nsr_id, {"vdu-scaling": RO_scaling_info})
3972 db_nsr_update["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)] = nb_scale_op
3973 db_nsr_update["_admin.scaling-group.{}.time".format(admin_scale_index)] = time()
3974 # wait until ready
3975 RO_nslcmop_id = RO_desc["instance_action_id"]
3976 db_nslcmop_update["_admin.deploy.RO"] = RO_nslcmop_id
3977
3978 RO_task_done = False
3979 step = detailed_status = "Waiting RO_task_id={} to complete the scale action.".format(RO_nslcmop_id)
3980 detailed_status_old = None
3981 self.logger.debug(logging_text + step)
3982
3983 deployment_timeout = 1 * 3600 # One hour
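# poll the RO scale action every 5 seconds until it becomes ACTIVE, fails, or the timeout expires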
3984 while deployment_timeout > 0:
3985 if not RO_task_done:
3986 desc = await self.RO.show("ns", item_id_name=RO_nsr_id, extra_item="action",
3987 extra_item_id=RO_nslcmop_id)
3988
3989 # deploymentStatus
3990 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
3991
3992 ns_status, ns_status_info = self.RO.check_action_status(desc)
3993 if ns_status == "ERROR":
3994 raise ROclient.ROClientException(ns_status_info)
3995 elif ns_status == "BUILD":
3996 detailed_status = step + "; {}".format(ns_status_info)
3997 elif ns_status == "ACTIVE":
3998 RO_task_done = True
3999 step = detailed_status = "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id)
4000 self.logger.debug(logging_text + step)
4001 else:
4002 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
4003 else:
4004
4005 if ns_status == "ERROR":
4006 raise ROclient.ROClientException(ns_status_info)
4007 elif ns_status == "BUILD":
4008 detailed_status = step + "; {}".format(ns_status_info)
4009 elif ns_status == "ACTIVE":
4010 step = detailed_status = \
4011 "Waiting for management IP address reported by the VIM. Updating VNFRs"
4012 if not vnfr_scaled:
4013 self.scale_vnfr(db_vnfr, vdu_create=vdu_create, vdu_delete=vdu_delete)
4014 vnfr_scaled = True
4015 try:
4016 desc = await self.RO.show("ns", RO_nsr_id)
4017
4018 # deploymentStatus
4019 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
4020
4021 # nsr_deployed["nsr_ip"] = RO.get_ns_vnf_info(desc)
4022 self.ns_update_vnfr({db_vnfr["member-vnf-index-ref"]: db_vnfr}, desc)
4023 break
4024 except LcmExceptionNoMgmtIP:
4025 pass
4026 else:
4027 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
4028 if detailed_status != detailed_status_old:
4029 self._update_suboperation_status(
4030 db_nslcmop, op_index, 'COMPLETED', detailed_status)
4031 detailed_status_old = db_nslcmop_update["detailed-status"] = detailed_status
4032 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
4033
4034 await asyncio.sleep(5, loop=self.loop)
4035 deployment_timeout -= 5
4036 if deployment_timeout <= 0:
4037 self._update_suboperation_status(
4038 db_nslcmop, op_index, 'FAILED', "Timeout when waiting for ns to get ready")
4039 raise ROclient.ROClientException("Timeout waiting ns to be ready")
4040
4041 # update VDU_SCALING_INFO with the obtained ip_addresses
4042 if vdu_scaling_info["scaling_direction"] == "OUT":
4043 for vdur in reversed(db_vnfr["vdur"]):
4044 if vdu_scaling_info["vdu-create"].get(vdur["vdu-id-ref"]):
4045 vdu_scaling_info["vdu-create"][vdur["vdu-id-ref"]] -= 1
4046 vdu_scaling_info["vdu"].append({
4047 "name": vdur["name"],
4048 "vdu_id": vdur["vdu-id-ref"],
4049 "interface": []
4050 })
4051 for interface in vdur["interfaces"]:
4052 vdu_scaling_info["vdu"][-1]["interface"].append({
4053 "name": interface["name"],
4054 "ip_address": interface["ip-address"],
4055 "mac_address": interface.get("mac-address"),
4056 })
4057 del vdu_scaling_info["vdu-create"]
4058
4059 self._update_suboperation_status(db_nslcmop, op_index, 'COMPLETED', 'Done')
4060 # SCALE RO - END
4061
4062 scale_process = None
4063 if db_nsr_update:
4064 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4065
4066 # POST-SCALE BEGIN
4067 # execute primitive service POST-SCALING
4068 step = "Executing post-scale vnf-config-primitive"
4069 if scaling_descriptor.get("scaling-config-action"):
4070 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
4071 if (scaling_config_action.get("trigger") == "post-scale-in" and scaling_type == "SCALE_IN") \
4072 or (scaling_config_action.get("trigger") == "post-scale-out" and scaling_type == "SCALE_OUT"):
4073 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
4074 step = db_nslcmop_update["detailed-status"] = \
4075 "executing post-scale scaling-config-action '{}'".format(vnf_config_primitive)
4076
4077 vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
4078 if db_vnfr.get("additionalParamsForVnf"):
4079 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
4080
4081 # look for primitive
4082 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
4083 if config_primitive["name"] == vnf_config_primitive:
4084 break
4085 else:
4086 raise LcmException("Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:"
4087 "scaling-config-action[vnf-config-primitive-name-ref='{}'] does not "
4088 "match any vnf-configuration:config-primitive".format(scaling_group,
4089 config_primitive))
4090 scale_process = "VCA"
4091 db_nsr_update["config-status"] = "configuring post-scaling"
4092 primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)
4093
4094 # Post-scale retry check: Check if this sub-operation has been executed before
4095 op_index = self._check_or_add_scale_suboperation(
4096 db_nslcmop, vnf_index, vnf_config_primitive, primitive_params, 'POST-SCALE')
4097 if op_index == self.SUBOPERATION_STATUS_SKIP:
4098 # Skip sub-operation
4099 result = 'COMPLETED'
4100 result_detail = 'Done'
4101 self.logger.debug(logging_text +
4102 "vnf_config_primitive={} Skipped sub-operation, result {} {}".
4103 format(vnf_config_primitive, result, result_detail))
4104 else:
4105 if op_index == self.SUBOPERATION_STATUS_NEW:
4106 # New sub-operation: Get index of this sub-operation
4107 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
4108 self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
4109 format(vnf_config_primitive))
4110 else:
4111 # retry: Get registered params for this existing sub-operation
4112 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
4113 vnf_index = op.get('member_vnf_index')
4114 vnf_config_primitive = op.get('primitive')
4115 primitive_params = op.get('primitive_params')
4116 self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation retry".
4117 format(vnf_config_primitive))
4118 # Execute the primitive, either with new (first-time) or registered (retry) args
4119 ee_id, vca_type = self._look_for_deployed_vca(nsr_deployed["VCA"],
4120 member_vnf_index=vnf_index,
4121 vdu_id=None,
4122 vdu_count_index=None)
4123 result, result_detail = await self._ns_execute_primitive(
4124 ee_id, vnf_config_primitive, primitive_params, vca_type=vca_type)
4125 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
4126 vnf_config_primitive, result, result_detail))
4127 # Update operationState = COMPLETED | FAILED
4128 self._update_suboperation_status(
4129 db_nslcmop, op_index, result, result_detail)
4130
4131 if result == "FAILED":
4132 raise LcmException(result_detail)
4133 db_nsr_update["config-status"] = old_config_status
4134 scale_process = None
4135 # POST-SCALE END

            db_nsr_update["detailed-status"] = ""  # "scaled {} {}".format(scaling_group, scaling_type)
            db_nsr_update["operational-status"] = "running" if old_operational_status == "failed" \
                else old_operational_status
            db_nsr_update["config-status"] = old_config_status
            return
        except (ROclient.ROClientException, DbException, LcmException) as e:
            self.logger.error(logging_text + "Exit Exception {}".format(e))
            exc = e
        except asyncio.CancelledError:
            self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
            exc = "Operation was cancelled"
        except Exception as e:
            exc = traceback.format_exc()
            self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
        finally:
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=None,
                current_operation="IDLE",
                current_operation_id=None
            )
            if exc:
                db_nslcmop_update["detailed-status"] = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
                nslcmop_operation_state = "FAILED"
                if db_nsr:
                    db_nsr_update["operational-status"] = old_operational_status
                    db_nsr_update["config-status"] = old_config_status
                    db_nsr_update["detailed-status"] = ""
                    if scale_process:
                        if "VCA" in scale_process:
                            db_nsr_update["config-status"] = "failed"
                        if "RO" in scale_process:
                            db_nsr_update["operational-status"] = "failed"
                        db_nsr_update["detailed-status"] = "FAILED scaling nslcmop={} {}: {}".format(nslcmop_id, step,
                                                                                                     exc)
            else:
                error_description_nslcmop = None
                nslcmop_operation_state = "COMPLETED"
                db_nslcmop_update["detailed-status"] = "Done"

            self._write_op_status(
                op_id=nslcmop_id,
                stage="",
                error_message=error_description_nslcmop,
                operation_state=nslcmop_operation_state,
                other_update=db_nslcmop_update,
            )
            if db_nsr:
                self._write_ns_status(
                    nsr_id=nsr_id,
                    ns_state=None,
                    current_operation="IDLE",
                    current_operation_id=None,
                    other_update=db_nsr_update
                )

            if nslcmop_operation_state:
                try:
                    await self.msg.aiowrite("ns", "scaled", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                             "operationState": nslcmop_operation_state},
                                            loop=self.loop)
                    # if cooldown_time:
                    #     await asyncio.sleep(cooldown_time, loop=self.loop)
                    #     await self.msg.aiowrite("ns", "scaled-cooldown-time", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id})
                except Exception as e:
                    self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
            self.logger.debug(logging_text + "Exit")
            self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")
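            # For reference, the "scaled" notification written above to the "ns" Kafka topic carries
            # (values are examples): {"nsr_id": "<ns-id>", "nslcmop_id": "<op-id>", "operationState": "COMPLETED"}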

    async def add_prometheus_metrics(self, ee_id, artifact_path, ee_config_descriptor, vnfr_id, nsr_id, target_ip):
        if not self.prometheus:
            return
        # look for a file called 'prometheus*.j2' inside the artifact folder and use it as the Jinja2
        # template for the Prometheus scrape jobs of this execution environment
        artifact_content = self.fs.dir_ls(artifact_path)
        job_file = next((f for f in artifact_content if f.startswith("prometheus") and f.endswith(".j2")), None)
        if not job_file:
            return
        with self.fs.file_open((artifact_path, job_file), "r") as f:
            job_data = f.read()
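        # A minimal 'prometheus.j2' shipped in the VNF package could look like the following
        # (illustrative sketch only; the actual template content is defined by the package author):
        #
        #   - job_name: '{{ JOB_NAME }}'
        #     static_configs:
        #       - targets: ['{{ EXPORTER_POD_IP }}:{{ EXPORTER_POD_PORT }}']
        #         labels:
        #           target_ip: '{{ TARGET_IP }}'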

        # TODO get_service
        _, _, service = ee_id.partition(".")  # remove prefix "namespace."
        host_name = "{}-{}".format(service, ee_config_descriptor["metric-service"])
        host_port = "80"
        vnfr_id = vnfr_id.replace("-", "")
        variables = {
            "JOB_NAME": vnfr_id,
            "TARGET_IP": target_ip,
            "EXPORTER_POD_IP": host_name,
            "EXPORTER_POD_PORT": host_port,
        }
        job_list = self.prometheus.parse_job(job_data, variables)
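        # parse_job() is expected to render the Jinja2 template with the variables above and return a list of
        # Prometheus scrape-job dicts; with the illustrative template shown earlier the result would resemble:
        #   [{"job_name": "<vnfr-id-without-dashes>",
        #     "static_configs": [{"targets": ["<service>-<metric-service>:80"],
        #                         "labels": {"target_ip": "<target-ip>"}}]}]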
        # ensure that each job_name contains the vnfr_id and add the nsr_id as metadata
        for job in job_list:
            if not isinstance(job.get("job_name"), str) or vnfr_id not in job["job_name"]:
                job["job_name"] = vnfr_id + "_" + str(randint(1, 10000))
            job["nsr_id"] = nsr_id
        job_dict = {jl["job_name"]: jl for jl in job_list}
        if await self.prometheus.update(job_dict):
            return list(job_dict.keys())
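        # Hypothetical usage sketch (names and values are examples, not taken from the real callers):
        #
        #   job_names = await self.add_prometheus_metrics(
        #       ee_id="mynamespace.metrics-ee-service",
        #       artifact_path="vnfd_pkg/charms/metrics-ee",
        #       ee_config_descriptor={"metric-service": "metrics"},
        #       vnfr_id="0123abcd-0000-0000-0000-00000000abcd",
        #       nsr_id="4567efab-0000-0000-0000-00000000efab",
        #       target_ip="10.0.0.12",
        #   )
        #   # job_names -> list of registered Prometheus job names, or None if nothing was updated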