Source code for dlab_deployment.infrastructure.command_executor

# *****************************************************************************
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#
# *****************************************************************************
import abc
from contextlib import contextmanager
import os
from shutil import copyfile
import subprocess
from time import sleep

from scp import SCPClient
import paramiko
import six

from dlab_core.domain.helper import break_after


[docs]@six.add_metaclass(abc.ABCMeta) class BaseCommandExecutor(object): @abc.abstractmethod
[docs] def run(self, command): """Run cli command :type command: str :param command: cli command """ raise NotImplementedError
@abc.abstractmethod
[docs] def sudo(self, command): """Run cli sudo command :type command: str :param command: cli command """ raise NotImplementedError
@abc.abstractmethod
[docs] def cd(self, path): """Change work directory to path :type path: str :param path: directory location """ raise NotImplementedError
@abc.abstractmethod
[docs] def put(self, local_path, remote_path): """Copy file :type local_path: str :param local_path: path to local object :type remote_path: str :param remote_path: path to remote object """ raise NotImplementedError
[docs]class LocalCommandExecutor(BaseCommandExecutor):
[docs] def run(self, command): # pragma: no cover """Run cli command :type command: str :param command: cli command :rtype: str :return execution result """ lines = [] process = subprocess.Popen( command, shell=True, universal_newlines=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) while process.poll() is None: line = process.stdout.readline() lines.append(line) # TODO: Add logging return ' '.join(lines)
[docs] def sudo(self, command): """Run cli sudo command :type command: str :param command: cli command :rtype: str :return execution result """ raise NotImplementedError
@contextmanager
[docs] def cd(self, path): """Change work directory to path :type path: str :param path: directory location """ current_dir = os.getcwd() try: os.chdir(path) yield finally: os.chdir(current_dir)
[docs] def put(self, local_path, remote_path): """Copy file :type local_path: str :param local_path: path to local object :type remote_path: str :param remote_path: path to remote object """ copyfile(local_path, remote_path)
[docs]class ParamikoCommandExecutor(BaseCommandExecutor): def __init__(self, host, name, identity_file): """ :type host: str :param host: ip address or host name :type name: str :param name: user name :type: str :param identity_file: path to file """ self.current_dir = None self._connection = None self.host = host self.name = name self.identity_file = identity_file @property
[docs] def connection(self): """Return paramiko connection""" return self._connection or self.init_connection()
@break_after(180)
[docs] def init_connection(self): """Init connection""" connection = paramiko.SSHClient() connection.set_missing_host_key_policy( paramiko.AutoAddPolicy()) while True: try: connection.connect(self.host, username=self.name, key_filename=self.identity_file) connection.exec_command('ls') return connection except Exception: sleep(10)
@property
[docs] def current_dir(self): """Default directory""" return self._current_dir
@current_dir.setter def current_dir(self, val): """Set default directory :type val: str :param val: new directory """ self._current_dir = val
[docs] def run(self, command): """Run cli command :type command: str :param command: cli command :rtype: str :return execution result """ if self.current_dir: command = 'cd {}; {}'.format(self.current_dir, command) stdin, stdout, stderr = self.connection.exec_command(command) return stdout.read().decode('ascii').strip("\n")
[docs] def sudo(self, command): """Run sudo cli command :type command: str :param command: cli command :rtype: str :return execution result """ command = 'sudo {}'.format(command) return self.run(command)
@contextmanager
[docs] def cd(self, path): try: self.current_dir = path yield finally: self.current_dir = None
[docs] def put(self, local_path, remote_path): """Copy file :type local_path: str :param local_path: path to local object :type remote_path: str :param remote_path: path to remote object """ scp = SCPClient(self.connection.get_transport()) scp.put(local_path, recursive=True, remote_path=remote_path) scp.close()