Bug fixes + Metrics
[osm/N2VC.git] / n2vc / vnf.py
1
2 import logging
3 import os
4 import os.path
5 import re
6 import ssl
7 import sys
8 import time
9
# FIXME: this should load the juju library from inside our modules
# directory without having to explicitly install it. Check why it's not
# working.
# Put our bundled subtree of the juju library on the import path.
path = os.path.join(
    os.path.abspath(os.path.join(os.path.dirname(__file__), '..')),
    "modules/libjuju/",
)
if not (path in sys.path):
    # Index 1 keeps sys.path[0] (the script directory) in first position.
    sys.path.insert(1, path)
17
18 from juju.controller import Controller
19 from juju.model import Model, ModelObserver
20
21
22 # We might need this to connect to the websocket securely, but test and verify.
# Legacy Python doesn't verify certificates by default (see pep-0476,
# https://www.python.org/dev/peps/pep-0476/) and has no unverified-context
# factory to install, so only override the default when one is available.
if hasattr(ssl, '_create_unverified_context'):
    ssl._create_default_https_context = ssl._create_unverified_context
29
30
31 # Custom exceptions
class JujuCharmNotFound(Exception):
    """Raised when a charm cannot be located or is not readable."""
34
35
class JujuApplicationExists(Exception):
    """Raised when the application to deploy is already present."""
38
39
class N2VCPrimitiveExecutionFailed(Exception):
    """Raised when executing a primitive fails."""
42
43
# Raise the threshold of the chattiest third-party loggers so their debug
# output does not drown out our own.
for _logger_name, _logger_level in (
        ('websockets.protocol', logging.INFO),
        ('juju.client.connection', logging.WARN),
        ('juju.model', logging.WARN),
        ('juju.machine', logging.WARN),
):
    logging.getLogger(_logger_name).setLevel(_logger_level)
49
50
class VCAMonitor(ModelObserver):
    """Monitor state changes within the Juju Model.

    Applications are registered together with a callback; unit changes for
    a registered application are forwarded to that callback.
    """
    log = None
    ns_name = None
    # Populated per instance in __init__. A dict defined here would be shared
    # by every monitor, leaking registrations from one model into another.
    applications = None

    def __init__(self, ns_name):
        """Create a monitor.

        :param str ns_name: Passed as the first argument to every callback.
        """
        self.log = logging.getLogger(__name__)
        self.ns_name = ns_name
        self.applications = {}

    def AddApplication(self, application_name, callback, *callback_args):
        """Register a callback for an application.

        An existing registration for the same name is left untouched.

        :param str application_name: The application to watch
        :param callback: Called as
            ``callback(ns_name, application_name, status, *callback_args)``
        :param callback_args: Extra positional arguments for the callback
        """
        if application_name not in self.applications:
            self.applications[application_name] = {
                'callback': callback,
                'callback_args': callback_args
            }

    def RemoveApplication(self, application_name):
        """Stop watching an application; unknown names are ignored."""
        self.applications.pop(application_name, None)

    async def on_change(self, delta, old, new, model):
        """React to changes in the Juju model.

        Only "unit" deltas of registered applications trigger a callback.
        Exceptions raised while notifying are logged and swallowed.
        """
        if delta.entity == "unit":
            application_name = delta.data['application']

            # Ignore change events from other applications
            registration = self.applications.get(application_name)
            if registration is None:
                return

            try:
                callback = registration['callback']
                callback_args = registration['callback_args']

                if old and new:
                    # The workload status may fluctuate around certain
                    # events, so only notify once two consecutive states
                    # report the same status.
                    new_status = new.workload_status
                    if old.workload_status == new_status and callback:
                        callback(
                            self.ns_name,
                            application_name,
                            new_status,
                            *callback_args)

                if old and not new:
                    # This is a charm being removed
                    if callback:
                        callback(
                            self.ns_name,
                            application_name,
                            "removed",
                            *callback_args)
            except Exception as e:
                self.log.debug("[1] notify_callback exception {}".format(e))
        elif delta.entity == "action":
            # TODO: Decide how we want to notify the user of actions.
            # delta.data carries the action's 'id', 'message' and 'status'
            # ("pending", "completed" or "failed").
            pass
130
131 ########
132 # TODO
133 #
134 # Create unique models per network service
135 # Document all public functions
136
137
class N2VC:
    """Client for deploying and operating charms through a Juju controller.

    Connection settings are captured at construction time; the connection
    itself is opened by ``login()``.
    """

    # Juju API
    api = None
    log = None
    controller = None
    connecting = False
    authenticated = False

    # Class-level defaults only: __init__ gives every instance its own
    # dictionaries so model connections are never shared between instances.
    models = {}
    default_model = None

    # Model Observers
    monitors = {}

    # VCA config
    hostname = ""
    port = 17070
    username = ""
    secret = ""

    def __init__(self,
                 log=None,
                 server='127.0.0.1',
                 port=17070,
                 user='admin',
                 secret=None,
                 artifacts=None
                 ):
        """Initialize N2VC

        :param log: Logger to use; defaults to this module's logger.
        :param str server: Address of the Juju controller.
        :param port: API port of the Juju controller (anything ``int()``
            accepts).
        :param str user: Controller user name; the ``user-`` prefix is added
            when missing.
        :param str secret: Password for ``user``.
        :param str artifacts: The directory where charms required by a vnfd
            are stored.

        :Example:
        n2vc = N2VC(
            server='10.44.127.137',
            port=17070,
            user='admin',
            secret='MzI3MDJhOTYxYmM0YzRjNTJiYmY1Yzdm',
            artifacts='/path/to/charms',
        )

        """

        if log:
            self.log = log
        else:
            self.log = logging.getLogger(__name__)

        # Quiet websocket traffic
        logging.getLogger('websockets.protocol').setLevel(logging.INFO)
        logging.getLogger('juju.client.connection').setLevel(logging.WARN)
        logging.getLogger('model').setLevel(logging.WARN)

        self.log.debug('JujuApi: instantiated')

        # Per-instance caches of connected models and their observers.
        self.models = {}
        self.monitors = {}

        self.server = server
        self.port = port

        self.secret = secret
        if user.startswith('user-'):
            self.user = user
        else:
            self.user = 'user-{}'.format(user)

        self.endpoint = '%s:%d' % (server, int(port))

        self.artifacts = artifacts

    def __del__(self):
        """Warn when the object is collected while still logged in.

        ``logout()`` is a coroutine and cannot be awaited from a finalizer,
        so callers must ``await`` it themselves before dropping the object.
        """
        if getattr(self, 'authenticated', False) and getattr(self, 'log', None):
            self.log.warning(
                "N2VC garbage-collected while still logged in; "
                "await logout() to close the connections."
            )
214
def notify_callback(self, model_name, application_name, status, callback=None, *callback_args):
    """Invoke ``callback`` with a status update, if a callback was given.

    :param str model_name: First argument handed to the callback
    :param str application_name: Second argument handed to the callback
    :param str status: Third argument handed to the callback
    :param callback: Optional callable to notify
    :param callback_args: Extra positional arguments for the callback
    :return: Always True
    :raises Exception: Whatever the callback raises, after logging it
    """
    if not callback:
        return True

    try:
        callback(model_name, application_name, status, *callback_args)
    except Exception as e:
        self.log.error("[0] notify_callback exception {}".format(e))
        raise e
    return True
223
224 # Public methods
async def CreateNetworkService(self, nsd):
    """Return the model that encapsulates a network service.

    No model is created yet: after making sure we are logged in, the
    shared default model is returned and ``nsd`` is not consulted.

    Ideally, we will create a unique model per network service. That
    change requires all components, i.e., LCM and SO, to use N2VC for
    100% compatibility; otherwise services deployed via LCM wouldn't be
    manageable via SO and vice versa.

    :param nsd: The nsd record or the id of the network service
    :return: The value of ``self.default_model``
    """
    if not self.authenticated:
        await self.login()

    return self.default_model
243
async def DeployCharms(self, model_name, application_name, vnfd, charm_path, params=None, machine_spec=None, callback=None, *callback_args):
    """Deploy one or more charms associated with a VNF.

    Deploys the charm, applies the 'config' primitive as the initial
    configuration and then executes the remaining initial config
    primitives once each, ordered by their 'seq' value.

    :param str model_name: The name of the network service. Only used for
        the failure notification; deployment always targets 'default'.
    :param str application_name: The name of the application
    :param dict vnfd: The VNF descriptor (currently unused)
    :param str charm_path: The path to the Juju charm
    :param dict params: A dictionary of runtime parameters
      Examples::
      {
        'rw_mgmt_ip': '1.2.3.4',
        # Pass the initial-config-primitives section of the vnf or vdu
        'initial-config-primitive': [...]
      }
    :param dict machine_spec: A dictionary describing the machine to
        install to (accepted but not acted upon yet)
      Examples::
      {
        'hostname': '1.2.3.4',
        'username': 'ubuntu',
      }
    :param obj callback: A callback function to receive status changes.
    :param tuple callback_args: A list of arguments to be passed to the
        callback
    :raises JujuCharmNotFound: if ``charm_path`` does not exist
    :raises JujuApplicationExists: if the application is already deployed
    :raises N2VCPrimitiveExecutionFailed: re-raised from ExecutePrimitive
    """
    params = params or {}

    ########################################################
    # Verify the path to the charm exists and is readable. #
    ########################################################
    if not os.path.exists(charm_path):
        self.log.debug("Charm path doesn't exist: {}".format(charm_path))
        self.notify_callback(model_name, application_name, "failed", callback, *callback_args)
        raise JujuCharmNotFound("No artifacts configured.")

    ################################
    # Login to the Juju controller #
    ################################
    if not self.authenticated:
        self.log.debug("Authenticating with Juju")
        await self.login()

    ##########################################
    # Get the model for this network service #
    ##########################################
    # TODO: In a point release, we will use a model per deployed network
    # service. In the meantime, we will always use the 'default' model.
    model_name = 'default'
    model = await self.get_model(model_name)

    ########################################
    # Verify the application doesn't exist #
    ########################################
    app = await self.get_application(model, application_name)
    if app:
        raise JujuApplicationExists("Can't deploy application \"{}\" to model \"{}\" because it already exists.".format(application_name, model_name))

    ################################################################
    # Register this application with the model-level event monitor #
    ################################################################
    if callback:
        self.monitors[model_name].AddApplication(
            application_name,
            callback,
            *callback_args
        )

    # TODO: specific machine placement (native charms) from machine_spec
    # is not implemented; the charm is deployed without a placement.

    #######################################
    # Get the initial charm configuration #
    #######################################
    rw_mgmt_ip = params.get('rw_mgmt_ip')
    substitutions = {'<rw_mgmt_ip>': rw_mgmt_ip}
    # Tolerate callers that pass no primitives at all.
    config_primitives = params.get('initial-config-primitive', [])

    initial_config = self._get_config_from_dict(
        config_primitives,
        substitutions
    )

    self.log.debug("JujuApi: Deploying charm ({}) from {}".format(
        application_name,
        charm_path,
    ))

    ########################################################
    # Deploy the charm and apply the initial configuration #
    ########################################################
    await model.deploy(
        # We expect charm_path to be either the path to the charm on disk
        # or in the format of cs:series/name
        charm_path,
        # This is the formatted, unique name for this charm
        application_name=application_name,
        # Proxy charms should use the current LTS. This will need to be
        # changed for native charms.
        series='xenial',
        # Apply the initial 'config' primitive during deployment
        config=initial_config,
        # TBD: Where to deploy the charm to.
        to=None,
    )

    #######################################
    # Execute initial config primitive(s) #
    #######################################
    # Collect every non-config primitive first, keyed by sequence number,
    # so that each one is executed exactly once and in order.
    primitives = {}
    for primitive in config_primitives:
        if primitive['name'] == 'config':
            # This is applied when the Application is deployed
            continue
        primitives[primitive['seq']] = {
            'name': primitive['name'],
            'parameters': self._map_primitive_parameters(
                primitive['parameter'],
                substitutions
            ),
        }

    try:
        for seq in sorted(primitives):
            await self.ExecutePrimitive(
                model_name,
                application_name,
                primitives[seq]['name'],
                callback,
                *callback_args,
                **primitives[seq]['parameters']
            )
    except N2VCPrimitiveExecutionFailed as e:
        self.log.debug(
            "[N2VC] Exception executing primitive: {}".format(e)
        )
        raise
399 )
400 raise
401
async def ExecutePrimitive(self, model_name, application_name, primitive, callback, *callback_args, **params):
    """Execute a primitive against an application.

    The 'config' primitive applies ``params['params']`` as the charm
    configuration; any other primitive is run as an action on the first
    unit of the application. Nothing happens when the application is
    missing or has no units.

    :param str model_name: Ignored for now; 'default' is always used
    :param str application_name: The application to operate on
    :param str primitive: 'config' or the name of an action
    :param callback: Accepted for interface symmetry; not used here
    :param callback_args: Accepted for interface symmetry; not used here
    :param params: Action parameters, or ``params=<dict>`` for 'config'
    :raises Exception: Any failure is logged and re-raised
    """
    try:
        if not self.authenticated:
            await self.login()

        # FIXME: This is hard-coded until model-per-ns is added
        model_name = 'default'

        # A dedicated connection is opened for this call, so it must be
        # closed again even when the primitive fails.
        model = await self.controller.get_model(model_name)
        try:
            if primitive == 'config':
                # config is special, and expecting params to be a dictionary
                self.log.debug("Setting charm configuration for {}".format(application_name))
                self.log.debug(params['params'])
                await self.set_config(model, application_name, params['params'])
            else:
                app = await self.get_application(model, application_name)
                if app and app.units:
                    # Run against the first (and probably only) unit
                    self.log.debug("Executing primitive {}".format(primitive))
                    await app.units[0].run_action(primitive, **params)
        finally:
            await model.disconnect()
    except Exception as e:
        self.log.debug("Caught exception while executing primitive: {}".format(e))
        raise
430
async def RemoveCharms(self, model_name, application_name, callback=None, *callback_args):
    """Remove a deployed application and report it as "removed".

    Nothing happens when the application is not found in the model.

    :param str model_name: The name of the model holding the application
    :param str application_name: The application to remove
    :param callback: Optional callable notified with status "removed"
    :param callback_args: Extra positional arguments for the callback
    :raises Exception: Any failure is logged and re-raised
    """
    try:
        if not self.authenticated:
            await self.login()

        model = await self.get_model(model_name)
        app = await self.get_application(model, application_name)
        if app:
            # Remove this application from event monitoring
            self.monitors[model_name].RemoveApplication(application_name)

            self.log.debug("Removing the application {}".format(application_name))
            await app.remove()

            # Notify the callback that this charm has been removed.
            self.notify_callback(model_name, application_name, "removed", callback, *callback_args)

    except Exception as e:
        # Report through the logger rather than printing to stdout.
        self.log.debug("Caught exception: {}".format(e))
        raise
453
async def DestroyNetworkService(self, nsd):
    """Destroy the model of a network service (not implemented yet).

    :raises NotImplementedError: always
    """
    raise NotImplementedError()
456
async def GetMetrics(self, model_name, application_name):
    """Get the metrics collected by the VCA.

    :param model_name The name of the model
    :param application_name The name of the application
    :return: The application's metrics, or an empty dict when the
        application is not found
    """
    model = await self.get_model(model_name)
    app = await self.get_application(model, application_name)
    if not app:
        return {}

    return await app.get_metrics()
470
471 # Non-public methods
async def add_relation(self, a, b, via=None):
    """
    Add a relation between two application endpoints.

    :param a An application endpoint
    :param b An application endpoint
    :param via The egress subnet(s) for outbound traffic, e.g.,
        (192.168.0.0/16,10.0.0.0/8)
    """
    if not self.authenticated:
        await self.login()

    m = await self.get_model()
    # The model call is a coroutine and must be awaited to take effect.
    # The model comes from the shared cache, so it is left connected here;
    # logout() is what disconnects cached models.
    await m.add_relation(a, b, via)
489
490 # async def apply_config(self, config, application):
491 # """Apply a configuration to the application."""
492 # print("JujuApi: Applying configuration to {}.".format(
493 # application
494 # ))
495 # return await self.set_config(application=application, config=config)
496
def _get_config_from_dict(self, config_primitive, values):
    """Build the charm configuration from a list of primitives.

    Only primitives named 'config' contribute. Each of their parameters
    becomes one ``name: value`` entry with both sides converted to str;
    the placeholder "<rw_mgmt_ip>" is replaced by its entry in ``values``.

    :param config_primitive: Iterable of primitive dicts with 'name' and
        'parameter' keys
    :param dict values: Replacement values keyed by placeholder
    :return: dict mapping parameter names to string values
    """
    config = {}
    for primitive in config_primitive:
        if primitive['name'] != 'config':
            continue

        for parameter in primitive['parameter']:
            key = str(parameter['name'])
            raw = parameter['value']
            resolved = values[raw] if raw == "<rw_mgmt_ip>" else raw
            config[key] = str(resolved)

    return config
518
def _map_primitive_parameters(self, parameters, values):
    """Turn a list of primitive parameters into a ``name: value`` dict.

    Names and values are converted to str; the placeholder
    "<rw_mgmt_ip>" is replaced by its entry in ``values``.

    :param parameters: Iterable of dicts with 'name' and 'value' keys
    :param dict values: Replacement values keyed by placeholder
    :return: dict mapping parameter names to string values
    """
    mapped = {}
    for entry in parameters:
        key = str(entry['name'])
        raw = entry['value']
        mapped[key] = str(values[raw] if raw == "<rw_mgmt_ip>" else raw)
    return mapped
528
def _get_config_from_yang(self, config_primitive, values):
    """Build the charm configuration from yang-style primitives.

    Works like the list-based variant, except that both the primitives
    and their parameters are mappings whose values are iterated.

    :param config_primitive: Mapping whose values are primitive dicts
    :param dict values: Replacement values keyed by placeholder
    :return: dict mapping parameter names to string values
    """
    config = {}
    for primitive in config_primitive.values():
        if primitive['name'] != 'config':
            continue

        for parameter in primitive['parameter'].values():
            key = str(parameter['name'])
            raw = parameter['value']
            resolved = values[raw] if raw == "<rw_mgmt_ip>" else raw
            config[key] = str(resolved)

    return config
542
def FormatApplicationName(self, *args):
    """
    Generate a Juju-compatible Application name

    The arguments are joined with dashes. ASCII letters are lowercased,
    each ASCII digit is mapped to a letter (0 -> a ... 9 -> j) and every
    other character becomes a dash; runs of dashes are then collapsed.

    :param args tuple: Positional arguments to be used to construct the
      application name. Non-string values are converted with str().

    Limitations::
    - Only accepts characters a-z and non-consecutive dashes (-)
    - Application name should not exceed 50 characters (not enforced
      here)

    Examples::

        FormatApplicationName("ping_pong_ns", "ping_vnf", "a")
    """
    import string

    translated = []
    for c in "-".join(str(arg) for arg in args):
        if c in string.ascii_letters:
            translated.append(c.lower())
        elif c in string.digits:
            translated.append(chr(ord('a') + int(c)))
        else:
            # Anything outside a-z/0-9, including non-ASCII letters and
            # digits, is not valid in an application name.
            translated.append("-")
    return re.sub(r'-+', '-', "".join(translated))
567
568
569 # def format_application_name(self, nsd_name, vnfr_name, member_vnf_index=0):
570 # """Format the name of the application
571 #
572 # Limitations:
573 # - Only accepts characters a-z and non-consequitive dashes (-)
574 # - Application name should not exceed 50 characters
575 # """
576 # name = "{}-{}-{}".format(nsd_name, vnfr_name, member_vnf_index)
577 # new_name = ''
578 # for c in name:
579 # if c.isdigit():
580 # c = chr(97 + int(c))
581 # elif not c.isalpha():
582 # c = "-"
583 # new_name += c
584 # return re.sub('\-+', '-', new_name.lower())
585
def format_model_name(self, name):
    """Return ``name`` lowercased with underscores turned into hyphens.

    Model names may only contain lowercase letters, digits and hyphens;
    characters other than underscores are not altered beyond lowercasing.
    """
    lowered = name.lower()
    return lowered.replace('_', '-')
593
async def get_application(self, model, application):
    """Look up a deployed application in a model.

    :param model: Object exposing an ``applications`` mapping
    :param str application: Name of the application
    :return: The application object, or None when either argument is
        empty or the model does not contain the application
    """
    if not self.authenticated:
        await self.login()

    if not (application and model):
        return None

    deployed = model.applications
    if not deployed:
        return None

    return deployed[application] if application in deployed else None
606
async def get_model(self, model_name='default'):
    """Get a model from the Juju Controller.

    Models are connected on first use and cached by name together with a
    VCAMonitor observer; later calls return the cached object.

    Note: cached models stay connected until they are explicitly
    disconnected.

    :param str model_name: The name of the model
    :return: The connected model
    """
    if not self.authenticated:
        await self.login()

    if model_name not in self.models:
        self.log.debug("Connecting to model {}".format(model_name))
        model = await self.controller.get_model(model_name)

        # Create an observer for this model. The model is only cached once
        # the observer is attached, so a cached model always has one.
        monitor = VCAMonitor(model_name)
        model.add_observer(monitor)

        self.models[model_name] = model
        self.monitors[model_name] = monitor

    return self.models[model_name]
624
async def login(self):
    """Login to the Juju controller.

    Does nothing when already authenticated. Without a configured secret
    the problem is logged and the instance stays unauthenticated. The
    secret itself is never written to the log.
    """

    if self.authenticated:
        return

    if not self.secret:
        # Without credentials there is nothing to connect with; do not
        # pretend the login succeeded.
        self.log.fatal("VCA credentials not configured.")
        return

    self.connecting = True
    try:
        self.log.debug("JujuApi: Logging into controller")

        cacert = None
        self.controller = Controller()

        # self.endpoint already contains the port.
        self.log.debug("Connecting to controller... ws://{} as {}".format(self.endpoint, self.user))
        await self.controller.connect(
            endpoint=self.endpoint,
            username=self.user,
            password=self.secret,
            cacert=cacert,
        )

        self.authenticated = True
        self.log.debug("JujuApi: Logged into controller")
    finally:
        self.connecting = False
659
660 # self.default_model = await self.controller.get_model("default")
661
async def logout(self):
    """Logout of the Juju controller.

    Disconnects the default model, every cached model and finally the
    controller. Disconnected models are dropped from the cache, together
    with their monitors, so they are not handed out again.

    :raises Exception: Any failure is logged and re-raised
    """
    if not self.authenticated:
        return

    try:
        if self.default_model:
            self.log.debug("Disconnecting model {}".format(self.default_model))
            await self.default_model.disconnect()
            self.default_model = None

        # Iterate over a snapshot because entries are removed on the way.
        for name in list(self.models):
            await self.models[name].disconnect()
            del self.models[name]
            self.monitors.pop(name, None)

        if self.controller:
            self.log.debug("Disconnecting controller {}".format(self.controller))
            await self.controller.disconnect()

        self.authenticated = False
    except Exception as e:
        self.log.fatal("Fatal error logging out of Juju Controller: {}".format(e))
        raise
685
686
687 # async def remove_application(self, name):
688 # """Remove the application."""
689 # if not self.authenticated:
690 # await self.login()
691 #
692 # app = await self.get_application(name)
693 # if app:
694 # self.log.debug("JujuApi: Destroying application {}".format(
695 # name,
696 # ))
697 #
698 # await app.destroy()
699
async def remove_relation(self, a, b):
    """
    Remove a relation between two application endpoints

    :param a An application endpoint
    :param b An application endpoint
    """
    if not self.authenticated:
        await self.login()

    m = await self.get_model()
    # Await the call so the removal actually runs. The model comes from
    # the shared cache, so it is left connected here; logout() is what
    # disconnects cached models.
    await m.remove_relation(a, b)
715
async def resolve_error(self, application=None):
    """Resolve units in error state.

    Marks every unit of the application as resolved, asking Juju to retry
    the failed hook. Nothing happens when the application is not found.

    :param str application: The name of the application
    """
    if not self.authenticated:
        await self.login()

    # default_model may never have been set; fall back to the cached
    # default model so the lookup can succeed.
    model = self.default_model or await self.get_model()

    app = await self.get_application(model, application)
    if app:
        self.log.debug("JujuApi: Resolving errors for application {}".format(
            application,
        ))

        for unit in app.units:
            await unit.resolved(retry=True)
729
async def run_action(self, application, action_name, **params):
    """Execute an action and report its outcome.

    The action runs on the first unit of the application and is awaited
    until it completes.

    :param str application: The name of the application
    :param str action_name: The action to run
    :param params: Parameters passed to the action
    :return: dict of the form
        ``{'status': str, 'action': {'tag': id, 'results': results}}``;
        the fields keep their empty defaults when the application is not
        found or has no units
    """
    if not self.authenticated:
        await self.login()

    result = {
        'status': '',
        'action': {
            'tag': None,
            'results': None,
        }
    }

    # default_model may never have been set; fall back to the cached
    # default model so the lookup can succeed.
    model = self.default_model or await self.get_model()

    app = await self.get_application(model, application)
    if app and app.units:
        # We currently only have one unit per application
        # so use the first unit available.
        unit = app.units[0]

        self.log.debug("JujuApi: Running Action {} against Application {}".format(
            action_name,
            application,
        ))

        action = await unit.run_action(action_name, **params)

        # Wait for the action to complete
        await action.wait()

        result['status'] = action.status
        result['action']['tag'] = action.data['id']
        result['action']['results'] = action.results

    return result
762
async def set_config(self, model_name, application, config):
    """Apply a configuration to the application.

    After applying, the configuration is read back and every key whose
    value differs from the requested one is reported in the debug log.

    :param model_name: Forwarded unchanged to get_application() as its
        model argument, so despite the name a model object is expected
    :param str application: The name of the application
    :param dict config: The configuration keys and values to apply
    """
    if not self.authenticated:
        await self.login()

    app = await self.get_application(model_name, application)
    if not app:
        return

    self.log.debug("JujuApi: Setting config for Application {}".format(
        application,
    ))
    await app.set_config(config)

    # Verify the config is set
    applied = await app.get_config()
    for key in config:
        wanted = config[key]
        if wanted != applied[key]['value']:
            self.log.debug("JujuApi: Config not set! Key {} Value {} doesn't match {}".format(key, wanted, applied[key]))
780
781 # async def set_parameter(self, parameter, value, application=None):
782 # """Set a config parameter for a service."""
783 # if not self.authenticated:
784 # await self.login()
785 #
786 # self.log.debug("JujuApi: Setting {}={} for Application {}".format(
787 # parameter,
788 # value,
789 # application,
790 # ))
791 # return await self.apply_config(
792 # {parameter: value},
793 # application=application,
794 # )
795
async def wait_for_application(self, name, timeout=300):
    """Wait for an application to become active.

    Blocks until every unit of the application has an idle agent and a
    workload status of 'active' or 'unknown'. Nothing happens when the
    application is not found.

    :param str name: The name of the application
    :param timeout: Handed to the model's block_until() as ``timeout``
    """
    if not self.authenticated:
        await self.login()

    # default_model may never have been set; fall back to the cached
    # default model so the lookup can succeed.
    model = self.default_model or await self.get_model()

    app = await self.get_application(model, name)
    if app:
        self.log.debug(
            "JujuApi: Waiting {} seconds for Application {}".format(
                timeout,
                name,
            )
        )

        await model.block_until(
            lambda: all(
                unit.agent_status == 'idle'
                and unit.workload_status in ['active', 'unknown']
                for unit in app.units
            ),
            timeout=timeout
        )
816 timeout=timeout
817 )