#!/usr/bin/env python # -*- coding: utf-8 -*- # Copyright 2015, 2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import argparse import getpass import hashlib import hmac import json import sys import urllib2 import yaml def request_registration(user, password, server_location, shared_secret, admin=False): req = urllib2.Request( "%s/_matrix/client/r0/admin/register" % (server_location,), headers={'Content-Type': 'application/json'} ) try: if sys.version_info[:3] >= (2, 7, 9): # As of version 2.7.9, urllib2 now checks SSL certs import ssl f = urllib2.urlopen(req, context=ssl.SSLContext(ssl.PROTOCOL_SSLv23)) else: f = urllib2.urlopen(req) body = f.read() f.close() nonce = json.loads(body)["nonce"] except urllib2.HTTPError as e: print "ERROR! Received %d %s" % (e.code, e.reason,) if 400 <= e.code < 500: if e.info().type == "application/json": resp = json.load(e) if "error" in resp: print resp["error"] sys.exit(1) mac = hmac.new( key=shared_secret, digestmod=hashlib.sha1, ) mac.update(nonce) mac.update("\x00") mac.update(user) mac.update("\x00") mac.update(password) mac.update("\x00") mac.update("admin" if admin else "notadmin") mac = mac.hexdigest() data = { "nonce": nonce, "username": user, "password": password, "mac": mac, "admin": admin, } server_location = server_location.rstrip("/") print "Sending registration request..." req = urllib2.Request( "%s/_matrix/client/r0/admin/register" % (server_location,), data=json.dumps(data), headers={'Content-Type': 'application/json'} ) try: if sys.version_info[:3] >= (2, 7, 9): # As of version 2.7.9, urllib2 now checks SSL certs import ssl f = urllib2.urlopen(req, context=ssl.SSLContext(ssl.PROTOCOL_SSLv23)) else: f = urllib2.urlopen(req) f.read() f.close() print "Success." except urllib2.HTTPError as e: print "ERROR! Received %d %s" % (e.code, e.reason,) if 400 <= e.code < 500: if e.info().type == "application/json": resp = json.load(e) if "error" in resp: print resp["error"] sys.exit(1) def register_new_user(user, password, server_location, shared_secret, admin): if not user: try: default_user = getpass.getuser() except: default_user = None if default_user: user = raw_input("New user localpart [%s]: " % (default_user,)) if not user: user = default_user else: user = raw_input("New user localpart: ") if not user: print "Invalid user name" sys.exit(1) if not password: password = getpass.getpass("Password: ") if not password: print "Password cannot be blank." sys.exit(1) confirm_password = getpass.getpass("Confirm password: ") if password != confirm_password: print "Passwords do not match" sys.exit(1) if not admin: admin = raw_input("Make admin [no]: ") if admin in ("y", "yes", "true"): admin = True else: admin = False request_registration(user, password, server_location, shared_secret, bool(admin)) if __name__ == "__main__": parser = argparse.ArgumentParser( description="Used to register new users with a given home server when" " registration has been disabled. The home server must be" " configured with the 'registration_shared_secret' option" " set.", ) parser.add_argument( "-u", "--user", default=None, help="Local part of the new user. Will prompt if omitted.", ) parser.add_argument( "-p", "--password", default=None, help="New password for user. Will prompt if omitted.", ) parser.add_argument( "-a", "--admin", action="store_true", help="Register new user as an admin. Will prompt if omitted.", ) group = parser.add_mutually_exclusive_group(required=True) group.add_argument( "-c", "--config", type=argparse.FileType('r'), help="Path to server config file. Used to read in shared secret.", ) group.add_argument( "-k", "--shared-secret", help="Shared secret as defined in server config file.", ) parser.add_argument( "server_url", default="https://localhost:8448", nargs='?', help="URL to use to talk to the home server. Defaults to " " 'https://localhost:8448'.", ) args = parser.parse_args() if "config" in args and args.config: config = yaml.safe_load(args.config) secret = config.get("registration_shared_secret", None) if not secret: print "No 'registration_shared_secret' defined in config." sys.exit(1) else: secret = args.shared_secret register_new_user(args.user, args.password, args.server_url, secret, args.admin)