workaround for bug 951. Allow k8scluster with only helm or juju
[osm/LCM.git] / osm_lcm / vim_sdn.py
1 # -*- coding: utf-8 -*-
2
3 ##
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 ##
18
19 import asyncio
20 import yaml
21 import logging
22 import logging.handlers
23 from osm_lcm import ROclient
24 from osm_lcm.lcm_utils import LcmException, LcmBase, deep_get
25 from n2vc.k8s_helm_conn import K8sHelmConnector
26 from n2vc.k8s_juju_conn import K8sJujuConnector
27 from n2vc.exceptions import K8sException, N2VCException
28 from osm_common.dbbase import DbException
29 from copy import deepcopy
30
31 __author__ = "Alfonso Tierno"
32
33
34 class VimLcm(LcmBase):
35 # values that are encrypted at vim config because they are passwords
36 vim_config_encrypted = {"1.1": ("admin_password", "nsx_password", "vcenter_password"),
37 "default": ("admin_password", "nsx_password", "vcenter_password", "vrops_password")}
38
39 def __init__(self, db, msg, fs, lcm_tasks, ro_config, loop):
40 """
41 Init, Connect to database, filesystem storage, and messaging
42 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
43 :return: None
44 """
45
46 self.logger = logging.getLogger('lcm.vim')
47 self.loop = loop
48 self.lcm_tasks = lcm_tasks
49 self.ro_config = ro_config
50
51 super().__init__(db, msg, fs, self.logger)
52
53 async def create(self, vim_content, order_id):
54
55 # HA tasks and backward compatibility:
56 # If 'vim_content' does not include 'op_id', we a running a legacy NBI version.
57 # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
58 # Register 'create' task here for related future HA operations
59 op_id = vim_content.pop('op_id', None)
60 if not self.lcm_tasks.lock_HA('vim', 'create', op_id):
61 return
62
63 vim_id = vim_content["_id"]
64 vim_content.pop("op_id", None)
65 logging_text = "Task vim_create={} ".format(vim_id)
66 self.logger.debug(logging_text + "Enter")
67
68 db_vim = None
69 db_vim_update = {}
70 exc = None
71 RO_sdn_id = None
72 operationState_HA = ''
73 detailed_status_HA = ''
74 try:
75 step = "Getting vim-id='{}' from db".format(vim_id)
76 db_vim = self.db.get_one("vim_accounts", {"_id": vim_id})
77 if vim_content.get("config") and vim_content["config"].get("sdn-controller"):
78 step = "Getting sdn-controller-id='{}' from db".format(vim_content["config"]["sdn-controller"])
79 db_sdn = self.db.get_one("sdns", {"_id": vim_content["config"]["sdn-controller"]})
80
81 # If the VIM account has an associated SDN account, also
82 # wait for any previous tasks in process for the SDN
83 await self.lcm_tasks.waitfor_related_HA('sdn', 'ANY', db_sdn["_id"])
84
85 if db_sdn.get("_admin") and db_sdn["_admin"].get("deployed") and db_sdn["_admin"]["deployed"].get("RO"):
86 RO_sdn_id = db_sdn["_admin"]["deployed"]["RO"]
87 else:
88 raise LcmException("sdn-controller={} is not available. Not deployed at RO".format(
89 vim_content["config"]["sdn-controller"]))
90
91 step = "Creating vim at RO"
92 db_vim_update["_admin.deployed.RO"] = None
93 db_vim_update["_admin.detailed-status"] = step
94 self.update_db_2("vim_accounts", vim_id, db_vim_update)
95 RO = ROclient.ROClient(self.loop, **self.ro_config)
96 vim_RO = deepcopy(vim_content)
97 vim_RO.pop("_id", None)
98 vim_RO.pop("_admin", None)
99 schema_version = vim_RO.pop("schema_version", None)
100 vim_RO.pop("schema_type", None)
101 vim_RO.pop("vim_tenant_name", None)
102 vim_RO["type"] = vim_RO.pop("vim_type")
103 vim_RO.pop("vim_user", None)
104 vim_RO.pop("vim_password", None)
105 if RO_sdn_id:
106 vim_RO["config"]["sdn-controller"] = RO_sdn_id
107 desc = await RO.create("vim", descriptor=vim_RO)
108 RO_vim_id = desc["uuid"]
109 db_vim_update["_admin.deployed.RO"] = RO_vim_id
110 self.logger.debug(logging_text + "VIM created at RO_vim_id={}".format(RO_vim_id))
111
112 step = "Creating vim_account at RO"
113 db_vim_update["_admin.detailed-status"] = step
114 self.update_db_2("vim_accounts", vim_id, db_vim_update)
115
116 if vim_content.get("vim_password"):
117 vim_content["vim_password"] = self.db.decrypt(vim_content["vim_password"],
118 schema_version=schema_version,
119 salt=vim_id)
120 vim_account_RO = {"vim_tenant_name": vim_content["vim_tenant_name"],
121 "vim_username": vim_content["vim_user"],
122 "vim_password": vim_content["vim_password"]
123 }
124 if vim_RO.get("config"):
125 vim_account_RO["config"] = vim_RO["config"]
126 if "sdn-controller" in vim_account_RO["config"]:
127 del vim_account_RO["config"]["sdn-controller"]
128 if "sdn-port-mapping" in vim_account_RO["config"]:
129 del vim_account_RO["config"]["sdn-port-mapping"]
130 vim_config_encrypted_keys = self.vim_config_encrypted.get(schema_version) or \
131 self.vim_config_encrypted.get("default")
132 for p in vim_config_encrypted_keys:
133 if vim_account_RO["config"].get(p):
134 vim_account_RO["config"][p] = self.db.decrypt(vim_account_RO["config"][p],
135 schema_version=schema_version,
136 salt=vim_id)
137
138 desc = await RO.attach("vim_account", RO_vim_id, descriptor=vim_account_RO)
139 db_vim_update["_admin.deployed.RO-account"] = desc["uuid"]
140 db_vim_update["_admin.operationalState"] = "ENABLED"
141 db_vim_update["_admin.detailed-status"] = "Done"
142 # Mark the VIM 'create' HA task as successful
143 operationState_HA = 'COMPLETED'
144 detailed_status_HA = 'Done'
145
146 # await asyncio.sleep(15) # TODO remove. This is for test
147 self.logger.debug(logging_text + "Exit Ok VIM account created at RO_vim_account_id={}".format(desc["uuid"]))
148 return
149
150 except (ROclient.ROClientException, DbException) as e:
151 self.logger.error(logging_text + "Exit Exception {}".format(e))
152 exc = e
153 except Exception as e:
154 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
155 exc = e
156 finally:
157 if exc and db_vim:
158 db_vim_update["_admin.operationalState"] = "ERROR"
159 db_vim_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc)
160 # Mark the VIM 'create' HA task as erroneous
161 operationState_HA = 'FAILED'
162 detailed_status_HA = "ERROR {}: {}".format(step, exc)
163 try:
164 if db_vim_update:
165 self.update_db_2("vim_accounts", vim_id, db_vim_update)
166 # Register the VIM 'create' HA task either
167 # succesful or erroneous, or do nothing (if legacy NBI)
168 self.lcm_tasks.register_HA('vim', 'create', op_id,
169 operationState=operationState_HA,
170 detailed_status=detailed_status_HA)
171 except DbException as e:
172 self.logger.error(logging_text + "Cannot update database: {}".format(e))
173
174 self.lcm_tasks.remove("vim_account", vim_id, order_id)
175
176 async def edit(self, vim_content, order_id):
177
178 # HA tasks and backward compatibility:
179 # If 'vim_content' does not include 'op_id', we a running a legacy NBI version.
180 # In such a case, HA is not supported by NBI, and the HA check always returns True
181 op_id = vim_content.pop('op_id', None)
182 if not self.lcm_tasks.lock_HA('vim', 'edit', op_id):
183 return
184
185 vim_id = vim_content["_id"]
186 vim_content.pop("op_id", None)
187 logging_text = "Task vim_edit={} ".format(vim_id)
188 self.logger.debug(logging_text + "Enter")
189
190 db_vim = None
191 exc = None
192 RO_sdn_id = None
193 RO_vim_id = None
194 db_vim_update = {}
195 operationState_HA = ''
196 detailed_status_HA = ''
197 step = "Getting vim-id='{}' from db".format(vim_id)
198 try:
199 # wait for any previous tasks in process
200 await self.lcm_tasks.waitfor_related_HA('vim', 'edit', op_id)
201
202 db_vim = self.db.get_one("vim_accounts", {"_id": vim_id})
203
204 if db_vim.get("_admin") and db_vim["_admin"].get("deployed") and db_vim["_admin"]["deployed"].get("RO"):
205 if vim_content.get("config") and vim_content["config"].get("sdn-controller"):
206 step = "Getting sdn-controller-id='{}' from db".format(vim_content["config"]["sdn-controller"])
207 db_sdn = self.db.get_one("sdns", {"_id": vim_content["config"]["sdn-controller"]})
208
209 # If the VIM account has an associated SDN account, also
210 # wait for any previous tasks in process for the SDN
211 await self.lcm_tasks.waitfor_related_HA('sdn', 'ANY', db_sdn["_id"])
212
213 if db_sdn.get("_admin") and db_sdn["_admin"].get("deployed") and db_sdn["_admin"]["deployed"].get(
214 "RO"):
215 RO_sdn_id = db_sdn["_admin"]["deployed"]["RO"]
216 else:
217 raise LcmException("sdn-controller={} is not available. Not deployed at RO".format(
218 vim_content["config"]["sdn-controller"]))
219
220 RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
221 step = "Editing vim at RO"
222 RO = ROclient.ROClient(self.loop, **self.ro_config)
223 vim_RO = deepcopy(vim_content)
224 vim_RO.pop("_id", None)
225 vim_RO.pop("_admin", None)
226 schema_version = vim_RO.pop("schema_version", None)
227 vim_RO.pop("schema_type", None)
228 vim_RO.pop("vim_tenant_name", None)
229 if "vim_type" in vim_RO:
230 vim_RO["type"] = vim_RO.pop("vim_type")
231 vim_RO.pop("vim_user", None)
232 vim_RO.pop("vim_password", None)
233 if RO_sdn_id:
234 vim_RO["config"]["sdn-controller"] = RO_sdn_id
235 # TODO make a deep update of sdn-port-mapping
236 if vim_RO:
237 await RO.edit("vim", RO_vim_id, descriptor=vim_RO)
238
239 step = "Editing vim-account at RO tenant"
240 vim_account_RO = {}
241 if "config" in vim_content:
242 if "sdn-controller" in vim_content["config"]:
243 del vim_content["config"]["sdn-controller"]
244 if "sdn-port-mapping" in vim_content["config"]:
245 del vim_content["config"]["sdn-port-mapping"]
246 if not vim_content["config"]:
247 del vim_content["config"]
248 if "vim_tenant_name" in vim_content:
249 vim_account_RO["vim_tenant_name"] = vim_content["vim_tenant_name"]
250 if "vim_password" in vim_content:
251 vim_account_RO["vim_password"] = vim_content["vim_password"]
252 if vim_content.get("vim_password"):
253 vim_account_RO["vim_password"] = self.db.decrypt(vim_content["vim_password"],
254 schema_version=schema_version,
255 salt=vim_id)
256 if "config" in vim_content:
257 vim_account_RO["config"] = vim_content["config"]
258 if vim_content.get("config"):
259 vim_config_encrypted_keys = self.vim_config_encrypted.get(schema_version) or \
260 self.vim_config_encrypted.get("default")
261 for p in vim_config_encrypted_keys:
262 if vim_content["config"].get(p):
263 vim_account_RO["config"][p] = self.db.decrypt(vim_content["config"][p],
264 schema_version=schema_version,
265 salt=vim_id)
266
267 if "vim_user" in vim_content:
268 vim_content["vim_username"] = vim_content["vim_user"]
269 # vim_account must be edited always even if empty in order to ensure changes are translated to RO
270 # vim_thread. RO will remove and relaunch a new thread for this vim_account
271 await RO.edit("vim_account", RO_vim_id, descriptor=vim_account_RO)
272 db_vim_update["_admin.operationalState"] = "ENABLED"
273 # Mark the VIM 'edit' HA task as successful
274 operationState_HA = 'COMPLETED'
275 detailed_status_HA = 'Done'
276
277 self.logger.debug(logging_text + "Exit Ok RO_vim_id={}".format(RO_vim_id))
278 return
279
280 except (ROclient.ROClientException, DbException) as e:
281 self.logger.error(logging_text + "Exit Exception {}".format(e))
282 exc = e
283 except Exception as e:
284 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
285 exc = e
286 finally:
287 if exc and db_vim:
288 db_vim_update["_admin.operationalState"] = "ERROR"
289 db_vim_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc)
290 # Mark the VIM 'edit' HA task as erroneous
291 operationState_HA = 'FAILED'
292 detailed_status_HA = "ERROR {}: {}".format(step, exc)
293 try:
294 if db_vim_update:
295 self.update_db_2("vim_accounts", vim_id, db_vim_update)
296 # Register the VIM 'edit' HA task either
297 # succesful or erroneous, or do nothing (if legacy NBI)
298 self.lcm_tasks.register_HA('vim', 'edit', op_id,
299 operationState=operationState_HA,
300 detailed_status=detailed_status_HA)
301 except DbException as e:
302 self.logger.error(logging_text + "Cannot update database: {}".format(e))
303
304 self.lcm_tasks.remove("vim_account", vim_id, order_id)
305
306 async def delete(self, vim_content, order_id):
307
308 # HA tasks and backward compatibility:
309 # If 'vim_content' does not include 'op_id', we a running a legacy NBI version.
310 # In such a case, HA is not supported by NBI, and the HA check always returns True
311 op_id = vim_content.pop('op_id', None)
312 if not self.lcm_tasks.lock_HA('vim', 'delete', op_id):
313 return
314
315 vim_id = vim_content["_id"]
316 logging_text = "Task vim_delete={} ".format(vim_id)
317 self.logger.debug(logging_text + "Enter")
318
319 db_vim = None
320 db_vim_update = {}
321 exc = None
322 operationState_HA = ''
323 detailed_status_HA = ''
324 step = "Getting vim from db"
325 try:
326 # wait for any previous tasks in process
327 await self.lcm_tasks.waitfor_related_HA('vim', 'delete', op_id)
328
329 db_vim = self.db.get_one("vim_accounts", {"_id": vim_id})
330 if db_vim.get("_admin") and db_vim["_admin"].get("deployed") and db_vim["_admin"]["deployed"].get("RO"):
331 RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
332 RO = ROclient.ROClient(self.loop, **self.ro_config)
333 step = "Detaching vim from RO tenant"
334 try:
335 await RO.detach("vim_account", RO_vim_id)
336 except ROclient.ROClientException as e:
337 if e.http_code == 404: # not found
338 self.logger.debug(logging_text + "RO_vim_id={} already detached".format(RO_vim_id))
339 else:
340 raise
341
342 step = "Deleting vim from RO"
343 try:
344 await RO.delete("vim", RO_vim_id)
345 except ROclient.ROClientException as e:
346 if e.http_code == 404: # not found
347 self.logger.debug(logging_text + "RO_vim_id={} already deleted".format(RO_vim_id))
348 else:
349 raise
350 else:
351 # nothing to delete
352 self.logger.error(logging_text + "Nothing to remove at RO")
353 self.db.del_one("vim_accounts", {"_id": vim_id})
354 db_vim = None
355 self.logger.debug(logging_text + "Exit Ok")
356 return
357
358 except (ROclient.ROClientException, DbException) as e:
359 self.logger.error(logging_text + "Exit Exception {}".format(e))
360 exc = e
361 except Exception as e:
362 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
363 exc = e
364 finally:
365 self.lcm_tasks.remove("vim_account", vim_id, order_id)
366 if exc and db_vim:
367 db_vim_update["_admin.operationalState"] = "ERROR"
368 db_vim_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc)
369 # Mark the VIM 'delete' HA task as erroneous
370 operationState_HA = 'FAILED'
371 detailed_status_HA = "ERROR {}: {}".format(step, exc)
372 self.lcm_tasks.register_HA('vim', 'delete', op_id,
373 operationState=operationState_HA,
374 detailed_status=detailed_status_HA)
375 try:
376 if db_vim and db_vim_update:
377 self.update_db_2("vim_accounts", vim_id, db_vim_update)
378 # If the VIM 'delete' HA task was succesful, the DB entry has been deleted,
379 # which means that there is nowhere to register this task, so do nothing here.
380 except DbException as e:
381 self.logger.error(logging_text + "Cannot update database: {}".format(e))
382 self.lcm_tasks.remove("vim_account", vim_id, order_id)
383
384
385 class WimLcm(LcmBase):
386 # values that are encrypted at wim config because they are passwords
387 wim_config_encrypted = ()
388
389 def __init__(self, db, msg, fs, lcm_tasks, ro_config, loop):
390 """
391 Init, Connect to database, filesystem storage, and messaging
392 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
393 :return: None
394 """
395
396 self.logger = logging.getLogger('lcm.vim')
397 self.loop = loop
398 self.lcm_tasks = lcm_tasks
399 self.ro_config = ro_config
400
401 super().__init__(db, msg, fs, self.logger)
402
403 async def create(self, wim_content, order_id):
404
405 # HA tasks and backward compatibility:
406 # If 'wim_content' does not include 'op_id', we a running a legacy NBI version.
407 # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
408 # Register 'create' task here for related future HA operations
409 op_id = wim_content.pop('op_id', None)
410 self.lcm_tasks.lock_HA('wim', 'create', op_id)
411
412 wim_id = wim_content["_id"]
413 wim_content.pop("op_id", None)
414 logging_text = "Task wim_create={} ".format(wim_id)
415 self.logger.debug(logging_text + "Enter")
416
417 db_wim = None
418 db_wim_update = {}
419 exc = None
420 operationState_HA = ''
421 detailed_status_HA = ''
422 try:
423 step = "Getting wim-id='{}' from db".format(wim_id)
424 db_wim = self.db.get_one("wim_accounts", {"_id": wim_id})
425 db_wim_update["_admin.deployed.RO"] = None
426
427 step = "Creating wim at RO"
428 db_wim_update["_admin.detailed-status"] = step
429 self.update_db_2("wim_accounts", wim_id, db_wim_update)
430 RO = ROclient.ROClient(self.loop, **self.ro_config)
431 wim_RO = deepcopy(wim_content)
432 wim_RO.pop("_id", None)
433 wim_RO.pop("_admin", None)
434 schema_version = wim_RO.pop("schema_version", None)
435 wim_RO.pop("schema_type", None)
436 wim_RO.pop("wim_tenant_name", None)
437 wim_RO["type"] = wim_RO.pop("wim_type")
438 wim_RO.pop("wim_user", None)
439 wim_RO.pop("wim_password", None)
440 desc = await RO.create("wim", descriptor=wim_RO)
441 RO_wim_id = desc["uuid"]
442 db_wim_update["_admin.deployed.RO"] = RO_wim_id
443 self.logger.debug(logging_text + "WIM created at RO_wim_id={}".format(RO_wim_id))
444
445 step = "Creating wim_account at RO"
446 db_wim_update["_admin.detailed-status"] = step
447 self.update_db_2("wim_accounts", wim_id, db_wim_update)
448
449 if wim_content.get("wim_password"):
450 wim_content["wim_password"] = self.db.decrypt(wim_content["wim_password"],
451 schema_version=schema_version,
452 salt=wim_id)
453 wim_account_RO = {"name": wim_content["name"],
454 "user": wim_content["user"],
455 "password": wim_content["password"]
456 }
457 if wim_RO.get("config"):
458 wim_account_RO["config"] = wim_RO["config"]
459 if "wim_port_mapping" in wim_account_RO["config"]:
460 del wim_account_RO["config"]["wim_port_mapping"]
461 for p in self.wim_config_encrypted:
462 if wim_account_RO["config"].get(p):
463 wim_account_RO["config"][p] = self.db.decrypt(wim_account_RO["config"][p],
464 schema_version=schema_version,
465 salt=wim_id)
466
467 desc = await RO.attach("wim_account", RO_wim_id, descriptor=wim_account_RO)
468 db_wim_update["_admin.deployed.RO-account"] = desc["uuid"]
469 db_wim_update["_admin.operationalState"] = "ENABLED"
470 db_wim_update["_admin.detailed-status"] = "Done"
471 # Mark the WIM 'create' HA task as successful
472 operationState_HA = 'COMPLETED'
473 detailed_status_HA = 'Done'
474
475 self.logger.debug(logging_text + "Exit Ok WIM account created at RO_wim_account_id={}".format(desc["uuid"]))
476 return
477
478 except (ROclient.ROClientException, DbException) as e:
479 self.logger.error(logging_text + "Exit Exception {}".format(e))
480 exc = e
481 except Exception as e:
482 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
483 exc = e
484 finally:
485 if exc and db_wim:
486 db_wim_update["_admin.operationalState"] = "ERROR"
487 db_wim_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc)
488 # Mark the WIM 'create' HA task as erroneous
489 operationState_HA = 'FAILED'
490 detailed_status_HA = "ERROR {}: {}".format(step, exc)
491 try:
492 if db_wim_update:
493 self.update_db_2("wim_accounts", wim_id, db_wim_update)
494 # Register the WIM 'create' HA task either
495 # succesful or erroneous, or do nothing (if legacy NBI)
496 self.lcm_tasks.register_HA('wim', 'create', op_id,
497 operationState=operationState_HA,
498 detailed_status=detailed_status_HA)
499 except DbException as e:
500 self.logger.error(logging_text + "Cannot update database: {}".format(e))
501 self.lcm_tasks.remove("wim_account", wim_id, order_id)
502
503 async def edit(self, wim_content, order_id):
504
505 # HA tasks and backward compatibility:
506 # If 'wim_content' does not include 'op_id', we a running a legacy NBI version.
507 # In such a case, HA is not supported by NBI, and the HA check always returns True
508 op_id = wim_content.pop('op_id', None)
509 if not self.lcm_tasks.lock_HA('wim', 'edit', op_id):
510 return
511
512 wim_id = wim_content["_id"]
513 wim_content.pop("op_id", None)
514 logging_text = "Task wim_edit={} ".format(wim_id)
515 self.logger.debug(logging_text + "Enter")
516
517 db_wim = None
518 exc = None
519 RO_wim_id = None
520 db_wim_update = {}
521 step = "Getting wim-id='{}' from db".format(wim_id)
522 operationState_HA = ''
523 detailed_status_HA = ''
524 try:
525 # wait for any previous tasks in process
526 await self.lcm_tasks.waitfor_related_HA('wim', 'edit', op_id)
527
528 db_wim = self.db.get_one("wim_accounts", {"_id": wim_id})
529
530 if db_wim.get("_admin") and db_wim["_admin"].get("deployed") and db_wim["_admin"]["deployed"].get("RO"):
531
532 RO_wim_id = db_wim["_admin"]["deployed"]["RO"]
533 step = "Editing wim at RO"
534 RO = ROclient.ROClient(self.loop, **self.ro_config)
535 wim_RO = deepcopy(wim_content)
536 wim_RO.pop("_id", None)
537 wim_RO.pop("_admin", None)
538 schema_version = wim_RO.pop("schema_version", None)
539 wim_RO.pop("schema_type", None)
540 wim_RO.pop("wim_tenant_name", None)
541 if "wim_type" in wim_RO:
542 wim_RO["type"] = wim_RO.pop("wim_type")
543 wim_RO.pop("wim_user", None)
544 wim_RO.pop("wim_password", None)
545 # TODO make a deep update of wim_port_mapping
546 if wim_RO:
547 await RO.edit("wim", RO_wim_id, descriptor=wim_RO)
548
549 step = "Editing wim-account at RO tenant"
550 wim_account_RO = {}
551 if "config" in wim_content:
552 if "wim_port_mapping" in wim_content["config"]:
553 del wim_content["config"]["wim_port_mapping"]
554 if not wim_content["config"]:
555 del wim_content["config"]
556 if "wim_tenant_name" in wim_content:
557 wim_account_RO["wim_tenant_name"] = wim_content["wim_tenant_name"]
558 if "wim_password" in wim_content:
559 wim_account_RO["wim_password"] = wim_content["wim_password"]
560 if wim_content.get("wim_password"):
561 wim_account_RO["wim_password"] = self.db.decrypt(wim_content["wim_password"],
562 schema_version=schema_version,
563 salt=wim_id)
564 if "config" in wim_content:
565 wim_account_RO["config"] = wim_content["config"]
566 if wim_content.get("config"):
567 for p in self.wim_config_encrypted:
568 if wim_content["config"].get(p):
569 wim_account_RO["config"][p] = self.db.decrypt(wim_content["config"][p],
570 schema_version=schema_version,
571 salt=wim_id)
572
573 if "wim_user" in wim_content:
574 wim_content["wim_username"] = wim_content["wim_user"]
575 # wim_account must be edited always even if empty in order to ensure changes are translated to RO
576 # wim_thread. RO will remove and relaunch a new thread for this wim_account
577 await RO.edit("wim_account", RO_wim_id, descriptor=wim_account_RO)
578 db_wim_update["_admin.operationalState"] = "ENABLED"
579 # Mark the WIM 'edit' HA task as successful
580 operationState_HA = 'COMPLETED'
581 detailed_status_HA = 'Done'
582
583 self.logger.debug(logging_text + "Exit Ok RO_wim_id={}".format(RO_wim_id))
584 return
585
586 except (ROclient.ROClientException, DbException) as e:
587 self.logger.error(logging_text + "Exit Exception {}".format(e))
588 exc = e
589 except Exception as e:
590 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
591 exc = e
592 finally:
593 if exc and db_wim:
594 db_wim_update["_admin.operationalState"] = "ERROR"
595 db_wim_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc)
596 # Mark the WIM 'edit' HA task as erroneous
597 operationState_HA = 'FAILED'
598 detailed_status_HA = "ERROR {}: {}".format(step, exc)
599 try:
600 if db_wim_update:
601 self.update_db_2("wim_accounts", wim_id, db_wim_update)
602 # Register the WIM 'edit' HA task either
603 # succesful or erroneous, or do nothing (if legacy NBI)
604 self.lcm_tasks.register_HA('wim', 'edit', op_id,
605 operationState=operationState_HA,
606 detailed_status=detailed_status_HA)
607 except DbException as e:
608 self.logger.error(logging_text + "Cannot update database: {}".format(e))
609 self.lcm_tasks.remove("wim_account", wim_id, order_id)
610
611 async def delete(self, wim_content, order_id):
612
613 # HA tasks and backward compatibility:
614 # If 'vim_content' does not include 'op_id', we a running a legacy NBI version.
615 # In such a case, HA is not supported by NBI, and the HA check always returns True
616 op_id = wim_content.pop('op_id', None)
617 if not self.lcm_tasks.lock_HA('wim', 'delete', op_id):
618 return
619
620 wim_id = wim_content["_id"]
621 logging_text = "Task wim_delete={} ".format(wim_id)
622 self.logger.debug(logging_text + "Enter")
623
624 db_wim = None
625 db_wim_update = {}
626 exc = None
627 step = "Getting wim from db"
628 operationState_HA = ''
629 detailed_status_HA = ''
630 try:
631 # wait for any previous tasks in process
632 await self.lcm_tasks.waitfor_related_HA('wim', 'delete', op_id)
633
634 db_wim = self.db.get_one("wim_accounts", {"_id": wim_id})
635 if db_wim.get("_admin") and db_wim["_admin"].get("deployed") and db_wim["_admin"]["deployed"].get("RO"):
636 RO_wim_id = db_wim["_admin"]["deployed"]["RO"]
637 RO = ROclient.ROClient(self.loop, **self.ro_config)
638 step = "Detaching wim from RO tenant"
639 try:
640 await RO.detach("wim_account", RO_wim_id)
641 except ROclient.ROClientException as e:
642 if e.http_code == 404: # not found
643 self.logger.debug(logging_text + "RO_wim_id={} already detached".format(RO_wim_id))
644 else:
645 raise
646
647 step = "Deleting wim from RO"
648 try:
649 await RO.delete("wim", RO_wim_id)
650 except ROclient.ROClientException as e:
651 if e.http_code == 404: # not found
652 self.logger.debug(logging_text + "RO_wim_id={} already deleted".format(RO_wim_id))
653 else:
654 raise
655 else:
656 # nothing to delete
657 self.logger.error(logging_text + "Nohing to remove at RO")
658 self.db.del_one("wim_accounts", {"_id": wim_id})
659 db_wim = None
660 self.logger.debug(logging_text + "Exit Ok")
661 return
662
663 except (ROclient.ROClientException, DbException) as e:
664 self.logger.error(logging_text + "Exit Exception {}".format(e))
665 exc = e
666 except Exception as e:
667 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
668 exc = e
669 finally:
670 self.lcm_tasks.remove("wim_account", wim_id, order_id)
671 if exc and db_wim:
672 db_wim_update["_admin.operationalState"] = "ERROR"
673 db_wim_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc)
674 # Mark the WIM 'delete' HA task as erroneous
675 operationState_HA = 'FAILED'
676 detailed_status_HA = "ERROR {}: {}".format(step, exc)
677 self.lcm_tasks.register_HA('wim', 'delete', op_id,
678 operationState=operationState_HA,
679 detailed_status=detailed_status_HA)
680 try:
681 if db_wim and db_wim_update:
682 self.update_db_2("wim_accounts", wim_id, db_wim_update)
683 # If the WIM 'delete' HA task was succesful, the DB entry has been deleted,
684 # which means that there is nowhere to register this task, so do nothing here.
685 except DbException as e:
686 self.logger.error(logging_text + "Cannot update database: {}".format(e))
687 self.lcm_tasks.remove("wim_account", wim_id, order_id)
688
689
690 class SdnLcm(LcmBase):
691
692 def __init__(self, db, msg, fs, lcm_tasks, ro_config, loop):
693 """
694 Init, Connect to database, filesystem storage, and messaging
695 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
696 :return: None
697 """
698
699 self.logger = logging.getLogger('lcm.sdn')
700 self.loop = loop
701 self.lcm_tasks = lcm_tasks
702 self.ro_config = ro_config
703
704 super().__init__(db, msg, fs, self.logger)
705
706 async def create(self, sdn_content, order_id):
707
708 # HA tasks and backward compatibility:
709 # If 'sdn_content' does not include 'op_id', we a running a legacy NBI version.
710 # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
711 # Register 'create' task here for related future HA operations
712 op_id = sdn_content.pop('op_id', None)
713 self.lcm_tasks.lock_HA('sdn', 'create', op_id)
714
715 sdn_id = sdn_content["_id"]
716 sdn_content.pop("op_id", None)
717 logging_text = "Task sdn_create={} ".format(sdn_id)
718 self.logger.debug(logging_text + "Enter")
719
720 db_sdn = None
721 db_sdn_update = {}
722 RO_sdn_id = None
723 exc = None
724 operationState_HA = ''
725 detailed_status_HA = ''
726 try:
727 step = "Getting sdn from db"
728 db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
729 db_sdn_update["_admin.deployed.RO"] = None
730
731 step = "Creating sdn at RO"
732 db_sdn_update["_admin.detailed-status"] = step
733 self.update_db_2("sdns", sdn_id, db_sdn_update)
734
735 RO = ROclient.ROClient(self.loop, **self.ro_config)
736 sdn_RO = deepcopy(sdn_content)
737 sdn_RO.pop("_id", None)
738 sdn_RO.pop("_admin", None)
739 schema_version = sdn_RO.pop("schema_version", None)
740 sdn_RO.pop("schema_type", None)
741 sdn_RO.pop("description", None)
742 if sdn_RO.get("password"):
743 sdn_RO["password"] = self.db.decrypt(sdn_RO["password"], schema_version=schema_version, salt=sdn_id)
744
745 desc = await RO.create("sdn", descriptor=sdn_RO)
746 RO_sdn_id = desc["uuid"]
747 db_sdn_update["_admin.deployed.RO"] = RO_sdn_id
748 db_sdn_update["_admin.operationalState"] = "ENABLED"
749 self.logger.debug(logging_text + "Exit Ok RO_sdn_id={}".format(RO_sdn_id))
750 # Mark the SDN 'create' HA task as successful
751 operationState_HA = 'COMPLETED'
752 detailed_status_HA = 'Done'
753 return
754
755 except (ROclient.ROClientException, DbException) as e:
756 self.logger.error(logging_text + "Exit Exception {}".format(e))
757 exc = e
758 except Exception as e:
759 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
760 exc = e
761 finally:
762 if exc and db_sdn:
763 db_sdn_update["_admin.operationalState"] = "ERROR"
764 db_sdn_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc)
765 # Mark the SDN 'create' HA task as erroneous
766 operationState_HA = 'FAILED'
767 detailed_status_HA = "ERROR {}: {}".format(step, exc)
768 try:
769 if db_sdn and db_sdn_update:
770 self.update_db_2("sdns", sdn_id, db_sdn_update)
771 # Register the SDN 'create' HA task either
772 # succesful or erroneous, or do nothing (if legacy NBI)
773 self.lcm_tasks.register_HA('sdn', 'create', op_id,
774 operationState=operationState_HA,
775 detailed_status=detailed_status_HA)
776 except DbException as e:
777 self.logger.error(logging_text + "Cannot update database: {}".format(e))
778 self.lcm_tasks.remove("sdn", sdn_id, order_id)
779
780 async def edit(self, sdn_content, order_id):
781
782 # HA tasks and backward compatibility:
783 # If 'sdn_content' does not include 'op_id', we a running a legacy NBI version.
784 # In such a case, HA is not supported by NBI, and the HA check always returns True
785 op_id = sdn_content.pop('op_id', None)
786 if not self.lcm_tasks.lock_HA('sdn', 'edit', op_id):
787 return
788
789 sdn_id = sdn_content["_id"]
790 sdn_content.pop("op_id", None)
791 logging_text = "Task sdn_edit={} ".format(sdn_id)
792 self.logger.debug(logging_text + "Enter")
793
794 db_sdn = None
795 db_sdn_update = {}
796 exc = None
797 operationState_HA = ''
798 detailed_status_HA = ''
799 step = "Getting sdn from db"
800 try:
801 # wait for any previous tasks in process
802 await self.lcm_tasks.waitfor_related_HA('sdn', 'edit', op_id)
803
804 db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
805 RO_sdn_id = None
806 if db_sdn.get("_admin") and db_sdn["_admin"].get("deployed") and db_sdn["_admin"]["deployed"].get("RO"):
807 RO_sdn_id = db_sdn["_admin"]["deployed"]["RO"]
808 RO = ROclient.ROClient(self.loop, **self.ro_config)
809 step = "Editing sdn at RO"
810 sdn_RO = deepcopy(sdn_content)
811 sdn_RO.pop("_id", None)
812 sdn_RO.pop("_admin", None)
813 schema_version = sdn_RO.pop("schema_version", None)
814 sdn_RO.pop("schema_type", None)
815 sdn_RO.pop("description", None)
816 if sdn_RO.get("password"):
817 sdn_RO["password"] = self.db.decrypt(sdn_RO["password"], schema_version=schema_version, salt=sdn_id)
818 if sdn_RO:
819 await RO.edit("sdn", RO_sdn_id, descriptor=sdn_RO)
820 db_sdn_update["_admin.operationalState"] = "ENABLED"
821 # Mark the SDN 'edit' HA task as successful
822 operationState_HA = 'COMPLETED'
823 detailed_status_HA = 'Done'
824
825 self.logger.debug(logging_text + "Exit Ok RO_sdn_id={}".format(RO_sdn_id))
826 return
827
828 except (ROclient.ROClientException, DbException) as e:
829 self.logger.error(logging_text + "Exit Exception {}".format(e))
830 exc = e
831 except Exception as e:
832 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
833 exc = e
834 finally:
835 if exc and db_sdn:
836 db_sdn["_admin.operationalState"] = "ERROR"
837 db_sdn["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc)
838 # Mark the SDN 'edit' HA task as erroneous
839 operationState_HA = 'FAILED'
840 detailed_status_HA = "ERROR {}: {}".format(step, exc)
841 try:
842 if db_sdn_update:
843 self.update_db_2("sdns", sdn_id, db_sdn_update)
844 # Register the SDN 'edit' HA task either
845 # succesful or erroneous, or do nothing (if legacy NBI)
846 self.lcm_tasks.register_HA('sdn', 'edit', op_id,
847 operationState=operationState_HA,
848 detailed_status=detailed_status_HA)
849 except DbException as e:
850 self.logger.error(logging_text + "Cannot update database: {}".format(e))
851 self.lcm_tasks.remove("sdn", sdn_id, order_id)
852
853 async def delete(self, sdn_content, order_id):
854
855 # HA tasks and backward compatibility:
856 # If 'vim_content' does not include 'op_id', we a running a legacy NBI version.
857 # In such a case, HA is not supported by NBI, and the HA check always returns True
858 op_id = sdn_content.pop('op_id', None)
859 if not self.lcm_tasks.lock_HA('sdn', 'delete', op_id):
860 return
861
862 sdn_id = sdn_content["_id"]
863 logging_text = "Task sdn_delete={} ".format(sdn_id)
864 self.logger.debug(logging_text + "Enter")
865
866 db_sdn = None
867 db_sdn_update = {}
868 exc = None
869 operationState_HA = ''
870 detailed_status_HA = ''
871 step = "Getting sdn from db"
872 try:
873 # wait for any previous tasks in process
874 await self.lcm_tasks.waitfor_related_HA('sdn', 'delete', op_id)
875
876 db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
877 if db_sdn.get("_admin") and db_sdn["_admin"].get("deployed") and db_sdn["_admin"]["deployed"].get("RO"):
878 RO_sdn_id = db_sdn["_admin"]["deployed"]["RO"]
879 RO = ROclient.ROClient(self.loop, **self.ro_config)
880 step = "Deleting sdn from RO"
881 try:
882 await RO.delete("sdn", RO_sdn_id)
883 except ROclient.ROClientException as e:
884 if e.http_code == 404: # not found
885 self.logger.debug(logging_text + "RO_sdn_id={} already deleted".format(RO_sdn_id))
886 else:
887 raise
888 else:
889 # nothing to delete
890 self.logger.error(logging_text + "Skipping. There is not RO information at database")
891 self.db.del_one("sdns", {"_id": sdn_id})
892 db_sdn = None
893 self.logger.debug("sdn_delete task sdn_id={} Exit Ok".format(sdn_id))
894 return
895
896 except (ROclient.ROClientException, DbException) as e:
897 self.logger.error(logging_text + "Exit Exception {}".format(e))
898 exc = e
899 except Exception as e:
900 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
901 exc = e
902 finally:
903 if exc and db_sdn:
904 db_sdn["_admin.operationalState"] = "ERROR"
905 db_sdn["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc)
906 # Mark the SDN 'delete' HA task as erroneous
907 operationState_HA = 'FAILED'
908 detailed_status_HA = "ERROR {}: {}".format(step, exc)
909 self.lcm_tasks.register_HA('sdn', 'delete', op_id,
910 operationState=operationState_HA,
911 detailed_status=detailed_status_HA)
912 try:
913 if db_sdn and db_sdn_update:
914 self.update_db_2("sdns", sdn_id, db_sdn_update)
915 # If the SDN 'delete' HA task was succesful, the DB entry has been deleted,
916 # which means that there is nowhere to register this task, so do nothing here.
917 except DbException as e:
918 self.logger.error(logging_text + "Cannot update database: {}".format(e))
919 self.lcm_tasks.remove("sdn", sdn_id, order_id)
920
921
922 class K8sClusterLcm(LcmBase):
923
924 def __init__(self, db, msg, fs, lcm_tasks, vca_config, loop):
925 """
926 Init, Connect to database, filesystem storage, and messaging
927 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
928 :return: None
929 """
930
931 self.logger = logging.getLogger('lcm.k8scluster')
932 self.loop = loop
933 self.lcm_tasks = lcm_tasks
934 self.vca_config = vca_config
935 self.fs = fs
936 self.db = db
937
938 self.helm_k8scluster = K8sHelmConnector(
939 kubectl_command=self.vca_config.get("kubectlpath"),
940 helm_command=self.vca_config.get("helmpath"),
941 fs=self.fs,
942 log=self.logger,
943 db=self.db,
944 on_update_db=None
945 )
946
947 self.juju_k8scluster = K8sJujuConnector(
948 kubectl_command=self.vca_config.get("kubectlpath"),
949 juju_command=self.vca_config.get("jujupath"),
950 fs=self.fs,
951 log=self.logger,
952 db=self.db,
953 on_update_db=None
954 )
955
956 super().__init__(db, msg, fs, self.logger)
957
958 async def create(self, k8scluster_content, order_id):
959
960 # HA tasks and backward compatibility:
961 # If 'vim_content' does not include 'op_id', we a running a legacy NBI version.
962 # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
963 # Register 'create' task here for related future HA operations
964 op_id = k8scluster_content.pop('op_id', None)
965 if not self.lcm_tasks.lock_HA('k8scluster', 'create', op_id):
966 return
967
968 k8scluster_id = k8scluster_content["_id"]
969 k8scluster_content.pop("op_id", None)
970 logging_text = "Task k8scluster_create={} ".format(k8scluster_id)
971 self.logger.debug(logging_text + "Enter")
972
973 db_k8scluster = None
974 db_k8scluster_update = {}
975
976 exc = None
977 operationState_HA = ''
978 detailed_status_HA = ''
979 try:
980 step = "Getting k8scluster-id='{}' from db".format(k8scluster_id)
981 self.logger.debug(logging_text + step)
982 db_k8scluster = self.db.get_one("k8sclusters", {"_id": k8scluster_id})
983 self.db.encrypt_decrypt_fields(db_k8scluster.get("credentials"), 'decrypt', ['password', 'secret'],
984 schema_version=db_k8scluster["schema_version"], salt=db_k8scluster["_id"])
985 k8s_credentials = yaml.safe_dump(db_k8scluster.get("credentials"))
986 error_text_list = []
987 # helm-chart
988 k8s_hc_id = None
989 try:
990 k8s_hc_id, uninstall_sw = await self.helm_k8scluster.init_env(k8s_credentials)
991 db_k8scluster_update["_admin.helm-chart.id"] = k8s_hc_id
992 db_k8scluster_update["_admin.helm-chart.created"] = uninstall_sw
993 except Exception as e:
994 error_text_list.append("Failing init helm-chart: {}".format(e))
995 db_k8scluster_update["_admin.helm-chart.error_msg"] = str(e)
996 if isinstance(e, K8sException):
997 self.logger.error(logging_text + "Failing init helm-chart: {}".format(e))
998 else:
999 self.logger.error(logging_text + "Failing init helm-chart: {}".format(e), exc_info=True)
1000
1001 # Juju/k8s cluster
1002 k8s_jb_id = None
1003 try:
1004 k8s_jb_id, uninstall_sw = await self.juju_k8scluster.init_env(k8s_credentials)
1005 db_k8scluster_update["_admin.juju-bundle.id"] = k8s_jb_id
1006 db_k8scluster_update["_admin.juju-bundle.created"] = uninstall_sw
1007 except Exception as e:
1008 error_text_list.append("Failing init juju-bundle: {}".format(e))
1009 db_k8scluster_update["_admin.juju-bundle.error_msg"] = str(e)
1010 if isinstance(e, N2VCException):
1011 self.logger.error(logging_text + "Failing init juju-bundle: {}".format(e))
1012 else:
1013 self.logger.error(logging_text + "Failing init juju-bundle: {}".format(e), exc_info=True)
1014
1015 step = "Getting the list of repos"
1016 if k8s_hc_id:
1017 self.logger.debug(logging_text + step)
1018 task_list = []
1019 db_k8srepo_list = self.db.get_list("k8srepos", {"type": "helm-chart"})
1020 for repo in db_k8srepo_list:
1021 step = "Adding repo {} to cluster: {}".format(repo["name"], k8s_hc_id)
1022 self.logger.debug(logging_text + step)
1023 task = asyncio.ensure_future(self.helm_k8scluster.repo_add(cluster_uuid=k8s_hc_id,
1024 name=repo["name"], url=repo["url"],
1025 repo_type="chart"))
1026 task_list.append(task)
1027 repo_k8scluster_list = deep_get(repo, ("_admin", "cluster-inserted")) or []
1028 repo_k8scluster_list.append(k8s_hc_id)
1029 self.update_db_2("k8srepos", repo["_id"], {"_admin.cluster-inserted": repo_k8scluster_list})
1030
1031 if task_list:
1032 self.logger.debug(logging_text + 'Waiting for terminate tasks of repo_add')
1033 done, pending = await asyncio.wait(task_list, timeout=3600)
1034 if pending:
1035 self.logger.error(logging_text + 'There are pending tasks: {}'.format(pending))
1036
1037 # mark as an error if both helm-chart and juju-bundle have been failed
1038 if k8s_hc_id or k8s_jb_id:
1039 db_k8scluster_update["_admin.operationalState"] = "ENABLED"
1040 else:
1041 db_k8scluster_update["_admin.operationalState"] = "ERROR"
1042 db_k8scluster_update["_admin.detailed-status"] = ";".join(error_text_list)
1043
1044 except Exception as e:
1045 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
1046 exc = e
1047 finally:
1048 if exc and db_k8scluster:
1049 db_k8scluster_update["_admin.operationalState"] = "ERROR"
1050 db_k8scluster_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc)
1051
1052 # Mark the k8scluster 'create' HA task as erroneous
1053 operationState_HA = 'FAILED'
1054 detailed_status_HA = "ERROR {}: {}".format(step, exc)
1055 try:
1056 if db_k8scluster_update:
1057 self.update_db_2("k8sclusters", k8scluster_id, db_k8scluster_update)
1058
1059 # Register the K8scluster 'create' HA task either
1060 # succesful or erroneous, or do nothing (if legacy NBI)
1061 self.lcm_tasks.register_HA('k8scluster', 'create', op_id,
1062 operationState=operationState_HA,
1063 detailed_status=detailed_status_HA)
1064 except DbException as e:
1065 self.logger.error(logging_text + "Cannot update database: {}".format(e))
1066 self.lcm_tasks.remove("k8sclusters", k8scluster_id, order_id)
1067
1068 async def delete(self, k8scluster_content, order_id):
1069
1070 # HA tasks and backward compatibility:
1071 # If 'vim_content' does not include 'op_id', we a running a legacy NBI version.
1072 # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
1073 # Register 'delete' task here for related future HA operations
1074 op_id = k8scluster_content.pop('op_id', None)
1075 if not self.lcm_tasks.lock_HA('k8scluster', 'delete', op_id):
1076 return
1077
1078 k8scluster_id = k8scluster_content["_id"]
1079 k8scluster_content.pop("op_id", None)
1080 logging_text = "Task k8scluster_delete={} ".format(k8scluster_id)
1081 self.logger.debug(logging_text + "Enter")
1082
1083 db_k8scluster = None
1084 db_k8scluster_update = {}
1085 exc = None
1086 operationState_HA = ''
1087 detailed_status_HA = ''
1088 try:
1089 step = "Getting k8scluster='{}' from db".format(k8scluster_id)
1090 self.logger.debug(logging_text + step)
1091 db_k8scluster = self.db.get_one("k8sclusters", {"_id": k8scluster_id})
1092 k8s_hc_id = deep_get(db_k8scluster, ("_admin", "helm-chart", "id"))
1093 k8s_jb_id = deep_get(db_k8scluster, ("_admin", "juju-bundle", "id"))
1094
1095 uninstall_sw = deep_get(db_k8scluster, ("_admin", "helm-chart", "created"))
1096 cluster_removed = True
1097 if k8s_hc_id:
1098 uninstall_sw = uninstall_sw or False
1099 cluster_removed = await self.helm_k8scluster.reset(cluster_uuid=k8s_hc_id, uninstall_sw=uninstall_sw)
1100
1101 if k8s_jb_id:
1102 uninstall_sw = uninstall_sw or False
1103 cluster_removed = await self.juju_k8scluster.reset(cluster_uuid=k8s_jb_id, uninstall_sw=uninstall_sw)
1104
1105 if k8s_hc_id and cluster_removed:
1106 step = "Removing k8scluster='{}' from k8srepos".format(k8scluster_id)
1107 self.logger.debug(logging_text + step)
1108 db_k8srepo_list = self.db.get_list("k8srepos", {"_admin.cluster-inserted": k8s_hc_id})
1109 for k8srepo in db_k8srepo_list:
1110 try:
1111 cluster_list = k8srepo["_admin"]["cluster-inserted"]
1112 cluster_list.remove(k8s_hc_id)
1113 self.update_db_2("k8srepos", k8srepo["_id"], {"_admin.cluster-inserted": cluster_list})
1114 except Exception as e:
1115 self.logger.error("{}: {}".format(step, e))
1116 self.db.del_one("k8sclusters", {"_id": k8scluster_id})
1117 else:
1118 raise LcmException("An error happened during the reset of the k8s cluster '{}'".format(k8scluster_id))
1119 # if not cluster_removed:
1120 # raise Exception("K8scluster was not properly removed")
1121
1122 except Exception as e:
1123 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
1124 exc = e
1125 finally:
1126 if exc and db_k8scluster:
1127 db_k8scluster_update["_admin.operationalState"] = "ERROR"
1128 db_k8scluster_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc)
1129 # Mark the WIM 'create' HA task as erroneous
1130 operationState_HA = 'FAILED'
1131 detailed_status_HA = "ERROR {}: {}".format(step, exc)
1132 try:
1133 if db_k8scluster_update:
1134 self.update_db_2("k8sclusters", k8scluster_id, db_k8scluster_update)
1135 # Register the K8scluster 'delete' HA task either
1136 # succesful or erroneous, or do nothing (if legacy NBI)
1137 self.lcm_tasks.register_HA('k8scluster', 'delete', op_id,
1138 operationState=operationState_HA,
1139 detailed_status=detailed_status_HA)
1140 except DbException as e:
1141 self.logger.error(logging_text + "Cannot update database: {}".format(e))
1142 self.lcm_tasks.remove("k8sclusters", k8scluster_id, order_id)
1143
1144
1145 class K8sRepoLcm(LcmBase):
1146
1147 def __init__(self, db, msg, fs, lcm_tasks, vca_config, loop):
1148 """
1149 Init, Connect to database, filesystem storage, and messaging
1150 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
1151 :return: None
1152 """
1153
1154 self.logger = logging.getLogger('lcm.k8srepo')
1155 self.loop = loop
1156 self.lcm_tasks = lcm_tasks
1157 self.vca_config = vca_config
1158 self.fs = fs
1159 self.db = db
1160
1161 self.k8srepo = K8sHelmConnector(
1162 kubectl_command=self.vca_config.get("kubectlpath"),
1163 helm_command=self.vca_config.get("helmpath"),
1164 fs=self.fs,
1165 log=self.logger,
1166 db=self.db,
1167 on_update_db=None
1168 )
1169
1170 super().__init__(db, msg, fs, self.logger)
1171
1172 async def create(self, k8srepo_content, order_id):
1173
1174 # HA tasks and backward compatibility:
1175 # If 'vim_content' does not include 'op_id', we a running a legacy NBI version.
1176 # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
1177 # Register 'create' task here for related future HA operations
1178
1179 op_id = k8srepo_content.pop('op_id', None)
1180 if not self.lcm_tasks.lock_HA('k8srepo', 'create', op_id):
1181 return
1182
1183 k8srepo_id = k8srepo_content.get("_id")
1184 logging_text = "Task k8srepo_create={} ".format(k8srepo_id)
1185 self.logger.debug(logging_text + "Enter")
1186
1187 db_k8srepo = None
1188 db_k8srepo_update = {}
1189 exc = None
1190 operationState_HA = ''
1191 detailed_status_HA = ''
1192 try:
1193 step = "Getting k8srepo-id='{}' from db".format(k8srepo_id)
1194 self.logger.debug(logging_text + step)
1195 db_k8srepo = self.db.get_one("k8srepos", {"_id": k8srepo_id})
1196 step = "Getting k8scluster_list from db"
1197 self.logger.debug(logging_text + step)
1198 db_k8scluster_list = self.db.get_list("k8sclusters", {})
1199 db_k8srepo_update["_admin.cluster-inserted"] = []
1200 task_list = []
1201 for k8scluster in db_k8scluster_list:
1202 hc_id = deep_get(k8scluster, ("_admin", "helm-chart", "id"))
1203 if hc_id:
1204 step = "Adding repo to cluster: {}".format(hc_id)
1205 self.logger.debug(logging_text + step)
1206 task = asyncio.ensure_future(self.k8srepo.repo_add(cluster_uuid=hc_id,
1207 name=db_k8srepo["name"], url=db_k8srepo["url"],
1208 repo_type="chart"))
1209 task_list.append(task)
1210 db_k8srepo_update["_admin.cluster-inserted"].append(hc_id)
1211
1212 done = None
1213 pending = None
1214 if len(task_list) > 0:
1215 self.logger.debug('Waiting for terminate pending tasks...')
1216 done, pending = await asyncio.wait(task_list, timeout=3600)
1217 if not pending:
1218 self.logger.debug('All tasks finished...')
1219 else:
1220 self.logger.info('There are pending tasks: {}'.format(pending))
1221 db_k8srepo_update["_admin.operationalState"] = "ENABLED"
1222 except Exception as e:
1223 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
1224 exc = e
1225 finally:
1226 if exc and db_k8srepo:
1227 db_k8srepo_update["_admin.operationalState"] = "ERROR"
1228 db_k8srepo_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc)
1229 # Mark the WIM 'create' HA task as erroneous
1230 operationState_HA = 'FAILED'
1231 detailed_status_HA = "ERROR {}: {}".format(step, exc)
1232 try:
1233 if db_k8srepo_update:
1234 self.update_db_2("k8srepos", k8srepo_id, db_k8srepo_update)
1235 # Register the K8srepo 'create' HA task either
1236 # succesful or erroneous, or do nothing (if legacy NBI)
1237 self.lcm_tasks.register_HA('k8srepo', 'create', op_id,
1238 operationState=operationState_HA,
1239 detailed_status=detailed_status_HA)
1240 except DbException as e:
1241 self.logger.error(logging_text + "Cannot update database: {}".format(e))
1242 self.lcm_tasks.remove("k8srepo", k8srepo_id, order_id)
1243
1244 async def delete(self, k8srepo_content, order_id):
1245
1246 # HA tasks and backward compatibility:
1247 # If 'vim_content' does not include 'op_id', we a running a legacy NBI version.
1248 # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
1249 # Register 'delete' task here for related future HA operations
1250 op_id = k8srepo_content.pop('op_id', None)
1251 if not self.lcm_tasks.lock_HA('k8srepo', 'delete', op_id):
1252 return
1253
1254 k8srepo_id = k8srepo_content.get("_id")
1255 logging_text = "Task k8srepo_delete={} ".format(k8srepo_id)
1256 self.logger.debug(logging_text + "Enter")
1257
1258 db_k8srepo = None
1259 db_k8srepo_update = {}
1260
1261 operationState_HA = ''
1262 detailed_status_HA = ''
1263 try:
1264 step = "Getting k8srepo-id='{}' from db".format(k8srepo_id)
1265 self.logger.debug(logging_text + step)
1266 db_k8srepo = self.db.get_one("k8srepos", {"_id": k8srepo_id})
1267 step = "Getting k8scluster_list from db"
1268 self.logger.debug(logging_text + step)
1269 db_k8scluster_list = self.db.get_list("k8sclusters", {})
1270
1271 task_list = []
1272 for k8scluster in db_k8scluster_list:
1273 hc_id = deep_get(k8scluster, ("_admin", "helm-chart", "id"))
1274 if hc_id:
1275 task = asyncio.ensure_future(self.k8srepo.repo_remove(cluster_uuid=hc_id,
1276 name=db_k8srepo["name"]))
1277 task_list.append(task)
1278 done = None
1279 pending = None
1280 if len(task_list) > 0:
1281 self.logger.debug('Waiting for terminate pending tasks...')
1282 done, pending = await asyncio.wait(task_list, timeout=3600)
1283 if not pending:
1284 self.logger.debug('All tasks finished...')
1285 else:
1286 self.logger.info('There are pending tasks: {}'.format(pending))
1287 self.db.del_one("k8srepos", {"_id": k8srepo_id})
1288
1289 except Exception as e:
1290 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
1291 exc = e
1292 finally:
1293 if exc and db_k8srepo:
1294 db_k8srepo_update["_admin.operationalState"] = "ERROR"
1295 db_k8srepo_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc)
1296 # Mark the WIM 'create' HA task as erroneous
1297 operationState_HA = 'FAILED'
1298 detailed_status_HA = "ERROR {}: {}".format(step, exc)
1299 try:
1300 if db_k8srepo_update:
1301 self.update_db_2("k8srepos", k8srepo_id, db_k8srepo_update)
1302 # Register the K8srepo 'delete' HA task either
1303 # succesful or erroneous, or do nothing (if legacy NBI)
1304 self.lcm_tasks.register_HA('k8srepo', 'delete', op_id,
1305 operationState=operationState_HA,
1306 detailed_status=detailed_status_HA)
1307 except DbException as e:
1308 self.logger.error(logging_text + "Cannot update database: {}".format(e))
1309 self.lcm_tasks.remove("k8srepo", k8srepo_id, order_id)