1 ############################################################################
2 # Copyright 2016 RIFT.io Inc #
4 # Licensed under the Apache License, Version 2.0 (the "License"); #
5 # you may not use this file except in compliance with the License. #
6 # You may obtain a copy of the License at #
8 # http://www.apache.org/licenses/LICENSE-2.0 #
10 # Unless required by applicable law or agreed to in writing, software #
11 # distributed under the License is distributed on an "AS IS" BASIS, #
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
13 # See the License for the specific language governing permissions and #
14 # limitations under the License. #
15 ############################################################################
19 from functools
import partial
27 from jujuclient
.juju1
.environment
import Environment
as Env1
28 from jujuclient
.juju2
.environment
import Environment
as Env2
29 except ImportError as e
:
30 # Try importing older jujuclient
31 from jujuclient
import Environment
as Env1
34 ssl
._create
_default
_https
_context
= ssl
._create
_unverified
_context
35 except AttributeError:
36 # Legacy Python doesn't verify by default (see pep-0476)
37 # https://www.python.org/dev/peps/pep-0476/
41 class JujuVersionError(Exception):
45 class JujuApiError(Exception):
49 class JujuEnvError(JujuApiError
):
53 class JujuModelError(JujuApiError
):
57 class JujuStatusError(JujuApiError
):
61 class JujuUnitsError(JujuApiError
):
65 class JujuWaitUnitsError(JujuApiError
):
69 class JujuSrvNotDeployedError(JujuApiError
):
73 class JujuAddCharmError(JujuApiError
):
77 class JujuDeployError(JujuApiError
):
81 class JujuDestroyError(JujuApiError
):
85 class JujuResolveError(JujuApiError
):
89 class JujuActionError(JujuApiError
):
93 class JujuActionApiError(JujuActionError
):
97 class JujuActionInfoError(JujuActionError
):
101 class JujuActionExecError(JujuActionError
):
105 class JujuApi(object):
107 JujuApi wrapper on jujuclient library
109 There should be one instance of JujuApi for each VNF manged by Juju.
112 Currently we use one unit per service/VNF. So once a service
113 is deployed, we store the unit name and reuse it
125 '''Initialize with the Juju credentials'''
130 if user
.startswith('user-'):
133 self
.user
= 'user-{}'.format(user
)
140 self
.log
= JujuApi
._get
_logger
()
143 raise JujuApiError("Logger not defined")
147 self
.version
= version
153 self
.log
.warn("Using older version of Juju client, which " \
154 "supports only Juju 1.x")
157 endpoint
= 'wss://%s:%d' % (server
, int(port
))
158 self
.endpoint
= endpoint
160 self
.charm
= None # Charm used
161 self
.service
= None # Service deployed
162 self
.units
= [] # Storing as list to support more units in future
164 self
.destroy_retries
= 25 # Number retires to destroy service
165 self
.retry_delay
= 5 # seconds
168 return ("JujuApi-{}".format(self
.endpoint
))
171 def _get_logger(cls
):
172 if cls
.log
is not None:
175 fmt
= logging
.Formatter(
176 '%(asctime)-23s %(levelname)-5s (%(name)s@%(process)d:' \
177 '%(filename)s:%(lineno)d) - %(message)s')
178 stderr_handler
= logging
.StreamHandler(stream
=sys
.stderr
)
179 stderr_handler
.setFormatter(fmt
)
180 logging
.basicConfig(level
=logging
.DEBUG
)
181 cls
.log
= logging
.getLogger('juju-api')
182 cls
.log
.addHandler(stderr_handler
)
187 def format_charm_name(name
):
188 '''Format the name to valid charm name
190 Charm service name accepts only a to z and -.
197 elif not c
.isalpha():
200 return new_name
.lower()
202 def _get_version_tag(self
, tag
):
218 2: 'workload-status',
226 return version_tag_map
[tag
][self
.version
]
230 env
= Env1(self
.endpoint
)
231 l
= env
.login(self
.secret
, user
=self
.user
)
234 except ConnectionRefusedError
as e
:
235 msg
= "{}: Failed Juju 1.x connect: {}".format(self
, e
)
237 self
.log
.exception(e
)
240 except Exception as e
:
241 msg
= "{}: Failed Juju 1.x connect: {}".format(self
, e
)
243 self
.log
.exception(e
)
244 raise JujuEnvError(msg
)
248 env
= Env2(self
.endpoint
)
249 l
= env
.login(self
.secret
, user
=self
.user
)
250 except KeyError as e
:
251 msg
= "{}: Failed Juju 2.x connect: {}".format(self
, e
)
253 raise JujuVersionError(msg
)
256 models
= env
.models
.list()
257 for m
in models
['user-models']:
258 if m
['model']['name'] == 'default':
259 mep
= '{}/model/{}/api'.format(endpoint
, m
['model']['uuid'])
260 model
= Env2(mep
, env_uuid
=m
['model']['uuid'])
261 l
= model
.login(args
.password
, user
=args
.user
)
269 except Exception as e
:
270 msg
= "{}: Failed logging to model: {}".format(self
, e
)
272 self
.log
.exception(e
)
274 raise JujuModelError(msg
)
277 self
.log
.debug("{}: Connect to endpoint {}".
278 format(self
, self
.endpoint
))
280 if self
.version
is None:
281 # Try version 2 first
283 env
= self
._get
_env
2()
286 except JujuVersionError
as e
:
287 self
.log
.info("Unable to login as Juju 2.x, trying 1.x")
288 env
= self
._get
_env
1()
293 elif self
.version
== 2:
294 return self
._get
_env
2()
296 elif self
.version
== 1:
297 return self
._get
_env
1()
300 msg
= "{}: Unknown version set: {}".format(self
, self
.version
)
302 raise JujuVersionError(msg
)
306 ''' Connect to the Juju controller'''
307 env
= yield from self
.loop
.run_in_executor(
313 def _get_status(self
, env
=None):
315 env
= self
._get
_env
()
318 status
= env
.status()
321 except Exception as e
:
322 msg
= "{}: exception in getting status: {}". \
325 self
.log
.exception(e
)
326 raise JujuStatusError(msg
)
329 def get_status(self
, env
=None):
330 '''Get Juju controller status'''
331 pf
= partial(self
._get
_status
, env
=env
)
332 status
= yield from self
.loop
.run_in_executor(
338 def get_all_units(self
, status
, service
=None):
339 '''Parse the status and get the units'''
341 services
= status
.get(self
._get
_version
_tag
('applications'), {})
343 for svc_name
, svc_data
in services
.items():
344 if service
and service
!= svc_name
:
346 units
= svc_data
[self
._get
_version
_tag
('units')] or {}
348 results
[svc_name
] = {}
350 results
[svc_name
][unit
] = \
351 units
[unit
][self
._get
_version
_tag
('workload-status')] \
352 [self
._get
_version
_tag
('status')] or None
356 def _get_service_units(self
, service
=None, status
=None, env
=None):
358 service
= self
.service
360 # Optimizing calls to Juju, as currently we deploy only 1 unit per
362 if self
.service
== service
and len(self
.units
):
366 env
= self
._get
_env
()
369 status
= self
._get
_status
(env
=env
)
372 resp
= self
.get_all_units(status
, service
=service
)
373 self
.log
.debug("Get all units: {}".format(resp
))
374 units
= set(resp
[service
].keys())
376 if self
.service
== service
:
381 except Exception as e
:
382 msg
= "{}: exception in get units {}".format(self
, e
)
384 self
.log
.exception(e
)
385 raise JujuUnitsError(msg
)
388 def get_service_units(self
, service
=None, status
=None, env
=None):
389 '''Get the unit names for a service'''
390 pf
= partial(self
._get
_service
_units
,
394 units
= yield from self
.loop
.run_in_executor(
400 def _get_service_status(self
, service
=None, status
=None, env
=None):
402 env
= self
._get
_env
()
405 status
= self
._get
_status
(env
=env
)
408 service
= self
.service
411 srv_status
= status
[self
._get
_version
_tag
('applications')] \
412 [service
][self
._get
_version
_tag
('status')] \
413 [self
._get
_version
_tag
('status')]
414 self
.log
.debug("{}: Service {} status is {}".
415 format(self
, service
, srv_status
))
418 except KeyError as e
:
419 self
.log
.info("self: Did not find service {}, e={}".format(self
, service
, e
))
422 except Exception as e
:
423 msg
= "{}: exception checking service status for {}, e {}". \
424 format(self
, service
, e
)
426 self
.log
.exception(e
)
427 raise JujuStatusError(msg
)
431 def get_service_status(self
, service
=None, status
=None, env
=None):
432 ''' Get service status
434 maintenance : The unit is not yet providing services, but is actively doing stuff.
435 unknown : Service has finished an event but the charm has not called status-set yet.
436 waiting : Service is unable to progress to an active state because of dependency.
437 blocked : Service needs manual intervention to get back to the Running state.
438 active : Service correctly offering all the services.
439 NA : Service is not deployed
441 pf
= partial(self
._get
_service
_status
,
445 srv_status
= yield from self
.loop
.run_in_executor(
451 def _is_service_deployed(self
, service
=None, status
=None, env
=None):
452 resp
= self
._get
_service
_status
(service
=service
,
456 if resp
not in ['terminated', 'NA']:
462 def is_service_deployed(self
, service
=None, status
=None, env
=None):
463 '''Check if the service is deployed'''
464 pf
= partial(self
._is
_service
_deployed
,
468 rc
= yield from self
.loop
.run_in_executor(
474 def _is_service_error(self
, service
=None, status
=None, env
=None):
475 resp
= self
._get
_service
_status
(service
=service
,
479 if resp
in ['error']:
485 def is_service_error(self
, service
=None, status
=None, env
=None):
486 '''Check if the service is in error state'''
487 pf
= partial(self
._is
_service
_error
,
491 rc
= yield from self
.loop
.run_in_executor(
497 def _is_service_maint(self
, service
=None, status
=None, env
=None):
498 resp
= self
._get
_service
_status
(service
=service
,
502 if resp
in ['maintenance']:
508 def is_service_maint(self
, service
=None, status
=None, env
=None):
509 '''Check if the service is in error state'''
510 pf
= partial(self
._is
_service
_maint
,
514 rc
= yield from self
.loop
.run_in_executor(
520 def _is_service_active(self
, service
=None, status
=None, env
=None):
521 resp
= self
._get
_service
_status
(service
=service
,
525 if resp
in ['active']:
531 def is_service_active(self
, service
=None, status
=None, env
=None):
532 '''Check if the service is active'''
533 pf
= partial(self
._is
_service
_active
,
537 rc
= yield from self
.loop
.run_in_executor(
543 def _is_service_blocked(self
, service
=None, status
=None, env
=None):
544 resp
= self
._get
_service
_status
(service
=service
,
548 if resp
in ['blocked']:
554 def is_service_blocked(self
, service
=None, status
=None, env
=None):
555 '''Check if the service is blocked'''
556 pf
= partial(self
._is
_service
_blocked
,
560 rc
= yield from self
.loop
.run_in_executor(
566 def _is_service_up(self
, service
=None, status
=None, env
=None):
567 resp
= self
._get
_service
_status
(service
=service
,
571 if resp
in ['active', 'blocked']:
577 def is_service_up(self
, service
=None, status
=None, env
=None):
578 '''Check if the service is installed and up'''
579 pf
= partial(self
._is
_service
_up
,
584 rc
= yield from self
.loop
.run_in_executor(
590 def _apply_config(self
, config
, service
=None, env
=None):
592 service
= self
.service
594 if config
is None or len(config
) == 0:
595 self
.log
.warn("{}: Empty config passed for service {}".
596 format(self
, service
))
600 env
= self
._get
_env
()
602 status
= self
._get
_status
(env
=env
)
604 if not self
._is
_service
_deployed
(service
=service
,
607 raise JujuSrvNotDeployedError("{}: service {} is not deployed".
608 format(self
, service
))
610 self
.log
.debug("{}: Config for service {} update to: {}".
611 format(self
, service
, config
))
613 # Try to fix error on service, most probably due to config issue
614 if self
._is
_service
_error
(service
=service
, status
=status
, env
=env
):
615 self
._resolve
_error
(service
=service
, env
=env
)
617 if self
.version
== 2:
618 env
.service
.set(service
, config
)
620 env
.set_config(service
, config
)
622 except Exception as e
:
623 self
.log
.error("{}: exception setting config for {} with {}, e {}".
624 format(self
, service
, config
, e
))
625 self
.log
.exception(e
)
629 def apply_config(self
, config
, service
=None, env
=None, wait
=True):
630 '''Apply a config on the service'''
631 pf
= partial(self
._apply
_config
,
635 yield from self
.loop
.run_in_executor(
641 # Wait till config finished applying
642 self
.log
.debug("{}: Wait for config apply to finish".
647 # Sleep first to give time for config_changed hook to be invoked
648 yield from asyncio
.sleep(delay
, loop
=self
.loop
)
649 maint
= yield from self
.is_service_maint(service
=service
,
652 err
= yield from self
.is_service_error(service
=service
, env
=env
)
654 self
.log
.error("{}: Service is in error state".
658 self
.log
.debug("{}: Finished applying config".format(self
))
661 def _set_parameter(self
, parameter
, value
, service
=None):
662 return self
._apply
_config
({parameter
: value
}, service
=service
)
665 def set_parameter(self
, parameter
, value
, service
=None):
666 '''Set a config parameter for a service'''
667 return self
.apply_config({parameter
: value
}, service
=service
)
669 def _resolve_error(self
, service
=None, status
=None, env
=None):
671 env
= self
._get
_env
()
674 status
= self
._get
_status
(env
=env
)
677 service
= self
.service
680 env
= self
._get
_env
()
681 if self
._is
_service
_deployed
(service
=service
, status
=status
):
682 units
= self
.get_all_units(status
, service
=service
)
684 for unit
, ustatus
in units
[service
].items():
685 if ustatus
== 'error':
686 self
.log
.info("{}: Found unit {} with status {}".
687 format(self
, unit
, ustatus
))
689 # Takes the unit name as service_name/idx unlike action
692 except Exception as e
:
693 msg
= "{}: Resolve on unit {}: {}". \
694 format(self
, unit
, e
)
696 self
.log
.exception(e
)
697 raise JujuResolveError(msg
)
700 def resolve_error(self
, service
=None, status
=None, env
=None):
701 '''Resolve units in error state'''
702 pf
= partial(self
._resolve
_error
,
706 yield from self
.loop
.run_in_executor(
711 def _deploy_service(self
, charm
, service
,
712 path
=None, config
=None, env
=None):
713 self
.log
.debug("{}: Deploy service for charm {}({}) with service {}".
714 format(self
, charm
, path
, service
))
717 env
= self
._get
_env
()
719 self
.service
= service
722 if self
._is
_service
_deployed
(service
=service
, env
=env
):
723 self
.log
.info("{}: Charm service {} already deployed".
724 format (self
, service
))
726 self
._apply
_config
(config
, service
=service
, env
=env
)
732 if self
.version
== 1:
736 prefix
=os
.getenv('RIFT_INSTALL', '/')
737 path
= os
.path
.join(prefix
, 'usr/rift/charms', series
, charm
)
740 self
.log
.debug("{}: Local charm settings: dir={}, series={}".
741 format(self
, path
, series
))
742 result
= env
.add_local_charm_dir(path
, series
)
743 url
= result
[self
._get
_version
_tag
('charm-url')]
745 except Exception as e
:
746 msg
= '{}: Error setting local charm directory {} for {}: {}'. \
747 format(self
, path
, service
, e
)
749 self
.log
.exception(e
)
750 raise JujuAddCharmError(msg
)
753 self
.log
.debug("{}: Deploying using: service={}, url={}, to={}, config={}".
754 format(self
, service
, url
, deploy_to
, config
))
755 env
.deploy(service
, url
, config
=config
, machine_spec
=deploy_to
)
757 except Exception as e
:
758 msg
= '{}: Error deploying {}: {}'.format(self
, service
, e
)
760 self
.log
.exception(e
)
761 raise JujuDeployError(msg
)
764 def deploy_service(self
, charm
, service
,
765 wait
=False, timeout
=300,
766 path
=None, config
=None):
767 '''Deploy a service using the charm name provided'''
768 env
= yield from self
.get_env()
770 pf
= partial(self
._deploy
_service
,
776 yield from self
.loop
.run_in_executor(
783 # Wait for the deployed units to start
785 self
.log
.debug("{}: Waiting for service {} to come up".
786 format(self
, service
))
787 rc
= yield from self
.wait_for_service(timeout
=timeout
, env
=env
)
789 except Exception as e
:
790 msg
= '{}: Error starting all units for {}: {}'. \
791 format(self
, service
, e
)
793 self
.log
.exception(e
)
794 raise JujuWaitUnitsError(msg
)
799 def wait_for_service(self
, service
=None, timeout
=0, env
=None):
800 '''Wait for the service to come up'''
802 service
= self
.service
805 env
= yield from self
.get_env()
807 status
= yield from self
.get_status(env
=env
)
809 if self
._is
_service
_up
(service
=service
, status
=status
, env
=env
):
810 self
.log
.debug("{}: Service {} is already up".
811 format(self
, service
))
814 # Check if service is deployed
815 if not self
._is
_service
_deployed
(service
=service
, status
=status
, env
=env
):
816 raise JujuSrvNotDeployedError("{}: service {} is not deployed".
817 format(self
, service
))
823 delay
= self
.retry_delay
# seconds
824 self
.log
.debug("{}: In wait for service {}".format(self
, service
))
826 start_time
= time
.time()
827 max_time
= time
.time() + timeout
828 while timeout
!= 0 and (time
.time() <= max_time
):
830 rc
= yield from self
.is_service_up(service
=service
, env
=env
)
832 self
.log
.debug("{}: Service {} is up after {} seconds".
833 format(self
, service
, time
.time()-start_time
))
835 yield from asyncio
.sleep(delay
, loop
=self
.loop
)
838 def _destroy_service(self
, service
=None):
839 '''Destroy a service on Juju controller'''
840 self
.log
.debug("{}: Destroy charm service: {}".format(self
,service
))
843 service
= self
.service
845 env
= self
._get
_env
()
847 status
= self
._get
_status
(env
=env
)
850 while self
._is
_service
_deployed
(service
=service
, status
=status
, env
=env
):
852 self
.log
.debug("{}: Destroy service {}, count {}".
853 format(self
, service
, count
))
855 if count
> self
.destroy_retries
:
856 msg
= "{}: Not able to destroy service {} after {} tries". \
857 format(self
, service
, count
)
859 raise JujuDestroyError(msg
)
862 if self
._is
_service
_error
(service
=service
, status
=status
):
863 self
._resolve
_error
(service
, status
)
866 env
.destroy_service(service
)
868 except Exception as e
:
869 msg
= "{}: Exception when running destroy on service {}: {}". \
870 format(self
, service
, e
)
872 self
.log
.exception(e
)
873 raise JujuDestroyError(msg
)
875 time
.sleep(self
.retry_delay
)
876 status
= self
._get
_status
(env
=env
)
878 self
.log
.debug("{}: Destroyed service {} ({})".
879 format(self
, service
, count
))
882 def destroy_service(self
, service
=None):
883 '''Destroy a service on Juju controller'''
884 pf
= partial(self
._destroy
_service
,
886 yield from self
.loop
.run_in_executor(
892 def _get_action_status(self
, action_tag
, env
=None):
894 env
= self
._get
_env
()
896 if not action_tag
.startswith('action-'):
897 action_tag
= 'action-{}'.format(action_tag
)
901 except Exception as e
:
902 msg
= "{}: exception in Action API: {}".format(self
, e
)
904 self
.log
.exception(e
)
905 raise JujuActionApiError(msg
)
908 status
= action
.info([{'Tag': action_tag
}])
910 self
.log
.debug("{}: Action {} status {}".
911 format(self
, action_tag
, status
))
912 return status
['results'][0]
914 except Exception as e
:
915 msg
= "{}: exception in get action status {}".format(self
, e
)
917 self
.log
.exception(e
)
918 raise JujuActionInfoError(msg
)
921 def get_action_status(self
, action_tag
, env
=None):
923 Get the status of an action queued on the controller
925 responds with the action status, which is one of three values:
931 @param action_tag - the action UUID return from the enqueue method
932 eg: action-3428e20d-fcd7-4911-803b-9b857a2e5ec9
934 pf
= partial(self
._get
_action
_status
,
937 status
= yield from self
.loop
.run_in_executor(
943 def _execute_action(self
, action_name
, params
, service
=None, env
=None):
944 '''Execute the action on all units of a service'''
946 service
= self
.service
949 env
= self
._get
_env
()
953 except Exception as e
:
954 msg
= "{}: exception in Action API: {}".format(self
, e
)
956 self
.log
.exception(e
)
957 raise JujuActionApiError(msg
)
959 units
= self
._get
_service
_units
(service
)
960 self
.log
.debug("{}: Apply action {} on units {}".
961 format(self
, action_name
, units
))
963 # Rename units from <service>/<n> to unit-<service>-<n>
966 idx
= int(unit
[unit
.index('/')+1:])
967 unit_name
= "unit-%s-%d" % (service
, idx
)
968 unit_tags
.append(unit_name
)
969 self
.log
.debug("{}: Unit tags for action: {}".
970 format(self
, unit_tags
))
973 result
= action
.enqueue_units(unit_tags
, action_name
, params
)
974 self
.log
.debug("{}: Response for action: {}".
975 format(self
, result
))
976 return result
['results'][0]
978 except Exception as e
:
979 msg
= "{}: Exception enqueing action {} on units {} with " \
980 "params {}: {}".format(self
, action
, unit_tags
, params
, e
)
982 self
.log
.exception(e
)
983 raise JujuActionExecError(msg
)
986 def execute_action(self
, action_name
, params
, service
=None, env
=None):
987 '''Execute an action for a service on the controller
989 Currently, we execute the action on all units of the service
991 pf
= partial(self
._execute
_action
,
996 result
= yield from self
.loop
.run_in_executor(
1003 if __name__
== "__main__":
1004 parser
= argparse
.ArgumentParser(description
='Test Juju')
1005 parser
.add_argument("-s", "--server", default
='10.0.202.49', help="Juju controller")
1006 parser
.add_argument("-u", "--user", default
='admin', help="User, default user-admin")
1007 parser
.add_argument("-p", "--password", default
='nfvjuju', help="Password for the user")
1008 parser
.add_argument("-P", "--port", default
=17070, help="Port number, default 17070")
1009 parser
.add_argument("-d", "--directory", help="Local directory for the charm")
1010 parser
.add_argument("--service", help="Charm service name")
1011 parser
.add_argument("--vnf-ip", help="IP of the VNF to configure")
1012 args
= parser
.parse_args()
1014 api
= JujuApi(server
=args
.server
,
1017 secret
=args
.password
)
1019 env
= api
._get
_env
()
1021 raise "Not able to login to the Juju controller"
1023 print("Status: {}".format(api
._get
_status
(env
=env
)))
1025 if args
.directory
and args
.service
:
1027 charm
= os
.path
.basename(args
.directory
)
1028 api
._deploy
_service
(charm
, args
.service
,
1029 path
=args
.directory
,
1032 while not api
._is
_service
_up
():
1035 print ("Service {} is deployed with status {}".
1036 format(args
.service
, api
._get
_service
_status
()))
1038 if args
.vnf_ip
and \
1039 ('clearwater-aio' in args
.directory
):
1040 # Execute config on charm
1041 api
._apply
_config
({'proxied_ip': args
.vnf_ip
})
1043 while not api
._is
_service
_active
():
1046 print ("Service {} is in status {}".
1047 format(args
.service
, api
._get
_service
_status
()))
1049 res
= api
._execute
_action
('create-update-user', {'number': '125252352525',
1050 'password': 'asfsaf'})
1052 print ("Action 'creat-update-user response: {}".format(res
))
1054 status
= res
['status']
1055 while status
not in [ 'completed', 'failed' ]:
1057 status
= api
._get
_action
_status
(res
['action']['tag'])['status']
1059 print("Action status: {}".format(status
))
1061 # This action will fail as the number is non-numeric
1062 res
= api
._execute
_action
('delete-user', {'number': '125252352525asf'})
1064 print ("Action 'delete-user response: {}".format(res
))
1066 status
= res
['status']
1067 while status
not in [ 'completed', 'failed' ]:
1069 status
= api
._get
_action
_status
(res
['action']['tag'])['status']
1071 print("Action status: {}".format(status
))