1 # -*- coding: utf-8 -*-
4 # Copyright 2018 Telefonica S.A.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
22 import logging
.handlers
23 from osm_lcm
import ROclient
24 from osm_lcm
.lcm_utils
import LcmException
, LcmBase
25 from n2vc
.k8s_helm_conn
import K8sHelmConnector
26 from osm_common
.dbbase
import DbException
27 from copy
import deepcopy
29 __author__
= "Alfonso Tierno"
class VimLcm(LcmBase):
    """Life-cycle handler for VIM accounts: creates, edits and deletes them at RO.

    Each operation records its progress/outcome in the "vim_accounts" collection and
    delegates HA bookkeeping to ``lcm_tasks`` (lock_HA / waitfor_related_HA / register_HA).
    """

    # values that are encrypted at vim config because they are passwords
    vim_config_encrypted = {"1.1": ("admin_password", "nsx_password", "vcenter_password"),
                            "default": ("admin_password", "nsx_password", "vcenter_password", "vrops_password")}

    def __init__(self, db, msg, fs, lcm_tasks, ro_config, loop):
        """
        Init, Connect to database, filesystem storage, and messaging
        :param db: database connector, forwarded to LcmBase
        :param msg: messaging connector, forwarded to LcmBase
        :param fs: filesystem storage connector, forwarded to LcmBase
        :param lcm_tasks: task registry used for HA locking/registration and task removal
        :param ro_config: dictionary expanded as keyword arguments of ROclient.ROClient
        :param loop: asyncio event loop handed to ROclient.ROClient
        :return: None
        """

        self.logger = logging.getLogger('lcm.vim')
        self.loop = loop
        self.lcm_tasks = lcm_tasks
        self.ro_config = ro_config

        super().__init__(db, msg, fs, self.logger)

    async def create(self, vim_content, order_id):
        """
        Create the VIM at RO and attach a vim_account to it, storing progress in "vim_accounts"
        :param vim_content: dictionary with the VIM account; must contain "_id", "vim_type", "vim_tenant_name",
            "vim_user" and "vim_password". An optional "op_id" is popped and used for HA. It is modified in place
        :param order_id: identifier passed to lcm_tasks.remove() when the task finishes
        :return: None. Errors are logged and written to the database, not raised
        """

        # HA tasks and backward compatibility:
        # If 'vim_content' does not include 'op_id', we are running a legacy NBI version.
        # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
        # Register 'create' task here for related future HA operations
        op_id = vim_content.pop('op_id', None)
        if not self.lcm_tasks.lock_HA('vim', 'create', op_id):
            return

        vim_id = vim_content["_id"]
        logging_text = "Task vim_create={} ".format(vim_id)
        self.logger.debug(logging_text + "Enter")

        db_vim = None
        db_vim_update = {}
        exc = None
        RO_sdn_id = None
        operationState_HA = ''
        detailed_status_HA = ''
        try:
            step = "Getting vim-id='{}' from db".format(vim_id)
            db_vim = self.db.get_one("vim_accounts", {"_id": vim_id})
            if vim_content.get("config") and vim_content["config"].get("sdn-controller"):
                step = "Getting sdn-controller-id='{}' from db".format(vim_content["config"]["sdn-controller"])
                db_sdn = self.db.get_one("sdns", {"_id": vim_content["config"]["sdn-controller"]})

                # If the VIM account has an associated SDN account, also
                # wait for any previous tasks in process for the SDN
                await self.lcm_tasks.waitfor_related_HA('sdn', 'ANY', db_sdn["_id"])

                if db_sdn.get("_admin") and db_sdn["_admin"].get("deployed") and db_sdn["_admin"]["deployed"].get("RO"):
                    RO_sdn_id = db_sdn["_admin"]["deployed"]["RO"]
                else:
                    raise LcmException("sdn-controller={} is not available. Not deployed at RO".format(
                        vim_content["config"]["sdn-controller"]))

            step = "Creating vim at RO"
            db_vim_update["_admin.deployed.RO"] = None
            db_vim_update["_admin.detailed-status"] = step
            self.update_db_2("vim_accounts", vim_id, db_vim_update)
            RO = ROclient.ROClient(self.loop, **self.ro_config)
            # The RO "vim" descriptor is a copy of the content without the OSM-only and account-level fields
            vim_RO = deepcopy(vim_content)
            vim_RO.pop("_id", None)
            vim_RO.pop("_admin", None)
            schema_version = vim_RO.pop("schema_version", None)
            vim_RO.pop("schema_type", None)
            vim_RO.pop("vim_tenant_name", None)
            vim_RO["type"] = vim_RO.pop("vim_type")
            vim_RO.pop("vim_user", None)
            vim_RO.pop("vim_password", None)
            if RO_sdn_id:
                # RO is given its own identifier of the SDN controller instead of the OSM one
                vim_RO["config"]["sdn-controller"] = RO_sdn_id
            desc = await RO.create("vim", descriptor=vim_RO)
            RO_vim_id = desc["uuid"]
            db_vim_update["_admin.deployed.RO"] = RO_vim_id
            self.logger.debug(logging_text + "VIM created at RO_vim_id={}".format(RO_vim_id))

            step = "Creating vim_account at RO"
            db_vim_update["_admin.detailed-status"] = step
            self.update_db_2("vim_accounts", vim_id, db_vim_update)

            if vim_content.get("vim_password"):
                vim_content["vim_password"] = self.db.decrypt(vim_content["vim_password"],
                                                              schema_version=schema_version,
                                                              salt=vim_id)
            vim_account_RO = {"vim_tenant_name": vim_content["vim_tenant_name"],
                              "vim_username": vim_content["vim_user"],
                              "vim_password": vim_content["vim_password"]
                              }
            if vim_RO.get("config"):
                vim_account_RO["config"] = vim_RO["config"]
                if "sdn-controller" in vim_account_RO["config"]:
                    del vim_account_RO["config"]["sdn-controller"]
                if "sdn-port-mapping" in vim_account_RO["config"]:
                    del vim_account_RO["config"]["sdn-port-mapping"]
                vim_config_encrypted_keys = self.vim_config_encrypted.get(schema_version) or \
                    self.vim_config_encrypted.get("default")
                for p in vim_config_encrypted_keys:
                    if vim_account_RO["config"].get(p):
                        vim_account_RO["config"][p] = self.db.decrypt(vim_account_RO["config"][p],
                                                                      schema_version=schema_version,
                                                                      salt=vim_id)

            desc = await RO.attach("vim_account", RO_vim_id, descriptor=vim_account_RO)
            db_vim_update["_admin.deployed.RO-account"] = desc["uuid"]
            db_vim_update["_admin.operationalState"] = "ENABLED"
            db_vim_update["_admin.detailed-status"] = "Done"
            # Mark the VIM 'create' HA task as successful
            operationState_HA = 'COMPLETED'
            detailed_status_HA = 'Done'

            self.logger.debug(logging_text + "Exit Ok VIM account created at RO_vim_account_id={}".format(desc["uuid"]))
            return

        except (ROclient.ROClientException, DbException) as e:
            self.logger.error(logging_text + "Exit Exception {}".format(e))
            exc = e
        except Exception as e:
            self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
            exc = e
        finally:
            # Runs on success too (the 'return' above passes through here) to persist the final status
            if exc and db_vim:
                db_vim_update["_admin.operationalState"] = "ERROR"
                db_vim_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc)
                # Mark the VIM 'create' HA task as erroneous
                operationState_HA = 'FAILED'
                detailed_status_HA = "ERROR {}: {}".format(step, exc)
            try:
                if db_vim_update:
                    self.update_db_2("vim_accounts", vim_id, db_vim_update)
                # Register the VIM 'create' HA task either
                # successful or erroneous, or do nothing (if legacy NBI)
                self.lcm_tasks.register_HA('vim', 'create', op_id,
                                           operationState=operationState_HA,
                                           detailed_status=detailed_status_HA)
            except DbException as e:
                self.logger.error(logging_text + "Cannot update database: {}".format(e))

            self.lcm_tasks.remove("vim_account", vim_id, order_id)

    async def edit(self, vim_content, order_id):
        """
        Apply the changes of a VIM account to the vim and vim_account already deployed at RO
        :param vim_content: dictionary with "_id" and the fields to change. An optional "op_id" is popped and used
            for HA. It is modified in place
        :param order_id: identifier passed to lcm_tasks.remove() when the task finishes
        :return: None. Errors are logged and written to the database, not raised
        """

        # HA tasks and backward compatibility:
        # If 'vim_content' does not include 'op_id', we are running a legacy NBI version.
        # In such a case, HA is not supported by NBI, and the HA check always returns True
        op_id = vim_content.pop('op_id', None)
        if not self.lcm_tasks.lock_HA('vim', 'edit', op_id):
            return

        vim_id = vim_content["_id"]
        logging_text = "Task vim_edit={} ".format(vim_id)
        self.logger.debug(logging_text + "Enter")

        db_vim = None
        exc = None
        RO_sdn_id = None
        RO_vim_id = None
        db_vim_update = {}
        operationState_HA = ''
        detailed_status_HA = ''
        step = "Getting vim-id='{}' from db".format(vim_id)
        try:
            # wait for any previous tasks in process
            await self.lcm_tasks.waitfor_related_HA('vim', 'edit', op_id)

            db_vim = self.db.get_one("vim_accounts", {"_id": vim_id})

            # Nothing is sent to RO unless the VIM was previously deployed there
            if db_vim.get("_admin") and db_vim["_admin"].get("deployed") and db_vim["_admin"]["deployed"].get("RO"):
                if vim_content.get("config") and vim_content["config"].get("sdn-controller"):
                    step = "Getting sdn-controller-id='{}' from db".format(vim_content["config"]["sdn-controller"])
                    db_sdn = self.db.get_one("sdns", {"_id": vim_content["config"]["sdn-controller"]})

                    # If the VIM account has an associated SDN account, also
                    # wait for any previous tasks in process for the SDN
                    await self.lcm_tasks.waitfor_related_HA('sdn', 'ANY', db_sdn["_id"])

                    if db_sdn.get("_admin") and db_sdn["_admin"].get("deployed") and db_sdn["_admin"]["deployed"].get(
                            "RO"):
                        RO_sdn_id = db_sdn["_admin"]["deployed"]["RO"]
                    else:
                        raise LcmException("sdn-controller={} is not available. Not deployed at RO".format(
                            vim_content["config"]["sdn-controller"]))

                RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
                step = "Editing vim at RO"
                RO = ROclient.ROClient(self.loop, **self.ro_config)
                vim_RO = deepcopy(vim_content)
                vim_RO.pop("_id", None)
                vim_RO.pop("_admin", None)
                schema_version = vim_RO.pop("schema_version", None)
                vim_RO.pop("schema_type", None)
                vim_RO.pop("vim_tenant_name", None)
                if "vim_type" in vim_RO:
                    vim_RO["type"] = vim_RO.pop("vim_type")
                vim_RO.pop("vim_user", None)
                vim_RO.pop("vim_password", None)
                if RO_sdn_id:
                    vim_RO["config"]["sdn-controller"] = RO_sdn_id
                # TODO make a deep update of sdn-port-mapping
                if vim_RO:
                    await RO.edit("vim", RO_vim_id, descriptor=vim_RO)

                step = "Editing vim-account at RO tenant"
                vim_account_RO = {}
                if "config" in vim_content:
                    if "sdn-controller" in vim_content["config"]:
                        del vim_content["config"]["sdn-controller"]
                    if "sdn-port-mapping" in vim_content["config"]:
                        del vim_content["config"]["sdn-port-mapping"]
                    if not vim_content["config"]:
                        del vim_content["config"]
                if "vim_tenant_name" in vim_content:
                    vim_account_RO["vim_tenant_name"] = vim_content["vim_tenant_name"]
                if "vim_password" in vim_content:
                    vim_account_RO["vim_password"] = vim_content["vim_password"]
                if vim_content.get("vim_password"):
                    vim_account_RO["vim_password"] = self.db.decrypt(vim_content["vim_password"],
                                                                     schema_version=schema_version,
                                                                     salt=vim_id)
                if "config" in vim_content:
                    vim_account_RO["config"] = vim_content["config"]
                if vim_content.get("config"):
                    vim_config_encrypted_keys = self.vim_config_encrypted.get(schema_version) or \
                        self.vim_config_encrypted.get("default")
                    for p in vim_config_encrypted_keys:
                        if vim_content["config"].get(p):
                            vim_account_RO["config"][p] = self.db.decrypt(vim_content["config"][p],
                                                                          schema_version=schema_version,
                                                                          salt=vim_id)

                if "vim_user" in vim_content:
                    # The user name must travel in the descriptor sent to RO, under the same key used by create().
                    # It used to be stored in vim_content, which is not read again, so the change was lost
                    vim_account_RO["vim_username"] = vim_content["vim_user"]
                # vim_account must be edited always even if empty in order to ensure changes are translated to RO
                # vim_thread. RO will remove and relaunch a new thread for this vim_account
                await RO.edit("vim_account", RO_vim_id, descriptor=vim_account_RO)
                db_vim_update["_admin.operationalState"] = "ENABLED"
                # Mark the VIM 'edit' HA task as successful
                operationState_HA = 'COMPLETED'
                detailed_status_HA = 'Done'

            self.logger.debug(logging_text + "Exit Ok RO_vim_id={}".format(RO_vim_id))
            return

        except (ROclient.ROClientException, DbException) as e:
            self.logger.error(logging_text + "Exit Exception {}".format(e))
            exc = e
        except Exception as e:
            self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
            exc = e
        finally:
            if exc and db_vim:
                db_vim_update["_admin.operationalState"] = "ERROR"
                db_vim_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc)
                # Mark the VIM 'edit' HA task as erroneous
                operationState_HA = 'FAILED'
                detailed_status_HA = "ERROR {}: {}".format(step, exc)
            try:
                if db_vim_update:
                    self.update_db_2("vim_accounts", vim_id, db_vim_update)
                # Register the VIM 'edit' HA task either
                # successful or erroneous, or do nothing (if legacy NBI)
                self.lcm_tasks.register_HA('vim', 'edit', op_id,
                                           operationState=operationState_HA,
                                           detailed_status=detailed_status_HA)
            except DbException as e:
                self.logger.error(logging_text + "Cannot update database: {}".format(e))

            self.lcm_tasks.remove("vim_account", vim_id, order_id)

    async def delete(self, vim_content, order_id):
        """
        Detach and delete the VIM from RO (when deployed there) and remove its "vim_accounts" entry
        :param vim_content: dictionary with the "_id" of the VIM account and, optionally, the "op_id" used for HA
        :param order_id: identifier passed to lcm_tasks.remove() when the task finishes
        :return: None. Errors are logged and written to the database, not raised
        """

        # HA tasks and backward compatibility:
        # If 'vim_content' does not include 'op_id', we are running a legacy NBI version.
        # In such a case, HA is not supported by NBI, and the HA check always returns True
        op_id = vim_content.pop('op_id', None)
        if not self.lcm_tasks.lock_HA('vim', 'delete', op_id):
            return

        vim_id = vim_content["_id"]
        logging_text = "Task vim_delete={} ".format(vim_id)
        self.logger.debug(logging_text + "Enter")

        db_vim = None
        db_vim_update = {}
        exc = None
        operationState_HA = ''
        detailed_status_HA = ''
        step = "Getting vim from db"
        try:
            # wait for any previous tasks in process
            await self.lcm_tasks.waitfor_related_HA('vim', 'delete', op_id)

            db_vim = self.db.get_one("vim_accounts", {"_id": vim_id})
            if db_vim.get("_admin") and db_vim["_admin"].get("deployed") and db_vim["_admin"]["deployed"].get("RO"):
                RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
                RO = ROclient.ROClient(self.loop, **self.ro_config)
                step = "Detaching vim from RO tenant"
                try:
                    await RO.detach("vim_account", RO_vim_id)
                except ROclient.ROClientException as e:
                    if e.http_code == 404:  # not found
                        self.logger.debug(logging_text + "RO_vim_id={} already detached".format(RO_vim_id))
                    else:
                        raise

                step = "Deleting vim from RO"
                try:
                    await RO.delete("vim", RO_vim_id)
                except ROclient.ROClientException as e:
                    if e.http_code == 404:  # not found
                        self.logger.debug(logging_text + "RO_vim_id={} already deleted".format(RO_vim_id))
                    else:
                        raise
            else:
                # nothing to delete
                self.logger.error(logging_text + "Nothing to remove at RO")
            self.db.del_one("vim_accounts", {"_id": vim_id})
            # The entry no longer exists: clearing db_vim prevents any later attempt to update it
            db_vim = None
            self.logger.debug(logging_text + "Exit Ok")
            return

        except (ROclient.ROClientException, DbException) as e:
            self.logger.error(logging_text + "Exit Exception {}".format(e))
            exc = e
        except Exception as e:
            self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
            exc = e
        finally:
            self.lcm_tasks.remove("vim_account", vim_id, order_id)
            if exc and db_vim:
                db_vim_update["_admin.operationalState"] = "ERROR"
                db_vim_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc)
                # Mark the VIM 'delete' HA task as erroneous
                operationState_HA = 'FAILED'
                detailed_status_HA = "ERROR {}: {}".format(step, exc)
                self.lcm_tasks.register_HA('vim', 'delete', op_id,
                                           operationState=operationState_HA,
                                           detailed_status=detailed_status_HA)
            try:
                if db_vim and db_vim_update:
                    self.update_db_2("vim_accounts", vim_id, db_vim_update)
                # If the VIM 'delete' HA task was successful, the DB entry has been deleted,
                # which means that there is nowhere to register this task, so do nothing here.
            except DbException as e:
                self.logger.error(logging_text + "Cannot update database: {}".format(e))
            # NOTE(review): the task was already removed at the top of this 'finally'; this second call looks
            # redundant but is kept to preserve the existing call sequence - confirm before dropping it
            self.lcm_tasks.remove("vim_account", vim_id, order_id)
class WimLcm(LcmBase):
    """Life-cycle handler for WIM accounts: creates, edits and deletes them at RO.

    Each operation records its progress/outcome in the "wim_accounts" collection and
    delegates HA bookkeeping to ``lcm_tasks`` (lock_HA / waitfor_related_HA / register_HA).
    """

    # values that are encrypted at wim config because they are passwords
    wim_config_encrypted = ()

    def __init__(self, db, msg, fs, lcm_tasks, ro_config, loop):
        """
        Init, Connect to database, filesystem storage, and messaging
        :param db: database connector, forwarded to LcmBase
        :param msg: messaging connector, forwarded to LcmBase
        :param fs: filesystem storage connector, forwarded to LcmBase
        :param lcm_tasks: task registry used for HA locking/registration and task removal
        :param ro_config: dictionary expanded as keyword arguments of ROclient.ROClient
        :param loop: asyncio event loop handed to ROclient.ROClient
        :return: None
        """

        # NOTE(review): the logger name is 'lcm.vim', the same one used by VimLcm; possibly meant to be
        # 'lcm.wim'. Kept unchanged because logging configuration may depend on it - confirm
        self.logger = logging.getLogger('lcm.vim')
        self.loop = loop
        self.lcm_tasks = lcm_tasks
        self.ro_config = ro_config

        super().__init__(db, msg, fs, self.logger)

    async def create(self, wim_content, order_id):
        """
        Create the WIM at RO and attach a wim_account to it, storing progress in "wim_accounts"
        :param wim_content: dictionary with the WIM account; must contain "_id", "wim_type", "name", "user" and
            "password". An optional "op_id" is popped and used for HA. It is modified in place
        :param order_id: identifier passed to lcm_tasks.remove() when the task finishes
        :return: None. Errors are logged and written to the database, not raised
        """

        # HA tasks and backward compatibility:
        # If 'wim_content' does not include 'op_id', we are running a legacy NBI version.
        # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
        # Register 'create' task here for related future HA operations
        op_id = wim_content.pop('op_id', None)
        # Honour the lock result, as every other operation in this module does, instead of ignoring it
        if not self.lcm_tasks.lock_HA('wim', 'create', op_id):
            return

        wim_id = wim_content["_id"]
        logging_text = "Task wim_create={} ".format(wim_id)
        self.logger.debug(logging_text + "Enter")

        db_wim = None
        db_wim_update = {}
        exc = None
        operationState_HA = ''
        detailed_status_HA = ''
        try:
            step = "Getting wim-id='{}' from db".format(wim_id)
            db_wim = self.db.get_one("wim_accounts", {"_id": wim_id})
            db_wim_update["_admin.deployed.RO"] = None

            step = "Creating wim at RO"
            db_wim_update["_admin.detailed-status"] = step
            self.update_db_2("wim_accounts", wim_id, db_wim_update)
            RO = ROclient.ROClient(self.loop, **self.ro_config)
            # The RO "wim" descriptor is a copy of the content without the OSM-only and account-level fields
            wim_RO = deepcopy(wim_content)
            wim_RO.pop("_id", None)
            wim_RO.pop("_admin", None)
            schema_version = wim_RO.pop("schema_version", None)
            wim_RO.pop("schema_type", None)
            wim_RO.pop("wim_tenant_name", None)
            wim_RO["type"] = wim_RO.pop("wim_type")
            wim_RO.pop("wim_user", None)
            wim_RO.pop("wim_password", None)
            desc = await RO.create("wim", descriptor=wim_RO)
            RO_wim_id = desc["uuid"]
            db_wim_update["_admin.deployed.RO"] = RO_wim_id
            self.logger.debug(logging_text + "WIM created at RO_wim_id={}".format(RO_wim_id))

            step = "Creating wim_account at RO"
            db_wim_update["_admin.detailed-status"] = step
            self.update_db_2("wim_accounts", wim_id, db_wim_update)

            # NOTE(review): "wim_password" is decrypted here, but the account descriptor below is built from
            # "password", which is sent as received - verify which key really carries the encrypted secret
            if wim_content.get("wim_password"):
                wim_content["wim_password"] = self.db.decrypt(wim_content["wim_password"],
                                                              schema_version=schema_version,
                                                              salt=wim_id)
            wim_account_RO = {"name": wim_content["name"],
                              "user": wim_content["user"],
                              "password": wim_content["password"]
                              }
            if wim_RO.get("config"):
                wim_account_RO["config"] = wim_RO["config"]
                if "wim_port_mapping" in wim_account_RO["config"]:
                    del wim_account_RO["config"]["wim_port_mapping"]
                for p in self.wim_config_encrypted:
                    if wim_account_RO["config"].get(p):
                        wim_account_RO["config"][p] = self.db.decrypt(wim_account_RO["config"][p],
                                                                      schema_version=schema_version,
                                                                      salt=wim_id)

            desc = await RO.attach("wim_account", RO_wim_id, descriptor=wim_account_RO)
            db_wim_update["_admin.deployed.RO-account"] = desc["uuid"]
            db_wim_update["_admin.operationalState"] = "ENABLED"
            db_wim_update["_admin.detailed-status"] = "Done"
            # Mark the WIM 'create' HA task as successful
            operationState_HA = 'COMPLETED'
            detailed_status_HA = 'Done'

            self.logger.debug(logging_text + "Exit Ok WIM account created at RO_wim_account_id={}".format(desc["uuid"]))
            return

        except (ROclient.ROClientException, DbException) as e:
            self.logger.error(logging_text + "Exit Exception {}".format(e))
            exc = e
        except Exception as e:
            self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
            exc = e
        finally:
            # Runs on success too (the 'return' above passes through here) to persist the final status
            if exc and db_wim:
                db_wim_update["_admin.operationalState"] = "ERROR"
                db_wim_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc)
                # Mark the WIM 'create' HA task as erroneous
                operationState_HA = 'FAILED'
                detailed_status_HA = "ERROR {}: {}".format(step, exc)
            try:
                if db_wim_update:
                    self.update_db_2("wim_accounts", wim_id, db_wim_update)
                # Register the WIM 'create' HA task either
                # successful or erroneous, or do nothing (if legacy NBI)
                self.lcm_tasks.register_HA('wim', 'create', op_id,
                                           operationState=operationState_HA,
                                           detailed_status=detailed_status_HA)
            except DbException as e:
                self.logger.error(logging_text + "Cannot update database: {}".format(e))
            self.lcm_tasks.remove("wim_account", wim_id, order_id)

    async def edit(self, wim_content, order_id):
        """
        Apply the changes of a WIM account to the wim and wim_account already deployed at RO
        :param wim_content: dictionary with "_id" and the fields to change. An optional "op_id" is popped and used
            for HA. It is modified in place
        :param order_id: identifier passed to lcm_tasks.remove() when the task finishes
        :return: None. Errors are logged and written to the database, not raised
        """

        # HA tasks and backward compatibility:
        # If 'wim_content' does not include 'op_id', we are running a legacy NBI version.
        # In such a case, HA is not supported by NBI, and the HA check always returns True
        op_id = wim_content.pop('op_id', None)
        if not self.lcm_tasks.lock_HA('wim', 'edit', op_id):
            return

        wim_id = wim_content["_id"]
        logging_text = "Task wim_edit={} ".format(wim_id)
        self.logger.debug(logging_text + "Enter")

        db_wim = None
        exc = None
        RO_wim_id = None
        db_wim_update = {}
        step = "Getting wim-id='{}' from db".format(wim_id)
        operationState_HA = ''
        detailed_status_HA = ''
        try:
            # wait for any previous tasks in process
            await self.lcm_tasks.waitfor_related_HA('wim', 'edit', op_id)

            db_wim = self.db.get_one("wim_accounts", {"_id": wim_id})

            # Nothing is sent to RO unless the WIM was previously deployed there
            if db_wim.get("_admin") and db_wim["_admin"].get("deployed") and db_wim["_admin"]["deployed"].get("RO"):

                RO_wim_id = db_wim["_admin"]["deployed"]["RO"]
                step = "Editing wim at RO"
                RO = ROclient.ROClient(self.loop, **self.ro_config)
                wim_RO = deepcopy(wim_content)
                wim_RO.pop("_id", None)
                wim_RO.pop("_admin", None)
                schema_version = wim_RO.pop("schema_version", None)
                wim_RO.pop("schema_type", None)
                wim_RO.pop("wim_tenant_name", None)
                if "wim_type" in wim_RO:
                    wim_RO["type"] = wim_RO.pop("wim_type")
                wim_RO.pop("wim_user", None)
                wim_RO.pop("wim_password", None)
                # TODO make a deep update of wim_port_mapping
                if wim_RO:
                    await RO.edit("wim", RO_wim_id, descriptor=wim_RO)

                step = "Editing wim-account at RO tenant"
                wim_account_RO = {}
                if "config" in wim_content:
                    if "wim_port_mapping" in wim_content["config"]:
                        del wim_content["config"]["wim_port_mapping"]
                    if not wim_content["config"]:
                        del wim_content["config"]
                if "wim_tenant_name" in wim_content:
                    wim_account_RO["wim_tenant_name"] = wim_content["wim_tenant_name"]
                if "wim_password" in wim_content:
                    wim_account_RO["wim_password"] = wim_content["wim_password"]
                if wim_content.get("wim_password"):
                    wim_account_RO["wim_password"] = self.db.decrypt(wim_content["wim_password"],
                                                                     schema_version=schema_version,
                                                                     salt=wim_id)
                if "config" in wim_content:
                    wim_account_RO["config"] = wim_content["config"]
                if wim_content.get("config"):
                    for p in self.wim_config_encrypted:
                        if wim_content["config"].get(p):
                            wim_account_RO["config"][p] = self.db.decrypt(wim_content["config"][p],
                                                                          schema_version=schema_version,
                                                                          salt=wim_id)

                # NOTE(review): this value is stored in wim_content, which is not read again below, so it never
                # reaches RO. Left as is because the key expected by RO for a wim_account is not visible here
                # (create() sends "name"/"user"/"password") - confirm and move it to wim_account_RO if needed
                if "wim_user" in wim_content:
                    wim_content["wim_username"] = wim_content["wim_user"]
                # wim_account must be edited always even if empty in order to ensure changes are translated to RO
                # wim_thread. RO will remove and relaunch a new thread for this wim_account
                await RO.edit("wim_account", RO_wim_id, descriptor=wim_account_RO)
                db_wim_update["_admin.operationalState"] = "ENABLED"
                # Mark the WIM 'edit' HA task as successful
                operationState_HA = 'COMPLETED'
                detailed_status_HA = 'Done'

            self.logger.debug(logging_text + "Exit Ok RO_wim_id={}".format(RO_wim_id))
            return

        except (ROclient.ROClientException, DbException) as e:
            self.logger.error(logging_text + "Exit Exception {}".format(e))
            exc = e
        except Exception as e:
            self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
            exc = e
        finally:
            if exc and db_wim:
                db_wim_update["_admin.operationalState"] = "ERROR"
                db_wim_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc)
                # Mark the WIM 'edit' HA task as erroneous
                operationState_HA = 'FAILED'
                detailed_status_HA = "ERROR {}: {}".format(step, exc)
            try:
                if db_wim_update:
                    self.update_db_2("wim_accounts", wim_id, db_wim_update)
                # Register the WIM 'edit' HA task either
                # successful or erroneous, or do nothing (if legacy NBI)
                self.lcm_tasks.register_HA('wim', 'edit', op_id,
                                           operationState=operationState_HA,
                                           detailed_status=detailed_status_HA)
            except DbException as e:
                self.logger.error(logging_text + "Cannot update database: {}".format(e))
            self.lcm_tasks.remove("wim_account", wim_id, order_id)

    async def delete(self, wim_content, order_id):
        """
        Detach and delete the WIM from RO (when deployed there) and remove its "wim_accounts" entry
        :param wim_content: dictionary with the "_id" of the WIM account and, optionally, the "op_id" used for HA
        :param order_id: identifier passed to lcm_tasks.remove() when the task finishes
        :return: None. Errors are logged and written to the database, not raised
        """

        # HA tasks and backward compatibility:
        # If 'wim_content' does not include 'op_id', we are running a legacy NBI version.
        # In such a case, HA is not supported by NBI, and the HA check always returns True
        op_id = wim_content.pop('op_id', None)
        if not self.lcm_tasks.lock_HA('wim', 'delete', op_id):
            return

        wim_id = wim_content["_id"]
        logging_text = "Task wim_delete={} ".format(wim_id)
        self.logger.debug(logging_text + "Enter")

        db_wim = None
        db_wim_update = {}
        exc = None
        step = "Getting wim from db"
        operationState_HA = ''
        detailed_status_HA = ''
        try:
            # wait for any previous tasks in process
            await self.lcm_tasks.waitfor_related_HA('wim', 'delete', op_id)

            db_wim = self.db.get_one("wim_accounts", {"_id": wim_id})
            if db_wim.get("_admin") and db_wim["_admin"].get("deployed") and db_wim["_admin"]["deployed"].get("RO"):
                RO_wim_id = db_wim["_admin"]["deployed"]["RO"]
                RO = ROclient.ROClient(self.loop, **self.ro_config)
                step = "Detaching wim from RO tenant"
                try:
                    await RO.detach("wim_account", RO_wim_id)
                except ROclient.ROClientException as e:
                    if e.http_code == 404:  # not found
                        self.logger.debug(logging_text + "RO_wim_id={} already detached".format(RO_wim_id))
                    else:
                        raise

                step = "Deleting wim from RO"
                try:
                    await RO.delete("wim", RO_wim_id)
                except ROclient.ROClientException as e:
                    if e.http_code == 404:  # not found
                        self.logger.debug(logging_text + "RO_wim_id={} already deleted".format(RO_wim_id))
                    else:
                        raise
            else:
                # nothing to delete
                self.logger.error(logging_text + "Nohing to remove at RO")
            self.db.del_one("wim_accounts", {"_id": wim_id})
            # The entry no longer exists: clearing db_wim prevents any later attempt to update it
            db_wim = None
            self.logger.debug(logging_text + "Exit Ok")
            return

        except (ROclient.ROClientException, DbException) as e:
            self.logger.error(logging_text + "Exit Exception {}".format(e))
            exc = e
        except Exception as e:
            self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
            exc = e
        finally:
            self.lcm_tasks.remove("wim_account", wim_id, order_id)
            if exc and db_wim:
                db_wim_update["_admin.operationalState"] = "ERROR"
                db_wim_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc)
                # Mark the WIM 'delete' HA task as erroneous
                operationState_HA = 'FAILED'
                detailed_status_HA = "ERROR {}: {}".format(step, exc)
                self.lcm_tasks.register_HA('wim', 'delete', op_id,
                                           operationState=operationState_HA,
                                           detailed_status=detailed_status_HA)
            try:
                if db_wim and db_wim_update:
                    self.update_db_2("wim_accounts", wim_id, db_wim_update)
                # If the WIM 'delete' HA task was successful, the DB entry has been deleted,
                # which means that there is nowhere to register this task, so do nothing here.
            except DbException as e:
                self.logger.error(logging_text + "Cannot update database: {}".format(e))
            # NOTE(review): the task was already removed at the top of this 'finally'; this second call looks
            # redundant but is kept to preserve the existing call sequence - confirm before dropping it
            self.lcm_tasks.remove("wim_account", wim_id, order_id)
class SdnLcm(LcmBase):
    """Life-cycle handler for SDN controllers deployed at RO.

    Each operation records its progress/outcome in the "sdns" collection and
    delegates HA bookkeeping to ``lcm_tasks`` (lock_HA / waitfor_related_HA / register_HA).
    """

    def __init__(self, db, msg, fs, lcm_tasks, ro_config, loop):
        """
        Init, Connect to database, filesystem storage, and messaging
        :param db: database connector, forwarded to LcmBase
        :param msg: messaging connector, forwarded to LcmBase
        :param fs: filesystem storage connector, forwarded to LcmBase
        :param lcm_tasks: task registry used for HA locking/registration and task removal
        :param ro_config: dictionary expanded as keyword arguments of ROclient.ROClient
        :param loop: asyncio event loop handed to ROclient.ROClient
        :return: None
        """

        self.logger = logging.getLogger('lcm.sdn')
        self.loop = loop
        self.lcm_tasks = lcm_tasks
        self.ro_config = ro_config

        super().__init__(db, msg, fs, self.logger)

    async def create(self, sdn_content, order_id):
        """
        Create the SDN controller at RO, storing progress and the RO identifier in "sdns"
        :param sdn_content: dictionary with the SDN controller; must contain "_id". An optional "op_id" is popped
            and used for HA. It is modified in place
        :param order_id: identifier passed to lcm_tasks.remove() when the task finishes
        :return: None. Errors are logged and written to the database, not raised
        """

        # HA tasks and backward compatibility:
        # If 'sdn_content' does not include 'op_id', we are running a legacy NBI version.
        # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
        # Register 'create' task here for related future HA operations
        op_id = sdn_content.pop('op_id', None)
        # Honour the lock result, as edit() does, instead of ignoring it
        if not self.lcm_tasks.lock_HA('sdn', 'create', op_id):
            return

        sdn_id = sdn_content["_id"]
        logging_text = "Task sdn_create={} ".format(sdn_id)
        self.logger.debug(logging_text + "Enter")

        db_sdn = None
        db_sdn_update = {}
        RO_sdn_id = None
        exc = None
        operationState_HA = ''
        detailed_status_HA = ''
        try:
            step = "Getting sdn from db"
            db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
            db_sdn_update["_admin.deployed.RO"] = None

            step = "Creating sdn at RO"
            db_sdn_update["_admin.detailed-status"] = step
            self.update_db_2("sdns", sdn_id, db_sdn_update)

            RO = ROclient.ROClient(self.loop, **self.ro_config)
            # The RO descriptor is a copy of the content without the OSM-only fields
            sdn_RO = deepcopy(sdn_content)
            sdn_RO.pop("_id", None)
            sdn_RO.pop("_admin", None)
            schema_version = sdn_RO.pop("schema_version", None)
            sdn_RO.pop("schema_type", None)
            sdn_RO.pop("description", None)
            if sdn_RO.get("password"):
                sdn_RO["password"] = self.db.decrypt(sdn_RO["password"], schema_version=schema_version, salt=sdn_id)

            desc = await RO.create("sdn", descriptor=sdn_RO)
            RO_sdn_id = desc["uuid"]
            db_sdn_update["_admin.deployed.RO"] = RO_sdn_id
            db_sdn_update["_admin.operationalState"] = "ENABLED"
            self.logger.debug(logging_text + "Exit Ok RO_sdn_id={}".format(RO_sdn_id))
            # Mark the SDN 'create' HA task as successful
            operationState_HA = 'COMPLETED'
            detailed_status_HA = 'Done'
            return

        except (ROclient.ROClientException, DbException) as e:
            self.logger.error(logging_text + "Exit Exception {}".format(e))
            exc = e
        except Exception as e:
            self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
            exc = e
        finally:
            # Runs on success too (the 'return' above passes through here) to persist the final status
            if exc and db_sdn:
                db_sdn_update["_admin.operationalState"] = "ERROR"
                db_sdn_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc)
                # Mark the SDN 'create' HA task as erroneous
                operationState_HA = 'FAILED'
                detailed_status_HA = "ERROR {}: {}".format(step, exc)
            try:
                if db_sdn and db_sdn_update:
                    self.update_db_2("sdns", sdn_id, db_sdn_update)
                # Register the SDN 'create' HA task either
                # successful or erroneous, or do nothing (if legacy NBI)
                self.lcm_tasks.register_HA('sdn', 'create', op_id,
                                           operationState=operationState_HA,
                                           detailed_status=detailed_status_HA)
            except DbException as e:
                self.logger.error(logging_text + "Cannot update database: {}".format(e))
            self.lcm_tasks.remove("sdn", sdn_id, order_id)

    async def edit(self, sdn_content, order_id):
        """
        Apply the changes of an SDN controller to the one already deployed at RO
        :param sdn_content: dictionary with "_id" and the fields to change. An optional "op_id" is popped and used
            for HA. It is modified in place
        :param order_id: identifier passed to lcm_tasks.remove() when the task finishes
        :return: None. Errors are logged and written to the database, not raised
        """

        # HA tasks and backward compatibility:
        # If 'sdn_content' does not include 'op_id', we are running a legacy NBI version.
        # In such a case, HA is not supported by NBI, and the HA check always returns True
        op_id = sdn_content.pop('op_id', None)
        if not self.lcm_tasks.lock_HA('sdn', 'edit', op_id):
            return

        sdn_id = sdn_content["_id"]
        logging_text = "Task sdn_edit={} ".format(sdn_id)
        self.logger.debug(logging_text + "Enter")

        db_sdn = None
        db_sdn_update = {}
        exc = None
        operationState_HA = ''
        detailed_status_HA = ''
        step = "Getting sdn from db"
        try:
            # wait for any previous tasks in process
            await self.lcm_tasks.waitfor_related_HA('sdn', 'edit', op_id)

            db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
            RO_sdn_id = None
            # Nothing is sent to RO unless the SDN controller was previously deployed there
            if db_sdn.get("_admin") and db_sdn["_admin"].get("deployed") and db_sdn["_admin"]["deployed"].get("RO"):
                RO_sdn_id = db_sdn["_admin"]["deployed"]["RO"]
                RO = ROclient.ROClient(self.loop, **self.ro_config)
                step = "Editing sdn at RO"
                sdn_RO = deepcopy(sdn_content)
                sdn_RO.pop("_id", None)
                sdn_RO.pop("_admin", None)
                schema_version = sdn_RO.pop("schema_version", None)
                sdn_RO.pop("schema_type", None)
                sdn_RO.pop("description", None)
                if sdn_RO.get("password"):
                    sdn_RO["password"] = self.db.decrypt(sdn_RO["password"], schema_version=schema_version, salt=sdn_id)
                if sdn_RO:
                    await RO.edit("sdn", RO_sdn_id, descriptor=sdn_RO)
                db_sdn_update["_admin.operationalState"] = "ENABLED"
                # Mark the SDN 'edit' HA task as successful
                operationState_HA = 'COMPLETED'
                detailed_status_HA = 'Done'

            self.logger.debug(logging_text + "Exit Ok RO_sdn_id={}".format(RO_sdn_id))
            return

        except (ROclient.ROClientException, DbException) as e:
            self.logger.error(logging_text + "Exit Exception {}".format(e))
            exc = e
        except Exception as e:
            self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
            exc = e
        finally:
            if exc and db_sdn:
                # The error must go into the pending update (db_sdn_update), not into the fetched record
                # (db_sdn); otherwise it is never written to the database
                db_sdn_update["_admin.operationalState"] = "ERROR"
                db_sdn_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc)
                # Mark the SDN 'edit' HA task as erroneous
                operationState_HA = 'FAILED'
                detailed_status_HA = "ERROR {}: {}".format(step, exc)
            try:
                if db_sdn_update:
                    self.update_db_2("sdns", sdn_id, db_sdn_update)
                # Register the SDN 'edit' HA task either
                # successful or erroneous, or do nothing (if legacy NBI)
                self.lcm_tasks.register_HA('sdn', 'edit', op_id,
                                           operationState=operationState_HA,
                                           detailed_status=detailed_status_HA)
            except DbException as e:
                self.logger.error(logging_text + "Cannot update database: {}".format(e))
            self.lcm_tasks.remove("sdn", sdn_id, order_id)
async def delete(self, sdn_content, order_id):
    """
    Delete an SDN controller: first at RO (when the database entry records an RO id), then at the database.

    :param sdn_content: dictionary with the SDN controller '_id'. An optional 'op_id' key is popped from it
    :param order_id: identifier handed to lcm_tasks.remove() once the task has finished
    :return: None. Exceptions are caught and logged; on failure the error is stored at the 'sdns' entry
    """

    # HA tasks and backward compatibility:
    # If 'sdn_content' does not include 'op_id', we are running a legacy NBI version.
    # In such a case, HA is not supported by NBI, and the HA check always returns True
    op_id = sdn_content.pop('op_id', None)
    if not self.lcm_tasks.lock_HA('sdn', 'delete', op_id):
        return

    sdn_id = sdn_content["_id"]
    logging_text = "Task sdn_delete={} ".format(sdn_id)
    self.logger.debug(logging_text + "Enter")

    db_sdn = None
    db_sdn_update = {}
    exc = None
    step = "Getting sdn from db"
    try:
        # wait for any previous tasks in process
        await self.lcm_tasks.waitfor_related_HA('sdn', 'delete', op_id)

        db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
        if db_sdn.get("_admin") and db_sdn["_admin"].get("deployed") and db_sdn["_admin"]["deployed"].get("RO"):
            RO_sdn_id = db_sdn["_admin"]["deployed"]["RO"]
            RO = ROclient.ROClient(self.loop, **self.ro_config)
            step = "Deleting sdn from RO"
            try:
                await RO.delete("sdn", RO_sdn_id)
            except ROclient.ROClientException as e:
                if e.http_code != 404:
                    raise
                # 404 = not found at RO, so there is nothing left to delete there
                self.logger.debug(logging_text + "RO_sdn_id={} already deleted".format(RO_sdn_id))
        else:
            # nothing to delete at RO
            self.logger.error(logging_text + "Skipping. There is not RO information at database")
        self.db.del_one("sdns", {"_id": sdn_id})
        # The entry is gone: clearing the reference makes the 'finally' clause skip any update
        db_sdn = None
        self.logger.debug("sdn_delete task sdn_id={} Exit Ok".format(sdn_id))

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        failure_status = None
        if exc and db_sdn:
            failure_status = "ERROR {}: {}".format(step, exc)
            # The error must go into the update dictionary (not into the fetched document),
            # because only 'db_sdn_update' is written to the database below
            db_sdn_update["_admin.operationalState"] = "ERROR"
            db_sdn_update["_admin.detailed-status"] = failure_status
        try:
            if db_sdn and db_sdn_update:
                self.update_db_2("sdns", sdn_id, db_sdn_update)
            if failure_status:
                # Mark the SDN 'delete' HA task as erroneous
                self.lcm_tasks.register_HA('sdn', 'delete', op_id,
                                           operationState='FAILED',
                                           detailed_status=failure_status)
            # If the SDN 'delete' HA task was successful, the DB entry has been deleted,
            # which means that there is nowhere to register this task, so do nothing here.
        except DbException as e:
            self.logger.error(logging_text + "Cannot update database: {}".format(e))
        self.lcm_tasks.remove("sdn", sdn_id, order_id)
class K8sClusterLcm(LcmBase):
    """Runs the 'create' and 'delete' operations of K8s clusters through a K8sHelmConnector."""

    def __init__(self, db, msg, fs, lcm_tasks, vca_config, loop):
        """
        Init, Connect to database, filesystem storage, and messaging
        :param db: database connection, passed to LcmBase
        :param msg: messaging connection, passed to LcmBase
        :param fs: filesystem storage, passed to LcmBase
        :param lcm_tasks: task registry used for the HA locking/registering and to remove finished tasks
        :param vca_config: dictionary where the 'kubectlpath' and 'helmpath' entries are looked up
        :param loop: asyncio event loop
        :return: None
        """

        self.logger = logging.getLogger('lcm.k8scluster')
        self.loop = loop
        self.lcm_tasks = lcm_tasks
        self.vca_config = vca_config
        self.fs = fs
        self.db = db

        # NOTE(review): confirm the arguments after 'helm_command' against the K8sHelmConnector signature in use
        self.k8scluster = K8sHelmConnector(
            kubectl_command=self.vca_config.get("kubectlpath"),
            helm_command=self.vca_config.get("helmpath"),
            fs=self.fs,
            log=self.logger,
            db=self.db,
            on_update_db=None
        )

        super().__init__(db, msg, fs, self.logger)

    async def _wait_for_repo_tasks(self, task_list, logging_text):
        """
        Wait, at most 3600 seconds, for the helm repo tasks and log how they ended
        :param task_list: list of futures created with asyncio.ensure_future(). Nothing is done when empty
        :param logging_text: prefix for the error log entries
        :return: None
        """
        if not task_list:
            return
        self.logger.debug('Waiting for terminate pending tasks...')
        done, pending = await asyncio.wait(task_list, timeout=3600)
        for task in done:
            # Retrieve the exception of every failed task so that the failure shows up in the log
            task_exc = None if task.cancelled() else task.exception()
            if task_exc:
                self.logger.error(logging_text + "Repo task failed: {}".format(task_exc))
        if not pending:
            self.logger.debug('All tasks finished...')
        else:
            self.logger.info('There are pending tasks: {}'.format(pending))

    async def create(self, k8scluster_content, order_id):
        """
        Initialize the environment of a K8s cluster and add to it every repo stored at 'k8srepos'
        :param k8scluster_content: dictionary with the cluster '_id'. An optional 'op_id' key is popped from it
        :param order_id: identifier handed to lcm_tasks.remove() once the task has finished
        :return: None. Exceptions are caught and logged; on failure the error is stored at the 'k8sclusters' entry
        """

        # HA tasks and backward compatibility:
        # If 'k8scluster_content' does not include 'op_id', we are running a legacy NBI version.
        # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
        # Register 'create' task here for related future HA operations
        op_id = k8scluster_content.pop('op_id', None)
        if not self.lcm_tasks.lock_HA('k8scluster', 'create', op_id):
            return

        k8scluster_id = k8scluster_content["_id"]
        logging_text = "Task k8scluster_create={} ".format(k8scluster_id)
        self.logger.debug(logging_text + "Enter")

        db_k8scluster = None
        db_k8scluster_update = {}

        exc = None
        operationState_HA = ''
        detailed_status_HA = ''
        step = "Getting k8scluster-id='{}' from db".format(k8scluster_id)
        try:
            self.logger.debug(logging_text + step)
            db_k8scluster = self.db.get_one("k8sclusters", {"_id": k8scluster_id})
            # The return value is not used, so the fields are presumably decrypted in place.
            # The decrypted credentials must never be printed nor logged.
            self.db.encrypt_decrypt_fields(db_k8scluster.get("credentials"), 'decrypt', ['password', 'secret'],
                                           schema_version=db_k8scluster["schema_version"], salt=db_k8scluster["_id"])
            step = "Initializing the environment of the k8s cluster"
            cluster_uuid, uninstall_sw = await self.k8scluster.init_env(
                yaml.safe_dump(db_k8scluster.get("credentials")))
            db_k8scluster_update["cluster-uuid"] = cluster_uuid
            if uninstall_sw:
                db_k8scluster_update["uninstall-sw"] = uninstall_sw

            step = "Getting the list of repos"
            self.logger.debug(logging_text + step)
            task_list = []
            for repo in self.db.get_list("k8srepos", {}):
                step = "Adding repo {} to cluster: {}".format(repo["name"], cluster_uuid)
                self.logger.debug(logging_text + step)
                task_list.append(asyncio.ensure_future(
                    self.k8scluster.repo_add(cluster_uuid=cluster_uuid, name=repo["name"], url=repo["url"],
                                             repo_type="chart")))
                clusters_inserted = repo["_admin"].get("cluster-inserted") or []
                clusters_inserted.append(cluster_uuid)
                # Write back only the touched field, using the same dotted-key form as the status updates
                self.update_db_2("k8srepos", repo["_id"], {"_admin.cluster-inserted": clusters_inserted})

            await self._wait_for_repo_tasks(task_list, logging_text)
            db_k8scluster_update["_admin.operationalState"] = "ENABLED"
            # Mark the k8scluster 'create' HA task as successful
            operationState_HA = 'COMPLETED'
            detailed_status_HA = 'Done'

        except Exception as e:
            self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
            exc = e
        finally:
            if exc and db_k8scluster:
                db_k8scluster_update["_admin.operationalState"] = "ERROR"
                db_k8scluster_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc)
                # Mark the k8scluster 'create' HA task as erroneous
                operationState_HA = 'FAILED'
                detailed_status_HA = "ERROR {}: {}".format(step, exc)
            try:
                if db_k8scluster_update:
                    self.update_db_2("k8sclusters", k8scluster_id, db_k8scluster_update)
                # Register the K8scluster 'create' HA task either
                # successful or erroneous, or do nothing (if legacy NBI)
                self.lcm_tasks.register_HA('k8scluster', 'create', op_id,
                                           operationState=operationState_HA,
                                           detailed_status=detailed_status_HA)
            except DbException as e:
                self.logger.error(logging_text + "Cannot update database: {}".format(e))
            # NOTE(review): the HA calls use the topic 'k8scluster' but this one uses 'k8sclusters';
            # confirm which name the task was registered with
            self.lcm_tasks.remove("k8sclusters", k8scluster_id, order_id)

    async def delete(self, k8scluster_content, order_id):
        """
        Reset a K8s cluster and, when the reset succeeds, remove it from the repos and from the database
        :param k8scluster_content: dictionary with the cluster '_id'. An optional 'op_id' key is popped from it
        :param order_id: identifier handed to lcm_tasks.remove() once the task has finished
        :return: None. Exceptions are caught and logged; on failure the error is stored at the 'k8sclusters' entry
        """

        # HA tasks and backward compatibility:
        # If 'k8scluster_content' does not include 'op_id', we are running a legacy NBI version.
        # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
        # Register 'delete' task here for related future HA operations
        op_id = k8scluster_content.pop('op_id', None)
        if not self.lcm_tasks.lock_HA('k8scluster', 'delete', op_id):
            return

        k8scluster_id = k8scluster_content["_id"]
        logging_text = "Task k8scluster_delete={} ".format(k8scluster_id)
        self.logger.debug(logging_text + "Enter")

        db_k8scluster = None
        db_k8scluster_update = {}

        exc = None
        operationState_HA = ''
        detailed_status_HA = ''
        step = "Getting k8scluster='{}' from db".format(k8scluster_id)
        try:
            self.logger.debug(logging_text + step)
            db_k8scluster = self.db.get_one("k8sclusters", {"_id": k8scluster_id})
            cluster_uuid = db_k8scluster.get("cluster-uuid")
            uninstall_sw = db_k8scluster.get("uninstall-sw")
            if uninstall_sw is None:
                # entries stored without the 'uninstall-sw' key mean that nothing has to be uninstalled
                uninstall_sw = False
            cluster_removed = await self.k8scluster.reset(cluster_uuid=cluster_uuid, uninstall_sw=uninstall_sw)
            if not cluster_removed:
                raise LcmException("An error happened during the reset of the k8s cluster '{}'".format(k8scluster_id))

            step = "Removing k8scluster='{}' from db".format(k8scluster_id)
            self.logger.debug(logging_text + step)
            for k8srepo in self.db.get_list("k8srepos", {}):
                # A repo without the list is treated as not inserted in any cluster
                clusters_inserted = (k8srepo.get("_admin") or {}).get("cluster-inserted") or []
                if cluster_uuid in clusters_inserted:
                    clusters_inserted.remove(cluster_uuid)
                    # Write back only the touched field, using the same dotted-key form as the status updates
                    self.update_db_2("k8srepos", k8srepo["_id"], {"_admin.cluster-inserted": clusters_inserted})
            self.db.del_one("k8sclusters", {"_id": k8scluster_id})
            # Mark the k8scluster 'delete' HA task as successful
            operationState_HA = 'COMPLETED'
            detailed_status_HA = 'Done'

        except Exception as e:
            self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
            exc = e
        finally:
            if exc and db_k8scluster:
                db_k8scluster_update["_admin.operationalState"] = "ERROR"
                db_k8scluster_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc)
                # Mark the k8scluster 'delete' HA task as erroneous
                operationState_HA = 'FAILED'
                detailed_status_HA = "ERROR {}: {}".format(step, exc)
            try:
                if db_k8scluster_update:
                    self.update_db_2("k8sclusters", k8scluster_id, db_k8scluster_update)
                # Register the K8scluster 'delete' HA task either
                # successful or erroneous, or do nothing (if legacy NBI)
                self.lcm_tasks.register_HA('k8scluster', 'delete', op_id,
                                           operationState=operationState_HA,
                                           detailed_status=detailed_status_HA)
            except DbException as e:
                self.logger.error(logging_text + "Cannot update database: {}".format(e))
            # NOTE(review): the HA calls use the topic 'k8scluster' but this one uses 'k8sclusters';
            # confirm which name the task was registered with
            self.lcm_tasks.remove("k8sclusters", k8scluster_id, order_id)
class K8sRepoLcm(LcmBase):
    """Runs the 'create' and 'delete' operations of K8s repos on every stored K8s cluster."""

    def __init__(self, db, msg, fs, lcm_tasks, vca_config, loop):
        """
        Init, Connect to database, filesystem storage, and messaging
        :param db: database connection, passed to LcmBase
        :param msg: messaging connection, passed to LcmBase
        :param fs: filesystem storage, passed to LcmBase
        :param lcm_tasks: task registry used for the HA locking/registering and to remove finished tasks
        :param vca_config: dictionary where the 'kubectlpath' and 'helmpath' entries are looked up
        :param loop: asyncio event loop
        :return: None
        """

        self.logger = logging.getLogger('lcm.k8srepo')
        self.loop = loop
        self.lcm_tasks = lcm_tasks
        self.vca_config = vca_config
        self.fs = fs
        self.db = db

        # NOTE(review): confirm the arguments after 'helm_command' against the K8sHelmConnector signature in use
        self.k8srepo = K8sHelmConnector(
            kubectl_command=self.vca_config.get("kubectlpath"),
            helm_command=self.vca_config.get("helmpath"),
            fs=self.fs,
            log=self.logger,
            db=self.db,
            on_update_db=None
        )

        super().__init__(db, msg, fs, self.logger)

    async def _wait_for_repo_tasks(self, task_list, logging_text):
        """
        Wait, at most 3600 seconds, for the helm repo tasks and log how they ended
        :param task_list: list of futures created with asyncio.ensure_future(). Nothing is done when empty
        :param logging_text: prefix for the error log entries
        :return: None
        """
        if not task_list:
            return
        self.logger.debug('Waiting for terminate pending tasks...')
        done, pending = await asyncio.wait(task_list, timeout=3600)
        for task in done:
            # Retrieve the exception of every failed task so that the failure shows up in the log
            task_exc = None if task.cancelled() else task.exception()
            if task_exc:
                self.logger.error(logging_text + "Repo task failed: {}".format(task_exc))
        if not pending:
            self.logger.debug('All tasks finished...')
        else:
            self.logger.info('There are pending tasks: {}'.format(pending))

    async def create(self, k8srepo_content, order_id):
        """
        Add a repo to every cluster stored at 'k8sclusters' and record those clusters at the repo entry
        :param k8srepo_content: dictionary with the repo '_id'. An optional 'op_id' key is popped from it
        :param order_id: identifier handed to lcm_tasks.remove() once the task has finished
        :return: None. Exceptions are caught and logged; on failure the error is stored at the 'k8srepos' entry
        """

        # HA tasks and backward compatibility:
        # If 'k8srepo_content' does not include 'op_id', we are running a legacy NBI version.
        # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
        # Register 'create' task here for related future HA operations
        op_id = k8srepo_content.pop('op_id', None)
        if not self.lcm_tasks.lock_HA('k8srepo', 'create', op_id):
            return

        k8srepo_id = k8srepo_content.get("_id")
        logging_text = "Task k8srepo_create={} ".format(k8srepo_id)
        self.logger.debug(logging_text + "Enter")

        db_k8srepo = None
        db_k8srepo_update = {}
        exc = None
        operationState_HA = ''
        detailed_status_HA = ''
        step = "Getting k8srepo-id='{}' from db".format(k8srepo_id)
        try:
            self.logger.debug(logging_text + step)
            db_k8srepo = self.db.get_one("k8srepos", {"_id": k8srepo_id})
            step = "Getting k8scluster_list from db"
            self.logger.debug(logging_text + step)
            db_k8scluster_list = self.db.get_list("k8sclusters", {})

            clusters_inserted = []
            db_k8srepo_update["_admin.cluster-inserted"] = clusters_inserted
            task_list = []
            for k8scluster in db_k8scluster_list:
                cluster_uuid = k8scluster["cluster-uuid"]
                step = "Adding repo to cluster: {}".format(cluster_uuid)
                self.logger.debug(logging_text + step)
                task_list.append(asyncio.ensure_future(
                    self.k8srepo.repo_add(cluster_uuid=cluster_uuid, name=db_k8srepo["name"],
                                          url=db_k8srepo["url"], repo_type="chart")))
                clusters_inserted.append(cluster_uuid)

            await self._wait_for_repo_tasks(task_list, logging_text)
            db_k8srepo_update["_admin.operationalState"] = "ENABLED"
            # Mark the k8srepo 'create' HA task as successful
            operationState_HA = 'COMPLETED'
            detailed_status_HA = 'Done'
        except Exception as e:
            self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
            exc = e
        finally:
            if exc and db_k8srepo:
                db_k8srepo_update["_admin.operationalState"] = "ERROR"
                db_k8srepo_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc)
                # Mark the k8srepo 'create' HA task as erroneous
                operationState_HA = 'FAILED'
                detailed_status_HA = "ERROR {}: {}".format(step, exc)
            try:
                if db_k8srepo_update:
                    self.update_db_2("k8srepos", k8srepo_id, db_k8srepo_update)
                # Register the K8srepo 'create' HA task either
                # successful or erroneous, or do nothing (if legacy NBI)
                self.lcm_tasks.register_HA('k8srepo', 'create', op_id,
                                           operationState=operationState_HA,
                                           detailed_status=detailed_status_HA)
            except DbException as e:
                self.logger.error(logging_text + "Cannot update database: {}".format(e))
            self.lcm_tasks.remove("k8srepo", k8srepo_id, order_id)

    async def delete(self, k8srepo_content, order_id):
        """
        Remove a repo from every cluster stored at 'k8sclusters' and then delete its database entry
        :param k8srepo_content: dictionary with the repo '_id'. An optional 'op_id' key is popped from it
        :param order_id: identifier handed to lcm_tasks.remove() once the task has finished
        :return: None. Exceptions are caught and logged; on failure the error is stored at the 'k8srepos' entry
        """

        # HA tasks and backward compatibility:
        # If 'k8srepo_content' does not include 'op_id', we are running a legacy NBI version.
        # In such a case, HA is not supported by NBI, 'op_id' is None, and lock_HA() will do nothing.
        # Register 'delete' task here for related future HA operations
        op_id = k8srepo_content.pop('op_id', None)
        if not self.lcm_tasks.lock_HA('k8srepo', 'delete', op_id):
            return

        k8srepo_id = k8srepo_content.get("_id")
        logging_text = "Task k8srepo_delete={} ".format(k8srepo_id)
        self.logger.debug(logging_text + "Enter")

        db_k8srepo = None
        db_k8srepo_update = {}

        exc = None
        operationState_HA = ''
        detailed_status_HA = ''
        step = "Getting k8srepo-id='{}' from db".format(k8srepo_id)
        try:
            self.logger.debug(logging_text + step)
            db_k8srepo = self.db.get_one("k8srepos", {"_id": k8srepo_id})
            step = "Getting k8scluster_list from db"
            self.logger.debug(logging_text + step)
            db_k8scluster_list = self.db.get_list("k8sclusters", {})

            task_list = [
                asyncio.ensure_future(self.k8srepo.repo_remove(cluster_uuid=k8scluster["cluster-uuid"],
                                                               name=db_k8srepo["name"]))
                for k8scluster in db_k8scluster_list
            ]
            await self._wait_for_repo_tasks(task_list, logging_text)
            self.db.del_one("k8srepos", {"_id": k8srepo_id})
            # Mark the k8srepo 'delete' HA task as successful
            operationState_HA = 'COMPLETED'
            detailed_status_HA = 'Done'

        except Exception as e:
            self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
            exc = e
        finally:
            if exc and db_k8srepo:
                db_k8srepo_update["_admin.operationalState"] = "ERROR"
                db_k8srepo_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc)
                # Mark the k8srepo 'delete' HA task as erroneous
                operationState_HA = 'FAILED'
                detailed_status_HA = "ERROR {}: {}".format(step, exc)
            try:
                if db_k8srepo_update:
                    self.update_db_2("k8srepos", k8srepo_id, db_k8srepo_update)
                # Register the K8srepo 'delete' HA task either
                # successful or erroneous, or do nothing (if legacy NBI)
                self.lcm_tasks.register_HA('k8srepo', 'delete', op_id,
                                           operationState=operationState_HA,
                                           detailed_status=detailed_status_HA)
            except DbException as e:
                self.logger.error(logging_text + "Cannot update database: {}".format(e))
            self.lcm_tasks.remove("k8srepo", k8srepo_id, order_id)