Skip to content

Commit

Permalink
Use embulk-spi v0.11
Browse files Browse the repository at this point in the history
  • Loading branch information
hiroyuki-sato committed May 8, 2024
1 parent afc7b6f commit c8aba49
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 32 deletions.
3 changes: 3 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,14 @@ java {
}

dependencies {

compileOnly "org.embulk:embulk-spi:0.11"
implementation "org.embulk:embulk-util-config:0.5.0"
implementation "org.embulk:embulk-util-file:0.2.0"

testImplementation "junit:junit:4.13.2"
testImplementation "org.embulk:embulk-deps:0.11.3"
testImplementation "org.embulk:embulk-junit4:0.11.3"
}

embulkPlugin {
Expand Down
55 changes: 28 additions & 27 deletions src/main/java/org/embulk/input/CommandFileInputPlugin.java
Original file line number Diff line number Diff line change
@@ -1,29 +1,29 @@
package org.embulk.input;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.ArrayList;
import java.io.InputStream;
import java.io.IOException;
import java.io.FilterInputStream;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.base.Throwables;
import org.embulk.config.TaskReport;
import org.embulk.config.Config;
import org.embulk.config.ConfigDefault;
import org.embulk.util.config.Config;
import org.embulk.util.config.ConfigDefault;
import org.embulk.util.config.ConfigMapper;
import org.embulk.util.config.ConfigMapperFactory;
import org.embulk.util.config.Task;
import org.embulk.util.config.TaskMapper;
import org.embulk.config.ConfigDiff;
import org.embulk.config.ConfigInject;
import org.embulk.config.ConfigSource;
import org.embulk.config.ConfigException;
import org.embulk.config.Task;
import org.embulk.config.TaskSource;
import org.embulk.spi.BufferAllocator;
import org.embulk.spi.Exec;
import org.embulk.spi.FileInputPlugin;
import org.embulk.spi.TransactionalFileInput;
import org.embulk.spi.util.InputStreamFileInput;
import org.embulk.util.file.InputStreamFileInput;
import org.slf4j.LoggerFactory;

public class CommandFileInputPlugin
implements FileInputPlugin
Expand All @@ -38,16 +38,13 @@ public interface PluginTask
@ConfigDefault("\"stdout\"")
public String getPipe();

@ConfigInject
public BufferAllocator getBufferAllocator();
}

private final Logger logger = Exec.getLogger(getClass());

@Override
public ConfigDiff transaction(ConfigSource config, FileInputPlugin.Control control)
{
PluginTask task = config.loadConfig(PluginTask.class);
final ConfigMapper configMapper = CONFIG_MAPPER_FACTORY.createConfigMapper();
final PluginTask task = configMapper.map(config, PluginTask.class);

switch (task.getPipe()) {
case "stdout":
Expand All @@ -59,7 +56,7 @@ public ConfigDiff transaction(ConfigSource config, FileInputPlugin.Control contr
"Unknown 'pipe' option '%s'. It must be either 'stdout' or 'stderr'", task.getPipe()));
}

return resume(task.dump(), 1, control);
return resume(task.toTaskSource(), 1, control);
}

@Override
Expand All @@ -68,7 +65,8 @@ public ConfigDiff resume(TaskSource taskSource,
FileInputPlugin.Control control)
{
control.run(taskSource, taskCount);
return Exec.newConfigDiff();

return CONFIG_MAPPER_FACTORY.newConfigDiff();
}

@Override
Expand All @@ -81,7 +79,8 @@ public void cleanup(TaskSource taskSource,
@Override
public TransactionalFileInput open(TaskSource taskSource, int taskIndex)
{
PluginTask task = taskSource.loadTask(PluginTask.class);
final TaskMapper taskMapper = CONFIG_MAPPER_FACTORY.createTaskMapper();
final PluginTask task = taskMapper.map(taskSource, PluginTask.class);

List<String> cmdline = new ArrayList<String>();
cmdline.addAll(buildShell());
Expand Down Expand Up @@ -123,18 +122,17 @@ public TransactionalFileInput open(TaskSource taskSource, int taskIndex)
}
}
} catch (IOException ex) {
throw Throwables.propagate(ex);
throw new RuntimeException(ex);
}
}

@VisibleForTesting
static List<String> buildShell()
protected static List<String> buildShell()
{
String osName = System.getProperty("os.name");
if(osName.indexOf("Windows") >= 0) {
return ImmutableList.of("PowerShell.exe", "-Command");
return Collections.unmodifiableList(Arrays.asList("PowerShell.exe", "-Command"));
} else {
return ImmutableList.of("sh", "-c");
return Collections.unmodifiableList(Arrays.asList("sh", "-c"));
}
}

Expand Down Expand Up @@ -193,7 +191,7 @@ private synchronized void waitFor() throws IOException
try {
code = process.waitFor();
} catch (InterruptedException ex) {
throw Throwables.propagate(ex);
throw new RuntimeException(ex);
}
process = null;
if (code != 0) {
Expand All @@ -212,7 +210,7 @@ public static class PluginFileInput
private static class SingleFileProvider
implements InputStreamFileInput.Provider
{
private InputStream stream;
private final InputStream stream;
private boolean opened = false;

public SingleFileProvider(InputStream stream)
Expand Down Expand Up @@ -241,17 +239,20 @@ public void close() throws IOException

public PluginFileInput(PluginTask task, InputStream stream)
{
super(task.getBufferAllocator(), new SingleFileProvider(stream));
super(Exec.getBufferAllocator(), new SingleFileProvider(stream));
}

public void abort() { }

public TaskReport commit()
{
return Exec.newTaskReport();
return CONFIG_MAPPER_FACTORY.newTaskReport();
}

@Override
public void close() { }
}
private static final Logger logger = LoggerFactory.getLogger(CommandFileInputPlugin.class);

private static final ConfigMapperFactory CONFIG_MAPPER_FACTORY = ConfigMapperFactory.builder().addDefaultModules().build();
}
10 changes: 5 additions & 5 deletions src/test/java/org/embulk/input/TestCommandFileInputPlugin.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package org.embulk.input;

import com.google.common.collect.ImmutableList;
import org.junit.Before;
import java.util.Arrays;
import java.util.Collections;
import org.junit.Rule;
import org.junit.Test;
import org.embulk.EmbulkTestRuntime;
import org.embulk.test.EmbulkTestRuntime;
import static org.embulk.input.CommandFileInputPlugin.buildShell;

import java.util.List;
Expand All @@ -19,10 +19,10 @@ public class TestCommandFileInputPlugin
@Test
public void testShell() {
if (System.getProperty("os.name").indexOf("Windows") >= 0) {
assertEquals(ImmutableList.of("PowerShell.exe", "-Command"), buildShell());
assertEquals(Collections.unmodifiableList(Arrays.asList("PowerShell.exe", "-Command")), buildShell());
}
else {
assertEquals(ImmutableList.of("sh", "-c"), buildShell());
assertEquals(Collections.unmodifiableList(Arrays.asList("sh", "-c")), buildShell());
}
}
}

0 comments on commit c8aba49

Please sign in to comment.