1 # -*- coding: utf-8 -*-
4 # Copyright 2018 Telefonica S.A.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
22 import logging
.handlers
25 from jinja2
import Environment
, Template
, meta
, TemplateError
, TemplateNotFound
, TemplateSyntaxError
27 from osm_lcm
import ROclient
28 from osm_lcm
.ng_ro
import NgRoClient
, NgRoException
29 from osm_lcm
.lcm_utils
import LcmException
, LcmExceptionNoMgmtIP
, LcmBase
, deep_get
, get_iterable
, populate_dict
30 from n2vc
.k8s_helm_conn
import K8sHelmConnector
31 from n2vc
.k8s_juju_conn
import K8sJujuConnector
33 from osm_common
.dbbase
import DbException
34 from osm_common
.fsbase
import FsException
36 from n2vc
.n2vc_juju_conn
import N2VCJujuConnector
37 from n2vc
.exceptions
import N2VCException
, N2VCNotFound
, K8sException
39 from osm_lcm
.lcm_helm_conn
import LCMHelmConn
41 from copy
import copy
, deepcopy
42 from http
import HTTPStatus
44 from uuid
import uuid4
45 from functools
import partial
46 from random
import randint
48 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
51 class N2VCJujuConnectorLCM(N2VCJujuConnector
):
53 async def create_execution_environment(self
, namespace
: str, db_dict
: dict, reuse_ee_id
: str = None,
54 progress_timeout
: float = None, total_timeout
: float = None,
55 config
: dict = None, artifact_path
: str = None,
56 vca_type
: str = None) -> (str, dict):
57 # admit two new parameters, artifact_path and vca_type
58 if vca_type
== "k8s_proxy_charm":
59 ee_id
= await self
.n2vc
.install_k8s_proxy_charm(
60 charm_name
=artifact_path
[artifact_path
.rfind("/") + 1:],
62 artifact_path
=artifact_path
,
66 return await super().create_execution_environment(
67 namespace
=namespace
, db_dict
=db_dict
, reuse_ee_id
=reuse_ee_id
,
68 progress_timeout
=progress_timeout
, total_timeout
=total_timeout
)
70 async def install_configuration_sw(self
, ee_id
: str, artifact_path
: str, db_dict
: dict,
71 progress_timeout
: float = None, total_timeout
: float = None,
72 config
: dict = None, num_units
: int = 1, vca_type
: str = "lxc_proxy_charm"):
73 if vca_type
== "k8s_proxy_charm":
75 return await super().install_configuration_sw(
76 ee_id
=ee_id
, artifact_path
=artifact_path
, db_dict
=db_dict
, progress_timeout
=progress_timeout
,
77 total_timeout
=total_timeout
, config
=config
, num_units
=num_units
)
# Default timeouts. NOTE(review): the arithmetic (5 * 60, 2 * 3600) suggests seconds - confirm against the callers.
timeout_vca_on_error = 5 * 60  # time allowed for a charm, since it is first seen at blocked/error status, before marking it as failed
timeout_ns_deploy = 2 * 3600  # default global timeout for deploying a ns
timeout_ns_terminate = 1800  # default global timeout for undeploying a ns
timeout_charm_delete = 10 * 60  # presumably the time allowed for deleting a charm - usage is outside this view
timeout_primitive = 30 * 60  # timeout for primitive execution
timeout_progress_primitive = 10 * 60  # timeout for some progress in a primitive execution

# Negative codes describing the state of a sub-operation.
# NOTE(review): where these are returned/compared is outside this view.
SUBOPERATION_STATUS_NOT_FOUND = -1
SUBOPERATION_STATUS_NEW = -2
SUBOPERATION_STATUS_SKIP = -3
# Label for the VCA deployment task. NOTE(review): its consumers are outside this view.
task_name_deploy_vca = "Deploying VCA"
93 def __init__(self
, db
, msg
, fs
, lcm_tasks
, config
, loop
, prometheus
=None):
95 Init, Connect to database, filesystem storage, and messaging
96 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
103 logger
=logging
.getLogger('lcm.ns')
107 self
.lcm_tasks
= lcm_tasks
108 self
.timeout
= config
["timeout"]
109 self
.ro_config
= config
["ro_config"]
110 self
.ng_ro
= config
["ro_config"].get("ng")
111 self
.vca_config
= config
["VCA"].copy()
113 # create N2VC connector
114 self
.n2vc
= N2VCJujuConnectorLCM(
119 url
='{}:{}'.format(self
.vca_config
['host'], self
.vca_config
['port']),
120 username
=self
.vca_config
.get('user', None),
121 vca_config
=self
.vca_config
,
122 on_update_db
=self
._on
_update
_n
2vc
_db
125 self
.conn_helm_ee
= LCMHelmConn(
132 vca_config
=self
.vca_config
,
133 on_update_db
=self
._on
_update
_n
2vc
_db
136 self
.k8sclusterhelm
= K8sHelmConnector(
137 kubectl_command
=self
.vca_config
.get("kubectlpath"),
138 helm_command
=self
.vca_config
.get("helmpath"),
145 self
.k8sclusterjuju
= K8sJujuConnector(
146 kubectl_command
=self
.vca_config
.get("kubectlpath"),
147 juju_command
=self
.vca_config
.get("jujupath"),
154 self
.k8scluster_map
= {
155 "helm-chart": self
.k8sclusterhelm
,
156 "chart": self
.k8sclusterhelm
,
157 "juju-bundle": self
.k8sclusterjuju
,
158 "juju": self
.k8sclusterjuju
,
162 "lxc_proxy_charm": self
.n2vc
,
163 "native_charm": self
.n2vc
,
164 "k8s_proxy_charm": self
.n2vc
,
165 "helm": self
.conn_helm_ee
168 self
.prometheus
= prometheus
172 self
.RO
= NgRoClient(self
.loop
, **self
.ro_config
)
174 self
.RO
= ROclient
.ROClient(self
.loop
, **self
.ro_config
)
176 def _on_update_ro_db(self
, nsrs_id
, ro_descriptor
):
178 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
181 # TODO filter RO descriptor fields...
185 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
186 db_dict
['deploymentStatus'] = ro_descriptor
187 self
.update_db_2("nsrs", nsrs_id
, db_dict
)
189 except Exception as e
:
190 self
.logger
.warn('Cannot write database RO deployment for ns={} -> {}'.format(nsrs_id
, e
))
192 async def _on_update_n2vc_db(self
, table
, filter, path
, updated_data
):
194 # remove last dot from path (if exists)
195 if path
.endswith('.'):
198 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
199 # .format(table, filter, path, updated_data))
203 nsr_id
= filter.get('_id')
205 # read ns record from database
206 nsr
= self
.db
.get_one(table
='nsrs', q_filter
=filter)
207 current_ns_status
= nsr
.get('nsState')
209 # get vca status for NS
210 status_dict
= await self
.n2vc
.get_status(namespace
='.' + nsr_id
, yaml_format
=False)
214 db_dict
['vcaStatus'] = status_dict
216 # update configurationStatus for this VCA
218 vca_index
= int(path
[path
.rfind(".")+1:])
220 vca_list
= deep_get(target_dict
=nsr
, key_list
=('_admin', 'deployed', 'VCA'))
221 vca_status
= vca_list
[vca_index
].get('status')
223 configuration_status_list
= nsr
.get('configurationStatus')
224 config_status
= configuration_status_list
[vca_index
].get('status')
226 if config_status
== 'BROKEN' and vca_status
!= 'failed':
227 db_dict
['configurationStatus'][vca_index
] = 'READY'
228 elif config_status
!= 'BROKEN' and vca_status
== 'failed':
229 db_dict
['configurationStatus'][vca_index
] = 'BROKEN'
230 except Exception as e
:
231 # not update configurationStatus
232 self
.logger
.debug('Error updating vca_index (ignore): {}'.format(e
))
234 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
235 # if nsState = 'DEGRADED' check if all is OK
237 if current_ns_status
in ('READY', 'DEGRADED'):
238 error_description
= ''
240 if status_dict
.get('machines'):
241 for machine_id
in status_dict
.get('machines'):
242 machine
= status_dict
.get('machines').get(machine_id
)
243 # check machine agent-status
244 if machine
.get('agent-status'):
245 s
= machine
.get('agent-status').get('status')
248 error_description
+= 'machine {} agent-status={} ; '.format(machine_id
, s
)
249 # check machine instance status
250 if machine
.get('instance-status'):
251 s
= machine
.get('instance-status').get('status')
254 error_description
+= 'machine {} instance-status={} ; '.format(machine_id
, s
)
256 if status_dict
.get('applications'):
257 for app_id
in status_dict
.get('applications'):
258 app
= status_dict
.get('applications').get(app_id
)
259 # check application status
260 if app
.get('status'):
261 s
= app
.get('status').get('status')
264 error_description
+= 'application {} status={} ; '.format(app_id
, s
)
266 if error_description
:
267 db_dict
['errorDescription'] = error_description
268 if current_ns_status
== 'READY' and is_degraded
:
269 db_dict
['nsState'] = 'DEGRADED'
270 if current_ns_status
== 'DEGRADED' and not is_degraded
:
271 db_dict
['nsState'] = 'READY'
274 self
.update_db_2("nsrs", nsr_id
, db_dict
)
276 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
278 except Exception as e
:
279 self
.logger
.warn('Error updating NS state for ns={}: {}'.format(nsr_id
, e
))
281 def vnfd2RO(self
, vnfd
, new_id
=None, additionalParams
=None, nsrId
=None):
283 Creates a new vnfd descriptor for RO based on the input OSM IM vnfd
284 :param vnfd: input vnfd
285 :param new_id: overrides vnf id if provided
286 :param additionalParams: Instantiation params for VNFs provided
287 :param nsrId: Id of the NSR
288 :return: copy of vnfd
291 vnfd_RO
= deepcopy(vnfd
)
292 # remove unused by RO configuration, monitoring, scaling and internal keys
293 vnfd_RO
.pop("_id", None)
294 vnfd_RO
.pop("_admin", None)
295 vnfd_RO
.pop("vnf-configuration", None)
296 vnfd_RO
.pop("monitoring-param", None)
297 vnfd_RO
.pop("scaling-group-descriptor", None)
298 vnfd_RO
.pop("kdu", None)
299 vnfd_RO
.pop("k8s-cluster", None)
301 vnfd_RO
["id"] = new_id
303 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
304 for vdu
in get_iterable(vnfd_RO
, "vdu"):
305 cloud_init_file
= None
306 if vdu
.get("cloud-init-file"):
307 base_folder
= vnfd
["_admin"]["storage"]
308 cloud_init_file
= "{}/{}/cloud_init/{}".format(base_folder
["folder"], base_folder
["pkg-dir"],
309 vdu
["cloud-init-file"])
310 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
311 cloud_init_content
= ci_file
.read()
312 vdu
.pop("cloud-init-file", None)
313 elif vdu
.get("cloud-init"):
314 cloud_init_content
= vdu
["cloud-init"]
319 ast
= env
.parse(cloud_init_content
)
320 mandatory_vars
= meta
.find_undeclared_variables(ast
)
322 for var
in mandatory_vars
:
323 if not additionalParams
or var
not in additionalParams
.keys():
324 raise LcmException("Variable '{}' defined at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
325 "file, must be provided in the instantiation parameters inside the "
326 "'additionalParamsForVnf' block".format(var
, vnfd
["id"], vdu
["id"]))
327 template
= Template(cloud_init_content
)
328 cloud_init_content
= template
.render(additionalParams
or {})
329 vdu
["cloud-init"] = cloud_init_content
332 except FsException
as e
:
333 raise LcmException("Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".
334 format(vnfd
["id"], vdu
["id"], cloud_init_file
, e
))
335 except (TemplateError
, TemplateNotFound
, TemplateSyntaxError
) as e
:
336 raise LcmException("Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".
337 format(vnfd
["id"], vdu
["id"], e
))
339 def _ns_params_2_RO(self
, ns_params
, nsd
, vnfd_dict
, db_vnfrs
, n2vc_key_list
):
341 Creates a RO ns descriptor from OSM ns_instantiate params
342 :param ns_params: OSM instantiate params
343 :param vnfd_dict: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
344 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index. {member-vnf-index: {vnfr_object}, ...}
345 :return: The RO ns descriptor
349 # TODO feature 1417: Check that no instantiation is set over PDU
350 # check if PDU forces a concrete vim-network-id and add it
351 # check if PDU contains a SDN-assist info (dpid, switch, port) and pass it to RO
353 def vim_account_2_RO(vim_account
):
354 if vim_account
in vim_2_RO
:
355 return vim_2_RO
[vim_account
]
357 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account
})
358 if db_vim
["_admin"]["operationalState"] != "ENABLED":
359 raise LcmException("VIM={} is not available. operationalState={}".format(
360 vim_account
, db_vim
["_admin"]["operationalState"]))
361 RO_vim_id
= db_vim
["_admin"]["deployed"]["RO"]
362 vim_2_RO
[vim_account
] = RO_vim_id
365 def wim_account_2_RO(wim_account
):
366 if isinstance(wim_account
, str):
367 if wim_account
in wim_2_RO
:
368 return wim_2_RO
[wim_account
]
370 db_wim
= self
.db
.get_one("wim_accounts", {"_id": wim_account
})
371 if db_wim
["_admin"]["operationalState"] != "ENABLED":
372 raise LcmException("WIM={} is not available. operationalState={}".format(
373 wim_account
, db_wim
["_admin"]["operationalState"]))
374 RO_wim_id
= db_wim
["_admin"]["deployed"]["RO-account"]
375 wim_2_RO
[wim_account
] = RO_wim_id
380 def ip_profile_2_RO(ip_profile
):
381 RO_ip_profile
= deepcopy((ip_profile
))
382 if "dns-server" in RO_ip_profile
:
383 if isinstance(RO_ip_profile
["dns-server"], list):
384 RO_ip_profile
["dns-address"] = []
385 for ds
in RO_ip_profile
.pop("dns-server"):
386 RO_ip_profile
["dns-address"].append(ds
['address'])
388 RO_ip_profile
["dns-address"] = RO_ip_profile
.pop("dns-server")
389 if RO_ip_profile
.get("ip-version") == "ipv4":
390 RO_ip_profile
["ip-version"] = "IPv4"
391 if RO_ip_profile
.get("ip-version") == "ipv6":
392 RO_ip_profile
["ip-version"] = "IPv6"
393 if "dhcp-params" in RO_ip_profile
:
394 RO_ip_profile
["dhcp"] = RO_ip_profile
.pop("dhcp-params")
400 # "name": ns_params["nsName"],
401 # "description": ns_params.get("nsDescription"),
402 "datacenter": vim_account_2_RO(ns_params
["vimAccountId"]),
403 "wim_account": wim_account_2_RO(ns_params
.get("wimAccountId")),
404 # "scenario": ns_params["nsdId"],
406 # set vim_account of each vnf if different from general vim_account.
407 # Get this information from <vnfr> database content, key vim-account-id
408 # Vim account can be set by placement_engine and it may be different from
409 # the instantiate parameters (vnfs.member-vnf-index.datacenter).
410 for vnf_index
, vnfr
in db_vnfrs
.items():
411 if vnfr
.get("vim-account-id") and vnfr
["vim-account-id"] != ns_params
["vimAccountId"]:
412 populate_dict(RO_ns_params
, ("vnfs", vnf_index
, "datacenter"), vim_account_2_RO(vnfr
["vim-account-id"]))
414 n2vc_key_list
= n2vc_key_list
or []
415 for vnfd_ref
, vnfd
in vnfd_dict
.items():
416 vdu_needed_access
= []
418 if vnfd
.get("vnf-configuration"):
419 ssh_required
= deep_get(vnfd
, ("vnf-configuration", "config-access", "ssh-access", "required"))
420 if ssh_required
and vnfd
.get("mgmt-interface"):
421 if vnfd
["mgmt-interface"].get("vdu-id"):
422 vdu_needed_access
.append(vnfd
["mgmt-interface"]["vdu-id"])
423 elif vnfd
["mgmt-interface"].get("cp"):
424 mgmt_cp
= vnfd
["mgmt-interface"]["cp"]
426 for vdu
in vnfd
.get("vdu", ()):
427 if vdu
.get("vdu-configuration"):
428 ssh_required
= deep_get(vdu
, ("vdu-configuration", "config-access", "ssh-access", "required"))
430 vdu_needed_access
.append(vdu
["id"])
432 for vdu_interface
in vdu
.get("interface"):
433 if vdu_interface
.get("external-connection-point-ref") and \
434 vdu_interface
["external-connection-point-ref"] == mgmt_cp
:
435 vdu_needed_access
.append(vdu
["id"])
439 if vdu_needed_access
:
440 for vnf_member
in nsd
.get("constituent-vnfd"):
441 if vnf_member
["vnfd-id-ref"] != vnfd_ref
:
443 for vdu
in vdu_needed_access
:
444 populate_dict(RO_ns_params
,
445 ("vnfs", vnf_member
["member-vnf-index"], "vdus", vdu
, "mgmt_keys"),
448 if ns_params
.get("vduImage"):
449 RO_ns_params
["vduImage"] = ns_params
["vduImage"]
451 if ns_params
.get("ssh_keys"):
452 RO_ns_params
["cloud-config"] = {"key-pairs": ns_params
["ssh_keys"]}
453 for vnf_params
in get_iterable(ns_params
, "vnf"):
454 for constituent_vnfd
in nsd
["constituent-vnfd"]:
455 if constituent_vnfd
["member-vnf-index"] == vnf_params
["member-vnf-index"]:
456 vnf_descriptor
= vnfd_dict
[constituent_vnfd
["vnfd-id-ref"]]
459 raise LcmException("Invalid instantiate parameter vnf:member-vnf-index={} is not present at nsd:"
460 "constituent-vnfd".format(vnf_params
["member-vnf-index"]))
462 for vdu_params
in get_iterable(vnf_params
, "vdu"):
463 # TODO feature 1417: check that this VDU exist and it is not a PDU
464 if vdu_params
.get("volume"):
465 for volume_params
in vdu_params
["volume"]:
466 if volume_params
.get("vim-volume-id"):
467 populate_dict(RO_ns_params
, ("vnfs", vnf_params
["member-vnf-index"], "vdus",
468 vdu_params
["id"], "devices", volume_params
["name"], "vim_id"),
469 volume_params
["vim-volume-id"])
470 if vdu_params
.get("interface"):
471 for interface_params
in vdu_params
["interface"]:
472 if interface_params
.get("ip-address"):
473 populate_dict(RO_ns_params
, ("vnfs", vnf_params
["member-vnf-index"], "vdus",
474 vdu_params
["id"], "interfaces", interface_params
["name"],
476 interface_params
["ip-address"])
477 if interface_params
.get("mac-address"):
478 populate_dict(RO_ns_params
, ("vnfs", vnf_params
["member-vnf-index"], "vdus",
479 vdu_params
["id"], "interfaces", interface_params
["name"],
481 interface_params
["mac-address"])
482 if interface_params
.get("floating-ip-required"):
483 populate_dict(RO_ns_params
, ("vnfs", vnf_params
["member-vnf-index"], "vdus",
484 vdu_params
["id"], "interfaces", interface_params
["name"],
486 interface_params
["floating-ip-required"])
488 for internal_vld_params
in get_iterable(vnf_params
, "internal-vld"):
489 if internal_vld_params
.get("vim-network-name"):
490 populate_dict(RO_ns_params
, ("vnfs", vnf_params
["member-vnf-index"], "networks",
491 internal_vld_params
["name"], "vim-network-name"),
492 internal_vld_params
["vim-network-name"])
493 if internal_vld_params
.get("vim-network-id"):
494 populate_dict(RO_ns_params
, ("vnfs", vnf_params
["member-vnf-index"], "networks",
495 internal_vld_params
["name"], "vim-network-id"),
496 internal_vld_params
["vim-network-id"])
497 if internal_vld_params
.get("ip-profile"):
498 populate_dict(RO_ns_params
, ("vnfs", vnf_params
["member-vnf-index"], "networks",
499 internal_vld_params
["name"], "ip-profile"),
500 ip_profile_2_RO(internal_vld_params
["ip-profile"]))
501 if internal_vld_params
.get("provider-network"):
503 populate_dict(RO_ns_params
, ("vnfs", vnf_params
["member-vnf-index"], "networks",
504 internal_vld_params
["name"], "provider-network"),
505 internal_vld_params
["provider-network"].copy())
507 for icp_params
in get_iterable(internal_vld_params
, "internal-connection-point"):
510 for vdu_descriptor
in vnf_descriptor
["vdu"]:
511 for vdu_interface
in vdu_descriptor
["interface"]:
512 if vdu_interface
.get("internal-connection-point-ref") == icp_params
["id-ref"]:
513 if icp_params
.get("ip-address"):
514 populate_dict(RO_ns_params
, ("vnfs", vnf_params
["member-vnf-index"], "vdus",
515 vdu_descriptor
["id"], "interfaces",
516 vdu_interface
["name"], "ip_address"),
517 icp_params
["ip-address"])
519 if icp_params
.get("mac-address"):
520 populate_dict(RO_ns_params
, ("vnfs", vnf_params
["member-vnf-index"], "vdus",
521 vdu_descriptor
["id"], "interfaces",
522 vdu_interface
["name"], "mac_address"),
523 icp_params
["mac-address"])
529 raise LcmException("Invalid instantiate parameter vnf:member-vnf-index[{}]:"
530 "internal-vld:id-ref={} is not present at vnfd:internal-"
531 "connection-point".format(vnf_params
["member-vnf-index"],
532 icp_params
["id-ref"]))
534 for vld_params
in get_iterable(ns_params
, "vld"):
535 if "ip-profile" in vld_params
:
536 populate_dict(RO_ns_params
, ("networks", vld_params
["name"], "ip-profile"),
537 ip_profile_2_RO(vld_params
["ip-profile"]))
539 if vld_params
.get("provider-network"):
541 populate_dict(RO_ns_params
, ("networks", vld_params
["name"], "provider-network"),
542 vld_params
["provider-network"].copy())
544 if "wimAccountId" in vld_params
and vld_params
["wimAccountId"] is not None:
545 populate_dict(RO_ns_params
, ("networks", vld_params
["name"], "wim_account"),
546 wim_account_2_RO(vld_params
["wimAccountId"])),
547 if vld_params
.get("vim-network-name"):
549 if isinstance(vld_params
["vim-network-name"], dict):
550 for vim_account
, vim_net
in vld_params
["vim-network-name"].items():
551 RO_vld_sites
.append({
552 "netmap-use": vim_net
,
553 "datacenter": vim_account_2_RO(vim_account
)
555 else: # isinstance str
556 RO_vld_sites
.append({"netmap-use": vld_params
["vim-network-name"]})
558 populate_dict(RO_ns_params
, ("networks", vld_params
["name"], "sites"), RO_vld_sites
)
560 if vld_params
.get("vim-network-id"):
562 if isinstance(vld_params
["vim-network-id"], dict):
563 for vim_account
, vim_net
in vld_params
["vim-network-id"].items():
564 RO_vld_sites
.append({
565 "netmap-use": vim_net
,
566 "datacenter": vim_account_2_RO(vim_account
)
568 else: # isinstance str
569 RO_vld_sites
.append({"netmap-use": vld_params
["vim-network-id"]})
571 populate_dict(RO_ns_params
, ("networks", vld_params
["name"], "sites"), RO_vld_sites
)
572 if vld_params
.get("ns-net"):
573 if isinstance(vld_params
["ns-net"], dict):
574 for vld_id
, instance_scenario_id
in vld_params
["ns-net"].items():
575 RO_vld_ns_net
= {"instance_scenario_id": instance_scenario_id
, "osm_id": vld_id
}
576 populate_dict(RO_ns_params
, ("networks", vld_params
["name"], "use-network"), RO_vld_ns_net
)
577 if "vnfd-connection-point-ref" in vld_params
:
578 for cp_params
in vld_params
["vnfd-connection-point-ref"]:
580 for constituent_vnfd
in nsd
["constituent-vnfd"]:
581 if constituent_vnfd
["member-vnf-index"] == cp_params
["member-vnf-index-ref"]:
582 vnf_descriptor
= vnfd_dict
[constituent_vnfd
["vnfd-id-ref"]]
586 "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={} "
587 "is not present at nsd:constituent-vnfd".format(cp_params
["member-vnf-index-ref"]))
589 for vdu_descriptor
in vnf_descriptor
["vdu"]:
590 for interface_descriptor
in vdu_descriptor
["interface"]:
591 if interface_descriptor
.get("external-connection-point-ref") == \
592 cp_params
["vnfd-connection-point-ref"]:
599 "Invalid instantiate parameter vld:vnfd-connection-point-ref:member-vnf-index-ref={}:"
600 "vnfd-connection-point-ref={} is not present at vnfd={}".format(
601 cp_params
["member-vnf-index-ref"],
602 cp_params
["vnfd-connection-point-ref"],
603 vnf_descriptor
["id"]))
604 if cp_params
.get("ip-address"):
605 populate_dict(RO_ns_params
, ("vnfs", cp_params
["member-vnf-index-ref"], "vdus",
606 vdu_descriptor
["id"], "interfaces",
607 interface_descriptor
["name"], "ip_address"),
608 cp_params
["ip-address"])
609 if cp_params
.get("mac-address"):
610 populate_dict(RO_ns_params
, ("vnfs", cp_params
["member-vnf-index-ref"], "vdus",
611 vdu_descriptor
["id"], "interfaces",
612 interface_descriptor
["name"], "mac_address"),
613 cp_params
["mac-address"])
616 def scale_vnfr(self
, db_vnfr
, vdu_create
=None, vdu_delete
=None):
617 # work on shallow copies so that the caller's dictionaries are not modified
618 vdu_create
= copy(vdu_create
)
619 vdu_delete
= copy(vdu_delete
)
621 vdurs
= db_vnfr
.get("vdur")
624 vdu_index
= len(vdurs
)
627 vdur
= vdurs
[vdu_index
]
628 if vdur
.get("pdu-type"):
630 vdu_id_ref
= vdur
["vdu-id-ref"]
631 if vdu_create
and vdu_create
.get(vdu_id_ref
):
632 for index
in range(0, vdu_create
[vdu_id_ref
]):
633 vdur
= deepcopy(vdur
)
634 vdur
["_id"] = str(uuid4())
635 vdur
["count-index"] += 1
636 vdurs
.insert(vdu_index
+1+index
, vdur
)
637 del vdu_create
[vdu_id_ref
]
638 if vdu_delete
and vdu_delete
.get(vdu_id_ref
):
640 vdu_delete
[vdu_id_ref
] -= 1
641 if not vdu_delete
[vdu_id_ref
]:
642 del vdu_delete
[vdu_id_ref
]
643 # check all operations are done
644 if vdu_create
or vdu_delete
:
645 raise LcmException("Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
648 raise LcmException("Error scaling IN VNFR for {}. There is not any existing vnfr. Scaled to 0?".format(
651 vnfr_update
= {"vdur": vdurs
}
652 db_vnfr
["vdur"] = vdurs
653 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
def ns_update_nsr(self, ns_update_nsr, db_nsr, nsr_desc_RO):
    """
    Copies into every vld of the nsr the information that RO reports for its network
    :param ns_update_nsr: dictionary to be filled; it gets one "vld.<index>" entry per vld, holding the updated vld
    :param db_nsr: content of db_nsr. Its vld entries are modified in place
    :param nsr_desc_RO: nsr descriptor from RO. Its "nets" are matched to a vld by "ns_net_osm_id"
    :return: Nothing, LcmException is raised when a vld has no matching net at the RO descriptor
    """
    # vld key <- key of the RO net that provides its value
    copied_fields = (
        ("vim-id", "vim_net_id"),
        ("name", "vim_name"),
        ("status", "status"),
        ("status-detailed", "error_msg"),
    )
    for position, ns_vld in enumerate(get_iterable(db_nsr, "vld")):
        # first RO net created for this vld, or None when RO reports none
        matching_net = next(
            (candidate for candidate in get_iterable(nsr_desc_RO, "nets")
             if ns_vld["id"] == candidate.get("ns_net_osm_id")),
            None)
        if matching_net is None:
            raise LcmException("ns_update_nsr: Not found vld={} at RO info".format(ns_vld["id"]))
        for vld_key, ro_key in copied_fields:
            ns_vld[vld_key] = matching_net.get(ro_key)
        ns_update_nsr["vld.{}".format(position)] = ns_vld
def set_vnfr_at_error(self, db_vnfrs, error_text):
    """
    Sets every vnfr at "ERROR" status in the database, together with each of its vdur that has no status yet
    A vdur without "status" gets status "ERROR" and, when error_text is provided, "status-detailed" with that
    text. The same values are written to the in-memory vdur and to the database record
    :param db_vnfrs: dictionary whose values are the vnfr contents. Their vdur entries are modified in place
    :param error_text: error description; converted with str(). Nothing is stored as detail if it is empty
    :return: None. A DbException while writing is logged as an error and not propagated
    """
    try:
        for db_vnfr in db_vnfrs.values():
            vnfr_update = {"status": "ERROR"}
            for vdu_index, vdur in enumerate(get_iterable(db_vnfr, "vdur")):
                if "status" not in vdur:
                    vdur["status"] = "ERROR"
                    vnfr_update["vdur.{}.status".format(vdu_index)] = "ERROR"
                    if error_text:
                        status_detailed = str(error_text)
                        vdur["status-detailed"] = status_detailed
                        # store the real error text (previously the literal "ERROR" was written here, so the
                        # database lost the detail that the in-memory vdur kept)
                        vnfr_update["vdur.{}.status-detailed".format(vdu_index)] = status_detailed
            self.update_db_2("vnfrs", db_vnfr["_id"], vnfr_update)
    except DbException as e:
        self.logger.error("Cannot update vnf. {}".format(e))
692 def ns_update_vnfr(self
, db_vnfrs
, nsr_desc_RO
):
694 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
695 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
696 :param nsr_desc_RO: nsr descriptor from RO
697 :return: Nothing, LcmException is raised on errors
699 for vnf_index
, db_vnfr
in db_vnfrs
.items():
700 for vnf_RO
in nsr_desc_RO
["vnfs"]:
701 if vnf_RO
["member_vnf_index"] != vnf_index
:
704 if vnf_RO
.get("ip_address"):
705 db_vnfr
["ip-address"] = vnfr_update
["ip-address"] = vnf_RO
["ip_address"].split(";")[0]
706 elif not db_vnfr
.get("ip-address"):
707 if db_vnfr
.get("vdur"): # if not VDUs, there is not ip_address
708 raise LcmExceptionNoMgmtIP("ns member_vnf_index '{}' has no IP address".format(vnf_index
))
710 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
711 vdur_RO_count_index
= 0
712 if vdur
.get("pdu-type"):
714 for vdur_RO
in get_iterable(vnf_RO
, "vms"):
715 if vdur
["vdu-id-ref"] != vdur_RO
["vdu_osm_id"]:
717 if vdur
["count-index"] != vdur_RO_count_index
:
718 vdur_RO_count_index
+= 1
720 vdur
["vim-id"] = vdur_RO
.get("vim_vm_id")
721 if vdur_RO
.get("ip_address"):
722 vdur
["ip-address"] = vdur_RO
["ip_address"].split(";")[0]
724 vdur
["ip-address"] = None
725 vdur
["vdu-id-ref"] = vdur_RO
.get("vdu_osm_id")
726 vdur
["name"] = vdur_RO
.get("vim_name")
727 vdur
["status"] = vdur_RO
.get("status")
728 vdur
["status-detailed"] = vdur_RO
.get("error_msg")
729 for ifacer
in get_iterable(vdur
, "interfaces"):
730 for interface_RO
in get_iterable(vdur_RO
, "interfaces"):
731 if ifacer
["name"] == interface_RO
.get("internal_name"):
732 ifacer
["ip-address"] = interface_RO
.get("ip_address")
733 ifacer
["mac-address"] = interface_RO
.get("mac_address")
736 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
738 .format(vnf_index
, vdur
["vdu-id-ref"], ifacer
["name"]))
739 vnfr_update
["vdur.{}".format(vdu_index
)] = vdur
742 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
743 "VIM info".format(vnf_index
, vdur
["vdu-id-ref"], vdur
["count-index"]))
745 for vld_index
, vld
in enumerate(get_iterable(db_vnfr
, "vld")):
746 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
747 if vld
["id"] != net_RO
.get("vnf_net_osm_id"):
749 vld
["vim-id"] = net_RO
.get("vim_net_id")
750 vld
["name"] = net_RO
.get("vim_name")
751 vld
["status"] = net_RO
.get("status")
752 vld
["status-detailed"] = net_RO
.get("error_msg")
753 vnfr_update
["vld.{}".format(vld_index
)] = vld
756 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
757 vnf_index
, vld
["id"]))
759 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
763 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(vnf_index
))
765 def _get_ns_config_info(self
, nsr_id
):
767 Generates a mapping between vnf,vdu elements and the N2VC id
768 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
769 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
770 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
771 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
773 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
774 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
776 ns_config_info
= {"osm-config-mapping": mapping
}
777 for vca
in vca_deployed_list
:
778 if not vca
["member-vnf-index"]:
780 if not vca
["vdu_id"]:
781 mapping
[vca
["member-vnf-index"]] = vca
["application"]
783 mapping
["{}.{}.{}".format(vca
["member-vnf-index"], vca
["vdu_id"], vca
["vdu_count_index"])] =\
785 return ns_config_info
788 def _get_initial_config_primitive_list(desc_primitive_list
, vca_deployed
):
790 Generates a list of initial-config-primitive based on the list provided by the descriptor. It includes internal
791 primitives as verify-ssh-credentials, or config when needed
792 :param desc_primitive_list: information of the descriptor
793 :param vca_deployed: information of the deployed, needed for known if it is related to an NS, VNF, VDU and if
794 this element contains a ssh public key
795 :return: The modified list. Can ba an empty list, but always a list
797 if desc_primitive_list
:
798 primitive_list
= desc_primitive_list
.copy()
801 # look for primitive config, and get the position. None if not present
802 config_position
= None
803 for index
, primitive
in enumerate(primitive_list
):
804 if primitive
["name"] == "config":
805 config_position
= index
808 # for NS, add always a config primitive if not present (bug 874)
809 if not vca_deployed
["member-vnf-index"] and config_position
is None:
810 primitive_list
.insert(0, {"name": "config", "parameter": []})
812 # for VNF/VDU add verify-ssh-credentials after config
813 if vca_deployed
["member-vnf-index"] and config_position
is not None and vca_deployed
.get("ssh-public-key"):
814 primitive_list
.insert(config_position
+ 1, {"name": "verify-ssh-credentials", "parameter": []})
815 return primitive_list
async def _instantiate_ng_ro(self, logging_text, nsr_id, nsd, db_nsr, db_nslcmop, db_vnfrs, db_vnfds_ref,
                             n2vc_key_list, stage, start_deploy, timeout_ns_deploy):
    """
    Build the deployment target, send it to RO with self.RO.deploy and wait until the action finishes.

    :param logging_text: prefix text to use at logging
    :param nsr_id: nsr identity
    :param nsd: ns descriptor content; its "vld" list is read
    :param db_nsr: ns record content; "name", "image" and "flavor" are copied into the target
    :param db_nslcmop: ns operation content; "_id" and "operationParams" are read
    :param db_vnfrs: dict of vnf records; only its values are used. They are deep-copied, not modified
    :param db_vnfds_ref: vnfds indexed by the value found at vnfr["vnfd-ref"]
    :param n2vc_key_list: ssh public keys appended to operationParams["ssh_keys"]
    :param stage: list with 3 items; forwarded to _wait_ng_ro and joined into "detailed-status"
    :param start_deploy: start time forwarded to _wait_ng_ro
    :param timeout_ns_deploy: timeout forwarded to _wait_ng_ro
    :return: None. Exceptions raised by self.RO.deploy or _wait_ng_ro are propagated
    """
    nslcmop_id = db_nslcmop["_id"]
    target = {
        "name": db_nsr["name"],
        "ns": {"vld": []},
        "vnf": [],
        "image": deepcopy(db_nsr["image"]),
        "flavor": deepcopy(db_nsr["flavor"]),
        "action_id": nslcmop_id,
    }
    # every image/flavor starts without vim accounts; they are added below, one per vnfr vim account
    for image in target["image"]:
        image["vim_info"] = []
    for flavor in target["flavor"]:
        flavor["vim_info"] = []

    ns_params = db_nslcmop.get("operationParams")
    ssh_keys = []
    if ns_params.get("ssh_keys"):
        ssh_keys += ns_params.get("ssh_keys")
    if n2vc_key_list:
        ssh_keys += n2vc_key_list

    # maps "member_vnf:<member-vnf-index>.<connection-point>" to the ns vld it is attached to
    cp2target = {}
    for vld_index, vld in enumerate(nsd.get("vld") or ()):  # tolerate an nsd without "vld"
        target_vld = {
            "id": vld["id"],
            "name": vld["name"],
            "mgmt-network": vld.get("mgmt-network", False),
            "type": vld.get("type"),
            "vim_info": [{"vim-network-name": vld.get("vim-network-name"),
                          "vim_account_id": ns_params["vimAccountId"]}],
        }
        for cp in vld["vnfd-connection-point-ref"]:
            cp2target["member_vnf:{}.{}".format(cp["member-vnf-index-ref"], cp["vnfd-connection-point-ref"])] = \
                "nsrs:{}:vld.{}".format(nsr_id, vld_index)
        target["ns"]["vld"].append(target_vld)

    for vnfr in db_vnfrs.values():
        vnfd = db_vnfds_ref[vnfr["vnfd-ref"]]
        vim_account_id = vnfr["vim-account-id"]
        target_vnf = deepcopy(vnfr)
        for vld in target_vnf.get("vld", ()):
            # check if connected to a ns.vld
            vnf_cp = next((cp for cp in vnfd.get("connection-point", ())
                           if cp.get("internal-vld-ref") == vld["id"]), None)
            if vnf_cp:
                ns_cp = "member_vnf:{}.{}".format(vnfr["member-vnf-index-ref"], vnf_cp["id"])
                if cp2target.get(ns_cp):
                    vld["target"] = cp2target[ns_cp]
            vld["vim_info"] = [{"vim-network-name": vld.get("vim-network-name"),
                                "vim_account_id": vim_account_id}]

        for vdur in target_vnf.get("vdur", ()):
            vdur["vim_info"] = [{"vim_account_id": vim_account_id}]
            vdud_index, vdud = next(k for k in enumerate(vnfd["vdu"]) if k[1]["id"] == vdur["vdu-id-ref"])
            # vdur["additionalParams"] = vnfr.get("additionalParamsForVnf")  # TODO additional params for VDU

            # keys are injected when the vdu itself requires ssh access, or when the vnf requires it and
            # this vdu owns a management interface
            if ssh_keys and (
                    deep_get(vdud, ("vdu-configuration", "config-access", "ssh-access", "required")) or
                    (deep_get(vnfd, ("vnf-configuration", "config-access", "ssh-access", "required")) and
                     any(iface.get("mgmt-vnf") for iface in vdur["interfaces"]))):
                vdur["ssh-keys"] = ssh_keys
                vdur["ssh-access-required"] = True

            # cloud-init
            if vdud.get("cloud-init-file"):
                vdur["cloud-init"] = "{}:file:{}".format(vnfd["_id"], vdud.get("cloud-init-file"))
            elif vdud.get("cloud-init"):
                vdur["cloud-init"] = "{}:vdu:{}".format(vnfd["_id"], vdud_index)

            # flavor: register this vim account once
            ns_flavor = target["flavor"][int(vdur["ns-flavor-id"])]
            if not any(vi and vi.get("vim_account_id") == vim_account_id for vi in ns_flavor["vim_info"]):
                ns_flavor["vim_info"].append({"vim_account_id": vim_account_id})
            # image: register this vim account once
            ns_image = target["image"][int(vdur["ns-image-id"])]
            if not any(vi and vi.get("vim_account_id") == vim_account_id for vi in ns_image["vim_info"]):
                ns_image["vim_info"].append({"vim_account_id": vim_account_id})
        target["vnf"].append(target_vnf)

    desc = await self.RO.deploy(nsr_id, target)
    action_id = desc["action_id"]
    # _wait_ng_ro is a bound method: 'self' must not be passed again
    await self._wait_ng_ro(nsr_id, action_id, nslcmop_id, start_deploy, timeout_ns_deploy, stage)

    # Updating NSR
    db_nsr_update = {
        "_admin.deployed.RO.operational-status": "running",
        "detailed-status": " ".join(stage)
    }
    # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
    self.update_db_2("nsrs", nsr_id, db_nsr_update)
    self._write_op_status(nslcmop_id, stage)
    self.logger.debug(logging_text + "ns deployed at RO. RO_id={}".format(action_id))
async def _wait_ng_ro(self, nsr_id, action_id, nslcmop_id, start_time, timeout, stage):
    """
    Poll self.RO.status every 5 seconds until the action is done, writing progress changes to the database.

    :param nsr_id: nsr identity
    :param action_id: identity of the RO action to watch
    :param nslcmop_id: ns operation identity, used to write the operation status
    :param start_time: time, as returned by time(), from which the timeout is counted
    :param timeout: maximum number of seconds to wait, counted from start_time
    :param stage: list with 3 items. This method writes over item 2
    :return: None when RO reports "DONE"; stage[2] is then "Deployed at VIM" (not written to database here)
    :raises NgRoException: if RO reports "FAILED" or an unknown status, or when the timeout expires
    """
    detailed_status_old = None
    db_nsr_update = {}
    while time() <= start_time + timeout:
        desc_status = await self.RO.status(nsr_id, action_id)
        status = desc_status["status"]
        if status == "FAILED":
            raise NgRoException(desc_status["details"])
        if status == "DONE":
            stage[2] = "Deployed at VIM"
            break
        if status != "BUILD":
            # explicit error instead of 'assert', which is removed when running with -O
            raise NgRoException("RO.status returns unknown status '{}'".format(status))
        stage[2] = "VIM: ({})".format(desc_status["details"])
        if stage[2] != detailed_status_old:
            # write to database only when the progress text has changed
            detailed_status_old = stage[2]
            db_nsr_update["detailed-status"] = " ".join(stage)
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)
        # no 'loop' argument: it is not accepted since python 3.10, the running loop is used
        await asyncio.sleep(5)
    else:  # timeout_ns_deploy
        raise NgRoException("Timeout waiting ns to deploy")
async def _terminate_ng_ro(self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage):
    """
    Ask RO to remove the ns deployment, wait until it is done and delete the ns at RO.

    :param logging_text: prefix text to use at logging
    :param nsr_deployed: not used by this method
    :param nsr_id: nsr identity
    :param nslcmop_id: ns operation identity, used to write the operation status
    :param stage: list with 3 items. This method writes over item 2
    :return: None
    :raises LcmException: if RO reported a conflict or any other error; the status is written to database first
    """
    db_nsr_update = {}
    failed_detail = []
    action_id = None
    start_deploy = time()
    try:
        # a target without elements; presumably RO removes whatever is deployed - confirm against RO API
        target = {
            "ns": {"vld": []},
            "vnf": [],
            "image": [],
            "flavor": [],
        }
        desc = await self.RO.deploy(nsr_id, target)
        action_id = desc["action_id"]
        db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = action_id
        db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETING"
        self.logger.debug(logging_text + "ns terminate action at RO. action_id={}".format(action_id))

        # wait until done
        delete_timeout = 20 * 60  # 20 minutes
        # _wait_ng_ro is a bound method: 'self' must not be passed again
        await self._wait_ng_ro(nsr_id, action_id, nslcmop_id, start_deploy, delete_timeout, stage)

        db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
        db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
        # delete all nsr
        await self.RO.delete(nsr_id)
    except Exception as e:
        # best effort: errors are collected so that the final status is always written before raising
        http_code = e.http_code if isinstance(e, NgRoException) else None
        if http_code == 404:  # not found
            db_nsr_update["_admin.deployed.RO.nsr_id"] = None
            db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
            db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
            self.logger.debug(logging_text + "RO_action_id={} already deleted".format(action_id))
        elif http_code == 409:  # conflict
            failed_detail.append("delete conflict: {}".format(e))
            self.logger.debug(logging_text + "RO_action_id={} delete conflict: {}".format(action_id, e))
        else:
            failed_detail.append("delete error: {}".format(e))
            self.logger.error(logging_text + "RO_action_id={} delete error: {}".format(action_id, e))

    stage[2] = "Error deleting from VIM" if failed_detail else "Deleted from VIM"
    db_nsr_update["detailed-status"] = " ".join(stage)
    self.update_db_2("nsrs", nsr_id, db_nsr_update)
    self._write_op_status(nslcmop_id, stage)

    if failed_detail:
        raise LcmException("; ".join(failed_detail))
990 async def instantiate_RO(self
, logging_text
, nsr_id
, nsd
, db_nsr
, db_nslcmop
, db_vnfrs
, db_vnfds_ref
,
991 n2vc_key_list
, stage
):
994 :param logging_text: preffix text to use at logging
995 :param nsr_id: nsr identity
996 :param nsd: database content of ns descriptor
997 :param db_nsr: database content of ns record
998 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
1000 :param db_vnfds_ref: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
1001 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
1002 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
1003 :return: None or exception
1007 RO_descriptor_number
= 0 # number of descriptors created at RO
1008 vnf_index_2_RO_id
= {} # map between vnfd/nsd id to the id used at RO
1009 nslcmop_id
= db_nslcmop
["_id"]
1010 start_deploy
= time()
1011 ns_params
= db_nslcmop
.get("operationParams")
1012 if ns_params
and ns_params
.get("timeout_ns_deploy"):
1013 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
1015 timeout_ns_deploy
= self
.timeout
.get("ns_deploy", self
.timeout_ns_deploy
)
1017 # Check for and optionally request placement optimization. Database will be updated if placement activated
1018 stage
[2] = "Waiting for Placement."
1019 if await self
._do
_placement
(logging_text
, db_nslcmop
, db_vnfrs
):
1020 # in case of placement change ns_params[vimAcountId) if not present at any vnfrs
1021 for vnfr
in db_vnfrs
.values():
1022 if ns_params
["vimAccountId"] == vnfr
["vim-account-id"]:
1025 ns_params
["vimAccountId"] == vnfr
["vim-account-id"]
1028 return await self
._instantiate
_ng
_ro
(logging_text
, nsr_id
, nsd
, db_nsr
, db_nslcmop
, db_vnfrs
,
1029 db_vnfds_ref
, n2vc_key_list
, stage
, start_deploy
,
1032 # get vnfds, instantiate at RO
1033 for c_vnf
in nsd
.get("constituent-vnfd", ()):
1034 member_vnf_index
= c_vnf
["member-vnf-index"]
1035 vnfd
= db_vnfds_ref
[c_vnf
['vnfd-id-ref']]
1036 vnfd_ref
= vnfd
["id"]
1038 stage
[2] = "Creating vnfd='{}' member_vnf_index='{}' at RO".format(vnfd_ref
, member_vnf_index
)
1039 db_nsr_update
["detailed-status"] = " ".join(stage
)
1040 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1041 self
._write
_op
_status
(nslcmop_id
, stage
)
1043 # self.logger.debug(logging_text + stage[2])
1044 vnfd_id_RO
= "{}.{}.{}".format(nsr_id
, RO_descriptor_number
, member_vnf_index
[:23])
1045 vnf_index_2_RO_id
[member_vnf_index
] = vnfd_id_RO
1046 RO_descriptor_number
+= 1
1048 # look position at deployed.RO.vnfd if not present it will be appended at the end
1049 for index
, vnf_deployed
in enumerate(db_nsr
["_admin"]["deployed"]["RO"]["vnfd"]):
1050 if vnf_deployed
["member-vnf-index"] == member_vnf_index
:
1053 index
= len(db_nsr
["_admin"]["deployed"]["RO"]["vnfd"])
1054 db_nsr
["_admin"]["deployed"]["RO"]["vnfd"].append(None)
1057 RO_update
= {"member-vnf-index": member_vnf_index
}
1058 vnfd_list
= await self
.RO
.get_list("vnfd", filter_by
={"osm_id": vnfd_id_RO
})
1060 RO_update
["id"] = vnfd_list
[0]["uuid"]
1061 self
.logger
.debug(logging_text
+ "vnfd='{}' member_vnf_index='{}' exists at RO. Using RO_id={}".
1062 format(vnfd_ref
, member_vnf_index
, vnfd_list
[0]["uuid"]))
1064 vnfd_RO
= self
.vnfd2RO(vnfd
, vnfd_id_RO
, db_vnfrs
[c_vnf
["member-vnf-index"]].
1065 get("additionalParamsForVnf"), nsr_id
)
1066 desc
= await self
.RO
.create("vnfd", descriptor
=vnfd_RO
)
1067 RO_update
["id"] = desc
["uuid"]
1068 self
.logger
.debug(logging_text
+ "vnfd='{}' member_vnf_index='{}' created at RO. RO_id={}".format(
1069 vnfd_ref
, member_vnf_index
, desc
["uuid"]))
1070 db_nsr_update
["_admin.deployed.RO.vnfd.{}".format(index
)] = RO_update
1071 db_nsr
["_admin"]["deployed"]["RO"]["vnfd"][index
] = RO_update
1076 stage
[2] = "Creating nsd={} at RO".format(nsd_ref
)
1077 db_nsr_update
["detailed-status"] = " ".join(stage
)
1078 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1079 self
._write
_op
_status
(nslcmop_id
, stage
)
1081 # self.logger.debug(logging_text + stage[2])
1082 RO_osm_nsd_id
= "{}.{}.{}".format(nsr_id
, RO_descriptor_number
, nsd_ref
[:23])
1083 RO_descriptor_number
+= 1
1084 nsd_list
= await self
.RO
.get_list("nsd", filter_by
={"osm_id": RO_osm_nsd_id
})
1086 db_nsr_update
["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid
= nsd_list
[0]["uuid"]
1087 self
.logger
.debug(logging_text
+ "nsd={} exists at RO. Using RO_id={}".format(
1088 nsd_ref
, RO_nsd_uuid
))
1090 nsd_RO
= deepcopy(nsd
)
1091 nsd_RO
["id"] = RO_osm_nsd_id
1092 nsd_RO
.pop("_id", None)
1093 nsd_RO
.pop("_admin", None)
1094 for c_vnf
in nsd_RO
.get("constituent-vnfd", ()):
1095 member_vnf_index
= c_vnf
["member-vnf-index"]
1096 c_vnf
["vnfd-id-ref"] = vnf_index_2_RO_id
[member_vnf_index
]
1097 for c_vld
in nsd_RO
.get("vld", ()):
1098 for cp
in c_vld
.get("vnfd-connection-point-ref", ()):
1099 member_vnf_index
= cp
["member-vnf-index-ref"]
1100 cp
["vnfd-id-ref"] = vnf_index_2_RO_id
[member_vnf_index
]
1102 desc
= await self
.RO
.create("nsd", descriptor
=nsd_RO
)
1103 db_nsr_update
["_admin.nsState"] = "INSTANTIATED"
1104 db_nsr_update
["_admin.deployed.RO.nsd_id"] = RO_nsd_uuid
= desc
["uuid"]
1105 self
.logger
.debug(logging_text
+ "nsd={} created at RO. RO_id={}".format(nsd_ref
, RO_nsd_uuid
))
1106 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1109 stage
[2] = "Creating nsd={} at RO".format(nsd_ref
)
1110 db_nsr_update
["detailed-status"] = " ".join(stage
)
1111 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1112 self
._write
_op
_status
(nslcmop_id
, stage
)
1114 # if present use it unless in error status
1115 RO_nsr_id
= deep_get(db_nsr
, ("_admin", "deployed", "RO", "nsr_id"))
1118 stage
[2] = "Looking for existing ns at RO"
1119 db_nsr_update
["detailed-status"] = " ".join(stage
)
1120 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1121 self
._write
_op
_status
(nslcmop_id
, stage
)
1122 # self.logger.debug(logging_text + stage[2] + " RO_ns_id={}".format(RO_nsr_id))
1123 desc
= await self
.RO
.show("ns", RO_nsr_id
)
1125 except ROclient
.ROClientException
as e
:
1126 if e
.http_code
!= HTTPStatus
.NOT_FOUND
:
1128 RO_nsr_id
= db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
1130 ns_status
, ns_status_info
= self
.RO
.check_ns_status(desc
)
1131 db_nsr_update
["_admin.deployed.RO.nsr_status"] = ns_status
1132 if ns_status
== "ERROR":
1133 stage
[2] = "Deleting ns at RO. RO_ns_id={}".format(RO_nsr_id
)
1134 self
.logger
.debug(logging_text
+ stage
[2])
1135 await self
.RO
.delete("ns", RO_nsr_id
)
1136 RO_nsr_id
= db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
1138 stage
[2] = "Checking dependencies"
1139 db_nsr_update
["detailed-status"] = " ".join(stage
)
1140 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1141 self
._write
_op
_status
(nslcmop_id
, stage
)
1142 # self.logger.debug(logging_text + stage[2])
1144 # check if VIM is creating and wait look if previous tasks in process
1145 task_name
, task_dependency
= self
.lcm_tasks
.lookfor_related("vim_account", ns_params
["vimAccountId"])
1147 stage
[2] = "Waiting for related tasks '{}' to be completed".format(task_name
)
1148 self
.logger
.debug(logging_text
+ stage
[2])
1149 await asyncio
.wait(task_dependency
, timeout
=3600)
1150 if ns_params
.get("vnf"):
1151 for vnf
in ns_params
["vnf"]:
1152 if "vimAccountId" in vnf
:
1153 task_name
, task_dependency
= self
.lcm_tasks
.lookfor_related("vim_account",
1154 vnf
["vimAccountId"])
1156 stage
[2] = "Waiting for related tasks '{}' to be completed.".format(task_name
)
1157 self
.logger
.debug(logging_text
+ stage
[2])
1158 await asyncio
.wait(task_dependency
, timeout
=3600)
1160 stage
[2] = "Checking instantiation parameters."
1161 RO_ns_params
= self
._ns
_params
_2_RO
(ns_params
, nsd
, db_vnfds_ref
, db_vnfrs
, n2vc_key_list
)
1162 stage
[2] = "Deploying ns at VIM."
1163 db_nsr_update
["detailed-status"] = " ".join(stage
)
1164 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1165 self
._write
_op
_status
(nslcmop_id
, stage
)
1167 desc
= await self
.RO
.create("ns", descriptor
=RO_ns_params
, name
=db_nsr
["name"], scenario
=RO_nsd_uuid
)
1168 RO_nsr_id
= db_nsr_update
["_admin.deployed.RO.nsr_id"] = desc
["uuid"]
1169 db_nsr_update
["_admin.nsState"] = "INSTANTIATED"
1170 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "BUILD"
1171 self
.logger
.debug(logging_text
+ "ns created at RO. RO_id={}".format(desc
["uuid"]))
1173 # wait until NS is ready
1174 stage
[2] = "Waiting VIM to deploy ns."
1175 db_nsr_update
["detailed-status"] = " ".join(stage
)
1176 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1177 self
._write
_op
_status
(nslcmop_id
, stage
)
1178 detailed_status_old
= None
1179 self
.logger
.debug(logging_text
+ stage
[2] + " RO_ns_id={}".format(RO_nsr_id
))
1182 while time() <= start_deploy
+ timeout_ns_deploy
:
1183 desc
= await self
.RO
.show("ns", RO_nsr_id
)
1186 if desc
!= old_desc
:
1187 # desc has changed => update db
1188 self
._on
_update
_ro
_db
(nsrs_id
=nsr_id
, ro_descriptor
=desc
)
1191 ns_status
, ns_status_info
= self
.RO
.check_ns_status(desc
)
1192 db_nsr_update
["_admin.deployed.RO.nsr_status"] = ns_status
1193 if ns_status
== "ERROR":
1194 raise ROclient
.ROClientException(ns_status_info
)
1195 elif ns_status
== "BUILD":
1196 stage
[2] = "VIM: ({})".format(ns_status_info
)
1197 elif ns_status
== "ACTIVE":
1198 stage
[2] = "Waiting for management IP address reported by the VIM. Updating VNFRs."
1200 self
.ns_update_vnfr(db_vnfrs
, desc
)
1202 except LcmExceptionNoMgmtIP
:
1205 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status
)
1206 if stage
[2] != detailed_status_old
:
1207 detailed_status_old
= stage
[2]
1208 db_nsr_update
["detailed-status"] = " ".join(stage
)
1209 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1210 self
._write
_op
_status
(nslcmop_id
, stage
)
1211 await asyncio
.sleep(5, loop
=self
.loop
)
1212 else: # timeout_ns_deploy
1213 raise ROclient
.ROClientException("Timeout waiting ns to be ready")
1216 self
.ns_update_nsr(db_nsr_update
, db_nsr
, desc
)
1218 db_nsr_update
["_admin.deployed.RO.operational-status"] = "running"
1219 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
1220 stage
[2] = "Deployed at VIM"
1221 db_nsr_update
["detailed-status"] = " ".join(stage
)
1222 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1223 self
._write
_op
_status
(nslcmop_id
, stage
)
1224 # await self._on_update_n2vc_db("nsrs", {"_id": nsr_id}, "_admin.deployed", db_nsr_update)
1225 # self.logger.debug(logging_text + "Deployed at VIM")
1226 except (ROclient
.ROClientException
, LcmException
, DbException
, NgRoException
) as e
:
1227 stage
[2] = "ERROR deploying at VIM"
1228 self
.set_vnfr_at_error(db_vnfrs
, str(e
))
1231 async def wait_vm_up_insert_key_ro(self
, logging_text
, nsr_id
, vnfr_id
, vdu_id
, vdu_index
, pub_key
=None, user
=None):
1233 Wait for ip addres at RO, and optionally, insert public key in virtual machine
1234 :param logging_text: prefix use for logging
1239 :param pub_key: public ssh key to inject, None to skip
1240 :param user: user to apply the public ssh key
1244 # self.logger.debug(logging_text + "Starting wait_vm_up_insert_key_ro")
1248 target_vdu_id
= None
1254 if ro_retries
>= 360: # 1 hour
1255 raise LcmException("Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id
))
1257 await asyncio
.sleep(10, loop
=self
.loop
)
1260 if not target_vdu_id
:
1261 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1263 if not vdu_id
: # for the VNF case
1264 if db_vnfr
.get("status") == "ERROR":
1265 raise LcmException("Cannot inject ssh-key because target VNF is in error state")
1266 ip_address
= db_vnfr
.get("ip-address")
1269 vdur
= next((x
for x
in get_iterable(db_vnfr
, "vdur") if x
.get("ip-address") == ip_address
), None)
1271 vdur
= next((x
for x
in get_iterable(db_vnfr
, "vdur")
1272 if x
.get("vdu-id-ref") == vdu_id
and x
.get("count-index") == vdu_index
), None)
1274 if not vdur
and len(db_vnfr
.get("vdur", ())) == 1: # If only one, this should be the target vdu
1275 vdur
= db_vnfr
["vdur"][0]
1277 raise LcmException("Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(vnfr_id
, vdu_id
,
1280 if vdur
.get("pdu-type") or vdur
.get("status") == "ACTIVE":
1281 ip_address
= vdur
.get("ip-address")
1284 target_vdu_id
= vdur
["vdu-id-ref"]
1285 elif vdur
.get("status") == "ERROR":
1286 raise LcmException("Cannot inject ssh-key because target VM is in error state")
1288 if not target_vdu_id
:
1291 # inject public key into machine
1292 if pub_key
and user
:
1293 # wait until NS is deployed at RO
1295 db_nsrs
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1296 ro_nsr_id
= deep_get(db_nsrs
, ("_admin", "deployed", "RO", "nsr_id"))
1300 # self.logger.debug(logging_text + "Inserting RO key")
1301 if vdur
.get("pdu-type"):
1302 self
.logger
.error(logging_text
+ "Cannot inject ssh-ky to a PDU")
1305 ro_vm_id
= "{}-{}".format(db_vnfr
["member-vnf-index-ref"], target_vdu_id
) # TODO add vdu_index
1307 target
= {"action": "inject_ssh_key", "key": pub_key
, "user": user
,
1308 "vnf": [{"_id": vnfr_id
, "vdur": [{"id": vdu_id
}]}],
1310 await self
.RO
.deploy(nsr_id
, target
)
1312 result_dict
= await self
.RO
.create_action(
1314 item_id_name
=ro_nsr_id
,
1315 descriptor
={"add_public_key": pub_key
, "vms": [ro_vm_id
], "user": user
}
1317 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
1318 if not result_dict
or not isinstance(result_dict
, dict):
1319 raise LcmException("Unknown response from RO when injecting key")
1320 for result
in result_dict
.values():
1321 if result
.get("vim_result") == 200:
1324 raise ROclient
.ROClientException("error injecting key: {}".format(
1325 result
.get("description")))
1327 except NgRoException
as e
:
1328 raise LcmException("Reaching max tries injecting key. Error: {}".format(e
))
1329 except ROclient
.ROClientException
as e
:
1331 self
.logger
.debug(logging_text
+ "error injecting key: {}. Retrying until {} seconds".
1335 raise LcmException("Reaching max tries injecting key. Error: {}".format(e
))
1341 async def _wait_dependent_n2vc(self
, nsr_id
, vca_deployed_list
, vca_index
):
1343 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1345 my_vca
= vca_deployed_list
[vca_index
]
1346 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1347 # vdu or kdu: no dependencies
1351 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1352 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1353 configuration_status_list
= db_nsr
["configurationStatus"]
1354 for index
, vca_deployed
in enumerate(configuration_status_list
):
1355 if index
== vca_index
:
1358 if not my_vca
.get("member-vnf-index") or \
1359 (vca_deployed
.get("member-vnf-index") == my_vca
.get("member-vnf-index")):
1360 internal_status
= configuration_status_list
[index
].get("status")
1361 if internal_status
== 'READY':
1363 elif internal_status
== 'BROKEN':
1364 raise LcmException("Configuration aborted because dependent charm/s has failed")
1368 # no dependencies, return
1370 await asyncio
.sleep(10)
1373 raise LcmException("Configuration aborted because dependent charm/s timeout")
1375 async def instantiate_N2VC(self
, logging_text
, vca_index
, nsi_id
, db_nsr
, db_vnfr
, vdu_id
, kdu_name
, vdu_index
,
1376 config_descriptor
, deploy_params
, base_folder
, nslcmop_id
, stage
, vca_type
, vca_name
,
1377 ee_config_descriptor
):
1378 nsr_id
= db_nsr
["_id"]
1379 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
1380 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1381 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
1382 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
1384 'collection': 'nsrs',
1385 'filter': {'_id': nsr_id
},
1386 'path': db_update_entry
1392 element_under_configuration
= nsr_id
1396 vnfr_id
= db_vnfr
["_id"]
1397 osm_config
["osm"]["vnf_id"] = vnfr_id
1399 namespace
= "{nsi}.{ns}".format(
1400 nsi
=nsi_id
if nsi_id
else "",
1404 element_type
= 'VNF'
1405 element_under_configuration
= vnfr_id
1406 namespace
+= ".{}".format(vnfr_id
)
1408 namespace
+= ".{}-{}".format(vdu_id
, vdu_index
or 0)
1409 element_type
= 'VDU'
1410 element_under_configuration
= "{}-{}".format(vdu_id
, vdu_index
or 0)
1411 osm_config
["osm"]["vdu_id"] = vdu_id
1413 namespace
+= ".{}".format(kdu_name
)
1414 element_type
= 'KDU'
1415 element_under_configuration
= kdu_name
1416 osm_config
["osm"]["kdu_name"] = kdu_name
1419 artifact_path
= "{}/{}/{}/{}".format(
1420 base_folder
["folder"],
1421 base_folder
["pkg-dir"],
1422 "charms" if vca_type
in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm") else "helm-charts",
1426 # n2vc_redesign STEP 3.1
1428 # find old ee_id if exists
1429 ee_id
= vca_deployed
.get("ee_id")
1431 # create or register execution environment in VCA
1432 if vca_type
in ("lxc_proxy_charm", "k8s_proxy_charm", "helm"):
1434 self
._write
_configuration
_status
(
1436 vca_index
=vca_index
,
1438 element_under_configuration
=element_under_configuration
,
1439 element_type
=element_type
1442 step
= "create execution environment"
1443 self
.logger
.debug(logging_text
+ step
)
1444 ee_id
, credentials
= await self
.vca_map
[vca_type
].create_execution_environment(
1445 namespace
=namespace
,
1449 artifact_path
=artifact_path
,
1452 elif vca_type
== "native_charm":
1453 step
= "Waiting to VM being up and getting IP address"
1454 self
.logger
.debug(logging_text
+ step
)
1455 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(logging_text
, nsr_id
, vnfr_id
, vdu_id
, vdu_index
,
1456 user
=None, pub_key
=None)
1457 credentials
= {"hostname": rw_mgmt_ip
}
1459 username
= deep_get(config_descriptor
, ("config-access", "ssh-access", "default-user"))
1460 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1461 # merged. Meanwhile let's get username from initial-config-primitive
1462 if not username
and config_descriptor
.get("initial-config-primitive"):
1463 for config_primitive
in config_descriptor
["initial-config-primitive"]:
1464 for param
in config_primitive
.get("parameter", ()):
1465 if param
["name"] == "ssh-username":
1466 username
= param
["value"]
1469 raise LcmException("Cannot determine the username neither with 'initial-config-promitive' nor with "
1470 "'config-access.ssh-access.default-user'")
1471 credentials
["username"] = username
1472 # n2vc_redesign STEP 3.2
1474 self
._write
_configuration
_status
(
1476 vca_index
=vca_index
,
1477 status
='REGISTERING',
1478 element_under_configuration
=element_under_configuration
,
1479 element_type
=element_type
1482 step
= "register execution environment {}".format(credentials
)
1483 self
.logger
.debug(logging_text
+ step
)
1484 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
1485 credentials
=credentials
, namespace
=namespace
, db_dict
=db_dict
)
1487 # for compatibility with MON/POL modules, the need model and application name at database
1488 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
1489 ee_id_parts
= ee_id
.split('.')
1490 db_nsr_update
= {db_update_entry
+ "ee_id": ee_id
}
1491 if len(ee_id_parts
) >= 2:
1492 model_name
= ee_id_parts
[0]
1493 application_name
= ee_id_parts
[1]
1494 db_nsr_update
[db_update_entry
+ "model"] = model_name
1495 db_nsr_update
[db_update_entry
+ "application"] = application_name
1497 # n2vc_redesign STEP 3.3
1498 step
= "Install configuration Software"
1500 self
._write
_configuration
_status
(
1502 vca_index
=vca_index
,
1503 status
='INSTALLING SW',
1504 element_under_configuration
=element_under_configuration
,
1505 element_type
=element_type
,
1506 other_update
=db_nsr_update
1509 # TODO check if already done
1510 self
.logger
.debug(logging_text
+ step
)
1512 if vca_type
== "native_charm":
1513 initial_config_primitive_list
= config_descriptor
.get('initial-config-primitive')
1514 if initial_config_primitive_list
:
1515 for primitive
in initial_config_primitive_list
:
1516 if primitive
["name"] == "config":
1517 config
= self
._map
_primitive
_params
(
1524 if vca_type
== "lxc_proxy_charm":
1525 if element_type
== "NS":
1526 num_units
= db_nsr
.get("config-units") or 1
1527 elif element_type
== "VNF":
1528 num_units
= db_vnfr
.get("config-units") or 1
1529 elif element_type
== "VDU":
1530 for v
in db_vnfr
["vdur"]:
1531 if vdu_id
== v
["vdu-id-ref"]:
1532 num_units
= v
.get("config-units") or 1
1535 await self
.vca_map
[vca_type
].install_configuration_sw(
1537 artifact_path
=artifact_path
,
1540 num_units
=num_units
,
1544 # write in db flag of configuration_sw already installed
1545 self
.update_db_2("nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True})
1547 # add relations for this VCA (wait for other peers related with this VCA)
1548 await self
._add
_vca
_relations
(logging_text
=logging_text
, nsr_id
=nsr_id
,
1549 vca_index
=vca_index
, vca_type
=vca_type
)
1551 # if SSH access is required, then get execution environment SSH public
1552 if vca_type
in ("lxc_proxy_charm", "helm"): # if native charm we have waited already to VM be UP
1555 # self.logger.debug("get ssh key block")
1556 if deep_get(config_descriptor
, ("config-access", "ssh-access", "required")):
1557 # self.logger.debug("ssh key needed")
1558 # Needed to inject a ssh key
1559 user
= deep_get(config_descriptor
, ("config-access", "ssh-access", "default-user"))
1560 step
= "Install configuration Software, getting public ssh key"
1561 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(ee_id
=ee_id
, db_dict
=db_dict
)
1563 step
= "Insert public key into VM user={} ssh_key={}".format(user
, pub_key
)
1565 # self.logger.debug("no need to get ssh key")
1566 step
= "Waiting to VM being up and getting IP address"
1567 self
.logger
.debug(logging_text
+ step
)
1569 # n2vc_redesign STEP 5.1
1570 # wait for RO (ip-address) Insert pub_key into VM
1572 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(logging_text
, nsr_id
, vnfr_id
, vdu_id
, vdu_index
,
1573 user
=user
, pub_key
=pub_key
)
1575 rw_mgmt_ip
= None # This is for a NS configuration
1577 self
.logger
.debug(logging_text
+ ' VM_ip_address={}'.format(rw_mgmt_ip
))
1579 # store rw_mgmt_ip in deploy params for later replacement
1580 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
1582 # n2vc_redesign STEP 6 Execute initial config primitive
1583 step
= 'execute initial config primitive'
1584 initial_config_primitive_list
= config_descriptor
.get('initial-config-primitive')
1586 # sort initial config primitives by 'seq'
1587 if initial_config_primitive_list
:
1589 initial_config_primitive_list
.sort(key
=lambda val
: int(val
['seq']))
1590 except Exception as e
:
1591 self
.logger
.error(logging_text
+ step
+ ": " + str(e
))
1593 self
.logger
.debug(logging_text
+ step
+ ": No initial-config-primitive")
1595 # add config if not present for NS charm
1596 initial_config_primitive_list
= self
._get
_initial
_config
_primitive
_list
(initial_config_primitive_list
,
1599 # wait for dependent primitives execution (NS -> VNF -> VDU)
1600 if initial_config_primitive_list
:
1601 await self
._wait
_dependent
_n
2vc
(nsr_id
, vca_deployed_list
, vca_index
)
1603 # stage, in function of element type: vdu, kdu, vnf or ns
1604 my_vca
= vca_deployed_list
[vca_index
]
1605 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1607 stage
[0] = 'Stage 3/5: running Day-1 primitives for VDU.'
1608 elif my_vca
.get("member-vnf-index"):
1610 stage
[0] = 'Stage 4/5: running Day-1 primitives for VNF.'
1613 stage
[0] = 'Stage 5/5: running Day-1 primitives for NS.'
1615 self
._write
_configuration
_status
(
1617 vca_index
=vca_index
,
1618 status
='EXECUTING PRIMITIVE'
1621 self
._write
_op
_status
(
1626 check_if_terminated_needed
= True
1627 for initial_config_primitive
in initial_config_primitive_list
:
1628 # adding information on the vca_deployed if it is a NS execution environment
1629 if not vca_deployed
["member-vnf-index"]:
1630 deploy_params
["ns_config_info"] = json
.dumps(self
._get
_ns
_config
_info
(nsr_id
))
1631 # TODO check if already done
1632 primitive_params_
= self
._map
_primitive
_params
(initial_config_primitive
, {}, deploy_params
)
1634 step
= "execute primitive '{}' params '{}'".format(initial_config_primitive
["name"], primitive_params_
)
1635 self
.logger
.debug(logging_text
+ step
)
1636 await self
.vca_map
[vca_type
].exec_primitive(
1638 primitive_name
=initial_config_primitive
["name"],
1639 params_dict
=primitive_params_
,
1642 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
1643 if check_if_terminated_needed
:
1644 if config_descriptor
.get('terminate-config-primitive'):
1645 self
.update_db_2("nsrs", nsr_id
, {db_update_entry
+ "needed_terminate": True})
1646 check_if_terminated_needed
= False
1648 # TODO register in database that primitive is done
1650 # STEP 7 Configure metrics
1651 if vca_type
== "helm":
1652 prometheus_jobs
= await self
.add_prometheus_metrics(
1654 artifact_path
=artifact_path
,
1655 ee_config_descriptor
=ee_config_descriptor
,
1658 target_ip
=rw_mgmt_ip
,
1661 self
.update_db_2("nsrs", nsr_id
, {db_update_entry
+ "prometheus_jobs": prometheus_jobs
})
1663 step
= "instantiated at VCA"
1664 self
.logger
.debug(logging_text
+ step
)
1666 self
._write
_configuration
_status
(
1668 vca_index
=vca_index
,
1672 except Exception as e
: # TODO not use Exception but N2VC exception
1673 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
1674 if not isinstance(e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)):
1675 self
.logger
.error("Exception while {} : {}".format(step
, e
), exc_info
=True)
1676 self
._write
_configuration
_status
(
1678 vca_index
=vca_index
,
1681 raise LcmException("{} {}".format(step
, e
)) from e
def _write_ns_status(self, nsr_id: str, ns_state: str, current_operation: str, current_operation_id: str,
                     error_description: str = None, error_detail: str = None, other_update: dict = None):
    """
    Update db_nsr fields.

    :param nsr_id: nsr identity, used as _id of the "nsrs" entry to update
    :param ns_state: written to "nsState"; left untouched when it is None or empty
    :param current_operation: written to "currentOperation" and, unless it is "IDLE", to "_admin.operation-type"
    :param current_operation_id: written to "currentOperationID", "_admin.current-operation" and "_admin.nslcmop"
    :param error_description: written to "errorDescription", also when None
    :param error_detail: written to "errorDetail", also when None
    :param other_update: Other required changes at database if provided, will be cleared
    :return: None. A DbException is logged as a warning, not raised
    """
    try:
        # the caller's dict is extended in place, it is not copied
        db_dict = other_update or {}
        db_dict.update({
            "_admin.nslcmop": current_operation_id,  # for backward compatibility
            "_admin.current-operation": current_operation_id,
            "_admin.operation-type": current_operation if current_operation != "IDLE" else None,
            "currentOperation": current_operation,
            "currentOperationID": current_operation_id,
            "errorDescription": error_description,
            "errorDetail": error_detail,
        })
        if ns_state:
            db_dict["nsState"] = ns_state
        self.update_db_2("nsrs", nsr_id, db_dict)
    except DbException as e:
        # Logger.warn is a deprecated alias of Logger.warning
        self.logger.warning('Error writing NS status, ns={}: {}'.format(nsr_id, e))
1712 def _write_op_status(self
, op_id
: str, stage
: list = None, error_message
: str = None, queuePosition
: int = 0,
1713 operation_state
: str = None, other_update
: dict = None):
1715 db_dict
= other_update
or {}
1716 db_dict
['queuePosition'] = queuePosition
1717 if isinstance(stage
, list):
1718 db_dict
['stage'] = stage
[0]
1719 db_dict
['detailed-status'] = " ".join(stage
)
1720 elif stage
is not None:
1721 db_dict
['stage'] = str(stage
)
1723 if error_message
is not None:
1724 db_dict
['errorMessage'] = error_message
1725 if operation_state
is not None:
1726 db_dict
['operationState'] = operation_state
1727 db_dict
["statusEnteredTime"] = time()
1728 self
.update_db_2("nslcmops", op_id
, db_dict
)
1729 except DbException
as e
:
1730 self
.logger
.warn('Error writing OPERATION status for op_id: {} -> {}'.format(op_id
, e
))
1732 def _write_all_config_status(self
, db_nsr
: dict, status
: str):
1734 nsr_id
= db_nsr
["_id"]
1735 # configurationStatus
1736 config_status
= db_nsr
.get('configurationStatus')
1738 db_nsr_update
= {"configurationStatus.{}.status".format(index
): status
for index
, v
in
1739 enumerate(config_status
) if v
}
1741 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1743 except DbException
as e
:
1744 self
.logger
.warn('Error writing all configuration status, ns={}: {}'.format(nsr_id
, e
))
1746 def _write_configuration_status(self
, nsr_id
: str, vca_index
: int, status
: str = None,
1747 element_under_configuration
: str = None, element_type
: str = None,
1748 other_update
: dict = None):
1750 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
1751 # .format(vca_index, status))
1754 db_path
= 'configurationStatus.{}.'.format(vca_index
)
1755 db_dict
= other_update
or {}
1757 db_dict
[db_path
+ 'status'] = status
1758 if element_under_configuration
:
1759 db_dict
[db_path
+ 'elementUnderConfiguration'] = element_under_configuration
1761 db_dict
[db_path
+ 'elementType'] = element_type
1762 self
.update_db_2("nsrs", nsr_id
, db_dict
)
1763 except DbException
as e
:
1764 self
.logger
.warn('Error writing configuration status={}, ns={}, vca_index={}: {}'
1765 .format(status
, nsr_id
, vca_index
, e
))
async def _do_placement(self, logging_text, db_nslcmop, db_vnfrs):
    """
    Check and compute the placement (vim account where to deploy). If it is decided by an external tool, it
    sends the request via kafka and waits until the result is written at database (nslcmops _admin.pla).
    Database is used because the result can be obtained from a different LCM worker in case of HA.
    :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
    :param db_nslcmop: database content of nslcmop
    :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
    :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfrs with the
        computed 'vim-account-id'
    """
    nslcmop_id = db_nslcmop['_id']
    if deep_get(db_nslcmop, ('operationParams', 'placement-engine')) != "PLA":
        # no external placement requested: nothing to change
        return False

    self.logger.debug(logging_text + "Invoke and wait for placement optimization")
    await self.msg.aiowrite("pla", "get_placement", {'nslcmopId': nslcmop_id}, loop=self.loop)

    # poll the database until the placement result shows up or the waiting budget is spent
    poll_seconds = 5
    remaining = poll_seconds * 10
    placement = None
    while not placement and remaining >= 0:
        await asyncio.sleep(poll_seconds)
        remaining -= poll_seconds
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        placement = deep_get(db_nslcmop, ('_admin', 'pla'))

    if not placement:
        raise LcmException("Placement timeout for nslcmopId={}".format(nslcmop_id))

    changed = False
    for placed_vnf in placement['vnf']:
        vnfr = db_vnfrs.get(placed_vnf['member-vnf-index'])
        vim_account_id = placed_vnf.get('vimAccountId')
        if vim_account_id and vnfr:
            changed = True
            self.db.set_one("vnfrs", {"_id": vnfr["_id"]}, {"vim-account-id": vim_account_id})
            # keep the in-memory copy aligned with the database
            vnfr["vim-account-id"] = vim_account_id
    return changed
def update_nsrs_with_pla_result(self, params):
    """
    Store the placement result at nslcmops _admin.pla.

    :param params: content whose 'placement' entry carries the result and the 'nslcmopId' it belongs to
    :return: None. Any exception is logged, not propagated
    """
    # bound before the try so that the error handler below can always use it
    nslcmop_id = None
    try:
        nslcmop_id = deep_get(params, ('placement', 'nslcmopId'))
        self.update_db_2("nslcmops", nslcmop_id, {"_admin.pla": params.get('placement')})
    except Exception as e:
        # Logger.warn() is a deprecated alias (removed in Python 3.13)
        self.logger.warning('Update failed for nslcmop_id=%s:%s', nslcmop_id, e)
1813 async def instantiate(self
, nsr_id
, nslcmop_id
):
1816 :param nsr_id: ns instance to deploy
1817 :param nslcmop_id: operation to run
1821 # Try to lock HA task here
1822 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA('ns', 'nslcmops', nslcmop_id
)
1823 if not task_is_locked_by_me
:
1824 self
.logger
.debug('instantiate() task is not locked by me, ns={}'.format(nsr_id
))
1827 logging_text
= "Task ns={} instantiate={} ".format(nsr_id
, nslcmop_id
)
1828 self
.logger
.debug(logging_text
+ "Enter")
1833 # get all needed from database
1835 # database nsrs record
1838 # database nslcmops record
1841 # update operation on nsrs
1843 # update operation on nslcmops
1844 db_nslcmop_update
= {}
1846 nslcmop_operation_state
= None
1847 db_vnfrs
= {} # vnf's info indexed by member-index
1849 tasks_dict_info
= {} # from task to info text
1852 stage
= ['Stage 1/5: preparation of the environment.', "Waiting for previous operations to terminate.", ""]
1853 # ^ stage, step, VIM progress
1855 # wait for any previous tasks in process
1856 await self
.lcm_tasks
.waitfor_related_HA('ns', 'nslcmops', nslcmop_id
)
1858 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
1859 stage
[1] = "Reading from database,"
1860 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
1861 db_nsr_update
["detailed-status"] = "creating"
1862 db_nsr_update
["operational-status"] = "init"
1863 self
._write
_ns
_status
(
1865 ns_state
="BUILDING",
1866 current_operation
="INSTANTIATING",
1867 current_operation_id
=nslcmop_id
,
1868 other_update
=db_nsr_update
1870 self
._write
_op
_status
(
1876 # read from db: operation
1877 stage
[1] = "Getting nslcmop={} from db".format(nslcmop_id
)
1878 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
1879 ns_params
= db_nslcmop
.get("operationParams")
1880 if ns_params
and ns_params
.get("timeout_ns_deploy"):
1881 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
1883 timeout_ns_deploy
= self
.timeout
.get("ns_deploy", self
.timeout_ns_deploy
)
1886 stage
[1] = "Getting nsr={} from db".format(nsr_id
)
1887 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1888 stage
[1] = "Getting nsd={} from db".format(db_nsr
["nsd-id"])
1889 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
1891 # nsr_name = db_nsr["name"] # TODO short-name??
1893 # read from db: vnf's of this ns
1894 stage
[1] = "Getting vnfrs from db"
1895 self
.logger
.debug(logging_text
+ stage
[1])
1896 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
1898 # read from db: vnfd's for every vnf
1899 db_vnfds_ref
= {} # every vnfd data indexed by vnf name
1900 db_vnfds
= {} # every vnfd data indexed by vnf id
1901 db_vnfds_index
= {} # every vnfd data indexed by vnf member-index
1903 # for each vnf in ns, read vnfd
1904 for vnfr
in db_vnfrs_list
:
1905 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
# vnf's dict indexed by member-index: '1', '2', etc
1906 vnfd_id
= vnfr
["vnfd-id"] # vnfd uuid for this vnf
1907 vnfd_ref
= vnfr
["vnfd-ref"] # vnfd name for this vnf
1908 # if we haven't this vnfd, read it from db
1909 if vnfd_id
not in db_vnfds
:
1911 stage
[1] = "Getting vnfd={} id='{}' from db".format(vnfd_id
, vnfd_ref
)
1912 self
.logger
.debug(logging_text
+ stage
[1])
1913 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
1916 db_vnfds_ref
[vnfd_ref
] = vnfd
# vnfd's indexed by name
1917 db_vnfds
[vnfd_id
] = vnfd
# vnfd's indexed by id
1918 db_vnfds_index
[vnfr
["member-vnf-index-ref"]] = db_vnfds
[vnfd_id
] # vnfd's indexed by member-index
1920 # Get or generates the _admin.deployed.VCA list
1921 vca_deployed_list
= None
1922 if db_nsr
["_admin"].get("deployed"):
1923 vca_deployed_list
= db_nsr
["_admin"]["deployed"].get("VCA")
1924 if vca_deployed_list
is None:
1925 vca_deployed_list
= []
1926 configuration_status_list
= []
1927 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
1928 db_nsr_update
["configurationStatus"] = configuration_status_list
1929 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
1930 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
1931 elif isinstance(vca_deployed_list
, dict):
1932 # maintain backward compatibility. Change a dict to list at database
1933 vca_deployed_list
= list(vca_deployed_list
.values())
1934 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
1935 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
1937 if not isinstance(deep_get(db_nsr
, ("_admin", "deployed", "RO", "vnfd")), list):
1938 populate_dict(db_nsr
, ("_admin", "deployed", "RO", "vnfd"), [])
1939 db_nsr_update
["_admin.deployed.RO.vnfd"] = []
1941 # set state to INSTANTIATED. When instantiated NBI will not delete directly
1942 db_nsr_update
["_admin.nsState"] = "INSTANTIATED"
1943 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1945 # n2vc_redesign STEP 2 Deploy Network Scenario
1946 stage
[0] = 'Stage 2/5: deployment of KDUs, VMs and execution environments.'
1947 self
._write
_op
_status
(
1952 stage
[1] = "Deploying KDUs,"
1953 # self.logger.debug(logging_text + "Before deploy_kdus")
1954 # Call to deploy_kdus in case exists the "vdu:kdu" param
1955 await self
.deploy_kdus(
1956 logging_text
=logging_text
,
1958 nslcmop_id
=nslcmop_id
,
1961 task_instantiation_info
=tasks_dict_info
,
1964 stage
[1] = "Getting VCA public key."
1965 # n2vc_redesign STEP 1 Get VCA public ssh-key
1966 # feature 1429. Add n2vc public key to needed VMs
1967 n2vc_key
= self
.n2vc
.get_public_key()
1968 n2vc_key_list
= [n2vc_key
]
1969 if self
.vca_config
.get("public_key"):
1970 n2vc_key_list
.append(self
.vca_config
["public_key"])
1972 stage
[1] = "Deploying NS at VIM."
1973 task_ro
= asyncio
.ensure_future(
1974 self
.instantiate_RO(
1975 logging_text
=logging_text
,
1979 db_nslcmop
=db_nslcmop
,
1981 db_vnfds_ref
=db_vnfds_ref
,
1982 n2vc_key_list
=n2vc_key_list
,
1986 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_RO", task_ro
)
1987 tasks_dict_info
[task_ro
] = "Deploying at VIM"
1989 # n2vc_redesign STEP 3 to 6 Deploy N2VC
1990 stage
[1] = "Deploying Execution Environments."
1991 self
.logger
.debug(logging_text
+ stage
[1])
1993 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
1994 # get_iterable() returns a value from a dict or empty tuple if key does not exist
1995 for c_vnf
in get_iterable(nsd
, "constituent-vnfd"):
1996 vnfd_id
= c_vnf
["vnfd-id-ref"]
1997 vnfd
= db_vnfds_ref
[vnfd_id
]
1998 member_vnf_index
= str(c_vnf
["member-vnf-index"])
1999 db_vnfr
= db_vnfrs
[member_vnf_index
]
2000 base_folder
= vnfd
["_admin"]["storage"]
2006 # Get additional parameters
2008 if db_vnfr
.get("additionalParamsForVnf"):
2009 deploy_params
= self
._format
_additional
_params
(db_vnfr
["additionalParamsForVnf"].copy())
2011 descriptor_config
= vnfd
.get("vnf-configuration")
2012 if descriptor_config
:
2014 logging_text
=logging_text
+ "member_vnf_index={} ".format(member_vnf_index
),
2017 nslcmop_id
=nslcmop_id
,
2023 member_vnf_index
=member_vnf_index
,
2024 vdu_index
=vdu_index
,
2026 deploy_params
=deploy_params
,
2027 descriptor_config
=descriptor_config
,
2028 base_folder
=base_folder
,
2029 task_instantiation_info
=tasks_dict_info
,
2033 # Deploy charms for each VDU that supports one.
2034 for vdud
in get_iterable(vnfd
, 'vdu'):
2036 descriptor_config
= vdud
.get('vdu-configuration')
2037 vdur
= next((x
for x
in db_vnfr
["vdur"] if x
["vdu-id-ref"] == vdu_id
), None)
2038 if vdur
.get("additionalParams"):
2039 deploy_params_vdu
= self
._format
_additional
_params
(vdur
["additionalParams"])
2041 deploy_params_vdu
= deploy_params
2042 if descriptor_config
:
2043 # look for vdu index in the db_vnfr["vdu"] section
2044 # for vdur_index, vdur in enumerate(db_vnfr["vdur"]):
2045 # if vdur["vdu-id-ref"] == vdu_id:
2048 # raise LcmException("Mismatch vdu_id={} not found in the vnfr['vdur'] list for "
2049 # "member_vnf_index={}".format(vdu_id, member_vnf_index))
2050 # vdu_name = vdur.get("name")
2053 for vdu_index
in range(int(vdud
.get("count", 1))):
2054 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
2056 logging_text
=logging_text
+ "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
2057 member_vnf_index
, vdu_id
, vdu_index
),
2060 nslcmop_id
=nslcmop_id
,
2066 member_vnf_index
=member_vnf_index
,
2067 vdu_index
=vdu_index
,
2069 deploy_params
=deploy_params_vdu
,
2070 descriptor_config
=descriptor_config
,
2071 base_folder
=base_folder
,
2072 task_instantiation_info
=tasks_dict_info
,
2075 for kdud
in get_iterable(vnfd
, 'kdu'):
2076 kdu_name
= kdud
["name"]
2077 descriptor_config
= kdud
.get('kdu-configuration')
2078 if descriptor_config
:
2082 # look for vdu index in the db_vnfr["vdu"] section
2083 # for vdur_index, vdur in enumerate(db_vnfr["vdur"]):
2084 # if vdur["vdu-id-ref"] == vdu_id:
2087 # raise LcmException("Mismatch vdu_id={} not found in the vnfr['vdur'] list for "
2088 # "member_vnf_index={}".format(vdu_id, member_vnf_index))
2089 # vdu_name = vdur.get("name")
2093 logging_text
=logging_text
,
2096 nslcmop_id
=nslcmop_id
,
2102 member_vnf_index
=member_vnf_index
,
2103 vdu_index
=vdu_index
,
2105 deploy_params
=deploy_params
,
2106 descriptor_config
=descriptor_config
,
2107 base_folder
=base_folder
,
2108 task_instantiation_info
=tasks_dict_info
,
2112 # Check if this NS has a charm configuration
2113 descriptor_config
= nsd
.get("ns-configuration")
2114 if descriptor_config
and descriptor_config
.get("juju"):
2117 member_vnf_index
= None
2123 # Get additional parameters
2125 if db_nsr
.get("additionalParamsForNs"):
2126 deploy_params
= self
._format
_additional
_params
(db_nsr
["additionalParamsForNs"].copy())
2127 base_folder
= nsd
["_admin"]["storage"]
2129 logging_text
=logging_text
,
2132 nslcmop_id
=nslcmop_id
,
2138 member_vnf_index
=member_vnf_index
,
2139 vdu_index
=vdu_index
,
2141 deploy_params
=deploy_params
,
2142 descriptor_config
=descriptor_config
,
2143 base_folder
=base_folder
,
2144 task_instantiation_info
=tasks_dict_info
,
# the rest of the work will be done in the finally block
2150 except (ROclient
.ROClientException
, DbException
, LcmException
, N2VCException
) as e
:
2151 self
.logger
.error(logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
))
2153 except asyncio
.CancelledError
:
2154 self
.logger
.error(logging_text
+ "Cancelled Exception while '{}'".format(stage
[1]))
2155 exc
= "Operation was cancelled"
2156 except Exception as e
:
2157 exc
= traceback
.format_exc()
2158 self
.logger
.critical(logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
), exc_info
=True)
2161 error_list
.append(str(exc
))
2163 # wait for pending tasks
2165 stage
[1] = "Waiting for instantiate pending tasks."
2166 self
.logger
.debug(logging_text
+ stage
[1])
2167 error_list
+= await self
._wait
_for
_tasks
(logging_text
, tasks_dict_info
, timeout_ns_deploy
,
2168 stage
, nslcmop_id
, nsr_id
=nsr_id
)
2169 stage
[1] = stage
[2] = ""
2170 except asyncio
.CancelledError
:
2171 error_list
.append("Cancelled")
2172 # TODO cancel all tasks
2173 except Exception as exc
:
2174 error_list
.append(str(exc
))
2176 # update operation-status
2177 db_nsr_update
["operational-status"] = "running"
2178 # let's begin with VCA 'configured' status (later we can change it)
2179 db_nsr_update
["config-status"] = "configured"
2180 for task
, task_name
in tasks_dict_info
.items():
2181 if not task
.done() or task
.cancelled() or task
.exception():
2182 if task_name
.startswith(self
.task_name_deploy_vca
):
2183 # A N2VC task is pending
2184 db_nsr_update
["config-status"] = "failed"
2186 # RO or KDU task is pending
2187 db_nsr_update
["operational-status"] = "failed"
2189 # update status at database
2191 error_detail
= ". ".join(error_list
)
2192 self
.logger
.error(logging_text
+ error_detail
)
2193 error_description_nslcmop
= 'Stage: {}. Detail: {}'.format(stage
[0], error_detail
)
2194 error_description_nsr
= 'Operation: INSTANTIATING.{}, Stage {}'.format(nslcmop_id
, stage
[0])
2196 db_nsr_update
["detailed-status"] = error_description_nsr
+ " Detail: " + error_detail
2197 db_nslcmop_update
["detailed-status"] = error_detail
2198 nslcmop_operation_state
= "FAILED"
2202 error_description_nsr
= error_description_nslcmop
= None
2204 db_nsr_update
["detailed-status"] = "Done"
2205 db_nslcmop_update
["detailed-status"] = "Done"
2206 nslcmop_operation_state
= "COMPLETED"
2209 self
._write
_ns
_status
(
2212 current_operation
="IDLE",
2213 current_operation_id
=None,
2214 error_description
=error_description_nsr
,
2215 error_detail
=error_detail
,
2216 other_update
=db_nsr_update
2218 self
._write
_op
_status
(
2221 error_message
=error_description_nslcmop
,
2222 operation_state
=nslcmop_operation_state
,
2223 other_update
=db_nslcmop_update
,
2226 if nslcmop_operation_state
:
2228 await self
.msg
.aiowrite("ns", "instantiated", {"nsr_id": nsr_id
, "nslcmop_id": nslcmop_id
,
2229 "operationState": nslcmop_operation_state
},
2231 except Exception as e
:
2232 self
.logger
.error(logging_text
+ "kafka_write notification Exception {}".format(e
))
2234 self
.logger
.debug(logging_text
+ "Exit")
2235 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_instantiate")
async def _add_vca_relations(self, logging_text, nsr_id, vca_index: int,
                             timeout: int = 3600, vca_type: str = None) -> bool:
    """
    Add the relations in which the VCA stored at _admin.deployed.VCA.<vca_index> takes part.

    Steps:
      1. find all relations for this VCA (ns-configuration of the nsd, vnf-configuration of the vnfds)
      2. wait for the other peers related (they must have 'config_sw_installed')
      3. add the relations

    A relation is dropped from the pending list once added, or when one of its peers has the
    configuration status 'BROKEN'.

    :param logging_text: prefix for logging
    :param nsr_id: _id of the nsr record
    :param vca_index: index of this VCA at the nsr _admin.deployed.VCA list
    :param timeout: seconds to keep retrying before giving up
    :param vca_type: key of self.vca_map used to add the relation. By default "lxc_proxy_charm"
    :return: True when no relation is left pending; False on timeout or on any error
    """

    def _peer_ids(relation):
        # ids of the two entities joined by the relation
        entities = relation.get('entities')
        return entities[0].get('id'), entities[1].get('id')

    def _has_broken_peer(relation, id_key, vca_list, vca_status_list):
        # True if any VCA taking part in the relation has configuration status 'BROKEN'
        if not vca_status_list:
            return False
        peer_ids = _peer_ids(relation)
        for vca, vca_status in zip(vca_list, vca_status_list):
            if not vca or not vca_status:
                continue
            if vca.get(id_key) in peer_ids and vca_status.get('status') == 'BROKEN':
                return True
        return False

    async def _process_pending(pending, id_key, db_nsr):
        # try to add each pending relation; 'pending' is modified in place
        vca_list = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))
        vca_status_list = db_nsr.get('configurationStatus')
        # iterate over a copy, because entries are removed from 'pending' inside the loop
        for r in list(pending):
            entities = r.get('entities')
            from_id, to_id = _peer_ids(r)
            from_vca_ee_id = None
            to_vca_ee_id = None
            for vca in vca_list:
                if vca.get(id_key) == from_id and vca.get('config_sw_installed'):
                    from_vca_ee_id = vca.get('ee_id')
                if vca.get(id_key) == to_id and vca.get('config_sw_installed'):
                    to_vca_ee_id = vca.get('ee_id')
            if from_vca_ee_id and to_vca_ee_id:
                # add relation
                await self.vca_map[vca_type].add_relation(
                    ee_id_1=from_vca_ee_id,
                    ee_id_2=to_vca_ee_id,
                    endpoint_1=entities[0].get('endpoint'),
                    endpoint_2=entities[1].get('endpoint'))
                # remove entry from relations list
                pending.remove(r)
                continue
            # check failed peers (best effort: a malformed status must not abort the whole operation)
            try:
                broken = _has_broken_peer(r, id_key, vca_list, vca_status_list)
            except Exception as e:
                self.logger.debug(logging_text + ' cannot check peers status: {}'.format(e))
                broken = False
            if broken:
                # peer broken: remove relation from the list it belongs to
                pending.remove(r)

    try:
        vca_type = vca_type or "lxc_proxy_charm"

        # STEP 1: find all relations for this VCA
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})
        my_vca = deep_get(db_nsr, ('_admin', 'deployed', 'VCA'))[vca_index]

        # read all ns-configuration relations where this VCA takes part
        ns_relations = [r for r in deep_get(nsd, ('ns-configuration', 'relation')) or ()
                        if my_vca.get('member-vnf-index') in _peer_ids(r)]

        # read all vnf-configuration relations where this VCA takes part
        vnf_relations = []
        for vnfd_id in db_nsr.get('vnfd-id') or ():
            db_vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
            for r in deep_get(db_vnfd, ('vnf-configuration', 'relation')) or ():
                if my_vca.get('vdu_id') in _peer_ids(r):
                    vnf_relations.append(r)

        # if no relations, terminate
        if not ns_relations and not vnf_relations:
            self.logger.debug(logging_text + ' No relations')
            return True

        self.logger.debug(logging_text + ' adding relations\n {}\n {}'.format(ns_relations, vnf_relations))

        # STEP 2 and 3: retry until every relation is added or dropped
        start = time()
        while True:
            if time() - start >= timeout:
                self.logger.error(logging_text + ' : timeout adding relations')
                return False

            # reload nsr from database (we need the updated record: _admin.deployed.VCA)
            db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

            # NS relations are matched by member-vnf-index, VNF relations by vdu_id
            await _process_pending(ns_relations, 'member-vnf-index', db_nsr)
            await _process_pending(vnf_relations, 'vdu_id', db_nsr)

            if not ns_relations and not vnf_relations:
                self.logger.debug('Relations added')
                return True

            # wait for next try
            await asyncio.sleep(5.0)

    except Exception as e:
        # Logger.warn() is a deprecated alias (removed in Python 3.13)
        self.logger.warning(logging_text + ' ERROR adding relations: {}'.format(e))
        return False
2400 def _write_db_callback(self
, task
, item
, _id
, on_done
=None, on_exc
=None):
2402 callback for kdu install intended to store the returned kdu_instance at database
2407 result
= task
.result()
2409 db_update
[on_done
] = str(result
)
2410 except Exception as e
:
2412 db_update
[on_exc
] = str(e
)
2415 self
.update_db_2(item
, _id
, db_update
)
2419 async def deploy_kdus(self
, logging_text
, nsr_id
, nslcmop_id
, db_vnfrs
, db_vnfds
, task_instantiation_info
):
2420 # Launch kdus if present in the descriptor
2422 k8scluster_id_2_uuic
= {"helm-chart": {}, "juju-bundle": {}}
2424 def _get_cluster_id(cluster_id
, cluster_type
):
2425 nonlocal k8scluster_id_2_uuic
2426 if cluster_id
in k8scluster_id_2_uuic
[cluster_type
]:
2427 return k8scluster_id_2_uuic
[cluster_type
][cluster_id
]
2429 db_k8scluster
= self
.db
.get_one("k8sclusters", {"_id": cluster_id
}, fail_on_empty
=False)
2430 if not db_k8scluster
:
2431 raise LcmException("K8s cluster {} cannot be found".format(cluster_id
))
2432 k8s_id
= deep_get(db_k8scluster
, ("_admin", cluster_type
, "id"))
2434 raise LcmException("K8s cluster '{}' has not been initilized for '{}'".format(cluster_id
, cluster_type
))
2435 k8scluster_id_2_uuic
[cluster_type
][cluster_id
] = k8s_id
2438 logging_text
+= "Deploy kdus: "
2441 db_nsr_update
= {"_admin.deployed.K8s": []}
2442 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2445 updated_cluster_list
= []
2447 for vnfr_data
in db_vnfrs
.values():
2448 for kdur
in get_iterable(vnfr_data
, "kdur"):
2449 desc_params
= self
._format
_additional
_params
(kdur
.get("additionalParams"))
2450 vnfd_id
= vnfr_data
.get('vnfd-id')
2451 namespace
= kdur
.get("k8s-namespace")
2452 if kdur
.get("helm-chart"):
2453 kdumodel
= kdur
["helm-chart"]
2454 k8sclustertype
= "helm-chart"
2455 elif kdur
.get("juju-bundle"):
2456 kdumodel
= kdur
["juju-bundle"]
2457 k8sclustertype
= "juju-bundle"
2459 raise LcmException("kdu type for kdu='{}.{}' is neither helm-chart nor "
2460 "juju-bundle. Maybe an old NBI version is running".
2461 format(vnfr_data
["member-vnf-index-ref"], kdur
["kdu-name"]))
2462 # check if kdumodel is a file and exists
2464 storage
= deep_get(db_vnfds
.get(vnfd_id
), ('_admin', 'storage'))
2465 if storage
and storage
.get('pkg-dir'): # may be not present if vnfd has not artifacts
2466 # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
2467 filename
= '{}/{}/{}s/{}'.format(storage
["folder"], storage
["pkg-dir"], k8sclustertype
,
2469 if self
.fs
.file_exists(filename
, mode
='file') or self
.fs
.file_exists(filename
, mode
='dir'):
2470 kdumodel
= self
.fs
.path
+ filename
2471 except (asyncio
.TimeoutError
, asyncio
.CancelledError
):
2473 except Exception: # it is not a file
2476 k8s_cluster_id
= kdur
["k8s-cluster"]["id"]
2477 step
= "Synchronize repos for k8s cluster '{}'".format(k8s_cluster_id
)
2478 cluster_uuid
= _get_cluster_id(k8s_cluster_id
, k8sclustertype
)
2480 if k8sclustertype
== "helm-chart" and cluster_uuid
not in updated_cluster_list
:
2481 del_repo_list
, added_repo_dict
= await asyncio
.ensure_future(
2482 self
.k8sclusterhelm
.synchronize_repos(cluster_uuid
=cluster_uuid
))
2483 if del_repo_list
or added_repo_dict
:
2484 unset
= {'_admin.helm_charts_added.' + item
: None for item
in del_repo_list
}
2485 updated
= {'_admin.helm_charts_added.' +
2486 item
: name
for item
, name
in added_repo_dict
.items()}
2487 self
.logger
.debug(logging_text
+ "repos synchronized on k8s cluster '{}' to_delete: {}, "
2488 "to_add: {}".format(k8s_cluster_id
, del_repo_list
,
2490 self
.db
.set_one("k8sclusters", {"_id": k8s_cluster_id
}, updated
, unset
=unset
)
2491 updated_cluster_list
.append(cluster_uuid
)
2493 step
= "Instantiating KDU {}.{} in k8s cluster {}".format(vnfr_data
["member-vnf-index-ref"],
2494 kdur
["kdu-name"], k8s_cluster_id
)
2496 k8s_instace_info
= {"kdu-instance": None,
2497 "k8scluster-uuid": cluster_uuid
,
2498 "k8scluster-type": k8sclustertype
,
2499 "member-vnf-index": vnfr_data
["member-vnf-index-ref"],
2500 "kdu-name": kdur
["kdu-name"],
2501 "kdu-model": kdumodel
,
2502 "namespace": namespace
}
2503 db_path
= "_admin.deployed.K8s.{}".format(index
)
2504 db_nsr_update
[db_path
] = k8s_instace_info
2505 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2507 db_dict
= {"collection": "nsrs",
2508 "filter": {"_id": nsr_id
},
2511 task
= asyncio
.ensure_future(
2512 self
.k8scluster_map
[k8sclustertype
].install(cluster_uuid
=cluster_uuid
, kdu_model
=kdumodel
,
2513 atomic
=True, params
=desc_params
,
2514 db_dict
=db_dict
, timeout
=600,
2515 kdu_name
=kdur
["kdu-name"], namespace
=namespace
))
2517 task
.add_done_callback(partial(self
._write
_db
_callback
, item
="nsrs", _id
=nsr_id
,
2518 on_done
=db_path
+ ".kdu-instance",
2519 on_exc
=db_path
+ ".detailed-status"))
2520 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_KDU-{}".format(index
), task
)
2521 task_instantiation_info
[task
] = "Deploying KDU {}".format(kdur
["kdu-name"])
2525 except (LcmException
, asyncio
.CancelledError
):
2527 except Exception as e
:
2528 msg
= "Exception {} while {}: {}".format(type(e
).__name
__, step
, e
)
2529 if isinstance(e
, (N2VCException
, DbException
)):
2530 self
.logger
.error(logging_text
+ msg
)
2532 self
.logger
.critical(logging_text
+ msg
, exc_info
=True)
2533 raise LcmException(msg
)
2536 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2538 def _deploy_n2vc(self
, logging_text
, db_nsr
, db_vnfr
, nslcmop_id
, nsr_id
, nsi_id
, vnfd_id
, vdu_id
,
2539 kdu_name
, member_vnf_index
, vdu_index
, vdu_name
, deploy_params
, descriptor_config
,
2540 base_folder
, task_instantiation_info
, stage
):
2541 # launch instantiate_N2VC in a asyncio task and register task object
2542 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
2543 # if not found, create one entry and update database
2544 # fill db_nsr._admin.deployed.VCA.<index>
2546 self
.logger
.debug(logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
))
2547 if descriptor_config
.get("juju"): # There is one execution envioronment of type juju
2548 ee_list
= [descriptor_config
]
2549 elif descriptor_config
.get("execution-environment-list"):
2550 ee_list
= descriptor_config
.get("execution-environment-list")
2551 else: # other types as script are not supported
2554 for ee_item
in ee_list
:
2555 self
.logger
.debug(logging_text
+ "_deploy_n2vc ee_item juju={}, helm={}".format(ee_item
.get('juju'),
2556 ee_item
.get("helm-chart")))
2557 if ee_item
.get("juju"):
2558 vca_name
= ee_item
['juju'].get('charm')
2559 vca_type
= "lxc_proxy_charm" if ee_item
['juju'].get('charm') is not None else "native_charm"
2560 if ee_item
['juju'].get('cloud') == "k8s":
2561 vca_type
= "k8s_proxy_charm"
2562 elif ee_item
['juju'].get('proxy') is False:
2563 vca_type
= "native_charm"
2564 elif ee_item
.get("helm-chart"):
2565 vca_name
= ee_item
['helm-chart']
2568 self
.logger
.debug(logging_text
+ "skipping non juju neither charm configuration")
2572 for vca_index
, vca_deployed
in enumerate(db_nsr
["_admin"]["deployed"]["VCA"]):
2573 if not vca_deployed
:
2575 if vca_deployed
.get("member-vnf-index") == member_vnf_index
and \
2576 vca_deployed
.get("vdu_id") == vdu_id
and \
2577 vca_deployed
.get("kdu_name") == kdu_name
and \
2578 vca_deployed
.get("vdu_count_index", 0) == vdu_index
:
2581 # not found, create one.
2583 "member-vnf-index": member_vnf_index
,
2585 "kdu_name": kdu_name
,
2586 "vdu_count_index": vdu_index
,
2587 "operational-status": "init", # TODO revise
2588 "detailed-status": "", # TODO revise
2589 "step": "initial-deploy", # TODO revise
2591 "vdu_name": vdu_name
,
2596 # create VCA and configurationStatus in db
2598 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
2599 "configurationStatus.{}".format(vca_index
): dict()
2601 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2603 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
2606 task_n2vc
= asyncio
.ensure_future(
2607 self
.instantiate_N2VC(
2608 logging_text
=logging_text
,
2609 vca_index
=vca_index
,
2615 vdu_index
=vdu_index
,
2616 deploy_params
=deploy_params
,
2617 config_descriptor
=descriptor_config
,
2618 base_folder
=base_folder
,
2619 nslcmop_id
=nslcmop_id
,
2623 ee_config_descriptor
=ee_item
2626 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_N2VC-{}".format(vca_index
), task_n2vc
)
2627 task_instantiation_info
[task_n2vc
] = self
.task_name_deploy_vca
+ " {}.{}".format(
2628 member_vnf_index
or "", vdu_id
or "")
2630 # Check if this VNFD has a configured terminate action
2631 def _has_terminate_config_primitive(self
, vnfd
):
2632 vnf_config
= vnfd
.get("vnf-configuration")
2633 if vnf_config
and vnf_config
.get("terminate-config-primitive"):
2639 def _get_terminate_config_primitive_seq_list(vnfd
):
2640 """ Get a numerically sorted list of the sequences for this VNFD's terminate action """
2641 # No need to check for existing primitive twice, already done before
2642 vnf_config
= vnfd
.get("vnf-configuration")
2643 seq_list
= vnf_config
.get("terminate-config-primitive")
2644 # Get all 'seq' tags in seq_list, order sequences numerically, ascending.
2645 seq_list_sorted
= sorted(seq_list
, key
=lambda x
: int(x
['seq']))
2646 return seq_list_sorted
2649 def _create_nslcmop(nsr_id
, operation
, params
):
2651 Creates a ns-lcm-opp content to be stored at database.
2652 :param nsr_id: internal id of the instance
2653 :param operation: instantiate, terminate, scale, action, ...
2654 :param params: user parameters for the operation
2655 :return: dictionary following SOL005 format
2657 # Raise exception if invalid arguments
2658 if not (nsr_id
and operation
and params
):
2660 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided")
2666 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
2667 "operationState": "PROCESSING",
2668 "statusEnteredTime": now
,
2669 "nsInstanceId": nsr_id
,
2670 "lcmOperationType": operation
,
2672 "isAutomaticInvocation": False,
2673 "operationParams": params
,
2674 "isCancelPending": False,
2676 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id
,
2677 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id
,
2682 def _format_additional_params(self
, params
):
2683 params
= params
or {}
2684 for key
, value
in params
.items():
2685 if str(value
).startswith("!!yaml "):
2686 params
[key
] = yaml
.safe_load(value
[7:])
2689 def _get_terminate_primitive_params(self
, seq
, vnf_index
):
2690 primitive
= seq
.get('name')
2691 primitive_params
= {}
2693 "member_vnf_index": vnf_index
,
2694 "primitive": primitive
,
2695 "primitive_params": primitive_params
,
2698 return self
._map
_primitive
_params
(seq
, params
, desc_params
)
def _retry_or_skip_suboperation(self, db_nslcmop, op_index):
    """
    Decide whether an already registered sub-operation has to be skipped or executed again
    :param db_nslcmop: nslcmops record content; sub-operations are read from _admin.operations
    :param op_index: index of the sub-operation inside _admin.operations
    :return: SUBOPERATION_STATUS_SKIP if the sub-operation is 'COMPLETED'; otherwise op_index, after marking the
        sub-operation as 'PROCESSING' to indicate a retry
    """
    suboperation = deep_get(db_nslcmop, ('_admin', 'operations'), [])[op_index]
    if suboperation.get('operationState') == 'COMPLETED':
        # b. Skip sub-operation
        # _ns_execute_primitive() or RO.create_action() will NOT be executed
        return self.SUBOPERATION_STATUS_SKIP

    # c. retry executing sub-operation
    # The sub-operation exists, and operationState != 'COMPLETED'
    # Update operationState = 'PROCESSING' to indicate a retry.
    self._update_suboperation_status(db_nslcmop, op_index, 'PROCESSING', 'In progress')
    # Return the sub-operation index
    # _ns_execute_primitive() or RO.create_action() will be called from scale()
    # with arguments extracted from the sub-operation
    return op_index
2721 # Find a sub-operation where all keys in a matching dictionary must match
2722 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
2723 def _find_suboperation(self
, db_nslcmop
, match
):
2724 if db_nslcmop
and match
:
2725 op_list
= db_nslcmop
.get('_admin', {}).get('operations', [])
2726 for i
, op
in enumerate(op_list
):
2727 if all(op
.get(k
) == match
[k
] for k
in match
):
2729 return self
.SUBOPERATION_STATUS_NOT_FOUND
2731 # Update status for a sub-operation given its index
2732 def _update_suboperation_status(self
, db_nslcmop
, op_index
, operationState
, detailed_status
):
2733 # Update DB for HA tasks
2734 q_filter
= {'_id': db_nslcmop
['_id']}
2735 update_dict
= {'_admin.operations.{}.operationState'.format(op_index
): operationState
,
2736 '_admin.operations.{}.detailed-status'.format(op_index
): detailed_status
}
2737 self
.db
.set_one("nslcmops",
2739 update_dict
=update_dict
,
2740 fail_on_empty
=False)
2742 # Add sub-operation, return the index of the added sub-operation
2743 # Optionally, set operationState, detailed-status, and operationType
2744 # Status and type are currently set for 'scale' sub-operations:
2745 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
2746 # 'detailed-status' : status message
2747 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
2748 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
2749 def _add_suboperation(self
, db_nslcmop
, vnf_index
, vdu_id
, vdu_count_index
, vdu_name
, primitive
,
2750 mapped_primitive_params
, operationState
=None, detailed_status
=None, operationType
=None,
2751 RO_nsr_id
=None, RO_scaling_info
=None):
2753 return self
.SUBOPERATION_STATUS_NOT_FOUND
2754 # Get the "_admin.operations" list, if it exists
2755 db_nslcmop_admin
= db_nslcmop
.get('_admin', {})
2756 op_list
= db_nslcmop_admin
.get('operations')
2757 # Create or append to the "_admin.operations" list
2758 new_op
= {'member_vnf_index': vnf_index
,
2760 'vdu_count_index': vdu_count_index
,
2761 'primitive': primitive
,
2762 'primitive_params': mapped_primitive_params
}
2764 new_op
['operationState'] = operationState
2766 new_op
['detailed-status'] = detailed_status
2768 new_op
['lcmOperationType'] = operationType
2770 new_op
['RO_nsr_id'] = RO_nsr_id
2772 new_op
['RO_scaling_info'] = RO_scaling_info
2774 # No existing operations, create key 'operations' with current operation as first list element
2775 db_nslcmop_admin
.update({'operations': [new_op
]})
2776 op_list
= db_nslcmop_admin
.get('operations')
2778 # Existing operations, append operation to list
2779 op_list
.append(new_op
)
2781 db_nslcmop_update
= {'_admin.operations': op_list
}
2782 self
.update_db_2("nslcmops", db_nslcmop
['_id'], db_nslcmop_update
)
2783 op_index
= len(op_list
) - 1
2786 # Helper methods for scale() sub-operations
2788 # pre-scale/post-scale:
2789 # Check for 3 different cases:
2790 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
2791 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
2792 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
2793 def _check_or_add_scale_suboperation(self
, db_nslcmop
, vnf_index
, vnf_config_primitive
, primitive_params
,
2794 operationType
, RO_nsr_id
=None, RO_scaling_info
=None):
2795 # Find this sub-operation
2796 if RO_nsr_id
and RO_scaling_info
:
2797 operationType
= 'SCALE-RO'
2799 'member_vnf_index': vnf_index
,
2800 'RO_nsr_id': RO_nsr_id
,
2801 'RO_scaling_info': RO_scaling_info
,
2805 'member_vnf_index': vnf_index
,
2806 'primitive': vnf_config_primitive
,
2807 'primitive_params': primitive_params
,
2808 'lcmOperationType': operationType
2810 op_index
= self
._find
_suboperation
(db_nslcmop
, match
)
2811 if op_index
== self
.SUBOPERATION_STATUS_NOT_FOUND
:
2812 # a. New sub-operation
2813 # The sub-operation does not exist, add it.
2814 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
2815 # The following parameters are set to None for all kind of scaling:
2817 vdu_count_index
= None
2819 if RO_nsr_id
and RO_scaling_info
:
2820 vnf_config_primitive
= None
2821 primitive_params
= None
2824 RO_scaling_info
= None
2825 # Initial status for sub-operation
2826 operationState
= 'PROCESSING'
2827 detailed_status
= 'In progress'
2828 # Add sub-operation for pre/post-scaling (zero or more operations)
2829 self
._add
_suboperation
(db_nslcmop
,
2834 vnf_config_primitive
,
2841 return self
.SUBOPERATION_STATUS_NEW
2843 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
2844 # or op_index (operationState != 'COMPLETED')
2845 return self
._retry
_or
_skip
_suboperation
(db_nslcmop
, op_index
)
2847 # Function to return execution_environment id
2849 def _get_ee_id(self
, vnf_index
, vdu_id
, vca_deployed_list
):
2850 # TODO vdu_index_count
2851 for vca
in vca_deployed_list
:
2852 if vca
["member-vnf-index"] == vnf_index
and vca
["vdu_id"] == vdu_id
:
async def destroy_N2VC(self, logging_text, db_nslcmop, vca_deployed, config_descriptor,
                       vca_index, destroy_ee=True, exec_primitives=True):
    """
    Execute the terminate primitives and destroy the execution environment (if destroy_ee=True)
    :param logging_text: prefix for log messages
    :param db_nslcmop: nslcmops record content; a sub-operation is added to it per executed primitive
    :param vca_deployed: Dictionary of deployment info at db_nsr._admin.deployed.VCA.<INDEX>
    :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu. It can be None,
        in that case there are no terminate primitives to execute
    :param vca_index: index in the database _admin.deployed.VCA
    :param destroy_ee: False to do not destroy, because it will be destroyed all of them at once
    :param exec_primitives: False to do not execute terminate primitives, because the config is not completed or has
                        not executed properly
    :return: None or exception
    :raises LcmException: if a terminate primitive ends with a result other than COMPLETED or PARTIALLY_COMPLETED
    """

    self.logger.debug(
        logging_text + " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
            vca_index, vca_deployed, config_descriptor, destroy_ee
        )
    )

    vca_type = vca_deployed.get("type", "lxc_proxy_charm")

    # execute terminate_primitives
    if exec_primitives:
        # a missing descriptor means that there is nothing to execute, instead of failing with AttributeError
        terminate_primitives = (config_descriptor or {}).get("terminate-config-primitive")
        vdu_id = vca_deployed.get("vdu_id")
        vdu_count_index = vca_deployed.get("vdu_count_index")
        vdu_name = vca_deployed.get("vdu_name")
        vnf_index = vca_deployed.get("member-vnf-index")
        if terminate_primitives and vca_deployed.get("needed_terminate"):
            # Get all 'seq' tags in seq_list, order sequences numerically, ascending.
            terminate_primitives = sorted(terminate_primitives, key=lambda x: int(x['seq']))
            for seq in terminate_primitives:
                # For each sequence in list, get primitive and call _ns_execute_primitive()
                step = "Calling terminate action for vnf_member_index={} primitive={}".format(
                    vnf_index, seq.get("name"))
                self.logger.debug(logging_text + step)
                # Create the primitive for each sequence, i.e. "primitive": "touch"
                primitive = seq.get('name')
                mapped_primitive_params = self._get_terminate_primitive_params(seq, vnf_index)

                # Add sub-operation
                self._add_suboperation(db_nslcmop,
                                       vnf_index,
                                       vdu_id,
                                       vdu_count_index,
                                       vdu_name,
                                       primitive,
                                       mapped_primitive_params)
                # Sub-operations: Call _ns_execute_primitive() instead of action()
                try:
                    result, result_detail = await self._ns_execute_primitive(vca_deployed["ee_id"], primitive,
                                                                             mapped_primitive_params,
                                                                             vca_type=vca_type)
                except LcmException:
                    # this happens when VCA is not deployed. In this case it is not needed to terminate
                    continue
                result_ok = ['COMPLETED', 'PARTIALLY_COMPLETED']
                if result not in result_ok:
                    raise LcmException("terminate_primitive {} for vnf_member_index={} fails with "
                                       "error {}".format(seq.get("name"), vnf_index, result_detail))
            # set that this VCA do not need terminated
            db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(vca_index)
            self.update_db_2("nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False})

    if vca_deployed.get("prometheus_jobs") and self.prometheus:
        await self.prometheus.update(remove_jobs=vca_deployed["prometheus_jobs"])

    if destroy_ee:
        await self.vca_map[vca_type].delete_execution_environment(vca_deployed["ee_id"])
async def _delete_all_N2VC(self, db_nsr: dict):
    """
    Delete at once the whole N2VC namespace of a NS
    :param db_nsr: nsrs record content; its "_id" prefixed by "." is the namespace to delete
    :return: None
    """
    self._write_all_config_status(db_nsr=db_nsr, status='TERMINATING')
    namespace = "." + db_nsr["_id"]
    try:
        await self.n2vc.delete_namespace(namespace=namespace, total_timeout=self.timeout_charm_delete)
    except N2VCNotFound:  # already deleted. Skip
        pass
    # reached both after a deletion and when the namespace did not exist
    self._write_all_config_status(db_nsr=db_nsr, status='DELETED')
async def _terminate_RO(self, logging_text, nsr_deployed, nsr_id, nslcmop_id, stage):
    """
    Terminates a deployment from RO
    It deletes, in this order, the ns, the nsd and the vnfds registered at nsr_deployed["RO"]. The nsd and the vnfds
    are only deleted if no previous deletion has failed.
    :param logging_text: prefix for log messages
    :param nsr_deployed: db_nsr._admin.deployed
    :param nsr_id: _id of the nsrs record to update
    :param nslcmop_id: _id of the nslcmops record where the stage is written
    :param stage: list of string with the content to write on db_nslcmop.detailed-status.
        this method will update only the index 2, but it will write on database the concatenated content of the list
    :return: None
    :raises LcmException: with the concatenation of all the failures, if any deletion fails
    """
    db_nsr_update = {}
    failed_detail = []
    ro_nsr_id = ro_delete_action = None
    if nsr_deployed and nsr_deployed.get("RO"):
        ro_nsr_id = nsr_deployed["RO"].get("nsr_id")
        # a delete action id stored by a previous attempt allows resuming the wait below
        ro_delete_action = nsr_deployed["RO"].get("nsr_delete_action_id")
    try:
        if ro_nsr_id:
            stage[2] = "Deleting ns from VIM."
            db_nsr_update["detailed-status"] = " ".join(stage)
            self._write_op_status(nslcmop_id, stage)
            self.logger.debug(logging_text + stage[2])
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)
            desc = await self.RO.delete("ns", ro_nsr_id)
            ro_delete_action = desc["action_id"]
            db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = ro_delete_action
            db_nsr_update["_admin.deployed.RO.nsr_id"] = None
            db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
        if ro_delete_action:
            # wait until NS is deleted from VIM
            stage[2] = "Waiting ns deleted from VIM."
            detailed_status_old = None
            self.logger.debug(logging_text + stage[2] + " RO_id={} ro_delete_action={}".format(ro_nsr_id,
                                                                                                ro_delete_action))
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)

            delete_timeout = 20 * 60  # 20 minutes
            # poll the delete action every 5 seconds until it finishes or delete_timeout is consumed
            while delete_timeout > 0:
                desc = await self.RO.show(
                    "ns",
                    item_id_name=ro_nsr_id,
                    extra_item="action",
                    extra_item_id=ro_delete_action)

                # deploymentStatus
                self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)

                ns_status, ns_status_info = self.RO.check_action_status(desc)
                if ns_status == "ERROR":
                    raise ROclient.ROClientException(ns_status_info)
                elif ns_status == "BUILD":
                    stage[2] = "Deleting from VIM {}".format(ns_status_info)
                elif ns_status == "ACTIVE":
                    # the delete action has finished
                    db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
                    db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
                    break
                else:
                    # NOTE(review): assert is stripped when running with -O; an unknown status would then be
                    # silently polled again
                    assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
                # write the status only when it changes
                if stage[2] != detailed_status_old:
                    detailed_status_old = stage[2]
                    db_nsr_update["detailed-status"] = " ".join(stage)
                    self._write_op_status(nslcmop_id, stage)
                    self.update_db_2("nsrs", nsr_id, db_nsr_update)
                # NOTE(review): the 'loop' argument of asyncio.sleep was removed in Python 3.10
                await asyncio.sleep(5, loop=self.loop)
                delete_timeout -= 5
            else:  # delete_timeout <= 0:
                raise ROclient.ROClientException("Timeout waiting ns deleted from VIM")

    except Exception as e:
        # store what has been done until the failure; the changes made below are written by the final update
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        if isinstance(e, ROclient.ROClientException) and e.http_code == 404:  # not found
            db_nsr_update["_admin.deployed.RO.nsr_id"] = None
            db_nsr_update["_admin.deployed.RO.nsr_status"] = "DELETED"
            db_nsr_update["_admin.deployed.RO.nsr_delete_action_id"] = None
            self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(ro_nsr_id))
        elif isinstance(e, ROclient.ROClientException) and e.http_code == 409:  # conflict
            failed_detail.append("delete conflict: {}".format(e))
            self.logger.debug(logging_text + "RO_ns_id={} delete conflict: {}".format(ro_nsr_id, e))
        else:
            failed_detail.append("delete error: {}".format(e))
            self.logger.error(logging_text + "RO_ns_id={} delete error: {}".format(ro_nsr_id, e))

    # Delete nsd
    if not failed_detail and deep_get(nsr_deployed, ("RO", "nsd_id")):
        ro_nsd_id = nsr_deployed["RO"]["nsd_id"]
        try:
            stage[2] = "Deleting nsd from RO."
            db_nsr_update["detailed-status"] = " ".join(stage)
            self.update_db_2("nsrs", nsr_id, db_nsr_update)
            self._write_op_status(nslcmop_id, stage)
            await self.RO.delete("nsd", ro_nsd_id)
            self.logger.debug(logging_text + "ro_nsd_id={} deleted".format(ro_nsd_id))
            db_nsr_update["_admin.deployed.RO.nsd_id"] = None
        except Exception as e:
            if isinstance(e, ROclient.ROClientException) and e.http_code == 404:  # not found
                db_nsr_update["_admin.deployed.RO.nsd_id"] = None
                self.logger.debug(logging_text + "ro_nsd_id={} already deleted".format(ro_nsd_id))
            elif isinstance(e, ROclient.ROClientException) and e.http_code == 409:  # conflict
                failed_detail.append("ro_nsd_id={} delete conflict: {}".format(ro_nsd_id, e))
                self.logger.debug(logging_text + failed_detail[-1])
            else:
                failed_detail.append("ro_nsd_id={} delete error: {}".format(ro_nsd_id, e))
                self.logger.error(logging_text + failed_detail[-1])

    # Delete vnfds. A failure on one vnfd does not stop the deletion of the following ones
    if not failed_detail and deep_get(nsr_deployed, ("RO", "vnfd")):
        for index, vnf_deployed in enumerate(nsr_deployed["RO"]["vnfd"]):
            if not vnf_deployed or not vnf_deployed["id"]:
                continue
            try:
                ro_vnfd_id = vnf_deployed["id"]
                stage[2] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
                    vnf_deployed["member-vnf-index"], ro_vnfd_id)
                db_nsr_update["detailed-status"] = " ".join(stage)
                self.update_db_2("nsrs", nsr_id, db_nsr_update)
                self._write_op_status(nslcmop_id, stage)
                await self.RO.delete("vnfd", ro_vnfd_id)
                self.logger.debug(logging_text + "ro_vnfd_id={} deleted".format(ro_vnfd_id))
                db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
            except Exception as e:
                if isinstance(e, ROclient.ROClientException) and e.http_code == 404:  # not found
                    db_nsr_update["_admin.deployed.RO.vnfd.{}.id".format(index)] = None
                    self.logger.debug(logging_text + "ro_vnfd_id={} already deleted ".format(ro_vnfd_id))
                elif isinstance(e, ROclient.ROClientException) and e.http_code == 409:  # conflict
                    failed_detail.append("ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id, e))
                    self.logger.debug(logging_text + failed_detail[-1])
                else:
                    failed_detail.append("ro_vnfd_id={} delete error: {}".format(ro_vnfd_id, e))
                    self.logger.error(logging_text + failed_detail[-1])

    if failed_detail:
        stage[2] = "Error deleting from VIM"
    else:
        stage[2] = "Deleted from VIM"
    db_nsr_update["detailed-status"] = " ".join(stage)
    self.update_db_2("nsrs", nsr_id, db_nsr_update)
    self._write_op_status(nslcmop_id, stage)

    if failed_detail:
        raise LcmException("; ".join(failed_detail))
async def terminate(self, nsr_id, nslcmop_id):
    """
    Terminate a NS instance: execute the terminate primitives, delete the execution environments and the KDUs and
    remove the deployment from the VIM
    :param nsr_id: _id of the nsrs record
    :param nslcmop_id: _id of the nslcmops record of this operation
    :return: None. The result is written at database and notified with self.msg (topic "ns", key "terminated")
    """
    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
    if not task_is_locked_by_me:
        return

    logging_text = "Task ns={} terminate={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    timeout_ns_terminate = self.timeout_ns_terminate
    db_nsr = None
    db_nslcmop = None
    operation_params = None
    exc = None
    error_list = []   # annotates all failed error messages
    db_nslcmop_update = {}
    autoremove = False  # autoremove after terminated
    tasks_dict_info = {}
    db_nsr_update = {}
    stage = ["Stage 1/3: Preparing task.", "Waiting for previous operations to terminate.", ""]
    # ^ contains [stage, step, VIM-status]
    try:
        # wait for any previous tasks in process
        await self.lcm_tasks.waitfor_related_HA("ns", 'nslcmops', nslcmop_id)

        stage[1] = "Getting nslcmop={} from db.".format(nslcmop_id)
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        operation_params = db_nslcmop.get("operationParams") or {}
        if operation_params.get("timeout_ns_terminate"):
            timeout_ns_terminate = operation_params["timeout_ns_terminate"]
        stage[1] = "Getting nsr={} from db.".format(nsr_id)
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

        db_nsr_update["operational-status"] = "terminating"
        db_nsr_update["config-status"] = "terminating"
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state="TERMINATING",
            current_operation="TERMINATING",
            current_operation_id=nslcmop_id,
            other_update=db_nsr_update
        )
        self._write_op_status(
            op_id=nslcmop_id,
            queuePosition=0,
            stage=stage
        )
        nsr_deployed = deepcopy(db_nsr["_admin"].get("deployed")) or {}
        if db_nsr["_admin"]["nsState"] == "NOT_INSTANTIATED":
            # nothing to remove; the final status is written at the finally clause
            return

        stage[1] = "Getting vnf descriptors from db."
        db_vnfrs_list = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
        db_vnfds_from_id = {}
        db_vnfds_from_member_index = {}
        # Loop over VNFRs
        for vnfr in db_vnfrs_list:
            vnfd_id = vnfr["vnfd-id"]
            if vnfd_id not in db_vnfds_from_id:
                vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})
                db_vnfds_from_id[vnfd_id] = vnfd
            db_vnfds_from_member_index[vnfr["member-vnf-index-ref"]] = db_vnfds_from_id[vnfd_id]

        # Destroy individual execution environments when there are terminating primitives.
        # Rest of EE will be deleted at once
        # TODO - check before calling _destroy_N2VC
        # if not operation_params.get("skip_terminate_primitives"):#
        # or not vca.get("needed_terminate"):
        stage[0] = "Stage 2/3 execute terminating primitives."
        self.logger.debug(logging_text + stage[0])
        stage[1] = "Looking execution environment that needs terminate."
        self.logger.debug(logging_text + stage[1])
        # self.logger.debug("nsr_deployed: {}".format(nsr_deployed))
        for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")):
            self.logger.debug("vca_index: {}, vca: {}".format(vca_index, vca))
            config_descriptor = None
            if not vca or not vca.get("ee_id"):
                continue
            if not vca.get("member-vnf-index"):
                # ns
                config_descriptor = db_nsr.get("ns-configuration")
            elif vca.get("vdu_id"):
                db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                vdud = next((vdu for vdu in db_vnfd.get("vdu", ()) if vdu["id"] == vca.get("vdu_id")), None)
                if vdud:
                    config_descriptor = vdud.get("vdu-configuration")
            elif vca.get("kdu_name"):
                db_vnfd = db_vnfds_from_member_index[vca["member-vnf-index"]]
                kdud = next((kdu for kdu in db_vnfd.get("kdu", ()) if kdu["name"] == vca.get("kdu_name")), None)
                if kdud:
                    config_descriptor = kdud.get("kdu-configuration")
            else:
                config_descriptor = db_vnfds_from_member_index[vca["member-vnf-index"]].get("vnf-configuration")
            vca_type = vca.get("type")
            exec_terminate_primitives = (not operation_params.get("skip_terminate_primitives") and
                                         vca.get("needed_terminate"))
            # For helm we must destroy_ee. It must be a real boolean: the texts "True"/"False" are both truthy
            # and would destroy every execution environment one by one
            destroy_ee = vca_type == "helm"
            task = asyncio.ensure_future(
                self.destroy_N2VC(logging_text, db_nslcmop, vca, config_descriptor, vca_index,
                                  destroy_ee, exec_terminate_primitives))
            tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))

        # wait for pending tasks of terminate primitives
        if tasks_dict_info:
            self.logger.debug(logging_text + 'Waiting for terminate primitive pending tasks...')
            error_list = await self._wait_for_tasks(logging_text, tasks_dict_info,
                                                    min(self.timeout_charm_delete, timeout_ns_terminate),
                                                    stage, nslcmop_id)
            if error_list:
                return   # raise LcmException("; ".join(error_list))
            tasks_dict_info.clear()

        # remove All execution environments at once
        stage[0] = "Stage 3/3 delete all."

        if nsr_deployed.get("VCA"):
            stage[1] = "Deleting all execution environments."
            self.logger.debug(logging_text + stage[1])
            task_delete_ee = asyncio.ensure_future(asyncio.wait_for(self._delete_all_N2VC(db_nsr=db_nsr),
                                                                    timeout=self.timeout_charm_delete))
            # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
            tasks_dict_info[task_delete_ee] = "Terminating all VCA"

        # Delete from k8scluster
        stage[1] = "Deleting KDUs."
        self.logger.debug(logging_text + stage[1])
        # print(nsr_deployed)
        for kdu in get_iterable(nsr_deployed, "K8s"):
            if not kdu or not kdu.get("kdu-instance"):
                continue
            kdu_instance = kdu.get("kdu-instance")
            if kdu.get("k8scluster-type") in self.k8scluster_map:
                task_delete_kdu_instance = asyncio.ensure_future(
                    self.k8scluster_map[kdu["k8scluster-type"]].uninstall(
                        cluster_uuid=kdu.get("k8scluster-uuid"),
                        kdu_instance=kdu_instance))
            else:
                self.logger.error(logging_text + "Unknown k8s deployment type {}".
                                  format(kdu.get("k8scluster-type")))
                continue
            tasks_dict_info[task_delete_kdu_instance] = "Terminating KDU '{}'".format(kdu.get("kdu-name"))

        # remove from RO
        stage[1] = "Deleting ns from VIM."
        if self.ng_ro:
            task_delete_ro = asyncio.ensure_future(
                self._terminate_ng_ro(logging_text, nsr_deployed, nsr_id, nslcmop_id, stage))
        else:
            task_delete_ro = asyncio.ensure_future(
                self._terminate_RO(logging_text, nsr_deployed, nsr_id, nslcmop_id, stage))
        tasks_dict_info[task_delete_ro] = "Removing deployment from VIM"

        # rest of staff will be done at finally

    except (ROclient.ROClientException, DbException, LcmException, N2VCException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(stage[1]))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception while '{}': {}".format(stage[1], e), exc_info=True)
    finally:
        if exc:
            error_list.append(str(exc))
        try:
            # wait for pending tasks
            if tasks_dict_info:
                stage[1] = "Waiting for terminate pending tasks."
                self.logger.debug(logging_text + stage[1])
                error_list += await self._wait_for_tasks(logging_text, tasks_dict_info, timeout_ns_terminate,
                                                         stage, nslcmop_id)
            stage[1] = stage[2] = ""
        except asyncio.CancelledError:
            error_list.append("Cancelled")
            # TODO cancell all tasks
        except Exception as exc:
            error_list.append(str(exc))
        # update status at database
        if error_list:
            error_detail = "; ".join(error_list)
            # self.logger.error(logging_text + error_detail)
            error_description_nslcmop = 'Stage: {}. Detail: {}'.format(stage[0], error_detail)
            error_description_nsr = 'Operation: TERMINATING.{}, Stage {}.'.format(nslcmop_id, stage[0])

            db_nsr_update["operational-status"] = "failed"
            db_nsr_update["detailed-status"] = error_description_nsr + " Detail: " + error_detail
            db_nslcmop_update["detailed-status"] = error_detail
            nslcmop_operation_state = "FAILED"
            ns_state = "BROKEN"
        else:
            error_detail = None
            error_description_nsr = error_description_nslcmop = None
            ns_state = "NOT_INSTANTIATED"
            db_nsr_update["operational-status"] = "terminated"
            db_nsr_update["detailed-status"] = "Done"
            db_nsr_update["_admin.nsState"] = "NOT_INSTANTIATED"
            db_nslcmop_update["detailed-status"] = "Done"
            nslcmop_operation_state = "COMPLETED"

        # the nsr is only updated if it could be read from database
        if db_nsr:
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=ns_state,
                current_operation="IDLE",
                current_operation_id=None,
                error_description=error_description_nsr,
                error_detail=error_detail,
                other_update=db_nsr_update
            )
        self._write_op_status(
            op_id=nslcmop_id,
            stage="",
            error_message=error_description_nslcmop,
            operation_state=nslcmop_operation_state,
            other_update=db_nslcmop_update,
        )
        if operation_params:
            autoremove = operation_params.get("autoremove", False)
        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite("ns", "terminated", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                             "operationState": nslcmop_operation_state,
                                                             "autoremove": autoremove},
                                        loop=self.loop)
            except Exception as e:
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))

        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_terminate")
3313 async def _wait_for_tasks(self
, logging_text
, created_tasks_info
, timeout
, stage
, nslcmop_id
, nsr_id
=None):
3315 error_detail_list
= []
3317 pending_tasks
= list(created_tasks_info
.keys())
3318 num_tasks
= len(pending_tasks
)
3320 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
3321 self
._write
_op
_status
(nslcmop_id
, stage
)
3322 while pending_tasks
:
3324 _timeout
= timeout
+ time_start
- time()
3325 done
, pending_tasks
= await asyncio
.wait(pending_tasks
, timeout
=_timeout
,
3326 return_when
=asyncio
.FIRST_COMPLETED
)
3327 num_done
+= len(done
)
3328 if not done
: # Timeout
3329 for task
in pending_tasks
:
3330 new_error
= created_tasks_info
[task
] + ": Timeout"
3331 error_detail_list
.append(new_error
)
3332 error_list
.append(new_error
)
3335 if task
.cancelled():
3338 exc
= task
.exception()
3340 if isinstance(exc
, asyncio
.TimeoutError
):
3342 new_error
= created_tasks_info
[task
] + ": {}".format(exc
)
3343 error_list
.append(created_tasks_info
[task
])
3344 error_detail_list
.append(new_error
)
3345 if isinstance(exc
, (str, DbException
, N2VCException
, ROclient
.ROClientException
, LcmException
,
3347 self
.logger
.error(logging_text
+ new_error
)
3349 exc_traceback
= "".join(traceback
.format_exception(None, exc
, exc
.__traceback
__))
3350 self
.logger
.error(logging_text
+ created_tasks_info
[task
] + exc_traceback
)
3352 self
.logger
.debug(logging_text
+ created_tasks_info
[task
] + ": Done")
3353 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
3355 stage
[1] += " Errors: " + ". ".join(error_detail_list
) + "."
3356 if nsr_id
: # update also nsr
3357 self
.update_db_2("nsrs", nsr_id
, {"errorDescription": "Error at: " + ", ".join(error_list
),
3358 "errorDetail": ". ".join(error_detail_list
)})
3359 self
._write
_op
_status
(nslcmop_id
, stage
)
3360 return error_detail_list
3363 def _map_primitive_params(primitive_desc
, params
, instantiation_params
):
3365 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
3366 The default-value is used. If it is between < > it look for a value at instantiation_params
3367 :param primitive_desc: portion of VNFD/NSD that describes primitive
3368 :param params: Params provided by user
3369 :param instantiation_params: Instantiation params provided by user
3370 :return: a dictionary with the calculated params
3372 calculated_params
= {}
3373 for parameter
in primitive_desc
.get("parameter", ()):
3374 param_name
= parameter
["name"]
3375 if param_name
in params
:
3376 calculated_params
[param_name
] = params
[param_name
]
3377 elif "default-value" in parameter
or "value" in parameter
:
3378 if "value" in parameter
:
3379 calculated_params
[param_name
] = parameter
["value"]
3381 calculated_params
[param_name
] = parameter
["default-value"]
3382 if isinstance(calculated_params
[param_name
], str) and calculated_params
[param_name
].startswith("<") \
3383 and calculated_params
[param_name
].endswith(">"):
3384 if calculated_params
[param_name
][1:-1] in instantiation_params
:
3385 calculated_params
[param_name
] = instantiation_params
[calculated_params
[param_name
][1:-1]]
3387 raise LcmException("Parameter {} needed to execute primitive {} not provided".
3388 format(calculated_params
[param_name
], primitive_desc
["name"]))
3390 raise LcmException("Parameter {} needed to execute primitive {} not provided".
3391 format(param_name
, primitive_desc
["name"]))
3393 if isinstance(calculated_params
[param_name
], (dict, list, tuple)):
3394 calculated_params
[param_name
] = yaml
.safe_dump(calculated_params
[param_name
], default_flow_style
=True,
3396 elif isinstance(calculated_params
[param_name
], str) and calculated_params
[param_name
].startswith("!!yaml "):
3397 calculated_params
[param_name
] = calculated_params
[param_name
][7:]
3399 # add always ns_config_info if primitive name is config
3400 if primitive_desc
["name"] == "config":
3401 if "ns_config_info" in instantiation_params
:
3402 calculated_params
["ns_config_info"] = instantiation_params
["ns_config_info"]
3403 return calculated_params
3405 def _look_for_deployed_vca(self
, deployed_vca
, member_vnf_index
, vdu_id
, vdu_count_index
, kdu_name
=None):
3406 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
3407 for vca
in deployed_vca
:
3410 if member_vnf_index
!= vca
["member-vnf-index"] or vdu_id
!= vca
["vdu_id"]:
3412 if vdu_count_index
is not None and vdu_count_index
!= vca
["vdu_count_index"]:
3414 if kdu_name
and kdu_name
!= vca
["kdu_name"]:
3418 # vca_deployed not found
3419 raise LcmException("charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} is not "
3420 "deployed".format(member_vnf_index
, vdu_id
, kdu_name
, vdu_count_index
))
3423 ee_id
= vca
.get("ee_id")
3424 vca_type
= vca
.get("type", "lxc_proxy_charm") # default value for backward compatibility - proxy charm
3426 raise LcmException("charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
3427 "execution environment"
3428 .format(member_vnf_index
, vdu_id
, kdu_name
, vdu_count_index
))
3429 return ee_id
, vca_type
3431 async def _ns_execute_primitive(self
, ee_id
, primitive
, primitive_params
, retries
=0,
3432 retries_interval
=30, timeout
=None,
3433 vca_type
=None, db_dict
=None) -> (str, str):
3435 if primitive
== "config":
3436 primitive_params
= {"params": primitive_params
}
3438 vca_type
= vca_type
or "lxc_proxy_charm"
3442 output
= await asyncio
.wait_for(
3443 self
.vca_map
[vca_type
].exec_primitive(
3445 primitive_name
=primitive
,
3446 params_dict
=primitive_params
,
3447 progress_timeout
=self
.timeout_progress_primitive
,
3448 total_timeout
=self
.timeout_primitive
,
3450 timeout
=timeout
or self
.timeout_primitive
)
3453 except asyncio
.CancelledError
:
3455 except Exception as e
: # asyncio.TimeoutError
3456 if isinstance(e
, asyncio
.TimeoutError
):
3460 self
.logger
.debug('Error executing action {} on {} -> {}'.format(primitive
, ee_id
, e
))
3462 await asyncio
.sleep(retries_interval
, loop
=self
.loop
)
3464 return 'FAILED', str(e
)
3466 return 'COMPLETED', output
3468 except (LcmException
, asyncio
.CancelledError
):
3470 except Exception as e
:
3471 return 'FAIL', 'Error executing action {}: {}'.format(primitive
, e
)
async def action(self, nsr_id, nslcmop_id):
    """
    Execute the primitive requested by an "action" operation over a NS, VNF, VDU or KDU.

    The operation parameters are read from the "nslcmops" record. KDU primitives "upgrade", "rollback"
    and "status", and the primitives of KDUs without a juju configuration, are executed through
    self.k8scluster_map; any other primitive is executed through self._ns_execute_primitive over the
    deployed VCA. The operation and ns status are written at database in the 'finally' section, and a
    "actioned" message is sent through self.msg.

    :param nsr_id: _id of the "nsrs" record
    :param nslcmop_id: _id of the "nslcmops" record
    :return: None if the HA lock is not obtained; otherwise (nslcmop_operation_state, detailed_status).
        Exceptions raised during the execution are logged and reported as a "FAILED" state, not propagated
    """
    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
    if not task_is_locked_by_me:
        return

    logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nsr = None
    db_nslcmop = None
    db_nsr_update = {}
    db_nslcmop_update = {}
    nslcmop_operation_state = None
    error_description_nslcmop = None
    exc = None
    # bound from the beginning, because it is returned inside 'finally'
    detailed_status = ""
    try:
        # wait for any previous tasks in process
        step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)

        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="RUNNING ACTION",
            current_operation_id=nslcmop_id
        )

        step = "Getting information from database"
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

        nsr_deployed = db_nsr["_admin"].get("deployed")
        operation_params = db_nslcmop["operationParams"]
        vnf_index = operation_params.get("member_vnf_index")
        vdu_id = operation_params.get("vdu_id")
        kdu_name = operation_params.get("kdu_name")
        vdu_count_index = operation_params.get("vdu_count_index")
        primitive = operation_params["primitive"]
        primitive_params = operation_params["primitive_params"]
        timeout_ns_action = operation_params.get("timeout_ns_action", self.timeout_primitive)

        if vnf_index:
            step = "Getting vnfr from database"
            db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
            step = "Getting vnfd from database"
            db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})
        else:
            step = "Getting nsd from database"
            db_nsd = self.db.get_one("nsds", {"_id": db_nsr["nsd-id"]})

        # for backward compatibility
        if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
            nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
            db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

        # look for primitive
        config_primitive_desc = None
        # kdu descriptor matching kdu_name. It is kept explicitly instead of relying on the variable left
        # by the search loop, which could be a non matching kdu or not be bound at all
        kdu_desc = None
        if vdu_id:
            for vdu in get_iterable(db_vnfd, "vdu"):
                if vdu_id == vdu["id"]:
                    for config_primitive in deep_get(vdu, ("vdu-configuration", "config-primitive"), ()):
                        if config_primitive["name"] == primitive:
                            config_primitive_desc = config_primitive
                            break
                    break
        elif kdu_name:
            for kdu in get_iterable(db_vnfd, "kdu"):
                if kdu_name == kdu["name"]:
                    kdu_desc = kdu
                    for config_primitive in deep_get(kdu, ("kdu-configuration", "config-primitive"), ()):
                        if config_primitive["name"] == primitive:
                            config_primitive_desc = config_primitive
                            break
                    break
        elif vnf_index:
            for config_primitive in deep_get(db_vnfd, ("vnf-configuration", "config-primitive"), ()):
                if config_primitive["name"] == primitive:
                    config_primitive_desc = config_primitive
                    break
        else:
            for config_primitive in deep_get(db_nsd, ("ns-configuration", "config-primitive"), ()):
                if config_primitive["name"] == primitive:
                    config_primitive_desc = config_primitive
                    break

        if not config_primitive_desc and not (kdu_name and primitive in ("upgrade", "rollback", "status")):
            raise LcmException("Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".
                               format(primitive))

        if vnf_index:
            if vdu_id:
                vdur = next((x for x in db_vnfr["vdur"] if x["vdu-id-ref"] == vdu_id), None)
                if vdur is None:
                    # explicit error instead of an AttributeError over None
                    raise LcmException("vdu_id '{}' not found at vnfr of member_vnf_index '{}'".format(
                        vdu_id, vnf_index))
                desc_params = self._format_additional_params(vdur.get("additionalParams"))
            elif kdu_name:
                kdur = next((x for x in db_vnfr["kdur"] if x["kdu-name"] == kdu_name), None)
                if kdur is None:
                    # explicit error instead of an AttributeError over None
                    raise LcmException("kdu_name '{}' not found at vnfr of member_vnf_index '{}'".format(
                        kdu_name, vnf_index))
                desc_params = self._format_additional_params(kdur.get("additionalParams"))
            else:
                desc_params = self._format_additional_params(db_vnfr.get("additionalParamsForVnf"))
        else:
            desc_params = self._format_additional_params(db_nsr.get("additionalParamsForNs"))

        # the primitive is executed by the k8s connector when the kdu descriptor has no juju configuration
        kdu_action = bool(kdu_desc) and not deep_get(kdu_desc, ("kdu-configuration", "juju"))

        # TODO check if ns is in a proper status
        if kdu_name and (primitive in ("upgrade", "rollback", "status") or kdu_action):
            # kdur and desc_params already set from before
            if primitive_params:
                desc_params.update(primitive_params)
            # TODO Check if we will need something at vnf level
            for index, kdu_deployed in enumerate(get_iterable(nsr_deployed, "K8s")):
                if kdu_name == kdu_deployed["kdu-name"] and kdu_deployed["member-vnf-index"] == vnf_index:
                    break
            else:
                raise LcmException("KDU '{}' for vnf '{}' not deployed".format(kdu_name, vnf_index))

            if kdu_deployed.get("k8scluster-type") not in self.k8scluster_map:
                msg = "unknown k8scluster-type '{}'".format(kdu_deployed.get("k8scluster-type"))
                raise LcmException(msg)
            k8s_connector = self.k8scluster_map[kdu_deployed["k8scluster-type"]]

            db_dict = {"collection": "nsrs",
                       "filter": {"_id": nsr_id},
                       "path": "_admin.deployed.K8s.{}".format(index)}
            self.logger.debug(logging_text + "Exec k8s {} on {}.{}".format(primitive, vnf_index, kdu_name))
            step = "Executing kdu {}".format(primitive)
            if primitive == "upgrade":
                if desc_params.get("kdu_model"):
                    kdu_model = desc_params.get("kdu_model")
                    del desc_params["kdu_model"]
                else:
                    kdu_model = kdu_deployed.get("kdu-model")
                    # a deployed model as "<name>:<suffix>" is reduced to "<name>"
                    parts = kdu_model.split(sep=":")
                    if len(parts) == 2:
                        kdu_model = parts[0]

                detailed_status = await asyncio.wait_for(
                    k8s_connector.upgrade(
                        cluster_uuid=kdu_deployed.get("k8scluster-uuid"),
                        kdu_instance=kdu_deployed.get("kdu-instance"),
                        atomic=True, kdu_model=kdu_model,
                        params=desc_params, db_dict=db_dict,
                        timeout=timeout_ns_action),
                    timeout=timeout_ns_action + 10)
                self.logger.debug(logging_text + " Upgrade of kdu {} done".format(detailed_status))
            elif primitive == "rollback":
                detailed_status = await asyncio.wait_for(
                    k8s_connector.rollback(
                        cluster_uuid=kdu_deployed.get("k8scluster-uuid"),
                        kdu_instance=kdu_deployed.get("kdu-instance"),
                        db_dict=db_dict),
                    timeout=timeout_ns_action)
            elif primitive == "status":
                detailed_status = await asyncio.wait_for(
                    k8s_connector.status_kdu(
                        cluster_uuid=kdu_deployed.get("k8scluster-uuid"),
                        kdu_instance=kdu_deployed.get("kdu-instance")),
                    timeout=timeout_ns_action)
            else:
                kdu_instance = kdu_deployed.get("kdu-instance") or "{}-{}".format(kdu_deployed["kdu-name"],
                                                                                   nsr_id)
                params = self._map_primitive_params(config_primitive_desc, primitive_params, desc_params)

                detailed_status = await asyncio.wait_for(
                    k8s_connector.exec_primitive(
                        cluster_uuid=kdu_deployed.get("k8scluster-uuid"),
                        kdu_instance=kdu_instance,
                        primitive_name=primitive,
                        params=params, db_dict=db_dict,
                        timeout=timeout_ns_action),
                    timeout=timeout_ns_action)

            # an empty result of the k8s connector is considered a failure
            if detailed_status:
                nslcmop_operation_state = 'COMPLETED'
            else:
                detailed_status = ''
                nslcmop_operation_state = 'FAILED'
        else:
            ee_id, vca_type = self._look_for_deployed_vca(nsr_deployed["VCA"],
                                                          member_vnf_index=vnf_index,
                                                          vdu_id=vdu_id,
                                                          vdu_count_index=vdu_count_index)
            db_nslcmop_notif = {"collection": "nslcmops",
                                "filter": {"_id": nslcmop_id},
                                "path": "admin.VCA"}
            nslcmop_operation_state, detailed_status = await self._ns_execute_primitive(
                ee_id,
                primitive=primitive,
                primitive_params=self._map_primitive_params(config_primitive_desc, primitive_params, desc_params),
                timeout=timeout_ns_action,
                vca_type=vca_type,
                db_dict=db_nslcmop_notif)

        db_nslcmop_update["detailed-status"] = detailed_status
        error_description_nslcmop = detailed_status if nslcmop_operation_state == "FAILED" else ""
        self.logger.debug(logging_text + " task Done with result {} {}".format(nslcmop_operation_state,
                                                                               detailed_status))
        return  # database update is called inside finally

    except (DbException, LcmException, N2VCException, K8sException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except asyncio.TimeoutError:
        self.logger.error(logging_text + "Timeout while '{}'".format(step))
        exc = "Timeout"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
    finally:
        if exc:
            db_nslcmop_update["detailed-status"] = detailed_status = error_description_nslcmop = \
                "FAILED {}: {}".format(step, exc)
            nslcmop_operation_state = "FAILED"
        if db_nsr:
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=db_nsr["nsState"],  # TODO check if degraded. For the moment use previous status
                current_operation="IDLE",
                current_operation_id=None,
                # error_description=error_description_nsr,
                # error_detail=error_detail,
                other_update=db_nsr_update
            )

        self._write_op_status(
            op_id=nslcmop_id,
            stage="",
            error_message=error_description_nslcmop,
            operation_state=nslcmop_operation_state,
            other_update=db_nslcmop_update,
        )

        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite("ns", "actioned", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                           "operationState": nslcmop_operation_state},
                                        loop=self.loop)
            except Exception as e:
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_action")
        # NOTE(review): returning inside 'finally' also discards any exception not handled above
        return nslcmop_operation_state, detailed_status
async def scale(self, nsr_id, nslcmop_id):
    """
    Execute a "scale" operation over a VNF of the NS, by one step of a scaling-group-descriptor.

    Phases, each one registered as a sub-operation of the nslcmop to allow retrying:
      - PRE-SCALE: execution of the vnf config primitives with trigger "pre-scale-in"/"pre-scale-out"
      - SCALE RO: "vdu-scaling" action sent to RO, waiting until it is done and the vnfr is updated
      - POST-SCALE: execution of the vnf config primitives with trigger "post-scale-in"/"post-scale-out"
    The operation and ns status are written at database in the 'finally' section, and a "scaled" message
    is sent through self.msg.

    :param nsr_id: _id of the "nsrs" record
    :param nslcmop_id: _id of the "nslcmops" record
    :return: None. Exceptions raised during the execution are logged and stored as a "FAILED" operation
        state, not propagated
    """
    # Try to lock HA task here
    task_is_locked_by_me = self.lcm_tasks.lock_HA('ns', 'nslcmops', nslcmop_id)
    if not task_is_locked_by_me:
        return

    logging_text = "Task ns={} scale={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nsr = None
    db_nslcmop = None
    db_nslcmop_update = {}
    nslcmop_operation_state = None
    db_nsr_update = {}
    exc = None
    # in case of error, indicates what part of scale was failed to put nsr at error status
    scale_process = None
    old_operational_status = ""
    old_config_status = ""
    vnfr_scaled = False
    # bound from the beginning, because it is logged at the RO retry even without pre-scale actions
    vnf_config_primitive = None
    try:
        # wait for any previous tasks in process
        step = "Waiting for previous operations to terminate"
        await self.lcm_tasks.waitfor_related_HA('ns', 'nslcmops', nslcmop_id)

        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="SCALING",
            current_operation_id=nslcmop_id
        )

        step = "Getting nslcmop from database"
        self.logger.debug(step + " after having waited for previous tasks to be completed")
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        step = "Getting nsr from database"
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})

        old_operational_status = db_nsr["operational-status"]
        old_config_status = db_nsr["config-status"]
        step = "Parsing scaling parameters"
        # self.logger.debug(step)
        db_nsr_update["operational-status"] = "scaling"
        self.update_db_2("nsrs", nsr_id, db_nsr_update)
        nsr_deployed = db_nsr["_admin"].get("deployed")

        RO_nsr_id = nsr_deployed["RO"]["nsr_id"]
        scale_by_step = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"]
        vnf_index = scale_by_step["member-vnf-index"]
        scaling_group = scale_by_step["scaling-group-descriptor"]
        scaling_type = db_nslcmop["operationParams"]["scaleVnfData"]["scaleVnfType"]
        # scaling_policy = db_nslcmop["operationParams"]["scaleVnfData"]["scaleByStepData"].get("scaling-policy")

        # for backward compatibility
        if nsr_deployed and isinstance(nsr_deployed.get("VCA"), dict):
            nsr_deployed["VCA"] = list(nsr_deployed["VCA"].values())
            db_nsr_update["_admin.deployed.VCA"] = nsr_deployed["VCA"]
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

        step = "Getting vnfr from database"
        db_vnfr = self.db.get_one("vnfrs", {"member-vnf-index-ref": vnf_index, "nsr-id-ref": nsr_id})
        step = "Getting vnfd from database"
        db_vnfd = self.db.get_one("vnfds", {"_id": db_vnfr["vnfd-id"]})

        step = "Getting scaling-group-descriptor"
        for scaling_descriptor in db_vnfd["scaling-group-descriptor"]:
            if scaling_descriptor["name"] == scaling_group:
                break
        else:
            raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
                               "at vnfd:scaling-group-descriptor".format(scaling_group))

        # TODO check if ns is in a proper status
        step = "Sending scale order to VIM"
        # number of scale operations already done over this scaling group, stored at nsr:_admin
        nb_scale_op = 0
        if not db_nsr["_admin"].get("scaling-group"):
            self.update_db_2("nsrs", nsr_id, {"_admin.scaling-group": [{"name": scaling_group, "nb-scale-op": 0}]})
            admin_scale_index = 0
        else:
            for admin_scale_index, admin_scale_info in enumerate(db_nsr["_admin"]["scaling-group"]):
                if admin_scale_info["name"] == scaling_group:
                    nb_scale_op = admin_scale_info.get("nb-scale-op", 0)
                    break
            else:  # not found, set index one plus last element and add new entry with the name
                admin_scale_index += 1
                db_nsr_update["_admin.scaling-group.{}.name".format(admin_scale_index)] = scaling_group
        RO_scaling_info = []
        vdu_scaling_info = {"scaling_group_name": scaling_group, "vdu": []}
        if scaling_type == "SCALE_OUT":
            # count if max-instance-count is reached
            max_instance_count = scaling_descriptor.get("max-instance-count", 10)
            # self.logger.debug("MAX_INSTANCE_COUNT is {}".format(max_instance_count))
            if nb_scale_op >= max_instance_count:
                raise LcmException("reached the limit of {} (max-instance-count) "
                                   "scaling-out operations for the "
                                   "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))

            nb_scale_op += 1
            vdu_scaling_info["scaling_direction"] = "OUT"
            vdu_scaling_info["vdu-create"] = {}
            for vdu_scale_info in scaling_descriptor["vdu"]:
                RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
                                        "type": "create", "count": vdu_scale_info.get("count", 1)})
                vdu_scaling_info["vdu-create"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)

        elif scaling_type == "SCALE_IN":
            # count if min-instance-count is reached
            min_instance_count = 0
            if "min-instance-count" in scaling_descriptor and scaling_descriptor["min-instance-count"] is not None:
                min_instance_count = int(scaling_descriptor["min-instance-count"])
            if nb_scale_op <= min_instance_count:
                raise LcmException("reached the limit of {} (min-instance-count) scaling-in operations for the "
                                   "scaling-group-descriptor '{}'".format(nb_scale_op, scaling_group))
            nb_scale_op -= 1
            vdu_scaling_info["scaling_direction"] = "IN"
            vdu_scaling_info["vdu-delete"] = {}
            for vdu_scale_info in scaling_descriptor["vdu"]:
                RO_scaling_info.append({"osm_vdu_id": vdu_scale_info["vdu-id-ref"], "member-vnf-index": vnf_index,
                                        "type": "delete", "count": vdu_scale_info.get("count", 1)})
                vdu_scaling_info["vdu-delete"][vdu_scale_info["vdu-id-ref"]] = vdu_scale_info.get("count", 1)

        # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
        vdu_create = vdu_scaling_info.get("vdu-create")
        vdu_delete = copy(vdu_scaling_info.get("vdu-delete"))
        if vdu_scaling_info["scaling_direction"] == "IN":
            # the last vdur of every vdu are the ones reported as deleted, as many as the counter indicates
            for vdur in reversed(db_vnfr["vdur"]):
                if vdu_delete.get(vdur["vdu-id-ref"]):
                    vdu_delete[vdur["vdu-id-ref"]] -= 1
                    vdu_scaling_info["vdu"].append({
                        "name": vdur["name"],
                        "vdu_id": vdur["vdu-id-ref"],
                        "interface": []
                    })
                    for interface in vdur["interfaces"]:
                        vdu_scaling_info["vdu"][-1]["interface"].append({
                            "name": interface["name"],
                            "ip_address": interface["ip-address"],
                            "mac_address": interface.get("mac-address"),
                        })
            # the counters consumed above belong to the copy; the original ones are used to scale the vnfr
            vdu_delete = vdu_scaling_info.pop("vdu-delete")

        # PRE-SCALE BEGIN
        step = "Executing pre-scale vnf-config-primitive"
        if scaling_descriptor.get("scaling-config-action"):
            for scaling_config_action in scaling_descriptor["scaling-config-action"]:
                if (scaling_config_action.get("trigger") == "pre-scale-in" and scaling_type == "SCALE_IN") \
                   or (scaling_config_action.get("trigger") == "pre-scale-out" and scaling_type == "SCALE_OUT"):
                    vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
                    step = db_nslcmop_update["detailed-status"] = \
                        "executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive)

                    # look for primitive
                    for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
                        if config_primitive["name"] == vnf_config_primitive:
                            break
                    else:
                        # the name of the missing primitive is reported (not the loop variable)
                        raise LcmException(
                            "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
                            "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
                            "primitive".format(scaling_group, vnf_config_primitive))

                    vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
                    if db_vnfr.get("additionalParamsForVnf"):
                        vnfr_params.update(db_vnfr["additionalParamsForVnf"])

                    scale_process = "VCA"
                    db_nsr_update["config-status"] = "configuring pre-scaling"
                    primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)

                    # Pre-scale retry check: Check if this sub-operation has been executed before
                    # NOTE(review): this call provides 'nslcmop_id' as second argument, but the SCALE-RO
                    # call below does not. Confirm against the _check_or_add_scale_suboperation signature
                    op_index = self._check_or_add_scale_suboperation(
                        db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'PRE-SCALE')
                    if op_index == self.SUBOPERATION_STATUS_SKIP:
                        # Skip sub-operation
                        result = 'COMPLETED'
                        result_detail = 'Done'
                        self.logger.debug(logging_text +
                                          "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
                                              vnf_config_primitive, result, result_detail))
                    else:
                        if op_index == self.SUBOPERATION_STATUS_NEW:
                            # New sub-operation: Get index of this sub-operation
                            op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
                            self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
                                              format(vnf_config_primitive))
                        else:
                            # retry: Get registered params for this existing sub-operation
                            op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
                            vnf_index = op.get('member_vnf_index')
                            vnf_config_primitive = op.get('primitive')
                            primitive_params = op.get('primitive_params')
                            self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation retry".
                                              format(vnf_config_primitive))
                        # Execute the primitive, either with new (first-time) or registered (reintent) args
                        ee_id, vca_type = self._look_for_deployed_vca(nsr_deployed["VCA"],
                                                                      member_vnf_index=vnf_index,
                                                                      vdu_id=None,
                                                                      vdu_count_index=None)
                        # vca_type by keyword: the fourth positional parameter is 'retries'
                        result, result_detail = await self._ns_execute_primitive(
                            ee_id, vnf_config_primitive, primitive_params, vca_type=vca_type)
                        self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
                            vnf_config_primitive, result, result_detail))
                    # Update operationState = COMPLETED | FAILED
                    self._update_suboperation_status(
                        db_nslcmop, op_index, result, result_detail)

                    if result == "FAILED":
                        raise LcmException(result_detail)
                    db_nsr_update["config-status"] = old_config_status
                    scale_process = None
        # PRE-SCALE END

        # SCALE RO - BEGIN
        # Should this block be skipped if 'RO_nsr_id' == None ?
        # if (RO_nsr_id and RO_scaling_info):
        if RO_scaling_info:
            scale_process = "RO"
            # Scale RO retry check: Check if this sub-operation has been executed before
            op_index = self._check_or_add_scale_suboperation(
                db_nslcmop, vnf_index, None, None, 'SCALE-RO', RO_nsr_id, RO_scaling_info)
            if op_index == self.SUBOPERATION_STATUS_SKIP:
                # Skip sub-operation
                result = 'COMPLETED'
                result_detail = 'Done'
                self.logger.debug(logging_text + "Skipped sub-operation RO, result {} {}".format(
                    result, result_detail))
            else:
                if op_index == self.SUBOPERATION_STATUS_NEW:
                    # New sub-operation: Get index of this sub-operation
                    op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
                    self.logger.debug(logging_text + "New sub-operation RO")
                else:
                    # retry: Get registered params for this existing sub-operation
                    op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
                    RO_nsr_id = op.get('RO_nsr_id')
                    RO_scaling_info = op.get('RO_scaling_info')
                    self.logger.debug(logging_text + "Sub-operation RO retry for primitive {}".format(
                        vnf_config_primitive))

                RO_desc = await self.RO.create_action("ns", RO_nsr_id, {"vdu-scaling": RO_scaling_info})
                db_nsr_update["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index)] = nb_scale_op
                db_nsr_update["_admin.scaling-group.{}.time".format(admin_scale_index)] = time()
                # wait until ready
                RO_nslcmop_id = RO_desc["instance_action_id"]
                db_nslcmop_update["_admin.deploy.RO"] = RO_nslcmop_id

                RO_task_done = False
                step = detailed_status = "Waiting RO_task_id={} to complete the scale action.".format(RO_nslcmop_id)
                detailed_status_old = None
                self.logger.debug(logging_text + step)

                deployment_timeout = 1 * 3600   # One hour
                while deployment_timeout > 0:
                    if not RO_task_done:
                        desc = await self.RO.show("ns", item_id_name=RO_nsr_id, extra_item="action",
                                                  extra_item_id=RO_nslcmop_id)

                        # deploymentStatus
                        self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)

                        ns_status, ns_status_info = self.RO.check_action_status(desc)
                        if ns_status == "ERROR":
                            raise ROclient.ROClientException(ns_status_info)
                        elif ns_status == "BUILD":
                            detailed_status = step + "; {}".format(ns_status_info)
                        elif ns_status == "ACTIVE":
                            RO_task_done = True
                            step = detailed_status = "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id)
                            self.logger.debug(logging_text + step)
                        else:
                            assert False, "ROclient.check_action_status returns unknown {}".format(ns_status)
                    else:
                        # ns_status keeps the last value obtained from the RO action (see branch above)
                        if ns_status == "ERROR":
                            raise ROclient.ROClientException(ns_status_info)
                        elif ns_status == "BUILD":
                            detailed_status = step + "; {}".format(ns_status_info)
                        elif ns_status == "ACTIVE":
                            step = detailed_status = \
                                "Waiting for management IP address reported by the VIM. Updating VNFRs"
                            if not vnfr_scaled:
                                self.scale_vnfr(db_vnfr, vdu_create=vdu_create, vdu_delete=vdu_delete)
                                vnfr_scaled = True
                            try:
                                desc = await self.RO.show("ns", RO_nsr_id)

                                # deploymentStatus
                                self._on_update_ro_db(nsrs_id=nsr_id, ro_descriptor=desc)

                                # nsr_deployed["nsr_ip"] = RO.get_ns_vnf_info(desc)
                                self.ns_update_vnfr({db_vnfr["member-vnf-index-ref"]: db_vnfr}, desc)
                                break
                            except LcmExceptionNoMgmtIP:
                                # try again at next iteration
                                pass
                        else:
                            assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
                    if detailed_status != detailed_status_old:
                        self._update_suboperation_status(
                            db_nslcmop, op_index, 'COMPLETED', detailed_status)
                        detailed_status_old = db_nslcmop_update["detailed-status"] = detailed_status
                        self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)

                    # The 'loop' argument is not passed: it was removed from asyncio.sleep in python 3.10
                    await asyncio.sleep(5)
                    deployment_timeout -= 5
                if deployment_timeout <= 0:
                    # same arguments as the rest of calls to _update_suboperation_status in this method
                    self._update_suboperation_status(
                        db_nslcmop, op_index, 'FAILED', "Timeout when waiting for ns to get ready")
                    raise ROclient.ROClientException("Timeout waiting ns to be ready")

                # update VDU_SCALING_INFO with the obtained ip_addresses
                if vdu_scaling_info["scaling_direction"] == "OUT":
                    for vdur in reversed(db_vnfr["vdur"]):
                        if vdu_scaling_info["vdu-create"].get(vdur["vdu-id-ref"]):
                            vdu_scaling_info["vdu-create"][vdur["vdu-id-ref"]] -= 1
                            vdu_scaling_info["vdu"].append({
                                "name": vdur["name"],
                                "vdu_id": vdur["vdu-id-ref"],
                                "interface": []
                            })
                            for interface in vdur["interfaces"]:
                                vdu_scaling_info["vdu"][-1]["interface"].append({
                                    "name": interface["name"],
                                    "ip_address": interface["ip-address"],
                                    "mac_address": interface.get("mac-address"),
                                })
                    del vdu_scaling_info["vdu-create"]

                self._update_suboperation_status(db_nslcmop, op_index, 'COMPLETED', 'Done')
        # SCALE RO - END

        scale_process = None
        if db_nsr_update:
            self.update_db_2("nsrs", nsr_id, db_nsr_update)

        # POST-SCALE BEGIN
        # execute primitive service POST-SCALING
        step = "Executing post-scale vnf-config-primitive"
        if scaling_descriptor.get("scaling-config-action"):
            for scaling_config_action in scaling_descriptor["scaling-config-action"]:
                if (scaling_config_action.get("trigger") == "post-scale-in" and scaling_type == "SCALE_IN") \
                   or (scaling_config_action.get("trigger") == "post-scale-out" and scaling_type == "SCALE_OUT"):
                    vnf_config_primitive = scaling_config_action["vnf-config-primitive-name-ref"]
                    step = db_nslcmop_update["detailed-status"] = \
                        "executing post-scale scaling-config-action '{}'".format(vnf_config_primitive)

                    vnfr_params = {"VDU_SCALE_INFO": vdu_scaling_info}
                    if db_vnfr.get("additionalParamsForVnf"):
                        vnfr_params.update(db_vnfr["additionalParamsForVnf"])

                    # look for primitive
                    for config_primitive in db_vnfd.get("vnf-configuration", {}).get("config-primitive", ()):
                        if config_primitive["name"] == vnf_config_primitive:
                            break
                    else:
                        # the name of the missing primitive is reported (not the loop variable)
                        raise LcmException("Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:"
                                           "scaling-config-action[vnf-config-primitive-name-ref='{}'] does not "
                                           "match any vnf-configuration:config-primitive".format(
                                               scaling_group, vnf_config_primitive))
                    scale_process = "VCA"
                    db_nsr_update["config-status"] = "configuring post-scaling"
                    primitive_params = self._map_primitive_params(config_primitive, {}, vnfr_params)

                    # Post-scale retry check: Check if this sub-operation has been executed before
                    # NOTE(review): same doubt about the 'nslcmop_id' argument as in the PRE-SCALE call
                    op_index = self._check_or_add_scale_suboperation(
                        db_nslcmop, nslcmop_id, vnf_index, vnf_config_primitive, primitive_params, 'POST-SCALE')
                    if op_index == self.SUBOPERATION_STATUS_SKIP:
                        # Skip sub-operation
                        result = 'COMPLETED'
                        result_detail = 'Done'
                        self.logger.debug(logging_text +
                                          "vnf_config_primitive={} Skipped sub-operation, result {} {}".
                                          format(vnf_config_primitive, result, result_detail))
                    else:
                        if op_index == self.SUBOPERATION_STATUS_NEW:
                            # New sub-operation: Get index of this sub-operation
                            op_index = len(db_nslcmop.get('_admin', {}).get('operations')) - 1
                            self.logger.debug(logging_text + "vnf_config_primitive={} New sub-operation".
                                              format(vnf_config_primitive))
                        else:
                            # retry: Get registered params for this existing sub-operation
                            op = db_nslcmop.get('_admin', {}).get('operations', [])[op_index]
                            vnf_index = op.get('member_vnf_index')
                            vnf_config_primitive = op.get('primitive')
                            primitive_params = op.get('primitive_params')
                            self.logger.debug(logging_text + "vnf_config_primitive={} Sub-operation retry".
                                              format(vnf_config_primitive))
                        # Execute the primitive, either with new (first-time) or registered (reintent) args
                        ee_id, vca_type = self._look_for_deployed_vca(nsr_deployed["VCA"],
                                                                      member_vnf_index=vnf_index,
                                                                      vdu_id=None,
                                                                      vdu_count_index=None)
                        # vca_type by keyword: the fourth positional parameter is 'retries'
                        result, result_detail = await self._ns_execute_primitive(
                            ee_id, vnf_config_primitive, primitive_params, vca_type=vca_type)
                        self.logger.debug(logging_text + "vnf_config_primitive={} Done with result {} {}".format(
                            vnf_config_primitive, result, result_detail))
                    # Update operationState = COMPLETED | FAILED
                    self._update_suboperation_status(
                        db_nslcmop, op_index, result, result_detail)

                    if result == "FAILED":
                        raise LcmException(result_detail)
                    db_nsr_update["config-status"] = old_config_status
                    scale_process = None
        # POST-SCALE END

        db_nsr_update["detailed-status"] = ""  # "scaled {} {}".format(scaling_group, scaling_type)
        db_nsr_update["operational-status"] = "running" if old_operational_status == "failed" \
            else old_operational_status
        db_nsr_update["config-status"] = old_config_status
        return
    except (ROclient.ROClientException, DbException, LcmException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except asyncio.CancelledError:
        self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
        exc = "Operation was cancelled"
    except Exception as e:
        exc = traceback.format_exc()
        self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
    finally:
        self._write_ns_status(
            nsr_id=nsr_id,
            ns_state=None,
            current_operation="IDLE",
            current_operation_id=None
        )
        if exc:
            db_nslcmop_update["detailed-status"] = error_description_nslcmop = "FAILED {}: {}".format(step, exc)
            nslcmop_operation_state = "FAILED"
            if db_nsr:
                db_nsr_update["operational-status"] = old_operational_status
                db_nsr_update["config-status"] = old_config_status
                db_nsr_update["detailed-status"] = ""
                # the nsr is marked as failed only for the phase that was being executed
                if scale_process:
                    if "VCA" in scale_process:
                        db_nsr_update["config-status"] = "failed"
                    if "RO" in scale_process:
                        db_nsr_update["operational-status"] = "failed"
                    db_nsr_update["detailed-status"] = "FAILED scaling nslcmop={} {}: {}".format(nslcmop_id, step,
                                                                                                 exc)
        else:
            error_description_nslcmop = None
            nslcmop_operation_state = "COMPLETED"
            db_nslcmop_update["detailed-status"] = "Done"

        self._write_op_status(
            op_id=nslcmop_id,
            stage="",
            error_message=error_description_nslcmop,
            operation_state=nslcmop_operation_state,
            other_update=db_nslcmop_update,
        )
        if db_nsr:
            self._write_ns_status(
                nsr_id=nsr_id,
                ns_state=None,
                current_operation="IDLE",
                current_operation_id=None,
                other_update=db_nsr_update
            )

        if nslcmop_operation_state:
            try:
                await self.msg.aiowrite("ns", "scaled", {"nsr_id": nsr_id, "nslcmop_id": nslcmop_id,
                                                         "operationState": nslcmop_operation_state},
                                        loop=self.loop)
            except Exception as e:
                self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
        self.logger.debug(logging_text + "Exit")
        self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")
async def add_prometheus_metrics(self, ee_id, artifact_path, ee_config_descriptor, vnfr_id, nsr_id, target_ip):
    """
    Create the prometheus jobs described by a 'prometheus*.j2' file of the artifact folder.

    :param ee_id: execution environment id. The text after the first "." is used as service name
    :param artifact_path: folder, at self.fs, where the job template is looked for
    :param ee_config_descriptor: descriptor providing "metric-service", used to compose the exporter host
    :param vnfr_id: vnfr identifier. Without "-", it is used as job name (or as prefix of it)
    :param nsr_id: stored at every job under the key "nsr_id"
    :param target_ip: value for the template variable TARGET_IP
    :return: list with the job names if self.prometheus.update reports success. None otherwise, including
        when there is no self.prometheus or no template file
    """
    if not self.prometheus:
        return

    # first file of the artifact folder called 'prometheus*.j2', if any
    template_name = None
    for file_name in self.fs.dir_ls(artifact_path):
        if file_name.startswith("prometheus") and file_name.endswith(".j2"):
            template_name = file_name
            break
    if not template_name:
        return
    with self.fs.file_open((artifact_path, template_name), "r") as template_file:
        template_text = template_file.read()

    # TODO get_service
    service_name = ee_id.partition(".")[2]  # remove prefix "namespace."
    exporter_host = "{}-{}".format(service_name, ee_config_descriptor["metric-service"])
    exporter_port = "80"
    vnfr_id = vnfr_id.replace("-", "")
    template_variables = {
        "JOB_NAME": vnfr_id,
        "TARGET_IP": target_ip,
        "EXPORTER_POD_IP": exporter_host,
        "EXPORTER_POD_PORT": exporter_port,
    }
    parsed_jobs = self.prometheus.parse_job(template_text, template_variables)

    # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
    for parsed_job in parsed_jobs:
        current_name = parsed_job.get("job_name")
        if not (isinstance(current_name, str) and vnfr_id in parsed_job["job_name"]):
            parsed_job["job_name"] = vnfr_id + "_" + str(randint(1, 10000))
        parsed_job["nsr_id"] = nsr_id

    jobs_by_name = {}
    for parsed_job in parsed_jobs:
        jobs_by_name[parsed_job["job_name"]] = parsed_job

    updated = await self.prometheus.update(jobs_by_name)
    if updated:
        return list(jobs_by_name.keys())