spug/spug_api/consumer/executors.py

64 lines
2.0 KiB
Python
Raw Normal View History

2020-01-15 12:48:22 +00:00
# Copyright: (c) OpenSpug Organization. https://github.com/openspug/spug
# Copyright: (c) <spug.dev@gmail.com>
# Released under the MIT License.
2019-11-24 09:05:58 +00:00
from channels.consumer import SyncConsumer
from apps.setting.utils import AppSetting
from django_redis import get_redis_connection
from libs.ssh import SSH
import threading
import socket
import json
class SSHExecutor(SyncConsumer):
    """Synchronous Channels consumer that launches SSH jobs on worker threads."""

    def exec(self, job):
        """Build a ``Job`` from the incoming message and start it in a new thread.

        Args:
            job: Mapping of keyword arguments for ``Job`` (it is expanded with
                ``**``). The private key read from the application settings
                is supplied separately as ``pkey``.
        """
        pkey = AppSetting.get('private_key')
        runner = Job(pkey=pkey, **job)
        # Run on a separate thread so this handler returns without waiting
        # for the remote command to finish.
        threading.Thread(target=runner.run).start()
class Job:
    """Run one command on a remote host over SSH and relay its output via Redis.

    When a ``token`` is supplied, every message is JSON-encoded and appended
    to the Redis list stored under that token. Without a token the command is
    executed and nothing is published.
    """

    def __init__(self, hostname, port, username, pkey, command, token=None, **kwargs):
        """Create the SSH client and remember what to run.

        Args:
            hostname: Host passed to the ``SSH`` client.
            port: Port passed to the ``SSH`` client.
            username: Login name passed to the ``SSH`` client.
            pkey: Private key passed to the ``SSH`` client.
            command: Command handed to the SSH client when the job runs.
            token: Redis list key that receives this job's messages; when it
                is falsy, ``run`` publishes nothing.
            **kwargs: Accepted and ignored, so callers may pass extra fields.
        """
        self.ssh_cli = SSH(hostname, port, username, pkey)
        # Identifies this host in every published message.
        self.key = f'{hostname}:{port}'
        self.command = command
        self.token = token
        # Obtained lazily by _send() on first use.
        self.rds_cli = None

    def _send(self, message, with_expire=False):
        """Append ``message`` as JSON to the Redis list named by the token.

        Args:
            message: JSON-serialisable payload.
            with_expire: When true, also set a 300-second expiry on the list.
        """
        if self.rds_cli is None:
            self.rds_cli = get_redis_connection()
        self.rds_cli.rpush(self.token, json.dumps(message))
        if with_expire:
            self.rds_cli.expire(self.token, 300)

    def send(self, data):
        """Publish ``data`` as an ``info`` message."""
        message = {'key': self.key, 'type': 'info', 'data': data}
        self._send(message)

    def send_system(self, data):
        """Publish ``data`` as a ``system`` message."""
        message = {'key': self.key, 'type': 'system', 'data': data}
        self._send(message)

    def send_error(self, data):
        """Publish ``data`` as an ``error`` message."""
        message = {'key': self.key, 'type': 'error', 'data': data}
        self._send(message)

    def send_status(self, code):
        """Publish the final status ``code`` and put an expiry on the list."""
        message = {'key': self.key, 'status': code}
        self._send(message, True)

    def run(self):
        """Execute the command, streaming its output to Redis when a token is set.

        Returns:
            The result of ``exec_command`` when there is no token; otherwise
            ``None``.

        Raises:
            Exception: Any error other than ``socket.timeout`` raised while
                streaming is re-raised after it has been reported.
        """
        if not self.token:
            return self.ssh_cli.exec_command(self.command)
        self.send_system('### Executing')
        # Reported as the status if the stream never yields a code.
        code = -1
        try:
            for code, out in self.ssh_cli.exec_command_with_stream(self.command):
                self.send(out)
        except socket.timeout:
            code = 130
            self.send_error('### Time out')
        except Exception as e:
            # Tell the client why the job stopped instead of leaving it with
            # only a bare status, then let the error propagate as before.
            self.send_error(f'### Exception: {e}')
            raise
        finally:
            # Always publish a status so the list is given its expiry.
            self.send_status(code)