b452b8a5e01090a992506d287fef39b135e687e1
[osm/SO.git] / common / python / rift / mano / utils / juju_api.py
1 ############################################################################
2 # Copyright 2016 RIFT.io Inc #
3 # #
4 # Licensed under the Apache License, Version 2.0 (the "License"); #
5 # you may not use this file except in compliance with the License. #
6 # You may obtain a copy of the License at #
7 # #
8 # http://www.apache.org/licenses/LICENSE-2.0 #
9 # #
10 # Unless required by applicable law or agreed to in writing, software #
11 # distributed under the License is distributed on an "AS IS" BASIS, #
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
13 # See the License for the specific language governing permissions and #
14 # limitations under the License. #
15 ############################################################################
16
17 import argparse
18 import asyncio
19 from functools import partial
20 import logging
21 import os
22 import ssl
23 import sys
24 import time
25
26 try:
27 from jujuclient.juju1.environment import Environment as Env1
28 from jujuclient.juju2.environment import Environment as Env2
29 except ImportError as e:
30 # Try importing older jujuclient
31 from jujuclient import Environment as Env1
32
# NOTE(security): this replaces the standard library's default HTTPS context
# factory with the unverified one, so certificate verification is turned off
# for every user of ssl's default HTTPS context in this process, not only for
# this module. Presumably done because Juju controllers present self-signed
# certificates -- TODO confirm, and prefer passing a CA certificate instead.
#
# Legacy Python has no ssl._create_unverified_context and does not verify by
# default (see pep-0476, https://www.python.org/dev/peps/pep-0476/), so there
# is nothing to override in that case.
if hasattr(ssl, '_create_unverified_context'):
    ssl._create_default_https_context = ssl._create_unverified_context
39
40
class JujuVersionError(Exception):
    '''Juju API version is unknown or the 2.x login handshake failed.

    Deliberately not a JujuApiError: it is used as the signal to fall back
    from the Juju 2.x client to the 1.x client.
    '''


class JujuApiError(Exception):
    '''Base class for the errors raised by JujuApi.'''


class JujuEnvError(JujuApiError):
    '''Connecting or logging in to the Juju 1.x environment failed.'''


class JujuModelError(JujuApiError):
    '''Listing the Juju 2.x models or logging in to the model failed.'''


class JujuStatusError(JujuApiError):
    '''The status could not be fetched or parsed.'''


class JujuUnitsError(JujuApiError):
    '''The units of a service could not be determined.'''


class JujuWaitUnitsError(JujuApiError):
    '''Waiting for the units of a deployed service failed.'''


class JujuSrvNotDeployedError(JujuApiError):
    '''The operation needs a deployed service, but it is not deployed.'''


class JujuAddCharmError(JujuApiError):
    '''Uploading the local charm directory failed.'''


class JujuDeployError(JujuApiError):
    '''Deploying the service failed.'''


class JujuDestroyError(JujuApiError):
    '''Destroying the service failed or ran out of retries.'''


class JujuResolveError(JujuApiError):
    '''Resolving units in error state failed.'''


class JujuActionError(JujuApiError):
    '''Base class for the errors related to Juju actions.'''


class JujuActionApiError(JujuActionError):
    '''The Action API of the connection could not be obtained.'''


class JujuActionInfoError(JujuActionError):
    '''Querying the status of an action failed.'''


class JujuActionExecError(JujuActionError):
    '''Enqueuing an action on the units failed.'''
103
104
class JujuApi(object):
    '''
    JujuApi wrapper on jujuclient library

    There should be one instance of JujuApi for each VNF managed by Juju.

    Assumption:
        Currently we use one unit per service/VNF. So once a service
        is deployed, we store the unit name and reuse it

    The methods prefixed with an underscore block on the Juju API. The
    public coroutines (get_status, deploy_service, ...) run the matching
    blocking method in the event loop's default executor.

    The coroutines are native (async def) coroutines: the generator based
    asyncio.coroutine decorator no longer exists in current Python. They
    can be awaited, and on interpreters that still have asyncio.coroutine
    they can also be driven with "yield from" from such a coroutine.
    '''
    log = None

    # The status reply uses different keys in Juju 1.x and Juju 2.x; map a
    # version neutral tag to the key of each major version.
    _VERSION_TAG_MAP = {
        'applications': {
            1: 'Services',
            2: 'applications',
        },
        'units': {
            1: 'Units',
            2: 'units',
        },
        'status': {
            1: 'Status',
            2: 'status',
        },
        'workload-status': {
            1: 'Workload',
            2: 'workload-status',
        },
        'charm-url': {
            1: 'CharmURL',
            2: 'charm-url',
        },
    }

    def __init__(self,
                 log=None,
                 loop=None,
                 server='127.0.0.1',
                 port=17070,
                 user='admin',
                 secret=None,
                 version=None):
        '''Initialize with the Juju credentials

        log     - logger to use; a stderr logger is created when omitted
        loop    - event loop whose executor runs the blocking calls; the
                  current event loop is used when omitted
        server  - address of the Juju controller
        port    - API port of the Juju controller
        user    - user name, with or without the 'user-' prefix
        secret  - password of the user
        version - Juju major version (1 or 2); detected on the first
                  connect when omitted

        Raises JujuApiError if no logger is available.
        '''
        self.server = server
        self.port = port

        self.secret = secret
        if user.startswith('user-'):
            self.user = user
        else:
            self.user = 'user-{}'.format(user)

        self.loop = loop

        self.log = log if log is not None else JujuApi._get_logger()
        if self.log is None:
            raise JujuApiError("Logger not defined")

        self.version = None
        if version:
            self.version = version
        else:
            try:
                # Env2 is only bound when the newer jujuclient was imported
                if Env2:
                    pass
            except NameError:
                self.log.warning("Using older version of Juju client, which "
                                 "supports only Juju 1.x")
                self.version = 1

        self.endpoint = 'wss://%s:%d' % (server, int(port))

        self.charm = None    # Charm used
        self.service = None  # Service deployed
        self.units = []      # Storing as list to support more units in future

        self.destroy_retries = 25  # Number of retries to destroy service
        self.retry_delay = 5       # seconds

    def __str__(self):
        return "JujuApi-{}".format(self.endpoint)

    @classmethod
    def _get_logger(cls):
        '''Return the class level fallback logger, creating it on first use'''
        if cls.log is not None:
            return cls.log

        fmt = logging.Formatter(
            '%(asctime)-23s %(levelname)-5s (%(name)s@%(process)d:'
            '%(filename)s:%(lineno)d) - %(message)s')
        stderr_handler = logging.StreamHandler(stream=sys.stderr)
        stderr_handler.setFormatter(fmt)

        logger = logging.getLogger('juju-api')
        logger.setLevel(logging.DEBUG)
        logger.addHandler(stderr_handler)
        # The handler above already writes to stderr. Configuring the root
        # logger as well (as was done before) printed every record twice.
        logger.propagate = False

        cls.log = logger
        return cls.log

    @staticmethod
    def format_charm_name(name):
        '''Format the name to valid charm name

        Charm service name accepts only a to z and -.

        Letters are lower-cased, the digits 0-9 become the letters a-j and
        every other character (including non-ASCII letters and digits)
        becomes '-'.
        '''
        chars = []
        for c in name.lower():
            if 'a' <= c <= 'z':
                chars.append(c)
            elif '0' <= c <= '9':
                chars.append(chr(ord('a') + int(c)))
            else:
                chars.append('-')
        return ''.join(chars)

    def _get_version_tag(self, tag):
        '''Return the status key for tag in the Juju version in use

        Raises KeyError for an unknown tag or a version other than 1 or 2.
        '''
        return self._VERSION_TAG_MAP[tag][self.version]

    def _close_quietly(self, env):
        '''Close a connection; a failing close is logged, not raised'''
        try:
            env.close()
        except Exception as e:
            self.log.debug("{}: ignoring error closing connection: {}".
                           format(self, e))

    def _get_env1(self):
        '''Connect and login using the Juju 1.x client

        Raises ConnectionRefusedError unchanged and JujuEnvError for any
        other failure.
        '''
        try:
            env = Env1(self.endpoint)
            env.login(self.secret, user=self.user)
            return env

        except ConnectionRefusedError as e:
            msg = "{}: Failed Juju 1.x connect: {}".format(self, e)
            self.log.error(msg)
            self.log.exception(e)
            raise

        except Exception as e:
            msg = "{}: Failed Juju 1.x connect: {}".format(self, e)
            self.log.error(msg)
            self.log.exception(e)
            raise JujuEnvError(msg) from e

    def _get_env2(self):
        '''Connect using the Juju 2.x client and login to the 'default' model

        Raises JujuVersionError when the controller login fails with a
        KeyError (taken as "not a 2.x controller" by _get_env) and
        JujuModelError when the 'default' model cannot be found or logged
        in to.
        '''
        try:
            env = Env2(self.endpoint)
            env.login(self.secret, user=self.user)
        except KeyError as e:
            msg = "{}: Failed Juju 2.x connect: {}".format(self, e)
            self.log.debug(msg)
            raise JujuVersionError(msg)

        model = None
        try:
            for m in env.models.list()['user-models']:
                info = m['model']
                if info['name'] != 'default':
                    continue

                mep = '{}/model/{}/api'.format(self.endpoint, info['uuid'])
                model = Env2(mep, env_uuid=info['uuid'])
                model.login(self.secret, user=self.user)
                break

            if model is None:
                raise JujuModelError("no model named 'default' found")

            return model

        except Exception as e:
            msg = "{}: Failed logging to model: {}".format(self, e)
            self.log.error(msg)
            self.log.exception(e)
            if model is not None:
                self._close_quietly(model)
            raise JujuModelError(msg) from e

        finally:
            # The controller connection is only needed to look the model
            # up; the model has a connection of its own.
            self._close_quietly(env)

    def _get_env(self):
        '''Connect to the controller and return the logged in connection

        When the version is not known yet Juju 2.x is tried first, then
        1.x, and the version that worked is remembered.

        Raises JujuVersionError for a version other than 1 or 2, besides
        the errors of _get_env1 and _get_env2.
        '''
        self.log.debug("{}: Connect to endpoint {}".
                       format(self, self.endpoint))

        if self.version is None:
            # Try version 2 first
            try:
                env = self._get_env2()
                self.version = 2

            except JujuVersionError:
                self.log.info("Unable to login as Juju 2.x, trying 1.x")
                env = self._get_env1()
                self.version = 1

            return env

        if self.version == 2:
            return self._get_env2()

        if self.version == 1:
            return self._get_env1()

        msg = "{}: Unknown version set: {}".format(self, self.version)
        self.log.error(msg)
        raise JujuVersionError(msg)

    async def _run_blocking(self, func, *args, **kwargs):
        '''Run a blocking call in the default executor and return its result

        Uses the loop given to the constructor, or the current event loop
        when none was given.
        '''
        loop = self.loop
        if loop is None:
            loop = asyncio.get_event_loop()
        return await loop.run_in_executor(None,
                                          partial(func, *args, **kwargs))

    async def get_env(self):
        ''' Connect to the Juju controller'''
        return await self._run_blocking(self._get_env)

    def _get_status(self, env=None):
        '''Fetch the status, connecting first if no env is given

        Raises JujuStatusError if the status call fails.
        '''
        if env is None:
            env = self._get_env()

        try:
            return env.status()

        except Exception as e:
            msg = "{}: exception in getting status: {}". \
                  format(self, e)
            self.log.error(msg)
            self.log.exception(e)
            raise JujuStatusError(msg) from e

    async def get_status(self, env=None):
        '''Get Juju controller status'''
        return await self._run_blocking(self._get_status, env=env)

    def _resolve_status(self, status=None, env=None):
        '''Return status, fetching it from Juju only if it was not supplied'''
        if status is None:
            return self._get_status(env=env)

        if self.version is None and env is None:
            # The status was obtained elsewhere and the version is still
            # unknown: connect once, only to detect the version, so that
            # the status keys can be resolved.
            self._close_quietly(self._get_env())

        return status

    def get_all_units(self, status, service=None):
        '''Parse the status and get the units

        Returns {service name: {unit name: workload status or None}},
        limited to service when it is given. A service without units maps
        to an empty dict.
        '''
        apps_tag = self._get_version_tag('applications')
        units_tag = self._get_version_tag('units')
        workload_tag = self._get_version_tag('workload-status')
        status_tag = self._get_version_tag('status')

        results = {}
        for svc_name, svc_data in (status.get(apps_tag) or {}).items():
            if service and service != svc_name:
                continue

            # Not every application entry carries units
            units = svc_data.get(units_tag) or {}
            results[svc_name] = {
                unit: unit_data[workload_tag][status_tag] or None
                for unit, unit_data in units.items()
            }
        return results

    def _get_service_units(self, service=None, status=None, env=None):
        '''Return the set of unit names of the service

        The units are also remembered in self.units when the service is the
        one handled by this instance.

        Raises JujuUnitsError if the units cannot be read from the status.
        '''
        if service is None:
            service = self.service

        status = self._resolve_status(status=status, env=env)

        try:
            resp = self.get_all_units(status, service=service)
            self.log.debug("Get all units: {}".format(resp))
            units = set(resp[service].keys())

            if self.service == service:
                self.units = units

            return units

        except Exception as e:
            msg = "{}: exception in get units {}".format(self, e)
            self.log.error(msg)
            self.log.exception(e)
            raise JujuUnitsError(msg) from e

    async def get_service_units(self, service=None, status=None, env=None):
        '''Get the unit names for a service'''
        return await self._run_blocking(self._get_service_units,
                                        service=service,
                                        status=status,
                                        env=env)

    def _get_service_status(self, service=None, status=None, env=None):
        '''Return the status string of the service, 'NA' if it is unknown

        Raises JujuStatusError if the status cannot be parsed.
        '''
        status = self._resolve_status(status=status, env=env)

        if service is None:
            service = self.service

        try:
            srv_status = status[self._get_version_tag('applications')] \
                         [service][self._get_version_tag('status')] \
                         [self._get_version_tag('status')]
            self.log.debug("{}: Service {} status is {}".
                           format(self, service, srv_status))
            return srv_status

        except KeyError as e:
            self.log.info("{}: Did not find service {}, e={}".
                          format(self, service, e))
            return 'NA'

        except Exception as e:
            msg = "{}: exception checking service status for {}, e {}". \
                  format(self, service, e)
            self.log.error(msg)
            self.log.exception(e)
            raise JujuStatusError(msg) from e

    async def get_service_status(self, service=None, status=None, env=None):
        ''' Get service status

        maintenance : The unit is not yet providing services, but is actively doing stuff.
        unknown : Service has finished an event but the charm has not called status-set yet.
        waiting : Service is unable to progress to an active state because of dependency.
        blocked : Service needs manual intervention to get back to the Running state.
        active : Service correctly offering all the services.
        NA : Service is not deployed
        '''
        return await self._run_blocking(self._get_service_status,
                                        service=service,
                                        status=status,
                                        env=env)

    def _is_service_deployed(self, service=None, status=None, env=None):
        '''True unless the service status is 'terminated' or 'NA' '''
        resp = self._get_service_status(service=service,
                                        status=status,
                                        env=env)
        return resp not in ('terminated', 'NA')

    async def is_service_deployed(self, service=None, status=None, env=None):
        '''Check if the service is deployed'''
        return await self._run_blocking(self._is_service_deployed,
                                        service=service,
                                        status=status,
                                        env=env)

    def _is_service_error(self, service=None, status=None, env=None):
        '''True if the service status is 'error' '''
        resp = self._get_service_status(service=service,
                                        status=status,
                                        env=env)
        return resp == 'error'

    async def is_service_error(self, service=None, status=None, env=None):
        '''Check if the service is in error state'''
        return await self._run_blocking(self._is_service_error,
                                        service=service,
                                        status=status,
                                        env=env)

    def _is_service_maint(self, service=None, status=None, env=None):
        '''True if the service status is 'maintenance' '''
        resp = self._get_service_status(service=service,
                                        status=status,
                                        env=env)
        return resp == 'maintenance'

    async def is_service_maint(self, service=None, status=None, env=None):
        '''Check if the service is in maintenance state'''
        return await self._run_blocking(self._is_service_maint,
                                        service=service,
                                        status=status,
                                        env=env)

    def _is_service_active(self, service=None, status=None, env=None):
        '''True if the service status is 'active' '''
        resp = self._get_service_status(service=service,
                                        status=status,
                                        env=env)
        return resp == 'active'

    async def is_service_active(self, service=None, status=None, env=None):
        '''Check if the service is active'''
        return await self._run_blocking(self._is_service_active,
                                        service=service,
                                        status=status,
                                        env=env)

    def _is_service_blocked(self, service=None, status=None, env=None):
        '''True if the service status is 'blocked' '''
        resp = self._get_service_status(service=service,
                                        status=status,
                                        env=env)
        return resp == 'blocked'

    async def is_service_blocked(self, service=None, status=None, env=None):
        '''Check if the service is blocked'''
        return await self._run_blocking(self._is_service_blocked,
                                        service=service,
                                        status=status,
                                        env=env)

    def _is_service_up(self, service=None, status=None, env=None):
        '''True if the service status is 'active' or 'blocked' '''
        resp = self._get_service_status(service=service,
                                        status=status,
                                        env=env)
        return resp in ('active', 'blocked')

    async def is_service_up(self, service=None, status=None, env=None):
        '''Check if the service is installed and up'''
        return await self._run_blocking(self._is_service_up,
                                        service=service,
                                        status=status,
                                        env=env)

    def _apply_config(self, config, service=None, env=None):
        '''Set the config on the deployed service

        An empty config is logged and ignored. Units in error state are
        resolved first.

        Raises JujuSrvNotDeployedError if the service is not deployed;
        errors of the set call itself are logged and re-raised.
        '''
        if service is None:
            service = self.service

        if not config:
            self.log.warning("{}: Empty config passed for service {}".
                             format(self, service))
            return

        if env is None:
            env = self._get_env()

        status = self._get_status(env=env)

        if not self._is_service_deployed(service=service,
                                         status=status,
                                         env=env):
            raise JujuSrvNotDeployedError("{}: service {} is not deployed".
                                          format(self, service))

        self.log.debug("{}: Config for service {} update to: {}".
                       format(self, service, config))
        try:
            # Try to fix error on service, most probably due to config issue
            if self._is_service_error(service=service, status=status, env=env):
                self._resolve_error(service=service, env=env)

            if self.version == 2:
                env.service.set(service, config)
            else:
                env.set_config(service, config)

        except Exception as e:
            self.log.error("{}: exception setting config for {} with {}, e {}".
                           format(self, service, config, e))
            self.log.exception(e)
            raise

    async def apply_config(self, config, service=None, env=None, wait=True):
        '''Apply a config on the service

        With wait set, polls until the service has left the maintenance
        state (there is no timeout) and returns False if it ended up in
        error state. Returns True otherwise.
        '''
        await self._run_blocking(self._apply_config,
                                 config,
                                 service=service,
                                 env=env)

        if wait:
            if env is None:
                # Poll on one connection rather than a new one per check
                env = await self.get_env()

            # Wait till config finished applying
            self.log.debug("{}: Wait for config apply to finish".
                           format(self))
            delay = 3  # secs
            maint = True
            while maint:
                # Sleep first to give time for config_changed hook to be invoked
                await asyncio.sleep(delay)
                maint = await self.is_service_maint(service=service,
                                                    env=env)

        err = await self.is_service_error(service=service, env=env)
        if err:
            self.log.error("{}: Service is in error state".
                           format(self))
            return False

        self.log.debug("{}: Finished applying config".format(self))
        return True

    def _set_parameter(self, parameter, value, service=None):
        '''Set a single config parameter for a service (blocking)'''
        return self._apply_config({parameter: value}, service=service)

    async def set_parameter(self, parameter, value, service=None):
        '''Set a config parameter for a service'''
        return await self.apply_config({parameter: value}, service=service)

    def _resolve_error(self, service=None, status=None, env=None):
        '''Mark the units of the service that are in error state as resolved

        A failure to resolve a unit is logged as a warning and does not
        stop the remaining units from being resolved.
        '''
        if service is None:
            service = self.service

        if env is None:
            env = self._get_env()

        if status is None:
            status = self._get_status(env=env)

        if not self._is_service_deployed(service=service,
                                         status=status,
                                         env=env):
            return

        units = self.get_all_units(status, service=service)

        for unit, ustatus in units.get(service, {}).items():
            if ustatus != 'error':
                continue

            self.log.info("{}: Found unit {} with status {}".
                          format(self, unit, ustatus))
            try:
                # Takes the unit name as service_name/idx unlike action
                env.resolved(unit)

            except Exception as e:
                msg = "{}: Resolve on unit {}: {}". \
                      format(self, unit, e)
                self.log.warning(msg)

    async def resolve_error(self, service=None, status=None, env=None):
        '''Resolve units in error state'''
        await self._run_blocking(self._resolve_error,
                                 service=service,
                                 status=status,
                                 env=env)

    def _deploy_service(self, charm, service,
                        path=None, config=None, env=None):
        '''Deploy the local charm as service, or reconfigure it if deployed

        charm   - charm name
        service - name of the service to deploy
        path    - charm directory; defaults to
                  $RIFT_INSTALL/usr/rift/charms/<series>/<charm>
        config  - config to deploy with; applied on the existing service
                  when it is already deployed

        Raises JujuAddCharmError if the charm directory cannot be added and
        JujuDeployError if the deploy call fails.
        '''
        self.log.debug("{}: Deploy service for charm {}({}) with service {}".
                       format(self, charm, path, service))

        if env is None:
            env = self._get_env()

        self.service = service
        self.charm = charm

        if self._is_service_deployed(service=service, env=env):
            self.log.info("{}: Charm service {} already deployed".
                          format(self, service))
            if config:
                self._apply_config(config, service=service, env=env)
            return

        series = "trusty"

        # Only Juju 1.x gets an explicit placement
        deploy_to = "lxc:0" if self.version == 1 else None

        if path is None:
            prefix = os.getenv('RIFT_INSTALL', '/')
            path = os.path.join(prefix, 'usr/rift/charms', series, charm)

        try:
            self.log.debug("{}: Local charm settings: dir={}, series={}".
                           format(self, path, series))
            result = env.add_local_charm_dir(path, series)
            url = result[self._get_version_tag('charm-url')]

        except Exception as e:
            msg = '{}: Error setting local charm directory {} for {}: {}'. \
                  format(self, path, service, e)
            self.log.error(msg)
            self.log.exception(e)
            raise JujuAddCharmError(msg) from e

        try:
            self.log.debug("{}: Deploying using: service={}, url={}, to={}, config={}".
                           format(self, service, url, deploy_to, config))
            env.deploy(service, url, config=config, machine_spec=deploy_to)

        except Exception as e:
            msg = '{}: Error deploying {}: {}'.format(self, service, e)
            self.log.error(msg)
            self.log.exception(e)
            raise JujuDeployError(msg) from e

    async def deploy_service(self, charm, service,
                             wait=False, timeout=300,
                             path=None, config=None):
        '''Deploy a service using the charm name provided

        With wait set to True, waits up to timeout seconds for the service
        to come up and returns whether it did; returns True otherwise.

        Raises JujuWaitUnitsError if waiting for the service fails.
        '''
        env = await self.get_env()

        await self._run_blocking(self._deploy_service,
                                 charm,
                                 service,
                                 path=path,
                                 config=config,
                                 env=env)

        rc = True
        if wait is True:
            # Wait for the deployed units to start
            try:
                self.log.debug("{}: Waiting for service {} to come up".
                               format(self, service))
                rc = await self.wait_for_service(service=service,
                                                 timeout=timeout,
                                                 env=env)

            except Exception as e:
                msg = '{}: Error starting all units for {}: {}'. \
                      format(self, service, e)
                self.log.error(msg)
                self.log.exception(e)
                raise JujuWaitUnitsError(msg) from e

        return rc

    async def wait_for_service(self, service=None, timeout=0, env=None):
        '''Wait for the service to come up

        Polls every retry_delay seconds for at most timeout seconds. With a
        timeout of 0 (or less) the service is checked once, without
        waiting. Returns True if the service is up, else False.

        Raises JujuSrvNotDeployedError if the service is not deployed.
        '''
        if service is None:
            service = self.service

        if env is None:
            env = await self.get_env()

        status = await self.get_status(env=env)

        # The status is passed in, so these two checks do no I/O
        if self._is_service_up(service=service, status=status, env=env):
            self.log.debug("{}: Service {} is already up".
                           format(self, service))
            return True

        # Check if service is deployed
        if not self._is_service_deployed(service=service, status=status, env=env):
            raise JujuSrvNotDeployedError("{}: service {} is not deployed".
                                          format(self, service))

        if timeout < 0:
            timeout = 0

        delay = self.retry_delay  # seconds
        self.log.debug("{}: In wait for service {}".format(self, service))

        start_time = time.time()
        max_time = start_time + timeout
        while timeout != 0 and (time.time() <= max_time):
            rc = await self.is_service_up(service=service, env=env)
            if rc:
                self.log.debug("{}: Service {} is up after {} seconds".
                               format(self, service, time.time()-start_time))
                return True
            await asyncio.sleep(delay)
        return False

    def _destroy_service(self, service=None):
        '''Destroy a service on Juju controller

        Retries every retry_delay seconds until the service is gone.

        Raises JujuDestroyError if the destroy call fails or the service is
        still deployed after destroy_retries tries.
        '''
        if service is None:
            service = self.service

        self.log.debug("{}: Destroy charm service: {}".format(self, service))

        env = self._get_env()

        status = self._get_status(env=env)

        count = 0
        while self._is_service_deployed(service=service, status=status, env=env):
            count += 1
            self.log.debug("{}: Destroy service {}, count {}".
                           format(self, service, count))

            if count > self.destroy_retries:
                msg = "{}: Not able to destroy service {} after {} tries". \
                      format(self, service, count)
                self.log.error(msg)
                raise JujuDestroyError(msg)

            if self._is_service_error(service=service, status=status, env=env):
                self._resolve_error(service=service, status=status, env=env)

            try:
                env.destroy_service(service)

            except Exception as e:
                msg = "{}: Exception when running destroy on service {}: {}". \
                      format(self, service, e)
                self.log.error(msg)
                self.log.exception(e)
                raise JujuDestroyError(msg) from e

            time.sleep(self.retry_delay)
            status = self._get_status(env=env)

        self.log.debug("{}: Destroyed service {} ({})".
                       format(self, service, count))

    async def destroy_service(self, service=None):
        '''Destroy a service on Juju controller'''
        await self._run_blocking(self._destroy_service, service=service)

    def _get_action_status(self, action_tag, env=None):
        '''Return the first result of the action info query for action_tag

        The 'action-' prefix is added to the tag when it is missing.

        Raises JujuActionApiError if the Action API cannot be obtained and
        JujuActionInfoError if the query fails.
        '''
        if env is None:
            env = self._get_env()

        if not action_tag.startswith('action-'):
            action_tag = 'action-{}'.format(action_tag)

        try:
            action = env.actions
        except Exception as e:
            msg = "{}: exception in Action API: {}".format(self, e)
            self.log.error(msg)
            self.log.exception(e)
            raise JujuActionApiError(msg) from e

        try:
            status = action.info([{'Tag': action_tag}])

            self.log.debug("{}: Action {} status {}".
                           format(self, action_tag, status))
            return status['results'][0]

        except Exception as e:
            msg = "{}: exception in get action status {}".format(self, e)
            self.log.error(msg)
            self.log.exception(e)
            raise JujuActionInfoError(msg) from e

    async def get_action_status(self, action_tag, env=None):
        '''
        Get the status of an action queued on the controller

        responds with the action status, which is one of three values:

         - completed
         - pending
         - failed

        @param action_tag - the action UUID return from the enqueue method
        eg: action-3428e20d-fcd7-4911-803b-9b857a2e5ec9
        '''
        return await self._run_blocking(self._get_action_status,
                                        action_tag,
                                        env=env)

    def _execute_action(self, action_name, params, service=None, env=None):
        '''Execute the action on all units of a service

        Returns the first entry of the results of the enqueue call.

        Raises JujuActionApiError if the Action API cannot be obtained,
        JujuUnitsError if the units cannot be determined and
        JujuActionExecError if enqueuing the action fails.
        '''
        if service is None:
            service = self.service

        if env is None:
            env = self._get_env()

        try:
            action = env.actions
        except Exception as e:
            msg = "{}: exception in Action API: {}".format(self, e)
            self.log.error(msg)
            self.log.exception(e)
            raise JujuActionApiError(msg) from e

        units = self._get_service_units(service, env=env)
        self.log.debug("{}: Apply action {} on units {}".
                       format(self, action_name, units))

        # Rename units from <service>/<n> to unit-<service>-<n>. The units
        # are a set; sort them so that the order of the tags, and with it
        # the result that is returned, is stable.
        unit_tags = []
        for unit in sorted(units):
            idx = int(unit.partition('/')[2])
            unit_tags.append("unit-%s-%d" % (service, idx))
        self.log.debug("{}: Unit tags for action: {}".
                       format(self, unit_tags))

        try:
            result = action.enqueue_units(unit_tags, action_name, params)
            self.log.debug("{}: Response for action: {}".
                           format(self, result))
            return result['results'][0]

        except Exception as e:
            msg = "{}: Exception enqueuing action {} on units {} with " \
                  "params {}: {}".format(self, action_name, unit_tags,
                                         params, e)
            self.log.error(msg)
            self.log.exception(e)
            raise JujuActionExecError(msg) from e

    async def execute_action(self, action_name, params, service=None, env=None):
        '''Execute an action for a service on the controller

        Currently, we execute the action on all units of the service
        '''
        return await self._run_blocking(self._execute_action,
                                        action_name,
                                        params,
                                        service=service,
                                        env=env)
1000
1001
def _wait_for_action(api, result, env, delay=2):
    '''Poll the action of an enqueue result until it completed or failed'''
    status = result['status']
    while status not in ('completed', 'failed'):
        time.sleep(delay)
        status = api._get_action_status(result['action']['tag'],
                                        env=env)['status']
    return status


def main():
    '''Manual test of JujuApi against a live Juju controller

    Prints the controller status. With --directory and --service the charm
    is deployed and waited for. If --vnf-ip is also given and the charm
    directory contains 'clearwater-aio', the charm is configured and two
    sample actions are run on it.
    '''
    parser = argparse.ArgumentParser(description='Test Juju')
    # NOTE(review): the server and password defaults below are lab specific
    # values kept for backward compatibility; do not rely on them.
    parser.add_argument("-s", "--server", default='10.0.202.49', help="Juju controller")
    parser.add_argument("-u", "--user", default='admin', help="User, default user-admin")
    parser.add_argument("-p", "--password", default='nfvjuju', help="Password for the user")
    parser.add_argument("-P", "--port", default=17070, type=int, help="Port number, default 17070")
    parser.add_argument("-d", "--directory", help="Local directory for the charm")
    parser.add_argument("--service", help="Charm service name")
    parser.add_argument("--vnf-ip", help="IP of the VNF to configure")
    args = parser.parse_args()

    api = JujuApi(server=args.server,
                  port=args.port,
                  user=args.user,
                  secret=args.password)

    env = api._get_env()
    if env is None:
        # Raising a plain string is a TypeError in Python 3
        raise JujuEnvError("Not able to login to the Juju controller")

    print("Status: {}".format(api._get_status(env=env)))

    if not (args.directory and args.service):
        return

    # Deploy the charm. normpath drops a trailing slash, which would
    # otherwise make basename return an empty charm name.
    charm = os.path.basename(os.path.normpath(args.directory))
    api._deploy_service(charm, args.service,
                        path=args.directory,
                        env=env)

    # Pass env along so that polling reuses the one connection
    while not api._is_service_up(env=env):
        time.sleep(5)

    print("Service {} is deployed with status {}".
          format(args.service, api._get_service_status(env=env)))

    if not (args.vnf_ip and 'clearwater-aio' in args.directory):
        return

    # Execute config on charm
    api._apply_config({'proxied_ip': args.vnf_ip}, env=env)

    while not api._is_service_active(env=env):
        time.sleep(10)

    print("Service {} is in status {}".
          format(args.service, api._get_service_status(env=env)))

    res = api._execute_action('create-update-user',
                              {'number': '125252352525',
                               'password': 'asfsaf'},
                              env=env)

    print("Action 'create-update-user' response: {}".format(res))
    print("Action status: {}".format(_wait_for_action(api, res, env)))

    # This action will fail as the number is non-numeric
    res = api._execute_action('delete-user',
                              {'number': '125252352525asf'},
                              env=env)

    print("Action 'delete-user' response: {}".format(res))
    print("Action status: {}".format(_wait_for_action(api, res, env)))


if __name__ == "__main__":
    main()