1 # -*- coding: utf-8 -*-
4 # Copyright 2018 Telefonica S.A.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
22 import logging
.handlers
23 from osm_lcm
import ROclient
24 from osm_lcm
.lcm_utils
import LcmException
, LcmBase
, deep_get
25 from n2vc
.k8s_helm_conn
import K8sHelmConnector
26 from n2vc
.k8s_helm3_conn
import K8sHelm3Connector
27 from n2vc
.k8s_juju_conn
import K8sJujuConnector
28 from n2vc
.n2vc_juju_conn
import N2VCJujuConnector
29 from n2vc
.exceptions
import K8sException
, N2VCException
30 from osm_common
.dbbase
import DbException
31 from copy
import deepcopy
34 __author__
= "Alfonso Tierno"
37 class VimLcm(LcmBase
):
38 # values that are encrypted at vim config because they are passwords
39 vim_config_encrypted
= {
40 "1.1": ("admin_password", "nsx_password", "vcenter_password"),
49 def __init__(self
, msg
, lcm_tasks
, config
):
51 Init, Connect to database, filesystem storage, and messaging
52 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
56 self
.logger
= logging
.getLogger("lcm.vim")
57 self
.lcm_tasks
= lcm_tasks
58 self
.ro_config
= config
["RO"]
60 super().__init
__(msg
, self
.logger
)
62 async def create(self
, vim_content
, order_id
):
63 # HA tasks and backward compatibility:
64 # If 'vim_content' does not include 'op_id', we are running a legacy NBI version.
65 # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
66 # Register 'create' task here for related future HA operations
67 op_id
= vim_content
.pop("op_id", None)
68 if not self
.lcm_tasks
.lock_HA("vim", "create", op_id
):
71 vim_id
= vim_content
["_id"]
72 logging_text
= "Task vim_create={} ".format(vim_id
)
73 self
.logger
.debug(logging_text
+ "Enter")
80 step
= "Getting vim-id='{}' from db".format(vim_id
)
81 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_id
})
82 if vim_content
.get("config") and vim_content
["config"].get(
85 step
= "Getting sdn-controller-id='{}' from db".format(
86 vim_content
["config"]["sdn-controller"]
88 db_sdn
= self
.db
.get_one(
89 "sdns", {"_id": vim_content
["config"]["sdn-controller"]}
92 # If the VIM account has an associated SDN account, also
93 # wait for any previous tasks in process for the SDN
94 await self
.lcm_tasks
.waitfor_related_HA("sdn", "ANY", db_sdn
["_id"])
98 and db_sdn
["_admin"].get("deployed")
99 and db_sdn
["_admin"]["deployed"].get("RO")
101 RO_sdn_id
= db_sdn
["_admin"]["deployed"]["RO"]
104 "sdn-controller={} is not available. Not deployed at RO".format(
105 vim_content
["config"]["sdn-controller"]
109 step
= "Creating vim at RO"
110 db_vim_update
["_admin.deployed.RO"] = None
111 db_vim_update
["_admin.detailed-status"] = step
112 self
.update_db_2("vim_accounts", vim_id
, db_vim_update
)
113 RO
= ROclient
.ROClient(**self
.ro_config
)
114 vim_RO
= deepcopy(vim_content
)
115 vim_RO
.pop("_id", None)
116 vim_RO
.pop("_admin", None)
117 schema_version
= vim_RO
.pop("schema_version", None)
118 vim_RO
.pop("schema_type", None)
119 vim_RO
.pop("vim_tenant_name", None)
120 vim_RO
["type"] = vim_RO
.pop("vim_type")
121 vim_RO
.pop("vim_user", None)
122 vim_RO
.pop("vim_password", None)
124 vim_RO
["config"]["sdn-controller"] = RO_sdn_id
125 desc
= await RO
.create("vim", descriptor
=vim_RO
)
126 RO_vim_id
= desc
["uuid"]
127 db_vim_update
["_admin.deployed.RO"] = RO_vim_id
129 logging_text
+ "VIM created at RO_vim_id={}".format(RO_vim_id
)
132 step
= "Creating vim_account at RO"
133 db_vim_update
["_admin.detailed-status"] = step
134 self
.update_db_2("vim_accounts", vim_id
, db_vim_update
)
136 if vim_content
.get("vim_password"):
137 vim_content
["vim_password"] = self
.db
.decrypt(
138 vim_content
["vim_password"],
139 schema_version
=schema_version
,
143 "vim_tenant_name": vim_content
["vim_tenant_name"],
144 "vim_username": vim_content
["vim_user"],
145 "vim_password": vim_content
["vim_password"],
147 if vim_RO
.get("config"):
148 vim_account_RO
["config"] = vim_RO
["config"]
149 if "sdn-controller" in vim_account_RO
["config"]:
150 del vim_account_RO
["config"]["sdn-controller"]
151 if "sdn-port-mapping" in vim_account_RO
["config"]:
152 del vim_account_RO
["config"]["sdn-port-mapping"]
153 vim_config_encrypted_keys
= self
.vim_config_encrypted
.get(
155 ) or self
.vim_config_encrypted
.get("default")
156 for p
in vim_config_encrypted_keys
:
157 if vim_account_RO
["config"].get(p
):
158 vim_account_RO
["config"][p
] = self
.db
.decrypt(
159 vim_account_RO
["config"][p
],
160 schema_version
=schema_version
,
164 desc
= await RO
.attach("vim_account", RO_vim_id
, descriptor
=vim_account_RO
)
165 db_vim_update
["_admin.deployed.RO-account"] = desc
["uuid"]
166 db_vim_update
["_admin.operationalState"] = "ENABLED"
167 db_vim_update
["_admin.detailed-status"] = "Done"
168 # Mark the VIM 'create' HA task as successful
169 operation_state
= "COMPLETED"
170 operation_details
= "Done"
174 + "Exit Ok VIM account created at RO_vim_account_id={}".format(
180 except (ROclient
.ROClientException
, DbException
, asyncio
.CancelledError
) as e
:
181 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
183 except Exception as e
:
184 self
.logger
.critical(
185 logging_text
+ "Exit Exception {}".format(e
), exc_info
=True
190 db_vim_update
["_admin.operationalState"] = "ERROR"
191 db_vim_update
["_admin.detailed-status"] = "ERROR {}: {}".format(
194 # Mark the VIM 'create' HA task as erroneous
195 operation_state
= "FAILED"
196 operation_details
= "ERROR {}: {}".format(step
, exc
)
199 self
.update_db_2("vim_accounts", vim_id
, db_vim_update
)
200 # Register the VIM 'create' HA task either
201 # successful or erroneous, or do nothing (if legacy NBI)
202 self
.lcm_tasks
.unlock_HA(
206 operationState
=operation_state
,
207 detailed_status
=operation_details
,
209 except DbException
as e
:
210 self
.logger
.error(logging_text
+ "Cannot update database: {}".format(e
))
212 self
.lcm_tasks
.remove("vim_account", vim_id
, order_id
)
214 async def edit(self
, vim_content
, order_id
):
215 # HA tasks and backward compatibility:
216 # If 'vim_content' does not include 'op_id', we are running a legacy NBI version.
217 # In such a case, HA is not supported by NBI, and the HA check always returns True
218 op_id
= vim_content
.pop("op_id", None)
219 if not self
.lcm_tasks
.lock_HA("vim", "edit", op_id
):
222 vim_id
= vim_content
["_id"]
223 logging_text
= "Task vim_edit={} ".format(vim_id
)
224 self
.logger
.debug(logging_text
+ "Enter")
231 step
= "Getting vim-id='{}' from db".format(vim_id
)
233 # wait for any previous tasks in process
234 await self
.lcm_tasks
.waitfor_related_HA("vim", "edit", op_id
)
236 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_id
})
240 and db_vim
["_admin"].get("deployed")
241 and db_vim
["_admin"]["deployed"].get("RO")
243 if vim_content
.get("config") and vim_content
["config"].get(
246 step
= "Getting sdn-controller-id='{}' from db".format(
247 vim_content
["config"]["sdn-controller"]
249 db_sdn
= self
.db
.get_one(
250 "sdns", {"_id": vim_content
["config"]["sdn-controller"]}
253 # If the VIM account has an associated SDN account, also
254 # wait for any previous tasks in process for the SDN
255 await self
.lcm_tasks
.waitfor_related_HA("sdn", "ANY", db_sdn
["_id"])
259 and db_sdn
["_admin"].get("deployed")
260 and db_sdn
["_admin"]["deployed"].get("RO")
262 RO_sdn_id
= db_sdn
["_admin"]["deployed"]["RO"]
265 "sdn-controller={} is not available. Not deployed at RO".format(
266 vim_content
["config"]["sdn-controller"]
270 RO_vim_id
= db_vim
["_admin"]["deployed"]["RO"]
271 step
= "Editing vim at RO"
272 RO
= ROclient
.ROClient(**self
.ro_config
)
273 vim_RO
= deepcopy(vim_content
)
274 vim_RO
.pop("_id", None)
275 vim_RO
.pop("_admin", None)
276 schema_version
= vim_RO
.pop("schema_version", None)
277 vim_RO
.pop("schema_type", None)
278 vim_RO
.pop("vim_tenant_name", None)
279 if "vim_type" in vim_RO
:
280 vim_RO
["type"] = vim_RO
.pop("vim_type")
281 vim_RO
.pop("vim_user", None)
282 vim_RO
.pop("vim_password", None)
284 vim_RO
["config"]["sdn-controller"] = RO_sdn_id
285 # TODO make a deep update of sdn-port-mapping
287 await RO
.edit("vim", RO_vim_id
, descriptor
=vim_RO
)
289 step
= "Editing vim-account at RO tenant"
291 if "config" in vim_content
:
292 if "sdn-controller" in vim_content
["config"]:
293 del vim_content
["config"]["sdn-controller"]
294 if "sdn-port-mapping" in vim_content
["config"]:
295 del vim_content
["config"]["sdn-port-mapping"]
296 if not vim_content
["config"]:
297 del vim_content
["config"]
298 if "vim_tenant_name" in vim_content
:
299 vim_account_RO
["vim_tenant_name"] = vim_content
["vim_tenant_name"]
300 if "vim_password" in vim_content
:
301 vim_account_RO
["vim_password"] = vim_content
["vim_password"]
302 if vim_content
.get("vim_password"):
303 vim_account_RO
["vim_password"] = self
.db
.decrypt(
304 vim_content
["vim_password"],
305 schema_version
=schema_version
,
308 if "config" in vim_content
:
309 vim_account_RO
["config"] = vim_content
["config"]
310 if vim_content
.get("config"):
311 vim_config_encrypted_keys
= self
.vim_config_encrypted
.get(
313 ) or self
.vim_config_encrypted
.get("default")
314 for p
in vim_config_encrypted_keys
:
315 if vim_content
["config"].get(p
):
316 vim_account_RO
["config"][p
] = self
.db
.decrypt(
317 vim_content
["config"][p
],
318 schema_version
=schema_version
,
322 if "vim_user" in vim_content
:
323 vim_content
["vim_username"] = vim_content
["vim_user"]
324 # vim_account must be edited always even if empty in order to ensure changes are translated to RO
325 # vim_thread. RO will remove and relaunch a new thread for this vim_account
326 await RO
.edit("vim_account", RO_vim_id
, descriptor
=vim_account_RO
)
327 db_vim_update
["_admin.operationalState"] = "ENABLED"
328 # Mark the VIM 'edit' HA task as successful
329 operation_state
= "COMPLETED"
330 operation_details
= "Done"
332 self
.logger
.debug(logging_text
+ "Exit Ok RO_vim_id={}".format(RO_vim_id
))
335 except (ROclient
.ROClientException
, DbException
, asyncio
.CancelledError
) as e
:
336 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
338 except Exception as e
:
339 self
.logger
.critical(
340 logging_text
+ "Exit Exception {}".format(e
), exc_info
=True
345 db_vim_update
["_admin.operationalState"] = "ERROR"
346 db_vim_update
["_admin.detailed-status"] = "ERROR {}: {}".format(
349 # Mark the VIM 'edit' HA task as erroneous
350 operation_state
= "FAILED"
351 operation_details
= "ERROR {}: {}".format(step
, exc
)
354 self
.update_db_2("vim_accounts", vim_id
, db_vim_update
)
355 # Register the VIM 'edit' HA task either
356 # successful or erroneous, or do nothing (if legacy NBI)
357 self
.lcm_tasks
.unlock_HA(
361 operationState
=operation_state
,
362 detailed_status
=operation_details
,
364 except DbException
as e
:
365 self
.logger
.error(logging_text
+ "Cannot update database: {}".format(e
))
367 self
.lcm_tasks
.remove("vim_account", vim_id
, order_id
)
369 async def delete(self
, vim_content
, order_id
):
370 # HA tasks and backward compatibility:
371 # If 'vim_content' does not include 'op_id', we are running a legacy NBI version.
372 # In such a case, HA is not supported by NBI, and the HA check always returns True
373 op_id
= vim_content
.pop("op_id", None)
374 if not self
.lcm_tasks
.lock_HA("vim", "delete", op_id
):
377 vim_id
= vim_content
["_id"]
378 logging_text
= "Task vim_delete={} ".format(vim_id
)
379 self
.logger
.debug(logging_text
+ "Enter")
384 step
= "Getting vim from db"
386 # wait for any previous tasks in process
387 await self
.lcm_tasks
.waitfor_related_HA("vim", "delete", op_id
)
388 if not self
.ro_config
.get("ng"):
389 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_id
})
392 and db_vim
["_admin"].get("deployed")
393 and db_vim
["_admin"]["deployed"].get("RO")
395 RO_vim_id
= db_vim
["_admin"]["deployed"]["RO"]
396 RO
= ROclient
.ROClient(**self
.ro_config
)
397 step
= "Detaching vim from RO tenant"
399 await RO
.detach("vim_account", RO_vim_id
)
400 except ROclient
.ROClientException
as e
:
401 if e
.http_code
== 404: # not found
404 + "RO_vim_id={} already detached".format(RO_vim_id
)
409 step
= "Deleting vim from RO"
411 await RO
.delete("vim", RO_vim_id
)
412 except ROclient
.ROClientException
as e
:
413 if e
.http_code
== 404: # not found
416 + "RO_vim_id={} already deleted".format(RO_vim_id
)
422 self
.logger
.debug(logging_text
+ "Nothing to remove at RO")
423 self
.db
.del_one("vim_accounts", {"_id": vim_id
})
425 self
.logger
.debug(logging_text
+ "Exit Ok")
428 except (ROclient
.ROClientException
, DbException
, asyncio
.CancelledError
) as e
:
429 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
431 except Exception as e
:
432 self
.logger
.critical(
433 logging_text
+ "Exit Exception {}".format(e
), exc_info
=True
437 self
.lcm_tasks
.remove("vim_account", vim_id
, order_id
)
439 db_vim_update
["_admin.operationalState"] = "ERROR"
440 db_vim_update
["_admin.detailed-status"] = "ERROR {}: {}".format(
443 # Mark the VIM 'delete' HA task as erroneous
444 operation_state
= "FAILED"
445 operation_details
= "ERROR {}: {}".format(step
, exc
)
446 self
.lcm_tasks
.unlock_HA(
450 operationState
=operation_state
,
451 detailed_status
=operation_details
,
454 if db_vim
and db_vim_update
:
455 self
.update_db_2("vim_accounts", vim_id
, db_vim_update
)
456 # If the VIM 'delete' HA task was successful, the DB entry has been deleted,
457 # which means that there is nowhere to register this task, so do nothing here.
458 except DbException
as e
:
459 self
.logger
.error(logging_text
+ "Cannot update database: {}".format(e
))
460 self
.lcm_tasks
.remove("vim_account", vim_id
, order_id
)
463 class WimLcm(LcmBase
):
464 # values that are encrypted at wim config because they are passwords
465 wim_config_encrypted
= ()
467 def __init__(self
, msg
, lcm_tasks
, config
):
469 Init, Connect to database, filesystem storage, and messaging
470 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
474 self
.logger
= logging
.getLogger("lcm.vim")
475 self
.lcm_tasks
= lcm_tasks
476 self
.ro_config
= config
["RO"]
478 super().__init
__(msg
, self
.logger
)
480 async def create(self
, wim_content
, order_id
):
481 # HA tasks and backward compatibility:
482 # If 'wim_content' does not include 'op_id', we are running a legacy NBI version.
483 # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
484 # Register 'create' task here for related future HA operations
485 op_id
= wim_content
.pop("op_id", None)
486 self
.lcm_tasks
.lock_HA("wim", "create", op_id
)
488 wim_id
= wim_content
["_id"]
489 logging_text
= "Task wim_create={} ".format(wim_id
)
490 self
.logger
.debug(logging_text
+ "Enter")
496 step
= "Getting wim-id='{}' from db".format(wim_id
)
497 db_wim
= self
.db
.get_one("wim_accounts", {"_id": wim_id
})
498 db_wim_update
["_admin.deployed.RO"] = None
500 step
= "Creating wim at RO"
501 db_wim_update
["_admin.detailed-status"] = step
502 self
.update_db_2("wim_accounts", wim_id
, db_wim_update
)
503 RO
= ROclient
.ROClient(**self
.ro_config
)
504 wim_RO
= deepcopy(wim_content
)
505 wim_RO
.pop("_id", None)
506 wim_RO
.pop("_admin", None)
507 schema_version
= wim_RO
.pop("schema_version", None)
508 wim_RO
.pop("schema_type", None)
509 wim_RO
.pop("wim_tenant_name", None)
510 wim_RO
["type"] = wim_RO
.pop("wim_type")
511 wim_RO
.pop("wim_user", None)
512 wim_RO
.pop("wim_password", None)
513 desc
= await RO
.create("wim", descriptor
=wim_RO
)
514 RO_wim_id
= desc
["uuid"]
515 db_wim_update
["_admin.deployed.RO"] = RO_wim_id
517 logging_text
+ "WIM created at RO_wim_id={}".format(RO_wim_id
)
520 step
= "Creating wim_account at RO"
521 db_wim_update
["_admin.detailed-status"] = step
522 self
.update_db_2("wim_accounts", wim_id
, db_wim_update
)
524 if wim_content
.get("wim_password"):
525 wim_content
["wim_password"] = self
.db
.decrypt(
526 wim_content
["wim_password"],
527 schema_version
=schema_version
,
531 "name": wim_content
["name"],
532 "user": wim_content
["user"],
533 "password": wim_content
["password"],
535 if wim_RO
.get("config"):
536 wim_account_RO
["config"] = wim_RO
["config"]
537 if "wim_port_mapping" in wim_account_RO
["config"]:
538 del wim_account_RO
["config"]["wim_port_mapping"]
539 for p
in self
.wim_config_encrypted
:
540 if wim_account_RO
["config"].get(p
):
541 wim_account_RO
["config"][p
] = self
.db
.decrypt(
542 wim_account_RO
["config"][p
],
543 schema_version
=schema_version
,
547 desc
= await RO
.attach("wim_account", RO_wim_id
, descriptor
=wim_account_RO
)
548 db_wim_update
["_admin.deployed.RO-account"] = desc
["uuid"]
549 db_wim_update
["_admin.operationalState"] = "ENABLED"
550 db_wim_update
["_admin.detailed-status"] = "Done"
551 # Mark the WIM 'create' HA task as successful
552 operation_state
= "COMPLETED"
553 operation_details
= "Done"
557 + "Exit Ok WIM account created at RO_wim_account_id={}".format(
563 except (ROclient
.ROClientException
, DbException
, asyncio
.CancelledError
) as e
:
564 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
566 except Exception as e
:
567 self
.logger
.critical(
568 logging_text
+ "Exit Exception {}".format(e
), exc_info
=True
573 db_wim_update
["_admin.operationalState"] = "ERROR"
574 db_wim_update
["_admin.detailed-status"] = "ERROR {}: {}".format(
577 # Mark the WIM 'create' HA task as erroneous
578 operation_state
= "FAILED"
579 operation_details
= "ERROR {}: {}".format(step
, exc
)
582 self
.update_db_2("wim_accounts", wim_id
, db_wim_update
)
583 # Register the WIM 'create' HA task either
584 # successful or erroneous, or do nothing (if legacy NBI)
585 self
.lcm_tasks
.unlock_HA(
589 operationState
=operation_state
,
590 detailed_status
=operation_details
,
592 except DbException
as e
:
593 self
.logger
.error(logging_text
+ "Cannot update database: {}".format(e
))
594 self
.lcm_tasks
.remove("wim_account", wim_id
, order_id
)
596 async def edit(self
, wim_content
, order_id
):
597 # HA tasks and backward compatibility:
598 # If 'wim_content' does not include 'op_id', we are running a legacy NBI version.
599 # In such a case, HA is not supported by NBI, and the HA check always returns True
600 op_id
= wim_content
.pop("op_id", None)
601 if not self
.lcm_tasks
.lock_HA("wim", "edit", op_id
):
604 wim_id
= wim_content
["_id"]
605 logging_text
= "Task wim_edit={} ".format(wim_id
)
606 self
.logger
.debug(logging_text
+ "Enter")
612 step
= "Getting wim-id='{}' from db".format(wim_id
)
614 # wait for any previous tasks in process
615 await self
.lcm_tasks
.waitfor_related_HA("wim", "edit", op_id
)
617 db_wim
= self
.db
.get_one("wim_accounts", {"_id": wim_id
})
621 and db_wim
["_admin"].get("deployed")
622 and db_wim
["_admin"]["deployed"].get("RO")
624 RO_wim_id
= db_wim
["_admin"]["deployed"]["RO"]
625 step
= "Editing wim at RO"
626 RO
= ROclient
.ROClient(**self
.ro_config
)
627 wim_RO
= deepcopy(wim_content
)
628 wim_RO
.pop("_id", None)
629 wim_RO
.pop("_admin", None)
630 schema_version
= wim_RO
.pop("schema_version", None)
631 wim_RO
.pop("schema_type", None)
632 wim_RO
.pop("wim_tenant_name", None)
633 if "wim_type" in wim_RO
:
634 wim_RO
["type"] = wim_RO
.pop("wim_type")
635 wim_RO
.pop("wim_user", None)
636 wim_RO
.pop("wim_password", None)
637 # TODO make a deep update of wim_port_mapping
639 await RO
.edit("wim", RO_wim_id
, descriptor
=wim_RO
)
641 step
= "Editing wim-account at RO tenant"
643 if "config" in wim_content
:
644 if "wim_port_mapping" in wim_content
["config"]:
645 del wim_content
["config"]["wim_port_mapping"]
646 if not wim_content
["config"]:
647 del wim_content
["config"]
648 if "wim_tenant_name" in wim_content
:
649 wim_account_RO
["wim_tenant_name"] = wim_content
["wim_tenant_name"]
650 if "wim_password" in wim_content
:
651 wim_account_RO
["wim_password"] = wim_content
["wim_password"]
652 if wim_content
.get("wim_password"):
653 wim_account_RO
["wim_password"] = self
.db
.decrypt(
654 wim_content
["wim_password"],
655 schema_version
=schema_version
,
658 if "config" in wim_content
:
659 wim_account_RO
["config"] = wim_content
["config"]
660 if wim_content
.get("config"):
661 for p
in self
.wim_config_encrypted
:
662 if wim_content
["config"].get(p
):
663 wim_account_RO
["config"][p
] = self
.db
.decrypt(
664 wim_content
["config"][p
],
665 schema_version
=schema_version
,
669 if "wim_user" in wim_content
:
670 wim_content
["wim_username"] = wim_content
["wim_user"]
671 # wim_account must be edited always even if empty in order to ensure changes are translated to RO
672 # wim_thread. RO will remove and relaunch a new thread for this wim_account
673 await RO
.edit("wim_account", RO_wim_id
, descriptor
=wim_account_RO
)
674 db_wim_update
["_admin.operationalState"] = "ENABLED"
675 # Mark the WIM 'edit' HA task as successful
676 operation_state
= "COMPLETED"
677 operation_details
= "Done"
679 self
.logger
.debug(logging_text
+ "Exit Ok RO_wim_id={}".format(RO_wim_id
))
682 except (ROclient
.ROClientException
, DbException
, asyncio
.CancelledError
) as e
:
683 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
685 except Exception as e
:
686 self
.logger
.critical(
687 logging_text
+ "Exit Exception {}".format(e
), exc_info
=True
692 db_wim_update
["_admin.operationalState"] = "ERROR"
693 db_wim_update
["_admin.detailed-status"] = "ERROR {}: {}".format(
696 # Mark the WIM 'edit' HA task as erroneous
697 operation_state
= "FAILED"
698 operation_details
= "ERROR {}: {}".format(step
, exc
)
701 self
.update_db_2("wim_accounts", wim_id
, db_wim_update
)
702 # Register the WIM 'edit' HA task either
703 # successful or erroneous, or do nothing (if legacy NBI)
704 self
.lcm_tasks
.unlock_HA(
708 operationState
=operation_state
,
709 detailed_status
=operation_details
,
711 except DbException
as e
:
712 self
.logger
.error(logging_text
+ "Cannot update database: {}".format(e
))
713 self
.lcm_tasks
.remove("wim_account", wim_id
, order_id
)
715 async def delete(self
, wim_content
, order_id
):
716 # HA tasks and backward compatibility:
717 # If 'wim_content' does not include 'op_id', we are running a legacy NBI version.
718 # In such a case, HA is not supported by NBI, and the HA check always returns True
719 op_id
= wim_content
.pop("op_id", None)
720 if not self
.lcm_tasks
.lock_HA("wim", "delete", op_id
):
723 wim_id
= wim_content
["_id"]
724 logging_text
= "Task wim_delete={} ".format(wim_id
)
725 self
.logger
.debug(logging_text
+ "Enter")
730 step
= "Getting wim from db"
732 # wait for any previous tasks in process
733 await self
.lcm_tasks
.waitfor_related_HA("wim", "delete", op_id
)
735 db_wim
= self
.db
.get_one("wim_accounts", {"_id": wim_id
})
738 and db_wim
["_admin"].get("deployed")
739 and db_wim
["_admin"]["deployed"].get("RO")
741 RO_wim_id
= db_wim
["_admin"]["deployed"]["RO"]
742 RO
= ROclient
.ROClient(**self
.ro_config
)
743 step
= "Detaching wim from RO tenant"
745 await RO
.detach("wim_account", RO_wim_id
)
746 except ROclient
.ROClientException
as e
:
747 if e
.http_code
== 404: # not found
750 + "RO_wim_id={} already detached".format(RO_wim_id
)
755 step
= "Deleting wim from RO"
757 await RO
.delete("wim", RO_wim_id
)
758 except ROclient
.ROClientException
as e
:
759 if e
.http_code
== 404: # not found
762 + "RO_wim_id={} already deleted".format(RO_wim_id
)
768 self
.logger
.error(logging_text
+ "Nothing to remove at RO")
769 self
.db
.del_one("wim_accounts", {"_id": wim_id
})
771 self
.logger
.debug(logging_text
+ "Exit Ok")
774 except (ROclient
.ROClientException
, DbException
, asyncio
.CancelledError
) as e
:
775 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
777 except Exception as e
:
778 self
.logger
.critical(
779 logging_text
+ "Exit Exception {}".format(e
), exc_info
=True
783 self
.lcm_tasks
.remove("wim_account", wim_id
, order_id
)
785 db_wim_update
["_admin.operationalState"] = "ERROR"
786 db_wim_update
["_admin.detailed-status"] = "ERROR {}: {}".format(
789 # Mark the WIM 'delete' HA task as erroneous
790 operation_state
= "FAILED"
791 operation_details
= "ERROR {}: {}".format(step
, exc
)
792 self
.lcm_tasks
.unlock_HA(
796 operationState
=operation_state
,
797 detailed_status
=operation_details
,
800 if db_wim
and db_wim_update
:
801 self
.update_db_2("wim_accounts", wim_id
, db_wim_update
)
802 # If the WIM 'delete' HA task was successful, the DB entry has been deleted,
803 # which means that there is nowhere to register this task, so do nothing here.
804 except DbException
as e
:
805 self
.logger
.error(logging_text
+ "Cannot update database: {}".format(e
))
806 self
.lcm_tasks
.remove("wim_account", wim_id
, order_id
)
809 class SdnLcm(LcmBase
):
810 def __init__(self
, msg
, lcm_tasks
, config
):
812 Init, Connect to database, filesystem storage, and messaging
813 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
817 self
.logger
= logging
.getLogger("lcm.sdn")
818 self
.lcm_tasks
= lcm_tasks
819 self
.ro_config
= config
["RO"]
821 super().__init
__(msg
, self
.logger
)
823 async def create(self
, sdn_content
, order_id
):
824 # HA tasks and backward compatibility:
825 # If 'sdn_content' does not include 'op_id', we are running a legacy NBI version.
826 # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
827 # Register 'create' task here for related future HA operations
828 op_id
= sdn_content
.pop("op_id", None)
829 self
.lcm_tasks
.lock_HA("sdn", "create", op_id
)
831 sdn_id
= sdn_content
["_id"]
832 logging_text
= "Task sdn_create={} ".format(sdn_id
)
833 self
.logger
.debug(logging_text
+ "Enter")
840 step
= "Getting sdn from db"
841 db_sdn
= self
.db
.get_one("sdns", {"_id": sdn_id
})
842 db_sdn_update
["_admin.deployed.RO"] = None
844 step
= "Creating sdn at RO"
845 db_sdn_update
["_admin.detailed-status"] = step
846 self
.update_db_2("sdns", sdn_id
, db_sdn_update
)
848 RO
= ROclient
.ROClient(**self
.ro_config
)
849 sdn_RO
= deepcopy(sdn_content
)
850 sdn_RO
.pop("_id", None)
851 sdn_RO
.pop("_admin", None)
852 schema_version
= sdn_RO
.pop("schema_version", None)
853 sdn_RO
.pop("schema_type", None)
854 sdn_RO
.pop("description", None)
855 if sdn_RO
.get("password"):
856 sdn_RO
["password"] = self
.db
.decrypt(
857 sdn_RO
["password"], schema_version
=schema_version
, salt
=sdn_id
860 desc
= await RO
.create("sdn", descriptor
=sdn_RO
)
861 RO_sdn_id
= desc
["uuid"]
862 db_sdn_update
["_admin.deployed.RO"] = RO_sdn_id
863 db_sdn_update
["_admin.operationalState"] = "ENABLED"
864 self
.logger
.debug(logging_text
+ "Exit Ok RO_sdn_id={}".format(RO_sdn_id
))
865 # Mark the SDN 'create' HA task as successful
866 operation_state
= "COMPLETED"
867 operation_details
= "Done"
870 except (ROclient
.ROClientException
, DbException
, asyncio
.CancelledError
) as e
:
871 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
873 except Exception as e
:
874 self
.logger
.critical(
875 logging_text
+ "Exit Exception {}".format(e
), exc_info
=True
880 db_sdn_update
["_admin.operationalState"] = "ERROR"
881 db_sdn_update
["_admin.detailed-status"] = "ERROR {}: {}".format(
884 # Mark the SDN 'create' HA task as erroneous
885 operation_state
= "FAILED"
886 operation_details
= "ERROR {}: {}".format(step
, exc
)
888 if db_sdn
and db_sdn_update
:
889 self
.update_db_2("sdns", sdn_id
, db_sdn_update
)
890 # Register the SDN 'create' HA task either
891 # successful or erroneous, or do nothing (if legacy NBI)
892 self
.lcm_tasks
.unlock_HA(
896 operationState
=operation_state
,
897 detailed_status
=operation_details
,
899 except DbException
as e
:
900 self
.logger
.error(logging_text
+ "Cannot update database: {}".format(e
))
901 self
.lcm_tasks
.remove("sdn", sdn_id
, order_id
)
903 async def edit(self
, sdn_content
, order_id
):
904 # HA tasks and backward compatibility:
905 # If 'sdn_content' does not include 'op_id', we are running a legacy NBI version.
906 # In such a case, HA is not supported by NBI, and the HA check always returns True
907 op_id
= sdn_content
.pop("op_id", None)
908 if not self
.lcm_tasks
.lock_HA("sdn", "edit", op_id
):
911 sdn_id
= sdn_content
["_id"]
912 logging_text
= "Task sdn_edit={} ".format(sdn_id
)
913 self
.logger
.debug(logging_text
+ "Enter")
918 step
= "Getting sdn from db"
920 # wait for any previous tasks in process
921 await self
.lcm_tasks
.waitfor_related_HA("sdn", "edit", op_id
)
923 db_sdn
= self
.db
.get_one("sdns", {"_id": sdn_id
})
927 and db_sdn
["_admin"].get("deployed")
928 and db_sdn
["_admin"]["deployed"].get("RO")
930 RO_sdn_id
= db_sdn
["_admin"]["deployed"]["RO"]
931 RO
= ROclient
.ROClient(**self
.ro_config
)
932 step
= "Editing sdn at RO"
933 sdn_RO
= deepcopy(sdn_content
)
934 sdn_RO
.pop("_id", None)
935 sdn_RO
.pop("_admin", None)
936 schema_version
= sdn_RO
.pop("schema_version", None)
937 sdn_RO
.pop("schema_type", None)
938 sdn_RO
.pop("description", None)
939 if sdn_RO
.get("password"):
940 sdn_RO
["password"] = self
.db
.decrypt(
941 sdn_RO
["password"], schema_version
=schema_version
, salt
=sdn_id
944 await RO
.edit("sdn", RO_sdn_id
, descriptor
=sdn_RO
)
945 db_sdn_update
["_admin.operationalState"] = "ENABLED"
946 # Mark the SDN 'edit' HA task as successful
947 operation_state
= "COMPLETED"
948 operation_details
= "Done"
950 self
.logger
.debug(logging_text
+ "Exit Ok RO_sdn_id={}".format(RO_sdn_id
))
953 except (ROclient
.ROClientException
, DbException
, asyncio
.CancelledError
) as e
:
954 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
956 except Exception as e
:
957 self
.logger
.critical(
958 logging_text
+ "Exit Exception {}".format(e
), exc_info
=True
963 db_sdn
["_admin.operationalState"] = "ERROR"
964 db_sdn
["_admin.detailed-status"] = "ERROR {}: {}".format(step
, exc
)
965 # Mark the SDN 'edit' HA task as erroneous
966 operation_state
= "FAILED"
967 operation_details
= "ERROR {}: {}".format(step
, exc
)
970 self
.update_db_2("sdns", sdn_id
, db_sdn_update
)
971 # Register the SDN 'edit' HA task either
972 # successful or erroneous, or do nothing (if legacy NBI)
973 self
.lcm_tasks
.unlock_HA(
977 operationState
=operation_state
,
978 detailed_status
=operation_details
,
980 except DbException
as e
:
981 self
.logger
.error(logging_text
+ "Cannot update database: {}".format(e
))
982 self
.lcm_tasks
.remove("sdn", sdn_id
, order_id
)
984 async def delete(self
, sdn_content
, order_id
):
985 # HA tasks and backward compatibility:
986 # If 'sdn_content' does not include 'op_id', we are running a legacy NBI version.
987 # In such a case, HA is not supported by NBI, and the HA check always returns True
988 op_id
= sdn_content
.pop("op_id", None)
989 if not self
.lcm_tasks
.lock_HA("sdn", "delete", op_id
):
992 sdn_id
= sdn_content
["_id"]
993 logging_text
= "Task sdn_delete={} ".format(sdn_id
)
994 self
.logger
.debug(logging_text
+ "Enter")
999 step
= "Getting sdn from db"
1001 # wait for any previous tasks in process
1002 await self
.lcm_tasks
.waitfor_related_HA("sdn", "delete", op_id
)
1004 db_sdn
= self
.db
.get_one("sdns", {"_id": sdn_id
})
1006 db_sdn
.get("_admin")
1007 and db_sdn
["_admin"].get("deployed")
1008 and db_sdn
["_admin"]["deployed"].get("RO")
1010 RO_sdn_id
= db_sdn
["_admin"]["deployed"]["RO"]
1011 RO
= ROclient
.ROClient(**self
.ro_config
)
1012 step
= "Deleting sdn from RO"
1014 await RO
.delete("sdn", RO_sdn_id
)
1015 except ROclient
.ROClientException
as e
:
1016 if e
.http_code
== 404: # not found
1019 + "RO_sdn_id={} already deleted".format(RO_sdn_id
)
1026 logging_text
+ "Skipping. There is not RO information at database"
1028 self
.db
.del_one("sdns", {"_id": sdn_id
})
1030 self
.logger
.debug("sdn_delete task sdn_id={} Exit Ok".format(sdn_id
))
1033 except (ROclient
.ROClientException
, DbException
, asyncio
.CancelledError
) as e
:
1034 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
1036 except Exception as e
:
1037 self
.logger
.critical(
1038 logging_text
+ "Exit Exception {}".format(e
), exc_info
=True
1043 db_sdn
["_admin.operationalState"] = "ERROR"
1044 db_sdn
["_admin.detailed-status"] = "ERROR {}: {}".format(step
, exc
)
1045 # Mark the SDN 'delete' HA task as erroneous
1046 operation_state
= "FAILED"
1047 operation_details
= "ERROR {}: {}".format(step
, exc
)
1048 self
.lcm_tasks
.unlock_HA(
1052 operationState
=operation_state
,
1053 detailed_status
=operation_details
,
1056 if db_sdn
and db_sdn_update
:
1057 self
.update_db_2("sdns", sdn_id
, db_sdn_update
)
1058 # If the SDN 'delete' HA task was succesful, the DB entry has been deleted,
1059 # which means that there is nowhere to register this task, so do nothing here.
1060 except DbException
as e
:
1061 self
.logger
.error(logging_text
+ "Cannot update database: {}".format(e
))
1062 self
.lcm_tasks
.remove("sdn", sdn_id
, order_id
)
class K8sClusterLcm(LcmBase):
    """Create, edit and delete operations for entries of the 'k8sclusters' collection."""

    # Budget shared by all init tasks launched in create(); compared against time() deltas
    timeout_create = 300

    def __init__(self, msg, lcm_tasks, config):
        """
        Init, Connect to database, filesystem storage, and messaging
        :param msg: messaging object handed to LcmBase
        :param lcm_tasks: task registry used for HA locking and task bookkeeping
        :param config: two level dictionary with configuration. Only the 'VCA' section is read here
        :return: None
        """

        self.logger = logging.getLogger("lcm.k8scluster")
        self.lcm_tasks = lcm_tasks
        self.vca_config = config["VCA"]

        super().__init__(msg, self.logger)

        self.helm2_k8scluster = K8sHelmConnector(
            kubectl_command=self.vca_config.get("kubectlpath"),
            helm_command=self.vca_config.get("helmpath"),
            log=self.logger,
            on_update_db=None,
            db=self.db,
            fs=self.fs,
        )

        self.helm3_k8scluster = K8sHelm3Connector(
            kubectl_command=self.vca_config.get("kubectlpath"),
            helm_command=self.vca_config.get("helm3path"),
            fs=self.fs,
            log=self.logger,
            db=self.db,
            on_update_db=None,
        )

        self.juju_k8scluster = K8sJujuConnector(
            kubectl_command=self.vca_config.get("kubectlpath"),
            juju_command=self.vca_config.get("jujupath"),
            log=self.logger,
            on_update_db=None,
            db=self.db,
            fs=self.fs,
        )

        # deployment method name -> connector that initializes/resets it
        self.k8s_map = {
            "helm-chart": self.helm2_k8scluster,
            "helm-chart-v3": self.helm3_k8scluster,
            "juju-bundle": self.juju_k8scluster,
        }

    async def create(self, k8scluster_content, order_id):
        """
        Initialize a K8s cluster for each enabled deployment method and record the
        per-method result under '_admin.<method>' of the database record.

        The overall state is ENABLED when every init succeeds, DEGRADED when only some
        do, and ERROR when none does or an exception aborts the operation.

        :param k8scluster_content: dictionary with at least '_id'. An optional 'op_id' is popped from it
        :param order_id: identifier passed to lcm_tasks.remove() when the task ends
        :return: None
        """
        op_id = k8scluster_content.pop("op_id", None)
        if not self.lcm_tasks.lock_HA("k8scluster", "create", op_id):
            return

        k8scluster_id = k8scluster_content["_id"]
        logging_text = "Task k8scluster_create={} ".format(k8scluster_id)
        self.logger.debug(logging_text + "Enter")

        db_k8scluster = None
        db_k8scluster_update = {}
        exc = None
        try:
            step = "Getting k8scluster-id='{}' from db".format(k8scluster_id)
            self.logger.debug(logging_text + step)
            db_k8scluster = self.db.get_one("k8sclusters", {"_id": k8scluster_id})
            self.db.encrypt_decrypt_fields(
                db_k8scluster.get("credentials"),
                "decrypt",
                ["password", "secret"],
                schema_version=db_k8scluster["schema_version"],
                salt=db_k8scluster["_id"],
            )
            k8s_credentials = yaml.safe_dump(db_k8scluster.get("credentials"))
            pending_tasks = []
            task2name = {}
            init_target = deep_get(db_k8scluster, ("_admin", "init"))
            step = "Launching k8scluster init tasks"

            k8s_deploy_methods = db_k8scluster.get("deployment_methods", {})
            # for backwards compatibility and all-false case
            if not any(k8s_deploy_methods.values()):
                k8s_deploy_methods = {
                    "helm-chart": True,
                    "juju-bundle": True,
                    "helm-chart-v3": True,
                }
            # keep only the method names whose flag is truthy
            deploy_methods = tuple(filter(k8s_deploy_methods.get, k8s_deploy_methods))

            for task_name in deploy_methods:
                if init_target and task_name not in init_target:
                    continue
                task = asyncio.ensure_future(
                    self.k8s_map[task_name].init_env(
                        k8s_credentials,
                        reuse_cluster_uuid=k8scluster_id,
                        vca_id=db_k8scluster.get("vca_id"),
                    )
                )
                pending_tasks.append(task)
                task2name[task] = task_name

            error_text_list = []
            tasks_name_ok = []
            reached_timeout = False
            now = time()

            while pending_tasks:
                _timeout = max(
                    1, self.timeout_create - (time() - now)
                )  # ensure not negative with max
                step = "Waiting for k8scluster init tasks"
                done, pending_tasks = await asyncio.wait(
                    pending_tasks, timeout=_timeout, return_when=asyncio.FIRST_COMPLETED
                )
                if not done:
                    # timeout. Flag it and process the pending tasks as if they had finished
                    done = pending_tasks
                    pending_tasks = None
                    reached_timeout = True
                for task in done:
                    task_name = task2name[task]
                    if reached_timeout:
                        exc = "Timeout"
                    elif task.cancelled():
                        exc = "Cancelled"
                    else:
                        exc = task.exception()

                    if exc:
                        error_text_list.append(
                            "Failing init {}: {}".format(task_name, exc)
                        )
                        db_k8scluster_update[
                            "_admin.{}.error_msg".format(task_name)
                        ] = str(exc)
                        db_k8scluster_update["_admin.{}.id".format(task_name)] = None
                        db_k8scluster_update[
                            "_admin.{}.operationalState".format(task_name)
                        ] = "ERROR"
                        self.logger.error(
                            logging_text + "{} init fail: {}".format(task_name, exc),
                            exc_info=not isinstance(exc, (N2VCException, str)),
                        )
                    else:
                        k8s_id, uninstall_sw = task.result()
                        tasks_name_ok.append(task_name)
                        self.logger.debug(
                            logging_text
                            + "{} init success. id={} created={}".format(
                                task_name, k8s_id, uninstall_sw
                            )
                        )
                        db_k8scluster_update[
                            "_admin.{}.error_msg".format(task_name)
                        ] = None
                        db_k8scluster_update["_admin.{}.id".format(task_name)] = k8s_id
                        db_k8scluster_update[
                            "_admin.{}.created".format(task_name)
                        ] = uninstall_sw
                        db_k8scluster_update[
                            "_admin.{}.operationalState".format(task_name)
                        ] = "ENABLED"
                    # update database
                    step = "Updating database for " + task_name
                    self.update_db_2("k8sclusters", k8scluster_id, db_k8scluster_update)
            if tasks_name_ok:
                operation_details = "ready for " + ", ".join(tasks_name_ok)
                operation_state = "COMPLETED"
                db_k8scluster_update["_admin.operationalState"] = (
                    "ENABLED" if not error_text_list else "DEGRADED"
                )
                # append the failures only when there are some (avoids a dangling "; ")
                if error_text_list:
                    operation_details += "; " + ";".join(error_text_list)
            else:
                db_k8scluster_update["_admin.operationalState"] = "ERROR"
                operation_state = "FAILED"
                operation_details = ";".join(error_text_list)
            db_k8scluster_update["_admin.detailed-status"] = operation_details
            self.logger.debug(logging_text + "Done. Result: " + operation_state)
            # 'exc' was reused for per-task errors above; clear it so 'finally' sees success
            exc = None

        except Exception as e:
            if isinstance(
                e,
                (
                    LcmException,
                    DbException,
                    K8sException,
                    N2VCException,
                    asyncio.CancelledError,
                ),
            ):
                self.logger.error(logging_text + "Exit Exception {}".format(e))
            else:
                self.logger.critical(
                    logging_text + "Exit Exception {}".format(e), exc_info=True
                )
            exc = e
        finally:
            if exc:
                # Always define the operation result on failure, even when the record
                # could not be read; otherwise unlock_HA below would hit unbound names
                # and lcm_tasks.remove() would never run.
                operation_state = "FAILED"
                operation_details = "ERROR {}: {}".format(step, exc)
                if db_k8scluster:
                    db_k8scluster_update["_admin.operationalState"] = "ERROR"
                    db_k8scluster_update["_admin.detailed-status"] = operation_details
            try:
                if db_k8scluster and db_k8scluster_update:
                    self.update_db_2("k8sclusters", k8scluster_id, db_k8scluster_update)

                # Register the operation and unlock
                self.lcm_tasks.unlock_HA(
                    "k8scluster",
                    "create",
                    op_id,
                    operationState=operation_state,
                    detailed_status=operation_details,
                )
            except DbException as e:
                self.logger.error(logging_text + "Cannot update database: {}".format(e))
            self.lcm_tasks.remove("k8scluster", k8scluster_id, order_id)

    async def edit(self, k8scluster_content, order_id):
        """
        Placeholder for the K8s cluster edit operation: it only registers the HA
        operation as COMPLETED with detail "Not implemented".

        :param k8scluster_content: dictionary with at least '_id'. An optional 'op_id' is popped from it
        :param order_id: identifier passed to lcm_tasks.remove() when the task ends
        :return: None
        """
        op_id = k8scluster_content.pop("op_id", None)
        if not self.lcm_tasks.lock_HA("k8scluster", "edit", op_id):
            return

        k8scluster_id = k8scluster_content["_id"]

        logging_text = "Task k8scluster_edit={} ".format(k8scluster_id)
        self.logger.debug(logging_text + "Enter")

        # TODO the implementation is pending and will be part of a new feature
        # It will support rotation of certificates, update of credentials and K8S API endpoint
        # At the moment the operation is set as completed

        operation_state = "COMPLETED"
        operation_details = "Not implemented"

        self.lcm_tasks.unlock_HA(
            "k8scluster",
            "edit",
            op_id,
            operationState=operation_state,
            detailed_status=operation_details,
        )
        self.lcm_tasks.remove("k8scluster", k8scluster_id, order_id)

    async def delete(self, k8scluster_content, order_id):
        """
        Reset every deployment method recorded for the cluster (juju-bundle, helm-chart,
        helm-chart-v3), drop the cluster from the k8srepos that list it, and delete
        the 'k8sclusters' record.

        :param k8scluster_content: dictionary with at least '_id'. An optional 'op_id' is popped from it
        :param order_id: identifier passed to lcm_tasks.remove() when the task ends
        :return: None
        """
        # HA tasks and backward compatibility:
        # If 'k8scluster_content' does not include 'op_id', we are running a legacy NBI version.
        # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
        # Register 'delete' task here for related future HA operations
        op_id = k8scluster_content.pop("op_id", None)
        if not self.lcm_tasks.lock_HA("k8scluster", "delete", op_id):
            return

        k8scluster_id = k8scluster_content["_id"]
        logging_text = "Task k8scluster_delete={} ".format(k8scluster_id)
        self.logger.debug(logging_text + "Enter")

        db_k8scluster = None
        db_k8scluster_update = {}
        exc = None
        try:
            step = "Getting k8scluster='{}' from db".format(k8scluster_id)
            self.logger.debug(logging_text + step)
            db_k8scluster = self.db.get_one("k8sclusters", {"_id": k8scluster_id})
            k8s_hc_id = deep_get(db_k8scluster, ("_admin", "helm-chart", "id"))
            k8s_h3c_id = deep_get(db_k8scluster, ("_admin", "helm-chart-v3", "id"))
            k8s_jb_id = deep_get(db_k8scluster, ("_admin", "juju-bundle", "id"))

            cluster_removed = True
            if k8s_jb_id:  # delete in reverse order of creation
                step = "Removing juju-bundle '{}'".format(k8s_jb_id)
                uninstall_sw = (
                    deep_get(db_k8scluster, ("_admin", "juju-bundle", "created"))
                    or False
                )
                cluster_removed = await self.juju_k8scluster.reset(
                    cluster_uuid=k8s_jb_id,
                    uninstall_sw=uninstall_sw,
                    vca_id=db_k8scluster.get("vca_id"),
                )
                db_k8scluster_update["_admin.juju-bundle.id"] = None
                db_k8scluster_update["_admin.juju-bundle.operationalState"] = "DISABLED"

            if k8s_hc_id:
                step = "Removing helm-chart '{}'".format(k8s_hc_id)
                uninstall_sw = (
                    deep_get(db_k8scluster, ("_admin", "helm-chart", "created"))
                    or False
                )
                cluster_removed = await self.helm2_k8scluster.reset(
                    cluster_uuid=k8s_hc_id, uninstall_sw=uninstall_sw
                )
                db_k8scluster_update["_admin.helm-chart.id"] = None
                db_k8scluster_update["_admin.helm-chart.operationalState"] = "DISABLED"

            if k8s_h3c_id:
                step = "Removing helm-chart-v3 '{}'".format(k8s_h3c_id)
                uninstall_sw = (
                    deep_get(db_k8scluster, ("_admin", "helm-chart-v3", "created"))
                    or False
                )
                cluster_removed = await self.helm3_k8scluster.reset(
                    cluster_uuid=k8s_h3c_id, uninstall_sw=uninstall_sw
                )
                db_k8scluster_update["_admin.helm-chart-v3.id"] = None
                db_k8scluster_update[
                    "_admin.helm-chart-v3.operationalState"
                ] = "DISABLED"

            # Try to remove from cluster_inserted to clean old versions
            if k8s_hc_id and cluster_removed:
                step = "Removing k8scluster='{}' from k8srepos".format(k8scluster_id)
                self.logger.debug(logging_text + step)
                db_k8srepo_list = self.db.get_list(
                    "k8srepos", {"_admin.cluster-inserted": k8s_hc_id}
                )
                for k8srepo in db_k8srepo_list:
                    # best effort: a failure on one repo is logged and does not abort the delete
                    try:
                        cluster_list = k8srepo["_admin"]["cluster-inserted"]
                        cluster_list.remove(k8s_hc_id)
                        self.update_db_2(
                            "k8srepos",
                            k8srepo["_id"],
                            {"_admin.cluster-inserted": cluster_list},
                        )
                    except Exception as e:
                        self.logger.error("{}: {}".format(step, e))
            self.db.del_one("k8sclusters", {"_id": k8scluster_id})
            # the record is gone: there is nothing left to update in 'finally'
            db_k8scluster_update = None
            self.logger.debug(logging_text + "Done")

        except Exception as e:
            if isinstance(
                e,
                (
                    LcmException,
                    DbException,
                    K8sException,
                    N2VCException,
                    asyncio.CancelledError,
                ),
            ):
                self.logger.error(logging_text + "Exit Exception {}".format(e))
            else:
                self.logger.critical(
                    logging_text + "Exit Exception {}".format(e), exc_info=True
                )
            exc = e
        finally:
            if exc:
                # Mark the K8scluster 'delete' HA task as erroneous. This must not depend
                # on the record having been read, or a failed lookup is reported as "deleted".
                operation_state = "FAILED"
                operation_details = "ERROR {}: {}".format(step, exc)
                if db_k8scluster:
                    db_k8scluster_update["_admin.operationalState"] = "ERROR"
                    db_k8scluster_update["_admin.detailed-status"] = operation_details
            else:
                operation_state = "COMPLETED"
                operation_details = "deleted"

            try:
                if db_k8scluster_update:
                    self.update_db_2("k8sclusters", k8scluster_id, db_k8scluster_update)
                # Register the K8scluster 'delete' HA task either
                # successful or erroneous, or do nothing (if legacy NBI)
                self.lcm_tasks.unlock_HA(
                    "k8scluster",
                    "delete",
                    op_id,
                    operationState=operation_state,
                    detailed_status=operation_details,
                )
            except DbException as e:
                self.logger.error(logging_text + "Cannot update database: {}".format(e))
            self.lcm_tasks.remove("k8scluster", k8scluster_id, order_id)
class VcaLcm(LcmBase):
    """Create, edit and delete operations for entries of the 'vca' collection."""

    # timeout handed to asyncio.wait_for() while validating a VCA
    timeout_create = 30

    def __init__(self, msg, lcm_tasks, config):
        """
        Init, Connect to database, filesystem storage, and messaging
        :param msg: messaging object handed to LcmBase
        :param lcm_tasks: task registry used for HA locking and task bookkeeping
        :param config: two level dictionary with configuration (not read by this class)
        :return: None
        """

        self.logger = logging.getLogger("lcm.vca")
        self.lcm_tasks = lcm_tasks

        super().__init__(msg, self.logger)

        # create N2VC connector
        self.n2vc = N2VCJujuConnector(log=self.logger, fs=self.fs, db=self.db)

    def _get_vca_by_id(self, vca_id: str) -> dict:
        """Read the 'vca' record with the given id and decrypt its 'secret' and 'cacert' fields."""
        record = self.db.get_one("vca", {"_id": vca_id})
        self.db.encrypt_decrypt_fields(
            record,
            "decrypt",
            ["secret", "cacert"],
            schema_version=record["schema_version"],
            salt=record["_id"],
        )
        return record

    async def _validate_vca(self, db_vca_id: str) -> None:
        """Run n2vc.validate_vca() bounded by timeout_create and re-raise whatever it raised."""
        validation = asyncio.ensure_future(
            asyncio.wait_for(
                self.n2vc.validate_vca(db_vca_id),
                timeout=self.timeout_create,
            )
        )
        await asyncio.wait([validation], return_when=asyncio.FIRST_COMPLETED)
        if validation.exception():
            raise validation.exception()

    def _is_vca_config_update(self, update_options) -> bool:
        """Return True when the edit payload contains at least one connection-related key."""
        config_words = (
            "cacert",
            "endpoints",
            "lxd-cloud",
            "lxd-credentials",
            "k8s-cloud",
            "k8s-credentials",
            "model-config",
            "user",
            "secret",
        )
        provided = update_options.keys()
        return any(word in provided for word in config_words)

    async def create(self, vca_content, order_id):
        """
        Validate connectivity with a newly registered VCA and store the result.

        :param vca_content: dictionary with at least '_id'. An optional 'op_id' is popped from it
        :param order_id: identifier passed to lcm_tasks.remove() when the task ends
        :return: None
        """
        op_id = vca_content.pop("op_id", None)
        if not self.lcm_tasks.lock_HA("vca", "create", op_id):
            return

        vca_id = vca_content["_id"]
        log_prefix = "Task vca_create={} ".format(vca_id)
        self.logger.debug(log_prefix + "Enter")

        db_vca = None
        db_vca_update = {}

        # pessimistic default: only a clean validation flips it to COMPLETED
        operation_state = "FAILED"
        operation_details = ""
        try:
            self.logger.debug(log_prefix + "Getting vca from db")
            db_vca = self._get_vca_by_id(vca_id)

            await self._validate_vca(db_vca["_id"])
            self.logger.debug(log_prefix + "vca registered and validated successfully")
            db_vca_update["_admin.operationalState"] = "ENABLED"
            db_vca_update["_admin.detailed-status"] = "Connectivity: ok"
            operation_details = "VCA validated"
            operation_state = "COMPLETED"

            self.logger.debug(
                log_prefix + "Done. Result: {}".format(operation_state)
            )

        except Exception as e:
            error_msg = "Failed with exception: {}".format(e)
            self.logger.error(log_prefix + error_msg)
            db_vca_update["_admin.operationalState"] = "ERROR"
            db_vca_update["_admin.detailed-status"] = error_msg
            operation_details = error_msg
        finally:
            try:
                self.update_db_2("vca", vca_id, db_vca_update)

                # Register the operation and unlock
                self.lcm_tasks.unlock_HA(
                    "vca",
                    "create",
                    op_id,
                    operationState=operation_state,
                    detailed_status=operation_details,
                )
            except DbException as e:
                self.logger.error(
                    log_prefix + "Cannot update database: {}".format(e)
                )
            self.lcm_tasks.remove("vca", vca_id, order_id)

    async def edit(self, vca_content, order_id):
        """
        Re-validate the VCA when the edit touches connection-related keys and store the result.

        :param vca_content: dictionary with '_id' and the edited keys. An optional 'op_id' is popped from it
        :param order_id: identifier passed to lcm_tasks.remove() when the task ends
        :return: None
        """
        op_id = vca_content.pop("op_id", None)
        if not self.lcm_tasks.lock_HA("vca", "edit", op_id):
            return

        vca_id = vca_content["_id"]
        log_prefix = "Task vca_edit={} ".format(vca_id)
        self.logger.debug(log_prefix + "Enter")

        db_vca = None
        db_vca_update = {}

        operation_state = "FAILED"
        operation_details = ""
        try:
            self.logger.debug(log_prefix + "Getting vca from db")
            db_vca = self._get_vca_by_id(vca_id)
            if self._is_vca_config_update(vca_content):
                await self._validate_vca(db_vca["_id"])
                self.logger.debug(
                    log_prefix + "vca registered and validated successfully"
                )
                db_vca_update["_admin.operationalState"] = "ENABLED"
                db_vca_update["_admin.detailed-status"] = "Connectivity: ok"

            operation_details = "Edited"
            operation_state = "COMPLETED"

            self.logger.debug(
                log_prefix + "Done. Result: {}".format(operation_state)
            )

        except Exception as e:
            error_msg = "Failed with exception: {}".format(e)
            self.logger.error(log_prefix + error_msg)
            db_vca_update["_admin.operationalState"] = "ERROR"
            db_vca_update["_admin.detailed-status"] = error_msg
            operation_state = "FAILED"
            operation_details = error_msg
        finally:
            try:
                self.update_db_2("vca", vca_id, db_vca_update)

                # Register the operation and unlock
                self.lcm_tasks.unlock_HA(
                    "vca",
                    "edit",
                    op_id,
                    operationState=operation_state,
                    detailed_status=operation_details,
                )
            except DbException as e:
                self.logger.error(
                    log_prefix + "Cannot update database: {}".format(e)
                )
            self.lcm_tasks.remove("vca", vca_id, order_id)

    async def delete(self, vca_content, order_id):
        """
        Delete the 'vca' record from the database and register the operation result.

        :param vca_content: dictionary with at least '_id'. An optional 'op_id' is popped from it
        :param order_id: identifier passed to lcm_tasks.remove() when the task ends
        :return: None
        """
        # HA tasks and backward compatibility:
        # If "vca_content" does not include "op_id", we are running a legacy NBI version.
        # In such a case, HA is not supported by NBI, "op_id" is None, and lock_HA() will do nothing.
        # Register "delete" task here for related future HA operations
        op_id = vca_content.pop("op_id", None)
        if not self.lcm_tasks.lock_HA("vca", "delete", op_id):
            return

        vca_id = vca_content["_id"]
        log_prefix = "Task vca_delete={} ".format(vca_id)
        db_vca_update = {}
        operation_state = "FAILED"
        operation_details = ""

        try:
            self.logger.debug(log_prefix + "Deleting vca from db")
            self.db.del_one("vca", {"_id": vca_id})
            # record removed: nothing is left to update afterwards
            db_vca_update = None
            operation_details = "deleted"
            operation_state = "COMPLETED"

            self.logger.debug(
                log_prefix + "Done. Result: {}".format(operation_state)
            )
        except Exception as e:
            error_msg = "Failed with exception: {}".format(e)
            self.logger.error(log_prefix + error_msg)
            db_vca_update["_admin.operationalState"] = "ERROR"
            db_vca_update["_admin.detailed-status"] = error_msg
            operation_details = error_msg
        finally:
            try:
                self.update_db_2("vca", vca_id, db_vca_update)
                self.lcm_tasks.unlock_HA(
                    "vca",
                    "delete",
                    op_id,
                    operationState=operation_state,
                    detailed_status=operation_details,
                )
            except DbException as e:
                self.logger.error(
                    log_prefix + "Cannot update database: {}".format(e)
                )
            self.lcm_tasks.remove("vca", vca_id, order_id)
class K8sRepoLcm(LcmBase):
    """Create and delete operations for entries of the 'k8srepos' collection."""

    def __init__(self, msg, lcm_tasks, config):
        """
        Init, Connect to database, filesystem storage, and messaging
        :param msg: messaging object handed to LcmBase
        :param lcm_tasks: task registry used for HA locking and task bookkeeping
        :param config: two level dictionary with configuration. Only the 'VCA' section is read here
        :return: None
        """

        self.logger = logging.getLogger("lcm.k8srepo")
        self.lcm_tasks = lcm_tasks
        self.vca_config = config["VCA"]

        super().__init__(msg, self.logger)

        self.k8srepo = K8sHelmConnector(
            kubectl_command=self.vca_config.get("kubectlpath"),
            helm_command=self.vca_config.get("helmpath"),
            fs=self.fs,
            log=self.logger,
            db=self.db,
            on_update_db=None,
        )

    async def create(self, k8srepo_content, order_id):
        """
        Mark a registered K8s repository as ENABLED once its record can be read.

        :param k8srepo_content: dictionary with '_id'. An optional 'op_id' is popped from it
        :param order_id: identifier passed to lcm_tasks.remove() when the task ends
        :return: None
        """
        # HA tasks and backward compatibility:
        # If 'k8srepo_content' does not include 'op_id', we are running a legacy NBI version.
        # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
        # Register 'create' task here for related future HA operations

        op_id = k8srepo_content.pop("op_id", None)
        if not self.lcm_tasks.lock_HA("k8srepo", "create", op_id):
            return

        k8srepo_id = k8srepo_content.get("_id")
        logging_text = "Task k8srepo_create={} ".format(k8srepo_id)
        self.logger.debug(logging_text + "Enter")

        db_k8srepo = None
        db_k8srepo_update = {}
        exc = None
        operation_state = "COMPLETED"
        operation_details = ""
        try:
            step = "Getting k8srepo-id='{}' from db".format(k8srepo_id)
            self.logger.debug(logging_text + step)
            db_k8srepo = self.db.get_one("k8srepos", {"_id": k8srepo_id})
            db_k8srepo_update["_admin.operationalState"] = "ENABLED"
        except Exception as e:
            self.logger.error(
                logging_text + "Exit Exception {}".format(e),
                # traceback only for exception types that are not expected here
                exc_info=not isinstance(
                    e,
                    (
                        LcmException,
                        DbException,
                        K8sException,
                        N2VCException,
                        asyncio.CancelledError,
                    ),
                ),
            )
            exc = e
        finally:
            if exc:
                # Mark the K8srepo 'create' HA task as erroneous. This must not depend on
                # the record having been read, or a failed lookup is reported as COMPLETED.
                operation_state = "FAILED"
                operation_details = "ERROR {}: {}".format(step, exc)
                if db_k8srepo:
                    db_k8srepo_update["_admin.operationalState"] = "ERROR"
                    db_k8srepo_update["_admin.detailed-status"] = operation_details
            try:
                if db_k8srepo_update:
                    self.update_db_2("k8srepos", k8srepo_id, db_k8srepo_update)
                # Register the K8srepo 'create' HA task either
                # successful or erroneous, or do nothing (if legacy NBI)
                self.lcm_tasks.unlock_HA(
                    "k8srepo",
                    "create",
                    op_id,
                    operationState=operation_state,
                    detailed_status=operation_details,
                )
            except DbException as e:
                self.logger.error(logging_text + "Cannot update database: {}".format(e))
            self.lcm_tasks.remove("k8srepo", k8srepo_id, order_id)

    async def delete(self, k8srepo_content, order_id):
        """
        Register the result of the delete operation and remove the 'k8srepos' record.

        :param k8srepo_content: dictionary with '_id'. An optional 'op_id' is popped from it
        :param order_id: identifier passed to lcm_tasks.remove() when the task ends
        :return: None
        """
        # HA tasks and backward compatibility:
        # If 'k8srepo_content' does not include 'op_id', we are running a legacy NBI version.
        # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
        # Register 'delete' task here for related future HA operations
        op_id = k8srepo_content.pop("op_id", None)
        if not self.lcm_tasks.lock_HA("k8srepo", "delete", op_id):
            return

        k8srepo_id = k8srepo_content.get("_id")
        logging_text = "Task k8srepo_delete={} ".format(k8srepo_id)
        self.logger.debug(logging_text + "Enter")

        db_k8srepo = None
        db_k8srepo_update = {}

        exc = None
        operation_state = "COMPLETED"
        operation_details = ""
        try:
            step = "Getting k8srepo-id='{}' from db".format(k8srepo_id)
            self.logger.debug(logging_text + step)
            db_k8srepo = self.db.get_one("k8srepos", {"_id": k8srepo_id})

        except Exception as e:
            self.logger.error(
                logging_text + "Exit Exception {}".format(e),
                # traceback only for exception types that are not expected here
                exc_info=not isinstance(
                    e,
                    (
                        LcmException,
                        DbException,
                        K8sException,
                        N2VCException,
                        asyncio.CancelledError,
                    ),
                ),
            )
            exc = e
        finally:
            if exc:
                # Mark the K8srepo 'delete' HA task as erroneous. This must not depend on
                # the record having been read, or a failed lookup is reported as COMPLETED.
                operation_state = "FAILED"
                operation_details = "ERROR {}: {}".format(step, exc)
                if db_k8srepo:
                    db_k8srepo_update["_admin.operationalState"] = "ERROR"
                    db_k8srepo_update["_admin.detailed-status"] = operation_details
            try:
                if db_k8srepo_update:
                    self.update_db_2("k8srepos", k8srepo_id, db_k8srepo_update)
                # Register the K8srepo 'delete' HA task either
                # successful or erroneous, or do nothing (if legacy NBI)
                self.lcm_tasks.unlock_HA(
                    "k8srepo",
                    "delete",
                    op_id,
                    operationState=operation_state,
                    detailed_status=operation_details,
                )
                self.db.del_one("k8srepos", {"_id": k8srepo_id})
            except DbException as e:
                self.logger.error(logging_text + "Cannot update database: {}".format(e))
            self.lcm_tasks.remove("k8srepo", k8srepo_id, order_id)