a457cec92bf791bd8419f819b23d6bbd2da5287b
[osm/N2VC.git] / n2vc / vnf.py
1
2 import logging
3 import os
4 import os.path
5 import re
6 import ssl
7 import sys
8 import time
9
10 # FIXME: this should load the juju inside or modules without having to
11 # explicitly install it. Check why it's not working.
12 # Load our subtree of the juju library
13 path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
14 path = os.path.join(path, "modules/libjuju/")
15 if path not in sys.path:
16 sys.path.insert(1, path)
17
18 from juju.controller import Controller
19 from juju.model import Model, ModelObserver
20
21
22 # We might need this to connect to the websocket securely, but test and verify.
23 try:
24 ssl._create_default_https_context = ssl._create_unverified_context
25 except AttributeError:
26 # Legacy Python doesn't verify by default (see pep-0476)
27 # https://www.python.org/dev/peps/pep-0476/
28 pass
29
30
31 # Custom exceptions
32 class JujuCharmNotFound(Exception):
33 """The Charm can't be found or is not readable."""
34
35
36 class JujuApplicationExists(Exception):
37 """The Application already exists."""
38
39 class N2VCPrimitiveExecutionFailed(Exception):
40 """Something failed while attempting to execute a primitive."""
41
42
43 # Quiet the debug logging
44 logging.getLogger('websockets.protocol').setLevel(logging.INFO)
45 logging.getLogger('juju.client.connection').setLevel(logging.WARN)
46 logging.getLogger('juju.model').setLevel(logging.WARN)
47 logging.getLogger('juju.machine').setLevel(logging.WARN)
48
49 class VCAMonitor(ModelObserver):
50 """Monitor state changes within the Juju Model."""
51 log = None
52 ns_name = None
53 applications = {}
54
55 def __init__(self, ns_name):
56 self.log = logging.getLogger(__name__)
57
58 self.ns_name = ns_name
59
60 def AddApplication(self, application_name, callback, *callback_args):
61 if application_name not in self.applications:
62 self.applications[application_name] = {
63 'callback': callback,
64 'callback_args': callback_args
65 }
66
67 def RemoveApplication(self, application_name):
68 if application_name in self.applications:
69 del self.applications[application_name]
70
71 async def on_change(self, delta, old, new, model):
72 """React to changes in the Juju model."""
73
74 if delta.entity == "unit":
75 # Ignore change events from other applications
76 if delta.data['application'] not in self.applications.keys():
77 return
78
79 try:
80
81 application_name = delta.data['application']
82
83 callback = self.applications[application_name]['callback']
84 callback_args = self.applications[application_name]['callback_args']
85
86 if old and new:
87 old_status = old.workload_status
88 new_status = new.workload_status
89
90 if old_status == new_status:
91 """The workload status may fluctuate around certain events,
92 so wait until the status has stabilized before triggering
93 the callback."""
94 if callback:
95 callback(
96 self.ns_name,
97 delta.data['application'],
98 new_status,
99 *callback_args)
100
101 if old and not new:
102 # This is a charm being removed
103 if callback:
104 callback(
105 self.ns_name,
106 delta.data['application'],
107 "removed",
108 *callback_args)
109 except Exception as e:
110 self.log.debug("[1] notify_callback exception {}".format(e))
111 elif delta.entity == "action":
112 # TODO: Decide how we want to notify the user of actions
113
114 # uuid = delta.data['id'] # The Action's unique id
115 # msg = delta.data['message'] # The output of the action
116 #
117 # if delta.data['status'] == "pending":
118 # # The action is queued
119 # pass
120 # elif delta.data['status'] == "completed""
121 # # The action was successful
122 # pass
123 # elif delta.data['status'] == "failed":
124 # # The action failed.
125 # pass
126
127 pass
128
129 ########
130 # TODO
131 #
132 # Create unique models per network service
133 # Document all public functions
134
135 class N2VC:
136
137 # Juju API
138 api = None
139 log = None
140 controller = None
141 connecting = False
142 authenticated = False
143
144 models = {}
145 default_model = None
146
147 # Model Observers
148 monitors = {}
149
150 # VCA config
151 hostname = ""
152 port = 17070
153 username = ""
154 secret = ""
155
156 def __init__(self,
157 log=None,
158 server='127.0.0.1',
159 port=17070,
160 user='admin',
161 secret=None,
162 artifacts=None
163 ):
164 """Initialize N2VC
165
166 :param vcaconfig dict A dictionary containing the VCA configuration
167
168 :param artifacts str The directory where charms required by a vnfd are
169 stored.
170
171 :Example:
172 n2vc = N2VC(vcaconfig={
173 'secret': 'MzI3MDJhOTYxYmM0YzRjNTJiYmY1Yzdm',
174 'user': 'admin',
175 'ip-address': '10.44.127.137',
176 'port': 17070,
177 'artifacts': '/path/to/charms'
178 })
179
180 """
181
182 if log:
183 self.log = log
184 else:
185 self.log = logging.getLogger(__name__)
186
187 # Quiet websocket traffic
188 logging.getLogger('websockets.protocol').setLevel(logging.INFO)
189 logging.getLogger('juju.client.connection').setLevel(logging.WARN)
190 logging.getLogger('model').setLevel(logging.WARN)
191 # logging.getLogger('websockets.protocol').setLevel(logging.DEBUG)
192
193 self.log.debug('JujuApi: instantiated')
194
195 self.server = server
196 self.port = port
197
198 self.secret = secret
199 if user.startswith('user-'):
200 self.user = user
201 else:
202 self.user = 'user-{}'.format(user)
203
204 self.endpoint = '%s:%d' % (server, int(port))
205
206 self.artifacts = artifacts
207
208 def __del__(self):
209 """Close any open connections."""
210 yield self.logout()
211
212 def notify_callback(self, model_name, application_name, status, callback=None, *callback_args):
213 try:
214 if callback:
215 callback(model_name, application_name, status, *callback_args)
216 except Exception as e:
217 self.log.error("[0] notify_callback exception {}".format(e))
218 raise e
219 return True
220
221 # Public methods
222 async def CreateNetworkService(self, nsd):
223 """Create a new model to encapsulate this network service.
224
225 Create a new model in the Juju controller to encapsulate the
226 charms associated with a network service.
227
228 You can pass either the nsd record or the id of the network
229 service, but this method will fail without one of them.
230 """
231 if not self.authenticated:
232 await self.login()
233
234 # Ideally, we will create a unique model per network service.
235 # This change will require all components, i.e., LCM and SO, to use
236 # N2VC for 100% compatibility. If we adopt unique models for the LCM,
237 # services deployed via LCM would't be manageable via SO and vice versa
238
239 return self.default_model
240
241 async def DeployCharms(self, model_name, application_name, vnfd, charm_path, params={}, machine_spec={}, callback=None, *callback_args):
242 """Deploy one or more charms associated with a VNF.
243
244 Deploy the charm(s) referenced in a VNF Descriptor.
245
246 You can pass either the nsd record or the id of the network
247 service, but this method will fail without one of them.
248
249 :param str ns_name: The name of the network service
250 :param str application_name: The name of the application
251 :param dict vnfd: The name of the application
252 :param str charm_path: The path to the Juju charm
253 :param dict params: A dictionary of runtime parameters
254 Examples::
255 {
256 'rw_mgmt_ip': '1.2.3.4',
257 # Pass the initial-config-primitives section of the vnf or vdu
258 'initial-config-primitives': {...}
259 }
260 :param dict machine_spec: A dictionary describing the machine to install to
261 Examples::
262 {
263 'hostname': '1.2.3.4',
264 'username': 'ubuntu',
265 }
266 :param obj callback: A callback function to receive status changes.
267 :param tuple callback_args: A list of arguments to be passed to the callback
268 """
269
270 ########################################################
271 # Verify the path to the charm exists and is readable. #
272 ########################################################
273 if not os.path.exists(charm_path):
274 self.log.debug("Charm path doesn't exist: {}".format(charm_path))
275 self.notify_callback(model_name, application_name, "failed", callback, *callback_args)
276 raise JujuCharmNotFound("No artifacts configured.")
277
278 ################################
279 # Login to the Juju controller #
280 ################################
281 if not self.authenticated:
282 self.log.debug("Authenticating with Juju")
283 await self.login()
284
285 ##########################################
286 # Get the model for this network service #
287 ##########################################
288 # TODO: In a point release, we will use a model per deployed network
289 # service. In the meantime, we will always use the 'default' model.
290 model_name = 'default'
291 model = await self.get_model(model_name)
292
293 ########################################
294 # Verify the application doesn't exist #
295 ########################################
296 app = await self.get_application(model, application_name)
297 if app:
298 raise JujuApplicationExists("Can't deploy application \"{}\" to model \"{}\" because it already exists.".format(application_name, model))
299
300 ################################################################
301 # Register this application with the model-level event monitor #
302 ################################################################
303 if callback:
304 self.monitors[model_name].AddApplication(
305 application_name,
306 callback,
307 *callback_args
308 )
309
310 ########################################################
311 # Check for specific machine placement (native charms) #
312 ########################################################
313 to = ""
314 if machine_spec.keys():
315 # TODO: This needs to be tested.
316 # if all(k in machine_spec for k in ['hostname', 'username']):
317 # # Enlist the existing machine in Juju
318 # machine = await self.model.add_machine(spec='ssh:%@%'.format(
319 # specs['host'],
320 # specs['user'],
321 # ))
322 # to = machine.id
323 pass
324
325 #######################################
326 # Get the initial charm configuration #
327 #######################################
328
329 rw_mgmt_ip = None
330 if 'rw_mgmt_ip' in params:
331 rw_mgmt_ip = params['rw_mgmt_ip']
332
333 initial_config = self._get_config_from_dict(
334 params['initial-config-primitive'],
335 {'<rw_mgmt_ip>': rw_mgmt_ip}
336 )
337
338 self.log.debug("JujuApi: Deploying charm ({}) from {}".format(
339 application_name,
340 charm_path,
341 to=to,
342 ))
343
344 ########################################################
345 # Deploy the charm and apply the initial configuration #
346 ########################################################
347 app = await model.deploy(
348 # We expect charm_path to be either the path to the charm on disk
349 # or in the format of cs:series/name
350 charm_path,
351 # This is the formatted, unique name for this charm
352 application_name=application_name,
353 # Proxy charms should use the current LTS. This will need to be
354 # changed for native charms.
355 series='xenial',
356 # Apply the initial 'config' primitive during deployment
357 config=initial_config,
358 # TBD: Where to deploy the charm to.
359 to=None,
360 )
361
362 # #######################################
363 # # Execute initial config primitive(s) #
364 # #######################################
365 primitives = {}
366
367 # Build a sequential list of the primitives to execute
368 for primitive in params['initial-config-primitive']:
369 try:
370 if primitive['name'] == 'config':
371 # This is applied when the Application is deployed
372 pass
373 else:
374 seq = primitive['seq']
375
376 primitives[seq] = {
377 'name': primitive['name'],
378 'parameters': self._map_primitive_parameters(
379 primitive['parameter'],
380 {'<rw_mgmt_ip>': rw_mgmt_ip}
381 ),
382 }
383
384 for primitive in sorted(primitives):
385 await self.ExecutePrimitive(
386 model_name,
387 application_name,
388 primitives[primitive]['name'],
389 callback,
390 callback_args,
391 **primitives[primitive]['parameters'],
392 )
393 except N2VCPrimitiveExecutionFailed as e:
394 self.debug.log(
395 "[N2VC] Exception executing primitive: {}".format(e)
396 )
397 raise
398
399 async def ExecutePrimitive(self, model_name, application_name, primitive, callback, *callback_args, **params):
400 try:
401 if not self.authenticated:
402 await self.login()
403
404 # FIXME: This is hard-coded until model-per-ns is added
405 model_name = 'default'
406
407 if primitive == 'config':
408 # config is special, and expecting params to be a dictionary
409 await self.set_config(application_name, params['params'])
410 else:
411 model = await self.controller.get_model(model_name)
412 app = await self.get_application(model, application_name)
413 if app:
414 # Run against the first (and probably only) unit in the app
415 unit = app.units[0]
416 if unit:
417 self.log.debug("Executing primitive {}".format(primitive))
418 action = await unit.run_action(primitive, **params)
419 # action = await action.wait()
420 await model.disconnect()
421 except Exception as e:
422 self.log.debug("Caught exception while executing primitive: {}".format(e))
423 raise e
424
425 async def RemoveCharms(self, model_name, application_name, callback=None, *callback_args):
426 try:
427 if not self.authenticated:
428 await self.login()
429
430 model = await self.get_model(model_name)
431 app = await self.get_application(model, application_name)
432 if app:
433 # Remove this application from event monitoring
434 self.monitors[model_name].RemoveApplication(application_name)
435
436 # self.notify_callback(model_name, application_name, "removing", callback, *callback_args)
437 self.log.debug("Removing the application {}".format(application_name))
438 await app.remove()
439
440 # Notify the callback that this charm has been removed.
441 self.notify_callback(model_name, application_name, "removed", callback, *callback_args)
442
443 except Exception as e:
444 print("Caught exception: {}".format(e))
445 self.log.debug(e)
446 raise e
447
448 async def DestroyNetworkService(self, nsd):
449 raise NotImplementedError()
450
451 async def GetMetrics(self, nsd, vnfd):
452 """Get the metrics collected by the VCA."""
453 raise NotImplementedError()
454
455 # Non-public methods
456 async def add_relation(self, a, b, via=None):
457 """
458 Add a relation between two application endpoints.
459
460 :param a An application endpoint
461 :param b An application endpoint
462 :param via The egress subnet(s) for outbound traffic, e.g.,
463 (192.168.0.0/16,10.0.0.0/8)
464 """
465 if not self.authenticated:
466 await self.login()
467
468 m = await self.get_model()
469 try:
470 m.add_relation(a, b, via)
471 finally:
472 await m.disconnect()
473
474 async def apply_config(self, config, application):
475 """Apply a configuration to the application."""
476 print("JujuApi: Applying configuration to {}.".format(
477 application
478 ))
479 return await self.set_config(application=application, config=config)
480
481 def _get_config_from_dict(self, config_primitive, values):
482 """Transform the yang config primitive to dict.
483
484 Expected result:
485
486 config = {
487 'config':
488 }
489 """
490 config = {}
491 for primitive in config_primitive:
492 if primitive['name'] == 'config':
493 # config = self._map_primitive_parameters()
494 for parameter in primitive['parameter']:
495 param = str(parameter['name'])
496 if parameter['value'] == "<rw_mgmt_ip>":
497 config[param] = str(values[parameter['value']])
498 else:
499 config[param] = str(parameter['value'])
500
501 return config
502
503 def _map_primitive_parameters(self, parameters, values):
504 params = {}
505 for parameter in parameters:
506 param = str(parameter['name'])
507 if parameter['value'] == "<rw_mgmt_ip>":
508 params[param] = str(values[parameter['value']])
509 else:
510 params[param] = str(parameter['value'])
511 return params
512
513 def _get_config_from_yang(self, config_primitive, values):
514 """Transform the yang config primitive to dict."""
515 config = {}
516 for primitive in config_primitive.values():
517 if primitive['name'] == 'config':
518 for parameter in primitive['parameter'].values():
519 param = str(parameter['name'])
520 if parameter['value'] == "<rw_mgmt_ip>":
521 config[param] = str(values[parameter['value']])
522 else:
523 config[param] = str(parameter['value'])
524
525 return config
526
527 def FormatApplicationName(self, *args):
528 """
529 Generate a Juju-compatible Application name
530
531 :param args tuple: Positional arguments to be used to construct the
532 application name.
533
534 Limitations::
535 - Only accepts characters a-z and non-consequitive dashes (-)
536 - Application name should not exceed 50 characters
537
538 Examples::
539
540 FormatApplicationName("ping_pong_ns", "ping_vnf", "a")
541 """
542
543 appname = ""
544 for c in "-".join(list(args)):
545 if c.isdigit():
546 c = chr(97 + int(c))
547 elif not c.isalpha():
548 c = "-"
549 appname += c
550 return re.sub('\-+', '-', appname.lower())
551
552
553 # def format_application_name(self, nsd_name, vnfr_name, member_vnf_index=0):
554 # """Format the name of the application
555 #
556 # Limitations:
557 # - Only accepts characters a-z and non-consequitive dashes (-)
558 # - Application name should not exceed 50 characters
559 # """
560 # name = "{}-{}-{}".format(nsd_name, vnfr_name, member_vnf_index)
561 # new_name = ''
562 # for c in name:
563 # if c.isdigit():
564 # c = chr(97 + int(c))
565 # elif not c.isalpha():
566 # c = "-"
567 # new_name += c
568 # return re.sub('\-+', '-', new_name.lower())
569
570 def format_model_name(self, name):
571 """Format the name of model.
572
573 Model names may only contain lowercase letters, digits and hyphens
574 """
575
576 return name.replace('_', '-').lower()
577
578 async def get_application(self, model, application):
579 """Get the deployed application."""
580 if not self.authenticated:
581 await self.login()
582
583 app = None
584 if application and model:
585 if model.applications:
586 if application in model.applications:
587 app = model.applications[application]
588
589 return app
590
591 async def get_model(self, model_name='default'):
592 """Get a model from the Juju Controller.
593
594 Note: Model objects returned must call disconnected() before it goes
595 out of scope."""
596 if not self.authenticated:
597 await self.login()
598
599 if model_name not in self.models:
600 print("connecting to model {}".format(model_name))
601 self.models[model_name] = await self.controller.get_model(model_name)
602
603 # Create an observer for this model
604 self.monitors[model_name] = VCAMonitor(model_name)
605 self.models[model_name].add_observer(self.monitors[model_name])
606
607 return self.models[model_name]
608
609 async def login(self):
610 """Login to the Juju controller."""
611
612 if self.authenticated:
613 return
614
615 self.connecting = True
616
617 self.log.debug("JujuApi: Logging into controller")
618
619 cacert = None
620 self.controller = Controller()
621
622 if self.secret:
623 self.log.debug("Connecting to controller... ws://{}:{} as {}/{}".format(self.endpoint, self.port, self.user, self.secret))
624 await self.controller.connect(
625 endpoint=self.endpoint,
626 username=self.user,
627 password=self.secret,
628 cacert=cacert,
629 )
630 else:
631 # current_controller no longer exists
632 # self.log.debug("Connecting to current controller...")
633 # await self.controller.connect_current()
634 # await self.controller.connect(
635 # endpoint=self.endpoint,
636 # username=self.user,
637 # cacert=cacert,
638 # )
639 self.log.fatal("VCA credentials not configured.")
640
641 self.authenticated = True
642 self.log.debug("JujuApi: Logged into controller")
643
644 # self.default_model = await self.controller.get_model("default")
645
646 async def logout(self):
647 """Logout of the Juju controller."""
648 if not self.authenticated:
649 return
650
651 try:
652 if self.default_model:
653 self.log.debug("Disconnecting model {}".format(self.default_model))
654 await self.default_model.disconnect()
655 self.default_model = None
656
657 for model in self.models:
658 await self.models[model].disconnect()
659
660 if self.controller:
661 self.log.debug("Disconnecting controller {}".format(self.controller))
662 await self.controller.disconnect()
663 # self.controller = None
664
665 self.authenticated = False
666 except Exception as e:
667 self.log.fail("Fatal error logging out of Juju Controller: {}".format(e))
668 raise e
669
670
671 # async def remove_application(self, name):
672 # """Remove the application."""
673 # if not self.authenticated:
674 # await self.login()
675 #
676 # app = await self.get_application(name)
677 # if app:
678 # self.log.debug("JujuApi: Destroying application {}".format(
679 # name,
680 # ))
681 #
682 # await app.destroy()
683
684 async def remove_relation(self, a, b):
685 """
686 Remove a relation between two application endpoints
687
688 :param a An application endpoint
689 :param b An application endpoint
690 """
691 if not self.authenticated:
692 await self.login()
693
694 m = await self.get_model()
695 try:
696 m.remove_relation(a, b)
697 finally:
698 await m.disconnect()
699
700 async def resolve_error(self, application=None):
701 """Resolve units in error state."""
702 if not self.authenticated:
703 await self.login()
704
705 app = await self.get_application(self.default_model, application)
706 if app:
707 self.log.debug("JujuApi: Resolving errors for application {}".format(
708 application,
709 ))
710
711 for unit in app.units:
712 app.resolved(retry=True)
713
714 async def run_action(self, application, action_name, **params):
715 """Execute an action and return an Action object."""
716 if not self.authenticated:
717 await self.login()
718 result = {
719 'status': '',
720 'action': {
721 'tag': None,
722 'results': None,
723 }
724 }
725 app = await self.get_application(self.default_model, application)
726 if app:
727 # We currently only have one unit per application
728 # so use the first unit available.
729 unit = app.units[0]
730
731 self.log.debug("JujuApi: Running Action {} against Application {}".format(
732 action_name,
733 application,
734 ))
735
736 action = await unit.run_action(action_name, **params)
737
738 # Wait for the action to complete
739 await action.wait()
740
741 result['status'] = action.status
742 result['action']['tag'] = action.data['id']
743 result['action']['results'] = action.results
744
745 return result
746
747 async def set_config(self, application, config):
748 """Apply a configuration to the application."""
749 if not self.authenticated:
750 await self.login()
751
752 app = await self.get_application(self.default_model, application)
753 if app:
754 self.log.debug("JujuApi: Setting config for Application {}".format(
755 application,
756 ))
757 await app.set_config(config)
758
759 # Verify the config is set
760 newconf = await app.get_config()
761 for key in config:
762 if config[key] != newconf[key]['value']:
763 self.log.debug("JujuApi: Config not set! Key {} Value {} doesn't match {}".format(key, config[key], newconf[key]))
764
765 async def set_parameter(self, parameter, value, application=None):
766 """Set a config parameter for a service."""
767 if not self.authenticated:
768 await self.login()
769
770 self.log.debug("JujuApi: Setting {}={} for Application {}".format(
771 parameter,
772 value,
773 application,
774 ))
775 return await self.apply_config(
776 {parameter: value},
777 application=application,
778 )
779
780 async def wait_for_application(self, name, timeout=300):
781 """Wait for an application to become active."""
782 if not self.authenticated:
783 await self.login()
784
785 app = await self.get_application(self.default_model, name)
786 if app:
787 self.log.debug(
788 "JujuApi: Waiting {} seconds for Application {}".format(
789 timeout,
790 name,
791 )
792 )
793
794 await self.default_model.block_until(
795 lambda: all(
796 unit.agent_status == 'idle'
797 and unit.workload_status
798 in ['active', 'unknown'] for unit in app.units
799 ),
800 timeout=timeout
801 )