-
Notifications
You must be signed in to change notification settings - Fork 1
/
TestSuites.py
172 lines (140 loc) · 6.62 KB
/
TestSuites.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
import requests
import time
import csv
import random
import string
from Signature import getSignature
from JobTemplate import JobTemplate
class TestSuites:
    """Bulk-add and bulk-delete test suites through a server's HTTP API.

    Every bulk operation is driven by a CSV file and writes a human-readable
    report ("[SUCCESS] ..." / "[ERROR] ..." lines followed by counters) to a
    randomly named ``.txt`` file in the current working directory.

    NOTE(review): every request is sent with ``verify=False``, which turns
    off TLS certificate verification in ``requests``.  That is kept here so
    existing deployments keep working, but it should be revisited.
    """

    def __init__(self, host):
        """Remember the base URL (scheme + host) that API paths are appended to."""
        self.host = host

    @staticmethod
    def _parse_settings(setting_str):
        """Turn newline-separated ``KEY=value`` lines into form fields.

        Each usable line becomes ``{"settings[KEY]": "value"}``.  Only the
        first ``=`` separates key and value, so values may contain ``=``.
        Empty lines and lines without any ``=`` are skipped.
        """
        form_fields = {}
        for line in setting_str.split("\n"):
            key, separator, value = line.partition("=")
            if not separator:
                # Blank or malformed line: nothing to send for it.
                continue
            form_fields["settings[" + key + "]"] = value
        return form_fields

    @staticmethod
    def _error_text(resp):
        """Return the ``error`` field of a JSON error response as a string.

        Falls back to ``"HTTP <status>"`` when the body is not JSON or has no
        ``error`` key, so a proxy/HTML error page cannot abort a bulk run.
        """
        try:
            payload = resp.json()
        except ValueError:
            payload = None
        if isinstance(payload, dict) and "error" in payload:
            return str(payload["error"])
        return "HTTP " + str(resp.status_code)

    def _addTestSuite(self, name, setting_str, description=""):
        """POST one test suite and optionally register a job template for it.

        Relies on the state prepared by ``addTestSuite`` (request URL and
        headers, result file name, counters, job-template parameters).
        Appends the outcome to the result file and updates the counters.

        name        -- test suite name
        setting_str -- newline-separated ``KEY=value`` settings
        description -- optional description sent with the suite
        """
        testsuite_dic = {"name": name, "description": description}
        testsuite_dic.update(self._parse_settings(setting_str))
        self.total_num += 1

        # Decide per suite whether a job template should be added; a failure
        # for this suite must not switch job templates off for later rows.
        add_job_template = self.flag_add_jobTemplate == 1

        # The context manager closes the report even if a request raises.
        with open(self.result_file, 'a') as refile:
            resp = requests.post(self.add_case_host, headers=self.header,
                                 data=testsuite_dic, verify=False)
            if resp.status_code != 200:
                self.failed_num += 1
                refile.write("[ERROR] add test suite " + name + " failed \n")
                error_text = self._error_text(resp)
                refile.write(error_text + "\n")
                # A "duplicate key" error still allows the job template to be
                # added; any other failure skips it for this suite.
                if "duplicate key" not in error_text:
                    add_job_template = False
            else:
                refile.write("[SUCCESS] add test suite " + name + " successfully \n")
                self.success_num += 1

            if add_job_template:
                job_template = JobTemplate(self.host)
                error_message = job_template.addJobTemplate(
                    self.api_key, self.api_secret, self.flavor, self.distri,
                    self.version, self.arch, self.machine, self.group, name,
                    self.priority)
                if error_message != '':
                    refile.write('[ERROR] add job template failed \n')
                    refile.write(str(error_message) + "\n")
                    self.job_template_failed_num += 1
                else:
                    refile.write("[SUCCESS] add job template" + self.group + " " + name + " successfully \n")

    def addTestSuite(self, csv_file, api_key, api_secret, flag_add_jobTemplate=0, flavor='', distri='', version='', arch='', machine='', group='', priority=50):
        """Add every test suite listed in ``csv_file``.

        Each CSV row holds the suite name in column 0 and its settings
        (newline-separated ``KEY=value`` lines) in column 1.  Blank rows are
        skipped; a row without a settings column is added with no settings.

        When ``flag_add_jobTemplate`` is 1, a job template built from
        ``flavor``, ``distri``, ``version``, ``arch``, ``machine``, ``group``
        and ``priority`` is also added for each suite.

        Returns ``(error_message, result_file)``.  ``error_message`` is ``''``
        on success or the text of the ``OSError`` that stopped the run;
        ``result_file`` is the name of the report file.
        """
        self.csv_file = csv_file
        self.api_key = api_key
        self.api_secret = api_secret
        self.flavor = flavor
        self.distri = distri
        self.version = version
        self.arch = arch
        self.machine = machine
        self.group = group
        self.priority = priority
        self.flag_add_jobTemplate = flag_add_jobTemplate

        timestamp = int(time.time())
        url = '/api/v1/test_suites'
        # The request is signed over the URL path plus the timestamp.
        # NOTE(review): the same signed headers are reused for every row;
        # confirm the server accepts that for long-running imports.
        signature = getSignature(url + str(timestamp), self.api_secret)
        self.header = {'X-API-KEY': self.api_key, 'X-API-HASH': signature, 'X-API-Microtime': str(timestamp)}
        self.add_case_host = self.host + url

        result_file = ''.join(random.sample(string.ascii_letters + string.digits, 8)) + ".txt"
        self.result_file = result_file
        self.total_num = 0
        self.success_num = 0
        self.failed_num = 0
        self.job_template_failed_num = 0

        error_message = ''
        try:
            with open(self.csv_file, 'r') as testsuite_csv:
                for row in csv.reader(testsuite_csv, delimiter=','):
                    if not row:
                        # csv.reader yields [] for blank lines.
                        continue
                    name = row[0]
                    setting = row[1] if len(row) > 1 else ''
                    self._addTestSuite(name, setting)
            with open(self.result_file, 'a') as rfile:
                rfile.write(str(self.total_num) + " testsuites are operated \n")
                rfile.write(str(self.success_num) + " testsuites success to add \n")
                rfile.write(str(self.failed_num) + " testsuites failed to add \n")
                rfile.write(str(self.job_template_failed_num) + " testsuites failed to add into job template")
        except OSError as err:
            # Report the failure as text, matching the '' returned on success.
            error_message = str(err) or repr(err)
        return error_message, result_file

    def deleteTestSuite(self, api_key, api_secret, testsuite_id):
        """Delete one test suite by id.

        Returns ``''`` when the server answers 200.  Otherwise the decoded
        error body is printed and its ``error_status`` value is returned; if
        the body is not JSON or lacks that key, ``"HTTP <status>"`` is
        returned instead.
        """
        timestamp = int(time.time())
        url = '/api/v1/test_suites/' + str(testsuite_id)
        # The request is signed over the URL path plus the timestamp.
        signature = getSignature(url + str(timestamp), api_secret)
        delete_header = {'X-API-KEY': api_key, 'X-API-HASH': signature, 'X-API-Microtime': str(timestamp), 'Accept': 'application/json'}
        resp = requests.delete(self.host + url, headers=delete_header, verify=False)
        if resp.status_code == 200:
            return ''

        fallback = "HTTP " + str(resp.status_code)
        try:
            data = resp.json()
        except ValueError:
            return fallback
        print(data)
        if isinstance(data, dict) and "error_status" in data:
            return data["error_status"]
        return fallback

    def getAllTestSuite(self):
        """Fetch all test suites from the server.

        Returns ``('', suites)`` on success, where ``suites`` is the
        ``"TestSuites"`` value of the JSON response, or ``(error, ())`` when
        the server does not answer 200.
        """
        resp = requests.get(self.host + "/api/v1/test_suites", verify=False)
        if resp.status_code != 200:
            return self._error_text(resp), ()
        return '', resp.json()["TestSuites"]

    def delTestSuite(self, csv_file, api_key, api_secret):
        """Delete every server-side test suite whose name appears in ``csv_file``.

        Only column 0 (the suite name) of each CSV row is used; blank rows
        are skipped.  Per-suite results and final counters are written to the
        report file.

        Returns ``(error_message, result_file)``.  ``error_message`` is the
        error from listing the suites or from an ``OSError`` that stopped the
        run, else ``''``; failures of individual deletions are recorded in
        the report file and counted in ``self.failed_num``.
        """
        result_file = ''.join(random.sample(string.ascii_letters + string.digits, 8)) + ".txt"
        self.total_num = 0
        self.success_num = 0
        self.failed_num = 0

        # The context manager guarantees the report is flushed and closed.
        with open(result_file, 'a') as refile:
            error_message, testsuites = self.getAllTestSuite()
            try:
                with open(csv_file, 'r') as testsuite_csv:
                    csv_names = {row[0] for row in csv.reader(testsuite_csv, delimiter=',') if row}
                for suite in testsuites:
                    if suite["name"] not in csv_names:
                        continue
                    delete_error = self.deleteTestSuite(api_key, api_secret, suite["id"])
                    self.total_num += 1
                    if delete_error != '':
                        refile.write("[ERROR] del test suite " + suite["name"] + " failed \n")
                        # The status returned by deleteTestSuite may not be a string.
                        refile.write(str(delete_error) + "\n")
                        self.failed_num += 1
                    else:
                        refile.write("[SUCCESS] del test suite " + suite["name"] + " successfully \n")
                        self.success_num += 1
                refile.write(str(self.total_num) + " testsuites are operated \n")
                refile.write(str(self.success_num) + " testsuites success to del \n")
                refile.write(str(self.failed_num) + " testsuites failed to del \n")
            except OSError as err:
                # Report the failure as text, matching the '' returned on success.
                error_message = str(err) or repr(err)
        return error_message, result_file