add6a29ddaa2df1ba94a0b0e2312fd4e82a07f08
[osm/SO.git] / rwcm / plugins / rwconman / rift / tasklets / rwconmantasklet / jujuconf.py
1 #
2 # Copyright 2016 RIFT.IO Inc
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 import asyncio
18 import os
19 import re
20 import tempfile
21 import time
22 import yaml
23
24 import rift.mano.utils.juju_api as juju
25 from . import riftcm_config_plugin
26
27
28 # Charm service name accepts only a to z and -.
29 def get_vnf_unique_name(nsr_name, vnfr_short_name, member_vnf_index):
30 name = "{}-{}-{}".format(nsr_name, vnfr_short_name, member_vnf_index)
31 new_name = ''
32 for c in name:
33 if c.isdigit():
34 c = chr(97 + int(c))
35 elif not c.isalpha():
36 c = "-"
37 new_name += c
38 return new_name.lower()
39
40
41 class JujuConfigPlugin(riftcm_config_plugin.RiftCMConfigPluginBase):
42 """
43 Juju implementation of the riftcm_config_plugin.RiftCMConfigPluginBase
44 """
45 def __init__(self, dts, log, loop, account):
46 riftcm_config_plugin.RiftCMConfigPluginBase.__init__(self, dts, log, loop, account)
47 self._name = account.name
48 self._type = 'juju'
49 self._ip_address = account.juju.ip_address
50 self._port = account.juju.port
51 self._user = account.juju.user
52 self._secret = account.juju.secret
53 self._rift_install_dir = os.environ['RIFT_INSTALL']
54 self._rift_artif_dir = os.environ['RIFT_ARTIFACTS']
55
56 ############################################################
57 # This is wrongfully overloaded with 'juju' private data. #
58 # Really need to separate agent_vnfr from juju vnfr data. #
59 # Currently, this holds agent_vnfr, which has actual vnfr, #
60 # then this juju overloads actual vnfr with its own #
61 # dictionary elemetns (WRONG!!!) #
62 self._juju_vnfs = {}
63 ############################################################
64
65 self._tasks = {}
66 self._api = juju.JujuApi(log, loop,
67 self._ip_address, self._port,
68 self._user, self._secret)
69
70 @property
71 def name(self):
72 return self._name
73
74 @property
75 def agent_type(self):
76 return self._type
77
78 @property
79 def api(self):
80 return self._api
81
82 @property
83 def agent_data(self):
84 return dict(
85 type=self.agent_type,
86 name=self.name,
87 host=self._ip_address,
88 port=self._port,
89 user=self._user,
90 secret=self._secret
91 )
92
93 def vnfr(self, vnfr_id):
94 try:
95 vnfr = self._juju_vnfs[vnfr_id].vnfr
96 except KeyError:
97 self._log.error("jujuCA: Did not find VNFR %s in juju plugin", vnfr_id)
98 return None
99
100 return vnfr
101
102 def get_service_name(self, vnfr_id):
103 vnfr = self.vnfr(vnfr_id)
104 if vnfr and 'vnf_juju_name' in vnfr:
105 return vnfr['vnf_juju_name']
106 return None
107
108 def juju_log(self, level, name, log_str, *args):
109 if name is not None:
110 g_log_str = 'jujuCA:({}) {}'.format(name, log_str)
111 else:
112 g_log_str = 'jujuCA: {}'.format(log_str)
113 getattr(self._log, level)(g_log_str, *args)
114
115 # TBD: Do a better, similar to config manager
116 def xlate(self, tag, tags):
117 # TBD
118 if tag is None:
119 return tag
120 val = tag
121 if re.search('<.*>', tag):
122 self._log.debug("jujuCA: Xlate value %s", tag)
123 try:
124 if tag == '<rw_mgmt_ip>':
125 val = tags['rw_mgmt_ip']
126 except KeyError as e:
127 self._log.info("jujuCA: Did not get a value for tag %s, e=%s",
128 tag, e)
129 return val
130
131 @asyncio.coroutine
132 def notify_create_vlr(self, agent_nsr, agent_vnfr, vld, vlr):
133 """
134 Notification of create VL record
135 """
136 return True
137
138 @asyncio.coroutine
139 def notify_create_vnfr(self, agent_nsr, agent_vnfr):
140 """
141 Notification of create Network VNF record
142 Returns True if configured using config_agent
143 """
144 # Deploy the charm if specified for the vnf
145 self._log.debug("jujuCA: create vnfr nsr=%s vnfr=%s",
146 agent_nsr.name, agent_vnfr.name)
147 self._log.debug("jujuCA: Config = %s",
148 agent_vnfr.vnf_configuration)
149 try:
150 vnf_config = agent_vnfr.vnfr_msg.vnf_configuration
151 self._log.debug("jujuCA: vnf_configuration = %s", vnf_config)
152 if not vnf_config.has_field('juju'):
153 return False
154 charm = vnf_config.juju.charm
155 self._log.debug("jujuCA: charm = %s", charm)
156 except Exception as e:
157 self._log.Error("jujuCA: vnf_configuration error for vnfr {}: {}".
158 format(agent_vnfr.name, e))
159 return False
160
161 # Prepare unique name for this VNF
162 vnf_unique_name = get_vnf_unique_name(agent_nsr.name,
163 agent_vnfr.name,
164 agent_vnfr.member_vnf_index)
165 if vnf_unique_name in self._tasks:
166 self._log.warn("jujuCA: Service %s already deployed",
167 vnf_unique_name)
168
169 vnfr_dict = agent_vnfr.vnfr
170 vnfr_dict.update({'vnf_juju_name': vnf_unique_name,
171 'charm': charm,
172 'nsr_id': agent_nsr.id,
173 'member_vnf_index': agent_vnfr.member_vnf_index,
174 'tags': {},
175 'active': False,
176 'config': vnf_config,
177 'vnfr_name' : agent_vnfr.name})
178 self._log.debug("jujuCA: Charm %s for vnf %s to be deployed as %s",
179 charm, agent_vnfr.name, vnf_unique_name)
180
181 # Find the charm directory
182 try:
183 path = os.path.join(self._rift_artif_dir,
184 'launchpad/libs',
185 agent_vnfr.vnfr_msg.vnfd.id,
186 'charms/trusty',
187 charm)
188 self._log.debug("jujuCA: Charm dir is {}".format(path))
189 if not os.path.isdir(path):
190 self._log.error("jujuCA: Did not find the charm directory at {}".
191 format(path))
192 path = None
193 except Exception as e:
194 self.log.exception(e)
195 return False
196
197 if vnf_unique_name not in self._tasks:
198 self._tasks[vnf_unique_name] = {}
199
200 self._tasks[vnf_unique_name]['deploy'] = self.loop.create_task(
201 self.api.deploy_service(charm, vnf_unique_name, path=path))
202
203 self._log.debug("jujuCA: Deploying service %s",
204 vnf_unique_name)
205
206 return True
207
208 @asyncio.coroutine
209 def notify_instantiate_vnfr(self, agent_nsr, agent_vnfr):
210 """
211 Notification of Instantiate NSR with the passed nsr id
212 """
213 return True
214
215 @asyncio.coroutine
216 def notify_instantiate_vlr(self, agent_nsr, agent_vnfr, vlr):
217 """
218 Notification of Instantiate NSR with the passed nsr id
219 """
220 return True
221
222 @asyncio.coroutine
223 def notify_terminate_nsr(self, agent_nsr, agent_vnfr):
224 """
225 Notification of Terminate the network service
226 """
227 return True
228
229 @asyncio.coroutine
230 def notify_terminate_vnfr(self, agent_nsr, agent_vnfr):
231 """
232 Notification of Terminate the network service
233 """
234 self._log.debug("jujuCA: Terminate VNFr {}, current vnfrs={}".
235 format(agent_vnfr.name, self._juju_vnfs))
236 try:
237 vnfr = agent_vnfr.vnfr
238 service = vnfr['vnf_juju_name']
239
240 self._log.debug ("jujuCA: Terminating VNFr %s, %s",
241 agent_vnfr.name, service)
242 self._tasks[service]['destroy'] = self.loop.create_task(
243 self.api.destroy_service(service)
244 )
245
246 del self._juju_vnfs[agent_vnfr.id]
247 self._log.debug ("jujuCA: current vnfrs={}".
248 format(self._juju_vnfs))
249 if service in self._tasks:
250 tasks = []
251 for action in self._tasks[service].keys():
252 #if self.check_task_status(service, action):
253 tasks.append(action)
254 del tasks
255 except KeyError as e:
256 self._log.debug ("jujuCA: Termiating charm service for VNFr {}, e={}".
257 format(agent_vnfr.name, e))
258 except Exception as e:
259 self._log.error("jujuCA: Exception terminating charm service for VNFR {}: {}".
260 format(agent_vnfr.name, e))
261
262 return True
263
264 @asyncio.coroutine
265 def notify_terminate_vlr(self, agent_nsr, agent_vnfr, vlr):
266 """
267 Notification of Terminate the virtual link
268 """
269 return True
270
271 def check_task_status(self, service, action):
272 #self.log.debug("jujuCA: check task status for %s, %s" % (service, action))
273 try:
274 task = self._tasks[service][action]
275 if task.done():
276 self.log.debug("jujuCA: Task for %s, %s done" % (service, action))
277 e = task.exception()
278 if e:
279 self.log.error("jujuCA: Error in task for {} and {} : {}".
280 format(service, action, e))
281 raise Exception(e)
282 r= task.result()
283 if r:
284 self.log.debug("jujuCA: Task for {} and {}, returned {}".
285 format(service, action,r))
286 return True
287 else:
288 self.log.debug("jujuCA: task {}, {} not done".
289 format(service, action))
290 return False
291 except KeyError as e:
292 self.log.error("jujuCA: KeyError for task for {} and {}: {}".
293 format(service, action, e))
294 except Exception as e:
295 self.log.error("jujuCA: Error for task for {} and {}: {}".
296 format(service, action, e))
297 raise
298 return True
299
300 @asyncio.coroutine
301 def vnf_config_primitive(self, nsr_id, vnfr_id, primitive, output):
302 self._log.debug("jujuCA: VNF config primititve {} for nsr {}, vnfr_id {}".
303 format(primitive, nsr_id, vnfr_id))
304 try:
305 vnfr = self._juju_vnfs[vnfr_id].vnfr
306 except KeyError:
307 self._log.error("jujuCA: Did not find VNFR %s in juju plugin",
308 vnfr_id)
309 return
310
311 output.execution_status = "failed"
312 output.execution_id = ''
313 output.execution_error_details = ''
314
315 try:
316 service = vnfr['vnf_juju_name']
317 vnf_config = vnfr['config']
318 self._log.debug("VNF config %s", vnf_config)
319 configs = vnf_config.service_primitive
320 for config in configs:
321 if config.name == primitive.name:
322 self._log.debug("jujuCA: Found the config primitive %s",
323 config.name)
324 params = {}
325 for parameter in primitive.parameter:
326 if parameter.value:
327 val = self.xlate(parameter.value, vnfr['tags'])
328 # TBD do validation of the parameters
329 data_type = 'STRING'
330 found = False
331 for ca_param in config.parameter:
332 if ca_param.name == parameter.name:
333 data_type = ca_param.data_type
334 found = True
335 break
336 try:
337 if data_type == 'INTEGER':
338 tmp = int(val)
339 val = tmp
340 except Exception as e:
341 pass
342
343 if not found:
344 self._log.warn("jujuCA: Did not find parameter {} for {}".
345 format(parameter, config.name))
346 params.update({parameter.name: val})
347
348 if config.name == 'config':
349 output.execution_id = 'config'
350 if len(params):
351 self._log.debug("jujuCA: applying config with params {} for service {}".
352 format(params, service))
353
354 rc = yield from self.api.apply_config(params, service=service, wait=False)
355
356 if rc:
357 # Mark as pending and check later for the status
358 output.execution_status = "pending"
359 self._log.debug("jujuCA: applied config {} on {}".
360 format(params, service))
361 else:
362 output.execution_status = 'failed'
363 output.execution_error_details = \
364 'Failed to apply config: {}'.format(params)
365 self._log.error("jujuCA: Error applying config {} on service {}".
366 format(params, service))
367 else:
368 self._log.warn("jujuCA: Did not find valid parameters for config : {}".
369 format(primitive.parameter))
370 output.execution_status = "completed"
371 else:
372 self._log.debug("jujuCA: Execute action {} on service {} with params {}".
373 format(config.name, service, params))
374
375 resp = yield from self.api.execute_action(config.name,
376 params,
377 service=service)
378
379 if resp:
380 if 'error' in resp:
381 output.execution_error_details = resp['error']['Message']
382 else:
383 output.execution_id = resp['action']['tag']
384 output.execution_status = resp['status']
385 if output.execution_status == 'failed':
386 output.execution_error_details = resp['message']
387 self._log.debug("jujuCA: execute action {} on service {} returned {}".
388 format(config.name, service, output.execution_status))
389 else:
390 self._log.error("jujuCA: error executing action {} for {} with {}".
391 format(config.name, service, params))
392 output.execution_id = ''
393 output.execution_status = 'failed'
394 output.execution_error_details = "Failed to queue the action"
395 break
396
397 except KeyError as e:
398 self._log.info("VNF %s does not have config primititves, e=%s", vnfr_id, e)
399
400 @asyncio.coroutine
401 def apply_config(self, agent_nsr, agent_vnfr, config, rpc_ip):
402 """ Notification on configuration of an NSR """
403 pass
404
405 @asyncio.coroutine
406 def apply_ns_config(self, agent_nsr, agent_vnfrs, rpc_ip):
407 """
408
409 ###### TBD - This really does not belong here. Looks more like NS level script ####
410 ###### apply_config should be called for a particular VNF only here ###############
411
412 Hook: Runs the user defined script. Feeds all the necessary data
413 for the script thro' yaml file.
414
415 Args:
416 rpc_ip (YangInput_Nsr_ExecNsConfigPrimitive): The input data.
417 nsr (NetworkServiceRecord): Description
418 vnfrs (dict): VNFR ID => VirtualNetworkFunctionRecord
419
420 """
421 def get_meta(agent_nsr):
422 unit_names, initial_params, vnfr_index_map = {}, {}, {}
423
424 for vnfr_id in agent_nsr.vnfr_ids:
425 juju_vnf = self._juju_vnfs[vnfr_id].vnfr
426
427 # Vnfr -> index ref
428 vnfr_index_map[vnfr_id] = juju_vnf['member_vnf_index']
429
430 # Unit name
431 unit_names[vnfr_id] = juju_vnf['vnf_juju_name']
432
433 # Flatten the data for simplicity
434 param_data = {}
435 self._log.debug("Juju Config:%s", juju_vnf['config'])
436 for primitive in juju_vnf['config'].initial_config_primitive:
437 for parameter in primitive.parameter:
438 value = self.xlate(parameter.value, juju_vnf['tags'])
439 param_data[parameter.name] = value
440
441 initial_params[vnfr_id] = param_data
442
443
444 return unit_names, initial_params, vnfr_index_map
445
446 unit_names, init_data, vnfr_index_map = get_meta(agent_nsr)
447
448 # The data consists of 4 sections
449 # 1. Account data
450 # 2. The input passed.
451 # 3. Juju unit names (keyed by vnfr ID).
452 # 4. Initial config data (keyed by vnfr ID).
453 data = dict()
454 data['config_agent'] = dict(
455 name=self._name,
456 host=self._ip_address,
457 port=self._port,
458 user=self._user,
459 secret=self._secret
460 )
461 data["rpc_ip"] = rpc_ip.as_dict()
462 data["unit_names"] = unit_names
463 data["init_config"] = init_data
464 data["vnfr_index_map"] = vnfr_index_map
465
466 tmp_file = None
467 with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
468 tmp_file.write(yaml.dump(data, default_flow_style=True)
469 .encode("UTF-8"))
470
471 self._log.debug("jujuCA: Creating a temp file: {} with input data".format(
472 tmp_file.name))
473
474 # Get the full path to the script
475 script = ''
476 if rpc_ip.user_defined_script[0] == '/':
477 # The script has full path, use as is
478 script = rpc_ip.user_defined_script
479 else:
480 script = os.path.join(self._rift_artif_dir, 'launchpad/libs', agent_nsr.id, 'scripts',
481 rpc_ip.user_defined_script)
482 self.log.debug("jujuCA: Checking for script in %s", script)
483 if not os.path.exists(script):
484 script = os.path.join(self._rift_install_dir, 'usr/bin', rpc_ip.user_defined_script)
485
486 cmd = "{} {}".format(rpc_ip.user_defined_script, tmp_file.name)
487 self._log.debug("jujuCA: Running the CMD: {}".format(cmd))
488
489 coro = asyncio.create_subprocess_shell(cmd, loop=self._loop,
490 stderr=asyncio.subprocess.PIPE)
491 process = yield from coro
492 err = yield from process.stderr.read()
493 task = self._loop.create_task(process.wait())
494
495 return task, err
496
497 @asyncio.coroutine
498 def apply_initial_config(self, agent_nsr, agent_vnfr):
499 """
500 Apply the initial configuration
501 Expect config directives mostly, not actions
502 Actions in initial config may not work based on charm design
503 """
504
505 vnfr = agent_vnfr.vnfr
506 service = vnfr['vnf_juju_name']
507
508 rc = yield from self.api.is_service_up(service=service)
509 if not rc:
510 return False
511
512 action_ids = []
513 try:
514 vnf_cat = agent_vnfr.vnfr_msg
515 if vnf_cat and vnf_cat.mgmt_interface.ip_address:
516 vnfr['tags'].update({'rw_mgmt_ip': vnf_cat.mgmt_interface.ip_address})
517 self._log.debug("jujuCA:(%s) tags: %s", vnfr['vnf_juju_name'], vnfr['tags'])
518
519 config = {}
520 try:
521 for primitive in vnfr['config'].initial_config_primitive:
522 self._log.debug("jujuCA:(%s) Initial config primitive %s", vnfr['vnf_juju_name'], primitive)
523 if primitive.name == 'config':
524 for param in primitive.parameter:
525 if vnfr['tags']:
526 val = self.xlate(param.value, vnfr['tags'])
527 config.update({param.name: val})
528 except KeyError as e:
529 self._log.exception("jujuCA:(%s) Initial config error(%s): config=%s",
530 vnfr['vnf_juju_name'], str(e), config)
531 config = None
532 return False
533
534 if config:
535 self.juju_log('info', vnfr['vnf_juju_name'],
536 "Applying Initial config:%s",
537 config)
538
539 rc = yield from self.api.apply_config(config, service=service)
540 if rc is False:
541 self.log.error("Service {} is in error state".format(service))
542 return False
543
544
545 # Apply any actions specified as part of initial config
546 for primitive in vnfr['config'].initial_config_primitive:
547 if primitive.name != 'config':
548 self._log.debug("jujuCA:(%s) Initial config action primitive %s",
549 vnfr['vnf_juju_name'], primitive)
550 action = primitive.name
551 params = {}
552 for param in primitive.parameter:
553 val = self.xlate(param.value, vnfr['tags'])
554 params.update({param.name: val})
555
556 self._log.info("jujuCA:(%s) Action %s with params %s",
557 vnfr['vnf_juju_name'], action, params)
558
559 resp = yield from self.api.execute_action(action, params,
560 service=service)
561 if 'error' in resp:
562 self._log.error("Applying initial config on {} failed for {} with {}: {}".
563 format(vnfr['vnf_juju_name'], action, params, resp))
564 return False
565
566 action_ids.append(resp['action']['tag'])
567
568 except KeyError as e:
569 self._log.info("Juju config agent(%s): VNFR %s not managed by Juju",
570 vnfr['vnf_juju_name'], agent_vnfr.id)
571 return False
572 except Exception as e:
573 self._log.exception("jujuCA:(%s) Exception juju apply_initial_config for VNFR {}: {}".
574 format(vnfr['vnf_juju_name'], agent_vnfr.id, e))
575 return False
576
577 # Check if all actions completed
578 pending = True
579 while pending:
580 pending = False
581 for act in action_ids:
582 resp = yield from self.api.get_action_status(act)
583 if 'error' in resp:
584 self._log.error("Initial config failed: {}".format(resp))
585 return False
586
587 if resp['status'] == 'failed':
588 self._log.error("Initial config action failed: {}".format(resp))
589 return False
590
591 if resp['status'] == 'pending':
592 pending = True
593
594 return True
595
596 def add_vnfr_managed(self, agent_vnfr):
597 if agent_vnfr.id not in self._juju_vnfs.keys():
598 self._log.info("juju config agent: add vnfr={}/{}".
599 format(agent_vnfr.name, agent_vnfr.id))
600 self._juju_vnfs[agent_vnfr.id] = agent_vnfr
601
602 def is_vnfr_managed(self, vnfr_id):
603 try:
604 if vnfr_id in self._juju_vnfs:
605 return True
606 except Exception as e:
607 self._log.debug("jujuCA: Is VNFR {} managed: {}".
608 format(vnfr_id, e))
609 return False
610
611 @asyncio.coroutine
612 def is_configured(self, vnfr_id):
613 try:
614 agent_vnfr = self._juju_vnfs[vnfr_id]
615 vnfr = agent_vnfr.vnfr
616 if vnfr['active']:
617 return True
618
619 vnfr = self._juju_vnfs[vnfr_id].vnfr
620 service = vnfr['vnf_juju_name']
621 resp = self.api.is_service_active(service=service)
622 self._juju_vnfs[vnfr_id]['active'] = resp
623 self._log.debug("jujuCA: Service state for {} is {}".
624 format(service, resp))
625 return resp
626
627 except KeyError:
628 self._log.debug("jujuCA: VNFR id {} not found in config agent".
629 format(vnfr_id))
630 return False
631 except Exception as e:
632 self._log.error("jujuCA: VNFR id {} is_configured: {}".
633 format(vnfr_id, e))
634 return False
635
636 @asyncio.coroutine
637 def get_config_status(self, agent_nsr, agent_vnfr):
638 """Get the configuration status for the VNF"""
639 rc = 'unknown'
640
641 try:
642 vnfr = agent_vnfr.vnfr
643 service = vnfr['vnf_juju_name']
644 except KeyError:
645 # This VNF is not managed by Juju
646 return rc
647
648 rc = 'configuring'
649
650 if not self.check_task_status(service, 'deploy'):
651 return rc
652
653 try:
654 resp = yield from self.api.get_service_status(service=service)
655 self._log.debug("jujuCA: Get service %s status? %s", service, resp)
656
657 if resp == 'error':
658 return 'error'
659 if resp == 'active':
660 return 'configured'
661 except KeyError:
662 self._log.error("jujuCA: Check unknown service %s status", service)
663 except Exception as e:
664 self._log.error("jujuCA: Caught exception when checking for service is active: %s", e)
665 self._log.exception(e)
666
667 return rc
668
669 def get_action_status(self, execution_id):
670 ''' Get the action status for an execution ID
671 *** Make sure this is NOT a asyncio coroutine function ***
672 '''
673
674 try:
675 self._log.debug("jujuCA: Get action status for {}".format(execution_id))
676 resp = self.api._get_action_status(execution_id)
677 self._log.debug("jujuCA: Action status: {}".format(resp))
678 return resp
679 except Exception as e:
680 self._log.error("jujuCA: Error fetching execution status for %s",
681 execution_id)
682 self._log.exception(e)
683 raise e
684
685 def get_service_status(self, vnfr_id):
686 '''Get the service status, used by job status handle
687 Make sure this is NOT a coroutine
688 '''
689 service = self.get_service_name(vnfr_id)
690 if service is None:
691 self._log.error("jujuCA: VNFR {} not managed by this Juju agent".
692 format(vnfr_id))
693 return None
694
695 # Delay for 3 seconds before checking as config apply takes a
696 # few seconds to transfer to the service
697 time.sleep(3)
698 return self.api._get_service_status(service=service)