add zotsh to util

This commit is contained in:
friendica 2015-02-15 16:29:12 -08:00
parent 0249fbb7e6
commit 8d6e3f5390
9 changed files with 646 additions and 0 deletions

109
util/zotsh/README.txt Normal file
View file

@ -0,0 +1,109 @@
ZotSH - v.0.0.2
Client for browsing RedDAVs.
Install
-------
ZotSH requires 'requests'(1).
Please refer to requests docs on how to install it (2)
Extract somewere and launch zotsh.py
Description
-----------
ZotSH is a command line WebDAV client for RedMatrix.
It knows how to magic-auth to remote hubs using Zot.
ZotSH uses 'easywebdav' library (0) with small modifications
to 'zotify' it. (See easywebdav/LICENSE)
Commands
--------
host <hostname>
Authenticate to 'hostname' and switch to it
cd <dirname|..>
changhe remote dir
ls [path] [-a] [-l] [-d]
list remote files in current dir if 'path' not defined
-a list all, show hidden dot-files
-l list verbose
-d list only dirs
exists <path>
Check existence of 'path'
mkdir <name>
Create directory 'name'
mkdirs <path>
Create parent directories to path, if they don't exists
rmdir <name>
Delete directory 'name'
delete <path>
Delete file 'path'
upload <local_path> [remote_path]
Upload local file 'local_paht' to 'remote_paht'
download <remote_path> [local_path]
Download remote file 'remote_path' and save it as 'local_path'
cat <remote_paht>
Print content of 'remote_path'
pwd
Print current path
lcd
lpwd
lls
Local file management
quit
help
Config
------
Create a .zotshrc file in your home or in same folder with zotsh.py:
[zotsh]
host = https://yourhost.com/
username = your_username
password = your_password
Optionally adds
verify_ssl = false
to skip verification of ssl certs
Changelog
----------
0.0.2 Fix "CommandNotFound" exception, new 'cat' command
0.0.1 First release
Links
-----
_0 : https://github.com/amnong/easywebdav
_1 : http://docs.python-requests.org/en/latest/
_2 : http://docs.python-requests.org/en/latest/user/install/

View file

@ -0,0 +1,5 @@
Copyright (c) 2012 year, Amnon Grossman
Permission to use, copy, modify, and/or distribute this software for any purpose with or without fee is hereby granted, provided that the above copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

View file

@ -0,0 +1,5 @@
from .client import *
def connect(*args, **kwargs):
"""connect(host, port=0, auth=None, username=None, password=None, protocol='http', path="/")"""
return Client(*args, **kwargs)

Binary file not shown.

View file

@ -0,0 +1 @@
__version__ = "1.2.0"

Binary file not shown.

View file

@ -0,0 +1,202 @@
import requests
import platform
from numbers import Number
import xml.etree.cElementTree as xml
from collections import namedtuple
py_majversion, py_minversion, py_revversion = platform.python_version_tuple()
if py_majversion == '2':
from httplib import responses as HTTP_CODES
from urlparse import urlparse
else:
from http.client import responses as HTTP_CODES
from urllib.parse import urlparse
DOWNLOAD_CHUNK_SIZE_BYTES = 1 * 1024 * 1024
class WebdavException(Exception):
pass
class ConnectionFailed(WebdavException):
pass
def codestr(code):
return HTTP_CODES.get(code, 'UNKNOWN')
File = namedtuple('File', ['name', 'size', 'mtime', 'ctime', 'contenttype'])
def prop(elem, name, default=None):
child = elem.find('.//{DAV:}' + name)
return default if child is None else child.text
def elem2file(elem):
return File(
prop(elem, 'href'),
int(prop(elem, 'getcontentlength', 0)),
prop(elem, 'getlastmodified', ''),
prop(elem, 'creationdate', ''),
prop(elem, 'getcontenttype', ''),
)
class OperationFailed(WebdavException):
_OPERATIONS = dict(
HEAD = "get header",
GET = "download",
PUT = "upload",
DELETE = "delete",
MKCOL = "create directory",
PROPFIND = "list directory",
)
def __init__(self, method, path, expected_code, actual_code):
self.method = method
self.path = path
self.expected_code = expected_code
self.actual_code = actual_code
operation_name = self._OPERATIONS[method]
self.reason = 'Failed to {operation_name} "{path}"'.format(**locals())
expected_codes = (expected_code,) if isinstance(expected_code, Number) else expected_code
expected_codes_str = ", ".join('{0} {1}'.format(code, codestr(code)) for code in expected_codes)
actual_code_str = codestr(actual_code)
msg = '''\
{self.reason}.
Operation : {method} {path}
Expected code : {expected_codes_str}
Actual code : {actual_code} {actual_code_str}'''.format(**locals())
super(OperationFailed, self).__init__(msg)
class Client(object):
def __init__(self, host, port=0, auth=None, username=None, password=None,
protocol='http', verify_ssl=True, path=None, cert=None, session=None):
if not port:
port = 443 if protocol == 'https' else 80
self.baseurl = '{0}://{1}:{2}'.format(protocol, host, port)
if path:
self.baseurl = '{0}/{1}'.format(self.baseurl, path)
self.cwd = '/'
if session is None:
self.session = requests.session()
else:
self.session = session
self.session.verify = verify_ssl
self.session.stream = True
if cert:
self.session.cert = cert
if auth:
self.session.auth = auth
elif username and password:
self.session.auth = (username, password)
def _send(self, method, path, expected_code, **kwargs):
url = self._get_url(path).strip(".")
#~ print self.session
#~ print self.session.verify
#~ print self.session.params
#~ print self.session.cookies
response = self.session.request(method, url, allow_redirects=False, **kwargs)
#~ print response.request.method
#~ print response.request.url
if isinstance(expected_code, Number) and response.status_code != expected_code \
or not isinstance(expected_code, Number) and response.status_code not in expected_code:
raise OperationFailed(method, path, expected_code, response.status_code)
return response
def _get_url(self, path):
path = str(path).strip()
if path.startswith('/'):
return self.baseurl + path
return "".join((self.baseurl, self.cwd, path))
def cd(self, path):
path = path.strip()
if not path:
return
stripped_path = '/'.join(part for part in path.split('/') if part) + '/'
if stripped_path == '/':
self.cwd = stripped_path
elif path.startswith('/'):
self.cwd = '/' + stripped_path
elif stripped_path == "./":
return
elif stripped_path == "../":
self.cwd ='/'.join( self.cwd.split('/')[:-2] ) + '/'
else:
self.cwd += stripped_path
def mkdir(self, path, safe=False):
expected_codes = 201 if not safe else (201, 301, 405)
self._send('MKCOL', path, expected_codes)
def mkdirs(self, path):
dirs = [d for d in path.split('/') if d]
if not dirs:
return
if path.startswith('/'):
dirs[0] = '/' + dirs[0]
old_cwd = self.cwd
try:
for dir in dirs:
try:
self.mkdir(dir, safe=True)
except Exception as e:
if e.actual_code == 409:
raise
finally:
self.cd(dir)
finally:
self.cd(old_cwd)
def rmdir(self, path, safe=False):
path = str(path).rstrip('/') + '/'
expected_codes = 204 if not safe else (204, 404)
self._send('DELETE', path, expected_codes)
def delete(self, path):
self._send('DELETE', path, 204)
def upload(self, local_path_or_fileobj, remote_path):
if isinstance(local_path_or_fileobj, basestring):
with open(local_path_or_fileobj, 'rb') as f:
self._upload(f, remote_path)
else:
self._upload(local_path_or_fileobj, remote_path)
def _upload(self, fileobj, remote_path):
self._send('PUT', remote_path, (200, 201, 204), data=fileobj)
def download(self, remote_path, local_path_or_fileobj):
response = self._send('GET', remote_path, 200, stream=True)
if isinstance(local_path_or_fileobj, basestring):
with open(local_path_or_fileobj, 'wb') as f:
self._download(f, response)
else:
self._download(local_path_or_fileobj, response)
def _download(self, fileobj, response):
for chunk in response.iter_content(DOWNLOAD_CHUNK_SIZE_BYTES):
fileobj.write(chunk)
def ls(self, remote_path='.'):
headers = {'Depth': '1'}
response = self._send('PROPFIND', remote_path, (207, 301), headers=headers)
# Redirect
if response.status_code == 301:
url = urlparse(response.headers['location'])
return self.ls(url.path)
tree = xml.fromstring(response.content)
return [elem2file(elem) for elem in tree.findall('{DAV:}response')]
def exists(self, remote_path):
response = self._send('HEAD', remote_path, (200, 301, 404))
return True if response.status_code != 404 else False

Binary file not shown.

324
util/zotsh/zotsh.py Executable file
View file

@ -0,0 +1,324 @@
#!/usr/bin/env python2
import sys, os
import ConfigParser
import requests
from requests.auth import HTTPBasicAuth
import easywebdav
import easywebdav.__version__ as easywebdavversion
__version__= "0.0.2"
SERVER = None
USER = None
PASSWD = None
VERIFY_SSL=True
#####################################################
class CommandNotFound(Exception):
pass
class ZotSH(object):
commands = ['cd','ls','exists','mkdir','mkdirs','rmdir','delete','upload','download',
'host', 'pwd','cat',
'lcd','lpwd', 'lls',
'quit', 'help']
def __init__(self, host, session=None, davclient=None):
self.sessions = {}
self.host = host
self.session = session
self.davclient = davclient
@property
def host(self):
return self._host
@host.setter
def host(self, host):
self._host = host
self._hostname = host.replace("https:","").replace("/","")
@property
def hostname(self):
return self._hostname
@hostname.setter
def hostname(self, hostname):
self._host = "https://%s/" % (hostname)
self._hostname = hostname
@property
def session(self):
return self._session
@session.setter
def session(self, session):
self._session = session
self.davclient = easywebdav.connect( self.hostname, protocol='https', session=session, path="cloud", verify_ssl=VERIFY_SSL)
@property
def PS1(self):
if self.davclient is None:
return "[!]> "
return "%s:%s> " % (self.hostname, self.davclient.cwd)
def get_host_session(self, host=None):
#~ if host is None:
#~ host = self.host
#~ if not host.startswith("https"):
#~ host = "https://%s/" % (host)
#~ if host in self.sessions:
#~ session = self.sessions[host]
#~ else:
#~ session = requests.Session()
#~ self.sessions[host] = session
#~ if not host == SERVER
#~ session.params.update({'davguest':1})
#~ return session
if self.session is None:
session = requests.Session()
#session.params.update({'davguest':1})
else:
session = self.session
session.params.update({'davguest': (not host == SERVER) })
return session
def do(self, command, *args):
if not command in self.commands:
raise CommandNotFound("Unknow command '%s'" % command)
cmd = getattr(self, "cmd_%s"%command, None)
if cmd is None:
cmd = getattr(self.davclient, command)
return cmd(*args)
def cmd_exists(self, *args):
if (len(args)==0):
return
return self.davclient.exists(args[0])
def cmd_mkdir(self, *args):
if (len(args)==0):
return
return self.davclient.mkdir(args[0])
def cmd_mkdirs(self, *args):
if (len(args)==0):
return
return self.davclient.mkdirs(args[0])
def cmd_rmdir(self, *args):
if (len(args)==0):
return
return self.davclient.rmdir(args[0])
def cmd_delete(self, *args):
if (len(args)==0):
return
return self.davclient.delete(args[0])
def cmd_upload(self, *args):
if (len(args)==0):
return
args = list(args)
if (len(args)==1):
args.append(args[0])
return self.davclient.upload(args[0], args[1])
def cmd_download(self, *args):
if (len(args)==0):
return
args = list(args)
if (len(args)==1):
args.append(args[0])
return self.davclient.download(args[0], args[1])
def cmd_host(self, *args):
if (len(args)==0):
return
newhostname = args[0]
newhost = "https://%s/" % newhostname
if newhostname == "~" or newhost == SERVER:
# bach to home server
self.host = SERVER
self.session = self.get_host_session(SERVER)
return
session_remote = self.get_host_session(newhost)
session_home = self.get_host_session(SERVER)
# call /magic on SERVER
r = session_home.get(
SERVER + "magic",
params={'dest': newhost},
allow_redirects=False,
verify=VERIFY_SSL )
if not 'location' in r.headers:
raise Exception("Cannot start magic auth to '%s'" % newhostname)
auth_url = r.headers['location']
# call auth_url with "test" param
r = session_remote.get(
auth_url,
params={'test': 1 },
verify=VERIFY_SSL )
if r.json()['success']:
self.hostname = newhostname
self.session = session_remote
else:
raise Exception("Cannot magic auth to '%s'" % newhostname)
def cmd_pwd(self, *args):
return "%s%s" % ( self.davclient.baseurl, self.davclient.cwd )
def cmd_ls(self, *args):
extra_args = ["-a", "-l", "-d"]
show_hidden = "-a" in args
show_list = "-l" in args
show_only_dir = "-d" in args
args = [ a for a in args if not a in extra_args ]
r = self.davclient.ls(*args)
l = max([ len(str(f.size)) for f in r ] + [7,])
def _fmt(type, size, name):
if show_list:
return "%s %*d %s" % (type, l, f.size , name)
else:
return name
if show_hidden :
print _fmt('d', 0, "./")
if self.davclient.cwd!="/":
print _fmt('d', 0, "../")
for f in r:
name = f.name.replace("/cloud"+self.davclient.cwd,"")
type = "-"
if name.endswith("/"):
type = "d"
if name!="":
if show_hidden or not name.startswith("."):
if not show_only_dir or type=="d":
print _fmt(type, f.size , name)
def cmd_lpwd(self, *args):
return os.getcwd()
def cmd_lcd(self, *args):
if (len(args)==0):
return
os.chdir(args[0])
def cmd_lls(self, *args):
for f in os.listdir(os.getcwd()):
if os.path.isdir(f):
f=f+"/"
print f
def cmd_help(self, *args):
print "ZotSH",__version__
print
print "Commands:"
for c in self.commands:
print "\t",c
print
print "easywebdav", easywebdavversion.__version__, "(mod)"
print "requests", requests.__version__
def cmd_cat(self,*args):
if (len(args)==0):
return
rfile = args[0]
resp = self.davclient._send('GET', rfile, (200,))
print resp.text
def load_conf():
global SERVER,USER,PASSWD,VERIFY_SSL
homedir = os.getenv("HOME")
if homedir is None:
homedir = os.path.join(os.getenv("HOMEDRIVE"), os.getenv("HOMEPATH"))
optsfile = ".zotshrc"
if not os.path.isfile(optsfile):
optsfile = os.path.join(homedir, ".zotshrc")
if not os.path.isfile(optsfile):
print "Please create a configuration file called '.zotshrc':"
print "[zotsh]"
print "host = https://yourhost.com/"
print "username = your_username"
print "password = your_password"
sys.exit(-1)
config = ConfigParser.ConfigParser()
config.read(optsfile)
SERVER = config.get('zotsh', 'host')
USER = config.get('zotsh', 'username')
PASSWD = config.get('zotsh', 'password')
if config.has_option('zotsh', 'verify_ssl'):
VERIFY_SSL = config.getboolean('zotsh', 'verify_ssl')
def zotsh():
zotsh = ZotSH( SERVER)
session_home = zotsh.get_host_session()
#~ #login on home server
print "loggin in..."
r = session_home.get(
SERVER + "api/account/verify_credentials",
auth=HTTPBasicAuth(USER, PASSWD),
verify=VERIFY_SSL )
print "Hi", r.json()['name']
zotsh.session = session_home
# command loop
input = raw_input(zotsh.PS1)
while (input != "quit"):
input = input.strip()
if len(input)>0:
toks = [ x.strip() for x in input.split(" ") ]
command = toks[0]
args = toks[1:]
try:
ret = zotsh.do(command, *args)
except easywebdav.client.OperationFailed, e:
print e
except CommandNotFound, e:
print e
else:
if ret is not None:
print ret
input = raw_input(zotsh.PS1)
if __name__=="__main__":
load_conf()
zotsh()
sys.exit()