e39b04e8c3bb6708832c0340d91cf59d961fdc9d
[osm/N2VC.git] / n2vc / vnf.py
1
2 import logging
3 import os
4 import os.path
5 import re
6 import ssl
7 import sys
8 import time
9
# FIXME: the bundled juju library should be importable without this explicit
# sys.path manipulation. Check why it's not working.
# Make our subtree of the juju library importable.
path = os.path.join(
    os.path.abspath(os.path.join(os.path.dirname(__file__), '..')),
    "modules/libjuju/",
)
if path not in sys.path:
    sys.path.insert(1, path)
17
18 from juju.controller import Controller
19 from juju.model import Model, ModelObserver
20
21
# We might need this to connect to the websocket securely, but test and verify.
# NOTE(review): this rebinds the ssl module's default HTTPS context factory to
# the unverified one for the whole process, so any code relying on
# ssl._create_default_https_context loses certificate verification, not only
# the Juju connection. Confirm this is still required.
try:
    ssl._create_default_https_context = ssl._create_unverified_context
except AttributeError:
    # Legacy Python doesn't verify by default (see pep-0476)
    # https://www.python.org/dev/peps/pep-0476/
    pass
29
30
31 # Custom exceptions
class JujuCharmNotFound(Exception):
    """Raised when a charm cannot be found or is not readable."""
34
35
class JujuApplicationExists(Exception):
    """Raised when the application to deploy already exists."""
38
39
class N2VCPrimitiveExecutionFailed(Exception):
    """Raised when something fails while attempting to execute a primitive."""
42
43
# Quiet the debug logging of the websocket and juju libraries.
for _logger_name, _logger_level in (
    ('websockets.protocol', logging.INFO),
    ('juju.client.connection', logging.WARN),
    ('juju.model', logging.WARN),
    ('juju.machine', logging.WARN),
):
    logging.getLogger(_logger_name).setLevel(_logger_level)
del _logger_name, _logger_level
49
50
class VCAMonitor(ModelObserver):
    """Monitor state changes within the Juju Model.

    Applications are registered together with a callback; unit changes for
    a registered application are forwarded to that callback as
    ``callback(ns_name, application_name, status, *callback_args)``.
    """
    log = None
    ns_name = None
    # Class-level default kept for backward compatibility only; __init__
    # gives every instance its own registry.
    applications = {}

    def __init__(self, ns_name):
        """Create a monitor for the network service named ``ns_name``."""
        self.log = logging.getLogger(__name__)

        self.ns_name = ns_name

        # A dict defined on the class would be shared by every monitor, so
        # applications registered for one model would be visible to all.
        self.applications = {}

    def AddApplication(self, application_name, callback, *callback_args):
        """Register ``callback`` for ``application_name``.

        An existing registration for the same application is left untouched.
        """
        if application_name not in self.applications:
            self.applications[application_name] = {
                'callback': callback,
                'callback_args': callback_args
            }

    def RemoveApplication(self, application_name):
        """Forget ``application_name``; unknown names are ignored."""
        self.applications.pop(application_name, None)

    async def on_change(self, delta, old, new, model):
        """React to changes in the Juju model.

        Only "unit" deltas of registered applications trigger the callback.
        Exceptions raised while notifying are logged and swallowed.
        """
        if delta.entity != "unit":
            # TODO: Decide how we want to notify the user of actions
            # ("action" deltas carry delta.data['id'], delta.data['message']
            # and a delta.data['status'] of pending/completed/failed).
            return

        application_name = delta.data['application']

        # Ignore change events from other applications
        registration = self.applications.get(application_name)
        if registration is None:
            return

        try:
            callback = registration['callback']
            callback_args = registration['callback_args']
            if not callback:
                return

            if old and new:
                # The workload status may fluctuate around certain events,
                # so wait until the status has stabilized before triggering
                # the callback.
                if old.workload_status == new.workload_status:
                    callback(
                        self.ns_name,
                        application_name,
                        new.workload_status,
                        *callback_args)
            elif old:
                # This is a charm being removed
                callback(
                    self.ns_name,
                    application_name,
                    "removed",
                    *callback_args)
        except Exception as e:
            self.log.debug("[1] notify_callback exception {}".format(e))
130
131 ########
132 # TODO
133 #
134 # Create unique models per network service
135 # Document all public functions
136
137
class N2VC:
    """Client used to deploy and configure charms through a Juju controller."""

    # Juju API
    api = None
    log = None
    controller = None
    connecting = False
    authenticated = False

    # Model objects keyed by model name.
    # NOTE(review): dicts defined at class level are shared by every instance
    # unless an instance rebinds the attribute.
    models = {}
    default_model = None

    # Model Observers, keyed by model name
    monitors = {}

    # VCA config
    hostname = ""
    port = 17070
    username = ""
    secret = ""
158
159 def __init__(self,
160 log=None,
161 server='127.0.0.1',
162 port=17070,
163 user='admin',
164 secret=None,
165 artifacts=None
166 ):
167 """Initialize N2VC
168
169 :param vcaconfig dict A dictionary containing the VCA configuration
170
171 :param artifacts str The directory where charms required by a vnfd are
172 stored.
173
174 :Example:
175 n2vc = N2VC(vcaconfig={
176 'secret': 'MzI3MDJhOTYxYmM0YzRjNTJiYmY1Yzdm',
177 'user': 'admin',
178 'ip-address': '10.44.127.137',
179 'port': 17070,
180 'artifacts': '/path/to/charms'
181 })
182
183 """
184
185 if log:
186 self.log = log
187 else:
188 self.log = logging.getLogger(__name__)
189
190 # Quiet websocket traffic
191 logging.getLogger('websockets.protocol').setLevel(logging.INFO)
192 logging.getLogger('juju.client.connection').setLevel(logging.WARN)
193 logging.getLogger('model').setLevel(logging.WARN)
194 # logging.getLogger('websockets.protocol').setLevel(logging.DEBUG)
195
196 self.log.debug('JujuApi: instantiated')
197
198 self.server = server
199 self.port = port
200
201 self.secret = secret
202 if user.startswith('user-'):
203 self.user = user
204 else:
205 self.user = 'user-{}'.format(user)
206
207 self.endpoint = '%s:%d' % (server, int(port))
208
209 self.artifacts = artifacts
210
211 def __del__(self):
212 """Close any open connections."""
213 yield self.logout()
214
def notify_callback(self, model_name, application_name, status, callback=None, *callback_args):
    """Forward a status update to ``callback``, if one was given.

    The callback receives ``(model_name, application_name, status,
    *callback_args)``. An exception raised by the callback is logged and
    re-raised. Returns True otherwise.
    """
    if not callback:
        return True

    try:
        callback(model_name, application_name, status, *callback_args)
    except Exception as e:
        self.log.error("[0] notify_callback exception {}".format(e))
        raise e
    return True
223
224 # Public methods
225 async def CreateNetworkService(self, nsd):
226 """Create a new model to encapsulate this network service.
227
228 Create a new model in the Juju controller to encapsulate the
229 charms associated with a network service.
230
231 You can pass either the nsd record or the id of the network
232 service, but this method will fail without one of them.
233 """
234 if not self.authenticated:
235 await self.login()
236
237 # Ideally, we will create a unique model per network service.
238 # This change will require all components, i.e., LCM and SO, to use
239 # N2VC for 100% compatibility. If we adopt unique models for the LCM,
240 # services deployed via LCM would't be manageable via SO and vice versa
241
242 return self.default_model
243
async def DeployCharms(self, model_name, application_name, vnfd, charm_path, params=None, machine_spec=None, callback=None, *callback_args):
    """Deploy one or more charms associated with a VNF.

    Deploy the charm(s) referenced in a VNF Descriptor, apply the initial
    'config' primitive at deployment and then execute the remaining
    initial config primitives once each, ordered by their 'seq'.

    :param str model_name: The name of the network service.
    :param str application_name: The name of the application
    :param dict vnfd: The VNF descriptor (currently unused)
    :param str charm_path: The path to the Juju charm
    :param dict params: A dictionary of runtime parameters
      Examples::
      {
        'rw_mgmt_ip': '1.2.3.4',
        # Pass the initial-config-primitive section of the vnf or vdu
        'initial-config-primitive': [...]
      }
    :param dict machine_spec: A dictionary describing the machine to install to
      Examples::
      {
        'hostname': '1.2.3.4',
        'username': 'ubuntu',
      }
    :param obj callback: A callback function to receive status changes.
    :param tuple callback_args: A list of arguments to be passed to the callback
    :raises JujuCharmNotFound: if charm_path does not exist.
    :raises JujuApplicationExists: if the application is already deployed.
    :raises N2VCPrimitiveExecutionFailed: re-raised from ExecutePrimitive.
    """
    params = {} if params is None else params
    machine_spec = {} if machine_spec is None else machine_spec

    ########################################################
    # Verify the path to the charm exists and is readable. #
    ########################################################
    if not os.path.exists(charm_path):
        self.log.debug("Charm path doesn't exist: {}".format(charm_path))
        self.notify_callback(model_name, application_name, "failed", callback, *callback_args)
        raise JujuCharmNotFound("Charm path doesn't exist: {}".format(charm_path))

    ################################
    # Login to the Juju controller #
    ################################
    if not self.authenticated:
        self.log.debug("Authenticating with Juju")
        await self.login()

    ##########################################
    # Get the model for this network service #
    ##########################################
    # TODO: In a point release, we will use a model per deployed network
    # service. In the meantime, we will always use the 'default' model.
    model_name = 'default'
    model = await self.get_model(model_name)

    ########################################
    # Verify the application doesn't exist #
    ########################################
    app = await self.get_application(model, application_name)
    if app:
        raise JujuApplicationExists("Can't deploy application \"{}\" to model \"{}\" because it already exists.".format(application_name, model_name))

    ################################################################
    # Register this application with the model-level event monitor #
    ################################################################
    if callback:
        self.monitors[model_name].AddApplication(
            application_name,
            callback,
            *callback_args
        )

    ########################################################
    # Check for specific machine placement (native charms) #
    ########################################################
    # TODO: When machine_spec carries 'hostname' and 'username', enlist
    # the existing machine in Juju (model.add_machine with an ssh: spec)
    # and deploy to it. This needs to be tested; until then the charm is
    # deployed without explicit placement.

    #######################################
    # Get the initial charm configuration #
    #######################################
    rw_mgmt_ip = params.get('rw_mgmt_ip')
    config_primitives = params.get('initial-config-primitive') or []

    initial_config = self._get_config_from_dict(
        config_primitives,
        {'<rw_mgmt_ip>': rw_mgmt_ip}
    )

    self.log.debug("JujuApi: Deploying charm ({}) from {}".format(
        application_name,
        charm_path,
    ))

    ########################################################
    # Deploy the charm and apply the initial configuration #
    ########################################################
    app = await model.deploy(
        # We expect charm_path to be either the path to the charm on disk
        # or in the format of cs:series/name
        charm_path,
        # This is the formatted, unique name for this charm
        application_name=application_name,
        # Proxy charms should use the current LTS. This will need to be
        # changed for native charms.
        series='xenial',
        # Apply the initial 'config' primitive during deployment
        config=initial_config,
        # TBD: Where to deploy the charm to.
        to=None,
    )

    #######################################
    # Execute initial config primitive(s) #
    #######################################
    # Build the complete, sequenced list first. 'config' is skipped
    # because it was applied when the application was deployed.
    primitives = {}
    for primitive in config_primitives:
        if primitive['name'] == 'config':
            continue
        primitives[primitive['seq']] = {
            'name': primitive['name'],
            'parameters': self._map_primitive_parameters(
                primitive['parameter'],
                {'<rw_mgmt_ip>': rw_mgmt_ip}
            ),
        }

    # Execute every primitive exactly once, in 'seq' order.
    # NOTE(review): sorting assumes the 'seq' values are mutually
    # comparable and sort in execution order -- confirm they are numeric.
    try:
        for seq in sorted(primitives):
            await self.ExecutePrimitive(
                model_name,
                application_name,
                primitives[seq]['name'],
                callback,
                *callback_args,
                **primitives[seq]['parameters']
            )
    except N2VCPrimitiveExecutionFailed as e:
        self.log.debug(
            "[N2VC] Exception executing primitive: {}".format(e)
        )
        raise
398
399 async def ExecutePrimitive(self, model_name, application_name, primitive, callback, *callback_args, **params):
400 """Execute a primitive of a charm for Day 1 or Day 2 configuration.
401
402 Execute a primitive defined in the VNF descriptor.
403
404 :param str model_name: The name of the network service.
405 :param str application_name: The name of the application
406 :param str primitive: The name of the primitive to execute.
407 :param obj callback: A callback function to receive status changes.
408 :param tuple callback_args: A list of arguments to be passed to the callback function.
409 :param dict params: A dictionary of key=value pairs representing the primitive's parameters
410 Examples::
411 {
412 'rw_mgmt_ip': '1.2.3.4',
413 # Pass the initial-config-primitives section of the vnf or vdu
414 'initial-config-primitives': {...}
415 }
416 """
417 uuid = None
418 try:
419 if not self.authenticated:
420 await self.login()
421
422 # FIXME: This is hard-coded until model-per-ns is added
423 model_name = 'default'
424
425 model = await self.controller.get_model(model_name)
426
427 if primitive == 'config':
428 # config is special, and expecting params to be a dictionary
429 self.log.debug("Setting charm configuration for {}".format(application_name))
430 self.log.debug(params['params'])
431 await self.set_config(model, application_name, params['params'])
432 else:
433 app = await self.get_application(model, application_name)
434 if app:
435 # Run against the first (and probably only) unit in the app
436 unit = app.units[0]
437 if unit:
438 self.log.debug("Executing primitive {}".format(primitive))
439 action = await unit.run_action(primitive, **params)
440 uuid = action.id
441 await model.disconnect()
442 except Exception as e:
443 self.log.debug("Caught exception while executing primitive: {}".format(e))
444 raise e
445 return uuid
446
447 async def RemoveCharms(self, model_name, application_name, callback=None, *callback_args):
448 """Remove a charm from the VCA.
449
450 Remove a charm referenced in a VNF Descriptor.
451
452 :param str model_name: The name of the network service.
453 :param str application_name: The name of the application
454 :param obj callback: A callback function to receive status changes.
455 :param tuple callback_args: A list of arguments to be passed to the callback function.
456 """
457 try:
458 if not self.authenticated:
459 await self.login()
460
461 model = await self.get_model(model_name)
462 app = await self.get_application(model, application_name)
463 if app:
464 # Remove this application from event monitoring
465 self.monitors[model_name].RemoveApplication(application_name)
466
467 # self.notify_callback(model_name, application_name, "removing", callback, *callback_args)
468 self.log.debug("Removing the application {}".format(application_name))
469 await app.remove()
470
471 # Notify the callback that this charm has been removed.
472 self.notify_callback(model_name, application_name, "removed", callback, *callback_args)
473
474 except Exception as e:
475 print("Caught exception: {}".format(e))
476 self.log.debug(e)
477 raise e
478
async def DestroyNetworkService(self, nsd):
    """Not implemented yet; always raises NotImplementedError."""
    raise NotImplementedError
481
482 async def GetMetrics(self, model_name, application_name):
483 """Get the metrics collected by the VCA.
484
485 :param model_name The name of the model
486 :param application_name The name of the application
487 """
488 metrics = {}
489 model = await self.get_model(model_name)
490 app = await self.get_application(model, application_name)
491 if app:
492 metrics = await app.get_metrics()
493
494 return metrics
495
496 # Non-public methods
497 async def add_relation(self, a, b, via=None):
498 """
499 Add a relation between two application endpoints.
500
501 :param a An application endpoint
502 :param b An application endpoint
503 :param via The egress subnet(s) for outbound traffic, e.g.,
504 (192.168.0.0/16,10.0.0.0/8)
505 """
506 if not self.authenticated:
507 await self.login()
508
509 m = await self.get_model()
510 try:
511 m.add_relation(a, b, via)
512 finally:
513 await m.disconnect()
514
515 # async def apply_config(self, config, application):
516 # """Apply a configuration to the application."""
517 # print("JujuApi: Applying configuration to {}.".format(
518 # application
519 # ))
520 # return await self.set_config(application=application, config=config)
521
522 def _get_config_from_dict(self, config_primitive, values):
523 """Transform the yang config primitive to dict.
524
525 Expected result:
526
527 config = {
528 'config':
529 }
530 """
531 config = {}
532 for primitive in config_primitive:
533 if primitive['name'] == 'config':
534 # config = self._map_primitive_parameters()
535 for parameter in primitive['parameter']:
536 param = str(parameter['name'])
537 if parameter['value'] == "<rw_mgmt_ip>":
538 config[param] = str(values[parameter['value']])
539 else:
540 config[param] = str(parameter['value'])
541
542 return config
543
544 def _map_primitive_parameters(self, parameters, values):
545 params = {}
546 for parameter in parameters:
547 param = str(parameter['name'])
548 if parameter['value'] == "<rw_mgmt_ip>":
549 params[param] = str(values[parameter['value']])
550 else:
551 params[param] = str(parameter['value'])
552 return params
553
554 def _get_config_from_yang(self, config_primitive, values):
555 """Transform the yang config primitive to dict."""
556 config = {}
557 for primitive in config_primitive.values():
558 if primitive['name'] == 'config':
559 for parameter in primitive['parameter'].values():
560 param = str(parameter['name'])
561 if parameter['value'] == "<rw_mgmt_ip>":
562 config[param] = str(values[parameter['value']])
563 else:
564 config[param] = str(parameter['value'])
565
566 return config
567
def FormatApplicationName(self, *args):
    """
    Generate a Juju-compatible Application name

    The arguments are joined with dashes. ASCII digits are mapped to
    letters (0 -> a, 1 -> b, ...), ASCII letters are lower-cased, every
    other character becomes a dash and runs of dashes are collapsed.

    :param args tuple: Positional arguments to be used to construct the
    application name.

    Limitations::
    - Only accepts characters a-z and non-consecutive dashes (-)
    - Application name should not exceed 50 characters (not enforced
      here)

    Examples::

        FormatApplicationName("ping_pong_ns", "ping_vnf", "a")
    """

    chars = []
    for c in "-".join(args):
        if '0' <= c <= '9':
            # Explicit ASCII range: str.isdigit() also accepts characters
            # such as superscripts, which int() cannot convert.
            chars.append(chr(ord('a') + int(c)))
        elif 'a' <= c <= 'z' or 'A' <= c <= 'Z':
            chars.append(c.lower())
        else:
            chars.append('-')
    # Raw string: '\-' in a plain literal is an invalid escape sequence.
    return re.sub(r'-+', '-', "".join(chars))
592
593
594 # def format_application_name(self, nsd_name, vnfr_name, member_vnf_index=0):
595 # """Format the name of the application
596 #
597 # Limitations:
598 # - Only accepts characters a-z and non-consequitive dashes (-)
599 # - Application name should not exceed 50 characters
600 # """
601 # name = "{}-{}-{}".format(nsd_name, vnfr_name, member_vnf_index)
602 # new_name = ''
603 # for c in name:
604 # if c.isdigit():
605 # c = chr(97 + int(c))
606 # elif not c.isalpha():
607 # c = "-"
608 # new_name += c
609 # return re.sub('\-+', '-', new_name.lower())
610
def format_model_name(self, name):
    """Format the name of model.

    Model names may only contain lowercase letters, digits and hyphens,
    so the name is lower-cased and underscores are turned into hyphens.
    """
    lowered = name.lower()
    return lowered.replace('_', '-')
618
619 async def get_application(self, model, application):
620 """Get the deployed application."""
621 if not self.authenticated:
622 await self.login()
623
624 app = None
625 if application and model:
626 if model.applications:
627 if application in model.applications:
628 app = model.applications[application]
629
630 return app
631
632 async def get_model(self, model_name='default'):
633 """Get a model from the Juju Controller.
634
635 Note: Model objects returned must call disconnected() before it goes
636 out of scope."""
637 if not self.authenticated:
638 await self.login()
639
640 if model_name not in self.models:
641 print("connecting to model {}".format(model_name))
642 self.models[model_name] = await self.controller.get_model(model_name)
643
644 # Create an observer for this model
645 self.monitors[model_name] = VCAMonitor(model_name)
646 self.models[model_name].add_observer(self.monitors[model_name])
647
648 return self.models[model_name]
649
async def login(self):
    """Login to the Juju controller.

    Does nothing when already authenticated. Without a configured secret
    no connection is attempted, a fatal message is logged and the
    instance stays unauthenticated.
    """

    if self.authenticated:
        return

    self.connecting = True
    try:
        self.log.debug("JujuApi: Logging into controller")

        cacert = None
        self.controller = Controller()

        if not self.secret:
            # current_controller no longer exists, so there is no way to
            # connect without credentials. Do not claim to be
            # authenticated when nothing was connected.
            self.log.fatal("VCA credentials not configured.")
            return

        # The secret is deliberately left out of the log message.
        self.log.debug("Connecting to controller... ws://{} as {}".format(self.endpoint, self.user))
        await self.controller.connect(
            endpoint=self.endpoint,
            username=self.user,
            password=self.secret,
            cacert=cacert,
        )

        self.authenticated = True
        self.log.debug("JujuApi: Logged into controller")
    finally:
        self.connecting = False
684
685 # self.default_model = await self.controller.get_model("default")
686
687 async def logout(self):
688 """Logout of the Juju controller."""
689 if not self.authenticated:
690 return
691
692 try:
693 if self.default_model:
694 self.log.debug("Disconnecting model {}".format(self.default_model))
695 await self.default_model.disconnect()
696 self.default_model = None
697
698 for model in self.models:
699 await self.models[model].disconnect()
700
701 if self.controller:
702 self.log.debug("Disconnecting controller {}".format(self.controller))
703 await self.controller.disconnect()
704 # self.controller = None
705
706 self.authenticated = False
707 except Exception as e:
708 self.log.fail("Fatal error logging out of Juju Controller: {}".format(e))
709 raise e
710
711
712 # async def remove_application(self, name):
713 # """Remove the application."""
714 # if not self.authenticated:
715 # await self.login()
716 #
717 # app = await self.get_application(name)
718 # if app:
719 # self.log.debug("JujuApi: Destroying application {}".format(
720 # name,
721 # ))
722 #
723 # await app.destroy()
724
725 async def remove_relation(self, a, b):
726 """
727 Remove a relation between two application endpoints
728
729 :param a An application endpoint
730 :param b An application endpoint
731 """
732 if not self.authenticated:
733 await self.login()
734
735 m = await self.get_model()
736 try:
737 m.remove_relation(a, b)
738 finally:
739 await m.disconnect()
740
741 async def resolve_error(self, application=None):
742 """Resolve units in error state."""
743 if not self.authenticated:
744 await self.login()
745
746 app = await self.get_application(self.default_model, application)
747 if app:
748 self.log.debug("JujuApi: Resolving errors for application {}".format(
749 application,
750 ))
751
752 for unit in app.units:
753 app.resolved(retry=True)
754
755 async def run_action(self, application, action_name, **params):
756 """Execute an action and return an Action object."""
757 if not self.authenticated:
758 await self.login()
759 result = {
760 'status': '',
761 'action': {
762 'tag': None,
763 'results': None,
764 }
765 }
766 app = await self.get_application(self.default_model, application)
767 if app:
768 # We currently only have one unit per application
769 # so use the first unit available.
770 unit = app.units[0]
771
772 self.log.debug("JujuApi: Running Action {} against Application {}".format(
773 action_name,
774 application,
775 ))
776
777 action = await unit.run_action(action_name, **params)
778
779 # Wait for the action to complete
780 await action.wait()
781
782 result['status'] = action.status
783 result['action']['tag'] = action.data['id']
784 result['action']['results'] = action.results
785
786 return result
787
788 async def set_config(self, model_name, application, config):
789 """Apply a configuration to the application."""
790 if not self.authenticated:
791 await self.login()
792
793 app = await self.get_application(model_name, application)
794 if app:
795 self.log.debug("JujuApi: Setting config for Application {}".format(
796 application,
797 ))
798 await app.set_config(config)
799
800 # Verify the config is set
801 newconf = await app.get_config()
802 for key in config:
803 if config[key] != newconf[key]['value']:
804 self.log.debug("JujuApi: Config not set! Key {} Value {} doesn't match {}".format(key, config[key], newconf[key]))
805
806 # async def set_parameter(self, parameter, value, application=None):
807 # """Set a config parameter for a service."""
808 # if not self.authenticated:
809 # await self.login()
810 #
811 # self.log.debug("JujuApi: Setting {}={} for Application {}".format(
812 # parameter,
813 # value,
814 # application,
815 # ))
816 # return await self.apply_config(
817 # {parameter: value},
818 # application=application,
819 # )
820
821 async def wait_for_application(self, name, timeout=300):
822 """Wait for an application to become active."""
823 if not self.authenticated:
824 await self.login()
825
826 app = await self.get_application(self.default_model, name)
827 if app:
828 self.log.debug(
829 "JujuApi: Waiting {} seconds for Application {}".format(
830 timeout,
831 name,
832 )
833 )
834
835 await self.default_model.block_until(
836 lambda: all(
837 unit.agent_status == 'idle'
838 and unit.workload_status
839 in ['active', 'unknown'] for unit in app.units
840 ),
841 timeout=timeout
842 )