1 # -*- coding: utf-8 -*-
4 # Copyright 2018 Telefonica S.A.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
22 import logging
.handlers
25 from jinja2
import Environment
, TemplateError
, TemplateNotFound
, StrictUndefined
, UndefinedError
27 from osm_lcm
import ROclient
28 from osm_lcm
.ng_ro
import NgRoClient
, NgRoException
29 from osm_lcm
.lcm_utils
import LcmException
, LcmExceptionNoMgmtIP
, LcmBase
, deep_get
, get_iterable
, populate_dict
30 from osm_lcm
.data_utils
.nsd
import get_vnf_profiles
31 from osm_lcm
.data_utils
.vnfd
import get_vnf_configuration
, get_vdu_list
, get_vdu_profile
, \
32 get_ee_sorted_initial_config_primitive_list
, get_ee_sorted_terminate_config_primitive_list
, \
33 get_kdu_list
, get_virtual_link_profiles
, get_vdu
, get_vdu_configuration
, get_kdu_configuration
, \
35 from osm_lcm
.data_utils
.list_utils
import find_in_list
36 from osm_lcm
.data_utils
.vnfr
import get_osm_params
37 from osm_lcm
.data_utils
.dict_utils
import parse_yaml_strings
38 from osm_lcm
.data_utils
.database
.vim_account
import VimAccountDB
39 from n2vc
.k8s_helm_conn
import K8sHelmConnector
40 from n2vc
.k8s_helm3_conn
import K8sHelm3Connector
41 from n2vc
.k8s_juju_conn
import K8sJujuConnector
43 from osm_common
.dbbase
import DbException
44 from osm_common
.fsbase
import FsException
46 from osm_lcm
.data_utils
.database
.database
import Database
47 from osm_lcm
.data_utils
.filesystem
.filesystem
import Filesystem
49 from n2vc
.n2vc_juju_conn
import N2VCJujuConnector
50 from n2vc
.exceptions
import N2VCException
, N2VCNotFound
, K8sException
52 from osm_lcm
.lcm_helm_conn
import LCMHelmConn
54 from copy
import copy
, deepcopy
56 from uuid
import uuid4
58 from random
import randint
60 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
64 timeout_vca_on_error
= 5 * 60 # Time for charm from first time at blocked,error status to mark as failed
65 timeout_ns_deploy
= 2 * 3600 # default global timeout for deployment a ns
66 timeout_ns_terminate
= 1800 # default global timeout for un deployment a ns
67 timeout_charm_delete
= 10 * 60
68 timeout_primitive
= 30 * 60 # timeout for primitive execution
69 timeout_progress_primitive
= 10 * 60 # timeout for some progress in a primitive execution
71 SUBOPERATION_STATUS_NOT_FOUND
= -1
72 SUBOPERATION_STATUS_NEW
= -2
73 SUBOPERATION_STATUS_SKIP
= -3
74 task_name_deploy_vca
= "Deploying VCA"
76 def __init__(self
, msg
, lcm_tasks
, config
, loop
, prometheus
=None):
78 Init, Connect to database, filesystem storage, and messaging
79 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
84 logger
=logging
.getLogger('lcm.ns')
87 self
.db
= Database().instance
.db
88 self
.fs
= Filesystem().instance
.fs
90 self
.lcm_tasks
= lcm_tasks
91 self
.timeout
= config
["timeout"]
92 self
.ro_config
= config
["ro_config"]
93 self
.ng_ro
= config
["ro_config"].get("ng")
94 self
.vca_config
= config
["VCA"].copy()
96 # create N2VC connector
97 self
.n2vc
= N2VCJujuConnector(
100 url
='{}:{}'.format(self
.vca_config
['host'], self
.vca_config
['port']),
101 username
=self
.vca_config
.get('user', None),
102 vca_config
=self
.vca_config
,
103 on_update_db
=self
._on
_update
_n
2vc
_db
,
108 self
.conn_helm_ee
= LCMHelmConn(
113 vca_config
=self
.vca_config
,
114 on_update_db
=self
._on
_update
_n
2vc
_db
117 self
.k8sclusterhelm2
= K8sHelmConnector(
118 kubectl_command
=self
.vca_config
.get("kubectlpath"),
119 helm_command
=self
.vca_config
.get("helmpath"),
126 self
.k8sclusterhelm3
= K8sHelm3Connector(
127 kubectl_command
=self
.vca_config
.get("kubectlpath"),
128 helm_command
=self
.vca_config
.get("helm3path"),
135 self
.k8sclusterjuju
= K8sJujuConnector(
136 kubectl_command
=self
.vca_config
.get("kubectlpath"),
137 juju_command
=self
.vca_config
.get("jujupath"),
141 vca_config
=self
.vca_config
,
146 self
.k8scluster_map
= {
147 "helm-chart": self
.k8sclusterhelm2
,
148 "helm-chart-v3": self
.k8sclusterhelm3
,
149 "chart": self
.k8sclusterhelm3
,
150 "juju-bundle": self
.k8sclusterjuju
,
151 "juju": self
.k8sclusterjuju
,
155 "lxc_proxy_charm": self
.n2vc
,
156 "native_charm": self
.n2vc
,
157 "k8s_proxy_charm": self
.n2vc
,
158 "helm": self
.conn_helm_ee
,
159 "helm-v3": self
.conn_helm_ee
162 self
.prometheus
= prometheus
165 self
.RO
= NgRoClient(self
.loop
, **self
.ro_config
)
168 def increment_ip_mac(ip_mac
, vm_index
=1):
169 if not isinstance(ip_mac
, str):
172 # try with ipv4 look for last dot
173 i
= ip_mac
.rfind(".")
176 return "{}{}".format(ip_mac
[:i
], int(ip_mac
[i
:]) + vm_index
)
177 # try with ipv6 or mac look for last colon. Operate in hex
178 i
= ip_mac
.rfind(":")
181 # format in hex, len can be 2 for mac or 4 for ipv6
182 return ("{}{:0" + str(len(ip_mac
) - i
) + "x}").format(ip_mac
[:i
], int(ip_mac
[i
:], 16) + vm_index
)
187 def _on_update_ro_db(self
, nsrs_id
, ro_descriptor
):
189 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
192 # TODO filter RO descriptor fields...
196 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
197 db_dict
['deploymentStatus'] = ro_descriptor
198 self
.update_db_2("nsrs", nsrs_id
, db_dict
)
200 except Exception as e
:
201 self
.logger
.warn('Cannot write database RO deployment for ns={} -> {}'.format(nsrs_id
, e
))
203 async def _on_update_n2vc_db(self
, table
, filter, path
, updated_data
):
205 # remove last dot from path (if exists)
206 if path
.endswith('.'):
209 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
210 # .format(table, filter, path, updated_data))
214 nsr_id
= filter.get('_id')
216 # read ns record from database
217 nsr
= self
.db
.get_one(table
='nsrs', q_filter
=filter)
218 current_ns_status
= nsr
.get('nsState')
220 # get vca status for NS
221 status_dict
= await self
.n2vc
.get_status(namespace
='.' + nsr_id
, yaml_format
=False)
225 db_dict
['vcaStatus'] = status_dict
227 # update configurationStatus for this VCA
229 vca_index
= int(path
[path
.rfind(".")+1:])
231 vca_list
= deep_get(target_dict
=nsr
, key_list
=('_admin', 'deployed', 'VCA'))
232 vca_status
= vca_list
[vca_index
].get('status')
234 configuration_status_list
= nsr
.get('configurationStatus')
235 config_status
= configuration_status_list
[vca_index
].get('status')
237 if config_status
== 'BROKEN' and vca_status
!= 'failed':
238 db_dict
['configurationStatus'][vca_index
] = 'READY'
239 elif config_status
!= 'BROKEN' and vca_status
== 'failed':
240 db_dict
['configurationStatus'][vca_index
] = 'BROKEN'
241 except Exception as e
:
242 # not update configurationStatus
243 self
.logger
.debug('Error updating vca_index (ignore): {}'.format(e
))
245 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
246 # if nsState = 'DEGRADED' check if all is OK
248 if current_ns_status
in ('READY', 'DEGRADED'):
249 error_description
= ''
251 if status_dict
.get('machines'):
252 for machine_id
in status_dict
.get('machines'):
253 machine
= status_dict
.get('machines').get(machine_id
)
254 # check machine agent-status
255 if machine
.get('agent-status'):
256 s
= machine
.get('agent-status').get('status')
259 error_description
+= 'machine {} agent-status={} ; '.format(machine_id
, s
)
260 # check machine instance status
261 if machine
.get('instance-status'):
262 s
= machine
.get('instance-status').get('status')
265 error_description
+= 'machine {} instance-status={} ; '.format(machine_id
, s
)
267 if status_dict
.get('applications'):
268 for app_id
in status_dict
.get('applications'):
269 app
= status_dict
.get('applications').get(app_id
)
270 # check application status
271 if app
.get('status'):
272 s
= app
.get('status').get('status')
275 error_description
+= 'application {} status={} ; '.format(app_id
, s
)
277 if error_description
:
278 db_dict
['errorDescription'] = error_description
279 if current_ns_status
== 'READY' and is_degraded
:
280 db_dict
['nsState'] = 'DEGRADED'
281 if current_ns_status
== 'DEGRADED' and not is_degraded
:
282 db_dict
['nsState'] = 'READY'
285 self
.update_db_2("nsrs", nsr_id
, db_dict
)
287 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
289 except Exception as e
:
290 self
.logger
.warn('Error updating NS state for ns={}: {}'.format(nsr_id
, e
))
293 def _parse_cloud_init(cloud_init_text
, additional_params
, vnfd_id
, vdu_id
):
295 env
= Environment(undefined
=StrictUndefined
)
296 template
= env
.from_string(cloud_init_text
)
297 return template
.render(additional_params
or {})
298 except UndefinedError
as e
:
299 raise LcmException("Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
300 "file, must be provided in the instantiation parameters inside the "
301 "'additionalParamsForVnf/Vdu' block".format(e
, vnfd_id
, vdu_id
))
302 except (TemplateError
, TemplateNotFound
) as e
:
303 raise LcmException("Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".
304 format(vnfd_id
, vdu_id
, e
))
306 def _get_vdu_cloud_init_content(self
, vdu
, vnfd
):
307 cloud_init_content
= cloud_init_file
= None
309 if vdu
.get("cloud-init-file"):
310 base_folder
= vnfd
["_admin"]["storage"]
311 cloud_init_file
= "{}/{}/cloud_init/{}".format(base_folder
["folder"], base_folder
["pkg-dir"],
312 vdu
["cloud-init-file"])
313 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
314 cloud_init_content
= ci_file
.read()
315 elif vdu
.get("cloud-init"):
316 cloud_init_content
= vdu
["cloud-init"]
318 return cloud_init_content
319 except FsException
as e
:
320 raise LcmException("Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".
321 format(vnfd
["id"], vdu
["id"], cloud_init_file
, e
))
323 def _get_vdu_additional_params(self
, db_vnfr
, vdu_id
):
324 vdur
= next(vdur
for vdur
in db_vnfr
.get("vdur") if vdu_id
== vdur
["vdu-id-ref"])
325 additional_params
= vdur
.get("additionalParams")
326 return parse_yaml_strings(additional_params
)
328 def vnfd2RO(self
, vnfd
, new_id
=None, additionalParams
=None, nsrId
=None):
330 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
331 :param vnfd: input vnfd
332 :param new_id: overrides vnf id if provided
333 :param additionalParams: Instantiation params for VNFs provided
334 :param nsrId: Id of the NSR
335 :return: copy of vnfd
337 vnfd_RO
= deepcopy(vnfd
)
338 # remove unused by RO configuration, monitoring, scaling and internal keys
339 vnfd_RO
.pop("_id", None)
340 vnfd_RO
.pop("_admin", None)
341 vnfd_RO
.pop("vnf-configuration", None)
342 vnfd_RO
.pop("monitoring-param", None)
343 vnfd_RO
.pop("scaling-group-descriptor", None)
344 vnfd_RO
.pop("kdu", None)
345 vnfd_RO
.pop("k8s-cluster", None)
347 vnfd_RO
["id"] = new_id
349 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
350 for vdu
in get_iterable(vnfd_RO
, "vdu"):
351 vdu
.pop("cloud-init-file", None)
352 vdu
.pop("cloud-init", None)
356 def ip_profile_2_RO(ip_profile
):
357 RO_ip_profile
= deepcopy(ip_profile
)
358 if "dns-server" in RO_ip_profile
:
359 if isinstance(RO_ip_profile
["dns-server"], list):
360 RO_ip_profile
["dns-address"] = []
361 for ds
in RO_ip_profile
.pop("dns-server"):
362 RO_ip_profile
["dns-address"].append(ds
['address'])
364 RO_ip_profile
["dns-address"] = RO_ip_profile
.pop("dns-server")
365 if RO_ip_profile
.get("ip-version") == "ipv4":
366 RO_ip_profile
["ip-version"] = "IPv4"
367 if RO_ip_profile
.get("ip-version") == "ipv6":
368 RO_ip_profile
["ip-version"] = "IPv6"
369 if "dhcp-params" in RO_ip_profile
:
370 RO_ip_profile
["dhcp"] = RO_ip_profile
.pop("dhcp-params")
373 def _get_ro_vim_id_for_vim_account(self
, vim_account
):
374 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account
})
375 if db_vim
["_admin"]["operationalState"] != "ENABLED":
376 raise LcmException("VIM={} is not available. operationalState={}".format(
377 vim_account
, db_vim
["_admin"]["operationalState"]))
378 RO_vim_id
= db_vim
["_admin"]["deployed"]["RO"]
381 def get_ro_wim_id_for_wim_account(self
, wim_account
):
382 if isinstance(wim_account
, str):
383 db_wim
= self
.db
.get_one("wim_accounts", {"_id": wim_account
})
384 if db_wim
["_admin"]["operationalState"] != "ENABLED":
385 raise LcmException("WIM={} is not available. operationalState={}".format(
386 wim_account
, db_wim
["_admin"]["operationalState"]))
387 RO_wim_id
= db_wim
["_admin"]["deployed"]["RO-account"]
392 def scale_vnfr(self
, db_vnfr
, vdu_create
=None, vdu_delete
=None, mark_delete
=False):
394 db_vdu_push_list
= []
395 db_update
= {"_admin.modified": time()}
397 for vdu_id
, vdu_count
in vdu_create
.items():
398 vdur
= next((vdur
for vdur
in reversed(db_vnfr
["vdur"]) if vdur
["vdu-id-ref"] == vdu_id
), None)
400 raise LcmException("Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".
403 for count
in range(vdu_count
):
404 vdur_copy
= deepcopy(vdur
)
405 vdur_copy
["status"] = "BUILD"
406 vdur_copy
["status-detailed"] = None
407 vdur_copy
["ip-address"]: None
408 vdur_copy
["_id"] = str(uuid4())
409 vdur_copy
["count-index"] += count
+ 1
410 vdur_copy
["id"] = "{}-{}".format(vdur_copy
["vdu-id-ref"], vdur_copy
["count-index"])
411 vdur_copy
.pop("vim_info", None)
412 for iface
in vdur_copy
["interfaces"]:
413 if iface
.get("fixed-ip"):
414 iface
["ip-address"] = self
.increment_ip_mac(iface
["ip-address"], count
+1)
416 iface
.pop("ip-address", None)
417 if iface
.get("fixed-mac"):
418 iface
["mac-address"] = self
.increment_ip_mac(iface
["mac-address"], count
+1)
420 iface
.pop("mac-address", None)
421 iface
.pop("mgmt_vnf", None) # only first vdu can be managment of vnf
422 db_vdu_push_list
.append(vdur_copy
)
423 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
425 for vdu_id
, vdu_count
in vdu_delete
.items():
427 indexes_to_delete
= [iv
[0] for iv
in enumerate(db_vnfr
["vdur"]) if iv
[1]["vdu-id-ref"] == vdu_id
]
428 db_update
.update({"vdur.{}.status".format(i
): "DELETING" for i
in indexes_to_delete
[-vdu_count
:]})
430 # it must be deleted one by one because common.db does not allow otherwise
431 vdus_to_delete
= [v
for v
in reversed(db_vnfr
["vdur"]) if v
["vdu-id-ref"] == vdu_id
]
432 for vdu
in vdus_to_delete
[:vdu_count
]:
433 self
.db
.set_one("vnfrs", {"_id": db_vnfr
["_id"]}, None, pull
={"vdur": {"_id": vdu
["_id"]}})
434 db_push
= {"vdur": db_vdu_push_list
} if db_vdu_push_list
else None
435 self
.db
.set_one("vnfrs", {"_id": db_vnfr
["_id"]}, db_update
, push_list
=db_push
)
436 # modify passed dictionary db_vnfr
437 db_vnfr_
= self
.db
.get_one("vnfrs", {"_id": db_vnfr
["_id"]})
438 db_vnfr
["vdur"] = db_vnfr_
["vdur"]
440 def ns_update_nsr(self
, ns_update_nsr
, db_nsr
, nsr_desc_RO
):
442 Updates database nsr with the RO info for the created vld
443 :param ns_update_nsr: dictionary to be filled with the updated info
444 :param db_nsr: content of db_nsr. This is also modified
445 :param nsr_desc_RO: nsr descriptor from RO
446 :return: Nothing, LcmException is raised on errors
449 for vld_index
, vld
in enumerate(get_iterable(db_nsr
, "vld")):
450 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
451 if vld
["id"] != net_RO
.get("ns_net_osm_id"):
453 vld
["vim-id"] = net_RO
.get("vim_net_id")
454 vld
["name"] = net_RO
.get("vim_name")
455 vld
["status"] = net_RO
.get("status")
456 vld
["status-detailed"] = net_RO
.get("error_msg")
457 ns_update_nsr
["vld.{}".format(vld_index
)] = vld
460 raise LcmException("ns_update_nsr: Not found vld={} at RO info".format(vld
["id"]))
462 def set_vnfr_at_error(self
, db_vnfrs
, error_text
):
464 for db_vnfr
in db_vnfrs
.values():
465 vnfr_update
= {"status": "ERROR"}
466 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
467 if "status" not in vdur
:
468 vdur
["status"] = "ERROR"
469 vnfr_update
["vdur.{}.status".format(vdu_index
)] = "ERROR"
471 vdur
["status-detailed"] = str(error_text
)
472 vnfr_update
["vdur.{}.status-detailed".format(vdu_index
)] = "ERROR"
473 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
474 except DbException
as e
:
475 self
.logger
.error("Cannot update vnf. {}".format(e
))
477 def ns_update_vnfr(self
, db_vnfrs
, nsr_desc_RO
):
479 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
480 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
481 :param nsr_desc_RO: nsr descriptor from RO
482 :return: Nothing, LcmException is raised on errors
484 for vnf_index
, db_vnfr
in db_vnfrs
.items():
485 for vnf_RO
in nsr_desc_RO
["vnfs"]:
486 if vnf_RO
["member_vnf_index"] != vnf_index
:
489 if vnf_RO
.get("ip_address"):
490 db_vnfr
["ip-address"] = vnfr_update
["ip-address"] = vnf_RO
["ip_address"].split(";")[0]
491 elif not db_vnfr
.get("ip-address"):
492 if db_vnfr
.get("vdur"): # if not VDUs, there is not ip_address
493 raise LcmExceptionNoMgmtIP("ns member_vnf_index '{}' has no IP address".format(vnf_index
))
495 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
496 vdur_RO_count_index
= 0
497 if vdur
.get("pdu-type"):
499 for vdur_RO
in get_iterable(vnf_RO
, "vms"):
500 if vdur
["vdu-id-ref"] != vdur_RO
["vdu_osm_id"]:
502 if vdur
["count-index"] != vdur_RO_count_index
:
503 vdur_RO_count_index
+= 1
505 vdur
["vim-id"] = vdur_RO
.get("vim_vm_id")
506 if vdur_RO
.get("ip_address"):
507 vdur
["ip-address"] = vdur_RO
["ip_address"].split(";")[0]
509 vdur
["ip-address"] = None
510 vdur
["vdu-id-ref"] = vdur_RO
.get("vdu_osm_id")
511 vdur
["name"] = vdur_RO
.get("vim_name")
512 vdur
["status"] = vdur_RO
.get("status")
513 vdur
["status-detailed"] = vdur_RO
.get("error_msg")
514 for ifacer
in get_iterable(vdur
, "interfaces"):
515 for interface_RO
in get_iterable(vdur_RO
, "interfaces"):
516 if ifacer
["name"] == interface_RO
.get("internal_name"):
517 ifacer
["ip-address"] = interface_RO
.get("ip_address")
518 ifacer
["mac-address"] = interface_RO
.get("mac_address")
521 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
523 .format(vnf_index
, vdur
["vdu-id-ref"], ifacer
["name"]))
524 vnfr_update
["vdur.{}".format(vdu_index
)] = vdur
527 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
528 "VIM info".format(vnf_index
, vdur
["vdu-id-ref"], vdur
["count-index"]))
530 for vld_index
, vld
in enumerate(get_iterable(db_vnfr
, "vld")):
531 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
532 if vld
["id"] != net_RO
.get("vnf_net_osm_id"):
534 vld
["vim-id"] = net_RO
.get("vim_net_id")
535 vld
["name"] = net_RO
.get("vim_name")
536 vld
["status"] = net_RO
.get("status")
537 vld
["status-detailed"] = net_RO
.get("error_msg")
538 vnfr_update
["vld.{}".format(vld_index
)] = vld
541 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
542 vnf_index
, vld
["id"]))
544 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
548 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(vnf_index
))
550 def _get_ns_config_info(self
, nsr_id
):
552 Generates a mapping between vnf,vdu elements and the N2VC id
553 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
554 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
555 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
556 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
558 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
559 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
561 ns_config_info
= {"osm-config-mapping": mapping
}
562 for vca
in vca_deployed_list
:
563 if not vca
["member-vnf-index"]:
565 if not vca
["vdu_id"]:
566 mapping
[vca
["member-vnf-index"]] = vca
["application"]
568 mapping
["{}.{}.{}".format(vca
["member-vnf-index"], vca
["vdu_id"], vca
["vdu_count_index"])] =\
570 return ns_config_info
572 async def _instantiate_ng_ro(self
, logging_text
, nsr_id
, nsd
, db_nsr
, db_nslcmop
, db_vnfrs
, db_vnfds
,
573 n2vc_key_list
, stage
, start_deploy
, timeout_ns_deploy
):
577 def get_vim_account(vim_account_id
):
579 if vim_account_id
in db_vims
:
580 return db_vims
[vim_account_id
]
581 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
582 db_vims
[vim_account_id
] = db_vim
585 # modify target_vld info with instantiation parameters
586 def parse_vld_instantiation_params(target_vim
, target_vld
, vld_params
, target_sdn
):
587 if vld_params
.get("ip-profile"):
588 target_vld
["vim_info"][target_vim
]["ip_profile"] = vld_params
["ip-profile"]
589 if vld_params
.get("provider-network"):
590 target_vld
["vim_info"][target_vim
]["provider_network"] = vld_params
["provider-network"]
591 if "sdn-ports" in vld_params
["provider-network"] and target_sdn
:
592 target_vld
["vim_info"][target_sdn
]["sdn-ports"] = vld_params
["provider-network"]["sdn-ports"]
593 if vld_params
.get("wimAccountId"):
594 target_wim
= "wim:{}".format(vld_params
["wimAccountId"])
595 target_vld
["vim_info"][target_wim
] = {}
596 for param
in ("vim-network-name", "vim-network-id"):
597 if vld_params
.get(param
):
598 if isinstance(vld_params
[param
], dict):
599 for vim
, vim_net
in vld_params
[param
]:
600 other_target_vim
= "vim:" + vim
601 populate_dict(target_vld
["vim_info"], (other_target_vim
, param
.replace("-", "_")), vim_net
)
602 else: # isinstance str
603 target_vld
["vim_info"][target_vim
][param
.replace("-", "_")] = vld_params
[param
]
604 if vld_params
.get("common_id"):
605 target_vld
["common_id"] = vld_params
.get("common_id")
607 nslcmop_id
= db_nslcmop
["_id"]
609 "name": db_nsr
["name"],
612 "image": deepcopy(db_nsr
["image"]),
613 "flavor": deepcopy(db_nsr
["flavor"]),
614 "action_id": nslcmop_id
,
615 "cloud_init_content": {},
617 for image
in target
["image"]:
618 image
["vim_info"] = {}
619 for flavor
in target
["flavor"]:
620 flavor
["vim_info"] = {}
622 if db_nslcmop
.get("lcmOperationType") != "instantiate":
623 # get parameters of instantiation:
624 db_nslcmop_instantiate
= self
.db
.get_list("nslcmops", {"nsInstanceId": db_nslcmop
["nsInstanceId"],
625 "lcmOperationType": "instantiate"})[-1]
626 ns_params
= db_nslcmop_instantiate
.get("operationParams")
628 ns_params
= db_nslcmop
.get("operationParams")
629 ssh_keys_instantiation
= ns_params
.get("ssh_keys") or []
630 ssh_keys_all
= ssh_keys_instantiation
+ (n2vc_key_list
or [])
633 for vld_index
, vld
in enumerate(db_nsr
.get("vld")):
634 target_vim
= "vim:{}".format(ns_params
["vimAccountId"])
638 "mgmt-network": vld
.get("mgmt-network", False),
639 "type": vld
.get("type"),
642 "vim_network_name": vld
.get("vim-network-name"),
643 "vim_account_id": ns_params
["vimAccountId"]
647 # check if this network needs SDN assist
648 if vld
.get("pci-interfaces"):
649 db_vim
= VimAccountDB
.get_vim_account_with_id(target_vld
["vim_info"][0]["vim_account_id"])
650 sdnc_id
= db_vim
["config"].get("sdn-controller")
652 target_vld
["vim_info"].append({"sdnc_id": sdnc_id
})
654 nsd_vnf_profiles
= get_vnf_profiles(nsd
)
655 for nsd_vnf_profile
in nsd_vnf_profiles
:
656 for cp
in nsd_vnf_profile
["virtual-link-connectivity"]:
657 if cp
["virtual-link-profile-id"] == vld
["id"]:
658 cp2target
["member_vnf:{}.{}".format(
659 cp
["constituent-cpd-id"][0]["constituent-base-element-id"],
660 cp
["constituent-cpd-id"][0]["constituent-cpd-id"]
661 )] = "nsrs:{}:vld.{}".format(nsr_id
, vld_index
)
663 # check at nsd descriptor, if there is an ip-profile
665 virtual_link_profiles
= get_virtual_link_profiles(nsd
)
667 for vlp
in virtual_link_profiles
:
668 ip_profile
= find_in_list(nsd
["ip-profiles"],
669 lambda profile
: profile
["name"] == vlp
["ip-profile-ref"])
670 vld_params
["ip-profile"] = ip_profile
["ip-profile-params"]
671 # update vld_params with instantiation params
672 vld_instantiation_params
= find_in_list(get_iterable(ns_params
, "vld"),
673 lambda a_vld
: a_vld
["name"] in (vld
["name"], vld
["id"]))
674 if vld_instantiation_params
:
675 vld_params
.update(vld_instantiation_params
)
676 parse_vld_instantiation_params(target_vim
, target_vld
, vld_params
, None)
677 target
["ns"]["vld"].append(target_vld
)
679 for vnfr
in db_vnfrs
.values():
680 vnfd
= find_in_list(db_vnfds
, lambda db_vnf
: db_vnf
["id"] == vnfr
["vnfd-ref"])
681 vnf_params
= find_in_list(get_iterable(ns_params
, "vnf"),
682 lambda a_vnf
: a_vnf
["member-vnf-index"] == vnfr
["member-vnf-index-ref"])
683 target_vnf
= deepcopy(vnfr
)
684 target_vim
= "vim:{}".format(vnfr
["vim-account-id"])
685 for vld
in target_vnf
.get("vld", ()):
686 # check if connected to a ns.vld, to fill target'
687 vnf_cp
= find_in_list(vnfd
.get("int-virtual-link-desc", ()),
688 lambda cpd
: cpd
.get("id") == vld
["id"])
690 ns_cp
= "member_vnf:{}.{}".format(vnfr
["member-vnf-index-ref"], vnf_cp
["id"])
691 if cp2target
.get(ns_cp
):
692 vld
["target"] = cp2target
[ns_cp
]
694 vld
["vim_info"] = {target_vim
: {"vim_network_name": vld
.get("vim-network-name")}}
695 # check if this network needs SDN assist
697 if vld
.get("pci-interfaces"):
698 db_vim
= get_vim_account(vnfr
["vim-account-id"])
699 sdnc_id
= db_vim
["config"].get("sdn-controller")
701 sdn_vld
= "vnfrs:{}:vld.{}".format(target_vnf
["_id"], vld
["id"])
702 target_sdn
= "sdn:{}".format(sdnc_id
)
703 vld
["vim_info"][target_sdn
] = {
704 "sdn": True, "target_vim": target_vim
, "vlds": [sdn_vld
], "type": vld
.get("type")}
706 # check at vnfd descriptor, if there is an ip-profile
708 vnfd_vlp
= find_in_list(
709 get_virtual_link_profiles(vnfd
),
710 lambda a_link_profile
: a_link_profile
["id"] == vld
["id"]
712 if vnfd_vlp
and vnfd_vlp
.get("virtual-link-protocol-data") and \
713 vnfd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data"):
714 ip_profile_source_data
= vnfd_vlp
["virtual-link-protocol-data"]["l3-protocol-data"]
715 ip_profile_dest_data
= {}
716 if "ip-version" in ip_profile_source_data
:
717 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
["ip-version"]
718 if "cidr" in ip_profile_source_data
:
719 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
["cidr"]
720 if "gateway-ip" in ip_profile_source_data
:
721 ip_profile_dest_data
["gateway-address"] = ip_profile_source_data
["gateway-ip"]
722 if "dhcp-enabled" in ip_profile_source_data
:
723 ip_profile_dest_data
["dhcp-params"] = {
724 "enabled": ip_profile_source_data
["dhcp-enabled"]
727 vld_params
["ip-profile"] = ip_profile_dest_data
728 # update vld_params with instantiation params
730 vld_instantiation_params
= find_in_list(get_iterable(vnf_params
, "internal-vld"),
731 lambda i_vld
: i_vld
["name"] == vld
["id"])
732 if vld_instantiation_params
:
733 vld_params
.update(vld_instantiation_params
)
734 parse_vld_instantiation_params(target_vim
, vld
, vld_params
, target_sdn
)
737 for vdur
in target_vnf
.get("vdur", ()):
738 if vdur
.get("status") == "DELETING" or vdur
.get("pdu-type"):
739 continue # This vdu must not be created
740 vdur
["vim_info"] = {"vim_account_id": vnfr
["vim-account-id"]}
742 self
.logger
.debug("NS > ssh_keys > {}".format(ssh_keys_all
))
745 vdu_configuration
= get_vdu_configuration(vnfd
, vdur
["vdu-id-ref"])
746 vnf_configuration
= get_vnf_configuration(vnfd
)
747 if vdu_configuration
and vdu_configuration
.get("config-access") and \
748 vdu_configuration
.get("config-access").get("ssh-access"):
749 vdur
["ssh-keys"] = ssh_keys_all
750 vdur
["ssh-access-required"] = vdu_configuration
["config-access"]["ssh-access"]["required"]
751 elif vnf_configuration
and vnf_configuration
.get("config-access") and \
752 vnf_configuration
.get("config-access").get("ssh-access") and \
753 any(iface
.get("mgmt-vnf") for iface
in vdur
["interfaces"]):
754 vdur
["ssh-keys"] = ssh_keys_all
755 vdur
["ssh-access-required"] = vnf_configuration
["config-access"]["ssh-access"]["required"]
756 elif ssh_keys_instantiation
and \
757 find_in_list(vdur
["interfaces"], lambda iface
: iface
.get("mgmt-vnf")):
758 vdur
["ssh-keys"] = ssh_keys_instantiation
760 self
.logger
.debug("NS > vdur > {}".format(vdur
))
762 vdud
= get_vdu(vnfd
, vdur
["vdu-id-ref"])
764 if vdud
.get("cloud-init-file"):
765 vdur
["cloud-init"] = "{}:file:{}".format(vnfd
["_id"], vdud
.get("cloud-init-file"))
766 # read file and put content at target.cloul_init_content. Avoid ng_ro to use shared package system
767 if vdur
["cloud-init"] not in target
["cloud_init_content"]:
768 base_folder
= vnfd
["_admin"]["storage"]
769 cloud_init_file
= "{}/{}/cloud_init/{}".format(base_folder
["folder"], base_folder
["pkg-dir"],
770 vdud
.get("cloud-init-file"))
771 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
772 target
["cloud_init_content"][vdur
["cloud-init"]] = ci_file
.read()
773 elif vdud
.get("cloud-init"):
774 vdur
["cloud-init"] = "{}:vdu:{}".format(vnfd
["_id"], get_vdu_index(vnfd
, vdur
["vdu-id-ref"]))
775 # put content at target.cloul_init_content. Avoid ng_ro read vnfd descriptor
776 target
["cloud_init_content"][vdur
["cloud-init"]] = vdud
["cloud-init"]
777 vdur
["additionalParams"] = vdur
.get("additionalParams") or {}
778 deploy_params_vdu
= self
._format
_additional
_params
(vdur
.get("additionalParams") or {})
779 deploy_params_vdu
["OSM"] = get_osm_params(vnfr
, vdur
["vdu-id-ref"], vdur
["count-index"])
780 vdur
["additionalParams"] = deploy_params_vdu
783 ns_flavor
= target
["flavor"][int(vdur
["ns-flavor-id"])]
784 if target_vim
not in ns_flavor
["vim_info"]:
785 ns_flavor
["vim_info"][target_vim
] = {}
787 ns_image
= target
["image"][int(vdur
["ns-image-id"])]
788 if target_vim
not in ns_image
["vim_info"]:
789 ns_image
["vim_info"][target_vim
] = {}
791 vdur
["vim_info"] = {target_vim
: {}}
792 # instantiation parameters
794 # vdu_instantiation_params = next((v for v in get_iterable(vnf_params, "vdu") if v["id"] ==
796 vdur_list
.append(vdur
)
797 target_vnf
["vdur"] = vdur_list
798 target
["vnf"].append(target_vnf
)
800 desc
= await self
.RO
.deploy(nsr_id
, target
)
801 self
.logger
.debug("RO return > {}".format(desc
))
802 action_id
= desc
["action_id"]
803 await self
._wait
_ng
_ro
(nsr_id
, action_id
, nslcmop_id
, start_deploy
, timeout_ns_deploy
, stage
)
807 "_admin.deployed.RO.operational-status": "running",
808 "detailed-status": " ".join(stage
)
810 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
811 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
812 self
._write
_op
_status
(nslcmop_id
, stage
)
813 self
.logger
.debug(logging_text
+ "ns deployed at RO. RO_id={}".format(action_id
))
816 async def _wait_ng_ro(self
, nsr_id
, action_id
, nslcmop_id
=None, start_time
=None, timeout
=600, stage
=None):
817 detailed_status_old
= None
819 start_time
= start_time
or time()
820 while time() <= start_time
+ timeout
:
821 desc_status
= await self
.RO
.status(nsr_id
, action_id
)
822 self
.logger
.debug("Wait NG RO > {}".format(desc_status
))
823 if desc_status
["status"] == "FAILED":
824 raise NgRoException(desc_status
["details"])
825 elif desc_status
["status"] == "BUILD":
827 stage
[2] = "VIM: ({})".format(desc_status
["details"])
828 elif desc_status
["status"] == "DONE":
830 stage
[2] = "Deployed at VIM"
833 assert False, "ROclient.check_ns_status returns unknown {}".format(desc_status
["status"])
834 if stage
and nslcmop_id
and stage
[2] != detailed_status_old
:
835 detailed_status_old
= stage
[2]
836 db_nsr_update
["detailed-status"] = " ".join(stage
)
837 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
838 self
._write
_op
_status
(nslcmop_id
, stage
)
839 await asyncio
.sleep(15, loop
=self
.loop
)
840 else: # timeout_ns_deploy
841 raise NgRoException("Timeout waiting ns to deploy")
843 async def _terminate_ng_ro(self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
):
847 start_deploy
= time()
854 "action_id": nslcmop_id
856 desc
= await self
.RO
.deploy(nsr_id
, target
)
857 action_id
= desc
["action_id"]
858 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = action_id
859 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETING"
860 self
.logger
.debug(logging_text
+ "ns terminate action at RO. action_id={}".format(action_id
))
863 delete_timeout
= 20 * 60 # 20 minutes
864 await self
._wait
_ng
_ro
(nsr_id
, action_id
, nslcmop_id
, start_deploy
, delete_timeout
, stage
)
866 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
867 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
869 await self
.RO
.delete(nsr_id
)
870 except Exception as e
:
871 if isinstance(e
, NgRoException
) and e
.http_code
== 404: # not found
872 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
873 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
874 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
875 self
.logger
.debug(logging_text
+ "RO_action_id={} already deleted".format(action_id
))
876 elif isinstance(e
, NgRoException
) and e
.http_code
== 409: # conflict
877 failed_detail
.append("delete conflict: {}".format(e
))
878 self
.logger
.debug(logging_text
+ "RO_action_id={} delete conflict: {}".format(action_id
, e
))
880 failed_detail
.append("delete error: {}".format(e
))
881 self
.logger
.error(logging_text
+ "RO_action_id={} delete error: {}".format(action_id
, e
))
884 stage
[2] = "Error deleting from VIM"
886 stage
[2] = "Deleted from VIM"
887 db_nsr_update
["detailed-status"] = " ".join(stage
)
888 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
889 self
._write
_op
_status
(nslcmop_id
, stage
)
892 raise LcmException("; ".join(failed_detail
))
895 async def instantiate_RO(self
, logging_text
, nsr_id
, nsd
, db_nsr
, db_nslcmop
, db_vnfrs
, db_vnfds
,
896 n2vc_key_list
, stage
):
899 :param logging_text: preffix text to use at logging
900 :param nsr_id: nsr identity
901 :param nsd: database content of ns descriptor
902 :param db_nsr: database content of ns record
903 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
905 :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
906 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
907 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
908 :return: None or exception
911 start_deploy
= time()
912 ns_params
= db_nslcmop
.get("operationParams")
913 if ns_params
and ns_params
.get("timeout_ns_deploy"):
914 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
916 timeout_ns_deploy
= self
.timeout
.get("ns_deploy", self
.timeout_ns_deploy
)
918 # Check for and optionally request placement optimization. Database will be updated if placement activated
919 stage
[2] = "Waiting for Placement."
920 if await self
._do
_placement
(logging_text
, db_nslcmop
, db_vnfrs
):
921 # in case of placement change ns_params[vimAcountId) if not present at any vnfrs
922 for vnfr
in db_vnfrs
.values():
923 if ns_params
["vimAccountId"] == vnfr
["vim-account-id"]:
926 ns_params
["vimAccountId"] == vnfr
["vim-account-id"]
928 return await self
._instantiate
_ng
_ro
(logging_text
, nsr_id
, nsd
, db_nsr
, db_nslcmop
, db_vnfrs
,
929 db_vnfds
, n2vc_key_list
, stage
, start_deploy
, timeout_ns_deploy
)
930 except Exception as e
:
931 stage
[2] = "ERROR deploying at VIM"
932 self
.set_vnfr_at_error(db_vnfrs
, str(e
))
933 self
.logger
.error("Error deploying at VIM {}".format(e
),
934 exc_info
=not isinstance(e
, (ROclient
.ROClientException
, LcmException
, DbException
,
938 async def wait_kdu_up(self
, logging_text
, nsr_id
, vnfr_id
, kdu_name
):
940 Wait for kdu to be up, get ip address
941 :param logging_text: prefix use for logging
948 # self.logger.debug(logging_text + "Starting wait_kdu_up")
951 while nb_tries
< 360:
952 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
953 kdur
= next((x
for x
in get_iterable(db_vnfr
, "kdur") if x
.get("kdu-name") == kdu_name
), None)
955 raise LcmException("Not found vnfr_id={}, kdu_name={}".format(vnfr_id
, kdu_name
))
956 if kdur
.get("status"):
957 if kdur
["status"] in ("READY", "ENABLED"):
958 return kdur
.get("ip-address")
960 raise LcmException("target KDU={} is in error state".format(kdu_name
))
962 await asyncio
.sleep(10, loop
=self
.loop
)
964 raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name
))
966 async def wait_vm_up_insert_key_ro(self
, logging_text
, nsr_id
, vnfr_id
, vdu_id
, vdu_index
, pub_key
=None, user
=None):
968 Wait for ip addres at RO, and optionally, insert public key in virtual machine
969 :param logging_text: prefix use for logging
974 :param pub_key: public ssh key to inject, None to skip
975 :param user: user to apply the public ssh key
979 self
.logger
.debug(logging_text
+ "Starting wait_vm_up_insert_key_ro")
989 if ro_retries
>= 360: # 1 hour
990 raise LcmException("Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id
))
992 await asyncio
.sleep(10, loop
=self
.loop
)
995 if not target_vdu_id
:
996 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
998 if not vdu_id
: # for the VNF case
999 if db_vnfr
.get("status") == "ERROR":
1000 raise LcmException("Cannot inject ssh-key because target VNF is in error state")
1001 ip_address
= db_vnfr
.get("ip-address")
1004 vdur
= next((x
for x
in get_iterable(db_vnfr
, "vdur") if x
.get("ip-address") == ip_address
), None)
1006 vdur
= next((x
for x
in get_iterable(db_vnfr
, "vdur")
1007 if x
.get("vdu-id-ref") == vdu_id
and x
.get("count-index") == vdu_index
), None)
1009 if not vdur
and len(db_vnfr
.get("vdur", ())) == 1: # If only one, this should be the target vdu
1010 vdur
= db_vnfr
["vdur"][0]
1012 raise LcmException("Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(vnfr_id
, vdu_id
,
1014 # New generation RO stores information at "vim_info"
1017 if vdur
.get("vim_info"):
1018 target_vim
= next(t
for t
in vdur
["vim_info"]) # there should be only one key
1019 ng_ro_status
= vdur
["vim_info"][target_vim
].get("vim_status")
1020 if vdur
.get("pdu-type") or vdur
.get("status") == "ACTIVE" or ng_ro_status
== "ACTIVE":
1021 ip_address
= vdur
.get("ip-address")
1024 target_vdu_id
= vdur
["vdu-id-ref"]
1025 elif vdur
.get("status") == "ERROR" or ng_ro_status
== "ERROR":
1026 raise LcmException("Cannot inject ssh-key because target VM is in error state")
1028 if not target_vdu_id
:
1031 # inject public key into machine
1032 if pub_key
and user
:
1033 self
.logger
.debug(logging_text
+ "Inserting RO key")
1034 self
.logger
.debug("SSH > PubKey > {}".format(pub_key
))
1035 if vdur
.get("pdu-type"):
1036 self
.logger
.error(logging_text
+ "Cannot inject ssh-ky to a PDU")
1039 ro_vm_id
= "{}-{}".format(db_vnfr
["member-vnf-index-ref"], target_vdu_id
) # TODO add vdu_index
1041 target
= {"action": {"action": "inject_ssh_key", "key": pub_key
, "user": user
},
1042 "vnf": [{"_id": vnfr_id
, "vdur": [{"id": vdur
["id"]}]}],
1044 desc
= await self
.RO
.deploy(nsr_id
, target
)
1045 action_id
= desc
["action_id"]
1046 await self
._wait
_ng
_ro
(nsr_id
, action_id
, timeout
=600)
1049 # wait until NS is deployed at RO
1051 db_nsrs
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1052 ro_nsr_id
= deep_get(db_nsrs
, ("_admin", "deployed", "RO", "nsr_id"))
1055 result_dict
= await self
.RO
.create_action(
1057 item_id_name
=ro_nsr_id
,
1058 descriptor
={"add_public_key": pub_key
, "vms": [ro_vm_id
], "user": user
}
1060 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
1061 if not result_dict
or not isinstance(result_dict
, dict):
1062 raise LcmException("Unknown response from RO when injecting key")
1063 for result
in result_dict
.values():
1064 if result
.get("vim_result") == 200:
1067 raise ROclient
.ROClientException("error injecting key: {}".format(
1068 result
.get("description")))
1070 except NgRoException
as e
:
1071 raise LcmException("Reaching max tries injecting key. Error: {}".format(e
))
1072 except ROclient
.ROClientException
as e
:
1074 self
.logger
.debug(logging_text
+ "error injecting key: {}. Retrying until {} seconds".
1078 raise LcmException("Reaching max tries injecting key. Error: {}".format(e
))
1084 async def _wait_dependent_n2vc(self
, nsr_id
, vca_deployed_list
, vca_index
):
1086 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1088 my_vca
= vca_deployed_list
[vca_index
]
1089 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1090 # vdu or kdu: no dependencies
1094 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1095 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1096 configuration_status_list
= db_nsr
["configurationStatus"]
1097 for index
, vca_deployed
in enumerate(configuration_status_list
):
1098 if index
== vca_index
:
1101 if not my_vca
.get("member-vnf-index") or \
1102 (vca_deployed
.get("member-vnf-index") == my_vca
.get("member-vnf-index")):
1103 internal_status
= configuration_status_list
[index
].get("status")
1104 if internal_status
== 'READY':
1106 elif internal_status
== 'BROKEN':
1107 raise LcmException("Configuration aborted because dependent charm/s has failed")
1111 # no dependencies, return
1113 await asyncio
.sleep(10)
1116 raise LcmException("Configuration aborted because dependent charm/s timeout")
1118 async def instantiate_N2VC(self
, logging_text
, vca_index
, nsi_id
, db_nsr
, db_vnfr
, vdu_id
, kdu_name
, vdu_index
,
1119 config_descriptor
, deploy_params
, base_folder
, nslcmop_id
, stage
, vca_type
, vca_name
,
1120 ee_config_descriptor
):
1121 nsr_id
= db_nsr
["_id"]
1122 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
1123 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1124 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
1125 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
1127 'collection': 'nsrs',
1128 'filter': {'_id': nsr_id
},
1129 'path': db_update_entry
1135 element_under_configuration
= nsr_id
1139 vnfr_id
= db_vnfr
["_id"]
1140 osm_config
["osm"]["vnf_id"] = vnfr_id
1142 namespace
= "{nsi}.{ns}".format(
1143 nsi
=nsi_id
if nsi_id
else "",
1147 element_type
= 'VNF'
1148 element_under_configuration
= vnfr_id
1149 namespace
+= ".{}".format(vnfr_id
)
1151 namespace
+= ".{}-{}".format(vdu_id
, vdu_index
or 0)
1152 element_type
= 'VDU'
1153 element_under_configuration
= "{}-{}".format(vdu_id
, vdu_index
or 0)
1154 osm_config
["osm"]["vdu_id"] = vdu_id
1156 namespace
+= ".{}".format(kdu_name
)
1157 element_type
= 'KDU'
1158 element_under_configuration
= kdu_name
1159 osm_config
["osm"]["kdu_name"] = kdu_name
1162 artifact_path
= "{}/{}/{}/{}".format(
1163 base_folder
["folder"],
1164 base_folder
["pkg-dir"],
1165 "charms" if vca_type
in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm") else "helm-charts",
1169 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
1171 # get initial_config_primitive_list that applies to this element
1172 initial_config_primitive_list
= config_descriptor
.get('initial-config-primitive')
1174 self
.logger
.debug("Initial config primitive list > {}".format(initial_config_primitive_list
))
1176 # add config if not present for NS charm
1177 ee_descriptor_id
= ee_config_descriptor
.get("id")
1178 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
1179 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(initial_config_primitive_list
,
1180 vca_deployed
, ee_descriptor_id
)
1182 self
.logger
.debug("Initial config primitive list #2 > {}".format(initial_config_primitive_list
))
1183 # n2vc_redesign STEP 3.1
1184 # find old ee_id if exists
1185 ee_id
= vca_deployed
.get("ee_id")
1188 deep_get(db_vnfr
, ("vim-account-id",)) or
1189 deep_get(deploy_params
, ("OSM", "vim_account_id"))
1191 vca_cloud
, vca_cloud_credential
= self
.get_vca_cloud_and_credentials(vim_account_id
)
1192 vca_k8s_cloud
, vca_k8s_cloud_credential
= self
.get_vca_k8s_cloud_and_credentials(vim_account_id
)
1193 # create or register execution environment in VCA
1194 if vca_type
in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1196 self
._write
_configuration
_status
(
1198 vca_index
=vca_index
,
1200 element_under_configuration
=element_under_configuration
,
1201 element_type
=element_type
1204 step
= "create execution environment"
1205 self
.logger
.debug(logging_text
+ step
)
1209 if vca_type
== "k8s_proxy_charm":
1210 ee_id
= await self
.vca_map
[vca_type
].install_k8s_proxy_charm(
1211 charm_name
=artifact_path
[artifact_path
.rfind("/") + 1:],
1212 namespace
=namespace
,
1213 artifact_path
=artifact_path
,
1215 cloud_name
=vca_k8s_cloud
,
1216 credential_name
=vca_k8s_cloud_credential
,
1218 elif vca_type
== "helm" or vca_type
== "helm-v3":
1219 ee_id
, credentials
= await self
.vca_map
[vca_type
].create_execution_environment(
1220 namespace
=namespace
,
1223 cloud_name
=vca_cloud
,
1224 credential_name
=vca_cloud_credential
,
1227 ee_id
, credentials
= await self
.vca_map
[vca_type
].create_execution_environment(
1228 namespace
=namespace
,
1231 cloud_name
=vca_cloud
,
1232 credential_name
=vca_cloud_credential
,
1235 elif vca_type
== "native_charm":
1236 step
= "Waiting to VM being up and getting IP address"
1237 self
.logger
.debug(logging_text
+ step
)
1238 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(logging_text
, nsr_id
, vnfr_id
, vdu_id
, vdu_index
,
1239 user
=None, pub_key
=None)
1240 credentials
= {"hostname": rw_mgmt_ip
}
1242 username
= deep_get(config_descriptor
, ("config-access", "ssh-access", "default-user"))
1243 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1244 # merged. Meanwhile let's get username from initial-config-primitive
1245 if not username
and initial_config_primitive_list
:
1246 for config_primitive
in initial_config_primitive_list
:
1247 for param
in config_primitive
.get("parameter", ()):
1248 if param
["name"] == "ssh-username":
1249 username
= param
["value"]
1252 raise LcmException("Cannot determine the username neither with 'initial-config-primitive' nor with "
1253 "'config-access.ssh-access.default-user'")
1254 credentials
["username"] = username
1255 # n2vc_redesign STEP 3.2
1257 self
._write
_configuration
_status
(
1259 vca_index
=vca_index
,
1260 status
='REGISTERING',
1261 element_under_configuration
=element_under_configuration
,
1262 element_type
=element_type
1265 step
= "register execution environment {}".format(credentials
)
1266 self
.logger
.debug(logging_text
+ step
)
1267 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
1268 credentials
=credentials
,
1269 namespace
=namespace
,
1271 cloud_name
=vca_cloud
,
1272 credential_name
=vca_cloud_credential
,
1275 # for compatibility with MON/POL modules, the need model and application name at database
1276 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
1277 ee_id_parts
= ee_id
.split('.')
1278 db_nsr_update
= {db_update_entry
+ "ee_id": ee_id
}
1279 if len(ee_id_parts
) >= 2:
1280 model_name
= ee_id_parts
[0]
1281 application_name
= ee_id_parts
[1]
1282 db_nsr_update
[db_update_entry
+ "model"] = model_name
1283 db_nsr_update
[db_update_entry
+ "application"] = application_name
1285 # n2vc_redesign STEP 3.3
1286 step
= "Install configuration Software"
1288 self
._write
_configuration
_status
(
1290 vca_index
=vca_index
,
1291 status
='INSTALLING SW',
1292 element_under_configuration
=element_under_configuration
,
1293 element_type
=element_type
,
1294 other_update
=db_nsr_update
1297 # TODO check if already done
1298 self
.logger
.debug(logging_text
+ step
)
1300 if vca_type
== "native_charm":
1301 config_primitive
= next((p
for p
in initial_config_primitive_list
if p
["name"] == "config"), None)
1302 if config_primitive
:
1303 config
= self
._map
_primitive
_params
(
1309 if vca_type
== "lxc_proxy_charm":
1310 if element_type
== "NS":
1311 num_units
= db_nsr
.get("config-units") or 1
1312 elif element_type
== "VNF":
1313 num_units
= db_vnfr
.get("config-units") or 1
1314 elif element_type
== "VDU":
1315 for v
in db_vnfr
["vdur"]:
1316 if vdu_id
== v
["vdu-id-ref"]:
1317 num_units
= v
.get("config-units") or 1
1319 if vca_type
!= "k8s_proxy_charm":
1320 await self
.vca_map
[vca_type
].install_configuration_sw(
1322 artifact_path
=artifact_path
,
1325 num_units
=num_units
,
1328 # write in db flag of configuration_sw already installed
1329 self
.update_db_2("nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True})
1331 # add relations for this VCA (wait for other peers related with this VCA)
1332 await self
._add
_vca
_relations
(logging_text
=logging_text
, nsr_id
=nsr_id
,
1333 vca_index
=vca_index
, vca_type
=vca_type
)
1335 # if SSH access is required, then get execution environment SSH public
1336 # if native charm we have waited already to VM be UP
1337 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
1340 # self.logger.debug("get ssh key block")
1341 if deep_get(config_descriptor
, ("config-access", "ssh-access", "required")):
1342 # self.logger.debug("ssh key needed")
1343 # Needed to inject a ssh key
1344 user
= deep_get(config_descriptor
, ("config-access", "ssh-access", "default-user"))
1345 step
= "Install configuration Software, getting public ssh key"
1346 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(ee_id
=ee_id
, db_dict
=db_dict
)
1348 step
= "Insert public key into VM user={} ssh_key={}".format(user
, pub_key
)
1350 # self.logger.debug("no need to get ssh key")
1351 step
= "Waiting to VM being up and getting IP address"
1352 self
.logger
.debug(logging_text
+ step
)
1354 # n2vc_redesign STEP 5.1
1355 # wait for RO (ip-address) Insert pub_key into VM
1358 rw_mgmt_ip
= await self
.wait_kdu_up(logging_text
, nsr_id
, vnfr_id
, kdu_name
)
1360 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(logging_text
, nsr_id
, vnfr_id
, vdu_id
,
1361 vdu_index
, user
=user
, pub_key
=pub_key
)
1363 rw_mgmt_ip
= None # This is for a NS configuration
1365 self
.logger
.debug(logging_text
+ ' VM_ip_address={}'.format(rw_mgmt_ip
))
1367 # store rw_mgmt_ip in deploy params for later replacement
1368 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
1370 # n2vc_redesign STEP 6 Execute initial config primitive
1371 step
= 'execute initial config primitive'
1373 # wait for dependent primitives execution (NS -> VNF -> VDU)
1374 if initial_config_primitive_list
:
1375 await self
._wait
_dependent
_n
2vc
(nsr_id
, vca_deployed_list
, vca_index
)
1377 # stage, in function of element type: vdu, kdu, vnf or ns
1378 my_vca
= vca_deployed_list
[vca_index
]
1379 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1381 stage
[0] = 'Stage 3/5: running Day-1 primitives for VDU.'
1382 elif my_vca
.get("member-vnf-index"):
1384 stage
[0] = 'Stage 4/5: running Day-1 primitives for VNF.'
1387 stage
[0] = 'Stage 5/5: running Day-1 primitives for NS.'
1389 self
._write
_configuration
_status
(
1391 vca_index
=vca_index
,
1392 status
='EXECUTING PRIMITIVE'
1395 self
._write
_op
_status
(
1400 check_if_terminated_needed
= True
1401 for initial_config_primitive
in initial_config_primitive_list
:
1402 # adding information on the vca_deployed if it is a NS execution environment
1403 if not vca_deployed
["member-vnf-index"]:
1404 deploy_params
["ns_config_info"] = json
.dumps(self
._get
_ns
_config
_info
(nsr_id
))
1405 # TODO check if already done
1406 primitive_params_
= self
._map
_primitive
_params
(initial_config_primitive
, {}, deploy_params
)
1408 step
= "execute primitive '{}' params '{}'".format(initial_config_primitive
["name"], primitive_params_
)
1409 self
.logger
.debug(logging_text
+ step
)
1410 await self
.vca_map
[vca_type
].exec_primitive(
1412 primitive_name
=initial_config_primitive
["name"],
1413 params_dict
=primitive_params_
,
1416 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
1417 if check_if_terminated_needed
:
1418 if config_descriptor
.get('terminate-config-primitive'):
1419 self
.update_db_2("nsrs", nsr_id
, {db_update_entry
+ "needed_terminate": True})
1420 check_if_terminated_needed
= False
1422 # TODO register in database that primitive is done
1424 # STEP 7 Configure metrics
1425 if vca_type
== "helm" or vca_type
== "helm-v3":
1426 prometheus_jobs
= await self
.add_prometheus_metrics(
1428 artifact_path
=artifact_path
,
1429 ee_config_descriptor
=ee_config_descriptor
,
1432 target_ip
=rw_mgmt_ip
,
1435 self
.update_db_2("nsrs", nsr_id
, {db_update_entry
+ "prometheus_jobs": prometheus_jobs
})
1437 step
= "instantiated at VCA"
1438 self
.logger
.debug(logging_text
+ step
)
1440 self
._write
_configuration
_status
(
1442 vca_index
=vca_index
,
1446 except Exception as e
: # TODO not use Exception but N2VC exception
1447 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
1448 if not isinstance(e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)):
1449 self
.logger
.error("Exception while {} : {}".format(step
, e
), exc_info
=True)
1450 self
._write
_configuration
_status
(
1452 vca_index
=vca_index
,
1455 raise LcmException("{} {}".format(step
, e
)) from e
1457 def _write_ns_status(self
, nsr_id
: str, ns_state
: str, current_operation
: str, current_operation_id
: str,
1458 error_description
: str = None, error_detail
: str = None, other_update
: dict = None):
1460 Update db_nsr fields.
1463 :param current_operation:
1464 :param current_operation_id:
1465 :param error_description:
1466 :param error_detail:
1467 :param other_update: Other required changes at database if provided, will be cleared
1471 db_dict
= other_update
or {}
1472 db_dict
["_admin.nslcmop"] = current_operation_id
# for backward compatibility
1473 db_dict
["_admin.current-operation"] = current_operation_id
1474 db_dict
["_admin.operation-type"] = current_operation
if current_operation
!= "IDLE" else None
1475 db_dict
["currentOperation"] = current_operation
1476 db_dict
["currentOperationID"] = current_operation_id
1477 db_dict
["errorDescription"] = error_description
1478 db_dict
["errorDetail"] = error_detail
1481 db_dict
["nsState"] = ns_state
1482 self
.update_db_2("nsrs", nsr_id
, db_dict
)
1483 except DbException
as e
:
1484 self
.logger
.warn('Error writing NS status, ns={}: {}'.format(nsr_id
, e
))
1486 def _write_op_status(self
, op_id
: str, stage
: list = None, error_message
: str = None, queuePosition
: int = 0,
1487 operation_state
: str = None, other_update
: dict = None):
1489 db_dict
= other_update
or {}
1490 db_dict
['queuePosition'] = queuePosition
1491 if isinstance(stage
, list):
1492 db_dict
['stage'] = stage
[0]
1493 db_dict
['detailed-status'] = " ".join(stage
)
1494 elif stage
is not None:
1495 db_dict
['stage'] = str(stage
)
1497 if error_message
is not None:
1498 db_dict
['errorMessage'] = error_message
1499 if operation_state
is not None:
1500 db_dict
['operationState'] = operation_state
1501 db_dict
["statusEnteredTime"] = time()
1502 self
.update_db_2("nslcmops", op_id
, db_dict
)
1503 except DbException
as e
:
1504 self
.logger
.warn('Error writing OPERATION status for op_id: {} -> {}'.format(op_id
, e
))
1506 def _write_all_config_status(self
, db_nsr
: dict, status
: str):
1508 nsr_id
= db_nsr
["_id"]
1509 # configurationStatus
1510 config_status
= db_nsr
.get('configurationStatus')
1512 db_nsr_update
= {"configurationStatus.{}.status".format(index
): status
for index
, v
in
1513 enumerate(config_status
) if v
}
1515 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1517 except DbException
as e
:
1518 self
.logger
.warn('Error writing all configuration status, ns={}: {}'.format(nsr_id
, e
))
1520 def _write_configuration_status(self
, nsr_id
: str, vca_index
: int, status
: str = None,
1521 element_under_configuration
: str = None, element_type
: str = None,
1522 other_update
: dict = None):
1524 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
1525 # .format(vca_index, status))
1528 db_path
= 'configurationStatus.{}.'.format(vca_index
)
1529 db_dict
= other_update
or {}
1531 db_dict
[db_path
+ 'status'] = status
1532 if element_under_configuration
:
1533 db_dict
[db_path
+ 'elementUnderConfiguration'] = element_under_configuration
1535 db_dict
[db_path
+ 'elementType'] = element_type
1536 self
.update_db_2("nsrs", nsr_id
, db_dict
)
1537 except DbException
as e
:
1538 self
.logger
.warn('Error writing configuration status={}, ns={}, vca_index={}: {}'
1539 .format(status
, nsr_id
, vca_index
, e
))
1541 async def _do_placement(self
, logging_text
, db_nslcmop
, db_vnfrs
):
1543 Check and computes the placement, (vim account where to deploy). If it is decided by an external tool, it
1544 sends the request via kafka and wait until the result is wrote at database (nslcmops _admin.plca).
1545 Database is used because the result can be obtained from a different LCM worker in case of HA.
1546 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
1547 :param db_nslcmop: database content of nslcmop
1548 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
1549 :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfr with the
1550 computed 'vim-account-id'
1553 nslcmop_id
= db_nslcmop
['_id']
1554 placement_engine
= deep_get(db_nslcmop
, ('operationParams', 'placement-engine'))
1555 if placement_engine
== "PLA":
1556 self
.logger
.debug(logging_text
+ "Invoke and wait for placement optimization")
1557 await self
.msg
.aiowrite("pla", "get_placement", {'nslcmopId': nslcmop_id
}, loop
=self
.loop
)
1558 db_poll_interval
= 5
1559 wait
= db_poll_interval
* 10
1561 while not pla_result
and wait
>= 0:
1562 await asyncio
.sleep(db_poll_interval
)
1563 wait
-= db_poll_interval
1564 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
1565 pla_result
= deep_get(db_nslcmop
, ('_admin', 'pla'))
1568 raise LcmException("Placement timeout for nslcmopId={}".format(nslcmop_id
))
1570 for pla_vnf
in pla_result
['vnf']:
1571 vnfr
= db_vnfrs
.get(pla_vnf
['member-vnf-index'])
1572 if not pla_vnf
.get('vimAccountId') or not vnfr
:
1575 self
.db
.set_one("vnfrs", {"_id": vnfr
["_id"]}, {"vim-account-id": pla_vnf
['vimAccountId']})
1577 vnfr
["vim-account-id"] = pla_vnf
['vimAccountId']
1580 def update_nsrs_with_pla_result(self
, params
):
1582 nslcmop_id
= deep_get(params
, ('placement', 'nslcmopId'))
1583 self
.update_db_2("nslcmops", nslcmop_id
, {"_admin.pla": params
.get('placement')})
1584 except Exception as e
:
1585 self
.logger
.warn('Update failed for nslcmop_id={}:{}'.format(nslcmop_id
, e
))
1587 async def instantiate(self
, nsr_id
, nslcmop_id
):
1590 :param nsr_id: ns instance to deploy
1591 :param nslcmop_id: operation to run
1595 # Try to lock HA task here
1596 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA('ns', 'nslcmops', nslcmop_id
)
1597 if not task_is_locked_by_me
:
1598 self
.logger
.debug('instantiate() task is not locked by me, ns={}'.format(nsr_id
))
1601 logging_text
= "Task ns={} instantiate={} ".format(nsr_id
, nslcmop_id
)
1602 self
.logger
.debug(logging_text
+ "Enter")
1604 # get all needed from database
1606 # database nsrs record
1609 # database nslcmops record
1612 # update operation on nsrs
1614 # update operation on nslcmops
1615 db_nslcmop_update
= {}
1617 nslcmop_operation_state
= None
1618 db_vnfrs
= {} # vnf's info indexed by member-index
1620 tasks_dict_info
= {} # from task to info text
1623 stage
= ['Stage 1/5: preparation of the environment.', "Waiting for previous operations to terminate.", ""]
1624 # ^ stage, step, VIM progress
1626 # wait for any previous tasks in process
1627 await self
.lcm_tasks
.waitfor_related_HA('ns', 'nslcmops', nslcmop_id
)
1629 stage
[1] = "Sync filesystem from database."
1630 self
.fs
.sync() # TODO, make use of partial sync, only for the needed packages
1632 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
1633 stage
[1] = "Reading from database."
1634 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
1635 db_nsr_update
["detailed-status"] = "creating"
1636 db_nsr_update
["operational-status"] = "init"
1637 self
._write
_ns
_status
(
1639 ns_state
="BUILDING",
1640 current_operation
="INSTANTIATING",
1641 current_operation_id
=nslcmop_id
,
1642 other_update
=db_nsr_update
1644 self
._write
_op
_status
(
1650 # read from db: operation
1651 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
1652 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
1653 ns_params
= db_nslcmop
.get("operationParams")
1654 if ns_params
and ns_params
.get("timeout_ns_deploy"):
1655 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
1657 timeout_ns_deploy
= self
.timeout
.get("ns_deploy", self
.timeout_ns_deploy
)
1660 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
1661 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1662 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
1663 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
1665 # nsr_name = db_nsr["name"] # TODO short-name??
1667 # read from db: vnf's of this ns
1668 stage
[1] = "Getting vnfrs from db."
1669 self
.logger
.debug(logging_text
+ stage
[1])
1670 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
1672 # read from db: vnfd's for every vnf
1673 db_vnfds
= [] # every vnfd data
1675 # for each vnf in ns, read vnfd
1676 for vnfr
in db_vnfrs_list
:
1677 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
1678 vnfd_id
= vnfr
["vnfd-id"]
1679 vnfd_ref
= vnfr
["vnfd-ref"]
1681 # if we haven't this vnfd, read it from db
1682 if vnfd_id
not in db_vnfds
:
1684 stage
[1] = "Getting vnfd={} id='{}' from db.".format(vnfd_id
, vnfd_ref
)
1685 self
.logger
.debug(logging_text
+ stage
[1])
1686 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
1689 db_vnfds
.append(vnfd
) # vnfd's indexed by id
1691 # Get or generates the _admin.deployed.VCA list
1692 vca_deployed_list
= None
1693 if db_nsr
["_admin"].get("deployed"):
1694 vca_deployed_list
= db_nsr
["_admin"]["deployed"].get("VCA")
1695 if vca_deployed_list
is None:
1696 vca_deployed_list
= []
1697 configuration_status_list
= []
1698 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
1699 db_nsr_update
["configurationStatus"] = configuration_status_list
1700 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
1701 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
1702 elif isinstance(vca_deployed_list
, dict):
1703 # maintain backward compatibility. Change a dict to list at database
1704 vca_deployed_list
= list(vca_deployed_list
.values())
1705 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
1706 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
1708 if not isinstance(deep_get(db_nsr
, ("_admin", "deployed", "RO", "vnfd")), list):
1709 populate_dict(db_nsr
, ("_admin", "deployed", "RO", "vnfd"), [])
1710 db_nsr_update
["_admin.deployed.RO.vnfd"] = []
1712 # set state to INSTANTIATED. When instantiated NBI will not delete directly
1713 db_nsr_update
["_admin.nsState"] = "INSTANTIATED"
1714 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1715 self
.db
.set_list("vnfrs", {"nsr-id-ref": nsr_id
}, {"_admin.nsState": "INSTANTIATED"})
1717 # n2vc_redesign STEP 2 Deploy Network Scenario
1718 stage
[0] = 'Stage 2/5: deployment of KDUs, VMs and execution environments.'
1719 self
._write
_op
_status
(
1724 stage
[1] = "Deploying KDUs."
1725 # self.logger.debug(logging_text + "Before deploy_kdus")
1726 # Call to deploy_kdus in case exists the "vdu:kdu" param
1727 await self
.deploy_kdus(
1728 logging_text
=logging_text
,
1730 nslcmop_id
=nslcmop_id
,
1733 task_instantiation_info
=tasks_dict_info
,
1736 stage
[1] = "Getting VCA public key."
1737 # n2vc_redesign STEP 1 Get VCA public ssh-key
1738 # feature 1429. Add n2vc public key to needed VMs
1739 n2vc_key
= self
.n2vc
.get_public_key()
1740 n2vc_key_list
= [n2vc_key
]
1741 if self
.vca_config
.get("public_key"):
1742 n2vc_key_list
.append(self
.vca_config
["public_key"])
1744 stage
[1] = "Deploying NS at VIM."
1745 task_ro
= asyncio
.ensure_future(
1746 self
.instantiate_RO(
1747 logging_text
=logging_text
,
1751 db_nslcmop
=db_nslcmop
,
1754 n2vc_key_list
=n2vc_key_list
,
1758 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_RO", task_ro
)
1759 tasks_dict_info
[task_ro
] = "Deploying at VIM"
1761 # n2vc_redesign STEP 3 to 6 Deploy N2VC
1762 stage
[1] = "Deploying Execution Environments."
1763 self
.logger
.debug(logging_text
+ stage
[1])
1765 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
1766 for vnf_profile
in get_vnf_profiles(nsd
):
1767 vnfd_id
= vnf_profile
["vnfd-id"]
1768 vnfd
= find_in_list(db_vnfds
, lambda a_vnf
: a_vnf
["id"] == vnfd_id
)
1769 member_vnf_index
= str(vnf_profile
["id"])
1770 db_vnfr
= db_vnfrs
[member_vnf_index
]
1771 base_folder
= vnfd
["_admin"]["storage"]
1777 # Get additional parameters
1778 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
1779 if db_vnfr
.get("additionalParamsForVnf"):
1780 deploy_params
.update(parse_yaml_strings(db_vnfr
["additionalParamsForVnf"].copy()))
1782 descriptor_config
= get_vnf_configuration(vnfd
)
1783 if descriptor_config
:
1785 logging_text
=logging_text
+ "member_vnf_index={} ".format(member_vnf_index
),
1788 nslcmop_id
=nslcmop_id
,
1794 member_vnf_index
=member_vnf_index
,
1795 vdu_index
=vdu_index
,
1797 deploy_params
=deploy_params
,
1798 descriptor_config
=descriptor_config
,
1799 base_folder
=base_folder
,
1800 task_instantiation_info
=tasks_dict_info
,
1804 # Deploy charms for each VDU that supports one.
1805 for vdud
in get_vdu_list(vnfd
):
1807 descriptor_config
= get_vdu_configuration(vnfd
, vdu_id
)
1808 vdur
= find_in_list(db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
)
1810 if vdur
.get("additionalParams"):
1811 deploy_params_vdu
= parse_yaml_strings(vdur
["additionalParams"])
1813 deploy_params_vdu
= deploy_params
1814 deploy_params_vdu
["OSM"] = get_osm_params(db_vnfr
, vdu_id
, vdu_count_index
=0)
1815 vdud_count
= get_vdu_profile(vnfd
, vdu_id
).get("max-number-of-instances", 1)
1817 self
.logger
.debug("VDUD > {}".format(vdud
))
1818 self
.logger
.debug("Descriptor config > {}".format(descriptor_config
))
1819 if descriptor_config
:
1822 for vdu_index
in range(vdud_count
):
1823 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
1825 logging_text
=logging_text
+ "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
1826 member_vnf_index
, vdu_id
, vdu_index
),
1829 nslcmop_id
=nslcmop_id
,
1835 member_vnf_index
=member_vnf_index
,
1836 vdu_index
=vdu_index
,
1838 deploy_params
=deploy_params_vdu
,
1839 descriptor_config
=descriptor_config
,
1840 base_folder
=base_folder
,
1841 task_instantiation_info
=tasks_dict_info
,
1844 for kdud
in get_kdu_list(vnfd
):
1845 kdu_name
= kdud
["name"]
1846 descriptor_config
= get_kdu_configuration(vnfd
, kdu_name
)
1847 if descriptor_config
:
1851 kdur
= next(x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
)
1852 deploy_params_kdu
= {"OSM": get_osm_params(db_vnfr
)}
1853 if kdur
.get("additionalParams"):
1854 deploy_params_kdu
= parse_yaml_strings(kdur
["additionalParams"])
1857 logging_text
=logging_text
,
1860 nslcmop_id
=nslcmop_id
,
1866 member_vnf_index
=member_vnf_index
,
1867 vdu_index
=vdu_index
,
1869 deploy_params
=deploy_params_kdu
,
1870 descriptor_config
=descriptor_config
,
1871 base_folder
=base_folder
,
1872 task_instantiation_info
=tasks_dict_info
,
1876 # Check if this NS has a charm configuration
1877 descriptor_config
= nsd
.get("ns-configuration")
1878 if descriptor_config
and descriptor_config
.get("juju"):
1881 member_vnf_index
= None
1887 # Get additional parameters
1888 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
1889 if db_nsr
.get("additionalParamsForNs"):
1890 deploy_params
.update(parse_yaml_strings(db_nsr
["additionalParamsForNs"].copy()))
1891 base_folder
= nsd
["_admin"]["storage"]
1893 logging_text
=logging_text
,
1896 nslcmop_id
=nslcmop_id
,
1902 member_vnf_index
=member_vnf_index
,
1903 vdu_index
=vdu_index
,
1905 deploy_params
=deploy_params
,
1906 descriptor_config
=descriptor_config
,
1907 base_folder
=base_folder
,
1908 task_instantiation_info
=tasks_dict_info
,
1912 # rest of staff will be done at finally
1914 except (ROclient
.ROClientException
, DbException
, LcmException
, N2VCException
) as e
:
1915 self
.logger
.error(logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
))
1917 except asyncio
.CancelledError
:
1918 self
.logger
.error(logging_text
+ "Cancelled Exception while '{}'".format(stage
[1]))
1919 exc
= "Operation was cancelled"
1920 except Exception as e
:
1921 exc
= traceback
.format_exc()
1922 self
.logger
.critical(logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
), exc_info
=True)
1925 error_list
.append(str(exc
))
1927 # wait for pending tasks
1929 stage
[1] = "Waiting for instantiate pending tasks."
1930 self
.logger
.debug(logging_text
+ stage
[1])
1931 error_list
+= await self
._wait
_for
_tasks
(logging_text
, tasks_dict_info
, timeout_ns_deploy
,
1932 stage
, nslcmop_id
, nsr_id
=nsr_id
)
1933 stage
[1] = stage
[2] = ""
1934 except asyncio
.CancelledError
:
1935 error_list
.append("Cancelled")
1936 # TODO cancel all tasks
1937 except Exception as exc
:
1938 error_list
.append(str(exc
))
1940 # update operation-status
1941 db_nsr_update
["operational-status"] = "running"
1942 # let's begin with VCA 'configured' status (later we can change it)
1943 db_nsr_update
["config-status"] = "configured"
1944 for task
, task_name
in tasks_dict_info
.items():
1945 if not task
.done() or task
.cancelled() or task
.exception():
1946 if task_name
.startswith(self
.task_name_deploy_vca
):
1947 # A N2VC task is pending
1948 db_nsr_update
["config-status"] = "failed"
1950 # RO or KDU task is pending
1951 db_nsr_update
["operational-status"] = "failed"
1953 # update status at database
1955 error_detail
= ". ".join(error_list
)
1956 self
.logger
.error(logging_text
+ error_detail
)
1957 error_description_nslcmop
= '{} Detail: {}'.format(stage
[0], error_detail
)
1958 error_description_nsr
= 'Operation: INSTANTIATING.{}, {}'.format(nslcmop_id
, stage
[0])
1960 db_nsr_update
["detailed-status"] = error_description_nsr
+ " Detail: " + error_detail
1961 db_nslcmop_update
["detailed-status"] = error_detail
1962 nslcmop_operation_state
= "FAILED"
1966 error_description_nsr
= error_description_nslcmop
= None
1968 db_nsr_update
["detailed-status"] = "Done"
1969 db_nslcmop_update
["detailed-status"] = "Done"
1970 nslcmop_operation_state
= "COMPLETED"
1973 self
._write
_ns
_status
(
1976 current_operation
="IDLE",
1977 current_operation_id
=None,
1978 error_description
=error_description_nsr
,
1979 error_detail
=error_detail
,
1980 other_update
=db_nsr_update
1982 self
._write
_op
_status
(
1985 error_message
=error_description_nslcmop
,
1986 operation_state
=nslcmop_operation_state
,
1987 other_update
=db_nslcmop_update
,
1990 if nslcmop_operation_state
:
1992 await self
.msg
.aiowrite("ns", "instantiated", {"nsr_id": nsr_id
, "nslcmop_id": nslcmop_id
,
1993 "operationState": nslcmop_operation_state
},
1995 except Exception as e
:
1996 self
.logger
.error(logging_text
+ "kafka_write notification Exception {}".format(e
))
1998 self
.logger
.debug(logging_text
+ "Exit")
1999 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_instantiate")
2001 async def _add_vca_relations(self
, logging_text
, nsr_id
, vca_index
: int,
2002 timeout
: int = 3600, vca_type
: str = None) -> bool:
2005 # 1. find all relations for this VCA
2006 # 2. wait for other peers related
2010 vca_type
= vca_type
or "lxc_proxy_charm"
2012 # STEP 1: find all relations for this VCA
2015 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2016 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
2019 my_vca
= deep_get(db_nsr
, ('_admin', 'deployed', 'VCA'))[vca_index
]
2021 # read all ns-configuration relations
2022 ns_relations
= list()
2023 db_ns_relations
= deep_get(nsd
, ('ns-configuration', 'relation'))
2025 for r
in db_ns_relations
:
2026 # check if this VCA is in the relation
2027 if my_vca
.get('member-vnf-index') in\
2028 (r
.get('entities')[0].get('id'), r
.get('entities')[1].get('id')):
2029 ns_relations
.append(r
)
2031 # read all vnf-configuration relations
2032 vnf_relations
= list()
2033 db_vnfd_list
= db_nsr
.get('vnfd-id')
2035 for vnfd
in db_vnfd_list
:
2036 db_vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd
})
2037 db_vnf_relations
= deep_get(db_vnfd
, ('vnf-configuration', 'relation'))
2038 if db_vnf_relations
:
2039 for r
in db_vnf_relations
:
2040 # check if this VCA is in the relation
2041 if my_vca
.get('vdu_id') in (r
.get('entities')[0].get('id'), r
.get('entities')[1].get('id')):
2042 vnf_relations
.append(r
)
2044 # if no relations, terminate
2045 if not ns_relations
and not vnf_relations
:
2046 self
.logger
.debug(logging_text
+ ' No relations')
2049 self
.logger
.debug(logging_text
+ ' adding relations\n {}\n {}'.format(ns_relations
, vnf_relations
))
2056 if now
- start
>= timeout
:
2057 self
.logger
.error(logging_text
+ ' : timeout adding relations')
2060 # reload nsr from database (we need to update record: _admin.deloyed.VCA)
2061 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2063 # for each defined NS relation, find the VCA's related
2064 for r
in ns_relations
.copy():
2065 from_vca_ee_id
= None
2067 from_vca_endpoint
= None
2068 to_vca_endpoint
= None
2069 vca_list
= deep_get(db_nsr
, ('_admin', 'deployed', 'VCA'))
2070 for vca
in vca_list
:
2071 if vca
.get('member-vnf-index') == r
.get('entities')[0].get('id') \
2072 and vca
.get('config_sw_installed'):
2073 from_vca_ee_id
= vca
.get('ee_id')
2074 from_vca_endpoint
= r
.get('entities')[0].get('endpoint')
2075 if vca
.get('member-vnf-index') == r
.get('entities')[1].get('id') \
2076 and vca
.get('config_sw_installed'):
2077 to_vca_ee_id
= vca
.get('ee_id')
2078 to_vca_endpoint
= r
.get('entities')[1].get('endpoint')
2079 if from_vca_ee_id
and to_vca_ee_id
:
2081 await self
.vca_map
[vca_type
].add_relation(
2082 ee_id_1
=from_vca_ee_id
,
2083 ee_id_2
=to_vca_ee_id
,
2084 endpoint_1
=from_vca_endpoint
,
2085 endpoint_2
=to_vca_endpoint
)
2086 # remove entry from relations list
2087 ns_relations
.remove(r
)
2089 # check failed peers
2091 vca_status_list
= db_nsr
.get('configurationStatus')
2093 for i
in range(len(vca_list
)):
2095 vca_status
= vca_status_list
[i
]
2096 if vca
.get('member-vnf-index') == r
.get('entities')[0].get('id'):
2097 if vca_status
.get('status') == 'BROKEN':
2098 # peer broken: remove relation from list
2099 ns_relations
.remove(r
)
2100 if vca
.get('member-vnf-index') == r
.get('entities')[1].get('id'):
2101 if vca_status
.get('status') == 'BROKEN':
2102 # peer broken: remove relation from list
2103 ns_relations
.remove(r
)
2108 # for each defined VNF relation, find the VCA's related
2109 for r
in vnf_relations
.copy():
2110 from_vca_ee_id
= None
2112 from_vca_endpoint
= None
2113 to_vca_endpoint
= None
2114 vca_list
= deep_get(db_nsr
, ('_admin', 'deployed', 'VCA'))
2115 for vca
in vca_list
:
2116 key_to_check
= "vdu_id"
2117 if vca
.get("vdu_id") is None:
2118 key_to_check
= "vnfd_id"
2119 if vca
.get(key_to_check
) == r
.get('entities')[0].get('id') and vca
.get('config_sw_installed'):
2120 from_vca_ee_id
= vca
.get('ee_id')
2121 from_vca_endpoint
= r
.get('entities')[0].get('endpoint')
2122 if vca
.get(key_to_check
) == r
.get('entities')[1].get('id') and vca
.get('config_sw_installed'):
2123 to_vca_ee_id
= vca
.get('ee_id')
2124 to_vca_endpoint
= r
.get('entities')[1].get('endpoint')
2125 if from_vca_ee_id
and to_vca_ee_id
:
2127 await self
.vca_map
[vca_type
].add_relation(
2128 ee_id_1
=from_vca_ee_id
,
2129 ee_id_2
=to_vca_ee_id
,
2130 endpoint_1
=from_vca_endpoint
,
2131 endpoint_2
=to_vca_endpoint
)
2132 # remove entry from relations list
2133 vnf_relations
.remove(r
)
2135 # check failed peers
2137 vca_status_list
= db_nsr
.get('configurationStatus')
2139 for i
in range(len(vca_list
)):
2141 vca_status
= vca_status_list
[i
]
2142 if vca
.get('vdu_id') == r
.get('entities')[0].get('id'):
2143 if vca_status
.get('status') == 'BROKEN':
2144 # peer broken: remove relation from list
2145 vnf_relations
.remove(r
)
2146 if vca
.get('vdu_id') == r
.get('entities')[1].get('id'):
2147 if vca_status
.get('status') == 'BROKEN':
2148 # peer broken: remove relation from list
2149 vnf_relations
.remove(r
)
2155 await asyncio
.sleep(5.0)
2157 if not ns_relations
and not vnf_relations
:
2158 self
.logger
.debug('Relations added')
2163 except Exception as e
:
2164 self
.logger
.warn(logging_text
+ ' ERROR adding relations: {}'.format(e
))
2167 async def _install_kdu(self
, nsr_id
: str, nsr_db_path
: str, vnfr_data
: dict, kdu_index
: int, kdud
: dict,
2168 vnfd
: dict, k8s_instance_info
: dict, k8params
: dict = None, timeout
: int = 600):
2171 k8sclustertype
= k8s_instance_info
["k8scluster-type"]
2173 db_dict_install
= {"collection": "nsrs",
2174 "filter": {"_id": nsr_id
},
2175 "path": nsr_db_path
}
2177 kdu_instance
= await self
.k8scluster_map
[k8sclustertype
].install(
2178 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
2179 kdu_model
=k8s_instance_info
["kdu-model"],
2182 db_dict
=db_dict_install
,
2184 kdu_name
=k8s_instance_info
["kdu-name"],
2185 namespace
=k8s_instance_info
["namespace"])
2186 self
.update_db_2("nsrs", nsr_id
, {nsr_db_path
+ ".kdu-instance": kdu_instance
})
2188 # Obtain services to obtain management service ip
2189 services
= await self
.k8scluster_map
[k8sclustertype
].get_services(
2190 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
2191 kdu_instance
=kdu_instance
,
2192 namespace
=k8s_instance_info
["namespace"])
2194 # Obtain management service info (if exists)
2195 vnfr_update_dict
= {}
2197 vnfr_update_dict
["kdur.{}.services".format(kdu_index
)] = services
2198 mgmt_services
= [service
for service
in kdud
.get("service", []) if service
.get("mgmt-service")]
2199 for mgmt_service
in mgmt_services
:
2200 for service
in services
:
2201 if service
["name"].startswith(mgmt_service
["name"]):
2202 # Mgmt service found, Obtain service ip
2203 ip
= service
.get("external_ip", service
.get("cluster_ip"))
2204 if isinstance(ip
, list) and len(ip
) == 1:
2207 vnfr_update_dict
["kdur.{}.ip-address".format(kdu_index
)] = ip
2209 # Check if must update also mgmt ip at the vnf
2210 service_external_cp
= mgmt_service
.get("external-connection-point-ref")
2211 if service_external_cp
:
2212 if deep_get(vnfd
, ("mgmt-interface", "cp")) == service_external_cp
:
2213 vnfr_update_dict
["ip-address"] = ip
2217 self
.logger
.warn("Mgmt service name: {} not found".format(mgmt_service
["name"]))
2219 vnfr_update_dict
["kdur.{}.status".format(kdu_index
)] = "READY"
2220 self
.update_db_2("vnfrs", vnfr_data
.get("_id"), vnfr_update_dict
)
2222 kdu_config
= kdud
.get("kdu-configuration")
2223 if kdu_config
and kdu_config
.get("initial-config-primitive") and kdu_config
.get("juju") is None:
2224 initial_config_primitive_list
= kdu_config
.get("initial-config-primitive")
2225 initial_config_primitive_list
.sort(key
=lambda val
: int(val
["seq"]))
2227 for initial_config_primitive
in initial_config_primitive_list
:
2228 primitive_params_
= self
._map
_primitive
_params
(initial_config_primitive
, {}, {})
2230 await asyncio
.wait_for(
2231 self
.k8scluster_map
[k8sclustertype
].exec_primitive(
2232 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
2233 kdu_instance
=kdu_instance
,
2234 primitive_name
=initial_config_primitive
["name"],
2235 params
=primitive_params_
, db_dict
={}),
2238 except Exception as e
:
2239 # Prepare update db with error and raise exception
2241 self
.update_db_2("nsrs", nsr_id
, {nsr_db_path
+ ".detailed-status": str(e
)})
2242 self
.update_db_2("vnfrs", vnfr_data
.get("_id"), {"kdur.{}.status".format(kdu_index
): "ERROR"})
2244 # ignore to keep original exception
2246 # reraise original error
2251 async def deploy_kdus(self
, logging_text
, nsr_id
, nslcmop_id
, db_vnfrs
, db_vnfds
, task_instantiation_info
):
2252 # Launch kdus if present in the descriptor
2254 k8scluster_id_2_uuic
= {"helm-chart-v3": {}, "helm-chart": {}, "juju-bundle": {}}
2256 async def _get_cluster_id(cluster_id
, cluster_type
):
2257 nonlocal k8scluster_id_2_uuic
2258 if cluster_id
in k8scluster_id_2_uuic
[cluster_type
]:
2259 return k8scluster_id_2_uuic
[cluster_type
][cluster_id
]
2261 # check if K8scluster is creating and wait look if previous tasks in process
2262 task_name
, task_dependency
= self
.lcm_tasks
.lookfor_related("k8scluster", cluster_id
)
2264 text
= "Waiting for related tasks '{}' on k8scluster {} to be completed".format(task_name
, cluster_id
)
2265 self
.logger
.debug(logging_text
+ text
)
2266 await asyncio
.wait(task_dependency
, timeout
=3600)
2268 db_k8scluster
= self
.db
.get_one("k8sclusters", {"_id": cluster_id
}, fail_on_empty
=False)
2269 if not db_k8scluster
:
2270 raise LcmException("K8s cluster {} cannot be found".format(cluster_id
))
2272 k8s_id
= deep_get(db_k8scluster
, ("_admin", cluster_type
, "id"))
2274 if cluster_type
== "helm-chart-v3":
2276 # backward compatibility for existing clusters that have not been initialized for helm v3
2277 k8s_credentials
= yaml
.safe_dump(db_k8scluster
.get("credentials"))
2278 k8s_id
, uninstall_sw
= await self
.k8sclusterhelm3
.init_env(k8s_credentials
,
2279 reuse_cluster_uuid
=cluster_id
)
2280 db_k8scluster_update
= {}
2281 db_k8scluster_update
["_admin.helm-chart-v3.error_msg"] = None
2282 db_k8scluster_update
["_admin.helm-chart-v3.id"] = k8s_id
2283 db_k8scluster_update
["_admin.helm-chart-v3.created"] = uninstall_sw
2284 db_k8scluster_update
["_admin.helm-chart-v3.operationalState"] = "ENABLED"
2285 self
.update_db_2("k8sclusters", cluster_id
, db_k8scluster_update
)
2286 except Exception as e
:
2287 self
.logger
.error(logging_text
+ "error initializing helm-v3 cluster: {}".format(str(e
)))
2288 raise LcmException("K8s cluster '{}' has not been initialized for '{}'".format(cluster_id
,
2291 raise LcmException("K8s cluster '{}' has not been initialized for '{}'".
2292 format(cluster_id
, cluster_type
))
2293 k8scluster_id_2_uuic
[cluster_type
][cluster_id
] = k8s_id
2296 logging_text
+= "Deploy kdus: "
2299 db_nsr_update
= {"_admin.deployed.K8s": []}
2300 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2303 updated_cluster_list
= []
2304 updated_v3_cluster_list
= []
2306 for vnfr_data
in db_vnfrs
.values():
2307 for kdu_index
, kdur
in enumerate(get_iterable(vnfr_data
, "kdur")):
2308 # Step 0: Prepare and set parameters
2309 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
2310 vnfd_id
= vnfr_data
.get('vnfd-id')
2311 kdud
= next(kdud
for kdud
in db_vnfds
[vnfd_id
]["kdu"] if kdud
["name"] == kdur
["kdu-name"])
2312 namespace
= kdur
.get("k8s-namespace")
2313 if kdur
.get("helm-chart"):
2314 kdumodel
= kdur
["helm-chart"]
2315 # Default version: helm3, if helm-version is v2 assign v2
2316 k8sclustertype
= "helm-chart-v3"
2317 self
.logger
.debug("kdur: {}".format(kdur
))
2318 if kdur
.get("helm-version") and kdur
.get("helm-version") == "v2":
2319 k8sclustertype
= "helm-chart"
2320 elif kdur
.get("juju-bundle"):
2321 kdumodel
= kdur
["juju-bundle"]
2322 k8sclustertype
= "juju-bundle"
2324 raise LcmException("kdu type for kdu='{}.{}' is neither helm-chart nor "
2325 "juju-bundle. Maybe an old NBI version is running".
2326 format(vnfr_data
["member-vnf-index-ref"], kdur
["kdu-name"]))
2327 # check if kdumodel is a file and exists
2329 storage
= deep_get(db_vnfds
.get(vnfd_id
), ('_admin', 'storage'))
2330 if storage
and storage
.get('pkg-dir'): # may be not present if vnfd has not artifacts
2331 # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
2332 filename
= '{}/{}/{}s/{}'.format(storage
["folder"], storage
["pkg-dir"], k8sclustertype
,
2334 if self
.fs
.file_exists(filename
, mode
='file') or self
.fs
.file_exists(filename
, mode
='dir'):
2335 kdumodel
= self
.fs
.path
+ filename
2336 except (asyncio
.TimeoutError
, asyncio
.CancelledError
):
2338 except Exception: # it is not a file
2341 k8s_cluster_id
= kdur
["k8s-cluster"]["id"]
2342 step
= "Synchronize repos for k8s cluster '{}'".format(k8s_cluster_id
)
2343 cluster_uuid
= await _get_cluster_id(k8s_cluster_id
, k8sclustertype
)
2346 if (k8sclustertype
== "helm-chart" and cluster_uuid
not in updated_cluster_list
)\
2347 or (k8sclustertype
== "helm-chart-v3" and cluster_uuid
not in updated_v3_cluster_list
):
2348 del_repo_list
, added_repo_dict
= await asyncio
.ensure_future(
2349 self
.k8scluster_map
[k8sclustertype
].synchronize_repos(cluster_uuid
=cluster_uuid
))
2350 if del_repo_list
or added_repo_dict
:
2351 if k8sclustertype
== "helm-chart":
2352 unset
= {'_admin.helm_charts_added.' + item
: None for item
in del_repo_list
}
2353 updated
= {'_admin.helm_charts_added.' +
2354 item
: name
for item
, name
in added_repo_dict
.items()}
2355 updated_cluster_list
.append(cluster_uuid
)
2356 elif k8sclustertype
== "helm-chart-v3":
2357 unset
= {'_admin.helm_charts_v3_added.' + item
: None for item
in del_repo_list
}
2358 updated
= {'_admin.helm_charts_v3_added.' +
2359 item
: name
for item
, name
in added_repo_dict
.items()}
2360 updated_v3_cluster_list
.append(cluster_uuid
)
2361 self
.logger
.debug(logging_text
+ "repos synchronized on k8s cluster "
2362 "'{}' to_delete: {}, to_add: {}".
2363 format(k8s_cluster_id
, del_repo_list
, added_repo_dict
))
2364 self
.db
.set_one("k8sclusters", {"_id": k8s_cluster_id
}, updated
, unset
=unset
)
2367 step
= "Instantiating KDU {}.{} in k8s cluster {}".format(vnfr_data
["member-vnf-index-ref"],
2368 kdur
["kdu-name"], k8s_cluster_id
)
2369 k8s_instance_info
= {"kdu-instance": None,
2370 "k8scluster-uuid": cluster_uuid
,
2371 "k8scluster-type": k8sclustertype
,
2372 "member-vnf-index": vnfr_data
["member-vnf-index-ref"],
2373 "kdu-name": kdur
["kdu-name"],
2374 "kdu-model": kdumodel
,
2375 "namespace": namespace
}
2376 db_path
= "_admin.deployed.K8s.{}".format(index
)
2377 db_nsr_update
[db_path
] = k8s_instance_info
2378 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2380 task
= asyncio
.ensure_future(
2381 self
._install
_kdu
(nsr_id
, db_path
, vnfr_data
, kdu_index
, kdud
, db_vnfds
[vnfd_id
],
2382 k8s_instance_info
, k8params
=desc_params
, timeout
=600))
2383 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_KDU-{}".format(index
), task
)
2384 task_instantiation_info
[task
] = "Deploying KDU {}".format(kdur
["kdu-name"])
2388 except (LcmException
, asyncio
.CancelledError
):
2390 except Exception as e
:
2391 msg
= "Exception {} while {}: {}".format(type(e
).__name
__, step
, e
)
2392 if isinstance(e
, (N2VCException
, DbException
)):
2393 self
.logger
.error(logging_text
+ msg
)
2395 self
.logger
.critical(logging_text
+ msg
, exc_info
=True)
2396 raise LcmException(msg
)
2399 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2401 def _deploy_n2vc(self
, logging_text
, db_nsr
, db_vnfr
, nslcmop_id
, nsr_id
, nsi_id
, vnfd_id
, vdu_id
,
2402 kdu_name
, member_vnf_index
, vdu_index
, vdu_name
, deploy_params
, descriptor_config
,
2403 base_folder
, task_instantiation_info
, stage
):
2404 # launch instantiate_N2VC in a asyncio task and register task object
2405 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
2406 # if not found, create one entry and update database
2407 # fill db_nsr._admin.deployed.VCA.<index>
2409 self
.logger
.debug(logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
))
2410 if descriptor_config
.get("juju"): # There is one execution envioronment of type juju
2411 ee_list
= [descriptor_config
]
2412 elif descriptor_config
.get("execution-environment-list"):
2413 ee_list
= descriptor_config
.get("execution-environment-list")
2414 else: # other types as script are not supported
2417 for ee_item
in ee_list
:
2418 self
.logger
.debug(logging_text
+ "_deploy_n2vc ee_item juju={}, helm={}".format(ee_item
.get('juju'),
2419 ee_item
.get("helm-chart")))
2420 ee_descriptor_id
= ee_item
.get("id")
2421 if ee_item
.get("juju"):
2422 vca_name
= ee_item
['juju'].get('charm')
2423 vca_type
= "lxc_proxy_charm" if ee_item
['juju'].get('charm') is not None else "native_charm"
2424 if ee_item
['juju'].get('cloud') == "k8s":
2425 vca_type
= "k8s_proxy_charm"
2426 elif ee_item
['juju'].get('proxy') is False:
2427 vca_type
= "native_charm"
2428 elif ee_item
.get("helm-chart"):
2429 vca_name
= ee_item
['helm-chart']
2430 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
2433 vca_type
= "helm-v3"
2435 self
.logger
.debug(logging_text
+ "skipping non juju neither charm configuration")
2439 for vca_index
, vca_deployed
in enumerate(db_nsr
["_admin"]["deployed"]["VCA"]):
2440 if not vca_deployed
:
2442 if vca_deployed
.get("member-vnf-index") == member_vnf_index
and \
2443 vca_deployed
.get("vdu_id") == vdu_id
and \
2444 vca_deployed
.get("kdu_name") == kdu_name
and \
2445 vca_deployed
.get("vdu_count_index", 0) == vdu_index
and \
2446 vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
:
2449 # not found, create one.
2450 target
= "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
2452 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
2454 target
+= "/kdu/{}".format(kdu_name
)
2456 "target_element": target
,
2457 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
2458 "member-vnf-index": member_vnf_index
,
2460 "kdu_name": kdu_name
,
2461 "vdu_count_index": vdu_index
,
2462 "operational-status": "init", # TODO revise
2463 "detailed-status": "", # TODO revise
2464 "step": "initial-deploy", # TODO revise
2466 "vdu_name": vdu_name
,
2468 "ee_descriptor_id": ee_descriptor_id
2472 # create VCA and configurationStatus in db
2474 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
2475 "configurationStatus.{}".format(vca_index
): dict()
2477 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2479 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
2481 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
2482 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
2483 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
2486 task_n2vc
= asyncio
.ensure_future(
2487 self
.instantiate_N2VC(
2488 logging_text
=logging_text
,
2489 vca_index
=vca_index
,
2495 vdu_index
=vdu_index
,
2496 deploy_params
=deploy_params
,
2497 config_descriptor
=descriptor_config
,
2498 base_folder
=base_folder
,
2499 nslcmop_id
=nslcmop_id
,
2503 ee_config_descriptor
=ee_item
2506 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_N2VC-{}".format(vca_index
), task_n2vc
)
2507 task_instantiation_info
[task_n2vc
] = self
.task_name_deploy_vca
+ " {}.{}".format(
2508 member_vnf_index
or "", vdu_id
or "")
2511 def _create_nslcmop(nsr_id
, operation
, params
):
2513 Creates a ns-lcm-opp content to be stored at database.
2514 :param nsr_id: internal id of the instance
2515 :param operation: instantiate, terminate, scale, action, ...
2516 :param params: user parameters for the operation
2517 :return: dictionary following SOL005 format
2519 # Raise exception if invalid arguments
2520 if not (nsr_id
and operation
and params
):
2522 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided")
2528 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
2529 "operationState": "PROCESSING",
2530 "statusEnteredTime": now
,
2531 "nsInstanceId": nsr_id
,
2532 "lcmOperationType": operation
,
2534 "isAutomaticInvocation": False,
2535 "operationParams": params
,
2536 "isCancelPending": False,
2538 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id
,
2539 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id
,
2544 def _format_additional_params(self
, params
):
2545 params
= params
or {}
2546 for key
, value
in params
.items():
2547 if str(value
).startswith("!!yaml "):
2548 params
[key
] = yaml
.safe_load(value
[7:])
2551 def _get_terminate_primitive_params(self
, seq
, vnf_index
):
2552 primitive
= seq
.get('name')
2553 primitive_params
= {}
2555 "member_vnf_index": vnf_index
,
2556 "primitive": primitive
,
2557 "primitive_params": primitive_params
,
2560 return self
._map
_primitive
_params
(seq
, params
, desc_params
)
2564 def _retry_or_skip_suboperation(self
, db_nslcmop
, op_index
):
2565 op
= deep_get(db_nslcmop
, ('_admin', 'operations'), [])[op_index
]
2566 if op
.get('operationState') == 'COMPLETED':
2567 # b. Skip sub-operation
2568 # _ns_execute_primitive() or RO.create_action() will NOT be executed
2569 return self
.SUBOPERATION_STATUS_SKIP
2571 # c. retry executing sub-operation
2572 # The sub-operation exists, and operationState != 'COMPLETED'
2573 # Update operationState = 'PROCESSING' to indicate a retry.
2574 operationState
= 'PROCESSING'
2575 detailed_status
= 'In progress'
2576 self
._update
_suboperation
_status
(
2577 db_nslcmop
, op_index
, operationState
, detailed_status
)
2578 # Return the sub-operation index
2579 # _ns_execute_primitive() or RO.create_action() will be called from scale()
2580 # with arguments extracted from the sub-operation
2583 # Find a sub-operation where all keys in a matching dictionary must match
2584 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
2585 def _find_suboperation(self
, db_nslcmop
, match
):
2586 if db_nslcmop
and match
:
2587 op_list
= db_nslcmop
.get('_admin', {}).get('operations', [])
2588 for i
, op
in enumerate(op_list
):
2589 if all(op
.get(k
) == match
[k
] for k
in match
):
2591 return self
.SUBOPERATION_STATUS_NOT_FOUND
2593 # Update status for a sub-operation given its index
2594 def _update_suboperation_status(self
, db_nslcmop
, op_index
, operationState
, detailed_status
):
2595 # Update DB for HA tasks
2596 q_filter
= {'_id': db_nslcmop
['_id']}
2597 update_dict
= {'_admin.operations.{}.operationState'.format(op_index
): operationState
,
2598 '_admin.operations.{}.detailed-status'.format(op_index
): detailed_status
}
2599 self
.db
.set_one("nslcmops",
2601 update_dict
=update_dict
,
2602 fail_on_empty
=False)
2604 # Add sub-operation, return the index of the added sub-operation
2605 # Optionally, set operationState, detailed-status, and operationType
2606 # Status and type are currently set for 'scale' sub-operations:
2607 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
2608 # 'detailed-status' : status message
2609 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
2610 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
2611 def _add_suboperation(self
, db_nslcmop
, vnf_index
, vdu_id
, vdu_count_index
, vdu_name
, primitive
,
2612 mapped_primitive_params
, operationState
=None, detailed_status
=None, operationType
=None,
2613 RO_nsr_id
=None, RO_scaling_info
=None):
2615 return self
.SUBOPERATION_STATUS_NOT_FOUND
2616 # Get the "_admin.operations" list, if it exists
2617 db_nslcmop_admin
= db_nslcmop
.get('_admin', {})
2618 op_list
= db_nslcmop_admin
.get('operations')
2619 # Create or append to the "_admin.operations" list
2620 new_op
= {'member_vnf_index': vnf_index
,
2622 'vdu_count_index': vdu_count_index
,
2623 'primitive': primitive
,
2624 'primitive_params': mapped_primitive_params
}
2626 new_op
['operationState'] = operationState
2628 new_op
['detailed-status'] = detailed_status
2630 new_op
['lcmOperationType'] = operationType
2632 new_op
['RO_nsr_id'] = RO_nsr_id
2634 new_op
['RO_scaling_info'] = RO_scaling_info
2636 # No existing operations, create key 'operations' with current operation as first list element
2637 db_nslcmop_admin
.update({'operations': [new_op
]})
2638 op_list
= db_nslcmop_admin
.get('operations')
2640 # Existing operations, append operation to list
2641 op_list
.append(new_op
)
2643 db_nslcmop_update
= {'_admin.operations': op_list
}
2644 self
.update_db_2("nslcmops", db_nslcmop
['_id'], db_nslcmop_update
)
2645 op_index
= len(op_list
) - 1
2648 # Helper methods for scale() sub-operations
2650 # pre-scale/post-scale:
2651 # Check for 3 different cases:
2652 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
2653 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
2654 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
2655 def _check_or_add_scale_suboperation(self
, db_nslcmop
, vnf_index
, vnf_config_primitive
, primitive_params
,
2656 operationType
, RO_nsr_id
=None, RO_scaling_info
=None):
2657 # Find this sub-operation
2658 if RO_nsr_id
and RO_scaling_info
:
2659 operationType
= 'SCALE-RO'
2661 'member_vnf_index': vnf_index
,
2662 'RO_nsr_id': RO_nsr_id
,
2663 'RO_scaling_info': RO_scaling_info
,
2667 'member_vnf_index': vnf_index
,
2668 'primitive': vnf_config_primitive
,
2669 'primitive_params': primitive_params
,
2670 'lcmOperationType': operationType
2672 op_index
= self
._find
_suboperation
(db_nslcmop
, match
)
2673 if op_index
== self
.SUBOPERATION_STATUS_NOT_FOUND
:
2674 # a. New sub-operation
2675 # The sub-operation does not exist, add it.
2676 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
2677 # The following parameters are set to None for all kind of scaling:
2679 vdu_count_index
= None
2681 if RO_nsr_id
and RO_scaling_info
:
2682 vnf_config_primitive
= None
2683 primitive_params
= None
2686 RO_scaling_info
= None
2687 # Initial status for sub-operation
2688 operationState
= 'PROCESSING'
2689 detailed_status
= 'In progress'
2690 # Add sub-operation for pre/post-scaling (zero or more operations)
2691 self
._add
_suboperation
(db_nslcmop
,
2696 vnf_config_primitive
,
2703 return self
.SUBOPERATION_STATUS_NEW
2705 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
2706 # or op_index (operationState != 'COMPLETED')
2707 return self
._retry
_or
_skip
_suboperation
(db_nslcmop
, op_index
)
2709 # Function to return execution_environment id
2711 def _get_ee_id(self
, vnf_index
, vdu_id
, vca_deployed_list
):
2712 # TODO vdu_index_count
2713 for vca
in vca_deployed_list
:
2714 if vca
["member-vnf-index"] == vnf_index
and vca
["vdu_id"] == vdu_id
:
2717 async def destroy_N2VC(self
, logging_text
, db_nslcmop
, vca_deployed
, config_descriptor
,
2718 vca_index
, destroy_ee
=True, exec_primitives
=True):
2720 Execute the terminate primitives and destroy the execution environment (if destroy_ee=False
2721 :param logging_text:
2723 :param vca_deployed: Dictionary of deployment info at db_nsr._admin.depoloyed.VCA.<INDEX>
2724 :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
2725 :param vca_index: index in the database _admin.deployed.VCA
2726 :param destroy_ee: False to do not destroy, because it will be destroyed all of then at once
2727 :param exec_primitives: False to do not execute terminate primitives, because the config is not completed or has
2728 not executed properly
2729 :return: None or exception
2733 logging_text
+ " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
2734 vca_index
, vca_deployed
, config_descriptor
, destroy_ee
2738 vca_type
= vca_deployed
.get("type", "lxc_proxy_charm")
2740 # execute terminate_primitives
2742 terminate_primitives
= get_ee_sorted_terminate_config_primitive_list(
2743 config_descriptor
.get("terminate-config-primitive"), vca_deployed
.get("ee_descriptor_id"))
2744 vdu_id
= vca_deployed
.get("vdu_id")
2745 vdu_count_index
= vca_deployed
.get("vdu_count_index")
2746 vdu_name
= vca_deployed
.get("vdu_name")
2747 vnf_index
= vca_deployed
.get("member-vnf-index")
2748 if terminate_primitives
and vca_deployed
.get("needed_terminate"):
2749 for seq
in terminate_primitives
:
2750 # For each sequence in list, get primitive and call _ns_execute_primitive()
2751 step
= "Calling terminate action for vnf_member_index={} primitive={}".format(
2752 vnf_index
, seq
.get("name"))
2753 self
.logger
.debug(logging_text
+ step
)
2754 # Create the primitive for each sequence, i.e. "primitive": "touch"
2755 primitive
= seq
.get('name')
2756 mapped_primitive_params
= self
._get
_terminate
_primitive
_params
(seq
, vnf_index
)
2759 self
._add
_suboperation
(db_nslcmop
,
2765 mapped_primitive_params
)
2766 # Sub-operations: Call _ns_execute_primitive() instead of action()
2768 result
, result_detail
= await self
._ns
_execute
_primitive
(vca_deployed
["ee_id"], primitive
,
2769 mapped_primitive_params
,
2771 except LcmException
:
2772 # this happens when VCA is not deployed. In this case it is not needed to terminate
2774 result_ok
= ['COMPLETED', 'PARTIALLY_COMPLETED']
2775 if result
not in result_ok
:
2776 raise LcmException("terminate_primitive {} for vnf_member_index={} fails with "
2777 "error {}".format(seq
.get("name"), vnf_index
, result_detail
))
2778 # set that this VCA do not need terminated
2779 db_update_entry
= "_admin.deployed.VCA.{}.needed_terminate".format(vca_index
)
2780 self
.update_db_2("nsrs", db_nslcmop
["nsInstanceId"], {db_update_entry
: False})
2782 if vca_deployed
.get("prometheus_jobs") and self
.prometheus
:
2783 await self
.prometheus
.update(remove_jobs
=vca_deployed
["prometheus_jobs"])
2786 await self
.vca_map
[vca_type
].delete_execution_environment(vca_deployed
["ee_id"])
2788 async def _delete_all_N2VC(self
, db_nsr
: dict):
2789 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
='TERMINATING')
2790 namespace
= "." + db_nsr
["_id"]
2792 await self
.n2vc
.delete_namespace(namespace
=namespace
, total_timeout
=self
.timeout_charm_delete
)
2793 except N2VCNotFound
: # already deleted. Skip
2795 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
='DELETED')
2797 async def _terminate_RO(self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
):
2799 Terminates a deployment from RO
2800 :param logging_text:
2801 :param nsr_deployed: db_nsr._admin.deployed
2804 :param stage: list of string with the content to write on db_nslcmop.detailed-status.
2805 this method will update only the index 2, but it will write on database the concatenated content of the list
2810 ro_nsr_id
= ro_delete_action
= None
2811 if nsr_deployed
and nsr_deployed
.get("RO"):
2812 ro_nsr_id
= nsr_deployed
["RO"].get("nsr_id")
2813 ro_delete_action
= nsr_deployed
["RO"].get("nsr_delete_action_id")
2816 stage
[2] = "Deleting ns from VIM."
2817 db_nsr_update
["detailed-status"] = " ".join(stage
)
2818 self
._write
_op
_status
(nslcmop_id
, stage
)
2819 self
.logger
.debug(logging_text
+ stage
[2])
2820 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2821 self
._write
_op
_status
(nslcmop_id
, stage
)
2822 desc
= await self
.RO
.delete("ns", ro_nsr_id
)
2823 ro_delete_action
= desc
["action_id"]
2824 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = ro_delete_action
2825 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
2826 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
2827 if ro_delete_action
:
2828 # wait until NS is deleted from VIM
2829 stage
[2] = "Waiting ns deleted from VIM."
2830 detailed_status_old
= None
2831 self
.logger
.debug(logging_text
+ stage
[2] + " RO_id={} ro_delete_action={}".format(ro_nsr_id
,
2833 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2834 self
._write
_op
_status
(nslcmop_id
, stage
)
2836 delete_timeout
= 20 * 60 # 20 minutes
2837 while delete_timeout
> 0:
2838 desc
= await self
.RO
.show(
2840 item_id_name
=ro_nsr_id
,
2841 extra_item
="action",
2842 extra_item_id
=ro_delete_action
)
2845 self
._on
_update
_ro
_db
(nsrs_id
=nsr_id
, ro_descriptor
=desc
)
2847 ns_status
, ns_status_info
= self
.RO
.check_action_status(desc
)
2848 if ns_status
== "ERROR":
2849 raise ROclient
.ROClientException(ns_status_info
)
2850 elif ns_status
== "BUILD":
2851 stage
[2] = "Deleting from VIM {}".format(ns_status_info
)
2852 elif ns_status
== "ACTIVE":
2853 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
2854 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
2857 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status
)
2858 if stage
[2] != detailed_status_old
:
2859 detailed_status_old
= stage
[2]
2860 db_nsr_update
["detailed-status"] = " ".join(stage
)
2861 self
._write
_op
_status
(nslcmop_id
, stage
)
2862 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2863 await asyncio
.sleep(5, loop
=self
.loop
)
2865 else: # delete_timeout <= 0:
2866 raise ROclient
.ROClientException("Timeout waiting ns deleted from VIM")
2868 except Exception as e
:
2869 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2870 if isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404: # not found
2871 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
2872 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
2873 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
2874 self
.logger
.debug(logging_text
+ "RO_ns_id={} already deleted".format(ro_nsr_id
))
2875 elif isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409: # conflict
2876 failed_detail
.append("delete conflict: {}".format(e
))
2877 self
.logger
.debug(logging_text
+ "RO_ns_id={} delete conflict: {}".format(ro_nsr_id
, e
))
2879 failed_detail
.append("delete error: {}".format(e
))
2880 self
.logger
.error(logging_text
+ "RO_ns_id={} delete error: {}".format(ro_nsr_id
, e
))
2883 if not failed_detail
and deep_get(nsr_deployed
, ("RO", "nsd_id")):
2884 ro_nsd_id
= nsr_deployed
["RO"]["nsd_id"]
2886 stage
[2] = "Deleting nsd from RO."
2887 db_nsr_update
["detailed-status"] = " ".join(stage
)
2888 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2889 self
._write
_op
_status
(nslcmop_id
, stage
)
2890 await self
.RO
.delete("nsd", ro_nsd_id
)
2891 self
.logger
.debug(logging_text
+ "ro_nsd_id={} deleted".format(ro_nsd_id
))
2892 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
2893 except Exception as e
:
2894 if isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404: # not found
2895 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
2896 self
.logger
.debug(logging_text
+ "ro_nsd_id={} already deleted".format(ro_nsd_id
))
2897 elif isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409: # conflict
2898 failed_detail
.append("ro_nsd_id={} delete conflict: {}".format(ro_nsd_id
, e
))
2899 self
.logger
.debug(logging_text
+ failed_detail
[-1])
2901 failed_detail
.append("ro_nsd_id={} delete error: {}".format(ro_nsd_id
, e
))
2902 self
.logger
.error(logging_text
+ failed_detail
[-1])
2904 if not failed_detail
and deep_get(nsr_deployed
, ("RO", "vnfd")):
2905 for index
, vnf_deployed
in enumerate(nsr_deployed
["RO"]["vnfd"]):
2906 if not vnf_deployed
or not vnf_deployed
["id"]:
2909 ro_vnfd_id
= vnf_deployed
["id"]
2910 stage
[2] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
2911 vnf_deployed
["member-vnf-index"], ro_vnfd_id
)
2912 db_nsr_update
["detailed-status"] = " ".join(stage
)
2913 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2914 self
._write
_op
_status
(nslcmop_id
, stage
)
2915 await self
.RO
.delete("vnfd", ro_vnfd_id
)
2916 self
.logger
.debug(logging_text
+ "ro_vnfd_id={} deleted".format(ro_vnfd_id
))
2917 db_nsr_update
["_admin.deployed.RO.vnfd.{}.id".format(index
)] = None
2918 except Exception as e
:
2919 if isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404: # not found
2920 db_nsr_update
["_admin.deployed.RO.vnfd.{}.id".format(index
)] = None
2921 self
.logger
.debug(logging_text
+ "ro_vnfd_id={} already deleted ".format(ro_vnfd_id
))
2922 elif isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409: # conflict
2923 failed_detail
.append("ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id
, e
))
2924 self
.logger
.debug(logging_text
+ failed_detail
[-1])
2926 failed_detail
.append("ro_vnfd_id={} delete error: {}".format(ro_vnfd_id
, e
))
2927 self
.logger
.error(logging_text
+ failed_detail
[-1])
2930 stage
[2] = "Error deleting from VIM"
2932 stage
[2] = "Deleted from VIM"
2933 db_nsr_update
["detailed-status"] = " ".join(stage
)
2934 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2935 self
._write
_op
_status
(nslcmop_id
, stage
)
2938 raise LcmException("; ".join(failed_detail
))
2940 async def terminate(self
, nsr_id
, nslcmop_id
):
2941 # Try to lock HA task here
2942 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA('ns', 'nslcmops', nslcmop_id
)
2943 if not task_is_locked_by_me
:
2946 logging_text
= "Task ns={} terminate={} ".format(nsr_id
, nslcmop_id
)
2947 self
.logger
.debug(logging_text
+ "Enter")
2948 timeout_ns_terminate
= self
.timeout_ns_terminate
2951 operation_params
= None
2953 error_list
= [] # annotates all failed error messages
2954 db_nslcmop_update
= {}
2955 autoremove
= False # autoremove after terminated
2956 tasks_dict_info
= {}
2958 stage
= ["Stage 1/3: Preparing task.", "Waiting for previous operations to terminate.", ""]
2959 # ^ contains [stage, step, VIM-status]
2961 # wait for any previous tasks in process
2962 await self
.lcm_tasks
.waitfor_related_HA("ns", 'nslcmops', nslcmop_id
)
2964 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
2965 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2966 operation_params
= db_nslcmop
.get("operationParams") or {}
2967 if operation_params
.get("timeout_ns_terminate"):
2968 timeout_ns_terminate
= operation_params
["timeout_ns_terminate"]
2969 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
2970 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2972 db_nsr_update
["operational-status"] = "terminating"
2973 db_nsr_update
["config-status"] = "terminating"
2974 self
._write
_ns
_status
(
2976 ns_state
="TERMINATING",
2977 current_operation
="TERMINATING",
2978 current_operation_id
=nslcmop_id
,
2979 other_update
=db_nsr_update
2981 self
._write
_op
_status
(
2986 nsr_deployed
= deepcopy(db_nsr
["_admin"].get("deployed")) or {}
2987 if db_nsr
["_admin"]["nsState"] == "NOT_INSTANTIATED":
2990 stage
[1] = "Getting vnf descriptors from db."
2991 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
2992 db_vnfds_from_id
= {}
2993 db_vnfds_from_member_index
= {}
2995 for vnfr
in db_vnfrs_list
:
2996 vnfd_id
= vnfr
["vnfd-id"]
2997 if vnfd_id
not in db_vnfds_from_id
:
2998 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
2999 db_vnfds_from_id
[vnfd_id
] = vnfd
3000 db_vnfds_from_member_index
[vnfr
["member-vnf-index-ref"]] = db_vnfds_from_id
[vnfd_id
]
3002 # Destroy individual execution environments when there are terminating primitives.
3003 # Rest of EE will be deleted at once
3004 # TODO - check before calling _destroy_N2VC
3005 # if not operation_params.get("skip_terminate_primitives"):#
3006 # or not vca.get("needed_terminate"):
3007 stage
[0] = "Stage 2/3 execute terminating primitives."
3008 self
.logger
.debug(logging_text
+ stage
[0])
3009 stage
[1] = "Looking execution environment that needs terminate."
3010 self
.logger
.debug(logging_text
+ stage
[1])
3012 for vca_index
, vca
in enumerate(get_iterable(nsr_deployed
, "VCA")):
3013 config_descriptor
= None
3014 if not vca
or not vca
.get("ee_id"):
3016 if not vca
.get("member-vnf-index"):
3018 config_descriptor
= db_nsr
.get("ns-configuration")
3019 elif vca
.get("vdu_id"):
3020 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
3021 vdud
= next((vdu
for vdu
in db_vnfd
.get("vdu", ()) if vdu
["id"] == vca
.get("vdu_id")), None)
3023 config_descriptor
= vdud
.get("vdu-configuration")
3024 elif vca
.get("kdu_name"):
3025 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
3026 kdud
= next((kdu
for kdu
in db_vnfd
.get("kdu", ()) if kdu
["name"] == vca
.get("kdu_name")), None)
3028 config_descriptor
= kdud
.get("kdu-configuration")
3030 config_descriptor
= db_vnfds_from_member_index
[vca
["member-vnf-index"]].get("vnf-configuration")
3031 vca_type
= vca
.get("type")
3032 exec_terminate_primitives
= (not operation_params
.get("skip_terminate_primitives") and
3033 vca
.get("needed_terminate"))
3034 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
3035 # pending native charms
3036 destroy_ee
= True if vca_type
in ("helm", "helm-v3", "native_charm") else False
3037 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
3038 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
3039 task
= asyncio
.ensure_future(
3040 self
.destroy_N2VC(logging_text
, db_nslcmop
, vca
, config_descriptor
, vca_index
,
3041 destroy_ee
, exec_terminate_primitives
))
3042 tasks_dict_info
[task
] = "Terminating VCA {}".format(vca
.get("ee_id"))
3044 # wait for pending tasks of terminate primitives
3046 self
.logger
.debug(logging_text
+ 'Waiting for tasks {}'.format(list(tasks_dict_info
.keys())))
3047 error_list
= await self
._wait
_for
_tasks
(logging_text
, tasks_dict_info
,
3048 min(self
.timeout_charm_delete
, timeout_ns_terminate
),
3050 tasks_dict_info
.clear()
3052 return # raise LcmException("; ".join(error_list))
3054 # remove All execution environments at once
3055 stage
[0] = "Stage 3/3 delete all."
3057 if nsr_deployed
.get("VCA"):
3058 stage
[1] = "Deleting all execution environments."
3059 self
.logger
.debug(logging_text
+ stage
[1])
3060 task_delete_ee
= asyncio
.ensure_future(asyncio
.wait_for(self
._delete
_all
_N
2VC
(db_nsr
=db_nsr
),
3061 timeout
=self
.timeout_charm_delete
))
3062 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
3063 tasks_dict_info
[task_delete_ee
] = "Terminating all VCA"
3065 # Delete from k8scluster
3066 stage
[1] = "Deleting KDUs."
3067 self
.logger
.debug(logging_text
+ stage
[1])
3068 # print(nsr_deployed)
3069 for kdu
in get_iterable(nsr_deployed
, "K8s"):
3070 if not kdu
or not kdu
.get("kdu-instance"):
3072 kdu_instance
= kdu
.get("kdu-instance")
3073 if kdu
.get("k8scluster-type") in self
.k8scluster_map
:
3074 task_delete_kdu_instance
= asyncio
.ensure_future(
3075 self
.k8scluster_map
[kdu
["k8scluster-type"]].uninstall(
3076 cluster_uuid
=kdu
.get("k8scluster-uuid"),
3077 kdu_instance
=kdu_instance
))
3079 self
.logger
.error(logging_text
+ "Unknown k8s deployment type {}".
3080 format(kdu
.get("k8scluster-type")))
3082 tasks_dict_info
[task_delete_kdu_instance
] = "Terminating KDU '{}'".format(kdu
.get("kdu-name"))
3085 stage
[1] = "Deleting ns from VIM."
3087 task_delete_ro
= asyncio
.ensure_future(
3088 self
._terminate
_ng
_ro
(logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
))
3090 task_delete_ro
= asyncio
.ensure_future(
3091 self
._terminate
_RO
(logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
))
3092 tasks_dict_info
[task_delete_ro
] = "Removing deployment from VIM"
3094 # rest of staff will be done at finally
3096 except (ROclient
.ROClientException
, DbException
, LcmException
, N2VCException
) as e
:
3097 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
3099 except asyncio
.CancelledError
:
3100 self
.logger
.error(logging_text
+ "Cancelled Exception while '{}'".format(stage
[1]))
3101 exc
= "Operation was cancelled"
3102 except Exception as e
:
3103 exc
= traceback
.format_exc()
3104 self
.logger
.critical(logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
), exc_info
=True)
3107 error_list
.append(str(exc
))
3109 # wait for pending tasks
3111 stage
[1] = "Waiting for terminate pending tasks."
3112 self
.logger
.debug(logging_text
+ stage
[1])
3113 error_list
+= await self
._wait
_for
_tasks
(logging_text
, tasks_dict_info
, timeout_ns_terminate
,
3115 stage
[1] = stage
[2] = ""
3116 except asyncio
.CancelledError
:
3117 error_list
.append("Cancelled")
3118 # TODO cancell all tasks
3119 except Exception as exc
:
3120 error_list
.append(str(exc
))
3121 # update status at database
3123 error_detail
= "; ".join(error_list
)
3124 # self.logger.error(logging_text + error_detail)
3125 error_description_nslcmop
= '{} Detail: {}'.format(stage
[0], error_detail
)
3126 error_description_nsr
= 'Operation: TERMINATING.{}, {}.'.format(nslcmop_id
, stage
[0])
3128 db_nsr_update
["operational-status"] = "failed"
3129 db_nsr_update
["detailed-status"] = error_description_nsr
+ " Detail: " + error_detail
3130 db_nslcmop_update
["detailed-status"] = error_detail
3131 nslcmop_operation_state
= "FAILED"
3135 error_description_nsr
= error_description_nslcmop
= None
3136 ns_state
= "NOT_INSTANTIATED"
3137 db_nsr_update
["operational-status"] = "terminated"
3138 db_nsr_update
["detailed-status"] = "Done"
3139 db_nsr_update
["_admin.nsState"] = "NOT_INSTANTIATED"
3140 db_nslcmop_update
["detailed-status"] = "Done"
3141 nslcmop_operation_state
= "COMPLETED"
3144 self
._write
_ns
_status
(
3147 current_operation
="IDLE",
3148 current_operation_id
=None,
3149 error_description
=error_description_nsr
,
3150 error_detail
=error_detail
,
3151 other_update
=db_nsr_update
3153 self
._write
_op
_status
(
3156 error_message
=error_description_nslcmop
,
3157 operation_state
=nslcmop_operation_state
,
3158 other_update
=db_nslcmop_update
,
3160 if ns_state
== "NOT_INSTANTIATED":
3162 self
.db
.set_list("vnfrs", {"nsr-id-ref": nsr_id
}, {"_admin.nsState": "NOT_INSTANTIATED"})
3163 except DbException
as e
:
3164 self
.logger
.warn(logging_text
+ 'Error writing VNFR status for nsr-id-ref: {} -> {}'.
3166 if operation_params
:
3167 autoremove
= operation_params
.get("autoremove", False)
3168 if nslcmop_operation_state
:
3170 await self
.msg
.aiowrite("ns", "terminated", {"nsr_id": nsr_id
, "nslcmop_id": nslcmop_id
,
3171 "operationState": nslcmop_operation_state
,
3172 "autoremove": autoremove
},
3174 except Exception as e
:
3175 self
.logger
.error(logging_text
+ "kafka_write notification Exception {}".format(e
))
3177 self
.logger
.debug(logging_text
+ "Exit")
3178 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_terminate")
3180 async def _wait_for_tasks(self
, logging_text
, created_tasks_info
, timeout
, stage
, nslcmop_id
, nsr_id
=None):
3182 error_detail_list
= []
3184 pending_tasks
= list(created_tasks_info
.keys())
3185 num_tasks
= len(pending_tasks
)
3187 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
3188 self
._write
_op
_status
(nslcmop_id
, stage
)
3189 while pending_tasks
:
3191 _timeout
= timeout
+ time_start
- time()
3192 done
, pending_tasks
= await asyncio
.wait(pending_tasks
, timeout
=_timeout
,
3193 return_when
=asyncio
.FIRST_COMPLETED
)
3194 num_done
+= len(done
)
3195 if not done
: # Timeout
3196 for task
in pending_tasks
:
3197 new_error
= created_tasks_info
[task
] + ": Timeout"
3198 error_detail_list
.append(new_error
)
3199 error_list
.append(new_error
)
3202 if task
.cancelled():
3205 exc
= task
.exception()
3207 if isinstance(exc
, asyncio
.TimeoutError
):
3209 new_error
= created_tasks_info
[task
] + ": {}".format(exc
)
3210 error_list
.append(created_tasks_info
[task
])
3211 error_detail_list
.append(new_error
)
3212 if isinstance(exc
, (str, DbException
, N2VCException
, ROclient
.ROClientException
, LcmException
,
3213 K8sException
, NgRoException
)):
3214 self
.logger
.error(logging_text
+ new_error
)
3216 exc_traceback
= "".join(traceback
.format_exception(None, exc
, exc
.__traceback
__))
3217 self
.logger
.error(logging_text
+ created_tasks_info
[task
] + " " + exc_traceback
)
3219 self
.logger
.debug(logging_text
+ created_tasks_info
[task
] + ": Done")
3220 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
3222 stage
[1] += " Errors: " + ". ".join(error_detail_list
) + "."
3223 if nsr_id
: # update also nsr
3224 self
.update_db_2("nsrs", nsr_id
, {"errorDescription": "Error at: " + ", ".join(error_list
),
3225 "errorDetail": ". ".join(error_detail_list
)})
3226 self
._write
_op
_status
(nslcmop_id
, stage
)
3227 return error_detail_list
3230 def _map_primitive_params(primitive_desc
, params
, instantiation_params
):
3232 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
3233 The default-value is used. If it is between < > it look for a value at instantiation_params
3234 :param primitive_desc: portion of VNFD/NSD that describes primitive
3235 :param params: Params provided by user
3236 :param instantiation_params: Instantiation params provided by user
3237 :return: a dictionary with the calculated params
3239 calculated_params
= {}
3240 for parameter
in primitive_desc
.get("parameter", ()):
3241 param_name
= parameter
["name"]
3242 if param_name
in params
:
3243 calculated_params
[param_name
] = params
[param_name
]
3244 elif "default-value" in parameter
or "value" in parameter
:
3245 if "value" in parameter
:
3246 calculated_params
[param_name
] = parameter
["value"]
3248 calculated_params
[param_name
] = parameter
["default-value"]
3249 if isinstance(calculated_params
[param_name
], str) and calculated_params
[param_name
].startswith("<") \
3250 and calculated_params
[param_name
].endswith(">"):
3251 if calculated_params
[param_name
][1:-1] in instantiation_params
:
3252 calculated_params
[param_name
] = instantiation_params
[calculated_params
[param_name
][1:-1]]
3254 raise LcmException("Parameter {} needed to execute primitive {} not provided".
3255 format(calculated_params
[param_name
], primitive_desc
["name"]))
3257 raise LcmException("Parameter {} needed to execute primitive {} not provided".
3258 format(param_name
, primitive_desc
["name"]))
3260 if isinstance(calculated_params
[param_name
], (dict, list, tuple)):
3261 calculated_params
[param_name
] = yaml
.safe_dump(calculated_params
[param_name
],
3262 default_flow_style
=True, width
=256)
3263 elif isinstance(calculated_params
[param_name
], str) and calculated_params
[param_name
].startswith("!!yaml "):
3264 calculated_params
[param_name
] = calculated_params
[param_name
][7:]
3265 if parameter
.get("data-type") == "INTEGER":
3267 calculated_params
[param_name
] = int(calculated_params
[param_name
])
3268 except ValueError: # error converting string to int
3270 "Parameter {} of primitive {} must be integer".format(param_name
, primitive_desc
["name"]))
3271 elif parameter
.get("data-type") == "BOOLEAN":
3272 calculated_params
[param_name
] = not ((str(calculated_params
[param_name
])).lower() == 'false')
3274 # add always ns_config_info if primitive name is config
3275 if primitive_desc
["name"] == "config":
3276 if "ns_config_info" in instantiation_params
:
3277 calculated_params
["ns_config_info"] = instantiation_params
["ns_config_info"]
3278 return calculated_params
3280 def _look_for_deployed_vca(self
, deployed_vca
, member_vnf_index
, vdu_id
, vdu_count_index
, kdu_name
=None,
3281 ee_descriptor_id
=None):
3282 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
3283 for vca
in deployed_vca
:
3286 if member_vnf_index
!= vca
["member-vnf-index"] or vdu_id
!= vca
["vdu_id"]:
3288 if vdu_count_index
is not None and vdu_count_index
!= vca
["vdu_count_index"]:
3290 if kdu_name
and kdu_name
!= vca
["kdu_name"]:
3292 if ee_descriptor_id
and ee_descriptor_id
!= vca
["ee_descriptor_id"]:
3296 # vca_deployed not found
3297 raise LcmException("charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
3298 " is not deployed".format(member_vnf_index
, vdu_id
, vdu_count_index
, kdu_name
,
3301 ee_id
= vca
.get("ee_id")
3302 vca_type
= vca
.get("type", "lxc_proxy_charm") # default value for backward compatibility - proxy charm
3304 raise LcmException("charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
3305 "execution environment"
3306 .format(member_vnf_index
, vdu_id
, kdu_name
, vdu_count_index
))
3307 return ee_id
, vca_type
3309 async def _ns_execute_primitive(self
, ee_id
, primitive
, primitive_params
, retries
=0, retries_interval
=30,
3310 timeout
=None, vca_type
=None, db_dict
=None) -> (str, str):
3312 if primitive
== "config":
3313 primitive_params
= {"params": primitive_params
}
3315 vca_type
= vca_type
or "lxc_proxy_charm"
3319 output
= await asyncio
.wait_for(
3320 self
.vca_map
[vca_type
].exec_primitive(
3322 primitive_name
=primitive
,
3323 params_dict
=primitive_params
,
3324 progress_timeout
=self
.timeout_progress_primitive
,
3325 total_timeout
=self
.timeout_primitive
,
3327 timeout
=timeout
or self
.timeout_primitive
)
3330 except asyncio
.CancelledError
:
3332 except Exception as e
: # asyncio.TimeoutError
3333 if isinstance(e
, asyncio
.TimeoutError
):
3337 self
.logger
.debug('Error executing action {} on {} -> {}'.format(primitive
, ee_id
, e
))
3339 await asyncio
.sleep(retries_interval
, loop
=self
.loop
)
3341 return 'FAILED', str(e
)
3343 return 'COMPLETED', output
3345 except (LcmException
, asyncio
.CancelledError
):
3347 except Exception as e
:
3348 return 'FAIL', 'Error executing action {}: {}'.format(primitive
, e
)
3350 async def action(self
, nsr_id
, nslcmop_id
):
3351 # Try to lock HA task here
3352 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA('ns', 'nslcmops', nslcmop_id
)
3353 if not task_is_locked_by_me
:
3356 logging_text
= "Task ns={} action={} ".format(nsr_id
, nslcmop_id
)
3357 self
.logger
.debug(logging_text
+ "Enter")
3358 # get all needed from database
3362 db_nslcmop_update
= {}
3363 nslcmop_operation_state
= None
3364 error_description_nslcmop
= None
3367 # wait for any previous tasks in process
3368 step
= "Waiting for previous operations to terminate"
3369 await self
.lcm_tasks
.waitfor_related_HA('ns', 'nslcmops', nslcmop_id
)
3371 self
._write
_ns
_status
(
3374 current_operation
="RUNNING ACTION",
3375 current_operation_id
=nslcmop_id
3378 step
= "Getting information from database"
3379 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
3380 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3382 nsr_deployed
= db_nsr
["_admin"].get("deployed")
3383 vnf_index
= db_nslcmop
["operationParams"].get("member_vnf_index")
3384 vdu_id
= db_nslcmop
["operationParams"].get("vdu_id")
3385 kdu_name
= db_nslcmop
["operationParams"].get("kdu_name")
3386 vdu_count_index
= db_nslcmop
["operationParams"].get("vdu_count_index")
3387 primitive
= db_nslcmop
["operationParams"]["primitive"]
3388 primitive_params
= db_nslcmop
["operationParams"]["primitive_params"]
3389 timeout_ns_action
= db_nslcmop
["operationParams"].get("timeout_ns_action", self
.timeout_primitive
)
3392 step
= "Getting vnfr from database"
3393 db_vnfr
= self
.db
.get_one("vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
})
3394 step
= "Getting vnfd from database"
3395 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
3397 step
= "Getting nsd from database"
3398 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
3400 # for backward compatibility
3401 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
3402 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
3403 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
3404 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3406 # look for primitive
3407 config_primitive_desc
= descriptor_configuration
= None
3409 descriptor_configuration
= get_vdu_configuration(db_vnfd
, vdu_id
)
3411 descriptor_configuration
= get_kdu_configuration(db_vnfd
, kdu_name
)
3413 descriptor_configuration
= get_vnf_configuration(db_vnfd
)
3415 descriptor_configuration
= db_nsd
.get("ns-configuration")
3417 if descriptor_configuration
and descriptor_configuration
.get("config-primitive"):
3418 for config_primitive
in descriptor_configuration
["config-primitive"]:
3419 if config_primitive
["name"] == primitive
:
3420 config_primitive_desc
= config_primitive
3423 if not config_primitive_desc
:
3424 if not (kdu_name
and primitive
in ("upgrade", "rollback", "status")):
3425 raise LcmException("Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".
3427 primitive_name
= primitive
3428 ee_descriptor_id
= None
3430 primitive_name
= config_primitive_desc
.get("execution-environment-primitive", primitive
)
3431 ee_descriptor_id
= config_primitive_desc
.get("execution-environment-ref")
3435 vdur
= next((x
for x
in db_vnfr
["vdur"] if x
["vdu-id-ref"] == vdu_id
), None)
3436 desc_params
= parse_yaml_strings(vdur
.get("additionalParams"))
3438 kdur
= next((x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
), None)
3439 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
3441 desc_params
= parse_yaml_strings(db_vnfr
.get("additionalParamsForVnf"))
3443 desc_params
= parse_yaml_strings(db_nsr
.get("additionalParamsForNs"))
3445 if kdu_name
and get_kdu_configuration(db_vnfd
):
3446 kdu_action
= True if not get_kdu_configuration(db_vnfd
)["juju"] else False
3448 # TODO check if ns is in a proper status
3449 if kdu_name
and (primitive_name
in ("upgrade", "rollback", "status") or kdu_action
):
3450 # kdur and desc_params already set from before
3451 if primitive_params
:
3452 desc_params
.update(primitive_params
)
3453 # TODO Check if we will need something at vnf level
3454 for index
, kdu
in enumerate(get_iterable(nsr_deployed
, "K8s")):
3455 if kdu_name
== kdu
["kdu-name"] and kdu
["member-vnf-index"] == vnf_index
:
3458 raise LcmException("KDU '{}' for vnf '{}' not deployed".format(kdu_name
, vnf_index
))
3460 if kdu
.get("k8scluster-type") not in self
.k8scluster_map
:
3461 msg
= "unknown k8scluster-type '{}'".format(kdu
.get("k8scluster-type"))
3462 raise LcmException(msg
)
3464 db_dict
= {"collection": "nsrs",
3465 "filter": {"_id": nsr_id
},
3466 "path": "_admin.deployed.K8s.{}".format(index
)}
3467 self
.logger
.debug(logging_text
+ "Exec k8s {} on {}.{}".format(primitive_name
, vnf_index
, kdu_name
))
3468 step
= "Executing kdu {}".format(primitive_name
)
3469 if primitive_name
== "upgrade":
3470 if desc_params
.get("kdu_model"):
3471 kdu_model
= desc_params
.get("kdu_model")
3472 del desc_params
["kdu_model"]
3474 kdu_model
= kdu
.get("kdu-model")
3475 parts
= kdu_model
.split(sep
=":")
3477 kdu_model
= parts
[0]
3479 detailed_status
= await asyncio
.wait_for(
3480 self
.k8scluster_map
[kdu
["k8scluster-type"]].upgrade(
3481 cluster_uuid
=kdu
.get("k8scluster-uuid"),
3482 kdu_instance
=kdu
.get("kdu-instance"),
3483 atomic
=True, kdu_model
=kdu_model
,
3484 params
=desc_params
, db_dict
=db_dict
,
3485 timeout
=timeout_ns_action
),
3486 timeout
=timeout_ns_action
+ 10)
3487 self
.logger
.debug(logging_text
+ " Upgrade of kdu {} done".format(detailed_status
))
3488 elif primitive_name
== "rollback":
3489 detailed_status
= await asyncio
.wait_for(
3490 self
.k8scluster_map
[kdu
["k8scluster-type"]].rollback(
3491 cluster_uuid
=kdu
.get("k8scluster-uuid"),
3492 kdu_instance
=kdu
.get("kdu-instance"),
3494 timeout
=timeout_ns_action
)
3495 elif primitive_name
== "status":
3496 detailed_status
= await asyncio
.wait_for(
3497 self
.k8scluster_map
[kdu
["k8scluster-type"]].status_kdu(
3498 cluster_uuid
=kdu
.get("k8scluster-uuid"),
3499 kdu_instance
=kdu
.get("kdu-instance")),
3500 timeout
=timeout_ns_action
)
3502 kdu_instance
= kdu
.get("kdu-instance") or "{}-{}".format(kdu
["kdu-name"], nsr_id
)
3503 params
= self
._map
_primitive
_params
(config_primitive_desc
, primitive_params
, desc_params
)
3505 detailed_status
= await asyncio
.wait_for(
3506 self
.k8scluster_map
[kdu
["k8scluster-type"]].exec_primitive(
3507 cluster_uuid
=kdu
.get("k8scluster-uuid"),
3508 kdu_instance
=kdu_instance
,
3509 primitive_name
=primitive_name
,
3510 params
=params
, db_dict
=db_dict
,
3511 timeout
=timeout_ns_action
),
3512 timeout
=timeout_ns_action
)
3515 nslcmop_operation_state
= 'COMPLETED'
3517 detailed_status
= ''
3518 nslcmop_operation_state
= 'FAILED'
3520 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(nsr_deployed
["VCA"], member_vnf_index
=vnf_index
,
3521 vdu_id
=vdu_id
, vdu_count_index
=vdu_count_index
,
3522 ee_descriptor_id
=ee_descriptor_id
)
3523 db_nslcmop_notif
= {"collection": "nslcmops",
3524 "filter": {"_id": nslcmop_id
},
3525 "path": "admin.VCA"}
3526 nslcmop_operation_state
, detailed_status
= await self
._ns
_execute
_primitive
(
3528 primitive
=primitive_name
,
3529 primitive_params
=self
._map
_primitive
_params
(config_primitive_desc
, primitive_params
, desc_params
),
3530 timeout
=timeout_ns_action
,
3532 db_dict
=db_nslcmop_notif
)
3534 db_nslcmop_update
["detailed-status"] = detailed_status
3535 error_description_nslcmop
= detailed_status
if nslcmop_operation_state
== "FAILED" else ""
3536 self
.logger
.debug(logging_text
+ " task Done with result {} {}".format(nslcmop_operation_state
,
3538 return # database update is called inside finally
3540 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
3541 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
3543 except asyncio
.CancelledError
:
3544 self
.logger
.error(logging_text
+ "Cancelled Exception while '{}'".format(step
))
3545 exc
= "Operation was cancelled"
3546 except asyncio
.TimeoutError
:
3547 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
3549 except Exception as e
:
3550 exc
= traceback
.format_exc()
3551 self
.logger
.critical(logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True)
3554 db_nslcmop_update
["detailed-status"] = detailed_status
= error_description_nslcmop
= \
3555 "FAILED {}: {}".format(step
, exc
)
3556 nslcmop_operation_state
= "FAILED"
3558 self
._write
_ns
_status
(
3560 ns_state
=db_nsr
["nsState"], # TODO check if degraded. For the moment use previous status
3561 current_operation
="IDLE",
3562 current_operation_id
=None,
3563 # error_description=error_description_nsr,
3564 # error_detail=error_detail,
3565 other_update
=db_nsr_update
3568 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
="", error_message
=error_description_nslcmop
,
3569 operation_state
=nslcmop_operation_state
, other_update
=db_nslcmop_update
)
3571 if nslcmop_operation_state
:
3573 await self
.msg
.aiowrite("ns", "actioned", {"nsr_id": nsr_id
, "nslcmop_id": nslcmop_id
,
3574 "operationState": nslcmop_operation_state
},
3576 except Exception as e
:
3577 self
.logger
.error(logging_text
+ "kafka_write notification Exception {}".format(e
))
3578 self
.logger
.debug(logging_text
+ "Exit")
3579 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_action")
3580 return nslcmop_operation_state
, detailed_status
3582 async def scale(self
, nsr_id
, nslcmop_id
):
3583 # Try to lock HA task here
3584 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA('ns', 'nslcmops', nslcmop_id
)
3585 if not task_is_locked_by_me
:
3588 logging_text
= "Task ns={} scale={} ".format(nsr_id
, nslcmop_id
)
3589 stage
= ['', '', '']
3590 # ^ stage, step, VIM progress
3591 self
.logger
.debug(logging_text
+ "Enter")
3592 # get all needed from database
3594 db_nslcmop_update
= {}
3597 # in case of error, indicates what part of scale was failed to put nsr at error status
3598 scale_process
= None
3599 old_operational_status
= ""
3600 old_config_status
= ""
3602 # wait for any previous tasks in process
3603 step
= "Waiting for previous operations to terminate"
3604 await self
.lcm_tasks
.waitfor_related_HA('ns', 'nslcmops', nslcmop_id
)
3605 self
._write
_ns
_status
(nsr_id
=nsr_id
, ns_state
=None,
3606 current_operation
="SCALING", current_operation_id
=nslcmop_id
)
3608 step
= "Getting nslcmop from database"
3609 self
.logger
.debug(step
+ " after having waited for previous tasks to be completed")
3610 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
3612 step
= "Getting nsr from database"
3613 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3614 old_operational_status
= db_nsr
["operational-status"]
3615 old_config_status
= db_nsr
["config-status"]
3617 step
= "Parsing scaling parameters"
3618 db_nsr_update
["operational-status"] = "scaling"
3619 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3620 nsr_deployed
= db_nsr
["_admin"].get("deployed")
3623 nsr_deployed
= db_nsr
["_admin"].get("deployed")
3624 vnf_index
= db_nslcmop
["operationParams"].get("member_vnf_index")
3625 # vdu_id = db_nslcmop["operationParams"].get("vdu_id")
3626 # vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
3627 # vdu_name = db_nslcmop["operationParams"].get("vdu_name")
3630 RO_nsr_id
= nsr_deployed
["RO"].get("nsr_id")
3631 vnf_index
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleByStepData"]["member-vnf-index"]
3632 scaling_group
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleByStepData"]["scaling-group-descriptor"]
3633 scaling_type
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleVnfType"]
3634 # for backward compatibility
3635 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
3636 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
3637 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
3638 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3640 step
= "Getting vnfr from database"
3641 db_vnfr
= self
.db
.get_one("vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
})
3643 step
= "Getting vnfd from database"
3644 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
3646 step
= "Getting scaling-group-descriptor"
3647 for scaling_descriptor
in db_vnfd
["scaling-group-descriptor"]:
3648 if scaling_descriptor
["name"] == scaling_group
:
3651 raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
3652 "at vnfd:scaling-group-descriptor".format(scaling_group
))
3654 step
= "Sending scale order to VIM"
3655 # TODO check if ns is in a proper status
3657 if not db_nsr
["_admin"].get("scaling-group"):
3658 self
.update_db_2("nsrs", nsr_id
, {"_admin.scaling-group": [{"name": scaling_group
, "nb-scale-op": 0}]})
3659 admin_scale_index
= 0
3661 for admin_scale_index
, admin_scale_info
in enumerate(db_nsr
["_admin"]["scaling-group"]):
3662 if admin_scale_info
["name"] == scaling_group
:
3663 nb_scale_op
= admin_scale_info
.get("nb-scale-op", 0)
3665 else: # not found, set index one plus last element and add new entry with the name
3666 admin_scale_index
+= 1
3667 db_nsr_update
["_admin.scaling-group.{}.name".format(admin_scale_index
)] = scaling_group
3668 RO_scaling_info
= []
3669 vdu_scaling_info
= {"scaling_group_name": scaling_group
, "vdu": []}
3670 if scaling_type
== "SCALE_OUT":
3671 # count if max-instance-count is reached
3672 max_instance_count
= scaling_descriptor
.get("max-instance-count", 10)
3673 # self.logger.debug("MAX_INSTANCE_COUNT is {}".format(max_instance_count))
3674 if nb_scale_op
>= max_instance_count
:
3675 raise LcmException("reached the limit of {} (max-instance-count) "
3676 "scaling-out operations for the "
3677 "scaling-group-descriptor '{}'".format(nb_scale_op
, scaling_group
))
3680 vdu_scaling_info
["scaling_direction"] = "OUT"
3681 vdu_scaling_info
["vdu-create"] = {}
3682 for vdu_scale_info
in scaling_descriptor
["vdu"]:
3683 vdud
= next(vdu
for vdu
in db_vnfd
.get("vdu") if vdu
["id"] == vdu_scale_info
["vdu-id-ref"])
3684 vdu_index
= len([x
for x
in db_vnfr
.get("vdur", ())
3685 if x
.get("vdu-id-ref") == vdu_scale_info
["vdu-id-ref"] and
3686 x
.get("member-vnf-index-ref") == vnf_index
])
3687 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(vdud
, db_vnfd
)
3689 additional_params
= self
._get
_vdu
_additional
_params
(db_vnfr
, vdud
["id"]) or {}
3690 cloud_init_list
= []
3691 for x
in range(vdu_scale_info
.get("count", 1)):
3693 # TODO Information of its own ip is not available because db_vnfr is not updated.
3694 additional_params
["OSM"] = get_osm_params(
3696 vdu_scale_info
["vdu-id-ref"],
3699 cloud_init_list
.append(
3700 self
._parse
_cloud
_init
(
3707 RO_scaling_info
.append({"osm_vdu_id": vdu_scale_info
["vdu-id-ref"], "member-vnf-index": vnf_index
,
3708 "type": "create", "count": vdu_scale_info
.get("count", 1)})
3710 RO_scaling_info
[-1]["cloud_init"] = cloud_init_list
3711 vdu_scaling_info
["vdu-create"][vdu_scale_info
["vdu-id-ref"]] = vdu_scale_info
.get("count", 1)
3713 elif scaling_type
== "SCALE_IN":
3714 # count if min-instance-count is reached
3715 min_instance_count
= 0
3716 if "min-instance-count" in scaling_descriptor
and scaling_descriptor
["min-instance-count"] is not None:
3717 min_instance_count
= int(scaling_descriptor
["min-instance-count"])
3718 if nb_scale_op
<= min_instance_count
:
3719 raise LcmException("reached the limit of {} (min-instance-count) scaling-in operations for the "
3720 "scaling-group-descriptor '{}'".format(nb_scale_op
, scaling_group
))
3722 vdu_scaling_info
["scaling_direction"] = "IN"
3723 vdu_scaling_info
["vdu-delete"] = {}
3724 for vdu_scale_info
in scaling_descriptor
["vdu"]:
3725 RO_scaling_info
.append({"osm_vdu_id": vdu_scale_info
["vdu-id-ref"], "member-vnf-index": vnf_index
,
3726 "type": "delete", "count": vdu_scale_info
.get("count", 1)})
3727 vdu_scaling_info
["vdu-delete"][vdu_scale_info
["vdu-id-ref"]] = vdu_scale_info
.get("count", 1)
3729 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
3730 vdu_delete
= copy(vdu_scaling_info
.get("vdu-delete"))
3731 if vdu_scaling_info
["scaling_direction"] == "IN":
3732 for vdur
in reversed(db_vnfr
["vdur"]):
3733 if vdu_delete
.get(vdur
["vdu-id-ref"]):
3734 vdu_delete
[vdur
["vdu-id-ref"]] -= 1
3735 vdu_scaling_info
["vdu"].append({
3736 "name": vdur
.get("name") or vdur
.get("vdu-name"),
3737 "vdu_id": vdur
["vdu-id-ref"],
3740 for interface
in vdur
["interfaces"]:
3741 vdu_scaling_info
["vdu"][-1]["interface"].append({
3742 "name": interface
["name"],
3743 "ip_address": interface
["ip-address"],
3744 "mac_address": interface
.get("mac-address"),
3746 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
3749 step
= "Executing pre-scale vnf-config-primitive"
3750 if scaling_descriptor
.get("scaling-config-action"):
3751 for scaling_config_action
in scaling_descriptor
["scaling-config-action"]:
3752 if (scaling_config_action
.get("trigger") == "pre-scale-in" and scaling_type
== "SCALE_IN") \
3753 or (scaling_config_action
.get("trigger") == "pre-scale-out" and scaling_type
== "SCALE_OUT"):
3754 vnf_config_primitive
= scaling_config_action
["vnf-config-primitive-name-ref"]
3755 step
= db_nslcmop_update
["detailed-status"] = \
3756 "executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive
)
3758 # look for primitive
3759 for config_primitive
in db_vnfd
.get("vnf-configuration", {}).get("config-primitive", ()):
3760 if config_primitive
["name"] == vnf_config_primitive
:
3764 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
3765 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
3766 "primitive".format(scaling_group
, vnf_config_primitive
))
3768 vnfr_params
= {"VDU_SCALE_INFO": vdu_scaling_info
}
3769 if db_vnfr
.get("additionalParamsForVnf"):
3770 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
3772 scale_process
= "VCA"
3773 db_nsr_update
["config-status"] = "configuring pre-scaling"
3774 primitive_params
= self
._map
_primitive
_params
(config_primitive
, {}, vnfr_params
)
3776 # Pre-scale retry check: Check if this sub-operation has been executed before
3777 op_index
= self
._check
_or
_add
_scale
_suboperation
(
3778 db_nslcmop
, nslcmop_id
, vnf_index
, vnf_config_primitive
, primitive_params
, 'PRE-SCALE')
3779 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
3780 # Skip sub-operation
3781 result
= 'COMPLETED'
3782 result_detail
= 'Done'
3783 self
.logger
.debug(logging_text
+
3784 "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
3785 vnf_config_primitive
, result
, result_detail
))
3787 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
3788 # New sub-operation: Get index of this sub-operation
3789 op_index
= len(db_nslcmop
.get('_admin', {}).get('operations')) - 1
3790 self
.logger
.debug(logging_text
+ "vnf_config_primitive={} New sub-operation".
3791 format(vnf_config_primitive
))
3793 # retry: Get registered params for this existing sub-operation
3794 op
= db_nslcmop
.get('_admin', {}).get('operations', [])[op_index
]
3795 vnf_index
= op
.get('member_vnf_index')
3796 vnf_config_primitive
= op
.get('primitive')
3797 primitive_params
= op
.get('primitive_params')
3798 self
.logger
.debug(logging_text
+ "vnf_config_primitive={} Sub-operation retry".
3799 format(vnf_config_primitive
))
3800 # Execute the primitive, either with new (first-time) or registered (reintent) args
3801 ee_descriptor_id
= config_primitive
.get("execution-environment-ref")
3802 primitive_name
= config_primitive
.get("execution-environment-primitive",
3803 vnf_config_primitive
)
3804 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(nsr_deployed
["VCA"],
3805 member_vnf_index
=vnf_index
,
3807 vdu_count_index
=None,
3808 ee_descriptor_id
=ee_descriptor_id
)
3809 result
, result_detail
= await self
._ns
_execute
_primitive
(
3810 ee_id
, primitive_name
, primitive_params
, vca_type
)
3811 self
.logger
.debug(logging_text
+ "vnf_config_primitive={} Done with result {} {}".format(
3812 vnf_config_primitive
, result
, result_detail
))
3813 # Update operationState = COMPLETED | FAILED
3814 self
._update
_suboperation
_status
(
3815 db_nslcmop
, op_index
, result
, result_detail
)
3817 if result
== "FAILED":
3818 raise LcmException(result_detail
)
3819 db_nsr_update
["config-status"] = old_config_status
3820 scale_process
= None
3823 db_nsr_update
["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index
)] = nb_scale_op
3824 db_nsr_update
["_admin.scaling-group.{}.time".format(admin_scale_index
)] = time()
3828 scale_process
= "RO"
3829 if self
.ro_config
.get("ng"):
3830 await self
._scale
_ng
_ro
(logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, vdu_scaling_info
, stage
)
3832 await self
._RO
_scale
(logging_text
, RO_nsr_id
, RO_scaling_info
, db_nslcmop
, db_vnfr
,
3833 db_nslcmop_update
, vdu_scaling_info
)
3834 vdu_scaling_info
.pop("vdu-create", None)
3835 vdu_scaling_info
.pop("vdu-delete", None)
3837 scale_process
= None
3839 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3842 # execute primitive service POST-SCALING
3843 step
= "Executing post-scale vnf-config-primitive"
3844 if scaling_descriptor
.get("scaling-config-action"):
3845 for scaling_config_action
in scaling_descriptor
["scaling-config-action"]:
3846 if (scaling_config_action
.get("trigger") == "post-scale-in" and scaling_type
== "SCALE_IN") \
3847 or (scaling_config_action
.get("trigger") == "post-scale-out" and scaling_type
== "SCALE_OUT"):
3848 vnf_config_primitive
= scaling_config_action
["vnf-config-primitive-name-ref"]
3849 step
= db_nslcmop_update
["detailed-status"] = \
3850 "executing post-scale scaling-config-action '{}'".format(vnf_config_primitive
)
3852 vnfr_params
= {"VDU_SCALE_INFO": vdu_scaling_info
}
3853 if db_vnfr
.get("additionalParamsForVnf"):
3854 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
3856 # look for primitive
3857 for config_primitive
in db_vnfd
.get("vnf-configuration", {}).get("config-primitive", ()):
3858 if config_primitive
["name"] == vnf_config_primitive
:
3862 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
3863 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
3864 "config-primitive".format(scaling_group
, vnf_config_primitive
))
3865 scale_process
= "VCA"
3866 db_nsr_update
["config-status"] = "configuring post-scaling"
3867 primitive_params
= self
._map
_primitive
_params
(config_primitive
, {}, vnfr_params
)
3869 # Post-scale retry check: Check if this sub-operation has been executed before
3870 op_index
= self
._check
_or
_add
_scale
_suboperation
(
3871 db_nslcmop
, nslcmop_id
, vnf_index
, vnf_config_primitive
, primitive_params
, 'POST-SCALE')
3872 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
3873 # Skip sub-operation
3874 result
= 'COMPLETED'
3875 result_detail
= 'Done'
3876 self
.logger
.debug(logging_text
+
3877 "vnf_config_primitive={} Skipped sub-operation, result {} {}".
3878 format(vnf_config_primitive
, result
, result_detail
))
3880 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
3881 # New sub-operation: Get index of this sub-operation
3882 op_index
= len(db_nslcmop
.get('_admin', {}).get('operations')) - 1
3883 self
.logger
.debug(logging_text
+ "vnf_config_primitive={} New sub-operation".
3884 format(vnf_config_primitive
))
3886 # retry: Get registered params for this existing sub-operation
3887 op
= db_nslcmop
.get('_admin', {}).get('operations', [])[op_index
]
3888 vnf_index
= op
.get('member_vnf_index')
3889 vnf_config_primitive
= op
.get('primitive')
3890 primitive_params
= op
.get('primitive_params')
3891 self
.logger
.debug(logging_text
+ "vnf_config_primitive={} Sub-operation retry".
3892 format(vnf_config_primitive
))
3893 # Execute the primitive, either with new (first-time) or registered (reintent) args
3894 ee_descriptor_id
= config_primitive
.get("execution-environment-ref")
3895 primitive_name
= config_primitive
.get("execution-environment-primitive",
3896 vnf_config_primitive
)
3897 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(nsr_deployed
["VCA"],
3898 member_vnf_index
=vnf_index
,
3900 vdu_count_index
=None,
3901 ee_descriptor_id
=ee_descriptor_id
)
3902 result
, result_detail
= await self
._ns
_execute
_primitive
(
3903 ee_id
, primitive_name
, primitive_params
, vca_type
)
3904 self
.logger
.debug(logging_text
+ "vnf_config_primitive={} Done with result {} {}".format(
3905 vnf_config_primitive
, result
, result_detail
))
3906 # Update operationState = COMPLETED | FAILED
3907 self
._update
_suboperation
_status
(
3908 db_nslcmop
, op_index
, result
, result_detail
)
3910 if result
== "FAILED":
3911 raise LcmException(result_detail
)
3912 db_nsr_update
["config-status"] = old_config_status
3913 scale_process
= None
3916 db_nsr_update
["detailed-status"] = "" # "scaled {} {}".format(scaling_group, scaling_type)
3917 db_nsr_update
["operational-status"] = "running" if old_operational_status
== "failed" \
3918 else old_operational_status
3919 db_nsr_update
["config-status"] = old_config_status
3921 except (ROclient
.ROClientException
, DbException
, LcmException
, NgRoException
) as e
:
3922 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
3924 except asyncio
.CancelledError
:
3925 self
.logger
.error(logging_text
+ "Cancelled Exception while '{}'".format(step
))
3926 exc
= "Operation was cancelled"
3927 except Exception as e
:
3928 exc
= traceback
.format_exc()
3929 self
.logger
.critical(logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True)
3931 self
._write
_ns
_status
(nsr_id
=nsr_id
, ns_state
=None, current_operation
="IDLE", current_operation_id
=None)
3933 db_nslcmop_update
["detailed-status"] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
3934 nslcmop_operation_state
= "FAILED"
3936 db_nsr_update
["operational-status"] = old_operational_status
3937 db_nsr_update
["config-status"] = old_config_status
3938 db_nsr_update
["detailed-status"] = ""
3940 if "VCA" in scale_process
:
3941 db_nsr_update
["config-status"] = "failed"
3942 if "RO" in scale_process
:
3943 db_nsr_update
["operational-status"] = "failed"
3944 db_nsr_update
["detailed-status"] = "FAILED scaling nslcmop={} {}: {}".format(nslcmop_id
, step
,
3947 error_description_nslcmop
= None
3948 nslcmop_operation_state
= "COMPLETED"
3949 db_nslcmop_update
["detailed-status"] = "Done"
3951 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
="", error_message
=error_description_nslcmop
,
3952 operation_state
=nslcmop_operation_state
, other_update
=db_nslcmop_update
)
3954 self
._write
_ns
_status
(nsr_id
=nsr_id
, ns_state
=None, current_operation
="IDLE",
3955 current_operation_id
=None, other_update
=db_nsr_update
)
3957 if nslcmop_operation_state
:
3959 msg
= {"nsr_id": nsr_id
, "nslcmop_id": nslcmop_id
, "operationState": nslcmop_operation_state
}
3960 await self
.msg
.aiowrite("ns", "scaled", msg
, loop
=self
.loop
)
3961 except Exception as e
:
3962 self
.logger
.error(logging_text
+ "kafka_write notification Exception {}".format(e
))
3963 self
.logger
.debug(logging_text
+ "Exit")
3964 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_scale")
3966 async def _scale_ng_ro(self
, logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, vdu_scaling_info
, stage
):
3967 nsr_id
= db_nslcmop
["nsInstanceId"]
3968 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
3971 # read from db: vnfd's for every vnf
3972 db_vnfds
= {} # every vnfd data indexed by vnf id
3975 # for each vnf in ns, read vnfd
3976 for vnfr
in self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
}):
3977 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
3978 vnfd_id
= vnfr
["vnfd-id"] # vnfd uuid for this vnf
3979 # if we haven't this vnfd, read it from db
3980 if vnfd_id
not in db_vnfds
:
3982 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
3983 db_vnfds
[vnfd_id
] = vnfd
# vnfd's indexed by id
3984 n2vc_key
= self
.n2vc
.get_public_key()
3985 n2vc_key_list
= [n2vc_key
]
3986 self
.scale_vnfr(db_vnfr
, vdu_scaling_info
.get("vdu-create"), vdu_scaling_info
.get("vdu-delete"),
3988 # db_vnfr has been updated, update db_vnfrs to use it
3989 db_vnfrs
[db_vnfr
["member-vnf-index-ref"]] = db_vnfr
3990 await self
._instantiate
_ng
_ro
(logging_text
, nsr_id
, db_nsd
, db_nsr
, db_nslcmop
, db_vnfrs
,
3991 db_vnfds
, n2vc_key_list
, stage
=stage
, start_deploy
=time(),
3992 timeout_ns_deploy
=self
.timeout_ns_deploy
)
3993 if vdu_scaling_info
.get("vdu-delete"):
3994 self
.scale_vnfr(db_vnfr
, None, vdu_scaling_info
["vdu-delete"], mark_delete
=False)
3996 async def _RO_scale(self
, logging_text
, RO_nsr_id
, RO_scaling_info
, db_nslcmop
, db_vnfr
, db_nslcmop_update
,
3998 nslcmop_id
= db_nslcmop
["_id"]
3999 nsr_id
= db_nslcmop
["nsInstanceId"]
4000 vdu_create
= vdu_scaling_info
.get("vdu-create")
4001 vdu_delete
= vdu_scaling_info
.get("vdu-delete")
4002 # Scale RO retry check: Check if this sub-operation has been executed before
4003 op_index
= self
._check
_or
_add
_scale
_suboperation
(
4004 db_nslcmop
, db_vnfr
["member-vnf-index-ref"], None, None, 'SCALE-RO', RO_nsr_id
, RO_scaling_info
)
4005 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
4006 # Skip sub-operation
4007 result
= 'COMPLETED'
4008 result_detail
= 'Done'
4009 self
.logger
.debug(logging_text
+ "Skipped sub-operation RO, result {} {}".format(result
, result_detail
))
4011 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
4012 # New sub-operation: Get index of this sub-operation
4013 op_index
= len(db_nslcmop
.get('_admin', {}).get('operations')) - 1
4014 self
.logger
.debug(logging_text
+ "New sub-operation RO")
4016 # retry: Get registered params for this existing sub-operation
4017 op
= db_nslcmop
.get('_admin', {}).get('operations', [])[op_index
]
4018 RO_nsr_id
= op
.get('RO_nsr_id')
4019 RO_scaling_info
= op
.get('RO_scaling_info')
4020 self
.logger
.debug(logging_text
+ "Sub-operation RO retry")
4022 RO_desc
= await self
.RO
.create_action("ns", RO_nsr_id
, {"vdu-scaling": RO_scaling_info
})
4024 RO_nslcmop_id
= RO_desc
["instance_action_id"]
4025 db_nslcmop_update
["_admin.deploy.RO"] = RO_nslcmop_id
4027 RO_task_done
= False
4028 step
= detailed_status
= "Waiting for VIM to scale. RO_task_id={}.".format(RO_nslcmop_id
)
4029 detailed_status_old
= None
4030 self
.logger
.debug(logging_text
+ step
)
4032 deployment_timeout
= 1 * 3600 # One hour
4033 while deployment_timeout
> 0:
4034 if not RO_task_done
:
4035 desc
= await self
.RO
.show("ns", item_id_name
=RO_nsr_id
, extra_item
="action",
4036 extra_item_id
=RO_nslcmop_id
)
4039 self
._on
_update
_ro
_db
(nsrs_id
=nsr_id
, ro_descriptor
=desc
)
4041 ns_status
, ns_status_info
= self
.RO
.check_action_status(desc
)
4042 if ns_status
== "ERROR":
4043 raise ROclient
.ROClientException(ns_status_info
)
4044 elif ns_status
== "BUILD":
4045 detailed_status
= step
+ "; {}".format(ns_status_info
)
4046 elif ns_status
== "ACTIVE":
4048 self
.scale_vnfr(db_vnfr
, vdu_create
=vdu_create
, vdu_delete
=vdu_delete
)
4049 step
= detailed_status
= "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id
)
4050 self
.logger
.debug(logging_text
+ step
)
4052 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status
)
4054 desc
= await self
.RO
.show("ns", RO_nsr_id
)
4055 ns_status
, ns_status_info
= self
.RO
.check_ns_status(desc
)
4057 self
._on
_update
_ro
_db
(nsrs_id
=nsr_id
, ro_descriptor
=desc
)
4059 if ns_status
== "ERROR":
4060 raise ROclient
.ROClientException(ns_status_info
)
4061 elif ns_status
== "BUILD":
4062 detailed_status
= step
+ "; {}".format(ns_status_info
)
4063 elif ns_status
== "ACTIVE":
4064 step
= detailed_status
= \
4065 "Waiting for management IP address reported by the VIM. Updating VNFRs"
4067 # nsr_deployed["nsr_ip"] = RO.get_ns_vnf_info(desc)
4068 self
.ns_update_vnfr({db_vnfr
["member-vnf-index-ref"]: db_vnfr
}, desc
)
4070 except LcmExceptionNoMgmtIP
:
4073 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status
)
4074 if detailed_status
!= detailed_status_old
:
4075 self
._update
_suboperation
_status
(
4076 db_nslcmop
, op_index
, 'COMPLETED', detailed_status
)
4077 detailed_status_old
= db_nslcmop_update
["detailed-status"] = detailed_status
4078 self
.update_db_2("nslcmops", nslcmop_id
, db_nslcmop_update
)
4080 await asyncio
.sleep(5, loop
=self
.loop
)
4081 deployment_timeout
-= 5
4082 if deployment_timeout
<= 0:
4083 self
._update
_suboperation
_status
(
4084 db_nslcmop
, nslcmop_id
, op_index
, 'FAILED', "Timeout when waiting for ns to get ready")
4085 raise ROclient
.ROClientException("Timeout waiting ns to be ready")
4087 # update VDU_SCALING_INFO with the obtained ip_addresses
4088 if vdu_scaling_info
["scaling_direction"] == "OUT":
4089 for vdur
in reversed(db_vnfr
["vdur"]):
4090 if vdu_scaling_info
["vdu-create"].get(vdur
["vdu-id-ref"]):
4091 vdu_scaling_info
["vdu-create"][vdur
["vdu-id-ref"]] -= 1
4092 vdu_scaling_info
["vdu"].append({
4093 "name": vdur
["name"] or vdur
.get("vdu-name"),
4094 "vdu_id": vdur
["vdu-id-ref"],
4097 for interface
in vdur
["interfaces"]:
4098 vdu_scaling_info
["vdu"][-1]["interface"].append({
4099 "name": interface
["name"],
4100 "ip_address": interface
["ip-address"],
4101 "mac_address": interface
.get("mac-address"),
4103 self
._update
_suboperation
_status
(db_nslcmop
, op_index
, 'COMPLETED', 'Done')
4105 async def add_prometheus_metrics(self
, ee_id
, artifact_path
, ee_config_descriptor
, vnfr_id
, nsr_id
, target_ip
):
4106 if not self
.prometheus
:
4108 # look if exist a file called 'prometheus*.j2' and
4109 artifact_content
= self
.fs
.dir_ls(artifact_path
)
4110 job_file
= next((f
for f
in artifact_content
if f
.startswith("prometheus") and f
.endswith(".j2")), None)
4113 with self
.fs
.file_open((artifact_path
, job_file
), "r") as f
:
4117 _
, _
, service
= ee_id
.partition(".") # remove prefix "namespace."
4118 host_name
= "{}-{}".format(service
, ee_config_descriptor
["metric-service"])
4120 vnfr_id
= vnfr_id
.replace("-", "")
4122 "JOB_NAME": vnfr_id
,
4123 "TARGET_IP": target_ip
,
4124 "EXPORTER_POD_IP": host_name
,
4125 "EXPORTER_POD_PORT": host_port
,
4127 job_list
= self
.prometheus
.parse_job(job_data
, variables
)
4128 # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
4129 for job
in job_list
:
4130 if not isinstance(job
.get("job_name"), str) or vnfr_id
not in job
["job_name"]:
4131 job
["job_name"] = vnfr_id
+ "_" + str(randint(1, 10000))
4132 job
["nsr_id"] = nsr_id
4133 job_dict
= {jl
["job_name"]: jl
for jl
in job_list
}
4134 if await self
.prometheus
.update(job_dict
):
4135 return list(job_dict
.keys())
4137 def get_vca_cloud_and_credentials(self
, vim_account_id
: str) -> (str, str):
4139 Get VCA Cloud and VCA Cloud Credentials for the VIM account
4141 :param: vim_account_id: VIM Account ID
4143 :return: (cloud_name, cloud_credential)
4145 config
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("config", {})
4146 return config
.get("vca_cloud"), config
.get("vca_cloud_credential")
4148 def get_vca_k8s_cloud_and_credentials(self
, vim_account_id
: str) -> (str, str):
4150 Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account
4152 :param: vim_account_id: VIM Account ID
4154 :return: (cloud_name, cloud_credential)
4156 config
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("config", {})
4157 return config
.get("vca_k8s_cloud"), config
.get("vca_k8s_cloud_credential")