1 # -*- coding: utf-8 -*-
4 # Copyright 2018 Telefonica S.A.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
22 import logging
.handlers
25 from jinja2
import Environment
, TemplateError
, TemplateNotFound
, StrictUndefined
, UndefinedError
27 from osm_lcm
import ROclient
28 from osm_lcm
.ng_ro
import NgRoClient
, NgRoException
29 from osm_lcm
.lcm_utils
import LcmException
, LcmExceptionNoMgmtIP
, LcmBase
, deep_get
, get_iterable
, populate_dict
30 from osm_lcm
.data_utils
.nsd
import get_vnf_profiles
31 from osm_lcm
.data_utils
.vnfd
import get_vnf_configuration
, get_vdu_list
, get_vdu_profile
, \
32 get_ee_sorted_initial_config_primitive_list
, get_ee_sorted_terminate_config_primitive_list
, \
33 get_kdu_list
, get_virtual_link_profiles
, get_vdu
, get_vdu_configuration
, get_kdu_configuration
, \
34 get_vdu_index
, get_scaling_aspect
, get_number_of_instances
35 from osm_lcm
.data_utils
.list_utils
import find_in_list
36 from osm_lcm
.data_utils
.vnfr
import get_osm_params
37 from osm_lcm
.data_utils
.dict_utils
import parse_yaml_strings
38 from osm_lcm
.data_utils
.database
.vim_account
import VimAccountDB
39 from n2vc
.k8s_helm_conn
import K8sHelmConnector
40 from n2vc
.k8s_helm3_conn
import K8sHelm3Connector
41 from n2vc
.k8s_juju_conn
import K8sJujuConnector
43 from osm_common
.dbbase
import DbException
44 from osm_common
.fsbase
import FsException
46 from osm_lcm
.data_utils
.database
.database
import Database
47 from osm_lcm
.data_utils
.filesystem
.filesystem
import Filesystem
49 from n2vc
.n2vc_juju_conn
import N2VCJujuConnector
50 from n2vc
.exceptions
import N2VCException
, N2VCNotFound
, K8sException
52 from osm_lcm
.lcm_helm_conn
import LCMHelmConn
54 from copy
import copy
, deepcopy
56 from uuid
import uuid4
58 from random
import randint
__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"

# Default timeouts (presumably seconds: they are added to time() values elsewhere in this file)
timeout_vca_on_error = 5 * 60  # time a charm may stay at blocked/error status before it is marked as failed
timeout_ns_deploy = 2 * 3600  # default global timeout for deploying a ns
timeout_ns_terminate = 1800  # default global timeout for undeploying a ns
timeout_charm_delete = 10 * 60
timeout_primitive = 30 * 60  # timeout for primitive execution
timeout_progress_primitive = 10 * 60  # timeout for some progress in a primitive execution

# Sentinel values for sub-operation status
SUBOPERATION_STATUS_NOT_FOUND = -1
SUBOPERATION_STATUS_NEW = -2
SUBOPERATION_STATUS_SKIP = -3
task_name_deploy_vca = "Deploying VCA"
76 def __init__(self
, msg
, lcm_tasks
, config
, loop
, prometheus
=None):
78 Init, Connect to database, filesystem storage, and messaging
79 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
84 logger
=logging
.getLogger('lcm.ns')
87 self
.db
= Database().instance
.db
88 self
.fs
= Filesystem().instance
.fs
90 self
.lcm_tasks
= lcm_tasks
91 self
.timeout
= config
["timeout"]
92 self
.ro_config
= config
["ro_config"]
93 self
.ng_ro
= config
["ro_config"].get("ng")
94 self
.vca_config
= config
["VCA"].copy()
96 # create N2VC connector
97 self
.n2vc
= N2VCJujuConnector(
100 url
='{}:{}'.format(self
.vca_config
['host'], self
.vca_config
['port']),
101 username
=self
.vca_config
.get('user', None),
102 vca_config
=self
.vca_config
,
103 on_update_db
=self
._on
_update
_n
2vc
_db
,
108 self
.conn_helm_ee
= LCMHelmConn(
113 vca_config
=self
.vca_config
,
114 on_update_db
=self
._on
_update
_n
2vc
_db
117 self
.k8sclusterhelm2
= K8sHelmConnector(
118 kubectl_command
=self
.vca_config
.get("kubectlpath"),
119 helm_command
=self
.vca_config
.get("helmpath"),
126 self
.k8sclusterhelm3
= K8sHelm3Connector(
127 kubectl_command
=self
.vca_config
.get("kubectlpath"),
128 helm_command
=self
.vca_config
.get("helm3path"),
135 self
.k8sclusterjuju
= K8sJujuConnector(
136 kubectl_command
=self
.vca_config
.get("kubectlpath"),
137 juju_command
=self
.vca_config
.get("jujupath"),
141 vca_config
=self
.vca_config
,
146 self
.k8scluster_map
= {
147 "helm-chart": self
.k8sclusterhelm2
,
148 "helm-chart-v3": self
.k8sclusterhelm3
,
149 "chart": self
.k8sclusterhelm3
,
150 "juju-bundle": self
.k8sclusterjuju
,
151 "juju": self
.k8sclusterjuju
,
155 "lxc_proxy_charm": self
.n2vc
,
156 "native_charm": self
.n2vc
,
157 "k8s_proxy_charm": self
.n2vc
,
158 "helm": self
.conn_helm_ee
,
159 "helm-v3": self
.conn_helm_ee
162 self
.prometheus
= prometheus
165 self
.RO
= NgRoClient(self
.loop
, **self
.ro_config
)
def increment_ip_mac(ip_mac, vm_index=1):
    """Return ``ip_mac`` with its last group increased by ``vm_index``.

    An address containing a dot is treated as IPv4 and its last group is
    handled in decimal. Otherwise the text after the last colon (MAC or IPv6)
    is handled in hexadecimal and zero-padded to its original width.
    No range check is done on the result (e.g. an IPv4 group may exceed 255).

    :param ip_mac: address as a string; any non-string value is returned unchanged
    :param vm_index: amount to add to the last group
    :return: the new address, or None when ``ip_mac`` has no '.'/':' separator
        or its last group is not a number
    """
    if not isinstance(ip_mac, str):
        return ip_mac
    try:
        # try with ipv4: look for the last dot
        dot = ip_mac.rfind(".")
        if dot > 0:
            split_at = dot + 1  # keep the separator in the prefix
            return "{}{}".format(ip_mac[:split_at], int(ip_mac[split_at:]) + vm_index)
        # try with ipv6 or mac: look for the last colon, operate in hex
        colon = ip_mac.rfind(":")
        if colon > 0:
            split_at = colon + 1
            # format in hex; width is 2 for mac or up to 4 for ipv6
            width = len(ip_mac) - split_at
            return ("{}{:0" + str(width) + "x}").format(ip_mac[:split_at], int(ip_mac[split_at:], 16) + vm_index)
    except ValueError:
        # the last group is not a valid number: fall through to "cannot increment"
        pass
    return None
def _on_update_ro_db(self, nsrs_id, ro_descriptor):
    """Store the RO descriptor as 'deploymentStatus' of the given nsr.

    Best effort: any error while writing is logged as a warning and swallowed.

    :param nsrs_id: _id of the "nsrs" record to update
    :param ro_descriptor: content written under the 'deploymentStatus' key
    :return: None
    """
    # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
    try:
        # TODO filter RO descriptor fields...

        # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
        db_dict = {'deploymentStatus': ro_descriptor}
        self.update_db_2("nsrs", nsrs_id, db_dict)
    except Exception as e:
        # Logger.warn is a deprecated alias of Logger.warning
        self.logger.warning('Cannot write database RO deployment for ns={} -> {}'.format(nsrs_id, e))
203 async def _on_update_n2vc_db(self
, table
, filter, path
, updated_data
):
205 # remove last dot from path (if exists)
206 if path
.endswith('.'):
209 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
210 # .format(table, filter, path, updated_data))
214 nsr_id
= filter.get('_id')
216 # read ns record from database
217 nsr
= self
.db
.get_one(table
='nsrs', q_filter
=filter)
218 current_ns_status
= nsr
.get('nsState')
220 # get vca status for NS
221 status_dict
= await self
.n2vc
.get_status(namespace
='.' + nsr_id
, yaml_format
=False)
225 db_dict
['vcaStatus'] = status_dict
227 # update configurationStatus for this VCA
229 vca_index
= int(path
[path
.rfind(".")+1:])
231 vca_list
= deep_get(target_dict
=nsr
, key_list
=('_admin', 'deployed', 'VCA'))
232 vca_status
= vca_list
[vca_index
].get('status')
234 configuration_status_list
= nsr
.get('configurationStatus')
235 config_status
= configuration_status_list
[vca_index
].get('status')
237 if config_status
== 'BROKEN' and vca_status
!= 'failed':
238 db_dict
['configurationStatus'][vca_index
] = 'READY'
239 elif config_status
!= 'BROKEN' and vca_status
== 'failed':
240 db_dict
['configurationStatus'][vca_index
] = 'BROKEN'
241 except Exception as e
:
242 # not update configurationStatus
243 self
.logger
.debug('Error updating vca_index (ignore): {}'.format(e
))
245 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
246 # if nsState = 'DEGRADED' check if all is OK
248 if current_ns_status
in ('READY', 'DEGRADED'):
249 error_description
= ''
251 if status_dict
.get('machines'):
252 for machine_id
in status_dict
.get('machines'):
253 machine
= status_dict
.get('machines').get(machine_id
)
254 # check machine agent-status
255 if machine
.get('agent-status'):
256 s
= machine
.get('agent-status').get('status')
259 error_description
+= 'machine {} agent-status={} ; '.format(machine_id
, s
)
260 # check machine instance status
261 if machine
.get('instance-status'):
262 s
= machine
.get('instance-status').get('status')
265 error_description
+= 'machine {} instance-status={} ; '.format(machine_id
, s
)
267 if status_dict
.get('applications'):
268 for app_id
in status_dict
.get('applications'):
269 app
= status_dict
.get('applications').get(app_id
)
270 # check application status
271 if app
.get('status'):
272 s
= app
.get('status').get('status')
275 error_description
+= 'application {} status={} ; '.format(app_id
, s
)
277 if error_description
:
278 db_dict
['errorDescription'] = error_description
279 if current_ns_status
== 'READY' and is_degraded
:
280 db_dict
['nsState'] = 'DEGRADED'
281 if current_ns_status
== 'DEGRADED' and not is_degraded
:
282 db_dict
['nsState'] = 'READY'
285 self
.update_db_2("nsrs", nsr_id
, db_dict
)
287 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
289 except Exception as e
:
290 self
.logger
.warn('Error updating NS state for ns={}: {}'.format(nsr_id
, e
))
def _parse_cloud_init(cloud_init_text, additional_params, vnfd_id, vdu_id):
    """Render a cloud-init text as a Jinja2 template.

    The template is rendered with StrictUndefined, so a variable that is not
    present in ``additional_params`` is reported as an error.

    :param cloud_init_text: cloud-init content, possibly containing Jinja2 expressions
    :param additional_params: dictionary with the template variables; None means no variables
    :param vnfd_id: vnfd id, used only in error messages
    :param vdu_id: vdu id, used only in error messages
    :return: the rendered text
    :raises LcmException: on an undefined variable or any other Jinja2 template error
    """
    try:
        env = Environment(undefined=StrictUndefined)
        template = env.from_string(cloud_init_text)
        return template.render(additional_params or {})
    except UndefinedError as e:
        # handled before the generic template errors to give a more precise message
        raise LcmException("Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
                           "file, must be provided in the instantiation parameters inside the "
                           "'additionalParamsForVnf/Vdu' block".format(e, vnfd_id, vdu_id))
    except (TemplateError, TemplateNotFound) as e:
        raise LcmException("Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".
                           format(vnfd_id, vdu_id, e))
def _get_vdu_cloud_init_content(self, vdu, vnfd):
    """Return the cloud-init text of a vdu, read from the package file or taken inline.

    "cloud-init-file" takes precedence over "cloud-init".

    :param vdu: vdu descriptor; its "cloud-init-file" and "cloud-init" keys are looked up
    :param vnfd: vnfd content; vnfd["_admin"]["storage"] supplies "folder" and "pkg-dir"
        when a cloud-init file has to be read
    :return: the cloud-init text, or None if the vdu defines neither key
    :raises LcmException: if the cloud-init file cannot be read from self.fs
    """
    cloud_init_content = cloud_init_file = None
    try:
        if vdu.get("cloud-init-file"):
            base_folder = vnfd["_admin"]["storage"]
            cloud_init_file = "{}/{}/cloud_init/{}".format(base_folder["folder"], base_folder["pkg-dir"],
                                                           vdu["cloud-init-file"])
            with self.fs.file_open(cloud_init_file, "r") as ci_file:
                cloud_init_content = ci_file.read()
        elif vdu.get("cloud-init"):
            cloud_init_content = vdu["cloud-init"]

        return cloud_init_content
    except FsException as e:
        raise LcmException("Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".
                           format(vnfd["id"], vdu["id"], cloud_init_file, e))
def _get_vdu_additional_params(self, db_vnfr, vdu_id):
    """Return the parsed "additionalParams" of the first vdur that refers to ``vdu_id``.

    :param db_vnfr: vnfr content; its "vdur" list is searched
    :param vdu_id: value compared against each vdur["vdu-id-ref"]
    :return: result of parse_yaml_strings() applied to the vdur "additionalParams" (may be None)
    :raises StopIteration: if no vdur refers to ``vdu_id``
    """
    vdur = next(vdur for vdur in db_vnfr.get("vdur") if vdu_id == vdur["vdu-id-ref"])
    additional_params = vdur.get("additionalParams")
    return parse_yaml_strings(additional_params)
def vnfd2RO(self, vnfd, new_id=None, additionalParams=None, nsrId=None):
    """
    Creates a new vnfd descriptor for RO based on the input OSM IM vnfd
    :param vnfd: input vnfd (not modified; a deep copy is made)
    :param new_id: overrides vnf id if provided
    :param additionalParams: Instantiation params for VNFs provided (not used in this method)
    :param nsrId: Id of the NSR (not used in this method)
    :return: copy of vnfd
    """
    vnfd_RO = deepcopy(vnfd)
    # remove keys unused by RO: configuration, monitoring, scaling and internal keys
    for unused_key in ("_id", "_admin", "vnf-configuration", "monitoring-param",
                       "scaling-group-descriptor", "kdu", "k8s-cluster"):
        vnfd_RO.pop(unused_key, None)
    if new_id:
        vnfd_RO["id"] = new_id

    # cloud-init and cloud-init-file are stripped from every vdu of the copy
    for vdu in get_iterable(vnfd_RO, "vdu"):
        vdu.pop("cloud-init-file", None)
        vdu.pop("cloud-init", None)
    return vnfd_RO
def ip_profile_2_RO(ip_profile):
    """Return a copy of an OSM ip-profile with keys and values renamed for RO.

    - "dns-server" becomes "dns-address"; a list of {"address": ...} items is
      flattened to a list of addresses, any other value is moved as is
    - "ip-version" values "ipv4"/"ipv6" become "IPv4"/"IPv6"
    - "dhcp-params" becomes "dhcp"

    :param ip_profile: dictionary with the ip-profile; it is not modified
    :return: the converted deep copy
    """
    RO_ip_profile = deepcopy(ip_profile)
    if "dns-server" in RO_ip_profile:
        dns_server = RO_ip_profile.pop("dns-server")
        if isinstance(dns_server, list):
            RO_ip_profile["dns-address"] = [ds['address'] for ds in dns_server]
        else:
            RO_ip_profile["dns-address"] = dns_server
    if RO_ip_profile.get("ip-version") == "ipv4":
        RO_ip_profile["ip-version"] = "IPv4"
    if RO_ip_profile.get("ip-version") == "ipv6":
        RO_ip_profile["ip-version"] = "IPv6"
    if "dhcp-params" in RO_ip_profile:
        RO_ip_profile["dhcp"] = RO_ip_profile.pop("dhcp-params")
    return RO_ip_profile
def _get_ro_vim_id_for_vim_account(self, vim_account):
    """Return the RO identifier stored for a VIM account.

    :param vim_account: _id of the record at the "vim_accounts" collection
    :return: content of _admin.deployed.RO of that record
    :raises LcmException: if the VIM operationalState is not "ENABLED"
    """
    db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
    operational_state = db_vim["_admin"]["operationalState"]
    if operational_state != "ENABLED":
        raise LcmException("VIM={} is not available. operationalState={}".format(
            vim_account, operational_state))
    return db_vim["_admin"]["deployed"]["RO"]
def get_ro_wim_id_for_wim_account(self, wim_account):
    """Return the RO identifier stored for a WIM account.

    :param wim_account: _id of the record at the "wim_accounts" collection. A value
        that is not a string is returned unchanged without reading the database
    :return: content of _admin.deployed.RO-account of that record, or ``wim_account``
        itself when it is not a string
    :raises LcmException: if the WIM operationalState is not "ENABLED"
    """
    if not isinstance(wim_account, str):
        # NOTE(review): presumably None/False mean "no WIM"; they are passed through as is
        return wim_account
    db_wim = self.db.get_one("wim_accounts", {"_id": wim_account})
    operational_state = db_wim["_admin"]["operationalState"]
    if operational_state != "ENABLED":
        raise LcmException("WIM={} is not available. operationalState={}".format(
            wim_account, operational_state))
    return db_wim["_admin"]["deployed"]["RO-account"]
def scale_vnfr(self, db_vnfr, vdu_create=None, vdu_delete=None, mark_delete=False):
    """Add and/or remove vdur entries of a vnfr, both at database and at ``db_vnfr``.

    :param db_vnfr: vnfr content. Its "vdur" list is replaced at the end by the one
        read back from the database
    :param vdu_create: dictionary {vdu-id: number of new instances}. Each new vdur is a
        copy of the last existing vdur with the same "vdu-id-ref"
    :param vdu_delete: dictionary {vdu-id: number of instances to remove}. The last
        ones of each vdu-id are affected
    :param mark_delete: if True the vdur are only marked with status "DELETING";
        otherwise they are pulled from the database record
    :return: None
    :raises LcmException: if there is no existing vdur to copy for a vdu-id to create
    """
    db_vdu_push_list = []
    db_update = {"_admin.modified": time()}
    if vdu_create:
        for vdu_id, vdu_count in vdu_create.items():
            vdur = next((vdur for vdur in reversed(db_vnfr["vdur"]) if vdur["vdu-id-ref"] == vdu_id), None)
            if not vdur:
                raise LcmException("Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".
                                   format(vdu_id))

            for count in range(vdu_count):
                vdur_copy = deepcopy(vdur)
                vdur_copy["status"] = "BUILD"
                vdur_copy["status-detailed"] = None
                # was 'vdur_copy["ip-address"]: None', an annotation that assigned nothing,
                # so the new vdur kept the ip-address of the copied one
                vdur_copy["ip-address"] = None
                vdur_copy["_id"] = str(uuid4())
                vdur_copy["count-index"] += count + 1
                vdur_copy["id"] = "{}-{}".format(vdur_copy["vdu-id-ref"], vdur_copy["count-index"])
                vdur_copy.pop("vim_info", None)
                for iface in vdur_copy["interfaces"]:
                    # fixed addresses are derived from the copied ones; others are dropped
                    if iface.get("fixed-ip"):
                        iface["ip-address"] = self.increment_ip_mac(iface["ip-address"], count+1)
                    else:
                        iface.pop("ip-address", None)
                    if iface.get("fixed-mac"):
                        iface["mac-address"] = self.increment_ip_mac(iface["mac-address"], count+1)
                    else:
                        iface.pop("mac-address", None)
                    iface.pop("mgmt_vnf", None)  # only first vdu can be management of vnf
                db_vdu_push_list.append(vdur_copy)
                # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
    if vdu_delete:
        for vdu_id, vdu_count in vdu_delete.items():
            if mark_delete:
                indexes_to_delete = [iv[0] for iv in enumerate(db_vnfr["vdur"]) if iv[1]["vdu-id-ref"] == vdu_id]
                db_update.update({"vdur.{}.status".format(i): "DELETING" for i in indexes_to_delete[-vdu_count:]})
            else:
                # it must be deleted one by one because common.db does not allow otherwise
                vdus_to_delete = [v for v in reversed(db_vnfr["vdur"]) if v["vdu-id-ref"] == vdu_id]
                for vdu in vdus_to_delete[:vdu_count]:
                    self.db.set_one("vnfrs", {"_id": db_vnfr["_id"]}, None, pull={"vdur": {"_id": vdu["_id"]}})
    db_push = {"vdur": db_vdu_push_list} if db_vdu_push_list else None
    self.db.set_one("vnfrs", {"_id": db_vnfr["_id"]}, db_update, push_list=db_push)
    # modify passed dictionary db_vnfr
    db_vnfr_ = self.db.get_one("vnfrs", {"_id": db_vnfr["_id"]})
    db_vnfr["vdur"] = db_vnfr_["vdur"]
def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
    """
    Updates database nsr with the RO info for the created vld
    :param ns_update_nsr: dictionary to be filled with the updated info
    :param db_nsr: content of db_nsr. This is also modified
    :param nsr_desc_RO: nsr descriptor from RO
    :return: Nothing, LcmException is raised on errors
    """
    for vld_index, vld in enumerate(get_iterable(db_nsr, "vld")):
        for net_RO in get_iterable(nsr_desc_RO, "nets"):
            if vld["id"] != net_RO.get("ns_net_osm_id"):
                continue
            vld["vim-id"] = net_RO.get("vim_net_id")
            vld["name"] = net_RO.get("vim_name")
            vld["status"] = net_RO.get("status")
            vld["status-detailed"] = net_RO.get("error_msg")
            ns_update_nsr["vld.{}".format(vld_index)] = vld
            break
        else:
            # no RO net matches this vld
            raise LcmException("ns_update_nsr: Not found vld={} at RO info".format(vld["id"]))
def set_vnfr_at_error(self, db_vnfrs, error_text):
    """Mark every vnfr as "ERROR", both at database and at the passed content.

    A vdur that has no "status" yet is set to "ERROR" too and, when ``error_text``
    is provided, gets it as "status-detailed".

    :param db_vnfrs: dictionary whose values are vnfr contents; they are modified
    :param error_text: error description stored, as a string, at "status-detailed"
    :return: None. A DbException while updating is logged and not propagated
    """
    try:
        for db_vnfr in db_vnfrs.values():
            vnfr_update = {"status": "ERROR"}
            for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
                if "status" not in vdur:
                    vdur["status"] = "ERROR"
                    vnfr_update["vdur.{}.status".format(vdu_index)] = "ERROR"
                    if error_text:
                        detail = str(error_text)
                        vdur["status-detailed"] = detail
                        # database gets the same text as the in-memory vdur
                        # (the literal "ERROR" was written here before)
                        vnfr_update["vdur.{}.status-detailed".format(vdu_index)] = detail
            self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
    except DbException as e:
        self.logger.error("Cannot update vnf. {}".format(e))
def ns_update_vnfr(self, db_vnfrs, nsr_desc_RO):
    """
    Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
    :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
    :param nsr_desc_RO: nsr descriptor from RO
    :return: Nothing, LcmException is raised on errors
    """
    for vnf_index, db_vnfr in db_vnfrs.items():
        for vnf_RO in nsr_desc_RO["vnfs"]:
            if vnf_RO["member_vnf_index"] != vnf_index:
                continue
            vnfr_update = {}
            if vnf_RO.get("ip_address"):
                # RO may provide several addresses separated by ';': the first one is used
                db_vnfr["ip-address"] = vnfr_update["ip-address"] = vnf_RO["ip_address"].split(";")[0]
            elif not db_vnfr.get("ip-address"):
                if db_vnfr.get("vdur"):   # if not VDUs, there is not ip_address
                    raise LcmExceptionNoMgmtIP("ns member_vnf_index '{}' has no IP address".format(vnf_index))

            for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
                vdur_RO_count_index = 0
                if vdur.get("pdu-type"):
                    continue
                for vdur_RO in get_iterable(vnf_RO, "vms"):
                    if vdur["vdu-id-ref"] != vdur_RO["vdu_osm_id"]:
                        continue
                    # among the RO vms of the same vdu, pick the one at position count-index
                    if vdur["count-index"] != vdur_RO_count_index:
                        vdur_RO_count_index += 1
                        continue
                    vdur["vim-id"] = vdur_RO.get("vim_vm_id")
                    if vdur_RO.get("ip_address"):
                        vdur["ip-address"] = vdur_RO["ip_address"].split(";")[0]
                    else:
                        vdur["ip-address"] = None
                    vdur["vdu-id-ref"] = vdur_RO.get("vdu_osm_id")
                    vdur["name"] = vdur_RO.get("vim_name")
                    vdur["status"] = vdur_RO.get("status")
                    vdur["status-detailed"] = vdur_RO.get("error_msg")
                    for ifacer in get_iterable(vdur, "interfaces"):
                        for interface_RO in get_iterable(vdur_RO, "interfaces"):
                            if ifacer["name"] == interface_RO.get("internal_name"):
                                ifacer["ip-address"] = interface_RO.get("ip_address")
                                ifacer["mac-address"] = interface_RO.get("mac_address")
                                break
                        else:
                            raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
                                               "from VIM info"
                                               .format(vnf_index, vdur["vdu-id-ref"], ifacer["name"]))
                    vnfr_update["vdur.{}".format(vdu_index)] = vdur
                    break
                else:
                    raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
                                       "VIM info".format(vnf_index, vdur["vdu-id-ref"], vdur["count-index"]))

            for vld_index, vld in enumerate(get_iterable(db_vnfr, "vld")):
                for net_RO in get_iterable(nsr_desc_RO, "nets"):
                    if vld["id"] != net_RO.get("vnf_net_osm_id"):
                        continue
                    vld["vim-id"] = net_RO.get("vim_net_id")
                    vld["name"] = net_RO.get("vim_name")
                    vld["status"] = net_RO.get("status")
                    vld["status-detailed"] = net_RO.get("error_msg")
                    vnfr_update["vld.{}".format(vld_index)] = vld
                    break
                else:
                    raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
                        vnf_index, vld["id"]))

            self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
            break

        else:
            raise LcmException("ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(vnf_index))
def _get_ns_config_info(self, nsr_id):
    """
    Generates a mapping between vnf,vdu elements and the N2VC id
    :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
    :return: a dictionary with {osm-config-mapping: {}} where its element contains:
        "<member-vnf-index>": <N2VC-id>  for a vnf configuration, or
        "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id>  for a vdu configuration
        The <N2VC-id> is taken from the "application" key of each deployed VCA.
        Entries with an empty "member-vnf-index" are not included.
    """
    db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
    vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
    mapping = {}
    ns_config_info = {"osm-config-mapping": mapping}
    for vca in vca_deployed_list:
        if not vca["member-vnf-index"]:
            continue
        if not vca["vdu_id"]:
            mapping[vca["member-vnf-index"]] = vca["application"]
        else:
            mapping["{}.{}.{}".format(vca["member-vnf-index"], vca["vdu_id"], vca["vdu_count_index"])] =\
                vca["application"]
    return ns_config_info
572 async def _instantiate_ng_ro(self
, logging_text
, nsr_id
, nsd
, db_nsr
, db_nslcmop
, db_vnfrs
, db_vnfds
,
573 n2vc_key_list
, stage
, start_deploy
, timeout_ns_deploy
):
577 def get_vim_account(vim_account_id
):
579 if vim_account_id
in db_vims
:
580 return db_vims
[vim_account_id
]
581 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
582 db_vims
[vim_account_id
] = db_vim
585 # modify target_vld info with instantiation parameters
586 def parse_vld_instantiation_params(target_vim
, target_vld
, vld_params
, target_sdn
):
587 if vld_params
.get("ip-profile"):
588 target_vld
["vim_info"][target_vim
]["ip_profile"] = vld_params
["ip-profile"]
589 if vld_params
.get("provider-network"):
590 target_vld
["vim_info"][target_vim
]["provider_network"] = vld_params
["provider-network"]
591 if "sdn-ports" in vld_params
["provider-network"] and target_sdn
:
592 target_vld
["vim_info"][target_sdn
]["sdn-ports"] = vld_params
["provider-network"]["sdn-ports"]
593 if vld_params
.get("wimAccountId"):
594 target_wim
= "wim:{}".format(vld_params
["wimAccountId"])
595 target_vld
["vim_info"][target_wim
] = {}
596 for param
in ("vim-network-name", "vim-network-id"):
597 if vld_params
.get(param
):
598 if isinstance(vld_params
[param
], dict):
599 for vim
, vim_net
in vld_params
[param
]:
600 other_target_vim
= "vim:" + vim
601 populate_dict(target_vld
["vim_info"], (other_target_vim
, param
.replace("-", "_")), vim_net
)
602 else: # isinstance str
603 target_vld
["vim_info"][target_vim
][param
.replace("-", "_")] = vld_params
[param
]
604 if vld_params
.get("common_id"):
605 target_vld
["common_id"] = vld_params
.get("common_id")
607 nslcmop_id
= db_nslcmop
["_id"]
609 "name": db_nsr
["name"],
612 "image": deepcopy(db_nsr
["image"]),
613 "flavor": deepcopy(db_nsr
["flavor"]),
614 "action_id": nslcmop_id
,
615 "cloud_init_content": {},
617 for image
in target
["image"]:
618 image
["vim_info"] = {}
619 for flavor
in target
["flavor"]:
620 flavor
["vim_info"] = {}
622 if db_nslcmop
.get("lcmOperationType") != "instantiate":
623 # get parameters of instantiation:
624 db_nslcmop_instantiate
= self
.db
.get_list("nslcmops", {"nsInstanceId": db_nslcmop
["nsInstanceId"],
625 "lcmOperationType": "instantiate"})[-1]
626 ns_params
= db_nslcmop_instantiate
.get("operationParams")
628 ns_params
= db_nslcmop
.get("operationParams")
629 ssh_keys_instantiation
= ns_params
.get("ssh_keys") or []
630 ssh_keys_all
= ssh_keys_instantiation
+ (n2vc_key_list
or [])
633 for vld_index
, vld
in enumerate(db_nsr
.get("vld")):
634 target_vim
= "vim:{}".format(ns_params
["vimAccountId"])
638 "mgmt-network": vld
.get("mgmt-network", False),
639 "type": vld
.get("type"),
642 "vim_network_name": vld
.get("vim-network-name"),
643 "vim_account_id": ns_params
["vimAccountId"]
647 # check if this network needs SDN assist
648 if vld
.get("pci-interfaces"):
649 db_vim
= VimAccountDB
.get_vim_account_with_id(target_vld
["vim_info"][0]["vim_account_id"])
650 sdnc_id
= db_vim
["config"].get("sdn-controller")
652 target_vld
["vim_info"].append({"sdnc_id": sdnc_id
})
654 nsd_vnf_profiles
= get_vnf_profiles(nsd
)
655 for nsd_vnf_profile
in nsd_vnf_profiles
:
656 for cp
in nsd_vnf_profile
["virtual-link-connectivity"]:
657 if cp
["virtual-link-profile-id"] == vld
["id"]:
658 cp2target
["member_vnf:{}.{}".format(
659 cp
["constituent-cpd-id"][0]["constituent-base-element-id"],
660 cp
["constituent-cpd-id"][0]["constituent-cpd-id"]
661 )] = "nsrs:{}:vld.{}".format(nsr_id
, vld_index
)
663 # check at nsd descriptor, if there is an ip-profile
665 virtual_link_profiles
= get_virtual_link_profiles(nsd
)
667 for vlp
in virtual_link_profiles
:
668 ip_profile
= find_in_list(nsd
["ip-profiles"],
669 lambda profile
: profile
["name"] == vlp
["ip-profile-ref"])
670 vld_params
["ip-profile"] = ip_profile
["ip-profile-params"]
671 # update vld_params with instantiation params
672 vld_instantiation_params
= find_in_list(get_iterable(ns_params
, "vld"),
673 lambda a_vld
: a_vld
["name"] in (vld
["name"], vld
["id"]))
674 if vld_instantiation_params
:
675 vld_params
.update(vld_instantiation_params
)
676 parse_vld_instantiation_params(target_vim
, target_vld
, vld_params
, None)
677 target
["ns"]["vld"].append(target_vld
)
679 for vnfr
in db_vnfrs
.values():
680 vnfd
= find_in_list(db_vnfds
, lambda db_vnf
: db_vnf
["id"] == vnfr
["vnfd-ref"])
681 vnf_params
= find_in_list(get_iterable(ns_params
, "vnf"),
682 lambda a_vnf
: a_vnf
["member-vnf-index"] == vnfr
["member-vnf-index-ref"])
683 target_vnf
= deepcopy(vnfr
)
684 target_vim
= "vim:{}".format(vnfr
["vim-account-id"])
685 for vld
in target_vnf
.get("vld", ()):
686 # check if connected to a ns.vld, to fill target'
687 vnf_cp
= find_in_list(vnfd
.get("int-virtual-link-desc", ()),
688 lambda cpd
: cpd
.get("id") == vld
["id"])
690 ns_cp
= "member_vnf:{}.{}".format(vnfr
["member-vnf-index-ref"], vnf_cp
["id"])
691 if cp2target
.get(ns_cp
):
692 vld
["target"] = cp2target
[ns_cp
]
694 vld
["vim_info"] = {target_vim
: {"vim_network_name": vld
.get("vim-network-name")}}
695 # check if this network needs SDN assist
697 if vld
.get("pci-interfaces"):
698 db_vim
= get_vim_account(vnfr
["vim-account-id"])
699 sdnc_id
= db_vim
["config"].get("sdn-controller")
701 sdn_vld
= "vnfrs:{}:vld.{}".format(target_vnf
["_id"], vld
["id"])
702 target_sdn
= "sdn:{}".format(sdnc_id
)
703 vld
["vim_info"][target_sdn
] = {
704 "sdn": True, "target_vim": target_vim
, "vlds": [sdn_vld
], "type": vld
.get("type")}
706 # check at vnfd descriptor, if there is an ip-profile
708 vnfd_vlp
= find_in_list(
709 get_virtual_link_profiles(vnfd
),
710 lambda a_link_profile
: a_link_profile
["id"] == vld
["id"]
712 if vnfd_vlp
and vnfd_vlp
.get("virtual-link-protocol-data") and \
713 vnfd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data"):
714 ip_profile_source_data
= vnfd_vlp
["virtual-link-protocol-data"]["l3-protocol-data"]
715 ip_profile_dest_data
= {}
716 if "ip-version" in ip_profile_source_data
:
717 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
["ip-version"]
718 if "cidr" in ip_profile_source_data
:
719 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
["cidr"]
720 if "gateway-ip" in ip_profile_source_data
:
721 ip_profile_dest_data
["gateway-address"] = ip_profile_source_data
["gateway-ip"]
722 if "dhcp-enabled" in ip_profile_source_data
:
723 ip_profile_dest_data
["dhcp-params"] = {
724 "enabled": ip_profile_source_data
["dhcp-enabled"]
727 vld_params
["ip-profile"] = ip_profile_dest_data
728 # update vld_params with instantiation params
730 vld_instantiation_params
= find_in_list(get_iterable(vnf_params
, "internal-vld"),
731 lambda i_vld
: i_vld
["name"] == vld
["id"])
732 if vld_instantiation_params
:
733 vld_params
.update(vld_instantiation_params
)
734 parse_vld_instantiation_params(target_vim
, vld
, vld_params
, target_sdn
)
737 for vdur
in target_vnf
.get("vdur", ()):
738 if vdur
.get("status") == "DELETING" or vdur
.get("pdu-type"):
739 continue # This vdu must not be created
740 vdur
["vim_info"] = {"vim_account_id": vnfr
["vim-account-id"]}
742 self
.logger
.debug("NS > ssh_keys > {}".format(ssh_keys_all
))
745 vdu_configuration
= get_vdu_configuration(vnfd
, vdur
["vdu-id-ref"])
746 vnf_configuration
= get_vnf_configuration(vnfd
)
747 if vdu_configuration
and vdu_configuration
.get("config-access") and \
748 vdu_configuration
.get("config-access").get("ssh-access"):
749 vdur
["ssh-keys"] = ssh_keys_all
750 vdur
["ssh-access-required"] = vdu_configuration
["config-access"]["ssh-access"]["required"]
751 elif vnf_configuration
and vnf_configuration
.get("config-access") and \
752 vnf_configuration
.get("config-access").get("ssh-access") and \
753 any(iface
.get("mgmt-vnf") for iface
in vdur
["interfaces"]):
754 vdur
["ssh-keys"] = ssh_keys_all
755 vdur
["ssh-access-required"] = vnf_configuration
["config-access"]["ssh-access"]["required"]
756 elif ssh_keys_instantiation
and \
757 find_in_list(vdur
["interfaces"], lambda iface
: iface
.get("mgmt-vnf")):
758 vdur
["ssh-keys"] = ssh_keys_instantiation
760 self
.logger
.debug("NS > vdur > {}".format(vdur
))
762 vdud
= get_vdu(vnfd
, vdur
["vdu-id-ref"])
764 if vdud
.get("cloud-init-file"):
765 vdur
["cloud-init"] = "{}:file:{}".format(vnfd
["_id"], vdud
.get("cloud-init-file"))
# read file and put content at target.cloud_init_content. Avoid ng_ro to use shared package system
767 if vdur
["cloud-init"] not in target
["cloud_init_content"]:
768 base_folder
= vnfd
["_admin"]["storage"]
769 cloud_init_file
= "{}/{}/cloud_init/{}".format(base_folder
["folder"], base_folder
["pkg-dir"],
770 vdud
.get("cloud-init-file"))
771 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
772 target
["cloud_init_content"][vdur
["cloud-init"]] = ci_file
.read()
773 elif vdud
.get("cloud-init"):
774 vdur
["cloud-init"] = "{}:vdu:{}".format(vnfd
["_id"], get_vdu_index(vnfd
, vdur
["vdu-id-ref"]))
# put content at target.cloud_init_content. Avoid ng_ro read vnfd descriptor
776 target
["cloud_init_content"][vdur
["cloud-init"]] = vdud
["cloud-init"]
777 vdur
["additionalParams"] = vdur
.get("additionalParams") or {}
778 deploy_params_vdu
= self
._format
_additional
_params
(vdur
.get("additionalParams") or {})
779 deploy_params_vdu
["OSM"] = get_osm_params(vnfr
, vdur
["vdu-id-ref"], vdur
["count-index"])
780 vdur
["additionalParams"] = deploy_params_vdu
783 ns_flavor
= target
["flavor"][int(vdur
["ns-flavor-id"])]
784 if target_vim
not in ns_flavor
["vim_info"]:
785 ns_flavor
["vim_info"][target_vim
] = {}
787 ns_image
= target
["image"][int(vdur
["ns-image-id"])]
788 if target_vim
not in ns_image
["vim_info"]:
789 ns_image
["vim_info"][target_vim
] = {}
791 vdur
["vim_info"] = {target_vim
: {}}
792 # instantiation parameters
794 # vdu_instantiation_params = next((v for v in get_iterable(vnf_params, "vdu") if v["id"] ==
796 vdur_list
.append(vdur
)
797 target_vnf
["vdur"] = vdur_list
798 target
["vnf"].append(target_vnf
)
800 desc
= await self
.RO
.deploy(nsr_id
, target
)
801 self
.logger
.debug("RO return > {}".format(desc
))
802 action_id
= desc
["action_id"]
803 await self
._wait
_ng
_ro
(nsr_id
, action_id
, nslcmop_id
, start_deploy
, timeout_ns_deploy
, stage
)
807 "_admin.deployed.RO.operational-status": "running",
808 "detailed-status": " ".join(stage
)
810 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
811 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
812 self
._write
_op
_status
(nslcmop_id
, stage
)
813 self
.logger
.debug(logging_text
+ "ns deployed at RO. RO_id={}".format(action_id
))
async def _wait_ng_ro(self, nsr_id, action_id, nslcmop_id=None, start_time=None, timeout=600, stage=None):
    """Poll self.RO.status() every 15 seconds until the action is done, fails or times out.

    :param nsr_id: ns record id, passed to RO and used to update the "nsrs" record
    :param action_id: RO action to wait for
    :param nslcmop_id: operation id; progress is written to database only when this
        and ``stage`` are provided
    :param start_time: time() value from which ``timeout`` counts; now if not provided
    :param timeout: maximum seconds to wait, counted from ``start_time``
    :param stage: list whose third item (stage[2]) is overwritten with the progress text
    :return: None when RO reports status "DONE"
    :raises NgRoException: if RO reports "FAILED" or an unknown status, or on timeout
    """
    detailed_status_old = None
    db_nsr_update = {}
    start_time = start_time or time()
    while time() <= start_time + timeout:
        desc_status = await self.RO.status(nsr_id, action_id)
        self.logger.debug("Wait NG RO > {}".format(desc_status))
        if desc_status["status"] == "FAILED":
            raise NgRoException(desc_status["details"])
        elif desc_status["status"] == "BUILD":
            if stage:
                stage[2] = "VIM: ({})".format(desc_status["details"])
        elif desc_status["status"] == "DONE":
            if stage:
                stage[2] = "Deployed at VIM"
            break
        else:
            # raised explicitly: an assert would be stripped under "python -O",
            # leaving the loop polling until the timeout
            raise NgRoException("ROclient.check_ns_status returns unknown {}".format(desc_status["status"]))
        if stage and nslcmop_id and stage[2] != detailed_status_old:
            # write progress only when the text has changed
            detailed_status_old = stage[2]
            db_nsr_update["detailed-status"] = " ".join(stage)
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)
        # the 'loop' argument of asyncio.sleep was removed in Python 3.10;
        # the running loop is used
        await asyncio.sleep(15)
    else:  # timeout_ns_deploy
        raise NgRoException("Timeout waiting ns to deploy")
843 async def _terminate_ng_ro(self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
):
847 start_deploy
= time()
854 "action_id": nslcmop_id
856 desc
= await self
.RO
.deploy(nsr_id
, target
)
857 action_id
= desc
["action_id"]
858 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = action_id
859 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETING"
860 self
.logger
.debug(logging_text
+ "ns terminate action at RO. action_id={}".format(action_id
))
863 delete_timeout
= 20 * 60 # 20 minutes
864 await self
._wait
_ng
_ro
(nsr_id
, action_id
, nslcmop_id
, start_deploy
, delete_timeout
, stage
)
866 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
867 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
869 await self
.RO
.delete(nsr_id
)
870 except Exception as e
:
871 if isinstance(e
, NgRoException
) and e
.http_code
== 404: # not found
872 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
873 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
874 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
875 self
.logger
.debug(logging_text
+ "RO_action_id={} already deleted".format(action_id
))
876 elif isinstance(e
, NgRoException
) and e
.http_code
== 409: # conflict
877 failed_detail
.append("delete conflict: {}".format(e
))
878 self
.logger
.debug(logging_text
+ "RO_action_id={} delete conflict: {}".format(action_id
, e
))
880 failed_detail
.append("delete error: {}".format(e
))
881 self
.logger
.error(logging_text
+ "RO_action_id={} delete error: {}".format(action_id
, e
))
884 stage
[2] = "Error deleting from VIM"
886 stage
[2] = "Deleted from VIM"
887 db_nsr_update
["detailed-status"] = " ".join(stage
)
888 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
889 self
._write
_op
_status
(nslcmop_id
, stage
)
892 raise LcmException("; ".join(failed_detail
))
895 async def instantiate_RO(self
, logging_text
, nsr_id
, nsd
, db_nsr
, db_nslcmop
, db_vnfrs
, db_vnfds
,
896 n2vc_key_list
, stage
):
899 :param logging_text: preffix text to use at logging
900 :param nsr_id: nsr identity
901 :param nsd: database content of ns descriptor
902 :param db_nsr: database content of ns record
903 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
905 :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
906 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
907 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
908 :return: None or exception
911 start_deploy
= time()
912 ns_params
= db_nslcmop
.get("operationParams")
913 if ns_params
and ns_params
.get("timeout_ns_deploy"):
914 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
916 timeout_ns_deploy
= self
.timeout
.get("ns_deploy", self
.timeout_ns_deploy
)
918 # Check for and optionally request placement optimization. Database will be updated if placement activated
919 stage
[2] = "Waiting for Placement."
920 if await self
._do
_placement
(logging_text
, db_nslcmop
, db_vnfrs
):
921 # in case of placement change ns_params[vimAcountId) if not present at any vnfrs
922 for vnfr
in db_vnfrs
.values():
923 if ns_params
["vimAccountId"] == vnfr
["vim-account-id"]:
926 ns_params
["vimAccountId"] == vnfr
["vim-account-id"]
928 return await self
._instantiate
_ng
_ro
(logging_text
, nsr_id
, nsd
, db_nsr
, db_nslcmop
, db_vnfrs
,
929 db_vnfds
, n2vc_key_list
, stage
, start_deploy
, timeout_ns_deploy
)
930 except Exception as e
:
931 stage
[2] = "ERROR deploying at VIM"
932 self
.set_vnfr_at_error(db_vnfrs
, str(e
))
933 self
.logger
.error("Error deploying at VIM {}".format(e
),
934 exc_info
=not isinstance(e
, (ROclient
.ROClientException
, LcmException
, DbException
,
async def wait_kdu_up(self, logging_text, nsr_id, vnfr_id, kdu_name):
    """
    Wait for kdu to be up, get ip address
    :param logging_text: prefix use for logging (not used by the current code)
    :param nsr_id: ns record identity (not used by the current code)
    :param vnfr_id: _id of the "vnfrs" record that is read from database at every try
    :param kdu_name: value of "kdu-name" that identifies the target entry inside the vnfr "kdur" list
    :return: content of "ip-address" of the kdur, once its status is READY or ENABLED
    :raises LcmException: if the kdur is not found, if it has any other non-empty status, or if it still has
        no status after 360 tries
    """
    nb_tries = 0

    while nb_tries < 360:
        db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
        kdur = next((x for x in get_iterable(db_vnfr, "kdur") if x.get("kdu-name") == kdu_name), None)
        if not kdur:
            raise LcmException("Not found vnfr_id={}, kdu_name={}".format(vnfr_id, kdu_name))
        if kdur.get("status"):
            if kdur["status"] in ("READY", "ENABLED"):
                return kdur.get("ip-address")
            else:
                raise LcmException("target KDU={} is in error state".format(kdu_name))

        # "loop" argument is not passed: it was removed from asyncio.sleep in python 3.10
        await asyncio.sleep(10)
        nb_tries += 1
    raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name))
966 async def wait_vm_up_insert_key_ro(self
, logging_text
, nsr_id
, vnfr_id
, vdu_id
, vdu_index
, pub_key
=None, user
=None):
968 Wait for ip addres at RO, and optionally, insert public key in virtual machine
969 :param logging_text: prefix use for logging
974 :param pub_key: public ssh key to inject, None to skip
975 :param user: user to apply the public ssh key
979 self
.logger
.debug(logging_text
+ "Starting wait_vm_up_insert_key_ro")
989 if ro_retries
>= 360: # 1 hour
990 raise LcmException("Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id
))
992 await asyncio
.sleep(10, loop
=self
.loop
)
995 if not target_vdu_id
:
996 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
998 if not vdu_id
: # for the VNF case
999 if db_vnfr
.get("status") == "ERROR":
1000 raise LcmException("Cannot inject ssh-key because target VNF is in error state")
1001 ip_address
= db_vnfr
.get("ip-address")
1004 vdur
= next((x
for x
in get_iterable(db_vnfr
, "vdur") if x
.get("ip-address") == ip_address
), None)
1006 vdur
= next((x
for x
in get_iterable(db_vnfr
, "vdur")
1007 if x
.get("vdu-id-ref") == vdu_id
and x
.get("count-index") == vdu_index
), None)
1009 if not vdur
and len(db_vnfr
.get("vdur", ())) == 1: # If only one, this should be the target vdu
1010 vdur
= db_vnfr
["vdur"][0]
1012 raise LcmException("Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(vnfr_id
, vdu_id
,
1014 # New generation RO stores information at "vim_info"
1017 if vdur
.get("vim_info"):
1018 target_vim
= next(t
for t
in vdur
["vim_info"]) # there should be only one key
1019 ng_ro_status
= vdur
["vim_info"][target_vim
].get("vim_status")
1020 if vdur
.get("pdu-type") or vdur
.get("status") == "ACTIVE" or ng_ro_status
== "ACTIVE":
1021 ip_address
= vdur
.get("ip-address")
1024 target_vdu_id
= vdur
["vdu-id-ref"]
1025 elif vdur
.get("status") == "ERROR" or ng_ro_status
== "ERROR":
1026 raise LcmException("Cannot inject ssh-key because target VM is in error state")
1028 if not target_vdu_id
:
1031 # inject public key into machine
1032 if pub_key
and user
:
1033 self
.logger
.debug(logging_text
+ "Inserting RO key")
1034 self
.logger
.debug("SSH > PubKey > {}".format(pub_key
))
1035 if vdur
.get("pdu-type"):
1036 self
.logger
.error(logging_text
+ "Cannot inject ssh-ky to a PDU")
1039 ro_vm_id
= "{}-{}".format(db_vnfr
["member-vnf-index-ref"], target_vdu_id
) # TODO add vdu_index
1041 target
= {"action": {"action": "inject_ssh_key", "key": pub_key
, "user": user
},
1042 "vnf": [{"_id": vnfr_id
, "vdur": [{"id": vdur
["id"]}]}],
1044 desc
= await self
.RO
.deploy(nsr_id
, target
)
1045 action_id
= desc
["action_id"]
1046 await self
._wait
_ng
_ro
(nsr_id
, action_id
, timeout
=600)
1049 # wait until NS is deployed at RO
1051 db_nsrs
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1052 ro_nsr_id
= deep_get(db_nsrs
, ("_admin", "deployed", "RO", "nsr_id"))
1055 result_dict
= await self
.RO
.create_action(
1057 item_id_name
=ro_nsr_id
,
1058 descriptor
={"add_public_key": pub_key
, "vms": [ro_vm_id
], "user": user
}
1060 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
1061 if not result_dict
or not isinstance(result_dict
, dict):
1062 raise LcmException("Unknown response from RO when injecting key")
1063 for result
in result_dict
.values():
1064 if result
.get("vim_result") == 200:
1067 raise ROclient
.ROClientException("error injecting key: {}".format(
1068 result
.get("description")))
1070 except NgRoException
as e
:
1071 raise LcmException("Reaching max tries injecting key. Error: {}".format(e
))
1072 except ROclient
.ROClientException
as e
:
1074 self
.logger
.debug(logging_text
+ "error injecting key: {}. Retrying until {} seconds".
1078 raise LcmException("Reaching max tries injecting key. Error: {}".format(e
))
1084 async def _wait_dependent_n2vc(self
, nsr_id
, vca_deployed_list
, vca_index
):
1086 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1088 my_vca
= vca_deployed_list
[vca_index
]
1089 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1090 # vdu or kdu: no dependencies
1094 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1095 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1096 configuration_status_list
= db_nsr
["configurationStatus"]
1097 for index
, vca_deployed
in enumerate(configuration_status_list
):
1098 if index
== vca_index
:
1101 if not my_vca
.get("member-vnf-index") or \
1102 (vca_deployed
.get("member-vnf-index") == my_vca
.get("member-vnf-index")):
1103 internal_status
= configuration_status_list
[index
].get("status")
1104 if internal_status
== 'READY':
1106 elif internal_status
== 'BROKEN':
1107 raise LcmException("Configuration aborted because dependent charm/s has failed")
1111 # no dependencies, return
1113 await asyncio
.sleep(10)
1116 raise LcmException("Configuration aborted because dependent charm/s timeout")
1118 async def instantiate_N2VC(self
, logging_text
, vca_index
, nsi_id
, db_nsr
, db_vnfr
, vdu_id
, kdu_name
, vdu_index
,
1119 config_descriptor
, deploy_params
, base_folder
, nslcmop_id
, stage
, vca_type
, vca_name
,
1120 ee_config_descriptor
):
1121 nsr_id
= db_nsr
["_id"]
1122 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
1123 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1124 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
1125 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
1127 'collection': 'nsrs',
1128 'filter': {'_id': nsr_id
},
1129 'path': db_update_entry
1135 element_under_configuration
= nsr_id
1139 vnfr_id
= db_vnfr
["_id"]
1140 osm_config
["osm"]["vnf_id"] = vnfr_id
1142 namespace
= "{nsi}.{ns}".format(
1143 nsi
=nsi_id
if nsi_id
else "",
1147 element_type
= 'VNF'
1148 element_under_configuration
= vnfr_id
1149 namespace
+= ".{}".format(vnfr_id
)
1151 namespace
+= ".{}-{}".format(vdu_id
, vdu_index
or 0)
1152 element_type
= 'VDU'
1153 element_under_configuration
= "{}-{}".format(vdu_id
, vdu_index
or 0)
1154 osm_config
["osm"]["vdu_id"] = vdu_id
1156 namespace
+= ".{}".format(kdu_name
)
1157 element_type
= 'KDU'
1158 element_under_configuration
= kdu_name
1159 osm_config
["osm"]["kdu_name"] = kdu_name
1162 artifact_path
= "{}/{}/{}/{}".format(
1163 base_folder
["folder"],
1164 base_folder
["pkg-dir"],
1165 "charms" if vca_type
in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm") else "helm-charts",
1169 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
1171 # get initial_config_primitive_list that applies to this element
1172 initial_config_primitive_list
= config_descriptor
.get('initial-config-primitive')
1174 self
.logger
.debug("Initial config primitive list > {}".format(initial_config_primitive_list
))
1176 # add config if not present for NS charm
1177 ee_descriptor_id
= ee_config_descriptor
.get("id")
1178 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
1179 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(initial_config_primitive_list
,
1180 vca_deployed
, ee_descriptor_id
)
1182 self
.logger
.debug("Initial config primitive list #2 > {}".format(initial_config_primitive_list
))
1183 # n2vc_redesign STEP 3.1
1184 # find old ee_id if exists
1185 ee_id
= vca_deployed
.get("ee_id")
1188 deep_get(db_vnfr
, ("vim-account-id",)) or
1189 deep_get(deploy_params
, ("OSM", "vim_account_id"))
1191 vca_cloud
, vca_cloud_credential
= self
.get_vca_cloud_and_credentials(vim_account_id
)
1192 vca_k8s_cloud
, vca_k8s_cloud_credential
= self
.get_vca_k8s_cloud_and_credentials(vim_account_id
)
1193 # create or register execution environment in VCA
1194 if vca_type
in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1196 self
._write
_configuration
_status
(
1198 vca_index
=vca_index
,
1200 element_under_configuration
=element_under_configuration
,
1201 element_type
=element_type
1204 step
= "create execution environment"
1205 self
.logger
.debug(logging_text
+ step
)
1209 if vca_type
== "k8s_proxy_charm":
1210 ee_id
= await self
.vca_map
[vca_type
].install_k8s_proxy_charm(
1211 charm_name
=artifact_path
[artifact_path
.rfind("/") + 1:],
1212 namespace
=namespace
,
1213 artifact_path
=artifact_path
,
1215 cloud_name
=vca_k8s_cloud
,
1216 credential_name
=vca_k8s_cloud_credential
,
1218 elif vca_type
== "helm" or vca_type
== "helm-v3":
1219 ee_id
, credentials
= await self
.vca_map
[vca_type
].create_execution_environment(
1220 namespace
=namespace
,
1224 artifact_path
=artifact_path
,
1228 ee_id
, credentials
= await self
.vca_map
[vca_type
].create_execution_environment(
1229 namespace
=namespace
,
1232 cloud_name
=vca_cloud
,
1233 credential_name
=vca_cloud_credential
,
1236 elif vca_type
== "native_charm":
1237 step
= "Waiting to VM being up and getting IP address"
1238 self
.logger
.debug(logging_text
+ step
)
1239 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(logging_text
, nsr_id
, vnfr_id
, vdu_id
, vdu_index
,
1240 user
=None, pub_key
=None)
1241 credentials
= {"hostname": rw_mgmt_ip
}
1243 username
= deep_get(config_descriptor
, ("config-access", "ssh-access", "default-user"))
1244 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1245 # merged. Meanwhile let's get username from initial-config-primitive
1246 if not username
and initial_config_primitive_list
:
1247 for config_primitive
in initial_config_primitive_list
:
1248 for param
in config_primitive
.get("parameter", ()):
1249 if param
["name"] == "ssh-username":
1250 username
= param
["value"]
1253 raise LcmException("Cannot determine the username neither with 'initial-config-primitive' nor with "
1254 "'config-access.ssh-access.default-user'")
1255 credentials
["username"] = username
1256 # n2vc_redesign STEP 3.2
1258 self
._write
_configuration
_status
(
1260 vca_index
=vca_index
,
1261 status
='REGISTERING',
1262 element_under_configuration
=element_under_configuration
,
1263 element_type
=element_type
1266 step
= "register execution environment {}".format(credentials
)
1267 self
.logger
.debug(logging_text
+ step
)
1268 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
1269 credentials
=credentials
,
1270 namespace
=namespace
,
1272 cloud_name
=vca_cloud
,
1273 credential_name
=vca_cloud_credential
,
1276 # for compatibility with MON/POL modules, the need model and application name at database
1277 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
1278 ee_id_parts
= ee_id
.split('.')
1279 db_nsr_update
= {db_update_entry
+ "ee_id": ee_id
}
1280 if len(ee_id_parts
) >= 2:
1281 model_name
= ee_id_parts
[0]
1282 application_name
= ee_id_parts
[1]
1283 db_nsr_update
[db_update_entry
+ "model"] = model_name
1284 db_nsr_update
[db_update_entry
+ "application"] = application_name
1286 # n2vc_redesign STEP 3.3
1287 step
= "Install configuration Software"
1289 self
._write
_configuration
_status
(
1291 vca_index
=vca_index
,
1292 status
='INSTALLING SW',
1293 element_under_configuration
=element_under_configuration
,
1294 element_type
=element_type
,
1295 other_update
=db_nsr_update
1298 # TODO check if already done
1299 self
.logger
.debug(logging_text
+ step
)
1301 if vca_type
== "native_charm":
1302 config_primitive
= next((p
for p
in initial_config_primitive_list
if p
["name"] == "config"), None)
1303 if config_primitive
:
1304 config
= self
._map
_primitive
_params
(
1310 if vca_type
== "lxc_proxy_charm":
1311 if element_type
== "NS":
1312 num_units
= db_nsr
.get("config-units") or 1
1313 elif element_type
== "VNF":
1314 num_units
= db_vnfr
.get("config-units") or 1
1315 elif element_type
== "VDU":
1316 for v
in db_vnfr
["vdur"]:
1317 if vdu_id
== v
["vdu-id-ref"]:
1318 num_units
= v
.get("config-units") or 1
1320 if vca_type
!= "k8s_proxy_charm":
1321 await self
.vca_map
[vca_type
].install_configuration_sw(
1323 artifact_path
=artifact_path
,
1326 num_units
=num_units
,
1329 # write in db flag of configuration_sw already installed
1330 self
.update_db_2("nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True})
1332 # add relations for this VCA (wait for other peers related with this VCA)
1333 await self
._add
_vca
_relations
(logging_text
=logging_text
, nsr_id
=nsr_id
,
1334 vca_index
=vca_index
, vca_type
=vca_type
)
1336 # if SSH access is required, then get execution environment SSH public
1337 # if native charm we have waited already to VM be UP
1338 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
1341 # self.logger.debug("get ssh key block")
1342 if deep_get(config_descriptor
, ("config-access", "ssh-access", "required")):
1343 # self.logger.debug("ssh key needed")
1344 # Needed to inject a ssh key
1345 user
= deep_get(config_descriptor
, ("config-access", "ssh-access", "default-user"))
1346 step
= "Install configuration Software, getting public ssh key"
1347 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(ee_id
=ee_id
, db_dict
=db_dict
)
1349 step
= "Insert public key into VM user={} ssh_key={}".format(user
, pub_key
)
1351 # self.logger.debug("no need to get ssh key")
1352 step
= "Waiting to VM being up and getting IP address"
1353 self
.logger
.debug(logging_text
+ step
)
1355 # n2vc_redesign STEP 5.1
1356 # wait for RO (ip-address) Insert pub_key into VM
1359 rw_mgmt_ip
= await self
.wait_kdu_up(logging_text
, nsr_id
, vnfr_id
, kdu_name
)
1361 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(logging_text
, nsr_id
, vnfr_id
, vdu_id
,
1362 vdu_index
, user
=user
, pub_key
=pub_key
)
1364 rw_mgmt_ip
= None # This is for a NS configuration
1366 self
.logger
.debug(logging_text
+ ' VM_ip_address={}'.format(rw_mgmt_ip
))
1368 # store rw_mgmt_ip in deploy params for later replacement
1369 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
1371 # n2vc_redesign STEP 6 Execute initial config primitive
1372 step
= 'execute initial config primitive'
1374 # wait for dependent primitives execution (NS -> VNF -> VDU)
1375 if initial_config_primitive_list
:
1376 await self
._wait
_dependent
_n
2vc
(nsr_id
, vca_deployed_list
, vca_index
)
1378 # stage, in function of element type: vdu, kdu, vnf or ns
1379 my_vca
= vca_deployed_list
[vca_index
]
1380 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1382 stage
[0] = 'Stage 3/5: running Day-1 primitives for VDU.'
1383 elif my_vca
.get("member-vnf-index"):
1385 stage
[0] = 'Stage 4/5: running Day-1 primitives for VNF.'
1388 stage
[0] = 'Stage 5/5: running Day-1 primitives for NS.'
1390 self
._write
_configuration
_status
(
1392 vca_index
=vca_index
,
1393 status
='EXECUTING PRIMITIVE'
1396 self
._write
_op
_status
(
1401 check_if_terminated_needed
= True
1402 for initial_config_primitive
in initial_config_primitive_list
:
1403 # adding information on the vca_deployed if it is a NS execution environment
1404 if not vca_deployed
["member-vnf-index"]:
1405 deploy_params
["ns_config_info"] = json
.dumps(self
._get
_ns
_config
_info
(nsr_id
))
1406 # TODO check if already done
1407 primitive_params_
= self
._map
_primitive
_params
(initial_config_primitive
, {}, deploy_params
)
1409 step
= "execute primitive '{}' params '{}'".format(initial_config_primitive
["name"], primitive_params_
)
1410 self
.logger
.debug(logging_text
+ step
)
1411 await self
.vca_map
[vca_type
].exec_primitive(
1413 primitive_name
=initial_config_primitive
["name"],
1414 params_dict
=primitive_params_
,
1417 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
1418 if check_if_terminated_needed
:
1419 if config_descriptor
.get('terminate-config-primitive'):
1420 self
.update_db_2("nsrs", nsr_id
, {db_update_entry
+ "needed_terminate": True})
1421 check_if_terminated_needed
= False
1423 # TODO register in database that primitive is done
1425 # STEP 7 Configure metrics
1426 if vca_type
== "helm" or vca_type
== "helm-v3":
1427 prometheus_jobs
= await self
.add_prometheus_metrics(
1429 artifact_path
=artifact_path
,
1430 ee_config_descriptor
=ee_config_descriptor
,
1433 target_ip
=rw_mgmt_ip
,
1436 self
.update_db_2("nsrs", nsr_id
, {db_update_entry
+ "prometheus_jobs": prometheus_jobs
})
1438 step
= "instantiated at VCA"
1439 self
.logger
.debug(logging_text
+ step
)
1441 self
._write
_configuration
_status
(
1443 vca_index
=vca_index
,
1447 except Exception as e
: # TODO not use Exception but N2VC exception
1448 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
1449 if not isinstance(e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)):
1450 self
.logger
.error("Exception while {} : {}".format(step
, e
), exc_info
=True)
1451 self
._write
_configuration
_status
(
1453 vca_index
=vca_index
,
1456 raise LcmException("{} {}".format(step
, e
)) from e
1458 def _write_ns_status(self
, nsr_id
: str, ns_state
: str, current_operation
: str, current_operation_id
: str,
1459 error_description
: str = None, error_detail
: str = None, other_update
: dict = None):
1461 Update db_nsr fields.
1464 :param current_operation:
1465 :param current_operation_id:
1466 :param error_description:
1467 :param error_detail:
1468 :param other_update: Other required changes at database if provided, will be cleared
1472 db_dict
= other_update
or {}
1473 db_dict
["_admin.nslcmop"] = current_operation_id
# for backward compatibility
1474 db_dict
["_admin.current-operation"] = current_operation_id
1475 db_dict
["_admin.operation-type"] = current_operation
if current_operation
!= "IDLE" else None
1476 db_dict
["currentOperation"] = current_operation
1477 db_dict
["currentOperationID"] = current_operation_id
1478 db_dict
["errorDescription"] = error_description
1479 db_dict
["errorDetail"] = error_detail
1482 db_dict
["nsState"] = ns_state
1483 self
.update_db_2("nsrs", nsr_id
, db_dict
)
1484 except DbException
as e
:
1485 self
.logger
.warn('Error writing NS status, ns={}: {}'.format(nsr_id
, e
))
1487 def _write_op_status(self
, op_id
: str, stage
: list = None, error_message
: str = None, queuePosition
: int = 0,
1488 operation_state
: str = None, other_update
: dict = None):
1490 db_dict
= other_update
or {}
1491 db_dict
['queuePosition'] = queuePosition
1492 if isinstance(stage
, list):
1493 db_dict
['stage'] = stage
[0]
1494 db_dict
['detailed-status'] = " ".join(stage
)
1495 elif stage
is not None:
1496 db_dict
['stage'] = str(stage
)
1498 if error_message
is not None:
1499 db_dict
['errorMessage'] = error_message
1500 if operation_state
is not None:
1501 db_dict
['operationState'] = operation_state
1502 db_dict
["statusEnteredTime"] = time()
1503 self
.update_db_2("nslcmops", op_id
, db_dict
)
1504 except DbException
as e
:
1505 self
.logger
.warn('Error writing OPERATION status for op_id: {} -> {}'.format(op_id
, e
))
1507 def _write_all_config_status(self
, db_nsr
: dict, status
: str):
1509 nsr_id
= db_nsr
["_id"]
1510 # configurationStatus
1511 config_status
= db_nsr
.get('configurationStatus')
1513 db_nsr_update
= {"configurationStatus.{}.status".format(index
): status
for index
, v
in
1514 enumerate(config_status
) if v
}
1516 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1518 except DbException
as e
:
1519 self
.logger
.warn('Error writing all configuration status, ns={}: {}'.format(nsr_id
, e
))
1521 def _write_configuration_status(self
, nsr_id
: str, vca_index
: int, status
: str = None,
1522 element_under_configuration
: str = None, element_type
: str = None,
1523 other_update
: dict = None):
1525 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
1526 # .format(vca_index, status))
1529 db_path
= 'configurationStatus.{}.'.format(vca_index
)
1530 db_dict
= other_update
or {}
1532 db_dict
[db_path
+ 'status'] = status
1533 if element_under_configuration
:
1534 db_dict
[db_path
+ 'elementUnderConfiguration'] = element_under_configuration
1536 db_dict
[db_path
+ 'elementType'] = element_type
1537 self
.update_db_2("nsrs", nsr_id
, db_dict
)
1538 except DbException
as e
:
1539 self
.logger
.warn('Error writing configuration status={}, ns={}, vca_index={}: {}'
1540 .format(status
, nsr_id
, vca_index
, e
))
async def _do_placement(self, logging_text, db_nslcmop, db_vnfrs):
    """
    Request the placement (vim account where to deploy) when operationParams 'placement-engine' is "PLA".
    A "get_placement" message is written to topic "pla" using self.msg, and then the nslcmops record is read
    from database every 5 seconds until '_admin.pla' contains the result.
    :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
    :param db_nslcmop: database content of nslcmop
    :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
    :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfrs with the
        computed 'vim-account-id'. False if the placement engine is other than "PLA" or nothing is changed
    :raises LcmException: if the result is not found at database after the last read
    """
    nslcmop_id = db_nslcmop['_id']
    if deep_get(db_nslcmop, ('operationParams', 'placement-engine')) != "PLA":
        return False

    self.logger.debug(logging_text + "Invoke and wait for placement optimization")
    await self.msg.aiowrite("pla", "get_placement", {'nslcmopId': nslcmop_id}, loop=self.loop)

    poll_interval = 5
    remaining = poll_interval * 10
    pla_result = None
    while remaining >= 0 and not pla_result:
        await asyncio.sleep(poll_interval)
        remaining -= poll_interval
        pla_result = deep_get(self.db.get_one("nslcmops", {"_id": nslcmop_id}), ('_admin', 'pla'))

    if not pla_result:
        raise LcmException("Placement timeout for nslcmopId={}".format(nslcmop_id))

    changed = False
    for pla_vnf in pla_result['vnf']:
        vnfr = db_vnfrs.get(pla_vnf['member-vnf-index'])
        # entries without vim account, or that reference a vnf that is not at db_vnfrs, are ignored
        if pla_vnf.get('vimAccountId') and vnfr:
            changed = True
            new_vim_account = {"vim-account-id": pla_vnf['vimAccountId']}
            self.db.set_one("vnfrs", {"_id": vnfr["_id"]}, new_vim_account)
            # keep the content of db_vnfrs aligned with database
            vnfr["vim-account-id"] = pla_vnf['vimAccountId']
    return changed
def update_nsrs_with_pla_result(self, params):
    """
    Store the placement result at '_admin.pla' of the nslcmops record identified by params placement.nslcmopId.
    :param params: dictionary with the key 'placement', whose content is stored at database and that contains
        the 'nslcmopId' of the record to update
    :return: None. Any exception is logged as a warning and it is not propagated
    """
    # bound before the try, because the except clause formats it even if the first statement fails
    nslcmop_id = None
    try:
        nslcmop_id = deep_get(params, ('placement', 'nslcmopId'))
        self.update_db_2("nslcmops", nslcmop_id, {"_admin.pla": params.get('placement')})
    except Exception as e:
        # Logger.warn is a deprecated alias of Logger.warning
        self.logger.warning('Update failed for nslcmop_id={}:{}'.format(nslcmop_id, e))
1588 async def instantiate(self
, nsr_id
, nslcmop_id
):
1591 :param nsr_id: ns instance to deploy
1592 :param nslcmop_id: operation to run
1596 # Try to lock HA task here
1597 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA('ns', 'nslcmops', nslcmop_id
)
1598 if not task_is_locked_by_me
:
1599 self
.logger
.debug('instantiate() task is not locked by me, ns={}'.format(nsr_id
))
1602 logging_text
= "Task ns={} instantiate={} ".format(nsr_id
, nslcmop_id
)
1603 self
.logger
.debug(logging_text
+ "Enter")
1605 # get all needed from database
1607 # database nsrs record
1610 # database nslcmops record
1613 # update operation on nsrs
1615 # update operation on nslcmops
1616 db_nslcmop_update
= {}
1618 nslcmop_operation_state
= None
1619 db_vnfrs
= {} # vnf's info indexed by member-index
1621 tasks_dict_info
= {} # from task to info text
1624 stage
= ['Stage 1/5: preparation of the environment.', "Waiting for previous operations to terminate.", ""]
1625 # ^ stage, step, VIM progress
1627 # wait for any previous tasks in process
1628 await self
.lcm_tasks
.waitfor_related_HA('ns', 'nslcmops', nslcmop_id
)
1630 stage
[1] = "Sync filesystem from database."
1631 self
.fs
.sync() # TODO, make use of partial sync, only for the needed packages
1633 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
1634 stage
[1] = "Reading from database."
1635 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
1636 db_nsr_update
["detailed-status"] = "creating"
1637 db_nsr_update
["operational-status"] = "init"
1638 self
._write
_ns
_status
(
1640 ns_state
="BUILDING",
1641 current_operation
="INSTANTIATING",
1642 current_operation_id
=nslcmop_id
,
1643 other_update
=db_nsr_update
1645 self
._write
_op
_status
(
1651 # read from db: operation
1652 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
1653 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
1654 ns_params
= db_nslcmop
.get("operationParams")
1655 if ns_params
and ns_params
.get("timeout_ns_deploy"):
1656 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
1658 timeout_ns_deploy
= self
.timeout
.get("ns_deploy", self
.timeout_ns_deploy
)
1661 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
1662 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1663 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
1664 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
1666 # nsr_name = db_nsr["name"] # TODO short-name??
1668 # read from db: vnf's of this ns
1669 stage
[1] = "Getting vnfrs from db."
1670 self
.logger
.debug(logging_text
+ stage
[1])
1671 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
1673 # read from db: vnfd's for every vnf
1674 db_vnfds
= [] # every vnfd data
1676 # for each vnf in ns, read vnfd
1677 for vnfr
in db_vnfrs_list
:
1678 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
1679 vnfd_id
= vnfr
["vnfd-id"]
1680 vnfd_ref
= vnfr
["vnfd-ref"]
1682 # if we haven't this vnfd, read it from db
1683 if vnfd_id
not in db_vnfds
:
1685 stage
[1] = "Getting vnfd={} id='{}' from db.".format(vnfd_id
, vnfd_ref
)
1686 self
.logger
.debug(logging_text
+ stage
[1])
1687 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
1690 db_vnfds
.append(vnfd
) # vnfd's indexed by id
1692 # Get or generates the _admin.deployed.VCA list
1693 vca_deployed_list
= None
1694 if db_nsr
["_admin"].get("deployed"):
1695 vca_deployed_list
= db_nsr
["_admin"]["deployed"].get("VCA")
1696 if vca_deployed_list
is None:
1697 vca_deployed_list
= []
1698 configuration_status_list
= []
1699 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
1700 db_nsr_update
["configurationStatus"] = configuration_status_list
1701 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
1702 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
1703 elif isinstance(vca_deployed_list
, dict):
1704 # maintain backward compatibility. Change a dict to list at database
1705 vca_deployed_list
= list(vca_deployed_list
.values())
1706 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
1707 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
1709 if not isinstance(deep_get(db_nsr
, ("_admin", "deployed", "RO", "vnfd")), list):
1710 populate_dict(db_nsr
, ("_admin", "deployed", "RO", "vnfd"), [])
1711 db_nsr_update
["_admin.deployed.RO.vnfd"] = []
1713 # set state to INSTANTIATED. When instantiated NBI will not delete directly
1714 db_nsr_update
["_admin.nsState"] = "INSTANTIATED"
1715 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1716 self
.db
.set_list("vnfrs", {"nsr-id-ref": nsr_id
}, {"_admin.nsState": "INSTANTIATED"})
1718 # n2vc_redesign STEP 2 Deploy Network Scenario
1719 stage
[0] = 'Stage 2/5: deployment of KDUs, VMs and execution environments.'
1720 self
._write
_op
_status
(
1725 stage
[1] = "Deploying KDUs."
1726 # self.logger.debug(logging_text + "Before deploy_kdus")
1727 # Call to deploy_kdus in case exists the "vdu:kdu" param
1728 await self
.deploy_kdus(
1729 logging_text
=logging_text
,
1731 nslcmop_id
=nslcmop_id
,
1734 task_instantiation_info
=tasks_dict_info
,
1737 stage
[1] = "Getting VCA public key."
1738 # n2vc_redesign STEP 1 Get VCA public ssh-key
1739 # feature 1429. Add n2vc public key to needed VMs
1740 n2vc_key
= self
.n2vc
.get_public_key()
1741 n2vc_key_list
= [n2vc_key
]
1742 if self
.vca_config
.get("public_key"):
1743 n2vc_key_list
.append(self
.vca_config
["public_key"])
1745 stage
[1] = "Deploying NS at VIM."
1746 task_ro
= asyncio
.ensure_future(
1747 self
.instantiate_RO(
1748 logging_text
=logging_text
,
1752 db_nslcmop
=db_nslcmop
,
1755 n2vc_key_list
=n2vc_key_list
,
1759 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_RO", task_ro
)
1760 tasks_dict_info
[task_ro
] = "Deploying at VIM"
1762 # n2vc_redesign STEP 3 to 6 Deploy N2VC
1763 stage
[1] = "Deploying Execution Environments."
1764 self
.logger
.debug(logging_text
+ stage
[1])
1766 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
1767 for vnf_profile
in get_vnf_profiles(nsd
):
1768 vnfd_id
= vnf_profile
["vnfd-id"]
1769 vnfd
= find_in_list(db_vnfds
, lambda a_vnf
: a_vnf
["id"] == vnfd_id
)
1770 member_vnf_index
= str(vnf_profile
["id"])
1771 db_vnfr
= db_vnfrs
[member_vnf_index
]
1772 base_folder
= vnfd
["_admin"]["storage"]
1778 # Get additional parameters
1779 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
1780 if db_vnfr
.get("additionalParamsForVnf"):
1781 deploy_params
.update(parse_yaml_strings(db_vnfr
["additionalParamsForVnf"].copy()))
1783 descriptor_config
= get_vnf_configuration(vnfd
)
1784 if descriptor_config
:
1786 logging_text
=logging_text
+ "member_vnf_index={} ".format(member_vnf_index
),
1789 nslcmop_id
=nslcmop_id
,
1795 member_vnf_index
=member_vnf_index
,
1796 vdu_index
=vdu_index
,
1798 deploy_params
=deploy_params
,
1799 descriptor_config
=descriptor_config
,
1800 base_folder
=base_folder
,
1801 task_instantiation_info
=tasks_dict_info
,
1805 # Deploy charms for each VDU that supports one.
1806 for vdud
in get_vdu_list(vnfd
):
1808 descriptor_config
= get_vdu_configuration(vnfd
, vdu_id
)
1809 vdur
= find_in_list(db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
)
1811 if vdur
.get("additionalParams"):
1812 deploy_params_vdu
= parse_yaml_strings(vdur
["additionalParams"])
1814 deploy_params_vdu
= deploy_params
1815 deploy_params_vdu
["OSM"] = get_osm_params(db_vnfr
, vdu_id
, vdu_count_index
=0)
1816 vdud_count
= get_vdu_profile(vnfd
, vdu_id
).get("max-number-of-instances", 1)
1818 self
.logger
.debug("VDUD > {}".format(vdud
))
1819 self
.logger
.debug("Descriptor config > {}".format(descriptor_config
))
1820 if descriptor_config
:
1823 for vdu_index
in range(vdud_count
):
1824 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
1826 logging_text
=logging_text
+ "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
1827 member_vnf_index
, vdu_id
, vdu_index
),
1830 nslcmop_id
=nslcmop_id
,
1836 member_vnf_index
=member_vnf_index
,
1837 vdu_index
=vdu_index
,
1839 deploy_params
=deploy_params_vdu
,
1840 descriptor_config
=descriptor_config
,
1841 base_folder
=base_folder
,
1842 task_instantiation_info
=tasks_dict_info
,
1845 for kdud
in get_kdu_list(vnfd
):
1846 kdu_name
= kdud
["name"]
1847 descriptor_config
= get_kdu_configuration(vnfd
, kdu_name
)
1848 if descriptor_config
:
1852 kdur
= next(x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
)
1853 deploy_params_kdu
= {"OSM": get_osm_params(db_vnfr
)}
1854 if kdur
.get("additionalParams"):
1855 deploy_params_kdu
= parse_yaml_strings(kdur
["additionalParams"])
1858 logging_text
=logging_text
,
1861 nslcmop_id
=nslcmop_id
,
1867 member_vnf_index
=member_vnf_index
,
1868 vdu_index
=vdu_index
,
1870 deploy_params
=deploy_params_kdu
,
1871 descriptor_config
=descriptor_config
,
1872 base_folder
=base_folder
,
1873 task_instantiation_info
=tasks_dict_info
,
1877 # Check if this NS has a charm configuration
1878 descriptor_config
= nsd
.get("ns-configuration")
1879 if descriptor_config
and descriptor_config
.get("juju"):
1882 member_vnf_index
= None
1888 # Get additional parameters
1889 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
1890 if db_nsr
.get("additionalParamsForNs"):
1891 deploy_params
.update(parse_yaml_strings(db_nsr
["additionalParamsForNs"].copy()))
1892 base_folder
= nsd
["_admin"]["storage"]
1894 logging_text
=logging_text
,
1897 nslcmop_id
=nslcmop_id
,
1903 member_vnf_index
=member_vnf_index
,
1904 vdu_index
=vdu_index
,
1906 deploy_params
=deploy_params
,
1907 descriptor_config
=descriptor_config
,
1908 base_folder
=base_folder
,
1909 task_instantiation_info
=tasks_dict_info
,
1913 # rest of staff will be done at finally
1915 except (ROclient
.ROClientException
, DbException
, LcmException
, N2VCException
) as e
:
1916 self
.logger
.error(logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
))
1918 except asyncio
.CancelledError
:
1919 self
.logger
.error(logging_text
+ "Cancelled Exception while '{}'".format(stage
[1]))
1920 exc
= "Operation was cancelled"
1921 except Exception as e
:
1922 exc
= traceback
.format_exc()
1923 self
.logger
.critical(logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
), exc_info
=True)
1926 error_list
.append(str(exc
))
1928 # wait for pending tasks
1930 stage
[1] = "Waiting for instantiate pending tasks."
1931 self
.logger
.debug(logging_text
+ stage
[1])
1932 error_list
+= await self
._wait
_for
_tasks
(logging_text
, tasks_dict_info
, timeout_ns_deploy
,
1933 stage
, nslcmop_id
, nsr_id
=nsr_id
)
1934 stage
[1] = stage
[2] = ""
1935 except asyncio
.CancelledError
:
1936 error_list
.append("Cancelled")
1937 # TODO cancel all tasks
1938 except Exception as exc
:
1939 error_list
.append(str(exc
))
1941 # update operation-status
1942 db_nsr_update
["operational-status"] = "running"
1943 # let's begin with VCA 'configured' status (later we can change it)
1944 db_nsr_update
["config-status"] = "configured"
1945 for task
, task_name
in tasks_dict_info
.items():
1946 if not task
.done() or task
.cancelled() or task
.exception():
1947 if task_name
.startswith(self
.task_name_deploy_vca
):
1948 # A N2VC task is pending
1949 db_nsr_update
["config-status"] = "failed"
1951 # RO or KDU task is pending
1952 db_nsr_update
["operational-status"] = "failed"
1954 # update status at database
1956 error_detail
= ". ".join(error_list
)
1957 self
.logger
.error(logging_text
+ error_detail
)
1958 error_description_nslcmop
= '{} Detail: {}'.format(stage
[0], error_detail
)
1959 error_description_nsr
= 'Operation: INSTANTIATING.{}, {}'.format(nslcmop_id
, stage
[0])
1961 db_nsr_update
["detailed-status"] = error_description_nsr
+ " Detail: " + error_detail
1962 db_nslcmop_update
["detailed-status"] = error_detail
1963 nslcmop_operation_state
= "FAILED"
1967 error_description_nsr
= error_description_nslcmop
= None
1969 db_nsr_update
["detailed-status"] = "Done"
1970 db_nslcmop_update
["detailed-status"] = "Done"
1971 nslcmop_operation_state
= "COMPLETED"
1974 self
._write
_ns
_status
(
1977 current_operation
="IDLE",
1978 current_operation_id
=None,
1979 error_description
=error_description_nsr
,
1980 error_detail
=error_detail
,
1981 other_update
=db_nsr_update
1983 self
._write
_op
_status
(
1986 error_message
=error_description_nslcmop
,
1987 operation_state
=nslcmop_operation_state
,
1988 other_update
=db_nslcmop_update
,
1991 if nslcmop_operation_state
:
1993 await self
.msg
.aiowrite("ns", "instantiated", {"nsr_id": nsr_id
, "nslcmop_id": nslcmop_id
,
1994 "operationState": nslcmop_operation_state
},
1996 except Exception as e
:
1997 self
.logger
.error(logging_text
+ "kafka_write notification Exception {}".format(e
))
1999 self
.logger
.debug(logging_text
+ "Exit")
2000 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_instantiate")
async def _add_vca_relations(self, logging_text, nsr_id, vca_index: int,
                             timeout: int = 3600, vca_type: str = None) -> bool:
    """
    Add the relations declared in the NS and VNF descriptors that involve one deployed VCA.

    Steps:
      1. find all the relations in which this VCA takes part
      2. poll the nsr record until both peers of each relation have their configuration software installed
      3. add the relation through the connector registered at self.vca_map[vca_type]

    A relation is given up (dropped from the pending list) when one of its peers is reported as 'BROKEN'
    at the nsr 'configurationStatus' list.

    :param logging_text: prefix used at the log messages
    :param nsr_id: _id of the record at the "nsrs" collection
    :param vca_index: position of this VCA at nsr._admin.deployed.VCA
    :param timeout: maximum number of seconds polling for the peers
    :param vca_type: key of self.vca_map to use. "lxc_proxy_charm" when not provided
    :return: True if there are no relations or no relation is left pending. False on timeout or on any error
    """

    def _entity_ids(relation):
        # ids of the two peers of a relation
        entities = relation.get('entities')
        return entities[0].get('id'), entities[1].get('id')

    def _ns_peer_key(_vca):
        # NS relations reference their peers by member-vnf-index
        return 'member-vnf-index'

    def _vnf_peer_key(vca):
        # VNF relations reference their peers by vdu_id; a VCA without vdu_id is matched by vnfd_id
        return "vdu_id" if vca.get("vdu_id") is not None else "vnfd_id"

    def _has_broken_peer(db_nsr, vca_list, relation, peer_key):
        # configurationStatus is read in parallel with the VCA list (same index)
        status_list = db_nsr.get('configurationStatus')
        if not status_list:
            return False
        peer_ids = _entity_ids(relation)
        return any(
            vca.get(peer_key(vca)) in peer_ids and (vca_status or {}).get('status') == 'BROKEN'
            for vca, vca_status in zip(vca_list, status_list)
        )

    async def _relate_ready_peers(db_nsr, pending, peer_key):
        # for each pending relation, find the related VCAs and add the relation when both are ready
        vca_list = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))
        for relation in pending.copy():
            from_entity = relation.get('entities')[0]
            to_entity = relation.get('entities')[1]
            from_vca_ee_id = None
            to_vca_ee_id = None
            for vca in vca_list:
                if not vca.get('config_sw_installed'):
                    continue
                peer_id = vca.get(peer_key(vca))
                if peer_id == from_entity.get('id'):
                    from_vca_ee_id = vca.get('ee_id')
                if peer_id == to_entity.get('id'):
                    to_vca_ee_id = vca.get('ee_id')
            if from_vca_ee_id and to_vca_ee_id:
                # add relation
                await self.vca_map[vca_type].add_relation(
                    ee_id_1=from_vca_ee_id,
                    ee_id_2=to_vca_ee_id,
                    endpoint_1=from_entity.get('endpoint'),
                    endpoint_2=to_entity.get('endpoint'))
                # remove entry from relations list
                pending.remove(relation)
            elif _has_broken_peer(db_nsr, vca_list, relation, peer_key):
                # peer broken: remove relation from list (only once, even if both peers are broken)
                pending.remove(relation)

    try:
        vca_type = vca_type or "lxc_proxy_charm"

        # STEP 1: find all relations for this VCA

        # read nsr record
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})

        # this VCA data
        my_vca = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))[vca_index]

        # read all ns-configuration relations
        ns_relations = [
            r for r in deep_get(nsd, ('ns-configuration', 'relation')) or ()
            if my_vca.get('member-vnf-index') in _entity_ids(r)
        ]

        # read all vnf-configuration relations
        vnf_relations = list()
        for vnfd in db_nsr.get('vnfd-id') or ():
            db_vnfd = self.db.get_one("vnfds", {"_id": vnfd})
            for r in deep_get(db_vnfd, ('vnf-configuration', 'relation')) or ():
                # check if this VCA is in the relation
                if my_vca.get('vdu_id') in _entity_ids(r):
                    vnf_relations.append(r)

        # if no relations, terminate
        if not ns_relations and not vnf_relations:
            self.logger.debug(logging_text + ' No relations')
            return True

        self.logger.debug(logging_text + ' adding relations\n {}\n {}'.format(ns_relations, vnf_relations))

        # add all relations
        start = time()
        while True:
            # check timeout
            now = time()
            if now - start >= timeout:
                self.logger.error(logging_text + ' : timeout adding relations')
                return False

            # reload nsr from database (we need to update record: _admin.deployed.VCA)
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

            # for each defined NS relation, find the VCA's related
            await _relate_ready_peers(db_nsr, ns_relations, _ns_peer_key)

            # for each defined VNF relation, find the VCA's related
            await _relate_ready_peers(db_nsr, vnf_relations, _vnf_peer_key)

            # wait for next try
            await asyncio.sleep(5.0)

            if not ns_relations and not vnf_relations:
                self.logger.debug('Relations added')
                break

        return True

    except Exception as e:
        # best effort: any failure is logged and reported as False to the caller
        self.logger.warning(logging_text + ' ERROR adding relations: {}'.format(e))
        return False
async def _install_kdu(self, nsr_id: str, nsr_db_path: str, vnfr_data: dict, kdu_index: int, kdud: dict,
                       vnfd: dict, k8s_instance_info: dict, k8params: dict = None, timeout: int = 600):
    """
    Install one KDU at its k8s cluster, store the result at database and run its initial config primitives.

    :param nsr_id: _id of the record at the "nsrs" collection
    :param nsr_db_path: path inside the nsr record where the information of this KDU is stored
    :param vnfr_data: vnfr record owning the KDU. Only its "_id" is read here
    :param kdu_index: position of the KDU at the vnfr "kdur" list
    :param kdud: KDU descriptor. "service" and "kdu-configuration" are read
    :param vnfd: VNF descriptor. "mgmt-interface.cp" is compared with the management service
    :param k8s_instance_info: dictionary with "k8scluster-type", "k8scluster-uuid", "kdu-model", "kdu-name" and
        "namespace"
    :param k8params: parameters passed to the connector install
    :param timeout: seconds passed to install, and applied to each initial config primitive
    :return: kdu_instance, as returned by the connector install
    :raises: the original exception, after writing the error at the nsr and the "ERROR" status at the vnfr
    """
    try:
        k8sclustertype = k8s_instance_info["k8scluster-type"]
        k8s_connector = self.k8scluster_map[k8sclustertype]

        # Instantiate kdu
        db_dict_install = {"collection": "nsrs",
                           "filter": {"_id": nsr_id},
                           "path": nsr_db_path}

        kdu_instance = await k8s_connector.install(
            cluster_uuid=k8s_instance_info["k8scluster-uuid"],
            kdu_model=k8s_instance_info["kdu-model"],
            atomic=True,
            params=k8params,
            db_dict=db_dict_install,
            timeout=timeout,
            kdu_name=k8s_instance_info["kdu-name"],
            namespace=k8s_instance_info["namespace"])
        self.update_db_2("nsrs", nsr_id, {nsr_db_path + ".kdu-instance": kdu_instance})

        # Obtain services to obtain management service ip
        services = await k8s_connector.get_services(
            cluster_uuid=k8s_instance_info["k8scluster-uuid"],
            kdu_instance=kdu_instance,
            namespace=k8s_instance_info["namespace"])

        # Obtain management service info (if exists)
        vnfr_update_dict = {}
        if services:
            vnfr_update_dict["kdur.{}.services".format(kdu_index)] = services
            mgmt_services = [service for service in kdud.get("service", []) if service.get("mgmt-service")]
            for mgmt_service in mgmt_services:
                for service in services:
                    if service["name"].startswith(mgmt_service["name"]):
                        # Mgmt service found, Obtain service ip
                        ip = service.get("external_ip", service.get("cluster_ip"))
                        if isinstance(ip, list) and len(ip) == 1:
                            ip = ip[0]

                        vnfr_update_dict["kdur.{}.ip-address".format(kdu_index)] = ip

                        # Check if must update also mgmt ip at the vnf
                        service_external_cp = mgmt_service.get("external-connection-point-ref")
                        if service_external_cp:
                            if deep_get(vnfd, ("mgmt-interface", "cp")) == service_external_cp:
                                vnfr_update_dict["ip-address"] = ip

                        break
                else:
                    self.logger.warning("Mgmt service name: {} not found".format(mgmt_service["name"]))

        vnfr_update_dict["kdur.{}.status".format(kdu_index)] = "READY"
        self.update_db_2("vnfrs", vnfr_data.get("_id"), vnfr_update_dict)

        kdu_config = kdud.get("kdu-configuration")
        if kdu_config and kdu_config.get("initial-config-primitive") and kdu_config.get("juju") is None:
            # sorted copy: the descriptor received from the caller is not reordered
            initial_config_primitive_list = sorted(kdu_config.get("initial-config-primitive"),
                                                   key=lambda val: int(val["seq"]))

            for initial_config_primitive in initial_config_primitive_list:
                primitive_params_ = self._map_primitive_params(initial_config_primitive, {}, {})

                await asyncio.wait_for(
                    k8s_connector.exec_primitive(
                        cluster_uuid=k8s_instance_info["k8scluster-uuid"],
                        kdu_instance=kdu_instance,
                        primitive_name=initial_config_primitive["name"],
                        params=primitive_params_, db_dict={}),
                    timeout=timeout)

    except Exception as e:
        # Prepare update db with error and raise exception
        try:
            self.update_db_2("nsrs", nsr_id, {nsr_db_path + ".detailed-status": str(e)})
            self.update_db_2("vnfrs", vnfr_data.get("_id"), {"kdur.{}.status".format(kdu_index): "ERROR"})
        except Exception:
            # ignore to keep original exception
            pass
        # reraise original error
        raise

    return kdu_instance
async def deploy_kdus(self, logging_text, nsr_id, nslcmop_id, db_vnfrs, db_vnfds, task_instantiation_info):
    """
    Launch one installation task for each KDU present at the vnf records.

    For each "kdur" entry: resolves the type of cluster and the kdu model, obtains the uuid of the k8s cluster,
    synchronizes the helm repos of the cluster when needed, writes the entry "_admin.deployed.K8s.<index>" at
    the nsr record, and launches self._install_kdu as an asyncio task registered at self.lcm_tasks.

    :param logging_text: prefix used at the log messages
    :param nsr_id: _id of the record at the "nsrs" collection
    :param nslcmop_id: id of the operation, used to register the tasks
    :param db_vnfrs: dictionary with the vnf records. Only its values are iterated
    :param db_vnfds: vnf descriptors. Either a dictionary indexed by the vnfd id, or a list of descriptors
        that is searched by "_id" (presumably the same value stored at the vnfr "vnfd-id"; verify against caller)
    :param task_instantiation_info: dictionary filled here with {task: description}
    :return: None
    :raises LcmException: on descriptor/cluster problems, or wrapping any other unexpected exception
    """
    # Launch kdus if present in the descriptor

    k8scluster_id_2_uuic = {"helm-chart-v3": {}, "helm-chart": {}, "juju-bundle": {}}

    def _get_vnfd(vnfd_id):
        # instantiate() builds db_vnfds as a list, while older callers may provide a dict indexed by id
        if isinstance(db_vnfds, dict):
            return db_vnfds.get(vnfd_id)
        return next((vnfd for vnfd in db_vnfds or () if vnfd.get("_id") == vnfd_id), None)

    async def _get_cluster_id(cluster_id, cluster_type):
        # returns the uuid of the cluster for this type, caching the result for the next KDUs
        if cluster_id in k8scluster_id_2_uuic[cluster_type]:
            return k8scluster_id_2_uuic[cluster_type][cluster_id]

        # check if K8scluster is creating and wait look if previous tasks in process
        task_name, task_dependency = self.lcm_tasks.lookfor_related("k8scluster", cluster_id)
        if task_dependency:
            text = "Waiting for related tasks '{}' on k8scluster {} to be completed".format(task_name, cluster_id)
            self.logger.debug(logging_text + text)
            await asyncio.wait(task_dependency, timeout=3600)

        db_k8scluster = self.db.get_one("k8sclusters", {"_id": cluster_id}, fail_on_empty=False)
        if not db_k8scluster:
            raise LcmException("K8s cluster {} cannot be found".format(cluster_id))

        k8s_id = deep_get(db_k8scluster, ("_admin", cluster_type, "id"))
        if not k8s_id:
            if cluster_type == "helm-chart-v3":
                try:
                    # backward compatibility for existing clusters that have not been initialized for helm v3
                    k8s_credentials = yaml.safe_dump(db_k8scluster.get("credentials"))
                    k8s_id, uninstall_sw = await self.k8sclusterhelm3.init_env(k8s_credentials,
                                                                               reuse_cluster_uuid=cluster_id)
                    db_k8scluster_update = {}
                    db_k8scluster_update["_admin.helm-chart-v3.error_msg"] = None
                    db_k8scluster_update["_admin.helm-chart-v3.id"] = k8s_id
                    db_k8scluster_update["_admin.helm-chart-v3.created"] = uninstall_sw
                    db_k8scluster_update["_admin.helm-chart-v3.operationalState"] = "ENABLED"
                    self.update_db_2("k8sclusters", cluster_id, db_k8scluster_update)
                except Exception as e:
                    self.logger.error(logging_text + "error initializing helm-v3 cluster: {}".format(str(e)))
                    raise LcmException("K8s cluster '{}' has not been initialized for '{}'".format(cluster_id,
                                                                                                    cluster_type))
            else:
                raise LcmException("K8s cluster '{}' has not been initialized for '{}'".
                                   format(cluster_id, cluster_type))
        k8scluster_id_2_uuic[cluster_type][cluster_id] = k8s_id
        return k8s_id

    logging_text += "Deploy kdus: "
    step = ""
    try:
        db_nsr_update = {"_admin.deployed.K8s": []}
        self.update_db_2("nsrs", nsr_id, db_nsr_update)

        index = 0
        updated_cluster_list = []
        updated_v3_cluster_list = []

        for vnfr_data in db_vnfrs.values():
            for kdu_index, kdur in enumerate(get_iterable(vnfr_data, "kdur")):
                # Step 0: Prepare and set parameters
                desc_params = parse_yaml_strings(kdur.get("additionalParams"))
                vnfd_id = vnfr_data.get('vnfd-id')
                vnfd_with_id = _get_vnfd(vnfd_id)
                if not vnfd_with_id:
                    raise LcmException("vnfd '{}' not found for kdu='{}.{}'".format(
                        vnfd_id, vnfr_data["member-vnf-index-ref"], kdur["kdu-name"]))
                kdud = next(kdud for kdud in vnfd_with_id["kdu"] if kdud["name"] == kdur["kdu-name"])
                namespace = kdur.get("k8s-namespace")
                if kdur.get("helm-chart"):
                    kdumodel = kdur["helm-chart"]
                    # Default version: helm3, if helm-version is v2 assign v2
                    k8sclustertype = "helm-chart-v3"
                    self.logger.debug("kdur: {}".format(kdur))
                    if kdur.get("helm-version") and kdur.get("helm-version") == "v2":
                        k8sclustertype = "helm-chart"
                elif kdur.get("juju-bundle"):
                    kdumodel = kdur["juju-bundle"]
                    k8sclustertype = "juju-bundle"
                else:
                    raise LcmException("kdu type for kdu='{}.{}' is neither helm-chart nor "
                                       "juju-bundle. Maybe an old NBI version is running".
                                       format(vnfr_data["member-vnf-index-ref"], kdur["kdu-name"]))
                # check if kdumodel is a file and exists
                try:
                    storage = deep_get(vnfd_with_id, ('_admin', 'storage'))
                    if storage and storage.get('pkg-dir'):  # may be not present if vnfd has not artifacts
                        # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
                        filename = '{}/{}/{}s/{}'.format(storage["folder"], storage["pkg-dir"], k8sclustertype,
                                                         kdumodel)
                        if self.fs.file_exists(filename, mode='file') or self.fs.file_exists(filename, mode='dir'):
                            kdumodel = self.fs.path + filename
                except (asyncio.TimeoutError, asyncio.CancelledError):
                    raise
                except Exception:       # it is not a file
                    pass

                k8s_cluster_id = kdur["k8s-cluster"]["id"]
                step = "Synchronize repos for k8s cluster '{}'".format(k8s_cluster_id)
                cluster_uuid = await _get_cluster_id(k8s_cluster_id, k8sclustertype)

                # Synchronize repos
                if (k8sclustertype == "helm-chart" and cluster_uuid not in updated_cluster_list)\
                        or (k8sclustertype == "helm-chart-v3" and cluster_uuid not in updated_v3_cluster_list):
                    del_repo_list, added_repo_dict = await asyncio.ensure_future(
                        self.k8scluster_map[k8sclustertype].synchronize_repos(cluster_uuid=cluster_uuid))
                    if del_repo_list or added_repo_dict:
                        if k8sclustertype == "helm-chart":
                            unset = {'_admin.helm_charts_added.' + item: None for item in del_repo_list}
                            updated = {'_admin.helm_charts_added.' +
                                       item: name for item, name in added_repo_dict.items()}
                            updated_cluster_list.append(cluster_uuid)
                        elif k8sclustertype == "helm-chart-v3":
                            unset = {'_admin.helm_charts_v3_added.' + item: None for item in del_repo_list}
                            updated = {'_admin.helm_charts_v3_added.' +
                                       item: name for item, name in added_repo_dict.items()}
                            updated_v3_cluster_list.append(cluster_uuid)
                        self.logger.debug(logging_text + "repos synchronized on k8s cluster "
                                                         "'{}' to_delete: {}, to_add: {}".
                                          format(k8s_cluster_id, del_repo_list, added_repo_dict))
                        self.db.set_one("k8sclusters", {"_id": k8s_cluster_id}, updated, unset=unset)

                # Instantiate kdu
                step = "Instantiating KDU {}.{} in k8s cluster {}".format(vnfr_data["member-vnf-index-ref"],
                                                                          kdur["kdu-name"], k8s_cluster_id)
                k8s_instance_info = {"kdu-instance": None,
                                     "k8scluster-uuid": cluster_uuid,
                                     "k8scluster-type": k8sclustertype,
                                     "member-vnf-index": vnfr_data["member-vnf-index-ref"],
                                     "kdu-name": kdur["kdu-name"],
                                     "kdu-model": kdumodel,
                                     "namespace": namespace}
                db_path = "_admin.deployed.K8s.{}".format(index)
                db_nsr_update[db_path] = k8s_instance_info
                self.update_db_2("nsrs", nsr_id, db_nsr_update)

                task = asyncio.ensure_future(
                    self._install_kdu(nsr_id, db_path, vnfr_data, kdu_index, kdud, vnfd_with_id,
                                      k8s_instance_info, k8params=desc_params, timeout=600))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_KDU-{}".format(index), task)
                task_instantiation_info[task] = "Deploying KDU {}".format(kdur["kdu-name"])

                index += 1

    except (LcmException, asyncio.CancelledError):
        raise
    except Exception as e:
        msg = "Exception {} while {}: {}".format(type(e).__name__, step, e)
        if isinstance(e, (N2VCException, DbException)):
            self.logger.error(logging_text + msg)
        else:
            self.logger.critical(logging_text + msg, exc_info=True)
        raise LcmException(msg)
    finally:
        # write whatever was collected, also when leaving with an exception
        if db_nsr_update:
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
2402 def _deploy_n2vc(self
, logging_text
, db_nsr
, db_vnfr
, nslcmop_id
, nsr_id
, nsi_id
, vnfd_id
, vdu_id
,
2403 kdu_name
, member_vnf_index
, vdu_index
, vdu_name
, deploy_params
, descriptor_config
,
2404 base_folder
, task_instantiation_info
, stage
):
2405 # launch instantiate_N2VC in a asyncio task and register task object
2406 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
2407 # if not found, create one entry and update database
2408 # fill db_nsr._admin.deployed.VCA.<index>
2410 self
.logger
.debug(logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
))
2411 if descriptor_config
.get("juju"): # There is one execution envioronment of type juju
2412 ee_list
= [descriptor_config
]
2413 elif descriptor_config
.get("execution-environment-list"):
2414 ee_list
= descriptor_config
.get("execution-environment-list")
2415 else: # other types as script are not supported
2418 for ee_item
in ee_list
:
2419 self
.logger
.debug(logging_text
+ "_deploy_n2vc ee_item juju={}, helm={}".format(ee_item
.get('juju'),
2420 ee_item
.get("helm-chart")))
2421 ee_descriptor_id
= ee_item
.get("id")
2422 if ee_item
.get("juju"):
2423 vca_name
= ee_item
['juju'].get('charm')
2424 vca_type
= "lxc_proxy_charm" if ee_item
['juju'].get('charm') is not None else "native_charm"
2425 if ee_item
['juju'].get('cloud') == "k8s":
2426 vca_type
= "k8s_proxy_charm"
2427 elif ee_item
['juju'].get('proxy') is False:
2428 vca_type
= "native_charm"
2429 elif ee_item
.get("helm-chart"):
2430 vca_name
= ee_item
['helm-chart']
2431 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
2434 vca_type
= "helm-v3"
2436 self
.logger
.debug(logging_text
+ "skipping non juju neither charm configuration")
2440 for vca_index
, vca_deployed
in enumerate(db_nsr
["_admin"]["deployed"]["VCA"]):
2441 if not vca_deployed
:
2443 if vca_deployed
.get("member-vnf-index") == member_vnf_index
and \
2444 vca_deployed
.get("vdu_id") == vdu_id
and \
2445 vca_deployed
.get("kdu_name") == kdu_name
and \
2446 vca_deployed
.get("vdu_count_index", 0) == vdu_index
and \
2447 vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
:
2450 # not found, create one.
2451 target
= "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
2453 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
2455 target
+= "/kdu/{}".format(kdu_name
)
2457 "target_element": target
,
2458 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
2459 "member-vnf-index": member_vnf_index
,
2461 "kdu_name": kdu_name
,
2462 "vdu_count_index": vdu_index
,
2463 "operational-status": "init", # TODO revise
2464 "detailed-status": "", # TODO revise
2465 "step": "initial-deploy", # TODO revise
2467 "vdu_name": vdu_name
,
2469 "ee_descriptor_id": ee_descriptor_id
2473 # create VCA and configurationStatus in db
2475 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
2476 "configurationStatus.{}".format(vca_index
): dict()
2478 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2480 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
2482 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
2483 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
2484 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
2487 task_n2vc
= asyncio
.ensure_future(
2488 self
.instantiate_N2VC(
2489 logging_text
=logging_text
,
2490 vca_index
=vca_index
,
2496 vdu_index
=vdu_index
,
2497 deploy_params
=deploy_params
,
2498 config_descriptor
=descriptor_config
,
2499 base_folder
=base_folder
,
2500 nslcmop_id
=nslcmop_id
,
2504 ee_config_descriptor
=ee_item
2507 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_N2VC-{}".format(vca_index
), task_n2vc
)
2508 task_instantiation_info
[task_n2vc
] = self
.task_name_deploy_vca
+ " {}.{}".format(
2509 member_vnf_index
or "", vdu_id
or "")
2512 def _create_nslcmop(nsr_id
, operation
, params
):
2514 Creates a ns-lcm-opp content to be stored at database.
2515 :param nsr_id: internal id of the instance
2516 :param operation: instantiate, terminate, scale, action, ...
2517 :param params: user parameters for the operation
2518 :return: dictionary following SOL005 format
2520 # Raise exception if invalid arguments
2521 if not (nsr_id
and operation
and params
):
2523 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided")
2529 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
2530 "operationState": "PROCESSING",
2531 "statusEnteredTime": now
,
2532 "nsInstanceId": nsr_id
,
2533 "lcmOperationType": operation
,
2535 "isAutomaticInvocation": False,
2536 "operationParams": params
,
2537 "isCancelPending": False,
2539 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id
,
2540 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id
,
2545 def _format_additional_params(self
, params
):
2546 params
= params
or {}
2547 for key
, value
in params
.items():
2548 if str(value
).startswith("!!yaml "):
2549 params
[key
] = yaml
.safe_load(value
[7:])
2552 def _get_terminate_primitive_params(self
, seq
, vnf_index
):
2553 primitive
= seq
.get('name')
2554 primitive_params
= {}
2556 "member_vnf_index": vnf_index
,
2557 "primitive": primitive
,
2558 "primitive_params": primitive_params
,
2561 return self
._map
_primitive
_params
(seq
, params
, desc_params
)
def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
    """
    Decide whether an already registered sub-operation must be skipped or executed again.
    :param db_nslcmop: operation record. The sub-operations are read from '_admin.operations'
    :param op_index: position of the sub-operation at that list
    :return: self.SUBOPERATION_STATUS_SKIP when the sub-operation is 'COMPLETED'. Otherwise op_index, after
        marking the sub-operation as 'PROCESSING' at database
    """
    suboperations = deep_get(db_nslcmop, ('_admin', 'operations'), [])
    if suboperations[op_index].get('operationState') == 'COMPLETED':
        # b. Skip sub-operation
        # _ns_execute_primitive() or RO.create_action() will NOT be executed
        return self.SUBOPERATION_STATUS_SKIP

    # c. retry executing sub-operation
    # The sub-operation exists, and operationState != 'COMPLETED'
    # Update operationState = 'PROCESSING' to indicate a retry.
    self._update_suboperation_status(db_nslcmop, op_index, 'PROCESSING', 'In progress')
    # Return the sub-operation index
    # _ns_execute_primitive() or RO.create_action() will be called from scale()
    # with arguments extracted from the sub-operation
    return op_index
2584 # Find a sub-operation where all keys in a matching dictionary must match
2585 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
2586 def _find_suboperation(self
, db_nslcmop
, match
):
2587 if db_nslcmop
and match
:
2588 op_list
= db_nslcmop
.get('_admin', {}).get('operations', [])
2589 for i
, op
in enumerate(op_list
):
2590 if all(op
.get(k
) == match
[k
] for k
in match
):
2592 return self
.SUBOPERATION_STATUS_NOT_FOUND
2594 # Update status for a sub-operation given its index
2595 def _update_suboperation_status(self
, db_nslcmop
, op_index
, operationState
, detailed_status
):
2596 # Update DB for HA tasks
2597 q_filter
= {'_id': db_nslcmop
['_id']}
2598 update_dict
= {'_admin.operations.{}.operationState'.format(op_index
): operationState
,
2599 '_admin.operations.{}.detailed-status'.format(op_index
): detailed_status
}
2600 self
.db
.set_one("nslcmops",
2602 update_dict
=update_dict
,
2603 fail_on_empty
=False)
2605 # Add sub-operation, return the index of the added sub-operation
2606 # Optionally, set operationState, detailed-status, and operationType
2607 # Status and type are currently set for 'scale' sub-operations:
2608 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
2609 # 'detailed-status' : status message
2610 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
2611 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
def _add_suboperation(self, db_nslcmop, vnf_index, vdu_id, vdu_count_index, vdu_name, primitive,
                      mapped_primitive_params, operationState=None, detailed_status=None, operationType=None,
                      RO_nsr_id=None, RO_scaling_info=None):
    """
    Append a sub-operation to '_admin.operations' of an nslcmop record and persist the list.

    :param db_nslcmop: nslcmop record. When empty nothing is added
    :param vnf_index: stored as 'member_vnf_index'
    :param vdu_id: stored as 'vdu_id'
    :param vdu_count_index: stored as 'vdu_count_index'
    :param vdu_name: accepted but not stored in the sub-operation
    :param primitive: stored as 'primitive'
    :param mapped_primitive_params: stored as 'primitive_params'
    :param operationState: optional, stored as 'operationState' only when it has a truthy value
    :param detailed_status: optional, stored as 'detailed-status' only when it has a truthy value
    :param operationType: optional, stored as 'lcmOperationType' only when it has a truthy value
    :param RO_nsr_id: optional, stored as 'RO_nsr_id' only when it has a truthy value
    :param RO_scaling_info: optional, stored as 'RO_scaling_info' only when it has a truthy value
    :return: index of the added sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if db_nslcmop is empty
    """
    if not db_nslcmop:
        return self.SUBOPERATION_STATUS_NOT_FOUND

    # Get the "_admin.operations" list, if it exists
    admin = db_nslcmop.get('_admin', {})
    operations = admin.get('operations')

    # Mandatory fields first, then only the optional ones that carry a value
    suboperation = {
        'member_vnf_index': vnf_index,
        'vdu_id': vdu_id,
        'vdu_count_index': vdu_count_index,
        'primitive': primitive,
        'primitive_params': mapped_primitive_params,
    }
    optional_fields = (
        ('operationState', operationState),
        ('detailed-status', detailed_status),
        ('lcmOperationType', operationType),
        ('RO_nsr_id', RO_nsr_id),
        ('RO_scaling_info', RO_scaling_info),
    )
    for field_name, field_value in optional_fields:
        if field_value:
            suboperation[field_name] = field_value

    if operations:
        # Existing operations, append operation to list
        operations.append(suboperation)
    else:
        # No existing operations, create key 'operations' with current operation as first list element
        admin.update({'operations': [suboperation]})
        operations = admin.get('operations')

    self.update_db_2("nslcmops", db_nslcmop['_id'], {'_admin.operations': operations})
    return len(operations) - 1
2649 # Helper methods for scale() sub-operations
2651 # pre-scale/post-scale:
2652 # Check for 3 different cases:
2653 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
2654 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
2655 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
def _check_or_add_scale_suboperation(self, db_nslcmop, vnf_index, vnf_config_primitive, primitive_params,
                                     operationType, RO_nsr_id=None, RO_scaling_info=None):
    """
    Look for a scale sub-operation in the nslcmop record and register it when it is not there yet.

    :param db_nslcmop: nslcmop record where sub-operations are searched and added
    :param vnf_index: member vnf index of the sub-operation
    :param vnf_config_primitive: primitive name (ignored for RO scaling)
    :param primitive_params: primitive parameters (ignored for RO scaling)
    :param operationType: type of sub-operation; forced to 'SCALE-RO' for RO scaling
    :param RO_nsr_id: when provided together with RO_scaling_info, the sub-operation is an RO scaling
    :param RO_scaling_info: see RO_nsr_id
    :return: SUBOPERATION_STATUS_NEW when the sub-operation has just been added, otherwise the result of
        _retry_or_skip_suboperation() for the existing one
    """
    is_ro_scaling = bool(RO_nsr_id and RO_scaling_info)

    # Find this sub-operation
    if is_ro_scaling:
        operationType = 'SCALE-RO'
        match = {
            'member_vnf_index': vnf_index,
            'RO_nsr_id': RO_nsr_id,
            'RO_scaling_info': RO_scaling_info,
        }
    else:
        match = {
            'member_vnf_index': vnf_index,
            'primitive': vnf_config_primitive,
            'primitive_params': primitive_params,
            'lcmOperationType': operationType
        }
    op_index = self._find_suboperation(db_nslcmop, match)

    if op_index != self.SUBOPERATION_STATUS_NOT_FOUND:
        # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
        # or op_index (operationState != 'COMPLETED')
        return self._retry_or_skip_suboperation(db_nslcmop, op_index)

    # a. New sub-operation
    # The sub-operation does not exist, add it.
    # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
    if is_ro_scaling:
        vnf_config_primitive = primitive_params = None
    else:
        RO_nsr_id = RO_scaling_info = None

    # Add sub-operation for pre/post-scaling (zero or more operations)
    self._add_suboperation(
        db_nslcmop,
        vnf_index,
        None,  # vdu_id: set to None for all kind of scaling
        None,  # vdu_count_index: set to None for all kind of scaling
        None,  # vdu_name: set to None for all kind of scaling
        vnf_config_primitive,
        primitive_params,
        'PROCESSING',  # initial operationState
        'In progress',  # initial detailed_status
        operationType,
        RO_nsr_id,
        RO_scaling_info)
    return self.SUBOPERATION_STATUS_NEW
2710 # Function to return execution_environment id
def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list):
    """
    Return the execution environment id of the first deployed VCA matching vnf_index and vdu_id.

    :param vnf_index: value compared against the "member-vnf-index" of every VCA
    :param vdu_id: value compared against the "vdu_id" of every VCA
    :param vca_deployed_list: iterable of deployed VCA dictionaries
    :return: the "ee_id" of the first match, or None when nothing matches
    """
    # TODO vdu_index_count
    return next(
        (
            vca["ee_id"]
            for vca in vca_deployed_list
            if vca["member-vnf-index"] == vnf_index and vca["vdu_id"] == vdu_id
        ),
        None,
    )
async def destroy_N2VC(self, logging_text, db_nslcmop, vca_deployed, config_descriptor,
                       vca_index, destroy_ee=True, exec_primitives=True):
    """
    Execute the terminate primitives and destroy the execution environment (if destroy_ee=True)
    :param logging_text: prefix added to the log messages
    :param db_nslcmop: nslcmop record; sub-operations are added to it and its "nsInstanceId" is read
    :param vca_deployed: Dictionary of deployment info at db_nsr._admin.deployed.VCA.<INDEX>
    :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu. None is accepted and
        treated like a descriptor without "terminate-config-primitive"
    :param vca_index: index in the database _admin.deployed.VCA
    :param destroy_ee: False to do not destroy, because it will be destroyed all of them at once
    :param exec_primitives: False to do not execute terminate primitives, because the config is not completed or has
        not executed properly
    :return: None or exception
    """

    self.logger.debug(
        logging_text + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
            vca_index, vca_deployed, config_descriptor, destroy_ee
        )
    )

    vca_type = vca_deployed.get("type", "lxc_proxy_charm")

    # execute terminate_primitives
    if exec_primitives:
        # terminate() hands over None when it finds no configuration for this VCA; do not crash on it
        terminate_primitives = get_ee_sorted_terminate_config_primitive_list(
            (config_descriptor or {}).get("terminate-config-primitive"), vca_deployed.get("ee_descriptor_id"))
        vdu_id = vca_deployed.get("vdu_id")
        vdu_count_index = vca_deployed.get("vdu_count_index")
        vdu_name = vca_deployed.get("vdu_name")
        vnf_index = vca_deployed.get("member-vnf-index")
        if terminate_primitives and vca_deployed.get("needed_terminate"):
            for seq in terminate_primitives:
                # For each sequence in list, get primitive and call _ns_execute_primitive()
                step = "Calling terminate action for vnf_member_index={} primitive={}".format(
                    vnf_index, seq.get("name"))
                self.logger.debug(logging_text + step)
                # Create the primitive for each sequence, i.e. "primitive": "touch"
                primitive = seq.get('name')
                mapped_primitive_params = self._get_terminate_primitive_params(seq, vnf_index)

                # Add sub-operation
                self._add_suboperation(db_nslcmop,
                                       vnf_index,
                                       vdu_id,
                                       vdu_count_index,
                                       vdu_name,
                                       primitive,
                                       mapped_primitive_params)
                # Sub-operations: Call _ns_execute_primitive() instead of action()
                try:
                    result, result_detail = await self._ns_execute_primitive(vca_deployed["ee_id"], primitive,
                                                                             mapped_primitive_params,
                                                                             vca_type=vca_type)
                except LcmException:
                    # this happens when VCA is not deployed. In this case it is not needed to terminate
                    continue
                result_ok = ['COMPLETED', 'PARTIALLY_COMPLETED']
                if result not in result_ok:
                    raise LcmException("terminate_primitive {} for vnf_member_index={} fails with "
                                       "error {}".format(seq.get("name"), vnf_index, result_detail))
            # set that this VCA do not need terminated
            db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(vca_index)
            self.update_db_2("nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False})

    if vca_deployed.get("prometheus_jobs") and self.prometheus:
        await self.prometheus.update(remove_jobs=vca_deployed["prometheus_jobs"])

    if destroy_ee:
        await self.vca_map[vca_type].delete_execution_environment(vca_deployed["ee_id"])
async def _delete_all_N2VC(self, db_nsr: dict):
    """
    Delete the whole N2VC namespace of a network service, recording the config status before and after.

    :param db_nsr: nsr record; its "_id" is used to build the namespace "." + _id
    :return: None
    """
    self._write_all_config_status(db_nsr=db_nsr, status='TERMINATING')
    ns_namespace = "." + db_nsr["_id"]
    try:
        await self.n2vc.delete_namespace(
            namespace=ns_namespace,
            total_timeout=self.timeout_charm_delete,
        )
    except N2VCNotFound:
        # already deleted. Skip
        pass
    self._write_all_config_status(db_nsr=db_nsr, status='DELETED')
async def _terminate_RO(self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage):
    """
    Terminates a deployment from RO
    :param logging_text: prefix added to the log messages
    :param nsr_deployed: db_nsr._admin.deployed
    :param nsr_id: identifier of the nsr record to update
    :param nslcmop_id: identifier of the nslcmop record whose status is written
    :param stage: list of string with the content to write on db_nslcmop.detailed-status.
        this method will update only the index 2, but it will write on database the concatenated content of the list
    :return: None. Raises LcmException with all the collected error details if something could not be deleted
    """
    db_nsr_update = {}
    failed_detail = []
    ro_nsr_id = ro_delete_action = None
    if nsr_deployed and nsr_deployed.get("RO"):
        ro_nsr_id = nsr_deployed["RO"].get("nsr_id")
        ro_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
    try:
        if ro_nsr_id:
            stage[2] = "Deleting ns from VIM."
            db_nsr_update["detailed-status"] = " ".join(stage)
            self._write_op_status(nslcmop_id, stage)
            self.logger.debug(logging_text + stage[2])
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)
            desc = await self.RO.delete("ns", ro_nsr_id)
            ro_delete_action = desc["action_id"]
            db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = ro_delete_action
            db_nsr_update["_admin.deployed.RO.nsr_id"] = None
            db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
        if ro_delete_action:
            # wait until NS is deleted from VIM
            stage[2] = "Waiting ns deleted from VIM."
            detailed_status_old = None
            self.logger.debug(logging_text + stage[2] + " RO_id={} ro_delete_action={}".format(ro_nsr_id,
                                                                                               ro_delete_action))
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)

            delete_timeout = 20 * 60  # 20 minutes
            while delete_timeout > 0:
                desc = await self.RO.show(
                    "ns",
                    item_id_name=ro_nsr_id,
                    extra_item="action",
                    extra_item_id=ro_delete_action)

                # deploymentStatus
                self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)

                ns_status, ns_status_info = self.RO.check_action_status(desc)
                if ns_status == "ERROR":
                    raise ROclient.ROClientException(ns_status_info)
                elif ns_status == "BUILD":
                    stage[2] = "Deleting from VIM {}".format(ns_status_info)
                elif ns_status == "ACTIVE":
                    db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
                    db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                    break
                else:
                    # Explicit raise instead of "assert False": asserts are removed with python -O, which would
                    # leave this loop polling an unknown status until the timeout expires
                    raise LcmException("ROclient.check_action_status returns unknown {}".format(ns_status))
                if stage[2] != detailed_status_old:
                    detailed_status_old = stage[2]
                    db_nsr_update["detailed-status"] = " ".join(stage)
                    self._write_op_status(nslcmop_id, stage)
                    self.update_db_2("nsrs", nsr_id, db_nsr_update)
                # The "loop" argument of asyncio.sleep does not exist any more since python 3.10
                await asyncio.sleep(5)
                delete_timeout -= 5
            else:  # delete_timeout <= 0:
                raise ROclient.ROClientException("Timeout waiting ns deleted from VIM")

    except Exception as e:
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        if isinstance(e, ROclient.ROClientException) and e.http_code == 404:  # not found
            db_nsr_update["_admin.deployed.RO.nsr_id"] = None
            db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
            db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
            self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(ro_nsr_id))
        elif isinstance(e, ROclient.ROClientException) and e.http_code == 409:  # conflict
            failed_detail.append("delete conflict: {}".format(e))
            self.logger.debug(logging_text + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id, e))
        else:
            failed_detail.append("delete error: {}".format(e))
            self.logger.error(logging_text + "RO_ns_id={} delete error: {}".format(ro_nsr_id, e))

    # Delete nsd (only attempted when the ns itself was removed without errors)
    if not failed_detail and deep_get(nsr_deployed, ("RO", "nsd_id")):
        ro_nsd_id = nsr_deployed["RO"]["nsd_id"]
        try:
            stage[2] = "Deleting nsd from RO."
            db_nsr_update["detailed-status"] = " ".join(stage)
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)
            await self.RO.delete("nsd", ro_nsd_id)
            self.logger.debug(logging_text + "ro_nsd_id={} deleted".format(ro_nsd_id))
            db_nsr_update["_admin.deployed.RO.nsd_id"] = None
        except Exception as e:
            if isinstance(e, ROclient.ROClientException) and e.http_code == 404:  # not found
                db_nsr_update["_admin.deployed.RO.nsd_id"] = None
                self.logger.debug(logging_text + "ro_nsd_id={} already deleted".format(ro_nsd_id))
            elif isinstance(e, ROclient.ROClientException) and e.http_code == 409:  # conflict
                failed_detail.append("ro_nsd_id={} delete conflict: {}".format(ro_nsd_id, e))
                self.logger.debug(logging_text + failed_detail[-1])
            else:
                failed_detail.append("ro_nsd_id={} delete error: {}".format(ro_nsd_id, e))
                self.logger.error(logging_text + failed_detail[-1])

    # Delete vnfds (only attempted when everything before was removed without errors)
    if not failed_detail and deep_get(nsr_deployed, ("RO", "vnfd")):
        for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
            if not vnf_deployed or not vnf_deployed["id"]:
                continue
            try:
                ro_vnfd_id = vnf_deployed["id"]
                stage[2] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
                    vnf_deployed["member-vnf-index"], ro_vnfd_id)
                db_nsr_update["detailed-status"] = " ".join(stage)
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_op_status(nslcmop_id, stage)
                await self.RO.delete("vnfd", ro_vnfd_id)
                self.logger.debug(logging_text + "ro_vnfd_id={} deleted".format(ro_vnfd_id))
                db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
            except Exception as e:
                if isinstance(e, ROclient.ROClientException) and e.http_code == 404:  # not found
                    db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
                    self.logger.debug(logging_text + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id))
                elif isinstance(e, ROclient.ROClientException) and e.http_code == 409:  # conflict
                    failed_detail.append("ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id, e))
                    self.logger.debug(logging_text + failed_detail[-1])
                else:
                    failed_detail.append("ro_vnfd_id={} delete error: {}".format(ro_vnfd_id, e))
                    self.logger.error(logging_text + failed_detail[-1])

    if failed_detail:
        stage[2] = "Error deleting from VIM"
    else:
        stage[2] = "Deleted from VIM"
    db_nsr_update["detailed-status"] = " ".join(stage)
    self.update_db_2("nsrs", nsr_id, db_nsr_update)
    self._write_op_status(nslcmop_id, stage)

    if failed_detail:
        raise LcmException("; ".join(failed_detail))
async def terminate(self, nsr_id, nslcmop_id):
    """
    Terminate a network service: run the terminate primitives, delete the execution environments and KDUs and
    remove the deployment from the VIM. The final status is always written on the nsr and nslcmop records and a
    "terminated" message is sent, whatever the result.

    :param nsr_id: identifier of the nsr record to terminate
    :param nslcmop_id: identifier of the nslcmop record that describes this operation
    :return: None. Errors are not raised but reported through the database records
    """
    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
    if not task_is_locked_by_me:
        return

    logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    timeout_ns_terminate = self.timeout_ns_terminate
    db_nsr = None
    db_nslcmop = None
    operation_params = None
    exc = None
    error_list = []   # annotates all failed error messages
    db_nslcmop_update = {}
    autoremove = False  # autoremove after terminated
    tasks_dict_info = {}
    db_nsr_update = {}
    stage = ["Stage 1/3: Preparing task.", "Waiting for previous operations to terminate.", ""]
    # ^ contains [stage, step, VIM-status]
    try:
        # wait for any previous tasks in process
        await self.lcm_tasks.waitfor_related_HA("ns", 'nslcmops', nslcmop_id)

        stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        operation_params = db_nslcmop.get("operationParams") or {}
        if operation_params.get("timeout_ns_terminate"):
            timeout_ns_terminate = operation_params["timeout_ns_terminate"]
        stage[1] = "Getting nsr={} from db.".format(nsr_id)
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

        db_nsr_update["operational-status"] = "terminating"
        db_nsr_update["config-status"] = "terminating"
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state="TERMINATING",
            current_operation="TERMINATING",
            current_operation_id=nslcmop_id,
            other_update=db_nsr_update
        )
        self._write_op_status(
            op_id=nslcmop_id,
            queuePosition=0,
            stage=stage
        )
        nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed")) or {}
        if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
            # Nothing to remove. The "finally" section still writes the final status
            return

        stage[1] = "Getting vnf descriptors from db."
        db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
        db_vnfds_from_id = {}
        db_vnfds_from_member_index = {}
        # Loop over VNFRs
        for vnfr in db_vnfrs_list:
            vnfd_id = vnfr["vnfd-id"]
            if vnfd_id not in db_vnfds_from_id:
                vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
                db_vnfds_from_id[vnfd_id] = vnfd
            db_vnfds_from_member_index[vnfr["member-vnf-index-ref"]] = db_vnfds_from_id[vnfd_id]

        # Destroy individual execution environments when there are terminating primitives.
        # Rest of EE will be deleted at once
        # TODO - check before calling _destroy_N2VC
        # if not operation_params.get("skip_terminate_primitives"):#
        # or not vca.get("needed_terminate"):
        stage[0] = "Stage 2/3 execute terminating primitives."
        self.logger.debug(logging_text + stage[0])
        stage[1] = "Looking execution environment that needs terminate."
        self.logger.debug(logging_text + stage[1])

        for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")):
            config_descriptor = None
            if not vca or not vca.get("ee_id"):
                continue
            if not vca.get("member-vnf-index"):
                # ns
                config_descriptor = db_nsr.get("ns-configuration")
            elif vca.get("vdu_id"):
                db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                vdud = next((vdu for vdu in db_vnfd.get("vdu", ()) if vdu["id"] == vca.get("vdu_id")), None)
                if vdud:
                    config_descriptor = vdud.get("vdu-configuration")
            elif vca.get("kdu_name"):
                db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                kdud = next((kdu for kdu in db_vnfd.get("kdu", ()) if kdu["name"] == vca.get("kdu_name")), None)
                if kdud:
                    config_descriptor = kdud.get("kdu-configuration")
            else:
                config_descriptor = db_vnfds_from_member_index[vca["member-vnf-index"]].get("vnf-configuration")
            vca_type = vca.get("type")
            exec_terminate_primitives = (not operation_params.get("skip_terminate_primitives") and
                                         vca.get("needed_terminate"))
            # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
            # pending native charms
            destroy_ee = vca_type in ("helm", "helm-v3", "native_charm")
            # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
            #     vca_index, vca.get("ee_id"), vca_type, destroy_ee))
            task = asyncio.ensure_future(
                self.destroy_N2VC(logging_text, db_nslcmop, vca, config_descriptor, vca_index,
                                  destroy_ee, exec_terminate_primitives))
            tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))

        # wait for pending tasks of terminate primitives
        if tasks_dict_info:
            self.logger.debug(logging_text + 'Waiting for tasks {}'.format(list(tasks_dict_info.keys())))
            error_list = await self._wait_for_tasks(logging_text, tasks_dict_info,
                                                    min(self.timeout_charm_delete, timeout_ns_terminate),
                                                    stage, nslcmop_id)
            tasks_dict_info.clear()
            if error_list:
                return   # raise LcmException("; ".join(error_list))

        # remove All execution environments at once
        stage[0] = "Stage 3/3 delete all."

        if nsr_deployed.get("VCA"):
            stage[1] = "Deleting all execution environments."
            self.logger.debug(logging_text + stage[1])
            task_delete_ee = asyncio.ensure_future(asyncio.wait_for(self._delete_all_N2VC(db_nsr=db_nsr),
                                                                    timeout=self.timeout_charm_delete))
            # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
            tasks_dict_info[task_delete_ee] = "Terminating all VCA"

        # Delete from k8scluster
        stage[1] = "Deleting KDUs."
        self.logger.debug(logging_text + stage[1])
        # print(nsr_deployed)
        for kdu in get_iterable(nsr_deployed, "K8s"):
            if not kdu or not kdu.get("kdu-instance"):
                continue
            kdu_instance = kdu.get("kdu-instance")
            if kdu.get("k8scluster-type") in self.k8scluster_map:
                task_delete_kdu_instance = asyncio.ensure_future(
                    self.k8scluster_map[kdu["k8scluster-type"]].uninstall(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu_instance))
            else:
                self.logger.error(logging_text + "Unknown k8s deployment type {}".
                                  format(kdu.get("k8scluster-type")))
                continue
            tasks_dict_info[task_delete_kdu_instance] = "Terminating KDU '{}'".format(kdu.get("kdu-name"))

        # remove from RO
        stage[1] = "Deleting ns from VIM."
        if self.ng_ro:
            task_delete_ro = asyncio.ensure_future(
                self._terminate_ng_ro(logging_text, nsr_deployed, nsr_id, nslcmop_id, stage))
        else:
            task_delete_ro = asyncio.ensure_future(
                self._terminate_RO(logging_text, nsr_deployed, nsr_id, nslcmop_id, stage))
        tasks_dict_info[task_delete_ro] = "Removing deployment from VIM"

        # rest of stuff will be done at finally

    except (ROclient.ROClientException, DbException, LcmException, N2VCException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(stage[1]))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception while '{}': {}".format(stage[1], e), exc_info=True)
    finally:
        if exc:
            error_list.append(str(exc))
        try:
            # wait for pending tasks
            if tasks_dict_info:
                stage[1] = "Waiting for terminate pending tasks."
                self.logger.debug(logging_text + stage[1])
                error_list += await self._wait_for_tasks(logging_text, tasks_dict_info, timeout_ns_terminate,
                                                         stage, nslcmop_id)
            stage[1] = stage[2] = ""
        except asyncio.CancelledError:
            error_list.append("Cancelled")
            # TODO cancel all tasks
        except Exception as wait_exc:
            # A name different from "exc" is used so that the outer variable is not rebound and then unbound
            error_list.append(str(wait_exc))
        # update status at database
        if error_list:
            error_detail = "; ".join(error_list)
            # self.logger.error(logging_text + error_detail)
            error_description_nslcmop = '{} Detail: {}'.format(stage[0], error_detail)
            error_description_nsr = 'Operation: TERMINATING.{}, {}.'.format(nslcmop_id, stage[0])

            db_nsr_update["operational-status"] = "failed"
            db_nsr_update["detailed-status"] = error_description_nsr + " Detail: " + error_detail
            db_nslcmop_update["detailed-status"] = error_detail
            nslcmop_operation_state = "FAILED"
            ns_state = "BROKEN"
        else:
            error_detail = None
            error_description_nsr = error_description_nslcmop = None
            ns_state = "NOT_INSTANTIATED"
            db_nsr_update["operational-status"] = "terminated"
            db_nsr_update["detailed-status"] = "Done"
            db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
            db_nslcmop_update["detailed-status"] = "Done"
            nslcmop_operation_state = "COMPLETED"

        if db_nsr:
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=ns_state,
                current_operation="IDLE",
                current_operation_id=None,
                error_description=error_description_nsr,
                error_detail=error_detail,
                other_update=db_nsr_update
            )
        self._write_op_status(
            op_id=nslcmop_id,
            stage="",
            error_message=error_description_nslcmop,
            operation_state=nslcmop_operation_state,
            other_update=db_nslcmop_update,
        )
        if ns_state == "NOT_INSTANTIATED":
            try:
                self.db.set_list("vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "NOT_INSTANTIATED"})
            except DbException as e:
                # Logger.warn is a deprecated alias; Logger.warning is the supported method
                self.logger.warning(logging_text + 'Error writing VNFR status for nsr-id-ref: {} -> {}'.
                                    format(nsr_id, e))
        if operation_params:
            autoremove = operation_params.get("autoremove", False)
        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite("ns", "terminated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                             "operationState": nslcmop_operation_state,
                                                             "autoremove": autoremove},
                                        loop=self.loop)
            except Exception as e:
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))

        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
async def _wait_for_tasks(self, logging_text, created_tasks_info, timeout, stage, nslcmop_id, nsr_id=None):
    """
    Wait until all the given tasks finish or the timeout expires, reporting the progress on the nslcmop record.

    :param logging_text: prefix added to the log messages
    :param created_tasks_info: dictionary with the tasks as keys and a text describing each task as values
    :param timeout: maximum number of seconds to wait, counted from the moment this method is called
    :param stage: list of strings; index 1 is overwritten with "<done>/<total>." plus the errors found
    :param nslcmop_id: identifier of the nslcmop record where the stage is written
    :param nsr_id: when provided, errorDescription and errorDetail are also written on this nsr record
    :return: list with one detailed error text per failed, cancelled or timed out task. Empty if all went fine
    """
    started_at = time()
    detailed_errors = []
    failed_descriptions = []
    remaining = list(created_tasks_info.keys())
    total = len(remaining)
    finished = 0
    stage[1] = "{}/{}.".format(finished, total)
    self._write_op_status(nslcmop_id, stage)
    while remaining:
        latest_error = None
        time_left = timeout + started_at - time()
        completed, remaining = await asyncio.wait(remaining, timeout=time_left,
                                                  return_when=asyncio.FIRST_COMPLETED)
        finished += len(completed)
        if not completed:   # Timeout
            for task in remaining:
                latest_error = created_tasks_info[task] + ": Timeout"
                detailed_errors.append(latest_error)
                failed_descriptions.append(latest_error)
            break
        for task in completed:
            failure = "Cancelled" if task.cancelled() else task.exception()
            if not failure:
                self.logger.debug(logging_text + created_tasks_info[task] + ": Done")
                continue
            if isinstance(failure, asyncio.TimeoutError):
                failure = "Timeout"
            latest_error = created_tasks_info[task] + ": {}".format(failure)
            failed_descriptions.append(created_tasks_info[task])
            detailed_errors.append(latest_error)
            # Known failures are logged as a single line, anything else with its traceback
            if isinstance(failure, (str, DbException, N2VCException, ROclient.ROClientException, LcmException,
                                    K8sException, NgRoException)):
                self.logger.error(logging_text + latest_error)
            else:
                failure_traceback = "".join(traceback.format_exception(None, failure, failure.__traceback__))
                self.logger.error(logging_text + created_tasks_info[task] + " " + failure_traceback)
        stage[1] = "{}/{}.".format(finished, total)
        if latest_error:
            stage[1] += " Errors: " + ". ".join(detailed_errors) + "."
            if nsr_id:  # update also nsr
                self.update_db_2("nsrs", nsr_id, {"errorDescription": "Error at: " + ", ".join(failed_descriptions),
                                                  "errorDetail": ". ".join(detailed_errors)})
        self._write_op_status(nslcmop_id, stage)
    return detailed_errors
def _map_primitive_params(primitive_desc, params, instantiation_params):
    """
    Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
    the "value" (or else the "default-value") of the descriptor is used. If it is between < > it looks for a value
    at instantiation_params
    :param primitive_desc: portion of VNFD/NSD that describes primitive
    :param params: Params provided by user. None is treated as no params
    :param instantiation_params: Instantiation params provided by user. None is treated as no params
    :return: a dictionary with the calculated params
    :raises LcmException: if a parameter has no value, a <reference> is not found at instantiation_params or an
        INTEGER parameter cannot be converted to int
    """
    # Missing dictionaries are handled as empty ones instead of failing on the "in" tests below
    params = params or {}
    instantiation_params = instantiation_params or {}

    calculated_params = {}
    for parameter in primitive_desc.get("parameter", ()):
        param_name = parameter["name"]
        if param_name in params:
            value = params[param_name]
        elif "default-value" in parameter or "value" in parameter:
            # "value" has preference over "default-value"
            value = parameter["value"] if "value" in parameter else parameter["default-value"]
            if isinstance(value, str) and value.startswith("<") and value.endswith(">"):
                if value[1:-1] in instantiation_params:
                    value = instantiation_params[value[1:-1]]
                else:
                    raise LcmException("Parameter {} needed to execute primitive {} not provided".
                                       format(value, primitive_desc["name"]))
        else:
            raise LcmException("Parameter {} needed to execute primitive {} not provided".
                               format(param_name, primitive_desc["name"]))

        if isinstance(value, (dict, list, tuple)):
            # structured values are passed as a one-line yaml text
            value = yaml.safe_dump(value, default_flow_style=True, width=256)
        elif isinstance(value, str) and value.startswith("!!yaml "):
            value = value[7:]

        data_type = parameter.get("data-type")
        if data_type == "INTEGER":
            try:
                value = int(value)
            except (ValueError, TypeError) as e:  # error converting to int (TypeError covers e.g. None)
                raise LcmException(
                    "Parameter {} of primitive {} must be integer".format(param_name, primitive_desc["name"])
                ) from e
        elif data_type == "BOOLEAN":
            # anything but the text "false" (case insensitive) is considered True
            value = not (str(value).lower() == 'false')
        calculated_params[param_name] = value

    # add always ns_config_info if primitive name is config
    if primitive_desc["name"] == "config":
        if "ns_config_info" in instantiation_params:
            calculated_params["ns_config_info"] = instantiation_params["ns_config_info"]
    return calculated_params
def _look_for_deployed_vca(self, deployed_vca, member_vnf_index, vdu_id, vdu_count_index, kdu_name=None,
                           ee_descriptor_id=None):
    """
    Find the vca_deployed record for this action.

    :param deployed_vca: iterable of deployed VCA dictionaries; empty entries are ignored
    :param member_vnf_index: must be equal to the "member-vnf-index" of the VCA
    :param vdu_id: must be equal to the "vdu_id" of the VCA
    :param vdu_count_index: when not None, must be equal to the "vdu_count_index" of the VCA
    :param kdu_name: when provided, must be equal to the "kdu_name" of the VCA
    :param ee_descriptor_id: when provided, must be equal to the "ee_descriptor_id" of the VCA
    :return: tuple (ee_id, vca_type) of the first VCA that matches. vca_type defaults to "lxc_proxy_charm"
    :raises LcmException: if not found or there is not any ee_id
    """
    def _is_the_requested_one(candidate):
        if not candidate:
            return False
        if member_vnf_index != candidate["member-vnf-index"] or vdu_id != candidate["vdu_id"]:
            return False
        if vdu_count_index is not None and vdu_count_index != candidate["vdu_count_index"]:
            return False
        if kdu_name and kdu_name != candidate["kdu_name"]:
            return False
        if ee_descriptor_id and ee_descriptor_id != candidate["ee_descriptor_id"]:
            return False
        return True

    vca = next(filter(_is_the_requested_one, deployed_vca), None)
    if vca is None:
        # vca_deployed not found
        raise LcmException("charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
                           " is not deployed".format(member_vnf_index, vdu_id, vdu_count_index, kdu_name,
                                                     ee_descriptor_id))

    # get ee_id
    ee_id = vca.get("ee_id")
    vca_type = vca.get("type", "lxc_proxy_charm")  # default value for backward compatibility - proxy charm
    if ee_id:
        return ee_id, vca_type
    raise LcmException("charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
                       "execution environment"
                       .format(member_vnf_index, vdu_id, kdu_name, vdu_count_index))
async def _ns_execute_primitive(self, ee_id, primitive, primitive_params, retries=0, retries_interval=30,
                                timeout=None, vca_type=None, db_dict=None) -> (str, str):
    """
    Execute a primitive on an execution environment, retrying when it fails.

    :param ee_id: identifier of the execution environment
    :param primitive: name of the primitive. For "config" the params are sent wrapped as {"params": ...}
    :param primitive_params: dictionary of parameters for the primitive
    :param retries: number of additional attempts after a failed execution
    :param retries_interval: seconds to wait between attempts
    :param timeout: maximum seconds for each attempt. By default self.timeout_primitive
    :param vca_type: key of self.vca_map with the connector to use. By default "lxc_proxy_charm"
    :param db_dict: passed as it is to the connector
    :return: tuple ('COMPLETED', output) or ('FAILED', error text)
    :raises LcmException, asyncio.CancelledError: they are never converted into a 'FAILED' result
    """
    try:
        if primitive == "config":
            primitive_params = {"params": primitive_params}

        vca_type = vca_type or "lxc_proxy_charm"

        while retries >= 0:
            try:
                output = await asyncio.wait_for(
                    self.vca_map[vca_type].exec_primitive(
                        ee_id=ee_id,
                        primitive_name=primitive,
                        params_dict=primitive_params,
                        progress_timeout=self.timeout_progress_primitive,
                        total_timeout=self.timeout_primitive,
                        db_dict=db_dict),
                    timeout=timeout or self.timeout_primitive)
                # execution was OK
                break
            except asyncio.CancelledError:
                raise
            except Exception as e:  # asyncio.TimeoutError
                if isinstance(e, asyncio.TimeoutError):
                    e = "Timeout"
                retries -= 1
                if retries >= 0:
                    self.logger.debug('Error executing action {} on {} -> {}'.format(primitive, ee_id, e))
                    # wait and retry. The "loop" argument of asyncio.sleep does not exist any more since
                    # python 3.10, and passing it turned every retry into a failure
                    await asyncio.sleep(retries_interval)
                else:
                    return 'FAILED', str(e)

        return 'COMPLETED', output

    except (LcmException, asyncio.CancelledError):
        raise
    except Exception as e:
        # Same state text as the retry path above, so that callers only have to check 'FAILED'
        return 'FAILED', 'Error executing action {}: {}'.format(primitive, e)
3351 async def action(self
, nsr_id
, nslcmop_id
):
3352 # Try to lock HA task here
3353 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA('ns', 'nslcmops', nslcmop_id
)
3354 if not task_is_locked_by_me
:
3357 logging_text
= "Task ns={} action={} ".format(nsr_id
, nslcmop_id
)
3358 self
.logger
.debug(logging_text
+ "Enter")
3359 # get all needed from database
3363 db_nslcmop_update
= {}
3364 nslcmop_operation_state
= None
3365 error_description_nslcmop
= None
3368 # wait for any previous tasks in process
3369 step
= "Waiting for previous operations to terminate"
3370 await self
.lcm_tasks
.waitfor_related_HA('ns', 'nslcmops', nslcmop_id
)
3372 self
._write
_ns
_status
(
3375 current_operation
="RUNNING ACTION",
3376 current_operation_id
=nslcmop_id
3379 step
= "Getting information from database"
3380 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
3381 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3383 nsr_deployed
= db_nsr
["_admin"].get("deployed")
3384 vnf_index
= db_nslcmop
["operationParams"].get("member_vnf_index")
3385 vdu_id
= db_nslcmop
["operationParams"].get("vdu_id")
3386 kdu_name
= db_nslcmop
["operationParams"].get("kdu_name")
3387 vdu_count_index
= db_nslcmop
["operationParams"].get("vdu_count_index")
3388 primitive
= db_nslcmop
["operationParams"]["primitive"]
3389 primitive_params
= db_nslcmop
["operationParams"]["primitive_params"]
3390 timeout_ns_action
= db_nslcmop
["operationParams"].get("timeout_ns_action", self
.timeout_primitive
)
3393 step
= "Getting vnfr from database"
3394 db_vnfr
= self
.db
.get_one("vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
})
3395 step
= "Getting vnfd from database"
3396 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
3398 step
= "Getting nsd from database"
3399 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
3401 # for backward compatibility
3402 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
3403 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
3404 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
3405 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3407 # look for primitive
3408 config_primitive_desc
= descriptor_configuration
= None
3410 descriptor_configuration
= get_vdu_configuration(db_vnfd
, vdu_id
)
3412 descriptor_configuration
= get_kdu_configuration(db_vnfd
, kdu_name
)
3414 descriptor_configuration
= get_vnf_configuration(db_vnfd
)
3416 descriptor_configuration
= db_nsd
.get("ns-configuration")
3418 if descriptor_configuration
and descriptor_configuration
.get("config-primitive"):
3419 for config_primitive
in descriptor_configuration
["config-primitive"]:
3420 if config_primitive
["name"] == primitive
:
3421 config_primitive_desc
= config_primitive
3424 if not config_primitive_desc
:
3425 if not (kdu_name
and primitive
in ("upgrade", "rollback", "status")):
3426 raise LcmException("Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".
3428 primitive_name
= primitive
3429 ee_descriptor_id
= None
3431 primitive_name
= config_primitive_desc
.get("execution-environment-primitive", primitive
)
3432 ee_descriptor_id
= config_primitive_desc
.get("execution-environment-ref")
3436 vdur
= next((x
for x
in db_vnfr
["vdur"] if x
["vdu-id-ref"] == vdu_id
), None)
3437 desc_params
= parse_yaml_strings(vdur
.get("additionalParams"))
3439 kdur
= next((x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
), None)
3440 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
3442 desc_params
= parse_yaml_strings(db_vnfr
.get("additionalParamsForVnf"))
3444 desc_params
= parse_yaml_strings(db_nsr
.get("additionalParamsForNs"))
3446 if kdu_name
and get_kdu_configuration(db_vnfd
):
3447 kdu_action
= True if not get_kdu_configuration(db_vnfd
)["juju"] else False
3449 # TODO check if ns is in a proper status
3450 if kdu_name
and (primitive_name
in ("upgrade", "rollback", "status") or kdu_action
):
3451 # kdur and desc_params already set from before
3452 if primitive_params
:
3453 desc_params
.update(primitive_params
)
3454 # TODO Check if we will need something at vnf level
3455 for index
, kdu
in enumerate(get_iterable(nsr_deployed
, "K8s")):
3456 if kdu_name
== kdu
["kdu-name"] and kdu
["member-vnf-index"] == vnf_index
:
3459 raise LcmException("KDU '{}' for vnf '{}' not deployed".format(kdu_name
, vnf_index
))
3461 if kdu
.get("k8scluster-type") not in self
.k8scluster_map
:
3462 msg
= "unknown k8scluster-type '{}'".format(kdu
.get("k8scluster-type"))
3463 raise LcmException(msg
)
3465 db_dict
= {"collection": "nsrs",
3466 "filter": {"_id": nsr_id
},
3467 "path": "_admin.deployed.K8s.{}".format(index
)}
3468 self
.logger
.debug(logging_text
+ "Exec k8s {} on {}.{}".format(primitive_name
, vnf_index
, kdu_name
))
3469 step
= "Executing kdu {}".format(primitive_name
)
3470 if primitive_name
== "upgrade":
3471 if desc_params
.get("kdu_model"):
3472 kdu_model
= desc_params
.get("kdu_model")
3473 del desc_params
["kdu_model"]
3475 kdu_model
= kdu
.get("kdu-model")
3476 parts
= kdu_model
.split(sep
=":")
3478 kdu_model
= parts
[0]
3480 detailed_status
= await asyncio
.wait_for(
3481 self
.k8scluster_map
[kdu
["k8scluster-type"]].upgrade(
3482 cluster_uuid
=kdu
.get("k8scluster-uuid"),
3483 kdu_instance
=kdu
.get("kdu-instance"),
3484 atomic
=True, kdu_model
=kdu_model
,
3485 params
=desc_params
, db_dict
=db_dict
,
3486 timeout
=timeout_ns_action
),
3487 timeout
=timeout_ns_action
+ 10)
3488 self
.logger
.debug(logging_text
+ " Upgrade of kdu {} done".format(detailed_status
))
3489 elif primitive_name
== "rollback":
3490 detailed_status
= await asyncio
.wait_for(
3491 self
.k8scluster_map
[kdu
["k8scluster-type"]].rollback(
3492 cluster_uuid
=kdu
.get("k8scluster-uuid"),
3493 kdu_instance
=kdu
.get("kdu-instance"),
3495 timeout
=timeout_ns_action
)
3496 elif primitive_name
== "status":
3497 detailed_status
= await asyncio
.wait_for(
3498 self
.k8scluster_map
[kdu
["k8scluster-type"]].status_kdu(
3499 cluster_uuid
=kdu
.get("k8scluster-uuid"),
3500 kdu_instance
=kdu
.get("kdu-instance")),
3501 timeout
=timeout_ns_action
)
3503 kdu_instance
= kdu
.get("kdu-instance") or "{}-{}".format(kdu
["kdu-name"], nsr_id
)
3504 params
= self
._map
_primitive
_params
(config_primitive_desc
, primitive_params
, desc_params
)
3506 detailed_status
= await asyncio
.wait_for(
3507 self
.k8scluster_map
[kdu
["k8scluster-type"]].exec_primitive(
3508 cluster_uuid
=kdu
.get("k8scluster-uuid"),
3509 kdu_instance
=kdu_instance
,
3510 primitive_name
=primitive_name
,
3511 params
=params
, db_dict
=db_dict
,
3512 timeout
=timeout_ns_action
),
3513 timeout
=timeout_ns_action
)
3516 nslcmop_operation_state
= 'COMPLETED'
3518 detailed_status
= ''
3519 nslcmop_operation_state
= 'FAILED'
3521 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(nsr_deployed
["VCA"], member_vnf_index
=vnf_index
,
3522 vdu_id
=vdu_id
, vdu_count_index
=vdu_count_index
,
3523 ee_descriptor_id
=ee_descriptor_id
)
3524 db_nslcmop_notif
= {"collection": "nslcmops",
3525 "filter": {"_id": nslcmop_id
},
3526 "path": "admin.VCA"}
3527 nslcmop_operation_state
, detailed_status
= await self
._ns
_execute
_primitive
(
3529 primitive
=primitive_name
,
3530 primitive_params
=self
._map
_primitive
_params
(config_primitive_desc
, primitive_params
, desc_params
),
3531 timeout
=timeout_ns_action
,
3533 db_dict
=db_nslcmop_notif
)
3535 db_nslcmop_update
["detailed-status"] = detailed_status
3536 error_description_nslcmop
= detailed_status
if nslcmop_operation_state
== "FAILED" else ""
3537 self
.logger
.debug(logging_text
+ " task Done with result {} {}".format(nslcmop_operation_state
,
3539 return # database update is called inside finally
3541 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
3542 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
3544 except asyncio
.CancelledError
:
3545 self
.logger
.error(logging_text
+ "Cancelled Exception while '{}'".format(step
))
3546 exc
= "Operation was cancelled"
3547 except asyncio
.TimeoutError
:
3548 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
3550 except Exception as e
:
3551 exc
= traceback
.format_exc()
3552 self
.logger
.critical(logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True)
3555 db_nslcmop_update
["detailed-status"] = detailed_status
= error_description_nslcmop
= \
3556 "FAILED {}: {}".format(step
, exc
)
3557 nslcmop_operation_state
= "FAILED"
3559 self
._write
_ns
_status
(
3561 ns_state
=db_nsr
["nsState"], # TODO check if degraded. For the moment use previous status
3562 current_operation
="IDLE",
3563 current_operation_id
=None,
3564 # error_description=error_description_nsr,
3565 # error_detail=error_detail,
3566 other_update
=db_nsr_update
3569 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
="", error_message
=error_description_nslcmop
,
3570 operation_state
=nslcmop_operation_state
, other_update
=db_nslcmop_update
)
3572 if nslcmop_operation_state
:
3574 await self
.msg
.aiowrite("ns", "actioned", {"nsr_id": nsr_id
, "nslcmop_id": nslcmop_id
,
3575 "operationState": nslcmop_operation_state
},
3577 except Exception as e
:
3578 self
.logger
.error(logging_text
+ "kafka_write notification Exception {}".format(e
))
3579 self
.logger
.debug(logging_text
+ "Exit")
3580 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_action")
3581 return nslcmop_operation_state
, detailed_status
3583 async def scale(self
, nsr_id
, nslcmop_id
):
3584 # Try to lock HA task here
3585 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA('ns', 'nslcmops', nslcmop_id
)
3586 if not task_is_locked_by_me
:
3589 logging_text
= "Task ns={} scale={} ".format(nsr_id
, nslcmop_id
)
3590 stage
= ['', '', '']
3591 # ^ stage, step, VIM progress
3592 self
.logger
.debug(logging_text
+ "Enter")
3593 # get all needed from database
3595 db_nslcmop_update
= {}
3598 # in case of error, indicates what part of scale was failed to put nsr at error status
3599 scale_process
= None
3600 old_operational_status
= ""
3601 old_config_status
= ""
3603 # wait for any previous tasks in process
3604 step
= "Waiting for previous operations to terminate"
3605 await self
.lcm_tasks
.waitfor_related_HA('ns', 'nslcmops', nslcmop_id
)
3606 self
._write
_ns
_status
(nsr_id
=nsr_id
, ns_state
=None,
3607 current_operation
="SCALING", current_operation_id
=nslcmop_id
)
3609 step
= "Getting nslcmop from database"
3610 self
.logger
.debug(step
+ " after having waited for previous tasks to be completed")
3611 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
3613 step
= "Getting nsr from database"
3614 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3615 old_operational_status
= db_nsr
["operational-status"]
3616 old_config_status
= db_nsr
["config-status"]
3618 step
= "Parsing scaling parameters"
3619 db_nsr_update
["operational-status"] = "scaling"
3620 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3621 nsr_deployed
= db_nsr
["_admin"].get("deployed")
3624 nsr_deployed
= db_nsr
["_admin"].get("deployed")
3625 vnf_index
= db_nslcmop
["operationParams"].get("member_vnf_index")
3626 # vdu_id = db_nslcmop["operationParams"].get("vdu_id")
3627 # vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
3628 # vdu_name = db_nslcmop["operationParams"].get("vdu_name")
3631 vnf_index
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleByStepData"]["member-vnf-index"]
3632 scaling_group
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleByStepData"]["scaling-group-descriptor"]
3633 scaling_type
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleVnfType"]
3634 # for backward compatibility
3635 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
3636 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
3637 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
3638 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3640 step
= "Getting vnfr from database"
3641 db_vnfr
= self
.db
.get_one("vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
})
3643 step
= "Getting vnfd from database"
3644 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
3646 step
= "Getting scaling-group-descriptor"
3647 scaling_descriptor
= find_in_list(
3651 lambda scale_desc
: scale_desc
["name"] == scaling_group
3653 if not scaling_descriptor
:
3654 raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
3655 "at vnfd:scaling-group-descriptor".format(scaling_group
))
3657 step
= "Sending scale order to VIM"
3658 # TODO check if ns is in a proper status
3660 if not db_nsr
["_admin"].get("scaling-group"):
3661 self
.update_db_2("nsrs", nsr_id
, {"_admin.scaling-group": [{"name": scaling_group
, "nb-scale-op": 0}]})
3662 admin_scale_index
= 0
3664 for admin_scale_index
, admin_scale_info
in enumerate(db_nsr
["_admin"]["scaling-group"]):
3665 if admin_scale_info
["name"] == scaling_group
:
3666 nb_scale_op
= admin_scale_info
.get("nb-scale-op", 0)
3668 else: # not found, set index one plus last element and add new entry with the name
3669 admin_scale_index
+= 1
3670 db_nsr_update
["_admin.scaling-group.{}.name".format(admin_scale_index
)] = scaling_group
3671 RO_scaling_info
= []
3672 vdu_scaling_info
= {"scaling_group_name": scaling_group
, "vdu": []}
3673 if scaling_type
== "SCALE_OUT":
3674 if "aspect-delta-details" not in scaling_descriptor
:
3676 "Aspect delta details not fount in scaling descriptor {}".format(
3677 scaling_descriptor
["name"]
3680 # count if max-instance-count is reached
3681 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
3683 vdu_scaling_info
["scaling_direction"] = "OUT"
3684 vdu_scaling_info
["vdu-create"] = {}
3685 for delta
in deltas
:
3686 for vdu_delta
in delta
["vdu-delta"]:
3687 vdud
= get_vdu(db_vnfd
, vdu_delta
["id"])
3688 vdu_index
= get_vdu_index(db_vnfr
, vdu_delta
["id"])
3689 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(vdud
, db_vnfd
)
3691 additional_params
= self
._get
_vdu
_additional
_params
(db_vnfr
, vdud
["id"]) or {}
3692 cloud_init_list
= []
3694 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
3695 max_instance_count
= 10
3696 if vdu_profile
and "max-number-of-instances" in vdu_profile
:
3697 max_instance_count
= vdu_profile
.get("max-number-of-instances", 10)
3699 deafult_instance_num
= get_number_of_instances(db_vnfd
, vdud
["id"])
3701 nb_scale_op
+= vdu_delta
.get("number-of-instances", 1)
3703 if nb_scale_op
+ deafult_instance_num
> max_instance_count
:
3705 "reached the limit of {} (max-instance-count) "
3706 "scaling-out operations for the "
3707 "scaling-group-descriptor '{}'".format(nb_scale_op
, scaling_group
)
3709 for x
in range(vdu_delta
.get("number-of-instances", 1)):
3711 # TODO Information of its own ip is not available because db_vnfr is not updated.
3712 additional_params
["OSM"] = get_osm_params(
3717 cloud_init_list
.append(
3718 self
._parse
_cloud
_init
(
3725 RO_scaling_info
.append(
3727 "osm_vdu_id": vdu_delta
["id"],
3728 "member-vnf-index": vnf_index
,
3730 "count": vdu_delta
.get("number-of-instances", 1)
3734 RO_scaling_info
[-1]["cloud_init"] = cloud_init_list
3735 vdu_scaling_info
["vdu-create"][vdu_delta
["id"]] = vdu_delta
.get("number-of-instances", 1)
3737 elif scaling_type
== "SCALE_IN":
3738 if "min-instance-count" in scaling_descriptor
and scaling_descriptor
["min-instance-count"] is not None:
3739 min_instance_count
= int(scaling_descriptor
["min-instance-count"])
3741 vdu_scaling_info
["scaling_direction"] = "IN"
3742 vdu_scaling_info
["vdu-delete"] = {}
3743 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
3744 for delta
in deltas
:
3745 for vdu_delta
in delta
["vdu-delta"]:
3746 min_instance_count
= 0
3747 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
3748 if vdu_profile
and "min-number-of-instances" in vdu_profile
:
3749 min_instance_count
= vdu_profile
["min-number-of-instances"]
3751 deafult_instance_num
= get_number_of_instances(db_vnfd
, vdu_delta
["id"])
3753 nb_scale_op
-= vdu_delta
.get("number-of-instances", 1)
3754 if nb_scale_op
+ deafult_instance_num
< min_instance_count
:
3756 "reached the limit of {} (min-instance-count) scaling-in operations for the "
3757 "scaling-group-descriptor '{}'".format(nb_scale_op
, scaling_group
)
3759 RO_scaling_info
.append({"osm_vdu_id": vdu_delta
["id"], "member-vnf-index": vnf_index
,
3760 "type": "delete", "count": vdu_delta
.get("number-of-instances", 1)})
3761 vdu_scaling_info
["vdu-delete"][vdu_delta
["id"]] = vdu_delta
.get("number-of-instances", 1)
3763 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
3764 vdu_delete
= copy(vdu_scaling_info
.get("vdu-delete"))
3765 if vdu_scaling_info
["scaling_direction"] == "IN":
3766 for vdur
in reversed(db_vnfr
["vdur"]):
3767 if vdu_delete
.get(vdur
["vdu-id-ref"]):
3768 vdu_delete
[vdur
["vdu-id-ref"]] -= 1
3769 vdu_scaling_info
["vdu"].append({
3770 "name": vdur
.get("name") or vdur
.get("vdu-name"),
3771 "vdu_id": vdur
["vdu-id-ref"],
3774 for interface
in vdur
["interfaces"]:
3775 vdu_scaling_info
["vdu"][-1]["interface"].append({
3776 "name": interface
["name"],
3777 "ip_address": interface
["ip-address"],
3778 "mac_address": interface
.get("mac-address"),
3780 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
3783 step
= "Executing pre-scale vnf-config-primitive"
3784 if scaling_descriptor
.get("scaling-config-action"):
3785 for scaling_config_action
in scaling_descriptor
["scaling-config-action"]:
3786 if (scaling_config_action
.get("trigger") == "pre-scale-in" and scaling_type
== "SCALE_IN") \
3787 or (scaling_config_action
.get("trigger") == "pre-scale-out" and scaling_type
== "SCALE_OUT"):
3788 vnf_config_primitive
= scaling_config_action
["vnf-config-primitive-name-ref"]
3789 step
= db_nslcmop_update
["detailed-status"] = \
3790 "executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive
)
3792 # look for primitive
3793 for config_primitive
in db_vnfd
.get("vnf-configuration", {}).get("config-primitive", ()):
3794 if config_primitive
["name"] == vnf_config_primitive
:
3798 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
3799 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
3800 "primitive".format(scaling_group
, vnf_config_primitive
))
3802 vnfr_params
= {"VDU_SCALE_INFO": vdu_scaling_info
}
3803 if db_vnfr
.get("additionalParamsForVnf"):
3804 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
3806 scale_process
= "VCA"
3807 db_nsr_update
["config-status"] = "configuring pre-scaling"
3808 primitive_params
= self
._map
_primitive
_params
(config_primitive
, {}, vnfr_params
)
3810 # Pre-scale retry check: Check if this sub-operation has been executed before
3811 op_index
= self
._check
_or
_add
_scale
_suboperation
(
3812 db_nslcmop
, nslcmop_id
, vnf_index
, vnf_config_primitive
, primitive_params
, 'PRE-SCALE')
3813 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
3814 # Skip sub-operation
3815 result
= 'COMPLETED'
3816 result_detail
= 'Done'
3817 self
.logger
.debug(logging_text
+
3818 "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
3819 vnf_config_primitive
, result
, result_detail
))
3821 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
3822 # New sub-operation: Get index of this sub-operation
3823 op_index
= len(db_nslcmop
.get('_admin', {}).get('operations')) - 1
3824 self
.logger
.debug(logging_text
+ "vnf_config_primitive={} New sub-operation".
3825 format(vnf_config_primitive
))
3827 # retry: Get registered params for this existing sub-operation
3828 op
= db_nslcmop
.get('_admin', {}).get('operations', [])[op_index
]
3829 vnf_index
= op
.get('member_vnf_index')
3830 vnf_config_primitive
= op
.get('primitive')
3831 primitive_params
= op
.get('primitive_params')
3832 self
.logger
.debug(logging_text
+ "vnf_config_primitive={} Sub-operation retry".
3833 format(vnf_config_primitive
))
3834 # Execute the primitive, either with new (first-time) or registered (reintent) args
3835 ee_descriptor_id
= config_primitive
.get("execution-environment-ref")
3836 primitive_name
= config_primitive
.get("execution-environment-primitive",
3837 vnf_config_primitive
)
3838 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(nsr_deployed
["VCA"],
3839 member_vnf_index
=vnf_index
,
3841 vdu_count_index
=None,
3842 ee_descriptor_id
=ee_descriptor_id
)
3843 result
, result_detail
= await self
._ns
_execute
_primitive
(
3844 ee_id
, primitive_name
, primitive_params
, vca_type
)
3845 self
.logger
.debug(logging_text
+ "vnf_config_primitive={} Done with result {} {}".format(
3846 vnf_config_primitive
, result
, result_detail
))
3847 # Update operationState = COMPLETED | FAILED
3848 self
._update
_suboperation
_status
(
3849 db_nslcmop
, op_index
, result
, result_detail
)
3851 if result
== "FAILED":
3852 raise LcmException(result_detail
)
3853 db_nsr_update
["config-status"] = old_config_status
3854 scale_process
= None
3857 db_nsr_update
["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index
)] = nb_scale_op
3858 db_nsr_update
["_admin.scaling-group.{}.time".format(admin_scale_index
)] = time()
3862 scale_process
= "RO"
3863 if self
.ro_config
.get("ng"):
3864 await self
._scale
_ng
_ro
(logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, vdu_scaling_info
, stage
)
3865 vdu_scaling_info
.pop("vdu-create", None)
3866 vdu_scaling_info
.pop("vdu-delete", None)
3868 scale_process
= None
3870 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3873 # execute primitive service POST-SCALING
3874 step
= "Executing post-scale vnf-config-primitive"
3875 if scaling_descriptor
.get("scaling-config-action"):
3876 for scaling_config_action
in scaling_descriptor
["scaling-config-action"]:
3877 if (scaling_config_action
.get("trigger") == "post-scale-in" and scaling_type
== "SCALE_IN") \
3878 or (scaling_config_action
.get("trigger") == "post-scale-out" and scaling_type
== "SCALE_OUT"):
3879 vnf_config_primitive
= scaling_config_action
["vnf-config-primitive-name-ref"]
3880 step
= db_nslcmop_update
["detailed-status"] = \
3881 "executing post-scale scaling-config-action '{}'".format(vnf_config_primitive
)
3883 vnfr_params
= {"VDU_SCALE_INFO": vdu_scaling_info
}
3884 if db_vnfr
.get("additionalParamsForVnf"):
3885 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
3887 # look for primitive
3888 for config_primitive
in db_vnfd
.get("vnf-configuration", {}).get("config-primitive", ()):
3889 if config_primitive
["name"] == vnf_config_primitive
:
3893 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
3894 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
3895 "config-primitive".format(scaling_group
, vnf_config_primitive
))
3896 scale_process
= "VCA"
3897 db_nsr_update
["config-status"] = "configuring post-scaling"
3898 primitive_params
= self
._map
_primitive
_params
(config_primitive
, {}, vnfr_params
)
3900 # Post-scale retry check: Check if this sub-operation has been executed before
3901 op_index
= self
._check
_or
_add
_scale
_suboperation
(
3902 db_nslcmop
, nslcmop_id
, vnf_index
, vnf_config_primitive
, primitive_params
, 'POST-SCALE')
3903 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
3904 # Skip sub-operation
3905 result
= 'COMPLETED'
3906 result_detail
= 'Done'
3907 self
.logger
.debug(logging_text
+
3908 "vnf_config_primitive={} Skipped sub-operation, result {} {}".
3909 format(vnf_config_primitive
, result
, result_detail
))
3911 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
3912 # New sub-operation: Get index of this sub-operation
3913 op_index
= len(db_nslcmop
.get('_admin', {}).get('operations')) - 1
3914 self
.logger
.debug(logging_text
+ "vnf_config_primitive={} New sub-operation".
3915 format(vnf_config_primitive
))
3917 # retry: Get registered params for this existing sub-operation
3918 op
= db_nslcmop
.get('_admin', {}).get('operations', [])[op_index
]
3919 vnf_index
= op
.get('member_vnf_index')
3920 vnf_config_primitive
= op
.get('primitive')
3921 primitive_params
= op
.get('primitive_params')
3922 self
.logger
.debug(logging_text
+ "vnf_config_primitive={} Sub-operation retry".
3923 format(vnf_config_primitive
))
3924 # Execute the primitive, either with new (first-time) or registered (reintent) args
3925 ee_descriptor_id
= config_primitive
.get("execution-environment-ref")
3926 primitive_name
= config_primitive
.get("execution-environment-primitive",
3927 vnf_config_primitive
)
3928 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(nsr_deployed
["VCA"],
3929 member_vnf_index
=vnf_index
,
3931 vdu_count_index
=None,
3932 ee_descriptor_id
=ee_descriptor_id
)
3933 result
, result_detail
= await self
._ns
_execute
_primitive
(
3934 ee_id
, primitive_name
, primitive_params
, vca_type
)
3935 self
.logger
.debug(logging_text
+ "vnf_config_primitive={} Done with result {} {}".format(
3936 vnf_config_primitive
, result
, result_detail
))
3937 # Update operationState = COMPLETED | FAILED
3938 self
._update
_suboperation
_status
(
3939 db_nslcmop
, op_index
, result
, result_detail
)
3941 if result
== "FAILED":
3942 raise LcmException(result_detail
)
3943 db_nsr_update
["config-status"] = old_config_status
3944 scale_process
= None
3947 db_nsr_update
["detailed-status"] = "" # "scaled {} {}".format(scaling_group, scaling_type)
3948 db_nsr_update
["operational-status"] = "running" if old_operational_status
== "failed" \
3949 else old_operational_status
3950 db_nsr_update
["config-status"] = old_config_status
3952 except (ROclient
.ROClientException
, DbException
, LcmException
, NgRoException
) as e
:
3953 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
3955 except asyncio
.CancelledError
:
3956 self
.logger
.error(logging_text
+ "Cancelled Exception while '{}'".format(step
))
3957 exc
= "Operation was cancelled"
3958 except Exception as e
:
3959 exc
= traceback
.format_exc()
3960 self
.logger
.critical(logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True)
3962 self
._write
_ns
_status
(nsr_id
=nsr_id
, ns_state
=None, current_operation
="IDLE", current_operation_id
=None)
3964 db_nslcmop_update
["detailed-status"] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
3965 nslcmop_operation_state
= "FAILED"
3967 db_nsr_update
["operational-status"] = old_operational_status
3968 db_nsr_update
["config-status"] = old_config_status
3969 db_nsr_update
["detailed-status"] = ""
3971 if "VCA" in scale_process
:
3972 db_nsr_update
["config-status"] = "failed"
3973 if "RO" in scale_process
:
3974 db_nsr_update
["operational-status"] = "failed"
3975 db_nsr_update
["detailed-status"] = "FAILED scaling nslcmop={} {}: {}".format(nslcmop_id
, step
,
3978 error_description_nslcmop
= None
3979 nslcmop_operation_state
= "COMPLETED"
3980 db_nslcmop_update
["detailed-status"] = "Done"
3982 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
="", error_message
=error_description_nslcmop
,
3983 operation_state
=nslcmop_operation_state
, other_update
=db_nslcmop_update
)
3985 self
._write
_ns
_status
(nsr_id
=nsr_id
, ns_state
=None, current_operation
="IDLE",
3986 current_operation_id
=None, other_update
=db_nsr_update
)
3988 if nslcmop_operation_state
:
3990 msg
= {"nsr_id": nsr_id
, "nslcmop_id": nslcmop_id
, "operationState": nslcmop_operation_state
}
3991 await self
.msg
.aiowrite("ns", "scaled", msg
, loop
=self
.loop
)
3992 except Exception as e
:
3993 self
.logger
.error(logging_text
+ "kafka_write notification Exception {}".format(e
))
3994 self
.logger
.debug(logging_text
+ "Exit")
3995 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_scale")
async def _scale_ng_ro(self, logging_text, db_nsr, db_nslcmop, db_vnfr, vdu_scaling_info, stage):
    """
    Apply the requested VDU scaling of one VNF through NG-RO.

    Loads the nsd and every vnfr/vnfd of the NS from the database, updates the
    scaled vnfr with self.scale_vnfr and then calls self._instantiate_ng_ro
    with the refreshed records. When VDUs are being deleted, self.scale_vnfr is
    called a second time (mark_delete=False) once the NG-RO call has returned.

    :param logging_text: text handed over to self._instantiate_ng_ro
    :param db_nsr: nsr database record; its "nsd-id" selects the nsd to load
    :param db_nslcmop: nslcmop database record; "nsInstanceId" provides the nsr id
    :param db_vnfr: vnfr record of the VNF being scaled; passed to self.scale_vnfr
    :param vdu_scaling_info: dict whose optional "vdu-create" and "vdu-delete"
        entries are passed to self.scale_vnfr
    :param stage: handed over to self._instantiate_ng_ro as "stage"
    :return: None
    """
    nsr_id = db_nslcmop["nsInstanceId"]
    db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
    db_vnfrs = {}

    # read from db: vnfd's for every vnf
    db_vnfds = []

    # for each vnf in ns, read vnfd
    for vnfr in self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id}):
        db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr
        vnfd_id = vnfr["vnfd-id"]  # vnfd uuid for this vnf
        # If we haven't this vnfd, read it from db. vnfd_id is the database "_id"
        # (it is the key used by get_one below), so the already-loaded check must
        # compare "_id" as well; comparing the descriptor "id" does not match and
        # the same vnfd is fetched and appended once per vnfr.
        if not find_in_list(db_vnfds, lambda a_vnfd: a_vnfd["_id"] == vnfd_id):
            vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
            db_vnfds.append(vnfd)

    n2vc_key = self.n2vc.get_public_key()
    n2vc_key_list = [n2vc_key]

    # NOTE(review): the tail of this call is missing from the reviewed text;
    # mark_delete=True is inferred from the paired mark_delete=False call at the
    # end of this method - confirm against the repository.
    self.scale_vnfr(
        db_vnfr,
        vdu_scaling_info.get("vdu-create"),
        vdu_scaling_info.get("vdu-delete"),
        mark_delete=True,
    )
    # db_vnfr has been updated, update db_vnfrs to use it
    db_vnfrs[db_vnfr["member-vnf-index-ref"]] = db_vnfr

    await self._instantiate_ng_ro(
        logging_text, nsr_id, db_nsd, db_nsr, db_nslcmop, db_vnfrs,
        db_vnfds, n2vc_key_list, stage=stage, start_deploy=time(),
        timeout_ns_deploy=self.timeout_ns_deploy,
    )

    if vdu_scaling_info.get("vdu-delete"):
        self.scale_vnfr(db_vnfr, None, vdu_scaling_info["vdu-delete"], mark_delete=False)
4026 async def add_prometheus_metrics(self
, ee_id
, artifact_path
, ee_config_descriptor
, vnfr_id
, nsr_id
, target_ip
):
4027 if not self
.prometheus
:
4029 # look if exist a file called 'prometheus*.j2' and
4030 artifact_content
= self
.fs
.dir_ls(artifact_path
)
4031 job_file
= next((f
for f
in artifact_content
if f
.startswith("prometheus") and f
.endswith(".j2")), None)
4034 with self
.fs
.file_open((artifact_path
, job_file
), "r") as f
:
4038 _
, _
, service
= ee_id
.partition(".") # remove prefix "namespace."
4039 host_name
= "{}-{}".format(service
, ee_config_descriptor
["metric-service"])
4041 vnfr_id
= vnfr_id
.replace("-", "")
4043 "JOB_NAME": vnfr_id
,
4044 "TARGET_IP": target_ip
,
4045 "EXPORTER_POD_IP": host_name
,
4046 "EXPORTER_POD_PORT": host_port
,
4048 job_list
= self
.prometheus
.parse_job(job_data
, variables
)
4049 # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
4050 for job
in job_list
:
4051 if not isinstance(job
.get("job_name"), str) or vnfr_id
not in job
["job_name"]:
4052 job
["job_name"] = vnfr_id
+ "_" + str(randint(1, 10000))
4053 job
["nsr_id"] = nsr_id
4054 job_dict
= {jl
["job_name"]: jl
for jl
in job_list
}
4055 if await self
.prometheus
.update(job_dict
):
4056 return list(job_dict
.keys())
def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
    """
    Look up the VCA cloud and the VCA cloud credential of a VIM account

    The account is fetched with VimAccountDB.get_vim_account_with_id and both
    values are read from its "config" entry (an empty dict is used when the
    account has no "config" key). A value is None when its key, "vca_cloud"
    or "vca_cloud_credential", is not present.

    :param: vim_account_id: VIM Account ID

    :return: (cloud_name, cloud_credential)
    """
    vim_account = VimAccountDB.get_vim_account_with_id(vim_account_id)
    vim_config = vim_account.get("config", {})
    cloud_name = vim_config.get("vca_cloud")
    cloud_credential = vim_config.get("vca_cloud_credential")
    return cloud_name, cloud_credential
def get_vca_k8s_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
    """
    Look up the VCA K8s cloud and the VCA K8s cloud credential of a VIM account

    The account is fetched with VimAccountDB.get_vim_account_with_id and both
    values are read from its "config" entry (an empty dict is used when the
    account has no "config" key). A value is None when its key, "vca_k8s_cloud"
    or "vca_k8s_cloud_credential", is not present.

    :param: vim_account_id: VIM Account ID

    :return: (cloud_name, cloud_credential)
    """
    vim_account = VimAccountDB.get_vim_account_with_id(vim_account_id)
    vim_config = vim_account.get("config", {})
    k8s_cloud_name = vim_config.get("vca_k8s_cloud")
    k8s_cloud_credential = vim_config.get("vca_k8s_cloud_credential")
    return k8s_cloud_name, k8s_cloud_credential