Fix missing import
[osm/N2VC.git] / n2vc / vnf.py
1 # Copyright 2019 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import asyncio
16 import base64
17 import binascii
18 import logging
19 import os.path
20 import re
21 import shlex
22 import ssl
23 import subprocess
24
25 from juju.client import client
26 from juju.controller import Controller
27 from juju.errors import JujuAPIError, JujuError
28 from juju.model import ModelObserver
29
30 import n2vc.exceptions
31 from n2vc.provisioner import SSHProvisioner
32
33
34 # import time
35 # FIXME: this should load the juju inside or modules without having to
36 # explicitly install it. Check why it's not working.
37 # Load our subtree of the juju library
38 # path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
39 # path = os.path.join(path, "modules/libjuju/")
40 # if path not in sys.path:
41 # sys.path.insert(1, path)
42 # We might need this to connect to the websocket securely, but test and verify.
43 try:
44 ssl._create_default_https_context = ssl._create_unverified_context
45 except AttributeError:
46 # Legacy Python doesn't verify by default (see pep-0476)
47 # https://www.python.org/dev/peps/pep-0476/
48 pass
49
50
51 # Custom exceptions
52 # Deprecated. Please use n2vc.exceptions namespace.
53 class JujuCharmNotFound(Exception):
54 """The Charm can't be found or is not readable."""
55
56
57 class JujuApplicationExists(Exception):
58 """The Application already exists."""
59
60
61 class N2VCPrimitiveExecutionFailed(Exception):
62 """Something failed while attempting to execute a primitive."""
63
64
65 class NetworkServiceDoesNotExist(Exception):
66 """The Network Service being acted against does not exist."""
67
68
69 class PrimitiveDoesNotExist(Exception):
70 """The Primitive being executed does not exist."""
71
72
73 # Quiet the debug logging
74 logging.getLogger("websockets.protocol").setLevel(logging.INFO)
75 logging.getLogger("juju.client.connection").setLevel(logging.WARN)
76 logging.getLogger("juju.model").setLevel(logging.WARN)
77 logging.getLogger("juju.machine").setLevel(logging.WARN)
78
79
80 class VCAMonitor(ModelObserver):
81 """Monitor state changes within the Juju Model."""
82
83 log = None
84
85 def __init__(self, ns_name):
86 self.log = logging.getLogger(__name__)
87
88 self.ns_name = ns_name
89 self.applications = {}
90
91 def AddApplication(self, application_name, callback, *callback_args):
92 if application_name not in self.applications:
93 self.applications[application_name] = {
94 "callback": callback,
95 "callback_args": callback_args,
96 }
97
98 def RemoveApplication(self, application_name):
99 if application_name in self.applications:
100 del self.applications[application_name]
101
102 async def on_change(self, delta, old, new, model):
103 """React to changes in the Juju model."""
104
105 if delta.entity == "unit":
106 # Ignore change events from other applications
107 if delta.data["application"] not in self.applications.keys():
108 return
109
110 try:
111
112 application_name = delta.data["application"]
113
114 callback = self.applications[application_name]["callback"]
115 callback_args = self.applications[application_name]["callback_args"]
116
117 if old and new:
118 # Fire off a callback with the application state
119 if callback:
120 callback(
121 self.ns_name,
122 delta.data["application"],
123 new.workload_status,
124 new.workload_status_message,
125 *callback_args,
126 )
127
128 if old and not new:
129 # This is a charm being removed
130 if callback:
131 callback(
132 self.ns_name,
133 delta.data["application"],
134 "removed",
135 "",
136 *callback_args,
137 )
138 except Exception as e:
139 self.log.debug("[1] notify_callback exception: {}".format(e))
140
141 elif delta.entity == "action":
142 # TODO: Decide how we want to notify the user of actions
143
144 # uuid = delta.data['id'] # The Action's unique id
145 # msg = delta.data['message'] # The output of the action
146 #
147 # if delta.data['status'] == "pending":
148 # # The action is queued
149 # pass
150 # elif delta.data['status'] == "completed""
151 # # The action was successful
152 # pass
153 # elif delta.data['status'] == "failed":
154 # # The action failed.
155 # pass
156
157 pass
158
159
160 ########
161 # TODO
162 #
163 # Create unique models per network service
164 # Document all public functions
165
166
167 class N2VC:
168 def __init__(
169 self,
170 log=None,
171 server="127.0.0.1",
172 port=17070,
173 user="admin",
174 secret=None,
175 artifacts=None,
176 loop=None,
177 juju_public_key=None,
178 ca_cert=None,
179 api_proxy=None,
180 ):
181 """Initialize N2VC
182
183 Initializes the N2VC object, allowing the caller to interoperate with the VCA.
184
185
186 :param log obj: The logging object to log to
187 :param server str: The IP Address or Hostname of the Juju controller
188 :param port int: The port of the Juju Controller
189 :param user str: The Juju username to authenticate with
190 :param secret str: The Juju password to authenticate with
191 :param artifacts str: The directory where charms required by a vnfd are
192 stored.
193 :param loop obj: The loop to use.
194 :param juju_public_key str: The contents of the Juju public SSH key
195 :param ca_cert str: The CA certificate to use to authenticate
196 :param api_proxy str: The IP of the host machine
197
198 :Example:
199 client = n2vc.vnf.N2VC(
200 log=log,
201 server='10.1.1.28',
202 port=17070,
203 user='admin',
204 secret='admin',
205 artifacts='/app/storage/myvnf/charms',
206 loop=loop,
207 juju_public_key='<contents of the juju public key>',
208 ca_cert='<contents of CA certificate>',
209 api_proxy='192.168.1.155'
210 )
211 """
212
213 # Initialize instance-level variables
214 self.api = None
215 self.log = None
216 self.controller = None
217 self.connecting = False
218 self.authenticated = False
219 self.api_proxy = api_proxy
220
221 if log:
222 self.log = log
223 else:
224 self.log = logging.getLogger(__name__)
225
226 # For debugging
227 self.refcount = {
228 "controller": 0,
229 "model": 0,
230 }
231
232 self.models = {}
233
234 # Model Observers
235 self.monitors = {}
236
237 # VCA config
238 self.hostname = ""
239 self.port = 17070
240 self.username = ""
241 self.secret = ""
242
243 self.juju_public_key = juju_public_key
244 if juju_public_key:
245 self._create_juju_public_key(juju_public_key)
246 else:
247 self.juju_public_key = ""
248
249 # TODO: Verify ca_cert is valid before using. VCA will crash
250 # if the ca_cert isn't formatted correctly.
251 def base64_to_cacert(b64string):
252 """Convert the base64-encoded string containing the VCA CACERT.
253
254 The input string....
255
256 """
257 try:
258 cacert = base64.b64decode(b64string).decode("utf-8")
259
260 cacert = re.sub(r"\\n", r"\n", cacert,)
261 except binascii.Error as e:
262 self.log.debug("Caught binascii.Error: {}".format(e))
263 raise n2vc.exceptions.N2VCInvalidCertificate("Invalid CA Certificate")
264
265 return cacert
266
267 self.ca_cert = None
268 if ca_cert:
269 self.ca_cert = base64_to_cacert(ca_cert)
270
271 # Quiet websocket traffic
272 logging.getLogger("websockets.protocol").setLevel(logging.INFO)
273 logging.getLogger("juju.client.connection").setLevel(logging.WARN)
274 logging.getLogger("model").setLevel(logging.WARN)
275 # logging.getLogger('websockets.protocol').setLevel(logging.DEBUG)
276
277 self.log.debug("JujuApi: instantiated")
278
279 self.server = server
280 self.port = port
281
282 self.secret = secret
283 if user.startswith("user-"):
284 self.user = user
285 else:
286 self.user = "user-{}".format(user)
287
288 self.endpoint = "%s:%d" % (server, int(port))
289
290 self.artifacts = artifacts
291
292 self.loop = loop or asyncio.get_event_loop()
293
294 def __del__(self):
295 """Close any open connections."""
296 yield self.logout()
297
298 def _create_juju_public_key(self, public_key):
299 """Recreate the Juju public key on disk.
300
301 Certain libjuju commands expect to be run from the same machine as Juju
302 is bootstrapped to. This method will write the public key to disk in
303 that location: ~/.local/share/juju/ssh/juju_id_rsa.pub
304 """
305 # Make sure that we have a public key before writing to disk
306 if public_key is None or len(public_key) == 0:
307 if "OSM_VCA_PUBKEY" in os.environ:
308 public_key = os.getenv("OSM_VCA_PUBKEY", "")
309 if len(public_key == 0):
310 return
311 else:
312 return
313
314 path = "{}/.local/share/juju/ssh".format(os.path.expanduser("~"),)
315 if not os.path.exists(path):
316 os.makedirs(path)
317
318 with open("{}/juju_id_rsa.pub".format(path), "w") as f:
319 f.write(public_key)
320
321 def notify_callback(
322 self,
323 model_name,
324 application_name,
325 status,
326 message,
327 callback=None,
328 *callback_args
329 ):
330 try:
331 if callback:
332 callback(
333 model_name, application_name, status, message, *callback_args,
334 )
335 except Exception as e:
336 self.log.error("[0] notify_callback exception {}".format(e))
337 raise e
338 return True
339
340 # Public methods
341 async def Relate(self, model_name, vnfd):
342 """Create a relation between the charm-enabled VDUs in a VNF.
343
344 The Relation mapping has two parts: the id of the vdu owning the endpoint, and
345 the name of the endpoint.
346
347 vdu:
348 ...
349 vca-relationships:
350 relation:
351 - provides: dataVM:db
352 requires: mgmtVM:app
353
354 This tells N2VC that the charm referred to by the dataVM vdu offers a relation
355 named 'db', and the mgmtVM vdu
356 has an 'app' endpoint that should be connected to a database.
357
358 :param str ns_name: The name of the network service.
359 :param dict vnfd: The parsed yaml VNF descriptor.
360 """
361
362 # Currently, the call to Relate() is made automatically after the
363 # deployment of each charm; if the relation depends on a charm that
364 # hasn't been deployed yet, the call will fail silently. This will
365 # prevent an API breakage, with the intent of making this an explicitly
366 # required call in a more object-oriented refactor of the N2VC API.
367
368 configs = []
369 vnf_config = vnfd.get("vnf-configuration")
370 if vnf_config:
371 juju = vnf_config["juju"]
372 if juju:
373 configs.append(vnf_config)
374
375 for vdu in vnfd["vdu"]:
376 vdu_config = vdu.get("vdu-configuration")
377 if vdu_config:
378 juju = vdu_config["juju"]
379 if juju:
380 configs.append(vdu_config)
381
382 def _get_application_name(name):
383 """Get the application name that's mapped to a vnf/vdu."""
384 vnf_member_index = 0
385 vnf_name = vnfd["name"]
386
387 for vdu in vnfd.get("vdu"):
388 # Compare the named portion of the relation to the vdu's id
389 if vdu["id"] == name:
390 application_name = self.FormatApplicationName(
391 model_name, vnf_name, str(vnf_member_index),
392 )
393 return application_name
394 else:
395 vnf_member_index += 1
396
397 return None
398
399 # Loop through relations
400 for cfg in configs:
401 if "juju" in cfg:
402 juju = cfg["juju"]
403 if (
404 "vca-relationships" in juju
405 and "relation" in juju["vca-relationships"]
406 ):
407 for rel in juju["vca-relationships"]["relation"]:
408 try:
409
410 # get the application name for the provides
411 (name, endpoint) = rel["provides"].split(":")
412 application_name = _get_application_name(name)
413
414 provides = "{}:{}".format(application_name, endpoint)
415
416 # get the application name for thr requires
417 (name, endpoint) = rel["requires"].split(":")
418 application_name = _get_application_name(name)
419
420 requires = "{}:{}".format(application_name, endpoint)
421 self.log.debug(
422 "Relation: {} <-> {}".format(provides, requires)
423 )
424 await self.add_relation(
425 model_name, provides, requires,
426 )
427 except Exception as e:
428 self.log.debug("Exception: {}".format(e))
429
430 return
431
432 async def DeployCharms(
433 self,
434 model_name,
435 application_name,
436 vnfd,
437 charm_path,
438 params={},
439 machine_spec={},
440 callback=None,
441 *callback_args
442 ):
443 """Deploy one or more charms associated with a VNF.
444
445 Deploy the charm(s) referenced in a VNF Descriptor.
446
447 :param str model_name: The name or unique id of the network service.
448 :param str application_name: The name of the application
449 :param dict vnfd: The name of the application
450 :param str charm_path: The path to the Juju charm
451 :param dict params: A dictionary of runtime parameters
452 Examples::
453 {
454 'rw_mgmt_ip': '1.2.3.4',
455 # Pass the initial-config-primitives section of the vnf or vdu
456 'initial-config-primitives': {...}
457 'user_values': dictionary with the day-1 parameters provided at
458 instantiation time. It will replace values
459 inside < >. rw_mgmt_ip will be included here also
460 }
461 :param dict machine_spec: A dictionary describing the machine to
462 install to
463 Examples::
464 {
465 'hostname': '1.2.3.4',
466 'username': 'ubuntu',
467 }
468 :param obj callback: A callback function to receive status changes.
469 :param tuple callback_args: A list of arguments to be passed to the
470 callback
471 """
472
473 ########################################################
474 # Verify the path to the charm exists and is readable. #
475 ########################################################
476 if not os.path.exists(charm_path):
477 self.log.debug("Charm path doesn't exist: {}".format(charm_path))
478 self.notify_callback(
479 model_name,
480 application_name,
481 "error",
482 "failed",
483 callback,
484 *callback_args,
485 )
486 raise JujuCharmNotFound("No artifacts configured.")
487
488 ################################
489 # Login to the Juju controller #
490 ################################
491 if not self.authenticated:
492 self.log.debug("Authenticating with Juju")
493 await self.login()
494
495 ##########################################
496 # Get the model for this network service #
497 ##########################################
498 model = await self.get_model(model_name)
499
500 ########################################
501 # Verify the application doesn't exist #
502 ########################################
503 app = await self.get_application(model, application_name)
504 if app:
505 raise JujuApplicationExists(
506 (
507 'Can\'t deploy application "{}" to model '
508 ' "{}" because it already exists.'
509 ).format(application_name, model_name)
510 )
511
512 ################################################################
513 # Register this application with the model-level event monitor #
514 ################################################################
515 if callback:
516 self.log.debug(
517 "JujuApi: Registering callback for {}".format(application_name,)
518 )
519 await self.Subscribe(model_name, application_name, callback, *callback_args)
520
521 #######################################
522 # Get the initial charm configuration #
523 #######################################
524
525 rw_mgmt_ip = None
526 if "rw_mgmt_ip" in params:
527 rw_mgmt_ip = params["rw_mgmt_ip"]
528
529 if "initial-config-primitive" not in params:
530 params["initial-config-primitive"] = {}
531
532 initial_config = self._get_config_from_dict(
533 params["initial-config-primitive"], {"<rw_mgmt_ip>": rw_mgmt_ip}
534 )
535
536 ########################################################
537 # Check for specific machine placement (native charms) #
538 ########################################################
539 to = ""
540 series = "xenial"
541
542 if machine_spec.keys():
543 if all(k in machine_spec for k in ["hostname", "username"]):
544
545 # Allow series to be derived from the native charm
546 series = None
547
548 self.log.debug(
549 "Provisioning manual machine {}@{}".format(
550 machine_spec["username"], machine_spec["hostname"],
551 )
552 )
553
554 """Native Charm support
555
556 Taking a bare VM (assumed to be an Ubuntu cloud image),
557 the provisioning process will:
558 - Create an ubuntu user w/sudo access
559 - Detect hardware
560 - Detect architecture
561 - Download and install Juju agent from controller
562 - Enable Juju agent
563 - Add an iptables rule to route traffic to the API proxy
564 """
565
566 to = await self.provision_machine(
567 model_name=model_name,
568 username=machine_spec["username"],
569 hostname=machine_spec["hostname"],
570 private_key_path=self.GetPrivateKeyPath(),
571 )
572 self.log.debug("Provisioned machine id {}".format(to))
573
574 # TODO: If to is none, raise an exception
575
576 # The native charm won't have the sshproxy layer, typically, but LCM
577 # uses the config primitive
578 # to interpret what the values are. That's a gap to fill.
579
580 """
581 The ssh-* config parameters are unique to the sshproxy layer,
582 which most native charms will not be aware of.
583
584 Setting invalid config parameters will cause the deployment to
585 fail.
586
587 For the moment, we will strip the ssh-* parameters from native
588 charms, until the feature gap is addressed in the information
589 model.
590 """
591
592 # Native charms don't include the ssh-* config values, so strip them
593 # from the initial_config, otherwise the deploy will raise an error.
594 # self.log.debug("Removing ssh-* from initial-config")
595 for k in ["ssh-hostname", "ssh-username", "ssh-password"]:
596 if k in initial_config:
597 self.log.debug("Removing parameter {}".format(k))
598 del initial_config[k]
599
600 self.log.debug(
601 "JujuApi: Deploying charm ({}/{}) from {} to {}".format(
602 model_name, application_name, charm_path, to,
603 )
604 )
605
606 ########################################################
607 # Deploy the charm and apply the initial configuration #
608 ########################################################
609 app = await model.deploy(
610 # We expect charm_path to be either the path to the charm on disk
611 # or in the format of cs:series/name
612 charm_path,
613 # This is the formatted, unique name for this charm
614 application_name=application_name,
615 # Proxy charms should use the current LTS. This will need to be
616 # changed for native charms.
617 series=series,
618 # Apply the initial 'config' primitive during deployment
619 config=initial_config,
620 # Where to deploy the charm to.
621 to=to,
622 )
623
624 #############################
625 # Map the vdu id<->app name #
626 #############################
627 try:
628 await self.Relate(model_name, vnfd)
629 except KeyError as ex:
630 # We don't currently support relations between NS and VNF/VDU charms
631 self.log.warn("[N2VC] Relations not supported: {}".format(ex))
632 except Exception:
633 # This may happen if not all of the charms needed by the relation
634 # are ready. We can safely ignore this, because Relate will be
635 # retried when the endpoint of the relation is deployed.
636 self.log.warn("[N2VC] Relations not ready")
637
638 # #######################################
639 # # Execute initial config primitive(s) #
640 # #######################################
641 uuids = await self.ExecuteInitialPrimitives(
642 model_name, application_name, params,
643 )
644 return uuids
645
646 # primitives = {}
647 #
648 # # Build a sequential list of the primitives to execute
649 # for primitive in params['initial-config-primitive']:
650 # try:
651 # if primitive['name'] == 'config':
652 # # This is applied when the Application is deployed
653 # pass
654 # else:
655 # seq = primitive['seq']
656 #
657 # params = {}
658 # if 'parameter' in primitive:
659 # params = primitive['parameter']
660 #
661 # primitives[seq] = {
662 # 'name': primitive['name'],
663 # 'parameters': self._map_primitive_parameters(
664 # params,
665 # {'<rw_mgmt_ip>': rw_mgmt_ip}
666 # ),
667 # }
668 #
669 # for primitive in sorted(primitives):
670 # await self.ExecutePrimitive(
671 # model_name,
672 # application_name,
673 # primitives[primitive]['name'],
674 # callback,
675 # callback_args,
676 # **primitives[primitive]['parameters'],
677 # )
678 # except N2VCPrimitiveExecutionFailed as e:
679 # self.log.debug(
680 # "[N2VC] Exception executing primitive: {}".format(e)
681 # )
682 # raise
683
684 async def GetPrimitiveStatus(self, model_name, uuid):
685 """Get the status of an executed Primitive.
686
687 The status of an executed Primitive will be one of three values:
688 - completed
689 - failed
690 - running
691 """
692 status = None
693 try:
694 if not self.authenticated:
695 await self.login()
696
697 model = await self.get_model(model_name)
698
699 results = await model.get_action_status(uuid)
700
701 if uuid in results:
702 status = results[uuid]
703
704 except Exception as e:
705 self.log.debug(
706 "Caught exception while getting primitive status: {}".format(e)
707 )
708 raise N2VCPrimitiveExecutionFailed(e)
709
710 return status
711
712 async def GetPrimitiveOutput(self, model_name, uuid):
713 """Get the output of an executed Primitive.
714
715 Note: this only returns output for a successfully executed primitive.
716 """
717 results = None
718 try:
719 if not self.authenticated:
720 await self.login()
721
722 model = await self.get_model(model_name)
723 results = await model.get_action_output(uuid, 60)
724 except Exception as e:
725 self.log.debug(
726 "Caught exception while getting primitive status: {}".format(e)
727 )
728 raise N2VCPrimitiveExecutionFailed(e)
729
730 return results
731
732 # async def ProvisionMachine(self, model_name, hostname, username):
733 # """Provision machine for usage with Juju.
734 #
735 # Provisions a previously instantiated machine for use with Juju.
736 # """
737 # try:
738 # if not self.authenticated:
739 # await self.login()
740 #
741 # # FIXME: This is hard-coded until model-per-ns is added
742 # model_name = 'default'
743 #
744 # model = await self.get_model(model_name)
745 # model.add_machine(spec={})
746 #
747 # machine = await model.add_machine(spec='ssh:{}@{}:{}'.format(
748 # "ubuntu",
749 # host['address'],
750 # private_key_path,
751 # ))
752 # return machine.id
753 #
754 # except Exception as e:
755 # self.log.debug(
756 # "Caught exception while getting primitive status: {}".format(e)
757 # )
758 # raise N2VCPrimitiveExecutionFailed(e)
759
760 def GetPrivateKeyPath(self):
761 homedir = os.environ["HOME"]
762 sshdir = "{}/.ssh".format(homedir)
763 private_key_path = "{}/id_n2vc_rsa".format(sshdir)
764 return private_key_path
765
766 async def GetPublicKey(self):
767 """Get the N2VC SSH public key.abs
768
769 Returns the SSH public key, to be injected into virtual machines to
770 be managed by the VCA.
771
772 The first time this is run, a ssh keypair will be created. The public
773 key is injected into a VM so that we can provision the machine with
774 Juju, after which Juju will communicate with the VM directly via the
775 juju agent.
776 """
777 # public_key = ""
778
779 # Find the path to where we expect our key to live.
780 homedir = os.environ["HOME"]
781 sshdir = "{}/.ssh".format(homedir)
782 if not os.path.exists(sshdir):
783 os.mkdir(sshdir)
784
785 private_key_path = "{}/id_n2vc_rsa".format(sshdir)
786 public_key_path = "{}.pub".format(private_key_path)
787
788 # If we don't have a key generated, generate it.
789 if not os.path.exists(private_key_path):
790 cmd = "ssh-keygen -t {} -b {} -N '' -f {}".format(
791 "rsa", "4096", private_key_path
792 )
793 subprocess.check_output(shlex.split(cmd))
794
795 # Read the public key
796 with open(public_key_path, "r") as f:
797 public_key = f.readline()
798
799 return public_key
800
801 async def ExecuteInitialPrimitives(
802 self, model_name, application_name, params, callback=None, *callback_args
803 ):
804 """Execute multiple primitives.
805
806 Execute multiple primitives as declared in initial-config-primitive.
807 This is useful in cases where the primitives initially failed -- for
808 example, if the charm is a proxy but the proxy hasn't been configured
809 yet.
810 """
811 uuids = []
812 primitives = {}
813
814 # Build a sequential list of the primitives to execute
815 for primitive in params["initial-config-primitive"]:
816 try:
817 if primitive["name"] == "config":
818 pass
819 else:
820 seq = primitive["seq"]
821
822 params_ = {}
823 if "parameter" in primitive:
824 params_ = primitive["parameter"]
825
826 user_values = params.get("user_values", {})
827 if "rw_mgmt_ip" not in user_values:
828 user_values["rw_mgmt_ip"] = None
829 # just for backward compatibility, because it will be provided
830 # always by modern version of LCM
831
832 primitives[seq] = {
833 "name": primitive["name"],
834 "parameters": self._map_primitive_parameters(
835 params_, user_values
836 ),
837 }
838
839 for primitive in sorted(primitives):
840 try:
841 # self.log.debug("Queuing action {}".format(
842 # primitives[primitive]['name']))
843 uuids.append(
844 await self.ExecutePrimitive(
845 model_name,
846 application_name,
847 primitives[primitive]["name"],
848 callback,
849 callback_args,
850 **primitives[primitive]["parameters"],
851 )
852 )
853 except PrimitiveDoesNotExist as e:
854 self.log.debug(
855 "Ignoring exception PrimitiveDoesNotExist: {}".format(e)
856 )
857 pass
858 except Exception as e:
859 self.log.debug(
860 (
861 "XXXXXXXXXXXXXXXXXXXXXXXXX Unexpected exception: {}"
862 ).format(e)
863 )
864 raise e
865
866 except N2VCPrimitiveExecutionFailed as e:
867 self.log.debug("[N2VC] Exception executing primitive: {}".format(e))
868 raise
869 return uuids
870
871 async def ExecutePrimitive(
872 self,
873 model_name,
874 application_name,
875 primitive,
876 callback,
877 *callback_args,
878 **params
879 ):
880 """Execute a primitive of a charm for Day 1 or Day 2 configuration.
881
882 Execute a primitive defined in the VNF descriptor.
883
884 :param str model_name: The name or unique id of the network service.
885 :param str application_name: The name of the application
886 :param str primitive: The name of the primitive to execute.
887 :param obj callback: A callback function to receive status changes.
888 :param tuple callback_args: A list of arguments to be passed to the
889 callback function.
890 :param dict params: A dictionary of key=value pairs representing the
891 primitive's parameters
892 Examples::
893 {
894 'rw_mgmt_ip': '1.2.3.4',
895 # Pass the initial-config-primitives section of the vnf or vdu
896 'initial-config-primitives': {...}
897 }
898 """
899 self.log.debug("Executing primitive={} params={}".format(primitive, params))
900 uuid = None
901 try:
902 if not self.authenticated:
903 await self.login()
904
905 model = await self.get_model(model_name)
906
907 if primitive == "config":
908 # config is special, and expecting params to be a dictionary
909 await self.set_config(
910 model, application_name, params["params"],
911 )
912 else:
913 app = await self.get_application(model, application_name)
914 if app:
915 # Does this primitive exist?
916 actions = await app.get_actions()
917
918 if primitive not in actions.keys():
919 raise PrimitiveDoesNotExist(
920 "Primitive {} does not exist".format(primitive)
921 )
922
923 # Run against the first (and probably only) unit in the app
924 unit = app.units[0]
925 if unit:
926 action = await unit.run_action(primitive, **params)
927 uuid = action.id
928 except PrimitiveDoesNotExist as e:
929 # Catch and raise this exception if it's thrown from the inner block
930 raise e
931 except Exception as e:
932 # An unexpected exception was caught
933 self.log.debug("Caught exception while executing primitive: {}".format(e))
934 raise N2VCPrimitiveExecutionFailed(e)
935 return uuid
936
937 async def RemoveCharms(
938 self, model_name, application_name, callback=None, *callback_args
939 ):
940 """Remove a charm from the VCA.
941
942 Remove a charm referenced in a VNF Descriptor.
943
944 :param str model_name: The name of the network service.
945 :param str application_name: The name of the application
946 :param obj callback: A callback function to receive status changes.
947 :param tuple callback_args: A list of arguments to be passed to the
948 callback function.
949 """
950 try:
951 if not self.authenticated:
952 await self.login()
953
954 model = await self.get_model(model_name)
955 app = await self.get_application(model, application_name)
956 if app:
957 # Remove this application from event monitoring
958 await self.Unsubscribe(model_name, application_name)
959
960 # self.notify_callback(model_name, application_name, "removing",
961 # callback, *callback_args)
962 self.log.debug("Removing the application {}".format(application_name))
963 await app.remove()
964
965 # await self.disconnect_model(self.monitors[model_name])
966
967 self.notify_callback(
968 model_name,
969 application_name,
970 "removed",
971 "Removing charm {}".format(application_name),
972 callback,
973 *callback_args,
974 )
975
976 except Exception as e:
977 print("Caught exception: {}".format(e))
978 self.log.debug(e)
979 raise e
980
981 async def CreateNetworkService(self, ns_uuid):
982 """Create a new Juju model for the Network Service.
983
984 Creates a new Model in the Juju Controller.
985
986 :param str ns_uuid: A unique id representing an instaance of a
987 Network Service.
988
989 :returns: True if the model was created. Raises JujuError on failure.
990 """
991 if not self.authenticated:
992 await self.login()
993
994 models = await self.controller.list_models()
995 if ns_uuid not in models:
996 # Get the new model
997 await self.get_model(ns_uuid)
998
999 return True
1000
1001 async def DestroyNetworkService(self, ns_uuid):
1002 """Destroy a Network Service.
1003
1004 Destroy the Network Service and any deployed charms.
1005
1006 :param ns_uuid The unique id of the Network Service
1007
1008 :returns: True if the model was created. Raises JujuError on failure.
1009 """
1010
1011 # Do not delete the default model. The default model was used by all
1012 # Network Services, prior to the implementation of a model per NS.
1013 if ns_uuid.lower() == "default":
1014 return False
1015
1016 if not self.authenticated:
1017 await self.login()
1018
1019 models = await self.controller.list_models()
1020 if ns_uuid in models:
1021 model = await self.controller.get_model(ns_uuid)
1022
1023 for application in model.applications:
1024 app = model.applications[application]
1025
1026 await self.RemoveCharms(ns_uuid, application)
1027
1028 self.log.debug("Unsubscribing Watcher for {}".format(application))
1029 await self.Unsubscribe(ns_uuid, application)
1030
1031 self.log.debug("Waiting for application to terminate")
1032 timeout = 30
1033 try:
1034 await model.block_until(
1035 lambda: all(
1036 unit.workload_status in ["terminated"] for unit in app.units
1037 ),
1038 timeout=timeout,
1039 )
1040 except Exception:
1041 self.log.debug(
1042 "Timed out waiting for {} to terminate.".format(application)
1043 )
1044
1045 for machine in model.machines:
1046 try:
1047 self.log.debug("Destroying machine {}".format(machine))
1048 await model.machines[machine].destroy(force=True)
1049 except JujuAPIError as e:
1050 if "does not exist" in str(e):
1051 # Our cached model may be stale, because the machine
1052 # has already been removed. It's safe to continue.
1053 continue
1054 else:
1055 self.log.debug("Caught exception: {}".format(e))
1056 raise e
1057
1058 # Disconnect from the Model
1059 if ns_uuid in self.models:
1060 self.log.debug("Disconnecting model {}".format(ns_uuid))
1061 # await self.disconnect_model(self.models[ns_uuid])
1062 await self.disconnect_model(ns_uuid)
1063
1064 try:
1065 self.log.debug("Destroying model {}".format(ns_uuid))
1066 await self.controller.destroy_models(ns_uuid)
1067 except JujuError:
1068 raise NetworkServiceDoesNotExist(
1069 "The Network Service '{}' does not exist".format(ns_uuid)
1070 )
1071
1072 return True
1073
1074 async def GetMetrics(self, model_name, application_name):
1075 """Get the metrics collected by the VCA.
1076
1077 :param model_name The name or unique id of the network service
1078 :param application_name The name of the application
1079 """
1080 metrics = {}
1081 model = await self.get_model(model_name)
1082 app = await self.get_application(model, application_name)
1083 if app:
1084 metrics = await app.get_metrics()
1085
1086 return metrics
1087
1088 async def HasApplication(self, model_name, application_name):
1089 model = await self.get_model(model_name)
1090 app = await self.get_application(model, application_name)
1091 if app:
1092 return True
1093 return False
1094
1095 async def Subscribe(self, ns_name, application_name, callback, *callback_args):
1096 """Subscribe to callbacks for an application.
1097
1098 :param ns_name str: The name of the Network Service
1099 :param application_name str: The name of the application
1100 :param callback obj: The callback method
1101 :param callback_args list: The list of arguments to append to calls to
1102 the callback method
1103 """
1104 self.monitors[ns_name].AddApplication(
1105 application_name, callback, *callback_args
1106 )
1107
1108 async def Unsubscribe(self, ns_name, application_name):
1109 """Unsubscribe to callbacks for an application.
1110
1111 Unsubscribes the caller from notifications from a deployed application.
1112
1113 :param ns_name str: The name of the Network Service
1114 :param application_name str: The name of the application
1115 """
1116 self.monitors[ns_name].RemoveApplication(application_name,)
1117
1118 # Non-public methods
1119 async def add_relation(self, model_name, relation1, relation2):
1120 """
1121 Add a relation between two application endpoints.
1122
1123 :param str model_name: The name or unique id of the network service
1124 :param str relation1: '<application>[:<relation_name>]'
1125 :param str relation2: '<application>[:<relation_name>]'
1126 """
1127
1128 if not self.authenticated:
1129 await self.login()
1130
1131 m = await self.get_model(model_name)
1132 try:
1133 await m.add_relation(relation1, relation2)
1134 except JujuAPIError as e:
1135 # If one of the applications in the relationship doesn't exist,
1136 # or the relation has already been added, let the operation fail
1137 # silently.
1138 if "not found" in e.message:
1139 return
1140 if "already exists" in e.message:
1141 return
1142
1143 raise e
1144
1145 # async def apply_config(self, config, application):
1146 # """Apply a configuration to the application."""
1147 # print("JujuApi: Applying configuration to {}.".format(
1148 # application
1149 # ))
1150 # return await self.set_config(application=application, config=config)
1151
1152 def _get_config_from_dict(self, config_primitive, values):
1153 """Transform the yang config primitive to dict.
1154
1155 Expected result:
1156
1157 config = {
1158 'config':
1159 }
1160 """
1161 config = {}
1162 for primitive in config_primitive:
1163 if primitive["name"] == "config":
1164 # config = self._map_primitive_parameters()
1165 for parameter in primitive["parameter"]:
1166 param = str(parameter["name"])
1167 if parameter["value"] == "<rw_mgmt_ip>":
1168 config[param] = str(values[parameter["value"]])
1169 else:
1170 config[param] = str(parameter["value"])
1171
1172 return config
1173
1174 def _map_primitive_parameters(self, parameters, user_values):
1175 params = {}
1176 for parameter in parameters:
1177 param = str(parameter["name"])
1178 value = parameter.get("value")
1179
1180 # map parameters inside a < >; e.g. <rw_mgmt_ip>. with the provided user
1181 # _values.
1182 # Must exist at user_values except if there is a default value
1183 if isinstance(value, str) and value.startswith("<") and value.endswith(">"):
1184 if parameter["value"][1:-1] in user_values:
1185 value = user_values[parameter["value"][1:-1]]
1186 elif "default-value" in parameter:
1187 value = parameter["default-value"]
1188 else:
1189 raise KeyError(
1190 "parameter {}='{}' not supplied ".format(param, value)
1191 )
1192
1193 # If there's no value, use the default-value (if set)
1194 if value is None and "default-value" in parameter:
1195 value = parameter["default-value"]
1196
1197 # Typecast parameter value, if present
1198 paramtype = "string"
1199 try:
1200 if "data-type" in parameter:
1201 paramtype = str(parameter["data-type"]).lower()
1202
1203 if paramtype == "integer":
1204 value = int(value)
1205 elif paramtype == "boolean":
1206 value = bool(value)
1207 else:
1208 value = str(value)
1209 else:
1210 # If there's no data-type, assume the value is a string
1211 value = str(value)
1212 except ValueError:
1213 raise ValueError(
1214 "parameter {}='{}' cannot be converted to type {}".format(
1215 param, value, paramtype
1216 )
1217 )
1218
1219 params[param] = value
1220 return params
1221
1222 def _get_config_from_yang(self, config_primitive, values):
1223 """Transform the yang config primitive to dict."""
1224 config = {}
1225 for primitive in config_primitive.values():
1226 if primitive["name"] == "config":
1227 for parameter in primitive["parameter"].values():
1228 param = str(parameter["name"])
1229 if parameter["value"] == "<rw_mgmt_ip>":
1230 config[param] = str(values[parameter["value"]])
1231 else:
1232 config[param] = str(parameter["value"])
1233
1234 return config
1235
1236 def FormatApplicationName(self, *args):
1237 """
1238 Generate a Juju-compatible Application name
1239
1240 :param args tuple: Positional arguments to be used to construct the
1241 application name.
1242
1243 Limitations::
1244 - Only accepts characters a-z and non-consequitive dashes (-)
1245 - Application name should not exceed 50 characters
1246
1247 Examples::
1248
1249 FormatApplicationName("ping_pong_ns", "ping_vnf", "a")
1250 """
1251 appname = ""
1252 for c in "-".join(list(args)):
1253 if c.isdigit():
1254 c = chr(97 + int(c))
1255 elif not c.isalpha():
1256 c = "-"
1257 appname += c
1258 return re.sub("-+", "-", appname.lower())
1259
1260 # def format_application_name(self, nsd_name, vnfr_name, member_vnf_index=0):
1261 # """Format the name of the application
1262 #
1263 # Limitations:
1264 # - Only accepts characters a-z and non-consequitive dashes (-)
1265 # - Application name should not exceed 50 characters
1266 # """
1267 # name = "{}-{}-{}".format(nsd_name, vnfr_name, member_vnf_index)
1268 # new_name = ''
1269 # for c in name:
1270 # if c.isdigit():
1271 # c = chr(97 + int(c))
1272 # elif not c.isalpha():
1273 # c = "-"
1274 # new_name += c
1275 # return re.sub('\-+', '-', new_name.lower())
1276
1277 def format_model_name(self, name):
1278 """Format the name of model.
1279
1280 Model names may only contain lowercase letters, digits and hyphens
1281 """
1282
1283 return name.replace("_", "-").lower()
1284
1285 async def get_application(self, model, application):
1286 """Get the deployed application."""
1287 if not self.authenticated:
1288 await self.login()
1289
1290 app = None
1291 if application and model:
1292 if model.applications:
1293 if application in model.applications:
1294 app = model.applications[application]
1295
1296 return app
1297
1298 async def get_model(self, model_name):
1299 """Get a model from the Juju Controller.
1300
1301 Note: Model objects returned must call disconnected() before it goes
1302 out of scope."""
1303 if not self.authenticated:
1304 await self.login()
1305
1306 if model_name not in self.models:
1307 # Get the models in the controller
1308 models = await self.controller.list_models()
1309
1310 if model_name not in models:
1311 try:
1312 self.models[model_name] = await self.controller.add_model(
1313 model_name, config={"authorized-keys": self.juju_public_key}
1314 )
1315 except JujuError as e:
1316 if "already exists" not in e.message:
1317 raise e
1318 else:
1319 self.models[model_name] = await self.controller.get_model(model_name)
1320
1321 self.refcount["model"] += 1
1322
1323 # Create an observer for this model
1324 await self.create_model_monitor(model_name)
1325
1326 return self.models[model_name]
1327
1328 async def create_model_monitor(self, model_name):
1329 """Create a monitor for the model, if none exists."""
1330 if not self.authenticated:
1331 await self.login()
1332
1333 if model_name not in self.monitors:
1334 self.monitors[model_name] = VCAMonitor(model_name)
1335 self.models[model_name].add_observer(self.monitors[model_name])
1336
1337 return True
1338
1339 async def login(self):
1340 """Login to the Juju controller."""
1341
1342 if self.authenticated:
1343 return
1344
1345 self.connecting = True
1346
1347 self.log.debug("JujuApi: Logging into controller")
1348
1349 self.controller = Controller(loop=self.loop)
1350
1351 if self.secret:
1352 self.log.debug(
1353 "Connecting to controller... ws://{} as {}/{}".format(
1354 self.endpoint, self.user, self.secret,
1355 )
1356 )
1357 try:
1358 await self.controller.connect(
1359 endpoint=self.endpoint,
1360 username=self.user,
1361 password=self.secret,
1362 cacert=self.ca_cert,
1363 )
1364 self.refcount["controller"] += 1
1365 self.authenticated = True
1366 self.log.debug("JujuApi: Logged into controller")
1367 except Exception as ex:
1368 self.log.debug("Caught exception: {}".format(ex))
1369 else:
1370 # current_controller no longer exists
1371 # self.log.debug("Connecting to current controller...")
1372 # await self.controller.connect_current()
1373 # await self.controller.connect(
1374 # endpoint=self.endpoint,
1375 # username=self.user,
1376 # cacert=cacert,
1377 # )
1378 self.log.fatal("VCA credentials not configured.")
1379 self.authenticated = False
1380
1381 async def logout(self):
1382 """Logout of the Juju controller."""
1383 if not self.authenticated:
1384 return False
1385
1386 try:
1387 for model in self.models:
1388 await self.disconnect_model(model)
1389
1390 if self.controller:
1391 self.log.debug("Disconnecting controller {}".format(self.controller))
1392 await self.controller.disconnect()
1393 self.refcount["controller"] -= 1
1394 self.controller = None
1395
1396 self.authenticated = False
1397
1398 self.log.debug(self.refcount)
1399
1400 except Exception as e:
1401 self.log.fatal("Fatal error logging out of Juju Controller: {}".format(e))
1402 raise e
1403 return True
1404
1405 async def disconnect_model(self, model):
1406 self.log.debug("Disconnecting model {}".format(model))
1407 if model in self.models:
1408 try:
1409 await self.models[model].disconnect()
1410 self.refcount["model"] -= 1
1411 self.models[model] = None
1412 except Exception as e:
1413 self.log.debug("Caught exception: {}".format(e))
1414
1415 async def provision_machine(
1416 self, model_name: str, hostname: str, username: str, private_key_path: str
1417 ) -> int:
1418 """Provision a machine.
1419
1420 This executes the SSH provisioner, which will log in to a machine via
1421 SSH and prepare it for use with the Juju model
1422
1423 :param model_name str: The name of the model
1424 :param hostname str: The IP or hostname of the target VM
1425 :param user str: The username to login to
1426 :param private_key_path str: The path to the private key that's been injected
1427 to the VM via cloud-init
1428 :return machine_id int: Returns the id of the machine or None if provisioning
1429 fails
1430 """
1431 if not self.authenticated:
1432 await self.login()
1433
1434 machine_id = None
1435
1436 if self.api_proxy:
1437 self.log.debug(
1438 "Instantiating SSH Provisioner for {}@{} ({})".format(
1439 username, hostname, private_key_path
1440 )
1441 )
1442 provisioner = SSHProvisioner(
1443 host=hostname,
1444 user=username,
1445 private_key_path=private_key_path,
1446 log=self.log,
1447 )
1448
1449 params = None
1450 try:
1451 params = provisioner.provision_machine()
1452 except Exception as ex:
1453 self.log.debug("caught exception from provision_machine: {}".format(ex))
1454 return None
1455
1456 if params:
1457 params.jobs = ["JobHostUnits"]
1458
1459 model = await self.get_model(model_name)
1460
1461 connection = model.connection()
1462
1463 # Submit the request.
1464 self.log.debug("Adding machine to model")
1465 client_facade = client.ClientFacade.from_connection(connection)
1466 results = await client_facade.AddMachines(params=[params])
1467 error = results.machines[0].error
1468 if error:
1469 raise ValueError("Error adding machine: %s" % error.message)
1470
1471 machine_id = results.machines[0].machine
1472
1473 # Need to run this after AddMachines has been called,
1474 # as we need the machine_id
1475 self.log.debug("Installing Juju agent")
1476 await provisioner.install_agent(
1477 connection, params.nonce, machine_id, self.api_proxy,
1478 )
1479 else:
1480 self.log.debug("Missing API Proxy")
1481 return machine_id
1482
1483 # async def remove_application(self, name):
1484 # """Remove the application."""
1485 # if not self.authenticated:
1486 # await self.login()
1487 #
1488 # app = await self.get_application(name)
1489 # if app:
1490 # self.log.debug("JujuApi: Destroying application {}".format(
1491 # name,
1492 # ))
1493 #
1494 # await app.destroy()
1495
1496 async def remove_relation(self, a, b):
1497 """
1498 Remove a relation between two application endpoints
1499
1500 :param a An application endpoint
1501 :param b An application endpoint
1502 """
1503 if not self.authenticated:
1504 await self.login()
1505
1506 # m = await self.get_model()
1507 # try:
1508 # m.remove_relation(a, b)
1509 # finally:
1510 # await m.disconnect()
1511
1512 async def resolve_error(self, model_name, application=None):
1513 """Resolve units in error state."""
1514 if not self.authenticated:
1515 await self.login()
1516
1517 model = await self.get_model(model_name)
1518
1519 app = await self.get_application(model, application)
1520 if app:
1521 self.log.debug(
1522 "JujuApi: Resolving errors for application {}".format(application,)
1523 )
1524
1525 for _ in app.units:
1526 app.resolved(retry=True)
1527
1528 async def run_action(self, model_name, application, action_name, **params):
1529 """Execute an action and return an Action object."""
1530 if not self.authenticated:
1531 await self.login()
1532 result = {"status": "", "action": {"tag": None, "results": None}}
1533
1534 model = await self.get_model(model_name)
1535
1536 app = await self.get_application(model, application)
1537 if app:
1538 # We currently only have one unit per application
1539 # so use the first unit available.
1540 unit = app.units[0]
1541
1542 self.log.debug(
1543 "JujuApi: Running Action {} against Application {}".format(
1544 action_name, application,
1545 )
1546 )
1547
1548 action = await unit.run_action(action_name, **params)
1549
1550 # Wait for the action to complete
1551 await action.wait()
1552
1553 result["status"] = action.status
1554 result["action"]["tag"] = action.data["id"]
1555 result["action"]["results"] = action.results
1556
1557 return result
1558
1559 async def set_config(self, model_name, application, config):
1560 """Apply a configuration to the application."""
1561 if not self.authenticated:
1562 await self.login()
1563
1564 app = await self.get_application(model_name, application)
1565 if app:
1566 self.log.debug(
1567 "JujuApi: Setting config for Application {}".format(application,)
1568 )
1569 await app.set_config(config)
1570
1571 # Verify the config is set
1572 newconf = await app.get_config()
1573 for key in config:
1574 if config[key] != newconf[key]["value"]:
1575 self.log.debug(
1576 (
1577 "JujuApi: Config not set! Key {} Value {} doesn't match {}"
1578 ).format(key, config[key], newconf[key])
1579 )
1580
1581 # async def set_parameter(self, parameter, value, application=None):
1582 # """Set a config parameter for a service."""
1583 # if not self.authenticated:
1584 # await self.login()
1585 #
1586 # self.log.debug("JujuApi: Setting {}={} for Application {}".format(
1587 # parameter,
1588 # value,
1589 # application,
1590 # ))
1591 # return await self.apply_config(
1592 # {parameter: value},
1593 # application=application,
1594 # )
1595
1596 async def wait_for_application(self, model_name, application_name, timeout=300):
1597 """Wait for an application to become active."""
1598 if not self.authenticated:
1599 await self.login()
1600
1601 model = await self.get_model(model_name)
1602
1603 app = await self.get_application(model, application_name)
1604 self.log.debug("Application: {}".format(app))
1605 if app:
1606 self.log.debug(
1607 "JujuApi: Waiting {} seconds for Application {}".format(
1608 timeout, application_name,
1609 )
1610 )
1611
1612 await model.block_until(
1613 lambda: all(
1614 unit.agent_status == "idle"
1615 and unit.workload_status in ["active", "unknown"]
1616 for unit in app.units
1617 ),
1618 timeout=timeout,
1619 )