1 # -*- coding: utf-8 -*-
4 # Copyright 2018 Telefonica S.A.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
22 import logging
.handlers
23 from osm_lcm
import ROclient
24 from osm_lcm
.lcm_utils
import LcmException
, LcmBase
, deep_get
25 from n2vc
.k8s_helm_conn
import K8sHelmConnector
26 from n2vc
.k8s_helm3_conn
import K8sHelm3Connector
27 from n2vc
.k8s_juju_conn
import K8sJujuConnector
28 from n2vc
.n2vc_juju_conn
import N2VCJujuConnector
29 from n2vc
.exceptions
import K8sException
, N2VCException
30 from osm_common
.dbbase
import DbException
31 from copy
import deepcopy
34 __author__
= "Alfonso Tierno"
class VimLcm(LcmBase):
    """Propagate VIM account operations (create / edit / delete) to the RO.

    Every public coroutine handles one operation over a ``vim_accounts``
    record: it asks ``lcm_tasks.lock_HA`` for the operation (returning at once
    when that call is falsy), talks to RO through ``ROclient.ROClient``, writes
    the outcome under ``_admin`` in the database, reports it through
    ``lcm_tasks.unlock_HA`` and finally removes the task from ``lcm_tasks``.
    """

    # values that are encrypted at vim config because they are passwords
    vim_config_encrypted = {
        "1.1": ("admin_password", "nsx_password", "vcenter_password"),
        # NOTE(review): a "default" entry is required because the methods below
        # fall back to vim_config_encrypted.get("default"); confirm this tuple
        # against the project history before merging.
        "default": (
            "admin_password",
            "nsx_password",
            "vcenter_password",
            "vrops_password",
        ),
    }

    def __init__(self, msg, lcm_tasks, config, loop):
        """
        Init, Connect to database, filesystem storage, and messaging
        :param msg: message bus object, handed over to LcmBase
        :param lcm_tasks: task registry used for HA locking and task removal
        :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
            and 'ro_config' (keyword arguments for ROclient.ROClient)
        :param loop: asyncio event loop given to ROclient.ROClient
        :return: None
        """
        self.logger = logging.getLogger("lcm.vim")
        self.loop = loop
        self.lcm_tasks = lcm_tasks
        self.ro_config = config["ro_config"]

        super().__init__(msg, self.logger)

    async def _get_RO_sdn_id(self, sdn_id):
        """Return the RO identifier of the SDN controller 'sdn_id'.

        :param sdn_id: '_id' of the record at the 'sdns' collection
        :return: value stored at _admin.deployed.RO of that record
        :raises LcmException: when the SDN controller has no RO deployment info
        """
        db_sdn = self.db.get_one("sdns", {"_id": sdn_id})

        # If the VIM account has an associated SDN account, also
        # wait for any previous tasks in process for the SDN
        await self.lcm_tasks.waitfor_related_HA("sdn", "ANY", db_sdn["_id"])

        if (
            db_sdn.get("_admin")
            and db_sdn["_admin"].get("deployed")
            and db_sdn["_admin"]["deployed"].get("RO")
        ):
            return db_sdn["_admin"]["deployed"]["RO"]
        raise LcmException(
            "sdn-controller={} is not available. Not deployed at RO".format(sdn_id)
        )

    async def create(self, vim_content, order_id):
        """Create the VIM and its vim_account at RO and record the result.

        :param vim_content: VIM account descriptor. Must contain '_id',
            'vim_type', 'vim_tenant_name', 'vim_user' and 'vim_password';
            'op_id' is popped from it.
        :param order_id: order identifier used to remove the task from lcm_tasks
        :return: None
        """
        # HA tasks and backward compatibility:
        # If 'vim_content' does not include 'op_id', we are running a legacy NBI version.
        # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
        # Register 'create' task here for related future HA operations
        op_id = vim_content.pop("op_id", None)
        if not self.lcm_tasks.lock_HA("vim", "create", op_id):
            return

        vim_id = vim_content["_id"]
        logging_text = "Task vim_create={} ".format(vim_id)
        self.logger.debug(logging_text + "Enter")

        db_vim = None
        db_vim_update = {}
        exc = None
        RO_sdn_id = None
        try:
            step = "Getting vim-id='{}' from db".format(vim_id)
            db_vim = self.db.get_one("vim_accounts", {"_id": vim_id})
            if vim_content.get("config") and vim_content["config"].get(
                "sdn-controller"
            ):
                step = "Getting sdn-controller-id='{}' from db".format(
                    vim_content["config"]["sdn-controller"]
                )
                RO_sdn_id = await self._get_RO_sdn_id(
                    vim_content["config"]["sdn-controller"]
                )

            step = "Creating vim at RO"
            db_vim_update["_admin.deployed.RO"] = None
            db_vim_update["_admin.detailed-status"] = step
            self.update_db_2("vim_accounts", vim_id, db_vim_update)
            RO = ROclient.ROClient(self.loop, **self.ro_config)
            # The 'vim' descriptor sent to RO must not carry OSM bookkeeping
            # fields nor the tenant credentials (those go in 'vim_account')
            vim_RO = deepcopy(vim_content)
            vim_RO.pop("_id", None)
            vim_RO.pop("_admin", None)
            schema_version = vim_RO.pop("schema_version", None)
            vim_RO.pop("schema_type", None)
            vim_RO.pop("vim_tenant_name", None)
            vim_RO["type"] = vim_RO.pop("vim_type")
            vim_RO.pop("vim_user", None)
            vim_RO.pop("vim_password", None)
            if RO_sdn_id:
                vim_RO["config"]["sdn-controller"] = RO_sdn_id
            desc = await RO.create("vim", descriptor=vim_RO)
            RO_vim_id = desc["uuid"]
            db_vim_update["_admin.deployed.RO"] = RO_vim_id
            self.logger.debug(
                logging_text + "VIM created at RO_vim_id={}".format(RO_vim_id)
            )

            step = "Creating vim_account at RO"
            db_vim_update["_admin.detailed-status"] = step
            self.update_db_2("vim_accounts", vim_id, db_vim_update)

            if vim_content.get("vim_password"):
                vim_content["vim_password"] = self.db.decrypt(
                    vim_content["vim_password"],
                    schema_version=schema_version,
                    salt=vim_id,
                )
            vim_account_RO = {
                "vim_tenant_name": vim_content["vim_tenant_name"],
                "vim_username": vim_content["vim_user"],
                "vim_password": vim_content["vim_password"],
            }
            if vim_RO.get("config"):
                vim_account_RO["config"] = vim_RO["config"]
                if "sdn-controller" in vim_account_RO["config"]:
                    del vim_account_RO["config"]["sdn-controller"]
                if "sdn-port-mapping" in vim_account_RO["config"]:
                    del vim_account_RO["config"]["sdn-port-mapping"]
                vim_config_encrypted_keys = self.vim_config_encrypted.get(
                    schema_version
                ) or self.vim_config_encrypted.get("default")
                for p in vim_config_encrypted_keys:
                    if vim_account_RO["config"].get(p):
                        vim_account_RO["config"][p] = self.db.decrypt(
                            vim_account_RO["config"][p],
                            schema_version=schema_version,
                            salt=vim_id,
                        )

            desc = await RO.attach("vim_account", RO_vim_id, descriptor=vim_account_RO)
            db_vim_update["_admin.deployed.RO-account"] = desc["uuid"]
            db_vim_update["_admin.operationalState"] = "ENABLED"
            db_vim_update["_admin.detailed-status"] = "Done"
            # Mark the VIM 'create' HA task as successful
            operation_state = "COMPLETED"
            operation_details = "Done"

            self.logger.debug(
                logging_text
                + "Exit Ok VIM account created at RO_vim_account_id={}".format(
                    desc["uuid"]
                )
            )
            return

        except (ROclient.ROClientException, DbException, asyncio.CancelledError) as e:
            self.logger.error(logging_text + "Exit Exception {}".format(e))
            exc = e
        except Exception as e:
            self.logger.critical(
                logging_text + "Exit Exception {}".format(e), exc_info=True
            )
            exc = e
        finally:
            if exc:
                # Mark the VIM 'create' HA task as erroneous. This is done for
                # every failure, also when the account could not be read from
                # the database, so unlock_HA() always gets a defined state.
                operation_state = "FAILED"
                operation_details = "ERROR {}: {}".format(step, exc)
                if db_vim:
                    db_vim_update["_admin.operationalState"] = "ERROR"
                    db_vim_update["_admin.detailed-status"] = operation_details
            try:
                if db_vim_update:
                    self.update_db_2("vim_accounts", vim_id, db_vim_update)
                # Register the VIM 'create' HA task either
                # successful or erroneous, or do nothing (if legacy NBI)
                self.lcm_tasks.unlock_HA(
                    "vim",
                    "create",
                    op_id,
                    operationState=operation_state,
                    detailed_status=operation_details,
                )
            except DbException as e:
                self.logger.error(logging_text + "Cannot update database: {}".format(e))

            self.lcm_tasks.remove("vim_account", vim_id, order_id)

    async def edit(self, vim_content, order_id):
        """Apply the changes of a VIM account to the 'vim' and 'vim_account' at RO.

        Nothing is sent to RO when the database record has no RO deployment
        information (_admin.deployed.RO).

        :param vim_content: edited fields of the VIM account; must contain '_id'.
            'op_id' is popped from it.
        :param order_id: order identifier used to remove the task from lcm_tasks
        :return: None
        """
        # HA tasks and backward compatibility:
        # If 'vim_content' does not include 'op_id', we are running a legacy NBI version.
        # In such a case, HA is not supported by NBI, and the HA check always returns True
        op_id = vim_content.pop("op_id", None)
        if not self.lcm_tasks.lock_HA("vim", "edit", op_id):
            return

        vim_id = vim_content["_id"]
        logging_text = "Task vim_edit={} ".format(vim_id)
        self.logger.debug(logging_text + "Enter")

        db_vim = None
        exc = None
        RO_sdn_id = None
        RO_vim_id = None
        db_vim_update = {}
        try:
            step = "Getting vim-id='{}' from db".format(vim_id)

            # wait for any previous tasks in process
            await self.lcm_tasks.waitfor_related_HA("vim", "edit", op_id)

            db_vim = self.db.get_one("vim_accounts", {"_id": vim_id})

            if (
                db_vim.get("_admin")
                and db_vim["_admin"].get("deployed")
                and db_vim["_admin"]["deployed"].get("RO")
            ):
                if vim_content.get("config") and vim_content["config"].get(
                    "sdn-controller"
                ):
                    step = "Getting sdn-controller-id='{}' from db".format(
                        vim_content["config"]["sdn-controller"]
                    )
                    RO_sdn_id = await self._get_RO_sdn_id(
                        vim_content["config"]["sdn-controller"]
                    )

                RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
                step = "Editing vim at RO"
                RO = ROclient.ROClient(self.loop, **self.ro_config)
                vim_RO = deepcopy(vim_content)
                vim_RO.pop("_id", None)
                vim_RO.pop("_admin", None)
                schema_version = vim_RO.pop("schema_version", None)
                vim_RO.pop("schema_type", None)
                vim_RO.pop("vim_tenant_name", None)
                if "vim_type" in vim_RO:
                    vim_RO["type"] = vim_RO.pop("vim_type")
                vim_RO.pop("vim_user", None)
                vim_RO.pop("vim_password", None)
                if RO_sdn_id:
                    vim_RO["config"]["sdn-controller"] = RO_sdn_id
                # TODO make a deep update of sdn-port-mapping
                if vim_RO:
                    await RO.edit("vim", RO_vim_id, descriptor=vim_RO)

                step = "Editing vim-account at RO tenant"
                vim_account_RO = {}
                if "config" in vim_content:
                    if "sdn-controller" in vim_content["config"]:
                        del vim_content["config"]["sdn-controller"]
                    if "sdn-port-mapping" in vim_content["config"]:
                        del vim_content["config"]["sdn-port-mapping"]
                    if not vim_content["config"]:
                        del vim_content["config"]
                if "vim_tenant_name" in vim_content:
                    vim_account_RO["vim_tenant_name"] = vim_content["vim_tenant_name"]
                if "vim_password" in vim_content:
                    vim_account_RO["vim_password"] = vim_content["vim_password"]
                if vim_content.get("vim_password"):
                    vim_account_RO["vim_password"] = self.db.decrypt(
                        vim_content["vim_password"],
                        schema_version=schema_version,
                        salt=vim_id,
                    )
                if "config" in vim_content:
                    vim_account_RO["config"] = vim_content["config"]
                if vim_content.get("config"):
                    vim_config_encrypted_keys = self.vim_config_encrypted.get(
                        schema_version
                    ) or self.vim_config_encrypted.get("default")
                    for p in vim_config_encrypted_keys:
                        if vim_content["config"].get(p):
                            vim_account_RO["config"][p] = self.db.decrypt(
                                vim_content["config"][p],
                                schema_version=schema_version,
                                salt=vim_id,
                            )

                if "vim_user" in vim_content:
                    # The user must travel in the descriptor sent to RO, under
                    # the same key that create() uses ("vim_username")
                    vim_account_RO["vim_username"] = vim_content["vim_user"]
                # vim_account must be edited always even if empty in order to ensure changes are translated to RO
                # vim_thread. RO will remove and relaunch a new thread for this vim_account
                await RO.edit("vim_account", RO_vim_id, descriptor=vim_account_RO)
                db_vim_update["_admin.operationalState"] = "ENABLED"

            # Mark the VIM 'edit' HA task as successful (also when there was
            # nothing deployed at RO and therefore nothing to edit there)
            operation_state = "COMPLETED"
            operation_details = "Done"

            self.logger.debug(logging_text + "Exit Ok RO_vim_id={}".format(RO_vim_id))
            return

        except (ROclient.ROClientException, DbException, asyncio.CancelledError) as e:
            self.logger.error(logging_text + "Exit Exception {}".format(e))
            exc = e
        except Exception as e:
            self.logger.critical(
                logging_text + "Exit Exception {}".format(e), exc_info=True
            )
            exc = e
        finally:
            if exc:
                # Mark the VIM 'edit' HA task as erroneous, whatever the point
                # of failure, so unlock_HA() always gets a defined state
                operation_state = "FAILED"
                operation_details = "ERROR {}: {}".format(step, exc)
                if db_vim:
                    db_vim_update["_admin.operationalState"] = "ERROR"
                    db_vim_update["_admin.detailed-status"] = operation_details
            try:
                if db_vim_update:
                    self.update_db_2("vim_accounts", vim_id, db_vim_update)
                # Register the VIM 'edit' HA task either
                # successful or erroneous, or do nothing (if legacy NBI)
                self.lcm_tasks.unlock_HA(
                    "vim",
                    "edit",
                    op_id,
                    operationState=operation_state,
                    detailed_status=operation_details,
                )
            except DbException as e:
                self.logger.error(logging_text + "Cannot update database: {}".format(e))

            self.lcm_tasks.remove("vim_account", vim_id, order_id)

    async def delete(self, vim_content, order_id):
        """Detach and delete the VIM at RO, then delete the database record.

        The RO part is skipped when ro_config has a truthy 'ng' entry or when
        the record has no RO deployment information. A 404 answer from RO is
        treated as "already done".

        :param vim_content: must contain '_id'; 'op_id' is popped from it
        :param order_id: order identifier used to remove the task from lcm_tasks
        :return: None
        """
        # HA tasks and backward compatibility:
        # If 'vim_content' does not include 'op_id', we are running a legacy NBI version.
        # In such a case, HA is not supported by NBI, and the HA check always returns True
        op_id = vim_content.pop("op_id", None)
        if not self.lcm_tasks.lock_HA("vim", "delete", op_id):
            return

        vim_id = vim_content["_id"]
        logging_text = "Task vim_delete={} ".format(vim_id)
        self.logger.debug(logging_text + "Enter")

        db_vim = None
        db_vim_update = {}
        exc = None
        step = "Getting vim from db"
        try:
            # wait for any previous tasks in process
            await self.lcm_tasks.waitfor_related_HA("vim", "delete", op_id)
            if not self.ro_config.get("ng"):
                db_vim = self.db.get_one("vim_accounts", {"_id": vim_id})
                if (
                    db_vim.get("_admin")
                    and db_vim["_admin"].get("deployed")
                    and db_vim["_admin"]["deployed"].get("RO")
                ):
                    RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
                    RO = ROclient.ROClient(self.loop, **self.ro_config)
                    step = "Detaching vim from RO tenant"
                    try:
                        await RO.detach("vim_account", RO_vim_id)
                    except ROclient.ROClientException as e:
                        if e.http_code == 404:  # not found
                            self.logger.debug(
                                logging_text
                                + "RO_vim_id={} already detached".format(RO_vim_id)
                            )
                        else:
                            raise

                    step = "Deleting vim from RO"
                    try:
                        await RO.delete("vim", RO_vim_id)
                    except ROclient.ROClientException as e:
                        if e.http_code == 404:  # not found
                            self.logger.debug(
                                logging_text
                                + "RO_vim_id={} already deleted".format(RO_vim_id)
                            )
                        else:
                            raise
                else:
                    # nothing to delete
                    self.logger.debug(logging_text + "Nothing to remove at RO")
            self.db.del_one("vim_accounts", {"_id": vim_id})
            # The record no longer exists: prevent any later update attempt
            db_vim = None
            self.logger.debug(logging_text + "Exit Ok")
            return

        except (ROclient.ROClientException, DbException, asyncio.CancelledError) as e:
            self.logger.error(logging_text + "Exit Exception {}".format(e))
            exc = e
        except Exception as e:
            self.logger.critical(
                logging_text + "Exit Exception {}".format(e), exc_info=True
            )
            exc = e
        finally:
            if exc and db_vim:
                db_vim_update["_admin.operationalState"] = "ERROR"
                db_vim_update["_admin.detailed-status"] = "ERROR {}: {}".format(
                    step, exc
                )
                # Mark the VIM 'delete' HA task as erroneous
                operation_state = "FAILED"
                operation_details = "ERROR {}: {}".format(step, exc)
                self.lcm_tasks.unlock_HA(
                    "vim",
                    "delete",
                    op_id,
                    operationState=operation_state,
                    detailed_status=operation_details,
                )
            try:
                if db_vim and db_vim_update:
                    self.update_db_2("vim_accounts", vim_id, db_vim_update)
                # If the VIM 'delete' HA task was successful, the DB entry has been deleted,
                # which means that there is nowhere to register this task, so do nothing here.
            except DbException as e:
                self.logger.error(logging_text + "Cannot update database: {}".format(e))
            # Removed once, after the outcome has been recorded
            self.lcm_tasks.remove("vim_account", vim_id, order_id)
class WimLcm(LcmBase):
    """Propagate WIM account operations (create / edit / delete) to the RO.

    Every public coroutine handles one operation over a ``wim_accounts``
    record: it asks ``lcm_tasks.lock_HA`` for the operation (returning at once
    when that call is falsy), talks to RO through ``ROclient.ROClient``, writes
    the outcome under ``_admin`` in the database, reports it through
    ``lcm_tasks.unlock_HA`` and finally removes the task from ``lcm_tasks``.
    """

    # values that are encrypted at wim config because they are passwords
    wim_config_encrypted = ()

    def __init__(self, msg, lcm_tasks, config, loop):
        """
        Init, Connect to database, filesystem storage, and messaging
        :param msg: message bus object, handed over to LcmBase
        :param lcm_tasks: task registry used for HA locking and task removal
        :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
            and 'ro_config' (keyword arguments for ROclient.ROClient)
        :param loop: asyncio event loop given to ROclient.ROClient
        :return: None
        """
        # NOTE(review): this handler logs under "lcm.vim"; confirm whether a
        # dedicated "lcm.wim" logger was intended.
        self.logger = logging.getLogger("lcm.vim")
        self.loop = loop
        self.lcm_tasks = lcm_tasks
        self.ro_config = config["ro_config"]

        super().__init__(msg, self.logger)

    async def create(self, wim_content, order_id):
        """Create the WIM and its wim_account at RO and record the result.

        :param wim_content: WIM account descriptor. Must contain '_id',
            'wim_type', 'name', 'user' and 'password'; 'op_id' is popped from it.
        :param order_id: order identifier used to remove the task from lcm_tasks
        :return: None
        """
        # HA tasks and backward compatibility:
        # If 'wim_content' does not include 'op_id', we are running a legacy NBI version.
        # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
        # Register 'create' task here for related future HA operations
        op_id = wim_content.pop("op_id", None)
        if not self.lcm_tasks.lock_HA("wim", "create", op_id):
            # Same guard as every other handler of this module
            return

        wim_id = wim_content["_id"]
        logging_text = "Task wim_create={} ".format(wim_id)
        self.logger.debug(logging_text + "Enter")

        db_wim = None
        db_wim_update = {}
        exc = None
        try:
            step = "Getting wim-id='{}' from db".format(wim_id)
            db_wim = self.db.get_one("wim_accounts", {"_id": wim_id})
            db_wim_update["_admin.deployed.RO"] = None

            step = "Creating wim at RO"
            db_wim_update["_admin.detailed-status"] = step
            self.update_db_2("wim_accounts", wim_id, db_wim_update)
            RO = ROclient.ROClient(self.loop, **self.ro_config)
            wim_RO = deepcopy(wim_content)
            wim_RO.pop("_id", None)
            wim_RO.pop("_admin", None)
            schema_version = wim_RO.pop("schema_version", None)
            wim_RO.pop("schema_type", None)
            wim_RO.pop("wim_tenant_name", None)
            wim_RO["type"] = wim_RO.pop("wim_type")
            wim_RO.pop("wim_user", None)
            wim_RO.pop("wim_password", None)
            desc = await RO.create("wim", descriptor=wim_RO)
            RO_wim_id = desc["uuid"]
            db_wim_update["_admin.deployed.RO"] = RO_wim_id
            self.logger.debug(
                logging_text + "WIM created at RO_wim_id={}".format(RO_wim_id)
            )

            step = "Creating wim_account at RO"
            db_wim_update["_admin.detailed-status"] = step
            self.update_db_2("wim_accounts", wim_id, db_wim_update)

            # NOTE(review): only "wim_password" is decrypted, yet the account
            # sent to RO is built from "user"/"password" — confirm which field
            # NBI actually encrypts before changing this.
            if wim_content.get("wim_password"):
                wim_content["wim_password"] = self.db.decrypt(
                    wim_content["wim_password"],
                    schema_version=schema_version,
                    salt=wim_id,
                )
            wim_account_RO = {
                "name": wim_content["name"],
                "user": wim_content["user"],
                "password": wim_content["password"],
            }
            if wim_RO.get("config"):
                wim_account_RO["config"] = wim_RO["config"]
                if "wim_port_mapping" in wim_account_RO["config"]:
                    del wim_account_RO["config"]["wim_port_mapping"]
                for p in self.wim_config_encrypted:
                    if wim_account_RO["config"].get(p):
                        wim_account_RO["config"][p] = self.db.decrypt(
                            wim_account_RO["config"][p],
                            schema_version=schema_version,
                            salt=wim_id,
                        )

            desc = await RO.attach("wim_account", RO_wim_id, descriptor=wim_account_RO)
            db_wim_update["_admin.deployed.RO-account"] = desc["uuid"]
            db_wim_update["_admin.operationalState"] = "ENABLED"
            db_wim_update["_admin.detailed-status"] = "Done"
            # Mark the WIM 'create' HA task as successful
            operation_state = "COMPLETED"
            operation_details = "Done"

            self.logger.debug(
                logging_text
                + "Exit Ok WIM account created at RO_wim_account_id={}".format(
                    desc["uuid"]
                )
            )
            return

        except (ROclient.ROClientException, DbException, asyncio.CancelledError) as e:
            self.logger.error(logging_text + "Exit Exception {}".format(e))
            exc = e
        except Exception as e:
            self.logger.critical(
                logging_text + "Exit Exception {}".format(e), exc_info=True
            )
            exc = e
        finally:
            if exc:
                # Mark the WIM 'create' HA task as erroneous. This is done for
                # every failure, also when the account could not be read from
                # the database, so unlock_HA() always gets a defined state.
                operation_state = "FAILED"
                operation_details = "ERROR {}: {}".format(step, exc)
                if db_wim:
                    db_wim_update["_admin.operationalState"] = "ERROR"
                    db_wim_update["_admin.detailed-status"] = operation_details
            try:
                if db_wim_update:
                    self.update_db_2("wim_accounts", wim_id, db_wim_update)
                # Register the WIM 'create' HA task either
                # successful or erroneous, or do nothing (if legacy NBI)
                self.lcm_tasks.unlock_HA(
                    "wim",
                    "create",
                    op_id,
                    operationState=operation_state,
                    detailed_status=operation_details,
                )
            except DbException as e:
                self.logger.error(logging_text + "Cannot update database: {}".format(e))
            self.lcm_tasks.remove("wim_account", wim_id, order_id)

    async def edit(self, wim_content, order_id):
        """Apply the changes of a WIM account to the 'wim' and 'wim_account' at RO.

        Nothing is sent to RO when the database record has no RO deployment
        information (_admin.deployed.RO).

        :param wim_content: edited fields of the WIM account; must contain '_id'.
            'op_id' is popped from it.
        :param order_id: order identifier used to remove the task from lcm_tasks
        :return: None
        """
        # HA tasks and backward compatibility:
        # If 'wim_content' does not include 'op_id', we are running a legacy NBI version.
        # In such a case, HA is not supported by NBI, and the HA check always returns True
        op_id = wim_content.pop("op_id", None)
        if not self.lcm_tasks.lock_HA("wim", "edit", op_id):
            return

        wim_id = wim_content["_id"]
        logging_text = "Task wim_edit={} ".format(wim_id)
        self.logger.debug(logging_text + "Enter")

        db_wim = None
        exc = None
        RO_wim_id = None
        db_wim_update = {}
        try:
            step = "Getting wim-id='{}' from db".format(wim_id)

            # wait for any previous tasks in process
            await self.lcm_tasks.waitfor_related_HA("wim", "edit", op_id)

            db_wim = self.db.get_one("wim_accounts", {"_id": wim_id})

            if (
                db_wim.get("_admin")
                and db_wim["_admin"].get("deployed")
                and db_wim["_admin"]["deployed"].get("RO")
            ):

                RO_wim_id = db_wim["_admin"]["deployed"]["RO"]
                step = "Editing wim at RO"
                RO = ROclient.ROClient(self.loop, **self.ro_config)
                wim_RO = deepcopy(wim_content)
                wim_RO.pop("_id", None)
                wim_RO.pop("_admin", None)
                schema_version = wim_RO.pop("schema_version", None)
                wim_RO.pop("schema_type", None)
                wim_RO.pop("wim_tenant_name", None)
                if "wim_type" in wim_RO:
                    wim_RO["type"] = wim_RO.pop("wim_type")
                wim_RO.pop("wim_user", None)
                wim_RO.pop("wim_password", None)
                # TODO make a deep update of wim_port_mapping
                if wim_RO:
                    await RO.edit("wim", RO_wim_id, descriptor=wim_RO)

                step = "Editing wim-account at RO tenant"
                wim_account_RO = {}
                if "config" in wim_content:
                    if "wim_port_mapping" in wim_content["config"]:
                        del wim_content["config"]["wim_port_mapping"]
                    if not wim_content["config"]:
                        del wim_content["config"]
                if "wim_tenant_name" in wim_content:
                    wim_account_RO["wim_tenant_name"] = wim_content["wim_tenant_name"]
                if "wim_password" in wim_content:
                    wim_account_RO["wim_password"] = wim_content["wim_password"]
                if wim_content.get("wim_password"):
                    wim_account_RO["wim_password"] = self.db.decrypt(
                        wim_content["wim_password"],
                        schema_version=schema_version,
                        salt=wim_id,
                    )
                if "config" in wim_content:
                    wim_account_RO["config"] = wim_content["config"]
                if wim_content.get("config"):
                    for p in self.wim_config_encrypted:
                        if wim_content["config"].get(p):
                            wim_account_RO["config"][p] = self.db.decrypt(
                                wim_content["config"][p],
                                schema_version=schema_version,
                                salt=wim_id,
                            )

                # NOTE(review): this writes into wim_content, which is not sent
                # to RO afterwards; it looks like wim_account_RO was intended,
                # but the key RO expects for the user must be confirmed first.
                if "wim_user" in wim_content:
                    wim_content["wim_username"] = wim_content["wim_user"]
                # wim_account must be edited always even if empty in order to ensure changes are translated to RO
                # wim_thread. RO will remove and relaunch a new thread for this wim_account
                await RO.edit("wim_account", RO_wim_id, descriptor=wim_account_RO)
                db_wim_update["_admin.operationalState"] = "ENABLED"

            # Mark the WIM 'edit' HA task as successful (also when there was
            # nothing deployed at RO and therefore nothing to edit there)
            operation_state = "COMPLETED"
            operation_details = "Done"

            self.logger.debug(logging_text + "Exit Ok RO_wim_id={}".format(RO_wim_id))
            return

        except (ROclient.ROClientException, DbException, asyncio.CancelledError) as e:
            self.logger.error(logging_text + "Exit Exception {}".format(e))
            exc = e
        except Exception as e:
            self.logger.critical(
                logging_text + "Exit Exception {}".format(e), exc_info=True
            )
            exc = e
        finally:
            if exc:
                # Mark the WIM 'edit' HA task as erroneous, whatever the point
                # of failure, so unlock_HA() always gets a defined state
                operation_state = "FAILED"
                operation_details = "ERROR {}: {}".format(step, exc)
                if db_wim:
                    db_wim_update["_admin.operationalState"] = "ERROR"
                    db_wim_update["_admin.detailed-status"] = operation_details
            try:
                if db_wim_update:
                    self.update_db_2("wim_accounts", wim_id, db_wim_update)
                # Register the WIM 'edit' HA task either
                # successful or erroneous, or do nothing (if legacy NBI)
                self.lcm_tasks.unlock_HA(
                    "wim",
                    "edit",
                    op_id,
                    operationState=operation_state,
                    detailed_status=operation_details,
                )
            except DbException as e:
                self.logger.error(logging_text + "Cannot update database: {}".format(e))
            self.lcm_tasks.remove("wim_account", wim_id, order_id)

    async def delete(self, wim_content, order_id):
        """Detach and delete the WIM at RO, then delete the database record.

        The RO part is skipped when the record has no RO deployment
        information. A 404 answer from RO is treated as "already done".

        :param wim_content: must contain '_id'; 'op_id' is popped from it
        :param order_id: order identifier used to remove the task from lcm_tasks
        :return: None
        """
        # HA tasks and backward compatibility:
        # If 'wim_content' does not include 'op_id', we are running a legacy NBI version.
        # In such a case, HA is not supported by NBI, and the HA check always returns True
        op_id = wim_content.pop("op_id", None)
        if not self.lcm_tasks.lock_HA("wim", "delete", op_id):
            return

        wim_id = wim_content["_id"]
        logging_text = "Task wim_delete={} ".format(wim_id)
        self.logger.debug(logging_text + "Enter")

        db_wim = None
        db_wim_update = {}
        exc = None
        step = "Getting wim from db"
        try:
            # wait for any previous tasks in process
            await self.lcm_tasks.waitfor_related_HA("wim", "delete", op_id)

            db_wim = self.db.get_one("wim_accounts", {"_id": wim_id})
            if (
                db_wim.get("_admin")
                and db_wim["_admin"].get("deployed")
                and db_wim["_admin"]["deployed"].get("RO")
            ):
                RO_wim_id = db_wim["_admin"]["deployed"]["RO"]
                RO = ROclient.ROClient(self.loop, **self.ro_config)
                step = "Detaching wim from RO tenant"
                try:
                    await RO.detach("wim_account", RO_wim_id)
                except ROclient.ROClientException as e:
                    if e.http_code == 404:  # not found
                        self.logger.debug(
                            logging_text
                            + "RO_wim_id={} already detached".format(RO_wim_id)
                        )
                    else:
                        raise

                step = "Deleting wim from RO"
                try:
                    await RO.delete("wim", RO_wim_id)
                except ROclient.ROClientException as e:
                    if e.http_code == 404:  # not found
                        self.logger.debug(
                            logging_text
                            + "RO_wim_id={} already deleted".format(RO_wim_id)
                        )
                    else:
                        raise
            else:
                # nothing to delete
                self.logger.error(logging_text + "Nothing to remove at RO")
            self.db.del_one("wim_accounts", {"_id": wim_id})
            # The record no longer exists: prevent any later update attempt
            db_wim = None
            self.logger.debug(logging_text + "Exit Ok")
            return

        except (ROclient.ROClientException, DbException, asyncio.CancelledError) as e:
            self.logger.error(logging_text + "Exit Exception {}".format(e))
            exc = e
        except Exception as e:
            self.logger.critical(
                logging_text + "Exit Exception {}".format(e), exc_info=True
            )
            exc = e
        finally:
            if exc and db_wim:
                db_wim_update["_admin.operationalState"] = "ERROR"
                db_wim_update["_admin.detailed-status"] = "ERROR {}: {}".format(
                    step, exc
                )
                # Mark the WIM 'delete' HA task as erroneous
                operation_state = "FAILED"
                operation_details = "ERROR {}: {}".format(step, exc)
                self.lcm_tasks.unlock_HA(
                    "wim",
                    "delete",
                    op_id,
                    operationState=operation_state,
                    detailed_status=operation_details,
                )
            try:
                if db_wim and db_wim_update:
                    self.update_db_2("wim_accounts", wim_id, db_wim_update)
                # If the WIM 'delete' HA task was successful, the DB entry has been deleted,
                # which means that there is nowhere to register this task, so do nothing here.
            except DbException as e:
                self.logger.error(logging_text + "Cannot update database: {}".format(e))
            # Removed once, after the outcome has been recorded
            self.lcm_tasks.remove("wim_account", wim_id, order_id)
class SdnLcm(LcmBase):
    """Propagate SDN controller operations (create / edit / delete) to the RO.

    Every public coroutine handles one operation over a ``sdns`` record: it
    asks ``lcm_tasks.lock_HA`` for the operation (returning at once when that
    call is falsy), talks to RO through ``ROclient.ROClient``, writes the
    outcome under ``_admin`` in the database, reports it through
    ``lcm_tasks.unlock_HA`` and finally removes the task from ``lcm_tasks``.
    """

    def __init__(self, msg, lcm_tasks, config, loop):
        """
        Init, Connect to database, filesystem storage, and messaging
        :param msg: message bus object, handed over to LcmBase
        :param lcm_tasks: task registry used for HA locking and task removal
        :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
            and 'ro_config' (keyword arguments for ROclient.ROClient)
        :param loop: asyncio event loop given to ROclient.ROClient
        :return: None
        """
        self.logger = logging.getLogger("lcm.sdn")
        self.loop = loop
        self.lcm_tasks = lcm_tasks
        self.ro_config = config["ro_config"]

        super().__init__(msg, self.logger)

    async def create(self, sdn_content, order_id):
        """Create the SDN controller at RO and record the result.

        :param sdn_content: SDN controller descriptor; must contain '_id'.
            'op_id' is popped from it.
        :param order_id: order identifier used to remove the task from lcm_tasks
        :return: None
        """
        # HA tasks and backward compatibility:
        # If 'sdn_content' does not include 'op_id', we are running a legacy NBI version.
        # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
        # Register 'create' task here for related future HA operations
        op_id = sdn_content.pop("op_id", None)
        if not self.lcm_tasks.lock_HA("sdn", "create", op_id):
            # Same guard as every other handler of this module
            return

        sdn_id = sdn_content["_id"]
        logging_text = "Task sdn_create={} ".format(sdn_id)
        self.logger.debug(logging_text + "Enter")

        db_sdn = None
        db_sdn_update = {}
        RO_sdn_id = None
        exc = None
        try:
            step = "Getting sdn from db"
            db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
            db_sdn_update["_admin.deployed.RO"] = None

            step = "Creating sdn at RO"
            db_sdn_update["_admin.detailed-status"] = step
            self.update_db_2("sdns", sdn_id, db_sdn_update)

            RO = ROclient.ROClient(self.loop, **self.ro_config)
            sdn_RO = deepcopy(sdn_content)
            sdn_RO.pop("_id", None)
            sdn_RO.pop("_admin", None)
            schema_version = sdn_RO.pop("schema_version", None)
            sdn_RO.pop("schema_type", None)
            sdn_RO.pop("description", None)
            if sdn_RO.get("password"):
                sdn_RO["password"] = self.db.decrypt(
                    sdn_RO["password"], schema_version=schema_version, salt=sdn_id
                )

            desc = await RO.create("sdn", descriptor=sdn_RO)
            RO_sdn_id = desc["uuid"]
            db_sdn_update["_admin.deployed.RO"] = RO_sdn_id
            db_sdn_update["_admin.operationalState"] = "ENABLED"
            self.logger.debug(logging_text + "Exit Ok RO_sdn_id={}".format(RO_sdn_id))
            # Mark the SDN 'create' HA task as successful
            operation_state = "COMPLETED"
            operation_details = "Done"
            return

        except (ROclient.ROClientException, DbException, asyncio.CancelledError) as e:
            self.logger.error(logging_text + "Exit Exception {}".format(e))
            exc = e
        except Exception as e:
            self.logger.critical(
                logging_text + "Exit Exception {}".format(e), exc_info=True
            )
            exc = e
        finally:
            if exc:
                # Mark the SDN 'create' HA task as erroneous. This is done for
                # every failure, also when the record could not be read from
                # the database, so unlock_HA() always gets a defined state.
                operation_state = "FAILED"
                operation_details = "ERROR {}: {}".format(step, exc)
                if db_sdn:
                    db_sdn_update["_admin.operationalState"] = "ERROR"
                    db_sdn_update["_admin.detailed-status"] = operation_details
            try:
                if db_sdn and db_sdn_update:
                    self.update_db_2("sdns", sdn_id, db_sdn_update)
                # Register the SDN 'create' HA task either
                # successful or erroneous, or do nothing (if legacy NBI)
                self.lcm_tasks.unlock_HA(
                    "sdn",
                    "create",
                    op_id,
                    operationState=operation_state,
                    detailed_status=operation_details,
                )
            except DbException as e:
                self.logger.error(logging_text + "Cannot update database: {}".format(e))
            self.lcm_tasks.remove("sdn", sdn_id, order_id)

    async def edit(self, sdn_content, order_id):
        """Apply the changes of an SDN controller at RO and record the result.

        Nothing is sent to RO when the database record has no RO deployment
        information (_admin.deployed.RO).

        :param sdn_content: edited fields of the SDN controller; must contain
            '_id'. 'op_id' is popped from it.
        :param order_id: order identifier used to remove the task from lcm_tasks
        :return: None
        """
        # HA tasks and backward compatibility:
        # If 'sdn_content' does not include 'op_id', we are running a legacy NBI version.
        # In such a case, HA is not supported by NBI, and the HA check always returns True
        op_id = sdn_content.pop("op_id", None)
        if not self.lcm_tasks.lock_HA("sdn", "edit", op_id):
            return

        sdn_id = sdn_content["_id"]
        logging_text = "Task sdn_edit={} ".format(sdn_id)
        self.logger.debug(logging_text + "Enter")

        db_sdn = None
        db_sdn_update = {}
        exc = None
        step = "Getting sdn from db"
        try:
            # wait for any previous tasks in process
            await self.lcm_tasks.waitfor_related_HA("sdn", "edit", op_id)

            db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
            RO_sdn_id = None
            if (
                db_sdn.get("_admin")
                and db_sdn["_admin"].get("deployed")
                and db_sdn["_admin"]["deployed"].get("RO")
            ):
                RO_sdn_id = db_sdn["_admin"]["deployed"]["RO"]
                RO = ROclient.ROClient(self.loop, **self.ro_config)
                step = "Editing sdn at RO"
                sdn_RO = deepcopy(sdn_content)
                sdn_RO.pop("_id", None)
                sdn_RO.pop("_admin", None)
                schema_version = sdn_RO.pop("schema_version", None)
                sdn_RO.pop("schema_type", None)
                sdn_RO.pop("description", None)
                if sdn_RO.get("password"):
                    sdn_RO["password"] = self.db.decrypt(
                        sdn_RO["password"], schema_version=schema_version, salt=sdn_id
                    )
                if sdn_RO:
                    await RO.edit("sdn", RO_sdn_id, descriptor=sdn_RO)
                db_sdn_update["_admin.operationalState"] = "ENABLED"

            # Mark the SDN 'edit' HA task as successful (also when there was
            # nothing deployed at RO and therefore nothing to edit there)
            operation_state = "COMPLETED"
            operation_details = "Done"

            self.logger.debug(logging_text + "Exit Ok RO_sdn_id={}".format(RO_sdn_id))
            return

        except (ROclient.ROClientException, DbException, asyncio.CancelledError) as e:
            self.logger.error(logging_text + "Exit Exception {}".format(e))
            exc = e
        except Exception as e:
            self.logger.critical(
                logging_text + "Exit Exception {}".format(e), exc_info=True
            )
            exc = e
        finally:
            if exc:
                # Mark the SDN 'edit' HA task as erroneous, whatever the point
                # of failure, so unlock_HA() always gets a defined state
                operation_state = "FAILED"
                operation_details = "ERROR {}: {}".format(step, exc)
                if db_sdn:
                    # The error status goes into the update dictionary (the one
                    # handed to update_db_2), not into the record read from db
                    db_sdn_update["_admin.operationalState"] = "ERROR"
                    db_sdn_update["_admin.detailed-status"] = operation_details
            try:
                if db_sdn_update:
                    self.update_db_2("sdns", sdn_id, db_sdn_update)
                # Register the SDN 'edit' HA task either
                # successful or erroneous, or do nothing (if legacy NBI)
                self.lcm_tasks.unlock_HA(
                    "sdn",
                    "edit",
                    op_id,
                    operationState=operation_state,
                    detailed_status=operation_details,
                )
            except DbException as e:
                self.logger.error(logging_text + "Cannot update database: {}".format(e))
            self.lcm_tasks.remove("sdn", sdn_id, order_id)

    async def delete(self, sdn_content, order_id):
        """Delete the SDN controller at RO, then delete the database record.

        The RO part is skipped when the record has no RO deployment
        information. A 404 answer from RO is treated as "already deleted".

        :param sdn_content: must contain '_id'; 'op_id' is popped from it
        :param order_id: order identifier used to remove the task from lcm_tasks
        :return: None
        """
        # HA tasks and backward compatibility:
        # If 'sdn_content' does not include 'op_id', we are running a legacy NBI version.
        # In such a case, HA is not supported by NBI, and the HA check always returns True
        op_id = sdn_content.pop("op_id", None)
        if not self.lcm_tasks.lock_HA("sdn", "delete", op_id):
            return

        sdn_id = sdn_content["_id"]
        logging_text = "Task sdn_delete={} ".format(sdn_id)
        self.logger.debug(logging_text + "Enter")

        db_sdn = None
        db_sdn_update = {}
        exc = None
        step = "Getting sdn from db"
        try:
            # wait for any previous tasks in process
            await self.lcm_tasks.waitfor_related_HA("sdn", "delete", op_id)

            db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
            if (
                db_sdn.get("_admin")
                and db_sdn["_admin"].get("deployed")
                and db_sdn["_admin"]["deployed"].get("RO")
            ):
                RO_sdn_id = db_sdn["_admin"]["deployed"]["RO"]
                RO = ROclient.ROClient(self.loop, **self.ro_config)
                step = "Deleting sdn from RO"
                try:
                    await RO.delete("sdn", RO_sdn_id)
                except ROclient.ROClientException as e:
                    if e.http_code == 404:  # not found
                        self.logger.debug(
                            logging_text
                            + "RO_sdn_id={} already deleted".format(RO_sdn_id)
                        )
                    else:
                        raise
            else:
                # nothing to delete
                self.logger.error(
                    logging_text + "Skipping. There is not RO information at database"
                )
            self.db.del_one("sdns", {"_id": sdn_id})
            # The record no longer exists: prevent any later update attempt
            db_sdn = None
            self.logger.debug("sdn_delete task sdn_id={} Exit Ok".format(sdn_id))
            return

        except (ROclient.ROClientException, DbException, asyncio.CancelledError) as e:
            self.logger.error(logging_text + "Exit Exception {}".format(e))
            exc = e
        except Exception as e:
            self.logger.critical(
                logging_text + "Exit Exception {}".format(e), exc_info=True
            )
            exc = e
        finally:
            if exc and db_sdn:
                # The error status goes into the update dictionary so that the
                # update_db_2() call below actually persists it
                db_sdn_update["_admin.operationalState"] = "ERROR"
                db_sdn_update["_admin.detailed-status"] = "ERROR {}: {}".format(
                    step, exc
                )
                # Mark the SDN 'delete' HA task as erroneous
                operation_state = "FAILED"
                operation_details = "ERROR {}: {}".format(step, exc)
                self.lcm_tasks.unlock_HA(
                    "sdn",
                    "delete",
                    op_id,
                    operationState=operation_state,
                    detailed_status=operation_details,
                )
            try:
                if db_sdn and db_sdn_update:
                    self.update_db_2("sdns", sdn_id, db_sdn_update)
                # If the SDN 'delete' HA task was successful, the DB entry has been deleted,
                # which means that there is nowhere to register this task, so do nothing here.
            except DbException as e:
                self.logger.error(logging_text + "Cannot update database: {}".format(e))
            self.lcm_tasks.remove("sdn", sdn_id, order_id)
1078 class K8sClusterLcm(LcmBase
):
1079 timeout_create
= 300
def __init__(self, msg, lcm_tasks, config, loop):
    """
    Set up the k8scluster lifecycle manager and one connector per supported deployment method
    :param msg: passed unchanged to the base class initializer
    :param lcm_tasks: task registry stored in 'self.lcm_tasks'
    :param config: two level dictionary with configuration. Only its 'VCA' section is read here
    :param loop: asyncio event loop, stored in 'self.loop' and handed to the juju connector
    :return: None
    """
    k8s_logger = logging.getLogger("lcm.k8scluster")
    self.logger = k8s_logger
    self.loop = loop
    self.lcm_tasks = lcm_tasks
    self.vca_config = config["VCA"]

    # the connectors below use 'self.db' and 'self.fs', so the base class goes first
    super().__init__(msg, k8s_logger)

    vca = self.vca_config
    kubectl_path = vca.get("kubectlpath")

    helm2 = K8sHelmConnector(
        kubectl_command=kubectl_path,
        helm_command=vca.get("helmpath"),
        log=k8s_logger,
        on_update_db=None,
        db=self.db,
        fs=self.fs,
    )
    helm3 = K8sHelm3Connector(
        kubectl_command=kubectl_path,
        helm_command=vca.get("helm3path"),
        fs=self.fs,
        log=k8s_logger,
        db=self.db,
        on_update_db=None,
    )
    juju = K8sJujuConnector(
        kubectl_command=kubectl_path,
        juju_command=vca.get("jujupath"),
        log=k8s_logger,
        loop=self.loop,
        on_update_db=None,
        db=self.db,
        fs=self.fs,
    )

    self.helm2_k8scluster = helm2
    self.helm3_k8scluster = helm3
    self.juju_k8scluster = juju

    # deployment method name -> connector in charge of it
    self.k8s_map = {
        "helm-chart": helm2,
        "helm-chart-v3": helm3,
        "juju-bundle": juju,
    }
async def create(self, k8scluster_content, order_id):
    """
    Initialize a k8scluster for each of its enabled deployment methods and store the outcome in the database
    :param k8scluster_content: dictionary with the k8scluster data. Must contain '_id'; an optional 'op_id'
        is popped from it
    :param order_id: identifier used to remove this task from 'lcm_tasks' once it has finished
    :return: None
    """
    op_id = k8scluster_content.pop("op_id", None)
    if not self.lcm_tasks.lock_HA("k8scluster", "create", op_id):
        return

    k8scluster_id = k8scluster_content["_id"]
    logging_text = "Task k8scluster_create={} ".format(k8scluster_id)
    self.logger.debug(logging_text + "Enter")

    db_k8scluster = None
    db_k8scluster_update = {}
    exc = None
    # Defaults, so that the 'finally' clause can always register a result even if the
    # operation is interrupted before one has been computed
    operation_state = "FAILED"
    operation_details = ""
    step = "Getting k8scluster-id='{}' from db".format(k8scluster_id)
    try:
        self.logger.debug(logging_text + step)
        db_k8scluster = self.db.get_one("k8sclusters", {"_id": k8scluster_id})
        self.db.encrypt_decrypt_fields(
            db_k8scluster.get("credentials"),
            "decrypt",
            ["password", "secret"],
            schema_version=db_k8scluster["schema_version"],
            salt=db_k8scluster["_id"],
        )
        k8s_credentials = yaml.safe_dump(db_k8scluster.get("credentials"))
        pending_tasks = []
        task2name = {}
        init_target = deep_get(db_k8scluster, ("_admin", "init"))
        step = "Launching k8scluster init tasks"

        k8s_deploy_methods = db_k8scluster.get("deployment_methods", {})
        # for backwards compatibility and all-false case
        if not any(k8s_deploy_methods.values()):
            k8s_deploy_methods = {
                "helm-chart": True,
                "juju-bundle": True,
                "helm-chart-v3": True,
            }
        # keep only the methods whose flag is truthy
        deploy_methods = tuple(filter(k8s_deploy_methods.get, k8s_deploy_methods))

        for task_name in deploy_methods:
            if init_target and task_name not in init_target:
                continue
            task = asyncio.ensure_future(
                self.k8s_map[task_name].init_env(
                    k8s_credentials,
                    reuse_cluster_uuid=k8scluster_id,
                    vca_id=db_k8scluster.get("vca_id"),
                )
            )
            pending_tasks.append(task)
            task2name[task] = task_name

        error_text_list = []
        tasks_name_ok = []
        reached_timeout = False
        now = time()

        while pending_tasks:
            _timeout = max(
                1, self.timeout_create - (time() - now)
            )  # ensure not negative with max
            step = "Waiting for k8scluster init tasks"
            done, pending_tasks = await asyncio.wait(
                pending_tasks, timeout=_timeout, return_when=asyncio.FIRST_COMPLETED
            )
            if not done:
                # timeout. asyncio.wait() does not cancel what is still pending, so cancel
                # it here and process those tasks as if they had finished with a timeout
                done = pending_tasks
                pending_tasks = None
                reached_timeout = True
                for task in done:
                    task.cancel()
            for task in done:
                task_name = task2name[task]
                if reached_timeout:
                    task_error = "Timeout"
                elif task.cancelled():
                    task_error = "Cancelled"
                else:
                    task_error = task.exception()

                if task_error:
                    error_text_list.append(
                        "Failing init {}: {}".format(task_name, task_error)
                    )
                    db_k8scluster_update[
                        "_admin.{}.error_msg".format(task_name)
                    ] = str(task_error)
                    db_k8scluster_update["_admin.{}.id".format(task_name)] = None
                    db_k8scluster_update[
                        "_admin.{}.operationalState".format(task_name)
                    ] = "ERROR"
                    self.logger.error(
                        logging_text
                        + "{} init fail: {}".format(task_name, task_error),
                        # no traceback for expected connector errors or plain texts
                        exc_info=not isinstance(task_error, (N2VCException, str)),
                    )
                else:
                    k8s_id, uninstall_sw = task.result()
                    tasks_name_ok.append(task_name)
                    self.logger.debug(
                        logging_text
                        + "{} init success. id={} created={}".format(
                            task_name, k8s_id, uninstall_sw
                        )
                    )
                    db_k8scluster_update[
                        "_admin.{}.error_msg".format(task_name)
                    ] = None
                    db_k8scluster_update["_admin.{}.id".format(task_name)] = k8s_id
                    db_k8scluster_update[
                        "_admin.{}.created".format(task_name)
                    ] = uninstall_sw
                    db_k8scluster_update[
                        "_admin.{}.operationalState".format(task_name)
                    ] = "ENABLED"
            # update database
            step = "Updating database for " + task_name
            self.update_db_2("k8sclusters", k8scluster_id, db_k8scluster_update)

        if tasks_name_ok:
            operation_details = "ready for " + ", ".join(tasks_name_ok)
            operation_state = "COMPLETED"
            db_k8scluster_update["_admin.operationalState"] = (
                "ENABLED" if not error_text_list else "DEGRADED"
            )
            if error_text_list:
                operation_details += "; " + ";".join(error_text_list)
        else:
            db_k8scluster_update["_admin.operationalState"] = "ERROR"
            operation_state = "FAILED"
            operation_details = ";".join(error_text_list)
        db_k8scluster_update["_admin.detailed-status"] = operation_details
        self.logger.debug(logging_text + "Done. Result: " + operation_state)

    except Exception as e:
        if isinstance(
            e,
            (
                LcmException,
                DbException,
                K8sException,
                N2VCException,
                asyncio.CancelledError,
            ),
        ):
            self.logger.error(logging_text + "Exit Exception {}".format(e))
        else:
            self.logger.critical(
                logging_text + "Exit Exception {}".format(e), exc_info=True
            )
        exc = e
    finally:
        if exc:
            # The result is set even when the record could not be read, so that the
            # registration below never works with unbound names
            operation_state = "FAILED"
            operation_details = "ERROR {}: {}".format(step, exc)
            if db_k8scluster:
                db_k8scluster_update["_admin.operationalState"] = "ERROR"
                db_k8scluster_update["_admin.detailed-status"] = operation_details
        try:
            if db_k8scluster and db_k8scluster_update:
                self.update_db_2("k8sclusters", k8scluster_id, db_k8scluster_update)

            # Register the operation and unlock
            self.lcm_tasks.unlock_HA(
                "k8scluster",
                "create",
                op_id,
                operationState=operation_state,
                detailed_status=operation_details,
            )
        except DbException as e:
            self.logger.error(logging_text + "Cannot update database: {}".format(e))
        self.lcm_tasks.remove("k8scluster", k8scluster_id, order_id)
async def delete(self, k8scluster_content, order_id):
    """
    Reset a k8scluster in every connector where it was initialized and remove it from the database
    :param k8scluster_content: dictionary with the k8scluster data. Must contain '_id'; an optional 'op_id'
        is popped from it
    :param order_id: identifier used to remove this task from 'lcm_tasks' once it has finished
    :return: None
    """
    # HA tasks and backward compatibility:
    # If 'k8scluster_content' does not include 'op_id', we are running a legacy NBI version.
    # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
    # Register 'delete' task here for related future HA operations
    op_id = k8scluster_content.pop("op_id", None)
    if not self.lcm_tasks.lock_HA("k8scluster", "delete", op_id):
        return

    k8scluster_id = k8scluster_content["_id"]
    logging_text = "Task k8scluster_delete={} ".format(k8scluster_id)
    self.logger.debug(logging_text + "Enter")

    db_k8scluster = None
    db_k8scluster_update = {}
    exc = None
    # Defaults, so that an interrupted operation is never registered as completed
    operation_state = "FAILED"
    operation_details = ""
    step = "Getting k8scluster='{}' from db".format(k8scluster_id)
    try:
        self.logger.debug(logging_text + step)
        db_k8scluster = self.db.get_one("k8sclusters", {"_id": k8scluster_id})
        k8s_hc_id = deep_get(db_k8scluster, ("_admin", "helm-chart", "id"))
        k8s_h3c_id = deep_get(db_k8scluster, ("_admin", "helm-chart-v3", "id"))
        k8s_jb_id = deep_get(db_k8scluster, ("_admin", "juju-bundle", "id"))

        cluster_removed = True
        if k8s_jb_id:  # delete in reverse order of creation
            step = "Removing juju-bundle '{}'".format(k8s_jb_id)
            uninstall_sw = (
                deep_get(db_k8scluster, ("_admin", "juju-bundle", "created"))
                or False
            )
            cluster_removed = await self.juju_k8scluster.reset(
                cluster_uuid=k8s_jb_id,
                uninstall_sw=uninstall_sw,
                vca_id=db_k8scluster.get("vca_id"),
            )
            db_k8scluster_update["_admin.juju-bundle.id"] = None
            db_k8scluster_update["_admin.juju-bundle.operationalState"] = "DISABLED"

        if k8s_hc_id:
            step = "Removing helm-chart '{}'".format(k8s_hc_id)
            uninstall_sw = (
                deep_get(db_k8scluster, ("_admin", "helm-chart", "created"))
                or False
            )
            cluster_removed = await self.helm2_k8scluster.reset(
                cluster_uuid=k8s_hc_id, uninstall_sw=uninstall_sw
            )
            db_k8scluster_update["_admin.helm-chart.id"] = None
            db_k8scluster_update["_admin.helm-chart.operationalState"] = "DISABLED"

        if k8s_h3c_id:
            # report the helm v3 identifier (not the helm v2 one) in the step text
            step = "Removing helm-chart-v3 '{}'".format(k8s_h3c_id)
            uninstall_sw = (
                deep_get(db_k8scluster, ("_admin", "helm-chart-v3", "created"))
                or False
            )
            cluster_removed = await self.helm3_k8scluster.reset(
                cluster_uuid=k8s_h3c_id, uninstall_sw=uninstall_sw
            )
            db_k8scluster_update["_admin.helm-chart-v3.id"] = None
            db_k8scluster_update[
                "_admin.helm-chart-v3.operationalState"
            ] = "DISABLED"

        # Try to remove from cluster_inserted to clean old versions
        if k8s_hc_id and cluster_removed:
            step = "Removing k8scluster='{}' from k8srepos".format(k8scluster_id)
            self.logger.debug(logging_text + step)
            db_k8srepo_list = self.db.get_list(
                "k8srepos", {"_admin.cluster-inserted": k8s_hc_id}
            )
            for k8srepo in db_k8srepo_list:
                # best effort: a failure on one repo is logged and the rest are still processed
                try:
                    cluster_list = k8srepo["_admin"]["cluster-inserted"]
                    cluster_list.remove(k8s_hc_id)
                    self.update_db_2(
                        "k8srepos",
                        k8srepo["_id"],
                        {"_admin.cluster-inserted": cluster_list},
                    )
                except Exception as e:
                    self.logger.error("{}: {}".format(step, e))
        self.db.del_one("k8sclusters", {"_id": k8scluster_id})
        # the record does not exist any more, so there is nothing left to update
        db_k8scluster_update = None
        operation_state = "COMPLETED"
        operation_details = "deleted"
        self.logger.debug(logging_text + "Done")

    except Exception as e:
        if isinstance(
            e,
            (
                LcmException,
                DbException,
                K8sException,
                N2VCException,
                asyncio.CancelledError,
            ),
        ):
            self.logger.error(logging_text + "Exit Exception {}".format(e))
        else:
            self.logger.critical(
                logging_text + "Exit Exception {}".format(e), exc_info=True
            )
        exc = e
    finally:
        if exc:
            # Mark the k8scluster 'delete' HA task as erroneous, also when the
            # record could not be read from the database
            operation_state = "FAILED"
            operation_details = "ERROR {}: {}".format(step, exc)
            if db_k8scluster and db_k8scluster_update is not None:
                db_k8scluster_update["_admin.operationalState"] = "ERROR"
                db_k8scluster_update["_admin.detailed-status"] = operation_details

        try:
            if db_k8scluster_update:
                self.update_db_2("k8sclusters", k8scluster_id, db_k8scluster_update)
            # Register the K8scluster 'delete' HA task either
            # successful or erroneous, or do nothing (if legacy NBI)
            self.lcm_tasks.unlock_HA(
                "k8scluster",
                "delete",
                op_id,
                operationState=operation_state,
                detailed_status=operation_details,
            )
        except DbException as e:
            self.logger.error(logging_text + "Cannot update database: {}".format(e))
        self.lcm_tasks.remove("k8scluster", k8scluster_id, order_id)
1433 class VcaLcm(LcmBase
):
def __init__(self, msg, lcm_tasks, config, loop):
    """
    Set up the VCA lifecycle manager: logger, task registry, base class and the N2VC juju connector
    :param msg: passed unchanged to the base class initializer
    :param lcm_tasks: task registry stored in 'self.lcm_tasks'
    :param config: two level dictionary with configuration. It is not read by this initializer
    :param loop: asyncio event loop, stored in 'self.loop' and handed to the N2VC connector
    :return: None
    """
    vca_logger = logging.getLogger("lcm.vca")
    self.logger = vca_logger
    self.loop = loop
    self.lcm_tasks = lcm_tasks

    # the connector below uses 'self.fs' and 'self.db', so the base class goes first
    super().__init__(msg, vca_logger)

    # create N2VC connector
    connector_kwargs = {
        "log": self.logger,
        "loop": self.loop,
        "fs": self.fs,
        "db": self.db,
    }
    self.n2vc = N2VCJujuConnector(**connector_kwargs)
def _get_vca_by_id(self, vca_id: str) -> dict:
    """
    Read a VCA record from the 'vca' collection and ask the database layer to decrypt its
    'secret' and 'cacert' fields
    :param vca_id: value of the '_id' field of the wanted record
    :return: dictionary with the VCA record
    """
    vca_record = self.db.get_one("vca", {"_id": vca_id})
    protected_fields = ["secret", "cacert"]
    self.db.encrypt_decrypt_fields(
        vca_record,
        "decrypt",
        protected_fields,
        schema_version=vca_record["schema_version"],
        salt=vca_record["_id"],
    )
    return vca_record
async def create(self, vca_content, order_id):
    """
    Validate the connectivity with a registered VCA and store the outcome in the database
    :param vca_content: dictionary with the VCA data. Must contain '_id'; an optional 'op_id' is popped from it
    :param order_id: identifier used to remove this task from 'lcm_tasks' once it has finished
    :return: None
    """
    op_id = vca_content.pop("op_id", None)
    if not self.lcm_tasks.lock_HA("vca", "create", op_id):
        return

    vca_id = vca_content["_id"]
    self.logger.debug("Task vca_create={} {}".format(vca_id, "Enter"))

    db_vca_update = {}
    # Defaults, so that the 'finally' clause can always register a result even if the
    # operation is interrupted before one has been computed
    operation_state = "FAILED"
    operation_details = ""

    try:
        self.logger.debug(
            "Task vca_create={} {}".format(vca_id, "Getting vca from db")
        )
        db_vca = self._get_vca_by_id(vca_id)

        # Awaited directly: a failure or a timeout is raised here, and the validation
        # is cancelled together with this coroutine instead of being left running
        await asyncio.wait_for(
            self.n2vc.validate_vca(db_vca["_id"]),
            timeout=self.timeout_create,
        )
        self.logger.debug(
            "Task vca_create={} {}".format(
                vca_id, "vca registered and validated successfully"
            )
        )
        db_vca_update["_admin.operationalState"] = "ENABLED"
        db_vca_update["_admin.detailed-status"] = "Connectivity: ok"
        operation_details = "VCA validated"
        operation_state = "COMPLETED"

        self.logger.debug(
            "Task vca_create={} {}".format(
                vca_id, "Done. Result: {}".format(operation_state)
            )
        )

    except Exception as e:
        # some exceptions (e.g. a timeout) have an empty text: fall back to the class name
        error_msg = "Failed with exception: {}".format(str(e) or type(e).__name__)
        self.logger.error("Task vca_create={} {}".format(vca_id, error_msg))
        db_vca_update["_admin.operationalState"] = "ERROR"
        db_vca_update["_admin.detailed-status"] = error_msg
        operation_state = "FAILED"
        operation_details = error_msg
    finally:
        try:
            self.update_db_2("vca", vca_id, db_vca_update)

            # Register the operation and unlock
            self.lcm_tasks.unlock_HA(
                "vca",
                "create",
                op_id,
                operationState=operation_state,
                detailed_status=operation_details,
            )
        except DbException as e:
            self.logger.error(
                "Task vca_create={} {}".format(
                    vca_id, "Cannot update database: {}".format(e)
                )
            )
        self.lcm_tasks.remove("vca", vca_id, order_id)
async def delete(self, vca_content, order_id):
    """
    Remove a VCA record from the database and register the result of the operation
    :param vca_content: dictionary with the VCA data. Must contain '_id'; an optional 'op_id' is popped from it
    :param order_id: identifier used to remove this task from 'lcm_tasks' once it has finished
    :return: None
    """
    # HA tasks and backward compatibility:
    # If "vca_content" does not include "op_id", we are running a legacy NBI version.
    # In such a case, HA is not supported by NBI, "op_id" is None, and lock_HA() will do nothing.
    # Register "delete" task here for related future HA operations
    op_id = vca_content.pop("op_id", None)
    lock_granted = self.lcm_tasks.lock_HA("vca", "delete", op_id)
    if not lock_granted:
        return

    db_vca_update = {}
    vca_id = vca_content["_id"]

    try:
        entering_msg = "Task vca_delete={} {}".format(vca_id, "Deleting vca from db")
        self.logger.debug(entering_msg)
        self.db.del_one("vca", {"_id": vca_id})
        # the record is gone: there is nothing to write back
        db_vca_update = None
        operation_details = "deleted"
        operation_state = "COMPLETED"

        result_text = "Done. Result: {}".format(operation_state)
        self.logger.debug("Task vca_delete={} {}".format(vca_id, result_text))
    except Exception as e:
        error_msg = "Failed with exception: {}".format(e)
        self.logger.error("Task vca_delete={} {}".format(vca_id, error_msg))
        db_vca_update["_admin.operationalState"] = "ERROR"
        db_vca_update["_admin.detailed-status"] = error_msg
        operation_state = "FAILED"
        operation_details = error_msg
    finally:
        try:
            self.update_db_2("vca", vca_id, db_vca_update)
            self.lcm_tasks.unlock_HA(
                "vca",
                "delete",
                op_id,
                operationState=operation_state,
                detailed_status=operation_details,
            )
        except DbException as e:
            db_error_text = "Cannot update database: {}".format(e)
            self.logger.error("Task vca_delete={} {}".format(vca_id, db_error_text))
        self.lcm_tasks.remove("vca", vca_id, order_id)
1588 class K8sRepoLcm(LcmBase
):
def __init__(self, msg, lcm_tasks, config, loop):
    """
    Set up the k8srepo lifecycle manager: logger, task registry, base class and a helm connector
    :param msg: passed unchanged to the base class initializer
    :param lcm_tasks: task registry stored in 'self.lcm_tasks'
    :param config: two level dictionary with configuration. Only its 'VCA' section is read here
    :param loop: asyncio event loop, stored in 'self.loop'
    :return: None
    """
    repo_logger = logging.getLogger("lcm.k8srepo")
    self.logger = repo_logger
    self.loop = loop
    self.lcm_tasks = lcm_tasks
    self.vca_config = config["VCA"]

    # the connector below uses 'self.fs' and 'self.db', so the base class goes first
    super().__init__(msg, repo_logger)

    vca = self.vca_config
    self.k8srepo = K8sHelmConnector(
        kubectl_command=vca.get("kubectlpath"),
        helm_command=vca.get("helmpath"),
        fs=self.fs,
        log=repo_logger,
        db=self.db,
        on_update_db=None,
    )
async def create(self, k8srepo_content, order_id):
    """
    Mark a registered k8srepo as ENABLED in the database and register the result of the operation
    :param k8srepo_content: dictionary with the k8srepo data. Its '_id' is read; an optional 'op_id'
        is popped from it
    :param order_id: identifier used to remove this task from 'lcm_tasks' once it has finished
    :return: None
    """
    # HA tasks and backward compatibility:
    # If 'k8srepo_content' does not include 'op_id', we are running a legacy NBI version.
    # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
    # Register 'create' task here for related future HA operations

    op_id = k8srepo_content.pop("op_id", None)
    if not self.lcm_tasks.lock_HA("k8srepo", "create", op_id):
        return

    k8srepo_id = k8srepo_content.get("_id")
    logging_text = "Task k8srepo_create={} ".format(k8srepo_id)
    self.logger.debug(logging_text + "Enter")

    db_k8srepo = None
    db_k8srepo_update = {}
    exc = None
    operation_state = "COMPLETED"
    operation_details = ""
    step = "Getting k8srepo-id='{}' from db".format(k8srepo_id)
    try:
        self.logger.debug(logging_text + step)
        db_k8srepo = self.db.get_one("k8srepos", {"_id": k8srepo_id})
        db_k8srepo_update["_admin.operationalState"] = "ENABLED"
    except Exception as e:
        self.logger.error(
            logging_text + "Exit Exception {}".format(e),
            # no traceback for the expected exception types
            exc_info=not isinstance(
                e,
                (
                    LcmException,
                    DbException,
                    K8sException,
                    N2VCException,
                    asyncio.CancelledError,
                ),
            ),
        )
        exc = e
    finally:
        if exc:
            # Mark the K8srepo 'create' HA task as erroneous. This is done even when
            # the record could not be read, so a failure is never registered as COMPLETED
            operation_state = "FAILED"
            operation_details = "ERROR {}: {}".format(step, exc)
            if db_k8srepo:
                db_k8srepo_update["_admin.operationalState"] = "ERROR"
                db_k8srepo_update["_admin.detailed-status"] = operation_details
        try:
            if db_k8srepo_update:
                self.update_db_2("k8srepos", k8srepo_id, db_k8srepo_update)
            # Register the K8srepo 'create' HA task either
            # successful or erroneous, or do nothing (if legacy NBI)
            self.lcm_tasks.unlock_HA(
                "k8srepo",
                "create",
                op_id,
                operationState=operation_state,
                detailed_status=operation_details,
            )
        except DbException as e:
            self.logger.error(logging_text + "Cannot update database: {}".format(e))
        self.lcm_tasks.remove("k8srepo", k8srepo_id, order_id)
async def delete(self, k8srepo_content, order_id):
    """
    Remove a k8srepo record from the database and register the result of the operation
    :param k8srepo_content: dictionary with the k8srepo data. Its '_id' is read; an optional 'op_id'
        is popped from it
    :param order_id: identifier used to remove this task from 'lcm_tasks' once it has finished
    :return: None
    """
    # HA tasks and backward compatibility:
    # If 'k8srepo_content' does not include 'op_id', we are running a legacy NBI version.
    # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
    # Register 'delete' task here for related future HA operations
    op_id = k8srepo_content.pop("op_id", None)
    if not self.lcm_tasks.lock_HA("k8srepo", "delete", op_id):
        return

    k8srepo_id = k8srepo_content.get("_id")
    logging_text = "Task k8srepo_delete={} ".format(k8srepo_id)
    self.logger.debug(logging_text + "Enter")

    db_k8srepo = None
    db_k8srepo_update = {}

    exc = None
    operation_state = "COMPLETED"
    operation_details = ""
    step = "Getting k8srepo-id='{}' from db".format(k8srepo_id)
    try:
        self.logger.debug(logging_text + step)
        db_k8srepo = self.db.get_one("k8srepos", {"_id": k8srepo_id})

    except Exception as e:
        self.logger.error(
            logging_text + "Exit Exception {}".format(e),
            # no traceback for the expected exception types
            exc_info=not isinstance(
                e,
                (
                    LcmException,
                    DbException,
                    K8sException,
                    N2VCException,
                    asyncio.CancelledError,
                ),
            ),
        )
        exc = e
    finally:
        if exc:
            # Mark the K8srepo 'delete' HA task as erroneous. This is done even when
            # the record could not be read, so a failure is never registered as COMPLETED
            operation_state = "FAILED"
            operation_details = "ERROR {}: {}".format(step, exc)
            if db_k8srepo:
                db_k8srepo_update["_admin.operationalState"] = "ERROR"
                db_k8srepo_update["_admin.detailed-status"] = operation_details
        try:
            if db_k8srepo_update:
                self.update_db_2("k8srepos", k8srepo_id, db_k8srepo_update)
            # Register the K8srepo 'delete' HA task either
            # successful or erroneous, or do nothing (if legacy NBI)
            self.lcm_tasks.unlock_HA(
                "k8srepo",
                "delete",
                op_id,
                operationState=operation_state,
                detailed_status=operation_details,
            )
            self.db.del_one("k8srepos", {"_id": k8srepo_id})
        except DbException as e:
            self.logger.error(logging_text + "Cannot update database: {}".format(e))
        self.lcm_tasks.remove("k8srepo", k8srepo_id, order_id)