Skip to content

Commit

Permalink
Merge pull request #17 from melexis/login_once
Browse files Browse the repository at this point in the history
Login to server once, retrieve defects with each node
  • Loading branch information
SteinHeselmans authored Jan 28, 2019
2 parents 32d8c20 + 3a9d573 commit b47047b
Showing 1 changed file with 158 additions and 143 deletions.
301 changes: 158 additions & 143 deletions mlx/coverity.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,141 +136,168 @@ def run(self):
return [item_list_node]


# -----------------------------------------------------------------------------
# Event handlers
def process_coverity_nodes(app, doctree, fromdocname):
"""
This function should be triggered upon ``doctree-resolved event``
Obtain information from Coverity server and generate a table.
"""
env = app.builder.env

# Login to Coverity and obtain stream information
try:
report_info(env, 'Login to Coverity server... ', True)
coverity_conf_service = CoverityConfigurationService(app.config.coverity_credentials['transport'],
app.config.coverity_credentials['hostname'],
app.config.coverity_credentials['port'])
coverity_conf_service.login(app.config.coverity_credentials['username'],
app.config.coverity_credentials['password'])
report_info(env, 'done')

report_info(env, 'obtaining stream information... ', True)
stream = coverity_conf_service.get_stream(app.config.coverity_credentials['stream'])
if stream is None:
report_info(env, 'failed')
raise ValueError('No such Coverity stream [%s] found on [%s]',
app.config.coverity_credentials['stream'], coverity_conf_service.get_service_url())
report_info(env, 'done')

# Get Stream's project name
report_info(env, 'obtaining project name from stream... ', True)
project_name = coverity_conf_service.get_project_name(stream)
report_info(env, 'done')
coverity_service = CoverityDefectService(coverity_conf_service)
coverity_service.login(app.config.coverity_credentials['username'], app.config.coverity_credentials['password'])
except (URLError, HTTPError, Exception, ValueError) as error_info:
report_info(env, 'failed with: %s' % error_info)
# Create failed topnode
class SphinxCoverityConnector():
def __init__(self):
"""
Initialize the object by setting error variable to false
"""
self.coverity_login_error = False

def initialize_environment(self, app):
"""
Perform initializations needed before the build process starts.
"""

# LaTeX-support: since we generate empty tags, we need to relax the verbosity of that error
if 'preamble' not in app.config.latex_elements:
app.config.latex_elements['preamble'] = ''
app.config.latex_elements['preamble'] += '''\
\\makeatletter
\\let\@noitemerr\\relax
\\makeatother'''

env = app.builder.env

# Login to Coverity and obtain stream information
try:
report_info(env, 'Login to Coverity server... ', True)
coverity_conf_service = CoverityConfigurationService(app.config.coverity_credentials['transport'],
app.config.coverity_credentials['hostname'],
app.config.coverity_credentials['port'])
coverity_conf_service.login(app.config.coverity_credentials['username'],
app.config.coverity_credentials['password'])
report_info(env, 'done')

report_info(env, 'obtaining stream information... ', True)
stream = coverity_conf_service.get_stream(app.config.coverity_credentials['stream'])
if stream is None:
raise ValueError('No such Coverity stream [%s] found on [%s]' %
(app.config.coverity_credentials['stream'], coverity_conf_service.get_service_url()))
report_info(env, 'done')

# Get Stream's project name
report_info(env, 'obtaining project name from stream... ', True)
self.project_name = coverity_conf_service.get_project_name(stream)
report_info(env, 'done')
self.coverity_service = CoverityDefectService(coverity_conf_service)
self.coverity_service.login(app.config.coverity_credentials['username'],
app.config.coverity_credentials['password'])
except (URLError, HTTPError, Exception, ValueError) as error_info:
self.coverity_login_error_msg = error_info
report_info(env, 'failed with: %s' % error_info)
self.coverity_login_error = True

# -----------------------------------------------------------------------------
# Event handlers
def process_coverity_nodes(self, app, doctree, fromdocname):
"""
This function should be triggered upon ``doctree-resolved event``
Obtain information from Coverity server and generate a table.
"""
env = app.builder.env

if self.coverity_login_error:
# Create failed topnode
for node in doctree.traverse(CoverityDefect):
top_node = create_top_node("Failed to connect to Coverity Server")
node.replace_self(top_node)
report_warning(env, 'Connection failed: %s' % self.coverity_login_error_msg, fromdocname)
return

# Item matrix:
# Create table with related items, printing their target references.
# Only source and target items matching respective regexp shall be included
for node in doctree.traverse(CoverityDefect):
top_node = create_top_node("Failed to connect to Coverity Server")
node.replace_self(top_node)
return
top_node = create_top_node(node['title'])
table = nodes.table()
tgroup = nodes.tgroup()

# Item matrix:
# Create table with related items, printing their target references.
# Only source and target items matching respective regexp shall be included
for node in doctree.traverse(CoverityDefect):
top_node = create_top_node(node['title'])
table = nodes.table()
tgroup = nodes.tgroup()
for c in node['col']:
tgroup += [nodes.colspec(colwidth=5)]

for c in node['col']:
tgroup += [nodes.colspec(colwidth=5)]
tgroup += nodes.thead('', create_row(node['col']))
tbody = nodes.tbody()

tgroup += nodes.thead('', create_row(node['col']))
tbody = nodes.tbody()
tgroup += tbody
table += tgroup

tgroup += tbody
table += tgroup
# Setup counters
# count_total = 0
# count_covered = 0

# Setup counters
# count_total = 0
# count_covered = 0
# Get items from server
report_info(env, 'obtaining defects... ', True)
try:
defects = self.coverity_service.get_defects(self.project_name, app.config.coverity_credentials['stream'], # noqa: E501
checker=node['checker'], impact=node['impact'], kind=node['kind'], # noqa: E501
classification=node['classification'], action=node['action'], # noqa: E501
component=node['component'], cwe=node['cwe'], cid=node['cid']) # noqa: E501
except (URLError, AttributeError, Exception) as e:
report_warning(env, 'failed with %s' % e, fromdocname)
continue
report_info(env, "%d received" % (defects['totalNumberOfRecords']))
report_info(env, "building defects table... ", True)

# Get items from server
report_info(env, 'obtaining defects... ', True)
try:
defects = coverity_service.get_defects(project_name, app.config.coverity_credentials['stream'],
checker=node['checker'], impact=node['impact'], kind=node['kind'],
classification=node['classification'], action=node['action'],
component=node['component'], cwe=node['cwe'], cid=node['cid'])
except (URLError, AttributeError, Exception) as e:
report_warning(env, 'failed with %s' % e)
continue
report_info(env, "%d received" % (defects['totalNumberOfRecords']))
report_info(env, "building defects table... ", True)
try:
for defect in defects['mergedDefects']:
row = nodes.row()

# go through each col and decide if it is there or we print empty
for item_col in node['col']:
if 'CID' == item_col:
# CID is default and even if it is in disregard
row += create_cell(str(defect['cid']),
url=self.coverity_service.get_defect_url(app.config.coverity_credentials['stream'], # noqa: E501
str(defect['cid'])))
elif 'Category' == item_col:
row += create_cell(defect['displayCategory'])
elif 'Impact' == item_col:
row += create_cell(defect['displayImpact'])
elif 'Issue' == item_col:
row += create_cell(defect['displayIssueKind'])
elif 'Type' == item_col:
row += create_cell(defect['displayType'])
elif 'Checker' == item_col:
row += create_cell(defect['checkerName'])
elif 'Component' == item_col:
row += create_cell(defect['componentName'])
elif 'Comment' == item_col:
row += cov_attribute_value_to_col(defect, 'Comment')
elif 'Classification' == item_col:
row += cov_attribute_value_to_col(defect, 'Classification')
elif 'Action' == item_col:
row += cov_attribute_value_to_col(defect, 'Action')
elif 'Status' == item_col:
row += cov_attribute_value_to_col(defect, 'DefectStatus')
else:
# generic check which if it is missing prints empty cell anyway
row += cov_attribute_value_to_col(defect, item_col)

tbody += row
report_info(env, "done")
top_node += table
except AttributeError as e:
report_info(env, 'No issues matching your query or empty stream. %s' % e)
top_node += table
top_node += nodes.paragraph(text='No issues matching your query or empty stream')

try:
for defect in defects['mergedDefects']:
row = nodes.row()

# go through each col and decide if it is there or we print empty
for item_col in node['col']:
if 'CID' == item_col:
# CID is default and even if it is in disregard
row += create_cell(str(defect['cid']),
url=coverity_service.get_defect_url(app.config.coverity_credentials['stream'], # noqa: E501
str(defect['cid'])))
elif 'Category' == item_col:
row += create_cell(defect['displayCategory'])
elif 'Impact' == item_col:
row += create_cell(defect['displayImpact'])
elif 'Issue' == item_col:
row += create_cell(defect['displayIssueKind'])
elif 'Type' == item_col:
row += create_cell(defect['displayType'])
elif 'Checker' == item_col:
row += create_cell(defect['checkerName'])
elif 'Component' == item_col:
row += create_cell(defect['componentName'])
elif 'Comment' == item_col:
row += cov_attribute_value_to_col(defect, 'Comment')
elif 'Classification' == item_col:
row += cov_attribute_value_to_col(defect, 'Classification')
elif 'Action' == item_col:
row += cov_attribute_value_to_col(defect, 'Action')
elif 'Status' == item_col:
row += cov_attribute_value_to_col(defect, 'DefectStatus')
else:
# generic check which if it is missing prints empty cell anyway
row += cov_attribute_value_to_col(defect, item_col)

tbody += row
report_info(env, "done")
top_node += table
except AttributeError as e:
report_info(env, 'No issues matching your query or empty stream. %s' % e)
top_node += table
top_node += nodes.paragraph(text='No issues matching your query or empty stream')

node.replace_self(top_node)
# try:
# percentage = int(100 * count_covered / count_total)
# except ZeroDivisionError:
# percentage = 0
# disp = 'Statistics: {cover} out of {total} covered: {pct}%'.format(cover=count_covered,
# total=count_total,
# pct=percentage)
# if node['graph']:
# p_node = nodes.paragraph()
# txt = nodes.Text(disp)
# p_node += txt
# top_node += p_node
#
# top_node += table
#
node.replace_self(top_node)
# try:
# percentage = int(100 * count_covered / count_total)
# except ZeroDivisionError:
# percentage = 0
# disp = 'Statistics: {cover} out of {total} covered: {pct}%'.format(cover=count_covered,
# total=count_total,
# pct=percentage)
# if node['graph']:
# p_node = nodes.paragraph()
# txt = nodes.Text(disp)
# p_node += txt
# top_node += p_node
#
# top_node += table
#


def create_ref_node(contents, url):
Expand Down Expand Up @@ -324,20 +351,6 @@ def cov_attribute_value_to_col(defect, name):
return col


def initialize_environment(app):
"""
Perform initializations needed before the build process starts.
"""

# LaTeX-support: since we generate empty tags, we need to relax the verbosity of that error
if 'preamble' not in app.config.latex_elements:
app.config.latex_elements['preamble'] = ''
app.config.latex_elements['preamble'] += '''\
\\makeatletter
\\let\@noitemerr\\relax
\\makeatother'''


# Extension setup
def setup(app):
'''Extension setup'''
Expand All @@ -356,11 +369,13 @@ def setup(app):

app.add_node(CoverityDefect)

sphinx_coverity_connector = SphinxCoverityConnector()

app.add_directive('coverity-list', CoverityDefectListDirective)

app.connect('doctree-resolved', process_coverity_nodes)
app.connect('doctree-resolved', sphinx_coverity_connector.process_coverity_nodes)

app.connect('builder-inited', initialize_environment)
app.connect('builder-inited', sphinx_coverity_connector.initialize_environment)

try:
return {'version': '%(prog)s {version}'.format(version=pkg_resources.require('mlx.coverity')[0].version)}
Expand Down

0 comments on commit b47047b

Please sign in to comment.