-
Notifications
You must be signed in to change notification settings - Fork 3
/
test_msconvert.py
71 lines (61 loc) · 2.31 KB
/
test_msconvert.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
import os
import tempfile
import shutil
import subprocess
from unittest import TestCase
from unittest.mock import patch, MagicMock
class TestMSConvert(TestCase):
    """Tests for the ``call_ssh``, ``rsync`` and ``convert_raw`` helpers of
    the ``etl_msconvert`` module.

    ``etl_msconvert`` is imported inside each test, after ``setUp`` has
    placed ``MagicMock`` stand-ins for the ``ch.systemsx.cisd...`` packages
    into ``sys.modules``.

    NOTE(review): the tests pass ``'localhost'`` as the remote host, so they
    presumably need password-less ssh access to localhost -- confirm.
    """

    def setUp(self):
        """Install the mocked ``ch.*`` modules and create a scratch dir."""
        self._ch = MagicMock()
        self._apiv2 = (
            self._ch.systemsx.cisd.etlserver.registrator.api.v2
        )
        self._dto = (
            self._ch.systemsx.cisd.openbis.generic.shared.api.v1.dto
        )
        modules = {
            'ch': self._ch,
            'ch.systemsx.cisd.openbis.generic.shared.api.v1.dto': self._dto,
            'ch.systemsx.cisd.etlserver.registrator.api.v2': self._apiv2,
        }
        self.module_patcher = patch.dict('sys.modules', modules)
        self.module_patcher.start()
        # Registered as a cleanup (not in tearDown) so sys.modules is
        # restored even if a later step of setUp raises.
        self.addCleanup(self.module_patcher.stop)

        from ch.systemsx.cisd.openbis.generic.shared.api.v1.dto import (
            SearchCriteria, SearchSubCriteria
        )
        self.SearchCriteria = SearchCriteria
        self.SearchSubCriteria = SearchSubCriteria

        self.tmp = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.tmp)

    def test_ssh(self):
        """``call_ssh`` returns the command output and reports failures."""
        import etl_msconvert

        result = etl_msconvert.call_ssh(
            ['echo', '*'], 'localhost', timeout=10
        )
        # The original line compared the output to b'*' but discarded the
        # result, so nothing was verified.  Surrounding whitespace is
        # stripped because echo presumably appends a newline -- confirm
        # against call_ssh's return format.
        self.assertEqual(result[0].strip(), b'*')

        with self.assertRaises(etl_msconvert.TimeoutError):
            etl_msconvert.call_ssh(
                ['sleep', '10'], 'localhost', timeout=1
            )

        with self.assertRaises(subprocess.CalledProcessError):
            etl_msconvert.call_ssh(['i-dont-exist'], 'localhost', timeout=1)

    def test_rsync(self):
        """``rsync`` copies a file with no host, a source host, a dest host."""
        import etl_msconvert

        source = self.tmp + '/source'
        dest = self.tmp + '/dest'
        # Create an empty source file.
        with open(source, 'w'):
            pass

        etl_msconvert.rsync(source, dest, timeout=1)
        # unlink raises if the copy did not produce ``dest``.
        os.unlink(dest)
        etl_msconvert.rsync(source, dest, source_host='localhost', timeout=1)
        os.unlink(dest)
        etl_msconvert.rsync(source, dest, dest_host='localhost', timeout=1)

    def test_msconvert(self):
        """``convert_raw`` runs in dry-run mode against a scratch remote dir."""
        import etl_msconvert

        source = self.tmp + '/source'
        dest = self.tmp + '/dest'
        # Create an empty source file.
        with open(source, 'w'):
            pass

        remote_base = os.path.join(self.tmp, 'remote')
        os.mkdir(remote_base)
        etl_msconvert.convert_raw(source, dest, remote_base=remote_base,
                                  host='localhost', timeout=1, dryrun=True)