1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 import yaml
21 import logging
22 import logging.handlers
23 import traceback
24 import json
25 from jinja2 import Environment, TemplateError, TemplateNotFound, StrictUndefined, UndefinedError
26
27 from osm_lcm import ROclient
28 from osm_lcm.ng_ro import NgRoClient, NgRoException
29 from osm_lcm.lcm_utils import LcmException, LcmExceptionNoMgmtIP, LcmBase, deep_get, get_iterable, populate_dict
30 from osm_lcm.data_utils.nsd import get_vnf_profiles
31 from osm_lcm.data_utils.vnfd import get_vdu_list, get_vdu_profile, \
32 get_ee_sorted_initial_config_primitive_list, get_ee_sorted_terminate_config_primitive_list, \
33 get_kdu_list, get_virtual_link_profiles, get_vdu, get_configuration, \
34 get_vdu_index, get_scaling_aspect, get_number_of_instances, get_juju_ee_ref
35 from osm_lcm.data_utils.list_utils import find_in_list
36 from osm_lcm.data_utils.vnfr import get_osm_params, get_vdur_index
37 from osm_lcm.data_utils.dict_utils import parse_yaml_strings
38 from osm_lcm.data_utils.database.vim_account import VimAccountDB
39 from n2vc.k8s_helm_conn import K8sHelmConnector
40 from n2vc.k8s_helm3_conn import K8sHelm3Connector
41 from n2vc.k8s_juju_conn import K8sJujuConnector
42
43 from osm_common.dbbase import DbException
44 from osm_common.fsbase import FsException
45
46 from osm_lcm.data_utils.database.database import Database
47 from osm_lcm.data_utils.filesystem.filesystem import Filesystem
48
49 from n2vc.n2vc_juju_conn import N2VCJujuConnector
50 from n2vc.exceptions import N2VCException, N2VCNotFound, K8sException
51
52 from osm_lcm.lcm_helm_conn import LCMHelmConn
53
54 from copy import copy, deepcopy
55 from time import time
56 from uuid import uuid4
57
58 from random import randint
59
60 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
61
62
63 class NsLcm(LcmBase):
64 timeout_vca_on_error = 5 * 60 # Time from when a charm first reports blocked/error status until it is marked as failed
65 timeout_ns_deploy = 2 * 3600 # default global timeout for deploying an ns
66 timeout_ns_terminate = 1800 # default global timeout for terminating an ns
67 timeout_charm_delete = 10 * 60
68 timeout_primitive = 30 * 60 # timeout for primitive execution
69 timeout_progress_primitive = 10 * 60 # timeout for some progress in a primitive execution
70
71 SUBOPERATION_STATUS_NOT_FOUND = -1
72 SUBOPERATION_STATUS_NEW = -2
73 SUBOPERATION_STATUS_SKIP = -3
74 task_name_deploy_vca = "Deploying VCA"
75
76 def __init__(self, msg, lcm_tasks, config, loop, prometheus=None):
77 """
78 Init, Connect to database, filesystem storage, and messaging
79 :param config: two-level dictionary with configuration. Top level should contain 'database', 'storage', etc.
80 :return: None
81 """
82 super().__init__(
83 msg=msg,
84 logger=logging.getLogger('lcm.ns')
85 )
86
87 self.db = Database().instance.db
88 self.fs = Filesystem().instance.fs
89 self.loop = loop
90 self.lcm_tasks = lcm_tasks
91 self.timeout = config["timeout"]
92 self.ro_config = config["ro_config"]
93 self.ng_ro = config["ro_config"].get("ng")
94 self.vca_config = config["VCA"].copy()
95
96 # create N2VC connector
97 self.n2vc = N2VCJujuConnector(
98 log=self.logger,
99 loop=self.loop,
100 url='{}:{}'.format(self.vca_config['host'], self.vca_config['port']),
101 username=self.vca_config.get('user', None),
102 vca_config=self.vca_config,
103 on_update_db=self._on_update_n2vc_db,
104 fs=self.fs,
105 db=self.db
106 )
107
108 self.conn_helm_ee = LCMHelmConn(
109 log=self.logger,
110 loop=self.loop,
111 url=None,
112 username=None,
113 vca_config=self.vca_config,
114 on_update_db=self._on_update_n2vc_db
115 )
116
117 self.k8sclusterhelm2 = K8sHelmConnector(
118 kubectl_command=self.vca_config.get("kubectlpath"),
119 helm_command=self.vca_config.get("helmpath"),
120 log=self.logger,
121 on_update_db=None,
122 fs=self.fs,
123 db=self.db
124 )
125
126 self.k8sclusterhelm3 = K8sHelm3Connector(
127 kubectl_command=self.vca_config.get("kubectlpath"),
128 helm_command=self.vca_config.get("helm3path"),
129 fs=self.fs,
130 log=self.logger,
131 db=self.db,
132 on_update_db=None,
133 )
134
135 self.k8sclusterjuju = K8sJujuConnector(
136 kubectl_command=self.vca_config.get("kubectlpath"),
137 juju_command=self.vca_config.get("jujupath"),
138 log=self.logger,
139 loop=self.loop,
140 on_update_db=None,
141 vca_config=self.vca_config,
142 fs=self.fs,
143 db=self.db
144 )
145
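# map each descriptor deployment type to the K8s connector that handles it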
146 self.k8scluster_map = {
147 "helm-chart": self.k8sclusterhelm2,
148 "helm-chart-v3": self.k8sclusterhelm3,
149 "chart": self.k8sclusterhelm3,
150 "juju-bundle": self.k8sclusterjuju,
151 "juju": self.k8sclusterjuju,
152 }
153
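# map each VCA/charm type to the connector used to create and manage its execution environment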
154 self.vca_map = {
155 "lxc_proxy_charm": self.n2vc,
156 "native_charm": self.n2vc,
157 "k8s_proxy_charm": self.n2vc,
158 "helm": self.conn_helm_ee,
159 "helm-v3": self.conn_helm_ee
160 }
161
162 self.prometheus = prometheus
163
164 # create RO client
165 self.RO = NgRoClient(self.loop, **self.ro_config)
166
167 @staticmethod
168 def increment_ip_mac(ip_mac, vm_index=1):
169 if not isinstance(ip_mac, str):
170 return ip_mac
171 try:
172 # try with IPv4: look for the last dot
173 i = ip_mac.rfind(".")
174 if i > 0:
175 i += 1
176 return "{}{}".format(ip_mac[:i], int(ip_mac[i:]) + vm_index)
177 # try with IPv6 or MAC: look for the last colon and operate in hex
178 i = ip_mac.rfind(":")
179 if i > 0:
180 i += 1
181 # format in hex, len can be 2 for mac or 4 for ipv6
182 return ("{}{:0" + str(len(ip_mac) - i) + "x}").format(ip_mac[:i], int(ip_mac[i:], 16) + vm_index)
183 except Exception:
184 pass
185 return None
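# Illustrative behaviour (hypothetical values): increment_ip_mac("10.0.0.4", 2) -> "10.0.0.6";
# increment_ip_mac("fa:16:3e:05:10:0a", 1) -> "fa:16:3e:05:10:0b" (last group incremented in hex);
# non-string input is returned unchanged and unparseable input returns None.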
186
187 def _on_update_ro_db(self, nsrs_id, ro_descriptor):
188
189 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
190
191 try:
192 # TODO filter RO descriptor fields...
193
194 # write to database
195 db_dict = dict()
196 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
197 db_dict['deploymentStatus'] = ro_descriptor
198 self.update_db_2("nsrs", nsrs_id, db_dict)
199
200 except Exception as e:
201 self.logger.warn('Cannot write database RO deployment for ns={} -> {}'.format(nsrs_id, e))
202
203 async def _on_update_n2vc_db(self, table, filter, path, updated_data):
204
205 # remove last dot from path (if exists)
206 if path.endswith('.'):
207 path = path[:-1]
208
209 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
210 # .format(table, filter, path, updated_data))
211
212 try:
213
214 nsr_id = filter.get('_id')
215
216 # read ns record from database
217 nsr = self.db.get_one(table='nsrs', q_filter=filter)
218 current_ns_status = nsr.get('nsState')
219
220 # get vca status for NS
221 status_dict = await self.n2vc.get_status(namespace='.' + nsr_id, yaml_format=False)
222
223 # vcaStatus
224 db_dict = dict()
225 db_dict['vcaStatus'] = status_dict
226
227 # update configurationStatus for this VCA
228 try:
229 vca_index = int(path[path.rfind(".")+1:])
230
231 vca_list = deep_get(target_dict=nsr, key_list=('_admin', 'deployed', 'VCA'))
232 vca_status = vca_list[vca_index].get('status')
233
234 configuration_status_list = nsr.get('configurationStatus')
235 config_status = configuration_status_list[vca_index].get('status')
236
237 if config_status == 'BROKEN' and vca_status != 'failed':
238 db_dict['configurationStatus'][vca_index] = 'READY'
239 elif config_status != 'BROKEN' and vca_status == 'failed':
240 db_dict['configurationStatus'][vca_index] = 'BROKEN'
241 except Exception as e:
242 # do not update configurationStatus
243 self.logger.debug('Error updating vca_index (ignore): {}'.format(e))
244
245 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
246 # if nsState = 'DEGRADED' check if all is OK
247 is_degraded = False
248 if current_ns_status in ('READY', 'DEGRADED'):
249 error_description = ''
250 # check machines
251 if status_dict.get('machines'):
252 for machine_id in status_dict.get('machines'):
253 machine = status_dict.get('machines').get(machine_id)
254 # check machine agent-status
255 if machine.get('agent-status'):
256 s = machine.get('agent-status').get('status')
257 if s != 'started':
258 is_degraded = True
259 error_description += 'machine {} agent-status={} ; '.format(machine_id, s)
260 # check machine instance status
261 if machine.get('instance-status'):
262 s = machine.get('instance-status').get('status')
263 if s != 'running':
264 is_degraded = True
265 error_description += 'machine {} instance-status={} ; '.format(machine_id, s)
266 # check applications
267 if status_dict.get('applications'):
268 for app_id in status_dict.get('applications'):
269 app = status_dict.get('applications').get(app_id)
270 # check application status
271 if app.get('status'):
272 s = app.get('status').get('status')
273 if s != 'active':
274 is_degraded = True
275 error_description += 'application {} status={} ; '.format(app_id, s)
276
277 if error_description:
278 db_dict['errorDescription'] = error_description
279 if current_ns_status == 'READY' and is_degraded:
280 db_dict['nsState'] = 'DEGRADED'
281 if current_ns_status == 'DEGRADED' and not is_degraded:
282 db_dict['nsState'] = 'READY'
283
284 # write to database
285 self.update_db_2("nsrs", nsr_id, db_dict)
286
287 except (asyncio.CancelledError, asyncio.TimeoutError):
288 raise
289 except Exception as e:
290 self.logger.warn('Error updating NS state for ns={}: {}'.format(nsr_id, e))
291
292 @staticmethod
293 def _parse_cloud_init(cloud_init_text, additional_params, vnfd_id, vdu_id):
294 try:
295 env = Environment(undefined=StrictUndefined)
296 template = env.from_string(cloud_init_text)
297 return template.render(additional_params or {})
298 except UndefinedError as e:
299 raise LcmException("Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
300 "file, must be provided in the instantiation parameters inside the "
301 "'additionalParamsForVnf/Vdu' block".format(e, vnfd_id, vdu_id))
302 except (TemplateError, TemplateNotFound) as e:
303 raise LcmException("Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".
304 format(vnfd_id, vdu_id, e))
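# Illustrative usage (hypothetical values): rendering "hostname: {{ host_name }}" with
# additional_params={"host_name": "vnf1"} returns "hostname: vnf1"; an undefined variable
# raises LcmException pointing at the offending vnfd/vdu ids.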
305
306 def _get_vdu_cloud_init_content(self, vdu, vnfd):
307 cloud_init_content = cloud_init_file = None
308 try:
309 if vdu.get("cloud-init-file"):
310 base_folder = vnfd["_admin"]["storage"]
311 cloud_init_file = "{}/{}/cloud_init/{}".format(base_folder["folder"], base_folder["pkg-dir"],
312 vdu["cloud-init-file"])
313 with self.fs.file_open(cloud_init_file, "r") as ci_file:
314 cloud_init_content = ci_file.read()
315 elif vdu.get("cloud-init"):
316 cloud_init_content = vdu["cloud-init"]
317
318 return cloud_init_content
319 except FsException as e:
320 raise LcmException("Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".
321 format(vnfd["id"], vdu["id"], cloud_init_file, e))
322
323 def _get_vdu_additional_params(self, db_vnfr, vdu_id):
324 vdur = next(vdur for vdur in db_vnfr.get("vdur") if vdu_id == vdur["vdu-id-ref"])
325 additional_params = vdur.get("additionalParams")
326 return parse_yaml_strings(additional_params)
327
328 def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
329 """
330 Creates a new vnfd descriptor for RO based on the input OSM IM vnfd
331 :param vnfd: input vnfd
332 :param new_id: overrides vnf id if provided
333 :param additionalParams: Instantiation params for VNFs provided
334 :param nsrId: Id of the NSR
335 :return: copy of vnfd
336 """
337 vnfd_RO = deepcopy(vnfd)
338 # remove configuration, monitoring, scaling and internal keys that are not used by RO
339 vnfd_RO.pop("_id", None)
340 vnfd_RO.pop("_admin", None)
341 vnfd_RO.pop("monitoring-param", None)
342 vnfd_RO.pop("scaling-group-descriptor", None)
343 vnfd_RO.pop("kdu", None)
344 vnfd_RO.pop("k8s-cluster", None)
345 if new_id:
346 vnfd_RO["id"] = new_id
347
348 # remove cloud-init and cloud-init-file; RO does not need them
349 for vdu in get_iterable(vnfd_RO, "vdu"):
350 vdu.pop("cloud-init-file", None)
351 vdu.pop("cloud-init", None)
352 return vnfd_RO
353
354 @staticmethod
355 def ip_profile_2_RO(ip_profile):
356 RO_ip_profile = deepcopy(ip_profile)
357 if "dns-server" in RO_ip_profile:
358 if isinstance(RO_ip_profile["dns-server"], list):
359 RO_ip_profile["dns-address"] = []
360 for ds in RO_ip_profile.pop("dns-server"):
361 RO_ip_profile["dns-address"].append(ds['address'])
362 else:
363 RO_ip_profile["dns-address"] = RO_ip_profile.pop("dns-server")
364 if RO_ip_profile.get("ip-version") == "ipv4":
365 RO_ip_profile["ip-version"] = "IPv4"
366 if RO_ip_profile.get("ip-version") == "ipv6":
367 RO_ip_profile["ip-version"] = "IPv6"
368 if "dhcp-params" in RO_ip_profile:
369 RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
370 return RO_ip_profile
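# Illustrative mapping (hypothetical values): {"ip-version": "ipv4", "dns-server": [{"address": "8.8.8.8"}],
# "dhcp-params": {"enabled": True}} becomes {"ip-version": "IPv4", "dns-address": ["8.8.8.8"],
# "dhcp": {"enabled": True}}.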
371
372 def _get_ro_vim_id_for_vim_account(self, vim_account):
373 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
374 if db_vim["_admin"]["operationalState"] != "ENABLED":
375 raise LcmException("VIM={} is not available. operationalState={}".format(
376 vim_account, db_vim["_admin"]["operationalState"]))
377 RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
378 return RO_vim_id
379
380 def get_ro_wim_id_for_wim_account(self, wim_account):
381 if isinstance(wim_account, str):
382 db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
383 if db_wim["_admin"]["operationalState"] != "ENABLED":
384 raise LcmException("WIM={} is not available. operationalState={}".format(
385 wim_account, db_wim["_admin"]["operationalState"]))
386 RO_wim_id = db_wim["_admin"]["deployed"]["RO-account"]
387 return RO_wim_id
388 else:
389 return wim_account
390
391 def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None, mark_delete=False):
392
393 db_vdu_push_list = []
394 db_update = {"_admin.modified": time()}
395 if vdu_create:
396 for vdu_id, vdu_count in vdu_create.items():
397 vdur = next((vdur for vdur in reversed(db_vnfr["vdur"]) if vdur["vdu-id-ref"] == vdu_id), None)
398 if not vdur:
399 raise LcmException("Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".
400 format(vdu_id))
401
402 for count in range(vdu_count):
403 vdur_copy = deepcopy(vdur)
404 vdur_copy["status"] = "BUILD"
405 vdur_copy["status-detailed"] = None
406 vdur_copy["ip-address"] = None
407 vdur_copy["_id"] = str(uuid4())
408 vdur_copy["count-index"] += count + 1
409 vdur_copy["id"] = "{}-{}".format(vdur_copy["vdu-id-ref"], vdur_copy["count-index"])
410 vdur_copy.pop("vim_info", None)
411 for iface in vdur_copy["interfaces"]:
412 if iface.get("fixed-ip"):
413 iface["ip-address"] = self.increment_ip_mac(iface["ip-address"], count+1)
414 else:
415 iface.pop("ip-address", None)
416 if iface.get("fixed-mac"):
417 iface["mac-address"] = self.increment_ip_mac(iface["mac-address"], count+1)
418 else:
419 iface.pop("mac-address", None)
420 iface.pop("mgmt_vnf", None) # only the first vdu can be the management of the vnf
421 db_vdu_push_list.append(vdur_copy)
422 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
423 if vdu_delete:
424 for vdu_id, vdu_count in vdu_delete.items():
425 if mark_delete:
426 indexes_to_delete = [iv[0] for iv in enumerate(db_vnfr["vdur"]) if iv[1]["vdu-id-ref"] == vdu_id]
427 db_update.update({"vdur.{}.status".format(i): "DELETING" for i in indexes_to_delete[-vdu_count:]})
428 else:
429 # it must be deleted one by one because common.db does not allow otherwise
430 vdus_to_delete = [v for v in reversed(db_vnfr["vdur"]) if v["vdu-id-ref"] == vdu_id]
431 for vdu in vdus_to_delete[:vdu_count]:
432 self.db.set_one("vnfrs", {"_id": db_vnfr["_id"]}, None, pull={"vdur": {"_id": vdu["_id"]}})
433 db_push = {"vdur": db_vdu_push_list} if db_vdu_push_list else None
434 self.db.set_one("vnfrs", {"_id": db_vnfr["_id"]}, db_update, push_list=db_push)
435 # modify passed dictionary db_vnfr
436 db_vnfr_ = self.db.get_one("vnfrs", {"_id": db_vnfr["_id"]})
437 db_vnfr["vdur"] = db_vnfr_["vdur"]
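# Illustrative calls (hypothetical ids): scale_vnfr(db_vnfr, vdu_create={"dataVM": 2}) appends two vdur
# copies in BUILD state; scale_vnfr(db_vnfr, vdu_delete={"dataVM": 1}, mark_delete=True) marks the last
# matching vdur as DELETING instead of removing it.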
438
439 def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
440 """
441 Updates database nsr with the RO info for the created vld
442 :param ns_update_nsr: dictionary to be filled with the updated info
443 :param db_nsr: content of db_nsr. This is also modified
444 :param nsr_desc_RO: nsr descriptor from RO
445 :return: Nothing, LcmException is raised on errors
446 """
447
448 for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
449 for net_RO in get_iterable(nsr_desc_RO, "nets"):
450 if vld["id"] != net_RO.get("ns_net_osm_id"):
451 continue
452 vld["vim-id"] = net_RO.get("vim_net_id")
453 vld["name"] = net_RO.get("vim_name")
454 vld["status"] = net_RO.get("status")
455 vld["status-detailed"] = net_RO.get("error_msg")
456 ns_update_nsr["vld.{}".format(vld_index)] = vld
457 break
458 else:
459 raise LcmException("ns_update_nsr: Not found vld={} at RO info".format(vld["id"]))
460
461 def set_vnfr_at_error(self, db_vnfrs, error_text):
462 try:
463 for db_vnfr in db_vnfrs.values():
464 vnfr_update = {"status": "ERROR"}
465 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
466 if "status" not in vdur:
467 vdur["status"] = "ERROR"
468 vnfr_update["vdur.{}.status".format(vdu_index)] = "ERROR"
469 if error_text:
470 vdur["status-detailed"] = str(error_text)
471 vnfr_update["vdur.{}.status-detailed".format(vdu_index)] = "ERROR"
472 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
473 except DbException as e:
474 self.logger.error("Cannot update vnf. {}".format(e))
475
476 def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
477 """
478 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
479 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
480 :param nsr_desc_RO: nsr descriptor from RO
481 :return: Nothing, LcmException is raised on errors
482 """
483 for vnf_index, db_vnfr in db_vnfrs.items():
484 for vnf_RO in nsr_desc_RO["vnfs"]:
485 if vnf_RO["member_vnf_index"] != vnf_index:
486 continue
487 vnfr_update = {}
488 if vnf_RO.get("ip_address"):
489 db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO["ip_address"].split(";")[0]
490 elif not db_vnfr.get("ip-address"):
491 if db_vnfr.get("vdur"): # if there are no VDUs, there is no ip_address
492 raise LcmExceptionNoMgmtIP("ns member_vnf_index '{}' has no IP address".format(vnf_index))
493
494 for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
495 vdur_RO_count_index = 0
496 if vdur.get("pdu-type"):
497 continue
498 for vdur_RO in get_iterable(vnf_RO, "vms"):
499 if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
500 continue
501 if vdur["count-index"] != vdur_RO_count_index:
502 vdur_RO_count_index += 1
503 continue
504 vdur["vim-id"] = vdur_RO.get("vim_vm_id")
505 if vdur_RO.get("ip_address"):
506 vdur["ip-address"] = vdur_RO["ip_address"].split(";")[0]
507 else:
508 vdur["ip-address"] = None
509 vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
510 vdur["name"] = vdur_RO.get("vim_name")
511 vdur["status"] = vdur_RO.get("status")
512 vdur["status-detailed"] = vdur_RO.get("error_msg")
513 for ifacer in get_iterable(vdur, "interfaces"):
514 for interface_RO in get_iterable(vdur_RO, "interfaces"):
515 if ifacer["name"] == interface_RO.get("internal_name"):
516 ifacer["ip-address"] = interface_RO.get("ip_address")
517 ifacer["mac-address"] = interface_RO.get("mac_address")
518 break
519 else:
520 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
521 "from VIM info"
522 .format(vnf_index, vdur["vdu-id-ref"], ifacer["name"]))
523 vnfr_update["vdur.{}".format(vdu_index)] = vdur
524 break
525 else:
526 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
527 "VIM info".format(vnf_index, vdur["vdu-id-ref"], vdur["count-index"]))
528
529 for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
530 for net_RO in get_iterable(nsr_desc_RO, "nets"):
531 if vld["id"] != net_RO.get("vnf_net_osm_id"):
532 continue
533 vld["vim-id"] = net_RO.get("vim_net_id")
534 vld["name"] = net_RO.get("vim_name")
535 vld["status"] = net_RO.get("status")
536 vld["status-detailed"] = net_RO.get("error_msg")
537 vnfr_update["vld.{}".format(vld_index)] = vld
538 break
539 else:
540 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
541 vnf_index, vld["id"]))
542
543 self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
544 break
545
546 else:
547 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(vnf_index))
548
549 def _get_ns_config_info(self, nsr_id):
550 """
551 Generates a mapping between vnf,vdu elements and the N2VC id
552 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
553 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
554 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
555 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
556 """
557 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
558 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
559 mapping = {}
560 ns_config_info = {"osm-config-mapping": mapping}
561 for vca in vca_deployed_list:
562 if not vca["member-vnf-index"]:
563 continue
564 if not vca["vdu_id"]:
565 mapping[vca["member-vnf-index"]] = vca["application"]
566 else:
567 mapping["{}.{}.{}".format(vca["member-vnf-index"], vca["vdu_id"], vca["vdu_count_index"])] =\
568 vca["application"]
569 return ns_config_info
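# Illustrative result (hypothetical ids): {"osm-config-mapping": {"1": "app-vnf-1", "1.dataVM.0": "app-vdu-1"}},
# i.e. "<member-vnf-index>" for vnf-level charms and "<member-vnf-index>.<vdu_id>.<count>" for vdu-level ones.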
570
571 async def _instantiate_ng_ro(self, logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs, db_vnfds,
572 n2vc_key_list, stage, start_deploy, timeout_ns_deploy):
573
574 db_vims = {}
575
576 def get_vim_account(vim_account_id):
577 nonlocal db_vims
578 if vim_account_id in db_vims:
579 return db_vims[vim_account_id]
580 db_vim = self.db.get_one("vim_accounts", {"_id": vim_account_id})
581 db_vims[vim_account_id] = db_vim
582 return db_vim
583
584 # modify target_vld info with instantiation parameters
585 def parse_vld_instantiation_params(target_vim, target_vld, vld_params, target_sdn):
586 if vld_params.get("ip-profile"):
587 target_vld["vim_info"][target_vim]["ip_profile"] = vld_params["ip-profile"]
588 if vld_params.get("provider-network"):
589 target_vld["vim_info"][target_vim]["provider_network"] = vld_params["provider-network"]
590 if "sdn-ports" in vld_params["provider-network"] and target_sdn:
591 target_vld["vim_info"][target_sdn]["sdn-ports"] = vld_params["provider-network"]["sdn-ports"]
592 if vld_params.get("wimAccountId"):
593 target_wim = "wim:{}".format(vld_params["wimAccountId"])
594 target_vld["vim_info"][target_wim] = {}
595 for param in ("vim-network-name", "vim-network-id"):
596 if vld_params.get(param):
597 if isinstance(vld_params[param], dict):
598 for vim, vim_net in vld_params[param].items():
599 other_target_vim = "vim:" + vim
600 populate_dict(target_vld["vim_info"], (other_target_vim, param.replace("-", "_")), vim_net)
601 else: # isinstance str
602 target_vld["vim_info"][target_vim][param.replace("-", "_")] = vld_params[param]
603 if vld_params.get("common_id"):
604 target_vld["common_id"] = vld_params.get("common_id")
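# Illustrative vld_params content (hypothetical values): {"name": "mgmt", "vim-network-name": "provider-net",
# "ip-profile": {"ip-version": "ipv4", "subnet-address": "192.168.10.0/24", "dhcp-params": {"enabled": True}}}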
605
606 nslcmop_id = db_nslcmop["_id"]
607 target = {
608 "name": db_nsr["name"],
609 "ns": {"vld": []},
610 "vnf": [],
611 "image": deepcopy(db_nsr["image"]),
612 "flavor": deepcopy(db_nsr["flavor"]),
613 "action_id": nslcmop_id,
614 "cloud_init_content": {},
615 }
616 for image in target["image"]:
617 image["vim_info"] = {}
618 for flavor in target["flavor"]:
619 flavor["vim_info"] = {}
620
621 if db_nslcmop.get("lcmOperationType") != "instantiate":
622 # get parameters of instantiation:
623 db_nslcmop_instantiate = self.db.get_list("nslcmops", {"nsInstanceId": db_nslcmop["nsInstanceId"],
624 "lcmOperationType": "instantiate"})[-1]
625 ns_params = db_nslcmop_instantiate.get("operationParams")
626 else:
627 ns_params = db_nslcmop.get("operationParams")
628 ssh_keys_instantiation = ns_params.get("ssh_keys") or []
629 ssh_keys_all = ssh_keys_instantiation + (n2vc_key_list or [])
630
631 cp2target = {}
632 for vld_index, vld in enumerate(db_nsr.get("vld")):
633 target_vim = "vim:{}".format(ns_params["vimAccountId"])
634 target_vld = {
635 "id": vld["id"],
636 "name": vld["name"],
637 "mgmt-network": vld.get("mgmt-network", False),
638 "type": vld.get("type"),
639 "vim_info": {
640 target_vim: {
641 "vim_network_name": vld.get("vim-network-name"),
642 "vim_account_id": ns_params["vimAccountId"]
643 }
644 }
645 }
646 # check if this network needs SDN assist
647 if vld.get("pci-interfaces"):
648 db_vim = get_vim_account(ns_params["vimAccountId"])
649 sdnc_id = db_vim["config"].get("sdn-controller")
650 if sdnc_id:
651 sdn_vld = "nsrs:{}:vld.{}".format(nsr_id, vld["id"])
652 target_sdn = "sdn:{}".format(sdnc_id)
653 target_vld["vim_info"][target_sdn] = {
654 "sdn": True, "target_vim": target_vim, "vlds": [sdn_vld], "type": vld.get("type")}
655
656 nsd_vnf_profiles = get_vnf_profiles(nsd)
657 for nsd_vnf_profile in nsd_vnf_profiles:
658 for cp in nsd_vnf_profile["virtual-link-connectivity"]:
659 if cp["virtual-link-profile-id"] == vld["id"]:
660 cp2target["member_vnf:{}.{}".format(
661 cp["constituent-cpd-id"][0]["constituent-base-element-id"],
662 cp["constituent-cpd-id"][0]["constituent-cpd-id"]
663 )] = "nsrs:{}:vld.{}".format(nsr_id, vld_index)
664
665 # check at nsd descriptor, if there is an ip-profile
666 vld_params = {}
667 nsd_vlp = find_in_list(
668 get_virtual_link_profiles(nsd),
669 lambda a_link_profile: a_link_profile["virtual-link-desc-id"] == vld["id"])
670 if nsd_vlp and nsd_vlp.get("virtual-link-protocol-data") and \
671 nsd_vlp["virtual-link-protocol-data"].get("l3-protocol-data"):
672 ip_profile_source_data = nsd_vlp["virtual-link-protocol-data"]["l3-protocol-data"]
673 ip_profile_dest_data = {}
674 if "ip-version" in ip_profile_source_data:
675 ip_profile_dest_data["ip-version"] = ip_profile_source_data["ip-version"]
676 if "cidr" in ip_profile_source_data:
677 ip_profile_dest_data["subnet-address"] = ip_profile_source_data["cidr"]
678 if "gateway-ip" in ip_profile_source_data:
679 ip_profile_dest_data["gateway-address"] = ip_profile_source_data["gateway-ip"]
680 if "dhcp-enabled" in ip_profile_source_data:
681 ip_profile_dest_data["dhcp-params"] = {
682 "enabled": ip_profile_source_data["dhcp-enabled"]
683 }
684 vld_params["ip-profile"] = ip_profile_dest_data
685
686 # update vld_params with instantiation params
687 vld_instantiation_params = find_in_list(get_iterable(ns_params, "vld"),
688 lambda a_vld: a_vld["name"] in (vld["name"], vld["id"]))
689 if vld_instantiation_params:
690 vld_params.update(vld_instantiation_params)
691 parse_vld_instantiation_params(target_vim, target_vld, vld_params, None)
692 target["ns"]["vld"].append(target_vld)
693
694 for vnfr in db_vnfrs.values():
695 vnfd = find_in_list(db_vnfds, lambda db_vnf: db_vnf["id"] == vnfr["vnfd-ref"])
696 vnf_params = find_in_list(get_iterable(ns_params, "vnf"),
697 lambda a_vnf: a_vnf["member-vnf-index"] == vnfr["member-vnf-index-ref"])
698 target_vnf = deepcopy(vnfr)
699 target_vim = "vim:{}".format(vnfr["vim-account-id"])
700 for vld in target_vnf.get("vld", ()):
701 # check if connected to a ns.vld, to fill target
702 vnf_cp = find_in_list(vnfd.get("int-virtual-link-desc", ()),
703 lambda cpd: cpd.get("id") == vld["id"])
704 if vnf_cp:
705 ns_cp = "member_vnf:{}.{}".format(vnfr["member-vnf-index-ref"], vnf_cp["id"])
706 if cp2target.get(ns_cp):
707 vld["target"] = cp2target[ns_cp]
708
709 vld["vim_info"] = {target_vim: {"vim_network_name": vld.get("vim-network-name")}}
710 # check if this network needs SDN assist
711 target_sdn = None
712 if vld.get("pci-interfaces"):
713 db_vim = get_vim_account(vnfr["vim-account-id"])
714 sdnc_id = db_vim["config"].get("sdn-controller")
715 if sdnc_id:
716 sdn_vld = "vnfrs:{}:vld.{}".format(target_vnf["_id"], vld["id"])
717 target_sdn = "sdn:{}".format(sdnc_id)
718 vld["vim_info"][target_sdn] = {
719 "sdn": True, "target_vim": target_vim, "vlds": [sdn_vld], "type": vld.get("type")}
720
721 # check at vnfd descriptor, if there is an ip-profile
722 vld_params = {}
723 vnfd_vlp = find_in_list(
724 get_virtual_link_profiles(vnfd),
725 lambda a_link_profile: a_link_profile["id"] == vld["id"]
726 )
727 if vnfd_vlp and vnfd_vlp.get("virtual-link-protocol-data") and \
728 vnfd_vlp["virtual-link-protocol-data"].get("l3-protocol-data"):
729 ip_profile_source_data = vnfd_vlp["virtual-link-protocol-data"]["l3-protocol-data"]
730 ip_profile_dest_data = {}
731 if "ip-version" in ip_profile_source_data:
732 ip_profile_dest_data["ip-version"] = ip_profile_source_data["ip-version"]
733 if "cidr" in ip_profile_source_data:
734 ip_profile_dest_data["subnet-address"] = ip_profile_source_data["cidr"]
735 if "gateway-ip" in ip_profile_source_data:
736 ip_profile_dest_data["gateway-address"] = ip_profile_source_data["gateway-ip"]
737 if "dhcp-enabled" in ip_profile_source_data:
738 ip_profile_dest_data["dhcp-params"] = {
739 "enabled": ip_profile_source_data["dhcp-enabled"]
740 }
741
742 vld_params["ip-profile"] = ip_profile_dest_data
743 # update vld_params with instantiation params
744 if vnf_params:
745 vld_instantiation_params = find_in_list(get_iterable(vnf_params, "internal-vld"),
746 lambda i_vld: i_vld["name"] == vld["id"])
747 if vld_instantiation_params:
748 vld_params.update(vld_instantiation_params)
749 parse_vld_instantiation_params(target_vim, vld, vld_params, target_sdn)
750
751 vdur_list = []
752 for vdur in target_vnf.get("vdur", ()):
753 if vdur.get("status") == "DELETING" or vdur.get("pdu-type"):
754 continue # This vdu must not be created
755 vdur["vim_info"] = {"vim_account_id": vnfr["vim-account-id"]}
756
757 self.logger.debug("NS > ssh_keys > {}".format(ssh_keys_all))
758
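# ssh-key injection precedence: vdu-level config-access first, then vnf-level config-access on a
# management interface, and finally the instantiation-provided keys on any mgmt interface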
759 if ssh_keys_all:
760 vdu_configuration = get_configuration(vnfd, vdur["vdu-id-ref"])
761 vnf_configuration = get_configuration(vnfd, vnfd["id"])
762 if vdu_configuration and vdu_configuration.get("config-access") and \
763 vdu_configuration.get("config-access").get("ssh-access"):
764 vdur["ssh-keys"] = ssh_keys_all
765 vdur["ssh-access-required"] = vdu_configuration["config-access"]["ssh-access"]["required"]
766 elif vnf_configuration and vnf_configuration.get("config-access") and \
767 vnf_configuration.get("config-access").get("ssh-access") and \
768 any(iface.get("mgmt-vnf") for iface in vdur["interfaces"]):
769 vdur["ssh-keys"] = ssh_keys_all
770 vdur["ssh-access-required"] = vnf_configuration["config-access"]["ssh-access"]["required"]
771 elif ssh_keys_instantiation and \
772 find_in_list(vdur["interfaces"], lambda iface: iface.get("mgmt-vnf")):
773 vdur["ssh-keys"] = ssh_keys_instantiation
774
775 self.logger.debug("NS > vdur > {}".format(vdur))
776
777 vdud = get_vdu(vnfd, vdur["vdu-id-ref"])
778 # cloud-init
779 if vdud.get("cloud-init-file"):
780 vdur["cloud-init"] = "{}:file:{}".format(vnfd["_id"], vdud.get("cloud-init-file"))
781 # read file and put content at target.cloud_init_content, so that ng_ro does not need to use the shared package system
782 if vdur["cloud-init"] not in target["cloud_init_content"]:
783 base_folder = vnfd["_admin"]["storage"]
784 cloud_init_file = "{}/{}/cloud_init/{}".format(base_folder["folder"], base_folder["pkg-dir"],
785 vdud.get("cloud-init-file"))
786 with self.fs.file_open(cloud_init_file, "r") as ci_file:
787 target["cloud_init_content"][vdur["cloud-init"]] = ci_file.read()
788 elif vdud.get("cloud-init"):
789 vdur["cloud-init"] = "{}:vdu:{}".format(vnfd["_id"], get_vdu_index(vnfd, vdur["vdu-id-ref"]))
790 # put content at target.cloud_init_content, so that ng_ro does not need to read the vnfd descriptor
791 target["cloud_init_content"][vdur["cloud-init"]] = vdud["cloud-init"]
792 vdur["additionalParams"] = vdur.get("additionalParams") or {}
793 deploy_params_vdu = self._format_additional_params(vdur.get("additionalParams") or {})
794 deploy_params_vdu["OSM"] = get_osm_params(vnfr, vdur["vdu-id-ref"], vdur["count-index"])
795 vdur["additionalParams"] = deploy_params_vdu
796
797 # flavor
798 ns_flavor = target["flavor"][int(vdur["ns-flavor-id"])]
799 if target_vim not in ns_flavor["vim_info"]:
800 ns_flavor["vim_info"][target_vim] = {}
801
802 # deal with images
803 # in case alternative images are provided, check whether one matches the vim_type of the
804 # target VIM and, if so, use it instead of the default image
805 ns_image_id = int(vdur["ns-image-id"])
806 if vdur.get("alt-image-ids"):
807 db_vim = get_vim_account(vnfr["vim-account-id"])
808 vim_type = db_vim["vim_type"]
809 for alt_image_id in vdur.get("alt-image-ids"):
810 ns_alt_image = target["image"][int(alt_image_id)]
811 if vim_type == ns_alt_image.get("vim-type"):
812 # must use alternative image
813 self.logger.debug("use alternative image id: {}".format(alt_image_id))
814 ns_image_id = alt_image_id
815 vdur["ns-image-id"] = ns_image_id
816 break
817 ns_image = target["image"][int(ns_image_id)]
818 if target_vim not in ns_image["vim_info"]:
819 ns_image["vim_info"][target_vim] = {}
820
821 vdur["vim_info"] = {target_vim: {}}
822 # instantiation parameters
823 # if vnf_params:
824 # vdu_instantiation_params = next((v for v in get_iterable(vnf_params, "vdu") if v["id"] ==
825 # vdud["id"]), None)
826 vdur_list.append(vdur)
827 target_vnf["vdur"] = vdur_list
828 target["vnf"].append(target_vnf)
829
830 desc = await self.RO.deploy(nsr_id, target)
831 self.logger.debug("RO return > {}".format(desc))
832 action_id = desc["action_id"]
833 await self._wait_ng_ro(nsr_id, action_id, nslcmop_id, start_deploy, timeout_ns_deploy, stage)
834
835 # Updating NSR
836 db_nsr_update = {
837 "_admin.deployed.RO.operational-status": "running",
838 "detailed-status": " ".join(stage)
839 }
840 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
841 self.update_db_2("nsrs", nsr_id, db_nsr_update)
842 self._write_op_status(nslcmop_id, stage)
843 self.logger.debug(logging_text + "ns deployed at RO. RO_id={}".format(action_id))
844 return
845
846 async def _wait_ng_ro(self, nsr_id, action_id, nslcmop_id=None, start_time=None, timeout=600, stage=None):
847 detailed_status_old = None
848 db_nsr_update = {}
849 start_time = start_time or time()
850 while time() <= start_time + timeout:
851 desc_status = await self.RO.status(nsr_id, action_id)
852 self.logger.debug("Wait NG RO > {}".format(desc_status))
853 if desc_status["status"] == "FAILED":
854 raise NgRoException(desc_status["details"])
855 elif desc_status["status"] == "BUILD":
856 if stage:
857 stage[2] = "VIM: ({})".format(desc_status["details"])
858 elif desc_status["status"] == "DONE":
859 if stage:
860 stage[2] = "Deployed at VIM"
861 break
862 else:
863 assert False, "ROclient.check_ns_status returns unknown {}".format(desc_status["status"])
864 if stage and nslcmop_id and stage[2] != detailed_status_old:
865 detailed_status_old = stage[2]
866 db_nsr_update["detailed-status"] = " ".join(stage)
867 self.update_db_2("nsrs", nsr_id, db_nsr_update)
868 self._write_op_status(nslcmop_id, stage)
869 await asyncio.sleep(15, loop=self.loop)
870 else: # while loop ended without break: the deployment timeout expired
871 raise NgRoException("Timeout waiting ns to deploy")
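# The RO action status is polled every 15 s until it reports DONE or FAILED, or until the timeout expires.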
872
873 async def _terminate_ng_ro(self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage):
874 db_nsr_update = {}
875 failed_detail = []
876 action_id = None
877 start_deploy = time()
878 try:
879 target = {
880 "ns": {"vld": []},
881 "vnf": [],
882 "image": [],
883 "flavor": [],
884 "action_id": nslcmop_id
885 }
886 desc = await self.RO.deploy(nsr_id, target)
887 action_id = desc["action_id"]
888 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = action_id
889 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETING"
890 self.logger.debug(logging_text + "ns terminate action at RO. action_id={}".format(action_id))
891
892 # wait until done
893 delete_timeout = 20 * 60 # 20 minutes
894 await self._wait_ng_ro(nsr_id, action_id, nslcmop_id, start_deploy, delete_timeout, stage)
895
896 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
897 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
898 # delete all nsr
899 await self.RO.delete(nsr_id)
900 except Exception as e:
901 if isinstance(e, NgRoException) and e.http_code == 404: # not found
902 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
903 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
904 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
905 self.logger.debug(logging_text + "RO_action_id={} already deleted".format(action_id))
906 elif isinstance(e, NgRoException) and e.http_code == 409: # conflict
907 failed_detail.append("delete conflict: {}".format(e))
908 self.logger.debug(logging_text + "RO_action_id={} delete conflict: {}".format(action_id, e))
909 else:
910 failed_detail.append("delete error: {}".format(e))
911 self.logger.error(logging_text + "RO_action_id={} delete error: {}".format(action_id, e))
912
913 if failed_detail:
914 stage[2] = "Error deleting from VIM"
915 else:
916 stage[2] = "Deleted from VIM"
917 db_nsr_update["detailed-status"] = " ".join(stage)
918 self.update_db_2("nsrs", nsr_id, db_nsr_update)
919 self._write_op_status(nslcmop_id, stage)
920
921 if failed_detail:
922 raise LcmException("; ".join(failed_detail))
923 return
924
925 async def instantiate_RO(self, logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs, db_vnfds,
926 n2vc_key_list, stage):
927 """
928 Instantiate at RO
929 :param logging_text: prefix text to use at logging
930 :param nsr_id: nsr identity
931 :param nsd: database content of ns descriptor
932 :param db_nsr: database content of ns record
933 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
934 :param db_vnfrs:
935 :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
936 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
937 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
938 :return: None or exception
939 """
940 try:
941 start_deploy = time()
942 ns_params = db_nslcmop.get("operationParams")
943 if ns_params and ns_params.get("timeout_ns_deploy"):
944 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
945 else:
946 timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy)
947
948 # Check for and optionally request placement optimization. Database will be updated if placement activated
949 stage[2] = "Waiting for Placement."
950 if await self._do_placement(logging_text, db_nslcmop, db_vnfrs):
951 # in case of placement, update ns_params["vimAccountId"] if it is not present in any vnfr
952 for vnfr in db_vnfrs.values():
953 if ns_params["vimAccountId"] == vnfr["vim-account-id"]:
954 break
955 else:
956 ns_params["vimAccountId"] = vnfr["vim-account-id"]
957
958 return await self._instantiate_ng_ro(logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs,
959 db_vnfds, n2vc_key_list, stage, start_deploy, timeout_ns_deploy)
960 except Exception as e:
961 stage[2] = "ERROR deploying at VIM"
962 self.set_vnfr_at_error(db_vnfrs, str(e))
963 self.logger.error("Error deploying at VIM {}".format(e),
964 exc_info=not isinstance(e, (ROclient.ROClientException, LcmException, DbException,
965 NgRoException)))
966 raise
967
968 async def wait_kdu_up(self, logging_text, nsr_id, vnfr_id, kdu_name):
969 """
970 Wait for kdu to be up, get ip address
971 :param logging_text: prefix to use for logging
972 :param nsr_id:
973 :param vnfr_id:
974 :param kdu_name:
975 :return: IP address
976 """
977
978 # self.logger.debug(logging_text + "Starting wait_kdu_up")
979 nb_tries = 0
980
981 while nb_tries < 360:
982 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
983 kdur = next((x for x in get_iterable(db_vnfr, "kdur") if x.get("kdu-name") == kdu_name), None)
984 if not kdur:
985 raise LcmException("Not found vnfr_id={}, kdu_name={}".format(vnfr_id, kdu_name))
986 if kdur.get("status"):
987 if kdur["status"] in ("READY", "ENABLED"):
988 return kdur.get("ip-address")
989 else:
990 raise LcmException("target KDU={} is in error state".format(kdu_name))
991
992 await asyncio.sleep(10, loop=self.loop)
993 nb_tries += 1
994 raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name))
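# The KDU status is polled every 10 s, up to 360 times (about 1 hour), before giving up.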
995
996 async def wait_vm_up_insert_key_ro(self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None):
997 """
998 Wait for the ip address at RO and, optionally, insert a public key in the virtual machine
999 :param logging_text: prefix to use for logging
1000 :param nsr_id:
1001 :param vnfr_id:
1002 :param vdu_id:
1003 :param vdu_index:
1004 :param pub_key: public ssh key to inject, None to skip
1005 :param user: user to apply the public ssh key
1006 :return: IP address
1007 """
1008
1009 self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
1010 ro_nsr_id = None
1011 ip_address = None
1012 nb_tries = 0
1013 target_vdu_id = None
1014 ro_retries = 0
1015
1016 while True:
1017
1018 ro_retries += 1
1019 if ro_retries >= 360: # 1 hour
1020 raise LcmException("Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id))
1021
1022 await asyncio.sleep(10, loop=self.loop)
1023
1024 # get ip address
1025 if not target_vdu_id:
1026 db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
1027
1028 if not vdu_id: # for the VNF case
1029 if db_vnfr.get("status") == "ERROR":
1030 raise LcmException("Cannot inject ssh-key because target VNF is in error state")
1031 ip_address = db_vnfr.get("ip-address")
1032 if not ip_address:
1033 continue
1034 vdur = next((x for x in get_iterable(db_vnfr, "vdur") if x.get("ip-address") == ip_address), None)
1035 else: # VDU case
1036 vdur = next((x for x in get_iterable(db_vnfr, "vdur")
1037 if x.get("vdu-id-ref") == vdu_id and x.get("count-index") == vdu_index), None)
1038
1039 if not vdur and len(db_vnfr.get("vdur", ())) == 1: # If only one, this should be the target vdu
1040 vdur = db_vnfr["vdur"][0]
1041 if not vdur:
1042 raise LcmException("Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(vnfr_id, vdu_id,
1043 vdu_index))
1044 # New generation RO stores information at "vim_info"
1045 ng_ro_status = None
1046 target_vim = None
1047 if vdur.get("vim_info"):
1048 target_vim = next(t for t in vdur["vim_info"]) # there should be only one key
1049 ng_ro_status = vdur["vim_info"][target_vim].get("vim_status")
1050 if vdur.get("pdu-type") or vdur.get("status") == "ACTIVE" or ng_ro_status == "ACTIVE":
1051 ip_address = vdur.get("ip-address")
1052 if not ip_address:
1053 continue
1054 target_vdu_id = vdur["vdu-id-ref"]
1055 elif vdur.get("status") == "ERROR" or ng_ro_status == "ERROR":
1056 raise LcmException("Cannot inject ssh-key because target VM is in error state")
1057
1058 if not target_vdu_id:
1059 continue
1060
1061 # inject public key into machine
1062 if pub_key and user:
1063 self.logger.debug(logging_text + "Inserting RO key")
1064 self.logger.debug("SSH > PubKey > {}".format(pub_key))
1065 if vdur.get("pdu-type"):
1066 self.logger.error(logging_text + "Cannot inject ssh-key to a PDU")
1067 return ip_address
1068 try:
1069 ro_vm_id = "{}-{}".format(db_vnfr["member-vnf-index-ref"], target_vdu_id) # TODO add vdu_index
1070 if self.ng_ro:
1071 target = {"action": {"action": "inject_ssh_key", "key": pub_key, "user": user},
1072 "vnf": [{"_id": vnfr_id, "vdur": [{"id": vdur["id"]}]}],
1073 }
1074 desc = await self.RO.deploy(nsr_id, target)
1075 action_id = desc["action_id"]
1076 await self._wait_ng_ro(nsr_id, action_id, timeout=600)
1077 break
1078 else:
1079 # wait until NS is deployed at RO
1080 if not ro_nsr_id:
1081 db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
1082 ro_nsr_id = deep_get(db_nsrs, ("_admin", "deployed", "RO", "nsr_id"))
1083 if not ro_nsr_id:
1084 continue
1085 result_dict = await self.RO.create_action(
1086 item="ns",
1087 item_id_name=ro_nsr_id,
1088 descriptor={"add_public_key": pub_key, "vms": [ro_vm_id], "user": user}
1089 )
1090 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
1091 if not result_dict or not isinstance(result_dict, dict):
1092 raise LcmException("Unknown response from RO when injecting key")
1093 for result in result_dict.values():
1094 if result.get("vim_result") == 200:
1095 break
1096 else:
1097 raise ROclient.ROClientException("error injecting key: {}".format(
1098 result.get("description")))
1099 break
1100 except NgRoException as e:
1101 raise LcmException("Reaching max tries injecting key. Error: {}".format(e))
1102 except ROclient.ROClientException as e:
1103 if not nb_tries:
1104 self.logger.debug(logging_text + "error injecting key: {}. Retrying until {} seconds".
1105 format(e, 20*10))
1106 nb_tries += 1
1107 if nb_tries >= 20:
1108 raise LcmException("Reaching max tries injecting key. Error: {}".format(e))
1109 else:
1110 break
1111
1112 return ip_address
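# The VIM/RO information is polled every 10 s, up to 360 iterations (about 1 hour), before giving up.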
1113
1114 async def _wait_dependent_n2vc(self, nsr_id, vca_deployed_list, vca_index):
1115 """
1116 Wait until dependent VCA deployments have finished. NS waits for VNFs and VDUs; VNFs wait for VDUs
1117 """
1118 my_vca = vca_deployed_list[vca_index]
1119 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
1120 # vdu or kdu: no dependencies
1121 return
1122 timeout = 300
1123 while timeout >= 0:
1124 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1125 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1126 configuration_status_list = db_nsr["configurationStatus"]
1127 for index, vca_deployed in enumerate(configuration_status_list):
1128 if index == vca_index:
1129 # myself
1130 continue
1131 if not my_vca.get("member-vnf-index") or \
1132 (vca_deployed.get("member-vnf-index") == my_vca.get("member-vnf-index")):
1133 internal_status = configuration_status_list[index].get("status")
1134 if internal_status == 'READY':
1135 continue
1136 elif internal_status == 'BROKEN':
1137 raise LcmException("Configuration aborted because dependent charm(s) have failed")
1138 else:
1139 break
1140 else:
1141 # no dependencies, return
1142 return
1143 await asyncio.sleep(10)
1144 timeout -= 1
1145
1146 raise LcmException("Configuration aborted because dependent charm(s) timed out")
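# Note: the loop above re-reads the NS record roughly 300 times with a 10 s sleep between checks.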
1147
1148 async def instantiate_N2VC(self, logging_text, vca_index, nsi_id, db_nsr, db_vnfr, vdu_id, kdu_name, vdu_index,
1149 config_descriptor, deploy_params, base_folder, nslcmop_id, stage, vca_type, vca_name,
1150 ee_config_descriptor):
1151 nsr_id = db_nsr["_id"]
1152 db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
1153 vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
1154 vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
1155 osm_config = {"osm": {"ns_id": db_nsr["_id"]}}
1156 db_dict = {
1157 'collection': 'nsrs',
1158 'filter': {'_id': nsr_id},
1159 'path': db_update_entry
1160 }
1161 step = ""
1162 try:
1163
1164 element_type = 'NS'
1165 element_under_configuration = nsr_id
1166
1167 vnfr_id = None
1168 if db_vnfr:
1169 vnfr_id = db_vnfr["_id"]
1170 osm_config["osm"]["vnf_id"] = vnfr_id
1171
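# build the N2VC namespace as "<nsi_id>.<nsr_id>", extended below with the vnf/vdu/kdu element and its index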
1172 namespace = "{nsi}.{ns}".format(
1173 nsi=nsi_id if nsi_id else "",
1174 ns=nsr_id)
1175
1176 if vnfr_id:
1177 element_type = 'VNF'
1178 element_under_configuration = vnfr_id
1179 namespace += ".{}-{}".format(vnfr_id, vdu_index or 0)
1180 if vdu_id:
1181 namespace += ".{}-{}".format(vdu_id, vdu_index or 0)
1182 element_type = 'VDU'
1183 element_under_configuration = "{}-{}".format(vdu_id, vdu_index or 0)
1184 osm_config["osm"]["vdu_id"] = vdu_id
1185 elif kdu_name:
1186 namespace += ".{}.{}".format(kdu_name, vdu_index or 0)
1187 element_type = 'KDU'
1188 element_under_configuration = kdu_name
1189 osm_config["osm"]["kdu_name"] = kdu_name
1190
1191 # Get artifact path
1192 artifact_path = "{}/{}/{}/{}".format(
1193 base_folder["folder"],
1194 base_folder["pkg-dir"],
1195 "charms" if vca_type in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm") else "helm-charts",
1196 vca_name
1197 )
1198
1199 self.logger.debug("Artifact path > {}".format(artifact_path))
1200
1201 # get initial_config_primitive_list that applies to this element
1202 initial_config_primitive_list = config_descriptor.get('initial-config-primitive')
1203
1204 self.logger.debug("Initial config primitive list > {}".format(initial_config_primitive_list))
1205
1206 # add config if not present for NS charm
1207 ee_descriptor_id = ee_config_descriptor.get("id")
1208 self.logger.debug("EE Descriptor > {}".format(ee_descriptor_id))
1209 initial_config_primitive_list = get_ee_sorted_initial_config_primitive_list(initial_config_primitive_list,
1210 vca_deployed, ee_descriptor_id)
1211
1212 self.logger.debug("Initial config primitive list #2 > {}".format(initial_config_primitive_list))
1213 # n2vc_redesign STEP 3.1
1214 # find old ee_id if exists
1215 ee_id = vca_deployed.get("ee_id")
1216
1217 vim_account_id = (
1218 deep_get(db_vnfr, ("vim-account-id",)) or
1219 deep_get(deploy_params, ("OSM", "vim_account_id"))
1220 )
1221 vca_cloud, vca_cloud_credential = self.get_vca_cloud_and_credentials(vim_account_id)
1222 vca_k8s_cloud, vca_k8s_cloud_credential = self.get_vca_k8s_cloud_and_credentials(vim_account_id)
1223 # create or register execution environment in VCA
1224 if vca_type in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1225
1226 self._write_configuration_status(
1227 nsr_id=nsr_id,
1228 vca_index=vca_index,
1229 status='CREATING',
1230 element_under_configuration=element_under_configuration,
1231 element_type=element_type
1232 )
1233
1234 step = "create execution environment"
1235 self.logger.debug(logging_text + step)
1236
1237 ee_id = None
1238 credentials = None
1239 if vca_type == "k8s_proxy_charm":
1240 ee_id = await self.vca_map[vca_type].install_k8s_proxy_charm(
1241 charm_name=artifact_path[artifact_path.rfind("/") + 1:],
1242 namespace=namespace,
1243 artifact_path=artifact_path,
1244 db_dict=db_dict,
1245 cloud_name=vca_k8s_cloud,
1246 credential_name=vca_k8s_cloud_credential,
1247 )
1248 elif vca_type == "helm" or vca_type == "helm-v3":
1249 ee_id, credentials = await self.vca_map[vca_type].create_execution_environment(
1250 namespace=namespace,
1251 reuse_ee_id=ee_id,
1252 db_dict=db_dict,
1253 config=osm_config,
1254 artifact_path=artifact_path,
1255 vca_type=vca_type
1256 )
1257 else:
1258 ee_id, credentials = await self.vca_map[vca_type].create_execution_environment(
1259 namespace=namespace,
1260 reuse_ee_id=ee_id,
1261 db_dict=db_dict,
1262 cloud_name=vca_cloud,
1263 credential_name=vca_cloud_credential,
1264 )
1265
1266 elif vca_type == "native_charm":
1267 step = "Waiting for VM to be up and getting IP address"
1268 self.logger.debug(logging_text + step)
1269 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id, vdu_index,
1270 user=None, pub_key=None)
1271 credentials = {"hostname": rw_mgmt_ip}
1272 # get username
1273 username = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
1274 # TODO remove this when the IM changes regarding config-access:ssh-access:default-user are
1275 # merged. Meanwhile, get the username from initial-config-primitive
1276 if not username and initial_config_primitive_list:
1277 for config_primitive in initial_config_primitive_list:
1278 for param in config_primitive.get("parameter", ()):
1279 if param["name"] == "ssh-username":
1280 username = param["value"]
1281 break
1282 if not username:
1283 raise LcmException("Cannot determine the username either with 'initial-config-primitive' or with "
1284 "'config-access.ssh-access.default-user'")
1285 credentials["username"] = username
1286 # n2vc_redesign STEP 3.2
1287
1288 self._write_configuration_status(
1289 nsr_id=nsr_id,
1290 vca_index=vca_index,
1291 status='REGISTERING',
1292 element_under_configuration=element_under_configuration,
1293 element_type=element_type
1294 )
1295
1296 step = "register execution environment {}".format(credentials)
1297 self.logger.debug(logging_text + step)
1298 ee_id = await self.vca_map[vca_type].register_execution_environment(
1299 credentials=credentials,
1300 namespace=namespace,
1301 db_dict=db_dict,
1302 cloud_name=vca_cloud,
1303 credential_name=vca_cloud_credential,
1304 )
1305
1306 # for compatibility with MON/POL modules, they need the model and application name in the database
1307 # TODO ask MON/POL whether the "model_name.application_name" format can stop being assumed
1308 ee_id_parts = ee_id.split('.')
1309 db_nsr_update = {db_update_entry + "ee_id": ee_id}
1310 if len(ee_id_parts) >= 2:
1311 model_name = ee_id_parts[0]
1312 application_name = ee_id_parts[1]
1313 db_nsr_update[db_update_entry + "model"] = model_name
1314 db_nsr_update[db_update_entry + "application"] = application_name
1315
1316 # n2vc_redesign STEP 3.3
1317 step = "Install configuration Software"
1318
1319 self._write_configuration_status(
1320 nsr_id=nsr_id,
1321 vca_index=vca_index,
1322 status='INSTALLING SW',
1323 element_under_configuration=element_under_configuration,
1324 element_type=element_type,
1325 other_update=db_nsr_update
1326 )
1327
1328 # TODO check if already done
1329 self.logger.debug(logging_text + step)
1330 config = None
1331 if vca_type == "native_charm":
1332 config_primitive = next((p for p in initial_config_primitive_list if p["name"] == "config"), None)
1333 if config_primitive:
1334 config = self._map_primitive_params(
1335 config_primitive,
1336 {},
1337 deploy_params
1338 )
1339 num_units = 1
1340 if vca_type == "lxc_proxy_charm":
1341 if element_type == "NS":
1342 num_units = db_nsr.get("config-units") or 1
1343 elif element_type == "VNF":
1344 num_units = db_vnfr.get("config-units") or 1
1345 elif element_type == "VDU":
1346 for v in db_vnfr["vdur"]:
1347 if vdu_id == v["vdu-id-ref"]:
1348 num_units = v.get("config-units") or 1
1349 break
1350 if vca_type != "k8s_proxy_charm":
1351 await self.vca_map[vca_type].install_configuration_sw(
1352 ee_id=ee_id,
1353 artifact_path=artifact_path,
1354 db_dict=db_dict,
1355 config=config,
1356 num_units=num_units,
1357 )
1358
1359 # write in db flag of configuration_sw already installed
1360 self.update_db_2("nsrs", nsr_id, {db_update_entry + "config_sw_installed": True})
1361
1362 # add relations for this VCA (wait for other peers related with this VCA)
1363 await self._add_vca_relations(logging_text=logging_text, nsr_id=nsr_id,
1364 vca_index=vca_index, vca_type=vca_type)
1365
1366 # if SSH access is required, then get the execution environment SSH public key
1367 # for native charms we have already waited for the VM to be up
1368 if vca_type in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
1369 pub_key = None
1370 user = None
1371 # self.logger.debug("get ssh key block")
1372 if deep_get(config_descriptor, ("config-access", "ssh-access", "required")):
1373 # self.logger.debug("ssh key needed")
1374 # Needed to inject a ssh key
1375 user = deep_get(config_descriptor, ("config-access", "ssh-access", "default-user"))
1376 step = "Install configuration Software, getting public ssh key"
1377 pub_key = await self.vca_map[vca_type].get_ee_ssh_public__key(ee_id=ee_id, db_dict=db_dict)
1378
1379 step = "Insert public key into VM user={} ssh_key={}".format(user, pub_key)
1380 else:
1381 # self.logger.debug("no need to get ssh key")
1382 step = "Waiting for VM to be up and getting IP address"
1383 self.logger.debug(logging_text + step)
1384
1385 # n2vc_redesign STEP 5.1
1386 # wait for RO (ip-address) and insert pub_key into VM
1387 if vnfr_id:
1388 if kdu_name:
1389 rw_mgmt_ip = await self.wait_kdu_up(logging_text, nsr_id, vnfr_id, kdu_name)
1390 else:
1391 rw_mgmt_ip = await self.wait_vm_up_insert_key_ro(logging_text, nsr_id, vnfr_id, vdu_id,
1392 vdu_index, user=user, pub_key=pub_key)
1393 else:
1394 rw_mgmt_ip = None # This is for a NS configuration
1395
1396 self.logger.debug(logging_text + ' VM_ip_address={}'.format(rw_mgmt_ip))
1397
1398 # store rw_mgmt_ip in deploy params for later replacement
1399 deploy_params["rw_mgmt_ip"] = rw_mgmt_ip
1400
1401 # n2vc_redesign STEP 6 Execute initial config primitive
1402 step = 'execute initial config primitive'
1403
1404 # wait for dependent primitives execution (NS -> VNF -> VDU)
1405 if initial_config_primitive_list:
1406 await self._wait_dependent_n2vc(nsr_id, vca_deployed_list, vca_index)
1407
1408 # stage, depending on the element type: vdu, kdu, vnf or ns
1409 my_vca = vca_deployed_list[vca_index]
1410 if my_vca.get("vdu_id") or my_vca.get("kdu_name"):
1411 # VDU or KDU
1412 stage[0] = 'Stage 3/5: running Day-1 primitives for VDU.'
1413 elif my_vca.get("member-vnf-index"):
1414 # VNF
1415 stage[0] = 'Stage 4/5: running Day-1 primitives for VNF.'
1416 else:
1417 # NS
1418 stage[0] = 'Stage 5/5: running Day-1 primitives for NS.'
1419
1420 self._write_configuration_status(
1421 nsr_id=nsr_id,
1422 vca_index=vca_index,
1423 status='EXECUTING PRIMITIVE'
1424 )
1425
1426 self._write_op_status(
1427 op_id=nslcmop_id,
1428 stage=stage
1429 )
1430
1431 check_if_terminated_needed = True
1432 for initial_config_primitive in initial_config_primitive_list:
1433 # add ns configuration information when this is an NS-level execution environment
1434 if not vca_deployed["member-vnf-index"]:
1435 deploy_params["ns_config_info"] = json.dumps(self._get_ns_config_info(nsr_id))
1436 # TODO check if already done
1437 primitive_params_ = self._map_primitive_params(initial_config_primitive, {}, deploy_params)
1438
1439 step = "execute primitive '{}' params '{}'".format(initial_config_primitive["name"], primitive_params_)
1440 self.logger.debug(logging_text + step)
1441 await self.vca_map[vca_type].exec_primitive(
1442 ee_id=ee_id,
1443 primitive_name=initial_config_primitive["name"],
1444 params_dict=primitive_params_,
1445 db_dict=db_dict
1446 )
1447 # Once a primitive has been executed, check and record at db whether terminate primitives will need to be executed
1448 if check_if_terminated_needed:
1449 if config_descriptor.get('terminate-config-primitive'):
1450 self.update_db_2("nsrs", nsr_id, {db_update_entry + "needed_terminate": True})
1451 check_if_terminated_needed = False
1452
1453 # TODO register in database that primitive is done
1454
1455 # STEP 7 Configure metrics
1456 if vca_type == "helm" or vca_type == "helm-v3":
1457 prometheus_jobs = await self.add_prometheus_metrics(
1458 ee_id=ee_id,
1459 artifact_path=artifact_path,
1460 ee_config_descriptor=ee_config_descriptor,
1461 vnfr_id=vnfr_id,
1462 nsr_id=nsr_id,
1463 target_ip=rw_mgmt_ip,
1464 )
1465 if prometheus_jobs:
1466 self.update_db_2("nsrs", nsr_id, {db_update_entry + "prometheus_jobs": prometheus_jobs})
1467
1468 step = "instantiated at VCA"
1469 self.logger.debug(logging_text + step)
1470
1471 self._write_configuration_status(
1472 nsr_id=nsr_id,
1473 vca_index=vca_index,
1474 status='READY'
1475 )
1476
1477 except Exception as e: # TODO do not catch generic Exception; use specific N2VC exceptions
1478 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
1479 if not isinstance(e, (DbException, N2VCException, LcmException, asyncio.CancelledError)):
1480 self.logger.error("Exception while {} : {}".format(step, e), exc_info=True)
1481 self._write_configuration_status(
1482 nsr_id=nsr_id,
1483 vca_index=vca_index,
1484 status='BROKEN'
1485 )
1486 raise LcmException("{} {}".format(step, e)) from e
1487
1488 def _write_ns_status(self, nsr_id: str, ns_state: str, current_operation: str, current_operation_id: str,
1489 error_description: str = None, error_detail: str = None, other_update: dict = None):
1490 """
1491 Update db_nsr fields.
1492 :param nsr_id:
1493 :param ns_state:
1494 :param current_operation:
1495 :param current_operation_id:
1496 :param error_description:
1497 :param error_detail:
1498 :param other_update: Other database changes to apply in the same update, if provided; the dict is cleared after writing
1499 :return:
1500 """
1501 try:
1502 db_dict = other_update or {}
1503 db_dict["_admin.nslcmop"] = current_operation_id # for backward compatibility
1504 db_dict["_admin.current-operation"] = current_operation_id
1505 db_dict["_admin.operation-type"] = current_operation if current_operation != "IDLE" else None
1506 db_dict["currentOperation"] = current_operation
1507 db_dict["currentOperationID"] = current_operation_id
1508 db_dict["errorDescription"] = error_description
1509 db_dict["errorDetail"] = error_detail
1510
1511 if ns_state:
1512 db_dict["nsState"] = ns_state
1513 self.update_db_2("nsrs", nsr_id, db_dict)
1514 except DbException as e:
1515 self.logger.warn('Error writing NS status, ns={}: {}'.format(nsr_id, e))
1516
1517 def _write_op_status(self, op_id: str, stage: list = None, error_message: str = None, queuePosition: int = 0,
1518 operation_state: str = None, other_update: dict = None):
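"""
Update db_nslcmop fields describing the status of an operation.
:param op_id: nslcmop record _id
:param stage: list of [stage, step, vim progress] strings; stage[0] is stored as 'stage' and the joined
list as 'detailed-status'. A plain string is stored as 'stage' only.
:param error_message: stored as 'errorMessage' if provided
:param queuePosition: stored as 'queuePosition'
:param operation_state: stored as 'operationState' if provided, refreshing also 'statusEnteredTime'
:param other_update: other database changes to apply in the same update, if provided
"""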
1519 try:
1520 db_dict = other_update or {}
1521 db_dict['queuePosition'] = queuePosition
1522 if isinstance(stage, list):
1523 db_dict['stage'] = stage[0]
1524 db_dict['detailed-status'] = " ".join(stage)
1525 elif stage is not None:
1526 db_dict['stage'] = str(stage)
1527
1528 if error_message is not None:
1529 db_dict['errorMessage'] = error_message
1530 if operation_state is not None:
1531 db_dict['operationState'] = operation_state
1532 db_dict["statusEnteredTime"] = time()
1533 self.update_db_2("nslcmops", op_id, db_dict)
1534 except DbException as e:
1535 self.logger.warn('Error writing OPERATION status for op_id: {} -> {}'.format(op_id, e))
1536
1537 def _write_all_config_status(self, db_nsr: dict, status: str):
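"""
Set the same status for every non-empty entry of db_nsr 'configurationStatus' (one entry per deployed VCA).
"""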
1538 try:
1539 nsr_id = db_nsr["_id"]
1540 # configurationStatus
1541 config_status = db_nsr.get('configurationStatus')
1542 if config_status:
1543 db_nsr_update = {"configurationStatus.{}.status".format(index): status for index, v in
1544 enumerate(config_status) if v}
1545 # update status
1546 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1547
1548 except DbException as e:
1549 self.logger.warn('Error writing all configuration status, ns={}: {}'.format(nsr_id, e))
1550
1551 def _write_configuration_status(self, nsr_id: str, vca_index: int, status: str = None,
1552 element_under_configuration: str = None, element_type: str = None,
1553 other_update: dict = None):
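"""
Update the configurationStatus.<vca_index> entry of the nsr: 'status', 'elementUnderConfiguration' and
'elementType' are written when provided, together with any other_update fields.
"""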
1554
1555 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
1556 # .format(vca_index, status))
1557
1558 try:
1559 db_path = 'configurationStatus.{}.'.format(vca_index)
1560 db_dict = other_update or {}
1561 if status:
1562 db_dict[db_path + 'status'] = status
1563 if element_under_configuration:
1564 db_dict[db_path + 'elementUnderConfiguration'] = element_under_configuration
1565 if element_type:
1566 db_dict[db_path + 'elementType'] = element_type
1567 self.update_db_2("nsrs", nsr_id, db_dict)
1568 except DbException as e:
1569 self.logger.warn('Error writing configuration status={}, ns={}, vca_index={}: {}'
1570 .format(status, nsr_id, vca_index, e))
1571
1572 async def _do_placement(self, logging_text, db_nslcmop, db_vnfrs):
1573 """
1574 Checks and computes the placement (vim account where to deploy). If it is decided by an external tool, it
1575 sends the request via kafka and waits until the result is written at database (nslcmops _admin.pla).
1576 Database is used because the result can be obtained from a different LCM worker in case of HA.
1577 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
1578 :param db_nslcmop: database content of nslcmop
1579 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
1580 :return: True if some modification is done. Modifies database vnfrs and the db_vnfrs parameter with the
1581 computed 'vim-account-id'
1582 """
1583 modified = False
1584 nslcmop_id = db_nslcmop['_id']
1585 placement_engine = deep_get(db_nslcmop, ('operationParams', 'placement-engine'))
1586 if placement_engine == "PLA":
1587 self.logger.debug(logging_text + "Invoke and wait for placement optimization")
1588 await self.msg.aiowrite("pla", "get_placement", {'nslcmopId': nslcmop_id}, loop=self.loop)
1589 db_poll_interval = 5
1590 wait = db_poll_interval * 10
1591 pla_result = None
1592 while not pla_result and wait >= 0:
1593 await asyncio.sleep(db_poll_interval)
1594 wait -= db_poll_interval
1595 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
1596 pla_result = deep_get(db_nslcmop, ('_admin', 'pla'))
1597
1598 if not pla_result:
1599 raise LcmException("Placement timeout for nslcmopId={}".format(nslcmop_id))
1600
1601 for pla_vnf in pla_result['vnf']:
1602 vnfr = db_vnfrs.get(pla_vnf['member-vnf-index'])
1603 if not pla_vnf.get('vimAccountId') or not vnfr:
1604 continue
1605 modified = True
1606 self.db.set_one("vnfrs", {"_id": vnfr["_id"]}, {"vim-account-id": pla_vnf['vimAccountId']})
1607 # Modifies db_vnfrs
1608 vnfr["vim-account-id"] = pla_vnf['vimAccountId']
1609 return modified
1610
1611 def update_nsrs_with_pla_result(self, params):
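"""
Store the placement result (as produced by the external placement engine) at the nslcmop record under
'_admin.pla', where _do_placement polls for it.
"""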
1612 try:
1613 nslcmop_id = deep_get(params, ('placement', 'nslcmopId'))
1614 self.update_db_2("nslcmops", nslcmop_id, {"_admin.pla": params.get('placement')})
1615 except Exception as e:
1616 self.logger.warn('Update failed for nslcmop_id={}:{}'.format(nslcmop_id, e))
1617
1618 async def instantiate(self, nsr_id, nslcmop_id):
1619 """
1620
1621 :param nsr_id: ns instance to deploy
1622 :param nslcmop_id: operation to run
1623 :return:
1624 """
1625
1626 # Try to lock HA task here
1627 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
1628 if not task_is_locked_by_me:
1629 self.logger.debug('instantiate() task is not locked by me, ns={}'.format(nsr_id))
1630 return
1631
1632 logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
1633 self.logger.debug(logging_text + "Enter")
1634
1635 # get all needed from database
1636
1637 # database nsrs record
1638 db_nsr = None
1639
1640 # database nslcmops record
1641 db_nslcmop = None
1642
1643 # update operation on nsrs
1644 db_nsr_update = {}
1645 # update operation on nslcmops
1646 db_nslcmop_update = {}
1647
1648 nslcmop_operation_state = None
1649 db_vnfrs = {} # vnf's info indexed by member-index
1650 # n2vc_info = {}
1651 tasks_dict_info = {} # from task to info text
1652 exc = None
1653 error_list = []
1654 stage = ['Stage 1/5: preparation of the environment.', "Waiting for previous operations to terminate.", ""]
1655 # ^ stage, step, VIM progress
1656 try:
1657 # wait for any previous tasks in process
1658 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
1659
1660 stage[1] = "Sync filesystem from database."
1661 self.fs.sync() # TODO, make use of partial sync, only for the needed packages
1662
1663 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
1664 stage[1] = "Reading from database."
1665 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
1666 db_nsr_update["detailed-status"] = "creating"
1667 db_nsr_update["operational-status"] = "init"
1668 self._write_ns_status(
1669 nsr_id=nsr_id,
1670 ns_state="BUILDING",
1671 current_operation="INSTANTIATING",
1672 current_operation_id=nslcmop_id,
1673 other_update=db_nsr_update
1674 )
1675 self._write_op_status(
1676 op_id=nslcmop_id,
1677 stage=stage,
1678 queuePosition=0
1679 )
1680
1681 # read from db: operation
1682 stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
1683 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
1684 ns_params = db_nslcmop.get("operationParams")
1685 if ns_params and ns_params.get("timeout_ns_deploy"):
1686 timeout_ns_deploy = ns_params["timeout_ns_deploy"]
1687 else:
1688 timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy)
1689
1690 # read from db: ns
1691 stage[1] = "Getting nsr={} from db.".format(nsr_id)
1692 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
1693 stage[1] = "Getting nsd={} from db.".format(db_nsr["nsd-id"])
1694 nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
1695 db_nsr["nsd"] = nsd
1696 # nsr_name = db_nsr["name"] # TODO short-name??
1697
1698 # read from db: vnf's of this ns
1699 stage[1] = "Getting vnfrs from db."
1700 self.logger.debug(logging_text + stage[1])
1701 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
1702
1703 # read from db: vnfd's for every vnf
1704 db_vnfds = [] # every vnfd data
1705
1706 # for each vnf in ns, read vnfd
1707 for vnfr in db_vnfrs_list:
1708 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
1709 vnfd_id = vnfr["vnfd-id"]
1710 vnfd_ref = vnfr["vnfd-ref"]
1711
1712 # if we do not have this vnfd yet, read it from db
1713 if vnfd_id not in (v["_id"] for v in db_vnfds):
1714 # read from db
1715 stage[1] = "Getting vnfd={} id='{}' from db.".format(vnfd_id, vnfd_ref)
1716 self.logger.debug(logging_text + stage[1])
1717 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
1718
1719 # store vnfd
1720 db_vnfds.append(vnfd)
1721
1722 # Get or generates the _admin.deployed.VCA list
1723 vca_deployed_list = None
1724 if db_nsr["_admin"].get("deployed"):
1725 vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
1726 if vca_deployed_list is None:
1727 vca_deployed_list = []
1728 configuration_status_list = []
1729 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
1730 db_nsr_update["configurationStatus"] = configuration_status_list
1731 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
1732 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
1733 elif isinstance(vca_deployed_list, dict):
1734 # maintain backward compatibility. Change a dict to list at database
1735 vca_deployed_list = list(vca_deployed_list.values())
1736 db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
1737 populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
1738
1739 if not isinstance(deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list):
1740 populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
1741 db_nsr_update["_admin.deployed.RO.vnfd"] = []
1742
1743 # set state to INSTANTIATED. Once instantiated, NBI will not delete it directly
1744 db_nsr_update["_admin.nsState"] = "INSTANTIATED"
1745 self.update_db_2("nsrs", nsr_id, db_nsr_update)
1746 self.db.set_list("vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "INSTANTIATED"})
1747
1748 # n2vc_redesign STEP 2 Deploy Network Scenario
1749 stage[0] = 'Stage 2/5: deployment of KDUs, VMs and execution environments.'
1750 self._write_op_status(
1751 op_id=nslcmop_id,
1752 stage=stage
1753 )
1754
1755 stage[1] = "Deploying KDUs."
1756 # self.logger.debug(logging_text + "Before deploy_kdus")
1757 # Call deploy_kdus in case the "vdu:kdu" param exists in the descriptor
1758 await self.deploy_kdus(
1759 logging_text=logging_text,
1760 nsr_id=nsr_id,
1761 nslcmop_id=nslcmop_id,
1762 db_vnfrs=db_vnfrs,
1763 db_vnfds=db_vnfds,
1764 task_instantiation_info=tasks_dict_info,
1765 )
1766
1767 stage[1] = "Getting VCA public key."
1768 # n2vc_redesign STEP 1 Get VCA public ssh-key
1769 # feature 1429. Add n2vc public key to needed VMs
1770 n2vc_key = self.n2vc.get_public_key()
1771 n2vc_key_list = [n2vc_key]
1772 if self.vca_config.get("public_key"):
1773 n2vc_key_list.append(self.vca_config["public_key"])
1774
1775 stage[1] = "Deploying NS at VIM."
1776 task_ro = asyncio.ensure_future(
1777 self.instantiate_RO(
1778 logging_text=logging_text,
1779 nsr_id=nsr_id,
1780 nsd=nsd,
1781 db_nsr=db_nsr,
1782 db_nslcmop=db_nslcmop,
1783 db_vnfrs=db_vnfrs,
1784 db_vnfds=db_vnfds,
1785 n2vc_key_list=n2vc_key_list,
1786 stage=stage
1787 )
1788 )
1789 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
1790 tasks_dict_info[task_ro] = "Deploying at VIM"
1791
1792 # n2vc_redesign STEP 3 to 6 Deploy N2VC
1793 stage[1] = "Deploying Execution Environments."
1794 self.logger.debug(logging_text + stage[1])
1795
1796 nsi_id = None # TODO put nsi_id when this nsr belongs to a NSI
1797 for vnf_profile in get_vnf_profiles(nsd):
1798 vnfd_id = vnf_profile["vnfd-id"]
1799 vnfd = find_in_list(db_vnfds, lambda a_vnf: a_vnf["id"] == vnfd_id)
1800 member_vnf_index = str(vnf_profile["id"])
1801 db_vnfr = db_vnfrs[member_vnf_index]
1802 base_folder = vnfd["_admin"]["storage"]
1803 vdu_id = None
1804 vdu_index = 0
1805 vdu_name = None
1806 kdu_name = None
1807
1808 # Get additional parameters
1809 deploy_params = {"OSM": get_osm_params(db_vnfr)}
1810 if db_vnfr.get("additionalParamsForVnf"):
1811 deploy_params.update(parse_yaml_strings(db_vnfr["additionalParamsForVnf"].copy()))
1812
1813 descriptor_config = get_configuration(vnfd, vnfd["id"])
1814 if descriptor_config:
1815 self._deploy_n2vc(
1816 logging_text=logging_text + "member_vnf_index={} ".format(member_vnf_index),
1817 db_nsr=db_nsr,
1818 db_vnfr=db_vnfr,
1819 nslcmop_id=nslcmop_id,
1820 nsr_id=nsr_id,
1821 nsi_id=nsi_id,
1822 vnfd_id=vnfd_id,
1823 vdu_id=vdu_id,
1824 kdu_name=kdu_name,
1825 member_vnf_index=member_vnf_index,
1826 vdu_index=vdu_index,
1827 vdu_name=vdu_name,
1828 deploy_params=deploy_params,
1829 descriptor_config=descriptor_config,
1830 base_folder=base_folder,
1831 task_instantiation_info=tasks_dict_info,
1832 stage=stage
1833 )
1834
1835 # Deploy charms for each VDU that supports one.
1836 for vdud in get_vdu_list(vnfd):
1837 vdu_id = vdud["id"]
1838 descriptor_config = get_configuration(vnfd, vdu_id)
1839 vdur = find_in_list(db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id)
1840
1841 if vdur.get("additionalParams"):
1842 deploy_params_vdu = parse_yaml_strings(vdur["additionalParams"])
1843 else:
1844 deploy_params_vdu = deploy_params
1845 deploy_params_vdu["OSM"] = get_osm_params(db_vnfr, vdu_id, vdu_count_index=0)
1846 vdud_count = get_vdu_profile(vnfd, vdu_id).get("max-number-of-instances", 1)
1847
1848 self.logger.debug("VDUD > {}".format(vdud))
1849 self.logger.debug("Descriptor config > {}".format(descriptor_config))
1850 if descriptor_config:
1851 vdu_name = None
1852 kdu_name = None
1853 for vdu_index in range(vdud_count):
1854 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
1855 self._deploy_n2vc(
1856 logging_text=logging_text + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
1857 member_vnf_index, vdu_id, vdu_index),
1858 db_nsr=db_nsr,
1859 db_vnfr=db_vnfr,
1860 nslcmop_id=nslcmop_id,
1861 nsr_id=nsr_id,
1862 nsi_id=nsi_id,
1863 vnfd_id=vnfd_id,
1864 vdu_id=vdu_id,
1865 kdu_name=kdu_name,
1866 member_vnf_index=member_vnf_index,
1867 vdu_index=vdu_index,
1868 vdu_name=vdu_name,
1869 deploy_params=deploy_params_vdu,
1870 descriptor_config=descriptor_config,
1871 base_folder=base_folder,
1872 task_instantiation_info=tasks_dict_info,
1873 stage=stage
1874 )
1875 for kdud in get_kdu_list(vnfd):
1876 kdu_name = kdud["name"]
1877 descriptor_config = get_configuration(vnfd, kdu_name)
1878 if descriptor_config:
1879 vdu_id = None
1880 vdu_index = 0
1881 vdu_name = None
1882 kdur = next(x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name)
1883 deploy_params_kdu = {"OSM": get_osm_params(db_vnfr)}
1884 if kdur.get("additionalParams"):
1885 deploy_params_kdu = parse_yaml_strings(kdur["additionalParams"])
1886
1887 self._deploy_n2vc(
1888 logging_text=logging_text,
1889 db_nsr=db_nsr,
1890 db_vnfr=db_vnfr,
1891 nslcmop_id=nslcmop_id,
1892 nsr_id=nsr_id,
1893 nsi_id=nsi_id,
1894 vnfd_id=vnfd_id,
1895 vdu_id=vdu_id,
1896 kdu_name=kdu_name,
1897 member_vnf_index=member_vnf_index,
1898 vdu_index=vdu_index,
1899 vdu_name=vdu_name,
1900 deploy_params=deploy_params_kdu,
1901 descriptor_config=descriptor_config,
1902 base_folder=base_folder,
1903 task_instantiation_info=tasks_dict_info,
1904 stage=stage
1905 )
1906
1907 # Check if this NS has a charm configuration
1908 descriptor_config = nsd.get("ns-configuration")
1909 if descriptor_config and descriptor_config.get("juju"):
1910 vnfd_id = None
1911 db_vnfr = None
1912 member_vnf_index = None
1913 vdu_id = None
1914 kdu_name = None
1915 vdu_index = 0
1916 vdu_name = None
1917
1918 # Get additional parameters
1919 deploy_params = {"OSM": {"vim_account_id": ns_params["vimAccountId"]}}
1920 if db_nsr.get("additionalParamsForNs"):
1921 deploy_params.update(parse_yaml_strings(db_nsr["additionalParamsForNs"].copy()))
1922 base_folder = nsd["_admin"]["storage"]
1923 self._deploy_n2vc(
1924 logging_text=logging_text,
1925 db_nsr=db_nsr,
1926 db_vnfr=db_vnfr,
1927 nslcmop_id=nslcmop_id,
1928 nsr_id=nsr_id,
1929 nsi_id=nsi_id,
1930 vnfd_id=vnfd_id,
1931 vdu_id=vdu_id,
1932 kdu_name=kdu_name,
1933 member_vnf_index=member_vnf_index,
1934 vdu_index=vdu_index,
1935 vdu_name=vdu_name,
1936 deploy_params=deploy_params,
1937 descriptor_config=descriptor_config,
1938 base_folder=base_folder,
1939 task_instantiation_info=tasks_dict_info,
1940 stage=stage
1941 )
1942
1943 # the rest of the work will be done in the finally block
1944
1945 except (ROclient.ROClientException, DbException, LcmException, N2VCException) as e:
1946 self.logger.error(logging_text + "Exit Exception while '{}': {}".format(stage[1], e))
1947 exc = e
1948 except asyncio.CancelledError:
1949 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(stage[1]))
1950 exc = "Operation was cancelled"
1951 except Exception as e:
1952 exc = traceback.format_exc()
1953 self.logger.critical(logging_text + "Exit Exception while '{}': {}".format(stage[1], e), exc_info=True)
1954 finally:
1955 if exc:
1956 error_list.append(str(exc))
1957 try:
1958 # wait for pending tasks
1959 if tasks_dict_info:
1960 stage[1] = "Waiting for instantiate pending tasks."
1961 self.logger.debug(logging_text + stage[1])
1962 error_list += await self._wait_for_tasks(logging_text, tasks_dict_info, timeout_ns_deploy,
1963 stage, nslcmop_id, nsr_id=nsr_id)
1964 stage[1] = stage[2] = ""
1965 except asyncio.CancelledError:
1966 error_list.append("Cancelled")
1967 # TODO cancel all tasks
1968 except Exception as exc:
1969 error_list.append(str(exc))
1970
1971 # update operation-status
1972 db_nsr_update["operational-status"] = "running"
1973 # let's begin with VCA 'configured' status (later we can change it)
1974 db_nsr_update["config-status"] = "configured"
1975 for task, task_name in tasks_dict_info.items():
1976 if not task.done() or task.cancelled() or task.exception():
1977 if task_name.startswith(self.task_name_deploy_vca):
1978 # A N2VC task is pending
1979 db_nsr_update["config-status"] = "failed"
1980 else:
1981 # RO or KDU task is pending
1982 db_nsr_update["operational-status"] = "failed"
1983
1984 # update status at database
1985 if error_list:
1986 error_detail = ". ".join(error_list)
1987 self.logger.error(logging_text + error_detail)
1988 error_description_nslcmop = '{} Detail: {}'.format(stage[0], error_detail)
1989 error_description_nsr = 'Operation: INSTANTIATING.{}, {}'.format(nslcmop_id, stage[0])
1990
1991 db_nsr_update["detailed-status"] = error_description_nsr + " Detail: " + error_detail
1992 db_nslcmop_update["detailed-status"] = error_detail
1993 nslcmop_operation_state = "FAILED"
1994 ns_state = "BROKEN"
1995 else:
1996 error_detail = None
1997 error_description_nsr = error_description_nslcmop = None
1998 ns_state = "READY"
1999 db_nsr_update["detailed-status"] = "Done"
2000 db_nslcmop_update["detailed-status"] = "Done"
2001 nslcmop_operation_state = "COMPLETED"
2002
2003 if db_nsr:
2004 self._write_ns_status(
2005 nsr_id=nsr_id,
2006 ns_state=ns_state,
2007 current_operation="IDLE",
2008 current_operation_id=None,
2009 error_description=error_description_nsr,
2010 error_detail=error_detail,
2011 other_update=db_nsr_update
2012 )
2013 self._write_op_status(
2014 op_id=nslcmop_id,
2015 stage="",
2016 error_message=error_description_nslcmop,
2017 operation_state=nslcmop_operation_state,
2018 other_update=db_nslcmop_update,
2019 )
2020
2021 if nslcmop_operation_state:
2022 try:
2023 await self.msg.aiowrite("ns", "instantiated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
2024 "operationState": nslcmop_operation_state},
2025 loop=self.loop)
2026 except Exception as e:
2027 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
2028
2029 self.logger.debug(logging_text + "Exit")
2030 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
2031
2032 async def _add_vca_relations(self, logging_text, nsr_id, vca_index: int,
2033 timeout: int = 3600, vca_type: str = None) -> bool:
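"""
Collect the ns-level and vnf-level relations that involve this VCA, wait (up to 'timeout' seconds) until the
peer VCAs have their configuration software installed, and add each relation through the VCA connector.
Relations whose peer is in BROKEN configuration status are discarded.
:return: True if all relations were added (or none were defined), False on timeout or error
"""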
2034
2035 # steps:
2036 # 1. find all relations for this VCA
2037 # 2. wait for other peers related
2038 # 3. add relations
2039
2040 try:
2041 vca_type = vca_type or "lxc_proxy_charm"
2042
2043 # STEP 1: find all relations for this VCA
2044
2045 # read nsr record
2046 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2047 nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
2048
2049 # this VCA data
2050 my_vca = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))[vca_index]
2051
2052 # read all ns-configuration relations
2053 ns_relations = list()
2054 db_ns_relations = deep_get(nsd, ('ns-configuration', 'relation'))
2055 if db_ns_relations:
2056 for r in db_ns_relations:
2057 # check if this VCA is in the relation
2058 if my_vca.get('member-vnf-index') in\
2059 (r.get('entities')[0].get('id'), r.get('entities')[1].get('id')):
2060 ns_relations.append(r)
2061
2062 # read all vnf-configuration relations
2063 vnf_relations = list()
2064 db_vnfd_list = db_nsr.get('vnfd-id')
2065 if db_vnfd_list:
2066 for vnfd in db_vnfd_list:
2067 db_vnfd = self.db.get_one("vnfds", {"_id": vnfd})
2068 db_vnf_relations = get_configuration(db_vnfd, db_vnfd["id"]).get("relation", [])
2069 if db_vnf_relations:
2070 for r in db_vnf_relations:
2071 # check if this VCA is in the relation
2072 if my_vca.get('vdu_id') in (r.get('entities')[0].get('id'), r.get('entities')[1].get('id')):
2073 vnf_relations.append(r)
2074
2075 # if there are no relations, we are done
2076 if not ns_relations and not vnf_relations:
2077 self.logger.debug(logging_text + ' No relations')
2078 return True
2079
2080 self.logger.debug(logging_text + ' adding relations\n {}\n {}'.format(ns_relations, vnf_relations))
2081
2082 # add all relations
2083 start = time()
2084 while True:
2085 # check timeout
2086 now = time()
2087 if now - start >= timeout:
2088 self.logger.error(logging_text + ' : timeout adding relations')
2089 return False
2090
2091 # reload nsr from database (we need an updated record: _admin.deployed.VCA)
2092 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
2093
2094 # for each defined NS relation, find the related VCAs
2095 for r in ns_relations.copy():
2096 from_vca_ee_id = None
2097 to_vca_ee_id = None
2098 from_vca_endpoint = None
2099 to_vca_endpoint = None
2100 vca_list = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))
2101 for vca in vca_list:
2102 if vca.get('member-vnf-index') == r.get('entities')[0].get('id') \
2103 and vca.get('config_sw_installed'):
2104 from_vca_ee_id = vca.get('ee_id')
2105 from_vca_endpoint = r.get('entities')[0].get('endpoint')
2106 if vca.get('member-vnf-index') == r.get('entities')[1].get('id') \
2107 and vca.get('config_sw_installed'):
2108 to_vca_ee_id = vca.get('ee_id')
2109 to_vca_endpoint = r.get('entities')[1].get('endpoint')
2110 if from_vca_ee_id and to_vca_ee_id:
2111 # add relation
2112 await self.vca_map[vca_type].add_relation(
2113 ee_id_1=from_vca_ee_id,
2114 ee_id_2=to_vca_ee_id,
2115 endpoint_1=from_vca_endpoint,
2116 endpoint_2=to_vca_endpoint)
2117 # remove entry from relations list
2118 ns_relations.remove(r)
2119 else:
2120 # check failed peers
2121 try:
2122 vca_status_list = db_nsr.get('configurationStatus')
2123 if vca_status_list:
2124 for i in range(len(vca_list)):
2125 vca = vca_list[i]
2126 vca_status = vca_status_list[i]
2127 if vca.get('member-vnf-index') == r.get('entities')[0].get('id'):
2128 if vca_status.get('status') == 'BROKEN':
2129 # peer broken: remove relation from list
2130 ns_relations.remove(r)
2131 if vca.get('member-vnf-index') == r.get('entities')[1].get('id'):
2132 if vca_status.get('status') == 'BROKEN':
2133 # peer broken: remove relation from list
2134 ns_relations.remove(r)
2135 except Exception:
2136 # ignore
2137 pass
2138
2139 # for each defined VNF relation, find the related VCAs
2140 for r in vnf_relations.copy():
2141 from_vca_ee_id = None
2142 to_vca_ee_id = None
2143 from_vca_endpoint = None
2144 to_vca_endpoint = None
2145 vca_list = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))
2146 for vca in vca_list:
2147 key_to_check = "vdu_id"
2148 if vca.get("vdu_id") is None:
2149 key_to_check = "vnfd_id"
2150 if vca.get(key_to_check) == r.get('entities')[0].get('id') and vca.get('config_sw_installed'):
2151 from_vca_ee_id = vca.get('ee_id')
2152 from_vca_endpoint = r.get('entities')[0].get('endpoint')
2153 if vca.get(key_to_check) == r.get('entities')[1].get('id') and vca.get('config_sw_installed'):
2154 to_vca_ee_id = vca.get('ee_id')
2155 to_vca_endpoint = r.get('entities')[1].get('endpoint')
2156 if from_vca_ee_id and to_vca_ee_id:
2157 # add relation
2158 await self.vca_map[vca_type].add_relation(
2159 ee_id_1=from_vca_ee_id,
2160 ee_id_2=to_vca_ee_id,
2161 endpoint_1=from_vca_endpoint,
2162 endpoint_2=to_vca_endpoint)
2163 # remove entry from relations list
2164 vnf_relations.remove(r)
2165 else:
2166 # check failed peers
2167 try:
2168 vca_status_list = db_nsr.get('configurationStatus')
2169 if vca_status_list:
2170 for i in range(len(vca_list)):
2171 vca = vca_list[i]
2172 vca_status = vca_status_list[i]
2173 if vca.get('vdu_id') == r.get('entities')[0].get('id'):
2174 if vca_status.get('status') == 'BROKEN':
2175 # peer broken: remove relation from list
2176 vnf_relations.remove(r)
2177 if vca.get('vdu_id') == r.get('entities')[1].get('id'):
2178 if vca_status.get('status') == 'BROKEN':
2179 # peer broken: remove relation from list
2180 vnf_relations.remove(r)
2181 except Exception:
2182 # ignore
2183 pass
2184
2185 # wait for next try
2186 await asyncio.sleep(5.0)
2187
2188 if not ns_relations and not vnf_relations:
2189 self.logger.debug('Relations added')
2190 break
2191
2192 return True
2193
2194 except Exception as e:
2195 self.logger.warn(logging_text + ' ERROR adding relations: {}'.format(e))
2196 return False
2197
2198 async def _install_kdu(self, nsr_id: str, nsr_db_path: str, vnfr_data: dict, kdu_index: int, kdud: dict,
2199 vnfd: dict, k8s_instance_info: dict, k8params: dict = None, timeout: int = 600):
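"""
Instantiate a single KDU in its target K8s cluster, record the resulting services and management IP address
at the vnfr, and run the KDU initial-config-primitives when no juju execution environment is defined for it.
On error, the KDU status is set to ERROR at the vnfr and the exception is re-raised.
:return: the kdu instance name generated by the K8s connector
"""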
2200
2201 try:
2202 k8sclustertype = k8s_instance_info["k8scluster-type"]
2203 # Instantiate kdu
2204 db_dict_install = {"collection": "nsrs",
2205 "filter": {"_id": nsr_id},
2206 "path": nsr_db_path}
2207
2208 kdu_instance = self.k8scluster_map[k8sclustertype].generate_kdu_instance_name(
2209 db_dict=db_dict_install,
2210 kdu_model=k8s_instance_info["kdu-model"],
2211 kdu_name=k8s_instance_info["kdu-name"],
2212 )
2213 self.update_db_2("nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance})
2214 await self.k8scluster_map[k8sclustertype].install(
2215 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
2216 kdu_model=k8s_instance_info["kdu-model"],
2217 atomic=True,
2218 params=k8params,
2219 db_dict=db_dict_install,
2220 timeout=timeout,
2221 kdu_name=k8s_instance_info["kdu-name"],
2222 namespace=k8s_instance_info["namespace"],
2223 kdu_instance=kdu_instance,
2224 )
2225 self.update_db_2("nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance})
2226
2227 # Obtain the deployed services in order to get the management service ip
2228 services = await self.k8scluster_map[k8sclustertype].get_services(
2229 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
2230 kdu_instance=kdu_instance,
2231 namespace=k8s_instance_info["namespace"])
2232
2233 # Obtain management service info (if exists)
2234 vnfr_update_dict = {}
2235 if services:
2236 vnfr_update_dict["kdur.{}.services".format(kdu_index)] = services
2237 mgmt_services = [service for service in kdud.get("service", []) if service.get("mgmt-service")]
2238 for mgmt_service in mgmt_services:
2239 for service in services:
2240 if service["name"].startswith(mgmt_service["name"]):
2241 # Mgmt service found, Obtain service ip
2242 ip = service.get("external_ip", service.get("cluster_ip"))
2243 if isinstance(ip, list) and len(ip) == 1:
2244 ip = ip[0]
2245
2246 vnfr_update_dict["kdur.{}.ip-address".format(kdu_index)] = ip
2247
2248 # Check whether the mgmt ip must also be updated at the vnf level
2249 service_external_cp = mgmt_service.get("external-connection-point-ref")
2250 if service_external_cp:
2251 if deep_get(vnfd, ("mgmt-interface", "cp")) == service_external_cp:
2252 vnfr_update_dict["ip-address"] = ip
2253
2254 break
2255 else:
2256 self.logger.warn("Mgmt service name: {} not found".format(mgmt_service["name"]))
2257
2258 vnfr_update_dict["kdur.{}.status".format(kdu_index)] = "READY"
2259 self.update_db_2("vnfrs", vnfr_data.get("_id"), vnfr_update_dict)
2260
2261 kdu_config = get_configuration(vnfd, k8s_instance_info["kdu-name"])
2262 if kdu_config and kdu_config.get("initial-config-primitive") and \
2263 get_juju_ee_ref(vnfd, k8s_instance_info["kdu-name"]) is None:
2264 initial_config_primitive_list = kdu_config.get("initial-config-primitive")
2265 initial_config_primitive_list.sort(key=lambda val: int(val["seq"]))
2266
2267 for initial_config_primitive in initial_config_primitive_list:
2268 primitive_params_ = self._map_primitive_params(initial_config_primitive, {}, {})
2269
2270 await asyncio.wait_for(
2271 self.k8scluster_map[k8sclustertype].exec_primitive(
2272 cluster_uuid=k8s_instance_info["k8scluster-uuid"],
2273 kdu_instance=kdu_instance,
2274 primitive_name=initial_config_primitive["name"],
2275 params=primitive_params_, db_dict={}),
2276 timeout=timeout)
2277
2278 except Exception as e:
2279 # Prepare update db with error and raise exception
2280 try:
2281 self.update_db_2("nsrs", nsr_id, {nsr_db_path + ".detailed-status": str(e)})
2282 self.update_db_2("vnfrs", vnfr_data.get("_id"), {"kdur.{}.status".format(kdu_index): "ERROR"})
2283 except Exception:
2284 # ignore to keep original exception
2285 pass
2286 # reraise original error
2287 raise
2288
2289 return kdu_instance
2290
2291 async def deploy_kdus(self, logging_text, nsr_id, nslcmop_id, db_vnfrs, db_vnfds, task_instantiation_info):
2292 # Launch kdus if present in the descriptor
2293
2294 k8scluster_id_2_uuic = {"helm-chart-v3": {}, "helm-chart": {}, "juju-bundle": {}}
2295
2296 async def _get_cluster_id(cluster_id, cluster_type):
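# Resolve an OSM k8s cluster id into the connector cluster uuid for the given cluster type, waiting for
# pending k8scluster tasks and caching the result in k8scluster_id_2_uuic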
2297 nonlocal k8scluster_id_2_uuic
2298 if cluster_id in k8scluster_id_2_uuic[cluster_type]:
2299 return k8scluster_id_2_uuic[cluster_type][cluster_id]
2300
2301 # check if the K8s cluster is still being created and wait for any related pending tasks to finish
2302 task_name, task_dependency = self.lcm_tasks.lookfor_related("k8scluster", cluster_id)
2303 if task_dependency:
2304 text = "Waiting for related tasks '{}' on k8scluster {} to be completed".format(task_name, cluster_id)
2305 self.logger.debug(logging_text + text)
2306 await asyncio.wait(task_dependency, timeout=3600)
2307
2308 db_k8scluster = self.db.get_one("k8sclusters", {"_id": cluster_id}, fail_on_empty=False)
2309 if not db_k8scluster:
2310 raise LcmException("K8s cluster {} cannot be found".format(cluster_id))
2311
2312 k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
2313 if not k8s_id:
2314 if cluster_type == "helm-chart-v3":
2315 try:
2316 # backward compatibility for existing clusters that have not been initialized for helm v3
2317 k8s_credentials = yaml.safe_dump(db_k8scluster.get("credentials"))
2318 k8s_id, uninstall_sw = await self.k8sclusterhelm3.init_env(k8s_credentials,
2319 reuse_cluster_uuid=cluster_id)
2320 db_k8scluster_update = {}
2321 db_k8scluster_update["_admin.helm-chart-v3.error_msg"] = None
2322 db_k8scluster_update["_admin.helm-chart-v3.id"] = k8s_id
2323 db_k8scluster_update["_admin.helm-chart-v3.created"] = uninstall_sw
2324 db_k8scluster_update["_admin.helm-chart-v3.operationalState"] = "ENABLED"
2325 self.update_db_2("k8sclusters", cluster_id, db_k8scluster_update)
2326 except Exception as e:
2327 self.logger.error(logging_text + "error initializing helm-v3 cluster: {}".format(str(e)))
2328 raise LcmException("K8s cluster '{}' has not been initialized for '{}'".format(cluster_id,
2329 cluster_type))
2330 else:
2331 raise LcmException("K8s cluster '{}' has not been initialized for '{}'".
2332 format(cluster_id, cluster_type))
2333 k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
2334 return k8s_id
2335
2336 logging_text += "Deploy kdus: "
2337 step = ""
2338 try:
2339 db_nsr_update = {"_admin.deployed.K8s": []}
2340 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2341
2342 index = 0
2343 updated_cluster_list = []
2344 updated_v3_cluster_list = []
2345
2346 for vnfr_data in db_vnfrs.values():
2347 for kdu_index, kdur in enumerate(get_iterable(vnfr_data, "kdur")):
2348 # Step 0: Prepare and set parameters
2349 desc_params = parse_yaml_strings(kdur.get("additionalParams"))
2350 vnfd_id = vnfr_data.get('vnfd-id')
2351 vnfd_with_id = find_in_list(db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id)
2352 kdud = next(kdud for kdud in vnfd_with_id["kdu"] if kdud["name"] == kdur["kdu-name"])
2353 namespace = kdur.get("k8s-namespace")
2354 if kdur.get("helm-chart"):
2355 kdumodel = kdur["helm-chart"]
2356 # Default version: helm-v3; if helm-version is v2, use helm-chart (v2)
2357 k8sclustertype = "helm-chart-v3"
2358 self.logger.debug("kdur: {}".format(kdur))
2359 if kdur.get("helm-version") and kdur.get("helm-version") == "v2":
2360 k8sclustertype = "helm-chart"
2361 elif kdur.get("juju-bundle"):
2362 kdumodel = kdur["juju-bundle"]
2363 k8sclustertype = "juju-bundle"
2364 else:
2365 raise LcmException("kdu type for kdu='{}.{}' is neither helm-chart nor "
2366 "juju-bundle. Maybe an old NBI version is running".
2367 format(vnfr_data["member-vnf-index-ref"], kdur["kdu-name"]))
2368 # check if kdumodel is a file and exists
2369 try:
2370 vnfd_with_id = find_in_list(db_vnfds, lambda vnfd: vnfd["_id"] == vnfd_id)
2371 storage = deep_get(vnfd_with_id, ('_admin', 'storage'))
2372 if storage and storage.get('pkg-dir'): # may not be present if the vnfd has no artifacts
2373 # path format: /vnfdid/pkgdir/helm-charts|juju-bundles/kdumodel
2374 filename = '{}/{}/{}s/{}'.format(storage["folder"], storage["pkg-dir"], k8sclustertype,
2375 kdumodel)
2376 if self.fs.file_exists(filename, mode='file') or self.fs.file_exists(filename, mode='dir'):
2377 kdumodel = self.fs.path + filename
2378 except (asyncio.TimeoutError, asyncio.CancelledError):
2379 raise
2380 except Exception: # it is not a file
2381 pass
2382
2383 k8s_cluster_id = kdur["k8s-cluster"]["id"]
2384 step = "Synchronize repos for k8s cluster '{}'".format(k8s_cluster_id)
2385 cluster_uuid = await _get_cluster_id(k8s_cluster_id, k8sclustertype)
2386
2387 # Synchronize repos
2388 if (k8sclustertype == "helm-chart" and cluster_uuid not in updated_cluster_list)\
2389 or (k8sclustertype == "helm-chart-v3" and cluster_uuid not in updated_v3_cluster_list):
2390 del_repo_list, added_repo_dict = await asyncio.ensure_future(
2391 self.k8scluster_map[k8sclustertype].synchronize_repos(cluster_uuid=cluster_uuid))
2392 if del_repo_list or added_repo_dict:
2393 if k8sclustertype == "helm-chart":
2394 unset = {'_admin.helm_charts_added.' + item: None for item in del_repo_list}
2395 updated = {'_admin.helm_charts_added.' +
2396 item: name for item, name in added_repo_dict.items()}
2397 updated_cluster_list.append(cluster_uuid)
2398 elif k8sclustertype == "helm-chart-v3":
2399 unset = {'_admin.helm_charts_v3_added.' + item: None for item in del_repo_list}
2400 updated = {'_admin.helm_charts_v3_added.' +
2401 item: name for item, name in added_repo_dict.items()}
2402 updated_v3_cluster_list.append(cluster_uuid)
2403 self.logger.debug(logging_text + "repos synchronized on k8s cluster "
2404 "'{}' to_delete: {}, to_add: {}".
2405 format(k8s_cluster_id, del_repo_list, added_repo_dict))
2406 self.db.set_one("k8sclusters", {"_id": k8s_cluster_id}, updated, unset=unset)
2407
2408 # Instantiate kdu
2409 step = "Instantiating KDU {}.{} in k8s cluster {}".format(vnfr_data["member-vnf-index-ref"],
2410 kdur["kdu-name"], k8s_cluster_id)
2411 k8s_instance_info = {"kdu-instance": None,
2412 "k8scluster-uuid": cluster_uuid,
2413 "k8scluster-type": k8sclustertype,
2414 "member-vnf-index": vnfr_data["member-vnf-index-ref"],
2415 "kdu-name": kdur["kdu-name"],
2416 "kdu-model": kdumodel,
2417 "namespace": namespace}
2418 db_path = "_admin.deployed.K8s.{}".format(index)
2419 db_nsr_update[db_path] = k8s_instance_info
2420 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2421 vnfd_with_id = find_in_list(db_vnfds, lambda vnf: vnf["_id"] == vnfd_id)
2422 task = asyncio.ensure_future(
2423 self._install_kdu(nsr_id, db_path, vnfr_data, kdu_index, kdud, vnfd_with_id,
2424 k8s_instance_info, k8params=desc_params, timeout=600))
2425 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_KDU-{}".format(index), task)
2426 task_instantiation_info[task] = "Deploying KDU {}".format(kdur["kdu-name"])
2427
2428 index += 1
2429
2430 except (LcmException, asyncio.CancelledError):
2431 raise
2432 except Exception as e:
2433 msg = "Exception {} while {}: {}".format(type(e).__name__, step, e)
2434 if isinstance(e, (N2VCException, DbException)):
2435 self.logger.error(logging_text + msg)
2436 else:
2437 self.logger.critical(logging_text + msg, exc_info=True)
2438 raise LcmException(msg)
2439 finally:
2440 if db_nsr_update:
2441 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2442
2443 def _deploy_n2vc(self, logging_text, db_nsr, db_vnfr, nslcmop_id, nsr_id, nsi_id, vnfd_id, vdu_id,
2444 kdu_name, member_vnf_index, vdu_index, vdu_name, deploy_params, descriptor_config,
2445 base_folder, task_instantiation_info, stage):
2446 # launch instantiate_N2VC in an asyncio task and register the task object
2447 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
2448 # if not found, create one entry and update database
2449 # fill db_nsr._admin.deployed.VCA.<index>
2450
2451 self.logger.debug(logging_text + "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id, vdu_id))
2452 if "execution-environment-list" in descriptor_config:
2453 ee_list = descriptor_config.get("execution-environment-list", [])
2454 else: # other types such as script are not supported
2455 ee_list = []
2456
2457 for ee_item in ee_list:
2458 self.logger.debug(logging_text + "_deploy_n2vc ee_item juju={}, helm={}".format(ee_item.get('juju'),
2459 ee_item.get("helm-chart")))
2460 ee_descriptor_id = ee_item.get("id")
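# determine the VCA type from the execution environment item:
# juju charm -> lxc_proxy_charm / native_charm / k8s_proxy_charm; helm chart -> helm (v2) / helm-v3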
2461 if ee_item.get("juju"):
2462 vca_name = ee_item['juju'].get('charm')
2463 vca_type = "lxc_proxy_charm" if ee_item['juju'].get('charm') is not None else "native_charm"
2464 if ee_item['juju'].get('cloud') == "k8s":
2465 vca_type = "k8s_proxy_charm"
2466 elif ee_item['juju'].get('proxy') is False:
2467 vca_type = "native_charm"
2468 elif ee_item.get("helm-chart"):
2469 vca_name = ee_item['helm-chart']
2470 if ee_item.get("helm-version") and ee_item.get("helm-version") == "v2":
2471 vca_type = "helm"
2472 else:
2473 vca_type = "helm-v3"
2474 else:
2475 self.logger.debug(logging_text + "skipping configuration that is neither juju nor helm")
2476 continue
2477
2478 vca_index = -1
2479 for vca_index, vca_deployed in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
2480 if not vca_deployed:
2481 continue
2482 if vca_deployed.get("member-vnf-index") == member_vnf_index and \
2483 vca_deployed.get("vdu_id") == vdu_id and \
2484 vca_deployed.get("kdu_name") == kdu_name and \
2485 vca_deployed.get("vdu_count_index", 0) == vdu_index and \
2486 vca_deployed.get("ee_descriptor_id") == ee_descriptor_id:
2487 break
2488 else:
2489 # not found, create one.
2490 target = "ns" if not member_vnf_index else "vnf/{}".format(member_vnf_index)
2491 if vdu_id:
2492 target += "/vdu/{}/{}".format(vdu_id, vdu_index or 0)
2493 elif kdu_name:
2494 target += "/kdu/{}".format(kdu_name)
2495 vca_deployed = {
2496 "target_element": target,
2497 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
2498 "member-vnf-index": member_vnf_index,
2499 "vdu_id": vdu_id,
2500 "kdu_name": kdu_name,
2501 "vdu_count_index": vdu_index,
2502 "operational-status": "init", # TODO revise
2503 "detailed-status": "", # TODO revise
2504 "step": "initial-deploy", # TODO revise
2505 "vnfd_id": vnfd_id,
2506 "vdu_name": vdu_name,
2507 "type": vca_type,
2508 "ee_descriptor_id": ee_descriptor_id
2509 }
2510 vca_index += 1
2511
2512 # create VCA and configurationStatus in db
2513 db_dict = {
2514 "_admin.deployed.VCA.{}".format(vca_index): vca_deployed,
2515 "configurationStatus.{}".format(vca_index): dict()
2516 }
2517 self.update_db_2("nsrs", nsr_id, db_dict)
2518
2519 db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed)
2520
2521 self.logger.debug("N2VC > NSR_ID > {}".format(nsr_id))
2522 self.logger.debug("N2VC > DB_NSR > {}".format(db_nsr))
2523 self.logger.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed))
2524
2525 # Launch task
2526 task_n2vc = asyncio.ensure_future(
2527 self.instantiate_N2VC(
2528 logging_text=logging_text,
2529 vca_index=vca_index,
2530 nsi_id=nsi_id,
2531 db_nsr=db_nsr,
2532 db_vnfr=db_vnfr,
2533 vdu_id=vdu_id,
2534 kdu_name=kdu_name,
2535 vdu_index=vdu_index,
2536 deploy_params=deploy_params,
2537 config_descriptor=descriptor_config,
2538 base_folder=base_folder,
2539 nslcmop_id=nslcmop_id,
2540 stage=stage,
2541 vca_type=vca_type,
2542 vca_name=vca_name,
2543 ee_config_descriptor=ee_item
2544 )
2545 )
2546 self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_N2VC-{}".format(vca_index), task_n2vc)
2547 task_instantiation_info[task_n2vc] = self.task_name_deploy_vca + " {}.{}".format(
2548 member_vnf_index or "", vdu_id or "")
2549
2550 @staticmethod
2551 def _create_nslcmop(nsr_id, operation, params):
2552 """
2553 Creates an nslcmop content to be stored at database.
2554 :param nsr_id: internal id of the instance
2555 :param operation: instantiate, terminate, scale, action, ...
2556 :param params: user parameters for the operation
2557 :return: dictionary following SOL005 format
2558 """
2559 # Raise exception if invalid arguments
2560 if not (nsr_id and operation and params):
2561 raise LcmException(
2562 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided")
2563 now = time()
2564 _id = str(uuid4())
2565 nslcmop = {
2566 "id": _id,
2567 "_id": _id,
2568 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
2569 "operationState": "PROCESSING",
2570 "statusEnteredTime": now,
2571 "nsInstanceId": nsr_id,
2572 "lcmOperationType": operation,
2573 "startTime": now,
2574 "isAutomaticInvocation": False,
2575 "operationParams": params,
2576 "isCancelPending": False,
2577 "links": {
2578 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
2579 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
2580 }
2581 }
2582 return nslcmop
2583
2584 def _format_additional_params(self, params):
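"""
Deserialize additional-params values encoded as YAML strings prefixed with "!!yaml ".
Illustrative example (the key name is arbitrary): {"config": "!!yaml {key: value}"} becomes
{"config": {"key": "value"}}.
"""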
2585 params = params or {}
2586 for key, value in params.items():
2587 if str(value).startswith("!!yaml "):
2588 params[key] = yaml.safe_load(value[7:])
2589 return params
2590
2591 def _get_terminate_primitive_params(self, seq, vnf_index):
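"""
Build the parameters of a terminate primitive sequence for the given member vnf index, mapping them
with _map_primitive_params.
"""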
2592 primitive = seq.get('name')
2593 primitive_params = {}
2594 params = {
2595 "member_vnf_index": vnf_index,
2596 "primitive": primitive,
2597 "primitive_params": primitive_params,
2598 }
2599 desc_params = {}
2600 return self._map_primitive_params(seq, params, desc_params)
2601
2602 # sub-operations
2603
2604 def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
2605 op = deep_get(db_nslcmop, ('_admin', 'operations'), [])[op_index]
2606 if op.get('operationState') == 'COMPLETED':
2607 # b. Skip sub-operation
2608 # _ns_execute_primitive() or RO.create_action() will NOT be executed
2609 return self.SUBOPERATION_STATUS_SKIP
2610 else:
2611 # c. retry executing sub-operation
2612 # The sub-operation exists, and operationState != 'COMPLETED'
2613 # Update operationState = 'PROCESSING' to indicate a retry.
2614 operationState = 'PROCESSING'
2615 detailed_status = 'In progress'
2616 self._update_suboperation_status(
2617 db_nslcmop, op_index, operationState, detailed_status)
2618 # Return the sub-operation index
2619 # _ns_execute_primitive() or RO.create_action() will be called from scale()
2620 # with arguments extracted from the sub-operation
2621 return op_index
2622
2623 # Find a sub-operation where all keys in a matching dictionary must match
2624 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
2625 def _find_suboperation(self, db_nslcmop, match):
2626 if db_nslcmop and match:
2627 op_list = db_nslcmop.get('_admin', {}).get('operations', [])
2628 for i, op in enumerate(op_list):
2629 if all(op.get(k) == match[k] for k in match):
2630 return i
2631 return self.SUBOPERATION_STATUS_NOT_FOUND
2632
2633 # Update status for a sub-operation given its index
2634 def _update_suboperation_status(self, db_nslcmop, op_index, operationState, detailed_status):
2635 # Update DB for HA tasks
2636 q_filter = {'_id': db_nslcmop['_id']}
2637 update_dict = {'_admin.operations.{}.operationState'.format(op_index): operationState,
2638 '_admin.operations.{}.detailed-status'.format(op_index): detailed_status}
2639 self.db.set_one("nslcmops",
2640 q_filter=q_filter,
2641 update_dict=update_dict,
2642 fail_on_empty=False)
2643
2644 # Add sub-operation, return the index of the added sub-operation
2645 # Optionally, set operationState, detailed-status, and operationType
2646 # Status and type are currently set for 'scale' sub-operations:
2647 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
2648 # 'detailed-status' : status message
2649 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
2650 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
2651 def _add_suboperation(self, db_nslcmop, vnf_index, vdu_id, vdu_count_index, vdu_name, primitive,
2652 mapped_primitive_params, operationState=None, detailed_status=None, operationType=None,
2653 RO_nsr_id=None, RO_scaling_info=None):
2654 if not db_nslcmop:
2655 return self.SUBOPERATION_STATUS_NOT_FOUND
2656 # Get the "_admin.operations" list, if it exists
2657 db_nslcmop_admin = db_nslcmop.get('_admin', {})
2658 op_list = db_nslcmop_admin.get('operations')
2659 # Create or append to the "_admin.operations" list
2660 new_op = {'member_vnf_index': vnf_index,
2661 'vdu_id': vdu_id,
2662 'vdu_count_index': vdu_count_index,
2663 'primitive': primitive,
2664 'primitive_params': mapped_primitive_params}
2665 if operationState:
2666 new_op['operationState'] = operationState
2667 if detailed_status:
2668 new_op['detailed-status'] = detailed_status
2669 if operationType:
2670 new_op['lcmOperationType'] = operationType
2671 if RO_nsr_id:
2672 new_op['RO_nsr_id'] = RO_nsr_id
2673 if RO_scaling_info:
2674 new_op['RO_scaling_info'] = RO_scaling_info
2675 if not op_list:
2676 # No existing operations, create key 'operations' with current operation as first list element
2677 db_nslcmop_admin.update({'operations': [new_op]})
2678 op_list = db_nslcmop_admin.get('operations')
2679 else:
2680 # Existing operations, append operation to list
2681 op_list.append(new_op)
2682
2683 db_nslcmop_update = {'_admin.operations': op_list}
2684 self.update_db_2("nslcmops", db_nslcmop['_id'], db_nslcmop_update)
2685 op_index = len(op_list) - 1
2686 return op_index
2687
2688 # Helper methods for scale() sub-operations
2689
2690 # pre-scale/post-scale:
2691 # Check for 3 different cases:
2692 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
2693 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
2694 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
2695 def _check_or_add_scale_suboperation(self, db_nslcmop, vnf_index, vnf_config_primitive, primitive_params,
2696 operationType, RO_nsr_id=None, RO_scaling_info=None):
2697 # Find this sub-operation
2698 if RO_nsr_id and RO_scaling_info:
2699 operationType = 'SCALE-RO'
2700 match = {
2701 'member_vnf_index': vnf_index,
2702 'RO_nsr_id': RO_nsr_id,
2703 'RO_scaling_info': RO_scaling_info,
2704 }
2705 else:
2706 match = {
2707 'member_vnf_index': vnf_index,
2708 'primitive': vnf_config_primitive,
2709 'primitive_params': primitive_params,
2710 'lcmOperationType': operationType
2711 }
2712 op_index = self._find_suboperation(db_nslcmop, match)
2713 if op_index == self.SUBOPERATION_STATUS_NOT_FOUND:
2714 # a. New sub-operation
2715 # The sub-operation does not exist, add it.
2716 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
2717 # The following parameters are set to None for all kind of scaling:
2718 vdu_id = None
2719 vdu_count_index = None
2720 vdu_name = None
2721 if RO_nsr_id and RO_scaling_info:
2722 vnf_config_primitive = None
2723 primitive_params = None
2724 else:
2725 RO_nsr_id = None
2726 RO_scaling_info = None
2727 # Initial status for sub-operation
2728 operationState = 'PROCESSING'
2729 detailed_status = 'In progress'
2730 # Add sub-operation for pre/post-scaling (zero or more operations)
2731 self._add_suboperation(db_nslcmop,
2732 vnf_index,
2733 vdu_id,
2734 vdu_count_index,
2735 vdu_name,
2736 vnf_config_primitive,
2737 primitive_params,
2738 operationState,
2739 detailed_status,
2740 operationType,
2741 RO_nsr_id,
2742 RO_scaling_info)
2743 return self.SUBOPERATION_STATUS_NEW
2744 else:
2745 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
2746 # or op_index (operationState != 'COMPLETED')
2747 return self._retry_or_skip_suboperation(db_nslcmop, op_index)
2748
2749 # Function to return execution_environment id
2750
2751 def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list):
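"""
Return the execution environment id (ee_id) of the VCA deployed for the given member vnf index and vdu_id,
or None if there is no match.
"""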
2752 # TODO vdu_index_count
2753 for vca in vca_deployed_list:
2754 if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id:
2755 return vca["ee_id"]
2756
2757 async def destroy_N2VC(self, logging_text, db_nslcmop, vca_deployed, config_descriptor,
2758 vca_index, destroy_ee=True, exec_primitives=True, scaling_in=False):
2759 """
2760 Execute the terminate primitives and destroy the execution environment (if destroy_ee=True)
2761 :param logging_text:
2762 :param db_nslcmop:
2763 :param vca_deployed: Dictionary of deployment info at db_nsr._admin.deployed.VCA.<INDEX>
2764 :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
2765 :param vca_index: index in the database _admin.deployed.VCA
2766 :param destroy_ee: False to skip destroying it here, because all of them will be destroyed at once later
2767 :param exec_primitives: False to skip executing the terminate primitives, e.g. because the configuration did not
2768 complete or did not execute properly
2769 :param scaling_in: True destroys the application, False destroys the model
2770 :return: None or exception
2771 """
2772
2773 self.logger.debug(
2774 logging_text + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
2775 vca_index, vca_deployed, config_descriptor, destroy_ee
2776 )
2777 )
2778
2779 vca_type = vca_deployed.get("type", "lxc_proxy_charm")
2780
2781 # execute terminate_primitives
2782 if exec_primitives:
2783 terminate_primitives = get_ee_sorted_terminate_config_primitive_list(
2784 config_descriptor.get("terminate-config-primitive"), vca_deployed.get("ee_descriptor_id"))
2785 vdu_id = vca_deployed.get("vdu_id")
2786 vdu_count_index = vca_deployed.get("vdu_count_index")
2787 vdu_name = vca_deployed.get("vdu_name")
2788 vnf_index = vca_deployed.get("member-vnf-index")
2789 if terminate_primitives and vca_deployed.get("needed_terminate"):
2790 for seq in terminate_primitives:
2791 # For each sequence in list, get primitive and call _ns_execute_primitive()
2792 step = "Calling terminate action for vnf_member_index={} primitive={}".format(
2793 vnf_index, seq.get("name"))
2794 self.logger.debug(logging_text + step)
2795 # Create the primitive for each sequence, i.e. "primitive": "touch"
2796 primitive = seq.get('name')
2797 mapped_primitive_params = self._get_terminate_primitive_params(seq, vnf_index)
2798
2799 # Add sub-operation
2800 self._add_suboperation(db_nslcmop,
2801 vnf_index,
2802 vdu_id,
2803 vdu_count_index,
2804 vdu_name,
2805 primitive,
2806 mapped_primitive_params)
2807 # Sub-operations: Call _ns_execute_primitive() instead of action()
2808 try:
2809 result, result_detail = await self._ns_execute_primitive(vca_deployed["ee_id"], primitive,
2810 mapped_primitive_params,
2811 vca_type=vca_type)
2812 except LcmException:
2813 # this happens when the VCA is not deployed. In this case there is nothing to terminate
2814 continue
2815 result_ok = ['COMPLETED', 'PARTIALLY_COMPLETED']
2816 if result not in result_ok:
2817 raise LcmException("terminate_primitive {} for vnf_member_index={} fails with "
2818 "error {}".format(seq.get("name"), vnf_index, result_detail))
2819 # record that this VCA does not need to be terminated
2820 db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(vca_index)
2821 self.update_db_2("nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False})
2822
2823 if vca_deployed.get("prometheus_jobs") and self.prometheus:
2824 await self.prometheus.update(remove_jobs=vca_deployed["prometheus_jobs"])
2825
2826 if destroy_ee:
2827 await self.vca_map[vca_type].delete_execution_environment(vca_deployed["ee_id"], scaling_in=scaling_in)
2828
2829 async def _delete_all_N2VC(self, db_nsr: dict):
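"""
Mark every configurationStatus entry as TERMINATING, delete the whole juju model/namespace associated with
this NS (ignoring the case where it was already deleted) and finally mark the entries as DELETED.
"""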
2830 self._write_all_config_status(db_nsr=db_nsr, status='TERMINATING')
2831 namespace = "." + db_nsr["_id"]
2832 try:
2833 await self.n2vc.delete_namespace(namespace=namespace, total_timeout=self.timeout_charm_delete)
2834 except N2VCNotFound: # already deleted. Skip
2835 pass
2836 self._write_all_config_status(db_nsr=db_nsr, status='DELETED')
2837
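# Editorial note (sketch under the convention visible above): all execution environments of a NS share the
# N2VC namespace "." + nsr_id, e.g. a hypothetical nsr_id "0123abcd" maps to namespace ".0123abcd", so
# deleting that single namespace removes every remaining EE of the NS at once.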
2838 async def _terminate_RO(self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage):
2839 """
2840 Terminates a deployment from RO
2841 :param logging_text:
2842 :param nsr_deployed: db_nsr._admin.deployed
2843 :param nsr_id:
2844 :param nslcmop_id:
2845 :param stage: list of strings with the content to write to db_nslcmop.detailed-status.
2846 this method updates only index 2, but it writes the concatenated content of the whole list to the database
2847 :return:
2848 """
2849 db_nsr_update = {}
2850 failed_detail = []
2851 ro_nsr_id = ro_delete_action = None
2852 if nsr_deployed and nsr_deployed.get("RO"):
2853 ro_nsr_id = nsr_deployed["RO"].get("nsr_id")
2854 ro_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
2855 try:
2856 if ro_nsr_id:
2857 stage[2] = "Deleting ns from VIM."
2858 db_nsr_update["detailed-status"] = " ".join(stage)
2859 self._write_op_status(nslcmop_id, stage)
2860 self.logger.debug(logging_text + stage[2])
2861 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2862 self._write_op_status(nslcmop_id, stage)
2863 desc = await self.RO.delete("ns", ro_nsr_id)
2864 ro_delete_action = desc["action_id"]
2865 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = ro_delete_action
2866 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
2867 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
2868 if ro_delete_action:
2869 # wait until NS is deleted from VIM
2870 stage[2] = "Waiting for ns to be deleted from VIM."
2871 detailed_status_old = None
2872 self.logger.debug(logging_text + stage[2] + " RO_id={} ro_delete_action={}".format(ro_nsr_id,
2873 ro_delete_action))
2874 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2875 self._write_op_status(nslcmop_id, stage)
2876
2877 delete_timeout = 20 * 60 # 20 minutes
2878 while delete_timeout > 0:
2879 desc = await self.RO.show(
2880 "ns",
2881 item_id_name=ro_nsr_id,
2882 extra_item="action",
2883 extra_item_id=ro_delete_action)
2884
2885 # deploymentStatus
2886 self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
2887
2888 ns_status, ns_status_info = self.RO.check_action_status(desc)
2889 if ns_status == "ERROR":
2890 raise ROclient.ROClientException(ns_status_info)
2891 elif ns_status == "BUILD":
2892 stage[2] = "Deleting from VIM {}".format(ns_status_info)
2893 elif ns_status == "ACTIVE":
2894 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
2895 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
2896 break
2897 else:
2898 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
2899 if stage[2] != detailed_status_old:
2900 detailed_status_old = stage[2]
2901 db_nsr_update["detailed-status"] = " ".join(stage)
2902 self._write_op_status(nslcmop_id, stage)
2903 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2904 await asyncio.sleep(5, loop=self.loop)
2905 delete_timeout -= 5
2906 else: # delete_timeout <= 0:
2907 raise ROclient.ROClientException("Timeout waiting for ns to be deleted from VIM")
2908
2909 except Exception as e:
2910 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2911 if isinstance(e, ROclient.ROClientException) and e.http_code == 404: # not found
2912 db_nsr_update["_admin.deployed.RO.nsr_id"] = None
2913 db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
2914 db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
2915 self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(ro_nsr_id))
2916 elif isinstance(e, ROclient.ROClientException) and e.http_code == 409: # conflict
2917 failed_detail.append("delete conflict: {}".format(e))
2918 self.logger.debug(logging_text + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id, e))
2919 else:
2920 failed_detail.append("delete error: {}".format(e))
2921 self.logger.error(logging_text + "RO_ns_id={} delete error: {}".format(ro_nsr_id, e))
2922
2923 # Delete nsd
2924 if not failed_detail and deep_get(nsr_deployed, ("RO", "nsd_id")):
2925 ro_nsd_id = nsr_deployed["RO"]["nsd_id"]
2926 try:
2927 stage[2] = "Deleting nsd from RO."
2928 db_nsr_update["detailed-status"] = " ".join(stage)
2929 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2930 self._write_op_status(nslcmop_id, stage)
2931 await self.RO.delete("nsd", ro_nsd_id)
2932 self.logger.debug(logging_text + "ro_nsd_id={} deleted".format(ro_nsd_id))
2933 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
2934 except Exception as e:
2935 if isinstance(e, ROclient.ROClientException) and e.http_code == 404: # not found
2936 db_nsr_update["_admin.deployed.RO.nsd_id"] = None
2937 self.logger.debug(logging_text + "ro_nsd_id={} already deleted".format(ro_nsd_id))
2938 elif isinstance(e, ROclient.ROClientException) and e.http_code == 409: # conflict
2939 failed_detail.append("ro_nsd_id={} delete conflict: {}".format(ro_nsd_id, e))
2940 self.logger.debug(logging_text + failed_detail[-1])
2941 else:
2942 failed_detail.append("ro_nsd_id={} delete error: {}".format(ro_nsd_id, e))
2943 self.logger.error(logging_text + failed_detail[-1])
2944
2945 if not failed_detail and deep_get(nsr_deployed, ("RO", "vnfd")):
2946 for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
2947 if not vnf_deployed or not vnf_deployed["id"]:
2948 continue
2949 try:
2950 ro_vnfd_id = vnf_deployed["id"]
2951 stage[2] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
2952 vnf_deployed["member-vnf-index"], ro_vnfd_id)
2953 db_nsr_update["detailed-status"] = " ".join(stage)
2954 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2955 self._write_op_status(nslcmop_id, stage)
2956 await self.RO.delete("vnfd", ro_vnfd_id)
2957 self.logger.debug(logging_text + "ro_vnfd_id={} deleted".format(ro_vnfd_id))
2958 db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
2959 except Exception as e:
2960 if isinstance(e, ROclient.ROClientException) and e.http_code == 404: # not found
2961 db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
2962 self.logger.debug(logging_text + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id))
2963 elif isinstance(e, ROclient.ROClientException) and e.http_code == 409: # conflict
2964 failed_detail.append("ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id, e))
2965 self.logger.debug(logging_text + failed_detail[-1])
2966 else:
2967 failed_detail.append("ro_vnfd_id={} delete error: {}".format(ro_vnfd_id, e))
2968 self.logger.error(logging_text + failed_detail[-1])
2969
2970 if failed_detail:
2971 stage[2] = "Error deleting from VIM"
2972 else:
2973 stage[2] = "Deleted from VIM"
2974 db_nsr_update["detailed-status"] = " ".join(stage)
2975 self.update_db_2("nsrs", nsr_id, db_nsr_update)
2976 self._write_op_status(nslcmop_id, stage)
2977
2978 if failed_detail:
2979 raise LcmException("; ".join(failed_detail))
2980
2981 async def terminate(self, nsr_id, nslcmop_id):
2982 # Try to lock HA task here
2983 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
2984 if not task_is_locked_by_me:
2985 return
2986
2987 logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
2988 self.logger.debug(logging_text + "Enter")
2989 timeout_ns_terminate = self.timeout_ns_terminate
2990 db_nsr = None
2991 db_nslcmop = None
2992 operation_params = None
2993 exc = None
2994 error_list = [] # annotates all failed error messages
2995 db_nslcmop_update = {}
2996 autoremove = False # autoremove after terminated
2997 tasks_dict_info = {}
2998 db_nsr_update = {}
2999 stage = ["Stage 1/3: Preparing task.", "Waiting for previous operations to terminate.", ""]
3000 # ^ contains [stage, step, VIM-status]
3001 try:
3002 # wait for any previous tasks in process
3003 await self.lcm_tasks.waitfor_related_HA("ns", 'nslcmops', nslcmop_id)
3004
3005 stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
3006 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
3007 operation_params = db_nslcmop.get("operationParams") or {}
3008 if operation_params.get("timeout_ns_terminate"):
3009 timeout_ns_terminate = operation_params["timeout_ns_terminate"]
3010 stage[1] = "Getting nsr={} from db.".format(nsr_id)
3011 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3012
3013 db_nsr_update["operational-status"] = "terminating"
3014 db_nsr_update["config-status"] = "terminating"
3015 self._write_ns_status(
3016 nsr_id=nsr_id,
3017 ns_state="TERMINATING",
3018 current_operation="TERMINATING",
3019 current_operation_id=nslcmop_id,
3020 other_update=db_nsr_update
3021 )
3022 self._write_op_status(
3023 op_id=nslcmop_id,
3024 queuePosition=0,
3025 stage=stage
3026 )
3027 nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed")) or {}
3028 if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
3029 return
3030
3031 stage[1] = "Getting vnf descriptors from db."
3032 db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
3033 db_vnfds_from_id = {}
3034 db_vnfds_from_member_index = {}
3035 # Loop over VNFRs
3036 for vnfr in db_vnfrs_list:
3037 vnfd_id = vnfr["vnfd-id"]
3038 if vnfd_id not in db_vnfds_from_id:
3039 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
3040 db_vnfds_from_id[vnfd_id] = vnfd
3041 db_vnfds_from_member_index[vnfr["member-vnf-index-ref"]] = db_vnfds_from_id[vnfd_id]
3042
3043 # Destroy individual execution environments when there are terminating primitives.
3044 # Rest of EE will be deleted at once
3045 # TODO - check before calling _destroy_N2VC
3046 # if not operation_params.get("skip_terminate_primitives"):#
3047 # or not vca.get("needed_terminate"):
3048 stage[0] = "Stage 2/3 execute terminating primitives."
3049 self.logger.debug(logging_text + stage[0])
3050 stage[1] = "Looking for execution environments that need terminate primitives."
3051 self.logger.debug(logging_text + stage[1])
3052
3053 for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")):
3054 config_descriptor = None
3055 if not vca or not vca.get("ee_id"):
3056 continue
3057 if not vca.get("member-vnf-index"):
3058 # ns
3059 config_descriptor = db_nsr.get("ns-configuration")
3060 elif vca.get("vdu_id"):
3061 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
3062 config_descriptor = get_configuration(db_vnfd, vca.get("vdu_id"))
3063 elif vca.get("kdu_name"):
3064 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
3065 config_descriptor = get_configuration(db_vnfd, vca.get("kdu_name"))
3066 else:
3067 db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
3068 config_descriptor = get_configuration(db_vnfd, db_vnfd["id"])
3069 vca_type = vca.get("type")
3070 exec_terminate_primitives = (not operation_params.get("skip_terminate_primitives") and
3071 vca.get("needed_terminate"))
3072 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
3073 # pending native charms
3074 destroy_ee = vca_type in ("helm", "helm-v3", "native_charm")
3075 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
3076 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
3077 task = asyncio.ensure_future(
3078 self.destroy_N2VC(logging_text, db_nslcmop, vca, config_descriptor, vca_index,
3079 destroy_ee, exec_terminate_primitives))
3080 tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))
3081
3082 # wait for pending tasks of terminate primitives
3083 if tasks_dict_info:
3084 self.logger.debug(logging_text + 'Waiting for tasks {}'.format(list(tasks_dict_info.keys())))
3085 error_list = await self._wait_for_tasks(logging_text, tasks_dict_info,
3086 min(self.timeout_charm_delete, timeout_ns_terminate),
3087 stage, nslcmop_id)
3088 tasks_dict_info.clear()
3089 if error_list:
3090 return # raise LcmException("; ".join(error_list))
3091
3092 # remove All execution environments at once
3093 stage[0] = "Stage 3/3 delete all."
3094
3095 if nsr_deployed.get("VCA"):
3096 stage[1] = "Deleting all execution environments."
3097 self.logger.debug(logging_text + stage[1])
3098 task_delete_ee = asyncio.ensure_future(asyncio.wait_for(self._delete_all_N2VC(db_nsr=db_nsr),
3099 timeout=self.timeout_charm_delete))
3100 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
3101 tasks_dict_info[task_delete_ee] = "Terminating all VCA"
3102
3103 # Delete from k8scluster
3104 stage[1] = "Deleting KDUs."
3105 self.logger.debug(logging_text + stage[1])
3106 # print(nsr_deployed)
3107 for kdu in get_iterable(nsr_deployed, "K8s"):
3108 if not kdu or not kdu.get("kdu-instance"):
3109 continue
3110 kdu_instance = kdu.get("kdu-instance")
3111 if kdu.get("k8scluster-type") in self.k8scluster_map:
3112 task_delete_kdu_instance = asyncio.ensure_future(
3113 self.k8scluster_map[kdu["k8scluster-type"]].uninstall(
3114 cluster_uuid=kdu.get("k8scluster-uuid"),
3115 kdu_instance=kdu_instance))
3116 else:
3117 self.logger.error(logging_text + "Unknown k8s deployment type {}".
3118 format(kdu.get("k8scluster-type")))
3119 continue
3120 tasks_dict_info[task_delete_kdu_instance] = "Terminating KDU '{}'".format(kdu.get("kdu-name"))
3121
3122 # remove from RO
3123 stage[1] = "Deleting ns from VIM."
3124 if self.ng_ro:
3125 task_delete_ro = asyncio.ensure_future(
3126 self._terminate_ng_ro(logging_text, nsr_deployed, nsr_id, nslcmop_id, stage))
3127 else:
3128 task_delete_ro = asyncio.ensure_future(
3129 self._terminate_RO(logging_text, nsr_deployed, nsr_id, nslcmop_id, stage))
3130 tasks_dict_info[task_delete_ro] = "Removing deployment from VIM"
3131
3132 # the rest of the work is done in the finally block
3133
3134 except (ROclient.ROClientException, DbException, LcmException, N2VCException) as e:
3135 self.logger.error(logging_text + "Exit Exception {}".format(e))
3136 exc = e
3137 except asyncio.CancelledError:
3138 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(stage[1]))
3139 exc = "Operation was cancelled"
3140 except Exception as e:
3141 exc = traceback.format_exc()
3142 self.logger.critical(logging_text + "Exit Exception while '{}': {}".format(stage[1], e), exc_info=True)
3143 finally:
3144 if exc:
3145 error_list.append(str(exc))
3146 try:
3147 # wait for pending tasks
3148 if tasks_dict_info:
3149 stage[1] = "Waiting for pending terminate tasks."
3150 self.logger.debug(logging_text + stage[1])
3151 error_list += await self._wait_for_tasks(logging_text, tasks_dict_info, timeout_ns_terminate,
3152 stage, nslcmop_id)
3153 stage[1] = stage[2] = ""
3154 except asyncio.CancelledError:
3155 error_list.append("Cancelled")
3156 # TODO cancel all tasks
3157 except Exception as exc:
3158 error_list.append(str(exc))
3159 # update status at database
3160 if error_list:
3161 error_detail = "; ".join(error_list)
3162 # self.logger.error(logging_text + error_detail)
3163 error_description_nslcmop = '{} Detail: {}'.format(stage[0], error_detail)
3164 error_description_nsr = 'Operation: TERMINATING.{}, {}.'.format(nslcmop_id, stage[0])
3165
3166 db_nsr_update["operational-status"] = "failed"
3167 db_nsr_update["detailed-status"] = error_description_nsr + " Detail: " + error_detail
3168 db_nslcmop_update["detailed-status"] = error_detail
3169 nslcmop_operation_state = "FAILED"
3170 ns_state = "BROKEN"
3171 else:
3172 error_detail = None
3173 error_description_nsr = error_description_nslcmop = None
3174 ns_state = "NOT_INSTANTIATED"
3175 db_nsr_update["operational-status"] = "terminated"
3176 db_nsr_update["detailed-status"] = "Done"
3177 db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
3178 db_nslcmop_update["detailed-status"] = "Done"
3179 nslcmop_operation_state = "COMPLETED"
3180
3181 if db_nsr:
3182 self._write_ns_status(
3183 nsr_id=nsr_id,
3184 ns_state=ns_state,
3185 current_operation="IDLE",
3186 current_operation_id=None,
3187 error_description=error_description_nsr,
3188 error_detail=error_detail,
3189 other_update=db_nsr_update
3190 )
3191 self._write_op_status(
3192 op_id=nslcmop_id,
3193 stage="",
3194 error_message=error_description_nslcmop,
3195 operation_state=nslcmop_operation_state,
3196 other_update=db_nslcmop_update,
3197 )
3198 if ns_state == "NOT_INSTANTIATED":
3199 try:
3200 self.db.set_list("vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "NOT_INSTANTIATED"})
3201 except DbException as e:
3202 self.logger.warn(logging_text + 'Error writing VNFR status for nsr-id-ref: {} -> {}'.
3203 format(nsr_id, e))
3204 if operation_params:
3205 autoremove = operation_params.get("autoremove", False)
3206 if nslcmop_operation_state:
3207 try:
3208 await self.msg.aiowrite("ns", "terminated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
3209 "operationState": nslcmop_operation_state,
3210 "autoremove": autoremove},
3211 loop=self.loop)
3212 except Exception as e:
3213 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
3214
3215 self.logger.debug(logging_text + "Exit")
3216 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
3217
3218 async def _wait_for_tasks(self, logging_text, created_tasks_info, timeout, stage, nslcmop_id, nsr_id=None):
3219 time_start = time()
3220 error_detail_list = []
3221 error_list = []
3222 pending_tasks = list(created_tasks_info.keys())
3223 num_tasks = len(pending_tasks)
3224 num_done = 0
3225 stage[1] = "{}/{}.".format(num_done, num_tasks)
3226 self._write_op_status(nslcmop_id, stage)
3227 while pending_tasks:
3228 new_error = None
3229 _timeout = timeout + time_start - time()
3230 done, pending_tasks = await asyncio.wait(pending_tasks, timeout=_timeout,
3231 return_when=asyncio.FIRST_COMPLETED)
3232 num_done += len(done)
3233 if not done: # Timeout
3234 for task in pending_tasks:
3235 new_error = created_tasks_info[task] + ": Timeout"
3236 error_detail_list.append(new_error)
3237 error_list.append(new_error)
3238 break
3239 for task in done:
3240 if task.cancelled():
3241 exc = "Cancelled"
3242 else:
3243 exc = task.exception()
3244 if exc:
3245 if isinstance(exc, asyncio.TimeoutError):
3246 exc = "Timeout"
3247 new_error = created_tasks_info[task] + ": {}".format(exc)
3248 error_list.append(created_tasks_info[task])
3249 error_detail_list.append(new_error)
3250 if isinstance(exc, (str, DbException, N2VCException, ROclient.ROClientException, LcmException,
3251 K8sException, NgRoException)):
3252 self.logger.error(logging_text + new_error)
3253 else:
3254 exc_traceback = "".join(traceback.format_exception(None, exc, exc.__traceback__))
3255 self.logger.error(logging_text + created_tasks_info[task] + " " + exc_traceback)
3256 else:
3257 self.logger.debug(logging_text + created_tasks_info[task] + ": Done")
3258 stage[1] = "{}/{}.".format(num_done, num_tasks)
3259 if new_error:
3260 stage[1] += " Errors: " + ". ".join(error_detail_list) + "."
3261 if nsr_id: # update also nsr
3262 self.update_db_2("nsrs", nsr_id, {"errorDescription": "Error at: " + ", ".join(error_list),
3263 "errorDetail": ". ".join(error_detail_list)})
3264 self._write_op_status(nslcmop_id, stage)
3265 return error_detail_list
3266
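# Illustrative usage sketch of _wait_for_tasks() (names and values hypothetical, mirroring terminate() above):
# callers collect asyncio tasks in a dict that maps each task to a human readable label, and this method
# reports "done/total" progress plus any per-task errors through the stage list:
#     tasks_dict_info = {}
#     task = asyncio.ensure_future(self.destroy_N2VC(logging_text, db_nslcmop, vca, config_descriptor,
#                                                    vca_index, destroy_ee, exec_terminate_primitives))
#     tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))
#     error_list = await self._wait_for_tasks(logging_text, tasks_dict_info,
#                                             min(self.timeout_charm_delete, timeout_ns_terminate),
#                                             stage, nslcmop_id)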
3267 @staticmethod
3268 def _map_primitive_params(primitive_desc, params, instantiation_params):
3269 """
3270 Generates the params to be provided to the charm before executing a primitive. If the user does not provide a
3271 parameter, the default-value is used. If the value is enclosed in < >, it is looked up in instantiation_params
3272 :param primitive_desc: portion of VNFD/NSD that describes primitive
3273 :param params: Params provided by user
3274 :param instantiation_params: Instantiation params provided by user
3275 :return: a dictionary with the calculated params
3276 """
3277 calculated_params = {}
3278 for parameter in primitive_desc.get("parameter", ()):
3279 param_name = parameter["name"]
3280 if param_name in params:
3281 calculated_params[param_name] = params[param_name]
3282 elif "default-value" in parameter or "value" in parameter:
3283 if "value" in parameter:
3284 calculated_params[param_name] = parameter["value"]
3285 else:
3286 calculated_params[param_name] = parameter["default-value"]
3287 if isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("<") \
3288 and calculated_params[param_name].endswith(">"):
3289 if calculated_params[param_name][1:-1] in instantiation_params:
3290 calculated_params[param_name] = instantiation_params[calculated_params[param_name][1:-1]]
3291 else:
3292 raise LcmException("Parameter {} needed to execute primitive {} not provided".
3293 format(calculated_params[param_name], primitive_desc["name"]))
3294 else:
3295 raise LcmException("Parameter {} needed to execute primitive {} not provided".
3296 format(param_name, primitive_desc["name"]))
3297
3298 if isinstance(calculated_params[param_name], (dict, list, tuple)):
3299 calculated_params[param_name] = yaml.safe_dump(calculated_params[param_name],
3300 default_flow_style=True, width=256)
3301 elif isinstance(calculated_params[param_name], str) and calculated_params[param_name].startswith("!!yaml "):
3302 calculated_params[param_name] = calculated_params[param_name][7:]
3303 if parameter.get("data-type") == "INTEGER":
3304 try:
3305 calculated_params[param_name] = int(calculated_params[param_name])
3306 except ValueError: # error converting string to int
3307 raise LcmException(
3308 "Parameter {} of primitive {} must be integer".format(param_name, primitive_desc["name"]))
3309 elif parameter.get("data-type") == "BOOLEAN":
3310 calculated_params[param_name] = not ((str(calculated_params[param_name])).lower() == 'false')
3311
3312 # always add ns_config_info if the primitive name is config
3313 if primitive_desc["name"] == "config":
3314 if "ns_config_info" in instantiation_params:
3315 calculated_params["ns_config_info"] = instantiation_params["ns_config_info"]
3316 return calculated_params
3317
3318 def _look_for_deployed_vca(self, deployed_vca, member_vnf_index, vdu_id, vdu_count_index, kdu_name=None,
3319 ee_descriptor_id=None):
3320 # find the vca_deployed record for this action; raise LcmException if not found or it has no ee_id
3321 for vca in deployed_vca:
3322 if not vca:
3323 continue
3324 if member_vnf_index != vca["member-vnf-index"] or vdu_id != vca["vdu_id"]:
3325 continue
3326 if vdu_count_index is not None and vdu_count_index != vca["vdu_count_index"]:
3327 continue
3328 if kdu_name and kdu_name != vca["kdu_name"]:
3329 continue
3330 if ee_descriptor_id and ee_descriptor_id != vca["ee_descriptor_id"]:
3331 continue
3332 break
3333 else:
3334 # vca_deployed not found
3335 raise LcmException("charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
3336 " is not deployed".format(member_vnf_index, vdu_id, vdu_count_index, kdu_name,
3337 ee_descriptor_id))
3338 # get ee_id
3339 ee_id = vca.get("ee_id")
3340 vca_type = vca.get("type", "lxc_proxy_charm") # default value for backward compatibility - proxy charm
3341 if not ee_id:
3342 raise LcmException("charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has no "
3343 "execution environment"
3344 .format(member_vnf_index, vdu_id, kdu_name, vdu_count_index))
3345 return ee_id, vca_type
3346
3347 async def _ns_execute_primitive(self, ee_id, primitive, primitive_params, retries=0, retries_interval=30,
3348 timeout=None, vca_type=None, db_dict=None) -> (str, str):
3349 try:
3350 if primitive == "config":
3351 primitive_params = {"params": primitive_params}
3352
3353 vca_type = vca_type or "lxc_proxy_charm"
3354
3355 while retries >= 0:
3356 try:
3357 output = await asyncio.wait_for(
3358 self.vca_map[vca_type].exec_primitive(
3359 ee_id=ee_id,
3360 primitive_name=primitive,
3361 params_dict=primitive_params,
3362 progress_timeout=self.timeout_progress_primitive,
3363 total_timeout=self.timeout_primitive,
3364 db_dict=db_dict),
3365 timeout=timeout or self.timeout_primitive)
3366 # execution was OK
3367 break
3368 except asyncio.CancelledError:
3369 raise
3370 except Exception as e: # asyncio.TimeoutError
3371 if isinstance(e, asyncio.TimeoutError):
3372 e = "Timeout"
3373 retries -= 1
3374 if retries >= 0:
3375 self.logger.debug('Error executing action {} on {} -> {}'.format(primitive, ee_id, e))
3376 # wait and retry
3377 await asyncio.sleep(retries_interval, loop=self.loop)
3378 else:
3379 return 'FAILED', str(e)
3380
3381 return 'COMPLETED', output
3382
3383 except (LcmException, asyncio.CancelledError):
3384 raise
3385 except Exception as e:
3386 return 'FAIL', 'Error executing action {}: {}'.format(primitive, e)
3387
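# Illustrative usage (hypothetical primitive and params): execute an action with two extra attempts,
# waiting 30 seconds between retries; the method returns a (state, detail) tuple where state is
# 'COMPLETED', 'FAILED' (retries exhausted) or 'FAIL' (unexpected error):
#     result, result_detail = await self._ns_execute_primitive(
#         ee_id, "touch", {"filename": "/home/ubuntu/touched"}, retries=2, retries_interval=30,
#         vca_type="lxc_proxy_charm")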
3388 async def action(self, nsr_id, nslcmop_id):
3389 # Try to lock HA task here
3390 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
3391 if not task_is_locked_by_me:
3392 return
3393
3394 logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
3395 self.logger.debug(logging_text + "Enter")
3396 # get all needed from database
3397 db_nsr = None
3398 db_nslcmop = None
3399 db_nsr_update = {}
3400 db_nslcmop_update = {}
3401 nslcmop_operation_state = None
3402 error_description_nslcmop = None
3403 exc = None
3404 try:
3405 # wait for any previous tasks in process
3406 step = "Waiting for previous operations to terminate"
3407 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
3408
3409 self._write_ns_status(
3410 nsr_id=nsr_id,
3411 ns_state=None,
3412 current_operation="RUNNING ACTION",
3413 current_operation_id=nslcmop_id
3414 )
3415
3416 step = "Getting information from database"
3417 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
3418 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3419
3420 nsr_deployed = db_nsr["_admin"].get("deployed")
3421 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
3422 vdu_id = db_nslcmop["operationParams"].get("vdu_id")
3423 kdu_name = db_nslcmop["operationParams"].get("kdu_name")
3424 vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
3425 primitive = db_nslcmop["operationParams"]["primitive"]
3426 primitive_params = db_nslcmop["operationParams"]["primitive_params"]
3427 timeout_ns_action = db_nslcmop["operationParams"].get("timeout_ns_action", self.timeout_primitive)
3428
3429 if vnf_index:
3430 step = "Getting vnfr from database"
3431 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
3432 step = "Getting vnfd from database"
3433 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
3434 else:
3435 step = "Getting nsd from database"
3436 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
3437
3438 # for backward compatibility
3439 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
3440 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
3441 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
3442 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3443
3444 # look for primitive
3445 config_primitive_desc = descriptor_configuration = None
3446 if vdu_id:
3447 descriptor_configuration = get_configuration(db_vnfd, vdu_id)
3448 elif kdu_name:
3449 descriptor_configuration = get_configuration(db_vnfd, kdu_name)
3450 elif vnf_index:
3451 descriptor_configuration = get_configuration(db_vnfd, db_vnfd["id"])
3452 else:
3453 descriptor_configuration = db_nsd.get("ns-configuration")
3454
3455 if descriptor_configuration and descriptor_configuration.get("config-primitive"):
3456 for config_primitive in descriptor_configuration["config-primitive"]:
3457 if config_primitive["name"] == primitive:
3458 config_primitive_desc = config_primitive
3459 break
3460
3461 if not config_primitive_desc:
3462 if not (kdu_name and primitive in ("upgrade", "rollback", "status")):
3463 raise LcmException("Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".
3464 format(primitive))
3465 primitive_name = primitive
3466 ee_descriptor_id = None
3467 else:
3468 primitive_name = config_primitive_desc.get("execution-environment-primitive", primitive)
3469 ee_descriptor_id = config_primitive_desc.get("execution-environment-ref")
3470
3471 if vnf_index:
3472 if vdu_id:
3473 vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
3474 desc_params = parse_yaml_strings(vdur.get("additionalParams"))
3475 elif kdu_name:
3476 kdur = next((x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None)
3477 desc_params = parse_yaml_strings(kdur.get("additionalParams"))
3478 else:
3479 desc_params = parse_yaml_strings(db_vnfr.get("additionalParamsForVnf"))
3480 else:
3481 desc_params = parse_yaml_strings(db_nsr.get("additionalParamsForNs"))
3482 if kdu_name and get_configuration(db_vnfd, kdu_name):
3483 kdu_configuration = get_configuration(db_vnfd, kdu_name)
3484 actions = set()
3485 for primitive in kdu_configuration.get("initial-config-primitive", []):
3486 actions.add(primitive["name"])
3487 for primitive in kdu_configuration.get("config-primitive", []):
3488 actions.add(primitive["name"])
3489 kdu_action = primitive_name in actions
3490
3491 # TODO check if ns is in a proper status
3492 if kdu_name and (primitive_name in ("upgrade", "rollback", "status") or kdu_action):
3493 # kdur and desc_params already set from before
3494 if primitive_params:
3495 desc_params.update(primitive_params)
3496 # TODO Check if we will need something at vnf level
3497 for index, kdu in enumerate(get_iterable(nsr_deployed, "K8s")):
3498 if kdu_name == kdu["kdu-name"] and kdu["member-vnf-index"] == vnf_index:
3499 break
3500 else:
3501 raise LcmException("KDU '{}' for vnf '{}' not deployed".format(kdu_name, vnf_index))
3502
3503 if kdu.get("k8scluster-type") not in self.k8scluster_map:
3504 msg = "unknown k8scluster-type '{}'".format(kdu.get("k8scluster-type"))
3505 raise LcmException(msg)
3506
3507 db_dict = {"collection": "nsrs",
3508 "filter": {"_id": nsr_id},
3509 "path": "_admin.deployed.K8s.{}".format(index)}
3510 self.logger.debug(logging_text + "Exec k8s {} on {}.{}".format(primitive_name, vnf_index, kdu_name))
3511 step = "Executing kdu {}".format(primitive_name)
3512 if primitive_name == "upgrade":
3513 if desc_params.get("kdu_model"):
3514 kdu_model = desc_params.get("kdu_model")
3515 del desc_params["kdu_model"]
3516 else:
3517 kdu_model = kdu.get("kdu-model")
3518 parts = kdu_model.split(sep=":")
3519 if len(parts) == 2:
3520 kdu_model = parts[0]
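# editorial note: a kdu-model of the form "<chart>:<version>" (hypothetical example "mychart:1.2.3")
# keeps only the chart part here; the version suffix is dropped before the upgrade call below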
3521
3522 detailed_status = await asyncio.wait_for(
3523 self.k8scluster_map[kdu["k8scluster-type"]].upgrade(
3524 cluster_uuid=kdu.get("k8scluster-uuid"),
3525 kdu_instance=kdu.get("kdu-instance"),
3526 atomic=True, kdu_model=kdu_model,
3527 params=desc_params, db_dict=db_dict,
3528 timeout=timeout_ns_action),
3529 timeout=timeout_ns_action + 10)
3530 self.logger.debug(logging_text + " Upgrade of kdu {} done".format(detailed_status))
3531 elif primitive_name == "rollback":
3532 detailed_status = await asyncio.wait_for(
3533 self.k8scluster_map[kdu["k8scluster-type"]].rollback(
3534 cluster_uuid=kdu.get("k8scluster-uuid"),
3535 kdu_instance=kdu.get("kdu-instance"),
3536 db_dict=db_dict),
3537 timeout=timeout_ns_action)
3538 elif primitive_name == "status":
3539 detailed_status = await asyncio.wait_for(
3540 self.k8scluster_map[kdu["k8scluster-type"]].status_kdu(
3541 cluster_uuid=kdu.get("k8scluster-uuid"),
3542 kdu_instance=kdu.get("kdu-instance")),
3543 timeout=timeout_ns_action)
3544 else:
3545 kdu_instance = kdu.get("kdu-instance") or "{}-{}".format(kdu["kdu-name"], nsr_id)
3546 params = self._map_primitive_params(config_primitive_desc, primitive_params, desc_params)
3547
3548 detailed_status = await asyncio.wait_for(
3549 self.k8scluster_map[kdu["k8scluster-type"]].exec_primitive(
3550 cluster_uuid=kdu.get("k8scluster-uuid"),
3551 kdu_instance=kdu_instance,
3552 primitive_name=primitive_name,
3553 params=params, db_dict=db_dict,
3554 timeout=timeout_ns_action),
3555 timeout=timeout_ns_action)
3556
3557 if detailed_status:
3558 nslcmop_operation_state = 'COMPLETED'
3559 else:
3560 detailed_status = ''
3561 nslcmop_operation_state = 'FAILED'
3562 else:
3563 ee_id, vca_type = self._look_for_deployed_vca(nsr_deployed["VCA"], member_vnf_index=vnf_index,
3564 vdu_id=vdu_id, vdu_count_index=vdu_count_index,
3565 ee_descriptor_id=ee_descriptor_id)
3566 db_nslcmop_notif = {"collection": "nslcmops",
3567 "filter": {"_id": nslcmop_id},
3568 "path": "admin.VCA"}
3569 nslcmop_operation_state, detailed_status = await self._ns_execute_primitive(
3570 ee_id,
3571 primitive=primitive_name,
3572 primitive_params=self._map_primitive_params(config_primitive_desc, primitive_params, desc_params),
3573 timeout=timeout_ns_action,
3574 vca_type=vca_type,
3575 db_dict=db_nslcmop_notif)
3576
3577 db_nslcmop_update["detailed-status"] = detailed_status
3578 error_description_nslcmop = detailed_status if nslcmop_operation_state == "FAILED" else ""
3579 self.logger.debug(logging_text + " task Done with result {} {}".format(nslcmop_operation_state,
3580 detailed_status))
3581 return # database update is called inside finally
3582
3583 except (DbException, LcmException, N2VCException, K8sException) as e:
3584 self.logger.error(logging_text + "Exit Exception {}".format(e))
3585 exc = e
3586 except asyncio.CancelledError:
3587 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
3588 exc = "Operation was cancelled"
3589 except asyncio.TimeoutError:
3590 self.logger.error(logging_text + "Timeout while '{}'".format(step))
3591 exc = "Timeout"
3592 except Exception as e:
3593 exc = traceback.format_exc()
3594 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
3595 finally:
3596 if exc:
3597 db_nslcmop_update["detailed-status"] = detailed_status = error_description_nslcmop = \
3598 "FAILED {}: {}".format(step, exc)
3599 nslcmop_operation_state = "FAILED"
3600 if db_nsr:
3601 self._write_ns_status(
3602 nsr_id=nsr_id,
3603 ns_state=db_nsr["nsState"], # TODO check if degraded. For the moment use previous status
3604 current_operation="IDLE",
3605 current_operation_id=None,
3606 # error_description=error_description_nsr,
3607 # error_detail=error_detail,
3608 other_update=db_nsr_update
3609 )
3610
3611 self._write_op_status(op_id=nslcmop_id, stage="", error_message=error_description_nslcmop,
3612 operation_state=nslcmop_operation_state, other_update=db_nslcmop_update)
3613
3614 if nslcmop_operation_state:
3615 try:
3616 await self.msg.aiowrite("ns", "actioned", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
3617 "operationState": nslcmop_operation_state},
3618 loop=self.loop)
3619 except Exception as e:
3620 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
3621 self.logger.debug(logging_text + "Exit")
3622 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
3623 return nslcmop_operation_state, detailed_status
3624
3625 async def scale(self, nsr_id, nslcmop_id):
3626 # Try to lock HA task here
3627 task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
3628 if not task_is_locked_by_me:
3629 return
3630
3631 logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
3632 stage = ['', '', '']
3633 tasks_dict_info = {}
3634 # ^ stage, step, VIM progress
3635 self.logger.debug(logging_text + "Enter")
3636 # get all needed from database
3637 db_nsr = None
3638 db_nslcmop_update = {}
3639 db_nsr_update = {}
3640 exc = None
3641 # in case of error, indicates which part of the scale operation failed, to set nsr at error status
3642 scale_process = None
3643 old_operational_status = ""
3644 old_config_status = ""
3645 nsi_id = None
3646 try:
3647 # wait for any previous tasks in process
3648 step = "Waiting for previous operations to terminate"
3649 await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)
3650 self._write_ns_status(nsr_id=nsr_id, ns_state=None,
3651 current_operation="SCALING", current_operation_id=nslcmop_id)
3652
3653 step = "Getting nslcmop from database"
3654 self.logger.debug(step + " after having waited for previous tasks to be completed")
3655 db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
3656
3657 step = "Getting nsr from database"
3658 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
3659 old_operational_status = db_nsr["operational-status"]
3660 old_config_status = db_nsr["config-status"]
3661
3662 step = "Parsing scaling parameters"
3663 db_nsr_update["operational-status"] = "scaling"
3664 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3665 nsr_deployed = db_nsr["_admin"].get("deployed")
3666
3667 #######
3668 nsr_deployed = db_nsr["_admin"].get("deployed")
3669 vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
3670 # vdu_id = db_nslcmop["operationParams"].get("vdu_id")
3671 # vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
3672 # vdu_name = db_nslcmop["operationParams"].get("vdu_name")
3673 #######
3674
3675 vnf_index = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["member-vnf-index"]
3676 scaling_group = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]["scaling-group-descriptor"]
3677 scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
3678 # for backward compatibility
3679 if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
3680 nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
3681 db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
3682 self.update_db_2("nsrs", nsr_id, db_nsr_update)
3683
3684 step = "Getting vnfr from database"
3685 db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
3686
3687 step = "Getting vnfd from database"
3688 db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
3689
3690 base_folder = db_vnfd["_admin"]["storage"]
3691
3692 step = "Getting scaling-group-descriptor"
3693 scaling_descriptor = find_in_list(
3694 get_scaling_aspect(
3695 db_vnfd
3696 ),
3697 lambda scale_desc: scale_desc["name"] == scaling_group
3698 )
3699 if not scaling_descriptor:
3700 raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
3701 "at vnfd:scaling-group-descriptor".format(scaling_group))
3702
3703 step = "Sending scale order to VIM"
3704 # TODO check if ns is in a proper status
3705 nb_scale_op = 0
3706 if not db_nsr["_admin"].get("scaling-group"):
3707 self.update_db_2("nsrs", nsr_id, {"_admin.scaling-group": [{"name": scaling_group, "nb-scale-op": 0}]})
3708 admin_scale_index = 0
3709 else:
3710 for admin_scale_index, admin_scale_info in enumerate(db_nsr["_admin"]["scaling-group"]):
3711 if admin_scale_info["name"] == scaling_group:
3712 nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
3713 break
3714 else: # not found, set index one plus last element and add new entry with the name
3715 admin_scale_index += 1
3716 db_nsr_update["_admin.scaling-group.{}.name".format(admin_scale_index)] = scaling_group
3717 RO_scaling_info = []
3718 VCA_scaling_info = []
3719 vdu_scaling_info = {"scaling_group_name": scaling_group, "vdu": []}
3720 if scaling_type == "SCALE_OUT":
3721 if "aspect-delta-details" not in scaling_descriptor:
3722 raise LcmException(
3723 "Aspect delta details not fount in scaling descriptor {}".format(
3724 scaling_descriptor["name"]
3725 )
3726 )
3727 # check that max-instance-count is not exceeded
3728 deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
3729
3730 vdu_scaling_info["scaling_direction"] = "OUT"
3731 vdu_scaling_info["vdu-create"] = {}
3732 for delta in deltas:
3733 for vdu_delta in delta["vdu-delta"]:
3734 vdud = get_vdu(db_vnfd, vdu_delta["id"])
3735 vdu_index = get_vdur_index(db_vnfr, vdu_delta)
3736 cloud_init_text = self._get_vdu_cloud_init_content(vdud, db_vnfd)
3737 if cloud_init_text:
3738 additional_params = self._get_vdu_additional_params(db_vnfr, vdud["id"]) or {}
3739 cloud_init_list = []
3740
3741 vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
3742 max_instance_count = 10
3743 if vdu_profile and "max-number-of-instances" in vdu_profile:
3744 max_instance_count = vdu_profile.get("max-number-of-instances", 10)
3745
3746 default_instance_num = get_number_of_instances(db_vnfd, vdud["id"])
3747
3748 nb_scale_op += vdu_delta.get("number-of-instances", 1)
3749
3750 if nb_scale_op + default_instance_num > max_instance_count:
3751 raise LcmException(
3752 "reached the limit of {} (max-instance-count) "
3753 "scaling-out operations for the "
3754 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group)
3755 )
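# Worked example of the check above (hypothetical numbers): with default_instance_num = 1,
# max_instance_count = 3 and a vdu-delta of 1 instance per operation, the first two scale-out
# operations leave nb_scale_op + default_instance_num at 2 and 3; a third one would reach 4 > 3
# and therefore raises the LcmException above.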
3756 for x in range(vdu_delta.get("number-of-instances", 1)):
3757 if cloud_init_text:
3758 # TODO: the VDU's own IP is not available here because db_vnfr has not been updated yet.
3759 additional_params["OSM"] = get_osm_params(
3760 db_vnfr,
3761 vdu_delta["id"],
3762 vdu_index + x
3763 )
3764 cloud_init_list.append(
3765 self._parse_cloud_init(
3766 cloud_init_text,
3767 additional_params,
3768 db_vnfd["id"],
3769 vdud["id"]
3770 )
3771 )
3772 VCA_scaling_info.append(
3773 {
3774 "osm_vdu_id": vdu_delta["id"],
3775 "member-vnf-index": vnf_index,
3776 "type": "create",
3777 "vdu_index": vdu_index + x
3778 }
3779 )
3780 RO_scaling_info.append(
3781 {
3782 "osm_vdu_id": vdu_delta["id"],
3783 "member-vnf-index": vnf_index,
3784 "type": "create",
3785 "count": vdu_delta.get("number-of-instances", 1)
3786 }
3787 )
3788 if cloud_init_list:
3789 RO_scaling_info[-1]["cloud_init"] = cloud_init_list
3790 vdu_scaling_info["vdu-create"][vdu_delta["id"]] = vdu_delta.get("number-of-instances", 1)
3791
3792 elif scaling_type == "SCALE_IN":
3793 if "min-instance-count" in scaling_descriptor and scaling_descriptor["min-instance-count"] is not None:
3794 min_instance_count = int(scaling_descriptor["min-instance-count"])
3795
3796 vdu_scaling_info["scaling_direction"] = "IN"
3797 vdu_scaling_info["vdu-delete"] = {}
3798 deltas = scaling_descriptor.get("aspect-delta-details")["deltas"]
3799 for delta in deltas:
3800 for vdu_delta in delta["vdu-delta"]:
3801 vdu_index = get_vdur_index(db_vnfr, vdu_delta)
3802 min_instance_count = 0
3803 vdu_profile = get_vdu_profile(db_vnfd, vdu_delta["id"])
3804 if vdu_profile and "min-number-of-instances" in vdu_profile:
3805 min_instance_count = vdu_profile["min-number-of-instances"]
3806
3807 default_instance_num = get_number_of_instances(db_vnfd, vdu_delta["id"])
3808
3809 nb_scale_op -= vdu_delta.get("number-of-instances", 1)
3810 if nb_scale_op + default_instance_num < min_instance_count:
3811 raise LcmException(
3812 "reached the limit of {} (min-instance-count) scaling-in operations for the "
3813 "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group)
3814 )
3815 RO_scaling_info.append({"osm_vdu_id": vdu_delta["id"], "member-vnf-index": vnf_index,
3816 "type": "delete", "count": vdu_delta.get("number-of-instances", 1),
3817 "vdu_index": vdu_index - 1})
3818 for x in range(vdu_delta.get("number-of-instances", 1)):
3819 VCA_scaling_info.append(
3820 {
3821 "osm_vdu_id": vdu_delta["id"],
3822 "member-vnf-index": vnf_index,
3823 "type": "delete",
3824 "vdu_index": vdu_index - 1 - x
3825 }
3826 )
3827 vdu_scaling_info["vdu-delete"][vdu_delta["id"]] = vdu_delta.get("number-of-instances", 1)
3828
3829 # update vdu_scaling_info with the IP addresses of the VDUs to be deleted
3830 vdu_delete = copy(vdu_scaling_info.get("vdu-delete"))
3831 if vdu_scaling_info["scaling_direction"] == "IN":
3832 for vdur in reversed(db_vnfr["vdur"]):
3833 if vdu_delete.get(vdur["vdu-id-ref"]):
3834 vdu_delete[vdur["vdu-id-ref"]] -= 1
3835 vdu_scaling_info["vdu"].append({
3836 "name": vdur.get("name") or vdur.get("vdu-name"),
3837 "vdu_id": vdur["vdu-id-ref"],
3838 "interface": []
3839 })
3840 for interface in vdur["interfaces"]:
3841 vdu_scaling_info["vdu"][-1]["interface"].append({
3842 "name": interface["name"],
3843 "ip_address": interface["ip-address"],
3844 "mac_address": interface.get("mac-address"),
3845 })
3846 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
3847
3848 # PRE-SCALE BEGIN
3849 step = "Executing pre-scale vnf-config-primitive"
3850 if scaling_descriptor.get("scaling-config-action"):
3851 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
3852 if (scaling_config_action.get("trigger") == "pre-scale-in" and scaling_type == "SCALE_IN") \
3853 or (scaling_config_action.get("trigger") == "pre-scale-out" and scaling_type == "SCALE_OUT"):
3854 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
3855 step = db_nslcmop_update["detailed-status"] = \
3856 "executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive)
3857
3858 # look for primitive
3859 for config_primitive in (get_configuration(
3860 db_vnfd, db_vnfd["id"]
3861 ) or {}).get("config-primitive", ()):
3862 if config_primitive["name"] == vnf_config_primitive:
3863 break
3864 else:
3865 raise LcmException(
3866 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
3867 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
3868 "primitive".format(scaling_group, vnf_config_primitive))
3869
3870 vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
3871 if db_vnfr.get("additionalParamsForVnf"):
3872 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
3873
3874 scale_process = "VCA"
3875 db_nsr_update["config-status"] = "configuring pre-scaling"
3876 primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)
3877
3878 # Pre-scale retry check: Check if this sub-operation has been executed before
3879 op_index = self._check_or_add_scale_suboperation(
3880 db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'PRE-SCALE')
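# editorial sketch of the return semantics used below: SUBOPERATION_STATUS_SKIP means the
# sub-operation already completed and is skipped, SUBOPERATION_STATUS_NEW means it was just
# registered (its index is the last entry of _admin.operations), and any other value is the
# index of an existing sub-operation whose registered params are reused for the retry.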
3881 if op_index == self.SUBOPERATION_STATUS_SKIP:
3882 # Skip sub-operation
3883 result = 'COMPLETED'
3884 result_detail = 'Done'
3885 self.logger.debug(logging_text +
3886 "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
3887 vnf_config_primitive, result, result_detail))
3888 else:
3889 if op_index == self.SUBOPERATION_STATUS_NEW:
3890 # New sub-operation: Get index of this sub-operation
3891 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
3892 self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
3893 format(vnf_config_primitive))
3894 else:
3895 # retry: Get registered params for this existing sub-operation
3896 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
3897 vnf_index = op.get('member_vnf_index')
3898 vnf_config_primitive = op.get('primitive')
3899 primitive_params = op.get('primitive_params')
3900 self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation retry".
3901 format(vnf_config_primitive))
3902 # Execute the primitive, either with new (first-time) or registered (retry) args
3903 ee_descriptor_id = config_primitive.get("execution-environment-ref")
3904 primitive_name = config_primitive.get("execution-environment-primitive",
3905 vnf_config_primitive)
3906 ee_id, vca_type = self._look_for_deployed_vca(nsr_deployed["VCA"],
3907 member_vnf_index=vnf_index,
3908 vdu_id=None,
3909 vdu_count_index=None,
3910 ee_descriptor_id=ee_descriptor_id)
3911 result, result_detail = await self._ns_execute_primitive(
3912 ee_id, primitive_name, primitive_params, vca_type=vca_type)
3913 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
3914 vnf_config_primitive, result, result_detail))
3915 # Update operationState = COMPLETED | FAILED
3916 self._update_suboperation_status(
3917 db_nslcmop, op_index, result, result_detail)
3918
3919 if result == "FAILED":
3920 raise LcmException(result_detail)
3921 db_nsr_update["config-status"] = old_config_status
3922 scale_process = None
3923 # PRE-SCALE END
3924
3925 db_nsr_update["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)] = nb_scale_op
3926 db_nsr_update["_admin.scaling-group.{}.time".format(admin_scale_index)] = time()
3927
3928 # SCALE-IN VCA - BEGIN
3929 if VCA_scaling_info:
3930 step = db_nslcmop_update["detailed-status"] = \
3931 "Deleting the execution environments"
3932 scale_process = "VCA"
3933 for vdu_info in VCA_scaling_info:
3934 if vdu_info["type"] == "delete":
3935 member_vnf_index = str(vdu_info["member-vnf-index"])
3936 self.logger.debug(logging_text + "vdu info: {}".format(vdu_info))
3937 vdu_id = vdu_info["osm_vdu_id"]
3938 vdu_index = int(vdu_info["vdu_index"])
3939 stage[1] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
3940 member_vnf_index, vdu_id, vdu_index)
3941 stage[2] = step = "Scaling in VCA"
3942 self._write_op_status(
3943 op_id=nslcmop_id,
3944 stage=stage
3945 )
3946 vca_update = db_nsr["_admin"]["deployed"]["VCA"]
3947 config_update = db_nsr["configurationStatus"]
3948 for vca_index, vca in enumerate(vca_update):
3949 if (vca or vca.get("ee_id")) and vca["member-vnf-index"] == member_vnf_index and \
3950 vca["vdu_count_index"] == vdu_index:
3951 if vca.get("vdu_id"):
3952 config_descriptor = get_configuration(db_vnfd, vca.get("vdu_id"))
3953 elif vca.get("kdu_name"):
3954 config_descriptor = get_configuration(db_vnfd, vca.get("kdu_name"))
3955 else:
3956 config_descriptor = get_configuration(db_vnfd, db_vnfd["id"])
3957 operation_params = db_nslcmop.get("operationParams") or {}
3958 exec_terminate_primitives = (not operation_params.get("skip_terminate_primitives") and
3959 vca.get("needed_terminate"))
3960 task = asyncio.ensure_future(asyncio.wait_for(
3961 self.destroy_N2VC(logging_text, db_nslcmop, vca, config_descriptor,
3962 vca_index, destroy_ee=True,
3963 exec_primitives=exec_terminate_primitives,
3964 scaling_in=True), timeout=self.timeout_charm_delete))
3965 # wait before next removal
3966 await asyncio.sleep(30)
3967 tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))
3968 del vca_update[vca_index]
3969 del config_update[vca_index]
3970 # wait for pending tasks of terminate primitives
3971 if tasks_dict_info:
3972 self.logger.debug(logging_text +
3973 'Waiting for tasks {}'.format(list(tasks_dict_info.keys())))
3974 error_list = await self._wait_for_tasks(logging_text, tasks_dict_info,
3975 min(self.timeout_charm_delete,
3976 self.timeout_ns_terminate),
3977 stage, nslcmop_id)
3978 tasks_dict_info.clear()
3979 if error_list:
3980 raise LcmException("; ".join(error_list))
3981
3982 db_vca_and_config_update = {
3983 "_admin.deployed.VCA": vca_update,
3984 "configurationStatus": config_update
3985 }
3986 self.update_db_2("nsrs", db_nsr["_id"], db_vca_and_config_update)
3987 scale_process = None
3988 # SCALE-IN VCA - END
3989
3990 # SCALE RO - BEGIN
3991 if RO_scaling_info:
3992 scale_process = "RO"
3993 if self.ro_config.get("ng"):
3994 await self._scale_ng_ro(logging_text, db_nsr, db_nslcmop, db_vnfr, vdu_scaling_info, stage)
3995 vdu_scaling_info.pop("vdu-create", None)
3996 vdu_scaling_info.pop("vdu-delete", None)
3997
3998 scale_process = None
3999 if db_nsr_update:
4000 self.update_db_2("nsrs", nsr_id, db_nsr_update)
4001 # SCALE RO - END
4002
4003 # SCALE-UP VCA - BEGIN
4004 if VCA_scaling_info:
4005 step = db_nslcmop_update["detailed-status"] = \
4006 "Creating new execution environments"
4007 scale_process = "VCA"
4008 for vdu_info in VCA_scaling_info:
4009 if vdu_info["type"] == "create":
4010 member_vnf_index = str(vdu_info["member-vnf-index"])
4011 self.logger.debug(logging_text + "vdu info: {}".format(vdu_info))
4012 vnfd_id = db_vnfr["vnfd-ref"]
4013 vdu_index = int(vdu_info["vdu_index"])
4014 deploy_params = {"OSM": get_osm_params(db_vnfr)}
4015 if db_vnfr.get("additionalParamsForVnf"):
4016 deploy_params.update(parse_yaml_strings(db_vnfr["additionalParamsForVnf"].copy()))
4017 descriptor_config = get_configuration(db_vnfd, db_vnfd["id"])
4018 if descriptor_config:
4019 vdu_id = None
4020 vdu_name = None
4021 kdu_name = None
4022 self._deploy_n2vc(
4023 logging_text=logging_text + "member_vnf_index={} ".format(member_vnf_index),
4024 db_nsr=db_nsr,
4025 db_vnfr=db_vnfr,
4026 nslcmop_id=nslcmop_id,
4027 nsr_id=nsr_id,
4028 nsi_id=nsi_id,
4029 vnfd_id=vnfd_id,
4030 vdu_id=vdu_id,
4031 kdu_name=kdu_name,
4032 member_vnf_index=member_vnf_index,
4033 vdu_index=vdu_index,
4034 vdu_name=vdu_name,
4035 deploy_params=deploy_params,
4036 descriptor_config=descriptor_config,
4037 base_folder=base_folder,
4038 task_instantiation_info=tasks_dict_info,
4039 stage=stage
4040 )
4041 vdu_id = vdu_info["osm_vdu_id"]
4042 vdur = find_in_list(db_vnfr["vdur"], lambda vdu: vdu["vdu-id-ref"] == vdu_id)
4043 descriptor_config = get_configuration(db_vnfd, vdu_id)
4044 if vdur.get("additionalParams"):
4045 deploy_params_vdu = parse_yaml_strings(vdur["additionalParams"])
4046 else:
4047 deploy_params_vdu = deploy_params
4048 deploy_params_vdu["OSM"] = get_osm_params(db_vnfr, vdu_id, vdu_count_index=vdu_index)
4049 if descriptor_config:
4050 vdu_name = None
4051 kdu_name = None
4052 stage[1] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
4053 member_vnf_index, vdu_id, vdu_index)
4054 stage[2] = step = "Scaling out VCA"
4055 self._write_op_status(
4056 op_id=nslcmop_id,
4057 stage=stage
4058 )
4059 self._deploy_n2vc(
4060 logging_text=logging_text + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
4061 member_vnf_index, vdu_id, vdu_index),
4062 db_nsr=db_nsr,
4063 db_vnfr=db_vnfr,
4064 nslcmop_id=nslcmop_id,
4065 nsr_id=nsr_id,
4066 nsi_id=nsi_id,
4067 vnfd_id=vnfd_id,
4068 vdu_id=vdu_id,
4069 kdu_name=kdu_name,
4070 member_vnf_index=member_vnf_index,
4071 vdu_index=vdu_index,
4072 vdu_name=vdu_name,
4073 deploy_params=deploy_params_vdu,
4074 descriptor_config=descriptor_config,
4075 base_folder=base_folder,
4076 task_instantiation_info=tasks_dict_info,
4077 stage=stage
4078 )
4079 # TODO: scaling for kdu is not implemented yet.
4080 kdu_name = vdu_info["osm_vdu_id"]
4081 descriptor_config = get_configuration(db_vnfd, kdu_name)
4082 if descriptor_config:
4083 vdu_id = None
4085 vdu_name = None
4086 kdur = next(x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name)
4087 deploy_params_kdu = {"OSM": get_osm_params(db_vnfr)}
4088 if kdur.get("additionalParams"):
4089 deploy_params_kdu = parse_yaml_strings(kdur["additionalParams"])
4090
4091 self._deploy_n2vc(
4092 logging_text=logging_text,
4093 db_nsr=db_nsr,
4094 db_vnfr=db_vnfr,
4095 nslcmop_id=nslcmop_id,
4096 nsr_id=nsr_id,
4097 nsi_id=nsi_id,
4098 vnfd_id=vnfd_id,
4099 vdu_id=vdu_id,
4100 kdu_name=kdu_name,
4101 member_vnf_index=member_vnf_index,
4102 vdu_index=vdu_index,
4103 vdu_name=vdu_name,
4104 deploy_params=deploy_params_kdu,
4105 descriptor_config=descriptor_config,
4106 base_folder=base_folder,
4107 task_instantiation_info=tasks_dict_info,
4108 stage=stage
4109 )
4110 # SCALE-UP VCA - END
4111 scale_process = None
4112
4113 # POST-SCALE BEGIN
4114 # execute primitive service POST-SCALING
4115 step = "Executing post-scale vnf-config-primitive"
4116 if scaling_descriptor.get("scaling-config-action"):
4117 for scaling_config_action in scaling_descriptor["scaling-config-action"]:
4118 if (scaling_config_action.get("trigger") == "post-scale-in" and scaling_type == "SCALE_IN") \
4119 or (scaling_config_action.get("trigger") == "post-scale-out" and scaling_type == "SCALE_OUT"):
4120 vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
4121 step = db_nslcmop_update["detailed-status"] = \
4122 "executing post-scale scaling-config-action '{}'".format(vnf_config_primitive)
4123
4124 vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
4125 if db_vnfr.get("additionalParamsForVnf"):
4126 vnfr_params.update(db_vnfr["additionalParamsForVnf"])
4127
4128 # look for primitive
4129 for config_primitive in (
4130 get_configuration(db_vnfd, db_vnfd["id"]) or {}
4131 ).get("config-primitive", ()):
4132 if config_primitive["name"] == vnf_config_primitive:
4133 break
4134 else:
4135 raise LcmException(
4136 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
4137 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
4138 "config-primitive".format(scaling_group, vnf_config_primitive))
4139 scale_process = "VCA"
4140 db_nsr_update["config-status"] = "configuring post-scaling"
4141 primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)
4142
4143 # Post-scale retry check: Check if this sub-operation has been executed before
4144 op_index = self._check_or_add_scale_suboperation(
4145 db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'POST-SCALE')
4146 if op_index == self.SUBOPERATION_STATUS_SKIP:
4147 # Skip sub-operation
4148 result = 'COMPLETED'
4149 result_detail = 'Done'
4150 self.logger.debug(logging_text +
4151 "vnf_config_primitive={} Skipped sub-operation, result {} {}".
4152 format(vnf_config_primitive, result, result_detail))
4153 else:
4154 if op_index == self.SUBOPERATION_STATUS_NEW:
4155 # New sub-operation: Get index of this sub-operation
4156 op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
4157 self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
4158 format(vnf_config_primitive))
4159 else:
4160 # retry: Get registered params for this existing sub-operation
4161 op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
4162 vnf_index = op.get('member_vnf_index')
4163 vnf_config_primitive = op.get('primitive')
4164 primitive_params = op.get('primitive_params')
4165 self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation retry".
4166 format(vnf_config_primitive))
4167 # Execute the primitive, either with new (first-time) or registered (retry) args
4168 ee_descriptor_id = config_primitive.get("execution-environment-ref")
4169 primitive_name = config_primitive.get("execution-environment-primitive",
4170 vnf_config_primitive)
4171 ee_id, vca_type = self._look_for_deployed_vca(nsr_deployed["VCA"],
4172 member_vnf_index=vnf_index,
4173 vdu_id=None,
4174 vdu_count_index=None,
4175 ee_descriptor_id=ee_descriptor_id)
4176 result, result_detail = await self._ns_execute_primitive(
4177 ee_id, primitive_name, primitive_params, vca_type=vca_type)
4178 self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
4179 vnf_config_primitive, result, result_detail))
4180 # Update operationState = COMPLETED | FAILED
4181 self._update_suboperation_status(
4182 db_nslcmop, op_index, result, result_detail)
4183
4184 if result == "FAILED":
4185 raise LcmException(result_detail)
4186 db_nsr_update["config-status"] = old_config_status
4187 scale_process = None
4188 # POST-SCALE END
4189
4190 db_nsr_update["detailed-status"] = "" # "scaled {} {}".format(scaling_group, scaling_type)
4191 db_nsr_update["operational-status"] = "running" if old_operational_status == "failed" \
4192 else old_operational_status
4193 db_nsr_update["config-status"] = old_config_status
4194 return
4195 except (ROclient.ROClientException, DbException, LcmException, NgRoException) as e:
4196 self.logger.error(logging_text + "Exit Exception {}".format(e))
4197 exc = e
4198 except asyncio.CancelledError:
4199 self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
4200 exc = "Operation was cancelled"
4201 except Exception as e:
4202 exc = traceback.format_exc()
4203 self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
4204 finally:
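# regardless of the outcome: mark the NS as idle, wait for any tasks scheduled during
# the scale, derive the final operation state and notify it through kafka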
4205 self._write_ns_status(nsr_id=nsr_id, ns_state=None, current_operation="IDLE", current_operation_id=None)
4206 if tasks_dict_info:
4207 stage[1] = "Waiting for instantiate pending tasks."
4208 self.logger.debug(logging_text + stage[1])
4209 exc = await self._wait_for_tasks(logging_text, tasks_dict_info, self.timeout_ns_deploy,
4210 stage, nslcmop_id, nsr_id=nsr_id)
4211 if exc:
4212 db_nslcmop_update["detailed-status"] = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
4213 nslcmop_operation_state = "FAILED"
4214 if db_nsr:
4215 db_nsr_update["operational-status"] = old_operational_status
4216 db_nsr_update["config-status"] = old_config_status
4217 db_nsr_update["detailed-status"] = ""
4218 if scale_process:
4219 if "VCA" in scale_process:
4220 db_nsr_update["config-status"] = "failed"
4221 if "RO" in scale_process:
4222 db_nsr_update["operational-status"] = "failed"
4223 db_nsr_update["detailed-status"] = "FAILED scaling nslcmop={} {}: {}".format(nslcmop_id, step,
4224 exc)
4225 else:
4226 error_description_nslcmop = None
4227 nslcmop_operation_state = "COMPLETED"
4228 db_nslcmop_update["detailed-status"] = "Done"
4229
4230 self._write_op_status(op_id=nslcmop_id, stage="", error_message=error_description_nslcmop,
4231 operation_state=nslcmop_operation_state, other_update=db_nslcmop_update)
4232 if db_nsr:
4233 self._write_ns_status(nsr_id=nsr_id, ns_state=None, current_operation="IDLE",
4234 current_operation_id=None, other_update=db_nsr_update)
4235
4236 if nslcmop_operation_state:
4237 try:
4238 msg = {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id, "operationState": nslcmop_operation_state}
4239 await self.msg.aiowrite("ns", "scaled", msg, loop=self.loop)
4240 except Exception as e:
4241 self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
4242 self.logger.debug(logging_text + "Exit")
4243 self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")
4244
4245 async def _scale_ng_ro(self, logging_text, db_nsr, db_nslcmop, db_vnfr, vdu_scaling_info, stage):
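"""Scale at NG-RO level: update the VNFR VDU records (vdu-create/vdu-delete) and
re-run the NG-RO instantiation so that the requested VDUs are created or removed."""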
4246 nsr_id = db_nslcmop["nsInstanceId"]
4247 db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
4248 db_vnfrs = {}
4249
4250 # read from db: vnfds for every vnf in the ns
4251 db_vnfds = []
4252
4253 # for each vnf in ns, read vnfd
4254 for vnfr in self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}):
4255 db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
4256 vnfd_id = vnfr["vnfd-id"] # vnfd uuid for this vnf
4257 # if this vnfd has not been read yet, fetch it from the db
4258 if not find_in_list(db_vnfds, lambda a_vnfd: a_vnfd["id"] == vnfd_id):
4259 # read from db
4260 vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
4261 db_vnfds.append(vnfd)
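# fetch the VCA public key; it is passed to NG-RO (n2vc_key_list) so that the newly
# created VDUs are provisioned with it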
4262 n2vc_key = self.n2vc.get_public_key()
4263 n2vc_key_list = [n2vc_key]
4264 self.scale_vnfr(db_vnfr, vdu_scaling_info.get("vdu-create"), vdu_scaling_info.get("vdu-delete"),
4265 mark_delete=True)
4266 # db_vnfr has been updated, update db_vnfrs to use it
4267 db_vnfrs[db_vnfr["member-vnf-index-ref"]] = db_vnfr
4268 await self._instantiate_ng_ro(logging_text, nsr_id, db_nsd, db_nsr, db_nslcmop, db_vnfrs,
4269 db_vnfds, n2vc_key_list, stage=stage, start_deploy=time(),
4270 timeout_ns_deploy=self.timeout_ns_deploy)
4271 if vdu_scaling_info.get("vdu-delete"):
4272 self.scale_vnfr(db_vnfr, None, vdu_scaling_info["vdu-delete"], mark_delete=False)
4273
4274 async def add_prometheus_metrics(self, ee_id, artifact_path, ee_config_descriptor, vnfr_id, nsr_id, target_ip):
4275 if not self.prometheus:
4276 return
4277 # look for a file named 'prometheus*.j2' to use as the Prometheus job template
4278 artifact_content = self.fs.dir_ls(artifact_path)
4279 job_file = next((f for f in artifact_content if f.startswith("prometheus") and f.endswith(".j2")), None)
4280 if not job_file:
4281 return
4282 with self.fs.file_open((artifact_path, job_file), "r") as f:
4283 job_data = f.read()
4284
4285 # TODO get_service
4286 _, _, service = ee_id.partition(".") # remove prefix "namespace."
4287 host_name = "{}-{}".format(service, ee_config_descriptor["metric-service"])
4288 host_port = "80"
4289 vnfr_id = vnfr_id.replace("-", "")
4290 variables = {
4291 "JOB_NAME": vnfr_id,
4292 "TARGET_IP": target_ip,
4293 "EXPORTER_POD_IP": host_name,
4294 "EXPORTER_POD_PORT": host_port,
4295 }
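# render the Jinja2 job template with these variables; the result is a list of
# Prometheus scrape-job dictionaries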
4296 job_list = self.prometheus.parse_job(job_data, variables)
4297 # ensure the job_name includes the vnfr_id and add the nsr_id as metadata
4298 for job in job_list:
4299 if not isinstance(job.get("job_name"), str) or vnfr_id not in job["job_name"]:
4300 job["job_name"] = vnfr_id + "_" + str(randint(1, 10000))
4301 job["nsr_id"] = nsr_id
4302 job_dict = {jl["job_name"]: jl for jl in job_list}
4303 if await self.prometheus.update(job_dict):
4304 return list(job_dict.keys())
4305
4306 def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
4307 """
4308 Get VCA Cloud and VCA Cloud Credentials for the VIM account
4309
4310 :param vim_account_id: VIM Account ID
4311
4312 :return: (cloud_name, cloud_credential)
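
Example (illustrative usage; either value is None when the key is missing from the
VIM account "config"):
    cloud, credential = self.get_vca_cloud_and_credentials(vim_account_id)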
4313 """
4314 config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
4315 return config.get("vca_cloud"), config.get("vca_cloud_credential")
4316
4317 def get_vca_k8s_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
4318 """
4319 Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account
4320
4321 :param vim_account_id: VIM Account ID
4322
4323 :return: (cloud_name, cloud_credential)
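
Example (illustrative usage):
    k8s_cloud, k8s_credential = self.get_vca_k8s_cloud_and_credentials(vim_account_id)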
4324 """
4325 config = VimAccountDB.get_vim_account_with_id(vim_account_id).get("config", {})
4326 return config.get("vca_k8s_cloud"), config.get("vca_k8s_cloud_credential")