Skip to content

Commit

Permalink
Merge pull request #55 from civitaspo/develop
Browse files Browse the repository at this point in the history
0.0.12
  • Loading branch information
civitaspo authored May 23, 2019
2 parents 2d394b0 + ae7addf commit bae3eb4
Show file tree
Hide file tree
Showing 7 changed files with 34 additions and 20 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
0.0.12 (2019-05-23)
===================
* [Enhancement] Follow latest python runner script used by `ecs_task.py>`. The changes resolve the same issues that the bellow p-rs resolve.
* [Support type hints for Python3 on py> operator by chezou · Pull Request \#905 · treasure\-data/digdag](https://github.com/treasure-data/digdag/pull/905)
* [Fix default argument check on py> operator by chezou · Pull Request \#913 · treasure\-data/digdag](https://github.com/treasure-data/digdag/pull/913)
* [Fix digdag\.env\.add\_subtask for python3 by sonots · Pull Request \#972 · treasure\-data/digdag](https://github.com/treasure-data/digdag/pull/972)

0.0.11 (2019-01-24)
===================
* [Enhancement] `ecs_task.wait>` operator supports changeable interval and exponential backoff storategy. @Mulyu++
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ _export:
repositories:
- https://jitpack.io
dependencies:
- pro.civitaspo:digdag-operator-ecs_task:0.0.11
- pro.civitaspo:digdag-operator-ecs_task:0.0.12
ecs_task:
auth_method: profile
tmp_storage:
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ plugins {
}

group = 'pro.civitaspo'
version = '0.0.11'
version = '0.0.12'

def digdagVersion = '0.9.31'
def scalaSemanticVersion = "2.12.6"
Expand Down
2 changes: 1 addition & 1 deletion example/ecs_task.py/echo.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import yaml


def echo(message):
def echo(message: str):
print(yaml.dump(message))

2 changes: 1 addition & 1 deletion example/example.dig
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ _export:
- file://${repos}
# - https://jitpack.io
dependencies:
- pro.civitaspo:digdag-operator-ecs_task:0.0.11
- pro.civitaspo:digdag-operator-ecs_task:0.0.12
ecs_task:
auth_method: profile
tmp_storage:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
#########
# Copy from https://raw.githubusercontent.com/treasure-data/digdag/52ff276bcc0aed23bf5a0df6c7a7c7b155c22d53/digdag-standards/src/main/resources/digdag/standards/py/runner.py
# Copy from https://raw.githubusercontent.com/treasure-data/digdag/6c81976334b78b3b776e357c7e9244f6bbe2711a/digdag-standards/src/main/resources/digdag/standards/py/runner.py
# Then, customize a bit about error handling
#########

import collections
import sys
import os
import json
import imp
import inspect
import json
import os
import sys
import collections
import traceback

command = sys.argv[1]
Expand All @@ -31,7 +31,6 @@
# fake digdag module already imported
digdag_mod = sys.modules['digdag'] = imp.new_module('digdag')


class Env(object):
def __init__(self, digdag_env_mod):
self.params = digdag_env_mod.params
Expand Down Expand Up @@ -60,7 +59,7 @@ def add_subtask(self, function=None, **params):
command = ".".join([function.im_class.__module__, function.im_class.__name__, function.__name__])
else:
# Python 3
command = ".".join([function.__module__, function.__name__])
command = ".".join([function.__module__, function.__qualname__])
config = params
config["py>"] = command
else:
Expand All @@ -76,13 +75,12 @@ def add_subtask(self, function=None, **params):
self.subtask_config["+subtask" + str(self.subtask_index)] = config
self.subtask_index += 1


digdag_mod.env = Env(digdag_env_mod)
import digdag

# add the archive path to improt path
sys.path.append(os.path.abspath(os.getcwd()))


def digdag_inspect_command(command):
# package.name.Class.method
fragments = command.split(".")
Expand Down Expand Up @@ -112,20 +110,25 @@ def digdag_inspect_command(command):
else:
return (callable_type, None)


def digdag_inspect_arguments(callable_type, exclude_self, params):
if callable_type == object.__init__:
# object.__init__ accepts *varargs and **keywords but it throws exception
return {}
spec = inspect.getargspec(callable_type)
if hasattr(inspect, 'getfullargspec'): # Python3
spec = inspect.getfullargspec(callable_type)
keywords_ = spec.varkw
else: # Python 2
spec = inspect.getargspec(callable_type)
keywords_ = spec.keywords

args = {}
for idx, key in enumerate(spec.args):
if exclude_self and idx == 0:
continue
if key in params:
args[key] = params[key]
else:
if spec.defaults is None or len(spec.defaults) < idx:
if spec.defaults is None or idx < len(spec.args) - len(spec.defaults):
# this keyword is required but not in params. raising an error.
if hasattr(callable_type, '__qualname__'):
# Python 3
Expand All @@ -136,13 +139,13 @@ def digdag_inspect_arguments(callable_type, exclude_self, params):
else:
name = callable_type.__name__
raise TypeError("Method '%s' requires parameter '%s' but not set" % (name, key))
if spec.keywords:
if keywords_:
# above code was only for validation
return params
else:
return args


##### begin: Custom Error Handling Code #####
status_params = {}
def with_error_handler(func, **func_args):
try:
Expand All @@ -154,6 +157,7 @@ def with_error_handler(func, **func_args):
status_params['error_message'] = str(e)
status_params['error_stacktrace'] = traceback.format_exc()
print('message: {}, stacktrace: {}'.format(str(e), traceback.format_exc()))
##### end: Custom Error Handling Code #####

callable_type, method_name = digdag_inspect_command(command)

Expand All @@ -163,21 +167,24 @@ def with_error_handler(func, **func_args):

method = getattr(instance, method_name)
method_args = digdag_inspect_arguments(method, True, params)
# Replace the below code to customize error hadling
# result = method(**method_args)
result = with_error_handler(method, **method_args)

else:
args = digdag_inspect_arguments(callable_type, False, params)
# Replace the below code to customize error hadling
# result = callable_type(**args)
result = with_error_handler(callable_type, **args)

out = {
'subtask_config': digdag_env.subtask_config,
'export_params': digdag_env.export_params,
'store_params': digdag_env.store_params,
#'state_params': digdag_env.state_params, # only for retrying
'status_params': status_params, # only for ecs_task.command_result_internal
# 'state_params': digdag_env.state_params, # only for retrying
}

with open(out_file, 'w') as f:
json.dump(out, f)

Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@ package pro.civitaspo.digdag.plugin

package object ecs_task {

val VERSION: String = "0.0.11"
val VERSION: String = "0.0.12"

}

0 comments on commit bae3eb4

Please sign in to comment.