Skip to content

Commit

Permalink
Merge branch 'tsalemink-dev'
Browse files Browse the repository at this point in the history
  • Loading branch information
metatoaster committed Aug 2, 2024
2 parents cd53872 + 0bdeb31 commit d7bf5b9
Show file tree
Hide file tree
Showing 2 changed files with 249 additions and 3 deletions.
112 changes: 111 additions & 1 deletion src/pmr2/wfctrl/cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import sys
from io import BytesIO

import urllib3

if sys.version_info > (3, 0): # pragma: no cover
from configparser import ConfigParser
else: # pragma: no cover
Expand Down Expand Up @@ -216,6 +218,40 @@ def reset_to_remote(self, workspace, branch=None):
return self.execute(*args)


class AuthenticatedGitDvcsCmd(GitDvcsCmd):
name = 'authenticated_git'

def __init__(self, remote=None, cmd_binary=None):
super().__init__(remote=remote, cmd_binary=cmd_binary)

self._auth_header = None

def set_authorization(self, authorization_header):
"""
Sets the authorization header for requests made by this class. The input should specify the type of authorization along with the
authorization token itself (e.g., "Basic {token}").
"""
self._auth_header = authorization_header

def _authenticate(self, workspace):
args = ['config', 'http.extraHeader', f'Authorization: {self._auth_header}']
return self.execute(*self._args(workspace, *args))

def clone(self, workspace, **kw):
args = ['clone', '--config', f'http.extraHeader=Authorization: {self._auth_header}', self.remote, workspace.working_dir]
return self.execute(*args)

def pull(self, workspace, **kw):
self._authenticate(workspace)
target = self.read_remote(workspace)
return self.execute(*self._args(workspace, 'pull', target))

def push(self, workspace, **kw):
self._authenticate(workspace)
target = self.get_remote(workspace)
return self.execute(*self._args(workspace, 'push', target))


class DulwichDvcsCmd(BaseDvcsCmd):
name = 'dulwich'
marker = '.git'
Expand Down Expand Up @@ -311,8 +347,82 @@ def reset_to_remote(self, workspace, branch=None):
return b'', b'', 0


class AuthenticatedDulwichDvcsCmd(DulwichDvcsCmd):
name = 'authenticated_dulwich'

def __init__(self, remote=None):
super().__init__(remote=remote)

self._auth_header = None

def set_authorization(self, authorization_header):
"""
Sets the authorization header for requests made by this class. The input should specify the type of authorization along with the
authorization token itself (e.g., "Basic {token}").
"""
self._auth_header = authorization_header

def _authenticate_pool_manager(self, *args, **kwargs):
pool_manager = urllib3.PoolManager()
pool_manager.headers['Authorization'] = self._auth_header
return pool_manager

def clone(self, workspace, **kw):
pool_manager = self._authenticate_pool_manager()
out_stream = BytesIO()
err_stream = BytesIO()
porcelain.clone(
self.remote,
workspace.working_dir,
pool_manager=pool_manager,
outstream=out_stream,
errstream=err_stream
)
return out_stream.getvalue(), err_stream.getvalue(), 0

def pull(self, workspace, **kw):
pool_manager = self._authenticate_pool_manager()
target = self.read_remote(workspace)
out_stream = BytesIO()
err_stream = BytesIO()
try:
result = 0
porcelain.pull(
workspace.working_dir,
target.encode(),
pool_manager=pool_manager,
outstream=out_stream,
errstream=err_stream
)
except NotGitRepository as e:
result = 1
err_stream.write(b'Not a Git repository ' + target.encode())

return out_stream.getvalue(), err_stream.getvalue(), result

def push(self, workspace, **kw):
pool_manager = self._authenticate_pool_manager()
target = self.read_remote(workspace)
out_stream = BytesIO()
err_stream = BytesIO()
try:
result = 0
porcelain.push(
workspace.working_dir,
target.encode(),
pool_manager=pool_manager,
outstream=out_stream,
errstream=err_stream
)
except NotGitRepository as e:
result = 1
err_stream.write(b'Not a Git repository ' + target.encode())

return out_stream.getvalue(), err_stream.getvalue(), result


def _register():
register_cmd(MercurialDvcsCmd, DulwichDvcsCmd, GitDvcsCmd)
register_cmd(MercurialDvcsCmd, DulwichDvcsCmd, GitDvcsCmd, AuthenticatedDulwichDvcsCmd, AuthenticatedGitDvcsCmd)


register = _register
Expand Down
140 changes: 138 additions & 2 deletions tests/test_cmd.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from unittest import TestCase, skipIf
from unittest import TestCase, skipIf, skip

import os
import sys
Expand All @@ -12,7 +12,8 @@
from StringIO import StringIO

try:
from dulwich import porcelain
from dulwich import porcelain, client

except ImportError:
pass

Expand All @@ -21,6 +22,8 @@
from pmr2.wfctrl.cmd import GitDvcsCmd
from pmr2.wfctrl.cmd import MercurialDvcsCmd
from pmr2.wfctrl.cmd import DulwichDvcsCmd
from pmr2.wfctrl.cmd import AuthenticatedGitDvcsCmd
from pmr2.wfctrl.cmd import AuthenticatedDulwichDvcsCmd

from pmr2.wfctrl.testing.base import CoreTestCase
from pmr2.wfctrl.testing.base import CoreTests
Expand Down Expand Up @@ -344,3 +347,136 @@ def _ls_root(self, workspace=None):

def test_get_cmd_by_name(self):
self.assertEqual(get_cmd_by_name('dulwich'), self.cmdcls)


@skipIf(not AuthenticatedGitDvcsCmd.available(), 'git is not available')
class AuthenticatedGitDvcsCmdTestCase(GitDvcsCmdTestCase):
cmdcls = AuthenticatedGitDvcsCmd

def setUp(self):
super(AuthenticatedGitDvcsCmdTestCase, self).setUp()
self.cmd = AuthenticatedGitDvcsCmd()
self.workspace = CmdWorkspace(self.workspace_dir, self.cmd)

def _log(self, workspace=None):
return AuthenticatedGitDvcsCmd._execute(self.cmd._args(self.workspace, 'log'))

def _ls_root(self, workspace=None):
branch, _, *_ = self.cmd.execute(
*self.cmd._args(self.workspace, 'branch', '--show-current'))
branch = branch.strip()
return AuthenticatedGitDvcsCmd._execute(
self.cmd._args(self.workspace, 'ls-tree', branch))

def _make_remote(self):
target = os.path.join(self.working_dir, 'remote')
AuthenticatedGitDvcsCmd._execute(['init', target, '--bare'])
return target

def test_get_cmd_by_name(self):
self.assertEqual(get_cmd_by_name('authenticated_git'), self.cmdcls)

def test_push_url_with_creds(self):
credentials = 'Basic username:password'
cmd = self.TrapCmd(remote='http://example.com/')
cmd.set_authorization(credentials)
workspace = CmdWorkspace(self.workspace_dir, cmd)
cmd.push(workspace)
auth_header = cmd.execute(*cmd._args(workspace, 'config', '--get', 'http.extraHeader'))
self.assertEqual(f'Authorization: {credentials}', auth_header[0].decode().strip())

def test_pull_url_with_creds(self):
credentials = 'Basic username:password'
cmd = self.TrapCmd(remote='http://example.com/')
cmd.set_authorization(credentials)
workspace = CmdWorkspace(self.workspace_dir, cmd)
cmd.pull(workspace)
auth_header = cmd.execute(*cmd._args(workspace, 'config', '--get', 'http.extraHeader'))
self.assertEqual(f'Authorization: {credentials}', auth_header[0].decode().strip())

def test_clone(self):
credentials = 'Basic username:password'
cmd = self.TrapCmd(remote='http://example.com/')
cmd.set_authorization(credentials)
target = os.path.join(self.working_dir, 'new_target')
workspace = CmdWorkspace(target, cmd)
cmd.init_new(workspace)
result = cmd.clone(workspace)

self.assertTrue(isdir(join(target, self.marker)))
authorisation = f'http.extraHeader=Authorization: {credentials}'
self.assertIn(authorisation, result[0].decode())

@skip("Not applicable.")
def test_auto_init(self):
pass


class CustomLocalGitClient(client.LocalGitClient):
def __init__(self, pool_manager=None, *args, **kwargs):
super().__init__(*args, **kwargs)


class TrapDulwichCmd(AuthenticatedDulwichDvcsCmd):
def __init__(self, remote=None):
super(TrapDulwichCmd, self).__init__(remote)
self._pool_manager = None

def _authenticate_pool_manager(self, *args, **kwargs):
self._pool_manager = super(TrapDulwichCmd, self)._authenticate_pool_manager(*args, **kwargs)


@skipIf(not AuthenticatedDulwichDvcsCmd.available(), 'dulwich is not available')
class AuthenticatedDulwichDvcsCmdTestCase(DulwichDvcsCmdTestCase):
cmdcls = AuthenticatedDulwichDvcsCmd

def setUp(self):
super(DulwichDvcsCmdTestCase, self).setUp()
self.cmd = AuthenticatedDulwichDvcsCmd()
self.workspace = CmdWorkspace(self.workspace_dir, self.cmd)

# Temporarily override the local dulwich client.
self._default_client = client.default_local_git_client_cls
client.default_local_git_client_cls = CustomLocalGitClient

def tearDown(self):
# Reset the local dulwich client.
client.default_local_git_client_cls = self._default_client

def test_get_cmd_by_name(self):
self.assertEqual(get_cmd_by_name('authenticated_dulwich'), self.cmdcls)

def test_push_url_with_creds(self):
credentials = 'Basic username:password'
cmd = TrapDulwichCmd()
cmd.set_authorization(credentials)
workspace = CmdWorkspace(self.workspace_dir, cmd)
cmd.push(workspace)
auth_header = cmd._pool_manager.headers['Authorization']
self.assertEqual(credentials, auth_header)

def test_pull_url_with_creds(self):
credentials = 'Basic username:password'
cmd = TrapDulwichCmd()
cmd.set_authorization(credentials)
workspace = CmdWorkspace(self.workspace_dir, cmd)
cmd.pull(workspace)
auth_header = cmd._pool_manager.headers['Authorization']
self.assertEqual(credentials, auth_header)

def test_clone(self):
self.cmd.init_new(self.workspace)
target = os.path.join(self.working_dir, 'new_target')
workspace = CmdWorkspace(target)
credentials = 'Basic username:password'
cmd = TrapDulwichCmd(remote=self.workspace_dir)
cmd.set_authorization(credentials)
cmd.clone(workspace)

self.assertTrue(isdir(join(target, self.marker)))
auth_header = cmd._pool_manager.headers['Authorization']
self.assertEqual(credentials, auth_header)

@skip("Not applicable.")
def test_auto_init(self):
pass

0 comments on commit d7bf5b9

Please sign in to comment.