#!/usr/bin/env python3 import sys, os import readline import pathlib import urllib import fnmatch import configparser import requests import argparse from requests.auth import HTTPBasicAuth import base64 # use sys and pathlib to amend the path so we can find/import easywebdav sys.path.append('util/py') sys.path.append(str(pathlib.Path(__file__).parent.resolve()) + '/py') import easywebdav import easywebdav.__version__ as easywebdavversion __version__= "2021.10.06" SERVER = None USER = None PASSWD = None VERIFY_SSL=True class Completer(): matches = [] _commnd = [] _rfiles = [] _lfiles = [] _nomads = [] def __init__(self): # Setup autocompletion readline.parse_and_bind("tab:complete"); readline.set_completer(self.complete) return @property def commnd(self): return self._commnd @commnd.setter def commnd(self,names): self._commnd = names def get_command(self): return self._commnd @property def lfiles(self): return self._lfiles @lfiles.setter def lfiles(self,names): self._lfiles = names @property def rfiles(self): return self._rfiles @rfiles.setter def rfiles(self,names): self._rfiles = names @property def nomads(self): return self._nomads @nomads.setter def nomads(self,names): self._nomads = names # this is the completion function def complete(self,text,state): # set initial return state to base commands self.matches = self._commnd # peek at the typed input state and reconfigure the completion # base map accordingly current = self.get_cur_cmd() if current in [ 'connect' ]: self.matches = self._nomads if current in [ 'put', 'lcd', 'lls' ]: self.matches = self._lfiles if current in [ 'get', 'cd', 'delete', 'cat', 'ls', 'rmdir', 'exists' ]: self.matches = self._rfiles if current in [ 'mkdir', 'mkdirs', 'pwd', 'lpwd', 'quit' ]: self.matches = [] # On the current "word" reduce the base map to a subset of the results if text != "": self.matches = [x + " " for x in self.matches if x.startswith(text)] try: response = self.matches[state] except IndexError: response = None return response def get_cur_cmd(self): idx = readline.get_begidx() full = readline.get_line_buffer() n = full[:idx].split() return n[0] if len(n) > 0 else "" ##################################################### class CommandNotFound(Exception): pass class NSH(object): commands = ['cd','ls','exists','mkdir','mkdirs','rmdir','delete','put','get', 'connect', 'pwd','cat', 'lcd','lpwd', 'lls', 'quit', 'help','rfiles'] def __init__(self, host, session=None, davclient=None): self.sessions = {} self.host = host self.session = session self.davclient = davclient @property def host(self): return self._host @host.setter def host(self, host): self._host = host self._hostname = host.replace("https:","").replace("/","") @property def hostname(self): return self._hostname @hostname.setter def hostname(self, hostname): self._host = "https://{}/".format(hostname) self._hostname = hostname @property def session(self): return self._session @session.setter def session(self, session): self._session = session self.davclient = easywebdav.connect( self.hostname, protocol='https', session=session, path="cloud", verify_ssl=VERIFY_SSL) @property def PS1(self): if self.davclient is None: return "[!]> " return "{}:{}> ".format(self.hostname, self.davclient.cwd) def get_host_session(self, host=None): if self.session is None: session = requests.Session() else: session = self.session return session def do(self, command, *args): if not command in self.commands: raise CommandNotFound("Unknown command '{}'".format(command)) cmd = getattr(self, "cmd_{}".format(command), None) if cmd is None: cmd = getattr(self.davclient, command) return cmd(*args) def cmd_cd(self, *args): if len(args) == 0: return; if self.davclient.exists(args[0]): self.davclient.cd(args[0]) def cmd_exists(self, *args): if (len(args)==0): return return self.davclient.exists(args[0]) def cmd_mkdir(self, *args): if (len(args)==0): return return self.davclient.mkdir(args[0]) def cmd_mkdirs(self, *args): if (len(args)==0): return return self.davclient.mkdirs(args[0]) def cmd_rmdir(self, *args): if (len(args)==0): return return self.davclient.rmdir(args[0]) def cmd_delete(self, *args): if (len(args)==0): return return self.davclient.delete(args[0]) def cmd_put(self, *args): if (len(args)==0): return args = list(args) if (len(args)==1): args.append(args[0]) return self.davclient.upload(args[0], args[1]) def cmd_get(self, *args): if (len(args)==0): return args = list(args) if (len(args)==1): args.append(args[0]) return self.davclient.download(args[0], args[1]) def cmd_connect(self, *args): ruser = '' if (len(args)==0): return newhostname = args[0] i = newhostname.find('@') if (i != (-1)): ruser = newhostname[0:i] newhostname = newhostname[i+1:] newhost = "https://{}/".format(newhostname) if newhostname == "~" or newhost == SERVER: # back to home server self.host = SERVER self.session = self.get_host_session(SERVER) return session_remote = self.get_host_session(newhost) session_home = self.get_host_session(SERVER) bnewhost = newhost + 'cloud' bnewhost = bnewhost.encode('utf-8').hex() r = session_home.get( SERVER + "/magic", params={'bdest': bnewhost, 'owa': 1}, allow_redirects=True, verify=VERIFY_SSL ) self.hostname = newhostname self.session = session_remote if (ruser): if (self.do('exists',*[ruser])): self.do('cd', *[ruser]) else: print('not found') def cmd_pwd(self, *args): return "{}:{}".format( self.hostname, self.davclient.cwd ) def cmd_ls(self, *args): extra_args = ["-a", "-l", "-d"] show_hidden = "-a" in args show_list = "-l" in args show_only_dir = "-d" in args args = [ a for a in args if not a in extra_args ] wildcards = set('?*[]') pattern = False # if we see any wildcard characters in non-flag arguments, unset args and set pattern # instead. We will filter the result set (the entire directory list) by pattern. # It would be better to perform this matching inside easywebdav but that is an # imported standalone component which we may have no direct control over. if args and any((c in wildcards) for c in args[0]): pattern = args[0] args = []; r = self.davclient.ls(*args) l = max([ len(str(f.size)) for f in r ] + [7,]) def _fmt(typ, size, name): clean = urllib.parse.unquote(name) if clean != name: name = name + " (\"" + clean.rstrip('/') + "\")" if show_list: return "{t} {num: {width}} {nm}".format(t = typ, num = f.size, width = l, nm = name) else: return name if show_hidden : print( _fmt('d', 0, "./")) if self.davclient.cwd!="/": print( _fmt('d', 0, "../")) for f in r: name = f.name.replace("/cloud"+self.davclient.cwd,"") #print (name) #print (pattern) if pattern and not fnmatch.fnmatch(name,pattern): continue type = "-" if name.endswith("/"): type = "d" if name != "": if show_hidden or not name.startswith("."): if not show_only_dir or type=="d": print( _fmt(type, f.size , name)) # This isn't a normal "user" command, but exists to update the # autocompleter as it has intimate access to the current remote environment def cmd_rfiles(self, *args): ret = [] r = self.davclient.ls(*args) for f in r: ret.append(f.name.replace("/cloud" + self.davclient.cwd,"")) return ret def cmd_lpwd(self, *args): return os.getcwd() def cmd_lcd(self, *args): if (len(args)==0): return os.chdir(args[0]) def cmd_lls(self, *args): for f in os.listdir(os.getcwd()): if os.path.isdir(f): f=f+"/" print( f) def cmd_help(self, *args): print("NSH",__version__) print() print("Commands:") for c in self.commands: if c != 'rfiles': print("\t",c) print() print("easywebdav", easywebdavversion.__version__, "(mod)") print("requests", requests.__version__) def cmd_cat(self,*args): if (len(args)==0): return rfile = args[0] resp = self.davclient._send('GET', rfile, (200,)) print(resp.text) def load_conf(conffile): global SERVER,USER,PASSWD,VERIFY_SSL homedir = os.getenv("HOME") if homedir is None: homedir = os.path.join(os.getenv("HOMEDRIVE"), os.getenv("HOMEPATH")) optsfile = ".nshrc" + "." + conffile if conffile else ".nshrc" if not os.path.isfile(optsfile): optsfile = os.path.join(homedir, optsfile) if not os.path.isfile(optsfile): print("Please create a configuration file called '{}':".format(optsfile)) print("[nsh]") print("host = https://yourhost.com/") print("username = your_username") print("password = your_password") sys.exit(-1) config = configparser.ConfigParser() config.read(optsfile) SERVER = config.get('nsh', 'host') USER = config.get('nsh', 'username') PASSWD = config.get('nsh', 'password') if config.has_option('nsh', 'verify_ssl'): VERIFY_SSL = config.getboolean('nsh', 'verify_ssl') def get_lfiles(): ret = [] for f in os.listdir(os.getcwd()): if os.path.isdir(f): f=f+"/" ret.append(f) return ret def nsh(): nsh = NSH(SERVER) completer = Completer() session_home = nsh.get_host_session() #~ #login on home server if(sys.stdin.isatty()): print("logging in...") r = session_home.get( SERVER + "/api/z/1.0/verify", auth=HTTPBasicAuth(USER, PASSWD), verify=VERIFY_SSL ) if(sys.stdin.isatty()): print("Hi - ", r.json()['channel_name']) nsh.session = session_home completer.commnd = ['cd','ls','exists','mkdir','mkdirs','rmdir','delete','put','get', 'connect', 'pwd','cat', 'lcd','lpwd', 'lls', 'quit', 'help'] # initialise list of available connections from json endpoint # this will not change r = session_home.get(SERVER + "/connac") completer.nomads = r.json() if r else [] # initialise local file list completer.lfiles = get_lfiles() # since the site directory may be empty, automatically cd to # your own cloud storage folder and update remote file list nsh.do('cd', *[USER]) completer.rfiles = nsh.do('rfiles', *[]) # command loop try: input_str = input(nsh.PS1 if sys.stdin.isatty() else "") except EOFError as e: input_str = "quit" while (input_str != "quit"): input_str = input_str.strip() if len(input_str) > 0: toks = [ x.strip() for x in input_str.split(" ") ] command = toks[0] args = toks[1:] try: ret = nsh.do(command, *args) # update the internal file lists for the autocompleter if we just # performed an action which may have changed them if command in [ 'cd', 'connect', 'mkdir','mkdirs','rmdir','delete','put']: completer.rfiles = nsh.do('rfiles', *[]) if command in [ 'get', 'lcd' ]: completer.lfiles = get_lfiles() except easywebdav.client.OperationFailed as e: print(e) except CommandNotFound as e: print(e) except urllib.exceptions.NewConnectionError as e: print(e) except urllib.exceptions.MaxRetryError as e: print(e) except requests.exceptions.ConnectionError as e: print(e) else: if ret is not None: print(ret) try: input_str = input(nsh.PS1 if sys.stdin.isatty() else "") except EOFError as e: input_str = "quit" if __name__=="__main__": conffile = "" parser = argparse.ArgumentParser(description = "Nomad shell - CLI for accessing local/remote Nomad|Zot cloud storage resources") parser.add_argument('-c', nargs=1, metavar="name", help="load alternate configuration .nshrc.name", default="") parser.add_argument('--version', action="store_true", help="print version and exit") args = parser.parse_args() if args.c: conffile = args.c[0] if args.version: print (__version__) sys.exit() load_conf(conffile) nsh() sys.exit()