RIFT OSM R1 Initial Submission
[osm/SO.git] / common / python / rift / mano / utils / juju_api.py
1 ############################################################################
2 # Copyright 2016 RIFT.io Inc #
3 # #
4 # Licensed under the Apache License, Version 2.0 (the "License"); #
5 # you may not use this file except in compliance with the License. #
6 # You may obtain a copy of the License at #
7 # #
8 # http://www.apache.org/licenses/LICENSE-2.0 #
9 # #
10 # Unless required by applicable law or agreed to in writing, software #
11 # distributed under the License is distributed on an "AS IS" BASIS, #
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
13 # See the License for the specific language governing permissions and #
14 # limitations under the License. #
15 ############################################################################
16
17 import argparse
18 import asyncio
19 from functools import partial
20 import logging
21 import os
22 import ssl
23 import sys
24 import time
25
26 try:
27 from jujuclient.juju1.environment import Environment as Env1
28 from jujuclient.juju2.environment import Environment as Env2
29 except ImportError as e:
30 # Try importing older jujuclient
31 from jujuclient import Environment as Env1
32
33 try:
34 ssl._create_default_https_context = ssl._create_unverified_context
35 except AttributeError:
36 # Legacy Python doesn't verify by default (see pep-0476)
37 # https://www.python.org/dev/peps/pep-0476/
38 pass
39
40
41 class JujuVersionError(Exception):
42 pass
43
44
45 class JujuApiError(Exception):
46 pass
47
48
49 class JujuEnvError(JujuApiError):
50 pass
51
52
53 class JujuModelError(JujuApiError):
54 pass
55
56
57 class JujuStatusError(JujuApiError):
58 pass
59
60
61 class JujuUnitsError(JujuApiError):
62 pass
63
64
65 class JujuWaitUnitsError(JujuApiError):
66 pass
67
68
69 class JujuSrvNotDeployedError(JujuApiError):
70 pass
71
72
73 class JujuAddCharmError(JujuApiError):
74 pass
75
76
77 class JujuDeployError(JujuApiError):
78 pass
79
80
81 class JujuDestroyError(JujuApiError):
82 pass
83
84
85 class JujuResolveError(JujuApiError):
86 pass
87
88
89 class JujuActionError(JujuApiError):
90 pass
91
92
93 class JujuActionApiError(JujuActionError):
94 pass
95
96
97 class JujuActionInfoError(JujuActionError):
98 pass
99
100
101 class JujuActionExecError(JujuActionError):
102 pass
103
104
105 class JujuApi(object):
106 '''
107 JujuApi wrapper on jujuclient library
108
109 There should be one instance of JujuApi for each VNF manged by Juju.
110
111 Assumption:
112 Currently we use one unit per service/VNF. So once a service
113 is deployed, we store the unit name and reuse it
114 '''
115 log = None
116
117 def __init__ (self,
118 log=None,
119 loop=None,
120 server='127.0.0.1',
121 port=17070,
122 user='admin',
123 secret=None,
124 version=None):
125 '''Initialize with the Juju credentials'''
126 self.server = server
127 self.port = port
128
129 self.secret = secret
130 if user.startswith('user-'):
131 self.user = user
132 else:
133 self.user = 'user-{}'.format(user)
134
135 self.loop = loop
136
137 if log is not None:
138 self.log = log
139 else:
140 self.log = JujuApi._get_logger()
141
142 if self.log is None:
143 raise JujuApiError("Logger not defined")
144
145 self.version = None
146 if version:
147 self.version = version
148 else:
149 try:
150 if Env2:
151 pass
152 except NameError:
153 self.log.warn("Using older version of Juju client, which " \
154 "supports only Juju 1.x")
155 self.version = 1
156
157 endpoint = 'wss://%s:%d' % (server, int(port))
158 self.endpoint = endpoint
159
160 self.charm = None # Charm used
161 self.service = None # Service deployed
162 self.units = [] # Storing as list to support more units in future
163
164 self.destroy_retries = 25 # Number retires to destroy service
165 self.retry_delay = 5 # seconds
166
167 def __str__(self):
168 return ("JujuApi-{}".format(self.endpoint))
169
170 @classmethod
171 def _get_logger(cls):
172 if cls.log is not None:
173 return cls.log
174
175 fmt = logging.Formatter(
176 '%(asctime)-23s %(levelname)-5s (%(name)s@%(process)d:' \
177 '%(filename)s:%(lineno)d) - %(message)s')
178 stderr_handler = logging.StreamHandler(stream=sys.stderr)
179 stderr_handler.setFormatter(fmt)
180 logging.basicConfig(level=logging.DEBUG)
181 cls.log = logging.getLogger('juju-api')
182 cls.log.addHandler(stderr_handler)
183
184 return cls.log
185
186 @staticmethod
187 def format_charm_name(name):
188 '''Format the name to valid charm name
189
190 Charm service name accepts only a to z and -.
191 '''
192
193 new_name = ''
194 for c in name:
195 if c.isdigit():
196 c = chr(97 + int(c))
197 elif not c.isalpha():
198 c = "-"
199 new_name += c
200 return new_name.lower()
201
202 def _get_version_tag(self, tag):
203 version_tag_map = {
204 'applications': {
205 1: 'Services',
206 2: 'applications',
207 },
208 'units': {
209 1: 'Units',
210 2: 'units',
211 },
212 'status': {
213 1: 'Status',
214 2: 'status',
215 },
216 'workload-status': {
217 1: 'Workload',
218 2: 'workload-status',
219 },
220 'charm-url': {
221 1: 'CharmURL',
222 2: 'charm-url',
223 },
224 }
225
226 return version_tag_map[tag][self.version]
227
228 def _get_env1(self):
229 try:
230 env = Env1(self.endpoint)
231 l = env.login(self.secret, user=self.user)
232 return env
233
234 except ConnectionRefusedError as e:
235 msg = "{}: Failed Juju 1.x connect: {}".format(self, e)
236 self.log.error(msg)
237 self.log.exception(e)
238 raise e
239
240 except Exception as e:
241 msg = "{}: Failed Juju 1.x connect: {}".format(self, e)
242 self.log.error(msg)
243 self.log.exception(e)
244 raise JujuEnvError(msg)
245
246 def _get_env2(self):
247 try:
248 env = Env2(self.endpoint)
249 l = env.login(self.secret, user=self.user)
250 except KeyError as e:
251 msg = "{}: Failed Juju 2.x connect: {}".format(self, e)
252 self.log.debug(msg)
253 raise JujuVersionError(msg)
254
255 try:
256 models = env.models.list()
257 for m in models['user-models']:
258 if m['model']['name'] == 'default':
259 mep = '{}/model/{}/api'.format(endpoint, m['model']['uuid'])
260 model = Env2(mep, env_uuid=m['model']['uuid'])
261 l = model.login(args.password, user=args.user)
262 break
263
264 if model is None:
265 raise
266
267 return model
268
269 except Exception as e:
270 msg = "{}: Failed logging to model: {}".format(self, e)
271 self.log.error(msg)
272 self.log.exception(e)
273 env.close()
274 raise JujuModelError(msg)
275
276 def _get_env(self):
277 self.log.debug("{}: Connect to endpoint {}".
278 format(self, self.endpoint))
279
280 if self.version is None:
281 # Try version 2 first
282 try:
283 env = self._get_env2()
284 self.version = 2
285
286 except JujuVersionError as e:
287 self.log.info("Unable to login as Juju 2.x, trying 1.x")
288 env = self._get_env1()
289 self.version = 1
290
291 return env
292
293 elif self.version == 2:
294 return self._get_env2()
295
296 elif self.version == 1:
297 return self._get_env1()
298
299 else:
300 msg = "{}: Unknown version set: {}".format(self, self.version)
301 self.log.error(msg)
302 raise JujuVersionError(msg)
303
304 @asyncio.coroutine
305 def get_env(self):
306 ''' Connect to the Juju controller'''
307 env = yield from self.loop.run_in_executor(
308 None,
309 self._get_env,
310 )
311 return env
312
313 def _get_status(self, env=None):
314 if env is None:
315 env = self._get_env()
316
317 try:
318 status = env.status()
319 return status
320
321 except Exception as e:
322 msg = "{}: exception in getting status: {}". \
323 format(self, e)
324 self.log.error(msg)
325 self.log.exception(e)
326 raise JujuStatusError(msg)
327
328 @asyncio.coroutine
329 def get_status(self, env=None):
330 '''Get Juju controller status'''
331 pf = partial(self._get_status, env=env)
332 status = yield from self.loop.run_in_executor(
333 None,
334 pf,
335 )
336 return status
337
338 def get_all_units(self, status, service=None):
339 '''Parse the status and get the units'''
340 results = {}
341 services = status.get(self._get_version_tag('applications'), {})
342
343 for svc_name, svc_data in services.items():
344 if service and service != svc_name:
345 continue
346 units = svc_data[self._get_version_tag('units')] or {}
347
348 results[svc_name] = {}
349 for unit in units:
350 results[svc_name][unit] = \
351 units[unit][self._get_version_tag('workload-status')] \
352 [self._get_version_tag('status')] or None
353 return results
354
355
356 def _get_service_units(self, service=None, status=None, env=None):
357 if service is None:
358 service = self.service
359
360 # Optimizing calls to Juju, as currently we deploy only 1 unit per
361 # service.
362 if self.service == service and len(self.units):
363 return self.units
364
365 if env is None:
366 env = self._get_env()
367
368 if status is None:
369 status = self._get_status(env=env)
370
371 try:
372 resp = self.get_all_units(status, service=service)
373 self.log.debug("Get all units: {}".format(resp))
374 units = set(resp[service].keys())
375
376 if self.service == service:
377 self.units = units
378
379 return units
380
381 except Exception as e:
382 msg = "{}: exception in get units {}".format(self, e)
383 self.log.error(msg)
384 self.log.exception(e)
385 raise JujuUnitsError(msg)
386
387 @asyncio.coroutine
388 def get_service_units(self, service=None, status=None, env=None):
389 '''Get the unit names for a service'''
390 pf = partial(self._get_service_units,
391 service=service,
392 status=status,
393 env=env)
394 units = yield from self.loop.run_in_executor(
395 None,
396 pf,
397 )
398 return units
399
400 def _get_service_status(self, service=None, status=None, env=None):
401 if env is None:
402 env = self._get_env()
403
404 if status is None:
405 status = self._get_status(env=env)
406
407 if service is None:
408 service = self.service
409
410 try:
411 srv_status = status[self._get_version_tag('applications')] \
412 [service][self._get_version_tag('status')] \
413 [self._get_version_tag('status')]
414 self.log.debug("{}: Service {} status is {}".
415 format(self, service, srv_status))
416 return srv_status
417
418 except KeyError as e:
419 self.log.info("self: Did not find service {}, e={}".format(self, service, e))
420 return 'NA'
421
422 except Exception as e:
423 msg = "{}: exception checking service status for {}, e {}". \
424 format(self, service, e)
425 self.log.error(msg)
426 self.log.exception(e)
427 raise JujuStatusError(msg)
428
429
430 @asyncio.coroutine
431 def get_service_status(self, service=None, status=None, env=None):
432 ''' Get service status
433
434 maintenance : The unit is not yet providing services, but is actively doing stuff.
435 unknown : Service has finished an event but the charm has not called status-set yet.
436 waiting : Service is unable to progress to an active state because of dependency.
437 blocked : Service needs manual intervention to get back to the Running state.
438 active : Service correctly offering all the services.
439 NA : Service is not deployed
440 '''
441 pf = partial(self._get_service_status,
442 service=service,
443 status=status,
444 env=env)
445 srv_status = yield from self.loop.run_in_executor(
446 None,
447 pf,
448 )
449 return srv_status
450
451 def _is_service_deployed(self, service=None, status=None, env=None):
452 resp = self._get_service_status(service=service,
453 status=status,
454 env=env)
455
456 if resp not in ['terminated', 'NA']:
457 return True
458
459 return False
460
461 @asyncio.coroutine
462 def is_service_deployed(self, service=None, status=None, env=None):
463 '''Check if the service is deployed'''
464 pf = partial(self._is_service_deployed,
465 service=service,
466 status=status,
467 env=env)
468 rc = yield from self.loop.run_in_executor(
469 None,
470 pf,
471 )
472 return rc
473
474 def _is_service_error(self, service=None, status=None, env=None):
475 resp = self._get_service_status(service=service,
476 status=status,
477 env=env)
478
479 if resp in ['error']:
480 return True
481
482 return False
483
484 @asyncio.coroutine
485 def is_service_error(self, service=None, status=None, env=None):
486 '''Check if the service is in error state'''
487 pf = partial(self._is_service_error,
488 service=service,
489 status=status,
490 env=env)
491 rc = yield from self.loop.run_in_executor(
492 None,
493 pf,
494 )
495 return rc
496
497 def _is_service_maint(self, service=None, status=None, env=None):
498 resp = self._get_service_status(service=service,
499 status=status,
500 env=env)
501
502 if resp in ['maintenance']:
503 return True
504
505 return False
506
507 @asyncio.coroutine
508 def is_service_maint(self, service=None, status=None, env=None):
509 '''Check if the service is in error state'''
510 pf = partial(self._is_service_maint,
511 service=service,
512 status=status,
513 env=env)
514 rc = yield from self.loop.run_in_executor(
515 None,
516 pf,
517 )
518 return rc
519
520 def _is_service_active(self, service=None, status=None, env=None):
521 resp = self._get_service_status(service=service,
522 status=status,
523 env=env)
524
525 if resp in ['active']:
526 return True
527
528 return False
529
530 @asyncio.coroutine
531 def is_service_active(self, service=None, status=None, env=None):
532 '''Check if the service is active'''
533 pf = partial(self._is_service_active,
534 service=service,
535 status=status,
536 env=env)
537 rc = yield from self.loop.run_in_executor(
538 None,
539 pf,
540 )
541 return rc
542
543 def _is_service_blocked(self, service=None, status=None, env=None):
544 resp = self._get_service_status(service=service,
545 status=status,
546 env=env)
547
548 if resp in ['blocked']:
549 return True
550
551 return False
552
553 @asyncio.coroutine
554 def is_service_blocked(self, service=None, status=None, env=None):
555 '''Check if the service is blocked'''
556 pf = partial(self._is_service_blocked,
557 service=service,
558 status=status,
559 env=env)
560 rc = yield from self.loop.run_in_executor(
561 None,
562 pf,
563 )
564 return rc
565
566 def _is_service_up(self, service=None, status=None, env=None):
567 resp = self._get_service_status(service=service,
568 status=status,
569 env=env)
570
571 if resp in ['active', 'blocked']:
572 return True
573
574 return False
575
576 @asyncio.coroutine
577 def is_service_up(self, service=None, status=None, env=None):
578 '''Check if the service is installed and up'''
579 pf = partial(self._is_service_up,
580 service=service,
581 status=status,
582 env=env)
583
584 rc = yield from self.loop.run_in_executor(
585 None,
586 pf,
587 )
588 return rc
589
590 def _apply_config(self, config, service=None, env=None):
591 if service is None:
592 service = self.service
593
594 if config is None or len(config) == 0:
595 self.log.warn("{}: Empty config passed for service {}".
596 format(self, service))
597 return
598
599 if env is None:
600 env = self._get_env()
601
602 status = self._get_status(env=env)
603
604 if not self._is_service_deployed(service=service,
605 status=status,
606 env=env):
607 raise JujuSrvNotDeployedError("{}: service {} is not deployed".
608 format(self, service))
609
610 self.log.debug("{}: Config for service {} update to: {}".
611 format(self, service, config))
612 try:
613 # Try to fix error on service, most probably due to config issue
614 if self._is_service_error(service=service, status=status, env=env):
615 self._resolve_error(service=service, env=env)
616
617 if self.version == 2:
618 env.service.set(service, config)
619 else:
620 env.set_config(service, config)
621
622 except Exception as e:
623 self.log.error("{}: exception setting config for {} with {}, e {}".
624 format(self, service, config, e))
625 self.log.exception(e)
626 raise e
627
628 @asyncio.coroutine
629 def apply_config(self, config, service=None, env=None, wait=True):
630 '''Apply a config on the service'''
631 pf = partial(self._apply_config,
632 config,
633 service=service,
634 env=env)
635 yield from self.loop.run_in_executor(
636 None,
637 pf,
638 )
639
640 if wait:
641 # Wait till config finished applying
642 self.log.debug("{}: Wait for config apply to finish".
643 format(self))
644 delay = 3 # secs
645 maint = True
646 while maint:
647 # Sleep first to give time for config_changed hook to be invoked
648 yield from asyncio.sleep(delay, loop=self.loop)
649 maint = yield from self.is_service_maint(service=service,
650 env=env)
651
652 err = yield from self.is_service_error(service=service, env=env)
653 if err:
654 self.log.error("{}: Service is in error state".
655 format(self))
656 return False
657
658 self.log.debug("{}: Finished applying config".format(self))
659 return True
660
661 def _set_parameter(self, parameter, value, service=None):
662 return self._apply_config({parameter : value}, service=service)
663
664 @asyncio.coroutine
665 def set_parameter(self, parameter, value, service=None):
666 '''Set a config parameter for a service'''
667 return self.apply_config({parameter : value}, service=service)
668
669 def _resolve_error(self, service=None, status=None, env=None):
670 if env is None:
671 env = self._get_env()
672
673 if status is None:
674 status = self._get_status(env=env)
675
676 if service is None:
677 service = self.service
678
679 if env is None:
680 env = self._get_env()
681 if self._is_service_deployed(service=service, status=status):
682 units = self.get_all_units(status, service=service)
683
684 for unit, ustatus in units[service].items():
685 if ustatus == 'error':
686 self.log.info("{}: Found unit {} with status {}".
687 format(self, unit, ustatus))
688 try:
689 # Takes the unit name as service_name/idx unlike action
690 env.resolved(unit)
691
692 except Exception as e:
693 msg = "{}: Resolve on unit {}: {}". \
694 format(self, unit, e)
695 self.log.error(msg)
696 self.log.exception(e)
697 raise JujuResolveError(msg)
698
699 @asyncio.coroutine
700 def resolve_error(self, service=None, status=None, env=None):
701 '''Resolve units in error state'''
702 pf = partial(self._resolve_error,
703 service=service,
704 status=status,
705 env=env)
706 yield from self.loop.run_in_executor(
707 None,
708 pf,
709 )
710
711 def _deploy_service(self, charm, service,
712 path=None, config=None, env=None):
713 self.log.debug("{}: Deploy service for charm {}({}) with service {}".
714 format(self, charm, path, service))
715
716 if env is None:
717 env = self._get_env()
718
719 self.service = service
720 self.charm = charm
721
722 if self._is_service_deployed(service=service, env=env):
723 self.log.info("{}: Charm service {} already deployed".
724 format (self, service))
725 if config:
726 self._apply_config(config, service=service, env=env)
727 return
728
729 series = "trusty"
730
731 deploy_to = None
732 if self.version == 1:
733 deploy_to = "lxc:0"
734
735 if path is None:
736 prefix=os.getenv('RIFT_INSTALL', '/')
737 path = os.path.join(prefix, 'usr/rift/charms', series, charm)
738
739 try:
740 self.log.debug("{}: Local charm settings: dir={}, series={}".
741 format(self, path, series))
742 result = env.add_local_charm_dir(path, series)
743 url = result[self._get_version_tag('charm-url')]
744
745 except Exception as e:
746 msg = '{}: Error setting local charm directory {} for {}: {}'. \
747 format(self, path, service, e)
748 self.log.error(msg)
749 self.log.exception(e)
750 raise JujuAddCharmError(msg)
751
752 try:
753 self.log.debug("{}: Deploying using: service={}, url={}, to={}, config={}".
754 format(self, service, url, deploy_to, config))
755 env.deploy(service, url, config=config, machine_spec=deploy_to)
756
757 except Exception as e:
758 msg = '{}: Error deploying {}: {}'.format(self, service, e)
759 self.log.error(msg)
760 self.log.exception(e)
761 raise JujuDeployError(msg)
762
763 @asyncio.coroutine
764 def deploy_service(self, charm, service,
765 wait=False, timeout=300,
766 path=None, config=None):
767 '''Deploy a service using the charm name provided'''
768 env = yield from self.get_env()
769
770 pf = partial(self._deploy_service,
771 charm,
772 service,
773 path=path,
774 config=config,
775 env=env)
776 yield from self.loop.run_in_executor(
777 None,
778 pf,
779 )
780
781 rc = True
782 if wait is True:
783 # Wait for the deployed units to start
784 try:
785 self.log.debug("{}: Waiting for service {} to come up".
786 format(self, service))
787 rc = yield from self.wait_for_service(timeout=timeout, env=env)
788
789 except Exception as e:
790 msg = '{}: Error starting all units for {}: {}'. \
791 format(self, service, e)
792 self.log.error(msg)
793 self.log.exception(e)
794 raise JujuWaitUnitsError(msg)
795
796 return rc
797
798 @asyncio.coroutine
799 def wait_for_service(self, service=None, timeout=0, env=None):
800 '''Wait for the service to come up'''
801 if service is None:
802 service = self.service
803
804 if env is None:
805 env = yield from self.get_env()
806
807 status = yield from self.get_status(env=env)
808
809 if self._is_service_up(service=service, status=status, env=env):
810 self.log.debug("{}: Service {} is already up".
811 format(self, service))
812 return True
813
814 # Check if service is deployed
815 if not self._is_service_deployed(service=service, status=status, env=env):
816 raise JujuSrvNotDeployedError("{}: service {} is not deployed".
817 format(self, service))
818
819 if timeout < 0:
820 timeout = 0
821
822 count = 0
823 delay = self.retry_delay # seconds
824 self.log.debug("{}: In wait for service {}".format(self, service))
825
826 start_time = time.time()
827 max_time = time.time() + timeout
828 while timeout != 0 and (time.time() <= max_time):
829 count += 1
830 rc = yield from self.is_service_up(service=service, env=env)
831 if rc:
832 self.log.debug("{}: Service {} is up after {} seconds".
833 format(self, service, time.time()-start_time))
834 return True
835 yield from asyncio.sleep(delay, loop=self.loop)
836 return False
837
838 def _destroy_service(self, service=None):
839 '''Destroy a service on Juju controller'''
840 self.log.debug("{}: Destroy charm service: {}".format(self,service))
841
842 if service is None:
843 service = self.service
844
845 env = self._get_env()
846
847 status = self._get_status(env=env)
848
849 count = 0
850 while self._is_service_deployed(service=service, status=status, env=env):
851 count += 1
852 self.log.debug("{}: Destroy service {}, count {}".
853 format(self, service, count))
854
855 if count > self.destroy_retries:
856 msg = "{}: Not able to destroy service {} after {} tries". \
857 format(self, service, count)
858 self.log.error(msg)
859 raise JujuDestroyError(msg)
860
861
862 if self._is_service_error(service=service, status=status):
863 self._resolve_error(service, status)
864
865 try:
866 env.destroy_service(service)
867
868 except Exception as e:
869 msg = "{}: Exception when running destroy on service {}: {}". \
870 format(self, service, e)
871 self.log.error(msg)
872 self.log.exception(e)
873 raise JujuDestroyError(msg)
874
875 time.sleep(self.retry_delay)
876 status = self._get_status(env=env)
877
878 self.log.debug("{}: Destroyed service {} ({})".
879 format(self, service, count))
880
881 @asyncio.coroutine
882 def destroy_service(self, service=None):
883 '''Destroy a service on Juju controller'''
884 pf = partial(self._destroy_service,
885 service=service)
886 yield from self.loop.run_in_executor(
887 None,
888 pf,
889 )
890
891
892 def _get_action_status(self, action_tag, env=None):
893 if env is None:
894 env = self._get_env()
895
896 if not action_tag.startswith('action-'):
897 action_tag = 'action-{}'.format(action_tag)
898
899 try:
900 action = env.actions
901 except Exception as e:
902 msg = "{}: exception in Action API: {}".format(self, e)
903 self.log.error(msg)
904 self.log.exception(e)
905 raise JujuActionApiError(msg)
906
907 try:
908 status = action.info([{'Tag': action_tag}])
909
910 self.log.debug("{}: Action {} status {}".
911 format(self, action_tag, status))
912 return status['results'][0]
913
914 except Exception as e:
915 msg = "{}: exception in get action status {}".format(self, e)
916 self.log.error(msg)
917 self.log.exception(e)
918 raise JujuActionInfoError(msg)
919
920 @asyncio.coroutine
921 def get_action_status(self, action_tag, env=None):
922 '''
923 Get the status of an action queued on the controller
924
925 responds with the action status, which is one of three values:
926
927 - completed
928 - pending
929 - failed
930
931 @param action_tag - the action UUID return from the enqueue method
932 eg: action-3428e20d-fcd7-4911-803b-9b857a2e5ec9
933 '''
934 pf = partial(self._get_action_status,
935 action_tag,
936 env=env,)
937 status = yield from self.loop.run_in_executor(
938 None,
939 pf,
940 )
941 return status
942
943 def _execute_action(self, action_name, params, service=None, env=None):
944 '''Execute the action on all units of a service'''
945 if service is None:
946 service = self.service
947
948 if env is None:
949 env = self._get_env()
950
951 try:
952 action = env.actions
953 except Exception as e:
954 msg = "{}: exception in Action API: {}".format(self, e)
955 self.log.error(msg)
956 self.log.exception(e)
957 raise JujuActionApiError(msg)
958
959 units = self._get_service_units(service)
960 self.log.debug("{}: Apply action {} on units {}".
961 format(self, action_name, units))
962
963 # Rename units from <service>/<n> to unit-<service>-<n>
964 unit_tags = []
965 for unit in units:
966 idx = int(unit[unit.index('/')+1:])
967 unit_name = "unit-%s-%d" % (service, idx)
968 unit_tags.append(unit_name)
969 self.log.debug("{}: Unit tags for action: {}".
970 format(self, unit_tags))
971
972 try:
973 result = action.enqueue_units(unit_tags, action_name, params)
974 self.log.debug("{}: Response for action: {}".
975 format(self, result))
976 return result['results'][0]
977
978 except Exception as e:
979 msg = "{}: Exception enqueing action {} on units {} with " \
980 "params {}: {}".format(self, action, unit_tags, params, e)
981 self.log.error(msg)
982 self.log.exception(e)
983 raise JujuActionExecError(msg)
984
985 @asyncio.coroutine
986 def execute_action(self, action_name, params, service=None, env=None):
987 '''Execute an action for a service on the controller
988
989 Currently, we execute the action on all units of the service
990 '''
991 pf = partial(self._execute_action,
992 action_name,
993 params,
994 service=service,
995 env=env)
996 result = yield from self.loop.run_in_executor(
997 None,
998 pf,
999 )
1000 return result
1001
1002
1003 if __name__ == "__main__":
1004 parser = argparse.ArgumentParser(description='Test Juju')
1005 parser.add_argument("-s", "--server", default='10.0.202.49', help="Juju controller")
1006 parser.add_argument("-u", "--user", default='admin', help="User, default user-admin")
1007 parser.add_argument("-p", "--password", default='nfvjuju', help="Password for the user")
1008 parser.add_argument("-P", "--port", default=17070, help="Port number, default 17070")
1009 parser.add_argument("-d", "--directory", help="Local directory for the charm")
1010 parser.add_argument("--service", help="Charm service name")
1011 parser.add_argument("--vnf-ip", help="IP of the VNF to configure")
1012 args = parser.parse_args()
1013
1014 api = JujuApi(server=args.server,
1015 port=args.port,
1016 user=args.user,
1017 secret=args.password)
1018
1019 env = api._get_env()
1020 if env is None:
1021 raise "Not able to login to the Juju controller"
1022
1023 print("Status: {}".format(api._get_status(env=env)))
1024
1025 if args.directory and args.service:
1026 # Deploy the charm
1027 charm = os.path.basename(args.directory)
1028 api._deploy_service(charm, args.service,
1029 path=args.directory,
1030 env=env)
1031
1032 while not api._is_service_up():
1033 time.sleep(5)
1034
1035 print ("Service {} is deployed with status {}".
1036 format(args.service, api._get_service_status()))
1037
1038 if args.vnf_ip and \
1039 ('clearwater-aio' in args.directory):
1040 # Execute config on charm
1041 api._apply_config({'proxied_ip': args.vnf_ip})
1042
1043 while not api._is_service_active():
1044 time.sleep(10)
1045
1046 print ("Service {} is in status {}".
1047 format(args.service, api._get_service_status()))
1048
1049 res = api._execute_action('create-update-user', {'number': '125252352525',
1050 'password': 'asfsaf'})
1051
1052 print ("Action 'creat-update-user response: {}".format(res))
1053
1054 status = res['status']
1055 while status not in [ 'completed', 'failed' ]:
1056 time.sleep(2)
1057 status = api._get_action_status(res['action']['tag'])['status']
1058
1059 print("Action status: {}".format(status))
1060
1061 # This action will fail as the number is non-numeric
1062 res = api._execute_action('delete-user', {'number': '125252352525asf'})
1063
1064 print ("Action 'delete-user response: {}".format(res))
1065
1066 status = res['status']
1067 while status not in [ 'completed', 'failed' ]:
1068 time.sleep(2)
1069 status = api._get_action_status(res['action']['tag'])['status']
1070
1071 print("Action status: {}".format(status))