diff --git a/cmd/substreams/codegen.go b/cmd/substreams/codegen.go index 7c976c214..b83de5700 100644 --- a/cmd/substreams/codegen.go +++ b/cmd/substreams/codegen.go @@ -54,7 +54,7 @@ func runCodeGen(cmd *cobra.Command, args []string) error { return fmt.Errorf("computing working directory: %w", err) } workingDir := filepath.Dir(manifestAbsPath) - manif, err := manifest.LoadManifestFile(manifestAbsPath) + manif, err := manifest.LoadManifestFile(manifestAbsPath, workingDir) if err != nil { return fmt.Errorf("loading manifest: %w", err) } diff --git a/cmd/substreams/pack.go b/cmd/substreams/pack.go index d3fb9a489..7283bc97a 100644 --- a/cmd/substreams/pack.go +++ b/cmd/substreams/pack.go @@ -2,7 +2,6 @@ package main import ( "fmt" - "gopkg.in/yaml.v3" "io/ioutil" "os" "path/filepath" @@ -38,7 +37,7 @@ func init() { replaced by "-") and "" is "package.version" value. You can use "{version}" which resolves to "package.version". `)) - packCmd.Flags().StringArrayP("config", "c", []string{}, cli.FlagDescription(`path to a configuration file that contains overrides for the manifest`)) + //packCmd.Flags().StringArrayP("config", "c", []string{}, cli.FlagDescription(`path to a configuration file that contains overrides for the manifest`)) } func runPack(cmd *cobra.Command, args []string) error { @@ -47,23 +46,22 @@ func runPack(cmd *cobra.Command, args []string) error { manifestPath = args[0] } - // Get the value of the -c flag - overridePaths, _ := cmd.Flags().GetStringArray("config") - var manifestReaderOptions []manifest.Option - // If the overridePath is provided, read, decode it and add to manifestReaderOptions - if len(overridePaths) > 0 { - var overrides []*manifest.ConfigurationOverride - for _, overridePath := range overridePaths { - overrideConfig, err := readOverrideConfig(overridePath) - if err != nil { - return fmt.Errorf("reading override config %q: %w", overridePath, err) - } - overrides = append(overrides, overrideConfig) - } - manifestReaderOptions = append(manifestReaderOptions, manifest.WithOverrides(overrides...)) - } + //// Get the value of the -c flag + //overridePaths, _ := cmd.Flags().GetStringArray("config") + //// If the overridePath is provided, read, decode it and add to manifestReaderOptions + //if len(overridePaths) > 0 { + // var overrides []*manifest.ConfigurationOverride + // for _, overridePath := range overridePaths { + // overrideConfig, err := readOverrideConfig(overridePath) + // if err != nil { + // return fmt.Errorf("reading override config %q: %w", overridePath, err) + // } + // overrides = append(overrides, overrideConfig) + // } + // manifestReaderOptions = append(manifestReaderOptions, manifest.WithOverrides(overrides...)) + //} // Use the manifestReaderOptions while creating the manifest reader manifestReader, err := manifest.NewReader(manifestPath, manifestReaderOptions...) @@ -120,19 +118,3 @@ func resolveOutputFile(input string, bindings map[string]string) string { return input } - -// This function reads and decodes the override configuration from a given path -func readOverrideConfig(path string) (*manifest.ConfigurationOverride, error) { - fileBytes, err := os.ReadFile(path) - if err != nil { - return nil, err - } - - overrideConfig := &manifest.ConfigurationOverride{} - err = yaml.Unmarshal(fileBytes, overrideConfig) - if err != nil { - return nil, err - } - - return overrideConfig, nil -} diff --git a/codegen/init_test.go b/codegen/init_test.go index 2046cdc3d..1ca4a182c 100644 --- a/codegen/init_test.go +++ b/codegen/init_test.go @@ -23,7 +23,7 @@ func InitTestGenerator(t *testing.T) *Generator { panic(fmt.Errorf("reading manifest file %s :%w", manifestPath, err)) } - manif, err := manifest.LoadManifestFile(manifestPath) + manif, err := manifest.LoadManifestFile(manifestPath, ".") require.NoError(t, err) return NewGenerator(pkg, manif, protoDefinitions, "") diff --git a/docs/release-notes/change-log.md b/docs/release-notes/change-log.md index 760c3f35d..3ae721fa1 100644 --- a/docs/release-notes/change-log.md +++ b/docs/release-notes/change-log.md @@ -29,6 +29,27 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), * `substreams info` command now properly displays the content of sink configs, optionally writing the fields that were bundled from files to disk with `--output-sinkconfig-files-path=` +### Changed + +* The override feature has been overhauled. Users may now override an existing substreams by pointing to an override file in `run` or `gui` command. This override manifest will have a `deriveFrom` field which points to the original substreams which is to be overriden. + +Example of an override manifest: +``` +deriveFrom: path/to/original.spkg #this can also be a remote url + +package: + name: "overriden_package_name" + version: "100.0.0" + +network: polygon + +initialBlocks: + module_name_1: 17500000 + module_name_1: 17500000 +params: + module_name_1: "override_paramete_value_here" +``` + ## v1.1.14 ### Bug fixes diff --git a/manifest/manifest.go b/manifest/manifest.go index 2e45cfe33..c9bff2b79 100644 --- a/manifest/manifest.go +++ b/manifest/manifest.go @@ -111,7 +111,12 @@ type StreamOutput struct { Type string `yaml:"type"` } -func decodeYamlManifestFromFile(yamlFilePath string) (out *Manifest, err error) { +func decodeYamlManifestFromFile(yamlFilePath, workingDir string) (out *Manifest, err error) { + //if yamlFilePath is a relative path, make it absolute + if !filepath.IsAbs(yamlFilePath) { + yamlFilePath = filepath.Join(workingDir, yamlFilePath) + } + cnt, err := os.ReadFile(yamlFilePath) if err != nil { return nil, fmt.Errorf("reading substreams manifest %q: %w", yamlFilePath, err) diff --git a/manifest/manifest_override.go b/manifest/manifest_override.go deleted file mode 100644 index f7c064962..000000000 --- a/manifest/manifest_override.go +++ /dev/null @@ -1,34 +0,0 @@ -package manifest - -import "path/filepath" - -type ManifestOverrideConfiguration struct { - Package PackageMetaOverride `yaml:"package"` - - Network string `yaml:"network"` - InitialBlocks map[string]uint64 `yaml:"initialBlocks"` - Params map[string]string `yaml:"params"` - - Workdir string `yaml:"-"` -} - -func (m *ManifestOverrideConfiguration) resolvePath(path string) string { - if m.Workdir == "" || filepath.IsAbs(path) || httpSchemePrefixRegex.MatchString(path) { - return path - } - - return filepath.Join(m.Workdir, path) -} - -func (o *ManifestOverrideConfiguration) Apply(m *Manifest) error { - //recaluclate initial block graph? - - return nil -} - -type PackageMetaOverride struct { - Name string `yaml:"name"` - //Version string `yaml:"version"` // Semver for package authors - //URL string `yaml:"url"` - //Doc string `yaml:"doc"` -} diff --git a/manifest/manifest_override_test.go b/manifest/manifest_override_test.go deleted file mode 100644 index 88367b0a0..000000000 --- a/manifest/manifest_override_test.go +++ /dev/null @@ -1 +0,0 @@ -package manifest diff --git a/manifest/manifest_test.go b/manifest/manifest_test.go index ceaebd2ed..764f462c5 100644 --- a/manifest/manifest_test.go +++ b/manifest/manifest_test.go @@ -11,7 +11,7 @@ import ( ) func TestManifest_YamlUnmarshal(t *testing.T) { - manifest, err := decodeYamlManifestFromFile("./test/test_manifest.yaml") + manifest, err := decodeYamlManifestFromFile("./test/test_manifest.yaml", ".") assert.NoError(t, err) assert.GreaterOrEqual(t, len(manifest.Modules), 1) } diff --git a/manifest/override.go b/manifest/override.go index 070cdb6de..ca1a74de3 100644 --- a/manifest/override.go +++ b/manifest/override.go @@ -1,47 +1,88 @@ package manifest import ( - "encoding/base64" pbsubstreams "github.com/streamingfast/substreams/pb/sf/substreams/v1" - "google.golang.org/protobuf/types/known/anypb" - "io" - "strings" ) type ConfigurationOverride struct { Package *PackageOverride `yaml:"package,omitempty"` - Network string `yaml:"network,omitempty"` - InitialBlocks map[string]int64 `yaml:"initialBlocks,omitempty"` + Network *string `yaml:"network,omitempty"` + InitialBlocks map[string]uint64 `yaml:"initialBlocks,omitempty"` Params map[string]string `yaml:"params,omitempty"` - SinkConfig *SinkConfigOverride `yaml:"sinkConfig,omitempty"` - SinkModule string `yaml:"sinkModule,omitempty"` -} - -type SinkConfigOverride struct { - TypeUrl string `yaml:"typeUrl,omitempty"` - Value string `yaml:"value,omitempty"` + DeriveFrom string `yaml:"deriveFrom,omitempty"` } type PackageOverride struct { - Name string `yaml:"name,omitempty"` + Name *string `yaml:"name,omitempty"` + Version *string `yaml:"version,omitempty"` } -func mergeManifests(main *pbsubstreams.Package, override *ConfigurationOverride) error { - if override.Package != nil && override.Package.Name != "" { - if main.PackageMeta == nil { - main.PackageMeta = []*pbsubstreams.PackageMetadata{} +func mergeOverrides(overrides ...*ConfigurationOverride) *ConfigurationOverride { + var merged *ConfigurationOverride + + for _, override := range overrides { + if override == nil { + continue + } + + if merged == nil { + merged = &ConfigurationOverride{} + } + + if override.Package != nil { + if merged.Package == nil { + merged.Package = &PackageOverride{} + } + + if override.Package.Name != nil { + merged.Package.Name = override.Package.Name + } + + if override.Package.Version != nil { + merged.Package.Version = override.Package.Version + } + } + + if override.Network != nil { + merged.Network = override.Network + } + + if override.InitialBlocks != nil { + if merged.InitialBlocks == nil { + merged.InitialBlocks = make(map[string]uint64) + } + + for name, block := range override.InitialBlocks { + merged.InitialBlocks[name] = block + } } - if len(main.PackageMeta) == 0 { - main.PackageMeta = append(main.PackageMeta, &pbsubstreams.PackageMetadata{Name: override.Package.Name}) - } else { - main.PackageMeta[0].Name = override.Package.Name + if override.Params != nil { + if merged.Params == nil { + merged.Params = make(map[string]string) + } + + for name, value := range override.Params { + merged.Params[name] = value + } } } - if override.Network != "" { - main.Network = override.Network + return merged +} + +func applyOverride(main *pbsubstreams.Package, override *ConfigurationOverride) error { + if override == nil { + return nil + } + + if override.Package != nil { + mergePackageMeta(main, override) + } + + if override.Network != nil { + main.Network = *override.Network } if override.Params != nil { @@ -52,26 +93,29 @@ func mergeManifests(main *pbsubstreams.Package, override *ConfigurationOverride) mergeInitialBlocks(main, override) } - // Overriding SinkModule - if override.Package != nil && override.SinkModule != "" { - main.SinkModule = override.SinkModule + return nil +} + +func mergePackageMeta(main *pbsubstreams.Package, override *ConfigurationOverride) { + if override.Package == nil { + return } - // Overriding SinkConfig - if override.Package != nil && override.SinkConfig != nil { - decoder := base64.NewDecoder(base64.StdEncoding, strings.NewReader(string(override.SinkConfig.Value))) - configValueBytes, err := io.ReadAll(decoder) - if err != nil { - return err - } + currentPackageMeta := main.GetPackageMeta() + if currentPackageMeta == nil { + currentPackageMeta = []*pbsubstreams.PackageMetadata{} + } + if len(currentPackageMeta) == 0 { + currentPackageMeta = append(currentPackageMeta, &pbsubstreams.PackageMetadata{}) + } - main.SinkConfig = &anypb.Any{ - TypeUrl: override.SinkConfig.TypeUrl, - Value: configValueBytes, - } + if override.Package.Name != nil { + currentPackageMeta[0].Name = *override.Package.Name } - return nil + if override.Package.Version != nil { + currentPackageMeta[0].Version = *override.Package.Version + } } func mergeInitialBlocks(main *pbsubstreams.Package, override *ConfigurationOverride) { @@ -119,53 +163,4 @@ func mergeParams(main *pbsubstreams.Package, override *ConfigurationOverride) { mainMod.Inputs = mainmodInputs } } - -} - -func mergeOverrides(overrides ...*ConfigurationOverride) *ConfigurationOverride { - merged := &ConfigurationOverride{} - - for _, override := range overrides { - if override == nil { - continue - } - - if override.Package != nil { - merged.Package = override.Package - } - - if override.Network != "" { - merged.Network = override.Network - } - - if override.InitialBlocks != nil { - if merged.InitialBlocks == nil { - merged.InitialBlocks = make(map[string]int64) - } - - for name, block := range override.InitialBlocks { - merged.InitialBlocks[name] = block - } - } - - if override.Params != nil { - if merged.Params == nil { - merged.Params = make(map[string]string) - } - - for name, value := range override.Params { - merged.Params[name] = value - } - } - - if override.SinkConfig != nil { - merged.SinkConfig = override.SinkConfig - } - - if override.SinkModule != "" { - merged.SinkModule = override.SinkModule - } - } - - return merged } diff --git a/manifest/package.go b/manifest/package.go new file mode 100644 index 000000000..cb30c9388 --- /dev/null +++ b/manifest/package.go @@ -0,0 +1,192 @@ +package manifest + +import ( + "fmt" + "os" + "path" + "path/filepath" + + "github.com/jhump/protoreflect/desc" + "github.com/jhump/protoreflect/dynamic" + pbsubstreams "github.com/streamingfast/substreams/pb/sf/substreams/v1" +) + +type manifestConverter struct { + inputPath string + + sinkConfigDynamicMessage *dynamic.Message + skipSourceCodeImportValidation bool +} + +func newManifestConverter(inputPath string, skipSourceCodeImportValidation bool) *manifestConverter { + return &manifestConverter{ + inputPath: inputPath, + skipSourceCodeImportValidation: skipSourceCodeImportValidation, + } +} + +func (r *manifestConverter) Convert(manif *Manifest) (*pbsubstreams.Package, []*desc.FileDescriptor, *dynamic.Message, error) { + loadedManifest, err := r.loadManifest(manif, r.inputPath) + if err != nil { + return nil, nil, nil, fmt.Errorf("unable to load manifest: %w", err) + } + + return r.manifestToPkg(loadedManifest) +} + +func (r *manifestConverter) loadManifest(manif *Manifest, inputPath string) (*Manifest, error) { + abs, err := filepath.Abs(r.inputPath) + if err != nil { + return nil, fmt.Errorf("unable to get working dir: %w", err) + } + manif.Workdir = path.Dir(abs) + + if manif.SpecVersion != "v0.1.0" { + return nil, fmt.Errorf("invalid 'specVersion', must be v0.1.0") + } + + // Allow environment variables in `imports` element + for i, moduleImport := range manif.Imports { + manif.Imports[i][1] = os.ExpandEnv(moduleImport[1]) + } + + // Allow environment variables in `protobuf.importPaths` element + for i := range manif.Protobuf.ImportPaths { + manif.Protobuf.ImportPaths[i] = os.ExpandEnv(manif.Protobuf.ImportPaths[i]) + } + + // TODO: put some limits on the NUMBER of modules (max 50 ?) + // TODO: put a limit on the SIZE of the WASM payload (max 10MB per binary?) + + for _, s := range manif.Modules { + // TODO: let's make sure this is also checked when received in Protobuf in a remote request. + + switch s.Kind { + case ModuleKindMap: + if s.Output.Type == "" { + return nil, fmt.Errorf("stream %q: missing 'output.type' for kind 'map'", s.Name) + } + case ModuleKindStore: + if err := validateStoreBuilder(s); err != nil { + return nil, fmt.Errorf("stream %q: %w", s.Name, err) + } + + default: + return nil, fmt.Errorf("stream %q: invalid kind %q", s.Name, s.Kind) + } + for idx, input := range s.Inputs { + if err := input.parse(); err != nil { + return nil, fmt.Errorf("module %q: invalid input [%d]: %w", s.Name, idx, err) + } + } + } + + return manif, nil +} + +func (r *manifestConverter) manifestToPkg(manif *Manifest) (*pbsubstreams.Package, []*desc.FileDescriptor, *dynamic.Message, error) { + pkg, err := r.convertToPkg(manif) + if err != nil { + return nil, nil, nil, fmt.Errorf("failed to convert manifest to pkg: %w", err) + } + + protoDefinitions, err := loadProtobufs(pkg, manif) + if err != nil { + return nil, nil, nil, fmt.Errorf("error loading protobuf: %w", err) + } + + if err := loadImports(pkg, manif); err != nil { + return nil, nil, nil, fmt.Errorf("error loading imports: %w", err) + } + + if err := r.loadSinkConfig(pkg, manif); err != nil { + return nil, nil, nil, fmt.Errorf("error parsing sink configuration: %w", err) + } + + return pkg, protoDefinitions, r.sinkConfigDynamicMessage, nil +} + +func (r *manifestConverter) convertToPkg(m *Manifest) (pkg *pbsubstreams.Package, err error) { + pkgMeta := &pbsubstreams.PackageMetadata{ + Version: m.Package.Version, + Url: m.Package.URL, + Name: m.Package.Name, + Doc: m.Package.Doc, + } + pkg = &pbsubstreams.Package{ + Version: 1, + PackageMeta: []*pbsubstreams.PackageMetadata{pkgMeta}, + Modules: &pbsubstreams.Modules{}, + Network: m.Network, + } + + moduleCodeIndexes := map[string]int{} + for _, mod := range m.Modules { + pbmeta := &pbsubstreams.ModuleMetadata{ + Doc: mod.Doc, + } + var pbmod *pbsubstreams.Module + + binaryName := "default" + implicit := "" + if mod.Binary != "" { + binaryName = mod.Binary + implicit = "(implicit) " + } + binaryDef, found := m.Binaries[binaryName] + if !found { + return nil, fmt.Errorf("module %q refers to %sbinary %q, which is not defined in the 'binaries' section of the manifest", mod.Name, implicit, binaryName) + } + + switch binaryDef.Type { + case "wasm/rust-v1": + // OPTIM(abourget): also check if it's not already in + // `Binaries`, by comparing its, length + hash or value. + codeIndex, found := moduleCodeIndexes[binaryDef.File] + if !found { + codePath := m.resolvePath(binaryDef.File) + var byteCode []byte + if !r.skipSourceCodeImportValidation { + byteCode, err = os.ReadFile(codePath) + if err != nil { + return nil, fmt.Errorf("failed to read source code %q: %w", codePath, err) + } + } + pkg.Modules.Binaries = append(pkg.Modules.Binaries, &pbsubstreams.Binary{Type: binaryDef.Type, Content: byteCode}) + codeIndex = len(pkg.Modules.Binaries) - 1 + moduleCodeIndexes[binaryDef.File] = codeIndex + } + pbmod, err = mod.ToProtoWASM(uint32(codeIndex)) + default: + return nil, fmt.Errorf("module %q: invalid code type %q", mod.Name, binaryDef.Type) + } + if err != nil { + return nil, err + } + + pkg.ModuleMeta = append(pkg.ModuleMeta, pbmeta) + pkg.Modules.Modules = append(pkg.Modules.Modules, pbmod) + } + + for modName, paramValue := range m.Params { + var modFound bool + for _, mod := range pkg.Modules.Modules { + if mod.Name == modName { + if len(mod.Inputs) == 0 { + return nil, fmt.Errorf("params value defined for module %q but module has no inputs defined, add 'params: string' to 'inputs' for module", modName) + } + p := mod.Inputs[0].GetParams() + if p == nil { + return nil, fmt.Errorf("params value defined for module %q: module %q does not have 'params' as its first input type", modName, modName) + } + p.Value = paramValue + modFound = true + } + } + if !modFound { + return nil, fmt.Errorf("params value defined for module %q, but such module is not defined", modName) + } + } + + return +} diff --git a/manifest/reader.go b/manifest/reader.go index 860677174..01f04d89c 100644 --- a/manifest/reader.go +++ b/manifest/reader.go @@ -1,10 +1,15 @@ package manifest import ( + "bytes" "context" "fmt" + ipfs "github.com/ipfs/go-ipfs-api" + "github.com/jhump/protoreflect/dynamic" + "github.com/streamingfast/dstore" + "golang.org/x/mod/semver" + "google.golang.org/protobuf/proto" "io" - "io/ioutil" "net/http" "net/url" "os" @@ -13,16 +18,12 @@ import ( "strings" "time" - ipfs "github.com/ipfs/go-ipfs-api" "github.com/jhump/protoreflect/desc" - "github.com/jhump/protoreflect/dynamic" "github.com/streamingfast/cli" - "github.com/streamingfast/dstore" pbsubstreams "github.com/streamingfast/substreams/pb/sf/substreams/v1" "go.uber.org/zap" - "golang.org/x/mod/semver" - "google.golang.org/protobuf/proto" "gopkg.in/yaml.v2" + yaml3 "gopkg.in/yaml.v3" ) var IPFSURL string @@ -61,93 +62,73 @@ func WithCollectProtoDefinitions(f func(protoDefinitions []*desc.FileDescriptor) } } -func WithOverrides(overrides ...*ConfigurationOverride) Option { - override := mergeOverrides(overrides...) - return func(r *Reader) *Reader { - r.override = override - return r +func hasRemotePrefix(in string) bool { + for _, prefix := range []string{"https://", "http://", "ipfs://", "gs://", "s3://", "az://"} { + if strings.HasPrefix(in, prefix) { + return true + } } + + return false } type Reader struct { - resolvedInput string - collectProtoDefinitionsFunc func(protoDefinitions []*desc.FileDescriptor) + currentData []byte + + originalInput string + currentInput string + + workingDir string + + pkg *pbsubstreams.Package + overrides []*ConfigurationOverride // cached values protoDefinitions []*desc.FileDescriptor sinkConfigJSON string sinkConfigDynamicMessage *dynamic.Message + collectProtoDefinitionsFunc func(protoDefinitions []*desc.FileDescriptor) + //options skipSourceCodeImportValidation bool skipModuleOutputTypeValidation bool skipPackageValidation bool - - constructorErr error - - override *ConfigurationOverride } func NewReader(input string, opts ...Option) (*Reader, error) { workingDir, err := os.Getwd() if err != nil { - return nil, fmt.Errorf("unable to get working directory: %w", err) + return nil, fmt.Errorf("unable to get working dir: %w", err) } return newReader(input, workingDir, opts...) } func MustNewReader(input string, opts ...Option) *Reader { - reader, err := NewReader(input, opts...) + r, err := NewReader(input) if err != nil { panic(err) } - return reader + return r } -func newReader(input string, workingDir string, opts ...Option) (*Reader, error) { - r := &Reader{resolvedInput: input} - for _, opt := range opts { - r = opt(r) +func newReader(input, workingDir string, opts ...Option) (*Reader, error) { + r := &Reader{ + originalInput: input, + workingDir: workingDir, } - var err error - r.resolvedInput, err = resolveInput(input, workingDir) - if err != nil { - return nil, fmt.Errorf("invalid Substreams manifest %q: %w", input, err) + for _, opt := range opts { + opt(r) } return r, nil } -func resolveInput(input string, workingDir string) (string, error) { - if hasRemotePackagePrefix(input) { - return input, nil - } - - // If empty, assign input to be `pwd`/substreams.yaml - if input == "" { - input = filepath.Join(workingDir, "substreams.yaml") - } - - // It's supposed to be a directory or a file, so we should be able to stat it and it should exists - stat, err := os.Stat(input) - if err != nil { - // Stat error already says 'stat' so no wrapping - return "", err - } - - // If it's a directory, we look actually for '/substreams.yaml' - if stat.IsDir() { - input = filepath.Join(input, "substreams.yaml") - } - - return input, nil -} - -func (r *Reader) ResolvedInput() string { - return r.resolvedInput +func (r *Reader) Read() (*pbsubstreams.Package, error) { + return r.resolvePkg() } func (r *Reader) MustRead() *pbsubstreams.Package { @@ -159,223 +140,214 @@ func (r *Reader) MustRead() *pbsubstreams.Package { return pkg } -func (r *Reader) Read() (*pbsubstreams.Package, error) { - if r.constructorErr != nil { - return nil, r.constructorErr +func (r *Reader) read() error { + if r.currentInput == "" { + r.currentInput = r.originalInput } - workingDir, err := os.Getwd() - if err != nil { - return nil, fmt.Errorf("unable to get working directory: %w", err) + input := r.currentInput + if r.IsRemotePackage(input) { + return r.readRemote(input) } - pack, err := r.read(workingDir) + return r.readLocal(input) +} + +func (r *Reader) readRemote(input string) error { + u, err := url.Parse(input) if err != nil { - return nil, fmt.Errorf("read: %w", err) + return fmt.Errorf("unable to parse url %q: %w", input, err) } - if r.override != nil { - err := mergeManifests(pack, r.override) - if err != nil { - return nil, fmt.Errorf("merge overrides: %w", err) - } + if u.Scheme == "gs" || u.Scheme == "s3" || u.Scheme == "az" { + return r.readFromStore(input) + } + + if u.Scheme == "ipfs" { + return r.readFromIPFS(u.Host) } - return pack, nil + return r.readFromHttp(input) } -func (r *Reader) read(workingDir string) (*pbsubstreams.Package, error) { - if r.IsRemotePackage() { - return r.newPkgFromURL(r.resolvedInput) +func (r *Reader) readFromHttp(input string) error { + resp, err := httpClient.Get(input) + if err != nil { + return fmt.Errorf("error downloading %q: %w", input, err) } - input := r.resolvedInput - if strings.HasSuffix(input, ".yaml") { - pkg, protoDefinitions, err := r.newPkgFromManifest(input) - if err != nil { - return nil, err - } - if r.collectProtoDefinitionsFunc != nil { - r.collectProtoDefinitionsFunc(protoDefinitions) - } - r.protoDefinitions = protoDefinitions - return pkg, nil + b, err := io.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf("error reading %q: %w", input, err) } - return r.newPkgFromFile(input) + r.currentData = b + return nil } -// IsRemotePackage determines if reader's input to read the manifest is a remote file accessible over -// HTTP/HTTPS, Google Cloud Storage, S3 or Azure Storage. -func (r *Reader) IsRemotePackage() bool { - return hasRemotePackagePrefix(r.resolvedInput) -} +func (r *Reader) readFromStore(input string) error { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() -func hasRemotePackagePrefix(in string) bool { - for _, prefix := range []string{"https://", "http://", "ipfs://", "gs://", "s3://", "az://"} { - if strings.HasPrefix(in, prefix) { - return true - } + b, err := dstore.ReadObject(ctx, input) + if err != nil { + return fmt.Errorf("error reading %q: %w", input, err) } - return false + r.currentData = b + return nil } -// IsLikelyManifestInput determines if the input is likely a manifest input, which is determined -// by checking: -// - If the input starts with remote prefix ("https://", "http://", "ipfs://", "gs://", "s3://", "az://") -// - If the input ends with `.yaml` -// - If the input is a directory (we check for path separator) -func IsLikelyManifestInput(in string) bool { - if hasRemotePackagePrefix(in) { - return true +func (r *Reader) readFromIPFS(input string) error { + readIPFSContent := func(hash string, sh *ipfs.Shell) ([]byte, error) { + readCloser, err := sh.Cat(hash) + if err != nil { + return nil, err + } + defer readCloser.Close() + return io.ReadAll(readCloser) } - if strings.HasSuffix(in, ".yaml") { - return true - } + sh := ipfs.NewShell(IPFSURL) + sh.SetTimeout(IPFSTimeout) - if strings.Contains(in, string(os.PathSeparator)) { - return true + b, err := readIPFSContent(input, sh) + if err != nil { + return fmt.Errorf("unable to read ipfs content %q: %w", input, err) } - return cli.DirectoryExists(in) || cli.FileExists(in) -} + r.currentData = b -// IsLocalManifest determines if reader's input to read the manifest is a local manifest file, which is determined -// by ensure it's not a remote package and if the file end with `.yaml`. -func (r *Reader) IsLocalManifest() bool { - if r.IsRemotePackage() { - return false + if r.isOverride() { + return nil } - return strings.HasSuffix(r.resolvedInput, ".yaml") -} - -func (r *Reader) newPkgFromFile(inputFilePath string) (pkg *pbsubstreams.Package, err error) { - cnt, err := os.ReadFile(inputFilePath) - if err != nil { - return nil, fmt.Errorf("error reading %q: %w", inputFilePath, err) + type subgraphManifest struct { + DataSources []struct { + Kind string `yaml:"kind"` + Source struct { + Package struct { + File map[string]string `yaml:"file"` + } `yaml:"package"` + } `yaml:"source"` + } `yaml:"dataSources"` } - return r.fromContents(cnt) -} - -func (r *Reader) newPkgFromURL(fileURL string) (pkg *pbsubstreams.Package, err error) { - u, err := url.Parse(fileURL) - if err != nil { - panic(fmt.Errorf("fileURL %q should have been valid by that execution point but it seems it was not: %w", fileURL, err)) + manifest := &subgraphManifest{} + err = yaml.Unmarshal(b, manifest) + if err != nil || len(manifest.DataSources) == 0 { + // not a valid manifest, it is probably the spkg itself + return nil } - if u.Scheme == "gs" || u.Scheme == "s3" || u.Scheme == "az" { - return r.newPkgFromStore(fileURL) + r.currentData = nil + + if manifest.DataSources[0].Kind != "substreams" { + return fmt.Errorf("given ipfs hash is not a substreams-based subgraph") } - if u.Scheme == "ipfs" { - return r.newPkgFromIPFS(u.Host) + var spkgHash string + if len(manifest.DataSources) > 0 && manifest.DataSources[0].Source.Package.File != nil { + spkgHash = manifest.DataSources[0].Source.Package.File["/"] } - resp, err := httpClient.Get(fileURL) - if err != nil { - return nil, fmt.Errorf("error downloading %q: %w", fileURL, err) + if spkgHash == "" { + return fmt.Errorf("no spkg hash found in manifest") } - cnt, err := io.ReadAll(resp.Body) + b, err = readIPFSContent(spkgHash, sh) if err != nil { - return nil, fmt.Errorf("error reading %q: %w", fileURL, err) + return fmt.Errorf("unable to read spkg from ipfs %q: %w", spkgHash, err) } - return r.fromContents(cnt) + r.currentData = b + return nil } -func (r *Reader) newPkgFromStore(fileURL string) (pkg *pbsubstreams.Package, err error) { - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - - cnt, err := dstore.ReadObject(ctx, fileURL) +func (r *Reader) readLocal(input string) error { + err := r.resolveInputPath(input) if err != nil { - return nil, fmt.Errorf("error reading %q: %w", fileURL, err) + return fmt.Errorf("unable to resolve input path %q: %w", input, err) } + input = r.currentInput - return r.fromContents(cnt) -} - -type subgraphManifest struct { - DataSources []struct { - Kind string `yaml:"kind"` - Source struct { - Package struct { - File map[string]string `yaml:"file"` - } `yaml:"package"` - } `yaml:"source"` - } `yaml:"dataSources"` -} - -func readIPFSContent(hash string, sh *ipfs.Shell) ([]byte, error) { - readCloser, err := sh.Cat(hash) + b, err := os.ReadFile(input) if err != nil { - return nil, err + return fmt.Errorf("unable to read file %q: %w", input, err) } - defer readCloser.Close() - return ioutil.ReadAll(readCloser) -} -func (r *Reader) newPkgFromIPFS(hash string) (pkg *pbsubstreams.Package, err error) { - sh := ipfs.NewShell(IPFSURL) - sh.SetTimeout(IPFSTimeout) + r.currentData = b + return nil +} - cnt, err := readIPFSContent(hash, sh) - if err != nil { - return nil, err +func (r *Reader) resolveInputPath(input string) error { + if r.IsRemotePackage(input) { + r.currentInput = input + return nil } - manifest := &subgraphManifest{} - err = yaml.Unmarshal(cnt, manifest) - if err != nil || len(manifest.DataSources) == 0 { - // not a valid manifest, maybe it's the spkg itself - return r.fromContents(cnt) + if input == "" { + r.currentInput = filepath.Join(r.workingDir, "substreams.yaml") + return nil } - if manifest.DataSources[0].Kind != "substreams" { - return nil, fmt.Errorf("given ipfs hash is not a substreams-based subgraph") + stat, err := os.Stat(input) + if err != nil { + return fmt.Errorf("unable to stat input file %q: %w", input, err) } - spkgHash := manifest.DataSources[0].Source.Package.File["/"] - - cnt, err = readIPFSContent(spkgHash, sh) - if err != nil { - return nil, err + if stat.IsDir() { + r.currentInput = filepath.Join(input, "substreams.yaml") + return nil } - return r.fromContents(cnt) + return nil } -func (r *Reader) newPkgFromManifest(inputPath string) (pkg *pbsubstreams.Package, protoDefinitions []*desc.FileDescriptor, err error) { - manif, err := LoadManifestFile(inputPath) - if err != nil { - return nil, nil, err +func (r *Reader) isOverride() bool { + if r.currentData == nil { + return false } + return bytes.Contains(r.currentData, []byte("deriveFrom:")) +} - pkg, protoDefinitions, err = r.manifestToPkg(manif) - if err != nil { - return nil, nil, err +func (r *Reader) getPkg() (*pbsubstreams.Package, error) { + if r.currentData == nil { + return nil, fmt.Errorf("no result available") } - if err := r.validate(pkg); err != nil { - return nil, nil, fmt.Errorf("failed validation: %w", err) + if r.isOverride() { + return nil, fmt.Errorf("cannot get package from override") } - return pkg, protoDefinitions, nil -} + if strings.HasSuffix(r.currentInput, ".yaml") || strings.HasSuffix(r.currentInput, ".yml") { + manif := &Manifest{} + decoder := yaml3.NewDecoder(bytes.NewReader(r.currentData)) + decoder.KnownFields(true) + + if err := decoder.Decode(&manif); err != nil { + return nil, fmt.Errorf("unable to unmarshal manifest: %w", err) + } + + pkg, err := r.newPkgFromManifest(manif) + if err != nil { + return nil, fmt.Errorf("unable to convert manifest to package: %w", err) + } -func (r *Reader) fromContents(contents []byte) (pkg *pbsubstreams.Package, err error) { - pkg = &pbsubstreams.Package{} - if err := proto.Unmarshal(contents, pkg); err != nil { - return nil, fmt.Errorf("unmarshalling: %w", err) + if err := r.validate(pkg); err != nil { + return nil, fmt.Errorf("failed validation: %w", err) + } + return pkg, nil + } + + pkg := &pbsubstreams.Package{} + err := proto.Unmarshal(r.currentData, pkg) + if err != nil { + return nil, fmt.Errorf("unable to unmarshal package: %w", err) } if err := r.validate(pkg); err != nil { - return nil, fmt.Errorf("validation failed: %w", err) + return nil, fmt.Errorf("failed validation: %w", err) } return pkg, nil @@ -394,14 +366,6 @@ func (r *Reader) validate(pkg *pbsubstreams.Package) error { return nil } -// validatePackage validates a package just produced or just read from -// disk. -// -// validatePackage is run only by the client, as the server doesn't -// have access to the full Package. -// -// WARN: put ANY MODULES validation that need to be applied by the -// server in `ValidateModules`. func (r *Reader) validatePackage(pkg *pbsubstreams.Package) error { if len(pkg.ModuleMeta) != len(pkg.Modules.Modules) { return fmt.Errorf("inconsistent package, metadata for modules not same length as modules list") @@ -471,6 +435,116 @@ func (r *Reader) validatePackage(pkg *pbsubstreams.Package) error { return nil } +func (r *Reader) newPkgFromManifest(manif *Manifest) (*pbsubstreams.Package, error) { + converter := newManifestConverter(r.currentInput, r.skipSourceCodeImportValidation) + pkg, descriptors, dynMessage, err := converter.Convert(manif) + if err != nil { + return nil, fmt.Errorf("unable to convert manifest to package: %w", err) + } + r.sinkConfigDynamicMessage = dynMessage + + if r.collectProtoDefinitionsFunc != nil { + r.collectProtoDefinitionsFunc(descriptors) + } + r.protoDefinitions = descriptors + return pkg, nil +} + +func (r *Reader) getOverride() (*ConfigurationOverride, error) { + if r.currentData == nil { + return nil, fmt.Errorf("no result available") + } + + if !r.isOverride() { + return nil, fmt.Errorf("not an override") + } + + override := &ConfigurationOverride{} + err := yaml.Unmarshal(r.currentData, override) + if err != nil { + return nil, fmt.Errorf("unable to unmarshal override: %w", err) + } + + return override, nil +} + +func (r *Reader) resolvePkg() (*pbsubstreams.Package, error) { + if r.pkg != nil { + return r.pkg, nil + } + + err := r.read() + if err != nil { + return nil, err + } + + if r.isOverride() { + or, err := r.getOverride() + if err != nil { + return nil, fmt.Errorf("unable to get override: %w", err) + } + r.overrides = append(r.overrides, or) + r.currentInput = or.DeriveFrom + + return r.resolvePkg() + } + + pkg, err := r.getPkg() + if err != nil { + return nil, fmt.Errorf("unable to get package: %w", err) + } + + //reverse order r.overrides to be able to squash them in the right order + for i, j := 0, len(r.overrides)-1; i < j; i, j = i+1, j-1 { + r.overrides[i], r.overrides[j] = r.overrides[j], r.overrides[i] + } + + mergedOverride := mergeOverrides(r.overrides...) + if err := applyOverride(pkg, mergedOverride); err != nil { + return nil, fmt.Errorf("applying override: %w", err) + } + + r.pkg = pkg + return pkg, nil +} + +// IsRemotePackage determines if reader's input to read the manifest is a remote file accessible over +// HTTP/HTTPS, Google Cloud Storage, S3 or Azure Storage. +func (r *Reader) IsRemotePackage(input string) bool { + return hasRemotePrefix(input) +} + +// IsLocalManifest determines if reader's input to read the manifest is a local manifest file, which is determined +// by ensure it's not a remote package and if the file end with `.yaml`. +func (r *Reader) IsLocalManifest() bool { + if r.IsRemotePackage(r.currentInput) { + return false + } + + return strings.HasSuffix(r.currentInput, ".yaml") +} + +// IsLikelyManifestInput determines if the input is likely a manifest input, which is determined +// by checking: +// - If the input starts with remote prefix ("https://", "http://", "ipfs://", "gs://", "s3://", "az://") +// - If the input ends with `.yaml` +// - If the input is a directory (we check for path separator) +func IsLikelyManifestInput(in string) bool { + if hasRemotePrefix(in) { + return true + } + + if strings.HasSuffix(in, ".yaml") { + return true + } + + if strings.Contains(in, string(os.PathSeparator)) { + return true + } + + return cli.DirectoryExists(in) || cli.FileExists(in) +} + func duplicateStringInput(in *pbsubstreams.Module_Input) string { if in == nil { return "" @@ -569,8 +643,8 @@ func ValidateModules(mods *pbsubstreams.Modules) error { return nil } -func LoadManifestFile(inputPath string) (*Manifest, error) { - m, err := decodeYamlManifestFromFile(inputPath) +func LoadManifestFile(inputPath, workingDir string) (*Manifest, error) { + m, err := decodeYamlManifestFromFile(inputPath, workingDir) if err != nil { return nil, fmt.Errorf("decoding yaml: %w", err) } @@ -708,116 +782,6 @@ func mergeProtoFiles(src, dest *pbsubstreams.Package) { // recent, but it'll simply go in list order.. } -// manifestToPkg will take a Manifest object, most likely generated from a YAML file, and will create a Proto Pakcage object -// in some cases we do not want to validate the package and ensure that all the code and dependencies are there fro example -// when we are using the generated package transitively -func (r *Reader) manifestToPkg(m *Manifest) (*pbsubstreams.Package, []*desc.FileDescriptor, error) { - pkg, err := r.convertToPkg(m) - if err != nil { - return nil, nil, fmt.Errorf("failed to convert manifest to pkg: %w", err) - } - - protoDefinitions, err := loadProtobufs(pkg, m) - if err != nil { - return nil, nil, fmt.Errorf("error loading protobuf: %w", err) - } - - if err := loadImports(pkg, m); err != nil { - return nil, nil, fmt.Errorf("error loading imports: %w", err) - } - - if err := r.loadSinkConfig(pkg, m); err != nil { - return nil, nil, fmt.Errorf("error parsing sink configuration: %w", err) - } - - return pkg, protoDefinitions, nil -} - -func (r *Reader) convertToPkg(m *Manifest) (pkg *pbsubstreams.Package, err error) { - pkgMeta := &pbsubstreams.PackageMetadata{ - Version: m.Package.Version, - Url: m.Package.URL, - Name: m.Package.Name, - Doc: m.Package.Doc, - } - pkg = &pbsubstreams.Package{ - Version: 1, - PackageMeta: []*pbsubstreams.PackageMetadata{pkgMeta}, - Modules: &pbsubstreams.Modules{}, - Network: m.Network, - } - - moduleCodeIndexes := map[string]int{} - for _, mod := range m.Modules { - pbmeta := &pbsubstreams.ModuleMetadata{ - Doc: mod.Doc, - } - var pbmod *pbsubstreams.Module - - binaryName := "default" - implicit := "" - if mod.Binary != "" { - binaryName = mod.Binary - implicit = "(implicit) " - } - binaryDef, found := m.Binaries[binaryName] - if !found { - return nil, fmt.Errorf("module %q refers to %sbinary %q, which is not defined in the 'binaries' section of the manifest", mod.Name, implicit, binaryName) - } - - switch binaryDef.Type { - case "wasm/rust-v1": - // OPTIM(abourget): also check if it's not already in - // `Binaries`, by comparing its, length + hash or value. - codeIndex, found := moduleCodeIndexes[binaryDef.File] - if !found { - codePath := m.resolvePath(binaryDef.File) - var byteCode []byte - if !r.skipSourceCodeImportValidation { - byteCode, err = os.ReadFile(codePath) - if err != nil { - return nil, fmt.Errorf("failed to read source code %q: %w", codePath, err) - } - } - pkg.Modules.Binaries = append(pkg.Modules.Binaries, &pbsubstreams.Binary{Type: binaryDef.Type, Content: byteCode}) - codeIndex = len(pkg.Modules.Binaries) - 1 - moduleCodeIndexes[binaryDef.File] = codeIndex - } - pbmod, err = mod.ToProtoWASM(uint32(codeIndex)) - default: - return nil, fmt.Errorf("module %q: invalid code type %q", mod.Name, binaryDef.Type) - } - if err != nil { - return nil, err - } - - pkg.ModuleMeta = append(pkg.ModuleMeta, pbmeta) - pkg.Modules.Modules = append(pkg.Modules.Modules, pbmod) - } - - for modName, paramValue := range m.Params { - var modFound bool - for _, mod := range pkg.Modules.Modules { - if mod.Name == modName { - if len(mod.Inputs) == 0 { - return nil, fmt.Errorf("params value defined for module %q but module has no inputs defined, add 'params: string' to 'inputs' for module", modName) - } - p := mod.Inputs[0].GetParams() - if p == nil { - return nil, fmt.Errorf("params value defined for module %q: module %q does not have 'params' as its first input type", modName, modName) - } - p.Value = paramValue - modFound = true - } - } - if !modFound { - return nil, fmt.Errorf("params value defined for module %q, but such module is not defined", modName) - } - } - - return -} - var storeValidTypes = map[string]bool{ "bigint": true, "int64": true, diff --git a/manifest/reader_test.go b/manifest/reader_test.go index 80a7f196a..43365c9d1 100644 --- a/manifest/reader_test.go +++ b/manifest/reader_test.go @@ -14,6 +14,49 @@ import ( "google.golang.org/protobuf/types/descriptorpb" ) +func TestReader_ReadWithOverride(t *testing.T) { + path, err := filepath.Abs("testdata/") + require.NoError(t, err) + + override, err := filepath.Abs("testdata/with-params-override.yaml") + + r, err := newReader(override, path) + require.NoError(t, err) + + pkg, err := r.Read() + require.NoError(t, err) + require.NotNil(t, pkg) +} + +func TestReader_ReadWithNestedOverride(t *testing.T) { + path, err := filepath.Abs("testdata/") + require.NoError(t, err) + + override, err := filepath.Abs("testdata/with-params-override-override.yaml") + require.NoError(t, err) + + r, err := newReader(override, path) + require.NoError(t, err) + + pkg, err := r.Read() + require.NoError(t, err) + require.NotNil(t, pkg) +} + +func TestReader_ReadWithRemoteOverride(t *testing.T) { + path, err := filepath.Abs("testdata/") + require.NoError(t, err) + + override, err := filepath.Abs("testdata/univ3-override.yaml") + require.NoError(t, err) + + r, err := newReader(override, path) + require.NoError(t, err) + + _, err = r.Read() + require.NoError(t, err) +} + func TestReader_Read(t *testing.T) { absolutePathToInferredManifest, err := filepath.Abs("testdata/inferred_manifest") require.NoError(t, err) @@ -308,7 +351,7 @@ func TestReader_Read(t *testing.T) { r, err := newReader(manifestPath, workingDir, readerOptions...) tt.assertionNew(t, err) - got, err := r.read(workingDir) + got, err := r.Read() tt.assertionRead(t, err) assertProtoEqual(t, tt.want, got) }) diff --git a/manifest/signature_test.go b/manifest/signature_test.go index 00dab2938..e8106ee3f 100644 --- a/manifest/signature_test.go +++ b/manifest/signature_test.go @@ -88,8 +88,8 @@ func Test_HashModule(t *testing.T) { { file: "testdata/with-params.yaml", hashes: map[string]string{ - "mod1": "551398cb7d52f5e6dfaa09ea5800afe647c8c853", - "mod2": "3591fd3507951ab63668d3e4093e9bd797aea2fa", + "mod1": "a9f22492be1fb13050c07f1502d5a6e78577dd80", + "mod2": "6aca30692dfa835efe09fbf51b0a1735ea3b3155", }, }, } diff --git a/manifest/sink.go b/manifest/sink.go index 53fc6ce98..15e907fe0 100644 --- a/manifest/sink.go +++ b/manifest/sink.go @@ -7,7 +7,6 @@ import ( "encoding/base64" "encoding/hex" "encoding/json" - "errors" "fmt" "io" "io/fs" @@ -17,23 +16,22 @@ import ( "github.com/jhump/protoreflect/desc" "github.com/jhump/protoreflect/dynamic" + pbss "github.com/streamingfast/substreams/pb/sf/substreams" + pbssv1 "github.com/streamingfast/substreams/pb/sf/substreams/v1" "go.uber.org/zap" "google.golang.org/protobuf/types/descriptorpb" "google.golang.org/protobuf/types/known/anypb" - - pbss "github.com/streamingfast/substreams/pb/sf/substreams" - pbssv1 "github.com/streamingfast/substreams/pb/sf/substreams/v1" ) -func (r *Reader) loadSinkConfig(pkg *pbssv1.Package, m *Manifest) error { +func (r *manifestConverter) loadSinkConfig(pkg *pbssv1.Package, m *Manifest) error { if m.Sink == nil { return nil } if m.Sink.Module == "" { - return errors.New(`sink: "module" unspecified`) + return fmt.Errorf(`sink: "module" unspecified`) } if m.Sink.Type == "" { - return errors.New(`sink: "type" unspecified`) + return fmt.Errorf(`sink: "type" unspecified`) } pkg.SinkModule = m.Sink.Module diff --git a/manifest/testdata/univ3-override.yaml b/manifest/testdata/univ3-override.yaml new file mode 100644 index 000000000..d9a21a537 --- /dev/null +++ b/manifest/testdata/univ3-override.yaml @@ -0,0 +1,5 @@ +deriveFrom: "https://github.com/streamingfast/substreams-uniswap-v3/releases/download/v0.2.8/substreams.spkg" + +package: + name: "univ3custom" + diff --git a/manifest/testdata/with-params-override-override.yaml b/manifest/testdata/with-params-override-override.yaml new file mode 100644 index 000000000..f9d7cacf8 --- /dev/null +++ b/manifest/testdata/with-params-override-override.yaml @@ -0,0 +1,10 @@ +deriveFrom: "./testdata/with-params-override.yaml" + +package: + name: "overriddentwice" + +initialBlocks: + mod1: 175 + mod2: 175 +params: + mod2: "goodbye" \ No newline at end of file diff --git a/manifest/testdata/with-params-override.yaml b/manifest/testdata/with-params-override.yaml new file mode 100644 index 000000000..c935d30b6 --- /dev/null +++ b/manifest/testdata/with-params-override.yaml @@ -0,0 +1,12 @@ +deriveFrom: "./testdata/with-params.yaml" + +package: + name: "overridden" + version: v2.3.5 + +#network: polygon +initialBlocks: + mod1: 150 + mod2: 150 +params: + mod2: "hello" \ No newline at end of file diff --git a/manifest/testdata/with-params.yaml b/manifest/testdata/with-params.yaml index 4f62d2f02..260f1289e 100644 --- a/manifest/testdata/with-params.yaml +++ b/manifest/testdata/with-params.yaml @@ -11,6 +11,7 @@ binaries: modules: - name: mod1 kind: map + initialBlock: 100 inputs: - source: sf.test.Block output: @@ -18,6 +19,7 @@ modules: - name: mod2 kind: map + initialBlock: 100 inputs: - params: string - map: mod1