mirror of
https://codeberg.org/streams/streams.git
synced 2024-09-19 23:55:19 +00:00
517 lines
14 KiB
Python
Executable file
517 lines
14 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
|
|
import sys, os
|
|
import readline
|
|
import pathlib
|
|
import urllib
|
|
import fnmatch
|
|
import configparser
|
|
import requests
|
|
import argparse
|
|
from requests.auth import HTTPBasicAuth
|
|
import base64
|
|
|
|
# use sys and pathlib to amend the path so we can find/import easywebdav
|
|
sys.path.append('util/py')
|
|
sys.path.append(str(pathlib.Path(__file__).parent.resolve()) + '/py')
|
|
import easywebdav
|
|
import easywebdav.__version__ as easywebdavversion
|
|
|
|
__version__= "2021.10.06"
|
|
|
|
SERVER = None
|
|
USER = None
|
|
PASSWD = None
|
|
VERIFY_SSL=True
|
|
|
|
class Completer():
    """Readline tab-completion helper for the interactive shell.

    Four candidate lists are kept (base commands, remote files, local files
    and connection names).  ``complete`` picks one of them according to the
    first word already present on the input line.
    """

    # Class-level defaults; the property setters rebind them per instance.
    matches = []
    _commnd = []
    _rfiles = []
    _lfiles = []
    _nomads = []

    def __init__(self):
        """Bind the tab key to completion and register ``self.complete``."""
        readline.parse_and_bind("tab:complete")
        readline.set_completer(self.complete)

    @property
    def commnd(self):
        """List of base command names offered for the first word."""
        return self._commnd

    @commnd.setter
    def commnd(self, names):
        self._commnd = names

    def get_command(self):
        """Return the base command list (same value as ``commnd``)."""
        return self._commnd

    @property
    def lfiles(self):
        """List of local file names offered as candidates."""
        return self._lfiles

    @lfiles.setter
    def lfiles(self, names):
        self._lfiles = names

    @property
    def rfiles(self):
        """List of remote file names offered as candidates."""
        return self._rfiles

    @rfiles.setter
    def rfiles(self, names):
        self._rfiles = names

    @property
    def nomads(self):
        """List of candidates offered after the ``connect`` command."""
        return self._nomads

    @nomads.setter
    def nomads(self, names):
        self._nomads = names

    def complete(self, text, state):
        """Completion callback with the ``readline.set_completer`` signature.

        Args:
            text: the word currently being completed.
            state: index of the candidate requested.

        Returns:
            The ``state``-th candidate, or ``None`` when there is no such
            candidate.  When ``text`` is non-empty, candidates are filtered by
            prefix and returned with a trailing space appended.
        """
        # Start from the base commands, then switch list depending on the
        # command that is already typed at the start of the line.
        self.matches = self._commnd

        current = self.get_cur_cmd()
        if current == 'connect':
            self.matches = self._nomads
        elif current in ('put', 'lcd', 'lls'):
            self.matches = self._lfiles
        elif current in ('get', 'cd', 'delete', 'cat', 'ls', 'rmdir', 'exists'):
            self.matches = self._rfiles
        elif current in ('mkdir', 'mkdirs', 'pwd', 'lpwd', 'quit'):
            # nothing sensible to offer for these commands
            self.matches = []

        # Narrow the chosen list down to entries starting with the typed word.
        if text != "":
            narrowed = []
            for candidate in self.matches:
                if candidate.startswith(text):
                    narrowed.append(candidate + " ")
            self.matches = narrowed

        try:
            return self.matches[state]
        except IndexError:
            return None

    def get_cur_cmd(self):
        """Return the first word typed before the word being completed.

        Returns an empty string when nothing precedes the current word.
        """
        begin = readline.get_begidx()
        typed = readline.get_line_buffer()[:begin]
        words = typed.split()
        if words:
            return words[0]
        return ""
|
|
|
|
|
|
|
|
|
|
#####################################################
|
|
|
|
class CommandNotFound(Exception):
    """Raised when a command name is not in the shell's command list."""
|
|
|
|
class NSH(object):
    """Command dispatcher of the Nomad shell.

    Holds the current host, the HTTP session and an ``easywebdav`` client
    bound to the ``cloud`` path of that host.  ``do()`` runs a command by
    calling the ``cmd_<name>`` method when one exists and otherwise the
    method of the same name on the DAV client.
    """

    # Every command name accepted by do().  'rfiles' is internal (it feeds
    # the autocompleter) and is hidden from the help listing.
    commands = ['cd', 'ls', 'exists', 'mkdir', 'mkdirs', 'rmdir', 'delete', 'put', 'get',
                'connect', 'pwd', 'cat',
                'lcd', 'lpwd', 'lls',
                'quit', 'help', 'rfiles']

    def __init__(self, host, session=None, davclient=None):
        """Create a shell bound to ``host``.

        Args:
            host: base URL of the server; passed to the ``host`` setter.
            session: HTTP session passed to the ``session`` setter.
            davclient: DAV client to use.  It is assigned last, so it replaces
                the client that the ``session`` setter has just created
                (including when it is left as ``None``).
        """
        self.sessions = {}
        self.host = host
        self.session = session
        self.davclient = davclient

    @property
    def host(self):
        """Base URL of the current server."""
        return self._host

    @host.setter
    def host(self, host):
        self._host = host
        # Derive the bare host name by dropping the "https:" scheme and
        # every slash from the URL.
        self._hostname = host.replace("https:", "").replace("/", "")

    @property
    def hostname(self):
        """Host name of the current server, without scheme or slashes."""
        return self._hostname

    @hostname.setter
    def hostname(self, hostname):
        self._host = "https://{}/".format(hostname)
        self._hostname = hostname

    @property
    def session(self):
        """HTTP session used for requests to the current server."""
        return self._session

    @session.setter
    def session(self, session):
        self._session = session
        # Each new session gets a fresh DAV client for the current hostname.
        self.davclient = easywebdav.connect(self.hostname, protocol='https', session=session, path="cloud", verify_ssl=VERIFY_SSL)

    @property
    def PS1(self):
        """Prompt string: ``host:cwd> ``, or ``[!]> `` without a DAV client."""
        if self.davclient is None:
            return "[!]> "
        return "{}:{}> ".format(self.hostname, self.davclient.cwd)

    def get_host_session(self, host=None):
        """Return the current session, creating a new one if there is none.

        The ``host`` argument is accepted but not used.
        """
        if self.session is None:
            session = requests.Session()
        else:
            session = self.session
        return session

    def do(self, command, *args):
        """Run ``command`` with ``args`` and return its result.

        Raises:
            CommandNotFound: if ``command`` is not listed in ``commands``.
        """
        if command not in self.commands:
            raise CommandNotFound("Unknown command '{}'".format(command))

        cmd = getattr(self, "cmd_{}".format(command), None)
        if cmd is None:
            # no local implementation: forward to the DAV client
            cmd = getattr(self.davclient, command)

        return cmd(*args)

    def cmd_cd(self, *args):
        """Change the remote directory to ``args[0]`` if it exists remotely."""
        if not args:
            return
        if self.davclient.exists(args[0]):
            self.davclient.cd(args[0])

    def cmd_exists(self, *args):
        """Return the DAV client's ``exists`` result for ``args[0]``."""
        if not args:
            return
        return self.davclient.exists(args[0])

    def cmd_mkdir(self, *args):
        """Create the remote directory ``args[0]``."""
        if not args:
            return
        return self.davclient.mkdir(args[0])

    def cmd_mkdirs(self, *args):
        """Call the DAV client's ``mkdirs`` with ``args[0]``."""
        if not args:
            return
        return self.davclient.mkdirs(args[0])

    def cmd_rmdir(self, *args):
        """Remove the remote directory ``args[0]``."""
        if not args:
            return
        return self.davclient.rmdir(args[0])

    def cmd_delete(self, *args):
        """Delete the remote file ``args[0]``."""
        if not args:
            return
        return self.davclient.delete(args[0])

    def cmd_put(self, *args):
        """Upload ``args[0]``; the target name is ``args[1]`` or the same name."""
        if not args:
            return
        source = args[0]
        target = args[1] if len(args) > 1 else args[0]
        return self.davclient.upload(source, target)

    def cmd_get(self, *args):
        """Download ``args[0]``; the target name is ``args[1]`` or the same name."""
        if not args:
            return
        source = args[0]
        target = args[1] if len(args) > 1 else args[0]
        return self.davclient.download(source, target)

    def cmd_connect(self, *args):
        """Switch the shell to another host.

        ``args[0]`` is ``hostname`` or ``user@hostname``.  ``~`` or the home
        server URL switches back to ``SERVER``.  When a user part is given,
        the shell changes into that directory after connecting, or prints
        ``not found`` when it does not exist.
        """
        if not args:
            return

        # Split an optional "user@" prefix off the host name (first '@').
        ruser, at, rest = args[0].partition('@')
        if at:
            newhostname = rest
        else:
            ruser = ''
            newhostname = args[0]

        newhost = "https://{}/".format(newhostname)
        if newhostname == "~" or newhost == SERVER:
            # back to home server
            self.host = SERVER
            self.session = self.get_host_session(SERVER)
            return

        session_remote = self.get_host_session(newhost)
        session_home = self.get_host_session(SERVER)

        # The destination is sent as the hex encoding of the UTF-8 bytes of
        # "<newhost>cloud".
        bnewhost = (newhost + 'cloud').encode('utf-8').hex()

        # NOTE(review): the response is not inspected; presumably the request
        # is made for its effect on the shared session (cookies/redirects) -
        # confirm against the server's /magic endpoint.
        session_home.get(
            SERVER + "/magic",
            params={'bdest': bnewhost, 'owa': 1},
            allow_redirects=True,
            verify=VERIFY_SSL)

        self.hostname = newhostname
        self.session = session_remote

        if ruser:
            if self.do('exists', ruser):
                self.do('cd', ruser)
            else:
                print('not found')

    def cmd_pwd(self, *args):
        """Return ``hostname:cwd`` for the remote side."""
        return "{}:{}".format(self.hostname, self.davclient.cwd)

    def cmd_ls(self, *args):
        """Print a listing of the remote directory.

        Flags: ``-a`` also shows ``./``, ``../`` and dot-files, ``-l`` adds
        type and size columns, ``-d`` shows directories only.  If the first
        non-flag argument contains a wildcard character (``?*[]``) the
        current directory is listed and filtered with ``fnmatch``; otherwise
        the non-flag arguments are passed to the DAV client's ``ls``.
        """
        # Imported here so the method does not depend on another module
        # having loaded the ``urllib.parse`` submodule already.
        from urllib.parse import unquote

        extra_args = ("-a", "-l", "-d")

        show_hidden = "-a" in args
        show_list = "-l" in args
        show_only_dir = "-d" in args
        args = [a for a in args if a not in extra_args]
        wildcards = set('?*[]')
        pattern = None

        # If we see any wildcard characters in the first non-flag argument,
        # unset args and set pattern instead.  We then filter the result set
        # (the entire directory list) by pattern.  It would be better to
        # perform this matching inside easywebdav but that is an imported
        # standalone component which we may have no direct control over.
        if args and any((c in wildcards) for c in args[0]):
            pattern = args[0]
            args = []

        entries = self.davclient.ls(*args)
        # the size column is at least 7 characters wide
        width = max([len(str(entry.size)) for entry in entries] + [7])

        def _fmt(typ, size, name):
            # Show the decoded form next to names that are URL-encoded.
            clean = unquote(name)
            if clean != name:
                name = name + " (\"" + clean.rstrip('/') + "\")"

            if show_list:
                # Use the ``size`` argument.  Reading the loop variable of
                # the listing loop here raised NameError for the "./" and
                # "../" rows, which are printed before that loop starts.
                return "{t} {num: {width}} {nm}".format(t=typ, num=size, width=width, nm=name)
            return name

        if show_hidden:
            print(_fmt('d', 0, "./"))
            if self.davclient.cwd != "/":
                print(_fmt('d', 0, "../"))

        prefix = "/cloud" + self.davclient.cwd
        for entry in entries:
            name = entry.name.replace(prefix, "")
            if pattern and not fnmatch.fnmatch(name, pattern):
                continue

            kind = "d" if name.endswith("/") else "-"
            if name == "":
                # the entry for the listed directory itself
                continue
            if not show_hidden and name.startswith("."):
                continue
            if show_only_dir and kind != "d":
                continue
            print(_fmt(kind, entry.size, name))

    # This isn't a normal "user" command, but exists to update the
    # autocompleter as it has intimate access to the current remote environment
    def cmd_rfiles(self, *args):
        """Return the remote entry names with the ``/cloud<cwd>`` prefix removed."""
        prefix = "/cloud" + self.davclient.cwd
        return [entry.name.replace(prefix, "") for entry in self.davclient.ls(*args)]

    def cmd_lpwd(self, *args):
        """Return the local working directory."""
        return os.getcwd()

    def cmd_lcd(self, *args):
        """Change the local working directory to ``args[0]``."""
        if not args:
            return
        os.chdir(args[0])

    def cmd_lls(self, *args):
        """Print the entries of the local working directory.

        Directories get a trailing ``/``.  Arguments are ignored.
        """
        for entry in os.listdir(os.getcwd()):
            if os.path.isdir(entry):
                entry = entry + "/"
            print(entry)

    def cmd_help(self, *args):
        """Print the version, the user-visible commands and library versions."""
        print("NSH", __version__)
        print()
        print("Commands:")
        for c in self.commands:
            if c != 'rfiles':
                print("\t", c)
        print()
        print("easywebdav", easywebdavversion.__version__, "(mod)")
        print("requests", requests.__version__)

    def cmd_cat(self, *args):
        """Fetch the remote file ``args[0]`` with GET and print its text."""
        if not args:
            return
        rfile = args[0]
        resp = self.davclient._send('GET', rfile, (200,))
        print(resp.text)
|
|
|
|
def load_conf(conffile):
    """Load connection settings into the module globals.

    The file name is ``.nshrc.<conffile>`` when ``conffile`` is non-empty and
    ``.nshrc`` otherwise.  It is looked up in the current directory first and
    in the user's home directory second.  The ``[nsh]`` section must provide
    ``host``, ``username`` and ``password``; ``verify_ssl`` is optional.

    Args:
        conffile: suffix selecting an alternate configuration, or ``""``.

    Side effects:
        Sets ``SERVER``, ``USER``, ``PASSWD`` and, when configured,
        ``VERIFY_SSL``.  Prints a message and exits with status -1 when the
        file is missing or cannot be used.
    """
    global SERVER, USER, PASSWD, VERIFY_SSL

    optsfile = ".nshrc." + conffile if conffile else ".nshrc"

    if not os.path.isfile(optsfile):
        # Only look up the home directory when it is actually needed, and
        # never join None values: fall back to expanduser() when neither
        # HOME nor the HOMEDRIVE/HOMEPATH pair is available.
        homedir = os.getenv("HOME")
        if homedir is None:
            drive = os.getenv("HOMEDRIVE")
            path = os.getenv("HOMEPATH")
            if drive and path:
                homedir = os.path.join(drive, path)
            else:
                homedir = os.path.expanduser("~")
        optsfile = os.path.join(homedir, optsfile)

    if not os.path.isfile(optsfile):
        print("Please create a configuration file called '{}':".format(optsfile))
        print("[nsh]")
        print("host = https://yourhost.com/")
        print("username = your_username")
        print("password = your_password")
        sys.exit(-1)

    config = configparser.ConfigParser()
    try:
        config.read(optsfile)
        server = config.get('nsh', 'host')
        user = config.get('nsh', 'username')
        passwd = config.get('nsh', 'password')
        verify_ssl = None
        if config.has_option('nsh', 'verify_ssl'):
            verify_ssl = config.getboolean('nsh', 'verify_ssl')
    except (configparser.Error, ValueError) as e:
        # Malformed file, missing section/option or a non-boolean verify_ssl:
        # report it instead of ending in a traceback.
        print("Invalid configuration file '{}': {}".format(optsfile, e))
        sys.exit(-1)

    # Assign only after everything parsed, so a bad file leaves no
    # half-updated settings behind.
    SERVER = server
    USER = user
    PASSWD = passwd
    if verify_ssl is not None:
        VERIFY_SSL = verify_ssl
|
|
|
|
def get_lfiles():
    """Return the names in the current working directory.

    Directory names carry a trailing ``/``; other entries are returned as
    reported by ``os.listdir``.
    """
    return [
        entry + "/" if os.path.isdir(entry) else entry
        for entry in os.listdir(os.getcwd())
    ]
|
|
|
|
def nsh():
    """Run the shell: log in, prime the autocompleter, then loop on input.

    Uses the module globals ``SERVER``, ``USER``, ``PASSWD`` and
    ``VERIFY_SSL``, so ``load_conf()`` must have run first.  The loop ends on
    the ``quit`` command or at end of input.  Prompts and greetings are only
    printed when standard input is a terminal.
    """
    shell = NSH(SERVER)
    completer = Completer()
    interactive = sys.stdin.isatty()

    session_home = shell.get_host_session()

    # login on home server
    if interactive:
        print("logging in...")
    r = session_home.get(
        SERVER + "/api/z/1.0/verify",
        auth=HTTPBasicAuth(USER, PASSWD),
        verify=VERIFY_SSL)

    if interactive:
        print("Hi - ", r.json()['channel_name'])

    shell.session = session_home

    # every command except the internal 'rfiles' helper can be completed
    completer.commnd = [c for c in NSH.commands if c != 'rfiles']

    # Initialise the list of available connections from the json endpoint;
    # this will not change.  The request honours VERIFY_SSL like the other
    # requests, otherwise it fails on hosts configured with verify_ssl = false.
    r = session_home.get(SERVER + "/connac", verify=VERIFY_SSL)
    completer.nomads = r.json() if r else []

    # initialise local file list
    completer.lfiles = get_lfiles()

    # since the site directory may be empty, automatically cd to
    # your own cloud storage folder and update remote file list
    shell.do('cd', USER)
    completer.rfiles = shell.do('rfiles')

    # command loop
    while True:
        try:
            input_str = input(shell.PS1 if interactive else "")
        except EOFError:
            break

        # split() without a separator collapses repeated blanks, so no empty
        # arguments are handed to the commands
        toks = input_str.split()
        if not toks:
            continue

        command = toks[0]
        args = toks[1:]

        # 'quit' has no implementation behind do(); handle it here, also
        # when it is typed with surrounding blanks or extra words
        if command == "quit":
            break

        try:
            ret = shell.do(command, *args)

            # update the internal file lists for the autocompleter if we just
            # performed an action which may have changed them
            if command in ('cd', 'connect', 'mkdir', 'mkdirs', 'rmdir', 'delete', 'put'):
                completer.rfiles = shell.do('rfiles')
            if command in ('get', 'lcd'):
                completer.lfiles = get_lfiles()

        except (easywebdav.client.OperationFailed, CommandNotFound) as e:
            print(e)
        except (requests.exceptions.RequestException, OSError) as e:
            # Network failures reach us as requests exceptions.  The former
            # handlers named ``urllib.exceptions``, which does not exist, so
            # evaluating them raised AttributeError instead of catching
            # anything.  OSError covers local failures such as ``lcd`` into a
            # missing directory.
            print(e)
        else:
            if ret is not None:
                print(ret)
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Command-line entry point: parse the options, then load the selected
    # configuration and start the shell.
    parser = argparse.ArgumentParser(description = "Nomad shell - CLI for accessing local/remote Nomad|Zot cloud storage resources")
    parser.add_argument('-c', nargs=1, metavar="name", help="load alternate configuration .nshrc.name", default="")
    parser.add_argument('--version', action="store_true", help="print version and exit")
    args = parser.parse_args()

    if args.version:
        print(__version__)
        sys.exit()

    # with nargs=1 a given -c value arrives as a one-element list
    conffile = args.c[0] if args.c else ""

    load_conf(conffile)
    nsh()
    sys.exit()
|
|
|
|
|
|
|
|
|