Improved Primitive support and better testing
[osm/N2VC.git] / n2vc / vnf.py
1
2 import logging
3 import os
4 import os.path
5 import re
6 import ssl
7 import sys
8 import time
9
10 # FIXME: this should load the juju inside or modules without having to
11 # explicitly install it. Check why it's not working.
12 # Load our subtree of the juju library
13 path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
14 path = os.path.join(path, "modules/libjuju/")
15 if path not in sys.path:
16 sys.path.insert(1, path)
17
18 from juju.controller import Controller
19 from juju.model import Model, ModelObserver
20
21
22 # We might need this to connect to the websocket securely, but test and verify.
23 try:
24 ssl._create_default_https_context = ssl._create_unverified_context
25 except AttributeError:
26 # Legacy Python doesn't verify by default (see pep-0476)
27 # https://www.python.org/dev/peps/pep-0476/
28 pass
29
30
31 # Custom exceptions
32 class JujuCharmNotFound(Exception):
33 """The Charm can't be found or is not readable."""
34
35
36 class JujuApplicationExists(Exception):
37 """The Application already exists."""
38
39
40 class N2VCPrimitiveExecutionFailed(Exception):
41 """Something failed while attempting to execute a primitive."""
42
43
44 # Quiet the debug logging
45 logging.getLogger('websockets.protocol').setLevel(logging.INFO)
46 logging.getLogger('juju.client.connection').setLevel(logging.WARN)
47 logging.getLogger('juju.model').setLevel(logging.WARN)
48 logging.getLogger('juju.machine').setLevel(logging.WARN)
49
50
51 class VCAMonitor(ModelObserver):
52 """Monitor state changes within the Juju Model."""
53 log = None
54 ns_name = None
55 applications = {}
56
57 def __init__(self, ns_name):
58 self.log = logging.getLogger(__name__)
59
60 self.ns_name = ns_name
61
62 def AddApplication(self, application_name, callback, *callback_args):
63 if application_name not in self.applications:
64 self.applications[application_name] = {
65 'callback': callback,
66 'callback_args': callback_args
67 }
68
69 def RemoveApplication(self, application_name):
70 if application_name in self.applications:
71 del self.applications[application_name]
72
73 async def on_change(self, delta, old, new, model):
74 """React to changes in the Juju model."""
75
76 if delta.entity == "unit":
77 # Ignore change events from other applications
78 if delta.data['application'] not in self.applications.keys():
79 return
80
81 try:
82
83 application_name = delta.data['application']
84
85 callback = self.applications[application_name]['callback']
86 callback_args = self.applications[application_name]['callback_args']
87
88 if old and new:
89 old_status = old.workload_status
90 new_status = new.workload_status
91
92 if old_status == new_status:
93 """The workload status may fluctuate around certain events,
94 so wait until the status has stabilized before triggering
95 the callback."""
96 if callback:
97 callback(
98 self.ns_name,
99 delta.data['application'],
100 new_status,
101 new.workload_status_message,
102 *callback_args)
103
104 if old and not new:
105 # This is a charm being removed
106 if callback:
107 callback(
108 self.ns_name,
109 delta.data['application'],
110 "removed",
111 "",
112 *callback_args)
113 except Exception as e:
114 self.log.debug("[1] notify_callback exception {}".format(e))
115 elif delta.entity == "action":
116 # TODO: Decide how we want to notify the user of actions
117
118 # uuid = delta.data['id'] # The Action's unique id
119 # msg = delta.data['message'] # The output of the action
120 #
121 # if delta.data['status'] == "pending":
122 # # The action is queued
123 # pass
124 # elif delta.data['status'] == "completed""
125 # # The action was successful
126 # pass
127 # elif delta.data['status'] == "failed":
128 # # The action failed.
129 # pass
130
131 pass
132
133 ########
134 # TODO
135 #
136 # Create unique models per network service
137 # Document all public functions
138
139
140 class N2VC:
141
142 # Juju API
143 api = None
144 log = None
145 controller = None
146 connecting = False
147 authenticated = False
148
149 models = {}
150 default_model = None
151
152 # Model Observers
153 monitors = {}
154
155 # VCA config
156 hostname = ""
157 port = 17070
158 username = ""
159 secret = ""
160
161 def __init__(self,
162 log=None,
163 server='127.0.0.1',
164 port=17070,
165 user='admin',
166 secret=None,
167 artifacts=None
168 ):
169 """Initialize N2VC
170
171 :param vcaconfig dict A dictionary containing the VCA configuration
172
173 :param artifacts str The directory where charms required by a vnfd are
174 stored.
175
176 :Example:
177 n2vc = N2VC(vcaconfig={
178 'secret': 'MzI3MDJhOTYxYmM0YzRjNTJiYmY1Yzdm',
179 'user': 'admin',
180 'ip-address': '10.44.127.137',
181 'port': 17070,
182 'artifacts': '/path/to/charms'
183 })
184
185 """
186
187 if log:
188 self.log = log
189 else:
190 self.log = logging.getLogger(__name__)
191
192 # Quiet websocket traffic
193 logging.getLogger('websockets.protocol').setLevel(logging.INFO)
194 logging.getLogger('juju.client.connection').setLevel(logging.WARN)
195 logging.getLogger('model').setLevel(logging.WARN)
196 # logging.getLogger('websockets.protocol').setLevel(logging.DEBUG)
197
198 self.log.debug('JujuApi: instantiated')
199
200 self.server = server
201 self.port = port
202
203 self.secret = secret
204 if user.startswith('user-'):
205 self.user = user
206 else:
207 self.user = 'user-{}'.format(user)
208
209 self.endpoint = '%s:%d' % (server, int(port))
210
211 self.artifacts = artifacts
212
213 def __del__(self):
214 """Close any open connections."""
215 yield self.logout()
216
217 def notify_callback(self, model_name, application_name, status, message, callback=None, *callback_args):
218 try:
219 if callback:
220 callback(model_name, application_name, status, message, *callback_args)
221 except Exception as e:
222 self.log.error("[0] notify_callback exception {}".format(e))
223 raise e
224 return True
225
226 # Public methods
227 async def CreateNetworkService(self, nsd):
228 """Create a new model to encapsulate this network service.
229
230 Create a new model in the Juju controller to encapsulate the
231 charms associated with a network service.
232
233 You can pass either the nsd record or the id of the network
234 service, but this method will fail without one of them.
235 """
236 if not self.authenticated:
237 await self.login()
238
239 # Ideally, we will create a unique model per network service.
240 # This change will require all components, i.e., LCM and SO, to use
241 # N2VC for 100% compatibility. If we adopt unique models for the LCM,
242 # services deployed via LCM would't be manageable via SO and vice versa
243
244 return self.default_model
245
246 async def DeployCharms(self, model_name, application_name, vnfd, charm_path, params={}, machine_spec={}, callback=None, *callback_args):
247 """Deploy one or more charms associated with a VNF.
248
249 Deploy the charm(s) referenced in a VNF Descriptor.
250
251 :param str model_name: The name of the network service.
252 :param str application_name: The name of the application
253 :param dict vnfd: The name of the application
254 :param str charm_path: The path to the Juju charm
255 :param dict params: A dictionary of runtime parameters
256 Examples::
257 {
258 'rw_mgmt_ip': '1.2.3.4',
259 # Pass the initial-config-primitives section of the vnf or vdu
260 'initial-config-primitives': {...}
261 }
262 :param dict machine_spec: A dictionary describing the machine to install to
263 Examples::
264 {
265 'hostname': '1.2.3.4',
266 'username': 'ubuntu',
267 }
268 :param obj callback: A callback function to receive status changes.
269 :param tuple callback_args: A list of arguments to be passed to the callback
270 """
271
272 ########################################################
273 # Verify the path to the charm exists and is readable. #
274 ########################################################
275 if not os.path.exists(charm_path):
276 self.log.debug("Charm path doesn't exist: {}".format(charm_path))
277 self.notify_callback(model_name, application_name, "failed", callback, *callback_args)
278 raise JujuCharmNotFound("No artifacts configured.")
279
280 ################################
281 # Login to the Juju controller #
282 ################################
283 if not self.authenticated:
284 self.log.debug("Authenticating with Juju")
285 await self.login()
286
287 ##########################################
288 # Get the model for this network service #
289 ##########################################
290 # TODO: In a point release, we will use a model per deployed network
291 # service. In the meantime, we will always use the 'default' model.
292 model_name = 'default'
293 model = await self.get_model(model_name)
294
295 ########################################
296 # Verify the application doesn't exist #
297 ########################################
298 app = await self.get_application(model, application_name)
299 if app:
300 raise JujuApplicationExists("Can't deploy application \"{}\" to model \"{}\" because it already exists.".format(application_name, model_name))
301
302 ################################################################
303 # Register this application with the model-level event monitor #
304 ################################################################
305 if callback:
306 self.monitors[model_name].AddApplication(
307 application_name,
308 callback,
309 *callback_args
310 )
311
312 ########################################################
313 # Check for specific machine placement (native charms) #
314 ########################################################
315 to = ""
316 if machine_spec.keys():
317 # TODO: This needs to be tested.
318 # if all(k in machine_spec for k in ['hostname', 'username']):
319 # # Enlist the existing machine in Juju
320 # machine = await self.model.add_machine(spec='ssh:%@%'.format(
321 # specs['host'],
322 # specs['user'],
323 # ))
324 # to = machine.id
325 pass
326
327 #######################################
328 # Get the initial charm configuration #
329 #######################################
330
331 rw_mgmt_ip = None
332 if 'rw_mgmt_ip' in params:
333 rw_mgmt_ip = params['rw_mgmt_ip']
334
335 initial_config = self._get_config_from_dict(
336 params['initial-config-primitive'],
337 {'<rw_mgmt_ip>': rw_mgmt_ip}
338 )
339
340 self.log.debug("JujuApi: Deploying charm ({}) from {}".format(
341 application_name,
342 charm_path,
343 to=to,
344 ))
345
346 ########################################################
347 # Deploy the charm and apply the initial configuration #
348 ########################################################
349 app = await model.deploy(
350 # We expect charm_path to be either the path to the charm on disk
351 # or in the format of cs:series/name
352 charm_path,
353 # This is the formatted, unique name for this charm
354 application_name=application_name,
355 # Proxy charms should use the current LTS. This will need to be
356 # changed for native charms.
357 series='xenial',
358 # Apply the initial 'config' primitive during deployment
359 config=initial_config,
360 # TBD: Where to deploy the charm to.
361 to=None,
362 )
363
364 # #######################################
365 # # Execute initial config primitive(s) #
366 # #######################################
367 primitives = {}
368
369 # Build a sequential list of the primitives to execute
370 for primitive in params['initial-config-primitive']:
371 try:
372 if primitive['name'] == 'config':
373 # This is applied when the Application is deployed
374 pass
375 else:
376 seq = primitive['seq']
377
378 params = {}
379 if 'parameter' in primitive:
380 params = primitive['parameter']
381
382 primitives[seq] = {
383 'name': primitive['name'],
384 'parameters': self._map_primitive_parameters(
385 params,
386 {'<rw_mgmt_ip>': rw_mgmt_ip}
387 ),
388 }
389
390 for primitive in sorted(primitives):
391 await self.ExecutePrimitive(
392 model_name,
393 application_name,
394 primitives[primitive]['name'],
395 callback,
396 callback_args,
397 **primitives[primitive]['parameters'],
398 )
399 except N2VCPrimitiveExecutionFailed as e:
400 self.log.debug(
401 "[N2VC] Exception executing primitive: {}".format(e)
402 )
403 raise
404
405 async def GetPrimitiveStatus(self, model_name, uuid):
406 results = None
407 try:
408 if not self.authenticated:
409 await self.login()
410
411 # FIXME: This is hard-coded until model-per-ns is added
412 model_name = 'default'
413
414 model = await self.controller.get_model(model_name)
415
416 results = await model.get_action_output(uuid)
417
418 await model.disconnect()
419 except Exception as e:
420 self.log.debug(
421 "Caught exception while getting primitive status: {}".format(e)
422 )
423 raise N2VCPrimitiveExecutionFailed(e)
424
425 return results
426
427
428 async def ExecutePrimitive(self, model_name, application_name, primitive, callback, *callback_args, **params):
429 """Execute a primitive of a charm for Day 1 or Day 2 configuration.
430
431 Execute a primitive defined in the VNF descriptor.
432
433 :param str model_name: The name of the network service.
434 :param str application_name: The name of the application
435 :param str primitive: The name of the primitive to execute.
436 :param obj callback: A callback function to receive status changes.
437 :param tuple callback_args: A list of arguments to be passed to the callback function.
438 :param dict params: A dictionary of key=value pairs representing the primitive's parameters
439 Examples::
440 {
441 'rw_mgmt_ip': '1.2.3.4',
442 # Pass the initial-config-primitives section of the vnf or vdu
443 'initial-config-primitives': {...}
444 }
445 """
446 uuid = None
447 try:
448 if not self.authenticated:
449 await self.login()
450
451 # FIXME: This is hard-coded until model-per-ns is added
452 model_name = 'default'
453
454 model = await self.controller.get_model(model_name)
455
456 if primitive == 'config':
457 # config is special, and expecting params to be a dictionary
458 await self.set_config(
459 model,
460 application_name,
461 params['params'],
462 )
463 else:
464 app = await self.get_application(model, application_name)
465 if app:
466 # Run against the first (and probably only) unit in the app
467 unit = app.units[0]
468 if unit:
469 self.log.debug(
470 "Executing primitive {}".format(primitive)
471 )
472 action = await unit.run_action(primitive, **params)
473 uuid = action.id
474 await model.disconnect()
475 except Exception as e:
476 self.log.debug(
477 "Caught exception while executing primitive: {}".format(e)
478 )
479 raise N2VCPrimitiveExecutionFailed(e)
480 return uuid
481
482 async def RemoveCharms(self, model_name, application_name, callback=None, *callback_args):
483 """Remove a charm from the VCA.
484
485 Remove a charm referenced in a VNF Descriptor.
486
487 :param str model_name: The name of the network service.
488 :param str application_name: The name of the application
489 :param obj callback: A callback function to receive status changes.
490 :param tuple callback_args: A list of arguments to be passed to the callback function.
491 """
492 try:
493 if not self.authenticated:
494 await self.login()
495
496 model = await self.get_model(model_name)
497 app = await self.get_application(model, application_name)
498 if app:
499 # Remove this application from event monitoring
500 self.monitors[model_name].RemoveApplication(application_name)
501
502 # self.notify_callback(model_name, application_name, "removing", callback, *callback_args)
503 self.log.debug("Removing the application {}".format(application_name))
504 await app.remove()
505
506 # Notify the callback that this charm has been removed.
507 self.notify_callback(model_name, application_name, "removed", callback, *callback_args)
508
509 except Exception as e:
510 print("Caught exception: {}".format(e))
511 self.log.debug(e)
512 raise e
513
514 async def DestroyNetworkService(self, nsd):
515 raise NotImplementedError()
516
517 async def GetMetrics(self, model_name, application_name):
518 """Get the metrics collected by the VCA.
519
520 :param model_name The name of the model
521 :param application_name The name of the application
522 """
523 metrics = {}
524 model = await self.get_model(model_name)
525 app = await self.get_application(model, application_name)
526 if app:
527 metrics = await app.get_metrics()
528
529 return metrics
530
531 # Non-public methods
532 async def add_relation(self, a, b, via=None):
533 """
534 Add a relation between two application endpoints.
535
536 :param a An application endpoint
537 :param b An application endpoint
538 :param via The egress subnet(s) for outbound traffic, e.g.,
539 (192.168.0.0/16,10.0.0.0/8)
540 """
541 if not self.authenticated:
542 await self.login()
543
544 m = await self.get_model()
545 try:
546 m.add_relation(a, b, via)
547 finally:
548 await m.disconnect()
549
550 # async def apply_config(self, config, application):
551 # """Apply a configuration to the application."""
552 # print("JujuApi: Applying configuration to {}.".format(
553 # application
554 # ))
555 # return await self.set_config(application=application, config=config)
556
557 def _get_config_from_dict(self, config_primitive, values):
558 """Transform the yang config primitive to dict.
559
560 Expected result:
561
562 config = {
563 'config':
564 }
565 """
566 config = {}
567 for primitive in config_primitive:
568 if primitive['name'] == 'config':
569 # config = self._map_primitive_parameters()
570 for parameter in primitive['parameter']:
571 param = str(parameter['name'])
572 if parameter['value'] == "<rw_mgmt_ip>":
573 config[param] = str(values[parameter['value']])
574 else:
575 config[param] = str(parameter['value'])
576
577 return config
578
579 def _map_primitive_parameters(self, parameters, values):
580 params = {}
581 for parameter in parameters:
582 param = str(parameter['name'])
583 if parameter['value'] == "<rw_mgmt_ip>":
584 params[param] = str(values[parameter['value']])
585 else:
586 params[param] = str(parameter['value'])
587 return params
588
589 def _get_config_from_yang(self, config_primitive, values):
590 """Transform the yang config primitive to dict."""
591 config = {}
592 for primitive in config_primitive.values():
593 if primitive['name'] == 'config':
594 for parameter in primitive['parameter'].values():
595 param = str(parameter['name'])
596 if parameter['value'] == "<rw_mgmt_ip>":
597 config[param] = str(values[parameter['value']])
598 else:
599 config[param] = str(parameter['value'])
600
601 return config
602
603 def FormatApplicationName(self, *args):
604 """
605 Generate a Juju-compatible Application name
606
607 :param args tuple: Positional arguments to be used to construct the
608 application name.
609
610 Limitations::
611 - Only accepts characters a-z and non-consequitive dashes (-)
612 - Application name should not exceed 50 characters
613
614 Examples::
615
616 FormatApplicationName("ping_pong_ns", "ping_vnf", "a")
617 """
618
619 appname = ""
620 for c in "-".join(list(args)):
621 if c.isdigit():
622 c = chr(97 + int(c))
623 elif not c.isalpha():
624 c = "-"
625 appname += c
626 return re.sub('\-+', '-', appname.lower())
627
628
629 # def format_application_name(self, nsd_name, vnfr_name, member_vnf_index=0):
630 # """Format the name of the application
631 #
632 # Limitations:
633 # - Only accepts characters a-z and non-consequitive dashes (-)
634 # - Application name should not exceed 50 characters
635 # """
636 # name = "{}-{}-{}".format(nsd_name, vnfr_name, member_vnf_index)
637 # new_name = ''
638 # for c in name:
639 # if c.isdigit():
640 # c = chr(97 + int(c))
641 # elif not c.isalpha():
642 # c = "-"
643 # new_name += c
644 # return re.sub('\-+', '-', new_name.lower())
645
646 def format_model_name(self, name):
647 """Format the name of model.
648
649 Model names may only contain lowercase letters, digits and hyphens
650 """
651
652 return name.replace('_', '-').lower()
653
654 async def get_application(self, model, application):
655 """Get the deployed application."""
656 if not self.authenticated:
657 await self.login()
658
659 app = None
660 if application and model:
661 if model.applications:
662 if application in model.applications:
663 app = model.applications[application]
664
665 return app
666
667 async def get_model(self, model_name='default'):
668 """Get a model from the Juju Controller.
669
670 Note: Model objects returned must call disconnected() before it goes
671 out of scope."""
672 if not self.authenticated:
673 await self.login()
674
675 if model_name not in self.models:
676 print("connecting to model {}".format(model_name))
677 self.models[model_name] = await self.controller.get_model(model_name)
678
679 # Create an observer for this model
680 self.monitors[model_name] = VCAMonitor(model_name)
681 self.models[model_name].add_observer(self.monitors[model_name])
682
683 return self.models[model_name]
684
685 async def login(self):
686 """Login to the Juju controller."""
687
688 if self.authenticated:
689 return
690
691 self.connecting = True
692
693 self.log.debug("JujuApi: Logging into controller")
694
695 cacert = None
696 self.controller = Controller()
697
698 if self.secret:
699 self.log.debug("Connecting to controller... ws://{}:{} as {}/{}".format(self.endpoint, self.port, self.user, self.secret))
700 await self.controller.connect(
701 endpoint=self.endpoint,
702 username=self.user,
703 password=self.secret,
704 cacert=cacert,
705 )
706 else:
707 # current_controller no longer exists
708 # self.log.debug("Connecting to current controller...")
709 # await self.controller.connect_current()
710 # await self.controller.connect(
711 # endpoint=self.endpoint,
712 # username=self.user,
713 # cacert=cacert,
714 # )
715 self.log.fatal("VCA credentials not configured.")
716
717 self.authenticated = True
718 self.log.debug("JujuApi: Logged into controller")
719
720 # self.default_model = await self.controller.get_model("default")
721
722 async def logout(self):
723 """Logout of the Juju controller."""
724 if not self.authenticated:
725 return
726
727 try:
728 if self.default_model:
729 self.log.debug("Disconnecting model {}".format(self.default_model))
730 await self.default_model.disconnect()
731 self.default_model = None
732
733 for model in self.models:
734 await self.models[model].disconnect()
735
736 if self.controller:
737 self.log.debug("Disconnecting controller {}".format(self.controller))
738 await self.controller.disconnect()
739 # self.controller = None
740
741 self.authenticated = False
742 except Exception as e:
743 self.log.fail("Fatal error logging out of Juju Controller: {}".format(e))
744 raise e
745
746
747 # async def remove_application(self, name):
748 # """Remove the application."""
749 # if not self.authenticated:
750 # await self.login()
751 #
752 # app = await self.get_application(name)
753 # if app:
754 # self.log.debug("JujuApi: Destroying application {}".format(
755 # name,
756 # ))
757 #
758 # await app.destroy()
759
760 async def remove_relation(self, a, b):
761 """
762 Remove a relation between two application endpoints
763
764 :param a An application endpoint
765 :param b An application endpoint
766 """
767 if not self.authenticated:
768 await self.login()
769
770 m = await self.get_model()
771 try:
772 m.remove_relation(a, b)
773 finally:
774 await m.disconnect()
775
776 async def resolve_error(self, application=None):
777 """Resolve units in error state."""
778 if not self.authenticated:
779 await self.login()
780
781 app = await self.get_application(self.default_model, application)
782 if app:
783 self.log.debug("JujuApi: Resolving errors for application {}".format(
784 application,
785 ))
786
787 for unit in app.units:
788 app.resolved(retry=True)
789
790 async def run_action(self, application, action_name, **params):
791 """Execute an action and return an Action object."""
792 if not self.authenticated:
793 await self.login()
794 result = {
795 'status': '',
796 'action': {
797 'tag': None,
798 'results': None,
799 }
800 }
801 app = await self.get_application(self.default_model, application)
802 if app:
803 # We currently only have one unit per application
804 # so use the first unit available.
805 unit = app.units[0]
806
807 self.log.debug("JujuApi: Running Action {} against Application {}".format(
808 action_name,
809 application,
810 ))
811
812 action = await unit.run_action(action_name, **params)
813
814 # Wait for the action to complete
815 await action.wait()
816
817 result['status'] = action.status
818 result['action']['tag'] = action.data['id']
819 result['action']['results'] = action.results
820
821 return result
822
823 async def set_config(self, model_name, application, config):
824 """Apply a configuration to the application."""
825 if not self.authenticated:
826 await self.login()
827
828 app = await self.get_application(model_name, application)
829 if app:
830 self.log.debug("JujuApi: Setting config for Application {}".format(
831 application,
832 ))
833 await app.set_config(config)
834
835 # Verify the config is set
836 newconf = await app.get_config()
837 for key in config:
838 if config[key] != newconf[key]['value']:
839 self.log.debug("JujuApi: Config not set! Key {} Value {} doesn't match {}".format(key, config[key], newconf[key]))
840
841 # async def set_parameter(self, parameter, value, application=None):
842 # """Set a config parameter for a service."""
843 # if not self.authenticated:
844 # await self.login()
845 #
846 # self.log.debug("JujuApi: Setting {}={} for Application {}".format(
847 # parameter,
848 # value,
849 # application,
850 # ))
851 # return await self.apply_config(
852 # {parameter: value},
853 # application=application,
854 # )
855
856 async def wait_for_application(self, name, timeout=300):
857 """Wait for an application to become active."""
858 if not self.authenticated:
859 await self.login()
860
861 app = await self.get_application(self.default_model, name)
862 if app:
863 self.log.debug(
864 "JujuApi: Waiting {} seconds for Application {}".format(
865 timeout,
866 name,
867 )
868 )
869
870 await self.default_model.block_until(
871 lambda: all(
872 unit.agent_status == 'idle'
873 and unit.workload_status
874 in ['active', 'unknown'] for unit in app.units
875 ),
876 timeout=timeout
877 )