9641e73d22fac006ba9c7c694a83af3d22dad945
[osm/N2VC.git] / vnf.py
1
2 import logging
3 import os
4 import os.path
5 import re
6 import ssl
7 import sys
8 import time
9
# FIXME: the bundled juju library should be importable without this
# explicit sys.path manipulation. Find out why it is not.
# Expose our subtree of the juju library (../modules/libjuju/ relative to
# this file) to the import system.
path = os.path.join(
    os.path.abspath(os.path.join(os.path.dirname(__file__), '..')),
    "modules/libjuju/",
)
if path not in sys.path:
    # Index 1 keeps the script directory (sys.path[0]) first.
    sys.path.insert(1, path)
17
18 from juju.controller import Controller
19 from juju.model import Model, ModelObserver
20
21
# We might need this to connect to the websocket securely, but test and
# verify.
# NOTE(review): this replaces the process-wide default HTTPS context with
# one that performs no certificate verification -- confirm that is intended.
# Legacy Python lacks ssl._create_unverified_context because it does not
# verify by default (see PEP 476, https://www.python.org/dev/peps/pep-0476/),
# in which case there is nothing to patch.
if hasattr(ssl, '_create_unverified_context'):
    ssl._create_default_https_context = ssl._create_unverified_context
29
30
31 # Custom exceptions
class JujuCharmNotFound(Exception):
    """Raised when a charm cannot be found or is not readable."""

    pass
34
35
class JujuApplicationExists(Exception):
    """Raised when the application to deploy already exists."""

    pass
38
39
class N2VCPrimitiveExecutionFailed(Exception):
    """Raised when an attempt to execute a primitive fails."""

    pass
42
43
# Quiet the debug logging of the websocket and juju client libraries.
for _logger_name, _level in (
    ('websockets.protocol', logging.INFO),
    ('juju.client.connection', logging.WARN),
    ('juju.model', logging.WARN),
    ('juju.machine', logging.WARN),
):
    logging.getLogger(_logger_name).setLevel(_level)
# Do not leave the loop variables behind in the module namespace.
del _logger_name, _level
49
50
class VCAMonitor(ModelObserver):
    """Monitor state changes within the Juju Model.

    Applications are registered together with a callback. Unit changes that
    belong to a registered application are forwarded to that callback as
    ``callback(ns_name, application_name, status, *callback_args)``; changes
    for any other application are ignored.
    """
    log = None
    ns_name = None
    # Class-level default kept for backwards compatibility only. Each
    # instance gets its own registry in __init__; sharing this dict would
    # make every monitor see every other monitor's registrations.
    applications = {}

    def __init__(self, ns_name):
        """Create a monitor reporting under the name ``ns_name``."""
        self.log = logging.getLogger(__name__)
        self.ns_name = ns_name
        self.applications = {}

    def AddApplication(self, application_name, callback, *callback_args):
        """Register ``callback`` for ``application_name``.

        An existing registration for the same application is left untouched.
        """
        if application_name not in self.applications:
            self.applications[application_name] = {
                'callback': callback,
                'callback_args': callback_args
            }

    def RemoveApplication(self, application_name):
        """Drop the registration of ``application_name``, if there is one."""
        self.applications.pop(application_name, None)

    async def on_change(self, delta, old, new, model):
        """React to changes in the Juju model.

        Only "unit" deltas are handled. The callback is invoked with the
        workload status once it is identical in ``old`` and ``new``, or with
        "removed" when only ``old`` is set. Exceptions raised while notifying
        are logged at debug level and not propagated.
        """
        if delta.entity != "unit":
            # TODO: Decide how we want to notify the user of actions
            # ("action" deltas carry delta.data['id'], ['message'] and a
            # ['status'] of "pending", "completed" or "failed").
            return

        application_name = delta.data['application']

        # Ignore change events from other applications
        if application_name not in self.applications:
            return

        try:
            registration = self.applications[application_name]
            callback = registration['callback']
            callback_args = registration['callback_args']

            if not callback:
                return

            if old and new:
                new_status = new.workload_status

                # The workload status may fluctuate around certain events,
                # so wait until the status has stabilized before triggering
                # the callback.
                if old.workload_status == new_status:
                    callback(
                        self.ns_name,
                        application_name,
                        new_status,
                        *callback_args)
            elif old:
                # This is a charm being removed
                callback(
                    self.ns_name,
                    application_name,
                    "removed",
                    *callback_args)
        except Exception as e:
            # Deliberately best-effort: a failing callback must not break
            # the observer.
            self.log.debug("[1] notify_callback exception {}".format(e))
130
131 ########
132 # TODO
133 #
134 # Create unique models per network service
135 # Document all public functions
136
137
class N2VC:
    """Deploy and operate VNF charms through a Juju controller.

    All operations that talk to the controller are coroutines. The
    controller connection and the per-model connections are opened lazily
    and cached on the instance until :meth:`logout` is awaited.
    """

    # Juju API
    api = None
    log = None
    controller = None
    connecting = False
    authenticated = False

    # Class-level defaults kept for backwards compatibility; __init__ gives
    # every instance its own containers so instances do not share models.
    models = {}
    default_model = None

    # Model Observers
    monitors = {}

    # VCA config
    hostname = ""
    port = 17070
    username = ""
    secret = ""

    def __init__(self,
                 log=None,
                 server='127.0.0.1',
                 port=17070,
                 user='admin',
                 secret=None,
                 artifacts=None
                 ):
        """Initialize N2VC

        :param log: The logger to use; defaults to this module's logger.
        :param str server: The address of the Juju controller.
        :param int port: The port of the Juju controller API.
        :param str user: The user name; a "user-" prefix is added if missing.
        :param str secret: The password of the user.
        :param str artifacts: The directory where charms required by a vnfd
            are stored.

        :Example:
        n2vc = N2VC(
            server='10.44.127.137',
            port=17070,
            user='admin',
            secret='MzI3MDJhOTYxYmM0YzRjNTJiYmY1Yzdm',
            artifacts='/path/to/charms',
        )
        """
        self.log = log if log else logging.getLogger(__name__)

        # Quiet websocket traffic
        logging.getLogger('websockets.protocol').setLevel(logging.INFO)
        logging.getLogger('juju.client.connection').setLevel(logging.WARN)
        logging.getLogger('model').setLevel(logging.WARN)

        self.log.debug('JujuApi: instantiated')

        # Per-instance caches (the class attributes would be shared).
        self.models = {}
        self.monitors = {}

        self.server = server
        self.port = port
        self.secret = secret

        if user.startswith('user-'):
            self.user = user
        else:
            self.user = 'user-{}'.format(user)

        self.endpoint = '%s:%d' % (server, int(port))
        self.artifacts = artifacts

    def __del__(self):
        """Warn when the instance is discarded while still logged in.

        A finalizer cannot await :meth:`logout`. The former ``yield
        self.logout()`` turned this method into a generator function whose
        body never executed, so nothing was ever closed here.
        """
        try:
            if self.authenticated and self.log:
                self.log.warning(
                    "N2VC discarded without calling logout(); connections "
                    "to the Juju controller may still be open."
                )
        except Exception:
            # Logging may be unavailable during interpreter shutdown.
            pass

    def notify_callback(self, model_name, application_name, status,
                        callback=None, *callback_args):
        """Invoke ``callback`` with a status change, if one was supplied.

        :returns: True
        :raises: whatever the callback raises, after logging it.
        """
        try:
            if callback:
                callback(model_name, application_name, status, *callback_args)
        except Exception as e:
            self.log.error("[0] notify_callback exception {}".format(e))
            raise
        return True

    # Public methods
    async def CreateNetworkService(self, nsd):
        """Return the model that encapsulates this network service.

        Logs in if necessary and returns ``self.default_model``. No model is
        created yet.

        :param nsd: Currently unused.
        """
        if not self.authenticated:
            await self.login()

        # Ideally, we will create a unique model per network service.
        # This change will require all components, i.e., LCM and SO, to use
        # N2VC for 100% compatibility. If we adopt unique models for the LCM,
        # services deployed via LCM wouldn't be manageable via SO and vice
        # versa.

        return self.default_model

    async def DeployCharms(self, model_name, application_name, vnfd,
                           charm_path, params=None, machine_spec=None,
                           callback=None, *callback_args):
        """Deploy one or more charms associated with a VNF.

        Deploys the charm with its initial 'config' primitive applied and
        then executes the remaining initial config primitives, once each, in
        ascending ``seq`` order.

        :param str model_name: The name of the network service. Currently
            overridden with 'default' once the charm path has been checked.
        :param str application_name: The name of the application
        :param dict vnfd: The VNF descriptor (currently unused)
        :param str charm_path: The path to the Juju charm
        :param dict params: A dictionary of runtime parameters
          Examples::
          {
            'rw_mgmt_ip': '1.2.3.4',
            # Pass the initial-config-primitives section of the vnf or vdu
            'initial-config-primitive': [...]
          }
        :param dict machine_spec: A dictionary describing the machine to
            install to (not used yet)
          Examples::
          {
            'hostname': '1.2.3.4',
            'username': 'ubuntu',
          }
        :param obj callback: A callback function to receive status changes.
        :param tuple callback_args: A list of arguments to be passed to the
            callback
        :raises JujuCharmNotFound: if ``charm_path`` does not exist.
        :raises JujuApplicationExists: if the application is already deployed.
        """
        params = {} if params is None else params
        machine_spec = {} if machine_spec is None else machine_spec

        ########################################################
        # Verify the path to the charm exists and is readable. #
        ########################################################
        # NOTE(review): this check also rejects cs:series/name charm URLs
        # although the deploy call below is documented to accept them.
        if not os.path.exists(charm_path):
            self.log.debug("Charm path doesn't exist: {}".format(charm_path))
            self.notify_callback(model_name, application_name, "failed",
                                 callback, *callback_args)
            raise JujuCharmNotFound("No artifacts configured.")

        ################################
        # Login to the Juju controller #
        ################################
        if not self.authenticated:
            self.log.debug("Authenticating with Juju")
            await self.login()

        ##########################################
        # Get the model for this network service #
        ##########################################
        # TODO: In a point release, we will use a model per deployed network
        # service. In the meantime, we will always use the 'default' model.
        model_name = 'default'
        model = await self.get_model(model_name)

        ########################################
        # Verify the application doesn't exist #
        ########################################
        if await self.get_application(model, application_name):
            raise JujuApplicationExists(
                "Can't deploy application \"{}\" to model \"{}\" because it "
                "already exists.".format(application_name, model_name)
            )

        ################################################################
        # Register this application with the model-level event monitor #
        ################################################################
        if callback:
            self.monitors[model_name].AddApplication(
                application_name,
                callback,
                *callback_args
            )

        ########################################################
        # Check for specific machine placement (native charms) #
        ########################################################
        # TODO: Enlist the machine described by machine_spec ('hostname',
        # 'username') in Juju and deploy to it. This needs to be tested.

        #######################################
        # Get the initial charm configuration #
        #######################################
        rw_mgmt_ip = params.get('rw_mgmt_ip')
        substitutions = {'<rw_mgmt_ip>': rw_mgmt_ip}
        config_primitives = params.get('initial-config-primitive', [])

        initial_config = self._get_config_from_dict(
            config_primitives,
            substitutions,
        )

        self.log.debug("JujuApi: Deploying charm ({}) from {}".format(
            application_name,
            charm_path,
        ))

        ########################################################
        # Deploy the charm and apply the initial configuration #
        ########################################################
        await model.deploy(
            # We expect charm_path to be either the path to the charm on disk
            # or in the format of cs:series/name
            charm_path,
            # This is the formatted, unique name for this charm
            application_name=application_name,
            # Proxy charms should use the current LTS. This will need to be
            # changed for native charms.
            series='xenial',
            # Apply the initial 'config' primitive during deployment
            config=initial_config,
            # TBD: Where to deploy the charm to.
            to=None,
        )

        #######################################
        # Execute initial config primitive(s) #
        #######################################
        # Collect every primitive first, keyed by sequence number. Running
        # them while still collecting re-executed the earlier primitives on
        # each iteration.
        primitives = {}
        for primitive in config_primitives:
            if primitive['name'] == 'config':
                # This is applied when the Application is deployed
                continue
            primitives[primitive['seq']] = {
                'name': primitive['name'],
                'parameters': self._map_primitive_parameters(
                    primitive['parameter'],
                    substitutions,
                ),
            }

        try:
            for seq in sorted(primitives):
                await self.ExecutePrimitive(
                    model_name,
                    application_name,
                    primitives[seq]['name'],
                    callback,
                    *callback_args,
                    **primitives[seq]['parameters']
                )
        except N2VCPrimitiveExecutionFailed as e:
            self.log.debug(
                "[N2VC] Exception executing primitive: {}".format(e)
            )
            raise

    async def ExecutePrimitive(self, model_name, application_name, primitive,
                               callback, *callback_args, **params):
        """
        Queue the execution of a primitive

        The 'config' primitive is special: it applies ``params['params']``
        as the charm configuration instead of running an action. Any other
        primitive is run as an action on the first unit of the application.

        :returns: the UUID of the queued action, or None for 'config' or
            when the application does not exist.
        """
        uuid = None
        try:
            if not self.authenticated:
                await self.login()

            # FIXME: This is hard-coded until model-per-ns is added
            model_name = 'default'

            model = await self.controller.get_model(model_name)
            try:
                if primitive == 'config':
                    # config is special, and expecting params to be a
                    # dictionary
                    self.log.debug("Setting charm configuration for {}".format(
                        application_name))
                    self.log.debug(params['params'])
                    await self.set_config(model, application_name,
                                          params['params'])
                else:
                    app = await self.get_application(model, application_name)
                    if app:
                        # Run against the first (and probably only) unit in
                        # the app
                        unit = app.units[0]
                        if unit:
                            self.log.debug(
                                "Executing primitive {}".format(primitive))
                            action = await unit.run_action(primitive, **params)
                            uuid = action.id
            finally:
                # This connection is private to the call; close it even when
                # the primitive failed.
                await model.disconnect()
        except Exception as e:
            self.log.debug(
                "Caught exception while executing primitive: {}".format(e))
            raise
        return uuid

    async def RemoveCharms(self, model_name, application_name, callback=None,
                           *callback_args):
        """Remove an application and notify ``callback`` with "removed".

        Nothing happens when the application does not exist in the model.
        """
        try:
            if not self.authenticated:
                await self.login()

            model = await self.get_model(model_name)
            app = await self.get_application(model, application_name)
            if app:
                # Remove this application from event monitoring
                self.monitors[model_name].RemoveApplication(application_name)

                self.log.debug(
                    "Removing the application {}".format(application_name))
                await app.remove()

                # Notify the callback that this charm has been removed.
                self.notify_callback(model_name, application_name, "removed",
                                     callback, *callback_args)
        except Exception as e:
            self.log.debug("Caught exception: {}".format(e))
            raise

    async def DestroyNetworkService(self, nsd):
        """Not implemented yet."""
        raise NotImplementedError()

    async def GetMetrics(self, model_name, application_name):
        """Get the metrics collected by the VCA.

        :param model_name The name of the model
        :param application_name The name of the application
        :returns: the application's metrics, or an empty dict when the
            application does not exist.
        """
        metrics = {}
        model = await self.get_model(model_name)
        app = await self.get_application(model, application_name)
        if app:
            metrics = await app.get_metrics()

        return metrics

    # Non-public methods
    async def _get_default_model(self):
        """Return ``default_model`` if set, else the cached 'default' model.

        ``default_model`` is never assigned inside this class, so methods
        relying on it alone would never find an application.
        """
        if self.default_model:
            return self.default_model
        return await self.get_model()

    async def add_relation(self, a, b, via=None):
        """
        Add a relation between two application endpoints.

        :param a An application endpoint
        :param b An application endpoint
        :param via The egress subnet(s) for outbound traffic, e.g.,
            (192.168.0.0/16,10.0.0.0/8)
        """
        if not self.authenticated:
            await self.login()

        # The model comes from the shared cache, so it is left connected;
        # logout() closes it. Disconnecting here left a dead model cached.
        m = await self.get_model()
        # NOTE(review): assumes Model.add_relation is a coroutine accepting
        # `via` -- confirm against the bundled libjuju.
        await m.add_relation(a, b, via)

    def _get_config_from_dict(self, config_primitive, values):
        """Build the charm config dict from a list of yang primitives.

        Only primitives named 'config' contribute. Each of their parameters
        becomes a ``name: value`` entry with both sides converted to str.

        :param list config_primitive: primitives, each a dict with 'name'
            and 'parameter' (a list of dicts with 'name' and 'value').
        :param dict values: replacement for the "<rw_mgmt_ip>" placeholder.
        """
        config = {}
        for primitive in config_primitive:
            if primitive['name'] == 'config':
                config.update(self._map_primitive_parameters(
                    primitive['parameter'],
                    values,
                ))
        return config

    def _map_primitive_parameters(self, parameters, values):
        """Map an iterable of parameters to a ``{name: value}`` dict of str.

        A value of "<rw_mgmt_ip>" is replaced by ``values["<rw_mgmt_ip>"]``.
        """
        params = {}
        for parameter in parameters:
            value = parameter['value']
            if value == "<rw_mgmt_ip>":
                value = values[value]
            params[str(parameter['name'])] = str(value)
        return params

    def _get_config_from_yang(self, config_primitive, values):
        """Transform the yang config primitive to dict.

        Same as :meth:`_get_config_from_dict`, but ``config_primitive`` and
        each primitive's 'parameter' are mappings whose values are used.
        """
        config = {}
        for primitive in config_primitive.values():
            if primitive['name'] == 'config':
                config.update(self._map_primitive_parameters(
                    primitive['parameter'].values(),
                    values,
                ))
        return config

    def FormatApplicationName(self, *args):
        """
        Generate a Juju-compatible Application name

        The arguments are joined with dashes and lowercased. Digits are
        mapped to letters (0 -> a ... 9 -> j), every other character outside
        a-z becomes a dash, and runs of dashes are collapsed.

        :param args tuple: Positional arguments to be used to construct the
        application name.

        Limitations::
        - Only accepts characters a-z and non-consecutive dashes (-)
        - Application name should not exceed 50 characters (not enforced
          here)

        Examples::

            FormatApplicationName("ping_pong_ns", "ping_vnf", "a")
        """
        appname = []
        for c in "-".join(args).lower():
            if c in "0123456789":
                c = chr(ord('a') + int(c))
            elif not 'a' <= c <= 'z':
                # Includes non-ASCII letters and digits, which Juju rejects.
                c = "-"
            appname.append(c)
        return re.sub(r'-+', '-', "".join(appname))

    def format_model_name(self, name):
        """Format the name of model.

        Model names may only contain lowercase letters, digits and hyphens.
        Only underscores and upper case are normalized here.
        """
        return name.replace('_', '-').lower()

    async def get_application(self, model, application):
        """Get the deployed application.

        :param model: A connected model object (not a model name).
        :param str application: The name of the application.
        :returns: the application, or None when it is not in the model.
        """
        if not self.authenticated:
            await self.login()

        if application and model and model.applications:
            if application in model.applications:
                return model.applications[application]

        return None

    async def get_model(self, model_name='default'):
        """Get a model from the Juju Controller.

        The first request for a model connects to it, caches it and attaches
        a VCAMonitor observer; later requests return the cached object. The
        cached models are disconnected by :meth:`logout`.
        """
        if not self.authenticated:
            await self.login()

        if model_name not in self.models:
            self.log.debug("connecting to model {}".format(model_name))
            self.models[model_name] = await self.controller.get_model(
                model_name)

            # Create an observer for this model
            self.monitors[model_name] = VCAMonitor(model_name)
            self.models[model_name].add_observer(self.monitors[model_name])

        return self.models[model_name]

    async def login(self):
        """Login to the Juju controller.

        Does nothing when already authenticated. Without a secret, no
        connection is attempted and the instance stays unauthenticated.
        """
        if self.authenticated:
            return

        self.connecting = True
        try:
            self.log.debug("JujuApi: Logging into controller")

            self.controller = Controller()

            if not self.secret:
                # Connecting to the "current controller" is no longer
                # possible, so there is nothing to fall back to.
                self.log.fatal("VCA credentials not configured.")
                return

            # The endpoint already contains the port; never log the secret.
            self.log.debug(
                "Connecting to controller... ws://{} as {}".format(
                    self.endpoint, self.user))
            await self.controller.connect(
                endpoint=self.endpoint,
                username=self.user,
                password=self.secret,
                cacert=None,
            )

            self.authenticated = True
            self.log.debug("JujuApi: Logged into controller")
        finally:
            self.connecting = False

    async def logout(self):
        """Logout of the Juju controller.

        Disconnects the default model, every cached model and the
        controller, and empties the model and monitor caches.
        """
        if not self.authenticated:
            return

        try:
            if self.default_model:
                self.log.debug(
                    "Disconnecting model {}".format(self.default_model))
                await self.default_model.disconnect()
                self.default_model = None

            for model in self.models.values():
                await model.disconnect()
            # Drop the disconnected models so a later login reconnects.
            self.models = {}
            self.monitors = {}

            if self.controller:
                self.log.debug(
                    "Disconnecting controller {}".format(self.controller))
                await self.controller.disconnect()

            self.authenticated = False
        except Exception as e:
            self.log.fatal(
                "Fatal error logging out of Juju Controller: {}".format(e))
            raise

    async def remove_relation(self, a, b):
        """
        Remove a relation between two application endpoints

        :param a An application endpoint
        :param b An application endpoint
        """
        if not self.authenticated:
            await self.login()

        # The cached model stays connected; logout() closes it.
        m = await self.get_model()
        # NOTE(review): assumes Model.remove_relation exists and is a
        # coroutine -- confirm against the bundled libjuju.
        await m.remove_relation(a, b)

    async def resolve_error(self, application=None):
        """Resolve units in error state."""
        if not self.authenticated:
            await self.login()

        model = await self._get_default_model()
        app = await self.get_application(model, application)
        if app:
            self.log.debug(
                "JujuApi: Resolving errors for application {}".format(
                    application,
                ))

            for unit in app.units:
                # NOTE(review): assumes Unit.resolved is a coroutine --
                # confirm against the bundled libjuju.
                await unit.resolved(retry=True)

    async def run_action(self, application, action_name, **params):
        """Execute an action, wait for it and return its outcome.

        :returns: ``{'status': ..., 'action': {'tag': ..., 'results': ...}}``;
            the values stay empty/None when the application does not exist.
        """
        if not self.authenticated:
            await self.login()

        result = {
            'status': '',
            'action': {
                'tag': None,
                'results': None,
            }
        }

        model = await self._get_default_model()
        app = await self.get_application(model, application)
        if app:
            # We currently only have one unit per application
            # so use the first unit available.
            unit = app.units[0]

            self.log.debug(
                "JujuApi: Running Action {} against Application {}".format(
                    action_name,
                    application,
                ))

            action = await unit.run_action(action_name, **params)

            # Wait for the action to complete
            await action.wait()

            result['status'] = action.status
            result['action']['tag'] = action.data['id']
            result['action']['results'] = action.results

        return result

    async def set_config(self, model_name, application, config):
        """Apply a configuration to the application.

        :param model_name: A connected model object, or the name of a model
            (which is then looked up through :meth:`get_model`).
        :param str application: The name of the application.
        :param dict config: The configuration to apply.
        """
        if not self.authenticated:
            await self.login()

        model = model_name
        if isinstance(model_name, str):
            model = await self.get_model(model_name)

        app = await self.get_application(model, application)
        if app:
            self.log.debug("JujuApi: Setting config for Application {}".format(
                application,
            ))
            await app.set_config(config)

            # Verify the config is set
            newconf = await app.get_config()
            for key in config:
                current = newconf.get(key) or {}
                if config[key] != current.get('value'):
                    self.log.debug(
                        "JujuApi: Config not set! Key {} Value {} doesn't "
                        "match {}".format(key, config[key], newconf.get(key)))

    async def wait_for_application(self, name, timeout=300):
        """Wait for an application to become active.

        Blocks until every unit's agent is idle and its workload status is
        'active' or 'unknown', for at most ``timeout`` seconds.
        """
        if not self.authenticated:
            await self.login()

        model = await self._get_default_model()
        app = await self.get_application(model, name)
        if app:
            self.log.debug(
                "JujuApi: Waiting {} seconds for Application {}".format(
                    timeout,
                    name,
                )
            )

            await model.block_until(
                lambda: all(
                    unit.agent_status == 'idle'
                    and unit.workload_status in ['active', 'unknown']
                    for unit in app.units
                ),
                timeout=timeout
            )