Skip to content

Commit

Permalink
Add authenticated CMD sub-classes
Browse files Browse the repository at this point in the history
  • Loading branch information
tsalemink committed Mar 25, 2024
1 parent 6a00bdf commit 89de9bc
Showing 1 changed file with 103 additions and 1 deletion.
104 changes: 103 additions & 1 deletion src/pmr2/wfctrl/cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
import sys
from io import BytesIO

import urllib3
from base64 import b64encode

if sys.version_info > (3, 0): # pragma: no cover
from configparser import ConfigParser
else: # pragma: no cover
Expand Down Expand Up @@ -216,6 +219,37 @@ def reset_to_remote(self, workspace, branch=None):
return self.execute(*args)


class AuthenticatedGitDvcsCmd(GitDvcsCmd):
name = 'authenticated_git'

def __init__(self, username, password, remote=None, cmd_binary=None):
super().__init__(remote=remote, cmd_binary=cmd_binary)

auth_bytes = f"{username}:{password}".encode()
self._token = b64encode(auth_bytes).decode()

def _auth_header(self):
return f'Authorization: Basic {self._token}'

def _authenticate(self, workspace):
args = ['config', 'http.extraHeader', self._auth_header()]
return self.execute(*self._args(workspace, *args))

def clone(self, workspace, **kw):
args = ['clone', '--config', f'http.extraHeader={self._auth_header()}', self.remote, workspace.working_dir]
return self.execute(*args)

def pull(self, workspace, **kw):
self._authenticate(workspace)
target = self.read_remote(workspace, target_remote=self.remote)
return self.execute(*self._args(workspace, 'pull', target))

def push(self, workspace, **kw):
self._authenticate(workspace)
target = self.read_remote(workspace, target_remote=self.remote)
return self.execute(*self._args(workspace, 'push', target))


class DulwichDvcsCmd(BaseDvcsCmd):
name = 'dulwich'
marker = '.git'
Expand Down Expand Up @@ -311,8 +345,76 @@ def reset_to_remote(self, workspace, branch=None):
return b'', b'', 0


class AuthenticatedDulwichDvcsCmd(DulwichDvcsCmd):
name = 'authenticated_dulwich'

def __init__(self, username, password, remote=None):
super().__init__(remote=remote)

auth_bytes = f"{username}:{password}".encode()
self._token = b64encode(auth_bytes).decode()

def _authenticate_pool_manager(self, *args, **kwargs):
pool_manager = urllib3.PoolManager()
pool_manager.headers['Authorization'] = f'Basic {self._token}'
return pool_manager

def clone(self, workspace, **kw):
pool_manager = self._authenticate_pool_manager()
out_stream = BytesIO()
err_stream = BytesIO()
porcelain.clone(
self.remote,
workspace.working_dir,
pool_manager=pool_manager,
outstream=out_stream,
errstream=err_stream
)
return out_stream.getvalue(), err_stream.getvalue(), 0

def pull(self, workspace, **kw):
pool_manager = self._authenticate_pool_manager()
target = self.read_remote(workspace, target_remote=self.remote)
out_stream = BytesIO()
err_stream = BytesIO()
try:
result = 0
porcelain.pull(
workspace.working_dir,
target.encode(),
pool_manager=pool_manager,
outstream=out_stream,
errstream=err_stream
)
except NotGitRepository as e:
result = 1
err_stream.write(b'Not a Git repository ' + target.encode())

return out_stream.getvalue(), err_stream.getvalue(), result

def push(self, workspace, **kw):
pool_manager = self._authenticate_pool_manager()
target = self.read_remote(workspace, target_remote=self.remote)
out_stream = BytesIO()
err_stream = BytesIO()
try:
result = 0
porcelain.push(
workspace.working_dir,
target.encode(),
pool_manager=pool_manager,
outstream=out_stream,
errstream=err_stream
)
except NotGitRepository as e:
result = 1
err_stream.write(b'Not a Git repository ' + target.encode())

return out_stream.getvalue(), err_stream.getvalue(), result


def _register():
register_cmd(MercurialDvcsCmd, DulwichDvcsCmd, GitDvcsCmd)
register_cmd(MercurialDvcsCmd, DulwichDvcsCmd, GitDvcsCmd, AuthenticatedDulwichDvcsCmd, AuthenticatedGitDvcsCmd)


register = _register
Expand Down

0 comments on commit 89de9bc

Please sign in to comment.