1 # -*- coding: utf-8 -*-
4 # Copyright 2018 Telefonica S.A.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
22 import logging
.handlers
25 from jinja2
import Environment
, TemplateError
, TemplateNotFound
, StrictUndefined
, UndefinedError
27 from osm_lcm
import ROclient
28 from osm_lcm
.ng_ro
import NgRoClient
, NgRoException
29 from osm_lcm
.lcm_utils
import LcmException
, LcmExceptionNoMgmtIP
, LcmBase
, deep_get
, get_iterable
, populate_dict
30 from n2vc
.k8s_helm_conn
import K8sHelmConnector
31 from n2vc
.k8s_helm3_conn
import K8sHelm3Connector
32 from n2vc
.k8s_juju_conn
import K8sJujuConnector
34 from osm_common
.dbbase
import DbException
35 from osm_common
.fsbase
import FsException
37 from n2vc
.n2vc_juju_conn
import N2VCJujuConnector
38 from n2vc
.exceptions
import N2VCException
, N2VCNotFound
, K8sException
40 from osm_lcm
.lcm_helm_conn
import LCMHelmConn
42 from copy
import copy
, deepcopy
43 from http
import HTTPStatus
45 from uuid
import uuid4
47 from random
import randint
49 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
53 timeout_vca_on_error
= 5 * 60 # Time for charm from first time at blocked,error status to mark as failed
54 timeout_ns_deploy
= 2 * 3600 # default global timeout for deployment a ns
55 timeout_ns_terminate
= 1800 # default global timeout for un deployment a ns
56 timeout_charm_delete
= 10 * 60
57 timeout_primitive
= 30 * 60 # timeout for primitive execution
58 timeout_progress_primitive
= 10 * 60 # timeout for some progress in a primitive execution
60 SUBOPERATION_STATUS_NOT_FOUND
= -1
61 SUBOPERATION_STATUS_NEW
= -2
62 SUBOPERATION_STATUS_SKIP
= -3
63 task_name_deploy_vca
= "Deploying VCA"
65 def __init__(self
, db
, msg
, fs
, lcm_tasks
, config
, loop
, prometheus
=None):
67 Init, Connect to database, filesystem storage, and messaging
68 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
75 logger
=logging
.getLogger('lcm.ns')
79 self
.lcm_tasks
= lcm_tasks
80 self
.timeout
= config
["timeout"]
81 self
.ro_config
= config
["ro_config"]
82 self
.ng_ro
= config
["ro_config"].get("ng")
83 self
.vca_config
= config
["VCA"].copy()
85 # create N2VC connector
86 self
.n2vc
= N2VCJujuConnector(
91 url
='{}:{}'.format(self
.vca_config
['host'], self
.vca_config
['port']),
92 username
=self
.vca_config
.get('user', None),
93 vca_config
=self
.vca_config
,
94 on_update_db
=self
._on
_update
_n
2vc
_db
97 self
.conn_helm_ee
= LCMHelmConn(
104 vca_config
=self
.vca_config
,
105 on_update_db
=self
._on
_update
_n
2vc
_db
108 self
.k8sclusterhelm2
= K8sHelmConnector(
109 kubectl_command
=self
.vca_config
.get("kubectlpath"),
110 helm_command
=self
.vca_config
.get("helmpath"),
117 self
.k8sclusterhelm3
= K8sHelm3Connector(
118 kubectl_command
=self
.vca_config
.get("kubectlpath"),
119 helm_command
=self
.vca_config
.get("helm3path"),
126 self
.k8sclusterjuju
= K8sJujuConnector(
127 kubectl_command
=self
.vca_config
.get("kubectlpath"),
128 juju_command
=self
.vca_config
.get("jujupath"),
134 vca_config
=self
.vca_config
,
137 self
.k8scluster_map
= {
138 "helm-chart": self
.k8sclusterhelm2
,
139 "helm-chart-v3": self
.k8sclusterhelm3
,
140 "chart": self
.k8sclusterhelm3
,
141 "juju-bundle": self
.k8sclusterjuju
,
142 "juju": self
.k8sclusterjuju
,
146 "lxc_proxy_charm": self
.n2vc
,
147 "native_charm": self
.n2vc
,
148 "k8s_proxy_charm": self
.n2vc
,
149 "helm": self
.conn_helm_ee
,
150 "helm-v3": self
.conn_helm_ee
153 self
.prometheus
= prometheus
157 self
.RO
= NgRoClient(self
.loop
, **self
.ro_config
)
159 self
.RO
= ROclient
.ROClient(self
.loop
, **self
.ro_config
)
161 def _on_update_ro_db(self
, nsrs_id
, ro_descriptor
):
163 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
166 # TODO filter RO descriptor fields...
170 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
171 db_dict
['deploymentStatus'] = ro_descriptor
172 self
.update_db_2("nsrs", nsrs_id
, db_dict
)
174 except Exception as e
:
175 self
.logger
.warn('Cannot write database RO deployment for ns={} -> {}'.format(nsrs_id
, e
))
177 async def _on_update_n2vc_db(self
, table
, filter, path
, updated_data
):
179 # remove last dot from path (if exists)
180 if path
.endswith('.'):
183 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
184 # .format(table, filter, path, updated_data))
188 nsr_id
= filter.get('_id')
190 # read ns record from database
191 nsr
= self
.db
.get_one(table
='nsrs', q_filter
=filter)
192 current_ns_status
= nsr
.get('nsState')
194 # get vca status for NS
195 status_dict
= await self
.n2vc
.get_status(namespace
='.' + nsr_id
, yaml_format
=False)
199 db_dict
['vcaStatus'] = status_dict
201 # update configurationStatus for this VCA
203 vca_index
= int(path
[path
.rfind(".")+1:])
205 vca_list
= deep_get(target_dict
=nsr
, key_list
=('_admin', 'deployed', 'VCA'))
206 vca_status
= vca_list
[vca_index
].get('status')
208 configuration_status_list
= nsr
.get('configurationStatus')
209 config_status
= configuration_status_list
[vca_index
].get('status')
211 if config_status
== 'BROKEN' and vca_status
!= 'failed':
212 db_dict
['configurationStatus'][vca_index
] = 'READY'
213 elif config_status
!= 'BROKEN' and vca_status
== 'failed':
214 db_dict
['configurationStatus'][vca_index
] = 'BROKEN'
215 except Exception as e
:
216 # not update configurationStatus
217 self
.logger
.debug('Error updating vca_index (ignore): {}'.format(e
))
219 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
220 # if nsState = 'DEGRADED' check if all is OK
222 if current_ns_status
in ('READY', 'DEGRADED'):
223 error_description
= ''
225 if status_dict
.get('machines'):
226 for machine_id
in status_dict
.get('machines'):
227 machine
= status_dict
.get('machines').get(machine_id
)
228 # check machine agent-status
229 if machine
.get('agent-status'):
230 s
= machine
.get('agent-status').get('status')
233 error_description
+= 'machine {} agent-status={} ; '.format(machine_id
, s
)
234 # check machine instance status
235 if machine
.get('instance-status'):
236 s
= machine
.get('instance-status').get('status')
239 error_description
+= 'machine {} instance-status={} ; '.format(machine_id
, s
)
241 if status_dict
.get('applications'):
242 for app_id
in status_dict
.get('applications'):
243 app
= status_dict
.get('applications').get(app_id
)
244 # check application status
245 if app
.get('status'):
246 s
= app
.get('status').get('status')
249 error_description
+= 'application {} status={} ; '.format(app_id
, s
)
251 if error_description
:
252 db_dict
['errorDescription'] = error_description
253 if current_ns_status
== 'READY' and is_degraded
:
254 db_dict
['nsState'] = 'DEGRADED'
255 if current_ns_status
== 'DEGRADED' and not is_degraded
:
256 db_dict
['nsState'] = 'READY'
259 self
.update_db_2("nsrs", nsr_id
, db_dict
)
261 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
263 except Exception as e
:
264 self
.logger
.warn('Error updating NS state for ns={}: {}'.format(nsr_id
, e
))
def _parse_cloud_init(cloud_init_text, additional_params, vnfd_id, vdu_id):
    """
    Render a cloud-init text as a Jinja2 template using the provided instantiation parameters.

    The Jinja2 environment is created with StrictUndefined, and an UndefinedError raised while
    rendering is reported as a missing instantiation parameter.
    :param cloud_init_text: cloud-init content to be rendered
    :param additional_params: dictionary with the variables passed to the template. An empty
        dictionary is used when it is None or empty
    :param vnfd_id: vnfd id, only used to compose the error messages
    :param vdu_id: vdu id, only used to compose the error messages
    :return: the rendered text
    :raises LcmException: when a variable is undefined or any other Jinja2 template error happens.
        The original Jinja2 exception is chained as the cause
    """
    try:
        env = Environment(undefined=StrictUndefined)
        template = env.from_string(cloud_init_text)
        return template.render(additional_params or {})
    except UndefinedError as e:
        # must be handled before the generic template errors to give a more specific message
        raise LcmException("Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
                           "file, must be provided in the instantiation parameters inside the "
                           "'additionalParamsForVnf/Vdu' block".format(e, vnfd_id, vdu_id)) from e
    except (TemplateError, TemplateNotFound) as e:
        raise LcmException("Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".
                           format(vnfd_id, vdu_id, e)) from e
280 def _get_cloud_init(self
, vdu
, vnfd
):
282 cloud_init_content
= cloud_init_file
= None
283 if vdu
.get("cloud-init-file"):
284 base_folder
= vnfd
["_admin"]["storage"]
285 cloud_init_file
= "{}/{}/cloud_init/{}".format(base_folder
["folder"], base_folder
["pkg-dir"],
286 vdu
["cloud-init-file"])
287 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
288 cloud_init_content
= ci_file
.read()
289 elif vdu
.get("cloud-init"):
290 cloud_init_content
= vdu
["cloud-init"]
292 return cloud_init_content
293 except FsException
as e
:
294 raise LcmException("Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".
295 format(vnfd
["id"], vdu
["id"], cloud_init_file
, e
))
297 def _get_osm_params(self
, db_vnfr
, vdu_id
=None, vdu_count_index
=0):
298 osm_params
= {x
.replace("-", "_"): db_vnfr
[x
] for x
in ("ip-address", "vim-account-id", "vnfd-id", "vnfd-ref")
299 if db_vnfr
.get(x
) is not None}
300 osm_params
["ns_id"] = db_vnfr
["nsr-id-ref"]
301 osm_params
["vnf_id"] = db_vnfr
["_id"]
302 osm_params
["member_vnf_index"] = db_vnfr
["member-vnf-index-ref"]
303 if db_vnfr
.get("vdur"):
304 osm_params
["vdu"] = {}
305 for vdur
in db_vnfr
["vdur"]:
307 "count_index": vdur
["count-index"],
308 "vdu_id": vdur
["vdu-id-ref"],
311 if vdur
.get("ip-address"):
312 vdu
["ip_address"] = vdur
["ip-address"]
313 for iface
in vdur
["interfaces"]:
314 vdu
["interfaces"][iface
["name"]] = \
315 {x
.replace("-", "_"): iface
[x
] for x
in ("mac-address", "ip-address", "vnf-vld-id", "name")
316 if iface
.get(x
) is not None}
317 vdu_id_index
= "{}-{}".format(vdur
["vdu-id-ref"], vdur
["count-index"])
318 osm_params
["vdu"][vdu_id_index
] = vdu
320 osm_params
["vdu_id"] = vdu_id
321 osm_params
["count_index"] = vdu_count_index
324 def _get_vdu_additional_params(self
, db_vnfr
, vdu_id
):
325 vdur
= next(vdur
for vdur
in db_vnfr
.get("vdur") if vdu_id
== vdur
["vdu-id-ref"])
326 additional_params
= vdur
.get("additionalParams")
327 return self
._format
_additional
_params
(additional_params
)
329 def vnfd2RO(self
, vnfd
, new_id
=None, additionalParams
=None, nsrId
=None):
331 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
332 :param vnfd: input vnfd
333 :param new_id: overrides vnf id if provided
334 :param additionalParams: Instantiation params for VNFs provided
335 :param nsrId: Id of the NSR
336 :return: copy of vnfd
338 vnfd_RO
= deepcopy(vnfd
)
339 # remove unused by RO configuration, monitoring, scaling and internal keys
340 vnfd_RO
.pop("_id", None)
341 vnfd_RO
.pop("_admin", None)
342 vnfd_RO
.pop("vnf-configuration", None)
343 vnfd_RO
.pop("monitoring-param", None)
344 vnfd_RO
.pop("scaling-group-descriptor", None)
345 vnfd_RO
.pop("kdu", None)
346 vnfd_RO
.pop("k8s-cluster", None)
348 vnfd_RO
["id"] = new_id
350 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
351 for vdu
in get_iterable(vnfd_RO
, "vdu"):
352 vdu
.pop("cloud-init-file", None)
353 vdu
.pop("cloud-init", None)
356 def _ns_params_2_RO(self
, ns_params
, nsd
, vnfd_dict
, db_vnfrs
, n2vc_key_list
):
358 Creates a RO ns descriptor from OSM ns_instantiate params
359 :param ns_params: OSM instantiate params
360 :param vnfd_dict: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
361 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index. {member-vnf-index: {vnfr_object}, ...}
362 :return: The RO ns descriptor
366 # TODO feature 1417: Check that no instantiation is set over PDU
367 # check if PDU forces a concrete vim-network-id and add it
368 # check if PDU contains a SDN-assist info (dpid, switch, port) and pass it to RO
370 def vim_account_2_RO(vim_account
):
371 if vim_account
in vim_2_RO
:
372 return vim_2_RO
[vim_account
]
374 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account
})
375 if db_vim
["_admin"]["operationalState"] != "ENABLED":
376 raise LcmException("VIM={} is not available. operationalState={}".format(
377 vim_account
, db_vim
["_admin"]["operationalState"]))
378 RO_vim_id
= db_vim
["_admin"]["deployed"]["RO"]
379 vim_2_RO
[vim_account
] = RO_vim_id
382 def wim_account_2_RO(wim_account
):
383 if isinstance(wim_account
, str):
384 if wim_account
in wim_2_RO
:
385 return wim_2_RO
[wim_account
]
387 db_wim
= self
.db
.get_one("wim_accounts", {"_id": wim_account
})
388 if db_wim
["_admin"]["operationalState"] != "ENABLED":
389 raise LcmException("WIM={} is not available. operationalState={}".format(
390 wim_account
, db_wim
["_admin"]["operationalState"]))
391 RO_wim_id
= db_wim
["_admin"]["deployed"]["RO-account"]
392 wim_2_RO
[wim_account
] = RO_wim_id
397 def ip_profile_2_RO(ip_profile
):
398 RO_ip_profile
= deepcopy((ip_profile
))
399 if "dns-server" in RO_ip_profile
:
400 if isinstance(RO_ip_profile
["dns-server"], list):
401 RO_ip_profile
["dns-address"] = []
402 for ds
in RO_ip_profile
.pop("dns-server"):
403 RO_ip_profile
["dns-address"].append(ds
['address'])
405 RO_ip_profile
["dns-address"] = RO_ip_profile
.pop("dns-server")
406 if RO_ip_profile
.get("ip-version") == "ipv4":
407 RO_ip_profile
["ip-version"] = "IPv4"
408 if RO_ip_profile
.get("ip-version") == "ipv6":
409 RO_ip_profile
["ip-version"] = "IPv6"
410 if "dhcp-params" in RO_ip_profile
:
411 RO_ip_profile
["dhcp"] = RO_ip_profile
.pop("dhcp-params")
417 # "name": ns_params["nsName"],
418 # "description": ns_params.get("nsDescription"),
419 "datacenter": vim_account_2_RO(ns_params
["vimAccountId"]),
420 "wim_account": wim_account_2_RO(ns_params
.get("wimAccountId")),
421 # "scenario": ns_params["nsdId"],
423 # set vim_account of each vnf if different from general vim_account.
424 # Get this information from <vnfr> database content, key vim-account-id
425 # Vim account can be set by placement_engine and it may be different from
426 # the instantiate parameters (vnfs.member-vnf-index.datacenter).
427 for vnf_index
, vnfr
in db_vnfrs
.items():
428 if vnfr
.get("vim-account-id") and vnfr
["vim-account-id"] != ns_params
["vimAccountId"]:
429 populate_dict(RO_ns_params
, ("vnfs", vnf_index
, "datacenter"), vim_account_2_RO(vnfr
["vim-account-id"]))
431 n2vc_key_list
= n2vc_key_list
or []
432 for vnfd_ref
, vnfd
in vnfd_dict
.items():
433 vdu_needed_access
= []
435 if vnfd
.get("vnf-configuration"):
436 ssh_required
= deep_get(vnfd
, ("vnf-configuration", "config-access", "ssh-access", "required"))
437 if ssh_required
and vnfd
.get("mgmt-interface"):
438 if vnfd
["mgmt-interface"].get("vdu-id"):
439 vdu_needed_access
.append(vnfd
["mgmt-interface"]["vdu-id"])
440 elif vnfd
["mgmt-interface"].get("cp"):
441 mgmt_cp
= vnfd
["mgmt-interface"]["cp"]
443 for vdu
in vnfd
.get("vdu", ()):
444 if vdu
.get("vdu-configuration"):
445 ssh_required
= deep_get(vdu
, ("vdu-configuration", "config-access", "ssh-access", "required"))
447 vdu_needed_access
.append(vdu
["id"])
449 for vdu_interface
in vdu
.get("interface"):
450 if vdu_interface
.get("external-connection-point-ref") and \
451 vdu_interface
["external-connection-point-ref"] == mgmt_cp
:
452 vdu_needed_access
.append(vdu
["id"])
456 if vdu_needed_access
:
457 for vnf_member
in nsd
.get("constituent-vnfd"):
458 if vnf_member
["vnfd-id-ref"] != vnfd_ref
:
460 for vdu
in vdu_needed_access
:
461 populate_dict(RO_ns_params
,
462 ("vnfs", vnf_member
["member-vnf-index"], "vdus", vdu
, "mgmt_keys"),
465 for vdu
in get_iterable(vnfd
, "vdu"):
466 cloud_init_text
= self
._get
_cloud
_init
(vdu
, vnfd
)
467 if not cloud_init_text
:
469 for vnf_member
in nsd
.get("constituent-vnfd"):
470 if vnf_member
["vnfd-id-ref"] != vnfd_ref
:
472 db_vnfr
= db_vnfrs
[vnf_member
["member-vnf-index"]]
473 additional_params
= self
._get
_vdu
_additional
_params
(db_vnfr
, vdu
["id"]) or {}
476 for vdu_index
in range(0, int(vdu
.get("count", 1))):
477 additional_params
["OSM"] = self
._get
_osm
_params
(db_vnfr
, vdu
["id"], vdu_index
)
478 cloud_init_list
.append(self
._parse
_cloud
_init
(cloud_init_text
, additional_params
, vnfd
["id"],
480 populate_dict(RO_ns_params
,
481 ("vnfs", vnf_member
["member-vnf-index"], "vdus", vdu
["id"], "cloud_init"),
484 if ns_params
.get("vduImage"):
485 RO_ns_params
["vduImage"] = ns_params
["vduImage"]
487 if ns_params
.get("ssh_keys"):
488 RO_ns_params
["cloud-config"] = {"key-pairs": ns_params
["ssh_keys"]}
489 for vnf_params
in get_iterable(ns_params
, "vnf"):
490 for constituent_vnfd
in nsd
["constituent-vnfd"]:
491 if constituent_vnfd
["member-vnf-index"] == vnf_params
["member-vnf-index"]:
492 vnf_descriptor
= vnfd_dict
[constituent_vnfd
["vnfd-id-ref"]]
495 raise LcmException("Invalid instantiate parameter vnf:member-vnf-index={} is not present at nsd:"
496 "constituent-vnfd".format(vnf_params
["member-vnf-index"]))
498 for vdu_params
in get_iterable(vnf_params
, "vdu"):
499 # TODO feature 1417: check that this VDU exist and it is not a PDU
500 if vdu_params
.get("volume"):
501 for volume_params
in vdu_params
["volume"]:
502 if volume_params
.get("vim-volume-id"):
503 populate_dict(RO_ns_params
, ("vnfs", vnf_params
["member-vnf-index"], "vdus",
504 vdu_params
["id"], "devices", volume_params
["name"], "vim_id"),
505 volume_params
["vim-volume-id"])
506 if vdu_params
.get("interface"):
507 for interface_params
in vdu_params
["interface"]:
508 if interface_params
.get("ip-address"):
509 populate_dict(RO_ns_params
, ("vnfs", vnf_params
["member-vnf-index"], "vdus",
510 vdu_params
["id"], "interfaces", interface_params
["name"],
512 interface_params
["ip-address"])
513 if interface_params
.get("mac-address"):
514 populate_dict(RO_ns_params
, ("vnfs", vnf_params
["member-vnf-index"], "vdus",
515 vdu_params
["id"], "interfaces", interface_params
["name"],
517 interface_params
["mac-address"])
518 if interface_params
.get("floating-ip-required"):
519 populate_dict(RO_ns_params
, ("vnfs", vnf_params
["member-vnf-index"], "vdus",
520 vdu_params
["id"], "interfaces", interface_params
["name"],
522 interface_params
["floating-ip-required"])
524 for internal_vld_params
in get_iterable(vnf_params
, "internal-vld"):
525 if internal_vld_params
.get("vim-network-name"):
526 populate_dict(RO_ns_params
, ("vnfs", vnf_params
["member-vnf-index"], "networks",
527 internal_vld_params
["name"], "vim-network-name"),
528 internal_vld_params
["vim-network-name"])
529 if internal_vld_params
.get("vim-network-id"):
530 populate_dict(RO_ns_params
, ("vnfs", vnf_params
["member-vnf-index"], "networks",
531 internal_vld_params
["name"], "vim-network-id"),
532 internal_vld_params
["vim-network-id"])
533 if internal_vld_params
.get("ip-profile"):
534 populate_dict(RO_ns_params
, ("vnfs", vnf_params
["member-vnf-index"], "networks",
535 internal_vld_params
["name"], "ip-profile"),
536 ip_profile_2_RO(internal_vld_params
["ip-profile"]))
537 if internal_vld_params
.get("provider-network"):
539 populate_dict(RO_ns_params
, ("vnfs", vnf_params
["member-vnf-index"], "networks",
540 internal_vld_params
["name"], "provider-network"),
541 internal_vld_params
["provider-network"].copy())
543 for icp_params
in get_iterable(internal_vld_params
, "internal-connection-point"):
546 for vdu_descriptor
in vnf_descriptor
["vdu"]:
547 for vdu_interface
in vdu_descriptor
["interface"]:
548 if vdu_interface
.get("internal-connection-point-ref") == icp_params
["id-ref"]:
549 if icp_params
.get("ip-address"):
550 populate_dict(RO_ns_params
, ("vnfs", vnf_params
["member-vnf-index"], "vdus",
551 vdu_descriptor
["id"], "interfaces",
552 vdu_interface
["name"], "ip_address"),
553 icp_params
["ip-address"])
555 if icp_params
.get("mac-address"):
556 populate_dict(RO_ns_params
, ("vnfs", vnf_params
["member-vnf-index"], "vdus",
557 vdu_descriptor
["id"], "interfaces",
558 vdu_interface
["name"], "mac_address"),
559 icp_params
["mac-address"])
565 raise LcmException("Invalid instantiate parameter vnf:member-vnf-index[{}]:"
566 "internal-vld:id-ref={} is not present at vnfd:internal-"
567 "connection-point".format(vnf_params
["member-vnf-index"],
568 icp_params
["id-ref"]))
570 for vld_params
in get_iterable(ns_params
, "vld"):
571 if "ip-profile" in vld_params
:
572 populate_dict(RO_ns_params
, ("networks", vld_params
["name"], "ip-profile"),
573 ip_profile_2_RO(vld_params
["ip-profile"]))
575 if vld_params
.get("provider-network"):
577 populate_dict(RO_ns_params
, ("networks", vld_params
["name"], "provider-network"),
578 vld_params
["provider-network"].copy())
580 if "wimAccountId" in vld_params
and vld_params
["wimAccountId"] is not None:
581 populate_dict(RO_ns_params
, ("networks", vld_params
["name"], "wim_account"),
582 wim_account_2_RO(vld_params
["wimAccountId"])),
583 if vld_params
.get("vim-network-name"):
585 if isinstance(vld_params
["vim-network-name"], dict):
586 for vim_account
, vim_net
in vld_params
["vim-network-name"].items():
587 RO_vld_sites
.append({
588 "netmap-use": vim_net
,
589 "datacenter": vim_account_2_RO(vim_account
)
591 else: # isinstance str
592 RO_vld_sites
.append({"netmap-use": vld_params
["vim-network-name"]})
594 populate_dict(RO_ns_params
, ("networks", vld_params
["name"], "sites"), RO_vld_sites
)
596 if vld_params
.get("vim-network-id"):
598 if isinstance(vld_params
["vim-network-id"], dict):
599 for vim_account
, vim_net
in vld_params
["vim-network-id"].items():
600 RO_vld_sites
.append({
601 "netmap-use": vim_net
,
602 "datacenter": vim_account_2_RO(vim_account
)
604 else: # isinstance str
605 RO_vld_sites
.append({"netmap-use": vld_params
["vim-network-id"]})
607 populate_dict(RO_ns_params
, ("networks", vld_params
["name"], "sites"), RO_vld_sites
)
608 if vld_params
.get("ns-net"):
609 if isinstance(vld_params
["ns-net"], dict):
610 for vld_id
, instance_scenario_id
in vld_params
["ns-net"].items():
611 RO_vld_ns_net
= {"instance_scenario_id": instance_scenario_id
, "osm_id": vld_id
}
612 populate_dict(RO_ns_params
, ("networks", vld_params
["name"], "use-network"), RO_vld_ns_net
)
613 if "vnfd-connection-point-ref" in vld_params
:
614 for cp_params
in vld_params
["vnfd-connection-point-ref"]:
616 for constituent_vnfd
in nsd
["constituent-vnfd"]:
617 if constituent_vnfd
["member-vnf-index"] == cp_params
["member-vnf-index-ref"]:
618 vnf_descriptor
= vnfd_dict
[constituent_vnfd
["vnfd-id-ref"]]
622 "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={} "
623 "is not present at nsd:constituent-vnfd".format(cp_params
["member-vnf-index-ref"]))
625 for vdu_descriptor
in vnf_descriptor
["vdu"]:
626 for interface_descriptor
in vdu_descriptor
["interface"]:
627 if interface_descriptor
.get("external-connection-point-ref") == \
628 cp_params
["vnfd-connection-point-ref"]:
635 "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={}:"
636 "vnfd-connection-point-ref={} is not present at vnfd={}".format(
637 cp_params
["member-vnf-index-ref"],
638 cp_params
["vnfd-connection-point-ref"],
639 vnf_descriptor
["id"]))
640 if cp_params
.get("ip-address"):
641 populate_dict(RO_ns_params
, ("vnfs", cp_params
["member-vnf-index-ref"], "vdus",
642 vdu_descriptor
["id"], "interfaces",
643 interface_descriptor
["name"], "ip_address"),
644 cp_params
["ip-address"])
645 if cp_params
.get("mac-address"):
646 populate_dict(RO_ns_params
, ("vnfs", cp_params
["member-vnf-index-ref"], "vdus",
647 vdu_descriptor
["id"], "interfaces",
648 interface_descriptor
["name"], "mac_address"),
649 cp_params
["mac-address"])
652 def scale_vnfr(self
, db_vnfr
, vdu_create
=None, vdu_delete
=None):
653 # make a copy to do not change
654 vdu_create
= copy(vdu_create
)
655 vdu_delete
= copy(vdu_delete
)
657 vdurs
= db_vnfr
.get("vdur")
660 vdu_index
= len(vdurs
)
663 vdur
= vdurs
[vdu_index
]
664 if vdur
.get("pdu-type"):
666 vdu_id_ref
= vdur
["vdu-id-ref"]
667 if vdu_create
and vdu_create
.get(vdu_id_ref
):
668 vdur_copy
= deepcopy(vdur
)
669 vdur_copy
["status"] = "BUILD"
670 vdur_copy
["status-detailed"] = None
671 vdur_copy
["ip_address"]: None
672 for iface
in vdur_copy
["interfaces"]:
673 iface
["ip-address"] = None
674 iface
["mac-address"] = None
675 iface
.pop("mgmt_vnf", None) # only first vdu can be managment of vnf # TODO ALF
676 for index
in range(0, vdu_create
[vdu_id_ref
]):
677 vdur_copy
["_id"] = str(uuid4())
678 vdur_copy
["count-index"] += 1
679 vdurs
.insert(vdu_index
+1+index
, vdur_copy
)
680 self
.logger
.debug("scale out, adding vdu={}".format(vdur_copy
))
681 vdur_copy
= deepcopy(vdur_copy
)
683 del vdu_create
[vdu_id_ref
]
684 if vdu_delete
and vdu_delete
.get(vdu_id_ref
):
686 vdu_delete
[vdu_id_ref
] -= 1
687 if not vdu_delete
[vdu_id_ref
]:
688 del vdu_delete
[vdu_id_ref
]
689 # check all operations are done
690 if vdu_create
or vdu_delete
:
691 raise LcmException("Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
694 raise LcmException("Error scaling IN VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
697 vnfr_update
= {"vdur": vdurs
}
698 db_vnfr
["vdur"] = vdurs
699 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
701 def ns_update_nsr(self
, ns_update_nsr
, db_nsr
, nsr_desc_RO
):
703 Updates database nsr with the RO info for the created vld
704 :param ns_update_nsr: dictionary to be filled with the updated info
705 :param db_nsr: content of db_nsr. This is also modified
706 :param nsr_desc_RO: nsr descriptor from RO
707 :return: Nothing, LcmException is raised on errors
710 for vld_index
, vld
in enumerate(get_iterable(db_nsr
, "vld")):
711 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
712 if vld
["id"] != net_RO
.get("ns_net_osm_id"):
714 vld
["vim-id"] = net_RO
.get("vim_net_id")
715 vld
["name"] = net_RO
.get("vim_name")
716 vld
["status"] = net_RO
.get("status")
717 vld
["status-detailed"] = net_RO
.get("error_msg")
718 ns_update_nsr
["vld.{}".format(vld_index
)] = vld
721 raise LcmException("ns_update_nsr: Not found vld={} at RO info".format(vld
["id"]))
723 def set_vnfr_at_error(self
, db_vnfrs
, error_text
):
725 for db_vnfr
in db_vnfrs
.values():
726 vnfr_update
= {"status": "ERROR"}
727 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
728 if "status" not in vdur
:
729 vdur
["status"] = "ERROR"
730 vnfr_update
["vdur.{}.status".format(vdu_index
)] = "ERROR"
732 vdur
["status-detailed"] = str(error_text
)
733 vnfr_update
["vdur.{}.status-detailed".format(vdu_index
)] = "ERROR"
734 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
735 except DbException
as e
:
736 self
.logger
.error("Cannot update vnf. {}".format(e
))
738 def ns_update_vnfr(self
, db_vnfrs
, nsr_desc_RO
):
740 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
741 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
742 :param nsr_desc_RO: nsr descriptor from RO
743 :return: Nothing, LcmException is raised on errors
745 for vnf_index
, db_vnfr
in db_vnfrs
.items():
746 for vnf_RO
in nsr_desc_RO
["vnfs"]:
747 if vnf_RO
["member_vnf_index"] != vnf_index
:
750 if vnf_RO
.get("ip_address"):
751 db_vnfr
["ip-address"] = vnfr_update
["ip-address"] = vnf_RO
["ip_address"].split(";")[0]
752 elif not db_vnfr
.get("ip-address"):
753 if db_vnfr
.get("vdur"): # if not VDUs, there is not ip_address
754 raise LcmExceptionNoMgmtIP("ns member_vnf_index '{}' has no IP address".format(vnf_index
))
756 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
757 vdur_RO_count_index
= 0
758 if vdur
.get("pdu-type"):
760 for vdur_RO
in get_iterable(vnf_RO
, "vms"):
761 if vdur
["vdu-id-ref"] != vdur_RO
["vdu_osm_id"]:
763 if vdur
["count-index"] != vdur_RO_count_index
:
764 vdur_RO_count_index
+= 1
766 vdur
["vim-id"] = vdur_RO
.get("vim_vm_id")
767 if vdur_RO
.get("ip_address"):
768 vdur
["ip-address"] = vdur_RO
["ip_address"].split(";")[0]
770 vdur
["ip-address"] = None
771 vdur
["vdu-id-ref"] = vdur_RO
.get("vdu_osm_id")
772 vdur
["name"] = vdur_RO
.get("vim_name")
773 vdur
["status"] = vdur_RO
.get("status")
774 vdur
["status-detailed"] = vdur_RO
.get("error_msg")
775 for ifacer
in get_iterable(vdur
, "interfaces"):
776 for interface_RO
in get_iterable(vdur_RO
, "interfaces"):
777 if ifacer
["name"] == interface_RO
.get("internal_name"):
778 ifacer
["ip-address"] = interface_RO
.get("ip_address")
779 ifacer
["mac-address"] = interface_RO
.get("mac_address")
782 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
784 .format(vnf_index
, vdur
["vdu-id-ref"], ifacer
["name"]))
785 vnfr_update
["vdur.{}".format(vdu_index
)] = vdur
788 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
789 "VIM info".format(vnf_index
, vdur
["vdu-id-ref"], vdur
["count-index"]))
791 for vld_index
, vld
in enumerate(get_iterable(db_vnfr
, "vld")):
792 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
793 if vld
["id"] != net_RO
.get("vnf_net_osm_id"):
795 vld
["vim-id"] = net_RO
.get("vim_net_id")
796 vld
["name"] = net_RO
.get("vim_name")
797 vld
["status"] = net_RO
.get("status")
798 vld
["status-detailed"] = net_RO
.get("error_msg")
799 vnfr_update
["vld.{}".format(vld_index
)] = vld
802 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
803 vnf_index
, vld
["id"]))
805 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
809 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(vnf_index
))
811 def _get_ns_config_info(self
, nsr_id
):
813 Generates a mapping between vnf,vdu elements and the N2VC id
814 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
815 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
816 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
817 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
819 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
820 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
822 ns_config_info
= {"osm-config-mapping": mapping
}
823 for vca
in vca_deployed_list
:
824 if not vca
["member-vnf-index"]:
826 if not vca
["vdu_id"]:
827 mapping
[vca
["member-vnf-index"]] = vca
["application"]
829 mapping
["{}.{}.{}".format(vca
["member-vnf-index"], vca
["vdu_id"], vca
["vdu_count_index"])] =\
831 return ns_config_info
834 def _get_initial_config_primitive_list(desc_primitive_list
, vca_deployed
, ee_descriptor_id
):
836 Generates a list of initial-config-primitive based on the list provided by the descriptor. It includes internal
837 primitives as verify-ssh-credentials, or config when needed
838 :param desc_primitive_list: information of the descriptor
839 :param vca_deployed: information of the deployed, needed for known if it is related to an NS, VNF, VDU and if
840 this element contains a ssh public key
841 :param ee_descriptor_id: execution environment descriptor id. It is the value of
842 XXX_configuration.execution-environment-list.INDEX.id; it can be None
843 :return: The modified list. Can ba an empty list, but always a list
846 primitive_list
= desc_primitive_list
or []
848 # filter primitives by ee_id
849 primitive_list
= [p
for p
in primitive_list
if p
.get("execution-environment-ref") == ee_descriptor_id
]
853 primitive_list
.sort(key
=lambda val
: int(val
['seq']))
855 # look for primitive config, and get the position. None if not present
856 config_position
= None
857 for index
, primitive
in enumerate(primitive_list
):
858 if primitive
["name"] == "config":
859 config_position
= index
862 # for NS, add always a config primitive if not present (bug 874)
863 if not vca_deployed
["member-vnf-index"] and config_position
is None:
864 primitive_list
.insert(0, {"name": "config", "parameter": []})
866 # TODO revise if needed: for VNF/VDU add verify-ssh-credentials after config
867 if vca_deployed
["member-vnf-index"] and config_position
is not None and vca_deployed
.get("ssh-public-key"):
868 primitive_list
.insert(config_position
+ 1, {"name": "verify-ssh-credentials", "parameter": []})
869 return primitive_list
871 async def _instantiate_ng_ro(self
, logging_text
, nsr_id
, nsd
, db_nsr
, db_nslcmop
, db_vnfrs
, db_vnfds_ref
,
872 n2vc_key_list
, stage
, start_deploy
, timeout_ns_deploy
):
873 nslcmop_id
= db_nslcmop
["_id"]
875 "name": db_nsr
["name"],
878 "image": deepcopy(db_nsr
["image"]),
879 "flavor": deepcopy(db_nsr
["flavor"]),
880 "action_id": nslcmop_id
,
882 for image
in target
["image"]:
883 image
["vim_info"] = []
884 for flavor
in target
["flavor"]:
885 flavor
["vim_info"] = []
887 ns_params
= db_nslcmop
.get("operationParams")
889 if ns_params
.get("ssh_keys"):
890 ssh_keys
+= ns_params
.get("ssh_keys")
892 ssh_keys
+= n2vc_key_list
895 for vld_index
, vld
in enumerate(nsd
.get("vld")):
896 target_vld
= {"id": vld
["id"],
898 "mgmt-network": vld
.get("mgmt-network", False),
899 "type": vld
.get("type"),
900 "vim_info": [{"vim-network-name": vld
.get("vim-network-name"),
901 "vim_account_id": ns_params
["vimAccountId"]}],
903 for cp
in vld
["vnfd-connection-point-ref"]:
904 cp2target
["member_vnf:{}.{}".format(cp
["member-vnf-index-ref"], cp
["vnfd-connection-point-ref"])] = \
905 "nsrs:{}:vld.{}".format(nsr_id
, vld_index
)
906 target
["ns"]["vld"].append(target_vld
)
907 for vnfr
in db_vnfrs
.values():
908 vnfd
= db_vnfds_ref
[vnfr
["vnfd-ref"]]
909 target_vnf
= deepcopy(vnfr
)
910 for vld
in target_vnf
.get("vld", ()):
911 # check if connected to a ns.vld
912 vnf_cp
= next((cp
for cp
in vnfd
.get("connection-point", ()) if
913 cp
.get("internal-vld-ref") == vld
["id"]), None)
915 ns_cp
= "member_vnf:{}.{}".format(vnfr
["member-vnf-index-ref"], vnf_cp
["id"])
916 if cp2target
.get(ns_cp
):
917 vld
["target"] = cp2target
[ns_cp
]
918 vld
["vim_info"] = [{"vim-network-name": vld
.get("vim-network-name"),
919 "vim_account_id": vnfr
["vim-account-id"]}]
921 for vdur
in target_vnf
.get("vdur", ()):
922 vdur
["vim_info"] = [{"vim_account_id": vnfr
["vim-account-id"]}]
923 vdud_index
, vdud
= next(k
for k
in enumerate(vnfd
["vdu"]) if k
[1]["id"] == vdur
["vdu-id-ref"])
924 # vdur["additionalParams"] = vnfr.get("additionalParamsForVnf") # TODO additional params for VDU
927 if deep_get(vdud
, ("vdu-configuration", "config-access", "ssh-access", "required")):
928 vdur
["ssh-keys"] = ssh_keys
929 vdur
["ssh-access-required"] = True
930 elif deep_get(vnfd
, ("vnf-configuration", "config-access", "ssh-access", "required")) and \
931 any(iface
.get("mgmt-vnf") for iface
in vdur
["interfaces"]):
932 vdur
["ssh-keys"] = ssh_keys
933 vdur
["ssh-access-required"] = True
936 if vdud
.get("cloud-init-file"):
937 vdur
["cloud-init"] = "{}:file:{}".format(vnfd
["_id"], vdud
.get("cloud-init-file"))
938 elif vdud
.get("cloud-init"):
939 vdur
["cloud-init"] = "{}:vdu:{}".format(vnfd
["_id"], vdud_index
)
942 ns_flavor
= target
["flavor"][int(vdur
["ns-flavor-id"])]
943 if not next((vi
for vi
in ns_flavor
["vim_info"] if
944 vi
and vi
.get("vim_account_id") == vnfr
["vim-account-id"]), None):
945 ns_flavor
["vim_info"].append({"vim_account_id": vnfr
["vim-account-id"]})
947 ns_image
= target
["image"][int(vdur
["ns-image-id"])]
948 if not next((vi
for vi
in ns_image
["vim_info"] if
949 vi
and vi
.get("vim_account_id") == vnfr
["vim-account-id"]), None):
950 ns_image
["vim_info"].append({"vim_account_id": vnfr
["vim-account-id"]})
952 vdur
["vim_info"] = [{"vim_account_id": vnfr
["vim-account-id"]}]
953 target
["vnf"].append(target_vnf
)
955 desc
= await self
.RO
.deploy(nsr_id
, target
)
956 action_id
= desc
["action_id"]
957 await self
._wait
_ng
_ro
(self
, nsr_id
, action_id
, nslcmop_id
, start_deploy
, timeout_ns_deploy
, stage
)
961 "_admin.deployed.RO.operational-status": "running",
962 "detailed-status": " ".join(stage
)
964 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
965 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
966 self
._write
_op
_status
(nslcmop_id
, stage
)
967 self
.logger
.debug(logging_text
+ "ns deployed at RO. RO_id={}".format(action_id
))
970 async def _wait_ng_ro(self
, nsr_id
, action_id
, nslcmop_id
, start_time
, timeout
, stage
):
971 detailed_status_old
= None
973 while time() <= start_time
+ timeout
:
974 desc_status
= await self
.RO
.status(nsr_id
, action_id
)
975 if desc_status
["status"] == "FAILED":
976 raise NgRoException(desc_status
["details"])
977 elif desc_status
["status"] == "BUILD":
978 stage
[2] = "VIM: ({})".format(desc_status
["details"])
979 elif desc_status
["status"] == "DONE":
980 stage
[2] = "Deployed at VIM"
983 assert False, "ROclient.check_ns_status returns unknown {}".format(desc_status
["status"])
984 if stage
[2] != detailed_status_old
:
985 detailed_status_old
= stage
[2]
986 db_nsr_update
["detailed-status"] = " ".join(stage
)
987 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
988 self
._write
_op
_status
(nslcmop_id
, stage
)
989 await asyncio
.sleep(5, loop
=self
.loop
)
990 else: # timeout_ns_deploy
991 raise NgRoException("Timeout waiting ns to deploy")
993 async def _terminate_ng_ro(self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
):
997 start_deploy
= time()
1005 desc
= await self
.RO
.deploy(nsr_id
, target
)
1006 action_id
= desc
["action_id"]
1007 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = action_id
1008 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETING"
1009 self
.logger
.debug(logging_text
+ "ns terminate action at RO. action_id={}".format(action_id
))
1012 delete_timeout
= 20 * 60 # 20 minutes
1013 await self
._wait
_ng
_ro
(self
, nsr_id
, action_id
, nslcmop_id
, start_deploy
, delete_timeout
, stage
)
1015 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
1016 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1018 await self
.RO
.delete(nsr_id
)
1019 except Exception as e
:
1020 if isinstance(e
, NgRoException
) and e
.http_code
== 404: # not found
1021 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
1022 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
1023 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
1024 self
.logger
.debug(logging_text
+ "RO_action_id={} already deleted".format(action_id
))
1025 elif isinstance(e
, NgRoException
) and e
.http_code
== 409: # conflict
1026 failed_detail
.append("delete conflict: {}".format(e
))
1027 self
.logger
.debug(logging_text
+ "RO_action_id={} delete conflict: {}".format(action_id
, e
))
1029 failed_detail
.append("delete error: {}".format(e
))
1030 self
.logger
.error(logging_text
+ "RO_action_id={} delete error: {}".format(action_id
, e
))
1033 stage
[2] = "Error deleting from VIM"
1035 stage
[2] = "Deleted from VIM"
1036 db_nsr_update
["detailed-status"] = " ".join(stage
)
1037 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1038 self
._write
_op
_status
(nslcmop_id
, stage
)
1041 raise LcmException("; ".join(failed_detail
))
async def instantiate_RO(self, logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs, db_vnfds_ref,
                         n2vc_key_list, stage):
    """
    Instantiate the NS at RO. With the new generation RO (self.ng_ro) the work is delegated to
    _instantiate_ng_ro; otherwise vnfds, nsd and ns are created at RO (reusing the ones that already exist
    there) and the method waits until the ns is active and the vnfrs can be updated
    :param logging_text: prefix text to use at logging
    :param nsr_id: nsr identity
    :param nsd: database content of ns descriptor
    :param db_nsr: database content of ns record
    :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
    :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index
    :param db_vnfds_ref: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
    :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
    :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
    :return: None or exception. ROClientException, LcmException, DbException and NgRoException are re-raised
        after marking the vnfrs at error
    """
    try:
        db_nsr_update = {}
        RO_descriptor_number = 0  # number of descriptors created at RO
        vnf_index_2_RO_id = {}  # map between vnfd/nsd id to the id used at RO
        nslcmop_id = db_nslcmop["_id"]
        start_deploy = time()
        ns_params = db_nslcmop.get("operationParams")
        if ns_params and ns_params.get("timeout_ns_deploy"):
            timeout_ns_deploy = ns_params["timeout_ns_deploy"]
        else:
            timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy)

        def publish_stage(text):
            # set the vim_specific text and write the progress at the nsr and at the operation
            stage[2] = text
            db_nsr_update["detailed-status"] = " ".join(stage)
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)

        # Check for and optionally request placement optimization. Database will be updated if placement activated
        stage[2] = "Waiting for Placement."
        if await self._do_placement(logging_text, db_nslcmop, db_vnfrs):
            # in case of placement change ns_params[vimAccountId] if not present at any vnfrs
            for vnfr in db_vnfrs.values():
                if ns_params["vimAccountId"] == vnfr["vim-account-id"]:
                    break
            else:
                if db_vnfrs:
                    # assignment, not comparison: use the vim account of the last vnfr
                    ns_params["vimAccountId"] = vnfr["vim-account-id"]

        if self.ng_ro:
            return await self._instantiate_ng_ro(logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs,
                                                 db_vnfds_ref, n2vc_key_list, stage, start_deploy,
                                                 timeout_ns_deploy)
        # deploy RO
        # get vnfds, instantiate at RO
        for c_vnf in nsd.get("constituent-vnfd", ()):
            member_vnf_index = c_vnf["member-vnf-index"]
            vnfd = db_vnfds_ref[c_vnf['vnfd-id-ref']]
            vnfd_ref = vnfd["id"]

            publish_stage("Creating vnfd='{}' member_vnf_index='{}' at RO".format(vnfd_ref, member_vnf_index))

            # self.logger.debug(logging_text + stage[2])
            vnfd_id_RO = "{}.{}.{}".format(nsr_id, RO_descriptor_number, member_vnf_index[:23])
            vnf_index_2_RO_id[member_vnf_index] = vnfd_id_RO
            RO_descriptor_number += 1

            # look position at deployed.RO.vnfd if not present it will be appended at the end
            for index, vnf_deployed in enumerate(db_nsr["_admin"]["deployed"]["RO"]["vnfd"]):
                if vnf_deployed["member-vnf-index"] == member_vnf_index:
                    break
            else:
                index = len(db_nsr["_admin"]["deployed"]["RO"]["vnfd"])
                db_nsr["_admin"]["deployed"]["RO"]["vnfd"].append(None)

            # look if present
            RO_update = {"member-vnf-index": member_vnf_index}
            vnfd_list = await self.RO.get_list("vnfd", filter_by={"osm_id": vnfd_id_RO})
            if vnfd_list:
                RO_update["id"] = vnfd_list[0]["uuid"]
                self.logger.debug(logging_text + "vnfd='{}' member_vnf_index='{}' exists at RO. Using RO_id={}".
                                  format(vnfd_ref, member_vnf_index, vnfd_list[0]["uuid"]))
            else:
                vnfd_RO = self.vnfd2RO(vnfd, vnfd_id_RO, db_vnfrs[c_vnf["member-vnf-index"]].
                                       get("additionalParamsForVnf"), nsr_id)
                desc = await self.RO.create("vnfd", descriptor=vnfd_RO)
                RO_update["id"] = desc["uuid"]
                self.logger.debug(logging_text + "vnfd='{}' member_vnf_index='{}' created at RO. RO_id={}".format(
                    vnfd_ref, member_vnf_index, desc["uuid"]))
            db_nsr_update["_admin.deployed.RO.vnfd.{}".format(index)] = RO_update
            db_nsr["_admin"]["deployed"]["RO"]["vnfd"][index] = RO_update

        # create nsd at RO
        nsd_ref = nsd["id"]

        publish_stage("Creating nsd={} at RO".format(nsd_ref))

        # self.logger.debug(logging_text + stage[2])
        RO_osm_nsd_id = "{}.{}.{}".format(nsr_id, RO_descriptor_number, nsd_ref[:23])
        RO_descriptor_number += 1
        nsd_list = await self.RO.get_list("nsd", filter_by={"osm_id": RO_osm_nsd_id})
        if nsd_list:
            db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = nsd_list[0]["uuid"]
            self.logger.debug(logging_text + "nsd={} exists at RO. Using RO_id={}".format(
                nsd_ref, RO_nsd_uuid))
        else:
            # the descriptor sent to RO references the vnfds by the ids used at RO
            nsd_RO = deepcopy(nsd)
            nsd_RO["id"] = RO_osm_nsd_id
            nsd_RO.pop("_id", None)
            nsd_RO.pop("_admin", None)
            for c_vnf in nsd_RO.get("constituent-vnfd", ()):
                member_vnf_index = c_vnf["member-vnf-index"]
                c_vnf["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]
            for c_vld in nsd_RO.get("vld", ()):
                for cp in c_vld.get("vnfd-connection-point-ref", ()):
                    member_vnf_index = cp["member-vnf-index-ref"]
                    cp["vnfd-id-ref"] = vnf_index_2_RO_id[member_vnf_index]

            desc = await self.RO.create("nsd", descriptor=nsd_RO)
            db_nsr_update["_admin.nsState"] = "INSTANTIATED"
            db_nsr_update["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid = desc["uuid"]
            self.logger.debug(logging_text + "nsd={} created at RO. RO_id={}".format(nsd_ref, RO_nsd_uuid))
        self.update_db_2("nsrs", nsr_id, db_nsr_update)

        # Create ns at RO
        publish_stage("Creating nsd={} at RO".format(nsd_ref))

        # if present use it unless in error status
        RO_nsr_id = deep_get(db_nsr, ("_admin", "deployed", "RO", "nsr_id"))
        if RO_nsr_id:
            try:
                publish_stage("Looking for existing ns at RO")
                # self.logger.debug(logging_text + stage[2] + " RO_ns_id={}".format(RO_nsr_id))
                desc = await self.RO.show("ns", RO_nsr_id)

            except ROclient.ROClientException as e:
                if e.http_code != HTTPStatus.NOT_FOUND:
                    raise
                RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
            if RO_nsr_id:
                ns_status, ns_status_info = self.RO.check_ns_status(desc)
                db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
                if ns_status == "ERROR":
                    stage[2] = "Deleting ns at RO. RO_ns_id={}".format(RO_nsr_id)
                    self.logger.debug(logging_text + stage[2])
                    await self.RO.delete("ns", RO_nsr_id)
                    RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = None
        if not RO_nsr_id:
            publish_stage("Checking dependencies")
            # self.logger.debug(logging_text + stage[2])

            # check if VIM is creating and wait look if previous tasks in process
            task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account", ns_params["vimAccountId"])
            if task_dependency:
                stage[2] = "Waiting for related tasks '{}' to be completed".format(task_name)
                self.logger.debug(logging_text + stage[2])
                await asyncio.wait(task_dependency, timeout=3600)
            if ns_params.get("vnf"):
                for vnf in ns_params["vnf"]:
                    if "vimAccountId" in vnf:
                        task_name, task_dependency = self.lcm_tasks.lookfor_related("vim_account",
                                                                                    vnf["vimAccountId"])
                        # waits only for the tasks found for this vnf, never for a previous, stale lookup
                        if task_dependency:
                            stage[2] = "Waiting for related tasks '{}' to be completed.".format(task_name)
                            self.logger.debug(logging_text + stage[2])
                            await asyncio.wait(task_dependency, timeout=3600)

            stage[2] = "Checking instantiation parameters."
            RO_ns_params = self._ns_params_2_RO(ns_params, nsd, db_vnfds_ref, db_vnfrs, n2vc_key_list)
            publish_stage("Deploying ns at VIM.")

            desc = await self.RO.create("ns", descriptor=RO_ns_params, name=db_nsr["name"], scenario=RO_nsd_uuid)
            RO_nsr_id = db_nsr_update["_admin.deployed.RO.nsr_id"] = desc["uuid"]
            db_nsr_update["_admin.nsState"] = "INSTANTIATED"
            db_nsr_update["_admin.deployed.RO.nsr_status"] = "BUILD"
            self.logger.debug(logging_text + "ns created at RO. RO_id={}".format(desc["uuid"]))

        # wait until NS is ready
        publish_stage("Waiting VIM to deploy ns.")
        detailed_status_old = None
        self.logger.debug(logging_text + stage[2] + " RO_ns_id={}".format(RO_nsr_id))

        old_desc = None
        while time() <= start_deploy + timeout_ns_deploy:
            desc = await self.RO.show("ns", RO_nsr_id)

            # deploymentStatus
            if desc != old_desc:
                # desc has changed => update db
                self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)
                old_desc = desc

            ns_status, ns_status_info = self.RO.check_ns_status(desc)
            db_nsr_update["_admin.deployed.RO.nsr_status"] = ns_status
            if ns_status == "ERROR":
                raise ROclient.ROClientException(ns_status_info)
            elif ns_status == "BUILD":
                stage[2] = "VIM: ({})".format(ns_status_info)
            elif ns_status == "ACTIVE":
                stage[2] = "Waiting for management IP address reported by the VIM. Updating VNFRs."
                try:
                    self.ns_update_vnfr(db_vnfrs, desc)
                    break
                except LcmExceptionNoMgmtIP:
                    # management ip not yet available: keep waiting
                    pass
            else:
                # an explicit exception, because an assert is removed when running with -O
                raise ROclient.ROClientException(
                    "ROclient.check_ns_status returns unknown {}".format(ns_status))
            if stage[2] != detailed_status_old:
                detailed_status_old = stage[2]
                db_nsr_update["detailed-status"] = " ".join(stage)
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_op_status(nslcmop_id, stage)
            # the 'loop' argument is not passed: it was removed from asyncio.sleep at python 3.10
            await asyncio.sleep(5)
        else:  # timeout_ns_deploy
            raise ROclient.ROClientException("Timeout waiting ns to be ready")

        # Updating NSR
        self.ns_update_nsr(db_nsr_update, db_nsr, desc)

        db_nsr_update["_admin.deployed.RO.operational-status"] = "running"
        # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
        publish_stage("Deployed at VIM")
        # await self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
        # self.logger.debug(logging_text + "Deployed at VIM")
    except (ROclient.ROClientException, LcmException, DbException, NgRoException) as e:
        stage[2] = "ERROR deploying at VIM"
        self.set_vnfr_at_error(db_vnfrs, str(e))
        raise
async def wait_kdu_up(self, logging_text, nsr_id, vnfr_id, kdu_name):
    """
    Wait for kdu to be up, get ip address
    The vnfr is read from database up to 360 times, sleeping 10 seconds between reads
    :param logging_text: prefix use for logging
    :param nsr_id: not used by this method
    :param vnfr_id: identity of the vnfr that contains the kdu
    :param kdu_name: value of "kdu-name" of the wanted entry of vnfr "kdur"
    :return: IP address ("ip-address" of the kdur, that can be None) once the status is READY or ENABLED
    :raises LcmException: if the kdu is not found at the vnfr, if it has any other non empty status, or if
        the status is still empty after all the retries
    """

    # self.logger.debug(logging_text + "Starting wait_kdu_up")
    for _ in range(360):
        db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})
        kdur = next((x for x in get_iterable(db_vnfr, "kdur") if x.get("kdu-name") == kdu_name), None)
        if not kdur:
            raise LcmException("Not found vnfr_id={}, kdu_name={}".format(vnfr_id, kdu_name))
        if kdur.get("status"):
            if kdur["status"] in ("READY", "ENABLED"):
                return kdur.get("ip-address")
            raise LcmException("target KDU={} is in error state".format(kdu_name))

        # status not yet reported. The 'loop' argument is not passed: it was removed from asyncio.sleep
        # at python 3.10
        await asyncio.sleep(10)
    raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name))
async def wait_vm_up_insert_key_ro(self, logging_text, nsr_id, vnfr_id, vdu_id, vdu_index, pub_key=None, user=None):
    """
    Wait for ip address at RO, and optionally, insert public key in virtual machine
    The vnfr is polled every 10 seconds. The key is injected only when both pub_key and user are provided
    :param logging_text: prefix use for logging
    :param nsr_id: nsr identity
    :param vnfr_id: identity of the vnfr that contains the target vdu
    :param vdu_id: "vdu-id-ref" of the target vdur. When empty the target is the vdur that owns the ip address
        of the vnf
    :param vdu_index: "count-index" of the target vdur; used only when vdu_id is provided
    :param pub_key: public ssh key to inject, None to skip
    :param user: user to apply the public ssh key
    :return: IP address
    :raises LcmException: if the vnfr or the target vdur is at ERROR, the vdur is not found, the maximum number
        of polls (360) is reached, or the key injection fails
    """

    # self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
    ro_nsr_id = None
    ip_address = None
    nb_tries = 0
    target_vdu_id = None
    ro_retries = 0

    while True:

        ro_retries += 1
        if ro_retries >= 360:  # 1 hour
            raise LcmException("Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id))

        # the 'loop' argument is not passed: it was removed from asyncio.sleep at python 3.10
        await asyncio.sleep(10)

        # get ip address
        if not target_vdu_id:
            db_vnfr = self.db.get_one("vnfrs", {"_id": vnfr_id})

            if not vdu_id:  # for the VNF case
                if db_vnfr.get("status") == "ERROR":
                    raise LcmException("Cannot inject ssh-key because target VNF is in error state")
                ip_address = db_vnfr.get("ip-address")
                if not ip_address:
                    continue
                vdur = next((x for x in get_iterable(db_vnfr, "vdur") if x.get("ip-address") == ip_address), None)
            else:  # VDU case
                vdur = next((x for x in get_iterable(db_vnfr, "vdur")
                             if x.get("vdu-id-ref") == vdu_id and x.get("count-index") == vdu_index), None)

            if not vdur and len(db_vnfr.get("vdur", ())) == 1:  # If only one, this should be the target vdu
                vdur = db_vnfr["vdur"][0]
            if not vdur:
                raise LcmException("Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(vnfr_id, vdu_id,
                                                                                          vdu_index))

            if vdur.get("pdu-type") or vdur.get("status") == "ACTIVE":
                ip_address = vdur.get("ip-address")
                if not ip_address:
                    continue
                target_vdu_id = vdur["vdu-id-ref"]
            elif vdur.get("status") == "ERROR":
                raise LcmException("Cannot inject ssh-key because target VM is in error state")

        if not target_vdu_id:
            continue

        # inject public key into machine
        if pub_key and user:
            # wait until NS is deployed at RO
            if not ro_nsr_id:
                db_nsrs = self.db.get_one("nsrs", {"_id": nsr_id})
                ro_nsr_id = deep_get(db_nsrs, ("_admin", "deployed", "RO", "nsr_id"))
            if not ro_nsr_id:
                continue

            # self.logger.debug(logging_text + "Inserting RO key")
            if vdur.get("pdu-type"):
                self.logger.error(logging_text + "Cannot inject ssh-ky to a PDU")
                return ip_address
            try:
                ro_vm_id = "{}-{}".format(db_vnfr["member-vnf-index-ref"], target_vdu_id)  # TODO add vdu_index
                if self.ng_ro:
                    target = {"action": "inject_ssh_key", "key": pub_key, "user": user,
                              "vnf": [{"_id": vnfr_id, "vdur": [{"id": vdu_id}]}],
                              }
                    await self.RO.deploy(nsr_id, target)
                else:
                    result_dict = await self.RO.create_action(
                        item="ns",
                        item_id_name=ro_nsr_id,
                        descriptor={"add_public_key": pub_key, "vms": [ro_vm_id], "user": user}
                    )
                    # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
                    if not result_dict or not isinstance(result_dict, dict):
                        raise LcmException("Unknown response from RO when injecting key")
                    # only the first result is evaluated: the request contains a single vm
                    for result in result_dict.values():
                        if result.get("vim_result") == 200:
                            break
                        else:
                            raise ROclient.ROClientException("error injecting key: {}".format(
                                result.get("description")))
                break
            except NgRoException as e:
                raise LcmException("Reaching max tries injecting key. Error: {}".format(e)) from e
            except ROclient.ROClientException as e:
                # retried at the next poll, up to 20 times
                if not nb_tries:
                    self.logger.debug(logging_text + "error injecting key: {}. Retrying until {} seconds".
                                      format(e, 20*10))
                nb_tries += 1
                if nb_tries >= 20:
                    raise LcmException("Reaching max tries injecting key. Error: {}".format(e)) from e
        else:
            break

    return ip_address
1423 async def _wait_dependent_n2vc(self
, nsr_id
, vca_deployed_list
, vca_index
):
1425 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1427 my_vca
= vca_deployed_list
[vca_index
]
1428 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1429 # vdu or kdu: no dependencies
1433 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1434 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1435 configuration_status_list
= db_nsr
["configurationStatus"]
1436 for index
, vca_deployed
in enumerate(configuration_status_list
):
1437 if index
== vca_index
:
1440 if not my_vca
.get("member-vnf-index") or \
1441 (vca_deployed
.get("member-vnf-index") == my_vca
.get("member-vnf-index")):
1442 internal_status
= configuration_status_list
[index
].get("status")
1443 if internal_status
== 'READY':
1445 elif internal_status
== 'BROKEN':
1446 raise LcmException("Configuration aborted because dependent charm/s has failed")
1450 # no dependencies, return
1452 await asyncio
.sleep(10)
1455 raise LcmException("Configuration aborted because dependent charm/s timeout")
1457 async def instantiate_N2VC(self
, logging_text
, vca_index
, nsi_id
, db_nsr
, db_vnfr
, vdu_id
, kdu_name
, vdu_index
,
1458 config_descriptor
, deploy_params
, base_folder
, nslcmop_id
, stage
, vca_type
, vca_name
,
1459 ee_config_descriptor
):
1460 nsr_id
= db_nsr
["_id"]
1461 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
1462 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1463 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
1464 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
1466 'collection': 'nsrs',
1467 'filter': {'_id': nsr_id
},
1468 'path': db_update_entry
1474 element_under_configuration
= nsr_id
1478 vnfr_id
= db_vnfr
["_id"]
1479 osm_config
["osm"]["vnf_id"] = vnfr_id
1481 namespace
= "{nsi}.{ns}".format(
1482 nsi
=nsi_id
if nsi_id
else "",
1486 element_type
= 'VNF'
1487 element_under_configuration
= vnfr_id
1488 namespace
+= ".{}".format(vnfr_id
)
1490 namespace
+= ".{}-{}".format(vdu_id
, vdu_index
or 0)
1491 element_type
= 'VDU'
1492 element_under_configuration
= "{}-{}".format(vdu_id
, vdu_index
or 0)
1493 osm_config
["osm"]["vdu_id"] = vdu_id
1495 namespace
+= ".{}".format(kdu_name
)
1496 element_type
= 'KDU'
1497 element_under_configuration
= kdu_name
1498 osm_config
["osm"]["kdu_name"] = kdu_name
1501 artifact_path
= "{}/{}/{}/{}".format(
1502 base_folder
["folder"],
1503 base_folder
["pkg-dir"],
1504 "charms" if vca_type
in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm") else "helm-charts",
1507 # get initial_config_primitive_list that applies to this element
1508 initial_config_primitive_list
= config_descriptor
.get('initial-config-primitive')
1510 # add config if not present for NS charm
1511 ee_descriptor_id
= ee_config_descriptor
.get("id")
1512 initial_config_primitive_list
= self
._get
_initial
_config
_primitive
_list
(initial_config_primitive_list
,
1513 vca_deployed
, ee_descriptor_id
)
1515 # n2vc_redesign STEP 3.1
1516 # find old ee_id if exists
1517 ee_id
= vca_deployed
.get("ee_id")
1520 deep_get(db_vnfr
, ("vim-account-id",)) or
1521 deep_get(deploy_params
, ("OSM", "vim_account_id"))
1523 vca_cloud
, vca_cloud_credential
= self
.get_vca_cloud_and_credentials(vim_account_id
)
1524 vca_k8s_cloud
, vca_k8s_cloud_credential
= self
.get_vca_k8s_cloud_and_credentials(vim_account_id
)
1525 # create or register execution environment in VCA
1526 if vca_type
in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1528 self
._write
_configuration
_status
(
1530 vca_index
=vca_index
,
1532 element_under_configuration
=element_under_configuration
,
1533 element_type
=element_type
1536 step
= "create execution environment"
1537 self
.logger
.debug(logging_text
+ step
)
1541 if vca_type
== "k8s_proxy_charm":
1542 ee_id
= await self
.vca_map
[vca_type
].install_k8s_proxy_charm(
1543 charm_name
=artifact_path
[artifact_path
.rfind("/") + 1:],
1544 namespace
=namespace
,
1545 artifact_path
=artifact_path
,
1547 cloud_name
=vca_k8s_cloud
,
1548 credential_name
=vca_k8s_cloud_credential
,
1551 ee_id
, credentials
= await self
.vca_map
[vca_type
].create_execution_environment(
1552 namespace
=namespace
,
1556 cloud_name
=vca_cloud
,
1557 credential_name
=vca_cloud_credential
,
1560 elif vca_type
== "native_charm":
1561 step
= "Waiting to VM being up and getting IP address"
1562 self
.logger
.debug(logging_text
+ step
)
1563 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(logging_text
, nsr_id
, vnfr_id
, vdu_id
, vdu_index
,
1564 user
=None, pub_key
=None)
1565 credentials
= {"hostname": rw_mgmt_ip
}
1567 username
= deep_get(config_descriptor
, ("config-access", "ssh-access", "default-user"))
1568 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1569 # merged. Meanwhile let's get username from initial-config-primitive
1570 if not username
and initial_config_primitive_list
:
1571 for config_primitive
in initial_config_primitive_list
:
1572 for param
in config_primitive
.get("parameter", ()):
1573 if param
["name"] == "ssh-username":
1574 username
= param
["value"]
1577 raise LcmException("Cannot determine the username neither with 'initial-config-primitive' nor with "
1578 "'config-access.ssh-access.default-user'")
1579 credentials
["username"] = username
1580 # n2vc_redesign STEP 3.2
1582 self
._write
_configuration
_status
(
1584 vca_index
=vca_index
,
1585 status
='REGISTERING',
1586 element_under_configuration
=element_under_configuration
,
1587 element_type
=element_type
1590 step
= "register execution environment {}".format(credentials
)
1591 self
.logger
.debug(logging_text
+ step
)
1592 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
1593 credentials
=credentials
,
1594 namespace
=namespace
,
1596 cloud_name
=vca_cloud
,
1597 credential_name
=vca_cloud_credential
,
1600 # for compatibility with MON/POL modules, the need model and application name at database
1601 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
1602 ee_id_parts
= ee_id
.split('.')
1603 db_nsr_update
= {db_update_entry
+ "ee_id": ee_id
}
1604 if len(ee_id_parts
) >= 2:
1605 model_name
= ee_id_parts
[0]
1606 application_name
= ee_id_parts
[1]
1607 db_nsr_update
[db_update_entry
+ "model"] = model_name
1608 db_nsr_update
[db_update_entry
+ "application"] = application_name
1610 # n2vc_redesign STEP 3.3
1611 step
= "Install configuration Software"
1613 self
._write
_configuration
_status
(
1615 vca_index
=vca_index
,
1616 status
='INSTALLING SW',
1617 element_under_configuration
=element_under_configuration
,
1618 element_type
=element_type
,
1619 other_update
=db_nsr_update
1622 # TODO check if already done
1623 self
.logger
.debug(logging_text
+ step
)
1625 if vca_type
== "native_charm":
1626 config_primitive
= next((p
for p
in initial_config_primitive_list
if p
["name"] == "config"), None)
1627 if config_primitive
:
1628 config
= self
._map
_primitive
_params
(
1634 if vca_type
== "lxc_proxy_charm":
1635 if element_type
== "NS":
1636 num_units
= db_nsr
.get("config-units") or 1
1637 elif element_type
== "VNF":
1638 num_units
= db_vnfr
.get("config-units") or 1
1639 elif element_type
== "VDU":
1640 for v
in db_vnfr
["vdur"]:
1641 if vdu_id
== v
["vdu-id-ref"]:
1642 num_units
= v
.get("config-units") or 1
1644 if vca_type
!= "k8s_proxy_charm":
1645 await self
.vca_map
[vca_type
].install_configuration_sw(
1647 artifact_path
=artifact_path
,
1650 num_units
=num_units
,
1653 # write in db flag of configuration_sw already installed
1654 self
.update_db_2("nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True})
1656 # add relations for this VCA (wait for other peers related with this VCA)
1657 await self
._add
_vca
_relations
(logging_text
=logging_text
, nsr_id
=nsr_id
,
1658 vca_index
=vca_index
, vca_type
=vca_type
)
1660 # if SSH access is required, then get execution environment SSH public
1661 # if native charm we have waited already to VM be UP
1662 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
1665 # self.logger.debug("get ssh key block")
1666 if deep_get(config_descriptor
, ("config-access", "ssh-access", "required")):
1667 # self.logger.debug("ssh key needed")
1668 # Needed to inject a ssh key
1669 user
= deep_get(config_descriptor
, ("config-access", "ssh-access", "default-user"))
1670 step
= "Install configuration Software, getting public ssh key"
1671 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(ee_id
=ee_id
, db_dict
=db_dict
)
1673 step
= "Insert public key into VM user={} ssh_key={}".format(user
, pub_key
)
1675 # self.logger.debug("no need to get ssh key")
1676 step
= "Waiting to VM being up and getting IP address"
1677 self
.logger
.debug(logging_text
+ step
)
1679 # n2vc_redesign STEP 5.1
1680 # wait for RO (ip-address) Insert pub_key into VM
1683 rw_mgmt_ip
= await self
.wait_kdu_up(logging_text
, nsr_id
, vnfr_id
, kdu_name
)
1685 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(logging_text
, nsr_id
, vnfr_id
, vdu_id
,
1686 vdu_index
, user
=user
, pub_key
=pub_key
)
1688 rw_mgmt_ip
= None # This is for a NS configuration
1690 self
.logger
.debug(logging_text
+ ' VM_ip_address={}'.format(rw_mgmt_ip
))
1692 # store rw_mgmt_ip in deploy params for later replacement
1693 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
1695 # n2vc_redesign STEP 6 Execute initial config primitive
1696 step
= 'execute initial config primitive'
1698 # wait for dependent primitives execution (NS -> VNF -> VDU)
1699 if initial_config_primitive_list
:
1700 await self
._wait
_dependent
_n
2vc
(nsr_id
, vca_deployed_list
, vca_index
)
1702 # stage, in function of element type: vdu, kdu, vnf or ns
1703 my_vca
= vca_deployed_list
[vca_index
]
1704 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1706 stage
[0] = 'Stage 3/5: running Day-1 primitives for VDU.'
1707 elif my_vca
.get("member-vnf-index"):
1709 stage
[0] = 'Stage 4/5: running Day-1 primitives for VNF.'
1712 stage
[0] = 'Stage 5/5: running Day-1 primitives for NS.'
1714 self
._write
_configuration
_status
(
1716 vca_index
=vca_index
,
1717 status
='EXECUTING PRIMITIVE'
1720 self
._write
_op
_status
(
1725 check_if_terminated_needed
= True
1726 for initial_config_primitive
in initial_config_primitive_list
:
1727 # adding information on the vca_deployed if it is a NS execution environment
1728 if not vca_deployed
["member-vnf-index"]:
1729 deploy_params
["ns_config_info"] = json
.dumps(self
._get
_ns
_config
_info
(nsr_id
))
1730 # TODO check if already done
1731 primitive_params_
= self
._map
_primitive
_params
(initial_config_primitive
, {}, deploy_params
)
1733 step
= "execute primitive '{}' params '{}'".format(initial_config_primitive
["name"], primitive_params_
)
1734 self
.logger
.debug(logging_text
+ step
)
1735 await self
.vca_map
[vca_type
].exec_primitive(
1737 primitive_name
=initial_config_primitive
["name"],
1738 params_dict
=primitive_params_
,
1741 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
1742 if check_if_terminated_needed
:
1743 if config_descriptor
.get('terminate-config-primitive'):
1744 self
.update_db_2("nsrs", nsr_id
, {db_update_entry
+ "needed_terminate": True})
1745 check_if_terminated_needed
= False
1747 # TODO register in database that primitive is done
1749 # STEP 7 Configure metrics
1750 if vca_type
== "helm" or vca_type
== "helm-v3":
1751 prometheus_jobs
= await self
.add_prometheus_metrics(
1753 artifact_path
=artifact_path
,
1754 ee_config_descriptor
=ee_config_descriptor
,
1757 target_ip
=rw_mgmt_ip
,
1760 self
.update_db_2("nsrs", nsr_id
, {db_update_entry
+ "prometheus_jobs": prometheus_jobs
})
1762 step
= "instantiated at VCA"
1763 self
.logger
.debug(logging_text
+ step
)
1765 self
._write
_configuration
_status
(
1767 vca_index
=vca_index
,
1771 except Exception as e
: # TODO not use Exception but N2VC exception
1772 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
1773 if not isinstance(e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)):
1774 self
.logger
.error("Exception while {} : {}".format(step
, e
), exc_info
=True)
1775 self
._write
_configuration
_status
(
1777 vca_index
=vca_index
,
1780 raise LcmException("{} {}".format(step
, e
)) from e
1782 def _write_ns_status(self
, nsr_id
: str, ns_state
: str, current_operation
: str, current_operation_id
: str,
1783 error_description
: str = None, error_detail
: str = None, other_update
: dict = None):
1785 Update db_nsr fields.
1788 :param current_operation:
1789 :param current_operation_id:
1790 :param error_description:
1791 :param error_detail:
1792 :param other_update: Other required changes at database if provided, will be cleared
1796 db_dict
= other_update
or {}
1797 db_dict
["_admin.nslcmop"] = current_operation_id
# for backward compatibility
1798 db_dict
["_admin.current-operation"] = current_operation_id
1799 db_dict
["_admin.operation-type"] = current_operation
if current_operation
!= "IDLE" else None
1800 db_dict
["currentOperation"] = current_operation
1801 db_dict
["currentOperationID"] = current_operation_id
1802 db_dict
["errorDescription"] = error_description
1803 db_dict
["errorDetail"] = error_detail
1806 db_dict
["nsState"] = ns_state
1807 self
.update_db_2("nsrs", nsr_id
, db_dict
)
1808 except DbException
as e
:
1809 self
.logger
.warn('Error writing NS status, ns={}: {}'.format(nsr_id
, e
))
1811 def _write_op_status(self
, op_id
: str, stage
: list = None, error_message
: str = None, queuePosition
: int = 0,
1812 operation_state
: str = None, other_update
: dict = None):
1814 db_dict
= other_update
or {}
1815 db_dict
['queuePosition'] = queuePosition
1816 if isinstance(stage
, list):
1817 db_dict
['stage'] = stage
[0]
1818 db_dict
['detailed-status'] = " ".join(stage
)
1819 elif stage
is not None:
1820 db_dict
['stage'] = str(stage
)
1822 if error_message
is not None:
1823 db_dict
['errorMessage'] = error_message
1824 if operation_state
is not None:
1825 db_dict
['operationState'] = operation_state
1826 db_dict
["statusEnteredTime"] = time()
1827 self
.update_db_2("nslcmops", op_id
, db_dict
)
1828 except DbException
as e
:
1829 self
.logger
.warn('Error writing OPERATION status for op_id: {} -> {}'.format(op_id
, e
))
1831 def _write_all_config_status(self
, db_nsr
: dict, status
: str):
1833 nsr_id
= db_nsr
["_id"]
1834 # configurationStatus
1835 config_status
= db_nsr
.get('configurationStatus')
1837 db_nsr_update
= {"configurationStatus.{}.status".format(index
): status
for index
, v
in
1838 enumerate(config_status
) if v
}
1840 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1842 except DbException
as e
:
1843 self
.logger
.warn('Error writing all configuration status, ns={}: {}'.format(nsr_id
, e
))
1845 def _write_configuration_status(self
, nsr_id
: str, vca_index
: int, status
: str = None,
1846 element_under_configuration
: str = None, element_type
: str = None,
1847 other_update
: dict = None):
1849 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
1850 # .format(vca_index, status))
1853 db_path
= 'configurationStatus.{}.'.format(vca_index
)
1854 db_dict
= other_update
or {}
1856 db_dict
[db_path
+ 'status'] = status
1857 if element_under_configuration
:
1858 db_dict
[db_path
+ 'elementUnderConfiguration'] = element_under_configuration
1860 db_dict
[db_path
+ 'elementType'] = element_type
1861 self
.update_db_2("nsrs", nsr_id
, db_dict
)
1862 except DbException
as e
:
1863 self
.logger
.warn('Error writing configuration status={}, ns={}, vca_index={}: {}'
1864 .format(status
, nsr_id
, vca_index
, e
))
async def _do_placement(self, logging_text, db_nslcmop, db_vnfrs):
    """
    Check and compute the placement (vim account where to deploy). If it is decided by an external tool, the
    request is sent via kafka and the result is awaited at database (nslcmops _admin.pla).
    Database is used because the result can be obtained from a different LCM worker in case of HA.
    :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
    :param db_nslcmop: database content of nslcmop
    :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
    :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfrs with the
        computed 'vim-account-id'
    :raises LcmException: when no placement result is found at database after polling
    """
    nslcmop_id = db_nslcmop['_id']
    if deep_get(db_nslcmop, ('operationParams', 'placement-engine')) != "PLA":
        # placement is not delegated: nothing to compute
        return False

    self.logger.debug(logging_text + "Invoke and wait for placement optimization")
    await self.msg.aiowrite("pla", "get_placement", {'nslcmopId': nslcmop_id}, loop=self.loop)

    # poll the nslcmop record until the result shows up or the wait budget is consumed
    poll_period = 5
    remaining = poll_period * 10
    placement = None
    while not placement and remaining >= 0:
        await asyncio.sleep(poll_period)
        remaining -= poll_period
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        placement = deep_get(db_nslcmop, ('_admin', 'pla'))

    if not placement:
        raise LcmException("Placement timeout for nslcmopId={}".format(nslcmop_id))

    changed = False
    for vnf_placement in placement['vnf']:
        vnfr = db_vnfrs.get(vnf_placement['member-vnf-index'])
        if vnf_placement.get('vimAccountId') and vnfr:
            changed = True
            self.db.set_one("vnfrs", {"_id": vnfr["_id"]}, {"vim-account-id": vnf_placement['vimAccountId']})
            # keep the in-memory vnfr aligned with the database
            vnfr["vim-account-id"] = vnf_placement['vimAccountId']
    return changed
def update_nsrs_with_pla_result(self, params):
    """
    Store a received placement result at the "nslcmops" record it belongs to.

    The operation id is read from params['placement']['nslcmopId'] and the whole params['placement']
    content is written at "_admin.pla". Any failure is logged as a warning and not propagated.

    :param params: message content; expected to be a dict holding a 'placement' entry
    :return: None
    """
    # bound before the try so the handler below can always format its message
    nslcmop_id = None
    try:
        nslcmop_id = deep_get(params, ('placement', 'nslcmopId'))
        self.update_db_2("nslcmops", nslcmop_id, {"_admin.pla": params.get('placement')})
    except Exception as e:
        # Logger.warn is a deprecated alias; use warning()
        self.logger.warning('Update failed for nslcmop_id={}:{}'.format(nslcmop_id, e))
async def instantiate(self, nsr_id, nslcmop_id):
    """
    Run the "instantiate" operation of a network service.

    Reads the operation, ns, nsd, vnfrs and vnfds from database, launches the deployment of KDUs, the
    deployment at VIM (as a background task) and the execution environments for every vnf, vdu, kdu and
    for the ns itself. Whatever happens, the "finally" part waits for the launched tasks, writes the
    final status at the "nsrs" and "nslcmops" records and sends the "instantiated" notification.

    :param nsr_id: ns instance to deploy
    :param nslcmop_id: operation to run
    :return: None. Errors are not raised; they are reported at database
    """

    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
    if not task_is_locked_by_me:
        self.logger.debug('instantiate() task is not locked by me, ns={}'.format(nsr_id))
        return

    logging_text = "Task ns={} instantiate={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")

    # get all needed from database

    # database nsrs record
    db_nsr = None

    # database nslcmops record
    db_nslcmop = None

    # update operation on nsrs
    db_nsr_update = {}
    # update operation on nslcmops
    db_nslcmop_update = {}

    nslcmop_operation_state = None
    db_vnfrs = {}  # vnf's info indexed by member-index
    # n2vc_info = {}
    tasks_dict_info = {}  # from task to info text
    exc = None
    error_list = []
    stage = ['Stage 1/5: preparation of the environment.', "Waiting for previous operations to terminate.", ""]
    # ^ stage, step, VIM progress
    try:
        # wait for any previous tasks in process
        await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)

        stage[1] = "Sync filesystem from database."
        self.fs.sync()  # TODO, make use of partial sync, only for the needed packages

        # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
        stage[1] = "Reading from database."
        # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
        db_nsr_update["detailed-status"] = "creating"
        db_nsr_update["operational-status"] = "init"
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state="BUILDING",
            current_operation="INSTANTIATING",
            current_operation_id=nslcmop_id,
            other_update=db_nsr_update
        )
        self._write_op_status(
            op_id=nslcmop_id,
            stage=stage,
            queuePosition=0
        )

        # read from db: operation
        stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        ns_params = db_nslcmop.get("operationParams")
        # the timeout provided with the operation takes precedence over the configured one
        if ns_params and ns_params.get("timeout_ns_deploy"):
            timeout_ns_deploy = ns_params["timeout_ns_deploy"]
        else:
            timeout_ns_deploy = self.timeout.get("ns_deploy", self.timeout_ns_deploy)

        # read from db: ns
        stage[1] = "Getting nsr={} from db.".format(nsr_id)
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        stage[1] = "Getting nsd={} from db.".format(db_nsr["nsd-id"])
        nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
        db_nsr["nsd"] = nsd
        # nsr_name = db_nsr["name"]   # TODO short-name??

        # read from db: vnf's of this ns
        stage[1] = "Getting vnfrs from db."
        self.logger.debug(logging_text + stage[1])
        db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})

        # read from db: vnfd's for every vnf
        db_vnfds_ref = {}  # every vnfd data indexed by vnf name
        db_vnfds = {}  # every vnfd data indexed by vnf id
        db_vnfds_index = {}  # every vnfd data indexed by vnf member-index

        # for each vnf in ns, read vnfd
        for vnfr in db_vnfrs_list:
            db_vnfrs[vnfr["member-vnf-index-ref"]] = vnfr  # vnf's dict indexed by member-index: '1', '2', etc
            vnfd_id = vnfr["vnfd-id"]  # vnfd uuid for this vnf
            vnfd_ref = vnfr["vnfd-ref"]  # vnfd name for this vnf

            # if we haven't this vnfd, read it from db
            if vnfd_id not in db_vnfds:
                # read from db
                stage[1] = "Getting vnfd={} id='{}' from db.".format(vnfd_id, vnfd_ref)
                self.logger.debug(logging_text + stage[1])
                vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})

                # store vnfd
                db_vnfds_ref[vnfd_ref] = vnfd  # vnfd's indexed by name
                db_vnfds[vnfd_id] = vnfd  # vnfd's indexed by id
            db_vnfds_index[vnfr["member-vnf-index-ref"]] = db_vnfds[vnfd_id]  # vnfd's indexed by member-index

        # Get or generates the _admin.deployed.VCA list
        vca_deployed_list = None
        if db_nsr["_admin"].get("deployed"):
            vca_deployed_list = db_nsr["_admin"]["deployed"].get("VCA")
        if vca_deployed_list is None:
            vca_deployed_list = []
            configuration_status_list = []
            db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
            db_nsr_update["configurationStatus"] = configuration_status_list
            # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
            populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)
        elif isinstance(vca_deployed_list, dict):
            # maintain backward compatibility. Change a dict to list at database
            vca_deployed_list = list(vca_deployed_list.values())
            db_nsr_update["_admin.deployed.VCA"] = vca_deployed_list
            populate_dict(db_nsr, ("_admin", "deployed", "VCA"), vca_deployed_list)

        if not isinstance(deep_get(db_nsr, ("_admin", "deployed", "RO", "vnfd")), list):
            populate_dict(db_nsr, ("_admin", "deployed", "RO", "vnfd"), [])
            db_nsr_update["_admin.deployed.RO.vnfd"] = []

        # set state to INSTANTIATED. When instantiated NBI will not delete directly
        db_nsr_update["_admin.nsState"] = "INSTANTIATED"
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        self.db.set_list("vnfrs", {"nsr-id-ref": nsr_id}, {"_admin.nsState": "INSTANTIATED"})

        # n2vc_redesign STEP 2 Deploy Network Scenario
        stage[0] = 'Stage 2/5: deployment of KDUs, VMs and execution environments.'
        self._write_op_status(
            op_id=nslcmop_id,
            stage=stage
        )

        stage[1] = "Deploying KDUs."
        # self.logger.debug(logging_text + "Before deploy_kdus")
        # Call to deploy_kdus in case exists the "vdu:kdu" param
        await self.deploy_kdus(
            logging_text=logging_text,
            nsr_id=nsr_id,
            nslcmop_id=nslcmop_id,
            db_vnfrs=db_vnfrs,
            db_vnfds=db_vnfds,
            task_instantiation_info=tasks_dict_info,
        )

        stage[1] = "Getting VCA public key."
        # n2vc_redesign STEP 1 Get VCA public ssh-key
        # feature 1429. Add n2vc public key to needed VMs
        n2vc_key = self.n2vc.get_public_key()
        n2vc_key_list = [n2vc_key]
        if self.vca_config.get("public_key"):
            n2vc_key_list.append(self.vca_config["public_key"])

        stage[1] = "Deploying NS at VIM."
        # the VIM deployment runs in background; it is awaited at the finally part
        task_ro = asyncio.ensure_future(
            self.instantiate_RO(
                logging_text=logging_text,
                nsr_id=nsr_id,
                nsd=nsd,
                db_nsr=db_nsr,
                db_nslcmop=db_nslcmop,
                db_vnfrs=db_vnfrs,
                db_vnfds_ref=db_vnfds_ref,
                n2vc_key_list=n2vc_key_list,
                stage=stage
            )
        )
        self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_RO", task_ro)
        tasks_dict_info[task_ro] = "Deploying at VIM"

        # n2vc_redesign STEP 3 to 6 Deploy N2VC
        stage[1] = "Deploying Execution Environments."
        self.logger.debug(logging_text + stage[1])

        nsi_id = None  # TODO put nsi_id when this nsr belongs to a NSI
        # get_iterable() returns a value from a dict or empty tuple if key does not exist
        for c_vnf in get_iterable(nsd, "constituent-vnfd"):
            vnfd_id = c_vnf["vnfd-id-ref"]
            vnfd = db_vnfds_ref[vnfd_id]
            member_vnf_index = str(c_vnf["member-vnf-index"])
            db_vnfr = db_vnfrs[member_vnf_index]
            base_folder = vnfd["_admin"]["storage"]
            vdu_id = None
            vdu_index = 0
            vdu_name = None
            kdu_name = None

            # Get additional parameters
            deploy_params = {"OSM": self._get_osm_params(db_vnfr)}
            if db_vnfr.get("additionalParamsForVnf"):
                deploy_params.update(self._format_additional_params(db_vnfr["additionalParamsForVnf"].copy()))

            descriptor_config = vnfd.get("vnf-configuration")
            if descriptor_config:
                self._deploy_n2vc(
                    logging_text=logging_text + "member_vnf_index={} ".format(member_vnf_index),
                    db_nsr=db_nsr,
                    db_vnfr=db_vnfr,
                    nslcmop_id=nslcmop_id,
                    nsr_id=nsr_id,
                    nsi_id=nsi_id,
                    vnfd_id=vnfd_id,
                    vdu_id=vdu_id,
                    kdu_name=kdu_name,
                    member_vnf_index=member_vnf_index,
                    vdu_index=vdu_index,
                    vdu_name=vdu_name,
                    deploy_params=deploy_params,
                    descriptor_config=descriptor_config,
                    base_folder=base_folder,
                    task_instantiation_info=tasks_dict_info,
                    stage=stage
                )

            # Deploy charms for each VDU that supports one.
            for vdud in get_iterable(vnfd, 'vdu'):
                vdu_id = vdud["id"]
                descriptor_config = vdud.get('vdu-configuration')
                vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
                # vdur is None when the vnfr has no record for this vdu: use the vnf parameters then
                if vdur and vdur.get("additionalParams"):
                    deploy_params_vdu = self._format_additional_params(vdur["additionalParams"])
                else:
                    # NOTE(review): this is the same dict object as deploy_params, so the "OSM" entry set
                    # below also replaces the one of the vnf level parameters - confirm it is intended
                    deploy_params_vdu = deploy_params
                deploy_params_vdu["OSM"] = self._get_osm_params(db_vnfr, vdu_id, vdu_count_index=0)
                if descriptor_config:
                    vdu_name = None
                    kdu_name = None
                    for vdu_index in range(int(vdud.get("count", 1))):
                        # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
                        self._deploy_n2vc(
                            logging_text=logging_text + "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
                                member_vnf_index, vdu_id, vdu_index),
                            db_nsr=db_nsr,
                            db_vnfr=db_vnfr,
                            nslcmop_id=nslcmop_id,
                            nsr_id=nsr_id,
                            nsi_id=nsi_id,
                            vnfd_id=vnfd_id,
                            vdu_id=vdu_id,
                            kdu_name=kdu_name,
                            member_vnf_index=member_vnf_index,
                            vdu_index=vdu_index,
                            vdu_name=vdu_name,
                            deploy_params=deploy_params_vdu,
                            descriptor_config=descriptor_config,
                            base_folder=base_folder,
                            task_instantiation_info=tasks_dict_info,
                            stage=stage
                        )
            for kdud in get_iterable(vnfd, 'kdu'):
                kdu_name = kdud["name"]
                descriptor_config = kdud.get('kdu-configuration')
                if descriptor_config:
                    vdu_id = None
                    vdu_index = 0
                    vdu_name = None
                    kdur = next(x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name)
                    deploy_params_kdu = {"OSM": self._get_osm_params(db_vnfr)}
                    if kdur.get("additionalParams"):
                        # NOTE(review): this replaces the whole dict, so the "OSM" entry is not kept
                        deploy_params_kdu = self._format_additional_params(kdur["additionalParams"])

                    self._deploy_n2vc(
                        logging_text=logging_text,
                        db_nsr=db_nsr,
                        db_vnfr=db_vnfr,
                        nslcmop_id=nslcmop_id,
                        nsr_id=nsr_id,
                        nsi_id=nsi_id,
                        vnfd_id=vnfd_id,
                        vdu_id=vdu_id,
                        kdu_name=kdu_name,
                        member_vnf_index=member_vnf_index,
                        vdu_index=vdu_index,
                        vdu_name=vdu_name,
                        deploy_params=deploy_params_kdu,
                        descriptor_config=descriptor_config,
                        base_folder=base_folder,
                        task_instantiation_info=tasks_dict_info,
                        stage=stage
                    )

        # Check if this NS has a charm configuration
        descriptor_config = nsd.get("ns-configuration")
        if descriptor_config and descriptor_config.get("juju"):
            vnfd_id = None
            db_vnfr = None
            member_vnf_index = None
            vdu_id = None
            kdu_name = None
            vdu_index = 0
            vdu_name = None

            # Get additional parameters
            deploy_params = {"OSM": self._get_osm_params(db_vnfr)}
            if db_nsr.get("additionalParamsForNs"):
                deploy_params.update(self._format_additional_params(db_nsr["additionalParamsForNs"].copy()))
            base_folder = nsd["_admin"]["storage"]
            self._deploy_n2vc(
                logging_text=logging_text,
                db_nsr=db_nsr,
                db_vnfr=db_vnfr,
                nslcmop_id=nslcmop_id,
                nsr_id=nsr_id,
                nsi_id=nsi_id,
                vnfd_id=vnfd_id,
                vdu_id=vdu_id,
                kdu_name=kdu_name,
                member_vnf_index=member_vnf_index,
                vdu_index=vdu_index,
                vdu_name=vdu_name,
                deploy_params=deploy_params,
                descriptor_config=descriptor_config,
                base_folder=base_folder,
                task_instantiation_info=tasks_dict_info,
                stage=stage
            )

        # rest of staff will be done at finally

    except (ROclient.ROClientException, DbException, LcmException, N2VCException) as e:
        self.logger.error(logging_text + "Exit Exception while '{}': {}".format(stage[1], e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(stage[1]))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception while '{}': {}".format(stage[1], e), exc_info=True)
    finally:
        if exc:
            error_list.append(str(exc))
        try:
            # wait for pending tasks
            if tasks_dict_info:
                stage[1] = "Waiting for instantiate pending tasks."
                self.logger.debug(logging_text + stage[1])
                error_list += await self._wait_for_tasks(logging_text, tasks_dict_info, timeout_ns_deploy,
                                                         stage, nslcmop_id, nsr_id=nsr_id)
            stage[1] = stage[2] = ""
        except asyncio.CancelledError:
            error_list.append("Cancelled")
            # TODO cancel all tasks
        except Exception as e:
            # a different name than "exc" is used so the outer variable is not rebound and deleted
            error_list.append(str(e))

        # update operation-status
        db_nsr_update["operational-status"] = "running"
        # let's begin with VCA 'configured' status (later we can change it)
        db_nsr_update["config-status"] = "configured"
        for task, task_name in tasks_dict_info.items():
            if not task.done() or task.cancelled() or task.exception():
                if task_name.startswith(self.task_name_deploy_vca):
                    # A N2VC task is pending
                    db_nsr_update["config-status"] = "failed"
                else:
                    # RO or KDU task is pending
                    db_nsr_update["operational-status"] = "failed"

        # update status at database
        if error_list:
            error_detail = ". ".join(error_list)
            self.logger.error(logging_text + error_detail)
            error_description_nslcmop = '{} Detail: {}'.format(stage[0], error_detail)
            error_description_nsr = 'Operation: INSTANTIATING.{}, {}'.format(nslcmop_id, stage[0])

            db_nsr_update["detailed-status"] = error_description_nsr + " Detail: " + error_detail
            db_nslcmop_update["detailed-status"] = error_detail
            nslcmop_operation_state = "FAILED"
            ns_state = "BROKEN"
        else:
            error_detail = None
            error_description_nsr = error_description_nslcmop = None
            ns_state = "READY"
            db_nsr_update["detailed-status"] = "Done"
            db_nslcmop_update["detailed-status"] = "Done"
            nslcmop_operation_state = "COMPLETED"

        # the nsr is only updated when it could be read at the beginning
        if db_nsr:
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=ns_state,
                current_operation="IDLE",
                current_operation_id=None,
                error_description=error_description_nsr,
                error_detail=error_detail,
                other_update=db_nsr_update
            )
        self._write_op_status(
            op_id=nslcmop_id,
            stage="",
            error_message=error_description_nslcmop,
            operation_state=nslcmop_operation_state,
            other_update=db_nslcmop_update,
        )

        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite("ns", "instantiated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                               "operationState": nslcmop_operation_state},
                                        loop=self.loop)
            except Exception as e:
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))

        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_instantiate")
async def _add_vca_relations(self, logging_text, nsr_id, vca_index: int,
                             timeout: int = 3600, vca_type: str = None) -> bool:
    """
    Add the relations declared at the descriptors in which the VCA at vca_index takes part.

    Steps:
      1. find all relations for this VCA (ns-configuration and vnf-configuration ones)
      2. wait for other peers related (they must have 'config_sw_installed')
      3. add relations

    :param logging_text: prefix for logging
    :param nsr_id: _id of the "nsrs" record; it is re-read from database on every retry
    :param vca_index: position of this VCA inside _admin.deployed.VCA
    :param timeout: maximum seconds to keep retrying
    :param vca_type: key of self.vca_map used to add the relation; "lxc_proxy_charm" when not provided
    :return: True when there are no relations or no relation is left pending (added, or discarded
        because a peer is BROKEN); False on timeout or on any exception
    """

    def ns_match_key(_vca):
        # ns relations identify their peers by member-vnf-index
        return 'member-vnf-index'

    def vnf_match_key(vca):
        # vnf relations identify their peers by vdu_id, or by vnfd_id when the VCA has no vdu_id
        return "vdu_id" if vca.get("vdu_id") is not None else "vnfd_id"

    def has_broken_peer(peer_ids, vca_list, vca_status_list, key):
        # True when the configurationStatus entry of some peer of the relation is BROKEN
        if not vca_status_list:
            return False
        try:
            for vca, vca_status in zip(vca_list, vca_status_list):
                if vca_status and vca.get(key) in peer_ids and vca_status.get('status') == 'BROKEN':
                    return True
        except (AttributeError, TypeError) as e:
            # best effort check: an unexpected record shape must not abort adding the relations
            self.logger.debug(logging_text + ' cannot check failed peers: {}'.format(e))
        return False

    async def relate_ready_peers(relations, db_nsr, match_key, broken_key):
        # Adds every relation whose two peers are installed and drops it from 'relations'.
        # Relations with a BROKEN peer are dropped as well; the others are left for the next try.
        vca_status_list = db_nsr.get('configurationStatus')
        for r in relations.copy():
            entities = r.get('entities')
            from_id = entities[0].get('id')
            to_id = entities[1].get('id')
            from_vca_ee_id = None
            to_vca_ee_id = None
            vca_list = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))
            for vca in vca_list:
                if not vca.get('config_sw_installed'):
                    continue
                peer_id = vca.get(match_key(vca))
                if peer_id == from_id:
                    from_vca_ee_id = vca.get('ee_id')
                if peer_id == to_id:
                    to_vca_ee_id = vca.get('ee_id')
            if from_vca_ee_id and to_vca_ee_id:
                # add relation
                await self.vca_map[vca_type].add_relation(
                    ee_id_1=from_vca_ee_id,
                    ee_id_2=to_vca_ee_id,
                    endpoint_1=entities[0].get('endpoint'),
                    endpoint_2=entities[1].get('endpoint'))
                # remove entry from relations list
                relations.remove(r)
            elif has_broken_peer((from_id, to_id), vca_list, vca_status_list, broken_key):
                # peer broken: remove relation from list (only once, even if both peers are broken)
                relations.remove(r)

    try:
        vca_type = vca_type or "lxc_proxy_charm"

        # STEP 1: find all relations for this VCA

        # read nsr record
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})

        # this VCA data
        my_vca = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))[vca_index]

        # read all ns-configuration relations
        ns_relations = list()
        db_ns_relations = deep_get(nsd, ('ns-configuration', 'relation'))
        if db_ns_relations:
            for r in db_ns_relations:
                # check if this VCA is in the relation
                if my_vca.get('member-vnf-index') in \
                        (r.get('entities')[0].get('id'), r.get('entities')[1].get('id')):
                    ns_relations.append(r)

        # read all vnf-configuration relations
        vnf_relations = list()
        db_vnfd_list = db_nsr.get('vnfd-id')
        if db_vnfd_list:
            for vnfd in db_vnfd_list:
                db_vnfd = self.db.get_one("vnfds", {"_id": vnfd})
                db_vnf_relations = deep_get(db_vnfd, ('vnf-configuration', 'relation'))
                if db_vnf_relations:
                    for r in db_vnf_relations:
                        # check if this VCA is in the relation
                        if my_vca.get('vdu_id') in (r.get('entities')[0].get('id'), r.get('entities')[1].get('id')):
                            vnf_relations.append(r)

        # if no relations, terminate
        if not ns_relations and not vnf_relations:
            self.logger.debug(logging_text + ' No relations')
            return True

        self.logger.debug(logging_text + ' adding relations\n {}\n {}'.format(ns_relations, vnf_relations))

        # add all relations
        start = time()
        while True:
            # check timeout
            now = time()
            if now - start >= timeout:
                self.logger.error(logging_text + ' : timeout adding relations')
                return False

            # reload nsr from database (we need to update record: _admin.deloyed.VCA)
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

            # for each defined NS relation, find the VCA's related
            await relate_ready_peers(ns_relations, db_nsr, ns_match_key, 'member-vnf-index')

            # for each defined VNF relation, find the VCA's related
            # NOTE(review): failed peers of vnf relations are looked up by 'vdu_id' only, although
            # peers are matched by 'vnfd_id' when the VCA has no 'vdu_id' - confirm it is intended
            await relate_ready_peers(vnf_relations, db_nsr, vnf_match_key, 'vdu_id')

            # finish as soon as nothing is pending, without waiting for another period
            if not ns_relations and not vnf_relations:
                self.logger.debug('Relations added')
                break

            # wait for next try
            await asyncio.sleep(5.0)

        return True

    except Exception as e:
        # Logger.warn is a deprecated alias; use warning()
        self.logger.warning(logging_text + ' ERROR adding relations: {}'.format(e))
        return False
2492 async def _install_kdu(self
, nsr_id
: str, nsr_db_path
: str, vnfr_data
: dict, kdu_index
: int, kdud
: dict,
2493 vnfd
: dict, k8s_instance_info
: dict, k8params
: dict = None, timeout
: int = 600):
2496 k8sclustertype
= k8s_instance_info
["k8scluster-type"]
2498 db_dict_install
= {"collection": "nsrs",
2499 "filter": {"_id": nsr_id
},
2500 "path": nsr_db_path
}
2502 kdu_instance
= await self
.k8scluster_map
[k8sclustertype
].install(
2503 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
2504 kdu_model
=k8s_instance_info
["kdu-model"],
2507 db_dict
=db_dict_install
,
2509 kdu_name
=k8s_instance_info
["kdu-name"],
2510 namespace
=k8s_instance_info
["namespace"])
2511 self
.update_db_2("nsrs", nsr_id
, {nsr_db_path
+ ".kdu-instance": kdu_instance
})
2513 # Obtain services to obtain management service ip
2514 services
= await self
.k8scluster_map
[k8sclustertype
].get_services(
2515 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
2516 kdu_instance
=kdu_instance
,
2517 namespace
=k8s_instance_info
["namespace"])
2519 # Obtain management service info (if exists)
2520 vnfr_update_dict
= {}
2522 vnfr_update_dict
["kdur.{}.services".format(kdu_index
)] = services
2523 mgmt_services
= [service
for service
in kdud
.get("service", []) if service
.get("mgmt-service")]
2524 for mgmt_service
in mgmt_services
:
2525 for service
in services
:
2526 if service
["name"].startswith(mgmt_service
["name"]):
2527 # Mgmt service found, Obtain service ip
2528 ip
= service
.get("external_ip", service
.get("cluster_ip"))
2529 if isinstance(ip
, list) and len(ip
) == 1:
2532 vnfr_update_dict
["kdur.{}.ip-address".format(kdu_index
)] = ip
2534 # Check if must update also mgmt ip at the vnf
2535 service_external_cp
= mgmt_service
.get("external-connection-point-ref")
2536 if service_external_cp
:
2537 if deep_get(vnfd
, ("mgmt-interface", "cp")) == service_external_cp
:
2538 vnfr_update_dict
["ip-address"] = ip
2542 self
.logger
.warn("Mgmt service name: {} not found".format(mgmt_service
["name"]))
2544 vnfr_update_dict
["kdur.{}.status".format(kdu_index
)] = "READY"
2545 self
.update_db_2("vnfrs", vnfr_data
.get("_id"), vnfr_update_dict
)
2547 kdu_config
= kdud
.get("kdu-configuration")
2548 if kdu_config
and kdu_config
.get("initial-config-primitive") and kdu_config
.get("juju") is None:
2549 initial_config_primitive_list
= kdu_config
.get("initial-config-primitive")
2550 initial_config_primitive_list
.sort(key
=lambda val
: int(val
["seq"]))
2552 for initial_config_primitive
in initial_config_primitive_list
:
2553 primitive_params_
= self
._map
_primitive
_params
(initial_config_primitive
, {}, {})
2555 await asyncio
.wait_for(
2556 self
.k8scluster_map
[k8sclustertype
].exec_primitive(
2557 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
2558 kdu_instance
=kdu_instance
,
2559 primitive_name
=initial_config_primitive
["name"],
2560 params
=primitive_params_
, db_dict
={}),
2563 except Exception as e
:
2564 # Prepare update db with error and raise exception
2566 self
.update_db_2("nsrs", nsr_id
, {nsr_db_path
+ ".detailed-status": str(e
)})
2567 self
.update_db_2("vnfrs", vnfr_data
.get("_id"), {"kdur.{}.status".format(kdu_index
): "ERROR"})
2569 # ignore to keep original exception
2571 # reraise original error
2576 async def deploy_kdus(self
, logging_text
, nsr_id
, nslcmop_id
, db_vnfrs
, db_vnfds
, task_instantiation_info
):
2577 # Launch kdus if present in the descriptor
2579 k8scluster_id_2_uuic
= {"helm-chart-v3": {}, "helm-chart": {}, "juju-bundle": {}}
2581 async def _get_cluster_id(cluster_id
, cluster_type
):
2582 nonlocal k8scluster_id_2_uuic
2583 if cluster_id
in k8scluster_id_2_uuic
[cluster_type
]:
2584 return k8scluster_id_2_uuic
[cluster_type
][cluster_id
]
2586 # check if K8scluster is creating and wait look if previous tasks in process
2587 task_name
, task_dependency
= self
.lcm_tasks
.lookfor_related("k8scluster", cluster_id
)
2589 text
= "Waiting for related tasks '{}' on k8scluster {} to be completed".format(task_name
, cluster_id
)
2590 self
.logger
.debug(logging_text
+ text
)
2591 await asyncio
.wait(task_dependency
, timeout
=3600)
2593 db_k8scluster
= self
.db
.get_one("k8sclusters", {"_id": cluster_id
}, fail_on_empty
=False)
2594 if not db_k8scluster
:
2595 raise LcmException("K8s cluster {} cannot be found".format(cluster_id
))
2597 k8s_id
= deep_get(db_k8scluster
, ("_admin", cluster_type
, "id"))
2599 if cluster_type
== "helm-chart-v3":
2601 # backward compatibility for existing clusters that have not been initialized for helm v3
2602 k8s_credentials
= yaml
.safe_dump(db_k8scluster
.get("credentials"))
2603 k8s_id
, uninstall_sw
= await self
.k8sclusterhelm3
.init_env(k8s_credentials
,
2604 reuse_cluster_uuid
=cluster_id
)
2605 db_k8scluster_update
= {}
2606 db_k8scluster_update
["_admin.helm-chart-v3.error_msg"] = None
2607 db_k8scluster_update
["_admin.helm-chart-v3.id"] = k8s_id
2608 db_k8scluster_update
["_admin.helm-chart-v3.created"] = uninstall_sw
2609 db_k8scluster_update
["_admin.helm-chart-v3.operationalState"] = "ENABLED"
2610 self
.update_db_2("k8sclusters", cluster_id
, db_k8scluster_update
)
2611 except Exception as e
:
2612 self
.logger
.error(logging_text
+ "error initializing helm-v3 cluster: {}".format(str(e
)))
2613 raise LcmException("K8s cluster '{}' has not been initialized for '{}'".format(cluster_id
,
2616 raise LcmException("K8s cluster '{}' has not been initialized for '{}'".
2617 format(cluster_id
, cluster_type
))
2618 k8scluster_id_2_uuic
[cluster_type
][cluster_id
] = k8s_id
2621 logging_text
+= "Deploy kdus: "
2624 db_nsr_update
= {"_admin.deployed.K8s": []}
2625 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2628 updated_cluster_list
= []
2629 updated_v3_cluster_list
= []
2631 for vnfr_data
in db_vnfrs
.values():
2632 for kdu_index
, kdur
in enumerate(get_iterable(vnfr_data
, "kdur")):
2633 # Step 0: Prepare and set parameters
2634 desc_params
= self
._format
_additional
_params
(kdur
.get("additionalParams"))
2635 vnfd_id
= vnfr_data
.get('vnfd-id')
2636 kdud
= next(kdud
for kdud
in db_vnfds
[vnfd_id
]["kdu"] if kdud
["name"] == kdur
["kdu-name"])
2637 namespace
= kdur
.get("k8s-namespace")
2638 if kdur
.get("helm-chart"):
2639 kdumodel
= kdur
["helm-chart"]
2640 # Default version: helm3, if helm-version is v2 assign v2
2641 k8sclustertype
= "helm-chart-v3"
2642 self
.logger
.debug("kdur: {}".format(kdur
))
2643 if kdur
.get("helm-version") and kdur
.get("helm-version") == "v2":
2644 k8sclustertype
= "helm-chart"
2645 elif kdur
.get("juju-bundle"):
2646 kdumodel
= kdur
["juju-bundle"]
2647 k8sclustertype
= "juju-bundle"
2649 raise LcmException("kdu type for kdu='{}.{}' is neither helm-chart nor "
2650 "juju-bundle. Maybe an old NBI version is running".
2651 format(vnfr_data
["member-vnf-index-ref"], kdur
["kdu-name"]))
2652 # check if kdumodel is a file and exists
2654 storage
= deep_get(db_vnfds
.get(vnfd_id
), ('_admin', 'storage'))
2655 if storage
and storage
.get('pkg-dir'): # may be not present if vnfd has not artifacts
2656 # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
2657 filename
= '{}/{}/{}s/{}'.format(storage
["folder"], storage
["pkg-dir"], k8sclustertype
,
2659 if self
.fs
.file_exists(filename
, mode
='file') or self
.fs
.file_exists(filename
, mode
='dir'):
2660 kdumodel
= self
.fs
.path
+ filename
2661 except (asyncio
.TimeoutError
, asyncio
.CancelledError
):
2663 except Exception: # it is not a file
2666 k8s_cluster_id
= kdur
["k8s-cluster"]["id"]
2667 step
= "Synchronize repos for k8s cluster '{}'".format(k8s_cluster_id
)
2668 cluster_uuid
= await _get_cluster_id(k8s_cluster_id
, k8sclustertype
)
2671 if (k8sclustertype
== "helm-chart" and cluster_uuid
not in updated_cluster_list
)\
2672 or (k8sclustertype
== "helm-chart-v3" and cluster_uuid
not in updated_v3_cluster_list
):
2673 del_repo_list
, added_repo_dict
= await asyncio
.ensure_future(
2674 self
.k8scluster_map
[k8sclustertype
].synchronize_repos(cluster_uuid
=cluster_uuid
))
2675 if del_repo_list
or added_repo_dict
:
2676 if k8sclustertype
== "helm-chart":
2677 unset
= {'_admin.helm_charts_added.' + item
: None for item
in del_repo_list
}
2678 updated
= {'_admin.helm_charts_added.' +
2679 item
: name
for item
, name
in added_repo_dict
.items()}
2680 updated_cluster_list
.append(cluster_uuid
)
2681 elif k8sclustertype
== "helm-chart-v3":
2682 unset
= {'_admin.helm_charts_v3_added.' + item
: None for item
in del_repo_list
}
2683 updated
= {'_admin.helm_charts_v3_added.' +
2684 item
: name
for item
, name
in added_repo_dict
.items()}
2685 updated_v3_cluster_list
.append(cluster_uuid
)
2686 self
.logger
.debug(logging_text
+ "repos synchronized on k8s cluster "
2687 "'{}' to_delete: {}, to_add: {}".
2688 format(k8s_cluster_id
, del_repo_list
, added_repo_dict
))
2689 self
.db
.set_one("k8sclusters", {"_id": k8s_cluster_id
}, updated
, unset
=unset
)
2692 step
= "Instantiating KDU {}.{} in k8s cluster {}".format(vnfr_data
["member-vnf-index-ref"],
2693 kdur
["kdu-name"], k8s_cluster_id
)
2694 k8s_instance_info
= {"kdu-instance": None,
2695 "k8scluster-uuid": cluster_uuid
,
2696 "k8scluster-type": k8sclustertype
,
2697 "member-vnf-index": vnfr_data
["member-vnf-index-ref"],
2698 "kdu-name": kdur
["kdu-name"],
2699 "kdu-model": kdumodel
,
2700 "namespace": namespace
}
2701 db_path
= "_admin.deployed.K8s.{}".format(index
)
2702 db_nsr_update
[db_path
] = k8s_instance_info
2703 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2705 task
= asyncio
.ensure_future(
2706 self
._install
_kdu
(nsr_id
, db_path
, vnfr_data
, kdu_index
, kdud
, db_vnfds
[vnfd_id
],
2707 k8s_instance_info
, k8params
=desc_params
, timeout
=600))
2708 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_KDU-{}".format(index
), task
)
2709 task_instantiation_info
[task
] = "Deploying KDU {}".format(kdur
["kdu-name"])
2713 except (LcmException
, asyncio
.CancelledError
):
2715 except Exception as e
:
2716 msg
= "Exception {} while {}: {}".format(type(e
).__name
__, step
, e
)
2717 if isinstance(e
, (N2VCException
, DbException
)):
2718 self
.logger
.error(logging_text
+ msg
)
2720 self
.logger
.critical(logging_text
+ msg
, exc_info
=True)
2721 raise LcmException(msg
)
2724 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
def _deploy_n2vc(self, logging_text, db_nsr, db_vnfr, nslcmop_id, nsr_id, nsi_id, vnfd_id, vdu_id,
                 kdu_name, member_vnf_index, vdu_index, vdu_name, deploy_params, descriptor_config,
                 base_folder, task_instantiation_info, stage):
    """
    Launch instantiate_N2VC in an asyncio task, one per supported execution environment, and register it.

    For every execution environment found in descriptor_config, the matching entry is looked up at
    db_nsr["_admin"]["deployed"]["VCA"]. If there is none, a new entry is created, written to the "nsrs"
    database record and appended to db_nsr.
    :param logging_text: prefix for every log line
    :param db_nsr: ns record; its _admin.deployed.VCA list is read and may be extended
    :param db_vnfr: vnf record, forwarded to instantiate_N2VC
    :param nslcmop_id: operation id used to register the task
    :param nsr_id: ns instance id
    :param nsi_id: forwarded to instantiate_N2VC
    :param vnfd_id: stored in a newly created VCA entry
    :param vdu_id, kdu_name, member_vnf_index, vdu_index: identify the element owning the configuration
    :param vdu_name: stored in a newly created VCA entry
    :param deploy_params: forwarded to instantiate_N2VC
    :param descriptor_config: configuration descriptor; holds either "juju" or "execution-environment-list"
    :param base_folder: forwarded to instantiate_N2VC
    :param task_instantiation_info: dict task -> description, filled with the created tasks
    :param stage: forwarded to instantiate_N2VC
    :return: None
    """
    self.logger.debug(logging_text + "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id, vdu_id))
    if descriptor_config.get("juju"):  # There is one execution environment of type juju
        ee_list = [descriptor_config]
    elif descriptor_config.get("execution-environment-list"):
        ee_list = descriptor_config.get("execution-environment-list")
    else:  # other types as script are not supported
        ee_list = []

    for ee_item in ee_list:
        self.logger.debug(logging_text + "_deploy_n2vc ee_item juju={}, helm={}".format(ee_item.get('juju'),
                                                                                        ee_item.get("helm-chart")))
        ee_descriptor_id = ee_item.get("id")
        if ee_item.get("juju"):
            vca_name = ee_item['juju'].get('charm')
            vca_type = "lxc_proxy_charm" if ee_item['juju'].get('charm') is not None else "native_charm"
            if ee_item['juju'].get('cloud') == "k8s":
                vca_type = "k8s_proxy_charm"
            elif ee_item['juju'].get('proxy') is False:
                vca_type = "native_charm"
        elif ee_item.get("helm-chart"):
            vca_name = ee_item['helm-chart']
            if ee_item.get("helm-version") and ee_item.get("helm-version") == "v2":
                vca_type = "helm"
            else:
                vca_type = "helm-v3"
        else:
            self.logger.debug(logging_text + "skipping non juju neither charm configuration")
            continue

        # vca_index starts at -1 so that an empty VCA list yields index 0 for the new entry
        vca_index = -1
        for vca_index, vca_deployed in enumerate(db_nsr["_admin"]["deployed"]["VCA"]):
            if not vca_deployed:
                continue
            if vca_deployed.get("member-vnf-index") == member_vnf_index and \
                    vca_deployed.get("vdu_id") == vdu_id and \
                    vca_deployed.get("kdu_name") == kdu_name and \
                    vca_deployed.get("vdu_count_index", 0) == vdu_index and \
                    vca_deployed.get("ee_descriptor_id") == ee_descriptor_id:
                break
        else:
            # not found, create one.
            target = "ns" if not member_vnf_index else "vnf/{}".format(member_vnf_index)
            if vdu_id:
                target += "/vdu/{}/{}".format(vdu_id, vdu_index or 0)
            elif kdu_name:
                target += "/kdu/{}".format(kdu_name)
            vca_deployed = {
                "target_element": target,
                # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
                "member-vnf-index": member_vnf_index,
                "vdu_id": vdu_id,
                "kdu_name": kdu_name,
                "vdu_count_index": vdu_index,
                "operational-status": "init",  # TODO revise
                "detailed-status": "",  # TODO revise
                "step": "initial-deploy",  # TODO revise
                "vnfd_id": vnfd_id,
                "vdu_name": vdu_name,
                "type": vca_type,
                "ee_descriptor_id": ee_descriptor_id
            }
            # the new entry goes right after the last inspected position
            vca_index += 1

            # create VCA and configurationStatus in db
            db_dict = {
                "_admin.deployed.VCA.{}".format(vca_index): vca_deployed,
                "configurationStatus.{}".format(vca_index): dict()
            }
            self.update_db_2("nsrs", nsr_id, db_dict)

            db_nsr["_admin"]["deployed"]["VCA"].append(vca_deployed)

        # Launch task
        task_n2vc = asyncio.ensure_future(
            self.instantiate_N2VC(
                logging_text=logging_text,
                vca_index=vca_index,
                nsi_id=nsi_id,
                db_nsr=db_nsr,
                db_vnfr=db_vnfr,
                vdu_id=vdu_id,
                kdu_name=kdu_name,
                vdu_index=vdu_index,
                deploy_params=deploy_params,
                config_descriptor=descriptor_config,
                base_folder=base_folder,
                nslcmop_id=nslcmop_id,
                stage=stage,
                vca_type=vca_type,
                vca_name=vca_name,
                ee_config_descriptor=ee_item
            )
        )
        self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_N2VC-{}".format(vca_index), task_n2vc)
        task_instantiation_info[task_n2vc] = self.task_name_deploy_vca + " {}.{}".format(
            member_vnf_index or "", vdu_id or "")
def _get_terminate_config_primitive(primitive_list, vca_deployed):
    """
    Get the terminate config primitives that belong to one execution environment, sorted by 'seq'.

    Takes no instance argument. Only primitives whose "execution-environment-ref" equals
    vca_deployed["ee_descriptor_id"] are kept; when vca_deployed has no ee_descriptor_id this keeps the
    primitives that carry no reference.
    :param primitive_list: list of terminate-config-primitive dicts, or None
    :param vca_deployed: dictionary of deployment info of the execution environment
    :return: a new list; the list received is neither modified nor reordered
    """
    # filter primitives by ee_descriptor_id
    ee_descriptor_id = vca_deployed.get("ee_descriptor_id")
    selected = [p for p in primitive_list or [] if p.get("execution-environment-ref") == ee_descriptor_id]

    # 'seq' is only read when there is something to order
    if len(selected) > 1:
        selected.sort(key=lambda val: int(val['seq']))

    return selected
def _create_nslcmop(nsr_id, operation, params):
    """
    Creates a ns-lcm-op content to be stored at database. Takes no instance argument.
    :param nsr_id: internal id of the instance
    :param operation: instantiate, terminate, scale, action, ...
    :param params: user parameters for the operation
    :return: dictionary following SOL005 format
    :raises LcmException: if any of the three arguments is empty
    """
    from time import time

    # Raise exception if invalid arguments
    if not (nsr_id and operation and params):
        raise LcmException(
            "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided")
    now = time()
    _id = str(uuid4())
    nslcmop = {
        "id": _id,
        "_id": _id,
        # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
        "operationState": "PROCESSING",
        "statusEnteredTime": now,
        "nsInstanceId": nsr_id,
        "lcmOperationType": operation,
        "startTime": now,
        "isAutomaticInvocation": False,
        "operationParams": params,
        "isCancelPending": False,
        "links": {
            "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
            "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id,
        }
    }
    return nslcmop
def _format_additional_params(self, params):
    """
    Replace every string value starting with "!!yaml " by the object obtained parsing the rest as yaml.
    :param params: dictionary of additional parameters, or None. It is modified in place
    :return: the same dictionary received, or a new empty dictionary if params is empty or None
    """
    params = params or {}
    for key, value in params.items():
        # only real strings can carry the marker; anything else is left untouched instead of being sliced
        if isinstance(value, str) and value.startswith("!!yaml "):
            params[key] = yaml.safe_load(value[7:])
    return params
def _get_terminate_primitive_params(self, seq, vnf_index):
    """
    Build the parameters of one terminate primitive and map them with _map_primitive_params.
    :param seq: terminate config primitive descriptor; its 'name' is used as the primitive name
    :param vnf_index: member vnf index the primitive applies to
    :return: whatever self._map_primitive_params returns for (seq, params, {})
    """
    params = {
        "member_vnf_index": vnf_index,
        "primitive": seq.get('name'),
        # terminate primitives are invoked without user supplied parameters
        "primitive_params": {},
    }
    desc_params = {}
    return self._map_primitive_params(seq, params, desc_params)
def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
    """
    Decide whether an already registered sub-operation must be skipped or executed again.
    :param db_nslcmop: operation record holding the sub-operations at _admin.operations
    :param op_index: index of the sub-operation inside _admin.operations
    :return: SUBOPERATION_STATUS_SKIP if it is COMPLETED; otherwise op_index, after marking it as PROCESSING
    """
    op = deep_get(db_nslcmop, ('_admin', 'operations'), [])[op_index]
    if op.get('operationState') == 'COMPLETED':
        # b. Skip sub-operation
        # _ns_execute_primitive() or RO.create_action() will NOT be executed
        return self.SUBOPERATION_STATUS_SKIP
    else:
        # c. retry executing sub-operation
        # The sub-operation exists, and operationState != 'COMPLETED'
        # Update operationState = 'PROCESSING' to indicate a retry.
        operationState = 'PROCESSING'
        detailed_status = 'In progress'
        self._update_suboperation_status(
            db_nslcmop, op_index, operationState, detailed_status)
        # Return the sub-operation index
        # _ns_execute_primitive() or RO.create_action() will be called from scale()
        # with arguments extracted from the sub-operation
        return op_index
2919 # Find a sub-operation where all keys in a matching dictionary must match
2920 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
def _find_suboperation(self, db_nslcmop, match):
    """
    Find the first sub-operation whose content agrees with every key of match.
    :param db_nslcmop: operation record; sub-operations are read from _admin.operations
    :param match: dictionary of key/values that the sub-operation must contain
    :return: index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if there is no match
        or if db_nslcmop or match are empty
    """
    if db_nslcmop and match:
        op_list = db_nslcmop.get('_admin', {}).get('operations', [])
        for i, op in enumerate(op_list):
            if all(op.get(k) == match[k] for k in match):
                return i
    return self.SUBOPERATION_STATUS_NOT_FOUND
2929 # Update status for a sub-operation given its index
def _update_suboperation_status(self, db_nslcmop, op_index, operationState, detailed_status):
    """
    Write at database the state and detailed status of one sub-operation.
    :param db_nslcmop: operation record; only its '_id' is used
    :param op_index: index of the sub-operation inside _admin.operations
    :param operationState: value for _admin.operations.<op_index>.operationState
    :param detailed_status: value for _admin.operations.<op_index>.detailed-status
    :return: None
    """
    # Update DB for HA tasks
    q_filter = {'_id': db_nslcmop['_id']}
    update_dict = {'_admin.operations.{}.operationState'.format(op_index): operationState,
                   '_admin.operations.{}.detailed-status'.format(op_index): detailed_status}
    self.db.set_one("nslcmops",
                    q_filter=q_filter,
                    update_dict=update_dict,
                    fail_on_empty=False)
2940 # Add sub-operation, return the index of the added sub-operation
2941 # Optionally, set operationState, detailed-status, and operationType
2942 # Status and type are currently set for 'scale' sub-operations:
2943 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
2944 # 'detailed-status' : status message
2945 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
2946 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
def _add_suboperation(self, db_nslcmop, vnf_index, vdu_id, vdu_count_index, vdu_name, primitive,
                      mapped_primitive_params, operationState=None, detailed_status=None, operationType=None,
                      RO_nsr_id=None, RO_scaling_info=None):
    """
    Append a sub-operation to db_nslcmop._admin.operations, both in memory and at database.
    :param db_nslcmop: operation record. It is modified in place
    :param vnf_index: stored as 'member_vnf_index'
    :param vdu_id: stored as 'vdu_id'
    :param vdu_count_index: stored as 'vdu_count_index'
    :param vdu_name: accepted for interface symmetry; it is not stored
    :param primitive: stored as 'primitive'
    :param mapped_primitive_params: stored as 'primitive_params'
    :param operationState: optional, stored as 'operationState' when not empty
    :param detailed_status: optional, stored as 'detailed-status' when not empty
    :param operationType: optional, stored as 'lcmOperationType' when not empty
    :param RO_nsr_id: optional, stored as 'RO_nsr_id' when not empty
    :param RO_scaling_info: optional, stored as 'RO_scaling_info' when not empty
    :return: index of the added sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if db_nslcmop is empty
    """
    if not db_nslcmop:
        return self.SUBOPERATION_STATUS_NOT_FOUND
    # Get the "_admin" content, creating it inside the record if missing so that the new sub-operation
    # is also visible in memory and not only at database
    db_nslcmop_admin = db_nslcmop.get('_admin')
    if db_nslcmop_admin is None:
        db_nslcmop_admin = db_nslcmop['_admin'] = {}
    op_list = db_nslcmop_admin.get('operations')
    # Create or append to the "_admin.operations" list
    new_op = {'member_vnf_index': vnf_index,
              'vdu_id': vdu_id,
              'vdu_count_index': vdu_count_index,
              'primitive': primitive,
              'primitive_params': mapped_primitive_params}
    optional_fields = (
        ('operationState', operationState),
        ('detailed-status', detailed_status),
        ('lcmOperationType', operationType),
        ('RO_nsr_id', RO_nsr_id),
        ('RO_scaling_info', RO_scaling_info),
    )
    new_op.update({key: value for key, value in optional_fields if value})
    if not op_list:
        # No existing operations, create key 'operations' with current operation as first list element
        op_list = [new_op]
        db_nslcmop_admin['operations'] = op_list
    else:
        # Existing operations, append operation to list
        op_list.append(new_op)

    db_nslcmop_update = {'_admin.operations': op_list}
    self.update_db_2("nslcmops", db_nslcmop['_id'], db_nslcmop_update)
    op_index = len(op_list) - 1
    return op_index
2984 # Helper methods for scale() sub-operations
2986 # pre-scale/post-scale:
2987 # Check for 3 different cases:
2988 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
2989 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
2990 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
def _check_or_add_scale_suboperation(self, db_nslcmop, vnf_index, vnf_config_primitive, primitive_params,
                                     operationType, RO_nsr_id=None, RO_scaling_info=None):
    """
    Register a scale sub-operation the first time it is seen, or tell what to do with an existing one.
    :param db_nslcmop: operation record holding the sub-operations
    :param vnf_index: member vnf index
    :param vnf_config_primitive: primitive name; ignored for RO scaling
    :param primitive_params: primitive parameters; ignored for RO scaling
    :param operationType: sub-operation type; replaced by 'SCALE-RO' when both RO arguments are provided
    :param RO_nsr_id: optional RO ns id. Together with RO_scaling_info selects RO scaling
    :param RO_scaling_info: optional RO scaling information
    :return: SUBOPERATION_STATUS_NEW if the sub-operation has just been added; otherwise the result of
        _retry_or_skip_suboperation (SUBOPERATION_STATUS_SKIP or the index to execute again)
    """
    ro_scaling = bool(RO_nsr_id and RO_scaling_info)
    # Find this sub-operation
    if ro_scaling:
        operationType = 'SCALE-RO'
        match = {
            'member_vnf_index': vnf_index,
            'RO_nsr_id': RO_nsr_id,
            'RO_scaling_info': RO_scaling_info,
        }
    else:
        match = {
            'member_vnf_index': vnf_index,
            'primitive': vnf_config_primitive,
            'primitive_params': primitive_params,
            'lcmOperationType': operationType
        }
    op_index = self._find_suboperation(db_nslcmop, match)
    if op_index != self.SUBOPERATION_STATUS_NOT_FOUND:
        # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
        # or op_index (operationState != 'COMPLETED')
        return self._retry_or_skip_suboperation(db_nslcmop, op_index)

    # a. New sub-operation
    # The sub-operation does not exist, add it.
    # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
    # The following parameters are set to None for all kind of scaling:
    vdu_id = None
    vdu_count_index = None
    vdu_name = None
    if ro_scaling:
        vnf_config_primitive = None
        primitive_params = None
    else:
        RO_nsr_id = None
        RO_scaling_info = None
    # Initial status for sub-operation
    operationState = 'PROCESSING'
    detailed_status = 'In progress'
    # Add sub-operation for pre/post-scaling (zero or more operations)
    self._add_suboperation(db_nslcmop,
                           vnf_index,
                           vdu_id,
                           vdu_count_index,
                           vdu_name,
                           vnf_config_primitive,
                           primitive_params,
                           operationState,
                           detailed_status,
                           operationType,
                           RO_nsr_id,
                           RO_scaling_info)
    return self.SUBOPERATION_STATUS_NEW
3045 # Function to return execution_environment id
def _get_ee_id(self, vnf_index, vdu_id, vca_deployed_list):
    """
    Return the execution environment id of the deployed VCA of a given vnf/vdu.
    :param vnf_index: member vnf index to look for
    :param vdu_id: vdu id to look for
    :param vca_deployed_list: list of deployed VCA entries; empty (None) entries are ignored
    :return: 'ee_id' of the first matching entry; None if there is no match or it has no 'ee_id' yet
    """
    # TODO vdu_index_count
    for vca in vca_deployed_list or ():
        if not vca:
            # the deployed VCA list may contain empty positions
            continue
        if vca.get("member-vnf-index") == vnf_index and vca.get("vdu_id") == vdu_id:
            return vca.get("ee_id")
    return None
async def destroy_N2VC(self, logging_text, db_nslcmop, vca_deployed, config_descriptor,
                       vca_index, destroy_ee=True, exec_primitives=True):
    """
    Execute the terminate primitives and destroy the execution environment (if destroy_ee=True)
    :param logging_text: prefix for every log line
    :param db_nslcmop: operation record; sub-operations are added to it and its nsInstanceId is used
    :param vca_deployed: Dictionary of deployment info at db_nsr._admin.deployed.VCA.<INDEX>
    :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu. It can be None
    :param vca_index: index in the database _admin.deployed.VCA
    :param destroy_ee: False to do not destroy, because it will be destroyed all of them at once
    :param exec_primitives: False to do not execute terminate primitives, because the config is not completed or has
                        not executed properly
    :return: None or exception
    """

    self.logger.debug(
        logging_text + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
            vca_index, vca_deployed, config_descriptor, destroy_ee
        )
    )

    vca_type = vca_deployed.get("type", "lxc_proxy_charm")

    # execute terminate_primitives
    if exec_primitives:
        # a missing configuration descriptor simply means that there are no terminate primitives
        terminate_primitives = self._get_terminate_config_primitive(
            (config_descriptor or {}).get("terminate-config-primitive"), vca_deployed)
        vdu_id = vca_deployed.get("vdu_id")
        vdu_count_index = vca_deployed.get("vdu_count_index")
        vdu_name = vca_deployed.get("vdu_name")
        vnf_index = vca_deployed.get("member-vnf-index")
        if terminate_primitives and vca_deployed.get("needed_terminate"):
            for seq in terminate_primitives:
                # For each sequence in list, get primitive and call _ns_execute_primitive()
                step = "Calling terminate action for vnf_member_index={} primitive={}".format(
                    vnf_index, seq.get("name"))
                self.logger.debug(logging_text + step)
                # Create the primitive for each sequence, i.e. "primitive": "touch"
                primitive = seq.get('name')
                mapped_primitive_params = self._get_terminate_primitive_params(seq, vnf_index)

                # Add sub-operation
                self._add_suboperation(db_nslcmop,
                                       vnf_index,
                                       vdu_id,
                                       vdu_count_index,
                                       vdu_name,
                                       primitive,
                                       mapped_primitive_params)
                # Sub-operations: Call _ns_execute_primitive() instead of action()
                try:
                    result, result_detail = await self._ns_execute_primitive(vca_deployed["ee_id"], primitive,
                                                                             mapped_primitive_params,
                                                                             vca_type=vca_type)
                except LcmException:
                    # this happens when VCA is not deployed. In this case it is not needed to terminate
                    continue
                result_ok = ['COMPLETED', 'PARTIALLY_COMPLETED']
                if result not in result_ok:
                    raise LcmException("terminate_primitive {} for vnf_member_index={} fails with "
                                       "error {}".format(seq.get("name"), vnf_index, result_detail))
            # set that this VCA do not need terminated
            db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(vca_index)
            self.update_db_2("nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False})

    if vca_deployed.get("prometheus_jobs") and self.prometheus:
        await self.prometheus.update(remove_jobs=vca_deployed["prometheus_jobs"])

    if destroy_ee:
        await self.vca_map[vca_type].delete_execution_environment(vca_deployed["ee_id"])
async def _delete_all_N2VC(self, db_nsr: dict):
    """
    Delete at once the whole N2VC namespace of a ns, marking all its configurations before and after.
    :param db_nsr: ns record. The namespace deleted is "." followed by its '_id'
    :return: None
    """
    self._write_all_config_status(db_nsr=db_nsr, status='TERMINATING')
    namespace = "." + db_nsr["_id"]
    try:
        await self.n2vc.delete_namespace(namespace=namespace, total_timeout=self.timeout_charm_delete)
    except N2VCNotFound:  # already deleted. Skip
        pass
    self._write_all_config_status(db_nsr=db_nsr, status='DELETED')
async def _terminate_RO(self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage):
    """
    Terminates a deployment from RO: deletes the ns, waits for the deletion, then deletes the nsd and the vnfds
    :param logging_text: prefix for every log line
    :param nsr_deployed: db_nsr._admin.deployed
    :param nsr_id: ns instance id; its "nsrs" record is updated with the progress
    :param nslcmop_id: operation id; its status is updated with the progress
    :param stage: list of string with the content to write on db_nslcmop.detailed-status.
        this method will update only the index 2, but it will write on database the concatenated content of the list
    :return: None
    :raises LcmException: with the concatenation of all the deletion errors found
    """
    db_nsr_update = {}
    failed_detail = []
    ro_nsr_id = ro_delete_action = None
    if nsr_deployed and nsr_deployed.get("RO"):
        ro_nsr_id = nsr_deployed["RO"].get("nsr_id")
        ro_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
    try:
        if ro_nsr_id:
            stage[2] = "Deleting ns from VIM."
            db_nsr_update["detailed-status"] = " ".join(stage)
            self._write_op_status(nslcmop_id, stage)
            self.logger.debug(logging_text + stage[2])
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)
            desc = await self.RO.delete("ns", ro_nsr_id)
            ro_delete_action = desc["action_id"]
            db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = ro_delete_action
            db_nsr_update["_admin.deployed.RO.nsr_id"] = None
            db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
        if ro_delete_action:
            # wait until NS is deleted from VIM
            stage[2] = "Waiting ns deleted from VIM."
            detailed_status_old = None
            self.logger.debug(logging_text + stage[2] + " RO_id={} ro_delete_action={}".format(ro_nsr_id,
                                                                                               ro_delete_action))
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)

            delete_timeout = 20 * 60  # 20 minutes
            while delete_timeout > 0:
                desc = await self.RO.show(
                    "ns",
                    item_id_name=ro_nsr_id,
                    extra_item="action",
                    extra_item_id=ro_delete_action)

                # deploymentStatus
                self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)

                ns_status, ns_status_info = self.RO.check_action_status(desc)
                if ns_status == "ERROR":
                    raise ROclient.ROClientException(ns_status_info)
                elif ns_status == "BUILD":
                    stage[2] = "Deleting from VIM {}".format(ns_status_info)
                elif ns_status == "ACTIVE":
                    db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
                    db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                    break
                else:
                    # explicit raise instead of assert, which is removed when running with -O
                    raise LcmException("ROclient.check_action_status returns unknown {}".format(ns_status))
                if stage[2] != detailed_status_old:
                    detailed_status_old = stage[2]
                    db_nsr_update["detailed-status"] = " ".join(stage)
                    self._write_op_status(nslcmop_id, stage)
                    self.update_db_2("nsrs", nsr_id, db_nsr_update)
                # no 'loop' argument: it was deprecated in python 3.8 and removed in python 3.10
                await asyncio.sleep(5)
                delete_timeout -= 5
            else:  # delete_timeout <= 0:
                raise ROclient.ROClientException("Timeout waiting ns deleted from VIM")

    except Exception as e:
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        if isinstance(e, ROclient.ROClientException) and e.http_code == 404:  # not found
            db_nsr_update["_admin.deployed.RO.nsr_id"] = None
            db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
            db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
            self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(ro_nsr_id))
        elif isinstance(e, ROclient.ROClientException) and e.http_code == 409:  # conflict
            failed_detail.append("delete conflict: {}".format(e))
            self.logger.debug(logging_text + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id, e))
        else:
            failed_detail.append("delete error: {}".format(e))
            self.logger.error(logging_text + "RO_ns_id={} delete error: {}".format(ro_nsr_id, e))

    # Delete nsd
    if not failed_detail and deep_get(nsr_deployed, ("RO", "nsd_id")):
        ro_nsd_id = nsr_deployed["RO"]["nsd_id"]
        try:
            stage[2] = "Deleting nsd from RO."
            db_nsr_update["detailed-status"] = " ".join(stage)
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)
            await self.RO.delete("nsd", ro_nsd_id)
            self.logger.debug(logging_text + "ro_nsd_id={} deleted".format(ro_nsd_id))
            db_nsr_update["_admin.deployed.RO.nsd_id"] = None
        except Exception as e:
            if isinstance(e, ROclient.ROClientException) and e.http_code == 404:  # not found
                db_nsr_update["_admin.deployed.RO.nsd_id"] = None
                self.logger.debug(logging_text + "ro_nsd_id={} already deleted".format(ro_nsd_id))
            elif isinstance(e, ROclient.ROClientException) and e.http_code == 409:  # conflict
                failed_detail.append("ro_nsd_id={} delete conflict: {}".format(ro_nsd_id, e))
                self.logger.debug(logging_text + failed_detail[-1])
            else:
                failed_detail.append("ro_nsd_id={} delete error: {}".format(ro_nsd_id, e))
                self.logger.error(logging_text + failed_detail[-1])

    # Delete vnfds; skipped as soon as some previous deletion has failed
    if not failed_detail and deep_get(nsr_deployed, ("RO", "vnfd")):
        for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
            if not vnf_deployed or not vnf_deployed["id"]:
                continue
            try:
                ro_vnfd_id = vnf_deployed["id"]
                stage[2] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
                    vnf_deployed["member-vnf-index"], ro_vnfd_id)
                db_nsr_update["detailed-status"] = " ".join(stage)
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_op_status(nslcmop_id, stage)
                await self.RO.delete("vnfd", ro_vnfd_id)
                self.logger.debug(logging_text + "ro_vnfd_id={} deleted".format(ro_vnfd_id))
                db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
            except Exception as e:
                if isinstance(e, ROclient.ROClientException) and e.http_code == 404:  # not found
                    db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
                    self.logger.debug(logging_text + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id))
                elif isinstance(e, ROclient.ROClientException) and e.http_code == 409:  # conflict
                    failed_detail.append("ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id, e))
                    self.logger.debug(logging_text + failed_detail[-1])
                else:
                    failed_detail.append("ro_vnfd_id={} delete error: {}".format(ro_vnfd_id, e))
                    self.logger.error(logging_text + failed_detail[-1])

    if failed_detail:
        stage[2] = "Error deleting from VIM"
    else:
        stage[2] = "Deleted from VIM"
    db_nsr_update["detailed-status"] = " ".join(stage)
    self.update_db_2("nsrs", nsr_id, db_nsr_update)
    self._write_op_status(nslcmop_id, stage)

    if failed_detail:
        raise LcmException("; ".join(failed_detail))
3276 async def terminate(self
, nsr_id
, nslcmop_id
):
3277 # Try to lock HA task here
3278 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA('ns', 'nslcmops', nslcmop_id
)
3279 if not task_is_locked_by_me
:
3282 logging_text
= "Task ns={} terminate={} ".format(nsr_id
, nslcmop_id
)
3283 self
.logger
.debug(logging_text
+ "Enter")
3284 timeout_ns_terminate
= self
.timeout_ns_terminate
3287 operation_params
= None
3289 error_list
= [] # annotates all failed error messages
3290 db_nslcmop_update
= {}
3291 autoremove
= False # autoremove after terminated
3292 tasks_dict_info
= {}
3294 stage
= ["Stage 1/3: Preparing task.", "Waiting for previous operations to terminate.", ""]
3295 # ^ contains [stage, step, VIM-status]
3297 # wait for any previous tasks in process
3298 await self
.lcm_tasks
.waitfor_related_HA("ns", 'nslcmops', nslcmop_id
)
3300 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
3301 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
3302 operation_params
= db_nslcmop
.get("operationParams") or {}
3303 if operation_params
.get("timeout_ns_terminate"):
3304 timeout_ns_terminate
= operation_params
["timeout_ns_terminate"]
3305 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
3306 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3308 db_nsr_update
["operational-status"] = "terminating"
3309 db_nsr_update
["config-status"] = "terminating"
3310 self
._write
_ns
_status
(
3312 ns_state
="TERMINATING",
3313 current_operation
="TERMINATING",
3314 current_operation_id
=nslcmop_id
,
3315 other_update
=db_nsr_update
3317 self
._write
_op
_status
(
3322 nsr_deployed
= deepcopy(db_nsr
["_admin"].get("deployed")) or {}
3323 if db_nsr
["_admin"]["nsState"] == "NOT_INSTANTIATED":
3326 stage
[1] = "Getting vnf descriptors from db."
3327 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
3328 db_vnfds_from_id
= {}
3329 db_vnfds_from_member_index
= {}
3331 for vnfr
in db_vnfrs_list
:
3332 vnfd_id
= vnfr
["vnfd-id"]
3333 if vnfd_id
not in db_vnfds_from_id
:
3334 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
3335 db_vnfds_from_id
[vnfd_id
] = vnfd
3336 db_vnfds_from_member_index
[vnfr
["member-vnf-index-ref"]] = db_vnfds_from_id
[vnfd_id
]
3338 # Destroy individual execution environments when there are terminating primitives.
3339 # Rest of EE will be deleted at once
3340 # TODO - check before calling _destroy_N2VC
3341 # if not operation_params.get("skip_terminate_primitives"):#
3342 # or not vca.get("needed_terminate"):
3343 stage
[0] = "Stage 2/3 execute terminating primitives."
3344 self
.logger
.debug(logging_text
+ stage
[0])
3345 stage
[1] = "Looking execution environment that needs terminate."
3346 self
.logger
.debug(logging_text
+ stage
[1])
3347 # self.logger.debug("nsr_deployed: {}".format(nsr_deployed))
3348 for vca_index
, vca
in enumerate(get_iterable(nsr_deployed
, "VCA")):
3349 config_descriptor
= None
3350 if not vca
or not vca
.get("ee_id"):
3352 if not vca
.get("member-vnf-index"):
3354 config_descriptor
= db_nsr
.get("ns-configuration")
3355 elif vca
.get("vdu_id"):
3356 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
3357 vdud
= next((vdu
for vdu
in db_vnfd
.get("vdu", ()) if vdu
["id"] == vca
.get("vdu_id")), None)
3359 config_descriptor
= vdud
.get("vdu-configuration")
3360 elif vca
.get("kdu_name"):
3361 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
3362 kdud
= next((kdu
for kdu
in db_vnfd
.get("kdu", ()) if kdu
["name"] == vca
.get("kdu_name")), None)
3364 config_descriptor
= kdud
.get("kdu-configuration")
3366 config_descriptor
= db_vnfds_from_member_index
[vca
["member-vnf-index"]].get("vnf-configuration")
3367 vca_type
= vca
.get("type")
3368 exec_terminate_primitives
= (not operation_params
.get("skip_terminate_primitives") and
3369 vca
.get("needed_terminate"))
3370 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
3371 # pending native charms
3372 destroy_ee
= True if vca_type
in ("helm", "helm-v3", "native_charm") else False
3373 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
3374 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
3375 task
= asyncio
.ensure_future(
3376 self
.destroy_N2VC(logging_text
, db_nslcmop
, vca
, config_descriptor
, vca_index
,
3377 destroy_ee
, exec_terminate_primitives
))
3378 tasks_dict_info
[task
] = "Terminating VCA {}".format(vca
.get("ee_id"))
3380 # wait for pending tasks of terminate primitives
3382 self
.logger
.debug(logging_text
+ 'Waiting for tasks {}'.format(list(tasks_dict_info
.keys())))
3383 error_list
= await self
._wait
_for
_tasks
(logging_text
, tasks_dict_info
,
3384 min(self
.timeout_charm_delete
, timeout_ns_terminate
),
3386 tasks_dict_info
.clear()
3388 return # raise LcmException("; ".join(error_list))
3390 # remove All execution environments at once
3391 stage
[0] = "Stage 3/3 delete all."
3393 if nsr_deployed
.get("VCA"):
3394 stage
[1] = "Deleting all execution environments."
3395 self
.logger
.debug(logging_text
+ stage
[1])
3396 task_delete_ee
= asyncio
.ensure_future(asyncio
.wait_for(self
._delete
_all
_N
2VC
(db_nsr
=db_nsr
),
3397 timeout
=self
.timeout_charm_delete
))
3398 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
3399 tasks_dict_info
[task_delete_ee
] = "Terminating all VCA"
3401 # Delete from k8scluster
3402 stage
[1] = "Deleting KDUs."
3403 self
.logger
.debug(logging_text
+ stage
[1])
3404 # print(nsr_deployed)
3405 for kdu
in get_iterable(nsr_deployed
, "K8s"):
3406 if not kdu
or not kdu
.get("kdu-instance"):
3408 kdu_instance
= kdu
.get("kdu-instance")
3409 if kdu
.get("k8scluster-type") in self
.k8scluster_map
:
3410 task_delete_kdu_instance
= asyncio
.ensure_future(
3411 self
.k8scluster_map
[kdu
["k8scluster-type"]].uninstall(
3412 cluster_uuid
=kdu
.get("k8scluster-uuid"),
3413 kdu_instance
=kdu_instance
))
3415 self
.logger
.error(logging_text
+ "Unknown k8s deployment type {}".
3416 format(kdu
.get("k8scluster-type")))
3418 tasks_dict_info
[task_delete_kdu_instance
] = "Terminating KDU '{}'".format(kdu
.get("kdu-name"))
3421 stage
[1] = "Deleting ns from VIM."
3423 task_delete_ro
= asyncio
.ensure_future(
3424 self
._terminate
_ng
_ro
(logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
))
3426 task_delete_ro
= asyncio
.ensure_future(
3427 self
._terminate
_RO
(logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
))
3428 tasks_dict_info
[task_delete_ro
] = "Removing deployment from VIM"
3430 # rest of staff will be done at finally
3432 except (ROclient
.ROClientException
, DbException
, LcmException
, N2VCException
) as e
:
3433 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
3435 except asyncio
.CancelledError
:
3436 self
.logger
.error(logging_text
+ "Cancelled Exception while '{}'".format(stage
[1]))
3437 exc
= "Operation was cancelled"
3438 except Exception as e
:
3439 exc
= traceback
.format_exc()
3440 self
.logger
.critical(logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
), exc_info
=True)
3443 error_list
.append(str(exc
))
3445 # wait for pending tasks
3447 stage
[1] = "Waiting for terminate pending tasks."
3448 self
.logger
.debug(logging_text
+ stage
[1])
3449 error_list
+= await self
._wait
_for
_tasks
(logging_text
, tasks_dict_info
, timeout_ns_terminate
,
3451 stage
[1] = stage
[2] = ""
3452 except asyncio
.CancelledError
:
3453 error_list
.append("Cancelled")
3454 # TODO cancell all tasks
3455 except Exception as exc
:
3456 error_list
.append(str(exc
))
3457 # update status at database
3459 error_detail
= "; ".join(error_list
)
3460 # self.logger.error(logging_text + error_detail)
3461 error_description_nslcmop
= '{} Detail: {}'.format(stage
[0], error_detail
)
3462 error_description_nsr
= 'Operation: TERMINATING.{}, {}.'.format(nslcmop_id
, stage
[0])
3464 db_nsr_update
["operational-status"] = "failed"
3465 db_nsr_update
["detailed-status"] = error_description_nsr
+ " Detail: " + error_detail
3466 db_nslcmop_update
["detailed-status"] = error_detail
3467 nslcmop_operation_state
= "FAILED"
3471 error_description_nsr
= error_description_nslcmop
= None
3472 ns_state
= "NOT_INSTANTIATED"
3473 db_nsr_update
["operational-status"] = "terminated"
3474 db_nsr_update
["detailed-status"] = "Done"
3475 db_nsr_update
["_admin.nsState"] = "NOT_INSTANTIATED"
3476 db_nslcmop_update
["detailed-status"] = "Done"
3477 nslcmop_operation_state
= "COMPLETED"
3480 self
._write
_ns
_status
(
3483 current_operation
="IDLE",
3484 current_operation_id
=None,
3485 error_description
=error_description_nsr
,
3486 error_detail
=error_detail
,
3487 other_update
=db_nsr_update
3489 self
._write
_op
_status
(
3492 error_message
=error_description_nslcmop
,
3493 operation_state
=nslcmop_operation_state
,
3494 other_update
=db_nslcmop_update
,
3496 if ns_state
== "NOT_INSTANTIATED":
3498 self
.db
.set_list("vnfrs", {"nsr-id-ref": nsr_id
}, {"_admin.nsState": "NOT_INSTANTIATED"})
3499 except DbException
as e
:
3500 self
.logger
.warn(logging_text
+ 'Error writing VNFR status for nsr-id-ref: {} -> {}'.
3502 if operation_params
:
3503 autoremove
= operation_params
.get("autoremove", False)
3504 if nslcmop_operation_state
:
3506 await self
.msg
.aiowrite("ns", "terminated", {"nsr_id": nsr_id
, "nslcmop_id": nslcmop_id
,
3507 "operationState": nslcmop_operation_state
,
3508 "autoremove": autoremove
},
3510 except Exception as e
:
3511 self
.logger
.error(logging_text
+ "kafka_write notification Exception {}".format(e
))
3513 self
.logger
.debug(logging_text
+ "Exit")
3514 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_terminate")
3516 async def _wait_for_tasks(self
, logging_text
, created_tasks_info
, timeout
, stage
, nslcmop_id
, nsr_id
=None):
3518 error_detail_list
= []
3520 pending_tasks
= list(created_tasks_info
.keys())
3521 num_tasks
= len(pending_tasks
)
3523 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
3524 self
._write
_op
_status
(nslcmop_id
, stage
)
3525 while pending_tasks
:
3527 _timeout
= timeout
+ time_start
- time()
3528 done
, pending_tasks
= await asyncio
.wait(pending_tasks
, timeout
=_timeout
,
3529 return_when
=asyncio
.FIRST_COMPLETED
)
3530 num_done
+= len(done
)
3531 if not done
: # Timeout
3532 for task
in pending_tasks
:
3533 new_error
= created_tasks_info
[task
] + ": Timeout"
3534 error_detail_list
.append(new_error
)
3535 error_list
.append(new_error
)
3538 if task
.cancelled():
3541 exc
= task
.exception()
3543 if isinstance(exc
, asyncio
.TimeoutError
):
3545 new_error
= created_tasks_info
[task
] + ": {}".format(exc
)
3546 error_list
.append(created_tasks_info
[task
])
3547 error_detail_list
.append(new_error
)
3548 if isinstance(exc
, (str, DbException
, N2VCException
, ROclient
.ROClientException
, LcmException
,
3550 self
.logger
.error(logging_text
+ new_error
)
3552 exc_traceback
= "".join(traceback
.format_exception(None, exc
, exc
.__traceback
__))
3553 self
.logger
.error(logging_text
+ created_tasks_info
[task
] + exc_traceback
)
3555 self
.logger
.debug(logging_text
+ created_tasks_info
[task
] + ": Done")
3556 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
3558 stage
[1] += " Errors: " + ". ".join(error_detail_list
) + "."
3559 if nsr_id
: # update also nsr
3560 self
.update_db_2("nsrs", nsr_id
, {"errorDescription": "Error at: " + ", ".join(error_list
),
3561 "errorDetail": ". ".join(error_detail_list
)})
3562 self
._write
_op
_status
(nslcmop_id
, stage
)
3563 return error_detail_list
3566 def _map_primitive_params(primitive_desc
, params
, instantiation_params
):
3568 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
3569 The default-value is used. If it is between < > it look for a value at instantiation_params
3570 :param primitive_desc: portion of VNFD/NSD that describes primitive
3571 :param params: Params provided by user
3572 :param instantiation_params: Instantiation params provided by user
3573 :return: a dictionary with the calculated params
3575 calculated_params
= {}
3576 for parameter
in primitive_desc
.get("parameter", ()):
3577 param_name
= parameter
["name"]
3578 if param_name
in params
:
3579 calculated_params
[param_name
] = params
[param_name
]
3580 elif "default-value" in parameter
or "value" in parameter
:
3581 if "value" in parameter
:
3582 calculated_params
[param_name
] = parameter
["value"]
3584 calculated_params
[param_name
] = parameter
["default-value"]
3585 if isinstance(calculated_params
[param_name
], str) and calculated_params
[param_name
].startswith("<") \
3586 and calculated_params
[param_name
].endswith(">"):
3587 if calculated_params
[param_name
][1:-1] in instantiation_params
:
3588 calculated_params
[param_name
] = instantiation_params
[calculated_params
[param_name
][1:-1]]
3590 raise LcmException("Parameter {} needed to execute primitive {} not provided".
3591 format(calculated_params
[param_name
], primitive_desc
["name"]))
3593 raise LcmException("Parameter {} needed to execute primitive {} not provided".
3594 format(param_name
, primitive_desc
["name"]))
3596 if isinstance(calculated_params
[param_name
], (dict, list, tuple)):
3597 calculated_params
[param_name
] = yaml
.safe_dump(calculated_params
[param_name
], default_flow_style
=True,
3599 elif isinstance(calculated_params
[param_name
], str) and calculated_params
[param_name
].startswith("!!yaml "):
3600 calculated_params
[param_name
] = calculated_params
[param_name
][7:]
3601 if parameter
.get("data-type") == "INTEGER":
3603 calculated_params
[param_name
] = int(calculated_params
[param_name
])
3604 except ValueError: # error converting string to int
3606 "Parameter {} of primitive {} must be integer".format(param_name
, primitive_desc
["name"]))
3607 elif parameter
.get("data-type") == "BOOLEAN":
3608 calculated_params
[param_name
] = not ((str(calculated_params
[param_name
])).lower() == 'false')
3610 # add always ns_config_info if primitive name is config
3611 if primitive_desc
["name"] == "config":
3612 if "ns_config_info" in instantiation_params
:
3613 calculated_params
["ns_config_info"] = instantiation_params
["ns_config_info"]
3614 return calculated_params
3616 def _look_for_deployed_vca(self
, deployed_vca
, member_vnf_index
, vdu_id
, vdu_count_index
, kdu_name
=None,
3617 ee_descriptor_id
=None):
3618 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
3619 for vca
in deployed_vca
:
3622 if member_vnf_index
!= vca
["member-vnf-index"] or vdu_id
!= vca
["vdu_id"]:
3624 if vdu_count_index
is not None and vdu_count_index
!= vca
["vdu_count_index"]:
3626 if kdu_name
and kdu_name
!= vca
["kdu_name"]:
3628 if ee_descriptor_id
and ee_descriptor_id
!= vca
["ee_descriptor_id"]:
3632 # vca_deployed not found
3633 raise LcmException("charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
3634 " is not deployed".format(member_vnf_index
, vdu_id
, vdu_count_index
, kdu_name
,
3638 ee_id
= vca
.get("ee_id")
3639 vca_type
= vca
.get("type", "lxc_proxy_charm") # default value for backward compatibility - proxy charm
3641 raise LcmException("charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
3642 "execution environment"
3643 .format(member_vnf_index
, vdu_id
, kdu_name
, vdu_count_index
))
3644 return ee_id
, vca_type
3646 async def _ns_execute_primitive(self
, ee_id
, primitive
, primitive_params
, retries
=0,
3647 retries_interval
=30, timeout
=None,
3648 vca_type
=None, db_dict
=None) -> (str, str):
3650 if primitive
== "config":
3651 primitive_params
= {"params": primitive_params
}
3653 vca_type
= vca_type
or "lxc_proxy_charm"
3657 output
= await asyncio
.wait_for(
3658 self
.vca_map
[vca_type
].exec_primitive(
3660 primitive_name
=primitive
,
3661 params_dict
=primitive_params
,
3662 progress_timeout
=self
.timeout_progress_primitive
,
3663 total_timeout
=self
.timeout_primitive
,
3665 timeout
=timeout
or self
.timeout_primitive
)
3668 except asyncio
.CancelledError
:
3670 except Exception as e
: # asyncio.TimeoutError
3671 if isinstance(e
, asyncio
.TimeoutError
):
3675 self
.logger
.debug('Error executing action {} on {} -> {}'.format(primitive
, ee_id
, e
))
3677 await asyncio
.sleep(retries_interval
, loop
=self
.loop
)
3679 return 'FAILED', str(e
)
3681 return 'COMPLETED', output
3683 except (LcmException
, asyncio
.CancelledError
):
3685 except Exception as e
:
3686 return 'FAIL', 'Error executing action {}: {}'.format(primitive
, e
)
async def action(self, nsr_id, nslcmop_id):
    """
    Executes the primitive requested by an operation over a ns, vnf, vdu or kdu.
    The target and the primitive are read from "operationParams" of the nslcmops record: member_vnf_index,
    vdu_id, kdu_name, vdu_count_index, primitive, primitive_params and timeout_ns_action.
    The result is written at the database and notified with a "ns"/"actioned" message in the finally
    section, which also returns; so errors are reported through the returned state, not raised.
    :param nsr_id: _id of the record at "nsrs"
    :param nslcmop_id: _id of the record at "nslcmops"
    :return: tuple (nslcmop_operation_state, detailed_status). None if the HA lock is not obtained
    """

    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
    if not task_is_locked_by_me:
        return

    logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nsr = None
    db_nslcmop = None
    db_nsr_update = {}
    db_nslcmop_update = {}
    nslcmop_operation_state = None
    error_description_nslcmop = None
    exc = None
    try:
        # wait for any previous tasks in process
        # 'step' is the text used at the error messages to tell what was being done when it failed
        step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)

        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="RUNNING ACTION",
            current_operation_id=nslcmop_id
        )

        step = "Getting information from database"
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

        nsr_deployed = db_nsr["_admin"].get("deployed")
        vnf_index = db_nslcmop["operationParams"].get("member_vnf_index")
        vdu_id = db_nslcmop["operationParams"].get("vdu_id")
        kdu_name = db_nslcmop["operationParams"].get("kdu_name")
        vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
        primitive = db_nslcmop["operationParams"]["primitive"]
        primitive_params = db_nslcmop["operationParams"]["primitive_params"]
        timeout_ns_action = db_nslcmop["operationParams"].get("timeout_ns_action", self.timeout_primitive)

        # vnf level (and below) needs the vnfr and vnfd; ns level needs the nsd
        if vnf_index:
            step = "Getting vnfr from database"
            db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
            step = "Getting vnfd from database"
            db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
        else:
            step = "Getting nsd from database"
            db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})

        # for backward compatibility
        # (VCA stored as a dictionary is converted into a list and saved back)
        if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
            nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
            db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

        # look for primitive
        # the configuration is taken from the most specific target: vdu, kdu, vnf or, by default, ns
        config_primitive_desc = descriptor_configuration = None
        if vdu_id:
            for vdu in get_iterable(db_vnfd, "vdu"):
                if vdu_id == vdu["id"]:
                    descriptor_configuration = vdu.get("vdu-configuration")
                    break
        elif kdu_name:
            for kdu in get_iterable(db_vnfd, "kdu"):
                if kdu_name == kdu["name"]:
                    descriptor_configuration = kdu.get("kdu-configuration")
                    break
        elif vnf_index:
            descriptor_configuration = db_vnfd.get("vnf-configuration")
        else:
            descriptor_configuration = db_nsd.get("ns-configuration")

        if descriptor_configuration and descriptor_configuration.get("config-primitive"):
            for config_primitive in descriptor_configuration["config-primitive"]:
                if config_primitive["name"] == primitive:
                    config_primitive_desc = config_primitive
                    break

        # upgrade, rollback and status of a kdu are accepted even if they are not declared at the descriptor
        if not config_primitive_desc:
            if not (kdu_name and primitive in ("upgrade", "rollback", "status")):
                raise LcmException("Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".
                                   format(primitive))
            primitive_name = primitive
            ee_descriptor_id = None
        else:
            primitive_name = config_primitive_desc.get("execution-environment-primitive", primitive)
            ee_descriptor_id = config_primitive_desc.get("execution-environment-ref")

        # additional params provided at instantiation for the same level as the target
        # NOTE(review): vdur/kdur are None when no record matches and the .get() below would then fail with
        # AttributeError, which is reported by the generic 'except Exception' - confirm this is acceptable
        if vnf_index:
            if vdu_id:
                vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
                desc_params = self._format_additional_params(vdur.get("additionalParams"))
            elif kdu_name:
                kdur = next((x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None)
                desc_params = self._format_additional_params(kdur.get("additionalParams"))
            else:
                desc_params = self._format_additional_params(db_vnfr.get("additionalParamsForVnf"))
        else:
            desc_params = self._format_additional_params(db_nsr.get("additionalParamsForNs"))

        # NOTE(review): 'kdu' here is whatever the loop over the vnfd "kdu" list left bound: the matching
        # kdu, the last one if none matched, or nothing at all if vdu_id was also provided - confirm
        if kdu_name:
            kdu_action = True if not deep_get(kdu, ("kdu-configuration", "juju")) else False

        # TODO check if ns is in a proper status
        if kdu_name and (primitive_name in ("upgrade", "rollback", "status") or kdu_action):
            # kdur and desc_params already set from before
            if primitive_params:
                desc_params.update(primitive_params)
            # TODO Check if we will need something at vnf level
            # from here 'kdu' is the deployed record and 'index' its position at _admin.deployed.K8s
            for index, kdu in enumerate(get_iterable(nsr_deployed, "K8s")):
                if kdu_name == kdu["kdu-name"] and kdu["member-vnf-index"] == vnf_index:
                    break
            else:
                raise LcmException("KDU '{}' for vnf '{}' not deployed".format(kdu_name, vnf_index))

            if kdu.get("k8scluster-type") not in self.k8scluster_map:
                msg = "unknown k8scluster-type '{}'".format(kdu.get("k8scluster-type"))
                raise LcmException(msg)

            # passed to the k8s connector calls as db_dict, pointing at the deployed record of this kdu
            db_dict = {"collection": "nsrs",
                       "filter": {"_id": nsr_id},
                       "path": "_admin.deployed.K8s.{}".format(index)}
            self.logger.debug(logging_text + "Exec k8s {} on {}.{}".format(primitive_name, vnf_index, kdu_name))
            step = "Executing kdu {}".format(primitive_name)
            if primitive_name == "upgrade":
                # a kdu_model provided as param overrides the deployed one and is not sent as a param
                if desc_params.get("kdu_model"):
                    kdu_model = desc_params.get("kdu_model")
                    del desc_params["kdu_model"]
                else:
                    kdu_model = kdu.get("kdu-model")
                    # "<text>:<text>" is reduced to the part before the colon
                    parts = kdu_model.split(sep=":")
                    if len(parts) == 2:
                        kdu_model = parts[0]

                # the outer timeout is 10 seconds longer than the one given to the connector
                detailed_status = await asyncio.wait_for(
                    self.k8scluster_map[kdu["k8scluster-type"]].upgrade(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu.get("kdu-instance"),
                        atomic=True, kdu_model=kdu_model,
                        params=desc_params, db_dict=db_dict,
                        timeout=timeout_ns_action),
                    timeout=timeout_ns_action + 10)
                self.logger.debug(logging_text + " Upgrade of kdu {} done".format(detailed_status))
            elif primitive_name == "rollback":
                detailed_status = await asyncio.wait_for(
                    self.k8scluster_map[kdu["k8scluster-type"]].rollback(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu.get("kdu-instance"),
                        db_dict=db_dict),
                    timeout=timeout_ns_action)
            elif primitive_name == "status":
                detailed_status = await asyncio.wait_for(
                    self.k8scluster_map[kdu["k8scluster-type"]].status_kdu(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu.get("kdu-instance")),
                    timeout=timeout_ns_action)
            else:
                # any other primitive of the kdu is executed by the k8s connector
                kdu_instance = kdu.get("kdu-instance") or "{}-{}".format(kdu["kdu-name"], nsr_id)
                params = self._map_primitive_params(config_primitive_desc, primitive_params, desc_params)

                detailed_status = await asyncio.wait_for(
                    self.k8scluster_map[kdu["k8scluster-type"]].exec_primitive(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu_instance,
                        primitive_name=primitive_name,
                        params=params, db_dict=db_dict,
                        timeout=timeout_ns_action),
                    timeout=timeout_ns_action)

            # an empty result of the connector is considered a failure
            if detailed_status:
                nslcmop_operation_state = 'COMPLETED'
            else:
                detailed_status = ''
                nslcmop_operation_state = 'FAILED'
        else:
            # not a kdu action: executed at the execution environment of the deployed VCA
            ee_id, vca_type = self._look_for_deployed_vca(nsr_deployed["VCA"],
                                                          member_vnf_index=vnf_index,
                                                          vdu_id=vdu_id,
                                                          vdu_count_index=vdu_count_index,
                                                          ee_descriptor_id=ee_descriptor_id)
            db_nslcmop_notif = {"collection": "nslcmops",
                                "filter": {"_id": nslcmop_id},
                                "path": "admin.VCA"}
            nslcmop_operation_state, detailed_status = await self._ns_execute_primitive(
                ee_id,
                primitive=primitive_name,
                primitive_params=self._map_primitive_params(config_primitive_desc, primitive_params, desc_params),
                timeout=timeout_ns_action,
                vca_type=vca_type,
                db_dict=db_nslcmop_notif)

        db_nslcmop_update["detailed-status"] = detailed_status
        error_description_nslcmop = detailed_status if nslcmop_operation_state == "FAILED" else ""
        self.logger.debug(logging_text + " task Done with result {} {}".format(nslcmop_operation_state,
                                                                               detailed_status))
        return  # database update is called inside finally

    except (DbException, LcmException, N2VCException, K8sException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except asyncio.TimeoutError:
        self.logger.error(logging_text + "Timeout while '{}'".format(step))
        exc = "Timeout"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
    finally:
        # any error captured above turns the operation into FAILED, with the step where it happened
        if exc:
            db_nslcmop_update["detailed-status"] = detailed_status = error_description_nslcmop = \
                "FAILED {}: {}".format(step, exc)
            nslcmop_operation_state = "FAILED"
        # db_nsr is still None if the failure happened before reading it
        if db_nsr:
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=db_nsr["nsState"],  # TODO check if degraded. For the moment use previous status
                current_operation="IDLE",
                current_operation_id=None,
                # error_description=error_description_nsr,
                # error_detail=error_detail,
                other_update=db_nsr_update
            )

        self._write_op_status(
            op_id=nslcmop_id,
            stage="",
            error_message=error_description_nslcmop,
            operation_state=nslcmop_operation_state,
            other_update=db_nslcmop_update,
        )

        if nslcmop_operation_state:
            # the notification is best effort: a failure writing it is only logged
            try:
                await self.msg.aiowrite("ns", "actioned", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                           "operationState": nslcmop_operation_state},
                                        loop=self.loop)
            except Exception as e:
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
        # this return replaces the bare 'return' of the try section
        return nslcmop_operation_state, detailed_status
3934 async def scale(self
, nsr_id
, nslcmop_id
):
3936 # Try to lock HA task here
3937 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA('ns', 'nslcmops', nslcmop_id
)
3938 if not task_is_locked_by_me
:
3941 logging_text
= "Task ns={} scale={} ".format(nsr_id
, nslcmop_id
)
3942 self
.logger
.debug(logging_text
+ "Enter")
3943 # get all needed from database
3946 db_nslcmop_update
= {}
3947 nslcmop_operation_state
= None
3950 # in case of error, indicates what part of scale was failed to put nsr at error status
3951 scale_process
= None
3952 old_operational_status
= ""
3953 old_config_status
= ""
3955 # wait for any previous tasks in process
3956 step
= "Waiting for previous operations to terminate"
3957 await self
.lcm_tasks
.waitfor_related_HA('ns', 'nslcmops', nslcmop_id
)
3959 self
._write
_ns
_status
(
3962 current_operation
="SCALING",
3963 current_operation_id
=nslcmop_id
3966 step
= "Getting nslcmop from database"
3967 self
.logger
.debug(step
+ " after having waited for previous tasks to be completed")
3968 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
3969 step
= "Getting nsr from database"
3970 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3972 old_operational_status
= db_nsr
["operational-status"]
3973 old_config_status
= db_nsr
["config-status"]
3974 step
= "Parsing scaling parameters"
3975 # self.logger.debug(step)
3976 db_nsr_update
["operational-status"] = "scaling"
3977 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3978 nsr_deployed
= db_nsr
["_admin"].get("deployed")
3981 nsr_deployed
= db_nsr
["_admin"].get("deployed")
3982 vnf_index
= db_nslcmop
["operationParams"].get("member_vnf_index")
3983 # vdu_id = db_nslcmop["operationParams"].get("vdu_id")
3984 # vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
3985 # vdu_name = db_nslcmop["operationParams"].get("vdu_name")
3988 RO_nsr_id
= nsr_deployed
["RO"]["nsr_id"]
3989 vnf_index
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleByStepData"]["member-vnf-index"]
3990 scaling_group
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleByStepData"]["scaling-group-descriptor"]
3991 scaling_type
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleVnfType"]
3992 # scaling_policy = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"].get("scaling-policy")
3994 # for backward compatibility
3995 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
3996 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
3997 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
3998 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4000 step
= "Getting vnfr from database"
4001 db_vnfr
= self
.db
.get_one("vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
})
4002 step
= "Getting vnfd from database"
4003 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
4005 step
= "Getting scaling-group-descriptor"
4006 for scaling_descriptor
in db_vnfd
["scaling-group-descriptor"]:
4007 if scaling_descriptor
["name"] == scaling_group
:
4010 raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
4011 "at vnfd:scaling-group-descriptor".format(scaling_group
))
4014 # for scaling_policy_descriptor in scaling_descriptor.get("scaling-policy", ()):
4015 # cooldown_time = scaling_policy_descriptor.get("cooldown-time", 0)
4016 # if scaling_policy and scaling_policy == scaling_policy_descriptor.get("name"):
4019 # TODO check if ns is in a proper status
4020 step
= "Sending scale order to VIM"
4022 if not db_nsr
["_admin"].get("scaling-group"):
4023 self
.update_db_2("nsrs", nsr_id
, {"_admin.scaling-group": [{"name": scaling_group
, "nb-scale-op": 0}]})
4024 admin_scale_index
= 0
4026 for admin_scale_index
, admin_scale_info
in enumerate(db_nsr
["_admin"]["scaling-group"]):
4027 if admin_scale_info
["name"] == scaling_group
:
4028 nb_scale_op
= admin_scale_info
.get("nb-scale-op", 0)
4030 else: # not found, set index one plus last element and add new entry with the name
4031 admin_scale_index
+= 1
4032 db_nsr_update
["_admin.scaling-group.{}.name".format(admin_scale_index
)] = scaling_group
4033 RO_scaling_info
= []
4034 vdu_scaling_info
= {"scaling_group_name": scaling_group
, "vdu": []}
4035 if scaling_type
== "SCALE_OUT":
4036 # count if max-instance-count is reached
4037 max_instance_count
= scaling_descriptor
.get("max-instance-count", 10)
4038 # self.logger.debug("MAX_INSTANCE_COUNT is {}".format(max_instance_count))
4039 if nb_scale_op
>= max_instance_count
:
4040 raise LcmException("reached the limit of {} (max-instance-count) "
4041 "scaling-out operations for the "
4042 "scaling-group-descriptor '{}'".format(nb_scale_op
, scaling_group
))
4045 vdu_scaling_info
["scaling_direction"] = "OUT"
4046 vdu_scaling_info
["vdu-create"] = {}
4047 for vdu_scale_info
in scaling_descriptor
["vdu"]:
4048 vdud
= next(vdu
for vdu
in db_vnfd
.get("vdu") if vdu
["id"] == vdu_scale_info
["vdu-id-ref"])
4049 vdu_index
= len([x
for x
in db_vnfr
.get("vdur", ())
4050 if x
.get("vdu-id-ref") == vdu_scale_info
["vdu-id-ref"] and
4051 x
.get("member-vnf-index-ref") == vnf_index
])
4052 cloud_init_text
= self
._get
_cloud
_init
(vdud
, db_vnfd
)
4054 additional_params
= self
._get
_vdu
_additional
_params
(db_vnfr
, vdud
["id"]) or {}
4055 cloud_init_list
= []
4056 for x
in range(vdu_scale_info
.get("count", 1)):
4058 # TODO Information of its own ip is not available because db_vnfr is not updated.
4059 additional_params
["OSM"] = self
._get
_osm
_params
(db_vnfr
, vdu_scale_info
["vdu-id-ref"],
4061 cloud_init_list
.append(self
._parse
_cloud
_init
(cloud_init_text
, additional_params
,
4062 db_vnfd
["id"], vdud
["id"]))
4063 RO_scaling_info
.append({"osm_vdu_id": vdu_scale_info
["vdu-id-ref"], "member-vnf-index": vnf_index
,
4064 "type": "create", "count": vdu_scale_info
.get("count", 1)})
4066 RO_scaling_info
[-1]["cloud_init"] = cloud_init_list
4067 vdu_scaling_info
["vdu-create"][vdu_scale_info
["vdu-id-ref"]] = vdu_scale_info
.get("count", 1)
4069 elif scaling_type
== "SCALE_IN":
4070 # count if min-instance-count is reached
4071 min_instance_count
= 0
4072 if "min-instance-count" in scaling_descriptor
and scaling_descriptor
["min-instance-count"] is not None:
4073 min_instance_count
= int(scaling_descriptor
["min-instance-count"])
4074 if nb_scale_op
<= min_instance_count
:
4075 raise LcmException("reached the limit of {} (min-instance-count) scaling-in operations for the "
4076 "scaling-group-descriptor '{}'".format(nb_scale_op
, scaling_group
))
4078 vdu_scaling_info
["scaling_direction"] = "IN"
4079 vdu_scaling_info
["vdu-delete"] = {}
4080 for vdu_scale_info
in scaling_descriptor
["vdu"]:
4081 RO_scaling_info
.append({"osm_vdu_id": vdu_scale_info
["vdu-id-ref"], "member-vnf-index": vnf_index
,
4082 "type": "delete", "count": vdu_scale_info
.get("count", 1)})
4083 vdu_scaling_info
["vdu-delete"][vdu_scale_info
["vdu-id-ref"]] = vdu_scale_info
.get("count", 1)
4085 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
4086 vdu_create
= vdu_scaling_info
.get("vdu-create")
4087 vdu_delete
= copy(vdu_scaling_info
.get("vdu-delete"))
4088 if vdu_scaling_info
["scaling_direction"] == "IN":
4089 for vdur
in reversed(db_vnfr
["vdur"]):
4090 if vdu_delete
.get(vdur
["vdu-id-ref"]):
4091 vdu_delete
[vdur
["vdu-id-ref"]] -= 1
4092 vdu_scaling_info
["vdu"].append({
4093 "name": vdur
["name"],
4094 "vdu_id": vdur
["vdu-id-ref"],
4097 for interface
in vdur
["interfaces"]:
4098 vdu_scaling_info
["vdu"][-1]["interface"].append({
4099 "name": interface
["name"],
4100 "ip_address": interface
["ip-address"],
4101 "mac_address": interface
.get("mac-address"),
4103 vdu_delete
= vdu_scaling_info
.pop("vdu-delete")
4106 step
= "Executing pre-scale vnf-config-primitive"
4107 if scaling_descriptor
.get("scaling-config-action"):
4108 for scaling_config_action
in scaling_descriptor
["scaling-config-action"]:
4109 if (scaling_config_action
.get("trigger") == "pre-scale-in" and scaling_type
== "SCALE_IN") \
4110 or (scaling_config_action
.get("trigger") == "pre-scale-out" and scaling_type
== "SCALE_OUT"):
4111 vnf_config_primitive
= scaling_config_action
["vnf-config-primitive-name-ref"]
4112 step
= db_nslcmop_update
["detailed-status"] = \
4113 "executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive
)
4115 # look for primitive
4116 for config_primitive
in db_vnfd
.get("vnf-configuration", {}).get("config-primitive", ()):
4117 if config_primitive
["name"] == vnf_config_primitive
:
4121 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
4122 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
4123 "primitive".format(scaling_group
, vnf_config_primitive
))
4125 vnfr_params
= {"VDU_SCALE_INFO": vdu_scaling_info
}
4126 if db_vnfr
.get("additionalParamsForVnf"):
4127 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
4129 scale_process
= "VCA"
4130 db_nsr_update
["config-status"] = "configuring pre-scaling"
4131 primitive_params
= self
._map
_primitive
_params
(config_primitive
, {}, vnfr_params
)
4133 # Pre-scale retry check: Check if this sub-operation has been executed before
4134 op_index
= self
._check
_or
_add
_scale
_suboperation
(
4135 db_nslcmop
, nslcmop_id
, vnf_index
, vnf_config_primitive
, primitive_params
, 'PRE-SCALE')
4136 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
4137 # Skip sub-operation
4138 result
= 'COMPLETED'
4139 result_detail
= 'Done'
4140 self
.logger
.debug(logging_text
+
4141 "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
4142 vnf_config_primitive
, result
, result_detail
))
4144 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
4145 # New sub-operation: Get index of this sub-operation
4146 op_index
= len(db_nslcmop
.get('_admin', {}).get('operations')) - 1
4147 self
.logger
.debug(logging_text
+ "vnf_config_primitive={} New sub-operation".
4148 format(vnf_config_primitive
))
4150 # retry: Get registered params for this existing sub-operation
4151 op
= db_nslcmop
.get('_admin', {}).get('operations', [])[op_index
]
4152 vnf_index
= op
.get('member_vnf_index')
4153 vnf_config_primitive
= op
.get('primitive')
4154 primitive_params
= op
.get('primitive_params')
4155 self
.logger
.debug(logging_text
+ "vnf_config_primitive={} Sub-operation retry".
4156 format(vnf_config_primitive
))
4157 # Execute the primitive, either with new (first-time) or registered (reintent) args
4158 ee_descriptor_id
= config_primitive
.get("execution-environment-ref")
4159 primitive_name
= config_primitive
.get("execution-environment-primitive",
4160 vnf_config_primitive
)
4161 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(nsr_deployed
["VCA"],
4162 member_vnf_index
=vnf_index
,
4164 vdu_count_index
=None,
4165 ee_descriptor_id
=ee_descriptor_id
)
4166 result
, result_detail
= await self
._ns
_execute
_primitive
(
4167 ee_id
, primitive_name
, primitive_params
, vca_type
)
4168 self
.logger
.debug(logging_text
+ "vnf_config_primitive={} Done with result {} {}".format(
4169 vnf_config_primitive
, result
, result_detail
))
4170 # Update operationState = COMPLETED | FAILED
4171 self
._update
_suboperation
_status
(
4172 db_nslcmop
, op_index
, result
, result_detail
)
4174 if result
== "FAILED":
4175 raise LcmException(result_detail
)
4176 db_nsr_update
["config-status"] = old_config_status
4177 scale_process
= None
4181 # Should this block be skipped if 'RO_nsr_id' == None ?
4182 # if (RO_nsr_id and RO_scaling_info):
4184 scale_process
= "RO"
4185 # Scale RO retry check: Check if this sub-operation has been executed before
4186 op_index
= self
._check
_or
_add
_scale
_suboperation
(
4187 db_nslcmop
, vnf_index
, None, None, 'SCALE-RO', RO_nsr_id
, RO_scaling_info
)
4188 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
4189 # Skip sub-operation
4190 result
= 'COMPLETED'
4191 result_detail
= 'Done'
4192 self
.logger
.debug(logging_text
+ "Skipped sub-operation RO, result {} {}".format(
4193 result
, result_detail
))
4195 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
4196 # New sub-operation: Get index of this sub-operation
4197 op_index
= len(db_nslcmop
.get('_admin', {}).get('operations')) - 1
4198 self
.logger
.debug(logging_text
+ "New sub-operation RO")
4200 # retry: Get registered params for this existing sub-operation
4201 op
= db_nslcmop
.get('_admin', {}).get('operations', [])[op_index
]
4202 RO_nsr_id
= op
.get('RO_nsr_id')
4203 RO_scaling_info
= op
.get('RO_scaling_info')
4204 self
.logger
.debug(logging_text
+ "Sub-operation RO retry for primitive {}".format(
4205 vnf_config_primitive
))
4207 RO_desc
= await self
.RO
.create_action("ns", RO_nsr_id
, {"vdu-scaling": RO_scaling_info
})
4208 db_nsr_update
["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index
)] = nb_scale_op
4209 db_nsr_update
["_admin.scaling-group.{}.time".format(admin_scale_index
)] = time()
4211 RO_nslcmop_id
= RO_desc
["instance_action_id"]
4212 db_nslcmop_update
["_admin.deploy.RO"] = RO_nslcmop_id
4214 RO_task_done
= False
4215 step
= detailed_status
= "Waiting for VIM to scale. RO_task_id={}.".format(RO_nslcmop_id
)
4216 detailed_status_old
= None
4217 self
.logger
.debug(logging_text
+ step
)
4219 deployment_timeout
= 1 * 3600 # One hour
4220 while deployment_timeout
> 0:
4221 if not RO_task_done
:
4222 desc
= await self
.RO
.show("ns", item_id_name
=RO_nsr_id
, extra_item
="action",
4223 extra_item_id
=RO_nslcmop_id
)
4226 self
._on
_update
_ro
_db
(nsrs_id
=nsr_id
, ro_descriptor
=desc
)
4228 ns_status
, ns_status_info
= self
.RO
.check_action_status(desc
)
4229 if ns_status
== "ERROR":
4230 raise ROclient
.ROClientException(ns_status_info
)
4231 elif ns_status
== "BUILD":
4232 detailed_status
= step
+ "; {}".format(ns_status_info
)
4233 elif ns_status
== "ACTIVE":
4235 self
.scale_vnfr(db_vnfr
, vdu_create
=vdu_create
, vdu_delete
=vdu_delete
)
4236 step
= detailed_status
= "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id
)
4237 self
.logger
.debug(logging_text
+ step
)
4239 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status
)
4241 desc
= await self
.RO
.show("ns", RO_nsr_id
)
4242 ns_status
, ns_status_info
= self
.RO
.check_ns_status(desc
)
4244 self
._on
_update
_ro
_db
(nsrs_id
=nsr_id
, ro_descriptor
=desc
)
4246 if ns_status
== "ERROR":
4247 raise ROclient
.ROClientException(ns_status_info
)
4248 elif ns_status
== "BUILD":
4249 detailed_status
= step
+ "; {}".format(ns_status_info
)
4250 elif ns_status
== "ACTIVE":
4251 step
= detailed_status
= \
4252 "Waiting for management IP address reported by the VIM. Updating VNFRs"
4254 # nsr_deployed["nsr_ip"] = RO.get_ns_vnf_info(desc)
4255 self
.ns_update_vnfr({db_vnfr
["member-vnf-index-ref"]: db_vnfr
}, desc
)
4257 except LcmExceptionNoMgmtIP
:
4260 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status
)
4261 if detailed_status
!= detailed_status_old
:
4262 self
._update
_suboperation
_status
(
4263 db_nslcmop
, op_index
, 'COMPLETED', detailed_status
)
4264 detailed_status_old
= db_nslcmop_update
["detailed-status"] = detailed_status
4265 self
.update_db_2("nslcmops", nslcmop_id
, db_nslcmop_update
)
4267 await asyncio
.sleep(5, loop
=self
.loop
)
4268 deployment_timeout
-= 5
4269 if deployment_timeout
<= 0:
4270 self
._update
_suboperation
_status
(
4271 db_nslcmop
, nslcmop_id
, op_index
, 'FAILED', "Timeout when waiting for ns to get ready")
4272 raise ROclient
.ROClientException("Timeout waiting ns to be ready")
4274 # update VDU_SCALING_INFO with the obtained ip_addresses
4275 if vdu_scaling_info
["scaling_direction"] == "OUT":
4276 for vdur
in reversed(db_vnfr
["vdur"]):
4277 if vdu_scaling_info
["vdu-create"].get(vdur
["vdu-id-ref"]):
4278 vdu_scaling_info
["vdu-create"][vdur
["vdu-id-ref"]] -= 1
4279 vdu_scaling_info
["vdu"].append({
4280 "name": vdur
["name"],
4281 "vdu_id": vdur
["vdu-id-ref"],
4284 for interface
in vdur
["interfaces"]:
4285 vdu_scaling_info
["vdu"][-1]["interface"].append({
4286 "name": interface
["name"],
4287 "ip_address": interface
["ip-address"],
4288 "mac_address": interface
.get("mac-address"),
4290 del vdu_scaling_info
["vdu-create"]
4292 self
._update
_suboperation
_status
(db_nslcmop
, op_index
, 'COMPLETED', 'Done')
4295 scale_process
= None
4297 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4300 # execute primitive service POST-SCALING
4301 step
= "Executing post-scale vnf-config-primitive"
4302 if scaling_descriptor
.get("scaling-config-action"):
4303 for scaling_config_action
in scaling_descriptor
["scaling-config-action"]:
4304 if (scaling_config_action
.get("trigger") == "post-scale-in" and scaling_type
== "SCALE_IN") \
4305 or (scaling_config_action
.get("trigger") == "post-scale-out" and scaling_type
== "SCALE_OUT"):
4306 vnf_config_primitive
= scaling_config_action
["vnf-config-primitive-name-ref"]
4307 step
= db_nslcmop_update
["detailed-status"] = \
4308 "executing post-scale scaling-config-action '{}'".format(vnf_config_primitive
)
4310 vnfr_params
= {"VDU_SCALE_INFO": vdu_scaling_info
}
4311 if db_vnfr
.get("additionalParamsForVnf"):
4312 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
4314 # look for primitive
4315 for config_primitive
in db_vnfd
.get("vnf-configuration", {}).get("config-primitive", ()):
4316 if config_primitive
["name"] == vnf_config_primitive
:
4320 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
4321 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
4322 "config-primitive".format(scaling_group
, vnf_config_primitive
))
4323 scale_process
= "VCA"
4324 db_nsr_update
["config-status"] = "configuring post-scaling"
4325 primitive_params
= self
._map
_primitive
_params
(config_primitive
, {}, vnfr_params
)
4327 # Post-scale retry check: Check if this sub-operation has been executed before
4328 op_index
= self
._check
_or
_add
_scale
_suboperation
(
4329 db_nslcmop
, nslcmop_id
, vnf_index
, vnf_config_primitive
, primitive_params
, 'POST-SCALE')
4330 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
4331 # Skip sub-operation
4332 result
= 'COMPLETED'
4333 result_detail
= 'Done'
4334 self
.logger
.debug(logging_text
+
4335 "vnf_config_primitive={} Skipped sub-operation, result {} {}".
4336 format(vnf_config_primitive
, result
, result_detail
))
4338 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
4339 # New sub-operation: Get index of this sub-operation
4340 op_index
= len(db_nslcmop
.get('_admin', {}).get('operations')) - 1
4341 self
.logger
.debug(logging_text
+ "vnf_config_primitive={} New sub-operation".
4342 format(vnf_config_primitive
))
4344 # retry: Get registered params for this existing sub-operation
4345 op
= db_nslcmop
.get('_admin', {}).get('operations', [])[op_index
]
4346 vnf_index
= op
.get('member_vnf_index')
4347 vnf_config_primitive
= op
.get('primitive')
4348 primitive_params
= op
.get('primitive_params')
4349 self
.logger
.debug(logging_text
+ "vnf_config_primitive={} Sub-operation retry".
4350 format(vnf_config_primitive
))
4351 # Execute the primitive, either with new (first-time) or registered (reintent) args
4352 ee_descriptor_id
= config_primitive
.get("execution-environment-ref")
4353 primitive_name
= config_primitive
.get("execution-environment-primitive",
4354 vnf_config_primitive
)
4355 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(nsr_deployed
["VCA"],
4356 member_vnf_index
=vnf_index
,
4358 vdu_count_index
=None,
4359 ee_descriptor_id
=ee_descriptor_id
)
4360 result
, result_detail
= await self
._ns
_execute
_primitive
(
4361 ee_id
, primitive_name
, primitive_params
, vca_type
)
4362 self
.logger
.debug(logging_text
+ "vnf_config_primitive={} Done with result {} {}".format(
4363 vnf_config_primitive
, result
, result_detail
))
4364 # Update operationState = COMPLETED | FAILED
4365 self
._update
_suboperation
_status
(
4366 db_nslcmop
, op_index
, result
, result_detail
)
4368 if result
== "FAILED":
4369 raise LcmException(result_detail
)
4370 db_nsr_update
["config-status"] = old_config_status
4371 scale_process
= None
4374 db_nsr_update
["detailed-status"] = "" # "scaled {} {}".format(scaling_group, scaling_type)
4375 db_nsr_update
["operational-status"] = "running" if old_operational_status
== "failed" \
4376 else old_operational_status
4377 db_nsr_update
["config-status"] = old_config_status
4379 except (ROclient
.ROClientException
, DbException
, LcmException
) as e
:
4380 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
4382 except asyncio
.CancelledError
:
4383 self
.logger
.error(logging_text
+ "Cancelled Exception while '{}'".format(step
))
4384 exc
= "Operation was cancelled"
4385 except Exception as e
:
4386 exc
= traceback
.format_exc()
4387 self
.logger
.critical(logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True)
4389 self
._write
_ns
_status
(
4392 current_operation
="IDLE",
4393 current_operation_id
=None
4396 db_nslcmop_update
["detailed-status"] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
4397 nslcmop_operation_state
= "FAILED"
4399 db_nsr_update
["operational-status"] = old_operational_status
4400 db_nsr_update
["config-status"] = old_config_status
4401 db_nsr_update
["detailed-status"] = ""
4403 if "VCA" in scale_process
:
4404 db_nsr_update
["config-status"] = "failed"
4405 if "RO" in scale_process
:
4406 db_nsr_update
["operational-status"] = "failed"
4407 db_nsr_update
["detailed-status"] = "FAILED scaling nslcmop={} {}: {}".format(nslcmop_id
, step
,
4410 error_description_nslcmop
= None
4411 nslcmop_operation_state
= "COMPLETED"
4412 db_nslcmop_update
["detailed-status"] = "Done"
4414 self
._write
_op
_status
(
4417 error_message
=error_description_nslcmop
,
4418 operation_state
=nslcmop_operation_state
,
4419 other_update
=db_nslcmop_update
,
4422 self
._write
_ns
_status
(
4425 current_operation
="IDLE",
4426 current_operation_id
=None,
4427 other_update
=db_nsr_update
4430 if nslcmop_operation_state
:
4432 await self
.msg
.aiowrite("ns", "scaled", {"nsr_id": nsr_id
, "nslcmop_id": nslcmop_id
,
4433 "operationState": nslcmop_operation_state
},
4436 # await asyncio.sleep(cooldown_time, loop=self.loop)
4437 # await self.msg.aiowrite("ns","scaled-cooldown-time", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id})
4438 except Exception as e
:
4439 self
.logger
.error(logging_text
+ "kafka_write notification Exception {}".format(e
))
4440 self
.logger
.debug(logging_text
+ "Exit")
4441 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_scale")
4443 async def add_prometheus_metrics(self
, ee_id
, artifact_path
, ee_config_descriptor
, vnfr_id
, nsr_id
, target_ip
):
4444 if not self
.prometheus
:
4446 # look if exist a file called 'prometheus*.j2' and
4447 artifact_content
= self
.fs
.dir_ls(artifact_path
)
4448 job_file
= next((f
for f
in artifact_content
if f
.startswith("prometheus") and f
.endswith(".j2")), None)
4451 with self
.fs
.file_open((artifact_path
, job_file
), "r") as f
:
4455 _
, _
, service
= ee_id
.partition(".") # remove prefix "namespace."
4456 host_name
= "{}-{}".format(service
, ee_config_descriptor
["metric-service"])
4458 vnfr_id
= vnfr_id
.replace("-", "")
4460 "JOB_NAME": vnfr_id
,
4461 "TARGET_IP": target_ip
,
4462 "EXPORTER_POD_IP": host_name
,
4463 "EXPORTER_POD_PORT": host_port
,
4465 job_list
= self
.prometheus
.parse_job(job_data
, variables
)
4466 # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
4467 for job
in job_list
:
4468 if not isinstance(job
.get("job_name"), str) or vnfr_id
not in job
["job_name"]:
4469 job
["job_name"] = vnfr_id
+ "_" + str(randint(1, 10000))
4470 job
["nsr_id"] = nsr_id
4471 job_dict
= {jl
["job_name"]: jl
for jl
in job_list
}
4472 if await self
.prometheus
.update(job_dict
):
4473 return list(job_dict
.keys())
def get_vca_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
    """
    Look up the VCA cloud name and VCA cloud credential of a VIM account.

    The values are read from the dictionary returned by
    ``self.get_vim_account_config``.

    :param: vim_account_id:     VIM Account ID

    :return: (cloud_name, cloud_credential); each element is None when the
             corresponding key ("vca_cloud" / "vca_cloud_credential") is
             not present in the VIM account config
    """
    vim_config = self.get_vim_account_config(vim_account_id)
    cloud_name = vim_config.get("vca_cloud")
    cloud_credential = vim_config.get("vca_cloud_credential")
    return cloud_name, cloud_credential
def get_vca_k8s_cloud_and_credentials(self, vim_account_id: str) -> (str, str):
    """
    Look up the VCA K8s cloud name and VCA K8s cloud credential of a VIM account.

    The values are read from the dictionary returned by
    ``self.get_vim_account_config``.

    :param: vim_account_id:     VIM Account ID

    :return: (cloud_name, cloud_credential); each element is None when the
             corresponding key ("vca_k8s_cloud" / "vca_k8s_cloud_credential")
             is not present in the VIM account config
    """
    vim_config = self.get_vim_account_config(vim_account_id)
    k8s_cloud_name = vim_config.get("vca_k8s_cloud")
    k8s_cloud_credential = vim_config.get("vca_k8s_cloud_credential")
    return k8s_cloud_name, k8s_cloud_credential
def get_vim_account_config(self, vim_account_id: str) -> dict:
    """
    Fetch the "config" section of a VIM account stored in the OSM database.

    The account is looked up in the "vim_accounts" table by its "_id".

    :param: vim_account_id:     VIM Account ID

    :return: Dictionary with the config of the vim account; an empty
             dictionary when the lookup yields nothing or the record has
             no "config" entry
    """
    account_filter = {"_id": vim_account_id}
    record = self.db.get_one(
        table="vim_accounts",
        q_filter=account_filter,
        fail_on_empty=False,
    )
    # A falsy lookup result (e.g. None) means there is no config to report.
    if not record:
        return {}
    return record.get("config", {})