1 # -*- coding: utf-8 -*-
4 # Copyright 2018 Telefonica S.A.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
22 import logging
.handlers
23 from osm_lcm
import ROclient
24 from osm_lcm
.lcm_utils
import LcmException
, LcmBase
, deep_get
25 from n2vc
.k8s_helm_conn
import K8sHelmConnector
26 from n2vc
.k8s_helm3_conn
import K8sHelm3Connector
27 from n2vc
.k8s_juju_conn
import K8sJujuConnector
28 from n2vc
.n2vc_juju_conn
import N2VCJujuConnector
29 from n2vc
.exceptions
import K8sException
, N2VCException
30 from osm_common
.dbbase
import DbException
31 from copy
import deepcopy
34 __author__
= "Alfonso Tierno"
37 class VimLcm(LcmBase
):
38 # values that are encrypted at vim config because they are passwords
39 vim_config_encrypted
= {
40 "1.1": ("admin_password", "nsx_password", "vcenter_password"),
49 def __init__(self
, msg
, lcm_tasks
, config
, loop
):
51 Init, Connect to database, filesystem storage, and messaging
52 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
56 self
.logger
= logging
.getLogger("lcm.vim")
58 self
.lcm_tasks
= lcm_tasks
59 self
.ro_config
= config
["ro_config"]
61 super().__init
__(msg
, self
.logger
)
63 async def create(self
, vim_content
, order_id
):
65 # HA tasks and backward compatibility:
66 # If 'vim_content' does not include 'op_id', we a running a legacy NBI version.
67 # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
68 # Register 'create' task here for related future HA operations
69 op_id
= vim_content
.pop("op_id", None)
70 if not self
.lcm_tasks
.lock_HA("vim", "create", op_id
):
73 vim_id
= vim_content
["_id"]
74 logging_text
= "Task vim_create={} ".format(vim_id
)
75 self
.logger
.debug(logging_text
+ "Enter")
82 step
= "Getting vim-id='{}' from db".format(vim_id
)
83 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_id
})
84 if vim_content
.get("config") and vim_content
["config"].get(
87 step
= "Getting sdn-controller-id='{}' from db".format(
88 vim_content
["config"]["sdn-controller"]
90 db_sdn
= self
.db
.get_one(
91 "sdns", {"_id": vim_content
["config"]["sdn-controller"]}
94 # If the VIM account has an associated SDN account, also
95 # wait for any previous tasks in process for the SDN
96 await self
.lcm_tasks
.waitfor_related_HA("sdn", "ANY", db_sdn
["_id"])
100 and db_sdn
["_admin"].get("deployed")
101 and db_sdn
["_admin"]["deployed"].get("RO")
103 RO_sdn_id
= db_sdn
["_admin"]["deployed"]["RO"]
106 "sdn-controller={} is not available. Not deployed at RO".format(
107 vim_content
["config"]["sdn-controller"]
111 step
= "Creating vim at RO"
112 db_vim_update
["_admin.deployed.RO"] = None
113 db_vim_update
["_admin.detailed-status"] = step
114 self
.update_db_2("vim_accounts", vim_id
, db_vim_update
)
115 RO
= ROclient
.ROClient(self
.loop
, **self
.ro_config
)
116 vim_RO
= deepcopy(vim_content
)
117 vim_RO
.pop("_id", None)
118 vim_RO
.pop("_admin", None)
119 schema_version
= vim_RO
.pop("schema_version", None)
120 vim_RO
.pop("schema_type", None)
121 vim_RO
.pop("vim_tenant_name", None)
122 vim_RO
["type"] = vim_RO
.pop("vim_type")
123 vim_RO
.pop("vim_user", None)
124 vim_RO
.pop("vim_password", None)
126 vim_RO
["config"]["sdn-controller"] = RO_sdn_id
127 desc
= await RO
.create("vim", descriptor
=vim_RO
)
128 RO_vim_id
= desc
["uuid"]
129 db_vim_update
["_admin.deployed.RO"] = RO_vim_id
131 logging_text
+ "VIM created at RO_vim_id={}".format(RO_vim_id
)
134 step
= "Creating vim_account at RO"
135 db_vim_update
["_admin.detailed-status"] = step
136 self
.update_db_2("vim_accounts", vim_id
, db_vim_update
)
138 if vim_content
.get("vim_password"):
139 vim_content
["vim_password"] = self
.db
.decrypt(
140 vim_content
["vim_password"],
141 schema_version
=schema_version
,
145 "vim_tenant_name": vim_content
["vim_tenant_name"],
146 "vim_username": vim_content
["vim_user"],
147 "vim_password": vim_content
["vim_password"],
149 if vim_RO
.get("config"):
150 vim_account_RO
["config"] = vim_RO
["config"]
151 if "sdn-controller" in vim_account_RO
["config"]:
152 del vim_account_RO
["config"]["sdn-controller"]
153 if "sdn-port-mapping" in vim_account_RO
["config"]:
154 del vim_account_RO
["config"]["sdn-port-mapping"]
155 vim_config_encrypted_keys
= self
.vim_config_encrypted
.get(
157 ) or self
.vim_config_encrypted
.get("default")
158 for p
in vim_config_encrypted_keys
:
159 if vim_account_RO
["config"].get(p
):
160 vim_account_RO
["config"][p
] = self
.db
.decrypt(
161 vim_account_RO
["config"][p
],
162 schema_version
=schema_version
,
166 desc
= await RO
.attach("vim_account", RO_vim_id
, descriptor
=vim_account_RO
)
167 db_vim_update
["_admin.deployed.RO-account"] = desc
["uuid"]
168 db_vim_update
["_admin.operationalState"] = "ENABLED"
169 db_vim_update
["_admin.detailed-status"] = "Done"
170 # Mark the VIM 'create' HA task as successful
171 operation_state
= "COMPLETED"
172 operation_details
= "Done"
176 + "Exit Ok VIM account created at RO_vim_account_id={}".format(
182 except (ROclient
.ROClientException
, DbException
, asyncio
.CancelledError
) as e
:
183 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
185 except Exception as e
:
186 self
.logger
.critical(
187 logging_text
+ "Exit Exception {}".format(e
), exc_info
=True
192 db_vim_update
["_admin.operationalState"] = "ERROR"
193 db_vim_update
["_admin.detailed-status"] = "ERROR {}: {}".format(
196 # Mark the VIM 'create' HA task as erroneous
197 operation_state
= "FAILED"
198 operation_details
= "ERROR {}: {}".format(step
, exc
)
201 self
.update_db_2("vim_accounts", vim_id
, db_vim_update
)
202 # Register the VIM 'create' HA task either
203 # succesful or erroneous, or do nothing (if legacy NBI)
204 self
.lcm_tasks
.unlock_HA(
208 operationState
=operation_state
,
209 detailed_status
=operation_details
,
211 except DbException
as e
:
212 self
.logger
.error(logging_text
+ "Cannot update database: {}".format(e
))
214 self
.lcm_tasks
.remove("vim_account", vim_id
, order_id
)
216 async def edit(self
, vim_content
, order_id
):
218 # HA tasks and backward compatibility:
219 # If 'vim_content' does not include 'op_id', we a running a legacy NBI version.
220 # In such a case, HA is not supported by NBI, and the HA check always returns True
221 op_id
= vim_content
.pop("op_id", None)
222 if not self
.lcm_tasks
.lock_HA("vim", "edit", op_id
):
225 vim_id
= vim_content
["_id"]
226 logging_text
= "Task vim_edit={} ".format(vim_id
)
227 self
.logger
.debug(logging_text
+ "Enter")
234 step
= "Getting vim-id='{}' from db".format(vim_id
)
236 # wait for any previous tasks in process
237 await self
.lcm_tasks
.waitfor_related_HA("vim", "edit", op_id
)
239 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_id
})
243 and db_vim
["_admin"].get("deployed")
244 and db_vim
["_admin"]["deployed"].get("RO")
246 if vim_content
.get("config") and vim_content
["config"].get(
249 step
= "Getting sdn-controller-id='{}' from db".format(
250 vim_content
["config"]["sdn-controller"]
252 db_sdn
= self
.db
.get_one(
253 "sdns", {"_id": vim_content
["config"]["sdn-controller"]}
256 # If the VIM account has an associated SDN account, also
257 # wait for any previous tasks in process for the SDN
258 await self
.lcm_tasks
.waitfor_related_HA("sdn", "ANY", db_sdn
["_id"])
262 and db_sdn
["_admin"].get("deployed")
263 and db_sdn
["_admin"]["deployed"].get("RO")
265 RO_sdn_id
= db_sdn
["_admin"]["deployed"]["RO"]
268 "sdn-controller={} is not available. Not deployed at RO".format(
269 vim_content
["config"]["sdn-controller"]
273 RO_vim_id
= db_vim
["_admin"]["deployed"]["RO"]
274 step
= "Editing vim at RO"
275 RO
= ROclient
.ROClient(self
.loop
, **self
.ro_config
)
276 vim_RO
= deepcopy(vim_content
)
277 vim_RO
.pop("_id", None)
278 vim_RO
.pop("_admin", None)
279 schema_version
= vim_RO
.pop("schema_version", None)
280 vim_RO
.pop("schema_type", None)
281 vim_RO
.pop("vim_tenant_name", None)
282 if "vim_type" in vim_RO
:
283 vim_RO
["type"] = vim_RO
.pop("vim_type")
284 vim_RO
.pop("vim_user", None)
285 vim_RO
.pop("vim_password", None)
287 vim_RO
["config"]["sdn-controller"] = RO_sdn_id
288 # TODO make a deep update of sdn-port-mapping
290 await RO
.edit("vim", RO_vim_id
, descriptor
=vim_RO
)
292 step
= "Editing vim-account at RO tenant"
294 if "config" in vim_content
:
295 if "sdn-controller" in vim_content
["config"]:
296 del vim_content
["config"]["sdn-controller"]
297 if "sdn-port-mapping" in vim_content
["config"]:
298 del vim_content
["config"]["sdn-port-mapping"]
299 if not vim_content
["config"]:
300 del vim_content
["config"]
301 if "vim_tenant_name" in vim_content
:
302 vim_account_RO
["vim_tenant_name"] = vim_content
["vim_tenant_name"]
303 if "vim_password" in vim_content
:
304 vim_account_RO
["vim_password"] = vim_content
["vim_password"]
305 if vim_content
.get("vim_password"):
306 vim_account_RO
["vim_password"] = self
.db
.decrypt(
307 vim_content
["vim_password"],
308 schema_version
=schema_version
,
311 if "config" in vim_content
:
312 vim_account_RO
["config"] = vim_content
["config"]
313 if vim_content
.get("config"):
314 vim_config_encrypted_keys
= self
.vim_config_encrypted
.get(
316 ) or self
.vim_config_encrypted
.get("default")
317 for p
in vim_config_encrypted_keys
:
318 if vim_content
["config"].get(p
):
319 vim_account_RO
["config"][p
] = self
.db
.decrypt(
320 vim_content
["config"][p
],
321 schema_version
=schema_version
,
325 if "vim_user" in vim_content
:
326 vim_content
["vim_username"] = vim_content
["vim_user"]
327 # vim_account must be edited always even if empty in order to ensure changes are translated to RO
328 # vim_thread. RO will remove and relaunch a new thread for this vim_account
329 await RO
.edit("vim_account", RO_vim_id
, descriptor
=vim_account_RO
)
330 db_vim_update
["_admin.operationalState"] = "ENABLED"
331 # Mark the VIM 'edit' HA task as successful
332 operation_state
= "COMPLETED"
333 operation_details
= "Done"
335 self
.logger
.debug(logging_text
+ "Exit Ok RO_vim_id={}".format(RO_vim_id
))
338 except (ROclient
.ROClientException
, DbException
, asyncio
.CancelledError
) as e
:
339 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
341 except Exception as e
:
342 self
.logger
.critical(
343 logging_text
+ "Exit Exception {}".format(e
), exc_info
=True
348 db_vim_update
["_admin.operationalState"] = "ERROR"
349 db_vim_update
["_admin.detailed-status"] = "ERROR {}: {}".format(
352 # Mark the VIM 'edit' HA task as erroneous
353 operation_state
= "FAILED"
354 operation_details
= "ERROR {}: {}".format(step
, exc
)
357 self
.update_db_2("vim_accounts", vim_id
, db_vim_update
)
358 # Register the VIM 'edit' HA task either
359 # succesful or erroneous, or do nothing (if legacy NBI)
360 self
.lcm_tasks
.unlock_HA(
364 operationState
=operation_state
,
365 detailed_status
=operation_details
,
367 except DbException
as e
:
368 self
.logger
.error(logging_text
+ "Cannot update database: {}".format(e
))
370 self
.lcm_tasks
.remove("vim_account", vim_id
, order_id
)
372 async def delete(self
, vim_content
, order_id
):
374 # HA tasks and backward compatibility:
375 # If 'vim_content' does not include 'op_id', we a running a legacy NBI version.
376 # In such a case, HA is not supported by NBI, and the HA check always returns True
377 op_id
= vim_content
.pop("op_id", None)
378 if not self
.lcm_tasks
.lock_HA("vim", "delete", op_id
):
381 vim_id
= vim_content
["_id"]
382 logging_text
= "Task vim_delete={} ".format(vim_id
)
383 self
.logger
.debug(logging_text
+ "Enter")
388 step
= "Getting vim from db"
390 # wait for any previous tasks in process
391 await self
.lcm_tasks
.waitfor_related_HA("vim", "delete", op_id
)
392 if not self
.ro_config
.get("ng"):
393 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_id
})
396 and db_vim
["_admin"].get("deployed")
397 and db_vim
["_admin"]["deployed"].get("RO")
399 RO_vim_id
= db_vim
["_admin"]["deployed"]["RO"]
400 RO
= ROclient
.ROClient(self
.loop
, **self
.ro_config
)
401 step
= "Detaching vim from RO tenant"
403 await RO
.detach("vim_account", RO_vim_id
)
404 except ROclient
.ROClientException
as e
:
405 if e
.http_code
== 404: # not found
408 + "RO_vim_id={} already detached".format(RO_vim_id
)
413 step
= "Deleting vim from RO"
415 await RO
.delete("vim", RO_vim_id
)
416 except ROclient
.ROClientException
as e
:
417 if e
.http_code
== 404: # not found
420 + "RO_vim_id={} already deleted".format(RO_vim_id
)
426 self
.logger
.debug(logging_text
+ "Nothing to remove at RO")
427 self
.db
.del_one("vim_accounts", {"_id": vim_id
})
429 self
.logger
.debug(logging_text
+ "Exit Ok")
432 except (ROclient
.ROClientException
, DbException
, asyncio
.CancelledError
) as e
:
433 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
435 except Exception as e
:
436 self
.logger
.critical(
437 logging_text
+ "Exit Exception {}".format(e
), exc_info
=True
441 self
.lcm_tasks
.remove("vim_account", vim_id
, order_id
)
443 db_vim_update
["_admin.operationalState"] = "ERROR"
444 db_vim_update
["_admin.detailed-status"] = "ERROR {}: {}".format(
447 # Mark the VIM 'delete' HA task as erroneous
448 operation_state
= "FAILED"
449 operation_details
= "ERROR {}: {}".format(step
, exc
)
450 self
.lcm_tasks
.unlock_HA(
454 operationState
=operation_state
,
455 detailed_status
=operation_details
,
458 if db_vim
and db_vim_update
:
459 self
.update_db_2("vim_accounts", vim_id
, db_vim_update
)
460 # If the VIM 'delete' HA task was succesful, the DB entry has been deleted,
461 # which means that there is nowhere to register this task, so do nothing here.
462 except DbException
as e
:
463 self
.logger
.error(logging_text
+ "Cannot update database: {}".format(e
))
464 self
.lcm_tasks
.remove("vim_account", vim_id
, order_id
)
467 class WimLcm(LcmBase
):
468 # values that are encrypted at wim config because they are passwords
469 wim_config_encrypted
= ()
471 def __init__(self
, msg
, lcm_tasks
, config
, loop
):
473 Init, Connect to database, filesystem storage, and messaging
474 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
478 self
.logger
= logging
.getLogger("lcm.vim")
480 self
.lcm_tasks
= lcm_tasks
481 self
.ro_config
= config
["ro_config"]
483 super().__init
__(msg
, self
.logger
)
485 async def create(self
, wim_content
, order_id
):
487 # HA tasks and backward compatibility:
488 # If 'wim_content' does not include 'op_id', we a running a legacy NBI version.
489 # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
490 # Register 'create' task here for related future HA operations
491 op_id
= wim_content
.pop("op_id", None)
492 self
.lcm_tasks
.lock_HA("wim", "create", op_id
)
494 wim_id
= wim_content
["_id"]
495 logging_text
= "Task wim_create={} ".format(wim_id
)
496 self
.logger
.debug(logging_text
+ "Enter")
502 step
= "Getting wim-id='{}' from db".format(wim_id
)
503 db_wim
= self
.db
.get_one("wim_accounts", {"_id": wim_id
})
504 db_wim_update
["_admin.deployed.RO"] = None
506 step
= "Creating wim at RO"
507 db_wim_update
["_admin.detailed-status"] = step
508 self
.update_db_2("wim_accounts", wim_id
, db_wim_update
)
509 RO
= ROclient
.ROClient(self
.loop
, **self
.ro_config
)
510 wim_RO
= deepcopy(wim_content
)
511 wim_RO
.pop("_id", None)
512 wim_RO
.pop("_admin", None)
513 schema_version
= wim_RO
.pop("schema_version", None)
514 wim_RO
.pop("schema_type", None)
515 wim_RO
.pop("wim_tenant_name", None)
516 wim_RO
["type"] = wim_RO
.pop("wim_type")
517 wim_RO
.pop("wim_user", None)
518 wim_RO
.pop("wim_password", None)
519 desc
= await RO
.create("wim", descriptor
=wim_RO
)
520 RO_wim_id
= desc
["uuid"]
521 db_wim_update
["_admin.deployed.RO"] = RO_wim_id
523 logging_text
+ "WIM created at RO_wim_id={}".format(RO_wim_id
)
526 step
= "Creating wim_account at RO"
527 db_wim_update
["_admin.detailed-status"] = step
528 self
.update_db_2("wim_accounts", wim_id
, db_wim_update
)
530 if wim_content
.get("wim_password"):
531 wim_content
["wim_password"] = self
.db
.decrypt(
532 wim_content
["wim_password"],
533 schema_version
=schema_version
,
537 "name": wim_content
["name"],
538 "user": wim_content
["user"],
539 "password": wim_content
["password"],
541 if wim_RO
.get("config"):
542 wim_account_RO
["config"] = wim_RO
["config"]
543 if "wim_port_mapping" in wim_account_RO
["config"]:
544 del wim_account_RO
["config"]["wim_port_mapping"]
545 for p
in self
.wim_config_encrypted
:
546 if wim_account_RO
["config"].get(p
):
547 wim_account_RO
["config"][p
] = self
.db
.decrypt(
548 wim_account_RO
["config"][p
],
549 schema_version
=schema_version
,
553 desc
= await RO
.attach("wim_account", RO_wim_id
, descriptor
=wim_account_RO
)
554 db_wim_update
["_admin.deployed.RO-account"] = desc
["uuid"]
555 db_wim_update
["_admin.operationalState"] = "ENABLED"
556 db_wim_update
["_admin.detailed-status"] = "Done"
557 # Mark the WIM 'create' HA task as successful
558 operation_state
= "COMPLETED"
559 operation_details
= "Done"
563 + "Exit Ok WIM account created at RO_wim_account_id={}".format(
569 except (ROclient
.ROClientException
, DbException
, asyncio
.CancelledError
) as e
:
570 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
572 except Exception as e
:
573 self
.logger
.critical(
574 logging_text
+ "Exit Exception {}".format(e
), exc_info
=True
579 db_wim_update
["_admin.operationalState"] = "ERROR"
580 db_wim_update
["_admin.detailed-status"] = "ERROR {}: {}".format(
583 # Mark the WIM 'create' HA task as erroneous
584 operation_state
= "FAILED"
585 operation_details
= "ERROR {}: {}".format(step
, exc
)
588 self
.update_db_2("wim_accounts", wim_id
, db_wim_update
)
589 # Register the WIM 'create' HA task either
590 # succesful or erroneous, or do nothing (if legacy NBI)
591 self
.lcm_tasks
.unlock_HA(
595 operationState
=operation_state
,
596 detailed_status
=operation_details
,
598 except DbException
as e
:
599 self
.logger
.error(logging_text
+ "Cannot update database: {}".format(e
))
600 self
.lcm_tasks
.remove("wim_account", wim_id
, order_id
)
602 async def edit(self
, wim_content
, order_id
):
604 # HA tasks and backward compatibility:
605 # If 'wim_content' does not include 'op_id', we a running a legacy NBI version.
606 # In such a case, HA is not supported by NBI, and the HA check always returns True
607 op_id
= wim_content
.pop("op_id", None)
608 if not self
.lcm_tasks
.lock_HA("wim", "edit", op_id
):
611 wim_id
= wim_content
["_id"]
612 logging_text
= "Task wim_edit={} ".format(wim_id
)
613 self
.logger
.debug(logging_text
+ "Enter")
619 step
= "Getting wim-id='{}' from db".format(wim_id
)
621 # wait for any previous tasks in process
622 await self
.lcm_tasks
.waitfor_related_HA("wim", "edit", op_id
)
624 db_wim
= self
.db
.get_one("wim_accounts", {"_id": wim_id
})
628 and db_wim
["_admin"].get("deployed")
629 and db_wim
["_admin"]["deployed"].get("RO")
632 RO_wim_id
= db_wim
["_admin"]["deployed"]["RO"]
633 step
= "Editing wim at RO"
634 RO
= ROclient
.ROClient(self
.loop
, **self
.ro_config
)
635 wim_RO
= deepcopy(wim_content
)
636 wim_RO
.pop("_id", None)
637 wim_RO
.pop("_admin", None)
638 schema_version
= wim_RO
.pop("schema_version", None)
639 wim_RO
.pop("schema_type", None)
640 wim_RO
.pop("wim_tenant_name", None)
641 if "wim_type" in wim_RO
:
642 wim_RO
["type"] = wim_RO
.pop("wim_type")
643 wim_RO
.pop("wim_user", None)
644 wim_RO
.pop("wim_password", None)
645 # TODO make a deep update of wim_port_mapping
647 await RO
.edit("wim", RO_wim_id
, descriptor
=wim_RO
)
649 step
= "Editing wim-account at RO tenant"
651 if "config" in wim_content
:
652 if "wim_port_mapping" in wim_content
["config"]:
653 del wim_content
["config"]["wim_port_mapping"]
654 if not wim_content
["config"]:
655 del wim_content
["config"]
656 if "wim_tenant_name" in wim_content
:
657 wim_account_RO
["wim_tenant_name"] = wim_content
["wim_tenant_name"]
658 if "wim_password" in wim_content
:
659 wim_account_RO
["wim_password"] = wim_content
["wim_password"]
660 if wim_content
.get("wim_password"):
661 wim_account_RO
["wim_password"] = self
.db
.decrypt(
662 wim_content
["wim_password"],
663 schema_version
=schema_version
,
666 if "config" in wim_content
:
667 wim_account_RO
["config"] = wim_content
["config"]
668 if wim_content
.get("config"):
669 for p
in self
.wim_config_encrypted
:
670 if wim_content
["config"].get(p
):
671 wim_account_RO
["config"][p
] = self
.db
.decrypt(
672 wim_content
["config"][p
],
673 schema_version
=schema_version
,
677 if "wim_user" in wim_content
:
678 wim_content
["wim_username"] = wim_content
["wim_user"]
679 # wim_account must be edited always even if empty in order to ensure changes are translated to RO
680 # wim_thread. RO will remove and relaunch a new thread for this wim_account
681 await RO
.edit("wim_account", RO_wim_id
, descriptor
=wim_account_RO
)
682 db_wim_update
["_admin.operationalState"] = "ENABLED"
683 # Mark the WIM 'edit' HA task as successful
684 operation_state
= "COMPLETED"
685 operation_details
= "Done"
687 self
.logger
.debug(logging_text
+ "Exit Ok RO_wim_id={}".format(RO_wim_id
))
690 except (ROclient
.ROClientException
, DbException
, asyncio
.CancelledError
) as e
:
691 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
693 except Exception as e
:
694 self
.logger
.critical(
695 logging_text
+ "Exit Exception {}".format(e
), exc_info
=True
700 db_wim_update
["_admin.operationalState"] = "ERROR"
701 db_wim_update
["_admin.detailed-status"] = "ERROR {}: {}".format(
704 # Mark the WIM 'edit' HA task as erroneous
705 operation_state
= "FAILED"
706 operation_details
= "ERROR {}: {}".format(step
, exc
)
709 self
.update_db_2("wim_accounts", wim_id
, db_wim_update
)
710 # Register the WIM 'edit' HA task either
711 # succesful or erroneous, or do nothing (if legacy NBI)
712 self
.lcm_tasks
.unlock_HA(
716 operationState
=operation_state
,
717 detailed_status
=operation_details
,
719 except DbException
as e
:
720 self
.logger
.error(logging_text
+ "Cannot update database: {}".format(e
))
721 self
.lcm_tasks
.remove("wim_account", wim_id
, order_id
)
723 async def delete(self
, wim_content
, order_id
):
725 # HA tasks and backward compatibility:
726 # If 'vim_content' does not include 'op_id', we a running a legacy NBI version.
727 # In such a case, HA is not supported by NBI, and the HA check always returns True
728 op_id
= wim_content
.pop("op_id", None)
729 if not self
.lcm_tasks
.lock_HA("wim", "delete", op_id
):
732 wim_id
= wim_content
["_id"]
733 logging_text
= "Task wim_delete={} ".format(wim_id
)
734 self
.logger
.debug(logging_text
+ "Enter")
739 step
= "Getting wim from db"
741 # wait for any previous tasks in process
742 await self
.lcm_tasks
.waitfor_related_HA("wim", "delete", op_id
)
744 db_wim
= self
.db
.get_one("wim_accounts", {"_id": wim_id
})
747 and db_wim
["_admin"].get("deployed")
748 and db_wim
["_admin"]["deployed"].get("RO")
750 RO_wim_id
= db_wim
["_admin"]["deployed"]["RO"]
751 RO
= ROclient
.ROClient(self
.loop
, **self
.ro_config
)
752 step
= "Detaching wim from RO tenant"
754 await RO
.detach("wim_account", RO_wim_id
)
755 except ROclient
.ROClientException
as e
:
756 if e
.http_code
== 404: # not found
759 + "RO_wim_id={} already detached".format(RO_wim_id
)
764 step
= "Deleting wim from RO"
766 await RO
.delete("wim", RO_wim_id
)
767 except ROclient
.ROClientException
as e
:
768 if e
.http_code
== 404: # not found
771 + "RO_wim_id={} already deleted".format(RO_wim_id
)
777 self
.logger
.error(logging_text
+ "Nothing to remove at RO")
778 self
.db
.del_one("wim_accounts", {"_id": wim_id
})
780 self
.logger
.debug(logging_text
+ "Exit Ok")
783 except (ROclient
.ROClientException
, DbException
, asyncio
.CancelledError
) as e
:
784 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
786 except Exception as e
:
787 self
.logger
.critical(
788 logging_text
+ "Exit Exception {}".format(e
), exc_info
=True
792 self
.lcm_tasks
.remove("wim_account", wim_id
, order_id
)
794 db_wim_update
["_admin.operationalState"] = "ERROR"
795 db_wim_update
["_admin.detailed-status"] = "ERROR {}: {}".format(
798 # Mark the WIM 'delete' HA task as erroneous
799 operation_state
= "FAILED"
800 operation_details
= "ERROR {}: {}".format(step
, exc
)
801 self
.lcm_tasks
.unlock_HA(
805 operationState
=operation_state
,
806 detailed_status
=operation_details
,
809 if db_wim
and db_wim_update
:
810 self
.update_db_2("wim_accounts", wim_id
, db_wim_update
)
811 # If the WIM 'delete' HA task was succesful, the DB entry has been deleted,
812 # which means that there is nowhere to register this task, so do nothing here.
813 except DbException
as e
:
814 self
.logger
.error(logging_text
+ "Cannot update database: {}".format(e
))
815 self
.lcm_tasks
.remove("wim_account", wim_id
, order_id
)
818 class SdnLcm(LcmBase
):
819 def __init__(self
, msg
, lcm_tasks
, config
, loop
):
821 Init, Connect to database, filesystem storage, and messaging
822 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
826 self
.logger
= logging
.getLogger("lcm.sdn")
828 self
.lcm_tasks
= lcm_tasks
829 self
.ro_config
= config
["ro_config"]
831 super().__init
__(msg
, self
.logger
)
833 async def create(self
, sdn_content
, order_id
):
835 # HA tasks and backward compatibility:
836 # If 'sdn_content' does not include 'op_id', we a running a legacy NBI version.
837 # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
838 # Register 'create' task here for related future HA operations
839 op_id
= sdn_content
.pop("op_id", None)
840 self
.lcm_tasks
.lock_HA("sdn", "create", op_id
)
842 sdn_id
= sdn_content
["_id"]
843 logging_text
= "Task sdn_create={} ".format(sdn_id
)
844 self
.logger
.debug(logging_text
+ "Enter")
851 step
= "Getting sdn from db"
852 db_sdn
= self
.db
.get_one("sdns", {"_id": sdn_id
})
853 db_sdn_update
["_admin.deployed.RO"] = None
855 step
= "Creating sdn at RO"
856 db_sdn_update
["_admin.detailed-status"] = step
857 self
.update_db_2("sdns", sdn_id
, db_sdn_update
)
859 RO
= ROclient
.ROClient(self
.loop
, **self
.ro_config
)
860 sdn_RO
= deepcopy(sdn_content
)
861 sdn_RO
.pop("_id", None)
862 sdn_RO
.pop("_admin", None)
863 schema_version
= sdn_RO
.pop("schema_version", None)
864 sdn_RO
.pop("schema_type", None)
865 sdn_RO
.pop("description", None)
866 if sdn_RO
.get("password"):
867 sdn_RO
["password"] = self
.db
.decrypt(
868 sdn_RO
["password"], schema_version
=schema_version
, salt
=sdn_id
871 desc
= await RO
.create("sdn", descriptor
=sdn_RO
)
872 RO_sdn_id
= desc
["uuid"]
873 db_sdn_update
["_admin.deployed.RO"] = RO_sdn_id
874 db_sdn_update
["_admin.operationalState"] = "ENABLED"
875 self
.logger
.debug(logging_text
+ "Exit Ok RO_sdn_id={}".format(RO_sdn_id
))
876 # Mark the SDN 'create' HA task as successful
877 operation_state
= "COMPLETED"
878 operation_details
= "Done"
881 except (ROclient
.ROClientException
, DbException
, asyncio
.CancelledError
) as e
:
882 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
884 except Exception as e
:
885 self
.logger
.critical(
886 logging_text
+ "Exit Exception {}".format(e
), exc_info
=True
891 db_sdn_update
["_admin.operationalState"] = "ERROR"
892 db_sdn_update
["_admin.detailed-status"] = "ERROR {}: {}".format(
895 # Mark the SDN 'create' HA task as erroneous
896 operation_state
= "FAILED"
897 operation_details
= "ERROR {}: {}".format(step
, exc
)
899 if db_sdn
and db_sdn_update
:
900 self
.update_db_2("sdns", sdn_id
, db_sdn_update
)
901 # Register the SDN 'create' HA task either
902 # succesful or erroneous, or do nothing (if legacy NBI)
903 self
.lcm_tasks
.unlock_HA(
907 operationState
=operation_state
,
908 detailed_status
=operation_details
,
910 except DbException
as e
:
911 self
.logger
.error(logging_text
+ "Cannot update database: {}".format(e
))
912 self
.lcm_tasks
.remove("sdn", sdn_id
, order_id
)
914 async def edit(self
, sdn_content
, order_id
):
916 # HA tasks and backward compatibility:
917 # If 'sdn_content' does not include 'op_id', we a running a legacy NBI version.
918 # In such a case, HA is not supported by NBI, and the HA check always returns True
919 op_id
= sdn_content
.pop("op_id", None)
920 if not self
.lcm_tasks
.lock_HA("sdn", "edit", op_id
):
923 sdn_id
= sdn_content
["_id"]
924 logging_text
= "Task sdn_edit={} ".format(sdn_id
)
925 self
.logger
.debug(logging_text
+ "Enter")
930 step
= "Getting sdn from db"
932 # wait for any previous tasks in process
933 await self
.lcm_tasks
.waitfor_related_HA("sdn", "edit", op_id
)
935 db_sdn
= self
.db
.get_one("sdns", {"_id": sdn_id
})
939 and db_sdn
["_admin"].get("deployed")
940 and db_sdn
["_admin"]["deployed"].get("RO")
942 RO_sdn_id
= db_sdn
["_admin"]["deployed"]["RO"]
943 RO
= ROclient
.ROClient(self
.loop
, **self
.ro_config
)
944 step
= "Editing sdn at RO"
945 sdn_RO
= deepcopy(sdn_content
)
946 sdn_RO
.pop("_id", None)
947 sdn_RO
.pop("_admin", None)
948 schema_version
= sdn_RO
.pop("schema_version", None)
949 sdn_RO
.pop("schema_type", None)
950 sdn_RO
.pop("description", None)
951 if sdn_RO
.get("password"):
952 sdn_RO
["password"] = self
.db
.decrypt(
953 sdn_RO
["password"], schema_version
=schema_version
, salt
=sdn_id
956 await RO
.edit("sdn", RO_sdn_id
, descriptor
=sdn_RO
)
957 db_sdn_update
["_admin.operationalState"] = "ENABLED"
958 # Mark the SDN 'edit' HA task as successful
959 operation_state
= "COMPLETED"
960 operation_details
= "Done"
962 self
.logger
.debug(logging_text
+ "Exit Ok RO_sdn_id={}".format(RO_sdn_id
))
965 except (ROclient
.ROClientException
, DbException
, asyncio
.CancelledError
) as e
:
966 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
968 except Exception as e
:
969 self
.logger
.critical(
970 logging_text
+ "Exit Exception {}".format(e
), exc_info
=True
975 db_sdn
["_admin.operationalState"] = "ERROR"
976 db_sdn
["_admin.detailed-status"] = "ERROR {}: {}".format(step
, exc
)
977 # Mark the SDN 'edit' HA task as erroneous
978 operation_state
= "FAILED"
979 operation_details
= "ERROR {}: {}".format(step
, exc
)
982 self
.update_db_2("sdns", sdn_id
, db_sdn_update
)
983 # Register the SDN 'edit' HA task either
984 # succesful or erroneous, or do nothing (if legacy NBI)
985 self
.lcm_tasks
.unlock_HA(
989 operationState
=operation_state
,
990 detailed_status
=operation_details
,
992 except DbException
as e
:
993 self
.logger
.error(logging_text
+ "Cannot update database: {}".format(e
))
994 self
.lcm_tasks
.remove("sdn", sdn_id
, order_id
)
996 async def delete(self
, sdn_content
, order_id
):
998 # HA tasks and backward compatibility:
999 # If 'vim_content' does not include 'op_id', we a running a legacy NBI version.
1000 # In such a case, HA is not supported by NBI, and the HA check always returns True
1001 op_id
= sdn_content
.pop("op_id", None)
1002 if not self
.lcm_tasks
.lock_HA("sdn", "delete", op_id
):
1005 sdn_id
= sdn_content
["_id"]
1006 logging_text
= "Task sdn_delete={} ".format(sdn_id
)
1007 self
.logger
.debug(logging_text
+ "Enter")
1012 step
= "Getting sdn from db"
1014 # wait for any previous tasks in process
1015 await self
.lcm_tasks
.waitfor_related_HA("sdn", "delete", op_id
)
1017 db_sdn
= self
.db
.get_one("sdns", {"_id": sdn_id
})
1019 db_sdn
.get("_admin")
1020 and db_sdn
["_admin"].get("deployed")
1021 and db_sdn
["_admin"]["deployed"].get("RO")
1023 RO_sdn_id
= db_sdn
["_admin"]["deployed"]["RO"]
1024 RO
= ROclient
.ROClient(self
.loop
, **self
.ro_config
)
1025 step
= "Deleting sdn from RO"
1027 await RO
.delete("sdn", RO_sdn_id
)
1028 except ROclient
.ROClientException
as e
:
1029 if e
.http_code
== 404: # not found
1032 + "RO_sdn_id={} already deleted".format(RO_sdn_id
)
1039 logging_text
+ "Skipping. There is not RO information at database"
1041 self
.db
.del_one("sdns", {"_id": sdn_id
})
1043 self
.logger
.debug("sdn_delete task sdn_id={} Exit Ok".format(sdn_id
))
1046 except (ROclient
.ROClientException
, DbException
, asyncio
.CancelledError
) as e
:
1047 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
1049 except Exception as e
:
1050 self
.logger
.critical(
1051 logging_text
+ "Exit Exception {}".format(e
), exc_info
=True
1056 db_sdn
["_admin.operationalState"] = "ERROR"
1057 db_sdn
["_admin.detailed-status"] = "ERROR {}: {}".format(step
, exc
)
1058 # Mark the SDN 'delete' HA task as erroneous
1059 operation_state
= "FAILED"
1060 operation_details
= "ERROR {}: {}".format(step
, exc
)
1061 self
.lcm_tasks
.unlock_HA(
1065 operationState
=operation_state
,
1066 detailed_status
=operation_details
,
1069 if db_sdn
and db_sdn_update
:
1070 self
.update_db_2("sdns", sdn_id
, db_sdn_update
)
1071 # If the SDN 'delete' HA task was succesful, the DB entry has been deleted,
1072 # which means that there is nowhere to register this task, so do nothing here.
1073 except DbException
as e
:
1074 self
.logger
.error(logging_text
+ "Cannot update database: {}".format(e
))
1075 self
.lcm_tasks
.remove("sdn", sdn_id
, order_id
)
1078 class K8sClusterLcm(LcmBase
):
1079 timeout_create
= 300
1081 def __init__(self
, msg
, lcm_tasks
, config
, loop
):
1083 Init, Connect to database, filesystem storage, and messaging
1084 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
1088 self
.logger
= logging
.getLogger("lcm.k8scluster")
1090 self
.lcm_tasks
= lcm_tasks
1091 self
.vca_config
= config
["VCA"]
1093 super().__init
__(msg
, self
.logger
)
1095 self
.helm2_k8scluster
= K8sHelmConnector(
1096 kubectl_command
=self
.vca_config
.get("kubectlpath"),
1097 helm_command
=self
.vca_config
.get("helmpath"),
1104 self
.helm3_k8scluster
= K8sHelm3Connector(
1105 kubectl_command
=self
.vca_config
.get("kubectlpath"),
1106 helm_command
=self
.vca_config
.get("helm3path"),
1113 self
.juju_k8scluster
= K8sJujuConnector(
1114 kubectl_command
=self
.vca_config
.get("kubectlpath"),
1115 juju_command
=self
.vca_config
.get("jujupath"),
1124 "helm-chart": self
.helm2_k8scluster
,
1125 "helm-chart-v3": self
.helm3_k8scluster
,
1126 "juju-bundle": self
.juju_k8scluster
,
1129 async def create(self
, k8scluster_content
, order_id
):
1131 op_id
= k8scluster_content
.pop("op_id", None)
1132 if not self
.lcm_tasks
.lock_HA("k8scluster", "create", op_id
):
1135 k8scluster_id
= k8scluster_content
["_id"]
1136 logging_text
= "Task k8scluster_create={} ".format(k8scluster_id
)
1137 self
.logger
.debug(logging_text
+ "Enter")
1139 db_k8scluster
= None
1140 db_k8scluster_update
= {}
1143 step
= "Getting k8scluster-id='{}' from db".format(k8scluster_id
)
1144 self
.logger
.debug(logging_text
+ step
)
1145 db_k8scluster
= self
.db
.get_one("k8sclusters", {"_id": k8scluster_id
})
1146 self
.db
.encrypt_decrypt_fields(
1147 db_k8scluster
.get("credentials"),
1149 ["password", "secret"],
1150 schema_version
=db_k8scluster
["schema_version"],
1151 salt
=db_k8scluster
["_id"],
1153 k8s_credentials
= yaml
.safe_dump(db_k8scluster
.get("credentials"))
1156 init_target
= deep_get(db_k8scluster
, ("_admin", "init"))
1157 step
= "Launching k8scluster init tasks"
1158 for task_name
in ("helm-chart", "juju-bundle", "helm-chart-v3"):
1159 if init_target
and task_name
not in init_target
:
1161 task
= asyncio
.ensure_future(
1162 self
.k8s_map
[task_name
].init_env(
1164 reuse_cluster_uuid
=k8scluster_id
,
1165 vca_id
=db_k8scluster
.get("vca_id"),
1168 pending_tasks
.append(task
)
1169 task2name
[task
] = task_name
1171 error_text_list
= []
1173 reached_timeout
= False
1176 while pending_tasks
:
1178 1, self
.timeout_create
- (time() - now
)
1179 ) # ensure not negative with max
1180 step
= "Waiting for k8scluster init tasks"
1181 done
, pending_tasks
= await asyncio
.wait(
1182 pending_tasks
, timeout
=_timeout
, return_when
=asyncio
.FIRST_COMPLETED
1185 # timeout. Set timeout is reached and process pending as if they hase been finished
1186 done
= pending_tasks
1187 pending_tasks
= None
1188 reached_timeout
= True
1190 task_name
= task2name
[task
]
1193 elif task
.cancelled():
1196 exc
= task
.exception()
1199 error_text_list
.append(
1200 "Failing init {}: {}".format(task_name
, exc
)
1202 db_k8scluster_update
[
1203 "_admin.{}.error_msg".format(task_name
)
1205 db_k8scluster_update
["_admin.{}.id".format(task_name
)] = None
1206 db_k8scluster_update
[
1207 "_admin.{}.operationalState".format(task_name
)
1210 logging_text
+ "{} init fail: {}".format(task_name
, exc
),
1211 exc_info
=not isinstance(exc
, (N2VCException
, str)),
1214 k8s_id
, uninstall_sw
= task
.result()
1215 tasks_name_ok
.append(task_name
)
1218 + "{} init success. id={} created={}".format(
1219 task_name
, k8s_id
, uninstall_sw
1222 db_k8scluster_update
[
1223 "_admin.{}.error_msg".format(task_name
)
1225 db_k8scluster_update
["_admin.{}.id".format(task_name
)] = k8s_id
1226 db_k8scluster_update
[
1227 "_admin.{}.created".format(task_name
)
1229 db_k8scluster_update
[
1230 "_admin.{}.operationalState".format(task_name
)
1233 step
= "Updating database for " + task_name
1234 self
.update_db_2("k8sclusters", k8scluster_id
, db_k8scluster_update
)
1236 operation_details
= "ready for " + ", ".join(tasks_name_ok
)
1237 operation_state
= "COMPLETED"
1238 db_k8scluster_update
["_admin.operationalState"] = (
1239 "ENABLED" if not error_text_list
else "DEGRADED"
1241 operation_details
+= "; " + ";".join(error_text_list
)
1243 db_k8scluster_update
["_admin.operationalState"] = "ERROR"
1244 operation_state
= "FAILED"
1245 operation_details
= ";".join(error_text_list
)
1246 db_k8scluster_update
["_admin.detailed-status"] = operation_details
1247 self
.logger
.debug(logging_text
+ "Done. Result: " + operation_state
)
1250 except Exception as e
:
1258 asyncio
.CancelledError
,
1261 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
1263 self
.logger
.critical(
1264 logging_text
+ "Exit Exception {}".format(e
), exc_info
=True
1268 if exc
and db_k8scluster
:
1269 db_k8scluster_update
["_admin.operationalState"] = "ERROR"
1270 db_k8scluster_update
["_admin.detailed-status"] = "ERROR {}: {}".format(
1273 operation_state
= "FAILED"
1274 operation_details
= "ERROR {}: {}".format(step
, exc
)
1276 if db_k8scluster
and db_k8scluster_update
:
1277 self
.update_db_2("k8sclusters", k8scluster_id
, db_k8scluster_update
)
1279 # Register the operation and unlock
1280 self
.lcm_tasks
.unlock_HA(
1284 operationState
=operation_state
,
1285 detailed_status
=operation_details
,
1287 except DbException
as e
:
1288 self
.logger
.error(logging_text
+ "Cannot update database: {}".format(e
))
1289 self
.lcm_tasks
.remove("k8scluster", k8scluster_id
, order_id
)
1291 async def delete(self
, k8scluster_content
, order_id
):
1293 # HA tasks and backward compatibility:
1294 # If 'vim_content' does not include 'op_id', we a running a legacy NBI version.
1295 # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
1296 # Register 'delete' task here for related future HA operations
1297 op_id
= k8scluster_content
.pop("op_id", None)
1298 if not self
.lcm_tasks
.lock_HA("k8scluster", "delete", op_id
):
1301 k8scluster_id
= k8scluster_content
["_id"]
1302 logging_text
= "Task k8scluster_delete={} ".format(k8scluster_id
)
1303 self
.logger
.debug(logging_text
+ "Enter")
1305 db_k8scluster
= None
1306 db_k8scluster_update
= {}
1309 step
= "Getting k8scluster='{}' from db".format(k8scluster_id
)
1310 self
.logger
.debug(logging_text
+ step
)
1311 db_k8scluster
= self
.db
.get_one("k8sclusters", {"_id": k8scluster_id
})
1312 k8s_hc_id
= deep_get(db_k8scluster
, ("_admin", "helm-chart", "id"))
1313 k8s_h3c_id
= deep_get(db_k8scluster
, ("_admin", "helm-chart-v3", "id"))
1314 k8s_jb_id
= deep_get(db_k8scluster
, ("_admin", "juju-bundle", "id"))
1316 cluster_removed
= True
1317 if k8s_jb_id
: # delete in reverse order of creation
1318 step
= "Removing juju-bundle '{}'".format(k8s_jb_id
)
1320 deep_get(db_k8scluster
, ("_admin", "juju-bundle", "created"))
1323 cluster_removed
= await self
.juju_k8scluster
.reset(
1324 cluster_uuid
=k8s_jb_id
,
1325 uninstall_sw
=uninstall_sw
,
1326 vca_id
=db_k8scluster
.get("vca_id"),
1328 db_k8scluster_update
["_admin.juju-bundle.id"] = None
1329 db_k8scluster_update
["_admin.juju-bundle.operationalState"] = "DISABLED"
1332 step
= "Removing helm-chart '{}'".format(k8s_hc_id
)
1334 deep_get(db_k8scluster
, ("_admin", "helm-chart", "created"))
1337 cluster_removed
= await self
.helm2_k8scluster
.reset(
1338 cluster_uuid
=k8s_hc_id
, uninstall_sw
=uninstall_sw
1340 db_k8scluster_update
["_admin.helm-chart.id"] = None
1341 db_k8scluster_update
["_admin.helm-chart.operationalState"] = "DISABLED"
1344 step
= "Removing helm-chart-v3 '{}'".format(k8s_hc_id
)
1346 deep_get(db_k8scluster
, ("_admin", "helm-chart-v3", "created"))
1349 cluster_removed
= await self
.helm3_k8scluster
.reset(
1350 cluster_uuid
=k8s_h3c_id
, uninstall_sw
=uninstall_sw
1352 db_k8scluster_update
["_admin.helm-chart-v3.id"] = None
1353 db_k8scluster_update
[
1354 "_admin.helm-chart-v3.operationalState"
1357 # Try to remove from cluster_inserted to clean old versions
1358 if k8s_hc_id
and cluster_removed
:
1359 step
= "Removing k8scluster='{}' from k8srepos".format(k8scluster_id
)
1360 self
.logger
.debug(logging_text
+ step
)
1361 db_k8srepo_list
= self
.db
.get_list(
1362 "k8srepos", {"_admin.cluster-inserted": k8s_hc_id
}
1364 for k8srepo
in db_k8srepo_list
:
1366 cluster_list
= k8srepo
["_admin"]["cluster-inserted"]
1367 cluster_list
.remove(k8s_hc_id
)
1371 {"_admin.cluster-inserted": cluster_list
},
1373 except Exception as e
:
1374 self
.logger
.error("{}: {}".format(step
, e
))
1375 self
.db
.del_one("k8sclusters", {"_id": k8scluster_id
})
1376 db_k8scluster_update
= None
1377 self
.logger
.debug(logging_text
+ "Done")
1379 except Exception as e
:
1387 asyncio
.CancelledError
,
1390 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
1392 self
.logger
.critical(
1393 logging_text
+ "Exit Exception {}".format(e
), exc_info
=True
1397 if exc
and db_k8scluster
:
1398 db_k8scluster_update
["_admin.operationalState"] = "ERROR"
1399 db_k8scluster_update
["_admin.detailed-status"] = "ERROR {}: {}".format(
1402 # Mark the WIM 'create' HA task as erroneous
1403 operation_state
= "FAILED"
1404 operation_details
= "ERROR {}: {}".format(step
, exc
)
1406 operation_state
= "COMPLETED"
1407 operation_details
= "deleted"
1410 if db_k8scluster_update
:
1411 self
.update_db_2("k8sclusters", k8scluster_id
, db_k8scluster_update
)
1412 # Register the K8scluster 'delete' HA task either
1413 # succesful or erroneous, or do nothing (if legacy NBI)
1414 self
.lcm_tasks
.unlock_HA(
1418 operationState
=operation_state
,
1419 detailed_status
=operation_details
,
1421 except DbException
as e
:
1422 self
.logger
.error(logging_text
+ "Cannot update database: {}".format(e
))
1423 self
.lcm_tasks
.remove("k8scluster", k8scluster_id
, order_id
)
1426 class VcaLcm(LcmBase
):
1429 def __init__(self
, msg
, lcm_tasks
, config
, loop
):
1431 Init, Connect to database, filesystem storage, and messaging
1432 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
1436 self
.logger
= logging
.getLogger("lcm.vca")
1438 self
.lcm_tasks
= lcm_tasks
1440 super().__init
__(msg
, self
.logger
)
1442 # create N2VC connector
1443 self
.n2vc
= N2VCJujuConnector(
1444 log
=self
.logger
, loop
=self
.loop
, fs
=self
.fs
, db
=self
.db
1447 def _get_vca_by_id(self
, vca_id
: str) -> dict:
1448 db_vca
= self
.db
.get_one("vca", {"_id": vca_id
})
1449 self
.db
.encrypt_decrypt_fields(
1452 ["secret", "cacert"],
1453 schema_version
=db_vca
["schema_version"],
1458 async def create(self
, vca_content
, order_id
):
1459 op_id
= vca_content
.pop("op_id", None)
1460 if not self
.lcm_tasks
.lock_HA("vca", "create", op_id
):
1463 vca_id
= vca_content
["_id"]
1464 self
.logger
.debug("Task vca_create={} {}".format(vca_id
, "Enter"))
1471 "Task vca_create={} {}".format(vca_id
, "Getting vca from db")
1473 db_vca
= self
._get
_vca
_by
_id
(vca_id
)
1475 task
= asyncio
.ensure_future(
1477 self
.n2vc
.validate_vca(db_vca
["_id"]),
1478 timeout
=self
.timeout_create
,
1482 await asyncio
.wait([task
], return_when
=asyncio
.FIRST_COMPLETED
)
1483 if task
.exception():
1484 raise task
.exception()
1486 "Task vca_create={} {}".format(
1487 vca_id
, "vca registered and validated successfully"
1490 db_vca_update
["_admin.operationalState"] = "ENABLED"
1491 db_vca_update
["_admin.detailed-status"] = "Connectivity: ok"
1492 operation_details
= "VCA validated"
1493 operation_state
= "COMPLETED"
1496 "Task vca_create={} {}".format(
1497 vca_id
, "Done. Result: {}".format(operation_state
)
1501 except Exception as e
:
1502 error_msg
= "Failed with exception: {}".format(e
)
1503 self
.logger
.error("Task vca_create={} {}".format(vca_id
, error_msg
))
1504 db_vca_update
["_admin.operationalState"] = "ERROR"
1505 db_vca_update
["_admin.detailed-status"] = error_msg
1506 operation_state
= "FAILED"
1507 operation_details
= error_msg
1510 self
.update_db_2("vca", vca_id
, db_vca_update
)
1512 # Register the operation and unlock
1513 self
.lcm_tasks
.unlock_HA(
1517 operationState
=operation_state
,
1518 detailed_status
=operation_details
,
1520 except DbException
as e
:
1522 "Task vca_create={} {}".format(
1523 vca_id
, "Cannot update database: {}".format(e
)
1526 self
.lcm_tasks
.remove("vca", vca_id
, order_id
)
1528 async def delete(self
, vca_content
, order_id
):
1530 # HA tasks and backward compatibility:
1531 # If "vim_content" does not include "op_id", we a running a legacy NBI version.
1532 # In such a case, HA is not supported by NBI, "op_id" is None, and lock_HA() will do nothing.
1533 # Register "delete" task here for related future HA operations
1534 op_id
= vca_content
.pop("op_id", None)
1535 if not self
.lcm_tasks
.lock_HA("vca", "delete", op_id
):
1539 vca_id
= vca_content
["_id"]
1543 "Task vca_delete={} {}".format(vca_id
, "Deleting vca from db")
1545 self
.db
.del_one("vca", {"_id": vca_id
})
1546 db_vca_update
= None
1547 operation_details
= "deleted"
1548 operation_state
= "COMPLETED"
1551 "Task vca_delete={} {}".format(
1552 vca_id
, "Done. Result: {}".format(operation_state
)
1555 except Exception as e
:
1556 error_msg
= "Failed with exception: {}".format(e
)
1557 self
.logger
.error("Task vca_delete={} {}".format(vca_id
, error_msg
))
1558 db_vca_update
["_admin.operationalState"] = "ERROR"
1559 db_vca_update
["_admin.detailed-status"] = error_msg
1560 operation_state
= "FAILED"
1561 operation_details
= error_msg
1564 self
.update_db_2("vca", vca_id
, db_vca_update
)
1565 self
.lcm_tasks
.unlock_HA(
1569 operationState
=operation_state
,
1570 detailed_status
=operation_details
,
1572 except DbException
as e
:
1574 "Task vca_delete={} {}".format(
1575 vca_id
, "Cannot update database: {}".format(e
)
1578 self
.lcm_tasks
.remove("vca", vca_id
, order_id
)
1581 class K8sRepoLcm(LcmBase
):
1582 def __init__(self
, msg
, lcm_tasks
, config
, loop
):
1584 Init, Connect to database, filesystem storage, and messaging
1585 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
1589 self
.logger
= logging
.getLogger("lcm.k8srepo")
1591 self
.lcm_tasks
= lcm_tasks
1592 self
.vca_config
= config
["VCA"]
1594 super().__init
__(msg
, self
.logger
)
1596 self
.k8srepo
= K8sHelmConnector(
1597 kubectl_command
=self
.vca_config
.get("kubectlpath"),
1598 helm_command
=self
.vca_config
.get("helmpath"),
1605 async def create(self
, k8srepo_content
, order_id
):
1607 # HA tasks and backward compatibility:
1608 # If 'vim_content' does not include 'op_id', we a running a legacy NBI version.
1609 # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
1610 # Register 'create' task here for related future HA operations
1612 op_id
= k8srepo_content
.pop("op_id", None)
1613 if not self
.lcm_tasks
.lock_HA("k8srepo", "create", op_id
):
1616 k8srepo_id
= k8srepo_content
.get("_id")
1617 logging_text
= "Task k8srepo_create={} ".format(k8srepo_id
)
1618 self
.logger
.debug(logging_text
+ "Enter")
1621 db_k8srepo_update
= {}
1623 operation_state
= "COMPLETED"
1624 operation_details
= ""
1626 step
= "Getting k8srepo-id='{}' from db".format(k8srepo_id
)
1627 self
.logger
.debug(logging_text
+ step
)
1628 db_k8srepo
= self
.db
.get_one("k8srepos", {"_id": k8srepo_id
})
1629 db_k8srepo_update
["_admin.operationalState"] = "ENABLED"
1630 except Exception as e
:
1632 logging_text
+ "Exit Exception {}".format(e
),
1633 exc_info
=not isinstance(
1640 asyncio
.CancelledError
,
1646 if exc
and db_k8srepo
:
1647 db_k8srepo_update
["_admin.operationalState"] = "ERROR"
1648 db_k8srepo_update
["_admin.detailed-status"] = "ERROR {}: {}".format(
1651 # Mark the WIM 'create' HA task as erroneous
1652 operation_state
= "FAILED"
1653 operation_details
= "ERROR {}: {}".format(step
, exc
)
1655 if db_k8srepo_update
:
1656 self
.update_db_2("k8srepos", k8srepo_id
, db_k8srepo_update
)
1657 # Register the K8srepo 'create' HA task either
1658 # succesful or erroneous, or do nothing (if legacy NBI)
1659 self
.lcm_tasks
.unlock_HA(
1663 operationState
=operation_state
,
1664 detailed_status
=operation_details
,
1666 except DbException
as e
:
1667 self
.logger
.error(logging_text
+ "Cannot update database: {}".format(e
))
1668 self
.lcm_tasks
.remove("k8srepo", k8srepo_id
, order_id
)
1670 async def delete(self
, k8srepo_content
, order_id
):
1672 # HA tasks and backward compatibility:
1673 # If 'vim_content' does not include 'op_id', we a running a legacy NBI version.
1674 # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
1675 # Register 'delete' task here for related future HA operations
1676 op_id
= k8srepo_content
.pop("op_id", None)
1677 if not self
.lcm_tasks
.lock_HA("k8srepo", "delete", op_id
):
1680 k8srepo_id
= k8srepo_content
.get("_id")
1681 logging_text
= "Task k8srepo_delete={} ".format(k8srepo_id
)
1682 self
.logger
.debug(logging_text
+ "Enter")
1685 db_k8srepo_update
= {}
1688 operation_state
= "COMPLETED"
1689 operation_details
= ""
1691 step
= "Getting k8srepo-id='{}' from db".format(k8srepo_id
)
1692 self
.logger
.debug(logging_text
+ step
)
1693 db_k8srepo
= self
.db
.get_one("k8srepos", {"_id": k8srepo_id
})
1695 except Exception as e
:
1697 logging_text
+ "Exit Exception {}".format(e
),
1698 exc_info
=not isinstance(
1705 asyncio
.CancelledError
,
1711 if exc
and db_k8srepo
:
1712 db_k8srepo_update
["_admin.operationalState"] = "ERROR"
1713 db_k8srepo_update
["_admin.detailed-status"] = "ERROR {}: {}".format(
1716 # Mark the WIM 'create' HA task as erroneous
1717 operation_state
= "FAILED"
1718 operation_details
= "ERROR {}: {}".format(step
, exc
)
1720 if db_k8srepo_update
:
1721 self
.update_db_2("k8srepos", k8srepo_id
, db_k8srepo_update
)
1722 # Register the K8srepo 'delete' HA task either
1723 # succesful or erroneous, or do nothing (if legacy NBI)
1724 self
.lcm_tasks
.unlock_HA(
1728 operationState
=operation_state
,
1729 detailed_status
=operation_details
,
1731 self
.db
.del_one("k8srepos", {"_id": k8srepo_id
})
1732 except DbException
as e
:
1733 self
.logger
.error(logging_text
+ "Cannot update database: {}".format(e
))
1734 self
.lcm_tasks
.remove("k8srepo", k8srepo_id
, order_id
)