47015c0e2ead4839bbee1749b6c305f099bc8fd8
[osm/LCM.git] / osm_lcm / vim_sdn.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import yaml
20 import asyncio
21 import logging
22 import logging.handlers
23 from osm_lcm import ROclient
24 from osm_lcm.lcm_utils import LcmException, LcmBase, deep_get
25 from n2vc.k8s_helm_conn import K8sHelmConnector
26 from n2vc.k8s_helm3_conn import K8sHelm3Connector
27 from n2vc.k8s_juju_conn import K8sJujuConnector
28 from n2vc.n2vc_juju_conn import N2VCJujuConnector
29 from n2vc.exceptions import K8sException, N2VCException
30 from osm_common.dbbase import DbException
31 from copy import deepcopy
32 from time import time
33
34 __author__ = "Alfonso Tierno"
35
36
class VimLcm(LcmBase):
    """
    Manage the lifecycle ('create', 'edit', 'delete') of VIM accounts at RO.

    Every operation receives the VIM account content, talks to RO through ROclient and
    records the outcome at the 'vim_accounts' collection. 'self.db' and 'self.update_db_2'
    are used but not defined in this class (presumably provided by LcmBase).
    """

    # values that are encrypted at vim config because they are passwords
    # (indexed by schema_version; "default" applies to any version not listed)
    vim_config_encrypted = {
        "1.1": ("admin_password", "nsx_password", "vcenter_password"),
        "default": (
            "admin_password",
            "nsx_password",
            "vcenter_password",
            "vrops_password",
        ),
    }

    def __init__(self, msg, lcm_tasks, config):
        """
        Store the task registry and the RO settings, then initialize the LcmBase part
        :param msg: messaging object, passed untouched to LcmBase
        :param lcm_tasks: task registry used for HA locking and for removing finished tasks
        :param config: two level dictionary with configuration. Only the 'RO' section is read here
        :return: None
        """

        self.logger = logging.getLogger("lcm.vim")
        self.lcm_tasks = lcm_tasks
        self.ro_config = config["RO"]

        super().__init__(msg, self.logger)

    async def _get_ro_sdn_id(self, sdn_id):
        """
        Obtain the RO identifier of the SDN controller referenced by a VIM config
        :param sdn_id: '_id' of the entry at the 'sdns' collection
        :return: content of '_admin.deployed.RO' of that entry
        :raises LcmException: if the entry has no RO identifier, that is, it is not deployed at RO
        """
        db_sdn = self.db.get_one("sdns", {"_id": sdn_id})

        # If the VIM account has an associated SDN account, also
        # wait for any previous tasks in process for the SDN
        await self.lcm_tasks.waitfor_related_HA("sdn", "ANY", db_sdn["_id"])

        if (
            db_sdn.get("_admin")
            and db_sdn["_admin"].get("deployed")
            and db_sdn["_admin"]["deployed"].get("RO")
        ):
            return db_sdn["_admin"]["deployed"]["RO"]
        raise LcmException(
            "sdn-controller={} is not available. Not deployed at RO".format(sdn_id)
        )

    def _decrypt_config(self, config, schema_version, vim_id):
        """
        Decrypt, in place, the password-like entries of a VIM config dictionary
        :param config: VIM 'config' dictionary; it is modified
        :param schema_version: selects the list of encrypted keys; unknown versions use "default"
        :param vim_id: VIM account '_id', used as salt
        :return: None
        """
        encrypted_keys = self.vim_config_encrypted.get(
            schema_version
        ) or self.vim_config_encrypted.get("default")
        for key in encrypted_keys:
            if config.get(key):
                config[key] = self.db.decrypt(
                    config[key], schema_version=schema_version, salt=vim_id
                )

    async def create(self, vim_content, order_id):
        """
        Create the VIM at RO and attach a vim_account to it, storing the result at database
        :param vim_content: VIM account content. '_id', 'vim_type', 'vim_tenant_name', 'vim_user' and
            'vim_password' are read; 'op_id', if present, is popped and used for the HA lock
        :param order_id: identifier passed to lcm_tasks.remove when the task finishes
        :return: None. Errors raised while processing are logged and stored, not propagated
        """
        # HA tasks and backward compatibility:
        # If 'vim_content' does not include 'op_id', we are running a legacy NBI version.
        # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
        # Register 'create' task here for related future HA operations
        op_id = vim_content.pop("op_id", None)
        if not self.lcm_tasks.lock_HA("vim", "create", op_id):
            return

        vim_id = vim_content["_id"]
        logging_text = "Task vim_create={} ".format(vim_id)
        self.logger.debug(logging_text + "Enter")

        db_vim = None
        db_vim_update = {}
        exc = None
        RO_sdn_id = None
        # Pessimistic defaults: the 'finally' clause must always be able to report the
        # HA task, whatever the point at which the flow is abandoned
        operation_state = "FAILED"
        operation_details = "Operation not completed"
        step = "Getting vim-id='{}' from db".format(vim_id)
        try:
            db_vim = self.db.get_one("vim_accounts", {"_id": vim_id})
            sdn_controller = vim_content.get("config") and vim_content["config"].get(
                "sdn-controller"
            )
            if sdn_controller:
                step = "Getting sdn-controller-id='{}' from db".format(sdn_controller)
                RO_sdn_id = await self._get_ro_sdn_id(sdn_controller)

            step = "Creating vim at RO"
            db_vim_update["_admin.deployed.RO"] = None
            db_vim_update["_admin.detailed-status"] = step
            self.update_db_2("vim_accounts", vim_id, db_vim_update)
            RO = ROclient.ROClient(**self.ro_config)
            # The VIM descriptor for RO is the received content without the OSM-only
            # fields and without the credentials, which belong to the vim_account
            vim_RO = deepcopy(vim_content)
            vim_RO.pop("_id", None)
            vim_RO.pop("_admin", None)
            schema_version = vim_RO.pop("schema_version", None)
            vim_RO.pop("schema_type", None)
            vim_RO.pop("vim_tenant_name", None)
            vim_RO["type"] = vim_RO.pop("vim_type")
            vim_RO.pop("vim_user", None)
            vim_RO.pop("vim_password", None)
            if RO_sdn_id:
                vim_RO["config"]["sdn-controller"] = RO_sdn_id
            desc = await RO.create("vim", descriptor=vim_RO)
            RO_vim_id = desc["uuid"]
            db_vim_update["_admin.deployed.RO"] = RO_vim_id
            self.logger.debug(
                logging_text + "VIM created at RO_vim_id={}".format(RO_vim_id)
            )

            step = "Creating vim_account at RO"
            db_vim_update["_admin.detailed-status"] = step
            self.update_db_2("vim_accounts", vim_id, db_vim_update)

            if vim_content.get("vim_password"):
                vim_content["vim_password"] = self.db.decrypt(
                    vim_content["vim_password"],
                    schema_version=schema_version,
                    salt=vim_id,
                )
            vim_account_RO = {
                "vim_tenant_name": vim_content["vim_tenant_name"],
                "vim_username": vim_content["vim_user"],
                "vim_password": vim_content["vim_password"],
            }
            if vim_RO.get("config"):
                # The VIM has already been created with vim_RO, so its config can be reused
                account_config = vim_RO["config"]
                vim_account_RO["config"] = account_config
                account_config.pop("sdn-controller", None)
                account_config.pop("sdn-port-mapping", None)
                self._decrypt_config(account_config, schema_version, vim_id)

            desc = await RO.attach("vim_account", RO_vim_id, descriptor=vim_account_RO)
            db_vim_update["_admin.deployed.RO-account"] = desc["uuid"]
            db_vim_update["_admin.operationalState"] = "ENABLED"
            db_vim_update["_admin.detailed-status"] = "Done"
            # Mark the VIM 'create' HA task as successful
            operation_state = "COMPLETED"
            operation_details = "Done"

            self.logger.debug(
                logging_text
                + "Exit Ok VIM account created at RO_vim_account_id={}".format(
                    desc["uuid"]
                )
            )
            return

        except (ROclient.ROClientException, DbException, asyncio.CancelledError) as e:
            self.logger.error(logging_text + "Exit Exception {}".format(e))
            exc = e
        except Exception as e:
            self.logger.critical(
                logging_text + "Exit Exception {}".format(e), exc_info=True
            )
            exc = e
        finally:
            if exc:
                # Mark the VIM 'create' HA task as erroneous, even when the failure
                # happened before the VIM record could be read from database
                operation_state = "FAILED"
                operation_details = "ERROR {}: {}".format(step, exc)
                if db_vim:
                    db_vim_update["_admin.operationalState"] = "ERROR"
                    db_vim_update["_admin.detailed-status"] = operation_details
            try:
                if db_vim_update:
                    self.update_db_2("vim_accounts", vim_id, db_vim_update)
                # Register the VIM 'create' HA task either
                # successful or erroneous, or do nothing (if legacy NBI)
                self.lcm_tasks.unlock_HA(
                    "vim",
                    "create",
                    op_id,
                    operationState=operation_state,
                    detailed_status=operation_details,
                )
            except DbException as e:
                self.logger.error(logging_text + "Cannot update database: {}".format(e))
            finally:
                # The task is always removed, whatever happens while reporting
                self.lcm_tasks.remove("vim_account", vim_id, order_id)

    async def edit(self, vim_content, order_id):
        """
        Apply the changes of a VIM account to the VIM and to the vim_account at RO
        :param vim_content: changed fields of the VIM account plus '_id'; 'op_id', if present, is
            popped and used for the HA lock. Its 'config' entry may be modified
        :param order_id: identifier passed to lcm_tasks.remove when the task finishes
        :return: None. Errors raised while processing are logged and stored, not propagated
        """
        # HA tasks and backward compatibility:
        # If 'vim_content' does not include 'op_id', we are running a legacy NBI version.
        # In such a case, HA is not supported by NBI, and the HA check always returns True
        op_id = vim_content.pop("op_id", None)
        if not self.lcm_tasks.lock_HA("vim", "edit", op_id):
            return

        vim_id = vim_content["_id"]
        logging_text = "Task vim_edit={} ".format(vim_id)
        self.logger.debug(logging_text + "Enter")

        db_vim = None
        exc = None
        RO_sdn_id = None
        RO_vim_id = None
        db_vim_update = {}
        # Pessimistic defaults: the 'finally' clause must always be able to report the
        # HA task, whatever the point at which the flow is abandoned
        operation_state = "FAILED"
        operation_details = "Operation not completed"
        step = "Getting vim-id='{}' from db".format(vim_id)
        try:
            # wait for any previous tasks in process
            await self.lcm_tasks.waitfor_related_HA("vim", "edit", op_id)

            db_vim = self.db.get_one("vim_accounts", {"_id": vim_id})

            if (
                db_vim.get("_admin")
                and db_vim["_admin"].get("deployed")
                and db_vim["_admin"]["deployed"].get("RO")
            ):
                sdn_controller = vim_content.get("config") and vim_content[
                    "config"
                ].get("sdn-controller")
                if sdn_controller:
                    step = "Getting sdn-controller-id='{}' from db".format(
                        sdn_controller
                    )
                    RO_sdn_id = await self._get_ro_sdn_id(sdn_controller)

                RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
                step = "Editing vim at RO"
                RO = ROclient.ROClient(**self.ro_config)
                vim_RO = deepcopy(vim_content)
                vim_RO.pop("_id", None)
                vim_RO.pop("_admin", None)
                schema_version = vim_RO.pop("schema_version", None)
                vim_RO.pop("schema_type", None)
                vim_RO.pop("vim_tenant_name", None)
                if "vim_type" in vim_RO:
                    vim_RO["type"] = vim_RO.pop("vim_type")
                vim_RO.pop("vim_user", None)
                vim_RO.pop("vim_password", None)
                if RO_sdn_id:
                    vim_RO["config"]["sdn-controller"] = RO_sdn_id
                # TODO make a deep update of sdn-port-mapping
                if vim_RO:
                    await RO.edit("vim", RO_vim_id, descriptor=vim_RO)

                step = "Editing vim-account at RO tenant"
                vim_account_RO = {}
                if "config" in vim_content:
                    vim_content["config"].pop("sdn-controller", None)
                    vim_content["config"].pop("sdn-port-mapping", None)
                    if not vim_content["config"]:
                        del vim_content["config"]
                if "vim_tenant_name" in vim_content:
                    vim_account_RO["vim_tenant_name"] = vim_content["vim_tenant_name"]
                if "vim_password" in vim_content:
                    # An empty password is sent as it is; a non-empty one is decrypted below
                    vim_account_RO["vim_password"] = vim_content["vim_password"]
                if vim_content.get("vim_password"):
                    vim_account_RO["vim_password"] = self.db.decrypt(
                        vim_content["vim_password"],
                        schema_version=schema_version,
                        salt=vim_id,
                    )
                if "config" in vim_content:
                    vim_account_RO["config"] = vim_content["config"]
                if vim_content.get("config"):
                    # vim_account_RO["config"] is the same object as vim_content["config"]
                    self._decrypt_config(
                        vim_account_RO["config"], schema_version, vim_id
                    )

                if "vim_user" in vim_content:
                    # RO names this field 'vim_username' (see 'create'). It must be placed at
                    # the descriptor sent to RO; otherwise the change of user is never applied
                    vim_account_RO["vim_username"] = vim_content["vim_user"]
                # vim_account must be edited always even if empty in order to ensure changes are translated to RO
                # vim_thread. RO will remove and relaunch a new thread for this vim_account
                await RO.edit("vim_account", RO_vim_id, descriptor=vim_account_RO)
                db_vim_update["_admin.operationalState"] = "ENABLED"
                # Mark the VIM 'edit' HA task as successful
                operation_state = "COMPLETED"
                operation_details = "Done"
            else:
                # Nothing exists at RO to be edited. The HA task is reported as failed
                # instead of reaching 'finally' without any state to report
                operation_details = "vim_id={} is not deployed at RO".format(vim_id)

            self.logger.debug(logging_text + "Exit Ok RO_vim_id={}".format(RO_vim_id))
            return

        except (ROclient.ROClientException, DbException, asyncio.CancelledError) as e:
            self.logger.error(logging_text + "Exit Exception {}".format(e))
            exc = e
        except Exception as e:
            self.logger.critical(
                logging_text + "Exit Exception {}".format(e), exc_info=True
            )
            exc = e
        finally:
            if exc:
                # Mark the VIM 'edit' HA task as erroneous, even when the failure
                # happened before the VIM record could be read from database
                operation_state = "FAILED"
                operation_details = "ERROR {}: {}".format(step, exc)
                if db_vim:
                    db_vim_update["_admin.operationalState"] = "ERROR"
                    db_vim_update["_admin.detailed-status"] = operation_details
            try:
                if db_vim_update:
                    self.update_db_2("vim_accounts", vim_id, db_vim_update)
                # Register the VIM 'edit' HA task either
                # successful or erroneous, or do nothing (if legacy NBI)
                self.lcm_tasks.unlock_HA(
                    "vim",
                    "edit",
                    op_id,
                    operationState=operation_state,
                    detailed_status=operation_details,
                )
            except DbException as e:
                self.logger.error(logging_text + "Cannot update database: {}".format(e))
            finally:
                # The task is always removed, whatever happens while reporting
                self.lcm_tasks.remove("vim_account", vim_id, order_id)

    async def delete(self, vim_content, order_id):
        """
        Detach and delete the VIM at RO (unless RO is configured as 'ng') and delete the database entry
        :param vim_content: VIM account content; only '_id' and the optional 'op_id' are read
        :param order_id: identifier passed to lcm_tasks.remove when the task finishes
        :return: None. Errors raised while processing are logged and stored, not propagated
        """
        # HA tasks and backward compatibility:
        # If 'vim_content' does not include 'op_id', we are running a legacy NBI version.
        # In such a case, HA is not supported by NBI, and the HA check always returns True
        op_id = vim_content.pop("op_id", None)
        if not self.lcm_tasks.lock_HA("vim", "delete", op_id):
            return

        vim_id = vim_content["_id"]
        logging_text = "Task vim_delete={} ".format(vim_id)
        self.logger.debug(logging_text + "Enter")

        db_vim = None
        db_vim_update = {}
        exc = None
        step = "Getting vim from db"
        try:
            # wait for any previous tasks in process
            await self.lcm_tasks.waitfor_related_HA("vim", "delete", op_id)
            if not self.ro_config.get("ng"):
                db_vim = self.db.get_one("vim_accounts", {"_id": vim_id})
                if (
                    db_vim.get("_admin")
                    and db_vim["_admin"].get("deployed")
                    and db_vim["_admin"]["deployed"].get("RO")
                ):
                    RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
                    RO = ROclient.ROClient(**self.ro_config)
                    step = "Detaching vim from RO tenant"
                    try:
                        await RO.detach("vim_account", RO_vim_id)
                    except ROclient.ROClientException as e:
                        if e.http_code == 404:  # not found
                            self.logger.debug(
                                logging_text
                                + "RO_vim_id={} already detached".format(RO_vim_id)
                            )
                        else:
                            raise

                    step = "Deleting vim from RO"
                    try:
                        await RO.delete("vim", RO_vim_id)
                    except ROclient.ROClientException as e:
                        if e.http_code == 404:  # not found
                            self.logger.debug(
                                logging_text
                                + "RO_vim_id={} already deleted".format(RO_vim_id)
                            )
                        else:
                            raise
                else:
                    # nothing to delete
                    self.logger.debug(logging_text + "Nothing to remove at RO")
            self.db.del_one("vim_accounts", {"_id": vim_id})
            # The entry no longer exists: nothing must be written on it at 'finally'
            db_vim = None
            self.logger.debug(logging_text + "Exit Ok")
            return

        except (ROclient.ROClientException, DbException, asyncio.CancelledError) as e:
            self.logger.error(logging_text + "Exit Exception {}".format(e))
            exc = e
        except Exception as e:
            self.logger.critical(
                logging_text + "Exit Exception {}".format(e), exc_info=True
            )
            exc = e
        finally:
            try:
                if exc and db_vim:
                    operation_details = "ERROR {}: {}".format(step, exc)
                    db_vim_update["_admin.operationalState"] = "ERROR"
                    db_vim_update["_admin.detailed-status"] = operation_details
                    # Mark the VIM 'delete' HA task as erroneous
                    self.lcm_tasks.unlock_HA(
                        "vim",
                        "delete",
                        op_id,
                        operationState="FAILED",
                        detailed_status=operation_details,
                    )
                    self.update_db_2("vim_accounts", vim_id, db_vim_update)
                # If the VIM 'delete' HA task was successful, the DB entry has been deleted,
                # which means that there is nowhere to register this task, so do nothing here.
            except DbException as e:
                self.logger.error(logging_text + "Cannot update database: {}".format(e))
            finally:
                # The task is always removed (and only once), whatever happens while reporting
                self.lcm_tasks.remove("vim_account", vim_id, order_id)
461
462
class WimLcm(LcmBase):
    """
    Manage the lifecycle ('create', 'edit', 'delete') of WIM accounts at RO.

    Every operation receives the WIM account content, talks to RO through ROclient and
    records the outcome at the 'wim_accounts' collection. 'self.db' and 'self.update_db_2'
    are used but not defined in this class (presumably provided by LcmBase).
    """

    # values that are encrypted at wim config because they are passwords
    wim_config_encrypted = ()

    def __init__(self, msg, lcm_tasks, config):
        """
        Store the task registry and the RO settings, then initialize the LcmBase part
        :param msg: messaging object, passed untouched to LcmBase
        :param lcm_tasks: task registry used for HA locking and for removing finished tasks
        :param config: two level dictionary with configuration. Only the 'RO' section is read here
        :return: None
        """

        # NOTE(review): the logger name is "lcm.vim", the same as VimLcm; confirm whether
        # a dedicated "lcm.wim" logger was intended
        self.logger = logging.getLogger("lcm.vim")
        self.lcm_tasks = lcm_tasks
        self.ro_config = config["RO"]

        super().__init__(msg, self.logger)

    async def create(self, wim_content, order_id):
        """
        Create the WIM at RO and attach a wim_account to it, storing the result at database
        :param wim_content: WIM account content. '_id', 'wim_type', 'name', 'user' and 'password' are
            read; 'op_id', if present, is popped and used for the HA lock
        :param order_id: identifier passed to lcm_tasks.remove when the task finishes
        :return: None. Errors raised while processing are logged and stored, not propagated
        """
        # HA tasks and backward compatibility:
        # If 'wim_content' does not include 'op_id', we are running a legacy NBI version.
        # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
        # Register 'create' task here for related future HA operations
        op_id = wim_content.pop("op_id", None)
        # As in the rest of operations, do nothing when the lock is not obtained
        if not self.lcm_tasks.lock_HA("wim", "create", op_id):
            return

        wim_id = wim_content["_id"]
        logging_text = "Task wim_create={} ".format(wim_id)
        self.logger.debug(logging_text + "Enter")

        db_wim = None
        db_wim_update = {}
        exc = None
        # Pessimistic defaults: the 'finally' clause must always be able to report the
        # HA task, whatever the point at which the flow is abandoned
        operation_state = "FAILED"
        operation_details = "Operation not completed"
        step = "Getting wim-id='{}' from db".format(wim_id)
        try:
            db_wim = self.db.get_one("wim_accounts", {"_id": wim_id})
            db_wim_update["_admin.deployed.RO"] = None

            step = "Creating wim at RO"
            db_wim_update["_admin.detailed-status"] = step
            self.update_db_2("wim_accounts", wim_id, db_wim_update)
            RO = ROclient.ROClient(**self.ro_config)
            # The WIM descriptor for RO is the received content without the OSM-only
            # fields and without the 'wim_*' credentials
            wim_RO = deepcopy(wim_content)
            wim_RO.pop("_id", None)
            wim_RO.pop("_admin", None)
            schema_version = wim_RO.pop("schema_version", None)
            wim_RO.pop("schema_type", None)
            wim_RO.pop("wim_tenant_name", None)
            wim_RO["type"] = wim_RO.pop("wim_type")
            wim_RO.pop("wim_user", None)
            wim_RO.pop("wim_password", None)
            desc = await RO.create("wim", descriptor=wim_RO)
            RO_wim_id = desc["uuid"]
            db_wim_update["_admin.deployed.RO"] = RO_wim_id
            self.logger.debug(
                logging_text + "WIM created at RO_wim_id={}".format(RO_wim_id)
            )

            step = "Creating wim_account at RO"
            db_wim_update["_admin.detailed-status"] = step
            self.update_db_2("wim_accounts", wim_id, db_wim_update)

            # NOTE(review): the decrypted value is stored at 'wim_password', but the account
            # below is built from 'password'; confirm which of both keys NBI really sends
            if wim_content.get("wim_password"):
                wim_content["wim_password"] = self.db.decrypt(
                    wim_content["wim_password"],
                    schema_version=schema_version,
                    salt=wim_id,
                )
            wim_account_RO = {
                "name": wim_content["name"],
                "user": wim_content["user"],
                "password": wim_content["password"],
            }
            if wim_RO.get("config"):
                # The WIM has already been created with wim_RO, so its config can be reused
                account_config = wim_RO["config"]
                wim_account_RO["config"] = account_config
                account_config.pop("wim_port_mapping", None)
                for key in self.wim_config_encrypted:
                    if account_config.get(key):
                        account_config[key] = self.db.decrypt(
                            account_config[key],
                            schema_version=schema_version,
                            salt=wim_id,
                        )

            desc = await RO.attach("wim_account", RO_wim_id, descriptor=wim_account_RO)
            db_wim_update["_admin.deployed.RO-account"] = desc["uuid"]
            db_wim_update["_admin.operationalState"] = "ENABLED"
            db_wim_update["_admin.detailed-status"] = "Done"
            # Mark the WIM 'create' HA task as successful
            operation_state = "COMPLETED"
            operation_details = "Done"

            self.logger.debug(
                logging_text
                + "Exit Ok WIM account created at RO_wim_account_id={}".format(
                    desc["uuid"]
                )
            )
            return

        except (ROclient.ROClientException, DbException, asyncio.CancelledError) as e:
            self.logger.error(logging_text + "Exit Exception {}".format(e))
            exc = e
        except Exception as e:
            self.logger.critical(
                logging_text + "Exit Exception {}".format(e), exc_info=True
            )
            exc = e
        finally:
            if exc:
                # Mark the WIM 'create' HA task as erroneous, even when the failure
                # happened before the WIM record could be read from database
                operation_state = "FAILED"
                operation_details = "ERROR {}: {}".format(step, exc)
                if db_wim:
                    db_wim_update["_admin.operationalState"] = "ERROR"
                    db_wim_update["_admin.detailed-status"] = operation_details
            try:
                if db_wim_update:
                    self.update_db_2("wim_accounts", wim_id, db_wim_update)
                # Register the WIM 'create' HA task either
                # successful or erroneous, or do nothing (if legacy NBI)
                self.lcm_tasks.unlock_HA(
                    "wim",
                    "create",
                    op_id,
                    operationState=operation_state,
                    detailed_status=operation_details,
                )
            except DbException as e:
                self.logger.error(logging_text + "Cannot update database: {}".format(e))
            finally:
                # The task is always removed, whatever happens while reporting
                self.lcm_tasks.remove("wim_account", wim_id, order_id)

    async def edit(self, wim_content, order_id):
        """
        Apply the changes of a WIM account to the WIM and to the wim_account at RO
        :param wim_content: changed fields of the WIM account plus '_id'; 'op_id', if present, is
            popped and used for the HA lock. Its 'config' entry may be modified
        :param order_id: identifier passed to lcm_tasks.remove when the task finishes
        :return: None. Errors raised while processing are logged and stored, not propagated
        """
        # HA tasks and backward compatibility:
        # If 'wim_content' does not include 'op_id', we are running a legacy NBI version.
        # In such a case, HA is not supported by NBI, and the HA check always returns True
        op_id = wim_content.pop("op_id", None)
        if not self.lcm_tasks.lock_HA("wim", "edit", op_id):
            return

        wim_id = wim_content["_id"]
        logging_text = "Task wim_edit={} ".format(wim_id)
        self.logger.debug(logging_text + "Enter")

        db_wim = None
        exc = None
        RO_wim_id = None
        db_wim_update = {}
        # Pessimistic defaults: the 'finally' clause must always be able to report the
        # HA task, whatever the point at which the flow is abandoned
        operation_state = "FAILED"
        operation_details = "Operation not completed"
        step = "Getting wim-id='{}' from db".format(wim_id)
        try:
            # wait for any previous tasks in process
            await self.lcm_tasks.waitfor_related_HA("wim", "edit", op_id)

            db_wim = self.db.get_one("wim_accounts", {"_id": wim_id})

            if (
                db_wim.get("_admin")
                and db_wim["_admin"].get("deployed")
                and db_wim["_admin"]["deployed"].get("RO")
            ):
                RO_wim_id = db_wim["_admin"]["deployed"]["RO"]
                step = "Editing wim at RO"
                RO = ROclient.ROClient(**self.ro_config)
                wim_RO = deepcopy(wim_content)
                wim_RO.pop("_id", None)
                wim_RO.pop("_admin", None)
                schema_version = wim_RO.pop("schema_version", None)
                wim_RO.pop("schema_type", None)
                wim_RO.pop("wim_tenant_name", None)
                if "wim_type" in wim_RO:
                    wim_RO["type"] = wim_RO.pop("wim_type")
                wim_RO.pop("wim_user", None)
                wim_RO.pop("wim_password", None)
                # TODO make a deep update of wim_port_mapping
                if wim_RO:
                    await RO.edit("wim", RO_wim_id, descriptor=wim_RO)

                step = "Editing wim-account at RO tenant"
                wim_account_RO = {}
                if "config" in wim_content:
                    wim_content["config"].pop("wim_port_mapping", None)
                    if not wim_content["config"]:
                        del wim_content["config"]
                if "wim_tenant_name" in wim_content:
                    wim_account_RO["wim_tenant_name"] = wim_content["wim_tenant_name"]
                if "wim_password" in wim_content:
                    # An empty password is sent as it is; a non-empty one is decrypted below
                    wim_account_RO["wim_password"] = wim_content["wim_password"]
                if wim_content.get("wim_password"):
                    wim_account_RO["wim_password"] = self.db.decrypt(
                        wim_content["wim_password"],
                        schema_version=schema_version,
                        salt=wim_id,
                    )
                if "config" in wim_content:
                    wim_account_RO["config"] = wim_content["config"]
                if wim_content.get("config"):
                    for key in self.wim_config_encrypted:
                        if wim_content["config"].get(key):
                            wim_account_RO["config"][key] = self.db.decrypt(
                                wim_content["config"][key],
                                schema_version=schema_version,
                                salt=wim_id,
                            )

                # NOTE(review): this value is written at wim_content, which is not sent to RO,
                # so a change of user seems to be ignored. 'create' uses the key 'user' for the
                # account; confirm the key expected by RO before moving it to wim_account_RO
                if "wim_user" in wim_content:
                    wim_content["wim_username"] = wim_content["wim_user"]
                # wim_account must be edited always even if empty in order to ensure changes are translated to RO
                # wim_thread. RO will remove and relaunch a new thread for this wim_account
                await RO.edit("wim_account", RO_wim_id, descriptor=wim_account_RO)
                db_wim_update["_admin.operationalState"] = "ENABLED"
                # Mark the WIM 'edit' HA task as successful
                operation_state = "COMPLETED"
                operation_details = "Done"
            else:
                # Nothing exists at RO to be edited. The HA task is reported as failed
                # instead of reaching 'finally' without any state to report
                operation_details = "wim_id={} is not deployed at RO".format(wim_id)

            self.logger.debug(logging_text + "Exit Ok RO_wim_id={}".format(RO_wim_id))
            return

        except (ROclient.ROClientException, DbException, asyncio.CancelledError) as e:
            self.logger.error(logging_text + "Exit Exception {}".format(e))
            exc = e
        except Exception as e:
            self.logger.critical(
                logging_text + "Exit Exception {}".format(e), exc_info=True
            )
            exc = e
        finally:
            if exc:
                # Mark the WIM 'edit' HA task as erroneous, even when the failure
                # happened before the WIM record could be read from database
                operation_state = "FAILED"
                operation_details = "ERROR {}: {}".format(step, exc)
                if db_wim:
                    db_wim_update["_admin.operationalState"] = "ERROR"
                    db_wim_update["_admin.detailed-status"] = operation_details
            try:
                if db_wim_update:
                    self.update_db_2("wim_accounts", wim_id, db_wim_update)
                # Register the WIM 'edit' HA task either
                # successful or erroneous, or do nothing (if legacy NBI)
                self.lcm_tasks.unlock_HA(
                    "wim",
                    "edit",
                    op_id,
                    operationState=operation_state,
                    detailed_status=operation_details,
                )
            except DbException as e:
                self.logger.error(logging_text + "Cannot update database: {}".format(e))
            finally:
                # The task is always removed, whatever happens while reporting
                self.lcm_tasks.remove("wim_account", wim_id, order_id)

    async def delete(self, wim_content, order_id):
        """
        Detach and delete the WIM at RO and delete the database entry
        :param wim_content: WIM account content; only '_id' and the optional 'op_id' are read
        :param order_id: identifier passed to lcm_tasks.remove when the task finishes
        :return: None. Errors raised while processing are logged and stored, not propagated
        """
        # HA tasks and backward compatibility:
        # If 'wim_content' does not include 'op_id', we are running a legacy NBI version.
        # In such a case, HA is not supported by NBI, and the HA check always returns True
        op_id = wim_content.pop("op_id", None)
        if not self.lcm_tasks.lock_HA("wim", "delete", op_id):
            return

        wim_id = wim_content["_id"]
        logging_text = "Task wim_delete={} ".format(wim_id)
        self.logger.debug(logging_text + "Enter")

        db_wim = None
        db_wim_update = {}
        exc = None
        step = "Getting wim from db"
        try:
            # wait for any previous tasks in process
            await self.lcm_tasks.waitfor_related_HA("wim", "delete", op_id)

            db_wim = self.db.get_one("wim_accounts", {"_id": wim_id})
            if (
                db_wim.get("_admin")
                and db_wim["_admin"].get("deployed")
                and db_wim["_admin"]["deployed"].get("RO")
            ):
                RO_wim_id = db_wim["_admin"]["deployed"]["RO"]
                RO = ROclient.ROClient(**self.ro_config)
                step = "Detaching wim from RO tenant"
                try:
                    await RO.detach("wim_account", RO_wim_id)
                except ROclient.ROClientException as e:
                    if e.http_code == 404:  # not found
                        self.logger.debug(
                            logging_text
                            + "RO_wim_id={} already detached".format(RO_wim_id)
                        )
                    else:
                        raise

                step = "Deleting wim from RO"
                try:
                    await RO.delete("wim", RO_wim_id)
                except ROclient.ROClientException as e:
                    if e.http_code == 404:  # not found
                        self.logger.debug(
                            logging_text
                            + "RO_wim_id={} already deleted".format(RO_wim_id)
                        )
                    else:
                        raise
            else:
                # nothing to delete
                self.logger.error(logging_text + "Nothing to remove at RO")
            self.db.del_one("wim_accounts", {"_id": wim_id})
            # The entry no longer exists: nothing must be written on it at 'finally'
            db_wim = None
            self.logger.debug(logging_text + "Exit Ok")
            return

        except (ROclient.ROClientException, DbException, asyncio.CancelledError) as e:
            self.logger.error(logging_text + "Exit Exception {}".format(e))
            exc = e
        except Exception as e:
            self.logger.critical(
                logging_text + "Exit Exception {}".format(e), exc_info=True
            )
            exc = e
        finally:
            try:
                if exc and db_wim:
                    operation_details = "ERROR {}: {}".format(step, exc)
                    db_wim_update["_admin.operationalState"] = "ERROR"
                    db_wim_update["_admin.detailed-status"] = operation_details
                    # Mark the WIM 'delete' HA task as erroneous
                    self.lcm_tasks.unlock_HA(
                        "wim",
                        "delete",
                        op_id,
                        operationState="FAILED",
                        detailed_status=operation_details,
                    )
                    self.update_db_2("wim_accounts", wim_id, db_wim_update)
                # If the WIM 'delete' HA task was successful, the DB entry has been deleted,
                # which means that there is nowhere to register this task, so do nothing here.
            except DbException as e:
                self.logger.error(logging_text + "Cannot update database: {}".format(e))
            finally:
                # The task is always removed (and only once), whatever happens while reporting
                self.lcm_tasks.remove("wim_account", wim_id, order_id)
807
808
809 class SdnLcm(LcmBase):
def __init__(self, msg, lcm_tasks, config):
    """
    Keep the task registry and the RO settings, then initialize the LcmBase part
    :param msg: messaging object, passed untouched to the parent initializer
    :param lcm_tasks: task registry, stored at self.lcm_tasks
    :param config: two level dictionary with configuration. Only the 'RO' section is read here
    :return: None
    """
    sdn_logger = logging.getLogger("lcm.sdn")

    self.logger = sdn_logger
    self.lcm_tasks = lcm_tasks
    self.ro_config = config["RO"]

    # The parent initializer receives the same logger stored above
    super().__init__(msg, sdn_logger)
822
async def create(self, sdn_content, order_id):
    """
    Create the SDN controller at RO and store the result at the 'sdns' collection
    :param sdn_content: SDN controller content. '_id' is read; 'op_id', if present, is popped and
        used for the HA lock; 'password', if not empty, is decrypted before sending it to RO
    :param order_id: identifier passed to lcm_tasks.remove when the task finishes
    :return: None. Errors raised while processing are logged and stored, not propagated
    """
    # HA tasks and backward compatibility:
    # If 'sdn_content' does not include 'op_id', we are running a legacy NBI version.
    # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
    # Register 'create' task here for related future HA operations
    op_id = sdn_content.pop("op_id", None)
    # As in 'edit', do nothing when the lock is not obtained
    if not self.lcm_tasks.lock_HA("sdn", "create", op_id):
        return

    sdn_id = sdn_content["_id"]
    logging_text = "Task sdn_create={} ".format(sdn_id)
    self.logger.debug(logging_text + "Enter")

    db_sdn = None
    db_sdn_update = {}
    exc = None
    # Pessimistic defaults: the 'finally' clause must always be able to report the
    # HA task, whatever the point at which the flow is abandoned
    operation_state = "FAILED"
    operation_details = "Operation not completed"
    step = "Getting sdn from db"
    try:
        db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
        db_sdn_update["_admin.deployed.RO"] = None

        step = "Creating sdn at RO"
        db_sdn_update["_admin.detailed-status"] = step
        self.update_db_2("sdns", sdn_id, db_sdn_update)

        RO = ROclient.ROClient(**self.ro_config)
        # The descriptor for RO is the received content without the OSM-only fields
        sdn_RO = deepcopy(sdn_content)
        sdn_RO.pop("_id", None)
        sdn_RO.pop("_admin", None)
        schema_version = sdn_RO.pop("schema_version", None)
        sdn_RO.pop("schema_type", None)
        sdn_RO.pop("description", None)
        if sdn_RO.get("password"):
            sdn_RO["password"] = self.db.decrypt(
                sdn_RO["password"], schema_version=schema_version, salt=sdn_id
            )

        desc = await RO.create("sdn", descriptor=sdn_RO)
        RO_sdn_id = desc["uuid"]
        db_sdn_update["_admin.deployed.RO"] = RO_sdn_id
        db_sdn_update["_admin.operationalState"] = "ENABLED"
        self.logger.debug(logging_text + "Exit Ok RO_sdn_id={}".format(RO_sdn_id))
        # Mark the SDN 'create' HA task as successful
        operation_state = "COMPLETED"
        operation_details = "Done"
        return

    except (ROclient.ROClientException, DbException, asyncio.CancelledError) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(
            logging_text + "Exit Exception {}".format(e), exc_info=True
        )
        exc = e
    finally:
        if exc:
            # Mark the SDN 'create' HA task as erroneous, even when the failure
            # happened before the SDN record could be read from database
            operation_state = "FAILED"
            operation_details = "ERROR {}: {}".format(step, exc)
            if db_sdn:
                db_sdn_update["_admin.operationalState"] = "ERROR"
                db_sdn_update["_admin.detailed-status"] = operation_details
        try:
            if db_sdn and db_sdn_update:
                self.update_db_2("sdns", sdn_id, db_sdn_update)
            # Register the SDN 'create' HA task either
            # successful or erroneous, or do nothing (if legacy NBI)
            self.lcm_tasks.unlock_HA(
                "sdn",
                "create",
                op_id,
                operationState=operation_state,
                detailed_status=operation_details,
            )
        except DbException as e:
            self.logger.error(logging_text + "Cannot update database: {}".format(e))
        finally:
            # The task is always removed, whatever happens while reporting
            self.lcm_tasks.remove("sdn", sdn_id, order_id)
902
async def edit(self, sdn_content, order_id):
    """
    Apply an SDN controller edition at RO and register the result of the operation

    :param sdn_content: dictionary with the edited sdn content. Must contain '_id'; 'op_id' is popped from it
        when present
    :param order_id: identifier used to remove this task from lcm_tasks once finished
    :return: None. The result is stored at database ('sdns' collection) and reported through unlock_HA
    """
    # HA tasks and backward compatibility:
    # If 'sdn_content' does not include 'op_id', we are running a legacy NBI version.
    # In such a case, HA is not supported by NBI, and the HA check always returns True
    op_id = sdn_content.pop("op_id", None)
    if not self.lcm_tasks.lock_HA("sdn", "edit", op_id):
        return

    sdn_id = sdn_content["_id"]
    logging_text = "Task sdn_edit={} ".format(sdn_id)
    self.logger.debug(logging_text + "Enter")

    db_sdn = None
    db_sdn_update = {}
    exc = None
    # Bound before the 'try' so that the 'finally' clause can always report a result
    operation_state = "FAILED"
    operation_details = ""
    step = "Getting sdn from db"
    try:
        # wait for any previous tasks in process
        await self.lcm_tasks.waitfor_related_HA("sdn", "edit", op_id)

        db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
        RO_sdn_id = None
        if (
            db_sdn.get("_admin")
            and db_sdn["_admin"].get("deployed")
            and db_sdn["_admin"]["deployed"].get("RO")
        ):
            RO_sdn_id = db_sdn["_admin"]["deployed"]["RO"]
            RO = ROclient.ROClient(**self.ro_config)
            step = "Editing sdn at RO"
            # Send to RO only the edited fields, without the internal ones
            sdn_RO = deepcopy(sdn_content)
            sdn_RO.pop("_id", None)
            sdn_RO.pop("_admin", None)
            schema_version = sdn_RO.pop("schema_version", None)
            sdn_RO.pop("schema_type", None)
            sdn_RO.pop("description", None)
            if sdn_RO.get("password"):
                sdn_RO["password"] = self.db.decrypt(
                    sdn_RO["password"], schema_version=schema_version, salt=sdn_id
                )
            if sdn_RO:
                await RO.edit("sdn", RO_sdn_id, descriptor=sdn_RO)
            db_sdn_update["_admin.operationalState"] = "ENABLED"

        # Mark the SDN 'edit' HA task as successful, also when there is nothing deployed at RO
        operation_state = "COMPLETED"
        operation_details = "Done"
        self.logger.debug(logging_text + "Exit Ok RO_sdn_id={}".format(RO_sdn_id))

    except (ROclient.ROClientException, DbException, asyncio.CancelledError) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(
            logging_text + "Exit Exception {}".format(e), exc_info=True
        )
        exc = e
    finally:
        if exc:
            # Mark the SDN 'edit' HA task as erroneous
            operation_state = "FAILED"
            operation_details = "ERROR {}: {}".format(step, exc)
            if db_sdn:
                # The error must go to the pending update, not to the fetched record
                db_sdn_update["_admin.operationalState"] = "ERROR"
                db_sdn_update["_admin.detailed-status"] = operation_details
        try:
            if db_sdn_update:
                self.update_db_2("sdns", sdn_id, db_sdn_update)
            # Register the SDN 'edit' HA task either
            # successful or erroneous, or do nothing (if legacy NBI)
            self.lcm_tasks.unlock_HA(
                "sdn",
                "edit",
                op_id,
                operationState=operation_state,
                detailed_status=operation_details,
            )
        except DbException as e:
            self.logger.error(logging_text + "Cannot update database: {}".format(e))
        self.lcm_tasks.remove("sdn", sdn_id, order_id)
983
async def delete(self, sdn_content, order_id):
    """
    Delete an SDN controller from RO (when deployed there) and from database

    :param sdn_content: dictionary with the sdn content. Must contain '_id'; 'op_id' is popped from it
        when present
    :param order_id: identifier used to remove this task from lcm_tasks once finished
    :return: None. On error the state is stored at database ('sdns' collection) and reported through unlock_HA
    """
    # HA tasks and backward compatibility:
    # If 'sdn_content' does not include 'op_id', we are running a legacy NBI version.
    # In such a case, HA is not supported by NBI, and the HA check always returns True
    op_id = sdn_content.pop("op_id", None)
    if not self.lcm_tasks.lock_HA("sdn", "delete", op_id):
        return

    sdn_id = sdn_content["_id"]
    logging_text = "Task sdn_delete={} ".format(sdn_id)
    self.logger.debug(logging_text + "Enter")

    db_sdn = {}
    db_sdn_update = {}
    exc = None
    step = "Getting sdn from db"
    try:
        # wait for any previous tasks in process
        await self.lcm_tasks.waitfor_related_HA("sdn", "delete", op_id)

        db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
        if (
            db_sdn.get("_admin")
            and db_sdn["_admin"].get("deployed")
            and db_sdn["_admin"]["deployed"].get("RO")
        ):
            RO_sdn_id = db_sdn["_admin"]["deployed"]["RO"]
            RO = ROclient.ROClient(**self.ro_config)
            step = "Deleting sdn from RO"
            try:
                await RO.delete("sdn", RO_sdn_id)
            except ROclient.ROClientException as e:
                if e.http_code == 404:  # not found
                    self.logger.debug(
                        logging_text
                        + "RO_sdn_id={} already deleted".format(RO_sdn_id)
                    )
                else:
                    raise
        else:
            # nothing to delete
            self.logger.error(
                logging_text + "Skipping. There is not RO information at database"
            )
        self.db.del_one("sdns", {"_id": sdn_id})
        # The entry no longer exists, so nothing must be written back in the 'finally' clause
        db_sdn = {}
        self.logger.debug("sdn_delete task sdn_id={} Exit Ok".format(sdn_id))

    except (ROclient.ROClientException, DbException, asyncio.CancelledError) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(
            logging_text + "Exit Exception {}".format(e), exc_info=True
        )
        exc = e
    finally:
        failed = bool(exc and db_sdn)
        if failed:
            operation_details = "ERROR {}: {}".format(step, exc)
            # The error must go to the pending update, not to the fetched record
            db_sdn_update["_admin.operationalState"] = "ERROR"
            db_sdn_update["_admin.detailed-status"] = operation_details
        try:
            if failed:
                # Mark the SDN 'delete' HA task as erroneous
                self.lcm_tasks.unlock_HA(
                    "sdn",
                    "delete",
                    op_id,
                    operationState="FAILED",
                    detailed_status=operation_details,
                )
                self.update_db_2("sdns", sdn_id, db_sdn_update)
            # If the SDN 'delete' HA task was successful, the DB entry has been deleted,
            # which means that there is nowhere to register this task, so do nothing here.
        except DbException as e:
            self.logger.error(logging_text + "Cannot update database: {}".format(e))
        self.lcm_tasks.remove("sdn", sdn_id, order_id)
1063
1064
class K8sClusterLcm(LcmBase):
    """Life cycle (create, edit, delete) of the 'k8sclusters' database entries"""

    # maximum time, in seconds, to wait for all the init_env tasks launched by create()
    timeout_create = 300

    def __init__(self, msg, lcm_tasks, config):
        """
        Init, Connect to database, filesystem storage, and messaging
        :param msg: messaging object passed to the LcmBase constructor
        :param lcm_tasks: task registry used for HA locking and task bookkeeping
        :param config: two level dictionary with configuration. The 'VCA' entry is read here
        :return: None
        """

        self.logger = logging.getLogger("lcm.k8scluster")
        self.lcm_tasks = lcm_tasks
        self.vca_config = config["VCA"]

        super().__init__(msg, self.logger)

        self.helm2_k8scluster = K8sHelmConnector(
            kubectl_command=self.vca_config.get("kubectlpath"),
            helm_command=self.vca_config.get("helmpath"),
            log=self.logger,
            on_update_db=None,
            db=self.db,
            fs=self.fs,
        )

        self.helm3_k8scluster = K8sHelm3Connector(
            kubectl_command=self.vca_config.get("kubectlpath"),
            helm_command=self.vca_config.get("helm3path"),
            fs=self.fs,
            log=self.logger,
            db=self.db,
            on_update_db=None,
        )

        self.juju_k8scluster = K8sJujuConnector(
            kubectl_command=self.vca_config.get("kubectlpath"),
            juju_command=self.vca_config.get("jujupath"),
            log=self.logger,
            on_update_db=None,
            db=self.db,
            fs=self.fs,
        )

        # deployment method name -> connector in charge of it
        self.k8s_map = {
            "helm-chart": self.helm2_k8scluster,
            "helm-chart-v3": self.helm3_k8scluster,
            "juju-bundle": self.juju_k8scluster,
        }

    async def create(self, k8scluster_content, order_id):
        """
        Initialize a k8scluster for every enabled deployment method and register the result

        :param k8scluster_content: dictionary with the k8scluster content. Must contain '_id'; 'op_id' is
            popped from it when present
        :param order_id: identifier used to remove this task from lcm_tasks once finished
        :return: None. The result is stored at database ('k8sclusters' collection) and reported through unlock_HA
        """
        op_id = k8scluster_content.pop("op_id", None)
        if not self.lcm_tasks.lock_HA("k8scluster", "create", op_id):
            return

        k8scluster_id = k8scluster_content["_id"]
        logging_text = "Task k8scluster_create={} ".format(k8scluster_id)
        self.logger.debug(logging_text + "Enter")

        db_k8scluster = None
        db_k8scluster_update = {}
        exc = None
        # Bound before the 'try' so that the 'finally' clause can always report a result
        operation_state = "FAILED"
        operation_details = ""
        step = "Getting k8scluster-id='{}' from db".format(k8scluster_id)
        try:
            self.logger.debug(logging_text + step)
            db_k8scluster = self.db.get_one("k8sclusters", {"_id": k8scluster_id})
            self.db.encrypt_decrypt_fields(
                db_k8scluster.get("credentials"),
                "decrypt",
                ["password", "secret"],
                schema_version=db_k8scluster["schema_version"],
                salt=db_k8scluster["_id"],
            )
            k8s_credentials = yaml.safe_dump(db_k8scluster.get("credentials"))
            pending_tasks = []
            task2name = {}
            init_target = deep_get(db_k8scluster, ("_admin", "init"))
            step = "Launching k8scluster init tasks"

            k8s_deploy_methods = db_k8scluster.get("deployment_methods", {})
            # for backwards compatibility and all-false case
            if not any(k8s_deploy_methods.values()):
                k8s_deploy_methods = {
                    "helm-chart": True,
                    "juju-bundle": True,
                    "helm-chart-v3": True,
                }
            # keep only the methods whose value is truthy
            deploy_methods = tuple(filter(k8s_deploy_methods.get, k8s_deploy_methods))

            for task_name in deploy_methods:
                if init_target and task_name not in init_target:
                    continue
                task = asyncio.ensure_future(
                    self.k8s_map[task_name].init_env(
                        k8s_credentials,
                        reuse_cluster_uuid=k8scluster_id,
                        vca_id=db_k8scluster.get("vca_id"),
                    )
                )
                pending_tasks.append(task)
                task2name[task] = task_name

            error_text_list = []
            tasks_name_ok = []
            reached_timeout = False
            now = time()

            while pending_tasks:
                _timeout = max(
                    1, self.timeout_create - (time() - now)
                )  # ensure not negative with max
                step = "Waiting for k8scluster init tasks"
                done, pending_tasks = await asyncio.wait(
                    pending_tasks, timeout=_timeout, return_when=asyncio.FIRST_COMPLETED
                )
                if not done:
                    # timeout. Set timeout is reached and process pending as if they have been finished
                    done = pending_tasks
                    pending_tasks = None
                    reached_timeout = True
                for task in done:
                    task_name = task2name[task]
                    if reached_timeout:
                        # asyncio.wait() does not cancel on timeout: stop the task explicitly
                        task.cancel()
                        task_error = "Timeout"
                    elif task.cancelled():
                        task_error = "Cancelled"
                    else:
                        task_error = task.exception()

                    admin_prefix = "_admin.{}".format(task_name)
                    if task_error:
                        error_text_list.append(
                            "Failing init {}: {}".format(task_name, task_error)
                        )
                        db_k8scluster_update[admin_prefix + ".error_msg"] = str(
                            task_error
                        )
                        db_k8scluster_update[admin_prefix + ".id"] = None
                        db_k8scluster_update[
                            admin_prefix + ".operationalState"
                        ] = "ERROR"
                        # There is no exception being handled here, so the traceback must be
                        # taken from the exception object itself
                        unexpected = not isinstance(task_error, (N2VCException, str))
                        self.logger.error(
                            logging_text
                            + "{} init fail: {}".format(task_name, task_error),
                            exc_info=task_error if unexpected else False,
                        )
                    else:
                        k8s_id, uninstall_sw = task.result()
                        tasks_name_ok.append(task_name)
                        self.logger.debug(
                            logging_text
                            + "{} init success. id={} created={}".format(
                                task_name, k8s_id, uninstall_sw
                            )
                        )
                        db_k8scluster_update[admin_prefix + ".error_msg"] = None
                        db_k8scluster_update[admin_prefix + ".id"] = k8s_id
                        db_k8scluster_update[admin_prefix + ".created"] = uninstall_sw
                        db_k8scluster_update[
                            admin_prefix + ".operationalState"
                        ] = "ENABLED"
                    # update database
                    step = "Updating database for " + task_name
                    self.update_db_2("k8sclusters", k8scluster_id, db_k8scluster_update)

            if tasks_name_ok:
                operation_details = "ready for " + ", ".join(tasks_name_ok)
                operation_state = "COMPLETED"
                if error_text_list:
                    db_k8scluster_update["_admin.operationalState"] = "DEGRADED"
                    operation_details += "; " + ";".join(error_text_list)
                else:
                    db_k8scluster_update["_admin.operationalState"] = "ENABLED"
            else:
                db_k8scluster_update["_admin.operationalState"] = "ERROR"
                operation_state = "FAILED"
                operation_details = ";".join(error_text_list)
            db_k8scluster_update["_admin.detailed-status"] = operation_details
            self.logger.debug(logging_text + "Done. Result: " + operation_state)

        except Exception as e:
            if isinstance(
                e,
                (
                    LcmException,
                    DbException,
                    K8sException,
                    N2VCException,
                    asyncio.CancelledError,
                ),
            ):
                self.logger.error(logging_text + "Exit Exception {}".format(e))
            else:
                self.logger.critical(
                    logging_text + "Exit Exception {}".format(e), exc_info=True
                )
            exc = e
        finally:
            if exc:
                operation_state = "FAILED"
                operation_details = "ERROR {}: {}".format(step, exc)
                if db_k8scluster:
                    db_k8scluster_update["_admin.operationalState"] = "ERROR"
                    db_k8scluster_update["_admin.detailed-status"] = operation_details
            try:
                if db_k8scluster and db_k8scluster_update:
                    self.update_db_2("k8sclusters", k8scluster_id, db_k8scluster_update)

                # Register the operation and unlock
                self.lcm_tasks.unlock_HA(
                    "k8scluster",
                    "create",
                    op_id,
                    operationState=operation_state,
                    detailed_status=operation_details,
                )
            except DbException as e:
                self.logger.error(logging_text + "Cannot update database: {}".format(e))
            self.lcm_tasks.remove("k8scluster", k8scluster_id, order_id)

    async def edit(self, k8scluster_content, order_id):
        """
        Register a k8scluster edition as completed; no change is applied to the cluster

        :param k8scluster_content: dictionary with the k8scluster content. Must contain '_id'; 'op_id' is
            popped from it when present
        :param order_id: identifier used to remove this task from lcm_tasks once finished
        :return: None
        """
        op_id = k8scluster_content.pop("op_id", None)
        if not self.lcm_tasks.lock_HA("k8scluster", "edit", op_id):
            return

        k8scluster_id = k8scluster_content["_id"]

        logging_text = "Task k8scluster_edit={} ".format(k8scluster_id)
        self.logger.debug(logging_text + "Enter")

        # TODO the implementation is pending and will be part of a new feature
        # It will support rotation of certificates, update of credentials and K8S API endpoint
        # At the moment the operation is set as completed

        operation_state = "COMPLETED"
        operation_details = "Not implemented"

        self.lcm_tasks.unlock_HA(
            "k8scluster",
            "edit",
            op_id,
            operationState=operation_state,
            detailed_status=operation_details,
        )
        self.lcm_tasks.remove("k8scluster", k8scluster_id, order_id)

    async def delete(self, k8scluster_content, order_id):
        """
        Reset the k8scluster for every initialized deployment method and delete it from database

        :param k8scluster_content: dictionary with the k8scluster content. Must contain '_id'; 'op_id' is
            popped from it when present
        :param order_id: identifier used to remove this task from lcm_tasks once finished
        :return: None. The result is reported through unlock_HA; on error it is also stored at database
        """
        # HA tasks and backward compatibility:
        # If 'k8scluster_content' does not include 'op_id', we are running a legacy NBI version.
        # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
        # Register 'delete' task here for related future HA operations
        op_id = k8scluster_content.pop("op_id", None)
        if not self.lcm_tasks.lock_HA("k8scluster", "delete", op_id):
            return

        k8scluster_id = k8scluster_content["_id"]
        logging_text = "Task k8scluster_delete={} ".format(k8scluster_id)
        self.logger.debug(logging_text + "Enter")

        db_k8scluster = None
        db_k8scluster_update = {}
        exc = None
        step = "Getting k8scluster='{}' from db".format(k8scluster_id)
        try:
            self.logger.debug(logging_text + step)
            db_k8scluster = self.db.get_one("k8sclusters", {"_id": k8scluster_id})
            k8s_hc_id = deep_get(db_k8scluster, ("_admin", "helm-chart", "id"))
            k8s_h3c_id = deep_get(db_k8scluster, ("_admin", "helm-chart-v3", "id"))
            k8s_jb_id = deep_get(db_k8scluster, ("_admin", "juju-bundle", "id"))

            cluster_removed = True
            if k8s_jb_id:  # delete in reverse order of creation
                step = "Removing juju-bundle '{}'".format(k8s_jb_id)
                uninstall_sw = (
                    deep_get(db_k8scluster, ("_admin", "juju-bundle", "created"))
                    or False
                )
                cluster_removed = await self.juju_k8scluster.reset(
                    cluster_uuid=k8s_jb_id,
                    uninstall_sw=uninstall_sw,
                    vca_id=db_k8scluster.get("vca_id"),
                )
                db_k8scluster_update["_admin.juju-bundle.id"] = None
                db_k8scluster_update["_admin.juju-bundle.operationalState"] = "DISABLED"

            if k8s_hc_id:
                step = "Removing helm-chart '{}'".format(k8s_hc_id)
                uninstall_sw = (
                    deep_get(db_k8scluster, ("_admin", "helm-chart", "created"))
                    or False
                )
                cluster_removed = await self.helm2_k8scluster.reset(
                    cluster_uuid=k8s_hc_id, uninstall_sw=uninstall_sw
                )
                db_k8scluster_update["_admin.helm-chart.id"] = None
                db_k8scluster_update["_admin.helm-chart.operationalState"] = "DISABLED"

            if k8s_h3c_id:
                step = "Removing helm-chart-v3 '{}'".format(k8s_h3c_id)
                uninstall_sw = (
                    deep_get(db_k8scluster, ("_admin", "helm-chart-v3", "created"))
                    or False
                )
                cluster_removed = await self.helm3_k8scluster.reset(
                    cluster_uuid=k8s_h3c_id, uninstall_sw=uninstall_sw
                )
                db_k8scluster_update["_admin.helm-chart-v3.id"] = None
                db_k8scluster_update[
                    "_admin.helm-chart-v3.operationalState"
                ] = "DISABLED"

            # Try to remove from cluster_inserted to clean old versions
            if k8s_hc_id and cluster_removed:
                step = "Removing k8scluster='{}' from k8srepos".format(k8scluster_id)
                self.logger.debug(logging_text + step)
                db_k8srepo_list = self.db.get_list(
                    "k8srepos", {"_admin.cluster-inserted": k8s_hc_id}
                )
                for k8srepo in db_k8srepo_list:
                    # best effort: a failure on one repo is logged and does not abort the deletion
                    try:
                        cluster_list = k8srepo["_admin"]["cluster-inserted"]
                        cluster_list.remove(k8s_hc_id)
                        self.update_db_2(
                            "k8srepos",
                            k8srepo["_id"],
                            {"_admin.cluster-inserted": cluster_list},
                        )
                    except Exception as e:
                        self.logger.error("{}: {}".format(step, e))
            self.db.del_one("k8sclusters", {"_id": k8scluster_id})
            # The entry no longer exists, so there is nothing to update in the 'finally' clause
            db_k8scluster_update = None
            self.logger.debug(logging_text + "Done")

        except Exception as e:
            if isinstance(
                e,
                (
                    LcmException,
                    DbException,
                    K8sException,
                    N2VCException,
                    asyncio.CancelledError,
                ),
            ):
                self.logger.error(logging_text + "Exit Exception {}".format(e))
            else:
                self.logger.critical(
                    logging_text + "Exit Exception {}".format(e), exc_info=True
                )
            exc = e
        finally:
            if exc:
                # Mark the K8scluster 'delete' HA task as erroneous, also when the
                # entry could not even be read from database
                operation_state = "FAILED"
                operation_details = "ERROR {}: {}".format(step, exc)
                if db_k8scluster:
                    if db_k8scluster_update is None:
                        db_k8scluster_update = {}
                    db_k8scluster_update["_admin.operationalState"] = "ERROR"
                    db_k8scluster_update["_admin.detailed-status"] = operation_details
            else:
                operation_state = "COMPLETED"
                operation_details = "deleted"

            try:
                if db_k8scluster_update:
                    self.update_db_2("k8sclusters", k8scluster_id, db_k8scluster_update)
                # Register the K8scluster 'delete' HA task either
                # successful or erroneous, or do nothing (if legacy NBI)
                self.lcm_tasks.unlock_HA(
                    "k8scluster",
                    "delete",
                    op_id,
                    operationState=operation_state,
                    detailed_status=operation_details,
                )
            except DbException as e:
                self.logger.error(logging_text + "Cannot update database: {}".format(e))
            self.lcm_tasks.remove("k8scluster", k8scluster_id, order_id)
1444
1445
class VcaLcm(LcmBase):
    """Life cycle (create, edit, delete) of the 'vca' database entries"""

    # maximum time, in seconds, to wait for the VCA validation
    timeout_create = 30

    # edited fields that require validating the VCA again
    _vca_config_fields = (
        "cacert",
        "endpoints",
        "lxd-cloud",
        "lxd-credentials",
        "k8s-cloud",
        "k8s-credentials",
        "model-config",
        "user",
        "secret",
    )

    def __init__(self, msg, lcm_tasks, config):
        """
        Init, Connect to database, filesystem storage, and messaging
        :param msg: messaging object passed to the LcmBase constructor
        :param lcm_tasks: task registry used for HA locking and task bookkeeping
        :param config: two level dictionary with configuration. Not read by this constructor
        :return: None
        """

        self.logger = logging.getLogger("lcm.vca")
        self.lcm_tasks = lcm_tasks

        super().__init__(msg, self.logger)

        # create N2VC connector
        self.n2vc = N2VCJujuConnector(log=self.logger, fs=self.fs, db=self.db)

    def _get_vca_by_id(self, vca_id: str) -> dict:
        """
        Read a vca entry from database and decrypt its 'secret' and 'cacert' fields

        :param vca_id: '_id' of the entry at the 'vca' collection
        :return: the database entry, after the decrypt call
        """
        db_vca = self.db.get_one("vca", {"_id": vca_id})
        self.db.encrypt_decrypt_fields(
            db_vca,
            "decrypt",
            ["secret", "cacert"],
            schema_version=db_vca["schema_version"],
            salt=db_vca["_id"],
        )
        return db_vca

    async def _validate_vca(self, db_vca_id: str) -> None:
        """
        Validate the VCA through N2VC, waiting at most 'timeout_create' seconds

        :param db_vca_id: identifier of the vca to validate
        :return: None
        :raises: whatever n2vc.validate_vca raises, or a timeout error if it does not finish on time
        """
        # Awaiting wait_for() directly propagates a cancellation of the caller
        # to the validation, so it is not left running in background
        await asyncio.wait_for(
            self.n2vc.validate_vca(db_vca_id),
            timeout=self.timeout_create,
        )

    def _is_vca_config_update(self, update_options) -> bool:
        """
        Tell whether an edition touches any field that requires validating the VCA again

        :param update_options: dictionary with the edited fields
        :return: True if any of the VCA configuration fields is a key of update_options
        """
        return any(word in update_options for word in self._vca_config_fields)

    async def create(self, vca_content, order_id):
        """
        Validate a new VCA and register the result of the operation

        :param vca_content: dictionary with the vca content. Must contain '_id'; 'op_id' is popped from it
            when present
        :param order_id: identifier used to remove this task from lcm_tasks once finished
        :return: None. The result is stored at database ('vca' collection) and reported through unlock_HA
        """
        op_id = vca_content.pop("op_id", None)
        if not self.lcm_tasks.lock_HA("vca", "create", op_id):
            return

        vca_id = vca_content["_id"]
        self.logger.debug("Task vca_create={} {}".format(vca_id, "Enter"))

        db_vca_update = {}

        # pessimistic defaults, overwritten only when the validation succeeds
        operation_state = "FAILED"
        operation_details = ""
        try:
            self.logger.debug(
                "Task vca_create={} {}".format(vca_id, "Getting vca from db")
            )
            db_vca = self._get_vca_by_id(vca_id)

            await self._validate_vca(db_vca["_id"])
            self.logger.debug(
                "Task vca_create={} {}".format(
                    vca_id, "vca registered and validated successfully"
                )
            )
            db_vca_update["_admin.operationalState"] = "ENABLED"
            db_vca_update["_admin.detailed-status"] = "Connectivity: ok"
            operation_details = "VCA validated"
            operation_state = "COMPLETED"

            self.logger.debug(
                "Task vca_create={} {}".format(
                    vca_id, "Done. Result: {}".format(operation_state)
                )
            )

        except Exception as e:
            error_msg = "Failed with exception: {}".format(e)
            self.logger.error("Task vca_create={} {}".format(vca_id, error_msg))
            db_vca_update["_admin.operationalState"] = "ERROR"
            db_vca_update["_admin.detailed-status"] = error_msg
            operation_details = error_msg
        finally:
            try:
                self.update_db_2("vca", vca_id, db_vca_update)

                # Register the operation and unlock
                self.lcm_tasks.unlock_HA(
                    "vca",
                    "create",
                    op_id,
                    operationState=operation_state,
                    detailed_status=operation_details,
                )
            except DbException as e:
                self.logger.error(
                    "Task vca_create={} {}".format(
                        vca_id, "Cannot update database: {}".format(e)
                    )
                )
            self.lcm_tasks.remove("vca", vca_id, order_id)

    async def edit(self, vca_content, order_id):
        """
        Validate an edited VCA, when its configuration has changed, and register the result

        :param vca_content: dictionary with the edited vca content. Must contain '_id'; 'op_id' is popped
            from it when present
        :param order_id: identifier used to remove this task from lcm_tasks once finished
        :return: None. The result is stored at database ('vca' collection) and reported through unlock_HA
        """
        op_id = vca_content.pop("op_id", None)
        if not self.lcm_tasks.lock_HA("vca", "edit", op_id):
            return

        vca_id = vca_content["_id"]
        self.logger.debug("Task vca_edit={} {}".format(vca_id, "Enter"))

        db_vca = None
        db_vca_update = {}

        operation_state = "FAILED"
        operation_details = ""
        try:
            self.logger.debug(
                "Task vca_edit={} {}".format(vca_id, "Getting vca from db")
            )
            db_vca = self._get_vca_by_id(vca_id)
            # the validation is only needed when a connection-related field is edited
            if self._is_vca_config_update(vca_content):
                await self._validate_vca(db_vca["_id"])
                self.logger.debug(
                    "Task vca_edit={} {}".format(
                        vca_id, "vca registered and validated successfully"
                    )
                )
                db_vca_update["_admin.operationalState"] = "ENABLED"
                db_vca_update["_admin.detailed-status"] = "Connectivity: ok"

            operation_details = "Edited"
            operation_state = "COMPLETED"

            self.logger.debug(
                "Task vca_edit={} {}".format(
                    vca_id, "Done. Result: {}".format(operation_state)
                )
            )

        except Exception as e:
            error_msg = "Failed with exception: {}".format(e)
            self.logger.error("Task vca_edit={} {}".format(vca_id, error_msg))
            db_vca_update["_admin.operationalState"] = "ERROR"
            db_vca_update["_admin.detailed-status"] = error_msg
            operation_state = "FAILED"
            operation_details = error_msg
        finally:
            try:
                self.update_db_2("vca", vca_id, db_vca_update)

                # Register the operation and unlock
                self.lcm_tasks.unlock_HA(
                    "vca",
                    "edit",
                    op_id,
                    operationState=operation_state,
                    detailed_status=operation_details,
                )
            except DbException as e:
                self.logger.error(
                    "Task vca_edit={} {}".format(
                        vca_id, "Cannot update database: {}".format(e)
                    )
                )
            self.lcm_tasks.remove("vca", vca_id, order_id)

    async def delete(self, vca_content, order_id):
        """
        Delete a VCA from database and register the result of the operation

        :param vca_content: dictionary with the vca content. Must contain '_id'; 'op_id' is popped from it
            when present
        :param order_id: identifier used to remove this task from lcm_tasks once finished
        :return: None
        """
        # HA tasks and backward compatibility:
        # If "vca_content" does not include "op_id", we are running a legacy NBI version.
        # In such a case, HA is not supported by NBI, "op_id" is None, and lock_HA() will do nothing.
        # Register "delete" task here for related future HA operations
        op_id = vca_content.pop("op_id", None)
        if not self.lcm_tasks.lock_HA("vca", "delete", op_id):
            return

        db_vca_update = {}
        vca_id = vca_content["_id"]

        operation_state = "FAILED"
        operation_details = ""

        try:
            self.logger.debug(
                "Task vca_delete={} {}".format(vca_id, "Deleting vca from db")
            )
            self.db.del_one("vca", {"_id": vca_id})
            # The entry no longer exists, so there is nothing to update in the 'finally' clause
            db_vca_update = None
            operation_details = "deleted"
            operation_state = "COMPLETED"

            self.logger.debug(
                "Task vca_delete={} {}".format(
                    vca_id, "Done. Result: {}".format(operation_state)
                )
            )
        except Exception as e:
            error_msg = "Failed with exception: {}".format(e)
            self.logger.error("Task vca_delete={} {}".format(vca_id, error_msg))
            # Build a new dictionary: the previous one may already have been set to None
            db_vca_update = {
                "_admin.operationalState": "ERROR",
                "_admin.detailed-status": error_msg,
            }
            operation_details = error_msg
        finally:
            try:
                self.update_db_2("vca", vca_id, db_vca_update)
                self.lcm_tasks.unlock_HA(
                    "vca",
                    "delete",
                    op_id,
                    operationState=operation_state,
                    detailed_status=operation_details,
                )
            except DbException as e:
                self.logger.error(
                    "Task vca_delete={} {}".format(
                        vca_id, "Cannot update database: {}".format(e)
                    )
                )
            self.lcm_tasks.remove("vca", vca_id, order_id)
1679
1680
class K8sRepoLcm(LcmBase):
    """Life cycle (create, delete) of the 'k8srepos' database entries"""

    def __init__(self, msg, lcm_tasks, config):
        """
        Init, Connect to database, filesystem storage, and messaging
        :param msg: messaging object passed to the LcmBase constructor
        :param lcm_tasks: task registry used for HA locking and task bookkeeping
        :param config: two level dictionary with configuration. The 'VCA' entry is read here
        :return: None
        """

        self.logger = logging.getLogger("lcm.k8srepo")
        self.lcm_tasks = lcm_tasks
        self.vca_config = config["VCA"]

        super().__init__(msg, self.logger)

        self.k8srepo = K8sHelmConnector(
            kubectl_command=self.vca_config.get("kubectlpath"),
            helm_command=self.vca_config.get("helmpath"),
            fs=self.fs,
            log=self.logger,
            db=self.db,
            on_update_db=None,
        )

    async def create(self, k8srepo_content, order_id):
        """
        Mark a k8srepo as ENABLED at database and register the result of the operation

        :param k8srepo_content: dictionary with the k8srepo content. '_id' is read from it; 'op_id' is
            popped from it when present
        :param order_id: identifier used to remove this task from lcm_tasks once finished
        :return: None. The result is stored at database ('k8srepos' collection) and reported through unlock_HA
        """
        # HA tasks and backward compatibility:
        # If 'k8srepo_content' does not include 'op_id', we are running a legacy NBI version.
        # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
        # Register 'create' task here for related future HA operations

        op_id = k8srepo_content.pop("op_id", None)
        if not self.lcm_tasks.lock_HA("k8srepo", "create", op_id):
            return

        k8srepo_id = k8srepo_content.get("_id")
        logging_text = "Task k8srepo_create={} ".format(k8srepo_id)
        self.logger.debug(logging_text + "Enter")

        db_k8srepo = None
        db_k8srepo_update = {}
        exc = None
        operation_state = "COMPLETED"
        operation_details = ""
        step = "Getting k8srepo-id='{}' from db".format(k8srepo_id)
        try:
            self.logger.debug(logging_text + step)
            db_k8srepo = self.db.get_one("k8srepos", {"_id": k8srepo_id})
            db_k8srepo_update["_admin.operationalState"] = "ENABLED"
        except Exception as e:
            self.logger.error(
                logging_text + "Exit Exception {}".format(e),
                exc_info=not isinstance(
                    e,
                    (
                        LcmException,
                        DbException,
                        K8sException,
                        N2VCException,
                        asyncio.CancelledError,
                    ),
                ),
            )
            exc = e
        finally:
            if exc:
                # Mark the K8srepo 'create' HA task as erroneous, also when the
                # entry could not be read from database
                operation_state = "FAILED"
                operation_details = "ERROR {}: {}".format(step, exc)
                if db_k8srepo:
                    db_k8srepo_update["_admin.operationalState"] = "ERROR"
                    db_k8srepo_update["_admin.detailed-status"] = operation_details
            try:
                if db_k8srepo_update:
                    self.update_db_2("k8srepos", k8srepo_id, db_k8srepo_update)
                # Register the K8srepo 'create' HA task either
                # successful or erroneous, or do nothing (if legacy NBI)
                self.lcm_tasks.unlock_HA(
                    "k8srepo",
                    "create",
                    op_id,
                    operationState=operation_state,
                    detailed_status=operation_details,
                )
            except DbException as e:
                self.logger.error(logging_text + "Cannot update database: {}".format(e))
            self.lcm_tasks.remove("k8srepo", k8srepo_id, order_id)

    async def delete(self, k8srepo_content, order_id):
        """
        Delete a k8srepo from database and register the result of the operation

        :param k8srepo_content: dictionary with the k8srepo content. '_id' is read from it; 'op_id' is
            popped from it when present
        :param order_id: identifier used to remove this task from lcm_tasks once finished
        :return: None
        """
        # HA tasks and backward compatibility:
        # If 'k8srepo_content' does not include 'op_id', we are running a legacy NBI version.
        # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
        # Register 'delete' task here for related future HA operations
        op_id = k8srepo_content.pop("op_id", None)
        if not self.lcm_tasks.lock_HA("k8srepo", "delete", op_id):
            return

        k8srepo_id = k8srepo_content.get("_id")
        logging_text = "Task k8srepo_delete={} ".format(k8srepo_id)
        self.logger.debug(logging_text + "Enter")

        db_k8srepo = None
        db_k8srepo_update = {}

        exc = None
        operation_state = "COMPLETED"
        operation_details = ""
        step = "Getting k8srepo-id='{}' from db".format(k8srepo_id)
        try:
            self.logger.debug(logging_text + step)
            db_k8srepo = self.db.get_one("k8srepos", {"_id": k8srepo_id})

        except Exception as e:
            self.logger.error(
                logging_text + "Exit Exception {}".format(e),
                exc_info=not isinstance(
                    e,
                    (
                        LcmException,
                        DbException,
                        K8sException,
                        N2VCException,
                        asyncio.CancelledError,
                    ),
                ),
            )
            exc = e
        finally:
            if exc:
                # Mark the K8srepo 'delete' HA task as erroneous, also when the
                # entry could not be read from database
                operation_state = "FAILED"
                operation_details = "ERROR {}: {}".format(step, exc)
                if db_k8srepo:
                    db_k8srepo_update["_admin.operationalState"] = "ERROR"
                    db_k8srepo_update["_admin.detailed-status"] = operation_details
            try:
                if db_k8srepo_update:
                    self.update_db_2("k8srepos", k8srepo_id, db_k8srepo_update)
                # Register the K8srepo 'delete' HA task either
                # successful or erroneous, or do nothing (if legacy NBI)
                self.lcm_tasks.unlock_HA(
                    "k8srepo",
                    "delete",
                    op_id,
                    operationState=operation_state,
                    detailed_status=operation_details,
                )
                # Delete the entry only when it could be read; a failed task keeps it
                if not exc:
                    self.db.del_one("k8srepos", {"_id": k8srepo_id})
            except DbException as e:
                self.logger.error(logging_text + "Cannot update database: {}".format(e))
            self.lcm_tasks.remove("k8srepo", k8srepo_id, order_id)