Enhancements K8s helm connector
[osm/N2VC.git] / n2vc / vnf.py
1 # Copyright 2019 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import asyncio
16 import logging
17 import os
18 import os.path
19 import re
20 import shlex
21 import ssl
22 import subprocess
23 import sys
24 # import time
25 from n2vc.provisioner import SSHProvisioner
26
27 # FIXME: this should load the juju inside or modules without having to
28 # explicitly install it. Check why it's not working.
29 # Load our subtree of the juju library
30 path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
31 path = os.path.join(path, "modules/libjuju/")
32 if path not in sys.path:
33 sys.path.insert(1, path)
34
35 from juju.client import client
36 from juju.controller import Controller
37 from juju.model import ModelObserver
38 from juju.errors import JujuAPIError, JujuError
39
40
41 # We might need this to connect to the websocket securely, but test and verify.
42 try:
43 ssl._create_default_https_context = ssl._create_unverified_context
44 except AttributeError:
45 # Legacy Python doesn't verify by default (see pep-0476)
46 # https://www.python.org/dev/peps/pep-0476/
47 pass
48
49
50 # Custom exceptions
51 # Deprecated. Please use n2vc.exceptions namespace.
52 class JujuCharmNotFound(Exception):
53 """The Charm can't be found or is not readable."""
54
55
56 class JujuApplicationExists(Exception):
57 """The Application already exists."""
58
59
60 class N2VCPrimitiveExecutionFailed(Exception):
61 """Something failed while attempting to execute a primitive."""
62
63
64 class NetworkServiceDoesNotExist(Exception):
65 """The Network Service being acted against does not exist."""
66
67
68 class PrimitiveDoesNotExist(Exception):
69 """The Primitive being executed does not exist."""
70
71
72 # Quiet the debug logging
73 logging.getLogger('websockets.protocol').setLevel(logging.INFO)
74 logging.getLogger('juju.client.connection').setLevel(logging.WARN)
75 logging.getLogger('juju.model').setLevel(logging.WARN)
76 logging.getLogger('juju.machine').setLevel(logging.WARN)
77
78
79 class VCAMonitor(ModelObserver):
80 """Monitor state changes within the Juju Model."""
81 log = None
82
83 def __init__(self, ns_name):
84 self.log = logging.getLogger(__name__)
85
86 self.ns_name = ns_name
87 self.applications = {}
88
89 def AddApplication(self, application_name, callback, *callback_args):
90 if application_name not in self.applications:
91 self.applications[application_name] = {
92 'callback': callback,
93 'callback_args': callback_args
94 }
95
96 def RemoveApplication(self, application_name):
97 if application_name in self.applications:
98 del self.applications[application_name]
99
100 async def on_change(self, delta, old, new, model):
101 """React to changes in the Juju model."""
102
103 if delta.entity == "unit":
104 # Ignore change events from other applications
105 if delta.data['application'] not in self.applications.keys():
106 return
107
108 try:
109
110 application_name = delta.data['application']
111
112 callback = self.applications[application_name]['callback']
113 callback_args = \
114 self.applications[application_name]['callback_args']
115
116 if old and new:
117 # Fire off a callback with the application state
118 if callback:
119 callback(
120 self.ns_name,
121 delta.data['application'],
122 new.workload_status,
123 new.workload_status_message,
124 *callback_args)
125
126 if old and not new:
127 # This is a charm being removed
128 if callback:
129 callback(
130 self.ns_name,
131 delta.data['application'],
132 "removed",
133 "",
134 *callback_args)
135 except Exception as e:
136 self.log.debug("[1] notify_callback exception: {}".format(e))
137
138 elif delta.entity == "action":
139 # TODO: Decide how we want to notify the user of actions
140
141 # uuid = delta.data['id'] # The Action's unique id
142 # msg = delta.data['message'] # The output of the action
143 #
144 # if delta.data['status'] == "pending":
145 # # The action is queued
146 # pass
147 # elif delta.data['status'] == "completed""
148 # # The action was successful
149 # pass
150 # elif delta.data['status'] == "failed":
151 # # The action failed.
152 # pass
153
154 pass
155
156 ########
157 # TODO
158 #
159 # Create unique models per network service
160 # Document all public functions
161
162
163 class N2VC:
164 def __init__(self,
165 log=None,
166 server='127.0.0.1',
167 port=17070,
168 user='admin',
169 secret=None,
170 artifacts=None,
171 loop=None,
172 juju_public_key=None,
173 ca_cert=None,
174 api_proxy=None
175 ):
176 """Initialize N2VC
177
178 Initializes the N2VC object, allowing the caller to interoperate with the VCA.
179
180
181 :param log obj: The logging object to log to
182 :param server str: The IP Address or Hostname of the Juju controller
183 :param port int: The port of the Juju Controller
184 :param user str: The Juju username to authenticate with
185 :param secret str: The Juju password to authenticate with
186 :param artifacts str: The directory where charms required by a vnfd are
187 stored.
188 :param loop obj: The loop to use.
189 :param juju_public_key str: The contents of the Juju public SSH key
190 :param ca_cert str: The CA certificate to use to authenticate
191 :param api_proxy str: The IP of the host machine
192
193 :Example:
194 client = n2vc.vnf.N2VC(
195 log=log,
196 server='10.1.1.28',
197 port=17070,
198 user='admin',
199 secret='admin',
200 artifacts='/app/storage/myvnf/charms',
201 loop=loop,
202 juju_public_key='<contents of the juju public key>',
203 ca_cert='<contents of CA certificate>',
204 api_proxy='192.168.1.155'
205 )
206 """
207
208 # Initialize instance-level variables
209 self.api = None
210 self.log = None
211 self.controller = None
212 self.connecting = False
213 self.authenticated = False
214 self.api_proxy = api_proxy
215
216 # For debugging
217 self.refcount = {
218 'controller': 0,
219 'model': 0,
220 }
221
222 self.models = {}
223
224 # Model Observers
225 self.monitors = {}
226
227 # VCA config
228 self.hostname = ""
229 self.port = 17070
230 self.username = ""
231 self.secret = ""
232
233 self.juju_public_key = juju_public_key
234 if juju_public_key:
235 self._create_juju_public_key(juju_public_key)
236
237 # TODO: Verify ca_cert is valid before using. VCA will crash
238 # if the ca_cert isn't formatted correctly.
239 # self.ca_cert = ca_cert
240 self.ca_cert = None
241
242 if log:
243 self.log = log
244 else:
245 self.log = logging.getLogger(__name__)
246
247 # Quiet websocket traffic
248 logging.getLogger('websockets.protocol').setLevel(logging.INFO)
249 logging.getLogger('juju.client.connection').setLevel(logging.WARN)
250 logging.getLogger('model').setLevel(logging.WARN)
251 # logging.getLogger('websockets.protocol').setLevel(logging.DEBUG)
252
253 self.log.debug('JujuApi: instantiated')
254
255 self.server = server
256 self.port = port
257
258 self.secret = secret
259 if user.startswith('user-'):
260 self.user = user
261 else:
262 self.user = 'user-{}'.format(user)
263
264 self.endpoint = '%s:%d' % (server, int(port))
265
266 self.artifacts = artifacts
267
268 self.loop = loop or asyncio.get_event_loop()
269
270 def __del__(self):
271 """Close any open connections."""
272 yield self.logout()
273
274 def _create_juju_public_key(self, public_key):
275 """Recreate the Juju public key on disk.
276
277 Certain libjuju commands expect to be run from the same machine as Juju
278 is bootstrapped to. This method will write the public key to disk in
279 that location: ~/.local/share/juju/ssh/juju_id_rsa.pub
280 """
281 # Make sure that we have a public key before writing to disk
282 if public_key is None or len(public_key) == 0:
283 if 'OSM_VCA_PUBKEY' in os.environ:
284 public_key = os.getenv('OSM_VCA_PUBKEY', '')
285 if len(public_key == 0):
286 return
287 else:
288 return
289
290 path = "{}/.local/share/juju/ssh".format(
291 os.path.expanduser('~'),
292 )
293 if not os.path.exists(path):
294 os.makedirs(path)
295
296 with open('{}/juju_id_rsa.pub'.format(path), 'w') as f:
297 f.write(public_key)
298
299 def notify_callback(self, model_name, application_name, status, message,
300 callback=None, *callback_args):
301 try:
302 if callback:
303 callback(
304 model_name,
305 application_name,
306 status, message,
307 *callback_args,
308 )
309 except Exception as e:
310 self.log.error("[0] notify_callback exception {}".format(e))
311 raise e
312 return True
313
314 # Public methods
315 async def Relate(self, model_name, vnfd):
316 """Create a relation between the charm-enabled VDUs in a VNF.
317
318 The Relation mapping has two parts: the id of the vdu owning the endpoint, and the name of the endpoint.
319
320 vdu:
321 ...
322 vca-relationships:
323 relation:
324 - provides: dataVM:db
325 requires: mgmtVM:app
326
327 This tells N2VC that the charm referred to by the dataVM vdu offers a relation named 'db', and the mgmtVM vdu has an 'app' endpoint that should be connected to a database.
328
329 :param str ns_name: The name of the network service.
330 :param dict vnfd: The parsed yaml VNF descriptor.
331 """
332
333 # Currently, the call to Relate() is made automatically after the
334 # deployment of each charm; if the relation depends on a charm that
335 # hasn't been deployed yet, the call will fail silently. This will
336 # prevent an API breakage, with the intent of making this an explicitly
337 # required call in a more object-oriented refactor of the N2VC API.
338
339 configs = []
340 vnf_config = vnfd.get("vnf-configuration")
341 if vnf_config:
342 juju = vnf_config['juju']
343 if juju:
344 configs.append(vnf_config)
345
346 for vdu in vnfd['vdu']:
347 vdu_config = vdu.get('vdu-configuration')
348 if vdu_config:
349 juju = vdu_config['juju']
350 if juju:
351 configs.append(vdu_config)
352
353 def _get_application_name(name):
354 """Get the application name that's mapped to a vnf/vdu."""
355 vnf_member_index = 0
356 vnf_name = vnfd['name']
357
358 for vdu in vnfd.get('vdu'):
359 # Compare the named portion of the relation to the vdu's id
360 if vdu['id'] == name:
361 application_name = self.FormatApplicationName(
362 model_name,
363 vnf_name,
364 str(vnf_member_index),
365 )
366 return application_name
367 else:
368 vnf_member_index += 1
369
370 return None
371
372 # Loop through relations
373 for cfg in configs:
374 if 'juju' in cfg:
375 juju = cfg['juju']
376 if 'vca-relationships' in juju and 'relation' in juju['vca-relationships']:
377 for rel in juju['vca-relationships']['relation']:
378 try:
379
380 # get the application name for the provides
381 (name, endpoint) = rel['provides'].split(':')
382 application_name = _get_application_name(name)
383
384 provides = "{}:{}".format(
385 application_name,
386 endpoint
387 )
388
389 # get the application name for thr requires
390 (name, endpoint) = rel['requires'].split(':')
391 application_name = _get_application_name(name)
392
393 requires = "{}:{}".format(
394 application_name,
395 endpoint
396 )
397 self.log.debug("Relation: {} <-> {}".format(
398 provides,
399 requires
400 ))
401 await self.add_relation(
402 model_name,
403 provides,
404 requires,
405 )
406 except Exception as e:
407 self.log.debug("Exception: {}".format(e))
408
409 return
410
411 async def DeployCharms(self, model_name, application_name, vnfd,
412 charm_path, params={}, machine_spec={},
413 callback=None, *callback_args):
414 """Deploy one or more charms associated with a VNF.
415
416 Deploy the charm(s) referenced in a VNF Descriptor.
417
418 :param str model_name: The name or unique id of the network service.
419 :param str application_name: The name of the application
420 :param dict vnfd: The name of the application
421 :param str charm_path: The path to the Juju charm
422 :param dict params: A dictionary of runtime parameters
423 Examples::
424 {
425 'rw_mgmt_ip': '1.2.3.4',
426 # Pass the initial-config-primitives section of the vnf or vdu
427 'initial-config-primitives': {...}
428 'user_values': dictionary with the day-1 parameters provided at instantiation time. It will replace values
429 inside < >. rw_mgmt_ip will be included here also
430 }
431 :param dict machine_spec: A dictionary describing the machine to
432 install to
433 Examples::
434 {
435 'hostname': '1.2.3.4',
436 'username': 'ubuntu',
437 }
438 :param obj callback: A callback function to receive status changes.
439 :param tuple callback_args: A list of arguments to be passed to the
440 callback
441 """
442
443 ########################################################
444 # Verify the path to the charm exists and is readable. #
445 ########################################################
446 if not os.path.exists(charm_path):
447 self.log.debug("Charm path doesn't exist: {}".format(charm_path))
448 self.notify_callback(
449 model_name,
450 application_name,
451 "failed",
452 callback,
453 *callback_args,
454 )
455 raise JujuCharmNotFound("No artifacts configured.")
456
457 ################################
458 # Login to the Juju controller #
459 ################################
460 if not self.authenticated:
461 self.log.debug("Authenticating with Juju")
462 await self.login()
463
464 ##########################################
465 # Get the model for this network service #
466 ##########################################
467 model = await self.get_model(model_name)
468
469 ########################################
470 # Verify the application doesn't exist #
471 ########################################
472 app = await self.get_application(model, application_name)
473 if app:
474 raise JujuApplicationExists("Can't deploy application \"{}\" to model \"{}\" because it already exists.".format(application_name, model_name))
475
476 ################################################################
477 # Register this application with the model-level event monitor #
478 ################################################################
479 if callback:
480 self.log.debug("JujuApi: Registering callback for {}".format(
481 application_name,
482 ))
483 await self.Subscribe(model_name, application_name, callback, *callback_args)
484
485 #######################################
486 # Get the initial charm configuration #
487 #######################################
488
489 rw_mgmt_ip = None
490 if 'rw_mgmt_ip' in params:
491 rw_mgmt_ip = params['rw_mgmt_ip']
492
493 if 'initial-config-primitive' not in params:
494 params['initial-config-primitive'] = {}
495
496 initial_config = self._get_config_from_dict(
497 params['initial-config-primitive'],
498 {'<rw_mgmt_ip>': rw_mgmt_ip}
499 )
500
501 ########################################################
502 # Check for specific machine placement (native charms) #
503 ########################################################
504 to = ""
505 series = "xenial"
506
507 if machine_spec.keys():
508 if all(k in machine_spec for k in ['hostname', 'username']):
509
510 # Allow series to be derived from the native charm
511 series = None
512
513 self.log.debug("Provisioning manual machine {}@{}".format(
514 machine_spec['username'],
515 machine_spec['hostname'],
516 ))
517
518 """Native Charm support
519
520 Taking a bare VM (assumed to be an Ubuntu cloud image),
521 the provisioning process will:
522 - Create an ubuntu user w/sudo access
523 - Detect hardware
524 - Detect architecture
525 - Download and install Juju agent from controller
526 - Enable Juju agent
527 - Add an iptables rule to route traffic to the API proxy
528 """
529
530 to = await self.provision_machine(
531 model_name=model_name,
532 username=machine_spec['username'],
533 hostname=machine_spec['hostname'],
534 private_key_path=self.GetPrivateKeyPath(),
535 )
536 self.log.debug("Provisioned machine id {}".format(to))
537
538 # TODO: If to is none, raise an exception
539
540 # The native charm won't have the sshproxy layer, typically, but LCM uses the config primitive
541 # to interpret what the values are. That's a gap to fill.
542
543 """
544 The ssh-* config parameters are unique to the sshproxy layer,
545 which most native charms will not be aware of.
546
547 Setting invalid config parameters will cause the deployment to
548 fail.
549
550 For the moment, we will strip the ssh-* parameters from native
551 charms, until the feature gap is addressed in the information
552 model.
553 """
554
555 # Native charms don't include the ssh-* config values, so strip them
556 # from the initial_config, otherwise the deploy will raise an error.
557 # self.log.debug("Removing ssh-* from initial-config")
558 for k in ['ssh-hostname', 'ssh-username', 'ssh-password']:
559 if k in initial_config:
560 self.log.debug("Removing parameter {}".format(k))
561 del initial_config[k]
562
563 self.log.debug("JujuApi: Deploying charm ({}/{}) from {} to {}".format(
564 model_name,
565 application_name,
566 charm_path,
567 to,
568 ))
569
570 ########################################################
571 # Deploy the charm and apply the initial configuration #
572 ########################################################
573 app = await model.deploy(
574 # We expect charm_path to be either the path to the charm on disk
575 # or in the format of cs:series/name
576 charm_path,
577 # This is the formatted, unique name for this charm
578 application_name=application_name,
579 # Proxy charms should use the current LTS. This will need to be
580 # changed for native charms.
581 series=series,
582 # Apply the initial 'config' primitive during deployment
583 config=initial_config,
584 # Where to deploy the charm to.
585 to=to,
586 )
587
588 #############################
589 # Map the vdu id<->app name #
590 #############################
591 try:
592 await self.Relate(model_name, vnfd)
593 except KeyError as ex:
594 # We don't currently support relations between NS and VNF/VDU charms
595 self.log.warn("[N2VC] Relations not supported: {}".format(ex))
596 except Exception as ex:
597 # This may happen if not all of the charms needed by the relation
598 # are ready. We can safely ignore this, because Relate will be
599 # retried when the endpoint of the relation is deployed.
600 self.log.warn("[N2VC] Relations not ready")
601
602 # #######################################
603 # # Execute initial config primitive(s) #
604 # #######################################
605 uuids = await self.ExecuteInitialPrimitives(
606 model_name,
607 application_name,
608 params,
609 )
610 return uuids
611
612 # primitives = {}
613 #
614 # # Build a sequential list of the primitives to execute
615 # for primitive in params['initial-config-primitive']:
616 # try:
617 # if primitive['name'] == 'config':
618 # # This is applied when the Application is deployed
619 # pass
620 # else:
621 # seq = primitive['seq']
622 #
623 # params = {}
624 # if 'parameter' in primitive:
625 # params = primitive['parameter']
626 #
627 # primitives[seq] = {
628 # 'name': primitive['name'],
629 # 'parameters': self._map_primitive_parameters(
630 # params,
631 # {'<rw_mgmt_ip>': rw_mgmt_ip}
632 # ),
633 # }
634 #
635 # for primitive in sorted(primitives):
636 # await self.ExecutePrimitive(
637 # model_name,
638 # application_name,
639 # primitives[primitive]['name'],
640 # callback,
641 # callback_args,
642 # **primitives[primitive]['parameters'],
643 # )
644 # except N2VCPrimitiveExecutionFailed as e:
645 # self.log.debug(
646 # "[N2VC] Exception executing primitive: {}".format(e)
647 # )
648 # raise
649
650 async def GetPrimitiveStatus(self, model_name, uuid):
651 """Get the status of an executed Primitive.
652
653 The status of an executed Primitive will be one of three values:
654 - completed
655 - failed
656 - running
657 """
658 status = None
659 try:
660 if not self.authenticated:
661 await self.login()
662
663 model = await self.get_model(model_name)
664
665 results = await model.get_action_status(uuid)
666
667 if uuid in results:
668 status = results[uuid]
669
670 except Exception as e:
671 self.log.debug(
672 "Caught exception while getting primitive status: {}".format(e)
673 )
674 raise N2VCPrimitiveExecutionFailed(e)
675
676 return status
677
678 async def GetPrimitiveOutput(self, model_name, uuid):
679 """Get the output of an executed Primitive.
680
681 Note: this only returns output for a successfully executed primitive.
682 """
683 results = None
684 try:
685 if not self.authenticated:
686 await self.login()
687
688 model = await self.get_model(model_name)
689 results = await model.get_action_output(uuid, 60)
690 except Exception as e:
691 self.log.debug(
692 "Caught exception while getting primitive status: {}".format(e)
693 )
694 raise N2VCPrimitiveExecutionFailed(e)
695
696 return results
697
698 # async def ProvisionMachine(self, model_name, hostname, username):
699 # """Provision machine for usage with Juju.
700 #
701 # Provisions a previously instantiated machine for use with Juju.
702 # """
703 # try:
704 # if not self.authenticated:
705 # await self.login()
706 #
707 # # FIXME: This is hard-coded until model-per-ns is added
708 # model_name = 'default'
709 #
710 # model = await self.get_model(model_name)
711 # model.add_machine(spec={})
712 #
713 # machine = await model.add_machine(spec='ssh:{}@{}:{}'.format(
714 # "ubuntu",
715 # host['address'],
716 # private_key_path,
717 # ))
718 # return machine.id
719 #
720 # except Exception as e:
721 # self.log.debug(
722 # "Caught exception while getting primitive status: {}".format(e)
723 # )
724 # raise N2VCPrimitiveExecutionFailed(e)
725
726 def GetPrivateKeyPath(self):
727 homedir = os.environ['HOME']
728 sshdir = "{}/.ssh".format(homedir)
729 private_key_path = "{}/id_n2vc_rsa".format(sshdir)
730 return private_key_path
731
732 async def GetPublicKey(self):
733 """Get the N2VC SSH public key.abs
734
735 Returns the SSH public key, to be injected into virtual machines to
736 be managed by the VCA.
737
738 The first time this is run, a ssh keypair will be created. The public
739 key is injected into a VM so that we can provision the machine with
740 Juju, after which Juju will communicate with the VM directly via the
741 juju agent.
742 """
743 public_key = ""
744
745 # Find the path to where we expect our key to live.
746 homedir = os.environ['HOME']
747 sshdir = "{}/.ssh".format(homedir)
748 if not os.path.exists(sshdir):
749 os.mkdir(sshdir)
750
751 private_key_path = "{}/id_n2vc_rsa".format(sshdir)
752 public_key_path = "{}.pub".format(private_key_path)
753
754 # If we don't have a key generated, generate it.
755 if not os.path.exists(private_key_path):
756 cmd = "ssh-keygen -t {} -b {} -N '' -f {}".format(
757 "rsa",
758 "4096",
759 private_key_path
760 )
761 subprocess.check_output(shlex.split(cmd))
762
763 # Read the public key
764 with open(public_key_path, "r") as f:
765 public_key = f.readline()
766
767 return public_key
768
769 async def ExecuteInitialPrimitives(self, model_name, application_name,
770 params, callback=None, *callback_args):
771 """Execute multiple primitives.
772
773 Execute multiple primitives as declared in initial-config-primitive.
774 This is useful in cases where the primitives initially failed -- for
775 example, if the charm is a proxy but the proxy hasn't been configured
776 yet.
777 """
778 uuids = []
779 primitives = {}
780
781 # Build a sequential list of the primitives to execute
782 for primitive in params['initial-config-primitive']:
783 try:
784 if primitive['name'] == 'config':
785 pass
786 else:
787 seq = primitive['seq']
788
789 params_ = {}
790 if 'parameter' in primitive:
791 params_ = primitive['parameter']
792
793 user_values = params.get("user_values", {})
794 if 'rw_mgmt_ip' not in user_values:
795 user_values['rw_mgmt_ip'] = None
796 # just for backward compatibility, because it will be provided always by modern version of LCM
797
798 primitives[seq] = {
799 'name': primitive['name'],
800 'parameters': self._map_primitive_parameters(
801 params_,
802 user_values
803 ),
804 }
805
806 for primitive in sorted(primitives):
807 try:
808 # self.log.debug("Queuing action {}".format(primitives[primitive]['name']))
809 uuids.append(
810 await self.ExecutePrimitive(
811 model_name,
812 application_name,
813 primitives[primitive]['name'],
814 callback,
815 callback_args,
816 **primitives[primitive]['parameters'],
817 )
818 )
819 except PrimitiveDoesNotExist as e:
820 self.log.debug("Ignoring exception PrimitiveDoesNotExist: {}".format(e))
821 pass
822 except Exception as e:
823 self.log.debug("XXXXXXXXXXXXXXXXXXXXXXXXX Unexpected exception: {}".format(e))
824 raise e
825
826 except N2VCPrimitiveExecutionFailed as e:
827 self.log.debug(
828 "[N2VC] Exception executing primitive: {}".format(e)
829 )
830 raise
831 return uuids
832
833 async def ExecutePrimitive(self, model_name, application_name, primitive,
834 callback, *callback_args, **params):
835 """Execute a primitive of a charm for Day 1 or Day 2 configuration.
836
837 Execute a primitive defined in the VNF descriptor.
838
839 :param str model_name: The name or unique id of the network service.
840 :param str application_name: The name of the application
841 :param str primitive: The name of the primitive to execute.
842 :param obj callback: A callback function to receive status changes.
843 :param tuple callback_args: A list of arguments to be passed to the
844 callback function.
845 :param dict params: A dictionary of key=value pairs representing the
846 primitive's parameters
847 Examples::
848 {
849 'rw_mgmt_ip': '1.2.3.4',
850 # Pass the initial-config-primitives section of the vnf or vdu
851 'initial-config-primitives': {...}
852 }
853 """
854 self.log.debug("Executing primitive={} params={}".format(primitive, params))
855 uuid = None
856 try:
857 if not self.authenticated:
858 await self.login()
859
860 model = await self.get_model(model_name)
861
862 if primitive == 'config':
863 # config is special, and expecting params to be a dictionary
864 await self.set_config(
865 model,
866 application_name,
867 params['params'],
868 )
869 else:
870 app = await self.get_application(model, application_name)
871 if app:
872 # Does this primitive exist?
873 actions = await app.get_actions()
874
875 if primitive not in actions.keys():
876 raise PrimitiveDoesNotExist("Primitive {} does not exist".format(primitive))
877
878 # Run against the first (and probably only) unit in the app
879 unit = app.units[0]
880 if unit:
881 action = await unit.run_action(primitive, **params)
882 uuid = action.id
883 except PrimitiveDoesNotExist as e:
884 # Catch and raise this exception if it's thrown from the inner block
885 raise e
886 except Exception as e:
887 # An unexpected exception was caught
888 self.log.debug(
889 "Caught exception while executing primitive: {}".format(e)
890 )
891 raise N2VCPrimitiveExecutionFailed(e)
892 return uuid
893
894 async def RemoveCharms(self, model_name, application_name, callback=None,
895 *callback_args):
896 """Remove a charm from the VCA.
897
898 Remove a charm referenced in a VNF Descriptor.
899
900 :param str model_name: The name of the network service.
901 :param str application_name: The name of the application
902 :param obj callback: A callback function to receive status changes.
903 :param tuple callback_args: A list of arguments to be passed to the
904 callback function.
905 """
906 try:
907 if not self.authenticated:
908 await self.login()
909
910 model = await self.get_model(model_name)
911 app = await self.get_application(model, application_name)
912 if app:
913 # Remove this application from event monitoring
914 await self.Unsubscribe(model_name, application_name)
915
916 # self.notify_callback(model_name, application_name, "removing", callback, *callback_args)
917 self.log.debug(
918 "Removing the application {}".format(application_name)
919 )
920 await app.remove()
921
922 # await self.disconnect_model(self.monitors[model_name])
923
924 self.notify_callback(
925 model_name,
926 application_name,
927 "removed",
928 "Removing charm {}".format(application_name),
929 callback,
930 *callback_args,
931 )
932
933 except Exception as e:
934 print("Caught exception: {}".format(e))
935 self.log.debug(e)
936 raise e
937
938 async def CreateNetworkService(self, ns_uuid):
939 """Create a new Juju model for the Network Service.
940
941 Creates a new Model in the Juju Controller.
942
943 :param str ns_uuid: A unique id representing an instaance of a
944 Network Service.
945
946 :returns: True if the model was created. Raises JujuError on failure.
947 """
948 if not self.authenticated:
949 await self.login()
950
951 models = await self.controller.list_models()
952 if ns_uuid not in models:
953 try:
954 self.models[ns_uuid] = await self.controller.add_model(
955 ns_uuid
956 )
957 except JujuError as e:
958 if "already exists" not in e.message:
959 raise e
960
961 # Create an observer for this model
962 await self.create_model_monitor(ns_uuid)
963
964 return True
965
966 async def DestroyNetworkService(self, ns_uuid):
967 """Destroy a Network Service.
968
969 Destroy the Network Service and any deployed charms.
970
971 :param ns_uuid The unique id of the Network Service
972
973 :returns: True if the model was created. Raises JujuError on failure.
974 """
975
976 # Do not delete the default model. The default model was used by all
977 # Network Services, prior to the implementation of a model per NS.
978 if ns_uuid.lower() == "default":
979 return False
980
981 if not self.authenticated:
982 await self.login()
983
984 models = await self.controller.list_models()
985 if ns_uuid in models:
986 model = await self.controller.get_model(ns_uuid)
987
988 for application in model.applications:
989 app = model.applications[application]
990
991 await self.RemoveCharms(ns_uuid, application)
992
993 self.log.debug("Unsubscribing Watcher for {}".format(application))
994 await self.Unsubscribe(ns_uuid, application)
995
996 self.log.debug("Waiting for application to terminate")
997 timeout = 30
998 try:
999 await model.block_until(
1000 lambda: all(
1001 unit.workload_status in ['terminated'] for unit in app.units
1002 ),
1003 timeout=timeout
1004 )
1005 except Exception as e:
1006 self.log.debug("Timed out waiting for {} to terminate.".format(application))
1007
1008 for machine in model.machines:
1009 try:
1010 self.log.debug("Destroying machine {}".format(machine))
1011 await model.machines[machine].destroy(force=True)
1012 except JujuAPIError as e:
1013 if 'does not exist' in str(e):
1014 # Our cached model may be stale, because the machine
1015 # has already been removed. It's safe to continue.
1016 continue
1017 else:
1018 self.log.debug("Caught exception: {}".format(e))
1019 raise e
1020
1021 # Disconnect from the Model
1022 if ns_uuid in self.models:
1023 self.log.debug("Disconnecting model {}".format(ns_uuid))
1024 # await self.disconnect_model(self.models[ns_uuid])
1025 await self.disconnect_model(ns_uuid)
1026
1027 try:
1028 self.log.debug("Destroying model {}".format(ns_uuid))
1029 await self.controller.destroy_models(ns_uuid)
1030 except JujuError:
1031 raise NetworkServiceDoesNotExist(
1032 "The Network Service '{}' does not exist".format(ns_uuid)
1033 )
1034
1035 return True
1036
1037 async def GetMetrics(self, model_name, application_name):
1038 """Get the metrics collected by the VCA.
1039
1040 :param model_name The name or unique id of the network service
1041 :param application_name The name of the application
1042 """
1043 metrics = {}
1044 model = await self.get_model(model_name)
1045 app = await self.get_application(model, application_name)
1046 if app:
1047 metrics = await app.get_metrics()
1048
1049 return metrics
1050
1051 async def HasApplication(self, model_name, application_name):
1052 model = await self.get_model(model_name)
1053 app = await self.get_application(model, application_name)
1054 if app:
1055 return True
1056 return False
1057
1058 async def Subscribe(self, ns_name, application_name, callback, *callback_args):
1059 """Subscribe to callbacks for an application.
1060
1061 :param ns_name str: The name of the Network Service
1062 :param application_name str: The name of the application
1063 :param callback obj: The callback method
1064 :param callback_args list: The list of arguments to append to calls to
1065 the callback method
1066 """
1067 self.monitors[ns_name].AddApplication(
1068 application_name,
1069 callback,
1070 *callback_args
1071 )
1072
1073 async def Unsubscribe(self, ns_name, application_name):
1074 """Unsubscribe to callbacks for an application.
1075
1076 Unsubscribes the caller from notifications from a deployed application.
1077
1078 :param ns_name str: The name of the Network Service
1079 :param application_name str: The name of the application
1080 """
1081 self.monitors[ns_name].RemoveApplication(
1082 application_name,
1083 )
1084
1085 # Non-public methods
1086 async def add_relation(self, model_name, relation1, relation2):
1087 """
1088 Add a relation between two application endpoints.
1089
1090 :param str model_name: The name or unique id of the network service
1091 :param str relation1: '<application>[:<relation_name>]'
1092 :param str relation2: '<application>[:<relation_name>]'
1093 """
1094
1095 if not self.authenticated:
1096 await self.login()
1097
1098 m = await self.get_model(model_name)
1099 try:
1100 await m.add_relation(relation1, relation2)
1101 except JujuAPIError as e:
1102 # If one of the applications in the relationship doesn't exist,
1103 # or the relation has already been added, let the operation fail
1104 # silently.
1105 if 'not found' in e.message:
1106 return
1107 if 'already exists' in e.message:
1108 return
1109
1110 raise e
1111
1112 # async def apply_config(self, config, application):
1113 # """Apply a configuration to the application."""
1114 # print("JujuApi: Applying configuration to {}.".format(
1115 # application
1116 # ))
1117 # return await self.set_config(application=application, config=config)
1118
1119 def _get_config_from_dict(self, config_primitive, values):
1120 """Transform the yang config primitive to dict.
1121
1122 Expected result:
1123
1124 config = {
1125 'config':
1126 }
1127 """
1128 config = {}
1129 for primitive in config_primitive:
1130 if primitive['name'] == 'config':
1131 # config = self._map_primitive_parameters()
1132 for parameter in primitive['parameter']:
1133 param = str(parameter['name'])
1134 if parameter['value'] == "<rw_mgmt_ip>":
1135 config[param] = str(values[parameter['value']])
1136 else:
1137 config[param] = str(parameter['value'])
1138
1139 return config
1140
1141 def _map_primitive_parameters(self, parameters, user_values):
1142 params = {}
1143 for parameter in parameters:
1144 param = str(parameter['name'])
1145 value = parameter.get('value')
1146
1147 # map parameters inside a < >; e.g. <rw_mgmt_ip>. with the provided user_values.
1148 # Must exist at user_values except if there is a default value
1149 if isinstance(value, str) and value.startswith("<") and value.endswith(">"):
1150 if parameter['value'][1:-1] in user_values:
1151 value = user_values[parameter['value'][1:-1]]
1152 elif 'default-value' in parameter:
1153 value = parameter['default-value']
1154 else:
1155 raise KeyError("parameter {}='{}' not supplied ".format(param, value))
1156
1157 # If there's no value, use the default-value (if set)
1158 if value is None and 'default-value' in parameter:
1159 value = parameter['default-value']
1160
1161 # Typecast parameter value, if present
1162 paramtype = "string"
1163 try:
1164 if 'data-type' in parameter:
1165 paramtype = str(parameter['data-type']).lower()
1166
1167 if paramtype == "integer":
1168 value = int(value)
1169 elif paramtype == "boolean":
1170 value = bool(value)
1171 else:
1172 value = str(value)
1173 else:
1174 # If there's no data-type, assume the value is a string
1175 value = str(value)
1176 except ValueError:
1177 raise ValueError("parameter {}='{}' cannot be converted to type {}".format(param, value, paramtype))
1178
1179 params[param] = value
1180 return params
1181
1182 def _get_config_from_yang(self, config_primitive, values):
1183 """Transform the yang config primitive to dict."""
1184 config = {}
1185 for primitive in config_primitive.values():
1186 if primitive['name'] == 'config':
1187 for parameter in primitive['parameter'].values():
1188 param = str(parameter['name'])
1189 if parameter['value'] == "<rw_mgmt_ip>":
1190 config[param] = str(values[parameter['value']])
1191 else:
1192 config[param] = str(parameter['value'])
1193
1194 return config
1195
1196 def FormatApplicationName(self, *args):
1197 """
1198 Generate a Juju-compatible Application name
1199
1200 :param args tuple: Positional arguments to be used to construct the
1201 application name.
1202
1203 Limitations::
1204 - Only accepts characters a-z and non-consequitive dashes (-)
1205 - Application name should not exceed 50 characters
1206
1207 Examples::
1208
1209 FormatApplicationName("ping_pong_ns", "ping_vnf", "a")
1210 """
1211 appname = ""
1212 for c in "-".join(list(args)):
1213 if c.isdigit():
1214 c = chr(97 + int(c))
1215 elif not c.isalpha():
1216 c = "-"
1217 appname += c
1218 return re.sub('-+', '-', appname.lower())
1219
1220 # def format_application_name(self, nsd_name, vnfr_name, member_vnf_index=0):
1221 # """Format the name of the application
1222 #
1223 # Limitations:
1224 # - Only accepts characters a-z and non-consequitive dashes (-)
1225 # - Application name should not exceed 50 characters
1226 # """
1227 # name = "{}-{}-{}".format(nsd_name, vnfr_name, member_vnf_index)
1228 # new_name = ''
1229 # for c in name:
1230 # if c.isdigit():
1231 # c = chr(97 + int(c))
1232 # elif not c.isalpha():
1233 # c = "-"
1234 # new_name += c
1235 # return re.sub('\-+', '-', new_name.lower())
1236
1237 def format_model_name(self, name):
1238 """Format the name of model.
1239
1240 Model names may only contain lowercase letters, digits and hyphens
1241 """
1242
1243 return name.replace('_', '-').lower()
1244
1245 async def get_application(self, model, application):
1246 """Get the deployed application."""
1247 if not self.authenticated:
1248 await self.login()
1249
1250 app = None
1251 if application and model:
1252 if model.applications:
1253 if application in model.applications:
1254 app = model.applications[application]
1255
1256 return app
1257
1258 async def get_model(self, model_name):
1259 """Get a model from the Juju Controller.
1260
1261 Note: Model objects returned must call disconnected() before it goes
1262 out of scope."""
1263 if not self.authenticated:
1264 await self.login()
1265
1266 if model_name not in self.models:
1267 # Get the models in the controller
1268 models = await self.controller.list_models()
1269
1270 if model_name not in models:
1271 try:
1272 self.models[model_name] = await self.controller.add_model(
1273 model_name
1274 )
1275 except JujuError as e:
1276 if "already exists" not in e.message:
1277 raise e
1278 else:
1279 self.models[model_name] = await self.controller.get_model(
1280 model_name
1281 )
1282
1283 self.refcount['model'] += 1
1284
1285 # Create an observer for this model
1286 await self.create_model_monitor(model_name)
1287
1288 return self.models[model_name]
1289
1290 async def create_model_monitor(self, model_name):
1291 """Create a monitor for the model, if none exists."""
1292 if not self.authenticated:
1293 await self.login()
1294
1295 if model_name not in self.monitors:
1296 self.monitors[model_name] = VCAMonitor(model_name)
1297 self.models[model_name].add_observer(self.monitors[model_name])
1298
1299 return True
1300
1301 async def login(self):
1302 """Login to the Juju controller."""
1303
1304 if self.authenticated:
1305 return
1306
1307 self.connecting = True
1308
1309 self.log.debug("JujuApi: Logging into controller")
1310
1311 self.controller = Controller(loop=self.loop)
1312
1313 if self.secret:
1314 self.log.debug(
1315 "Connecting to controller... ws://{}:{} as {}/{}".format(
1316 self.endpoint,
1317 self.port,
1318 self.user,
1319 self.secret,
1320 )
1321 )
1322 await self.controller.connect(
1323 endpoint=self.endpoint,
1324 username=self.user,
1325 password=self.secret,
1326 cacert=self.ca_cert,
1327 )
1328 self.refcount['controller'] += 1
1329 else:
1330 # current_controller no longer exists
1331 # self.log.debug("Connecting to current controller...")
1332 # await self.controller.connect_current()
1333 # await self.controller.connect(
1334 # endpoint=self.endpoint,
1335 # username=self.user,
1336 # cacert=cacert,
1337 # )
1338 self.log.fatal("VCA credentials not configured.")
1339
1340 self.authenticated = True
1341 self.log.debug("JujuApi: Logged into controller")
1342
1343 async def logout(self):
1344 """Logout of the Juju controller."""
1345 if not self.authenticated:
1346 return False
1347
1348 try:
1349 for model in self.models:
1350 await self.disconnect_model(model)
1351
1352 if self.controller:
1353 self.log.debug("Disconnecting controller {}".format(
1354 self.controller
1355 ))
1356 await self.controller.disconnect()
1357 self.refcount['controller'] -= 1
1358 self.controller = None
1359
1360 self.authenticated = False
1361
1362 self.log.debug(self.refcount)
1363
1364 except Exception as e:
1365 self.log.fatal(
1366 "Fatal error logging out of Juju Controller: {}".format(e)
1367 )
1368 raise e
1369 return True
1370
1371 async def disconnect_model(self, model):
1372 self.log.debug("Disconnecting model {}".format(model))
1373 if model in self.models:
1374 try:
1375 await self.models[model].disconnect()
1376 self.refcount['model'] -= 1
1377 self.models[model] = None
1378 except Exception as e:
1379 self.log.debug("Caught exception: {}".format(e))
1380
1381 async def provision_machine(self, model_name: str,
1382 hostname: str, username: str,
1383 private_key_path: str) -> int:
1384 """Provision a machine.
1385
1386 This executes the SSH provisioner, which will log in to a machine via
1387 SSH and prepare it for use with the Juju model
1388
1389 :param model_name str: The name of the model
1390 :param hostname str: The IP or hostname of the target VM
1391 :param user str: The username to login to
1392 :param private_key_path str: The path to the private key that's been injected to the VM via cloud-init
1393 :return machine_id int: Returns the id of the machine or None if provisioning fails
1394 """
1395 if not self.authenticated:
1396 await self.login()
1397
1398 machine_id = None
1399
1400 if self.api_proxy:
1401 self.log.debug("Instantiating SSH Provisioner for {}@{} ({})".format(
1402 username,
1403 hostname,
1404 private_key_path
1405 ))
1406 provisioner = SSHProvisioner(
1407 host=hostname,
1408 user=username,
1409 private_key_path=private_key_path,
1410 log=self.log,
1411 )
1412
1413 params = None
1414 try:
1415 params = provisioner.provision_machine()
1416 except Exception as ex:
1417 self.log.debug("caught exception from provision_machine: {}".format(ex))
1418 return None
1419
1420 if params:
1421 params.jobs = ['JobHostUnits']
1422
1423 model = await self.get_model(model_name)
1424
1425 connection = model.connection()
1426
1427 # Submit the request.
1428 self.log.debug("Adding machine to model")
1429 client_facade = client.ClientFacade.from_connection(connection)
1430 results = await client_facade.AddMachines(params=[params])
1431 error = results.machines[0].error
1432 if error:
1433 raise ValueError("Error adding machine: %s" % error.message)
1434
1435 machine_id = results.machines[0].machine
1436
1437 # Need to run this after AddMachines has been called,
1438 # as we need the machine_id
1439 self.log.debug("Installing Juju agent")
1440 await provisioner.install_agent(
1441 connection,
1442 params.nonce,
1443 machine_id,
1444 self.api_proxy,
1445 )
1446 else:
1447 self.log.debug("Missing API Proxy")
1448 return machine_id
1449
1450 # async def remove_application(self, name):
1451 # """Remove the application."""
1452 # if not self.authenticated:
1453 # await self.login()
1454 #
1455 # app = await self.get_application(name)
1456 # if app:
1457 # self.log.debug("JujuApi: Destroying application {}".format(
1458 # name,
1459 # ))
1460 #
1461 # await app.destroy()
1462
1463 async def remove_relation(self, a, b):
1464 """
1465 Remove a relation between two application endpoints
1466
1467 :param a An application endpoint
1468 :param b An application endpoint
1469 """
1470 if not self.authenticated:
1471 await self.login()
1472
1473 m = await self.get_model()
1474 try:
1475 m.remove_relation(a, b)
1476 finally:
1477 await m.disconnect()
1478
1479 async def resolve_error(self, model_name, application=None):
1480 """Resolve units in error state."""
1481 if not self.authenticated:
1482 await self.login()
1483
1484 model = await self.get_model(model_name)
1485
1486 app = await self.get_application(model, application)
1487 if app:
1488 self.log.debug(
1489 "JujuApi: Resolving errors for application {}".format(
1490 application,
1491 )
1492 )
1493
1494 for unit in app.units:
1495 app.resolved(retry=True)
1496
1497 async def run_action(self, model_name, application, action_name, **params):
1498 """Execute an action and return an Action object."""
1499 if not self.authenticated:
1500 await self.login()
1501 result = {
1502 'status': '',
1503 'action': {
1504 'tag': None,
1505 'results': None,
1506 }
1507 }
1508
1509 model = await self.get_model(model_name)
1510
1511 app = await self.get_application(model, application)
1512 if app:
1513 # We currently only have one unit per application
1514 # so use the first unit available.
1515 unit = app.units[0]
1516
1517 self.log.debug(
1518 "JujuApi: Running Action {} against Application {}".format(
1519 action_name,
1520 application,
1521 )
1522 )
1523
1524 action = await unit.run_action(action_name, **params)
1525
1526 # Wait for the action to complete
1527 await action.wait()
1528
1529 result['status'] = action.status
1530 result['action']['tag'] = action.data['id']
1531 result['action']['results'] = action.results
1532
1533 return result
1534
1535 async def set_config(self, model_name, application, config):
1536 """Apply a configuration to the application."""
1537 if not self.authenticated:
1538 await self.login()
1539
1540 app = await self.get_application(model_name, application)
1541 if app:
1542 self.log.debug("JujuApi: Setting config for Application {}".format(
1543 application,
1544 ))
1545 await app.set_config(config)
1546
1547 # Verify the config is set
1548 newconf = await app.get_config()
1549 for key in config:
1550 if config[key] != newconf[key]['value']:
1551 self.log.debug("JujuApi: Config not set! Key {} Value {} doesn't match {}".format(key, config[key], newconf[key]))
1552
1553 # async def set_parameter(self, parameter, value, application=None):
1554 # """Set a config parameter for a service."""
1555 # if not self.authenticated:
1556 # await self.login()
1557 #
1558 # self.log.debug("JujuApi: Setting {}={} for Application {}".format(
1559 # parameter,
1560 # value,
1561 # application,
1562 # ))
1563 # return await self.apply_config(
1564 # {parameter: value},
1565 # application=application,
1566 # )
1567
1568 async def wait_for_application(self, model_name, application_name,
1569 timeout=300):
1570 """Wait for an application to become active."""
1571 if not self.authenticated:
1572 await self.login()
1573
1574 model = await self.get_model(model_name)
1575
1576 app = await self.get_application(model, application_name)
1577 self.log.debug("Application: {}".format(app))
1578 if app:
1579 self.log.debug(
1580 "JujuApi: Waiting {} seconds for Application {}".format(
1581 timeout,
1582 application_name,
1583 )
1584 )
1585
1586 await model.block_until(
1587 lambda: all(
1588 unit.agent_status == 'idle' and unit.workload_status in
1589 ['active', 'unknown'] for unit in app.units
1590 ),
1591 timeout=timeout
1592 )