Integration test for metrics + bug fix
[osm/N2VC.git] / n2vc / vnf.py
1
2 import logging
3 import os
4 import os.path
5 import re
6 import ssl
7 import sys
8 import time
9
10 # FIXME: this should load the juju inside or modules without having to
11 # explicitly install it. Check why it's not working.
12 # Load our subtree of the juju library
13 path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
14 path = os.path.join(path, "modules/libjuju/")
15 if path not in sys.path:
16 sys.path.insert(1, path)
17
18 from juju.controller import Controller
19 from juju.model import Model, ModelObserver
20
21
22 # We might need this to connect to the websocket securely, but test and verify.
23 try:
24 ssl._create_default_https_context = ssl._create_unverified_context
25 except AttributeError:
26 # Legacy Python doesn't verify by default (see pep-0476)
27 # https://www.python.org/dev/peps/pep-0476/
28 pass
29
30
31 # Custom exceptions
32 class JujuCharmNotFound(Exception):
33 """The Charm can't be found or is not readable."""
34
35
36 class JujuApplicationExists(Exception):
37 """The Application already exists."""
38
39
40 class N2VCPrimitiveExecutionFailed(Exception):
41 """Something failed while attempting to execute a primitive."""
42
43
44 # Quiet the debug logging
45 logging.getLogger('websockets.protocol').setLevel(logging.INFO)
46 logging.getLogger('juju.client.connection').setLevel(logging.WARN)
47 logging.getLogger('juju.model').setLevel(logging.WARN)
48 logging.getLogger('juju.machine').setLevel(logging.WARN)
49
50
51 class VCAMonitor(ModelObserver):
52 """Monitor state changes within the Juju Model."""
53 log = None
54 ns_name = None
55 applications = {}
56
57 def __init__(self, ns_name):
58 self.log = logging.getLogger(__name__)
59
60 self.ns_name = ns_name
61
62 def AddApplication(self, application_name, callback, *callback_args):
63 if application_name not in self.applications:
64 self.applications[application_name] = {
65 'callback': callback,
66 'callback_args': callback_args
67 }
68
69 def RemoveApplication(self, application_name):
70 if application_name in self.applications:
71 del self.applications[application_name]
72
73 async def on_change(self, delta, old, new, model):
74 """React to changes in the Juju model."""
75
76 if delta.entity == "unit":
77 # Ignore change events from other applications
78 if delta.data['application'] not in self.applications.keys():
79 return
80
81 try:
82
83 application_name = delta.data['application']
84
85 callback = self.applications[application_name]['callback']
86 callback_args = self.applications[application_name]['callback_args']
87
88 if old and new:
89 old_status = old.workload_status
90 new_status = new.workload_status
91
92 if old_status == new_status:
93 """The workload status may fluctuate around certain events,
94 so wait until the status has stabilized before triggering
95 the callback."""
96 if callback:
97 callback(
98 self.ns_name,
99 delta.data['application'],
100 new_status,
101 new.workload_status_message,
102 *callback_args)
103
104 if old and not new:
105 # This is a charm being removed
106 if callback:
107 callback(
108 self.ns_name,
109 delta.data['application'],
110 "removed",
111 "",
112 *callback_args)
113 except Exception as e:
114 self.log.debug("[1] notify_callback exception {}".format(e))
115 elif delta.entity == "action":
116 # TODO: Decide how we want to notify the user of actions
117
118 # uuid = delta.data['id'] # The Action's unique id
119 # msg = delta.data['message'] # The output of the action
120 #
121 # if delta.data['status'] == "pending":
122 # # The action is queued
123 # pass
124 # elif delta.data['status'] == "completed""
125 # # The action was successful
126 # pass
127 # elif delta.data['status'] == "failed":
128 # # The action failed.
129 # pass
130
131 pass
132
133 ########
134 # TODO
135 #
136 # Create unique models per network service
137 # Document all public functions
138
139
140 class N2VC:
141
142 # Juju API
143 api = None
144 log = None
145 controller = None
146 connecting = False
147 authenticated = False
148
149 models = {}
150 default_model = None
151
152 # Model Observers
153 monitors = {}
154
155 # VCA config
156 hostname = ""
157 port = 17070
158 username = ""
159 secret = ""
160
161 def __init__(self,
162 log=None,
163 server='127.0.0.1',
164 port=17070,
165 user='admin',
166 secret=None,
167 artifacts=None
168 ):
169 """Initialize N2VC
170
171 :param vcaconfig dict A dictionary containing the VCA configuration
172
173 :param artifacts str The directory where charms required by a vnfd are
174 stored.
175
176 :Example:
177 n2vc = N2VC(vcaconfig={
178 'secret': 'MzI3MDJhOTYxYmM0YzRjNTJiYmY1Yzdm',
179 'user': 'admin',
180 'ip-address': '10.44.127.137',
181 'port': 17070,
182 'artifacts': '/path/to/charms'
183 })
184
185 """
186
187 if log:
188 self.log = log
189 else:
190 self.log = logging.getLogger(__name__)
191
192 # Quiet websocket traffic
193 logging.getLogger('websockets.protocol').setLevel(logging.INFO)
194 logging.getLogger('juju.client.connection').setLevel(logging.WARN)
195 logging.getLogger('model').setLevel(logging.WARN)
196 # logging.getLogger('websockets.protocol').setLevel(logging.DEBUG)
197
198 self.log.debug('JujuApi: instantiated')
199
200 self.server = server
201 self.port = port
202
203 self.secret = secret
204 if user.startswith('user-'):
205 self.user = user
206 else:
207 self.user = 'user-{}'.format(user)
208
209 self.endpoint = '%s:%d' % (server, int(port))
210
211 self.artifacts = artifacts
212
213 def __del__(self):
214 """Close any open connections."""
215 yield self.logout()
216
217 def notify_callback(self, model_name, application_name, status, message, callback=None, *callback_args):
218 try:
219 if callback:
220 callback(model_name, application_name, status, message, *callback_args)
221 except Exception as e:
222 self.log.error("[0] notify_callback exception {}".format(e))
223 raise e
224 return True
225
226 # Public methods
227 async def CreateNetworkService(self, nsd):
228 """Create a new model to encapsulate this network service.
229
230 Create a new model in the Juju controller to encapsulate the
231 charms associated with a network service.
232
233 You can pass either the nsd record or the id of the network
234 service, but this method will fail without one of them.
235 """
236 if not self.authenticated:
237 await self.login()
238
239 # Ideally, we will create a unique model per network service.
240 # This change will require all components, i.e., LCM and SO, to use
241 # N2VC for 100% compatibility. If we adopt unique models for the LCM,
242 # services deployed via LCM would't be manageable via SO and vice versa
243
244 return self.default_model
245
246 async def DeployCharms(self, model_name, application_name, vnfd, charm_path, params={}, machine_spec={}, callback=None, *callback_args):
247 """Deploy one or more charms associated with a VNF.
248
249 Deploy the charm(s) referenced in a VNF Descriptor.
250
251 :param str model_name: The name of the network service.
252 :param str application_name: The name of the application
253 :param dict vnfd: The name of the application
254 :param str charm_path: The path to the Juju charm
255 :param dict params: A dictionary of runtime parameters
256 Examples::
257 {
258 'rw_mgmt_ip': '1.2.3.4',
259 # Pass the initial-config-primitives section of the vnf or vdu
260 'initial-config-primitives': {...}
261 }
262 :param dict machine_spec: A dictionary describing the machine to install to
263 Examples::
264 {
265 'hostname': '1.2.3.4',
266 'username': 'ubuntu',
267 }
268 :param obj callback: A callback function to receive status changes.
269 :param tuple callback_args: A list of arguments to be passed to the callback
270 """
271
272 ########################################################
273 # Verify the path to the charm exists and is readable. #
274 ########################################################
275 if not os.path.exists(charm_path):
276 self.log.debug("Charm path doesn't exist: {}".format(charm_path))
277 self.notify_callback(model_name, application_name, "failed", callback, *callback_args)
278 raise JujuCharmNotFound("No artifacts configured.")
279
280 ################################
281 # Login to the Juju controller #
282 ################################
283 if not self.authenticated:
284 self.log.debug("Authenticating with Juju")
285 await self.login()
286
287 ##########################################
288 # Get the model for this network service #
289 ##########################################
290 # TODO: In a point release, we will use a model per deployed network
291 # service. In the meantime, we will always use the 'default' model.
292 model_name = 'default'
293 model = await self.get_model(model_name)
294
295 ########################################
296 # Verify the application doesn't exist #
297 ########################################
298 app = await self.get_application(model, application_name)
299 if app:
300 raise JujuApplicationExists("Can't deploy application \"{}\" to model \"{}\" because it already exists.".format(application_name, model_name))
301
302 ################################################################
303 # Register this application with the model-level event monitor #
304 ################################################################
305 if callback:
306 self.monitors[model_name].AddApplication(
307 application_name,
308 callback,
309 *callback_args
310 )
311
312 ########################################################
313 # Check for specific machine placement (native charms) #
314 ########################################################
315 to = ""
316 if machine_spec.keys():
317 # TODO: This needs to be tested.
318 # if all(k in machine_spec for k in ['hostname', 'username']):
319 # # Enlist the existing machine in Juju
320 # machine = await self.model.add_machine(spec='ssh:%@%'.format(
321 # specs['host'],
322 # specs['user'],
323 # ))
324 # to = machine.id
325 pass
326
327 #######################################
328 # Get the initial charm configuration #
329 #######################################
330
331 rw_mgmt_ip = None
332 if 'rw_mgmt_ip' in params:
333 rw_mgmt_ip = params['rw_mgmt_ip']
334
335 # initial_config = {}
336 if 'initial-config-primitive' not in params:
337 params['initial-config-primitive'] = {}
338
339 initial_config = self._get_config_from_dict(
340 params['initial-config-primitive'],
341 {'<rw_mgmt_ip>': rw_mgmt_ip}
342 )
343
344 self.log.debug("JujuApi: Deploying charm ({}) from {}".format(
345 application_name,
346 charm_path,
347 to=to,
348 ))
349
350 ########################################################
351 # Deploy the charm and apply the initial configuration #
352 ########################################################
353 app = await model.deploy(
354 # We expect charm_path to be either the path to the charm on disk
355 # or in the format of cs:series/name
356 charm_path,
357 # This is the formatted, unique name for this charm
358 application_name=application_name,
359 # Proxy charms should use the current LTS. This will need to be
360 # changed for native charms.
361 series='xenial',
362 # Apply the initial 'config' primitive during deployment
363 config=initial_config,
364 # TBD: Where to deploy the charm to.
365 to=None,
366 )
367
368 # #######################################
369 # # Execute initial config primitive(s) #
370 # #######################################
371 primitives = {}
372
373 # Build a sequential list of the primitives to execute
374 for primitive in params['initial-config-primitive']:
375 try:
376 if primitive['name'] == 'config':
377 # This is applied when the Application is deployed
378 pass
379 else:
380 seq = primitive['seq']
381
382 params = {}
383 if 'parameter' in primitive:
384 params = primitive['parameter']
385
386 primitives[seq] = {
387 'name': primitive['name'],
388 'parameters': self._map_primitive_parameters(
389 params,
390 {'<rw_mgmt_ip>': rw_mgmt_ip}
391 ),
392 }
393
394 for primitive in sorted(primitives):
395 await self.ExecutePrimitive(
396 model_name,
397 application_name,
398 primitives[primitive]['name'],
399 callback,
400 callback_args,
401 **primitives[primitive]['parameters'],
402 )
403 except N2VCPrimitiveExecutionFailed as e:
404 self.log.debug(
405 "[N2VC] Exception executing primitive: {}".format(e)
406 )
407 raise
408
409 async def GetPrimitiveStatus(self, model_name, uuid):
410 results = None
411 try:
412 if not self.authenticated:
413 await self.login()
414
415 # FIXME: This is hard-coded until model-per-ns is added
416 model_name = 'default'
417
418 model = await self.controller.get_model(model_name)
419
420 results = await model.get_action_output(uuid)
421
422 await model.disconnect()
423 except Exception as e:
424 self.log.debug(
425 "Caught exception while getting primitive status: {}".format(e)
426 )
427 raise N2VCPrimitiveExecutionFailed(e)
428
429 return results
430
431
432 async def ExecutePrimitive(self, model_name, application_name, primitive, callback, *callback_args, **params):
433 """Execute a primitive of a charm for Day 1 or Day 2 configuration.
434
435 Execute a primitive defined in the VNF descriptor.
436
437 :param str model_name: The name of the network service.
438 :param str application_name: The name of the application
439 :param str primitive: The name of the primitive to execute.
440 :param obj callback: A callback function to receive status changes.
441 :param tuple callback_args: A list of arguments to be passed to the callback function.
442 :param dict params: A dictionary of key=value pairs representing the primitive's parameters
443 Examples::
444 {
445 'rw_mgmt_ip': '1.2.3.4',
446 # Pass the initial-config-primitives section of the vnf or vdu
447 'initial-config-primitives': {...}
448 }
449 """
450 uuid = None
451 try:
452 if not self.authenticated:
453 await self.login()
454
455 # FIXME: This is hard-coded until model-per-ns is added
456 model_name = 'default'
457
458 model = await self.controller.get_model(model_name)
459
460 if primitive == 'config':
461 # config is special, and expecting params to be a dictionary
462 await self.set_config(
463 model,
464 application_name,
465 params['params'],
466 )
467 else:
468 app = await self.get_application(model, application_name)
469 if app:
470 # Run against the first (and probably only) unit in the app
471 unit = app.units[0]
472 if unit:
473 self.log.debug(
474 "Executing primitive {}".format(primitive)
475 )
476 action = await unit.run_action(primitive, **params)
477 uuid = action.id
478 await model.disconnect()
479 except Exception as e:
480 self.log.debug(
481 "Caught exception while executing primitive: {}".format(e)
482 )
483 raise N2VCPrimitiveExecutionFailed(e)
484 return uuid
485
486 async def RemoveCharms(self, model_name, application_name, callback=None, *callback_args):
487 """Remove a charm from the VCA.
488
489 Remove a charm referenced in a VNF Descriptor.
490
491 :param str model_name: The name of the network service.
492 :param str application_name: The name of the application
493 :param obj callback: A callback function to receive status changes.
494 :param tuple callback_args: A list of arguments to be passed to the callback function.
495 """
496 try:
497 if not self.authenticated:
498 await self.login()
499
500 model = await self.get_model(model_name)
501 app = await self.get_application(model, application_name)
502 if app:
503 # Remove this application from event monitoring
504 self.monitors[model_name].RemoveApplication(application_name)
505
506 # self.notify_callback(model_name, application_name, "removing", callback, *callback_args)
507 self.log.debug("Removing the application {}".format(application_name))
508 await app.remove()
509
510 # Notify the callback that this charm has been removed.
511 self.notify_callback(model_name, application_name, "removed", callback, *callback_args)
512
513 except Exception as e:
514 print("Caught exception: {}".format(e))
515 self.log.debug(e)
516 raise e
517
518 async def DestroyNetworkService(self, nsd):
519 raise NotImplementedError()
520
521 async def GetMetrics(self, model_name, application_name):
522 """Get the metrics collected by the VCA.
523
524 :param model_name The name of the model
525 :param application_name The name of the application
526 """
527 metrics = {}
528 model = await self.get_model(model_name)
529 app = await self.get_application(model, application_name)
530 if app:
531 metrics = await app.get_metrics()
532
533 return metrics
534
535 # Non-public methods
536 async def add_relation(self, a, b, via=None):
537 """
538 Add a relation between two application endpoints.
539
540 :param a An application endpoint
541 :param b An application endpoint
542 :param via The egress subnet(s) for outbound traffic, e.g.,
543 (192.168.0.0/16,10.0.0.0/8)
544 """
545 if not self.authenticated:
546 await self.login()
547
548 m = await self.get_model()
549 try:
550 m.add_relation(a, b, via)
551 finally:
552 await m.disconnect()
553
554 # async def apply_config(self, config, application):
555 # """Apply a configuration to the application."""
556 # print("JujuApi: Applying configuration to {}.".format(
557 # application
558 # ))
559 # return await self.set_config(application=application, config=config)
560
561 def _get_config_from_dict(self, config_primitive, values):
562 """Transform the yang config primitive to dict.
563
564 Expected result:
565
566 config = {
567 'config':
568 }
569 """
570 config = {}
571 for primitive in config_primitive:
572 if primitive['name'] == 'config':
573 # config = self._map_primitive_parameters()
574 for parameter in primitive['parameter']:
575 param = str(parameter['name'])
576 if parameter['value'] == "<rw_mgmt_ip>":
577 config[param] = str(values[parameter['value']])
578 else:
579 config[param] = str(parameter['value'])
580
581 return config
582
583 def _map_primitive_parameters(self, parameters, values):
584 params = {}
585 for parameter in parameters:
586 param = str(parameter['name'])
587 if parameter['value'] == "<rw_mgmt_ip>":
588 params[param] = str(values[parameter['value']])
589 else:
590 params[param] = str(parameter['value'])
591 return params
592
593 def _get_config_from_yang(self, config_primitive, values):
594 """Transform the yang config primitive to dict."""
595 config = {}
596 for primitive in config_primitive.values():
597 if primitive['name'] == 'config':
598 for parameter in primitive['parameter'].values():
599 param = str(parameter['name'])
600 if parameter['value'] == "<rw_mgmt_ip>":
601 config[param] = str(values[parameter['value']])
602 else:
603 config[param] = str(parameter['value'])
604
605 return config
606
607 def FormatApplicationName(self, *args):
608 """
609 Generate a Juju-compatible Application name
610
611 :param args tuple: Positional arguments to be used to construct the
612 application name.
613
614 Limitations::
615 - Only accepts characters a-z and non-consequitive dashes (-)
616 - Application name should not exceed 50 characters
617
618 Examples::
619
620 FormatApplicationName("ping_pong_ns", "ping_vnf", "a")
621 """
622
623 appname = ""
624 for c in "-".join(list(args)):
625 if c.isdigit():
626 c = chr(97 + int(c))
627 elif not c.isalpha():
628 c = "-"
629 appname += c
630 return re.sub('\-+', '-', appname.lower())
631
632
633 # def format_application_name(self, nsd_name, vnfr_name, member_vnf_index=0):
634 # """Format the name of the application
635 #
636 # Limitations:
637 # - Only accepts characters a-z and non-consequitive dashes (-)
638 # - Application name should not exceed 50 characters
639 # """
640 # name = "{}-{}-{}".format(nsd_name, vnfr_name, member_vnf_index)
641 # new_name = ''
642 # for c in name:
643 # if c.isdigit():
644 # c = chr(97 + int(c))
645 # elif not c.isalpha():
646 # c = "-"
647 # new_name += c
648 # return re.sub('\-+', '-', new_name.lower())
649
650 def format_model_name(self, name):
651 """Format the name of model.
652
653 Model names may only contain lowercase letters, digits and hyphens
654 """
655
656 return name.replace('_', '-').lower()
657
658 async def get_application(self, model, application):
659 """Get the deployed application."""
660 if not self.authenticated:
661 await self.login()
662
663 app = None
664 if application and model:
665 if model.applications:
666 if application in model.applications:
667 app = model.applications[application]
668
669 return app
670
671 async def get_model(self, model_name='default'):
672 """Get a model from the Juju Controller.
673
674 Note: Model objects returned must call disconnected() before it goes
675 out of scope."""
676 if not self.authenticated:
677 await self.login()
678
679 if model_name not in self.models:
680 print("connecting to model {}".format(model_name))
681 self.models[model_name] = await self.controller.get_model(model_name)
682
683 # Create an observer for this model
684 self.monitors[model_name] = VCAMonitor(model_name)
685 self.models[model_name].add_observer(self.monitors[model_name])
686
687 return self.models[model_name]
688
689 async def login(self):
690 """Login to the Juju controller."""
691
692 if self.authenticated:
693 return
694
695 self.connecting = True
696
697 self.log.debug("JujuApi: Logging into controller")
698
699 cacert = None
700 self.controller = Controller()
701
702 if self.secret:
703 self.log.debug("Connecting to controller... ws://{}:{} as {}/{}".format(self.endpoint, self.port, self.user, self.secret))
704 await self.controller.connect(
705 endpoint=self.endpoint,
706 username=self.user,
707 password=self.secret,
708 cacert=cacert,
709 )
710 else:
711 # current_controller no longer exists
712 # self.log.debug("Connecting to current controller...")
713 # await self.controller.connect_current()
714 # await self.controller.connect(
715 # endpoint=self.endpoint,
716 # username=self.user,
717 # cacert=cacert,
718 # )
719 self.log.fatal("VCA credentials not configured.")
720
721 self.authenticated = True
722 self.log.debug("JujuApi: Logged into controller")
723
724 # self.default_model = await self.controller.get_model("default")
725
726 async def logout(self):
727 """Logout of the Juju controller."""
728 if not self.authenticated:
729 return
730
731 try:
732 if self.default_model:
733 self.log.debug("Disconnecting model {}".format(self.default_model))
734 await self.default_model.disconnect()
735 self.default_model = None
736
737 for model in self.models:
738 await self.models[model].disconnect()
739
740 if self.controller:
741 self.log.debug("Disconnecting controller {}".format(self.controller))
742 await self.controller.disconnect()
743 # self.controller = None
744
745 self.authenticated = False
746 except Exception as e:
747 self.log.fail("Fatal error logging out of Juju Controller: {}".format(e))
748 raise e
749
750
751 # async def remove_application(self, name):
752 # """Remove the application."""
753 # if not self.authenticated:
754 # await self.login()
755 #
756 # app = await self.get_application(name)
757 # if app:
758 # self.log.debug("JujuApi: Destroying application {}".format(
759 # name,
760 # ))
761 #
762 # await app.destroy()
763
764 async def remove_relation(self, a, b):
765 """
766 Remove a relation between two application endpoints
767
768 :param a An application endpoint
769 :param b An application endpoint
770 """
771 if not self.authenticated:
772 await self.login()
773
774 m = await self.get_model()
775 try:
776 m.remove_relation(a, b)
777 finally:
778 await m.disconnect()
779
780 async def resolve_error(self, application=None):
781 """Resolve units in error state."""
782 if not self.authenticated:
783 await self.login()
784
785 app = await self.get_application(self.default_model, application)
786 if app:
787 self.log.debug("JujuApi: Resolving errors for application {}".format(
788 application,
789 ))
790
791 for unit in app.units:
792 app.resolved(retry=True)
793
794 async def run_action(self, application, action_name, **params):
795 """Execute an action and return an Action object."""
796 if not self.authenticated:
797 await self.login()
798 result = {
799 'status': '',
800 'action': {
801 'tag': None,
802 'results': None,
803 }
804 }
805 app = await self.get_application(self.default_model, application)
806 if app:
807 # We currently only have one unit per application
808 # so use the first unit available.
809 unit = app.units[0]
810
811 self.log.debug("JujuApi: Running Action {} against Application {}".format(
812 action_name,
813 application,
814 ))
815
816 action = await unit.run_action(action_name, **params)
817
818 # Wait for the action to complete
819 await action.wait()
820
821 result['status'] = action.status
822 result['action']['tag'] = action.data['id']
823 result['action']['results'] = action.results
824
825 return result
826
827 async def set_config(self, model_name, application, config):
828 """Apply a configuration to the application."""
829 if not self.authenticated:
830 await self.login()
831
832 app = await self.get_application(model_name, application)
833 if app:
834 self.log.debug("JujuApi: Setting config for Application {}".format(
835 application,
836 ))
837 await app.set_config(config)
838
839 # Verify the config is set
840 newconf = await app.get_config()
841 for key in config:
842 if config[key] != newconf[key]['value']:
843 self.log.debug("JujuApi: Config not set! Key {} Value {} doesn't match {}".format(key, config[key], newconf[key]))
844
845 # async def set_parameter(self, parameter, value, application=None):
846 # """Set a config parameter for a service."""
847 # if not self.authenticated:
848 # await self.login()
849 #
850 # self.log.debug("JujuApi: Setting {}={} for Application {}".format(
851 # parameter,
852 # value,
853 # application,
854 # ))
855 # return await self.apply_config(
856 # {parameter: value},
857 # application=application,
858 # )
859
860 async def wait_for_application(self, name, timeout=300):
861 """Wait for an application to become active."""
862 if not self.authenticated:
863 await self.login()
864
865 app = await self.get_application(self.default_model, name)
866 if app:
867 self.log.debug(
868 "JujuApi: Waiting {} seconds for Application {}".format(
869 timeout,
870 name,
871 )
872 )
873
874 await self.default_model.block_until(
875 lambda: all(
876 unit.agent_status == 'idle'
877 and unit.workload_status
878 in ['active', 'unknown'] for unit in app.units
879 ),
880 timeout=timeout
881 )