Skip to content

Commit

Permalink
Iss632 after rebase on future (#721)
Browse files Browse the repository at this point in the history
* refact branch rebase off of future; introduce custom error (UnparallelizableError and AdjLineNoteImplemented Error) caught before general errors, introduce custom error to ast_to_ir and ir in compiler at appropriate places with more detail error messages

Signed-off-by: YUUU23 <[email protected]>

* refactor: import custom error to ast_to_ir, raise unparallelizable err in pash_compiler

Signed-off-by: YUUU23 <[email protected]>

* refactor: put all expansion custom error from the sh_expand library (expand.py file) under one ExpansionError class in custom_error to catch and log these errors separately

Signed-off-by: YUUU23 <[email protected]>

* fix: remove duplicated ExpansionError in excepts (compile_ir)

Signed-off-by: YUUU23 <[email protected]>

* refactor: import ExpansionError (ExpansionError class initiated within expand package

Signed-off-by: YUUU23 <[email protected]>

* delete: remove expand.py changes as it will be changed in the original package

Signed-off-by: YUUU23 <[email protected]>

* fix: import expansion error

Signed-off-by: YUUU23 <[email protected]>

---------

Signed-off-by: YUUU23 <[email protected]>
  • Loading branch information
YUUU23 authored Jun 12, 2024
1 parent 14b58e9 commit c11365c
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 11 deletions.
6 changes: 4 additions & 2 deletions compiler/ast_to_ir.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
from util import *
from parse import from_ast_objects_to_shell

from custom_error import *

## TODO: Separate the ir stuff to the bare minimum and
## try to move this to the shell_ast folder.

Expand Down Expand Up @@ -159,7 +161,7 @@ def combine_pipe(ast_nodes):
else:
## If any part of the pipe is not an IR, the compilation must fail.
log("Node: {} is not pure".format(ast_nodes[0]))
raise Exception("Not pure node in pipe")
raise UnparallelizableError("Node: {} is not a pure node in pipe".format(ast_nodes[0]))

## Combine the rest of the nodes
for ast_node in ast_nodes[1:]:
Expand All @@ -168,7 +170,7 @@ def combine_pipe(ast_nodes):
else:
## If any part of the pipe is not an IR, the compilation must fail.
log("Node: {} is not pure".format(ast_nodes))
raise Exception("Not pure node in pipe")
raise UnparallelizableError("This specific node: {} is not a pure node in pipe".format(ast_node))

return [combined_nodes]

Expand Down
5 changes: 5 additions & 0 deletions compiler/custom_error.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
class UnparallelizableError(Exception):
pass

class AdjLineNotImplementedError(Exception):
pass
15 changes: 8 additions & 7 deletions compiler/ir.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@

from shell_ast.ast_util import *
from util import *
from custom_error import *

import config

Expand Down Expand Up @@ -242,11 +243,11 @@ def compile_command_to_DFG(fileIdGen, command, options, redirections=None):
command_invocation
)
if io_info is None:
raise Exception(
raise UnparallelizableError(
f"InputOutputInformation for {format_arg_chars(command)} not provided so considered side-effectful."
)
if io_info.has_other_outputs():
raise Exception(
raise UnparallelizableError(
f"Command {format_arg_chars(command)} has outputs other than streaming."
)
para_info: ParallelizabilityInfo = (
Expand Down Expand Up @@ -840,7 +841,7 @@ def apply_parallelization_to_node(
node_id, parallelizer, fileIdGen, fan_out
)
else:
raise Exception("Splitter not yet implemented")
raise UnparallelizableError("Splitter not yet implemented for command: {}".format(self.get_node(node_id=node_id).cmd_invocation_with_io_vars.cmd_name))

def apply_round_robin_parallelization_to_node(
self, node_id, parallelizer, fileIdGen, fan_out, r_split_batch_size
Expand All @@ -849,11 +850,11 @@ def apply_round_robin_parallelization_to_node(
# currently, this cannot be done since splitter etc. would be added...
aggregator_spec = parallelizer.get_aggregator_spec()
if aggregator_spec.is_aggregator_spec_adj_lines_merge():
raise Exception("adj_lines_merge not yet implemented in PaSh")
raise AdjLineNotImplementedError("adj_lines_merge not yet implemented in PaSh")
elif aggregator_spec.is_aggregator_spec_adj_lines_seq():
raise Exception("adj_lines_seq not yet implemented in PaSh")
raise AdjLineNotImplementedError("adj_lines_seq not yet implemented in PaSh")
elif aggregator_spec.is_aggregator_spec_adj_lines_func():
raise Exception("adj_lines_func not yet implemented in PaSh")
raise AdjLineNotImplementedError("adj_lines_func not yet implemented in PaSh")
# END of what to move

node = self.get_node(node_id)
Expand Down Expand Up @@ -1192,7 +1193,7 @@ def introduce_aggregators_for_consec_chunks(
fileIdGen,
)
else:
raise Exception("aggregator kind not yet implemented")
raise UnparallelizableError("aggregator kind not yet implemented for command: {}".format(original_cmd_invocation_with_io_vars.cmd_name))
else: # we got auxiliary information
assert parallelizer.core_aggregator_spec.is_aggregator_spec_custom_2_ary()
map_in_aggregator_ids = in_aggregator_ids
Expand Down
14 changes: 12 additions & 2 deletions compiler/pash_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@
from datetime import datetime

from sh_expand import env_vars_util
from sh_expand.expand import ExpansionError

import config
from ir import *
from ast_to_ir import compile_asts
from ir_to_ast import to_shell
from pash_graphviz import maybe_generate_graphviz
from util import *
from custom_error import *

from definitions.ir.aggregator_node import *

Expand Down Expand Up @@ -92,9 +94,17 @@ def compile_ir(ir_filename, compiled_script_file, args, compiler_config):
ret = compile_optimize_output_script(
ir_filename, compiled_script_file, args, compiler_config
)
except ExpansionError as e:
log("WARNING: Exception caught because some region(s) are not expandable and therefore unparallelizable:", e)
except UnparallelizableError as e:
log("WARNING: Exception caught because some region(s) are unparallelizable:", e)
# log(traceback.format_exc()) # uncomment for exact trace report (PaSh user should see informative messages for unparellizable regions)
except (AdjLineNotImplementedError, NotImplementedError) as e:
log("WARNING: Exception caught because some part is not implemented:", e)
log(traceback.format_exc())
except Exception as e:
log("WARNING: Exception caught:", e)
# traceback.print_exc()
log(traceback.format_exc())

return ret

Expand Down Expand Up @@ -142,7 +152,7 @@ def compile_optimize_output_script(

ret = optimized_ast_or_ir
else:
raise Exception("Script failed to compile!")
raise UnparallelizableError("Script failed to compile!")

return ret

Expand Down

0 comments on commit c11365c

Please sign in to comment.