[osm/LCM.git] / osm_lcm / ns.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 import yaml
21 import logging
22 import logging.handlers
23 import traceback
24 import json
25 from jinja2 import Environment, TemplateError, TemplateNotFound, StrictUndefined, UndefinedError
26
27 from osm_lcm import ROclient
28 from osm_lcm.ng_ro import NgRoClient, NgRoException
29 from osm_lcm.lcm_utils import LcmException, LcmExceptionNoMgmtIP, LcmBase, deep_get, get_iterable, populate_dict
30 from n2vc.k8s_helm_conn import K8sHelmConnector
31 from n2vc.k8s_helm3_conn import K8sHelm3Connector
32 from n2vc.k8s_juju_conn import K8sJujuConnector
33
34 from osm_common.dbbase import DbException
35 from osm_common.fsbase import FsException
36
37 from n2vc.n2vc_juju_conn import N2VCJujuConnector
38 from n2vc.exceptions import N2VCException, N2VCNotFound, K8sException
39
40 from osm_lcm.lcm_helm_conn import LCMHelmConn
41
42 from copy import copy, deepcopy
43 from http import HTTPStatus
44 from time import time
45 from uuid import uuid4
46
47 from random import randint
48
49 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
50
51
52 class NsLcm(LcmBase):
53 timeout_vca_on_error = 5 * 60 # Time since a charm first reaches blocked/error status until it is marked as failed
54 timeout_ns_deploy = 2 * 3600 # default global timeout for deployment of an ns
55 timeout_ns_terminate = 1800 # default global timeout for undeployment of an ns
56 timeout_charm_delete = 10 * 60
57 timeout_primitive = 30 * 60 # timeout for primitive execution
58 timeout_progress_primitive = 10 * 60 # timeout for some progress in a primitive execution
59
60 SUBOPERATION_STATUS_NOT_FOUND = -1
61 SUBOPERATION_STATUS_NEW = -2
62 SUBOPERATION_STATUS_SKIP = -3
63 task_name_deploy_vca = "Deploying VCA"
64
65 def __init__(self, db, msg, fs, lcm_tasks, config, loop, prometheus=None):
66 """
67 Init: connect to database, filesystem storage and messaging
68 :param config: two-level dictionary with configuration. Top level should contain 'database', 'storage', etc.
69 :return: None
70 """
71 super().__init__(
72 db=db,
73 msg=msg,
74 fs=fs,
75 logger=logging.getLogger('lcm.ns')
76 )
77
78 self.loop = loop
79 self.lcm_tasks = lcm_tasks
80 self.timeout = config["timeout"]
81 self.ro_config = config["ro_config"]
82 self.ng_ro = config["ro_config"].get("ng")
83 self.vca_config = config["VCA"].copy()
84
85 # create N2VC connector
86 self.n2vc = N2VCJujuConnector(
87 db=self.db,
88 fs=self.fs,
89 log=self.logger,
90 loop=self.loop,
91 url='{}:{}'.format(self.vca_config['host'], self.vca_config['port']),
92 username=self.vca_config.get('user', None),
93 vca_config=self.vca_config,
94 on_update_db=self._on_update_n2vc_db
95 )
96
97 self.conn_helm_ee = LCMHelmConn(
98 db=self.db,
99 fs=self.fs,
100 log=self.logger,
101 loop=self.loop,
102 url=None,
103 username=None,
104 vca_config=self.vca_config,
105 on_update_db=self._on_update_n2vc_db
106 )
107
108 self.k8sclusterhelm2 = K8sHelmConnector(
109 kubectl_command=self.vca_config.get("kubectlpath"),
110 helm_command=self.vca_config.get("helmpath"),
111 fs=self.fs,
112 log=self.logger,
113 db=self.db,
114 on_update_db=None,
115 )
116
117 self.k8sclusterhelm3 = K8sHelm3Connector(
118 kubectl_command=self.vca_config.get("kubectlpath"),
119 helm_command=self.vca_config.get("helm3path"),
120 fs=self.fs,
121 log=self.logger,
122 db=self.db,
123 on_update_db=None,
124 )
125
126 self.k8sclusterjuju = K8sJujuConnector(
127 kubectl_command=self.vca_config.get("kubectlpath"),
128 juju_command=self.vca_config.get("jujupath"),
129 fs=self.fs,
130 log=self.logger,
131 db=self.db,
132 loop=self.loop,
133 on_update_db=None,
134 vca_config=self.vca_config,
135 )
136
137 self.k8scluster_map = {
138 "helm-chart": self.k8sclusterhelm2,
139 "helm-chart-v3": self.k8sclusterhelm3,
140 "chart": self.k8sclusterhelm3,
141 "juju-bundle": self.k8sclusterjuju,
142 "juju": self.k8sclusterjuju,
143 }
144
145 self.vca_map = {
146 "lxc_proxy_charm": self.n2vc,
147 "native_charm": self.n2vc,
148 "k8s_proxy_charm": self.n2vc,
149 "helm": self.conn_helm_ee,
150 "helm-v3": self.conn_helm_ee
151 }
152
153 self.prometheus = prometheus
154
155 # create RO client
156 if self.ng_ro:
157 self.RO = NgRoClient(self.loop, **self.ro_config)
158 else:
159 self.RO = ROclient.ROClient(self.loop, **self.ro_config)
160
161 @staticmethod
162 def increment_ip_mac(ip_mac, vm_index=1):
163 if not isinstance(ip_mac, str):
164 return ip_mac
165 try:
166 # try with ipv4: look for the last dot
167 i = ip_mac.rfind(".")
168 if i > 0:
169 i += 1
170 return "{}{}".format(ip_mac[:i], int(ip_mac[i:]) + vm_index)
171 # try with ipv6 or mac: look for the last colon and operate in hex
172 i = ip_mac.rfind(":")
173 if i > 0:
174 i += 1
175 # format in hex, len can be 2 for mac or 4 for ipv6
176 return ("{}{:0" + str(len(ip_mac) - i) + "x}").format(ip_mac[:i], int(ip_mac[i:], 16) + vm_index)
177 except Exception:
178 pass
179 return None
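# Illustrative usage of increment_ip_mac (not part of the original module; the values are
# hypothetical). It bumps the last dotted-decimal octet for IPv4, or the last hex group for
# MAC/IPv6, by vm_index; non-strings are returned unchanged and unparsable strings yield None:
#     NsLcm.increment_ip_mac("10.0.0.10", vm_index=2)           # -> "10.0.0.12"
#     NsLcm.increment_ip_mac("52:54:00:aa:bb:0a", vm_index=1)   # -> "52:54:00:aa:bb:0b"
#     NsLcm.increment_ip_mac(None)                              # -> None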
180
181 def _on_update_ro_db(self, nsrs_id, ro_descriptor):
182
183 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
184
185 try:
186 # TODO filter RO descriptor fields...
187
188 # write to database
189 db_dict = dict()
190 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
191 db_dict['deploymentStatus'] = ro_descriptor
192 self.update_db_2("nsrs", nsrs_id, db_dict)
193
194 except Exception as e:
195 self.logger.warn('Cannot write database RO deployment for ns={} -> {}'.format(nsrs_id, e))
196
197 async def _on_update_n2vc_db(self, table, filter, path, updated_data):
198
199 # remove last dot from path (if exists)
200 if path.endswith('.'):
201 path = path[:-1]
202
203 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
204 # .format(table, filter, path, updated_data))
205
206 try:
207
208 nsr_id = filter.get('_id')
209
210 # read ns record from database
211 nsr = self.db.get_one(table='nsrs', q_filter=filter)
212 current_ns_status = nsr.get('nsState')
213
214 # get vca status for NS
215 status_dict = await self.n2vc.get_status(namespace='.' + nsr_id, yaml_format=False)
216
217 # vcaStatus
218 db_dict = dict()
219 db_dict['vcaStatus'] = status_dict
220
221 # update configurationStatus for this VCA
222 try:
223 vca_index = int(path[path.rfind(".")+1:])
224
225 vca_list = deep_get(target_dict=nsr, key_list=('_admin', 'deployed', 'VCA'))
226 vca_status = vca_list[vca_index].get('status')
227
228 configuration_status_list = nsr.get('configurationStatus')
229 config_status = configuration_status_list[vca_index].get('status')
230
231 if config_status == 'BROKEN' and vca_status != 'failed':
232 db_dict['configurationStatus.{}.status'.format(vca_index)] = 'READY'
233 elif config_status != 'BROKEN' and vca_status == 'failed':
234 db_dict['configurationStatus.{}.status'.format(vca_index)] = 'BROKEN'
235 except Exception as e:
236 # do not update configurationStatus
237 self.logger.debug('Error updating vca_index (ignore): {}'.format(e))
238
239 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
240 # if nsState = 'DEGRADED' check if all is OK
241 is_degraded = False
242 if current_ns_status in ('READY', 'DEGRADED'):
243 error_description = ''
244 # check machines
245 if status_dict.get('machines'):
246 for machine_id in status_dict.get('machines'):
247 machine = status_dict.get('machines').get(machine_id)
248 # check machine agent-status
249 if machine.get('agent-status'):
250 s = machine.get('agent-status').get('status')
251 if s != 'started':
252 is_degraded = True
253 error_description += 'machine {} agent-status={} ; '.format(machine_id, s)
254 # check machine instance status
255 if machine.get('instance-status'):
256 s = machine.get('instance-status').get('status')
257 if s != 'running':
258 is_degraded = True
259 error_description += 'machine {} instance-status={} ; '.format(machine_id, s)
260 # check applications
261 if status_dict.get('applications'):
262 for app_id in status_dict.get('applications'):
263 app = status_dict.get('applications').get(app_id)
264 # check application status
265 if app.get('status'):
266 s = app.get('status').get('status')
267 if s != 'active':
268 is_degraded = True
269 error_description += 'application {} status={} ; '.format(app_id, s)
270
271 if error_description:
272 db_dict['errorDescription'] = error_description
273 if current_ns_status == 'READY' and is_degraded:
274 db_dict['nsState'] = 'DEGRADED'
275 if current_ns_status == 'DEGRADED' and not is_degraded:
276 db_dict['nsState'] = 'READY'
277
278 # write to database
279 self.update_db_2("nsrs", nsr_id, db_dict)
280
281 except (asyncio.CancelledError, asyncio.TimeoutError):
282 raise
283 except Exception as e:
284 self.logger.warn('Error updating NS state for ns={}: {}'.format(nsr_id, e))
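# Illustrative sketch of the degraded-state check above (not part of the original module;
# identifiers are hypothetical). Given a VCA/Juju status such as:
#     status_dict = {
#         "machines": {"0": {"agent-status": {"status": "started"},
#                            "instance-status": {"status": "running"}}},
#         "applications": {"app-vnf-1": {"status": {"status": "blocked"}}},
#     }
# a ns currently in nsState 'READY' would be updated to 'DEGRADED' with
# errorDescription "application app-vnf-1 status=blocked ; ", and moved back to 'READY'
# once every machine is started/running and every application is active.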
285
286 @staticmethod
287 def _parse_cloud_init(cloud_init_text, additional_params, vnfd_id, vdu_id):
288 try:
289 env = Environment(undefined=StrictUndefined)
290 template = env.from_string(cloud_init_text)
291 return template.render(additional_params or {})
292 except UndefinedError as e:
293 raise LcmException("Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
294 "file, must be provided in the instantiation parameters inside the "
295 "'additionalParamsForVnf/Vdu' block".format(e, vnfd_id, vdu_id))
296 except (TemplateError, TemplateNotFound) as e:
297 raise LcmException("Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".
298 format(vnfd_id, vdu_id, e))
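# Illustrative example of the Jinja2 rendering performed above (not part of the original
# module; the template and parameters are hypothetical):
#     NsLcm._parse_cloud_init("hostname: {{ hostname }}", {"hostname": "vnf1-vdu1"},
#                             vnfd_id="my-vnfd", vdu_id="my-vdu")
#     # -> "hostname: vnf1-vdu1"
# Because StrictUndefined is used, a template variable missing from
# additionalParamsForVnf/Vdu raises LcmException instead of rendering empty text.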
299
300 def _get_cloud_init(self, vdu, vnfd):
301 try:
302 cloud_init_content = cloud_init_file = None
303 if vdu.get("cloud-init-file"):
304 base_folder = vnfd["_admin"]["storage"]
305 cloud_init_file = "{}/{}/cloud_init/{}".format(base_folder["folder"], base_folder["pkg-dir"],
306 vdu["cloud-init-file"])
307 with self.fs.file_open(cloud_init_file, "r") as ci_file:
308 cloud_init_content = ci_file.read()
309 elif vdu.get("cloud-init"):
310 cloud_init_content = vdu["cloud-init"]
311
312 return cloud_init_content
313 except FsException as e:
314 raise LcmException("Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".
315 format(vnfd["id"], vdu["id"], cloud_init_file, e))
316
317 def _get_osm_params(self, db_vnfr, vdu_id=None, vdu_count_index=0):
318 osm_params = {x.replace("-", "_"): db_vnfr[x] for x in ("ip-address", "vim-account-id", "vnfd-id", "vnfd-ref")
319 if db_vnfr.get(x) is not None}
320 osm_params["ns_id"] = db_vnfr["nsr-id-ref"]
321 osm_params["vnf_id"] = db_vnfr["_id"]
322 osm_params["member_vnf_index"] = db_vnfr["member-vnf-index-ref"]
323 if db_vnfr.get("vdur"):
324 osm_params["vdu"] = {}
325 for vdur in db_vnfr["vdur"]:
326 vdu = {
327 "count_index": vdur["count-index"],
328 "vdu_id": vdur["vdu-id-ref"],
329 "interfaces": {}
330 }
331 if vdur.get("ip-address"):
332 vdu["ip_address"] = vdur["ip-address"]
333 for iface in vdur["interfaces"]:
334 vdu["interfaces"][iface["name"]] = \
335 {x.replace("-", "_"): iface[x] for x in ("mac-address", "ip-address", "vnf-vld-id", "name")
336 if iface.get(x) is not None}
337 vdu_id_index = "{}-{}".format(vdur["vdu-id-ref"], vdur["count-index"])
338 osm_params["vdu"][vdu_id_index] = vdu
339 if vdu_id:
340 osm_params["vdu_id"] = vdu_id
341 osm_params["count_index"] = vdu_count_index
342 return osm_params
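# Illustrative shape of the dictionary built above, injected as the "OSM" block of
# additionalParams (not part of the original module; values are hypothetical):
#     {
#         "ip_address": "10.0.0.10", "vim_account_id": "...", "vnfd_id": "...",
#         "vnfd_ref": "...", "ns_id": "...", "vnf_id": "...", "member_vnf_index": "1",
#         "vdu": {"mgmtVM-0": {"count_index": 0, "vdu_id": "mgmtVM",
#                              "ip_address": "10.0.0.10",
#                              "interfaces": {"eth0": {"mac_address": "...",
#                                                      "ip_address": "10.0.0.10"}}}},
#         "vdu_id": "mgmtVM", "count_index": 0,   # only when vdu_id is passed
#     }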
343
344 def _get_vdu_additional_params(self, db_vnfr, vdu_id):
345 vdur = next(vdur for vdur in db_vnfr.get("vdur") if vdu_id == vdur["vdu-id-ref"])
346 additional_params = vdur.get("additionalParams")
347 return self._format_additional_params(additional_params)
348
349 def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
350 """
351 Creates a new vnfd descriptor for RO based on the input OSM IM vnfd
352 :param vnfd: input vnfd
353 :param new_id: overrides vnf id if provided
354 :param additionalParams: Instantiation params for VNFs provided
355 :param nsrId: Id of the NSR
356 :return: copy of vnfd
357 """
358 vnfd_RO = deepcopy(vnfd)
359 # remove keys unused by RO: configuration, monitoring, scaling and internal keys
360 vnfd_RO.pop("_id", None)
361 vnfd_RO.pop("_admin", None)
362 vnfd_RO.pop("vnf-configuration", None)
363 vnfd_RO.pop("monitoring-param", None)
364 vnfd_RO.pop("scaling-group-descriptor", None)
365 vnfd_RO.pop("kdu", None)
366 vnfd_RO.pop("k8s-cluster", None)
367 if new_id:
368 vnfd_RO["id"] = new_id
369
370 # remove cloud-init and cloud-init-file; they are parsed with Jinja2 and passed to RO within the ns instantiation params
371 for vdu in get_iterable(vnfd_RO, "vdu"):
372 vdu.pop("cloud-init-file", None)
373 vdu.pop("cloud-init", None)
374 return vnfd_RO
375
376 @staticmethod
377 def ip_profile_2_RO(ip_profile):
378 RO_ip_profile = deepcopy(ip_profile)
379 if "dns-server" in RO_ip_profile:
380 if isinstance(RO_ip_profile["dns-server"], list):
381 RO_ip_profile["dns-address"] = []
382 for ds in RO_ip_profile.pop("dns-server"):
383 RO_ip_profile["dns-address"].append(ds['address'])
384 else:
385 RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
386 if RO_ip_profile.get("ip-version") == "ipv4":
387 RO_ip_profile["ip-version"] = "IPv4"
388 if RO_ip_profile.get("ip-version") == "ipv6":
389 RO_ip_profile["ip-version"] = "IPv6"
390 if "dhcp-params" in RO_ip_profile:
391 RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
392 return RO_ip_profile
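# Illustrative conversion performed by ip_profile_2_RO (not part of the original module;
# values are hypothetical):
#     ip_profile_2_RO({"ip-version": "ipv4",
#                      "dns-server": [{"address": "8.8.8.8"}],
#                      "dhcp-params": {"enabled": True}})
#     # -> {"ip-version": "IPv4", "dns-address": ["8.8.8.8"], "dhcp": {"enabled": True}}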
393
394 def _ns_params_2_RO(self, ns_params, nsd, vnfd_dict, db_vnfrs, n2vc_key_list):
395 """
396 Creates a RO ns descriptor from OSM ns_instantiate params
397 :param ns_params: OSM instantiate params
398 :param vnfd_dict: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
399 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index. {member-vnf-index: {vnfr_object}, ...}
400 :return: The RO ns descriptor
401 """
402 vim_2_RO = {}
403 wim_2_RO = {}
404 # TODO feature 1417: Check that no instantiation is set over PDU
405 # check if PDU forces a concrete vim-network-id and add it
406 # check if PDU contains a SDN-assist info (dpid, switch, port) and pass it to RO
407
408 def vim_account_2_RO(vim_account):
409 if vim_account in vim_2_RO:
410 return vim_2_RO[vim_account]
411
412 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
413 if db_vim["_admin"]["operationalState"] != "ENABLED":
414 raise LcmException("VIM={} is not available. operationalState={}".format(
415 vim_account, db_vim["_admin"]["operationalState"]))
416 RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
417 vim_2_RO[vim_account] = RO_vim_id
418 return RO_vim_id
419
420 def wim_account_2_RO(wim_account):
421 if isinstance(wim_account, str):
422 if wim_account in wim_2_RO:
423 return wim_2_RO[wim_account]
424
425 db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
426 if db_wim["_admin"]["operationalState"] != "ENABLED":
427 raise LcmException("WIM={} is not available. operationalState={}".format(
428 wim_account, db_wim["_admin"]["operationalState"]))
429 RO_wim_id = db_wim["_admin"]["deployed"]["RO-account"]
430 wim_2_RO[wim_account] = RO_wim_id
431 return RO_wim_id
432 else:
433 return wim_account
434
435 if not ns_params:
436 return None
437 RO_ns_params = {
438 # "name": ns_params["nsName"],
439 # "description": ns_params.get("nsDescription"),
440 "datacenter": vim_account_2_RO(ns_params["vimAccountId"]),
441 "wim_account": wim_account_2_RO(ns_params.get("wimAccountId")),
442 # "scenario": ns_params["nsdId"],
443 }
444 # set vim_account of each vnf if different from general vim_account.
445 # Get this information from <vnfr> database content, key vim-account-id
446 # Vim account can be set by placement_engine and it may be different from
447 # the instantiate parameters (vnfs.member-vnf-index.datacenter).
448 for vnf_index, vnfr in db_vnfrs.items():
449 if vnfr.get("vim-account-id") and vnfr["vim-account-id"] != ns_params["vimAccountId"]:
450 populate_dict(RO_ns_params, ("vnfs", vnf_index, "datacenter"), vim_account_2_RO(vnfr["vim-account-id"]))
451
452 n2vc_key_list = n2vc_key_list or []
453 for vnfd_ref, vnfd in vnfd_dict.items():
454 vdu_needed_access = []
455 mgmt_cp = None
456 if vnfd.get("vnf-configuration"):
457 ssh_required = deep_get(vnfd, ("vnf-configuration", "config-access", "ssh-access", "required"))
458 if ssh_required and vnfd.get("mgmt-interface"):
459 if vnfd["mgmt-interface"].get("vdu-id"):
460 vdu_needed_access.append(vnfd["mgmt-interface"]["vdu-id"])
461 elif vnfd["mgmt-interface"].get("cp"):
462 mgmt_cp = vnfd["mgmt-interface"]["cp"]
463
464 for vdu in vnfd.get("vdu", ()):
465 if vdu.get("vdu-configuration"):
466 ssh_required = deep_get(vdu, ("vdu-configuration", "config-access", "ssh-access", "required"))
467 if ssh_required:
468 vdu_needed_access.append(vdu["id"])
469 elif mgmt_cp:
470 for vdu_interface in vdu.get("interface"):
471 if vdu_interface.get("external-connection-point-ref") and \
472 vdu_interface["external-connection-point-ref"] == mgmt_cp:
473 vdu_needed_access.append(vdu["id"])
474 mgmt_cp = None
475 break
476
477 if vdu_needed_access:
478 for vnf_member in nsd.get("constituent-vnfd"):
479 if vnf_member["vnfd-id-ref"] != vnfd_ref:
480 continue
481 for vdu in vdu_needed_access:
482 populate_dict(RO_ns_params,
483 ("vnfs", vnf_member["member-vnf-index"], "vdus", vdu, "mgmt_keys"),
484 n2vc_key_list)
485 # cloud init
486 for vdu in get_iterable(vnfd, "vdu"):
487 cloud_init_text = self._get_cloud_init(vdu, vnfd)
488 if not cloud_init_text:
489 continue
490 for vnf_member in nsd.get("constituent-vnfd"):
491 if vnf_member["vnfd-id-ref"] != vnfd_ref:
492 continue
493 db_vnfr = db_vnfrs[vnf_member["member-vnf-index"]]
494 additional_params = self._get_vdu_additional_params(db_vnfr, vdu["id"]) or {}
495
496 cloud_init_list = []
497 for vdu_index in range(0, int(vdu.get("count", 1))):
498 additional_params["OSM"] = self._get_osm_params(db_vnfr, vdu["id"], vdu_index)
499 cloud_init_list.append(self._parse_cloud_init(cloud_init_text, additional_params, vnfd["id"],
500 vdu["id"]))
501 populate_dict(RO_ns_params,
502 ("vnfs", vnf_member["member-vnf-index"], "vdus", vdu["id"], "cloud_init"),
503 cloud_init_list)
504
505 if ns_params.get("vduImage"):
506 RO_ns_params["vduImage"] = ns_params["vduImage"]
507
508 if ns_params.get("ssh_keys"):
509 RO_ns_params["cloud-config"] = {"key-pairs": ns_params["ssh_keys"]}
510 for vnf_params in get_iterable(ns_params, "vnf"):
511 for constituent_vnfd in nsd["constituent-vnfd"]:
512 if constituent_vnfd["member-vnf-index"] == vnf_params["member-vnf-index"]:
513 vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
514 break
515 else:
516 raise LcmException("Invalid instantiate parameter vnf:member-vnf-index={} is not present at nsd:"
517 "constituent-vnfd".format(vnf_params["member-vnf-index"]))
518
519 for vdu_params in get_iterable(vnf_params, "vdu"):
520 # TODO feature 1417: check that this VDU exist and it is not a PDU
521 if vdu_params.get("volume"):
522 for volume_params in vdu_params["volume"]:
523 if volume_params.get("vim-volume-id"):
524 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
525 vdu_params["id"], "devices", volume_params["name"], "vim_id"),
526 volume_params["vim-volume-id"])
527 if vdu_params.get("interface"):
528 for interface_params in vdu_params["interface"]:
529 if interface_params.get("ip-address"):
530 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
531 vdu_params["id"], "interfaces", interface_params["name"],
532 "ip_address"),
533 interface_params["ip-address"])
534 if interface_params.get("mac-address"):
535 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
536 vdu_params["id"], "interfaces", interface_params["name"],
537 "mac_address"),
538 interface_params["mac-address"])
539 if interface_params.get("floating-ip-required"):
540 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
541 vdu_params["id"], "interfaces", interface_params["name"],
542 "floating-ip"),
543 interface_params["floating-ip-required"])
544
545 for internal_vld_params in get_iterable(vnf_params, "internal-vld"):
546 if internal_vld_params.get("vim-network-name"):
547 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
548 internal_vld_params["name"], "vim-network-name"),
549 internal_vld_params["vim-network-name"])
550 if internal_vld_params.get("vim-network-id"):
551 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
552 internal_vld_params["name"], "vim-network-id"),
553 internal_vld_params["vim-network-id"])
554 if internal_vld_params.get("ip-profile"):
555 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
556 internal_vld_params["name"], "ip-profile"),
557 self.ip_profile_2_RO(internal_vld_params["ip-profile"]))
558 if internal_vld_params.get("provider-network"):
559
560 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "networks",
561 internal_vld_params["name"], "provider-network"),
562 internal_vld_params["provider-network"].copy())
563
564 for icp_params in get_iterable(internal_vld_params, "internal-connection-point"):
565 # look for interface
566 iface_found = False
567 for vdu_descriptor in vnf_descriptor["vdu"]:
568 for vdu_interface in vdu_descriptor["interface"]:
569 if vdu_interface.get("internal-connection-point-ref") == icp_params["id-ref"]:
570 if icp_params.get("ip-address"):
571 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
572 vdu_descriptor["id"], "interfaces",
573 vdu_interface["name"], "ip_address"),
574 icp_params["ip-address"])
575
576 if icp_params.get("mac-address"):
577 populate_dict(RO_ns_params, ("vnfs", vnf_params["member-vnf-index"], "vdus",
578 vdu_descriptor["id"], "interfaces",
579 vdu_interface["name"], "mac_address"),
580 icp_params["mac-address"])
581 iface_found = True
582 break
583 if iface_found:
584 break
585 else:
586 raise LcmException("Invalid instantiate parameter vnf:member-vnf-index[{}]:"
587 "internal-vld:id-ref={} is not present at vnfd:internal-"
588 "connection-point".format(vnf_params["member-vnf-index"],
589 icp_params["id-ref"]))
590
591 for vld_params in get_iterable(ns_params, "vld"):
592 if "ip-profile" in vld_params:
593 populate_dict(RO_ns_params, ("networks", vld_params["name"], "ip-profile"),
594 self.ip_profile_2_RO(vld_params["ip-profile"]))
595
596 if vld_params.get("provider-network"):
597
598 populate_dict(RO_ns_params, ("networks", vld_params["name"], "provider-network"),
599 vld_params["provider-network"].copy())
600
601 if "wimAccountId" in vld_params and vld_params["wimAccountId"] is not None:
602 populate_dict(RO_ns_params, ("networks", vld_params["name"], "wim_account"),
603 wim_account_2_RO(vld_params["wimAccountId"]))
604 if vld_params.get("vim-network-name"):
605 RO_vld_sites = []
606 if isinstance(vld_params["vim-network-name"], dict):
607 for vim_account, vim_net in vld_params["vim-network-name"].items():
608 RO_vld_sites.append({
609 "netmap-use": vim_net,
610 "datacenter": vim_account_2_RO(vim_account)
611 })
612 else: # isinstance str
613 RO_vld_sites.append({"netmap-use": vld_params["vim-network-name"]})
614 if RO_vld_sites:
615 populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)
616
617 if vld_params.get("vim-network-id"):
618 RO_vld_sites = []
619 if isinstance(vld_params["vim-network-id"], dict):
620 for vim_account, vim_net in vld_params["vim-network-id"].items():
621 RO_vld_sites.append({
622 "netmap-use": vim_net,
623 "datacenter": vim_account_2_RO(vim_account)
624 })
625 else: # isinstance str
626 RO_vld_sites.append({"netmap-use": vld_params["vim-network-id"]})
627 if RO_vld_sites:
628 populate_dict(RO_ns_params, ("networks", vld_params["name"], "sites"), RO_vld_sites)
629 if vld_params.get("ns-net"):
630 if isinstance(vld_params["ns-net"], dict):
631 for vld_id, instance_scenario_id in vld_params["ns-net"].items():
632 RO_vld_ns_net = {"instance_scenario_id": instance_scenario_id, "osm_id": vld_id}
633 populate_dict(RO_ns_params, ("networks", vld_params["name"], "use-network"), RO_vld_ns_net)
634 if "vnfd-connection-point-ref" in vld_params:
635 for cp_params in vld_params["vnfd-connection-point-ref"]:
636 # look for interface
637 for constituent_vnfd in nsd["constituent-vnfd"]:
638 if constituent_vnfd["member-vnf-index"] == cp_params["member-vnf-index-ref"]:
639 vnf_descriptor = vnfd_dict[constituent_vnfd["vnfd-id-ref"]]
640 break
641 else:
642 raise LcmException(
643 "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={} "
644 "is not present at nsd:constituent-vnfd".format(cp_params["member-vnf-index-ref"]))
645 match_cp = False
646 for vdu_descriptor in vnf_descriptor["vdu"]:
647 for interface_descriptor in vdu_descriptor["interface"]:
648 if interface_descriptor.get("external-connection-point-ref") == \
649 cp_params["vnfd-connection-point-ref"]:
650 match_cp = True
651 break
652 if match_cp:
653 break
654 else:
655 raise LcmException(
656 "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={}:"
657 "vnfd-connection-point-ref={} is not present at vnfd={}".format(
658 cp_params["member-vnf-index-ref"],
659 cp_params["vnfd-connection-point-ref"],
660 vnf_descriptor["id"]))
661 if cp_params.get("ip-address"):
662 populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
663 vdu_descriptor["id"], "interfaces",
664 interface_descriptor["name"], "ip_address"),
665 cp_params["ip-address"])
666 if cp_params.get("mac-address"):
667 populate_dict(RO_ns_params, ("vnfs", cp_params["member-vnf-index-ref"], "vdus",
668 vdu_descriptor["id"], "interfaces",
669 interface_descriptor["name"], "mac_address"),
670 cp_params["mac-address"])
671 return RO_ns_params
672
673 def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None, mark_delete=False):
674
675 db_vdu_push_list = []
676 db_update = {"_admin.modified": time()}
677 if vdu_create:
678 for vdu_id, vdu_count in vdu_create.items():
679 vdur = next((vdur for vdur in reversed(db_vnfr["vdur"]) if vdur["vdu-id-ref"] == vdu_id), None)
680 if not vdur:
681 raise LcmException("Error scaling OUT VNFR for {}. There is no existing vdur. Scaled to 0?".
682 format(vdu_id))
683
684 for count in range(vdu_count):
685 vdur_copy = deepcopy(vdur)
686 vdur_copy["status"] = "BUILD"
687 vdur_copy["status-detailed"] = None
688 vdur_copy["ip-address"]: None
689 vdur_copy["_id"] = str(uuid4())
690 vdur_copy["count-index"] += count + 1
691 vdur_copy["id"] = "{}-{}".format(vdur_copy["vdu-id-ref"], vdur_copy["count-index"])
692 vdur_copy.pop("vim_info", None)
693 for iface in vdur_copy["interfaces"]:
694 if iface.get("fixed-ip"):
695 iface["ip-address"] = self.increment_ip_mac(iface["ip-address"], count+1)
696 else:
697 iface.pop("ip-address", None)
698 if iface.get("fixed-mac"):
699 iface["mac-address"] = self.increment_ip_mac(iface["mac-address"], count+1)
700 else:
701 iface.pop("mac-address", None)
702 iface.pop("mgmt_vnf", None) # only the first vdu can be the management one of the vnf
703 db_vdu_push_list.append(vdur_copy)
704 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
705 if vdu_delete:
706 for vdu_id, vdu_count in vdu_delete.items():
707 if mark_delete:
708 indexes_to_delete = [iv[0] for iv in enumerate(db_vnfr["vdur"]) if iv[1]["vdu-id-ref"] == vdu_id]
709 db_update.update({"vdur.{}.status".format(i): "DELETING" for i in indexes_to_delete[-vdu_count:]})
710 else:
711 # they must be deleted one by one because common.db does not allow otherwise
712 vdus_to_delete = [v for v in reversed(db_vnfr["vdur"]) if v["vdu-id-ref"] == vdu_id]
713 for vdu in vdus_to_delete[:vdu_count]:
714 self.db.set_one("vnfrs", {"_id": db_vnfr["_id"]}, None, pull={"vdur": {"_id": vdu["_id"]}})
715 db_push = {"vdur": db_vdu_push_list} if db_vdu_push_list else None
716 self.db.set_one("vnfrs", {"_id": db_vnfr["_id"]}, db_update, push_list=db_push)
717 # modify passed dictionary db_vnfr
718 db_vnfr_ = self.db.get_one("vnfrs", {"_id": db_vnfr["_id"]})
719 db_vnfr["vdur"] = db_vnfr_["vdur"]
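# Illustrative use of scale_vnfr (not part of the original module; the vdu id is hypothetical):
#     self.scale_vnfr(db_vnfr, vdu_create={"dataVM": 2})
#         adds two copies of the last existing "dataVM" vdur in BUILD status, each with a new
#         _id, incremented count-index, and incremented ip/mac when fixed-ip/fixed-mac is set.
#     self.scale_vnfr(db_vnfr, vdu_delete={"dataVM": 1}, mark_delete=True)
#         marks the last "dataVM" replica as DELETING instead of pulling it from the database.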
720
721 def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
722 """
723 Updates database nsr with the RO info for the created vld
724 :param ns_update_nsr: dictionary to be filled with the updated info
725 :param db_nsr: content of db_nsr. This is also modified
726 :param nsr_desc_RO: nsr descriptor from RO
727 :return: Nothing, LcmException is raised on errors
728 """
729
730 for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
731 for net_RO in get_iterable(nsr_desc_RO, "nets"):
732 if vld["id"] != net_RO.get("ns_net_osm_id"):
733 continue
734 vld["vim-id"] = net_RO.get("vim_net_id")
735 vld["name"] = net_RO.get("vim_name")
736 vld["status"] = net_RO.get("status")
737 vld["status-detailed"] = net_RO.get("error_msg")
738 ns_update_nsr["vld.{}".format(vld_index)] = vld
739 break
740 else:
741 raise LcmException("ns_update_nsr: Not found vld={} at RO info".format(vld["id"]))
742
743 def set_vnfr_at_error(self, db_vnfrs, error_text):
744 try:
745 for db_vnfr in db_vnfrs.values():
746 vnfr_update = {"status": "ERROR"}
747 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
748 if "status" not in vdur:
749 vdur["status"] = "ERROR"
750 vnfr_update["vdur.{}.status".format(vdu_index)] = "ERROR"
751 if error_text:
752 vdur["status-detailed"] = str(error_text)
753 vnfr_update["vdur.{}.status-detailed".format(vdu_index)] = "ERROR"
754 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
755 except DbException as e:
756 self.logger.error("Cannot update vnf. {}".format(e))
757
758 def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
759 """
760 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
761 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
762 :param nsr_desc_RO: nsr descriptor from RO
763 :return: Nothing, LcmException is raised on errors
764 """
765 for vnf_index, db_vnfr in db_vnfrs.items():
766 for vnf_RO in nsr_desc_RO["vnfs"]:
767 if vnf_RO["member_vnf_index"] != vnf_index:
768 continue
769 vnfr_update = {}
770 if vnf_RO.get("ip_address"):
771 db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO["ip_address"].split(";")[0]
772 elif not db_vnfr.get("ip-address"):
773 if db_vnfr.get("vdur"): # if there are no VDUs, there is no ip_address
774 raise LcmExceptionNoMgmtIP("ns member_vnf_index '{}' has no IP address".format(vnf_index))
775
776 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
777 vdur_RO_count_index = 0
778 if vdur.get("pdu-type"):
779 continue
780 for vdur_RO in get_iterable(vnf_RO, "vms"):
781 if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
782 continue
783 if vdur["count-index"] != vdur_RO_count_index:
784 vdur_RO_count_index += 1
785 continue
786 vdur["vim-id"] = vdur_RO.get("vim_vm_id")
787 if vdur_RO.get("ip_address"):
788 vdur["ip-address"] = vdur_RO["ip_address"].split(";")[0]
789 else:
790 vdur["ip-address"] = None
791 vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
792 vdur["name"] = vdur_RO.get("vim_name")
793 vdur["status"] = vdur_RO.get("status")
794 vdur["status-detailed"] = vdur_RO.get("error_msg")
795 for ifacer in get_iterable(vdur, "interfaces"):
796 for interface_RO in get_iterable(vdur_RO, "interfaces"):
797 if ifacer["name"] == interface_RO.get("internal_name"):
798 ifacer["ip-address"] = interface_RO.get("ip_address")
799 ifacer["mac-address"] = interface_RO.get("mac_address")
800 break
801 else:
802 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
803 "from VIM info"
804 .format(vnf_index, vdur["vdu-id-ref"], ifacer["name"]))
805 vnfr_update["vdur.{}".format(vdu_index)] = vdur
806 break
807 else:
808 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
809 "VIM info".format(vnf_index, vdur["vdu-id-ref"], vdur["count-index"]))
810
811 for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
812 for net_RO in get_iterable(nsr_desc_RO, "nets"):
813 if vld["id"] != net_RO.get("vnf_net_osm_id"):
814 continue
815 vld["vim-id"] = net_RO.get("vim_net_id")
816 vld["name"] = net_RO.get("vim_name")
817 vld["status"] = net_RO.get("status")
818 vld["status-detailed"] = net_RO.get("error_msg")
819 vnfr_update["vld.{}".format(vld_index)] = vld
820 break
821 else:
822 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
823 vnf_index, vld["id"]))
824
825 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
826 break
827
828 else:
829 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(vnf_index))
830
831 def _get_ns_config_info(self, nsr_id):
832 """
833 Generates a mapping between vnf,vdu elements and the N2VC id
834 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
835 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
836 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
837 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
838 """
839 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
840 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
841 mapping = {}
842 ns_config_info = {"osm-config-mapping": mapping}
843 for vca in vca_deployed_list:
844 if not vca["member-vnf-index"]:
845 continue
846 if not vca["vdu_id"]:
847 mapping[vca["member-vnf-index"]] = vca["application"]
848 else:
849 mapping["{}.{}.{}".format(vca["member-vnf-index"], vca["vdu_id"], vca["vdu_count_index"])] =\
850 vca["application"]
851 return ns_config_info
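# Illustrative result of _get_ns_config_info (not part of the original module; the
# application names are hypothetical):
#     {"osm-config-mapping": {
#         "1": "app-vnf-1",                  # vnf-level configuration
#         "1.dataVM.0": "app-vnf-1-vdu-0",   # vdu-level configuration, replica 0
#     }}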
852
853 @staticmethod
854 def _get_initial_config_primitive_list(desc_primitive_list, vca_deployed, ee_descriptor_id):
855 """
856 Generates a list of initial-config-primitive based on the list provided by the descriptor. It includes internal
857 primitives such as verify-ssh-credentials, or config when needed
858 :param desc_primitive_list: information of the descriptor
859 :param vca_deployed: information of the deployment, needed to know whether it is related to an NS, VNF or VDU,
860 and whether this element contains an ssh public key
861 :param ee_descriptor_id: execution environment descriptor id. It is the value of
862 XXX_configuration.execution-environment-list.INDEX.id; it can be None
863 :return: The modified list. Can be an empty list, but always a list
864 """
865
866 primitive_list = desc_primitive_list or []
867
868 # filter primitives by ee_id
869 primitive_list = [p for p in primitive_list if p.get("execution-environment-ref") == ee_descriptor_id]
870
871 # sort by 'seq'
872 if primitive_list:
873 primitive_list.sort(key=lambda val: int(val['seq']))
874
875 # look for primitive config, and get the position. None if not present
876 config_position = None
877 for index, primitive in enumerate(primitive_list):
878 if primitive["name"] == "config":
879 config_position = index
880 break
881
882 # for NS, always add a config primitive if not present (bug 874)
883 if not vca_deployed["member-vnf-index"] and config_position is None:
884 primitive_list.insert(0, {"name": "config", "parameter": []})
885 config_position = 0
886 # TODO revise if needed: for VNF/VDU add verify-ssh-credentials after config
887 if vca_deployed["member-vnf-index"] and config_position is not None and vca_deployed.get("ssh-public-key"):
888 primitive_list.insert(config_position + 1, {"name": "verify-ssh-credentials", "parameter": []})
889 return primitive_list
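# Illustrative behaviour of _get_initial_config_primitive_list (not part of the original
# module; primitive names other than config/verify-ssh-credentials are hypothetical).
# For an NS-level VCA (empty member-vnf-index) whose descriptor list, after filtering by
# execution-environment-ref, is [{"seq": 2, "name": "touch"}], the returned list is:
#     [{"name": "config", "parameter": []}, {"seq": 2, "name": "touch"}]
# For a VNF/VDU VCA that exposes an ssh-public-key and already defines a "config"
# primitive, "verify-ssh-credentials" is inserted right after it.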
890
891 async def _instantiate_ng_ro(self, logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs, db_vnfds_ref,
892 n2vc_key_list, stage, start_deploy, timeout_ns_deploy):
893
894 db_vims = {}
895
896 def get_vim_account(vim_account_id):
897 nonlocal db_vims
898 if vim_account_id in db_vims:
899 return db_vims[vim_account_id]
900 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account_id})
901 db_vims[vim_account_id] = db_vim
902 return db_vim
903
904 # modify target_vld info with instantiation parameters
905 def parse_vld_instantiation_params(target_vim, target_vld, vld_params, target_sdn):
906 if vld_params.get("ip-profile"):
907 target_vld["vim_info"][target_vim]["ip_profile"] = vld_params["ip-profile"]
908 if vld_params.get("provider-network"):
909 target_vld["vim_info"][target_vim]["provider_network"] = vld_params["provider-network"]
910 if "sdn-ports" in vld_params["provider-network"] and target_sdn:
911 target_vld["vim_info"][target_sdn]["sdn-ports"] = vld_params["provider-network"]["sdn-ports"]
912 if vld_params.get("wimAccountId"):
913 target_wim = "wim:{}".format(vld_params["wimAccountId"])
914 target_vld["vim_info"][target_wim] = {}
915 for param in ("vim-network-name", "vim-network-id"):
916 if vld_params.get(param):
917 if isinstance(vld_params[param], dict):
918 pass
919 # for vim_account, vim_net in vld_params[param].items():
920 # TODO populate vim_info RO_vld_sites.append({
921 else: # isinstance str
922 target_vld["vim_info"][target_vim][param.replace("-", "_")] = vld_params[param]
923 # TODO if vld_params.get("ns-net"):
924
925 nslcmop_id = db_nslcmop["_id"]
926 target = {
927 "name": db_nsr["name"],
928 "ns": {"vld": []},
929 "vnf": [],
930 "image": deepcopy(db_nsr["image"]),
931 "flavor": deepcopy(db_nsr["flavor"]),
932 "action_id": nslcmop_id,
933 "cloud_init_content": {},
934 }
935 for image in target["image"]:
936 image["vim_info"] = {}
937 for flavor in target["flavor"]:
938 flavor["vim_info"] = {}
939
940 if db_nslcmop.get("lcmOperationType") != "instantiate":
941 # get parameters of instantiation:
942 db_nslcmop_instantiate = self.db.get_list("nslcmops", {"nsInstanceId": db_nslcmop["nsInstanceId"],
943 "lcmOperationType": "instantiate"})[-1]
944 ns_params = db_nslcmop_instantiate.get("operationParams")
945 else:
946 ns_params = db_nslcmop.get("operationParams")
947 ssh_keys = []
948 if ns_params.get("ssh_keys"):
949 ssh_keys += ns_params.get("ssh_keys")
950 if n2vc_key_list:
951 ssh_keys += n2vc_key_list
952
953 cp2target = {}
954 for vld_index, vld in enumerate(db_nsr.get("vld")):
955 target_vim = "vim:{}".format(ns_params["vimAccountId"])
956 target_vld = {
957 "id": vld["id"],
958 "name": vld["name"],
959 "mgmt-network": vld.get("mgmt-network", False),
960 "type": vld.get("type"),
961 "vim_info": {
962 target_vim: {"vim-network-name": vld.get("vim-network-name")}
963 }
964 }
965 # check if this network needs SDN assist
966 target_sdn = None
967 if vld.get("pci-interfaces"):
968 db_vim = get_vim_account(ns_params["vimAccountId"])
969 sdnc_id = db_vim["config"].get("sdn-controller")
970 if sdnc_id:
971 sdn_vld = "nsrs:{}:vld.{}".format(nsr_id, vld["id"])
972 target_sdn = "sdn:{}".format(sdnc_id)
973 target_vld["vim_info"][target_sdn] = {
974 "sdn": True, "target_vim": target_vim, "vlds": [sdn_vld], "type": vld.get("type")}
975
976 nsd_vld = next(v for v in nsd["vld"] if v["id"] == vld["id"])
977 for cp in nsd_vld["vnfd-connection-point-ref"]:
978 cp2target["member_vnf:{}.{}".format(cp["member-vnf-index-ref"], cp["vnfd-connection-point-ref"])] = \
979 "nsrs:{}:vld.{}".format(nsr_id, vld_index)
980
981 # check at nsd descriptor, if there is an ip-profile
982 vld_params = {}
983 if nsd_vld.get("ip-profile-ref"):
984 ip_profile = next(ipp for ipp in nsd["ip-profiles"] if ipp["name"] == nsd_vld["ip-profile-ref"])
985 vld_params["ip-profile"] = ip_profile["ip-profile-params"]
986 # update vld_params with instantiation params
987 vld_instantiation_params = next((v for v in get_iterable(ns_params, "vld")
988 if v["name"] in (vld["name"], vld["id"])), None)
989 if vld_instantiation_params:
990 vld_params.update(vld_instantiation_params)
991 parse_vld_instantiation_params(target_vim, target_vld, vld_params, target_sdn)
992 target["ns"]["vld"].append(target_vld)
993 for vnfr in db_vnfrs.values():
994 vnfd = db_vnfds_ref[vnfr["vnfd-ref"]]
995 vnf_params = next((v for v in get_iterable(ns_params, "vnf")
996 if v["member-vnf-index"] == vnfr["member-vnf-index-ref"]), None)
997 target_vnf = deepcopy(vnfr)
998 target_vim = "vim:{}".format(vnfr["vim-account-id"])
999 for vld in target_vnf.get("vld", ()):
1000 # check if connected to a ns.vld, to fill target
1001 vnf_cp = next((cp for cp in vnfd.get("connection-point", ()) if
1002 cp.get("internal-vld-ref") == vld["id"]), None)
1003 if vnf_cp:
1004 ns_cp = "member_vnf:{}.{}".format(vnfr["member-vnf-index-ref"], vnf_cp["id"])
1005 if cp2target.get(ns_cp):
1006 vld["target"] = cp2target[ns_cp]
1007 vld["vim_info"] = {target_vim: {"vim-network-name": vld.get("vim-network-name")}}
1008 # check if this network needs SDN assist
1009 target_sdn = None
1010 if vld.get("pci-interfaces"):
1011 db_vim = get_vim_account(vnfr["vim-account-id"])
1012 sdnc_id = db_vim["config"].get("sdn-controller")
1013 if sdnc_id:
1014 sdn_vld = "vnfrs:{}:vld.{}".format(target_vnf["_id"], vld["id"])
1015 target_sdn = "sdn:{}".format(sdnc_id)
1016 vld["vim_info"][target_sdn] = {
1017 "sdn": True, "target_vim": target_vim, "vlds": [sdn_vld], "type": vld.get("type")}
1018
1019 # check at vnfd descriptor, if there is an ip-profile
1020 vld_params = {}
1021 vnfd_vld = next(v for v in vnfd["internal-vld"] if v["id"] == vld["id"])
1022 if vnfd_vld.get("ip-profile-ref"):
1023 ip_profile = next(ipp for ipp in vnfd["ip-profiles"] if ipp["name"] == vnfd_vld["ip-profile-ref"])
1024 vld_params["ip-profile"] = ip_profile["ip-profile-params"]
1025 # update vld_params with instantiation params
1026 if vnf_params:
1027 vld_instantiation_params = next((v for v in get_iterable(vnf_params, "internal-vld")
1028 if v["name"] == vld["id"]), None)
1029 if vld_instantiation_params:
1030 vld_params.update(vld_instantiation_params)
1031 parse_vld_instantiation_params(target_vim, vld, vld_params, target_sdn)
1032
1033 vdur_list = []
1034 for vdur in target_vnf.get("vdur", ()):
1035 if vdur.get("status") == "DELETING" or vdur.get("pdu-type"):
1036 continue # This vdu must not be created
1037 vdur["vim_info"] = {target_vim: {}}
1038 vdud_index, vdud = next(k for k in enumerate(vnfd["vdu"]) if k[1]["id"] == vdur["vdu-id-ref"])
1039
1040 if ssh_keys:
1041 if deep_get(vdud, ("vdu-configuration", "config-access", "ssh-access", "required")):
1042 vdur["ssh-keys"] = ssh_keys
1043 vdur["ssh-access-required"] = True
1044 elif deep_get(vnfd, ("vnf-configuration", "config-access", "ssh-access", "required")) and \
1045 any(iface.get("mgmt-vnf") for iface in vdur["interfaces"]):
1046 vdur["ssh-keys"] = ssh_keys
1047 vdur["ssh-access-required"] = True
1048
1049 # cloud-init
1050 if vdud.get("cloud-init-file"):
1051 vdur["cloud-init"] = "{}:file:{}".format(vnfd["_id"], vdud.get("cloud-init-file"))
1052 # read the file and put its content at target.cloud_init_content, so ng_ro does not need the shared package system
1053 if vdur["cloud-init"] not in target["cloud_init_content"]:
1054 base_folder = vnfd["_admin"]["storage"]
1055 cloud_init_file = "{}/{}/cloud_init/{}".format(base_folder["folder"], base_folder["pkg-dir"],
1056 vdud.get("cloud-init-file"))
1057 with self.fs.file_open(cloud_init_file, "r") as ci_file:
1058 target["cloud_init_content"][vdur["cloud-init"]] = ci_file.read()
1059 elif vdud.get("cloud-init"):
1060 vdur["cloud-init"] = "{}:vdu:{}".format(vnfd["_id"], vdud_index)
1061 # put the content at target.cloud_init_content, so ng_ro does not need to read the vnfd descriptor
1062 target["cloud_init_content"][vdur["cloud-init"]] = vdud["cloud-init"]
1063 vdur["additionalParams"] = vdur.get("additionalParams") or {}
1064 deploy_params_vdu = self._format_additional_params(vdur.get("additionalParams") or {})
1065 deploy_params_vdu["OSM"] = self._get_osm_params(vnfr, vdur["vdu-id-ref"], vdur["count-index"])
1066 vdur["additionalParams"] = deploy_params_vdu
1067
1068 # flavor
1069 ns_flavor = target["flavor"][int(vdur["ns-flavor-id"])]
1070 if target_vim not in ns_flavor["vim_info"]:
1071 ns_flavor["vim_info"][target_vim] = {}
1072 # image
1073 ns_image = target["image"][int(vdur["ns-image-id"])]
1074 if target_vim not in ns_image["vim_info"]:
1075 ns_image["vim_info"][target_vim] = {}
1076
1077 vdur["vim_info"] = {target_vim: {}}
1078 # instantiation parameters
1079 # if vnf_params:
1080 # vdu_instantiation_params = next((v for v in get_iterable(vnf_params, "vdu") if v["id"] ==
1081 # vdud["id"]), None)
1082 vdur_list.append(vdur)
1083 target_vnf["vdur"] = vdur_list
1084 target["vnf"].append(target_vnf)
1085
1086 desc = await self.RO.deploy(nsr_id, target)
1087 action_id = desc["action_id"]
1088 await self._wait_ng_ro(nsr_id, action_id, nslcmop_id, start_deploy, timeout_ns_deploy, stage)
1089
1090 # Updating NSR
1091 db_nsr_update = {
1092 "_admin.deployed.RO.operational-status": "running",
1093 "detailed-status": " ".join(stage)
1094 }
1095 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1096 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1097 self._write_op_status(nslcmop_id, stage)
1098 self.logger.debug(logging_text + "ns deployed at RO. RO_id={}".format(action_id))
1099 return
1100
1101 async def _wait_ng_ro(self, nsr_id, action_id, nslcmop_id=None, start_time=None, timeout=600, stage=None):
1102 detailed_status_old = None
1103 db_nsr_update = {}
1104 start_time = start_time or time()
1105 while time() <= start_time + timeout:
1106 desc_status = await self.RO.status(nsr_id, action_id)
1107 if desc_status["status"] == "FAILED":
1108 raise NgRoException(desc_status["details"])
1109 elif desc_status["status"] == "BUILD":
1110 if stage:
1111 stage[2] = "VIM: ({})".format(desc_status["details"])
1112 elif desc_status["status"] == "DONE":
1113 if stage:
1114 stage[2] = "Deployed at VIM"
1115 break
1116 else:
1117 assert False, "RO.status returns unknown {}".format(desc_status["status"])
1118 if stage and nslcmop_id and stage[2] != detailed_status_old:
1119 detailed_status_old = stage[2]
1120 db_nsr_update["detailed-status"] = " ".join(stage)
1121 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1122 self._write_op_status(nslcmop_id, stage)
1123 await asyncio.sleep(5, loop=self.loop)
1124 else: # timeout_ns_deploy
1125 raise NgRoException("Timeout waiting for ns to deploy")
1126
1127 async def _terminate_ng_ro(self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage):
1128 db_nsr_update = {}
1129 failed_detail = []
1130 action_id = None
1131 start_deploy = time()
1132 try:
1133 target = {
1134 "ns": {"vld": []},
1135 "vnf": [],
1136 "image": [],
1137 "flavor": [],
1138 "action_id": nslcmop_id
1139 }
1140 desc = await self.RO.deploy(nsr_id, target)
1141 action_id = desc["action_id"]
1142 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = action_id
1143 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETING"
1144 self.logger.debug(logging_text + "ns terminate action at RO. action_id={}".format(action_id))
1145
1146 # wait until done
1147 delete_timeout = 20 * 60 # 20 minutes
1148 await self._wait_ng_ro(nsr_id, action_id, nslcmop_id, start_deploy, delete_timeout, stage)
1149
1150 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
1151 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
1152 # delete all nsr
1153 await self.RO.delete(nsr_id)
1154 except Exception as e:
1155 if isinstance(e, NgRoException) and e.http_code == 404: # not found
1156 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
1157 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
1158 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
1159 self.logger.debug(logging_text + "RO_action_id={} already deleted".format(action_id))
1160 elif isinstance(e, NgRoException) and e.http_code == 409: # conflict
1161 failed_detail.append("delete conflict: {}".format(e))
1162 self.logger.debug(logging_text + "RO_action_id={} delete conflict: {}".format(action_id, e))
1163 else:
1164 failed_detail.append("delete error: {}".format(e))
1165 self.logger.error(logging_text + "RO_action_id={} delete error: {}".format(action_id, e))
1166
1167 if failed_detail:
1168 stage[2] = "Error deleting from VIM"
1169 else:
1170 stage[2] = "Deleted from VIM"
1171 db_nsr_update["detailed-status"] = " ".join(stage)
1172 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1173 self._write_op_status(nslcmop_id, stage)
1174
1175 if failed_detail:
1176 raise LcmException("; ".join(failed_detail))
1177 return
1178
1179 async def instantiate_RO(self, logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs, db_vnfds_ref,
1180 n2vc_key_list, stage):
1181 """
1182 Instantiate at RO
1183 :param logging_text: prefix text to use at logging
1184 :param nsr_id: nsr identity
1185 :param nsd: database content of ns descriptor
1186 :param db_nsr: database content of ns record
1187 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
1188 :param db_vnfrs:
1189 :param db_vnfds_ref: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
1190 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
1191 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
1192 :return: None or exception
1193 """
1194 try:
1195 db_nsr_update = {}
1196 RO_descriptor_number = 0 # number of descriptors created at RO
1197 vnf_index_2_RO_id = {} # map between vnfd/nsd id to the id used at RO
1198 nslcmop_id = db_nslcmop["_id"]
1199 start_deploy = time()
1200 ns_params = db_nslcmop.get("operationParams")
1201 if ns_params and ns_params.get("timeout_ns_deploy"):
1202 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
1203 else:
1204 timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy)
1205
1206 # Check for and optionally request placement optimization. Database will be updated if placement activated
1207 stage[2] = "Waiting for Placement."
1208 if await self._do_placement(logging_text, db_nslcmop, db_vnfrs):
1209 # in case of placement, change ns_params["vimAccountId"] if it is not present at any vnfr
1210 for vnfr in db_vnfrs.values():
1211 if ns_params["vimAccountId"] == vnfr["vim-account-id"]:
1212 break
1213 else:
1214 ns_params["vimAccountId"] = vnfr["vim-account-id"]
1215
1216 if self.ng_ro:
1217 return await self._instantiate_ng_ro(logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs,
1218 db_vnfds_ref, n2vc_key_list, stage, start_deploy,
1219 timeout_ns_deploy)
1220 # deploy RO
1221 # get vnfds, instantiate at RO
1222 for c_vnf in nsd.get("constituent-vnfd", ()):
1223 member_vnf_index = c_vnf["member-vnf-index"]
1224 vnfd = db_vnfds_ref[c_vnf['vnfd-id-ref']]
1225 vnfd_ref = vnfd["id"]
1226
1227 stage[2] = "Creating vnfd='{}' member_vnf_index='{}' at RO".format(vnfd_ref, member_vnf_index)
1228 db_nsr_update["detailed-status"] = " ".join(stage)
1229 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1230 self._write_op_status(nslcmop_id, stage)
1231
1232 # self.logger.debug(logging_text + stage[2])
1233 vnfd_id_RO = "{}.{}.{}".format(nsr_id, RO_descriptor_number, member_vnf_index[:23])
1234 vnf_index_2_RO_id[member_vnf_index] = vnfd_id_RO
1235 RO_descriptor_number += 1
1236
1237 # look for the position at deployed.RO.vnfd; if not present it will be appended at the end
1238 for index, vnf_deployed in enumerate(db_nsr["_admin"]["deployed"]["RO"]["vnfd"]):
1239 if vnf_deployed["member-vnf-index"] == member_vnf_index:
1240 break
1241 else:
1242 index = len(db_nsr["_admin"]["deployed"]["RO"]["vnfd"])
1243 db_nsr["_admin"]["deployed"]["RO"]["vnfd"].append(None)
1244
1245 # look if present
1246 RO_update = {"member-vnf-index": member_vnf_index}
1247 vnfd_list = await self.RO.get_list("vnfd", filter_by={"osm_id": vnfd_id_RO})
1248 if vnfd_list:
1249 RO_update["id"] = vnfd_list[0]["uuid"]
1250 self.logger.debug(logging_text + "vnfd='{}' member_vnf_index='{}' exists at RO. Using RO_id={}".
1251 format(vnfd_ref, member_vnf_index, vnfd_list[0]["uuid"]))
1252 else:
1253 vnfd_RO = self.vnfd2RO(vnfd, vnfd_id_RO, db_vnfrs[c_vnf["member-vnf-index"]].
1254 get("additionalParamsForVnf"), nsr_id)
1255 desc = await self.RO.create("vnfd", descriptor=vnfd_RO)
1256 RO_update["id"] = desc["uuid"]
1257 self.logger.debug(logging_text + "vnfd='{}' member_vnf_index='{}' created at RO. RO_id={}".format(
1258 vnfd_ref, member_vnf_index, desc["uuid"]))
1259 db_nsr_update["_admin.deployed.RO.vnfd.{}".format(index)] = RO_update
1260 db_nsr["_admin"]["deployed"]["RO"]["vnfd"][index] = RO_update
1261
1262 # create nsd at RO
1263 nsd_ref = nsd["id"]
1264
1265 stage[2] = "Creating nsd={} at RO".format(nsd_ref)
1266 db_nsr_update["detailed-status"] = " ".join(stage)
1267 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1268 self._write_op_status(nslcmop_id, stage)
1269
1270 # self.logger.debug(logging_text + stage[2])
1271 RO_osm_nsd_id = "{}.{}.{}".format(nsr_id, RO_descriptor_number, nsd_ref[:23])
1272 RO_descriptor_number += 1
1273 nsd_list = await self.RO.get_list("nsd", filter_by={"osm_id": RO_osm_nsd_id})
1274 if nsd_list:
1275 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = nsd_list[0]["uuid"]
1276 self.logger.debug(logging_text + "nsd={} exists at RO. Using RO_id={}".format(
1277 nsd_ref, RO_nsd_uuid))
1278 else:
1279 nsd_RO = deepcopy(nsd)
1280 nsd_RO["id"] = RO_osm_nsd_id
1281 nsd_RO.pop("_id", None)
1282 nsd_RO.pop("_admin", None)
1283 for c_vnf in nsd_RO.get("constituent-vnfd", ()):
1284 member_vnf_index = c_vnf["member-vnf-index"]
1285 c_vnf["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
1286 for c_vld in nsd_RO.get("vld", ()):
1287 for cp in c_vld.get("vnfd-connection-point-ref", ()):
1288 member_vnf_index = cp["member-vnf-index-ref"]
1289 cp["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
1290
1291 desc = await self.RO.create("nsd", descriptor=nsd_RO)
1292 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
1293 db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = desc["uuid"]
1294 self.logger.debug(logging_text + "nsd={} created at RO. RO_id={}".format(nsd_ref, RO_nsd_uuid))
1295 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1296
1297 # Create ns at RO
1298 stage[2] = "Creating ns at RO from nsd={}".format(nsd_ref)
1299 db_nsr_update["detailed-status"] = " ".join(stage)
1300 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1301 self._write_op_status(nslcmop_id, stage)
1302
1303 # if present use it unless in error status
1304 RO_nsr_id = deep_get(db_nsr, ("_admin", "deployed", "RO", "nsr_id"))
1305 if RO_nsr_id:
1306 try:
1307 stage[2] = "Looking for existing ns at RO"
1308 db_nsr_update["detailed-status"] = " ".join(stage)
1309 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1310 self._write_op_status(nslcmop_id, stage)
1311 # self.logger.debug(logging_text + stage[2] + " RO_ns_id={}".format(RO_nsr_id))
1312 desc = await self.RO.show("ns", RO_nsr_id)
1313
1314 except ROclient.ROClientException as e:
1315 if e.http_code != HTTPStatus.NOT_FOUND:
1316 raise
1317 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
1318 if RO_nsr_id:
1319 ns_status, ns_status_info = self.RO.check_ns_status(desc)
1320 db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
1321 if ns_status == "ERROR":
1322 stage[2] = "Deleting ns at RO. RO_ns_id={}".format(RO_nsr_id)
1323 self.logger.debug(logging_text + stage[2])
1324 await self.RO.delete("ns", RO_nsr_id)
1325 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
1326 if not RO_nsr_id:
1327 stage[2] = "Checking dependencies"
1328 db_nsr_update["detailed-status"] = " ".join(stage)
1329 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1330 self._write_op_status(nslcmop_id, stage)
1331 # self.logger.debug(logging_text + stage[2])
1332
1333 # check if the VIM is still being created; if so, wait for the previous related tasks to complete
1334 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account", ns_params["vimAccountId"])
1335 if task_dependency:
1336 stage[2] = "Waiting for related tasks '{}' to be completed".format(task_name)
1337 self.logger.debug(logging_text + stage[2])
1338 await asyncio.wait(task_dependency, timeout=3600)
1339 if ns_params.get("vnf"):
1340 for vnf in ns_params["vnf"]:
1341 if "vimAccountId" in vnf:
1342 task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account",
1343 vnf["vimAccountId"])
1344 if task_dependency:
1345 stage[2] = "Waiting for related tasks '{}' to be completed.".format(task_name)
1346 self.logger.debug(logging_text + stage[2])
1347 await asyncio.wait(task_dependency, timeout=3600)
1348
1349 stage[2] = "Checking instantiation parameters."
1350 RO_ns_params = self._ns_params_2_RO(ns_params, nsd, db_vnfds_ref, db_vnfrs, n2vc_key_list)
1351 stage[2] = "Deploying ns at VIM."
1352 db_nsr_update["detailed-status"] = " ".join(stage)
1353 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1354 self._write_op_status(nslcmop_id, stage)
1355
1356 desc = await self.RO.create("ns", descriptor=RO_ns_params, name=db_nsr["name"], scenario=RO_nsd_uuid)
1357 RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = desc["uuid"]
1358 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
1359 db_nsr_update["_admin.deployed.RO.nsr_status"] = "BUILD"
1360 self.logger.debug(logging_text + "ns created at RO. RO_id={}".format(desc["uuid"]))
1361
1362 # wait until NS is ready
1363                 stage[2] = "Waiting for VIM to deploy ns."
1364 db_nsr_update["detailed-status"] = " ".join(stage)
1365 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1366 self._write_op_status(nslcmop_id, stage)
1367 detailed_status_old = None
1368 self.logger.debug(logging_text + stage[2] + " RO_ns_id={}".format(RO_nsr_id))
1369
1370 old_desc = None
1371 while time() <= start_deploy + timeout_ns_deploy:
1372 desc = await self.RO.show("ns", RO_nsr_id)
1373
1374 # deploymentStatus
1375 if desc != old_desc:
1376 # desc has changed => update db
1377 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
1378 old_desc = desc
1379
1380 ns_status, ns_status_info = self.RO.check_ns_status(desc)
1381 db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
1382 if ns_status == "ERROR":
1383 raise ROclient.ROClientException(ns_status_info)
1384 elif ns_status == "BUILD":
1385 stage[2] = "VIM: ({})".format(ns_status_info)
1386 elif ns_status == "ACTIVE":
1387 stage[2] = "Waiting for management IP address reported by the VIM. Updating VNFRs."
1388 try:
1389 self.ns_update_vnfr(db_vnfrs, desc)
1390 break
1391 except LcmExceptionNoMgmtIP:
1392 pass
1393 else:
1394 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
1395 if stage[2] != detailed_status_old:
1396 detailed_status_old = stage[2]
1397 db_nsr_update["detailed-status"] = " ".join(stage)
1398 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1399 self._write_op_status(nslcmop_id, stage)
1400 await asyncio.sleep(5, loop=self.loop)
1401 else: # timeout_ns_deploy
1402                 raise ROclient.ROClientException("Timeout waiting for ns to be ready")
1403
1404 # Updating NSR
1405 self.ns_update_nsr(db_nsr_update, db_nsr, desc)
1406
1407 db_nsr_update["_admin.deployed.RO.operational-status"] = "running"
1408 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1409 stage[2] = "Deployed at VIM"
1410 db_nsr_update["detailed-status"] = " ".join(stage)
1411 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1412 self._write_op_status(nslcmop_id, stage)
1413 # await self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
1414 # self.logger.debug(logging_text + "Deployed at VIM")
1415 except Exception as e:
1416 stage[2] = "ERROR deploying at VIM"
1417 self.set_vnfr_at_error(db_vnfrs, str(e))
1418 self.logger.error("Error deploying at VIM {}".format(e),
1419 exc_info=not isinstance(e, (ROclient.ROClientException, LcmException, DbException,
1420 NgRoException)))
1421 raise
1422
1423 async def wait_kdu_up(self, logging_text, nsr_id, vnfr_id, kdu_name):
1424 """
1425 Wait for kdu to be up, get ip address
1426         :param logging_text: prefix used for logging
1427 :param nsr_id:
1428 :param vnfr_id:
1429 :param kdu_name:
1430 :return: IP address
1431 """
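        # Illustrative sketch (an assumption based only on the fields read in the loop below) of the
        # kdur entry being polled; real records may contain additional fields:
        #   {"kdu-name": kdu_name, "status": "READY" | "ENABLED" | "<error state>", "ip-address": "<ip>"}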
1432
1433 # self.logger.debug(logging_text + "Starting wait_kdu_up")
1434 nb_tries = 0
1435
1436 while nb_tries < 360:
1437 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
1438 kdur = next((x for x in get_iterable(db_vnfr, "kdur") if x.get("kdu-name") == kdu_name), None)
1439 if not kdur:
1440 raise LcmException("Not found vnfr_id={}, kdu_name={}".format(vnfr_id, kdu_name))
1441 if kdur.get("status"):
1442 if kdur["status"] in ("READY", "ENABLED"):
1443 return kdur.get("ip-address")
1444 else:
1445 raise LcmException("target KDU={} is in error state".format(kdu_name))
1446
1447 await asyncio.sleep(10, loop=self.loop)
1448 nb_tries += 1
1449         raise LcmException("Timeout waiting for KDU={} to be instantiated".format(kdu_name))
1450
1451 async def wait_vm_up_insert_key_ro(self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None):
1452 """
1453         Wait for ip address at RO, and optionally insert a public key in the virtual machine
1454         :param logging_text: prefix used for logging
1455 :param nsr_id:
1456 :param vnfr_id:
1457 :param vdu_id:
1458 :param vdu_index:
1459 :param pub_key: public ssh key to inject, None to skip
1460 :param user: user to apply the public ssh key
1461 :return: IP address
1462 """
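        # The loop below polls the vnfr record every ~10 seconds for up to ~1 hour. A hedged sketch of the
        # vdur fields consulted (only those actually read below; anything else is ignored here):
        #   {"vdu-id-ref": vdu_id, "count-index": vdu_index, "ip-address": "<ip>",
        #    "status": "ACTIVE" | "ERROR", "vim_info": {"<vim_key>": {"vim_status": "ACTIVE"}}}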
1463
1464 self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
1465 ro_nsr_id = None
1466 ip_address = None
1467 nb_tries = 0
1468 target_vdu_id = None
1469 ro_retries = 0
1470
1471 while True:
1472
1473 ro_retries += 1
1474 if ro_retries >= 360: # 1 hour
1475                 raise LcmException("Timeout (1 hour) waiting for VM to be up for nsr_id: {}".format(nsr_id))
1476
1477 await asyncio.sleep(10, loop=self.loop)
1478
1479 # get ip address
1480 if not target_vdu_id:
1481 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
1482
1483 if not vdu_id: # for the VNF case
1484 if db_vnfr.get("status") == "ERROR":
1485 raise LcmException("Cannot inject ssh-key because target VNF is in error state")
1486 ip_address = db_vnfr.get("ip-address")
1487 if not ip_address:
1488 continue
1489 vdur = next((x for x in get_iterable(db_vnfr, "vdur") if x.get("ip-address") == ip_address), None)
1490 else: # VDU case
1491 vdur = next((x for x in get_iterable(db_vnfr, "vdur")
1492 if x.get("vdu-id-ref") == vdu_id and x.get("count-index") == vdu_index), None)
1493
1494 if not vdur and len(db_vnfr.get("vdur", ())) == 1: # If only one, this should be the target vdu
1495 vdur = db_vnfr["vdur"][0]
1496 if not vdur:
1497 raise LcmException("Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(vnfr_id, vdu_id,
1498 vdu_index))
1499 # New generation RO stores information at "vim_info"
1500 ng_ro_status = None
1501 target_vim = None
1502 if vdur.get("vim_info"):
1503 target_vim = next(t for t in vdur["vim_info"]) # there should be only one key
1504 ng_ro_status = vdur["vim_info"][target_vim].get("vim_status")
1505 if vdur.get("pdu-type") or vdur.get("status") == "ACTIVE" or ng_ro_status == "ACTIVE":
1506 ip_address = vdur.get("ip-address")
1507 if not ip_address:
1508 continue
1509 target_vdu_id = vdur["vdu-id-ref"]
1510 elif (
1511 vdur.get("status") == "ERROR" or
1512 vdur.get("vim_info", {}).get(target_vim, {}).get("vim_status") == "ERROR"
1513 ):
1514 raise LcmException("Cannot inject ssh-key because target VM is in error state")
1515
1516 if not target_vdu_id:
1517 continue
1518
1519 # inject public key into machine
1520 if pub_key and user:
1521 self.logger.debug(logging_text + "Inserting RO key")
1522 if vdur.get("pdu-type"):
1523                     self.logger.error(logging_text + "Cannot inject ssh-key to a PDU")
1524 return ip_address
1525 try:
1526 ro_vm_id = "{}-{}".format(db_vnfr["member-vnf-index-ref"], target_vdu_id) # TODO add vdu_index
1527 if self.ng_ro:
1528 target = {"action": "inject_ssh_key", "key": pub_key, "user": user,
1529 "vnf": [{"_id": vnfr_id, "vdur": [{"id": vdur["id"]}]}],
1530 }
1531 desc = await self.RO.deploy(nsr_id, target)
1532 action_id = desc["action_id"]
1533 await self._wait_ng_ro(nsr_id, action_id, timeout=600)
1534 break
1535 else:
1536 # wait until NS is deployed at RO
1537 if not ro_nsr_id:
1538 db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
1539 ro_nsr_id = deep_get(db_nsrs, ("_admin", "deployed", "RO", "nsr_id"))
1540 if not ro_nsr_id:
1541 continue
1542 result_dict = await self.RO.create_action(
1543 item="ns",
1544 item_id_name=ro_nsr_id,
1545 descriptor={"add_public_key": pub_key, "vms": [ro_vm_id], "user": user}
1546 )
1547 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
1548 if not result_dict or not isinstance(result_dict, dict):
1549 raise LcmException("Unknown response from RO when injecting key")
1550 for result in result_dict.values():
1551 if result.get("vim_result") == 200:
1552 break
1553 else:
1554 raise ROclient.ROClientException("error injecting key: {}".format(
1555 result.get("description")))
1556 break
1557 except NgRoException as e:
1558                     raise LcmException("Error injecting key into VM (NG-RO): {}".format(e))
1559 except ROclient.ROClientException as e:
1560 if not nb_tries:
1561                         self.logger.debug(logging_text + "error injecting key: {}. Retrying for up to {} seconds".
1562 format(e, 20*10))
1563 nb_tries += 1
1564 if nb_tries >= 20:
1565                         raise LcmException("Reached max tries injecting key. Error: {}".format(e))
1566 else:
1567 break
1568
1569 return ip_address
1570
1571 async def _wait_dependent_n2vc(self, nsr_id, vca_deployed_list, vca_index):
1572 """
1573         Wait until dependent VCA deployments have finished. An NS waits for its VNFs and VDUs; a VNF waits for its VDUs
1574 """
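        # Note: the loop below re-reads configurationStatus every 10 seconds and decrements 'timeout' once
        # per iteration, so the effective wait is roughly 300 iterations (~50 minutes), not 300 seconds.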
1575 my_vca = vca_deployed_list[vca_index]
1576 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
1577 # vdu or kdu: no dependencies
1578 return
1579 timeout = 300
1580 while timeout >= 0:
1581 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1582 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1583 configuration_status_list = db_nsr["configurationStatus"]
1584 for index, vca_deployed in enumerate(configuration_status_list):
1585 if index == vca_index:
1586 # myself
1587 continue
1588 if not my_vca.get("member-vnf-index") or \
1589 (vca_deployed.get("member-vnf-index") == my_vca.get("member-vnf-index")):
1590 internal_status = configuration_status_list[index].get("status")
1591 if internal_status == 'READY':
1592 continue
1593 elif internal_status == 'BROKEN':
1594                             raise LcmException("Configuration aborted because dependent charm(s) have failed")
1595 else:
1596 break
1597 else:
1598 # no dependencies, return
1599 return
1600 await asyncio.sleep(10)
1601 timeout -= 1
1602
1603         raise LcmException("Configuration aborted because dependent charm(s) timed out")
1604
1605 async def instantiate_N2VC(self, logging_text, vca_index, nsi_id, db_nsr, db_vnfr, vdu_id, kdu_name, vdu_index,
1606 config_descriptor, deploy_params, base_folder, nslcmop_id, stage, vca_type, vca_name,
1607 ee_config_descriptor):
1608 nsr_id = db_nsr["_id"]
1609 db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
1610 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1611 vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
1612 osm_config = {"osm": {"ns_id": db_nsr["_id"]}}
1613 db_dict = {
1614 'collection': 'nsrs',
1615 'filter': {'_id': nsr_id},
1616 'path': db_update_entry
1617 }
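        # db_dict tells the VCA connector where to report its progress: the _admin.deployed.VCA.<vca_index>
        # entry of this nsr record (see db_update_entry above)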
1618 step = ""
1619 try:
1620
1621 element_type = 'NS'
1622 element_under_configuration = nsr_id
1623
1624 vnfr_id = None
1625 if db_vnfr:
1626 vnfr_id = db_vnfr["_id"]
1627 osm_config["osm"]["vnf_id"] = vnfr_id
1628
1629 namespace = "{nsi}.{ns}".format(
1630 nsi=nsi_id if nsi_id else "",
1631 ns=nsr_id)
1632
1633 if vnfr_id:
1634 element_type = 'VNF'
1635 element_under_configuration = vnfr_id
1636 namespace += ".{}".format(vnfr_id)
1637 if vdu_id:
1638 namespace += ".{}-{}".format(vdu_id, vdu_index or 0)
1639 element_type = 'VDU'
1640 element_under_configuration = "{}-{}".format(vdu_id, vdu_index or 0)
1641 osm_config["osm"]["vdu_id"] = vdu_id
1642 elif kdu_name:
1643 namespace += ".{}".format(kdu_name)
1644 element_type = 'KDU'
1645 element_under_configuration = kdu_name
1646 osm_config["osm"]["kdu_name"] = kdu_name
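            # At this point namespace is, illustratively, "<nsi_id>.<nsr_id>[.<vnfr_id>[.<vdu_id>-<index> | .<kdu_name>]]"
            # and element_type is 'NS', 'VNF', 'VDU' or 'KDU' accordingly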
1647
1648 # Get artifact path
1649 artifact_path = "{}/{}/{}/{}".format(
1650 base_folder["folder"],
1651 base_folder["pkg-dir"],
1652 "charms" if vca_type in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm") else "helm-charts",
1653 vca_name
1654 )
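            # e.g. (illustrative): "<folder>/<pkg-dir>/charms/<vca_name>" for proxy/native charms,
            # or "<folder>/<pkg-dir>/helm-charts/<vca_name>" for helm-based execution environments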
1655 # get initial_config_primitive_list that applies to this element
1656 initial_config_primitive_list = config_descriptor.get('initial-config-primitive')
1657
1658 # add config if not present for NS charm
1659 ee_descriptor_id = ee_config_descriptor.get("id")
1660 initial_config_primitive_list = self._get_initial_config_primitive_list(initial_config_primitive_list,
1661 vca_deployed, ee_descriptor_id)
1662
1663 # n2vc_redesign STEP 3.1
1664 # find old ee_id if exists
1665 ee_id = vca_deployed.get("ee_id")
1666
1667 vim_account_id = (
1668 deep_get(db_vnfr, ("vim-account-id",)) or
1669 deep_get(deploy_params, ("OSM", "vim_account_id"))
1670 )
1671 vca_cloud, vca_cloud_credential = self.get_vca_cloud_and_credentials(vim_account_id)
1672 vca_k8s_cloud, vca_k8s_cloud_credential = self.get_vca_k8s_cloud_and_credentials(vim_account_id)
1673 # create or register execution environment in VCA
1674 if vca_type in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1675
1676 self._write_configuration_status(
1677 nsr_id=nsr_id,
1678 vca_index=vca_index,
1679 status='CREATING',
1680 element_under_configuration=element_under_configuration,
1681 element_type=element_type
1682 )
1683
1684 step = "create execution environment"
1685 self.logger.debug(logging_text + step)
1686
1687 ee_id = None
1688 credentials = None
1689 if vca_type == "k8s_proxy_charm":
1690 ee_id = await self.vca_map[vca_type].install_k8s_proxy_charm(
1691 charm_name=artifact_path[artifact_path.rfind("/") + 1:],
1692 namespace=namespace,
1693 artifact_path=artifact_path,
1694 db_dict=db_dict,
1695 cloud_name=vca_k8s_cloud,
1696 credential_name=vca_k8s_cloud_credential,
1697 )
1698 else:
1699 ee_id, credentials = await self.vca_map[vca_type].create_execution_environment(
1700 namespace=namespace,
1701 reuse_ee_id=ee_id,
1702 db_dict=db_dict,
1703 cloud_name=vca_cloud,
1704 credential_name=vca_cloud_credential,
1705 )
1706
1707 elif vca_type == "native_charm":
1708                 step = "Waiting for VM to be up and getting IP address"
1709 self.logger.debug(logging_text + step)
1710 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
1711 user=None, pub_key=None)
1712 credentials = {"hostname": rw_mgmt_ip}
1713 # get username
1714 username = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
1715                 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user are
1716                 # merged. Meanwhile, let's get the username from initial-config-primitive
1717 if not username and initial_config_primitive_list:
1718 for config_primitive in initial_config_primitive_list:
1719 for param in config_primitive.get("parameter", ()):
1720 if param["name"] == "ssh-username":
1721 username = param["value"]
1722 break
1723 if not username:
1724                     raise LcmException("Cannot determine the username either with 'initial-config-primitive' or with "
1725 "'config-access.ssh-access.default-user'")
1726 credentials["username"] = username
1727 # n2vc_redesign STEP 3.2
1728
1729 self._write_configuration_status(
1730 nsr_id=nsr_id,
1731 vca_index=vca_index,
1732 status='REGISTERING',
1733 element_under_configuration=element_under_configuration,
1734 element_type=element_type
1735 )
1736
1737 step = "register execution environment {}".format(credentials)
1738 self.logger.debug(logging_text + step)
1739 ee_id = await self.vca_map[vca_type].register_execution_environment(
1740 credentials=credentials,
1741 namespace=namespace,
1742 db_dict=db_dict,
1743 cloud_name=vca_cloud,
1744 credential_name=vca_cloud_credential,
1745 )
1746
1747             # for compatibility with MON/POL modules, they need the model and application name at database
1748             # TODO ask MON/POL whether it is still needed to assume the format "model_name.application_name"
1749 ee_id_parts = ee_id.split('.')
1750 db_nsr_update = {db_update_entry + "ee_id": ee_id}
1751 if len(ee_id_parts) >= 2:
1752 model_name = ee_id_parts[0]
1753 application_name = ee_id_parts[1]
1754 db_nsr_update[db_update_entry + "model"] = model_name
1755 db_nsr_update[db_update_entry + "application"] = application_name
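            # e.g. (illustrative): an ee_id such as "mymodel.myapp" (possibly with further dot-separated
            # parts) is stored as model="mymodel" and application="myapp" in the VCA entry of the nsr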
1756
1757 # n2vc_redesign STEP 3.3
1758 step = "Install configuration Software"
1759
1760 self._write_configuration_status(
1761 nsr_id=nsr_id,
1762 vca_index=vca_index,
1763 status='INSTALLING SW',
1764 element_under_configuration=element_under_configuration,
1765 element_type=element_type,
1766 other_update=db_nsr_update
1767 )
1768
1769 # TODO check if already done
1770 self.logger.debug(logging_text + step)
1771 config = None
1772 if vca_type == "native_charm":
1773 config_primitive = next((p for p in initial_config_primitive_list if p["name"] == "config"), None)
1774 if config_primitive:
1775 config = self._map_primitive_params(
1776 config_primitive,
1777 {},
1778 deploy_params
1779 )
1780 num_units = 1
1781 if vca_type == "lxc_proxy_charm":
1782 if element_type == "NS":
1783 num_units = db_nsr.get("config-units") or 1
1784 elif element_type == "VNF":
1785 num_units = db_vnfr.get("config-units") or 1
1786 elif element_type == "VDU":
1787 for v in db_vnfr["vdur"]:
1788 if vdu_id == v["vdu-id-ref"]:
1789 num_units = v.get("config-units") or 1
1790 break
1791 if vca_type != "k8s_proxy_charm":
1792 await self.vca_map[vca_type].install_configuration_sw(
1793 ee_id=ee_id,
1794 artifact_path=artifact_path,
1795 db_dict=db_dict,
1796 config=config,
1797 num_units=num_units,
1798 )
1799
1800 # write in db flag of configuration_sw already installed
1801 self.update_db_2("nsrs", nsr_id, {db_update_entry + "config_sw_installed": True})
1802
1803 # add relations for this VCA (wait for other peers related with this VCA)
1804 await self._add_vca_relations(logging_text=logging_text, nsr_id=nsr_id,
1805 vca_index=vca_index, vca_type=vca_type)
1806
1807             # if SSH access is required, then get the execution environment SSH public key;
1808             # for native charms we have already waited for the VM to be up
1809 if vca_type in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
1810 pub_key = None
1811 user = None
1812 # self.logger.debug("get ssh key block")
1813 if deep_get(config_descriptor, ("config-access", "ssh-access", "required")):
1814 # self.logger.debug("ssh key needed")
1815 # Needed to inject a ssh key
1816 user = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
1817 step = "Install configuration Software, getting public ssh key"
1818 pub_key = await self.vca_map[vca_type].get_ee_ssh_public__key(ee_id=ee_id, db_dict=db_dict)
1819
1820 step = "Insert public key into VM user={} ssh_key={}".format(user, pub_key)
1821 else:
1822 # self.logger.debug("no need to get ssh key")
1823                     step = "Waiting for VM to be up and getting IP address"
1824 self.logger.debug(logging_text + step)
1825
1826 # n2vc_redesign STEP 5.1
1827             # wait for RO (ip-address) and insert pub_key into VM
1828 if vnfr_id:
1829 if kdu_name:
1830 rw_mgmt_ip = await self.wait_kdu_up(logging_text, nsr_id, vnfr_id, kdu_name)
1831 else:
1832 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id,
1833 vdu_index, user=user, pub_key=pub_key)
1834 else:
1835 rw_mgmt_ip = None # This is for a NS configuration
1836
1837 self.logger.debug(logging_text + ' VM_ip_address={}'.format(rw_mgmt_ip))
1838
1839 # store rw_mgmt_ip in deploy params for later replacement
1840 deploy_params["rw_mgmt_ip"] = rw_mgmt_ip
1841
1842 # n2vc_redesign STEP 6 Execute initial config primitive
1843 step = 'execute initial config primitive'
1844
1845 # wait for dependent primitives execution (NS -> VNF -> VDU)
1846 if initial_config_primitive_list:
1847 await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)
1848
1849 # stage, in function of element type: vdu, kdu, vnf or ns
1850 my_vca = vca_deployed_list[vca_index]
1851 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
1852 # VDU or KDU
1853 stage[0] = 'Stage 3/5: running Day-1 primitives for VDU.'
1854 elif my_vca.get("member-vnf-index"):
1855 # VNF
1856 stage[0] = 'Stage 4/5: running Day-1 primitives for VNF.'
1857 else:
1858 # NS
1859 stage[0] = 'Stage 5/5: running Day-1 primitives for NS.'
1860
1861 self._write_configuration_status(
1862 nsr_id=nsr_id,
1863 vca_index=vca_index,
1864 status='EXECUTING PRIMITIVE'
1865 )
1866
1867 self._write_op_status(
1868 op_id=nslcmop_id,
1869 stage=stage
1870 )
1871
1872 check_if_terminated_needed = True
1873 for initial_config_primitive in initial_config_primitive_list:
1874                 # adding information on the vca_deployed if it is an NS execution environment
1875 if not vca_deployed["member-vnf-index"]:
1876 deploy_params["ns_config_info"] = json.dumps(self._get_ns_config_info(nsr_id))
1877 # TODO check if already done
1878 primitive_params_ = self._map_primitive_params(initial_config_primitive, {}, deploy_params)
1879
1880 step = "execute primitive '{}' params '{}'".format(initial_config_primitive["name"], primitive_params_)
1881 self.logger.debug(logging_text + step)
1882 await self.vca_map[vca_type].exec_primitive(
1883 ee_id=ee_id,
1884 primitive_name=initial_config_primitive["name"],
1885 params_dict=primitive_params_,
1886 db_dict=db_dict
1887 )
1888                 # Once some primitive has been executed, check and write at db whether terminate primitives need to be executed
1889 if check_if_terminated_needed:
1890 if config_descriptor.get('terminate-config-primitive'):
1891 self.update_db_2("nsrs", nsr_id, {db_update_entry + "needed_terminate": True})
1892 check_if_terminated_needed = False
1893
1894 # TODO register in database that primitive is done
1895
1896 # STEP 7 Configure metrics
1897 if vca_type == "helm" or vca_type == "helm-v3":
1898 prometheus_jobs = await self.add_prometheus_metrics(
1899 ee_id=ee_id,
1900 artifact_path=artifact_path,
1901 ee_config_descriptor=ee_config_descriptor,
1902 vnfr_id=vnfr_id,
1903 nsr_id=nsr_id,
1904 target_ip=rw_mgmt_ip,
1905 )
1906 if prometheus_jobs:
1907 self.update_db_2("nsrs", nsr_id, {db_update_entry + "prometheus_jobs": prometheus_jobs})
1908
1909 step = "instantiated at VCA"
1910 self.logger.debug(logging_text + step)
1911
1912 self._write_configuration_status(
1913 nsr_id=nsr_id,
1914 vca_index=vca_index,
1915 status='READY'
1916 )
1917
1918 except Exception as e: # TODO not use Exception but N2VC exception
1919 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
1920 if not isinstance(e, (DbException, N2VCException, LcmException, asyncio.CancelledError)):
1921 self.logger.error("Exception while {} : {}".format(step, e), exc_info=True)
1922 self._write_configuration_status(
1923 nsr_id=nsr_id,
1924 vca_index=vca_index,
1925 status='BROKEN'
1926 )
1927 raise LcmException("{} {}".format(step, e)) from e
1928
1929 def _write_ns_status(self, nsr_id: str, ns_state: str, current_operation: str, current_operation_id: str,
1930 error_description: str = None, error_detail: str = None, other_update: dict = None):
1931 """
1932 Update db_nsr fields.
1933 :param nsr_id:
1934 :param ns_state:
1935 :param current_operation:
1936 :param current_operation_id:
1937 :param error_description:
1938 :param error_detail:
1939 :param other_update: Other required changes at database if provided, will be cleared
1940 :return:
1941 """
1942 try:
1943 db_dict = other_update or {}
1944 db_dict["_admin.nslcmop"] = current_operation_id # for backward compatibility
1945 db_dict["_admin.current-operation"] = current_operation_id
1946 db_dict["_admin.operation-type"] = current_operation if current_operation != "IDLE" else None
1947 db_dict["currentOperation"] = current_operation
1948 db_dict["currentOperationID"] = current_operation_id
1949 db_dict["errorDescription"] = error_description
1950 db_dict["errorDetail"] = error_detail
1951
1952 if ns_state:
1953 db_dict["nsState"] = ns_state
1954 self.update_db_2("nsrs", nsr_id, db_dict)
1955 except DbException as e:
1956 self.logger.warn('Error writing NS status, ns={}: {}'.format(nsr_id, e))
1957
1958 def _write_op_status(self, op_id: str, stage: list = None, error_message: str = None, queuePosition: int = 0,
1959 operation_state: str = None, other_update: dict = None):
1960 try:
1961 db_dict = other_update or {}
1962 db_dict['queuePosition'] = queuePosition
1963 if isinstance(stage, list):
1964 db_dict['stage'] = stage[0]
1965 db_dict['detailed-status'] = " ".join(stage)
1966 elif stage is not None:
1967 db_dict['stage'] = str(stage)
1968
1969 if error_message is not None:
1970 db_dict['errorMessage'] = error_message
1971 if operation_state is not None:
1972 db_dict['operationState'] = operation_state
1973 db_dict["statusEnteredTime"] = time()
1974 self.update_db_2("nslcmops", op_id, db_dict)
1975 except DbException as e:
1976 self.logger.warn('Error writing OPERATION status for op_id: {} -> {}'.format(op_id, e))
1977
1978 def _write_all_config_status(self, db_nsr: dict, status: str):
1979 try:
1980 nsr_id = db_nsr["_id"]
1981 # configurationStatus
1982 config_status = db_nsr.get('configurationStatus')
1983 if config_status:
1984 db_nsr_update = {"configurationStatus.{}.status".format(index): status for index, v in
1985 enumerate(config_status) if v}
1986 # update status
1987 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1988
1989 except DbException as e:
1990 self.logger.warn('Error writing all configuration status, ns={}: {}'.format(nsr_id, e))
1991
1992 def _write_configuration_status(self, nsr_id: str, vca_index: int, status: str = None,
1993 element_under_configuration: str = None, element_type: str = None,
1994 other_update: dict = None):
1995
1996 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
1997 # .format(vca_index, status))
1998
1999 try:
2000 db_path = 'configurationStatus.{}.'.format(vca_index)
2001 db_dict = other_update or {}
2002 if status:
2003 db_dict[db_path + 'status'] = status
2004 if element_under_configuration:
2005 db_dict[db_path + 'elementUnderConfiguration'] = element_under_configuration
2006 if element_type:
2007 db_dict[db_path + 'elementType'] = element_type
2008 self.update_db_2("nsrs", nsr_id, db_dict)
2009 except DbException as e:
2010 self.logger.warn('Error writing configuration status={}, ns={}, vca_index={}: {}'
2011 .format(status, nsr_id, vca_index, e))
2012
2013 async def _do_placement(self, logging_text, db_nslcmop, db_vnfrs):
2014 """
2015         Checks and computes the placement (vim account where to deploy). If it is decided by an external tool, it
2016         sends the request via kafka and waits until the result is written at database (nslcmops _admin.pla).
2017 Database is used because the result can be obtained from a different LCM worker in case of HA.
2018 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
2019 :param db_nslcmop: database content of nslcmop
2020 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
2021         :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfrs with the
2022 computed 'vim-account-id'
2023 """
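        # Minimal sketch of the flow implemented below when placement-engine == "PLA" (a summary, not new logic):
        #   1. publish ("pla", "get_placement", {"nslcmopId": ...}) on kafka
        #   2. poll nslcmops _admin.pla every 5 seconds, for up to ~50 seconds
        #   3. write the vimAccountId returned for each member-vnf-index into its vnfr ("vim-account-id")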
2024 modified = False
2025 nslcmop_id = db_nslcmop['_id']
2026 placement_engine = deep_get(db_nslcmop, ('operationParams', 'placement-engine'))
2027 if placement_engine == "PLA":
2028 self.logger.debug(logging_text + "Invoke and wait for placement optimization")
2029 await self.msg.aiowrite("pla", "get_placement", {'nslcmopId': nslcmop_id}, loop=self.loop)
2030 db_poll_interval = 5
2031 wait = db_poll_interval * 10
2032 pla_result = None
2033 while not pla_result and wait >= 0:
2034 await asyncio.sleep(db_poll_interval)
2035 wait -= db_poll_interval
2036 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2037 pla_result = deep_get(db_nslcmop, ('_admin', 'pla'))
2038
2039 if not pla_result:
2040 raise LcmException("Placement timeout for nslcmopId={}".format(nslcmop_id))
2041
2042 for pla_vnf in pla_result['vnf']:
2043 vnfr = db_vnfrs.get(pla_vnf['member-vnf-index'])
2044 if not pla_vnf.get('vimAccountId') or not vnfr:
2045 continue
2046 modified = True
2047 self.db.set_one("vnfrs", {"_id": vnfr["_id"]}, {"vim-account-id": pla_vnf['vimAccountId']})
2048 # Modifies db_vnfrs
2049 vnfr["vim-account-id"] = pla_vnf['vimAccountId']
2050 return modified
2051
2052 def update_nsrs_with_pla_result(self, params):
2053 try:
2054 nslcmop_id = deep_get(params, ('placement', 'nslcmopId'))
2055 self.update_db_2("nslcmops", nslcmop_id, {"_admin.pla": params.get('placement')})
2056 except Exception as e:
2057 self.logger.warn('Update failed for nslcmop_id={}:{}'.format(nslcmop_id, e))
2058
2059 async def instantiate(self, nsr_id, nslcmop_id):
2060 """
2061
2062 :param nsr_id: ns instance to deploy
2063 :param nslcmop_id: operation to run
2064 :return:
2065 """
2066
2067 # Try to lock HA task here
2068 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
2069 if not task_is_locked_by_me:
2070 self.logger.debug('instantiate() task is not locked by me, ns={}'.format(nsr_id))
2071 return
2072
2073 logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
2074 self.logger.debug(logging_text + "Enter")
2075
2076 # get all needed from database
2077
2078 # database nsrs record
2079 db_nsr = None
2080
2081 # database nslcmops record
2082 db_nslcmop = None
2083
2084 # update operation on nsrs
2085 db_nsr_update = {}
2086 # update operation on nslcmops
2087 db_nslcmop_update = {}
2088
2089 nslcmop_operation_state = None
2090 db_vnfrs = {} # vnf's info indexed by member-index
2091 # n2vc_info = {}
2092 tasks_dict_info = {} # from task to info text
2093 exc = None
2094 error_list = []
2095 stage = ['Stage 1/5: preparation of the environment.', "Waiting for previous operations to terminate.", ""]
2096 # ^ stage, step, VIM progress
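        # e.g. (illustrative) the three items are joined into detailed-status, producing strings such as
        # "Stage 2/5: deployment of KDUs, VMs and execution environments. Deploying NS at VIM. VIM: (...)"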
2097 try:
2098 # wait for any previous tasks in process
2099 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
2100
2101 stage[1] = "Sync filesystem from database."
2102 self.fs.sync() # TODO, make use of partial sync, only for the needed packages
2103
2104 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
2105 stage[1] = "Reading from database."
2106 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
2107 db_nsr_update["detailed-status"] = "creating"
2108 db_nsr_update["operational-status"] = "init"
2109 self._write_ns_status(
2110 nsr_id=nsr_id,
2111 ns_state="BUILDING",
2112 current_operation="INSTANTIATING",
2113 current_operation_id=nslcmop_id,
2114 other_update=db_nsr_update
2115 )
2116 self._write_op_status(
2117 op_id=nslcmop_id,
2118 stage=stage,
2119 queuePosition=0
2120 )
2121
2122 # read from db: operation
2123 stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
2124 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
2125 ns_params = db_nslcmop.get("operationParams")
2126 if ns_params and ns_params.get("timeout_ns_deploy"):
2127 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
2128 else:
2129 timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy)
2130
2131 # read from db: ns
2132 stage[1] = "Getting nsr={} from db.".format(nsr_id)
2133 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2134 stage[1] = "Getting nsd={} from db.".format(db_nsr["nsd-id"])
2135 nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
2136 db_nsr["nsd"] = nsd
2137 # nsr_name = db_nsr["name"] # TODO short-name??
2138
2139 # read from db: vnf's of this ns
2140 stage[1] = "Getting vnfrs from db."
2141 self.logger.debug(logging_text + stage[1])
2142 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
2143
2144 # read from db: vnfd's for every vnf
2145 db_vnfds_ref = {} # every vnfd data indexed by vnf name
2146 db_vnfds = {} # every vnfd data indexed by vnf id
2147 db_vnfds_index = {} # every vnfd data indexed by vnf member-index
2148
2149 # for each vnf in ns, read vnfd
2150 for vnfr in db_vnfrs_list:
2151 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr # vnf's dict indexed by member-index: '1', '2', etc
2152 vnfd_id = vnfr["vnfd-id"] # vnfd uuid for this vnf
2153 vnfd_ref = vnfr["vnfd-ref"] # vnfd name for this vnf
2154
2155                 # if we do not have this vnfd yet, read it from db
2156 if vnfd_id not in db_vnfds:
2157 # read from db
2158 stage[1] = "Getting vnfd={} id='{}' from db.".format(vnfd_id, vnfd_ref)
2159 self.logger.debug(logging_text + stage[1])
2160 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
2161
2162 # store vnfd
2163 db_vnfds_ref[vnfd_ref] = vnfd # vnfd's indexed by name
2164 db_vnfds[vnfd_id] = vnfd # vnfd's indexed by id
2165 db_vnfds_index[vnfr["member-vnf-index-ref"]] = db_vnfds[vnfd_id] # vnfd's indexed by member-index
2166
2167 # Get or generates the _admin.deployed.VCA list
2168 vca_deployed_list = None
2169 if db_nsr["_admin"].get("deployed"):
2170 vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
2171 if vca_deployed_list is None:
2172 vca_deployed_list = []
2173 configuration_status_list = []
2174 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
2175 db_nsr_update["configurationStatus"] = configuration_status_list
2176 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
2177 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
2178 elif isinstance(vca_deployed_list, dict):
2179 # maintain backward compatibility. Change a dict to list at database
2180 vca_deployed_list = list(vca_deployed_list.values())
2181 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
2182 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
2183
2184 if not isinstance(deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list):
2185 populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
2186 db_nsr_update["_admin.deployed.RO.vnfd"] = []
2187
2188             # set state to INSTANTIATED. Once instantiated, NBI will not delete it directly
2189 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
2190 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2191 self.db.set_list("vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "INSTANTIATED"})
2192
2193 # n2vc_redesign STEP 2 Deploy Network Scenario
2194 stage[0] = 'Stage 2/5: deployment of KDUs, VMs and execution environments.'
2195 self._write_op_status(
2196 op_id=nslcmop_id,
2197 stage=stage
2198 )
2199
2200 stage[1] = "Deploying KDUs."
2201 # self.logger.debug(logging_text + "Before deploy_kdus")
2202             # Call deploy_kdus in case the "vdu:kdu" param exists
2203 await self.deploy_kdus(
2204 logging_text=logging_text,
2205 nsr_id=nsr_id,
2206 nslcmop_id=nslcmop_id,
2207 db_vnfrs=db_vnfrs,
2208 db_vnfds=db_vnfds,
2209 task_instantiation_info=tasks_dict_info,
2210 )
2211
2212 stage[1] = "Getting VCA public key."
2213 # n2vc_redesign STEP 1 Get VCA public ssh-key
2214 # feature 1429. Add n2vc public key to needed VMs
2215 n2vc_key = self.n2vc.get_public_key()
2216 n2vc_key_list = [n2vc_key]
2217 if self.vca_config.get("public_key"):
2218 n2vc_key_list.append(self.vca_config["public_key"])
2219
2220 stage[1] = "Deploying NS at VIM."
2221 task_ro = asyncio.ensure_future(
2222 self.instantiate_RO(
2223 logging_text=logging_text,
2224 nsr_id=nsr_id,
2225 nsd=nsd,
2226 db_nsr=db_nsr,
2227 db_nslcmop=db_nslcmop,
2228 db_vnfrs=db_vnfrs,
2229 db_vnfds_ref=db_vnfds_ref,
2230 n2vc_key_list=n2vc_key_list,
2231 stage=stage
2232 )
2233 )
2234 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
2235 tasks_dict_info[task_ro] = "Deploying at VIM"
2236
2237 # n2vc_redesign STEP 3 to 6 Deploy N2VC
2238 stage[1] = "Deploying Execution Environments."
2239 self.logger.debug(logging_text + stage[1])
2240
2241 nsi_id = None # TODO put nsi_id when this nsr belongs to a NSI
2242 # get_iterable() returns a value from a dict or empty tuple if key does not exist
2243 for c_vnf in get_iterable(nsd, "constituent-vnfd"):
2244 vnfd_id = c_vnf["vnfd-id-ref"]
2245 vnfd = db_vnfds_ref[vnfd_id]
2246 member_vnf_index = str(c_vnf["member-vnf-index"])
2247 db_vnfr = db_vnfrs[member_vnf_index]
2248 base_folder = vnfd["_admin"]["storage"]
2249 vdu_id = None
2250 vdu_index = 0
2251 vdu_name = None
2252 kdu_name = None
2253
2254 # Get additional parameters
2255 deploy_params = {"OSM": self._get_osm_params(db_vnfr)}
2256 if db_vnfr.get("additionalParamsForVnf"):
2257 deploy_params.update(self._format_additional_params(db_vnfr["additionalParamsForVnf"].copy()))
2258
2259 descriptor_config = vnfd.get("vnf-configuration")
2260 if descriptor_config:
2261 self._deploy_n2vc(
2262 logging_text=logging_text + "member_vnf_index={} ".format(member_vnf_index),
2263 db_nsr=db_nsr,
2264 db_vnfr=db_vnfr,
2265 nslcmop_id=nslcmop_id,
2266 nsr_id=nsr_id,
2267 nsi_id=nsi_id,
2268 vnfd_id=vnfd_id,
2269 vdu_id=vdu_id,
2270 kdu_name=kdu_name,
2271 member_vnf_index=member_vnf_index,
2272 vdu_index=vdu_index,
2273 vdu_name=vdu_name,
2274 deploy_params=deploy_params,
2275 descriptor_config=descriptor_config,
2276 base_folder=base_folder,
2277 task_instantiation_info=tasks_dict_info,
2278 stage=stage
2279 )
2280
2281 # Deploy charms for each VDU that supports one.
2282 for vdud in get_iterable(vnfd, 'vdu'):
2283 vdu_id = vdud["id"]
2284 descriptor_config = vdud.get('vdu-configuration')
2285 vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
2286 if vdur.get("additionalParams"):
2287 deploy_params_vdu = self._format_additional_params(vdur["additionalParams"])
2288 else:
2289 deploy_params_vdu = deploy_params
2290 deploy_params_vdu["OSM"] = self._get_osm_params(db_vnfr, vdu_id, vdu_count_index=0)
2291 if descriptor_config:
2292 vdu_name = None
2293 kdu_name = None
2294 for vdu_index in range(int(vdud.get("count", 1))):
2295 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
2296 self._deploy_n2vc(
2297 logging_text=logging_text + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
2298 member_vnf_index, vdu_id, vdu_index),
2299 db_nsr=db_nsr,
2300 db_vnfr=db_vnfr,
2301 nslcmop_id=nslcmop_id,
2302 nsr_id=nsr_id,
2303 nsi_id=nsi_id,
2304 vnfd_id=vnfd_id,
2305 vdu_id=vdu_id,
2306 kdu_name=kdu_name,
2307 member_vnf_index=member_vnf_index,
2308 vdu_index=vdu_index,
2309 vdu_name=vdu_name,
2310 deploy_params=deploy_params_vdu,
2311 descriptor_config=descriptor_config,
2312 base_folder=base_folder,
2313 task_instantiation_info=tasks_dict_info,
2314 stage=stage
2315 )
2316 for kdud in get_iterable(vnfd, 'kdu'):
2317 kdu_name = kdud["name"]
2318 descriptor_config = kdud.get('kdu-configuration')
2319 if descriptor_config:
2320 vdu_id = None
2321 vdu_index = 0
2322 vdu_name = None
2323 kdur = next(x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name)
2324 deploy_params_kdu = {"OSM": self._get_osm_params(db_vnfr)}
2325 if kdur.get("additionalParams"):
2326 deploy_params_kdu = self._format_additional_params(kdur["additionalParams"])
2327
2328 self._deploy_n2vc(
2329 logging_text=logging_text,
2330 db_nsr=db_nsr,
2331 db_vnfr=db_vnfr,
2332 nslcmop_id=nslcmop_id,
2333 nsr_id=nsr_id,
2334 nsi_id=nsi_id,
2335 vnfd_id=vnfd_id,
2336 vdu_id=vdu_id,
2337 kdu_name=kdu_name,
2338 member_vnf_index=member_vnf_index,
2339 vdu_index=vdu_index,
2340 vdu_name=vdu_name,
2341 deploy_params=deploy_params_kdu,
2342 descriptor_config=descriptor_config,
2343 base_folder=base_folder,
2344 task_instantiation_info=tasks_dict_info,
2345 stage=stage
2346 )
2347
2348 # Check if this NS has a charm configuration
2349 descriptor_config = nsd.get("ns-configuration")
2350 if descriptor_config and descriptor_config.get("juju"):
2351 vnfd_id = None
2352 db_vnfr = None
2353 member_vnf_index = None
2354 vdu_id = None
2355 kdu_name = None
2356 vdu_index = 0
2357 vdu_name = None
2358
2359 # Get additional parameters
2360 deploy_params = {"OSM": self._get_osm_params(db_vnfr)}
2361 if db_nsr.get("additionalParamsForNs"):
2362 deploy_params.update(self._format_additional_params(db_nsr["additionalParamsForNs"].copy()))
2363 base_folder = nsd["_admin"]["storage"]
2364 self._deploy_n2vc(
2365 logging_text=logging_text,
2366 db_nsr=db_nsr,
2367 db_vnfr=db_vnfr,
2368 nslcmop_id=nslcmop_id,
2369 nsr_id=nsr_id,
2370 nsi_id=nsi_id,
2371 vnfd_id=vnfd_id,
2372 vdu_id=vdu_id,
2373 kdu_name=kdu_name,
2374 member_vnf_index=member_vnf_index,
2375 vdu_index=vdu_index,
2376 vdu_name=vdu_name,
2377 deploy_params=deploy_params,
2378 descriptor_config=descriptor_config,
2379 base_folder=base_folder,
2380 task_instantiation_info=tasks_dict_info,
2381 stage=stage
2382 )
2383
2384             # the rest of the work is done in the finally block
2385
2386 except (ROclient.ROClientException, DbException, LcmException, N2VCException) as e:
2387 self.logger.error(logging_text + "Exit Exception while '{}': {}".format(stage[1], e))
2388 exc = e
2389 except asyncio.CancelledError:
2390 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(stage[1]))
2391 exc = "Operation was cancelled"
2392 except Exception as e:
2393 exc = traceback.format_exc()
2394 self.logger.critical(logging_text + "Exit Exception while '{}': {}".format(stage[1], e), exc_info=True)
2395 finally:
2396 if exc:
2397 error_list.append(str(exc))
2398 try:
2399 # wait for pending tasks
2400 if tasks_dict_info:
2401 stage[1] = "Waiting for instantiate pending tasks."
2402 self.logger.debug(logging_text + stage[1])
2403 error_list += await self._wait_for_tasks(logging_text, tasks_dict_info, timeout_ns_deploy,
2404 stage, nslcmop_id, nsr_id=nsr_id)
2405 stage[1] = stage[2] = ""
2406 except asyncio.CancelledError:
2407 error_list.append("Cancelled")
2408 # TODO cancel all tasks
2409 except Exception as exc:
2410 error_list.append(str(exc))
2411
2412 # update operation-status
2413 db_nsr_update["operational-status"] = "running"
2414 # let's begin with VCA 'configured' status (later we can change it)
2415 db_nsr_update["config-status"] = "configured"
2416 for task, task_name in tasks_dict_info.items():
2417 if not task.done() or task.cancelled() or task.exception():
2418 if task_name.startswith(self.task_name_deploy_vca):
2419 # A N2VC task is pending
2420 db_nsr_update["config-status"] = "failed"
2421 else:
2422 # RO or KDU task is pending
2423 db_nsr_update["operational-status"] = "failed"
2424
2425 # update status at database
2426 if error_list:
2427 error_detail = ". ".join(error_list)
2428 self.logger.error(logging_text + error_detail)
2429 error_description_nslcmop = '{} Detail: {}'.format(stage[0], error_detail)
2430 error_description_nsr = 'Operation: INSTANTIATING.{}, {}'.format(nslcmop_id, stage[0])
2431
2432 db_nsr_update["detailed-status"] = error_description_nsr + " Detail: " + error_detail
2433 db_nslcmop_update["detailed-status"] = error_detail
2434 nslcmop_operation_state = "FAILED"
2435 ns_state = "BROKEN"
2436 else:
2437 error_detail = None
2438 error_description_nsr = error_description_nslcmop = None
2439 ns_state = "READY"
2440 db_nsr_update["detailed-status"] = "Done"
2441 db_nslcmop_update["detailed-status"] = "Done"
2442 nslcmop_operation_state = "COMPLETED"
2443
2444 if db_nsr:
2445 self._write_ns_status(
2446 nsr_id=nsr_id,
2447 ns_state=ns_state,
2448 current_operation="IDLE",
2449 current_operation_id=None,
2450 error_description=error_description_nsr,
2451 error_detail=error_detail,
2452 other_update=db_nsr_update
2453 )
2454 self._write_op_status(
2455 op_id=nslcmop_id,
2456 stage="",
2457 error_message=error_description_nslcmop,
2458 operation_state=nslcmop_operation_state,
2459 other_update=db_nslcmop_update,
2460 )
2461
2462 if nslcmop_operation_state:
2463 try:
2464 await self.msg.aiowrite("ns", "instantiated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
2465 "operationState": nslcmop_operation_state},
2466 loop=self.loop)
2467 except Exception as e:
2468 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
2469
2470 self.logger.debug(logging_text + "Exit")
2471 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
2472
2473 async def _add_vca_relations(self, logging_text, nsr_id, vca_index: int,
2474 timeout: int = 3600, vca_type: str = None) -> bool:
2475
2476 # steps:
2477 # 1. find all relations for this VCA
2478 # 2. wait for other peers related
2479 # 3. add relations
2480
2481 try:
2482 vca_type = vca_type or "lxc_proxy_charm"
2483
2484 # STEP 1: find all relations for this VCA
2485
2486 # read nsr record
2487 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2488 nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
2489
2490 # this VCA data
2491 my_vca = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))[vca_index]
2492
2493 # read all ns-configuration relations
2494 ns_relations = list()
2495 db_ns_relations = deep_get(nsd, ('ns-configuration', 'relation'))
2496 if db_ns_relations:
2497 for r in db_ns_relations:
2498 # check if this VCA is in the relation
2499 if my_vca.get('member-vnf-index') in\
2500 (r.get('entities')[0].get('id'), r.get('entities')[1].get('id')):
2501 ns_relations.append(r)
2502
2503 # read all vnf-configuration relations
2504 vnf_relations = list()
2505 db_vnfd_list = db_nsr.get('vnfd-id')
2506 if db_vnfd_list:
2507 for vnfd in db_vnfd_list:
2508 db_vnfd = self.db.get_one("vnfds", {"_id": vnfd})
2509 db_vnf_relations = deep_get(db_vnfd, ('vnf-configuration', 'relation'))
2510 if db_vnf_relations:
2511 for r in db_vnf_relations:
2512 # check if this VCA is in the relation
2513 if my_vca.get('vdu_id') in (r.get('entities')[0].get('id'), r.get('entities')[1].get('id')):
2514 vnf_relations.append(r)
2515
2516 # if no relations, terminate
2517 if not ns_relations and not vnf_relations:
2518 self.logger.debug(logging_text + ' No relations')
2519 return True
2520
2521 self.logger.debug(logging_text + ' adding relations\n {}\n {}'.format(ns_relations, vnf_relations))
2522
2523 # add all relations
2524 start = time()
2525 while True:
2526 # check timeout
2527 now = time()
2528 if now - start >= timeout:
2529 self.logger.error(logging_text + ' : timeout adding relations')
2530 return False
2531
2532                 # reload nsr from database (we need to update record: _admin.deployed.VCA)
2533 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2534
2535 # for each defined NS relation, find the VCA's related
2536 for r in ns_relations.copy():
2537 from_vca_ee_id = None
2538 to_vca_ee_id = None
2539 from_vca_endpoint = None
2540 to_vca_endpoint = None
2541 vca_list = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))
2542 for vca in vca_list:
2543 if vca.get('member-vnf-index') == r.get('entities')[0].get('id') \
2544 and vca.get('config_sw_installed'):
2545 from_vca_ee_id = vca.get('ee_id')
2546 from_vca_endpoint = r.get('entities')[0].get('endpoint')
2547 if vca.get('member-vnf-index') == r.get('entities')[1].get('id') \
2548 and vca.get('config_sw_installed'):
2549 to_vca_ee_id = vca.get('ee_id')
2550 to_vca_endpoint = r.get('entities')[1].get('endpoint')
2551 if from_vca_ee_id and to_vca_ee_id:
2552 # add relation
2553 await self.vca_map[vca_type].add_relation(
2554 ee_id_1=from_vca_ee_id,
2555 ee_id_2=to_vca_ee_id,
2556 endpoint_1=from_vca_endpoint,
2557 endpoint_2=to_vca_endpoint)
2558 # remove entry from relations list
2559 ns_relations.remove(r)
2560 else:
2561 # check failed peers
2562 try:
2563 vca_status_list = db_nsr.get('configurationStatus')
2564 if vca_status_list:
2565 for i in range(len(vca_list)):
2566 vca = vca_list[i]
2567 vca_status = vca_status_list[i]
2568 if vca.get('member-vnf-index') == r.get('entities')[0].get('id'):
2569 if vca_status.get('status') == 'BROKEN':
2570 # peer broken: remove relation from list
2571 ns_relations.remove(r)
2572 if vca.get('member-vnf-index') == r.get('entities')[1].get('id'):
2573 if vca_status.get('status') == 'BROKEN':
2574 # peer broken: remove relation from list
2575 ns_relations.remove(r)
2576 except Exception:
2577 # ignore
2578 pass
2579
2580 # for each defined VNF relation, find the VCA's related
2581 for r in vnf_relations.copy():
2582 from_vca_ee_id = None
2583 to_vca_ee_id = None
2584 from_vca_endpoint = None
2585 to_vca_endpoint = None
2586 vca_list = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))
2587 for vca in vca_list:
2588 key_to_check = "vdu_id"
2589 if vca.get("vdu_id") is None:
2590 key_to_check = "vnfd_id"
2591 if vca.get(key_to_check) == r.get('entities')[0].get('id') and vca.get('config_sw_installed'):
2592 from_vca_ee_id = vca.get('ee_id')
2593 from_vca_endpoint = r.get('entities')[0].get('endpoint')
2594 if vca.get(key_to_check) == r.get('entities')[1].get('id') and vca.get('config_sw_installed'):
2595 to_vca_ee_id = vca.get('ee_id')
2596 to_vca_endpoint = r.get('entities')[1].get('endpoint')
2597 if from_vca_ee_id and to_vca_ee_id:
2598 # add relation
2599 await self.vca_map[vca_type].add_relation(
2600 ee_id_1=from_vca_ee_id,
2601 ee_id_2=to_vca_ee_id,
2602 endpoint_1=from_vca_endpoint,
2603 endpoint_2=to_vca_endpoint)
2604 # remove entry from relations list
2605 vnf_relations.remove(r)
2606 else:
2607 # check failed peers
2608 try:
2609 vca_status_list = db_nsr.get('configurationStatus')
2610 if vca_status_list:
2611 for i in range(len(vca_list)):
2612 vca = vca_list[i]
2613 vca_status = vca_status_list[i]
2614 if vca.get('vdu_id') == r.get('entities')[0].get('id'):
2615 if vca_status.get('status') == 'BROKEN':
2616 # peer broken: remove relation from list
2617 vnf_relations.remove(r)
2618 if vca.get('vdu_id') == r.get('entities')[1].get('id'):
2619 if vca_status.get('status') == 'BROKEN':
2620 # peer broken: remove relation from list
2621 vnf_relations.remove(r)
2622 except Exception:
2623 # ignore
2624 pass
2625
2626 # wait for next try
2627 await asyncio.sleep(5.0)
2628
2629 if not ns_relations and not vnf_relations:
2630 self.logger.debug('Relations added')
2631 break
2632
2633 return True
2634
2635 except Exception as e:
2636 self.logger.warn(logging_text + ' ERROR adding relations: {}'.format(e))
2637 return False
2638
2639 async def _install_kdu(self, nsr_id: str, nsr_db_path: str, vnfr_data: dict, kdu_index: int, kdud: dict,
2640 vnfd: dict, k8s_instance_info: dict, k8params: dict = None, timeout: int = 600):
2641
2642 try:
2643 k8sclustertype = k8s_instance_info["k8scluster-type"]
2644 # Instantiate kdu
2645 db_dict_install = {"collection": "nsrs",
2646 "filter": {"_id": nsr_id},
2647 "path": nsr_db_path}
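            # db_dict_install points at this KDU's _admin.deployed.K8s.<index> entry of the nsr record,
            # presumably so the K8s connector can report installation progress under that path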
2648
2649 kdu_instance = await self.k8scluster_map[k8sclustertype].install(
2650 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
2651 kdu_model=k8s_instance_info["kdu-model"],
2652 atomic=True,
2653 params=k8params,
2654 db_dict=db_dict_install,
2655 timeout=timeout,
2656 kdu_name=k8s_instance_info["kdu-name"],
2657 namespace=k8s_instance_info["namespace"])
2658 self.update_db_2("nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance})
2659
2660             # Obtain deployed services in order to find the management service ip
2661 services = await self.k8scluster_map[k8sclustertype].get_services(
2662 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
2663 kdu_instance=kdu_instance,
2664 namespace=k8s_instance_info["namespace"])
2665
2666 # Obtain management service info (if exists)
2667 vnfr_update_dict = {}
2668 if services:
2669 vnfr_update_dict["kdur.{}.services".format(kdu_index)] = services
2670 mgmt_services = [service for service in kdud.get("service", []) if service.get("mgmt-service")]
2671 for mgmt_service in mgmt_services:
2672 for service in services:
2673 if service["name"].startswith(mgmt_service["name"]):
2674 # Mgmt service found, Obtain service ip
2675 ip = service.get("external_ip", service.get("cluster_ip"))
2676 if isinstance(ip, list) and len(ip) == 1:
2677 ip = ip[0]
2678
2679 vnfr_update_dict["kdur.{}.ip-address".format(kdu_index)] = ip
2680
2681                                 # Check whether the mgmt ip at the vnf must also be updated
2682 service_external_cp = mgmt_service.get("external-connection-point-ref")
2683 if service_external_cp:
2684 if deep_get(vnfd, ("mgmt-interface", "cp")) == service_external_cp:
2685 vnfr_update_dict["ip-address"] = ip
2686
2687 break
2688 else:
2689 self.logger.warn("Mgmt service name: {} not found".format(mgmt_service["name"]))
2690
2691 vnfr_update_dict["kdur.{}.status".format(kdu_index)] = "READY"
2692 self.update_db_2("vnfrs", vnfr_data.get("_id"), vnfr_update_dict)
2693
2694 kdu_config = kdud.get("kdu-configuration")
2695 if kdu_config and kdu_config.get("initial-config-primitive") and kdu_config.get("juju") is None:
2696 initial_config_primitive_list = kdu_config.get("initial-config-primitive")
2697 initial_config_primitive_list.sort(key=lambda val: int(val["seq"]))
2698
2699 for initial_config_primitive in initial_config_primitive_list:
2700 primitive_params_ = self._map_primitive_params(initial_config_primitive, {}, {})
2701
2702 await asyncio.wait_for(
2703 self.k8scluster_map[k8sclustertype].exec_primitive(
2704 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
2705 kdu_instance=kdu_instance,
2706 primitive_name=initial_config_primitive["name"],
2707 params=primitive_params_, db_dict={}),
2708 timeout=timeout)
2709
2710 except Exception as e:
2711 # Prepare update db with error and raise exception
2712 try:
2713 self.update_db_2("nsrs", nsr_id, {nsr_db_path + ".detailed-status": str(e)})
2714 self.update_db_2("vnfrs", vnfr_data.get("_id"), {"kdur.{}.status".format(kdu_index): "ERROR"})
2715 except Exception:
2716 # ignore to keep original exception
2717 pass
2718 # reraise original error
2719 raise
2720
2721 return kdu_instance
2722
2723 async def deploy_kdus(self, logging_text, nsr_id, nslcmop_id, db_vnfrs, db_vnfds, task_instantiation_info):
2724 # Launch kdus if present in the descriptor
2725
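        # simple cache: cluster-id -> internal k8s connector id, kept per cluster type, so each cluster
        # is resolved (and initialized, if needed) only once by _get_cluster_id() below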
2726 k8scluster_id_2_uuic = {"helm-chart-v3": {}, "helm-chart": {}, "juju-bundle": {}}
2727
2728 async def _get_cluster_id(cluster_id, cluster_type):
2729 nonlocal k8scluster_id_2_uuic
2730 if cluster_id in k8scluster_id_2_uuic[cluster_type]:
2731 return k8scluster_id_2_uuic[cluster_type][cluster_id]
2732
2733             # check if the K8s cluster is being created and wait for previous related tasks to finish
2734 task_name, task_dependency = self.lcm_tasks.lookfor_related("k8scluster", cluster_id)
2735 if task_dependency:
2736 text = "Waiting for related tasks '{}' on k8scluster {} to be completed".format(task_name, cluster_id)
2737 self.logger.debug(logging_text + text)
2738 await asyncio.wait(task_dependency, timeout=3600)
2739
2740 db_k8scluster = self.db.get_one("k8sclusters", {"_id": cluster_id}, fail_on_empty=False)
2741 if not db_k8scluster:
2742 raise LcmException("K8s cluster {} cannot be found".format(cluster_id))
2743
2744 k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
2745 if not k8s_id:
2746 if cluster_type == "helm-chart-v3":
2747 try:
2748 # backward compatibility for existing clusters that have not been initialized for helm v3
2749 k8s_credentials = yaml.safe_dump(db_k8scluster.get("credentials"))
2750 k8s_id, uninstall_sw = await self.k8sclusterhelm3.init_env(k8s_credentials,
2751 reuse_cluster_uuid=cluster_id)
2752 db_k8scluster_update = {}
2753 db_k8scluster_update["_admin.helm-chart-v3.error_msg"] = None
2754 db_k8scluster_update["_admin.helm-chart-v3.id"] = k8s_id
2755 db_k8scluster_update["_admin.helm-chart-v3.created"] = uninstall_sw
2756 db_k8scluster_update["_admin.helm-chart-v3.operationalState"] = "ENABLED"
2757 self.update_db_2("k8sclusters", cluster_id, db_k8scluster_update)
2758 except Exception as e:
2759 self.logger.error(logging_text + "error initializing helm-v3 cluster: {}".format(str(e)))
2760 raise LcmException("K8s cluster '{}' has not been initialized for '{}'".format(cluster_id,
2761 cluster_type))
2762 else:
2763 raise LcmException("K8s cluster '{}' has not been initialized for '{}'".
2764 format(cluster_id, cluster_type))
2765 k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
2766 return k8s_id
2767
2768 logging_text += "Deploy kdus: "
2769 step = ""
2770 try:
2771 db_nsr_update = {"_admin.deployed.K8s": []}
2772 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2773
2774 index = 0
2775 updated_cluster_list = []
2776 updated_v3_cluster_list = []
2777
2778 for vnfr_data in db_vnfrs.values():
2779 for kdu_index, kdur in enumerate(get_iterable(vnfr_data, "kdur")):
2780 # Step 0: Prepare and set parameters
2781 desc_params = self._format_additional_params(kdur.get("additionalParams"))
2782 vnfd_id = vnfr_data.get('vnfd-id')
2783 kdud = next(kdud for kdud in db_vnfds[vnfd_id]["kdu"] if kdud["name"] == kdur["kdu-name"])
2784 namespace = kdur.get("k8s-namespace")
2785 if kdur.get("helm-chart"):
2786 kdumodel = kdur["helm-chart"]
2787 # Default version: helm3, if helm-version is v2 assign v2
2788 k8sclustertype = "helm-chart-v3"
2789 self.logger.debug("kdur: {}".format(kdur))
2790 if kdur.get("helm-version") and kdur.get("helm-version") == "v2":
2791 k8sclustertype = "helm-chart"
2792 elif kdur.get("juju-bundle"):
2793 kdumodel = kdur["juju-bundle"]
2794 k8sclustertype = "juju-bundle"
2795 else:
2796 raise LcmException("kdu type for kdu='{}.{}' is neither helm-chart nor "
2797 "juju-bundle. Maybe an old NBI version is running".
2798 format(vnfr_data["member-vnf-index-ref"], kdur["kdu-name"]))
2799 # check if kdumodel is a file and exists
2800 try:
2801 storage = deep_get(db_vnfds.get(vnfd_id), ('_admin', 'storage'))
2802 if storage and storage.get('pkg-dir'): # may be not present if vnfd has not artifacts
2803                             # path format: /vnfdid/pkgdir/helm-charts|juju-bundles/kdumodel
2804 filename = '{}/{}/{}s/{}'.format(storage["folder"], storage["pkg-dir"], k8sclustertype,
2805 kdumodel)
2806 if self.fs.file_exists(filename, mode='file') or self.fs.file_exists(filename, mode='dir'):
2807 kdumodel = self.fs.path + filename
2808 except (asyncio.TimeoutError, asyncio.CancelledError):
2809 raise
2810 except Exception: # it is not a file
2811 pass
2812
2813 k8s_cluster_id = kdur["k8s-cluster"]["id"]
2814 step = "Synchronize repos for k8s cluster '{}'".format(k8s_cluster_id)
2815 cluster_uuid = await _get_cluster_id(k8s_cluster_id, k8sclustertype)
2816
2817 # Synchronize repos
2818 if (k8sclustertype == "helm-chart" and cluster_uuid not in updated_cluster_list)\
2819 or (k8sclustertype == "helm-chart-v3" and cluster_uuid not in updated_v3_cluster_list):
2820 del_repo_list, added_repo_dict = await asyncio.ensure_future(
2821 self.k8scluster_map[k8sclustertype].synchronize_repos(cluster_uuid=cluster_uuid))
2822 if del_repo_list or added_repo_dict:
2823 if k8sclustertype == "helm-chart":
2824 unset = {'_admin.helm_charts_added.' + item: None for item in del_repo_list}
2825 updated = {'_admin.helm_charts_added.' +
2826 item: name for item, name in added_repo_dict.items()}
2827 updated_cluster_list.append(cluster_uuid)
2828 elif k8sclustertype == "helm-chart-v3":
2829 unset = {'_admin.helm_charts_v3_added.' + item: None for item in del_repo_list}
2830 updated = {'_admin.helm_charts_v3_added.' +
2831 item: name for item, name in added_repo_dict.items()}
2832 updated_v3_cluster_list.append(cluster_uuid)
2833 self.logger.debug(logging_text + "repos synchronized on k8s cluster "
2834 "'{}' to_delete: {}, to_add: {}".
2835 format(k8s_cluster_id, del_repo_list, added_repo_dict))
2836 self.db.set_one("k8sclusters", {"_id": k8s_cluster_id}, updated, unset=unset)
2837
2838 # Instantiate kdu
2839 step = "Instantiating KDU {}.{} in k8s cluster {}".format(vnfr_data["member-vnf-index-ref"],
2840 kdur["kdu-name"], k8s_cluster_id)
2841 k8s_instance_info = {"kdu-instance": None,
2842 "k8scluster-uuid": cluster_uuid,
2843 "k8scluster-type": k8sclustertype,
2844 "member-vnf-index": vnfr_data["member-vnf-index-ref"],
2845 "kdu-name": kdur["kdu-name"],
2846 "kdu-model": kdumodel,
2847 "namespace": namespace}
2848 db_path = "_admin.deployed.K8s.{}".format(index)
2849 db_nsr_update[db_path] = k8s_instance_info
2850 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2851
2852 task = asyncio.ensure_future(
2853 self._install_kdu(nsr_id, db_path, vnfr_data, kdu_index, kdud, db_vnfds[vnfd_id],
2854 k8s_instance_info, k8params=desc_params, timeout=600))
2855 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_KDU-{}".format(index), task)
2856 task_instantiation_info[task] = "Deploying KDU {}".format(kdur["kdu-name"])
2857
2858 index += 1
2859
2860 except (LcmException, asyncio.CancelledError):
2861 raise
2862 except Exception as e:
2863 msg = "Exception {} while {}: {}".format(type(e).__name__, step, e)
2864 if isinstance(e, (N2VCException, DbException)):
2865 self.logger.error(logging_text + msg)
2866 else:
2867 self.logger.critical(logging_text + msg, exc_info=True)
2868 raise LcmException(msg)
2869 finally:
2870 if db_nsr_update:
2871 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2872
2873 def _deploy_n2vc(self, logging_text, db_nsr, db_vnfr, nslcmop_id, nsr_id, nsi_id, vnfd_id, vdu_id,
2874 kdu_name, member_vnf_index, vdu_index, vdu_name, deploy_params, descriptor_config,
2875 base_folder, task_instantiation_info, stage):
2876 # launch instantiate_N2VC in an asyncio task and register the task object
2877 # Look for this charm's information in the database at <nsrs>._admin.deployed.VCA
2878 # if not found, create one entry and update database
2879 # fill db_nsr._admin.deployed.VCA.<index>
2880
2881 self.logger.debug(logging_text + "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id, vdu_id))
2882 if descriptor_config.get("juju"): # There is one execution environment of type juju
2883 ee_list = [descriptor_config]
2884 elif descriptor_config.get("execution-environment-list"):
2885 ee_list = descriptor_config.get("execution-environment-list")
2886 else: # other types, such as script, are not supported
2887 ee_list = []
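# Each ee_item describes one execution environment: juju entries are mapped below to charm-based VCA types
# (proxy, native or k8s proxy charm) and helm-chart entries to helm/helm-v3.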
2888
2889 for ee_item in ee_list:
2890 self.logger.debug(logging_text + "_deploy_n2vc ee_item juju={}, helm={}".format(ee_item.get('juju'),
2891 ee_item.get("helm-chart")))
2892 ee_descriptor_id = ee_item.get("id")
2893 if ee_item.get("juju"):
2894 vca_name = ee_item['juju'].get('charm')
2895 vca_type = "lxc_proxy_charm" if ee_item['juju'].get('charm') is not None else "native_charm"
2896 if ee_item['juju'].get('cloud') == "k8s":
2897 vca_type = "k8s_proxy_charm"
2898 elif ee_item['juju'].get('proxy') is False:
2899 vca_type = "native_charm"
2900 elif ee_item.get("helm-chart"):
2901 vca_name = ee_item['helm-chart']
2902 if ee_item.get("helm-version") and ee_item.get("helm-version") == "v2":
2903 vca_type = "helm"
2904 else:
2905 vca_type = "helm-v3"
2906 else:
2907 self.logger.debug(logging_text + "skipping configuration that is neither a juju charm nor a helm chart")
2908 continue
2909
2910 vca_index = -1
2911 for vca_index, vca_deployed in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
2912 if not vca_deployed:
2913 continue
2914 if vca_deployed.get("member-vnf-index") == member_vnf_index and \
2915 vca_deployed.get("vdu_id") == vdu_id and \
2916 vca_deployed.get("kdu_name") == kdu_name and \
2917 vca_deployed.get("vdu_count_index", 0) == vdu_index and \
2918 vca_deployed.get("ee_descriptor_id") == ee_descriptor_id:
2919 break
2920 else:
2921 # not found, create one.
2922 target = "ns" if not member_vnf_index else "vnf/{}".format(member_vnf_index)
2923 if vdu_id:
2924 target += "/vdu/{}/{}".format(vdu_id, vdu_index or 0)
2925 elif kdu_name:
2926 target += "/kdu/{}".format(kdu_name)
2927 vca_deployed = {
2928 "target_element": target,
2929 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
2930 "member-vnf-index": member_vnf_index,
2931 "vdu_id": vdu_id,
2932 "kdu_name": kdu_name,
2933 "vdu_count_index": vdu_index,
2934 "operational-status": "init", # TODO revise
2935 "detailed-status": "", # TODO revise
2936 "step": "initial-deploy", # TODO revise
2937 "vnfd_id": vnfd_id,
2938 "vdu_name": vdu_name,
2939 "type": vca_type,
2940 "ee_descriptor_id": ee_descriptor_id
2941 }
2942 vca_index += 1
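# The for/else above only reaches this point when no existing VCA record matched, so vca_index
# (last existing index + 1) is the position where the new record is appended.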
2943
2944 # create VCA and configurationStatus in db
2945 db_dict = {
2946 "_admin.deployed.VCA.{}".format(vca_index): vca_deployed,
2947 "configurationStatus.{}".format(vca_index): dict()
2948 }
2949 self.update_db_2("nsrs", nsr_id, db_dict)
2950
2951 db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed)
2952
2953 # Launch task
2954 task_n2vc = asyncio.ensure_future(
2955 self.instantiate_N2VC(
2956 logging_text=logging_text,
2957 vca_index=vca_index,
2958 nsi_id=nsi_id,
2959 db_nsr=db_nsr,
2960 db_vnfr=db_vnfr,
2961 vdu_id=vdu_id,
2962 kdu_name=kdu_name,
2963 vdu_index=vdu_index,
2964 deploy_params=deploy_params,
2965 config_descriptor=descriptor_config,
2966 base_folder=base_folder,
2967 nslcmop_id=nslcmop_id,
2968 stage=stage,
2969 vca_type=vca_type,
2970 vca_name=vca_name,
2971 ee_config_descriptor=ee_item
2972 )
2973 )
2974 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_N2VC-{}".format(vca_index), task_n2vc)
2975 task_instantiation_info[task_n2vc] = self.task_name_deploy_vca + " {}.{}".format(
2976 member_vnf_index or "", vdu_id or "")
2977
2978 @staticmethod
2979 def _get_terminate_config_primitive(primitive_list, vca_deployed):
2980 """ Get a sorted terminate config primitive list. In case ee_descriptor_id is present at vca_deployed,
2981 it get only those primitives for this execution envirom"""
2982
2983 primitive_list = primitive_list or []
2984 # filter primitives by ee_descriptor_id
2985 ee_descriptor_id = vca_deployed.get("ee_descriptor_id")
2986 primitive_list = [p for p in primitive_list if p.get("execution-environment-ref") == ee_descriptor_id]
2987
2988 if primitive_list:
2989 primitive_list.sort(key=lambda val: int(val['seq']))
2990
2991 return primitive_list
2992
2993 @staticmethod
2994 def _create_nslcmop(nsr_id, operation, params):
2995 """
2996 Creates an nslcmop content to be stored in the database.
2997 :param nsr_id: internal id of the instance
2998 :param operation: instantiate, terminate, scale, action, ...
2999 :param params: user parameters for the operation
3000 :return: dictionary following SOL005 format
3001 """
3002 # Raise exception if invalid arguments
3003 if not (nsr_id and operation and params):
3004 raise LcmException(
3005 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided")
3006 now = time()
3007 _id = str(uuid4())
3008 nslcmop = {
3009 "id": _id,
3010 "_id": _id,
3011 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
3012 "operationState": "PROCESSING",
3013 "statusEnteredTime": now,
3014 "nsInstanceId": nsr_id,
3015 "lcmOperationType": operation,
3016 "startTime": now,
3017 "isAutomaticInvocation": False,
3018 "operationParams": params,
3019 "isCancelPending": False,
3020 "links": {
3021 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
3022 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
3023 }
3024 }
3025 return nslcmop
3026
3027 def _format_additional_params(self, params):
3028 params = params or {}
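# Values prefixed with "!!yaml " are parsed as YAML; e.g. (illustrative) "!!yaml [1, 2]" becomes the list [1, 2].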
3029 for key, value in params.items():
3030 if str(value).startswith("!!yaml "):
3031 params[key] = yaml.safe_load(value[7:])
3032 return params
3033
3034 def _get_terminate_primitive_params(self, seq, vnf_index):
3035 primitive = seq.get('name')
3036 primitive_params = {}
3037 params = {
3038 "member_vnf_index": vnf_index,
3039 "primitive": primitive,
3040 "primitive_params": primitive_params,
3041 }
3042 desc_params = {}
3043 return self._map_primitive_params(seq, params, desc_params)
3044
3045 # sub-operations
3046
3047 def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
3048 op = deep_get(db_nslcmop, ('_admin', 'operations'), [])[op_index]
3049 if op.get('operationState') == 'COMPLETED':
3050 # b. Skip sub-operation
3051 # _ns_execute_primitive() or RO.create_action() will NOT be executed
3052 return self.SUBOPERATION_STATUS_SKIP
3053 else:
3054 # c. retry executing sub-operation
3055 # The sub-operation exists, and operationState != 'COMPLETED'
3056 # Update operationState = 'PROCESSING' to indicate a retry.
3057 operationState = 'PROCESSING'
3058 detailed_status = 'In progress'
3059 self._update_suboperation_status(
3060 db_nslcmop, op_index, operationState, detailed_status)
3061 # Return the sub-operation index
3062 # _ns_execute_primitive() or RO.create_action() will be called from scale()
3063 # with arguments extracted from the sub-operation
3064 return op_index
3065
3066 # Find a sub-operation where all keys in a matching dictionary must match
3067 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
3068 def _find_suboperation(self, db_nslcmop, match):
3069 if db_nslcmop and match:
3070 op_list = db_nslcmop.get('_admin', {}).get('operations', [])
3071 for i, op in enumerate(op_list):
3072 if all(op.get(k) == match[k] for k in match):
3073 return i
3074 return self.SUBOPERATION_STATUS_NOT_FOUND
3075
3076 # Update status for a sub-operation given its index
3077 def _update_suboperation_status(self, db_nslcmop, op_index, operationState, detailed_status):
3078 # Update DB for HA tasks
3079 q_filter = {'_id': db_nslcmop['_id']}
3080 update_dict = {'_admin.operations.{}.operationState'.format(op_index): operationState,
3081 '_admin.operations.{}.detailed-status'.format(op_index): detailed_status}
3082 self.db.set_one("nslcmops",
3083 q_filter=q_filter,
3084 update_dict=update_dict,
3085 fail_on_empty=False)
3086
3087 # Add sub-operation, return the index of the added sub-operation
3088 # Optionally, set operationState, detailed-status, and operationType
3089 # Status and type are currently set for 'scale' sub-operations:
3090 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
3091 # 'detailed-status' : status message
3092 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
3093 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
3094 def _add_suboperation(self, db_nslcmop, vnf_index, vdu_id, vdu_count_index, vdu_name, primitive,
3095 mapped_primitive_params, operationState=None, detailed_status=None, operationType=None,
3096 RO_nsr_id=None, RO_scaling_info=None):
3097 if not db_nslcmop:
3098 return self.SUBOPERATION_STATUS_NOT_FOUND
3099 # Get the "_admin.operations" list, if it exists
3100 db_nslcmop_admin = db_nslcmop.get('_admin', {})
3101 op_list = db_nslcmop_admin.get('operations')
3102 # Create or append to the "_admin.operations" list
3103 new_op = {'member_vnf_index': vnf_index,
3104 'vdu_id': vdu_id,
3105 'vdu_count_index': vdu_count_index,
3106 'primitive': primitive,
3107 'primitive_params': mapped_primitive_params}
3108 if operationState:
3109 new_op['operationState'] = operationState
3110 if detailed_status:
3111 new_op['detailed-status'] = detailed_status
3112 if operationType:
3113 new_op['lcmOperationType'] = operationType
3114 if RO_nsr_id:
3115 new_op['RO_nsr_id'] = RO_nsr_id
3116 if RO_scaling_info:
3117 new_op['RO_scaling_info'] = RO_scaling_info
3118 if not op_list:
3119 # No existing operations, create key 'operations' with current operation as first list element
3120 db_nslcmop_admin.update({'operations': [new_op]})
3121 op_list = db_nslcmop_admin.get('operations')
3122 else:
3123 # Existing operations, append operation to list
3124 op_list.append(new_op)
3125
3126 db_nslcmop_update = {'_admin.operations': op_list}
3127 self.update_db_2("nslcmops", db_nslcmop['_id'], db_nslcmop_update)
3128 op_index = len(op_list) - 1
3129 return op_index
3130
3131 # Helper methods for scale() sub-operations
3132
3133 # pre-scale/post-scale:
3134 # Check for 3 different cases:
3135 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
3136 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
3137 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
3138 def _check_or_add_scale_suboperation(self, db_nslcmop, vnf_index, vnf_config_primitive, primitive_params,
3139 operationType, RO_nsr_id=None, RO_scaling_info=None):
3140 # Find this sub-operation
3141 if RO_nsr_id and RO_scaling_info:
3142 operationType = 'SCALE-RO'
3143 match = {
3144 'member_vnf_index': vnf_index,
3145 'RO_nsr_id': RO_nsr_id,
3146 'RO_scaling_info': RO_scaling_info,
3147 }
3148 else:
3149 match = {
3150 'member_vnf_index': vnf_index,
3151 'primitive': vnf_config_primitive,
3152 'primitive_params': primitive_params,
3153 'lcmOperationType': operationType
3154 }
3155 op_index = self._find_suboperation(db_nslcmop, match)
3156 if op_index == self.SUBOPERATION_STATUS_NOT_FOUND:
3157 # a. New sub-operation
3158 # The sub-operation does not exist, add it.
3159 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
3160 # The following parameters are set to None for all kinds of scaling:
3161 vdu_id = None
3162 vdu_count_index = None
3163 vdu_name = None
3164 if RO_nsr_id and RO_scaling_info:
3165 vnf_config_primitive = None
3166 primitive_params = None
3167 else:
3168 RO_nsr_id = None
3169 RO_scaling_info = None
3170 # Initial status for sub-operation
3171 operationState = 'PROCESSING'
3172 detailed_status = 'In progress'
3173 # Add sub-operation for pre/post-scaling (zero or more operations)
3174 self._add_suboperation(db_nslcmop,
3175 vnf_index,
3176 vdu_id,
3177 vdu_count_index,
3178 vdu_name,
3179 vnf_config_primitive,
3180 primitive_params,
3181 operationState,
3182 detailed_status,
3183 operationType,
3184 RO_nsr_id,
3185 RO_scaling_info)
3186 return self.SUBOPERATION_STATUS_NEW
3187 else:
3188 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
3189 # or op_index (operationState != 'COMPLETED')
3190 return self._retry_or_skip_suboperation(db_nslcmop, op_index)
3191
3192 # Function to return execution_environment id
3193
3194 def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list):
3195 # TODO vdu_index_count
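# Note: returns None implicitly when no matching VCA entry is found.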
3196 for vca in vca_deployed_list:
3197 if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id:
3198 return vca["ee_id"]
3199
3200 async def destroy_N2VC(self, logging_text, db_nslcmop, vca_deployed, config_descriptor,
3201 vca_index, destroy_ee=True, exec_primitives=True):
3202 """
3203 Execute the terminate primitives and destroy the execution environment (if destroy_ee=True)
3204 :param logging_text:
3205 :param db_nslcmop:
3206 :param vca_deployed: Dictionary of deployment info at db_nsr._admin.deployed.VCA.<INDEX>
3207 :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
3208 :param vca_index: index in the database _admin.deployed.VCA
3209 :param destroy_ee: False to skip destroying it here, because all of them will be destroyed at once later
3210 :param exec_primitives: False to skip executing terminate primitives, because the configuration was not
3211 completed or did not execute properly
3212 :return: None or exception
3213 """
3214
3215 self.logger.debug(
3216 logging_text + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
3217 vca_index, vca_deployed, config_descriptor, destroy_ee
3218 )
3219 )
3220
3221 vca_type = vca_deployed.get("type", "lxc_proxy_charm")
3222
3223 # execute terminate_primitives
3224 if exec_primitives:
3225 terminate_primitives = self._get_terminate_config_primitive(
3226 config_descriptor.get("terminate-config-primitive"), vca_deployed)
3227 vdu_id = vca_deployed.get("vdu_id")
3228 vdu_count_index = vca_deployed.get("vdu_count_index")
3229 vdu_name = vca_deployed.get("vdu_name")
3230 vnf_index = vca_deployed.get("member-vnf-index")
3231 if terminate_primitives and vca_deployed.get("needed_terminate"):
3232 for seq in terminate_primitives:
3233 # For each sequence in list, get primitive and call _ns_execute_primitive()
3234 step = "Calling terminate action for vnf_member_index={} primitive={}".format(
3235 vnf_index, seq.get("name"))
3236 self.logger.debug(logging_text + step)
3237 # Create the primitive for each sequence, e.g. "primitive": "touch"
3238 primitive = seq.get('name')
3239 mapped_primitive_params = self._get_terminate_primitive_params(seq, vnf_index)
3240
3241 # Add sub-operation
3242 self._add_suboperation(db_nslcmop,
3243 vnf_index,
3244 vdu_id,
3245 vdu_count_index,
3246 vdu_name,
3247 primitive,
3248 mapped_primitive_params)
3249 # Sub-operations: Call _ns_execute_primitive() instead of action()
3250 try:
3251 result, result_detail = await self._ns_execute_primitive(vca_deployed["ee_id"], primitive,
3252 mapped_primitive_params,
3253 vca_type=vca_type)
3254 except LcmException:
3255 # this happens when the VCA is not deployed. In this case there is nothing to terminate
3256 continue
3257 result_ok = ['COMPLETED', 'PARTIALLY_COMPLETED']
3258 if result not in result_ok:
3259 raise LcmException("terminate_primitive {} for vnf_member_index={} fails with "
3260 "error {}".format(seq.get("name"), vnf_index, result_detail))
3261 # mark that this VCA no longer needs to be terminated
3262 db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(vca_index)
3263 self.update_db_2("nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False})
3264
3265 if vca_deployed.get("prometheus_jobs") and self.prometheus:
3266 await self.prometheus.update(remove_jobs=vca_deployed["prometheus_jobs"])
3267
3268 if destroy_ee:
3269 await self.vca_map[vca_type].delete_execution_environment(vca_deployed["ee_id"])
3270
3271 async def _delete_all_N2VC(self, db_nsr: dict):
3272 self._write_all_config_status(db_nsr=db_nsr, status='TERMINATING')
3273 namespace = "." + db_nsr["_id"]
3274 try:
3275 await self.n2vc.delete_namespace(namespace=namespace, total_timeout=self.timeout_charm_delete)
3276 except N2VCNotFound: # already deleted. Skip
3277 pass
3278 self._write_all_config_status(db_nsr=db_nsr, status='DELETED')
3279
3280 async def _terminate_RO(self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage):
3281 """
3282 Terminates a deployment from RO
3283 :param logging_text:
3284 :param nsr_deployed: db_nsr._admin.deployed
3285 :param nsr_id:
3286 :param nslcmop_id:
3287 :param stage: list of string with the content to write on db_nslcmop.detailed-status.
3288 this method will update only the index 2, but it will write on database the concatenated content of the list
3289 :return:
3290 """
3291 db_nsr_update = {}
3292 failed_detail = []
3293 ro_nsr_id = ro_delete_action = None
3294 if nsr_deployed and nsr_deployed.get("RO"):
3295 ro_nsr_id = nsr_deployed["RO"].get("nsr_id")
3296 ro_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
3297 try:
3298 if ro_nsr_id:
3299 stage[2] = "Deleting ns from VIM."
3300 db_nsr_update["detailed-status"] = " ".join(stage)
3301 self._write_op_status(nslcmop_id, stage)
3302 self.logger.debug(logging_text + stage[2])
3303 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3304 self._write_op_status(nslcmop_id, stage)
3305 desc = await self.RO.delete("ns", ro_nsr_id)
3306 ro_delete_action = desc["action_id"]
3307 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = ro_delete_action
3308 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
3309 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
3310 if ro_delete_action:
3311 # wait until NS is deleted from VIM
3312 stage[2] = "Waiting ns deleted from VIM."
3313 detailed_status_old = None
3314 self.logger.debug(logging_text + stage[2] + " RO_id={} ro_delete_action={}".format(ro_nsr_id,
3315 ro_delete_action))
3316 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3317 self._write_op_status(nslcmop_id, stage)
3318
3319 delete_timeout = 20 * 60 # 20 minutes
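# Poll the RO delete action every 5 seconds until it reports ACTIVE (deleted from VIM), ERROR, or the
# timeout expires.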
3320 while delete_timeout > 0:
3321 desc = await self.RO.show(
3322 "ns",
3323 item_id_name=ro_nsr_id,
3324 extra_item="action",
3325 extra_item_id=ro_delete_action)
3326
3327 # deploymentStatus
3328 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
3329
3330 ns_status, ns_status_info = self.RO.check_action_status(desc)
3331 if ns_status == "ERROR":
3332 raise ROclient.ROClientException(ns_status_info)
3333 elif ns_status == "BUILD":
3334 stage[2] = "Deleting from VIM {}".format(ns_status_info)
3335 elif ns_status == "ACTIVE":
3336 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
3337 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
3338 break
3339 else:
3340 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
3341 if stage[2] != detailed_status_old:
3342 detailed_status_old = stage[2]
3343 db_nsr_update["detailed-status"] = " ".join(stage)
3344 self._write_op_status(nslcmop_id, stage)
3345 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3346 await asyncio.sleep(5, loop=self.loop)
3347 delete_timeout -= 5
3348 else: # delete_timeout <= 0:
3349 raise ROclient.ROClientException("Timeout waiting for ns to be deleted from VIM")
3350
3351 except Exception as e:
3352 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3353 if isinstance(e, ROclient.ROClientException) and e.http_code == 404: # not found
3354 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
3355 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
3356 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
3357 self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(ro_nsr_id))
3358 elif isinstance(e, ROclient.ROClientException) and e.http_code == 409: # conflict
3359 failed_detail.append("delete conflict: {}".format(e))
3360 self.logger.debug(logging_text + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id, e))
3361 else:
3362 failed_detail.append("delete error: {}".format(e))
3363 self.logger.error(logging_text + "RO_ns_id={} delete error: {}".format(ro_nsr_id, e))
3364
3365 # Delete nsd
3366 if not failed_detail and deep_get(nsr_deployed, ("RO", "nsd_id")):
3367 ro_nsd_id = nsr_deployed["RO"]["nsd_id"]
3368 try:
3369 stage[2] = "Deleting nsd from RO."
3370 db_nsr_update["detailed-status"] = " ".join(stage)
3371 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3372 self._write_op_status(nslcmop_id, stage)
3373 await self.RO.delete("nsd", ro_nsd_id)
3374 self.logger.debug(logging_text + "ro_nsd_id={} deleted".format(ro_nsd_id))
3375 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
3376 except Exception as e:
3377 if isinstance(e, ROclient.ROClientException) and e.http_code == 404: # not found
3378 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
3379 self.logger.debug(logging_text + "ro_nsd_id={} already deleted".format(ro_nsd_id))
3380 elif isinstance(e, ROclient.ROClientException) and e.http_code == 409: # conflict
3381 failed_detail.append("ro_nsd_id={} delete conflict: {}".format(ro_nsd_id, e))
3382 self.logger.debug(logging_text + failed_detail[-1])
3383 else:
3384 failed_detail.append("ro_nsd_id={} delete error: {}".format(ro_nsd_id, e))
3385 self.logger.error(logging_text + failed_detail[-1])
3386
3387 if not failed_detail and deep_get(nsr_deployed, ("RO", "vnfd")):
3388 for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
3389 if not vnf_deployed or not vnf_deployed["id"]:
3390 continue
3391 try:
3392 ro_vnfd_id = vnf_deployed["id"]
3393 stage[2] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
3394 vnf_deployed["member-vnf-index"], ro_vnfd_id)
3395 db_nsr_update["detailed-status"] = " ".join(stage)
3396 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3397 self._write_op_status(nslcmop_id, stage)
3398 await self.RO.delete("vnfd", ro_vnfd_id)
3399 self.logger.debug(logging_text + "ro_vnfd_id={} deleted".format(ro_vnfd_id))
3400 db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
3401 except Exception as e:
3402 if isinstance(e, ROclient.ROClientException) and e.http_code == 404: # not found
3403 db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
3404 self.logger.debug(logging_text + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id))
3405 elif isinstance(e, ROclient.ROClientException) and e.http_code == 409: # conflict
3406 failed_detail.append("ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id, e))
3407 self.logger.debug(logging_text + failed_detail[-1])
3408 else:
3409 failed_detail.append("ro_vnfd_id={} delete error: {}".format(ro_vnfd_id, e))
3410 self.logger.error(logging_text + failed_detail[-1])
3411
3412 if failed_detail:
3413 stage[2] = "Error deleting from VIM"
3414 else:
3415 stage[2] = "Deleted from VIM"
3416 db_nsr_update["detailed-status"] = " ".join(stage)
3417 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3418 self._write_op_status(nslcmop_id, stage)
3419
3420 if failed_detail:
3421 raise LcmException("; ".join(failed_detail))
3422
3423 async def terminate(self, nsr_id, nslcmop_id):
3424 # Try to lock HA task here
3425 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
3426 if not task_is_locked_by_me:
3427 return
3428
3429 logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
3430 self.logger.debug(logging_text + "Enter")
3431 timeout_ns_terminate = self.timeout_ns_terminate
3432 db_nsr = None
3433 db_nslcmop = None
3434 operation_params = None
3435 exc = None
3436 error_list = [] # accumulates all error messages
3437 db_nslcmop_update = {}
3438 autoremove = False # autoremove after termination
3439 tasks_dict_info = {}
3440 db_nsr_update = {}
3441 stage = ["Stage 1/3: Preparing task.", "Waiting for previous operations to terminate.", ""]
3442 # ^ contains [stage, step, VIM-status]
3443 try:
3444 # wait for any previous tasks in process
3445 await self.lcm_tasks.waitfor_related_HA("ns", 'nslcmops', nslcmop_id)
3446
3447 stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
3448 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
3449 operation_params = db_nslcmop.get("operationParams") or {}
3450 if operation_params.get("timeout_ns_terminate"):
3451 timeout_ns_terminate = operation_params["timeout_ns_terminate"]
3452 stage[1] = "Getting nsr={} from db.".format(nsr_id)
3453 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3454
3455 db_nsr_update["operational-status"] = "terminating"
3456 db_nsr_update["config-status"] = "terminating"
3457 self._write_ns_status(
3458 nsr_id=nsr_id,
3459 ns_state="TERMINATING",
3460 current_operation="TERMINATING",
3461 current_operation_id=nslcmop_id,
3462 other_update=db_nsr_update
3463 )
3464 self._write_op_status(
3465 op_id=nslcmop_id,
3466 queuePosition=0,
3467 stage=stage
3468 )
3469 nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed")) or {}
3470 if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
3471 return
3472
3473 stage[1] = "Getting vnf descriptors from db."
3474 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
3475 db_vnfds_from_id = {}
3476 db_vnfds_from_member_index = {}
3477 # Loop over VNFRs
3478 for vnfr in db_vnfrs_list:
3479 vnfd_id = vnfr["vnfd-id"]
3480 if vnfd_id not in db_vnfds_from_id:
3481 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
3482 db_vnfds_from_id[vnfd_id] = vnfd
3483 db_vnfds_from_member_index[vnfr["member-vnf-index-ref"]] = db_vnfds_from_id[vnfd_id]
3484
3485 # Destroy individual execution environments when there are terminating primitives.
3486 # Rest of EE will be deleted at once
3487 # TODO - check before calling _destroy_N2VC
3488 # if not operation_params.get("skip_terminate_primitives"):#
3489 # or not vca.get("needed_terminate"):
3490 stage[0] = "Stage 2/3 execute terminating primitives."
3491 self.logger.debug(logging_text + stage[0])
3492 stage[1] = "Looking execution environment that needs terminate."
3493 self.logger.debug(logging_text + stage[1])
3494 # self.logger.debug("nsr_deployed: {}".format(nsr_deployed))
3495 for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")):
3496 config_descriptor = None
3497 if not vca or not vca.get("ee_id"):
3498 continue
3499 if not vca.get("member-vnf-index"):
3500 # ns
3501 config_descriptor = db_nsr.get("ns-configuration")
3502 elif vca.get("vdu_id"):
3503 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
3504 vdud = next((vdu for vdu in db_vnfd.get("vdu", ()) if vdu["id"] == vca.get("vdu_id")), None)
3505 if vdud:
3506 config_descriptor = vdud.get("vdu-configuration")
3507 elif vca.get("kdu_name"):
3508 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
3509 kdud = next((kdu for kdu in db_vnfd.get("kdu", ()) if kdu["name"] == vca.get("kdu_name")), None)
3510 if kdud:
3511 config_descriptor = kdud.get("kdu-configuration")
3512 else:
3513 config_descriptor = db_vnfds_from_member_index[vca["member-vnf-index"]].get("vnf-configuration")
3514 vca_type = vca.get("type")
3515 exec_terminate_primitives = (not operation_params.get("skip_terminate_primitives") and
3516 vca.get("needed_terminate"))
3517 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
3518 # pending native charms
3519 destroy_ee = vca_type in ("helm", "helm-v3", "native_charm")
3520 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
3521 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
3522 task = asyncio.ensure_future(
3523 self.destroy_N2VC(logging_text, db_nslcmop, vca, config_descriptor, vca_index,
3524 destroy_ee, exec_terminate_primitives))
3525 tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))
3526
3527 # wait for pending tasks of terminate primitives
3528 if tasks_dict_info:
3529 self.logger.debug(logging_text + 'Waiting for tasks {}'.format(list(tasks_dict_info.keys())))
3530 error_list = await self._wait_for_tasks(logging_text, tasks_dict_info,
3531 min(self.timeout_charm_delete, timeout_ns_terminate),
3532 stage, nslcmop_id)
3533 tasks_dict_info.clear()
3534 if error_list:
3535 return # raise LcmException("; ".join(error_list))
3536
3537 # remove All execution environments at once
3538 stage[0] = "Stage 3/3 delete all."
3539
3540 if nsr_deployed.get("VCA"):
3541 stage[1] = "Deleting all execution environments."
3542 self.logger.debug(logging_text + stage[1])
3543 task_delete_ee = asyncio.ensure_future(asyncio.wait_for(self._delete_all_N2VC(db_nsr=db_nsr),
3544 timeout=self.timeout_charm_delete))
3545 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
3546 tasks_dict_info[task_delete_ee] = "Terminating all VCA"
3547
3548 # Delete from k8scluster
3549 stage[1] = "Deleting KDUs."
3550 self.logger.debug(logging_text + stage[1])
3551 # print(nsr_deployed)
3552 for kdu in get_iterable(nsr_deployed, "K8s"):
3553 if not kdu or not kdu.get("kdu-instance"):
3554 continue
3555 kdu_instance = kdu.get("kdu-instance")
3556 if kdu.get("k8scluster-type") in self.k8scluster_map:
3557 task_delete_kdu_instance = asyncio.ensure_future(
3558 self.k8scluster_map[kdu["k8scluster-type"]].uninstall(
3559 cluster_uuid=kdu.get("k8scluster-uuid"),
3560 kdu_instance=kdu_instance))
3561 else:
3562 self.logger.error(logging_text + "Unknown k8s deployment type {}".
3563 format(kdu.get("k8scluster-type")))
3564 continue
3565 tasks_dict_info[task_delete_kdu_instance] = "Terminating KDU '{}'".format(kdu.get("kdu-name"))
3566
3567 # remove from RO
3568 stage[1] = "Deleting ns from VIM."
3569 if self.ng_ro:
3570 task_delete_ro = asyncio.ensure_future(
3571 self._terminate_ng_ro(logging_text, nsr_deployed, nsr_id, nslcmop_id, stage))
3572 else:
3573 task_delete_ro = asyncio.ensure_future(
3574 self._terminate_RO(logging_text, nsr_deployed, nsr_id, nslcmop_id, stage))
3575 tasks_dict_info[task_delete_ro] = "Removing deployment from VIM"
3576
3577 # the rest of the cleanup will be done in the finally block
3578
3579 except (ROclient.ROClientException, DbException, LcmException, N2VCException) as e:
3580 self.logger.error(logging_text + "Exit Exception {}".format(e))
3581 exc = e
3582 except asyncio.CancelledError:
3583 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(stage[1]))
3584 exc = "Operation was cancelled"
3585 except Exception as e:
3586 exc = traceback.format_exc()
3587 self.logger.critical(logging_text + "Exit Exception while '{}': {}".format(stage[1], e), exc_info=True)
3588 finally:
3589 if exc:
3590 error_list.append(str(exc))
3591 try:
3592 # wait for pending tasks
3593 if tasks_dict_info:
3594 stage[1] = "Waiting for terminate pending tasks."
3595 self.logger.debug(logging_text + stage[1])
3596 error_list += await self._wait_for_tasks(logging_text, tasks_dict_info, timeout_ns_terminate,
3597 stage, nslcmop_id)
3598 stage[1] = stage[2] = ""
3599 except asyncio.CancelledError:
3600 error_list.append("Cancelled")
3601 # TODO cancel all tasks
3602 except Exception as exc:
3603 error_list.append(str(exc))
3604 # update status at database
3605 if error_list:
3606 error_detail = "; ".join(error_list)
3607 # self.logger.error(logging_text + error_detail)
3608 error_description_nslcmop = '{} Detail: {}'.format(stage[0], error_detail)
3609 error_description_nsr = 'Operation: TERMINATING.{}, {}.'.format(nslcmop_id, stage[0])
3610
3611 db_nsr_update["operational-status"] = "failed"
3612 db_nsr_update["detailed-status"] = error_description_nsr + " Detail: " + error_detail
3613 db_nslcmop_update["detailed-status"] = error_detail
3614 nslcmop_operation_state = "FAILED"
3615 ns_state = "BROKEN"
3616 else:
3617 error_detail = None
3618 error_description_nsr = error_description_nslcmop = None
3619 ns_state = "NOT_INSTANTIATED"
3620 db_nsr_update["operational-status"] = "terminated"
3621 db_nsr_update["detailed-status"] = "Done"
3622 db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
3623 db_nslcmop_update["detailed-status"] = "Done"
3624 nslcmop_operation_state = "COMPLETED"
3625
3626 if db_nsr:
3627 self._write_ns_status(
3628 nsr_id=nsr_id,
3629 ns_state=ns_state,
3630 current_operation="IDLE",
3631 current_operation_id=None,
3632 error_description=error_description_nsr,
3633 error_detail=error_detail,
3634 other_update=db_nsr_update
3635 )
3636 self._write_op_status(
3637 op_id=nslcmop_id,
3638 stage="",
3639 error_message=error_description_nslcmop,
3640 operation_state=nslcmop_operation_state,
3641 other_update=db_nslcmop_update,
3642 )
3643 if ns_state == "NOT_INSTANTIATED":
3644 try:
3645 self.db.set_list("vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "NOT_INSTANTIATED"})
3646 except DbException as e:
3647 self.logger.warn(logging_text + 'Error writing VNFR status for nsr-id-ref: {} -> {}'.
3648 format(nsr_id, e))
3649 if operation_params:
3650 autoremove = operation_params.get("autoremove", False)
3651 if nslcmop_operation_state:
3652 try:
3653 await self.msg.aiowrite("ns", "terminated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
3654 "operationState": nslcmop_operation_state,
3655 "autoremove": autoremove},
3656 loop=self.loop)
3657 except Exception as e:
3658 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
3659
3660 self.logger.debug(logging_text + "Exit")
3661 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
3662
3663 async def _wait_for_tasks(self, logging_text, created_tasks_info, timeout, stage, nslcmop_id, nsr_id=None):
3664 time_start = time()
3665 error_detail_list = []
3666 error_list = []
3667 pending_tasks = list(created_tasks_info.keys())
3668 num_tasks = len(pending_tasks)
3669 num_done = 0
3670 stage[1] = "{}/{}.".format(num_done, num_tasks)
3671 self._write_op_status(nslcmop_id, stage)
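# Wait for the tasks as they complete (FIRST_COMPLETED), collecting error details and updating the
# "done/total" progress in stage[1] after each batch, until all tasks finish or the overall timeout expires.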
3672 while pending_tasks:
3673 new_error = None
3674 _timeout = timeout + time_start - time()
3675 done, pending_tasks = await asyncio.wait(pending_tasks, timeout=_timeout,
3676 return_when=asyncio.FIRST_COMPLETED)
3677 num_done += len(done)
3678 if not done: # Timeout
3679 for task in pending_tasks:
3680 new_error = created_tasks_info[task] + ": Timeout"
3681 error_detail_list.append(new_error)
3682 error_list.append(new_error)
3683 break
3684 for task in done:
3685 if task.cancelled():
3686 exc = "Cancelled"
3687 else:
3688 exc = task.exception()
3689 if exc:
3690 if isinstance(exc, asyncio.TimeoutError):
3691 exc = "Timeout"
3692 new_error = created_tasks_info[task] + ": {}".format(exc)
3693 error_list.append(created_tasks_info[task])
3694 error_detail_list.append(new_error)
3695 if isinstance(exc, (str, DbException, N2VCException, ROclient.ROClientException, LcmException,
3696 K8sException, NgRoException)):
3697 self.logger.error(logging_text + new_error)
3698 else:
3699 exc_traceback = "".join(traceback.format_exception(None, exc, exc.__traceback__))
3700 self.logger.error(logging_text + created_tasks_info[task] + " " + exc_traceback)
3701 else:
3702 self.logger.debug(logging_text + created_tasks_info[task] + ": Done")
3703 stage[1] = "{}/{}.".format(num_done, num_tasks)
3704 if new_error:
3705 stage[1] += " Errors: " + ". ".join(error_detail_list) + "."
3706 if nsr_id: # update also nsr
3707 self.update_db_2("nsrs", nsr_id, {"errorDescription": "Error at: " + ", ".join(error_list),
3708 "errorDetail": ". ".join(error_detail_list)})
3709 self._write_op_status(nslcmop_id, stage)
3710 return error_detail_list
3711
3712 @staticmethod
3713 def _map_primitive_params(primitive_desc, params, instantiation_params):
3714 """
3715 Generates the params to be provided to the charm before executing a primitive. If the user does not provide
3716 a parameter, the default-value is used. If the value is between < >, it is looked up in instantiation_params
3717 :param primitive_desc: portion of VNFD/NSD that describes primitive
3718 :param params: Params provided by user
3719 :param instantiation_params: Instantiation params provided by user
3720 :return: a dictionary with the calculated params
3721 """
3722 calculated_params = {}
3723 for parameter in primitive_desc.get("parameter", ()):
3724 param_name = parameter["name"]
3725 if param_name in params:
3726 calculated_params[param_name] = params[param_name]
3727 elif "default-value" in parameter or "value" in parameter:
3728 if "value" in parameter:
3729 calculated_params[param_name] = parameter["value"]
3730 else:
3731 calculated_params[param_name] = parameter["default-value"]
3732 if isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("<") \
3733 and calculated_params[param_name].endswith(">"):
3734 if calculated_params[param_name][1:-1] in instantiation_params:
3735 calculated_params[param_name] = instantiation_params[calculated_params[param_name][1:-1]]
3736 else:
3737 raise LcmException("Parameter {} needed to execute primitive {} not provided".
3738 format(calculated_params[param_name], primitive_desc["name"]))
3739 else:
3740 raise LcmException("Parameter {} needed to execute primitive {} not provided".
3741 format(param_name, primitive_desc["name"]))
3742
3743 if isinstance(calculated_params[param_name], (dict, list, tuple)):
3744 calculated_params[param_name] = yaml.safe_dump(calculated_params[param_name], default_flow_style=True,
3745 width=256)
3746 elif isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("!!yaml "):
3747 calculated_params[param_name] = calculated_params[param_name][7:]
3748 if parameter.get("data-type") == "INTEGER":
3749 try:
3750 calculated_params[param_name] = int(calculated_params[param_name])
3751 except ValueError: # error converting string to int
3752 raise LcmException(
3753 "Parameter {} of primitive {} must be integer".format(param_name, primitive_desc["name"]))
3754 elif parameter.get("data-type") == "BOOLEAN":
3755 calculated_params[param_name] = not ((str(calculated_params[param_name])).lower() == 'false')
3756
3757 # always add ns_config_info if the primitive name is config
3758 if primitive_desc["name"] == "config":
3759 if "ns_config_info" in instantiation_params:
3760 calculated_params["ns_config_info"] = instantiation_params["ns_config_info"]
3761 return calculated_params
3762
3763 def _look_for_deployed_vca(self, deployed_vca, member_vnf_index, vdu_id, vdu_count_index, kdu_name=None,
3764 ee_descriptor_id=None):
3765 # find the vca_deployed record for this action. Raise LcmException if not found or it has no ee_id.
3766 for vca in deployed_vca:
3767 if not vca:
3768 continue
3769 if member_vnf_index != vca["member-vnf-index"] or vdu_id != vca["vdu_id"]:
3770 continue
3771 if vdu_count_index is not None and vdu_count_index != vca["vdu_count_index"]:
3772 continue
3773 if kdu_name and kdu_name != vca["kdu_name"]:
3774 continue
3775 if ee_descriptor_id and ee_descriptor_id != vca["ee_descriptor_id"]:
3776 continue
3777 break
3778 else:
3779 # vca_deployed not found
3780 raise LcmException("charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
3781 " is not deployed".format(member_vnf_index, vdu_id, vdu_count_index, kdu_name,
3782 ee_descriptor_id))
3783
3784 # get ee_id
3785 ee_id = vca.get("ee_id")
3786 vca_type = vca.get("type", "lxc_proxy_charm") # default value for backward compatibility - proxy charm
3787 if not ee_id:
3788 raise LcmException("charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
3789 "execution environment"
3790 .format(member_vnf_index, vdu_id, kdu_name, vdu_count_index))
3791 return ee_id, vca_type
3792
3793 async def _ns_execute_primitive(self, ee_id, primitive, primitive_params, retries=0,
3794 retries_interval=30, timeout=None,
3795 vca_type=None, db_dict=None) -> (str, str):
3796 try:
3797 if primitive == "config":
3798 primitive_params = {"params": primitive_params}
3799
3800 vca_type = vca_type or "lxc_proxy_charm"
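# Retry loop: exec_primitive is attempted up to retries+1 times, sleeping retries_interval seconds between
# attempts; a timeout is reported as "Timeout" and retried like any other error.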
3801
3802 while retries >= 0:
3803 try:
3804 output = await asyncio.wait_for(
3805 self.vca_map[vca_type].exec_primitive(
3806 ee_id=ee_id,
3807 primitive_name=primitive,
3808 params_dict=primitive_params,
3809 progress_timeout=self.timeout_progress_primitive,
3810 total_timeout=self.timeout_primitive,
3811 db_dict=db_dict),
3812 timeout=timeout or self.timeout_primitive)
3813 # execution was OK
3814 break
3815 except asyncio.CancelledError:
3816 raise
3817 except Exception as e: # asyncio.TimeoutError
3818 if isinstance(e, asyncio.TimeoutError):
3819 e = "Timeout"
3820 retries -= 1
3821 if retries >= 0:
3822 self.logger.debug('Error executing action {} on {} -> {}'.format(primitive, ee_id, e))
3823 # wait and retry
3824 await asyncio.sleep(retries_interval, loop=self.loop)
3825 else:
3826 return 'FAILED', str(e)
3827
3828 return 'COMPLETED', output
3829
3830 except (LcmException, asyncio.CancelledError):
3831 raise
3832 except Exception as e:
3833 return 'FAIL', 'Error executing action {}: {}'.format(primitive, e)
3834
3835 async def action(self, nsr_id, nslcmop_id):
3836
3837 # Try to lock HA task here
3838 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
3839 if not task_is_locked_by_me:
3840 return
3841
3842 logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
3843 self.logger.debug(logging_text + "Enter")
3844 # get all needed from database
3845 db_nsr = None
3846 db_nslcmop = None
3847 db_nsr_update = {}
3848 db_nslcmop_update = {}
3849 nslcmop_operation_state = None
3850 error_description_nslcmop = None
3851 exc = None
3852 try:
3853 # wait for any previous tasks in process
3854 step = "Waiting for previous operations to terminate"
3855 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
3856
3857 self._write_ns_status(
3858 nsr_id=nsr_id,
3859 ns_state=None,
3860 current_operation="RUNNING ACTION",
3861 current_operation_id=nslcmop_id
3862 )
3863
3864 step = "Getting information from database"
3865 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
3866 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3867
3868 nsr_deployed = db_nsr["_admin"].get("deployed")
3869 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
3870 vdu_id = db_nslcmop["operationParams"].get("vdu_id")
3871 kdu_name = db_nslcmop["operationParams"].get("kdu_name")
3872 vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
3873 primitive = db_nslcmop["operationParams"]["primitive"]
3874 primitive_params = db_nslcmop["operationParams"]["primitive_params"]
3875 timeout_ns_action = db_nslcmop["operationParams"].get("timeout_ns_action", self.timeout_primitive)
3876
3877 if vnf_index:
3878 step = "Getting vnfr from database"
3879 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
3880 step = "Getting vnfd from database"
3881 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
3882 else:
3883 step = "Getting nsd from database"
3884 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
3885
3886 # for backward compatibility
3887 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
3888 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
3889 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
3890 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3891
3892 # look for primitive
3893 config_primitive_desc = descriptor_configuration = None
3894 if vdu_id:
3895 for vdu in get_iterable(db_vnfd, "vdu"):
3896 if vdu_id == vdu["id"]:
3897 descriptor_configuration = vdu.get("vdu-configuration")
3898 break
3899 elif kdu_name:
3900 for kdu in get_iterable(db_vnfd, "kdu"):
3901 if kdu_name == kdu["name"]:
3902 descriptor_configuration = kdu.get("kdu-configuration")
3903 break
3904 elif vnf_index:
3905 descriptor_configuration = db_vnfd.get("vnf-configuration")
3906 else:
3907 descriptor_configuration = db_nsd.get("ns-configuration")
3908
3909 if descriptor_configuration and descriptor_configuration.get("config-primitive"):
3910 for config_primitive in descriptor_configuration["config-primitive"]:
3911 if config_primitive["name"] == primitive:
3912 config_primitive_desc = config_primitive
3913 break
3914
3915 if not config_primitive_desc:
3916 if not (kdu_name and primitive in ("upgrade", "rollback", "status")):
3917 raise LcmException("Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".
3918 format(primitive))
3919 primitive_name = primitive
3920 ee_descriptor_id = None
3921 else:
3922 primitive_name = config_primitive_desc.get("execution-environment-primitive", primitive)
3923 ee_descriptor_id = config_primitive_desc.get("execution-environment-ref")
3924
3925 if vnf_index:
3926 if vdu_id:
3927 vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
3928 desc_params = self._format_additional_params(vdur.get("additionalParams"))
3929 elif kdu_name:
3930 kdur = next((x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None)
3931 desc_params = self._format_additional_params(kdur.get("additionalParams"))
3932 else:
3933 desc_params = self._format_additional_params(db_vnfr.get("additionalParamsForVnf"))
3934 else:
3935 desc_params = self._format_additional_params(db_nsr.get("additionalParamsForNs"))
3936
3937 if kdu_name:
3938 kdu_action = not deep_get(kdu, ("kdu-configuration", "juju"))
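# kdu_action is True when the KDU has no juju configuration, so the primitive is executed directly through
# the k8s cluster connector instead of through a charm.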
3939
3940 # TODO check if ns is in a proper status
3941 if kdu_name and (primitive_name in ("upgrade", "rollback", "status") or kdu_action):
3942 # kdur and desc_params already set from before
3943 if primitive_params:
3944 desc_params.update(primitive_params)
3945 # TODO Check if we will need something at vnf level
3946 for index, kdu in enumerate(get_iterable(nsr_deployed, "K8s")):
3947 if kdu_name == kdu["kdu-name"] and kdu["member-vnf-index"] == vnf_index:
3948 break
3949 else:
3950 raise LcmException("KDU '{}' for vnf '{}' not deployed".format(kdu_name, vnf_index))
3951
3952 if kdu.get("k8scluster-type") not in self.k8scluster_map:
3953 msg = "unknown k8scluster-type '{}'".format(kdu.get("k8scluster-type"))
3954 raise LcmException(msg)
3955
3956 db_dict = {"collection": "nsrs",
3957 "filter": {"_id": nsr_id},
3958 "path": "_admin.deployed.K8s.{}".format(index)}
3959 self.logger.debug(logging_text + "Exec k8s {} on {}.{}".format(primitive_name, vnf_index, kdu_name))
3960 step = "Executing kdu {}".format(primitive_name)
3961 if primitive_name == "upgrade":
3962 if desc_params.get("kdu_model"):
3963 kdu_model = desc_params.get("kdu_model")
3964 del desc_params["kdu_model"]
3965 else:
3966 kdu_model = kdu.get("kdu-model")
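# The kdu-model may include a version suffix separated by ':'; only the part before ':' is passed to upgrade.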
3967 parts = kdu_model.split(sep=":")
3968 if len(parts) == 2:
3969 kdu_model = parts[0]
3970
3971 detailed_status = await asyncio.wait_for(
3972 self.k8scluster_map[kdu["k8scluster-type"]].upgrade(
3973 cluster_uuid=kdu.get("k8scluster-uuid"),
3974 kdu_instance=kdu.get("kdu-instance"),
3975 atomic=True, kdu_model=kdu_model,
3976 params=desc_params, db_dict=db_dict,
3977 timeout=timeout_ns_action),
3978 timeout=timeout_ns_action + 10)
3979 self.logger.debug(logging_text + " Upgrade of kdu {} done".format(detailed_status))
3980 elif primitive_name == "rollback":
3981 detailed_status = await asyncio.wait_for(
3982 self.k8scluster_map[kdu["k8scluster-type"]].rollback(
3983 cluster_uuid=kdu.get("k8scluster-uuid"),
3984 kdu_instance=kdu.get("kdu-instance"),
3985 db_dict=db_dict),
3986 timeout=timeout_ns_action)
3987 elif primitive_name == "status":
3988 detailed_status = await asyncio.wait_for(
3989 self.k8scluster_map[kdu["k8scluster-type"]].status_kdu(
3990 cluster_uuid=kdu.get("k8scluster-uuid"),
3991 kdu_instance=kdu.get("kdu-instance")),
3992 timeout=timeout_ns_action)
3993 else:
3994 kdu_instance = kdu.get("kdu-instance") or "{}-{}".format(kdu["kdu-name"], nsr_id)
3995 params = self._map_primitive_params(config_primitive_desc, primitive_params, desc_params)
3996
3997 detailed_status = await asyncio.wait_for(
3998 self.k8scluster_map[kdu["k8scluster-type"]].exec_primitive(
3999 cluster_uuid=kdu.get("k8scluster-uuid"),
4000 kdu_instance=kdu_instance,
4001 primitive_name=primitive_name,
4002 params=params, db_dict=db_dict,
4003 timeout=timeout_ns_action),
4004 timeout=timeout_ns_action)
4005
4006 if detailed_status:
4007 nslcmop_operation_state = 'COMPLETED'
4008 else:
4009 detailed_status = ''
4010 nslcmop_operation_state = 'FAILED'
4011 else:
4012 ee_id, vca_type = self._look_for_deployed_vca(nsr_deployed["VCA"],
4013 member_vnf_index=vnf_index,
4014 vdu_id=vdu_id,
4015 vdu_count_index=vdu_count_index,
4016 ee_descriptor_id=ee_descriptor_id)
4017 db_nslcmop_notif = {"collection": "nslcmops",
4018 "filter": {"_id": nslcmop_id},
4019 "path": "admin.VCA"}
4020 nslcmop_operation_state, detailed_status = await self._ns_execute_primitive(
4021 ee_id,
4022 primitive=primitive_name,
4023 primitive_params=self._map_primitive_params(config_primitive_desc, primitive_params, desc_params),
4024 timeout=timeout_ns_action,
4025 vca_type=vca_type,
4026 db_dict=db_nslcmop_notif)
4027
4028 db_nslcmop_update["detailed-status"] = detailed_status
4029 error_description_nslcmop = detailed_status if nslcmop_operation_state == "FAILED" else ""
4030 self.logger.debug(logging_text + " task Done with result {} {}".format(nslcmop_operation_state,
4031 detailed_status))
4032 return # database update is called inside finally
4033
4034 except (DbException, LcmException, N2VCException, K8sException) as e:
4035 self.logger.error(logging_text + "Exit Exception {}".format(e))
4036 exc = e
4037 except asyncio.CancelledError:
4038 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
4039 exc = "Operation was cancelled"
4040 except asyncio.TimeoutError:
4041 self.logger.error(logging_text + "Timeout while '{}'".format(step))
4042 exc = "Timeout"
4043 except Exception as e:
4044 exc = traceback.format_exc()
4045 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
4046 finally:
4047 if exc:
4048 db_nslcmop_update["detailed-status"] = detailed_status = error_description_nslcmop = \
4049 "FAILED {}: {}".format(step, exc)
4050 nslcmop_operation_state = "FAILED"
4051 if db_nsr:
4052 self._write_ns_status(
4053 nsr_id=nsr_id,
4054 ns_state=db_nsr["nsState"], # TODO check if degraded. For the moment use previous status
4055 current_operation="IDLE",
4056 current_operation_id=None,
4057 # error_description=error_description_nsr,
4058 # error_detail=error_detail,
4059 other_update=db_nsr_update
4060 )
4061
4062 self._write_op_status(
4063 op_id=nslcmop_id,
4064 stage="",
4065 error_message=error_description_nslcmop,
4066 operation_state=nslcmop_operation_state,
4067 other_update=db_nslcmop_update,
4068 )
4069
4070 if nslcmop_operation_state:
4071 try:
4072 await self.msg.aiowrite("ns", "actioned", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
4073 "operationState": nslcmop_operation_state},
4074 loop=self.loop)
4075 except Exception as e:
4076 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
4077 self.logger.debug(logging_text + "Exit")
4078 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
4079 return nslcmop_operation_state, detailed_status
4080
4081 async def scale(self, nsr_id, nslcmop_id):
4082
4083 # Try to lock HA task here
4084 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
4085 if not task_is_locked_by_me:
4086 return
4087
4088 logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
4089 stage = ['', '', '']
4090 # ^ stage, step, VIM progress
4091 self.logger.debug(logging_text + "Enter")
4092 # get all needed from database
4093 db_nsr = None
4094 db_nslcmop = None
4095 db_nslcmop_update = {}
4096 nslcmop_operation_state = None
4097 db_nsr_update = {}
4098 exc = None
4099 # in case of error, indicates which part of the scale failed, so the nsr can be set to error status
4100 scale_process = None
4101 old_operational_status = ""
4102 old_config_status = ""
4103 try:
4104 # wait for any previous tasks in process
4105 step = "Waiting for previous operations to terminate"
4106 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
4107
4108 self._write_ns_status(
4109 nsr_id=nsr_id,
4110 ns_state=None,
4111 current_operation="SCALING",
4112 current_operation_id=nslcmop_id
4113 )
4114
4115 step = "Getting nslcmop from database"
4116 self.logger.debug(step + " after having waited for previous tasks to be completed")
4117 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
4118 step = "Getting nsr from database"
4119 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
4120
4121 old_operational_status = db_nsr["operational-status"]
4122 old_config_status = db_nsr["config-status"]
4123 step = "Parsing scaling parameters"
4124 # self.logger.debug(step)
4125 db_nsr_update["operational-status"] = "scaling"
4126 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4127 nsr_deployed = db_nsr["_admin"].get("deployed")
4128
4129 #######
4130 nsr_deployed = db_nsr["_admin"].get("deployed")
4131 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
4132 # vdu_id = db_nslcmop["operationParams"].get("vdu_id")
4133 # vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
4134 # vdu_name = db_nslcmop["operationParams"].get("vdu_name")
4135 #######
4136
4137 RO_nsr_id = nsr_deployed["RO"].get("nsr_id")
4138 vnf_index = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["member-vnf-index"]
4139 scaling_group = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["scaling-group-descriptor"]
4140 scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
4141 # scaling_policy = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"].get("scaling-policy")
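# Illustrative shape of the scaling request consumed above (only the keys are taken
# from the code; the values are made up):
#
#   db_nslcmop["operationParams"]["scaleVnfData"] == {
#       "scaleVnfType": "SCALE_OUT",                    # or "SCALE_IN"
#       "scaleByStepData": {
#           "member-vnf-index": "1",
#           "scaling-group-descriptor": "my-scaling-group",
#           # "scaling-policy": "my-policy"             # optional, currently unused
#       }
#   }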
4142
4143 # for backward compatibility
4144 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
4145 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
4146 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
4147 self.update_db_2("nsrs", nsr_id, db_nsr_update)
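# The backward-compatibility block above converts a dict-shaped _admin.deployed.VCA
# into the list of its values, e.g. (illustrative):
#   {"0": {...vca0...}, "1": {...vca1...}}  ->  [{...vca0...}, {...vca1...}]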
4148
4149 step = "Getting vnfr from database"
4150 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
4151 step = "Getting vnfd from database"
4152 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
4153
4154 step = "Getting scaling-group-descriptor"
4155 for scaling_descriptor in db_vnfd["scaling-group-descriptor"]:
4156 if scaling_descriptor["name"] == scaling_group:
4157 break
4158 else:
4159 raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
4160 "at vnfd:scaling-group-descriptor".format(scaling_group))
4161
4162 # cooldown_time = 0
4163 # for scaling_policy_descriptor in scaling_descriptor.get("scaling-policy", ()):
4164 # cooldown_time = scaling_policy_descriptor.get("cooldown-time", 0)
4165 # if scaling_policy and scaling_policy == scaling_policy_descriptor.get("name"):
4166 # break
4167
4168 # TODO check if ns is in a proper status
4169 step = "Sending scale order to VIM"
4170 nb_scale_op = 0
4171 if not db_nsr["_admin"].get("scaling-group"):
4172 self.update_db_2("nsrs", nsr_id, {"_admin.scaling-group": [{"name": scaling_group, "nb-scale-op": 0}]})
4173 admin_scale_index = 0
4174 else:
4175 for admin_scale_index, admin_scale_info in enumerate(db_nsr["_admin"]["scaling-group"]):
4176 if admin_scale_info["name"] == scaling_group:
4177 nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
4178 break
4179 else:  # not found: use the index after the last element and add a new entry with this name
4180 admin_scale_index += 1
4181 db_nsr_update["_admin.scaling-group.{}.name".format(admin_scale_index)] = scaling_group
4182 RO_scaling_info = []
4183 vdu_scaling_info = {"scaling_group_name": scaling_group, "vdu": []}
4184 if scaling_type == "SCALE_OUT":
4185 # check whether max-instance-count has been reached
4186 max_instance_count = scaling_descriptor.get("max-instance-count", 10)
4187 # self.logger.debug("MAX_INSTANCE_COUNT is {}".format(max_instance_count))
4188 if nb_scale_op >= max_instance_count:
4189 raise LcmException("reached the limit of {} (max-instance-count) "
4190 "scaling-out operations for the "
4191 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
4192
4193 nb_scale_op += 1
4194 vdu_scaling_info["scaling_direction"] = "OUT"
4195 vdu_scaling_info["vdu-create"] = {}
4196 for vdu_scale_info in scaling_descriptor["vdu"]:
4197 vdud = next(vdu for vdu in db_vnfd.get("vdu") if vdu["id"] == vdu_scale_info["vdu-id-ref"])
4198 vdu_index = len([x for x in db_vnfr.get("vdur", ())
4199 if x.get("vdu-id-ref") == vdu_scale_info["vdu-id-ref"] and
4200 x.get("member-vnf-index-ref") == vnf_index])
4201 cloud_init_text = self._get_cloud_init(vdud, db_vnfd)
4202 if cloud_init_text:
4203 additional_params = self._get_vdu_additional_params(db_vnfr, vdud["id"]) or {}
4204 cloud_init_list = []
4205 for x in range(vdu_scale_info.get("count", 1)):
4206 if cloud_init_text:
4207 # TODO: the VDU's own IP address is not available yet because db_vnfr has not been updated at this point.
4208 additional_params["OSM"] = self._get_osm_params(db_vnfr, vdu_scale_info["vdu-id-ref"],
4209 vdu_index + x)
4210 cloud_init_list.append(self._parse_cloud_init(cloud_init_text, additional_params,
4211 db_vnfd["id"], vdud["id"]))
4212 RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
4213 "type": "create", "count": vdu_scale_info.get("count", 1)})
4214 if cloud_init_list:
4215 RO_scaling_info[-1]["cloud_init"] = cloud_init_list
4216 vdu_scaling_info["vdu-create"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
4217
4218 elif scaling_type == "SCALE_IN":
4219 # check whether min-instance-count has been reached
4220 min_instance_count = 0
4221 if "min-instance-count" in scaling_descriptor and scaling_descriptor["min-instance-count"] is not None:
4222 min_instance_count = int(scaling_descriptor["min-instance-count"])
4223 if nb_scale_op <= min_instance_count:
4224 raise LcmException("reached the limit of {} (min-instance-count) scaling-in operations for the "
4225 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
4226 nb_scale_op -= 1
4227 vdu_scaling_info["scaling_direction"] = "IN"
4228 vdu_scaling_info["vdu-delete"] = {}
4229 for vdu_scale_info in scaling_descriptor["vdu"]:
4230 RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
4231 "type": "delete", "count": vdu_scale_info.get("count", 1)})
4232 vdu_scaling_info["vdu-delete"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)
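# At this point RO_scaling_info and vdu_scaling_info describe the requested change,
# e.g. for a single-step scale of one VDU (illustrative values only):
#   RO_scaling_info == [{"osm_vdu_id": "dataVM", "member-vnf-index": "1",
#                        "type": "create", "count": 1}]           # "delete" for SCALE_IN
#   vdu_scaling_info == {"scaling_group_name": "my-scaling-group", "vdu": [],
#                        "scaling_direction": "OUT", "vdu-create": {"dataVM": 1}}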
4233
4234 # update VDU_SCALING_INFO with the ip addresses of the VDUs to be deleted
4235 vdu_delete = copy(vdu_scaling_info.get("vdu-delete"))
4236 if vdu_scaling_info["scaling_direction"] == "IN":
4237 for vdur in reversed(db_vnfr["vdur"]):
4238 if vdu_delete.get(vdur["vdu-id-ref"]):
4239 vdu_delete[vdur["vdu-id-ref"]] -= 1
4240 vdu_scaling_info["vdu"].append({
4241 "name": vdur.get("name") or vdur.get("vdu-name"),
4242 "vdu_id": vdur["vdu-id-ref"],
4243 "interface": []
4244 })
4245 for interface in vdur["interfaces"]:
4246 vdu_scaling_info["vdu"][-1]["interface"].append({
4247 "name": interface["name"],
4248 "ip_address": interface["ip-address"],
4249 "mac_address": interface.get("mac-address"),
4250 })
4251 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
4252
4253 # PRE-SCALE BEGIN
4254 step = "Executing pre-scale vnf-config-primitive"
4255 if scaling_descriptor.get("scaling-config-action"):
4256 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
4257 if (scaling_config_action.get("trigger") == "pre-scale-in" and scaling_type == "SCALE_IN") \
4258 or (scaling_config_action.get("trigger") == "pre-scale-out" and scaling_type == "SCALE_OUT"):
4259 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
4260 step = db_nslcmop_update["detailed-status"] = \
4261 "executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive)
4262
4263 # look for primitive
4264 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
4265 if config_primitive["name"] == vnf_config_primitive:
4266 break
4267 else:
4268 raise LcmException(
4269 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
4270 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
4271 "primitive".format(scaling_group, vnf_config_primitive))
4272
4273 vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
4274 if db_vnfr.get("additionalParamsForVnf"):
4275 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
4276
4277 scale_process = "VCA"
4278 db_nsr_update["config-status"] = "configuring pre-scaling"
4279 primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)
4280
4281 # Pre-scale retry check: Check if this sub-operation has been executed before
4282 op_index = self._check_or_add_scale_suboperation(
4283 db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'PRE-SCALE')
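# As used below, the sub-operation check returns SUBOPERATION_STATUS_SKIP when this
# primitive already completed in a previous (interrupted) run of the operation,
# SUBOPERATION_STATUS_NEW when it has just been registered for the first time, or the
# index of an already registered sub-operation whose stored parameters must be retried.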
4284 if op_index == self.SUBOPERATION_STATUS_SKIP:
4285 # Skip sub-operation
4286 result = 'COMPLETED'
4287 result_detail = 'Done'
4288 self.logger.debug(logging_text +
4289 "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
4290 vnf_config_primitive, result, result_detail))
4291 else:
4292 if op_index == self.SUBOPERATION_STATUS_NEW:
4293 # New sub-operation: Get index of this sub-operation
4294 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
4295 self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
4296 format(vnf_config_primitive))
4297 else:
4298 # retry: Get registered params for this existing sub-operation
4299 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
4300 vnf_index = op.get('member_vnf_index')
4301 vnf_config_primitive = op.get('primitive')
4302 primitive_params = op.get('primitive_params')
4303 self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation retry".
4304 format(vnf_config_primitive))
4305 # Execute the primitive, either with new (first-time) or registered (retry) args
4306 ee_descriptor_id = config_primitive.get("execution-environment-ref")
4307 primitive_name = config_primitive.get("execution-environment-primitive",
4308 vnf_config_primitive)
4309 ee_id, vca_type = self._look_for_deployed_vca(nsr_deployed["VCA"],
4310 member_vnf_index=vnf_index,
4311 vdu_id=None,
4312 vdu_count_index=None,
4313 ee_descriptor_id=ee_descriptor_id)
4314 result, result_detail = await self._ns_execute_primitive(
4315 ee_id, primitive_name, primitive_params, vca_type)
4316 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
4317 vnf_config_primitive, result, result_detail))
4318 # Update operationState = COMPLETED | FAILED
4319 self._update_suboperation_status(
4320 db_nslcmop, op_index, result, result_detail)
4321
4322 if result == "FAILED":
4323 raise LcmException(result_detail)
4324 db_nsr_update["config-status"] = old_config_status
4325 scale_process = None
4326 # PRE-SCALE END
4327
4328 db_nsr_update["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)] = nb_scale_op
4329 db_nsr_update["_admin.scaling-group.{}.time".format(admin_scale_index)] = time()
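# e.g. db_nsr_update now carries per-scaling-group bookkeeping such as (illustrative):
#   {"_admin.scaling-group.0.nb-scale-op": 2, "_admin.scaling-group.0.time": 1618822345.1}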
4330
4331 # SCALE RO - BEGIN
4332 if RO_scaling_info:
4333 scale_process = "RO"
4334 if self.ro_config.get("ng"):
4335 await self._scale_ng_ro(logging_text, db_nsr, db_nslcmop, db_vnfr, vdu_scaling_info, stage)
4336 else:
4337 await self._RO_scale(logging_text, RO_nsr_id, RO_scaling_info, db_nslcmop, db_vnfr,
4338 db_nslcmop_update, vdu_scaling_info)
4339 vdu_scaling_info.pop("vdu-create", None)
4340 vdu_scaling_info.pop("vdu-delete", None)
4341
4342 scale_process = None
4343 if db_nsr_update:
4344 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4345
4346 # POST-SCALE BEGIN
4347 # execute primitive service POST-SCALING
4348 step = "Executing post-scale vnf-config-primitive"
4349 if scaling_descriptor.get("scaling-config-action"):
4350 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
4351 if (scaling_config_action.get("trigger") == "post-scale-in" and scaling_type == "SCALE_IN") \
4352 or (scaling_config_action.get("trigger") == "post-scale-out" and scaling_type == "SCALE_OUT"):
4353 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
4354 step = db_nslcmop_update["detailed-status"] = \
4355 "executing post-scale scaling-config-action '{}'".format(vnf_config_primitive)
4356
4357 vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
4358 if db_vnfr.get("additionalParamsForVnf"):
4359 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
4360
4361 # look for primitive
4362 for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
4363 if config_primitive["name"] == vnf_config_primitive:
4364 break
4365 else:
4366 raise LcmException(
4367 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
4368 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
4369 "config-primitive".format(scaling_group, vnf_config_primitive))
4370 scale_process = "VCA"
4371 db_nsr_update["config-status"] = "configuring post-scaling"
4372 primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)
4373
4374 # Post-scale retry check: Check if this sub-operation has been executed before
4375 op_index = self._check_or_add_scale_suboperation(
4376 db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'POST-SCALE')
4377 if op_index == self.SUBOPERATION_STATUS_SKIP:
4378 # Skip sub-operation
4379 result = 'COMPLETED'
4380 result_detail = 'Done'
4381 self.logger.debug(logging_text +
4382 "vnf_config_primitive={} Skipped sub-operation, result {} {}".
4383 format(vnf_config_primitive, result, result_detail))
4384 else:
4385 if op_index == self.SUBOPERATION_STATUS_NEW:
4386 # New sub-operation: Get index of this sub-operation
4387 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
4388 self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
4389 format(vnf_config_primitive))
4390 else:
4391 # retry: Get registered params for this existing sub-operation
4392 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
4393 vnf_index = op.get('member_vnf_index')
4394 vnf_config_primitive = op.get('primitive')
4395 primitive_params = op.get('primitive_params')
4396 self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation retry".
4397 format(vnf_config_primitive))
4398 # Execute the primitive, either with new (first-time) or registered (retry) args
4399 ee_descriptor_id = config_primitive.get("execution-environment-ref")
4400 primitive_name = config_primitive.get("execution-environment-primitive",
4401 vnf_config_primitive)
4402 ee_id, vca_type = self._look_for_deployed_vca(nsr_deployed["VCA"],
4403 member_vnf_index=vnf_index,
4404 vdu_id=None,
4405 vdu_count_index=None,
4406 ee_descriptor_id=ee_descriptor_id)
4407 result, result_detail = await self._ns_execute_primitive(
4408 ee_id, primitive_name, primitive_params, vca_type)
4409 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
4410 vnf_config_primitive, result, result_detail))
4411 # Update operationState = COMPLETED | FAILED
4412 self._update_suboperation_status(
4413 db_nslcmop, op_index, result, result_detail)
4414
4415 if result == "FAILED":
4416 raise LcmException(result_detail)
4417 db_nsr_update["config-status"] = old_config_status
4418 scale_process = None
4419 # POST-SCALE END
4420
4421 db_nsr_update["detailed-status"] = "" # "scaled {} {}".format(scaling_group, scaling_type)
4422 db_nsr_update["operational-status"] = "running" if old_operational_status == "failed" \
4423 else old_operational_status
4424 db_nsr_update["config-status"] = old_config_status
4425 return
4426 except (ROclient.ROClientException, DbException, LcmException, NgRoException) as e:
4427 self.logger.error(logging_text + "Exit Exception {}".format(e))
4428 exc = e
4429 except asyncio.CancelledError:
4430 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
4431 exc = "Operation was cancelled"
4432 except Exception as e:
4433 exc = traceback.format_exc()
4434 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
4435 finally:
4436 self._write_ns_status(
4437 nsr_id=nsr_id,
4438 ns_state=None,
4439 current_operation="IDLE",
4440 current_operation_id=None
4441 )
4442 if exc:
4443 db_nslcmop_update["detailed-status"] = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
4444 nslcmop_operation_state = "FAILED"
4445 if db_nsr:
4446 db_nsr_update["operational-status"] = old_operational_status
4447 db_nsr_update["config-status"] = old_config_status
4448 db_nsr_update["detailed-status"] = ""
4449 if scale_process:
4450 if "VCA" in scale_process:
4451 db_nsr_update["config-status"] = "failed"
4452 if "RO" in scale_process:
4453 db_nsr_update["operational-status"] = "failed"
4454 db_nsr_update["detailed-status"] = "FAILED scaling nslcmop={} {}: {}".format(nslcmop_id, step,
4455 exc)
4456 else:
4457 error_description_nslcmop = None
4458 nslcmop_operation_state = "COMPLETED"
4459 db_nslcmop_update["detailed-status"] = "Done"
4460
4461 self._write_op_status(
4462 op_id=nslcmop_id,
4463 stage="",
4464 error_message=error_description_nslcmop,
4465 operation_state=nslcmop_operation_state,
4466 other_update=db_nslcmop_update,
4467 )
4468 if db_nsr:
4469 self._write_ns_status(
4470 nsr_id=nsr_id,
4471 ns_state=None,
4472 current_operation="IDLE",
4473 current_operation_id=None,
4474 other_update=db_nsr_update
4475 )
4476
4477 if nslcmop_operation_state:
4478 try:
4479 await self.msg.aiowrite("ns", "scaled", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
4480 "operationState": nslcmop_operation_state},
4481 loop=self.loop)
4482 # if cooldown_time:
4483 # await asyncio.sleep(cooldown_time, loop=self.loop)
4484 # await self.msg.aiowrite("ns","scaled-cooldown-time", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id})
4485 except Exception as e:
4486 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
4487 self.logger.debug(logging_text + "Exit")
4488 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")
4489
4490 async def _scale_ng_ro(self, logging_text, db_nsr, db_nslcmop, db_vnfr, vdu_scaling_info, stage):
4491 nsr_id = db_nslcmop["nsInstanceId"]
4492 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
4493 db_vnfrs = {}
4494
4495 # read from db: vnfd's for every vnf
4496 db_vnfds = {}  # every vnfd data, indexed by vnfd id (uuid)
4497 db_vnfds_ref = {}  # every vnfd data, indexed by vnfd reference (name)
4499
4500 # for each vnf in ns, read vnfd
4501 for vnfr in self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}):
4502 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
4503 vnfd_id = vnfr["vnfd-id"] # vnfd uuid for this vnf
4504 vnfd_ref = vnfr["vnfd-ref"] # vnfd name for this vnf
4505 # if we do not have this vnfd yet, read it from the db
4506 if vnfd_id not in db_vnfds:
4507 # read from db
4508 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
4509 db_vnfds_ref[vnfd_ref] = vnfd # vnfd's indexed by name
4510 db_vnfds[vnfd_id] = vnfd # vnfd's indexed by id
4511 n2vc_key = self.n2vc.get_public_key()
4512 n2vc_key_list = [n2vc_key]
4513 self.scale_vnfr(db_vnfr, vdu_scaling_info.get("vdu-create"), vdu_scaling_info.get("vdu-delete"),
4514 mark_delete=True)
4515 # db_vnfr has been updated, update db_vnfrs to use it
4516 db_vnfrs[db_vnfr["member-vnf-index-ref"]] = db_vnfr
4517 await self._instantiate_ng_ro(logging_text, nsr_id, db_nsd, db_nsr, db_nslcmop, db_vnfrs,
4518 db_vnfds_ref, n2vc_key_list, stage=stage, start_deploy=time(),
4519 timeout_ns_deploy=self.timeout_ns_deploy)
4520 if vdu_scaling_info.get("vdu-delete"):
4521 self.scale_vnfr(db_vnfr, None, vdu_scaling_info["vdu-delete"], mark_delete=False)
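# Note on the two scale_vnfr() calls above: the first one (mark_delete=True) only flags
# the affected vdur entries so that NG-RO can act on them during _instantiate_ng_ro();
# the second one (mark_delete=False) drops the flagged entries from the vnfr once the
# deletion has actually been requested. This description is inferred from the call
# pattern here, not from the scale_vnfr() implementation itself.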
4522
4523 async def _RO_scale(self, logging_text, RO_nsr_id, RO_scaling_info, db_nslcmop, db_vnfr, db_nslcmop_update,
4524 vdu_scaling_info):
4525 nslcmop_id = db_nslcmop["_id"]
4526 nsr_id = db_nslcmop["nsInstanceId"]
4527 vdu_create = vdu_scaling_info.get("vdu-create")
4528 vdu_delete = vdu_scaling_info.get("vdu-delete")
4529 # Scale RO retry check: Check if this sub-operation has been executed before
4530 op_index = self._check_or_add_scale_suboperation(
4531 db_nslcmop, db_vnfr["member-vnf-index-ref"], None, None, 'SCALE-RO', RO_nsr_id, RO_scaling_info)
4532 if op_index == self.SUBOPERATION_STATUS_SKIP:
4533 # Skip sub-operation
4534 result = 'COMPLETED'
4535 result_detail = 'Done'
4536 self.logger.debug(logging_text + "Skipped sub-operation RO, result {} {}".format(result, result_detail))
4537 else:
4538 if op_index == self.SUBOPERATION_STATUS_NEW:
4539 # New sub-operation: Get index of this sub-operation
4540 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
4541 self.logger.debug(logging_text + "New sub-operation RO")
4542 else:
4543 # retry: Get registered params for this existing sub-operation
4544 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
4545 RO_nsr_id = op.get('RO_nsr_id')
4546 RO_scaling_info = op.get('RO_scaling_info')
4547 self.logger.debug(logging_text + "Sub-operation RO retry")
4548
4549 RO_desc = await self.RO.create_action("ns", RO_nsr_id, {"vdu-scaling": RO_scaling_info})
4550 # wait until ready
4551 RO_nslcmop_id = RO_desc["instance_action_id"]
4552 db_nslcmop_update["_admin.deploy.RO"] = RO_nslcmop_id
4553
4554 RO_task_done = False
4555 step = detailed_status = "Waiting for VIM to scale. RO_task_id={}.".format(RO_nslcmop_id)
4556 detailed_status_old = None
4557 self.logger.debug(logging_text + step)
4558
4559 deployment_timeout = 1 * 3600 # One hour
4560 while deployment_timeout > 0:
4561 if not RO_task_done:
4562 desc = await self.RO.show("ns", item_id_name=RO_nsr_id, extra_item="action",
4563 extra_item_id=RO_nslcmop_id)
4564
4565 # deploymentStatus
4566 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
4567
4568 ns_status, ns_status_info = self.RO.check_action_status(desc)
4569 if ns_status == "ERROR":
4570 raise ROclient.ROClientException(ns_status_info)
4571 elif ns_status == "BUILD":
4572 detailed_status = step + "; {}".format(ns_status_info)
4573 elif ns_status == "ACTIVE":
4574 RO_task_done = True
4575 self.scale_vnfr(db_vnfr, vdu_create=vdu_create, vdu_delete=vdu_delete)
4576 step = detailed_status = "Waiting for ns to be ready at RO. RO_id={}".format(RO_nsr_id)
4577 self.logger.debug(logging_text + step)
4578 else:
4579 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
4580 else:
4581 desc = await self.RO.show("ns", RO_nsr_id)
4582 ns_status, ns_status_info = self.RO.check_ns_status(desc)
4583 # deploymentStatus
4584 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
4585
4586 if ns_status == "ERROR":
4587 raise ROclient.ROClientException(ns_status_info)
4588 elif ns_status == "BUILD":
4589 detailed_status = step + "; {}".format(ns_status_info)
4590 elif ns_status == "ACTIVE":
4591 step = detailed_status = \
4592 "Waiting for management IP address reported by the VIM. Updating VNFRs"
4593 try:
4594 # nsr_deployed["nsr_ip"] = RO.get_ns_vnf_info(desc)
4595 self.ns_update_vnfr({db_vnfr["member-vnf-index-ref"]: db_vnfr}, desc)
4596 break
4597 except LcmExceptionNoMgmtIP:
4598 pass
4599 else:
4600 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
4601 if detailed_status != detailed_status_old:
4602 self._update_suboperation_status(
4603 db_nslcmop, op_index, 'COMPLETED', detailed_status)
4604 detailed_status_old = db_nslcmop_update["detailed-status"] = detailed_status
4605 self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
4606
4607 await asyncio.sleep(5, loop=self.loop)
4608 deployment_timeout -= 5
4609 if deployment_timeout <= 0:
4610 self._update_suboperation_status(
4611 db_nslcmop, op_index, 'FAILED', "Timeout when waiting for ns to get ready")
4612 raise ROclient.ROClientException("Timeout waiting for ns to be ready")
4613
4614 # update VDU_SCALING_INFO with the obtained ip_addresses
4615 if vdu_scaling_info["scaling_direction"] == "OUT":
4616 for vdur in reversed(db_vnfr["vdur"]):
4617 if vdu_scaling_info["vdu-create"].get(vdur["vdu-id-ref"]):
4618 vdu_scaling_info["vdu-create"][vdur["vdu-id-ref"]] -= 1
4619 vdu_scaling_info["vdu"].append({
4620 "name": vdur["name"] or vdur.get("vdu-name"),
4621 "vdu_id": vdur["vdu-id-ref"],
4622 "interface": []
4623 })
4624 for interface in vdur["interfaces"]:
4625 vdu_scaling_info["vdu"][-1]["interface"].append({
4626 "name": interface["name"],
4627 "ip_address": interface["ip-address"],
4628 "mac_address": interface.get("mac-address"),
4629 })
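# Illustrative entry appended to vdu_scaling_info["vdu"] above (example values only):
#   {"name": "vnf1-dataVM-2", "vdu_id": "dataVM",
#    "interface": [{"name": "eth0", "ip_address": "10.0.0.12",
#                   "mac_address": "fa:16:3e:01:02:03"}]}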
4630 self._update_suboperation_status(db_nslcmop, op_index, 'COMPLETED', 'Done')
4631
4632 async def add_prometheus_metrics(self, ee_id, artifact_path, ee_config_descriptor, vnfr_id, nsr_id, target_ip):
4633 if not self.prometheus:
4634 return
4635 # look for a file called 'prometheus*.j2' and use it as the Prometheus job template
4636 artifact_content = self.fs.dir_ls(artifact_path)
4637 job_file = next((f for f in artifact_content if f.startswith("prometheus") and f.endswith(".j2")), None)
4638 if not job_file:
4639 return
4640 with self.fs.file_open((artifact_path, job_file), "r") as f:
4641 job_data = f.read()
4642
4643 # TODO get_service
4644 _, _, service = ee_id.partition(".") # remove prefix "namespace."
4645 host_name = "{}-{}".format(service, ee_config_descriptor["metric-service"])
4646 host_port = "80"
4647 vnfr_id = vnfr_id.replace("-", "")
4648 variables = {
4649 "JOB_NAME": vnfr_id,
4650 "TARGET_IP": target_ip,
4651 "EXPORTER_POD_IP": host_name,
4652 "EXPORTER_POD_PORT": host_port,
4653 }
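# Illustrative Jinja2 job template that the variables above are expected to fill
# (hypothetical content; the real template comes from the package artifact directory
# listed above):
#   - job_name: '{{ JOB_NAME }}'
#     static_configs:
#       - targets: ['{{ EXPORTER_POD_IP }}:{{ EXPORTER_POD_PORT }}']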
4654 job_list = self.prometheus.parse_job(job_data, variables)
4655 # ensure job_name uses the vnfr_id, and add the nsr_id as metadata
4656 for job in job_list:
4657 if not isinstance(job.get("job_name"), str) or vnfr_id not in job["job_name"]:
4658 job["job_name"] = vnfr_id + "_" + str(randint(1, 10000))
4659 job["nsr_id"] = nsr_id
4660 job_dict = {jl["job_name"]: jl for jl in job_list}
4661 if await self.prometheus.update(job_dict):
4662 return list(job_dict.keys())
4663
4664 def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
4665 """
4666 Get VCA Cloud and VCA Cloud Credentials for the VIM account
4667
4668 :param: vim_account_id: VIM Account ID
4669
4670 :return: (cloud_name, cloud_credential)
4671 """
4672 config = self.get_vim_account_config(vim_account_id)
4673 return config.get("vca_cloud"), config.get("vca_cloud_credential")
4674
4675 def get_vca_k8s_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
4676 """
4677 Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account
4678
4679 :param: vim_account_id: VIM Account ID
4680
4681 :return: (cloud_name, cloud_credential)
4682 """
4683 config = self.get_vim_account_config(vim_account_id)
4684 return config.get("vca_k8s_cloud"), config.get("vca_k8s_cloud_credential")
4685
4686 def get_vim_account_config(self, vim_account_id: str) -> dict:
4687 """
4688 Get VIM Account config from the OSM Database
4689
4690 :param: vim_account_id: VIM Account ID
4691
4692 :return: Dictionary with the config of the vim account
4693 """
4694 vim_account = self.db.get_one(table="vim_accounts", q_filter={"_id": vim_account_id}, fail_on_empty=False)
4695 return vim_account.get("config", {}) if vim_account else {}
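# Illustrative "config" content consumed by the two getters above
# (keys taken from the getters; values are hypothetical):
#   {"vca_cloud": "lxd-cloud", "vca_cloud_credential": "lxd-cloud-cred",
#    "vca_k8s_cloud": "k8s-cloud", "vca_k8s_cloud_credential": "k8s-cloud-cred"}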