2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
31 from juju
.action
import Action
32 from juju
.application
import Application
33 from juju
.client
import client
34 from juju
.controller
import Controller
35 from juju
.errors
import JujuAPIError
36 from juju
.machine
import Machine
37 from juju
.model
import Model
38 from n2vc
.exceptions
import (
39 N2VCBadArgumentsException
,
41 N2VCConnectionException
,
42 N2VCExecutionException
,
43 N2VCInvalidCertificate
,
47 from n2vc
.juju_observer
import JujuModelObserver
48 from n2vc
.n2vc_conn
import N2VCConnector
49 from n2vc
.n2vc_conn
import obj_to_dict
, obj_to_yaml
50 from n2vc
.provisioner
import AsyncSSHProvisioner
51 from n2vc
.libjuju
import Libjuju
54 class N2VCJujuConnector(N2VCConnector
):
57 ####################################################################################
58 ################################### P U B L I C ####################################
59 ####################################################################################
62 BUILT_IN_CLOUDS
= ["localhost", "microk8s"]
70 url
: str = "127.0.0.1:17070",
71 username
: str = "admin",
72 vca_config
: dict = None,
75 """Initialize juju N2VC connector
78 # parent class constructor
79 N2VCConnector
.__init
__(
87 vca_config
=vca_config
,
88 on_update_db
=on_update_db
,
91 # silence websocket traffic log
92 logging
.getLogger("websockets.protocol").setLevel(logging
.INFO
)
93 logging
.getLogger("juju.client.connection").setLevel(logging
.WARN
)
94 logging
.getLogger("model").setLevel(logging
.WARN
)
96 self
.log
.info("Initializing N2VC juju connector...")
99 ##############################################################
101 ##############################################################
106 raise N2VCBadArgumentsException("Argument url is mandatory", ["url"])
107 url_parts
= url
.split(":")
108 if len(url_parts
) != 2:
109 raise N2VCBadArgumentsException(
110 "Argument url: bad format (localhost:port) -> {}".format(url
), ["url"]
112 self
.hostname
= url_parts
[0]
114 self
.port
= int(url_parts
[1])
116 raise N2VCBadArgumentsException(
117 "url port must be a number -> {}".format(url
), ["url"]
122 raise N2VCBadArgumentsException(
123 "Argument username is mandatory", ["username"]
127 if vca_config
is None:
128 raise N2VCBadArgumentsException(
129 "Argument vca_config is mandatory", ["vca_config"]
132 if "secret" in vca_config
:
133 self
.secret
= vca_config
["secret"]
135 raise N2VCBadArgumentsException(
136 "Argument vca_config.secret is mandatory", ["vca_config.secret"]
139 # pubkey of juju client in osm machine: ~/.local/share/juju/ssh/juju_id_rsa.pub
140 # if exists, it will be written in lcm container: _create_juju_public_key()
141 if "public_key" in vca_config
:
142 self
.public_key
= vca_config
["public_key"]
144 self
.public_key
= None
146 # TODO: Verify ca_cert is valid before using. VCA will crash
147 # if the ca_cert isn't formatted correctly.
148 def base64_to_cacert(b64string
):
149 """Convert the base64-encoded string containing the VCA CACERT.
155 cacert
= base64
.b64decode(b64string
).decode("utf-8")
157 cacert
= re
.sub(r
"\\n", r
"\n", cacert
,)
158 except binascii
.Error
as e
:
159 self
.log
.debug("Caught binascii.Error: {}".format(e
))
160 raise N2VCInvalidCertificate(message
="Invalid CA Certificate")
164 self
.ca_cert
= vca_config
.get("ca_cert")
166 self
.ca_cert
= base64_to_cacert(vca_config
["ca_cert"])
168 if "api_proxy" in vca_config
:
169 self
.api_proxy
= vca_config
["api_proxy"]
171 "api_proxy for native charms configured: {}".format(self
.api_proxy
)
175 "api_proxy is not configured. Support for native charms is disabled"
178 if "enable_os_upgrade" in vca_config
:
179 self
.enable_os_upgrade
= vca_config
["enable_os_upgrade"]
181 self
.enable_os_upgrade
= True
183 if "apt_mirror" in vca_config
:
184 self
.apt_mirror
= vca_config
["apt_mirror"]
186 self
.apt_mirror
= None
188 self
.cloud
= vca_config
.get("cloud")
189 # self.log.debug('Arguments have been checked')
192 self
.controller
= None # it will be filled when connect to juju
193 self
.juju_models
= {} # model objects for every model_name
194 self
.juju_observers
= {} # model observers for every model_name
196 False # while connecting to juju (to avoid duplicate connections)
198 self
._authenticated
= (
199 False # it will be True when juju connection be stablished
201 self
._creating
_model
= False # True during model creation
202 self
.libjuju
= Libjuju(
204 api_proxy
=self
.api_proxy
,
205 enable_os_upgrade
=self
.enable_os_upgrade
,
206 apt_mirror
=self
.apt_mirror
,
207 username
=self
.username
,
208 password
=self
.secret
,
216 # create juju pub key file in lcm container at
217 # ./local/share/juju/ssh/juju_id_rsa.pub
218 self
._create
_juju
_public
_key
()
220 self
.log
.info("N2VC juju connector initialized")
222 async def get_status(self
, namespace
: str, yaml_format
: bool = True):
224 # self.log.info('Getting NS status. namespace: {}'.format(namespace))
226 _nsi_id
, ns_id
, _vnf_id
, _vdu_id
, _vdu_count
= self
._get
_namespace
_components
(
229 # model name is ns_id
231 if model_name
is None:
232 msg
= "Namespace {} not valid".format(namespace
)
234 raise N2VCBadArgumentsException(msg
, ["namespace"])
236 status
= await self
.libjuju
.get_model_status(model_name
)
239 return obj_to_yaml(status
)
241 return obj_to_dict(status
)
243 async def create_execution_environment(
247 reuse_ee_id
: str = None,
248 progress_timeout
: float = None,
249 total_timeout
: float = None,
253 "Creating execution environment. namespace: {}, reuse_ee_id: {}".format(
254 namespace
, reuse_ee_id
260 model_name
, application_name
, machine_id
= self
._get
_ee
_id
_components
(
270 ) = self
._get
_namespace
_components
(namespace
=namespace
)
271 # model name is ns_id
274 application_name
= self
._get
_application
_name
(namespace
=namespace
)
277 "model name: {}, application name: {}, machine_id: {}".format(
278 model_name
, application_name
, machine_id
282 # create or reuse a new juju machine
284 if not await self
.libjuju
.model_exists(model_name
):
285 await self
.libjuju
.add_model(model_name
, cloud_name
=self
.cloud
)
286 machine
, new
= await self
.libjuju
.create_machine(
287 model_name
=model_name
,
288 machine_id
=machine_id
,
290 progress_timeout
=progress_timeout
,
291 total_timeout
=total_timeout
,
293 # id for the execution environment
294 ee_id
= N2VCJujuConnector
._build
_ee
_id
(
295 model_name
=model_name
,
296 application_name
=application_name
,
297 machine_id
=str(machine
.entity_id
),
299 self
.log
.debug("ee_id: {}".format(ee_id
))
302 # write ee_id in database
303 self
._write
_ee
_id
_db
(db_dict
=db_dict
, ee_id
=ee_id
)
305 except Exception as e
:
306 message
= "Error creating machine on juju: {}".format(e
)
307 self
.log
.error(message
)
308 raise N2VCException(message
=message
)
310 # new machine credentials
312 "hostname": machine
.dns_name
,
316 "Execution environment created. ee_id: {}, credentials: {}".format(
321 return ee_id
, credentials
323 async def register_execution_environment(
328 progress_timeout
: float = None,
329 total_timeout
: float = None,
333 "Registering execution environment. namespace={}, credentials={}".format(
334 namespace
, credentials
338 if credentials
is None:
339 raise N2VCBadArgumentsException(
340 message
="credentials are mandatory", bad_args
=["credentials"]
342 if credentials
.get("hostname"):
343 hostname
= credentials
["hostname"]
345 raise N2VCBadArgumentsException(
346 message
="hostname is mandatory", bad_args
=["credentials.hostname"]
348 if credentials
.get("username"):
349 username
= credentials
["username"]
351 raise N2VCBadArgumentsException(
352 message
="username is mandatory", bad_args
=["credentials.username"]
354 if "private_key_path" in credentials
:
355 private_key_path
= credentials
["private_key_path"]
357 # if not passed as argument, use generated private key path
358 private_key_path
= self
.private_key_path
360 _nsi_id
, ns_id
, _vnf_id
, _vdu_id
, _vdu_count
= self
._get
_namespace
_components
(
367 application_name
= self
._get
_application
_name
(namespace
=namespace
)
369 # register machine on juju
371 if not self
.api_proxy
:
372 msg
= "Cannot provision machine: api_proxy is not defined"
373 self
.log
.error(msg
=msg
)
374 raise N2VCException(message
=msg
)
375 if not await self
.libjuju
.model_exists(model_name
):
376 await self
.libjuju
.add_model(model_name
, cloud_name
=self
.cloud
)
377 machine_id
= await self
.libjuju
.provision_machine(
378 model_name
=model_name
,
381 private_key_path
=private_key_path
,
383 progress_timeout
=progress_timeout
,
384 total_timeout
=total_timeout
,
386 except Exception as e
:
387 self
.log
.error("Error registering machine: {}".format(e
))
389 message
="Error registering machine on juju: {}".format(e
)
392 self
.log
.info("Machine registered: {}".format(machine_id
))
394 # id for the execution environment
395 ee_id
= N2VCJujuConnector
._build
_ee
_id
(
396 model_name
=model_name
,
397 application_name
=application_name
,
398 machine_id
=str(machine_id
),
401 self
.log
.info("Execution environment registered. ee_id: {}".format(ee_id
))
405 async def install_configuration_sw(
410 progress_timeout
: float = None,
411 total_timeout
: float = None,
417 "Installing configuration sw on ee_id: {}, "
418 "artifact path: {}, db_dict: {}"
419 ).format(ee_id
, artifact_path
, db_dict
)
423 if ee_id
is None or len(ee_id
) == 0:
424 raise N2VCBadArgumentsException(
425 message
="ee_id is mandatory", bad_args
=["ee_id"]
427 if artifact_path
is None or len(artifact_path
) == 0:
428 raise N2VCBadArgumentsException(
429 message
="artifact_path is mandatory", bad_args
=["artifact_path"]
432 raise N2VCBadArgumentsException(
433 message
="db_dict is mandatory", bad_args
=["db_dict"]
441 ) = N2VCJujuConnector
._get
_ee
_id
_components
(ee_id
=ee_id
)
443 "model: {}, application: {}, machine: {}".format(
444 model_name
, application_name
, machine_id
448 raise N2VCBadArgumentsException(
449 message
="ee_id={} is not a valid execution environment id".format(
455 # remove // in charm path
456 while artifact_path
.find("//") >= 0:
457 artifact_path
= artifact_path
.replace("//", "/")
460 if not self
.fs
.file_exists(artifact_path
, mode
="dir"):
461 msg
= "artifact path does not exist: {}".format(artifact_path
)
462 raise N2VCBadArgumentsException(message
=msg
, bad_args
=["artifact_path"])
464 if artifact_path
.startswith("/"):
465 full_path
= self
.fs
.path
+ artifact_path
467 full_path
= self
.fs
.path
+ "/" + artifact_path
470 await self
.libjuju
.deploy_charm(
471 model_name
=model_name
,
472 application_name
=application_name
,
474 machine_id
=machine_id
,
476 progress_timeout
=progress_timeout
,
477 total_timeout
=total_timeout
,
480 except Exception as e
:
482 message
="Error desploying charm into ee={} : {}".format(ee_id
, e
)
485 self
.log
.info("Configuration sw installed")
487 async def get_ee_ssh_public__key(
491 progress_timeout
: float = None,
492 total_timeout
: float = None,
497 "Generating priv/pub key pair and get pub key on ee_id: {}, db_dict: {}"
498 ).format(ee_id
, db_dict
)
502 if ee_id
is None or len(ee_id
) == 0:
503 raise N2VCBadArgumentsException(
504 message
="ee_id is mandatory", bad_args
=["ee_id"]
507 raise N2VCBadArgumentsException(
508 message
="db_dict is mandatory", bad_args
=["db_dict"]
516 ) = N2VCJujuConnector
._get
_ee
_id
_components
(ee_id
=ee_id
)
518 "model: {}, application: {}, machine: {}".format(
519 model_name
, application_name
, machine_id
523 raise N2VCBadArgumentsException(
524 message
="ee_id={} is not a valid execution environment id".format(
530 # try to execute ssh layer primitives (if exist):
536 application_name
= N2VCJujuConnector
._format
_app
_name
(application_name
)
538 # execute action: generate-ssh-key
540 output
, _status
= await self
.libjuju
.execute_action(
541 model_name
=model_name
,
542 application_name
=application_name
,
543 action_name
="generate-ssh-key",
545 progress_timeout
=progress_timeout
,
546 total_timeout
=total_timeout
,
548 except Exception as e
:
550 "Skipping exception while executing action generate-ssh-key: {}".format(
555 # execute action: get-ssh-public-key
557 output
, _status
= await self
.libjuju
.execute_action(
558 model_name
=model_name
,
559 application_name
=application_name
,
560 action_name
="get-ssh-public-key",
562 progress_timeout
=progress_timeout
,
563 total_timeout
=total_timeout
,
565 except Exception as e
:
566 msg
= "Cannot execute action get-ssh-public-key: {}\n".format(e
)
568 raise N2VCExecutionException(e
, primitive_name
="get-ssh-public-key")
570 # return public key if exists
571 return output
["pubkey"] if "pubkey" in output
else output
573 async def add_relation(
574 self
, ee_id_1
: str, ee_id_2
: str, endpoint_1
: str, endpoint_2
: str
578 "adding new relation between {} and {}, endpoints: {}, {}".format(
579 ee_id_1
, ee_id_2
, endpoint_1
, endpoint_2
585 message
= "EE 1 is mandatory"
586 self
.log
.error(message
)
587 raise N2VCBadArgumentsException(message
=message
, bad_args
=["ee_id_1"])
589 message
= "EE 2 is mandatory"
590 self
.log
.error(message
)
591 raise N2VCBadArgumentsException(message
=message
, bad_args
=["ee_id_2"])
593 message
= "endpoint 1 is mandatory"
594 self
.log
.error(message
)
595 raise N2VCBadArgumentsException(message
=message
, bad_args
=["endpoint_1"])
597 message
= "endpoint 2 is mandatory"
598 self
.log
.error(message
)
599 raise N2VCBadArgumentsException(message
=message
, bad_args
=["endpoint_2"])
601 # get the model, the applications and the machines from the ee_id's
602 model_1
, app_1
, _machine_1
= self
._get
_ee
_id
_components
(ee_id_1
)
603 model_2
, app_2
, _machine_2
= self
._get
_ee
_id
_components
(ee_id_2
)
605 # model must be the same
606 if model_1
!= model_2
:
607 message
= "EE models are not the same: {} vs {}".format(ee_id_1
, ee_id_2
)
608 self
.log
.error(message
)
609 raise N2VCBadArgumentsException(
610 message
=message
, bad_args
=["ee_id_1", "ee_id_2"]
613 # add juju relations between two applications
615 await self
.libjuju
.add_relation(
617 application_name_1
=app_1
,
618 application_name_2
=app_2
,
619 relation_1
=endpoint_1
,
620 relation_2
=endpoint_2
,
622 except Exception as e
:
623 message
= "Error adding relation between {} and {}: {}".format(
626 self
.log
.error(message
)
627 raise N2VCException(message
=message
)
629 async def remove_relation(self
):
631 self
.log
.info("Method not implemented yet")
632 raise MethodNotImplemented()
634 async def deregister_execution_environments(self
):
635 self
.log
.info("Method not implemented yet")
636 raise MethodNotImplemented()
638 async def delete_namespace(
639 self
, namespace
: str, db_dict
: dict = None, total_timeout
: float = None
641 self
.log
.info("Deleting namespace={}".format(namespace
))
644 if namespace
is None:
645 raise N2VCBadArgumentsException(
646 message
="namespace is mandatory", bad_args
=["namespace"]
649 _nsi_id
, ns_id
, _vnf_id
, _vdu_id
, _vdu_count
= self
._get
_namespace
_components
(
652 if ns_id
is not None:
654 if not await self
.libjuju
.model_exists(ns_id
):
655 raise N2VCNotFound(message
="Model {} does not exist".format(ns_id
))
656 await self
.libjuju
.destroy_model(
657 model_name
=ns_id
, total_timeout
=total_timeout
661 except Exception as e
:
663 message
="Error deleting namespace {} : {}".format(namespace
, e
)
666 raise N2VCBadArgumentsException(
667 message
="only ns_id is permitted to delete yet", bad_args
=["namespace"]
670 self
.log
.info("Namespace {} deleted".format(namespace
))
672 async def delete_execution_environment(
673 self
, ee_id
: str, db_dict
: dict = None, total_timeout
: float = None
675 self
.log
.info("Deleting execution environment ee_id={}".format(ee_id
))
679 raise N2VCBadArgumentsException(
680 message
="ee_id is mandatory", bad_args
=["ee_id"]
683 model_name
, application_name
, _machine_id
= self
._get
_ee
_id
_components
(
687 # destroy the application
689 await self
.libjuju
.destroy_model(
690 model_name
=model_name
, total_timeout
=total_timeout
692 except Exception as e
:
695 "Error deleting execution environment {} (application {}) : {}"
696 ).format(ee_id
, application_name
, e
)
699 # destroy the machine
701 # await self._juju_destroy_machine(
702 # model_name=model_name,
703 # machine_id=machine_id,
704 # total_timeout=total_timeout
706 # except Exception as e:
707 # raise N2VCException(
708 # message='Error deleting execution environment {} (machine {}) : {}'
709 # .format(ee_id, machine_id, e))
711 self
.log
.info("Execution environment {} deleted".format(ee_id
))
713 async def exec_primitive(
718 db_dict
: dict = None,
719 progress_timeout
: float = None,
720 total_timeout
: float = None,
724 "Executing primitive: {} on ee: {}, params: {}".format(
725 primitive_name
, ee_id
, params_dict
730 if ee_id
is None or len(ee_id
) == 0:
731 raise N2VCBadArgumentsException(
732 message
="ee_id is mandatory", bad_args
=["ee_id"]
734 if primitive_name
is None or len(primitive_name
) == 0:
735 raise N2VCBadArgumentsException(
736 message
="action_name is mandatory", bad_args
=["action_name"]
738 if params_dict
is None:
746 ) = N2VCJujuConnector
._get
_ee
_id
_components
(ee_id
=ee_id
)
748 raise N2VCBadArgumentsException(
749 message
="ee_id={} is not a valid execution environment id".format(
755 if primitive_name
== "config":
756 # Special case: config primitive
758 await self
.libjuju
.configure_application(
759 model_name
=model_name
,
760 application_name
=application_name
,
763 actions
= await self
.libjuju
.get_actions(
764 application_name
=application_name
, model_name
=model_name
,
767 "Application {} has these actions: {}".format(
768 application_name
, actions
771 if "verify-ssh-credentials" in actions
:
772 # execute verify-credentials
775 for _
in range(num_retries
):
777 self
.log
.debug("Executing action verify-ssh-credentials...")
778 output
, ok
= await self
.libjuju
.execute_action(
779 model_name
=model_name
,
780 application_name
=application_name
,
781 action_name
="verify-ssh-credentials",
783 progress_timeout
=progress_timeout
,
784 total_timeout
=total_timeout
,
786 self
.log
.debug("Result: {}, output: {}".format(ok
, output
))
788 except asyncio
.CancelledError
:
790 except Exception as e
:
792 "Error executing verify-ssh-credentials: {}. Retrying...".format(
796 await asyncio
.sleep(retry_timeout
)
799 "Error executing verify-ssh-credentials after {} retries. ".format(
804 msg
= "Action verify-ssh-credentials does not exist in application {}".format(
807 self
.log
.debug(msg
=msg
)
808 except Exception as e
:
809 self
.log
.error("Error configuring juju application: {}".format(e
))
810 raise N2VCExecutionException(
811 message
="Error configuring application into ee={} : {}".format(
814 primitive_name
=primitive_name
,
819 output
, status
= await self
.libjuju
.execute_action(
820 model_name
=model_name
,
821 application_name
=application_name
,
822 action_name
=primitive_name
,
824 progress_timeout
=progress_timeout
,
825 total_timeout
=total_timeout
,
828 if status
== "completed":
831 raise Exception("status is not completed: {}".format(status
))
832 except Exception as e
:
834 "Error executing primitive {}: {}".format(primitive_name
, e
)
836 raise N2VCExecutionException(
837 message
="Error executing primitive {} into ee={} : {}".format(
838 primitive_name
, ee_id
, e
840 primitive_name
=primitive_name
,
843 async def disconnect(self
):
844 self
.log
.info("closing juju N2VC...")
846 await self
.libjuju
.disconnect()
847 except Exception as e
:
848 raise N2VCConnectionException(
849 message
="Error disconnecting controller: {}".format(e
), url
=self
.url
853 ####################################################################################
854 ################################### P R I V A T E ##################################
855 ####################################################################################
858 def _write_ee_id_db(self
, db_dict
: dict, ee_id
: str):
860 # write ee_id to database: _admin.deployed.VCA.x
862 the_table
= db_dict
["collection"]
863 the_filter
= db_dict
["filter"]
864 the_path
= db_dict
["path"]
865 if not the_path
[-1] == ".":
866 the_path
= the_path
+ "."
867 update_dict
= {the_path
+ "ee_id": ee_id
}
868 # self.log.debug('Writing ee_id to database: {}'.format(the_path))
872 update_dict
=update_dict
,
875 except asyncio
.CancelledError
:
877 except Exception as e
:
878 self
.log
.error("Error writing ee_id to database: {}".format(e
))
881 def _build_ee_id(model_name
: str, application_name
: str, machine_id
: str):
883 Build an execution environment id form model, application and machine
885 :param application_name:
889 # id for the execution environment
890 return "{}.{}.{}".format(model_name
, application_name
, machine_id
)
893 def _get_ee_id_components(ee_id
: str) -> (str, str, str):
895 Get model, application and machine components from an execution environment id
897 :return: model_name, application_name, machine_id
901 return None, None, None
903 # split components of id
904 parts
= ee_id
.split(".")
905 model_name
= parts
[0]
906 application_name
= parts
[1]
907 machine_id
= parts
[2]
908 return model_name
, application_name
, machine_id
910 def _get_application_name(self
, namespace
: str) -> str:
912 Build application name from namespace
914 :return: app-vnf-<vnf id>-vdu-<vdu-id>-cnt-<vdu-count>
917 # TODO: Enforce the Juju 50-character application limit
919 # split namespace components
920 _
, _
, vnf_id
, vdu_id
, vdu_count
= self
._get
_namespace
_components
(
924 if vnf_id
is None or len(vnf_id
) == 0:
927 # Shorten the vnf_id to its last twelve characters
928 vnf_id
= "vnf-" + vnf_id
[-12:]
930 if vdu_id
is None or len(vdu_id
) == 0:
933 # Shorten the vdu_id to its last twelve characters
934 vdu_id
= "-vdu-" + vdu_id
[-12:]
936 if vdu_count
is None or len(vdu_count
) == 0:
939 vdu_count
= "-cnt-" + vdu_count
941 application_name
= "app-{}{}{}".format(vnf_id
, vdu_id
, vdu_count
)
943 return N2VCJujuConnector
._format
_app
_name
(application_name
)
945 async def _juju_create_machine(
948 application_name
: str,
949 machine_id
: str = None,
950 db_dict
: dict = None,
951 progress_timeout
: float = None,
952 total_timeout
: float = None,
956 "creating machine in model: {}, existing machine id: {}".format(
957 model_name
, machine_id
961 # get juju model and observer (create model if needed)
962 model
= await self
._juju
_get
_model
(model_name
=model_name
)
963 observer
= self
.juju_observers
[model_name
]
965 # find machine id in model
967 if machine_id
is not None:
968 self
.log
.debug("Finding existing machine id {} in model".format(machine_id
))
969 # get juju existing machines in the model
970 existing_machines
= await model
.get_machines()
971 if machine_id
in existing_machines
:
973 "Machine id {} found in model (reusing it)".format(machine_id
)
975 machine
= model
.machines
[machine_id
]
978 self
.log
.debug("Creating a new machine in juju...")
979 # machine does not exist, create it and wait for it
980 machine
= await model
.add_machine(
981 spec
=None, constraints
=None, disks
=None, series
="xenial"
984 # register machine with observer
985 observer
.register_machine(machine
=machine
, db_dict
=db_dict
)
987 # id for the execution environment
988 ee_id
= N2VCJujuConnector
._build
_ee
_id
(
989 model_name
=model_name
,
990 application_name
=application_name
,
991 machine_id
=str(machine
.entity_id
),
994 # write ee_id in database
995 self
._write
_ee
_id
_db
(db_dict
=db_dict
, ee_id
=ee_id
)
997 # wait for machine creation
998 await observer
.wait_for_machine(
999 machine_id
=str(machine
.entity_id
),
1000 progress_timeout
=progress_timeout
,
1001 total_timeout
=total_timeout
,
1006 self
.log
.debug("Reusing old machine pending")
1008 # register machine with observer
1009 observer
.register_machine(machine
=machine
, db_dict
=db_dict
)
1011 # machine does exist, but it is in creation process (pending), wait for
1012 # create finalisation
1013 await observer
.wait_for_machine(
1014 machine_id
=machine
.entity_id
,
1015 progress_timeout
=progress_timeout
,
1016 total_timeout
=total_timeout
,
1019 self
.log
.debug("Machine ready at " + str(machine
.dns_name
))
1022 async def _juju_provision_machine(
1027 private_key_path
: str,
1028 db_dict
: dict = None,
1029 progress_timeout
: float = None,
1030 total_timeout
: float = None,
1033 if not self
.api_proxy
:
1034 msg
= "Cannot provision machine: api_proxy is not defined"
1035 self
.log
.error(msg
=msg
)
1036 raise N2VCException(message
=msg
)
1039 "provisioning machine. model: {}, hostname: {}, username: {}".format(
1040 model_name
, hostname
, username
1044 if not self
._authenticated
:
1045 await self
._juju
_login
()
1047 # get juju model and observer
1048 model
= await self
._juju
_get
_model
(model_name
=model_name
)
1049 observer
= self
.juju_observers
[model_name
]
1051 # TODO check if machine is already provisioned
1052 machine_list
= await model
.get_machines()
1054 provisioner
= AsyncSSHProvisioner(
1057 private_key_path
=private_key_path
,
1063 params
= await provisioner
.provision_machine()
1064 except Exception as ex
:
1065 msg
= "Exception provisioning machine: {}".format(ex
)
1067 raise N2VCException(message
=msg
)
1069 params
.jobs
= ["JobHostUnits"]
1071 connection
= model
.connection()
1073 # Submit the request.
1074 self
.log
.debug("Adding machine to model")
1075 client_facade
= client
.ClientFacade
.from_connection(connection
)
1076 results
= await client_facade
.AddMachines(params
=[params
])
1077 error
= results
.machines
[0].error
1079 msg
= "Error adding machine: {}".format(error
.message
)
1080 self
.log
.error(msg
=msg
)
1081 raise ValueError(msg
)
1083 machine_id
= results
.machines
[0].machine
1085 # Need to run this after AddMachines has been called,
1086 # as we need the machine_id
1087 self
.log
.debug("Installing Juju agent into machine {}".format(machine_id
))
1088 asyncio
.ensure_future(
1089 provisioner
.install_agent(
1090 connection
=connection
,
1092 machine_id
=machine_id
,
1097 # wait for machine in model (now, machine is not yet in model, so we must
1101 machine_list
= await model
.get_machines()
1102 if machine_id
in machine_list
:
1103 self
.log
.debug("Machine {} found in model!".format(machine_id
))
1104 machine
= model
.machines
.get(machine_id
)
1106 await asyncio
.sleep(2)
1109 msg
= "Machine {} not found in model".format(machine_id
)
1110 self
.log
.error(msg
=msg
)
1111 raise Exception(msg
)
1113 # register machine with observer
1114 observer
.register_machine(machine
=machine
, db_dict
=db_dict
)
1116 # wait for machine creation
1117 self
.log
.debug("waiting for provision finishes... {}".format(machine_id
))
1118 await observer
.wait_for_machine(
1119 machine_id
=machine_id
,
1120 progress_timeout
=progress_timeout
,
1121 total_timeout
=total_timeout
,
1124 self
.log
.debug("Machine provisioned {}".format(machine_id
))
1128 async def _juju_deploy_charm(
1131 application_name
: str,
1135 progress_timeout
: float = None,
1136 total_timeout
: float = None,
1137 config
: dict = None,
1138 ) -> (Application
, int):
1140 # get juju model and observer
1141 model
= await self
._juju
_get
_model
(model_name
=model_name
)
1142 observer
= self
.juju_observers
[model_name
]
1144 # check if application already exists
1146 if application_name
in model
.applications
:
1147 application
= model
.applications
[application_name
]
1149 if application
is None:
1151 # application does not exist, create it and wait for it
1153 "deploying application {} to machine {}, model {}".format(
1154 application_name
, machine_id
, model_name
1157 self
.log
.debug("charm: {}".format(charm_path
))
1158 machine
= model
.machines
[machine_id
]
1160 application
= await model
.deploy(
1161 entity_url
=charm_path
,
1162 application_name
=application_name
,
1165 series
=machine
.series
,
1170 # register application with observer
1171 observer
.register_application(application
=application
, db_dict
=db_dict
)
1174 "waiting for application deployed... {}".format(application
.entity_id
)
1176 retries
= await observer
.wait_for_application(
1177 application_id
=application
.entity_id
,
1178 progress_timeout
=progress_timeout
,
1179 total_timeout
=total_timeout
,
1181 self
.log
.debug("application deployed")
1185 # register application with observer
1186 observer
.register_application(application
=application
, db_dict
=db_dict
)
1188 # application already exists, but not finalised
1189 self
.log
.debug("application already exists, waiting for deployed...")
1190 retries
= await observer
.wait_for_application(
1191 application_id
=application
.entity_id
,
1192 progress_timeout
=progress_timeout
,
1193 total_timeout
=total_timeout
,
1195 self
.log
.debug("application deployed")
1197 return application
, retries
1199 async def _juju_execute_action(
1202 application_name
: str,
1205 progress_timeout
: float = None,
1206 total_timeout
: float = None,
1210 # get juju model and observer
1211 model
= await self
._juju
_get
_model
(model_name
=model_name
)
1212 observer
= self
.juju_observers
[model_name
]
1214 application
= await self
._juju
_get
_application
(
1215 model_name
=model_name
, application_name
=application_name
1219 for u
in application
.units
:
1220 if await u
.is_leader_from_status():
1222 if unit
is not None:
1223 actions
= await application
.get_actions()
1224 if action_name
in actions
:
1226 'executing action "{}" using params: {}'.format(action_name
, kwargs
)
1228 action
= await unit
.run_action(action_name
, **kwargs
)
1230 # register action with observer
1231 observer
.register_action(action
=action
, db_dict
=db_dict
)
1233 await observer
.wait_for_action(
1234 action_id
=action
.entity_id
,
1235 progress_timeout
=progress_timeout
,
1236 total_timeout
=total_timeout
,
1238 self
.log
.debug("action completed with status: {}".format(action
.status
))
1239 output
= await model
.get_action_output(action_uuid
=action
.entity_id
)
1240 status
= await model
.get_action_status(uuid_or_prefix
=action
.entity_id
)
1241 if action
.entity_id
in status
:
1242 status
= status
[action
.entity_id
]
1245 return output
, status
1247 raise N2VCExecutionException(
1248 message
="Cannot execute action on charm", primitive_name
=action_name
1251 async def _juju_configure_application(
1254 application_name
: str,
1257 progress_timeout
: float = None,
1258 total_timeout
: float = None,
1261 # get the application
1262 application
= await self
._juju
_get
_application
(
1263 model_name
=model_name
, application_name
=application_name
1267 "configuring the application {} -> {}".format(application_name
, config
)
1269 res
= await application
.set_config(config
)
1271 "application {} configured. res={}".format(application_name
, res
)
1274 # Verify the config is set
1275 new_conf
= await application
.get_config()
1277 value
= new_conf
[key
]["value"]
1278 self
.log
.debug(" {} = {}".format(key
, value
))
1279 if config
[key
] != value
:
1280 raise N2VCException(
1281 message
="key {} is not configured correctly {} != {}".format(
1282 key
, config
[key
], new_conf
[key
]
1286 # check if 'verify-ssh-credentials' action exists
1287 # unit = application.units[0]
1288 actions
= await application
.get_actions()
1289 if "verify-ssh-credentials" not in actions
:
1291 "Action verify-ssh-credentials does not exist in application {}"
1292 ).format(application_name
)
1293 self
.log
.debug(msg
=msg
)
1296 # execute verify-credentials
1298 retry_timeout
= 15.0
1299 for _
in range(num_retries
):
1301 self
.log
.debug("Executing action verify-ssh-credentials...")
1302 output
, ok
= await self
._juju
_execute
_action
(
1303 model_name
=model_name
,
1304 application_name
=application_name
,
1305 action_name
="verify-ssh-credentials",
1307 progress_timeout
=progress_timeout
,
1308 total_timeout
=total_timeout
,
1310 self
.log
.debug("Result: {}, output: {}".format(ok
, output
))
1312 except asyncio
.CancelledError
:
1314 except Exception as e
:
1316 "Error executing verify-ssh-credentials: {}. Retrying...".format(e
)
1318 await asyncio
.sleep(retry_timeout
)
1321 "Error executing verify-ssh-credentials after {} retries. ".format(
1327 async def _juju_get_application(self
, model_name
: str, application_name
: str):
1328 """Get the deployed application."""
1330 model
= await self
._juju
_get
_model
(model_name
=model_name
)
1332 application_name
= N2VCJujuConnector
._format
_app
_name
(application_name
)
1334 if model
.applications
and application_name
in model
.applications
:
1335 return model
.applications
[application_name
]
1337 raise N2VCException(
1338 message
="Cannot get application {} from model {}".format(
1339 application_name
, model_name
1343 async def _juju_get_model(self
, model_name
: str) -> Model
:
1344 """ Get a model object from juju controller
1345 If the model does not exits, it creates it.
1347 :param str model_name: name of the model
1348 :returns Model: model obtained from juju controller or Exception
1352 model_name
= N2VCJujuConnector
._format
_model
_name
(model_name
)
1354 if model_name
in self
.juju_models
:
1355 return self
.juju_models
[model_name
]
1357 if self
._creating
_model
:
1358 self
.log
.debug("Another coroutine is creating a model. Wait...")
1359 while self
._creating
_model
:
1360 # another coroutine is creating a model, wait
1361 await asyncio
.sleep(0.1)
1362 # retry (perhaps another coroutine has created the model meanwhile)
1363 if model_name
in self
.juju_models
:
1364 return self
.juju_models
[model_name
]
1367 self
._creating
_model
= True
1369 # get juju model names from juju
1370 model_list
= await self
.controller
.list_models()
1372 if model_name
not in model_list
:
1374 "Model {} does not exist. Creating new model...".format(model_name
)
1376 config_dict
= {"authorized-keys": self
.public_key
}
1378 config_dict
["apt-mirror"] = self
.apt_mirror
1379 if not self
.enable_os_upgrade
:
1380 config_dict
["enable-os-refresh-update"] = False
1381 config_dict
["enable-os-upgrade"] = False
1382 if self
.cloud
in self
.BUILT_IN_CLOUDS
:
1383 model
= await self
.controller
.add_model(
1384 model_name
=model_name
,
1386 cloud_name
=self
.cloud
,
1389 model
= await self
.controller
.add_model(
1390 model_name
=model_name
,
1392 cloud_name
=self
.cloud
,
1393 credential_name
=self
.cloud
,
1395 self
.log
.info("New model created, name={}".format(model_name
))
1398 "Model already exists in juju. Getting model {}".format(model_name
)
1400 model
= await self
.controller
.get_model(model_name
)
1401 self
.log
.debug("Existing model in juju, name={}".format(model_name
))
1403 self
.juju_models
[model_name
] = model
1404 self
.juju_observers
[model_name
] = JujuModelObserver(n2vc
=self
, model
=model
)
1407 except Exception as e
:
1408 msg
= "Cannot get model {}. Exception: {}".format(model_name
, e
)
1410 raise N2VCException(msg
)
1412 self
._creating
_model
= False
1414 async def _juju_add_relation(
1417 application_name_1
: str,
1418 application_name_2
: str,
1423 # get juju model and observer
1424 model
= await self
._juju
_get
_model
(model_name
=model_name
)
1426 r1
= "{}:{}".format(application_name_1
, relation_1
)
1427 r2
= "{}:{}".format(application_name_2
, relation_2
)
1429 self
.log
.debug("adding relation: {} -> {}".format(r1
, r2
))
1431 await model
.add_relation(relation1
=r1
, relation2
=r2
)
1432 except JujuAPIError
as e
:
1433 # If one of the applications in the relationship doesn't exist, or the
1434 # relation has already been added,
1435 # let the operation fail silently.
1436 if "not found" in e
.message
:
1438 if "already exists" in e
.message
:
1440 # another execption, raise it
1443 async def _juju_destroy_application(self
, model_name
: str, application_name
: str):
1446 "Destroying application {} in model {}".format(application_name
, model_name
)
1449 # get juju model and observer
1450 model
= await self
._juju
_get
_model
(model_name
=model_name
)
1451 observer
= self
.juju_observers
[model_name
]
1453 application
= model
.applications
.get(application_name
)
1455 observer
.unregister_application(application_name
)
1456 await application
.destroy()
1458 self
.log
.debug("Application not found: {}".format(application_name
))
1460 async def _juju_destroy_machine(
1461 self
, model_name
: str, machine_id
: str, total_timeout
: float = None
1465 "Destroying machine {} in model {}".format(machine_id
, model_name
)
1468 if total_timeout
is None:
1469 total_timeout
= 3600
1471 # get juju model and observer
1472 model
= await self
._juju
_get
_model
(model_name
=model_name
)
1473 observer
= self
.juju_observers
[model_name
]
1475 machines
= await model
.get_machines()
1476 if machine_id
in machines
:
1477 machine
= model
.machines
[machine_id
]
1478 observer
.unregister_machine(machine_id
)
1479 # TODO: change this by machine.is_manual when this is upstreamed:
1480 # https://github.com/juju/python-libjuju/pull/396
1481 if "instance-id" in machine
.safe_data
and machine
.safe_data
[
1483 ].startswith("manual:"):
1484 self
.log
.debug("machine.destroy(force=True) started.")
1485 await machine
.destroy(force
=True)
1486 self
.log
.debug("machine.destroy(force=True) passed.")
1488 end
= time
.time() + total_timeout
1489 # wait for machine removal
1490 machines
= await model
.get_machines()
1491 while machine_id
in machines
and time
.time() < end
:
1493 "Waiting for machine {} is destroyed".format(machine_id
)
1495 await asyncio
.sleep(0.5)
1496 machines
= await model
.get_machines()
1497 self
.log
.debug("Machine destroyed: {}".format(machine_id
))
1499 self
.log
.debug("Machine not found: {}".format(machine_id
))
1501 async def _juju_destroy_model(self
, model_name
: str, total_timeout
: float = None):
1503 self
.log
.debug("Destroying model {}".format(model_name
))
1505 if total_timeout
is None:
1506 total_timeout
= 3600
1507 end
= time
.time() + total_timeout
1509 model
= await self
._juju
_get
_model
(model_name
=model_name
)
1512 raise N2VCNotFound(message
="Model {} does not exist".format(model_name
))
1514 uuid
= model
.info
.uuid
1516 # destroy applications
1517 for application_name
in model
.applications
:
1519 await self
._juju
_destroy
_application
(
1520 model_name
=model_name
, application_name
=application_name
1522 except Exception as e
:
1524 "Error destroying application {} in model {}: {}".format(
1525 application_name
, model_name
, e
1530 machines
= await model
.get_machines()
1531 for machine_id
in machines
:
1533 await self
._juju
_destroy
_machine
(
1534 model_name
=model_name
, machine_id
=machine_id
1536 except asyncio
.CancelledError
:
1539 # ignore exceptions destroying machine
1542 await self
._juju
_disconnect
_model
(model_name
=model_name
)
1544 self
.log
.debug("destroying model {}...".format(model_name
))
1545 await self
.controller
.destroy_model(uuid
)
1546 # self.log.debug('model destroy requested {}'.format(model_name))
1548 # wait for model is completely destroyed
1549 self
.log
.debug("Waiting for model {} to be destroyed...".format(model_name
))
1551 while time
.time() < end
:
1553 # await self.controller.get_model(uuid)
1554 models
= await self
.controller
.list_models()
1555 if model_name
not in models
:
1557 "The model {} ({}) was destroyed".format(model_name
, uuid
)
1560 except asyncio
.CancelledError
:
1562 except Exception as e
:
1564 await asyncio
.sleep(5)
1565 raise N2VCException(
1566 "Timeout waiting for model {} to be destroyed {}".format(
1567 model_name
, last_exception
1571 async def _juju_login(self
):
1572 """Connect to juju controller
1576 # if already authenticated, exit function
1577 if self
._authenticated
:
1580 # if connecting, wait for finish
1581 # another task could be trying to connect in parallel
1582 while self
._connecting
:
1583 await asyncio
.sleep(0.1)
1585 # double check after other task has finished
1586 if self
._authenticated
:
1590 self
._connecting
= True
1592 "connecting to juju controller: {} {}:{}{}".format(
1595 self
.secret
[:8] + "...",
1596 " with ca_cert" if self
.ca_cert
else "",
1600 # Create controller object
1601 self
.controller
= Controller(loop
=self
.loop
)
1602 # Connect to controller
1603 await self
.controller
.connect(
1605 username
=self
.username
,
1606 password
=self
.secret
,
1607 cacert
=self
.ca_cert
,
1609 self
._authenticated
= True
1610 self
.log
.info("juju controller connected")
1611 except Exception as e
:
1612 message
= "Exception connecting to juju: {}".format(e
)
1613 self
.log
.error(message
)
1614 raise N2VCConnectionException(message
=message
, url
=self
.url
)
1616 self
._connecting
= False
1618 async def _juju_logout(self
):
1619 """Logout of the Juju controller."""
1620 if not self
._authenticated
:
1623 # disconnect all models
1624 for model_name
in self
.juju_models
:
1626 await self
._juju
_disconnect
_model
(model_name
)
1627 except Exception as e
:
1629 "Error disconnecting model {} : {}".format(model_name
, e
)
1631 # continue with next model...
1633 self
.log
.info("Disconnecting controller")
1635 await self
.controller
.disconnect()
1636 except Exception as e
:
1637 raise N2VCConnectionException(
1638 message
="Error disconnecting controller: {}".format(e
), url
=self
.url
1641 self
.controller
= None
1642 self
._authenticated
= False
1643 self
.log
.info("disconnected")
1645 async def _juju_disconnect_model(self
, model_name
: str):
1646 self
.log
.debug("Disconnecting model {}".format(model_name
))
1647 if model_name
in self
.juju_models
:
1648 await self
.juju_models
[model_name
].disconnect()
1649 self
.juju_models
[model_name
] = None
1650 self
.juju_observers
[model_name
] = None
1652 self
.warning("Cannot disconnect model: {}".format(model_name
))
1654 def _create_juju_public_key(self
):
1655 """Recreate the Juju public key on lcm container, if needed
1656 Certain libjuju commands expect to be run from the same machine as Juju
1657 is bootstrapped to. This method will write the public key to disk in
1658 that location: ~/.local/share/juju/ssh/juju_id_rsa.pub
1661 # Make sure that we have a public key before writing to disk
1662 if self
.public_key
is None or len(self
.public_key
) == 0:
1663 if "OSMLCM_VCA_PUBKEY" in os
.environ
:
1664 self
.public_key
= os
.getenv("OSMLCM_VCA_PUBKEY", "")
1665 if len(self
.public_key
) == 0:
1670 pk_path
= "{}/.local/share/juju/ssh".format(os
.path
.expanduser("~"))
1671 file_path
= "{}/juju_id_rsa.pub".format(pk_path
)
1673 "writing juju public key to file:\n{}\npublic key: {}".format(
1674 file_path
, self
.public_key
1677 if not os
.path
.exists(pk_path
):
1678 # create path and write file
1679 os
.makedirs(pk_path
)
1680 with
open(file_path
, "w") as f
:
1681 self
.log
.debug("Creating juju public key file: {}".format(file_path
))
1682 f
.write(self
.public_key
)
1684 self
.log
.debug("juju public key file already exists: {}".format(file_path
))
1687 def _format_model_name(name
: str) -> str:
1688 """Format the name of the model.
1690 Model names may only contain lowercase letters, digits and hyphens
1693 return name
.replace("_", "-").replace(" ", "-").lower()
1696 def _format_app_name(name
: str) -> str:
1697 """Format the name of the application (in order to assure valid application name).
1699 Application names have restrictions (run juju deploy --help):
1700 - contains lowercase letters 'a'-'z'
1701 - contains numbers '0'-'9'
1702 - contains hyphens '-'
1703 - starts with a lowercase letter
1704 - not two or more consecutive hyphens
1705 - after a hyphen, not a group with all numbers
1708 def all_numbers(s
: str) -> bool:
1714 new_name
= name
.replace("_", "-")
1715 new_name
= new_name
.replace(" ", "-")
1716 new_name
= new_name
.lower()
1717 while new_name
.find("--") >= 0:
1718 new_name
= new_name
.replace("--", "-")
1719 groups
= new_name
.split("-")
1721 # find 'all numbers' groups and prefix them with a letter
1723 for i
in range(len(groups
)):
1725 if all_numbers(group
):
1731 if app_name
[0].isdigit():
1732 app_name
= "z" + app_name