1 ############################################################################
2 # Copyright 2016 RIFT.io Inc #
4 # Licensed under the Apache License, Version 2.0 (the "License"); #
5 # you may not use this file except in compliance with the License. #
6 # You may obtain a copy of the License at #
8 # http://www.apache.org/licenses/LICENSE-2.0 #
10 # Unless required by applicable law or agreed to in writing, software #
11 # distributed under the License is distributed on an "AS IS" BASIS, #
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
13 # See the License for the specific language governing permissions and #
14 # limitations under the License. #
15 ############################################################################
19 from functools
import partial
27 from jujuclient
.juju1
.environment
import Environment
as Env1
28 from jujuclient
.juju2
.environment
import Environment
as Env2
29 except ImportError as e
:
30 # Try importing older jujuclient
31 from jujuclient
import Environment
as Env1
34 ssl
._create
_default
_https
_context
= ssl
._create
_unverified
_context
35 except AttributeError:
36 # Legacy Python doesn't verify by default (see pep-0476)
37 # https://www.python.org/dev/peps/pep-0476/
41 class JujuVersionError(Exception):
45 class JujuApiError(Exception):
49 class JujuEnvError(JujuApiError
):
53 class JujuModelError(JujuApiError
):
57 class JujuStatusError(JujuApiError
):
61 class JujuUnitsError(JujuApiError
):
65 class JujuWaitUnitsError(JujuApiError
):
69 class JujuSrvNotDeployedError(JujuApiError
):
73 class JujuAddCharmError(JujuApiError
):
77 class JujuDeployError(JujuApiError
):
81 class JujuDestroyError(JujuApiError
):
85 class JujuResolveError(JujuApiError
):
89 class JujuActionError(JujuApiError
):
93 class JujuActionApiError(JujuActionError
):
97 class JujuActionInfoError(JujuActionError
):
101 class JujuActionExecError(JujuActionError
):
105 class JujuApi(object):
107 JujuApi wrapper on jujuclient library
109 There should be one instance of JujuApi for each VNF manged by Juju.
112 Currently we use one unit per service/VNF. So once a service
113 is deployed, we store the unit name and reuse it
125 '''Initialize with the Juju credentials'''
130 if user
.startswith('user-'):
133 self
.user
= 'user-{}'.format(user
)
140 self
.log
= JujuApi
._get
_logger
()
143 raise JujuApiError("Logger not defined")
147 self
.version
= version
153 self
.log
.warn("Using older version of Juju client, which " \
154 "supports only Juju 1.x")
157 endpoint
= 'wss://%s:%d' % (server
, int(port
))
158 self
.endpoint
= endpoint
160 self
.charm
= None # Charm used
161 self
.service
= None # Service deployed
162 self
.units
= [] # Storing as list to support more units in future
164 self
.destroy_retries
= 25 # Number retires to destroy service
165 self
.retry_delay
= 5 # seconds
168 return ("JujuApi-{}".format(self
.endpoint
))
171 def _get_logger(cls
):
172 if cls
.log
is not None:
175 fmt
= logging
.Formatter(
176 '%(asctime)-23s %(levelname)-5s (%(name)s@%(process)d:' \
177 '%(filename)s:%(lineno)d) - %(message)s')
178 stderr_handler
= logging
.StreamHandler(stream
=sys
.stderr
)
179 stderr_handler
.setFormatter(fmt
)
180 logging
.basicConfig(level
=logging
.DEBUG
)
181 cls
.log
= logging
.getLogger('juju-api')
182 cls
.log
.addHandler(stderr_handler
)
187 def format_charm_name(name
):
188 '''Format the name to valid charm name
190 Charm service name accepts only a to z and -.
197 elif not c
.isalpha():
200 return new_name
.lower()
202 def _get_version_tag(self
, tag
):
218 2: 'workload-status',
226 return version_tag_map
[tag
][self
.version
]
230 env
= Env1(self
.endpoint
)
231 l
= env
.login(self
.secret
, user
=self
.user
)
234 except ConnectionRefusedError
as e
:
235 msg
= "{}: Failed Juju 1.x connect: {}".format(self
, e
)
237 self
.log
.exception(e
)
240 except Exception as e
:
241 msg
= "{}: Failed Juju 1.x connect: {}".format(self
, e
)
243 self
.log
.exception(e
)
244 raise JujuEnvError(msg
)
248 env
= Env2(self
.endpoint
)
249 l
= env
.login(self
.secret
, user
=self
.user
)
250 except KeyError as e
:
251 msg
= "{}: Failed Juju 2.x connect: {}".format(self
, e
)
253 raise JujuVersionError(msg
)
256 models
= env
.models
.list()
257 for m
in models
['user-models']:
258 if m
['model']['name'] == 'default':
259 mep
= '{}/model/{}/api'.format(self
.endpoint
,
261 model
= Env2(mep
, env_uuid
=m
['model']['uuid'])
262 l
= model
.login(self
.secret
, user
=self
.user
)
270 except Exception as e
:
271 msg
= "{}: Failed logging to model: {}".format(self
, e
)
273 self
.log
.exception(e
)
275 raise JujuModelError(msg
)
278 self
.log
.debug("{}: Connect to endpoint {}".
279 format(self
, self
.endpoint
))
281 if self
.version
is None:
282 # Try version 2 first
284 env
= self
._get
_env
2()
287 except JujuVersionError
as e
:
288 self
.log
.info("Unable to login as Juju 2.x, trying 1.x")
289 env
= self
._get
_env
1()
294 elif self
.version
== 2:
295 return self
._get
_env
2()
297 elif self
.version
== 1:
298 return self
._get
_env
1()
301 msg
= "{}: Unknown version set: {}".format(self
, self
.version
)
303 raise JujuVersionError(msg
)
307 ''' Connect to the Juju controller'''
308 env
= yield from self
.loop
.run_in_executor(
314 def _get_status(self
, env
=None):
316 env
= self
._get
_env
()
319 status
= env
.status()
322 except Exception as e
:
323 msg
= "{}: exception in getting status: {}". \
326 self
.log
.exception(e
)
327 raise JujuStatusError(msg
)
330 def get_status(self
, env
=None):
331 '''Get Juju controller status'''
332 pf
= partial(self
._get
_status
, env
=env
)
333 status
= yield from self
.loop
.run_in_executor(
339 def get_all_units(self
, status
, service
=None):
340 '''Parse the status and get the units'''
342 services
= status
.get(self
._get
_version
_tag
('applications'), {})
344 for svc_name
, svc_data
in services
.items():
345 if service
and service
!= svc_name
:
347 units
= svc_data
[self
._get
_version
_tag
('units')] or {}
349 results
[svc_name
] = {}
351 results
[svc_name
][unit
] = \
352 units
[unit
][self
._get
_version
_tag
('workload-status')] \
353 [self
._get
_version
_tag
('status')] or None
357 def _get_service_units(self
, service
=None, status
=None, env
=None):
359 service
= self
.service
361 # Optimizing calls to Juju, as currently we deploy only 1 unit per
363 # if self.service == service and len(self.units):
367 env
= self
._get
_env
()
370 status
= self
._get
_status
(env
=env
)
373 resp
= self
.get_all_units(status
, service
=service
)
374 self
.log
.debug("Get all units: {}".format(resp
))
375 units
= set(resp
[service
].keys())
377 if self
.service
== service
:
382 except Exception as e
:
383 msg
= "{}: exception in get units {}".format(self
, e
)
385 self
.log
.exception(e
)
386 raise JujuUnitsError(msg
)
389 def get_service_units(self
, service
=None, status
=None, env
=None):
390 '''Get the unit names for a service'''
391 pf
= partial(self
._get
_service
_units
,
395 units
= yield from self
.loop
.run_in_executor(
401 def _get_service_status(self
, service
=None, status
=None, env
=None):
403 env
= self
._get
_env
()
406 status
= self
._get
_status
(env
=env
)
409 service
= self
.service
412 srv_status
= status
[self
._get
_version
_tag
('applications')] \
413 [service
][self
._get
_version
_tag
('status')] \
414 [self
._get
_version
_tag
('status')]
415 self
.log
.debug("{}: Service {} status is {}".
416 format(self
, service
, srv_status
))
419 except KeyError as e
:
420 self
.log
.info("self: Did not find service {}, e={}".format(self
, service
, e
))
423 except Exception as e
:
424 msg
= "{}: exception checking service status for {}, e {}". \
425 format(self
, service
, e
)
427 self
.log
.exception(e
)
428 raise JujuStatusError(msg
)
432 def get_service_status(self
, service
=None, status
=None, env
=None):
433 ''' Get service status
435 maintenance : The unit is not yet providing services, but is actively doing stuff.
436 unknown : Service has finished an event but the charm has not called status-set yet.
437 waiting : Service is unable to progress to an active state because of dependency.
438 blocked : Service needs manual intervention to get back to the Running state.
439 active : Service correctly offering all the services.
440 NA : Service is not deployed
442 pf
= partial(self
._get
_service
_status
,
446 srv_status
= yield from self
.loop
.run_in_executor(
452 def _is_service_deployed(self
, service
=None, status
=None, env
=None):
453 resp
= self
._get
_service
_status
(service
=service
,
457 if resp
not in ['terminated', 'NA']:
463 def is_service_deployed(self
, service
=None, status
=None, env
=None):
464 '''Check if the service is deployed'''
465 pf
= partial(self
._is
_service
_deployed
,
469 rc
= yield from self
.loop
.run_in_executor(
475 def _is_service_error(self
, service
=None, status
=None, env
=None):
476 resp
= self
._get
_service
_status
(service
=service
,
480 if resp
in ['error']:
486 def is_service_error(self
, service
=None, status
=None, env
=None):
487 '''Check if the service is in error state'''
488 pf
= partial(self
._is
_service
_error
,
492 rc
= yield from self
.loop
.run_in_executor(
498 def _is_service_maint(self
, service
=None, status
=None, env
=None):
499 resp
= self
._get
_service
_status
(service
=service
,
503 if resp
in ['maintenance']:
509 def is_service_maint(self
, service
=None, status
=None, env
=None):
510 '''Check if the service is in error state'''
511 pf
= partial(self
._is
_service
_maint
,
515 rc
= yield from self
.loop
.run_in_executor(
521 def _is_service_active(self
, service
=None, status
=None, env
=None):
522 resp
= self
._get
_service
_status
(service
=service
,
526 if resp
in ['active']:
532 def is_service_active(self
, service
=None, status
=None, env
=None):
533 '''Check if the service is active'''
534 pf
= partial(self
._is
_service
_active
,
538 rc
= yield from self
.loop
.run_in_executor(
544 def _is_service_blocked(self
, service
=None, status
=None, env
=None):
545 resp
= self
._get
_service
_status
(service
=service
,
549 if resp
in ['blocked']:
555 def is_service_blocked(self
, service
=None, status
=None, env
=None):
556 '''Check if the service is blocked'''
557 pf
= partial(self
._is
_service
_blocked
,
561 rc
= yield from self
.loop
.run_in_executor(
567 def _is_service_up(self
, service
=None, status
=None, env
=None):
568 resp
= self
._get
_service
_status
(service
=service
,
572 if resp
in ['active', 'blocked']:
578 def is_service_up(self
, service
=None, status
=None, env
=None):
579 '''Check if the service is installed and up'''
580 pf
= partial(self
._is
_service
_up
,
585 rc
= yield from self
.loop
.run_in_executor(
591 def _apply_config(self
, config
, service
=None, env
=None):
593 service
= self
.service
595 if config
is None or len(config
) == 0:
596 self
.log
.warn("{}: Empty config passed for service {}".
597 format(self
, service
))
601 env
= self
._get
_env
()
603 status
= self
._get
_status
(env
=env
)
605 if not self
._is
_service
_deployed
(service
=service
,
608 raise JujuSrvNotDeployedError("{}: service {} is not deployed".
609 format(self
, service
))
611 self
.log
.debug("{}: Config for service {} update to: {}".
612 format(self
, service
, config
))
614 # Try to fix error on service, most probably due to config issue
615 if self
._is
_service
_error
(service
=service
, status
=status
, env
=env
):
616 self
._resolve
_error
(service
=service
, env
=env
)
618 if self
.version
== 2:
619 env
.service
.set(service
, config
)
621 env
.set_config(service
, config
)
623 except Exception as e
:
624 self
.log
.error("{}: exception setting config for {} with {}, e {}".
625 format(self
, service
, config
, e
))
626 self
.log
.exception(e
)
630 def apply_config(self
, config
, service
=None, env
=None, wait
=True):
631 '''Apply a config on the service'''
632 pf
= partial(self
._apply
_config
,
636 yield from self
.loop
.run_in_executor(
642 # Wait till config finished applying
643 self
.log
.debug("{}: Wait for config apply to finish".
648 # Sleep first to give time for config_changed hook to be invoked
649 yield from asyncio
.sleep(delay
, loop
=self
.loop
)
650 maint
= yield from self
.is_service_maint(service
=service
,
653 err
= yield from self
.is_service_error(service
=service
, env
=env
)
655 self
.log
.error("{}: Service is in error state".
659 self
.log
.debug("{}: Finished applying config".format(self
))
662 def _set_parameter(self
, parameter
, value
, service
=None):
663 return self
._apply
_config
({parameter
: value
}, service
=service
)
666 def set_parameter(self
, parameter
, value
, service
=None):
667 '''Set a config parameter for a service'''
668 return self
.apply_config({parameter
: value
}, service
=service
)
670 def _resolve_error(self
, service
=None, status
=None, env
=None):
672 env
= self
._get
_env
()
675 status
= self
._get
_status
(env
=env
)
678 service
= self
.service
681 env
= self
._get
_env
()
682 if self
._is
_service
_deployed
(service
=service
, status
=status
):
683 units
= self
.get_all_units(status
, service
=service
)
685 for unit
, ustatus
in units
[service
].items():
686 if ustatus
== 'error':
687 self
.log
.info("{}: Found unit {} with status {}".
688 format(self
, unit
, ustatus
))
690 # Takes the unit name as service_name/idx unlike action
693 except Exception as e
:
694 msg
= "{}: Resolve on unit {}: {}". \
695 format(self
, unit
, e
)
699 def resolve_error(self
, service
=None, status
=None, env
=None):
700 '''Resolve units in error state'''
701 pf
= partial(self
._resolve
_error
,
705 yield from self
.loop
.run_in_executor(
710 def _deploy_service(self
, charm
, service
,
711 path
=None, config
=None, env
=None):
712 self
.log
.debug("{}: Deploy service for charm {}({}) with service {}".
713 format(self
, charm
, path
, service
))
716 env
= self
._get
_env
()
718 self
.service
= service
721 if self
._is
_service
_deployed
(service
=service
, env
=env
):
722 self
.log
.info("{}: Charm service {} already deployed".
723 format (self
, service
))
725 self
._apply
_config
(config
, service
=service
, env
=env
)
731 if self
.version
== 1:
735 prefix
=os
.getenv('RIFT_INSTALL', '/')
736 path
= os
.path
.join(prefix
, 'usr/rift/charms', series
, charm
)
739 self
.log
.debug("{}: Local charm settings: dir={}, series={}".
740 format(self
, path
, series
))
741 result
= env
.add_local_charm_dir(path
, series
)
742 url
= result
[self
._get
_version
_tag
('charm-url')]
744 except Exception as e
:
745 msg
= '{}: Error setting local charm directory {} for {}: {}'. \
746 format(self
, path
, service
, e
)
748 self
.log
.exception(e
)
749 raise JujuAddCharmError(msg
)
752 self
.log
.debug("{}: Deploying using: service={}, url={}, to={}, config={}".
753 format(self
, service
, url
, deploy_to
, config
))
754 env
.deploy(service
, url
, config
=config
, machine_spec
=deploy_to
)
756 except Exception as e
:
757 msg
= '{}: Error deploying {}: {}'.format(self
, service
, e
)
759 self
.log
.exception(e
)
760 raise JujuDeployError(msg
)
763 def deploy_service(self
, charm
, service
,
764 wait
=False, timeout
=300,
765 path
=None, config
=None):
766 '''Deploy a service using the charm name provided'''
767 env
= yield from self
.get_env()
769 pf
= partial(self
._deploy
_service
,
775 yield from self
.loop
.run_in_executor(
782 # Wait for the deployed units to start
784 self
.log
.debug("{}: Waiting for service {} to come up".
785 format(self
, service
))
786 rc
= yield from self
.wait_for_service(timeout
=timeout
, env
=env
)
788 except Exception as e
:
789 msg
= '{}: Error starting all units for {}: {}'. \
790 format(self
, service
, e
)
792 self
.log
.exception(e
)
793 raise JujuWaitUnitsError(msg
)
798 def wait_for_service(self
, service
=None, timeout
=0, env
=None):
799 '''Wait for the service to come up'''
801 service
= self
.service
804 env
= yield from self
.get_env()
806 status
= yield from self
.get_status(env
=env
)
808 if self
._is
_service
_up
(service
=service
, status
=status
, env
=env
):
809 self
.log
.debug("{}: Service {} is already up".
810 format(self
, service
))
813 # Check if service is deployed
814 if not self
._is
_service
_deployed
(service
=service
, status
=status
, env
=env
):
815 raise JujuSrvNotDeployedError("{}: service {} is not deployed".
816 format(self
, service
))
822 delay
= self
.retry_delay
# seconds
823 self
.log
.debug("{}: In wait for service {}".format(self
, service
))
825 start_time
= time
.time()
826 max_time
= time
.time() + timeout
827 while timeout
!= 0 and (time
.time() <= max_time
):
829 rc
= yield from self
.is_service_up(service
=service
, env
=env
)
831 self
.log
.debug("{}: Service {} is up after {} seconds".
832 format(self
, service
, time
.time()-start_time
))
834 yield from asyncio
.sleep(delay
, loop
=self
.loop
)
837 def _destroy_service(self
, service
=None):
838 '''Destroy a service on Juju controller'''
839 self
.log
.debug("{}: Destroy charm service: {}".format(self
,service
))
842 service
= self
.service
844 env
= self
._get
_env
()
846 status
= self
._get
_status
(env
=env
)
849 while self
._is
_service
_deployed
(service
=service
, status
=status
, env
=env
):
851 self
.log
.debug("{}: Destroy service {}, count {}".
852 format(self
, service
, count
))
854 if count
> self
.destroy_retries
:
855 msg
= "{}: Not able to destroy service {} after {} tries". \
856 format(self
, service
, count
)
858 raise JujuDestroyError(msg
)
861 if self
._is
_service
_error
(service
=service
, status
=status
):
862 self
._resolve
_error
(service
, status
)
865 env
.destroy_service(service
)
867 except Exception as e
:
868 msg
= "{}: Exception when running destroy on service {}: {}". \
869 format(self
, service
, e
)
871 self
.log
.exception(e
)
872 raise JujuDestroyError(msg
)
874 time
.sleep(self
.retry_delay
)
875 status
= self
._get
_status
(env
=env
)
877 self
.log
.debug("{}: Destroyed service {} ({})".
878 format(self
, service
, count
))
881 def destroy_service(self
, service
=None):
882 '''Destroy a service on Juju controller'''
883 pf
= partial(self
._destroy
_service
,
885 yield from self
.loop
.run_in_executor(
891 def _get_action_status(self
, action_tag
, env
=None):
893 env
= self
._get
_env
()
895 if not action_tag
.startswith('action-'):
896 action_tag
= 'action-{}'.format(action_tag
)
900 except Exception as e
:
901 msg
= "{}: exception in Action API: {}".format(self
, e
)
903 self
.log
.exception(e
)
904 raise JujuActionApiError(msg
)
907 status
= action
.info([{'Tag': action_tag
}])
909 self
.log
.debug("{}: Action {} status {}".
910 format(self
, action_tag
, status
))
911 return status
['results'][0]
913 except Exception as e
:
914 msg
= "{}: exception in get action status {}".format(self
, e
)
916 self
.log
.exception(e
)
917 raise JujuActionInfoError(msg
)
920 def get_action_status(self
, action_tag
, env
=None):
922 Get the status of an action queued on the controller
924 responds with the action status, which is one of three values:
930 @param action_tag - the action UUID return from the enqueue method
931 eg: action-3428e20d-fcd7-4911-803b-9b857a2e5ec9
933 pf
= partial(self
._get
_action
_status
,
936 status
= yield from self
.loop
.run_in_executor(
942 def _execute_action(self
, action_name
, params
, service
=None, env
=None):
943 '''Execute the action on all units of a service'''
945 service
= self
.service
948 env
= self
._get
_env
()
952 except Exception as e
:
953 msg
= "{}: exception in Action API: {}".format(self
, e
)
955 self
.log
.exception(e
)
956 raise JujuActionApiError(msg
)
958 units
= self
._get
_service
_units
(service
)
959 self
.log
.debug("{}: Apply action {} on units {}".
960 format(self
, action_name
, units
))
962 # Rename units from <service>/<n> to unit-<service>-<n>
965 idx
= int(unit
[unit
.index('/')+1:])
966 unit_name
= "unit-%s-%d" % (service
, idx
)
967 unit_tags
.append(unit_name
)
968 self
.log
.debug("{}: Unit tags for action: {}".
969 format(self
, unit_tags
))
972 result
= action
.enqueue_units(unit_tags
, action_name
, params
)
973 self
.log
.debug("{}: Response for action: {}".
974 format(self
, result
))
975 return result
['results'][0]
977 except Exception as e
:
978 msg
= "{}: Exception enqueing action {} on units {} with " \
979 "params {}: {}".format(self
, action
, unit_tags
, params
, e
)
981 self
.log
.exception(e
)
982 raise JujuActionExecError(msg
)
985 def execute_action(self
, action_name
, params
, service
=None, env
=None):
986 '''Execute an action for a service on the controller
988 Currently, we execute the action on all units of the service
990 pf
= partial(self
._execute
_action
,
995 result
= yield from self
.loop
.run_in_executor(
1002 if __name__
== "__main__":
1003 parser
= argparse
.ArgumentParser(description
='Test Juju')
1004 parser
.add_argument("-s", "--server", default
='10.0.202.49', help="Juju controller")
1005 parser
.add_argument("-u", "--user", default
='admin', help="User, default user-admin")
1006 parser
.add_argument("-p", "--password", default
='nfvjuju', help="Password for the user")
1007 parser
.add_argument("-P", "--port", default
=17070, help="Port number, default 17070")
1008 parser
.add_argument("-d", "--directory", help="Local directory for the charm")
1009 parser
.add_argument("--service", help="Charm service name")
1010 parser
.add_argument("--vnf-ip", help="IP of the VNF to configure")
1011 args
= parser
.parse_args()
1013 api
= JujuApi(server
=args
.server
,
1016 secret
=args
.password
)
1018 env
= api
._get
_env
()
1020 raise "Not able to login to the Juju controller"
1022 print("Status: {}".format(api
._get
_status
(env
=env
)))
1024 if args
.directory
and args
.service
:
1026 charm
= os
.path
.basename(args
.directory
)
1027 api
._deploy
_service
(charm
, args
.service
,
1028 path
=args
.directory
,
1031 while not api
._is
_service
_up
():
1034 print ("Service {} is deployed with status {}".
1035 format(args
.service
, api
._get
_service
_status
()))
1037 if args
.vnf_ip
and \
1038 ('clearwater-aio' in args
.directory
):
1039 # Execute config on charm
1040 api
._apply
_config
({'proxied_ip': args
.vnf_ip
})
1042 while not api
._is
_service
_active
():
1045 print ("Service {} is in status {}".
1046 format(args
.service
, api
._get
_service
_status
()))
1048 res
= api
._execute
_action
('create-update-user', {'number': '125252352525',
1049 'password': 'asfsaf'})
1051 print ("Action 'creat-update-user response: {}".format(res
))
1053 status
= res
['status']
1054 while status
not in [ 'completed', 'failed' ]:
1056 status
= api
._get
_action
_status
(res
['action']['tag'])['status']
1058 print("Action status: {}".format(status
))
1060 # This action will fail as the number is non-numeric
1061 res
= api
._execute
_action
('delete-user', {'number': '125252352525asf'})
1063 print ("Action 'delete-user response: {}".format(res
))
1065 status
= res
['status']
1066 while status
not in [ 'completed', 'failed' ]:
1068 status
= api
._get
_action
_status
(res
['action']['tag'])['status']
1070 print("Action status: {}".format(status
))