1 ############################################################################
2 # Copyright 2016 RIFT.io Inc #
4 # Licensed under the Apache License, Version 2.0 (the "License"); #
5 # you may not use this file except in compliance with the License. #
6 # You may obtain a copy of the License at #
8 # http://www.apache.org/licenses/LICENSE-2.0 #
10 # Unless required by applicable law or agreed to in writing, software #
11 # distributed under the License is distributed on an "AS IS" BASIS, #
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
13 # See the License for the specific language governing permissions and #
14 # limitations under the License. #
15 ############################################################################
19 from functools
import partial
27 from jujuclient
.juju1
.environment
import Environment
as Env1
28 from jujuclient
.juju2
.environment
import Environment
as Env2
29 except ImportError as e
:
30 # Try importing older jujuclient
31 from jujuclient
import Environment
as Env1
34 ssl
._create
_default
_https
_context
= ssl
._create
_unverified
_context
35 except AttributeError:
36 # Legacy Python doesn't verify by default (see pep-0476)
37 # https://www.python.org/dev/peps/pep-0476/
41 class JujuVersionError(Exception):
45 class JujuApiError(Exception):
49 class JujuEnvError(JujuApiError
):
53 class JujuModelError(JujuApiError
):
57 class JujuStatusError(JujuApiError
):
61 class JujuUnitsError(JujuApiError
):
65 class JujuWaitUnitsError(JujuApiError
):
69 class JujuSrvNotDeployedError(JujuApiError
):
73 class JujuAddCharmError(JujuApiError
):
77 class JujuDeployError(JujuApiError
):
81 class JujuDestroyError(JujuApiError
):
85 class JujuResolveError(JujuApiError
):
89 class JujuActionError(JujuApiError
):
93 class JujuActionApiError(JujuActionError
):
97 class JujuActionInfoError(JujuActionError
):
101 class JujuActionExecError(JujuActionError
):
105 class JujuApi(object):
107 JujuApi wrapper on jujuclient library
109 There should be one instance of JujuApi for each VNF manged by Juju.
112 Currently we use one unit per service/VNF. So once a service
113 is deployed, we store the unit name and reuse it
125 '''Initialize with the Juju credentials'''
130 if user
.startswith('user-'):
133 self
.user
= 'user-{}'.format(user
)
140 self
.log
= JujuApi
._get
_logger
()
143 raise JujuApiError("Logger not defined")
147 self
.version
= version
153 self
.log
.warn("Using older version of Juju client, which " \
154 "supports only Juju 1.x")
157 endpoint
= 'wss://%s:%d' % (server
, int(port
))
158 self
.endpoint
= endpoint
160 self
.charm
= None # Charm used
161 self
.service
= None # Service deployed
162 self
.units
= [] # Storing as list to support more units in future
164 self
.destroy_retries
= 25 # Number retires to destroy service
165 self
.retry_delay
= 5 # seconds
168 return ("JujuApi-{}".format(self
.endpoint
))
171 def _get_logger(cls
):
172 if cls
.log
is not None:
175 fmt
= logging
.Formatter(
176 '%(asctime)-23s %(levelname)-5s (%(name)s@%(process)d:' \
177 '%(filename)s:%(lineno)d) - %(message)s')
178 stderr_handler
= logging
.StreamHandler(stream
=sys
.stderr
)
179 stderr_handler
.setFormatter(fmt
)
180 logging
.basicConfig(level
=logging
.DEBUG
)
181 cls
.log
= logging
.getLogger('juju-api')
182 cls
.log
.addHandler(stderr_handler
)
187 def format_charm_name(name
):
188 '''Format the name to valid charm name
190 Charm service name accepts only a to z and -.
197 elif not c
.isalpha():
200 return new_name
.lower()
202 def _get_version_tag(self
, tag
):
218 2: 'workload-status',
226 return version_tag_map
[tag
][self
.version
]
230 env
= Env1(self
.endpoint
)
231 l
= env
.login(self
.secret
, user
=self
.user
)
234 except ConnectionRefusedError
as e
:
235 msg
= "{}: Failed Juju 1.x connect: {}".format(self
, e
)
237 self
.log
.exception(e
)
240 except Exception as e
:
241 msg
= "{}: Failed Juju 1.x connect: {}".format(self
, e
)
243 self
.log
.exception(e
)
244 raise JujuEnvError(msg
)
248 env
= Env2(self
.endpoint
)
249 l
= env
.login(self
.secret
, user
=self
.user
)
250 except KeyError as e
:
251 msg
= "{}: Failed Juju 2.x connect: {}".format(self
, e
)
253 raise JujuVersionError(msg
)
256 models
= env
.models
.list()
257 for m
in models
['user-models']:
258 if m
['model']['name'] == 'default':
259 mep
= '{}/model/{}/api'.format(self
.endpoint
,
261 model
= Env2(mep
, env_uuid
=m
['model']['uuid'])
262 l
= model
.login(self
.secret
, user
=self
.user
)
270 except Exception as e
:
271 msg
= "{}: Failed logging to model: {}".format(self
, e
)
273 self
.log
.exception(e
)
275 raise JujuModelError(msg
)
278 self
.log
.debug("{}: Connect to endpoint {}".
279 format(self
, self
.endpoint
))
281 if self
.version
is None:
282 # Try version 2 first
284 env
= self
._get
_env
2()
287 except JujuVersionError
as e
:
288 self
.log
.info("Unable to login as Juju 2.x, trying 1.x")
289 env
= self
._get
_env
1()
294 elif self
.version
== 2:
295 return self
._get
_env
2()
297 elif self
.version
== 1:
298 return self
._get
_env
1()
301 msg
= "{}: Unknown version set: {}".format(self
, self
.version
)
303 raise JujuVersionError(msg
)
307 ''' Connect to the Juju controller'''
308 env
= yield from self
.loop
.run_in_executor(
314 def _get_status(self
, env
=None):
316 env
= self
._get
_env
()
319 status
= env
.status()
322 except Exception as e
:
323 msg
= "{}: exception in getting status: {}". \
326 self
.log
.exception(e
)
327 raise JujuStatusError(msg
)
330 def get_status(self
, env
=None):
331 '''Get Juju controller status'''
332 pf
= partial(self
._get
_status
, env
=env
)
333 status
= yield from self
.loop
.run_in_executor(
339 def get_all_units(self
, status
, service
=None):
340 '''Parse the status and get the units'''
342 services
= status
.get(self
._get
_version
_tag
('applications'), {})
344 for svc_name
, svc_data
in services
.items():
345 if service
and service
!= svc_name
:
347 units
= svc_data
[self
._get
_version
_tag
('units')] or {}
349 results
[svc_name
] = {}
351 results
[svc_name
][unit
] = \
352 units
[unit
][self
._get
_version
_tag
('workload-status')] \
353 [self
._get
_version
_tag
('status')] or None
357 def _get_service_units(self
, service
=None, status
=None, env
=None):
359 service
= self
.service
361 # Optimizing calls to Juju, as currently we deploy only 1 unit per
363 if self
.service
== service
and len(self
.units
):
367 env
= self
._get
_env
()
370 status
= self
._get
_status
(env
=env
)
373 resp
= self
.get_all_units(status
, service
=service
)
374 self
.log
.debug("Get all units: {}".format(resp
))
375 units
= set(resp
[service
].keys())
377 if self
.service
== service
:
382 except Exception as e
:
383 msg
= "{}: exception in get units {}".format(self
, e
)
385 self
.log
.exception(e
)
386 raise JujuUnitsError(msg
)
389 def get_service_units(self
, service
=None, status
=None, env
=None):
390 '''Get the unit names for a service'''
391 pf
= partial(self
._get
_service
_units
,
395 units
= yield from self
.loop
.run_in_executor(
401 def _get_service_status(self
, service
=None, status
=None, env
=None):
403 env
= self
._get
_env
()
406 status
= self
._get
_status
(env
=env
)
409 service
= self
.service
412 srv_status
= status
[self
._get
_version
_tag
('applications')] \
413 [service
][self
._get
_version
_tag
('status')] \
414 [self
._get
_version
_tag
('status')]
415 self
.log
.debug("{}: Service {} status is {}".
416 format(self
, service
, srv_status
))
419 except KeyError as e
:
420 self
.log
.info("self: Did not find service {}, e={}".format(self
, service
, e
))
423 except Exception as e
:
424 msg
= "{}: exception checking service status for {}, e {}". \
425 format(self
, service
, e
)
427 self
.log
.exception(e
)
428 raise JujuStatusError(msg
)
432 def get_service_status(self
, service
=None, status
=None, env
=None):
433 ''' Get service status
435 maintenance : The unit is not yet providing services, but is actively doing stuff.
436 unknown : Service has finished an event but the charm has not called status-set yet.
437 waiting : Service is unable to progress to an active state because of dependency.
438 blocked : Service needs manual intervention to get back to the Running state.
439 active : Service correctly offering all the services.
440 NA : Service is not deployed
442 pf
= partial(self
._get
_service
_status
,
446 srv_status
= yield from self
.loop
.run_in_executor(
452 def _is_service_deployed(self
, service
=None, status
=None, env
=None):
453 resp
= self
._get
_service
_status
(service
=service
,
457 if resp
not in ['terminated', 'NA']:
463 def is_service_deployed(self
, service
=None, status
=None, env
=None):
464 '''Check if the service is deployed'''
465 pf
= partial(self
._is
_service
_deployed
,
469 rc
= yield from self
.loop
.run_in_executor(
475 def _is_service_error(self
, service
=None, status
=None, env
=None):
476 resp
= self
._get
_service
_status
(service
=service
,
480 if resp
in ['error']:
486 def is_service_error(self
, service
=None, status
=None, env
=None):
487 '''Check if the service is in error state'''
488 pf
= partial(self
._is
_service
_error
,
492 rc
= yield from self
.loop
.run_in_executor(
498 def _is_service_maint(self
, service
=None, status
=None, env
=None):
499 resp
= self
._get
_service
_status
(service
=service
,
503 if resp
in ['maintenance']:
509 def is_service_maint(self
, service
=None, status
=None, env
=None):
510 '''Check if the service is in error state'''
511 pf
= partial(self
._is
_service
_maint
,
515 rc
= yield from self
.loop
.run_in_executor(
521 def _is_service_active(self
, service
=None, status
=None, env
=None):
522 resp
= self
._get
_service
_status
(service
=service
,
526 if resp
in ['active']:
532 def is_service_active(self
, service
=None, status
=None, env
=None):
533 '''Check if the service is active'''
534 pf
= partial(self
._is
_service
_active
,
538 rc
= yield from self
.loop
.run_in_executor(
544 def _is_service_blocked(self
, service
=None, status
=None, env
=None):
545 resp
= self
._get
_service
_status
(service
=service
,
549 if resp
in ['blocked']:
555 def is_service_blocked(self
, service
=None, status
=None, env
=None):
556 '''Check if the service is blocked'''
557 pf
= partial(self
._is
_service
_blocked
,
561 rc
= yield from self
.loop
.run_in_executor(
567 def _is_service_up(self
, service
=None, status
=None, env
=None):
568 resp
= self
._get
_service
_status
(service
=service
,
572 if resp
in ['active', 'blocked']:
578 def is_service_up(self
, service
=None, status
=None, env
=None):
579 '''Check if the service is installed and up'''
580 pf
= partial(self
._is
_service
_up
,
585 rc
= yield from self
.loop
.run_in_executor(
591 def _apply_config(self
, config
, service
=None, env
=None):
593 service
= self
.service
595 if config
is None or len(config
) == 0:
596 self
.log
.warn("{}: Empty config passed for service {}".
597 format(self
, service
))
601 env
= self
._get
_env
()
603 status
= self
._get
_status
(env
=env
)
605 if not self
._is
_service
_deployed
(service
=service
,
608 raise JujuSrvNotDeployedError("{}: service {} is not deployed".
609 format(self
, service
))
611 self
.log
.debug("{}: Config for service {} update to: {}".
612 format(self
, service
, config
))
614 # Try to fix error on service, most probably due to config issue
615 if self
._is
_service
_error
(service
=service
, status
=status
, env
=env
):
616 self
._resolve
_error
(service
=service
, env
=env
)
618 if self
.version
== 2:
619 env
.service
.set(service
, config
)
621 env
.set_config(service
, config
)
623 except Exception as e
:
624 self
.log
.error("{}: exception setting config for {} with {}, e {}".
625 format(self
, service
, config
, e
))
626 self
.log
.exception(e
)
630 def apply_config(self
, config
, service
=None, env
=None, wait
=True):
631 '''Apply a config on the service'''
632 pf
= partial(self
._apply
_config
,
636 yield from self
.loop
.run_in_executor(
642 # Wait till config finished applying
643 self
.log
.debug("{}: Wait for config apply to finish".
648 # Sleep first to give time for config_changed hook to be invoked
649 yield from asyncio
.sleep(delay
, loop
=self
.loop
)
650 maint
= yield from self
.is_service_maint(service
=service
,
653 err
= yield from self
.is_service_error(service
=service
, env
=env
)
655 self
.log
.error("{}: Service is in error state".
659 self
.log
.debug("{}: Finished applying config".format(self
))
662 def _set_parameter(self
, parameter
, value
, service
=None):
663 return self
._apply
_config
({parameter
: value
}, service
=service
)
666 def set_parameter(self
, parameter
, value
, service
=None):
667 '''Set a config parameter for a service'''
668 return self
.apply_config({parameter
: value
}, service
=service
)
670 def _resolve_error(self
, service
=None, status
=None, env
=None):
672 env
= self
._get
_env
()
675 status
= self
._get
_status
(env
=env
)
678 service
= self
.service
681 env
= self
._get
_env
()
682 if self
._is
_service
_deployed
(service
=service
, status
=status
):
683 units
= self
.get_all_units(status
, service
=service
)
685 for unit
, ustatus
in units
[service
].items():
686 if ustatus
== 'error':
687 self
.log
.info("{}: Found unit {} with status {}".
688 format(self
, unit
, ustatus
))
690 # Takes the unit name as service_name/idx unlike action
693 except Exception as e
:
694 msg
= "{}: Resolve on unit {}: {}". \
695 format(self
, unit
, e
)
697 self
.log
.exception(e
)
698 raise JujuResolveError(msg
)
701 def resolve_error(self
, service
=None, status
=None, env
=None):
702 '''Resolve units in error state'''
703 pf
= partial(self
._resolve
_error
,
707 yield from self
.loop
.run_in_executor(
712 def _deploy_service(self
, charm
, service
,
713 path
=None, config
=None, env
=None):
714 self
.log
.debug("{}: Deploy service for charm {}({}) with service {}".
715 format(self
, charm
, path
, service
))
718 env
= self
._get
_env
()
720 self
.service
= service
723 if self
._is
_service
_deployed
(service
=service
, env
=env
):
724 self
.log
.info("{}: Charm service {} already deployed".
725 format (self
, service
))
727 self
._apply
_config
(config
, service
=service
, env
=env
)
733 if self
.version
== 1:
737 prefix
=os
.getenv('RIFT_INSTALL', '/')
738 path
= os
.path
.join(prefix
, 'usr/rift/charms', series
, charm
)
741 self
.log
.debug("{}: Local charm settings: dir={}, series={}".
742 format(self
, path
, series
))
743 result
= env
.add_local_charm_dir(path
, series
)
744 url
= result
[self
._get
_version
_tag
('charm-url')]
746 except Exception as e
:
747 msg
= '{}: Error setting local charm directory {} for {}: {}'. \
748 format(self
, path
, service
, e
)
750 self
.log
.exception(e
)
751 raise JujuAddCharmError(msg
)
754 self
.log
.debug("{}: Deploying using: service={}, url={}, to={}, config={}".
755 format(self
, service
, url
, deploy_to
, config
))
756 env
.deploy(service
, url
, config
=config
, machine_spec
=deploy_to
)
758 except Exception as e
:
759 msg
= '{}: Error deploying {}: {}'.format(self
, service
, e
)
761 self
.log
.exception(e
)
762 raise JujuDeployError(msg
)
765 def deploy_service(self
, charm
, service
,
766 wait
=False, timeout
=300,
767 path
=None, config
=None):
768 '''Deploy a service using the charm name provided'''
769 env
= yield from self
.get_env()
771 pf
= partial(self
._deploy
_service
,
777 yield from self
.loop
.run_in_executor(
784 # Wait for the deployed units to start
786 self
.log
.debug("{}: Waiting for service {} to come up".
787 format(self
, service
))
788 rc
= yield from self
.wait_for_service(timeout
=timeout
, env
=env
)
790 except Exception as e
:
791 msg
= '{}: Error starting all units for {}: {}'. \
792 format(self
, service
, e
)
794 self
.log
.exception(e
)
795 raise JujuWaitUnitsError(msg
)
800 def wait_for_service(self
, service
=None, timeout
=0, env
=None):
801 '''Wait for the service to come up'''
803 service
= self
.service
806 env
= yield from self
.get_env()
808 status
= yield from self
.get_status(env
=env
)
810 if self
._is
_service
_up
(service
=service
, status
=status
, env
=env
):
811 self
.log
.debug("{}: Service {} is already up".
812 format(self
, service
))
815 # Check if service is deployed
816 if not self
._is
_service
_deployed
(service
=service
, status
=status
, env
=env
):
817 raise JujuSrvNotDeployedError("{}: service {} is not deployed".
818 format(self
, service
))
824 delay
= self
.retry_delay
# seconds
825 self
.log
.debug("{}: In wait for service {}".format(self
, service
))
827 start_time
= time
.time()
828 max_time
= time
.time() + timeout
829 while timeout
!= 0 and (time
.time() <= max_time
):
831 rc
= yield from self
.is_service_up(service
=service
, env
=env
)
833 self
.log
.debug("{}: Service {} is up after {} seconds".
834 format(self
, service
, time
.time()-start_time
))
836 yield from asyncio
.sleep(delay
, loop
=self
.loop
)
839 def _destroy_service(self
, service
=None):
840 '''Destroy a service on Juju controller'''
841 self
.log
.debug("{}: Destroy charm service: {}".format(self
,service
))
844 service
= self
.service
846 env
= self
._get
_env
()
848 status
= self
._get
_status
(env
=env
)
851 while self
._is
_service
_deployed
(service
=service
, status
=status
, env
=env
):
853 self
.log
.debug("{}: Destroy service {}, count {}".
854 format(self
, service
, count
))
856 if count
> self
.destroy_retries
:
857 msg
= "{}: Not able to destroy service {} after {} tries". \
858 format(self
, service
, count
)
860 raise JujuDestroyError(msg
)
863 if self
._is
_service
_error
(service
=service
, status
=status
):
864 self
._resolve
_error
(service
, status
)
867 env
.destroy_service(service
)
869 except Exception as e
:
870 msg
= "{}: Exception when running destroy on service {}: {}". \
871 format(self
, service
, e
)
873 self
.log
.exception(e
)
874 raise JujuDestroyError(msg
)
876 time
.sleep(self
.retry_delay
)
877 status
= self
._get
_status
(env
=env
)
879 self
.log
.debug("{}: Destroyed service {} ({})".
880 format(self
, service
, count
))
883 def destroy_service(self
, service
=None):
884 '''Destroy a service on Juju controller'''
885 pf
= partial(self
._destroy
_service
,
887 yield from self
.loop
.run_in_executor(
893 def _get_action_status(self
, action_tag
, env
=None):
895 env
= self
._get
_env
()
897 if not action_tag
.startswith('action-'):
898 action_tag
= 'action-{}'.format(action_tag
)
902 except Exception as e
:
903 msg
= "{}: exception in Action API: {}".format(self
, e
)
905 self
.log
.exception(e
)
906 raise JujuActionApiError(msg
)
909 status
= action
.info([{'Tag': action_tag
}])
911 self
.log
.debug("{}: Action {} status {}".
912 format(self
, action_tag
, status
))
913 return status
['results'][0]
915 except Exception as e
:
916 msg
= "{}: exception in get action status {}".format(self
, e
)
918 self
.log
.exception(e
)
919 raise JujuActionInfoError(msg
)
922 def get_action_status(self
, action_tag
, env
=None):
924 Get the status of an action queued on the controller
926 responds with the action status, which is one of three values:
932 @param action_tag - the action UUID return from the enqueue method
933 eg: action-3428e20d-fcd7-4911-803b-9b857a2e5ec9
935 pf
= partial(self
._get
_action
_status
,
938 status
= yield from self
.loop
.run_in_executor(
944 def _execute_action(self
, action_name
, params
, service
=None, env
=None):
945 '''Execute the action on all units of a service'''
947 service
= self
.service
950 env
= self
._get
_env
()
954 except Exception as e
:
955 msg
= "{}: exception in Action API: {}".format(self
, e
)
957 self
.log
.exception(e
)
958 raise JujuActionApiError(msg
)
960 units
= self
._get
_service
_units
(service
)
961 self
.log
.debug("{}: Apply action {} on units {}".
962 format(self
, action_name
, units
))
964 # Rename units from <service>/<n> to unit-<service>-<n>
967 idx
= int(unit
[unit
.index('/')+1:])
968 unit_name
= "unit-%s-%d" % (service
, idx
)
969 unit_tags
.append(unit_name
)
970 self
.log
.debug("{}: Unit tags for action: {}".
971 format(self
, unit_tags
))
974 result
= action
.enqueue_units(unit_tags
, action_name
, params
)
975 self
.log
.debug("{}: Response for action: {}".
976 format(self
, result
))
977 return result
['results'][0]
979 except Exception as e
:
980 msg
= "{}: Exception enqueing action {} on units {} with " \
981 "params {}: {}".format(self
, action
, unit_tags
, params
, e
)
983 self
.log
.exception(e
)
984 raise JujuActionExecError(msg
)
987 def execute_action(self
, action_name
, params
, service
=None, env
=None):
988 '''Execute an action for a service on the controller
990 Currently, we execute the action on all units of the service
992 pf
= partial(self
._execute
_action
,
997 result
= yield from self
.loop
.run_in_executor(
1004 if __name__
== "__main__":
1005 parser
= argparse
.ArgumentParser(description
='Test Juju')
1006 parser
.add_argument("-s", "--server", default
='10.0.202.49', help="Juju controller")
1007 parser
.add_argument("-u", "--user", default
='admin', help="User, default user-admin")
1008 parser
.add_argument("-p", "--password", default
='nfvjuju', help="Password for the user")
1009 parser
.add_argument("-P", "--port", default
=17070, help="Port number, default 17070")
1010 parser
.add_argument("-d", "--directory", help="Local directory for the charm")
1011 parser
.add_argument("--service", help="Charm service name")
1012 parser
.add_argument("--vnf-ip", help="IP of the VNF to configure")
1013 args
= parser
.parse_args()
1015 api
= JujuApi(server
=args
.server
,
1018 secret
=args
.password
)
1020 env
= api
._get
_env
()
1022 raise "Not able to login to the Juju controller"
1024 print("Status: {}".format(api
._get
_status
(env
=env
)))
1026 if args
.directory
and args
.service
:
1028 charm
= os
.path
.basename(args
.directory
)
1029 api
._deploy
_service
(charm
, args
.service
,
1030 path
=args
.directory
,
1033 while not api
._is
_service
_up
():
1036 print ("Service {} is deployed with status {}".
1037 format(args
.service
, api
._get
_service
_status
()))
1039 if args
.vnf_ip
and \
1040 ('clearwater-aio' in args
.directory
):
1041 # Execute config on charm
1042 api
._apply
_config
({'proxied_ip': args
.vnf_ip
})
1044 while not api
._is
_service
_active
():
1047 print ("Service {} is in status {}".
1048 format(args
.service
, api
._get
_service
_status
()))
1050 res
= api
._execute
_action
('create-update-user', {'number': '125252352525',
1051 'password': 'asfsaf'})
1053 print ("Action 'creat-update-user response: {}".format(res
))
1055 status
= res
['status']
1056 while status
not in [ 'completed', 'failed' ]:
1058 status
= api
._get
_action
_status
(res
['action']['tag'])['status']
1060 print("Action status: {}".format(status
))
1062 # This action will fail as the number is non-numeric
1063 res
= api
._execute
_action
('delete-user', {'number': '125252352525asf'})
1065 print ("Action 'delete-user response: {}".format(res
))
1067 status
= res
['status']
1068 while status
not in [ 'completed', 'failed' ]:
1070 status
= api
._get
_action
_status
(res
['action']['tag'])['status']
1072 print("Action status: {}".format(status
))