Add unit tests for gitosis-serve.
This commit is contained in:
parent
97c093470e
commit
3339783581
144
gitosis/serve.py
144
gitosis/serve.py
|
@ -1,10 +1,10 @@
|
|||
"""
|
||||
Enforce git-shell to only serve repositories in the given
|
||||
directory. The client should refer to them without any directory
|
||||
prefix. Repository names are forced to match ALLOW.
|
||||
Enforce git-shell to only serve allowed by access control policy.
|
||||
directory. The client should refer to them without any extra directory
|
||||
prefix. Repository names are forced to match ALLOW_RE.
|
||||
"""
|
||||
|
||||
import logging; logging.basicConfig(level=logging.DEBUG)
|
||||
import logging
|
||||
|
||||
import sys, os, optparse, re
|
||||
from ConfigParser import RawConfigParser
|
||||
|
@ -29,7 +29,7 @@ def getParser():
|
|||
)
|
||||
return parser
|
||||
|
||||
ALLOW_RE = re.compile("^(?P<command>git-(?:receive|upload)-pack) '(?P<path>[a-zA-Z0-9][a-zA-Z0-9@._-]*(/[a-zA-Z0-9][a-zA-Z0-9@._-]*)*)'$")
|
||||
ALLOW_RE = re.compile("^'(?P<path>[a-zA-Z0-9][a-zA-Z0-9@._-]*(/[a-zA-Z0-9][a-zA-Z0-9@._-]*)*)'$")
|
||||
|
||||
COMMANDS_READONLY = [
|
||||
'git-upload-pack',
|
||||
|
@ -39,49 +39,47 @@ COMMANDS_WRITE = [
|
|||
'git-receive-pack',
|
||||
]
|
||||
|
||||
def main():
|
||||
log = logging.getLogger('gitosis.serve.main')
|
||||
os.umask(0022)
|
||||
class ServingError(Exception):
|
||||
"""Serving error"""
|
||||
|
||||
parser = getParser()
|
||||
(options, args) = parser.parse_args()
|
||||
try:
|
||||
(user,) = args
|
||||
except ValueError:
|
||||
parser.error('Missing argument USER.')
|
||||
def __str__(self):
|
||||
return '%s' % self.__doc__
|
||||
|
||||
cmd = os.environ.get('SSH_ORIGINAL_COMMAND', None)
|
||||
if cmd is None:
|
||||
die("Need SSH_ORIGINAL_COMMAND in environment.")
|
||||
class CommandMayNotContainNewlineError(ServingError):
|
||||
"""Command may not contain newline"""
|
||||
|
||||
log.debug('Got command %(cmd)r' % dict(
|
||||
cmd=cmd,
|
||||
))
|
||||
class UnknownCommandError(ServingError):
|
||||
"""Unknown command denied"""
|
||||
|
||||
if '\n' in cmd:
|
||||
die("Command may not contain newlines.")
|
||||
class UnsafeArgumentsError(ServingError):
|
||||
"""Arguments to command look dangerous"""
|
||||
|
||||
match = ALLOW_RE.match(cmd)
|
||||
class AccessDenied(ServingError):
|
||||
"""Access denied"""
|
||||
|
||||
class WriteAccessDenied(AccessDenied):
|
||||
"""Write access denied"""
|
||||
|
||||
class ReadAccessDenied(AccessDenied):
|
||||
"""Read access denied"""
|
||||
|
||||
def serve(
|
||||
cfg,
|
||||
user,
|
||||
command,
|
||||
):
|
||||
if '\n' in command:
|
||||
raise CommandMayNotContainNewlineError()
|
||||
|
||||
verb, args = command.split(None, 1)
|
||||
|
||||
if (verb not in COMMANDS_WRITE
|
||||
and verb not in COMMANDS_READONLY):
|
||||
raise UnknownCommandError()
|
||||
|
||||
match = ALLOW_RE.match(args)
|
||||
if match is None:
|
||||
die("Command to run looks dangerous")
|
||||
|
||||
cfg = RawConfigParser()
|
||||
try:
|
||||
conffile = file(options.config)
|
||||
except (IOError, OSError), e:
|
||||
# I trust the exception has the path.
|
||||
die("Unable to read config file: %s." % e)
|
||||
try:
|
||||
cfg.readfp(conffile)
|
||||
finally:
|
||||
conffile.close()
|
||||
|
||||
os.chdir(os.path.expanduser('~'))
|
||||
|
||||
command = match.group('command')
|
||||
if (command not in COMMANDS_WRITE
|
||||
and command not in COMMANDS_READONLY):
|
||||
die("Unknown command denied.")
|
||||
raise UnsafeArgumentsError()
|
||||
|
||||
path = match.group('path')
|
||||
|
||||
|
@ -102,21 +100,61 @@ def main():
|
|||
path=path)
|
||||
|
||||
if newpath is None:
|
||||
die("Read access denied.")
|
||||
raise ReadAccessDenied()
|
||||
|
||||
if command in COMMANDS_WRITE:
|
||||
if verb in COMMANDS_WRITE:
|
||||
# didn't have write access and tried to write
|
||||
die("Write access denied.")
|
||||
raise WriteAccessDenied()
|
||||
|
||||
log.debug('Serving %(command)r %(newpath)r' % dict(
|
||||
command=command,
|
||||
newpath=newpath,
|
||||
))
|
||||
|
||||
# put the command back together with the new path
|
||||
newcmd = "%(command)s '%(newpath)s'" % dict(
|
||||
command=command,
|
||||
# put the verb back together with the new path
|
||||
newcmd = "%(verb)s '%(newpath)s'" % dict(
|
||||
verb=verb,
|
||||
newpath=newpath,
|
||||
)
|
||||
return newcmd
|
||||
|
||||
def main():
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
log = logging.getLogger('gitosis.serve.main')
|
||||
os.umask(0022)
|
||||
|
||||
parser = getParser()
|
||||
(options, args) = parser.parse_args()
|
||||
try:
|
||||
(user,) = args
|
||||
except ValueError:
|
||||
parser.error('Missing argument USER.')
|
||||
|
||||
cmd = os.environ.get('SSH_ORIGINAL_COMMAND', None)
|
||||
if cmd is None:
|
||||
die("Need SSH_ORIGINAL_COMMAND in environment.")
|
||||
|
||||
log.debug('Got command %(cmd)r' % dict(
|
||||
cmd=cmd,
|
||||
))
|
||||
|
||||
cfg = RawConfigParser()
|
||||
try:
|
||||
conffile = file(options.config)
|
||||
except (IOError, OSError), e:
|
||||
# I trust the exception has the path.
|
||||
die("Unable to read config file: %s." % e)
|
||||
try:
|
||||
cfg.readfp(conffile)
|
||||
finally:
|
||||
conffile.close()
|
||||
|
||||
os.chdir(os.path.expanduser('~'))
|
||||
|
||||
try:
|
||||
newcmd = serve(
|
||||
cfg=cfg,
|
||||
user=user,
|
||||
command=cmd,
|
||||
)
|
||||
except ServingError, e:
|
||||
die(str(e))
|
||||
|
||||
log.debug('Serving %s', newcmd)
|
||||
os.execvpe('git-shell', ['git-shell', '-c', newcmd], {})
|
||||
die("Cannot execute git-shell.")
|
||||
|
|
112
gitosis/test/test_serve.py
Normal file
112
gitosis/test/test_serve.py
Normal file
|
@ -0,0 +1,112 @@
|
|||
from nose.tools import eq_ as eq
|
||||
from gitosis.test.util import assert_raises
|
||||
|
||||
from ConfigParser import RawConfigParser
|
||||
|
||||
from gitosis import serve
|
||||
|
||||
from gitosis.test import util
|
||||
|
||||
def test_bad_newLine():
|
||||
cfg = RawConfigParser()
|
||||
e = assert_raises(
|
||||
serve.CommandMayNotContainNewlineError,
|
||||
serve.serve,
|
||||
cfg=cfg,
|
||||
user='jdoe',
|
||||
command='ev\nil',
|
||||
)
|
||||
eq(str(e), 'Command may not contain newline')
|
||||
assert isinstance(e, serve.ServingError)
|
||||
|
||||
def test_bad_command():
|
||||
cfg = RawConfigParser()
|
||||
e = assert_raises(
|
||||
serve.UnknownCommandError,
|
||||
serve.serve,
|
||||
cfg=cfg,
|
||||
user='jdoe',
|
||||
command="evil 'foo'",
|
||||
)
|
||||
eq(str(e), 'Unknown command denied')
|
||||
assert isinstance(e, serve.ServingError)
|
||||
|
||||
def test_bad_unsafeArguments():
|
||||
cfg = RawConfigParser()
|
||||
e = assert_raises(
|
||||
serve.UnsafeArgumentsError,
|
||||
serve.serve,
|
||||
cfg=cfg,
|
||||
user='jdoe',
|
||||
command='git-upload-pack /evil/attack',
|
||||
)
|
||||
eq(str(e), 'Arguments to command look dangerous')
|
||||
assert isinstance(e, serve.ServingError)
|
||||
|
||||
def test_bad_forbiddenCommand_read():
|
||||
cfg = RawConfigParser()
|
||||
e = assert_raises(
|
||||
serve.ReadAccessDenied,
|
||||
serve.serve,
|
||||
cfg=cfg,
|
||||
user='jdoe',
|
||||
command="git-upload-pack 'foo'",
|
||||
)
|
||||
eq(str(e), 'Read access denied')
|
||||
assert isinstance(e, serve.AccessDenied)
|
||||
assert isinstance(e, serve.ServingError)
|
||||
|
||||
def test_bad_forbiddenCommand_write_noAccess():
|
||||
cfg = RawConfigParser()
|
||||
e = assert_raises(
|
||||
serve.ReadAccessDenied,
|
||||
serve.serve,
|
||||
cfg=cfg,
|
||||
user='jdoe',
|
||||
command="git-receive-pack 'foo'",
|
||||
)
|
||||
# error message talks about read in an effort to make it more
|
||||
# obvious that jdoe doesn't have *even* read access
|
||||
eq(str(e), 'Read access denied')
|
||||
assert isinstance(e, serve.AccessDenied)
|
||||
assert isinstance(e, serve.ServingError)
|
||||
|
||||
def test_bad_forbiddenCommand_write_readAccess():
|
||||
cfg = RawConfigParser()
|
||||
cfg.add_section('group foo')
|
||||
cfg.set('group foo', 'members', 'jdoe')
|
||||
cfg.set('group foo', 'readonly', 'foo')
|
||||
e = assert_raises(
|
||||
serve.WriteAccessDenied,
|
||||
serve.serve,
|
||||
cfg=cfg,
|
||||
user='jdoe',
|
||||
command="git-receive-pack 'foo'",
|
||||
)
|
||||
eq(str(e), 'Write access denied')
|
||||
assert isinstance(e, serve.AccessDenied)
|
||||
assert isinstance(e, serve.ServingError)
|
||||
|
||||
def test_simple_read():
|
||||
cfg = RawConfigParser()
|
||||
cfg.add_section('group foo')
|
||||
cfg.set('group foo', 'members', 'jdoe')
|
||||
cfg.set('group foo', 'readonly', 'foo')
|
||||
got = serve.serve(
|
||||
cfg=cfg,
|
||||
user='jdoe',
|
||||
command="git-upload-pack 'foo'",
|
||||
)
|
||||
eq(got, "git-upload-pack 'repositories/foo'")
|
||||
|
||||
def test_simple_write():
|
||||
cfg = RawConfigParser()
|
||||
cfg.add_section('group foo')
|
||||
cfg.set('group foo', 'members', 'jdoe')
|
||||
cfg.set('group foo', 'writable', 'foo')
|
||||
got = serve.serve(
|
||||
cfg=cfg,
|
||||
user='jdoe',
|
||||
command="git-receive-pack 'foo'",
|
||||
)
|
||||
eq(got, "git-receive-pack 'repositories/foo'")
|
Loading…
Reference in a new issue