Skip to content

Commit

Permalink
info: added new package to parse substreams information, update info …
Browse files Browse the repository at this point in the history
…command to use this library
  • Loading branch information
colindickson committed Oct 2, 2023
1 parent 8cdbb2d commit bc3267d
Show file tree
Hide file tree
Showing 5 changed files with 416 additions and 56 deletions.
94 changes: 38 additions & 56 deletions cmd/substreams/info.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
package main

import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"strings"

"github.com/streamingfast/cli"
"github.com/streamingfast/substreams/info"

"github.com/spf13/cobra"
"github.com/streamingfast/substreams/manifest"
pbsubstreams "github.com/streamingfast/substreams/pb/sf/substreams/v1"
"github.com/streamingfast/substreams/pipeline/outputmodules"
)

func init() {
Expand All @@ -33,6 +32,7 @@ var infoCmd = &cobra.Command{
}

func init() {
infoCmd.Flags().Bool("json", false, "Output as JSON")
rootCmd.AddCommand(infoCmd)
}

Expand All @@ -49,96 +49,79 @@ func runInfo(cmd *cobra.Command, args []string) error {

outputSinkconfigFilesPath := mustGetString(cmd, "output-sinkconfig-files-path")

manifestReader, err := manifest.NewReader(manifestPath)
info, err := info.Extended(manifestPath, outputModule)
if err != nil {
return fmt.Errorf("manifest reader: %w", err)
return err
}

pkg, err := manifestReader.Read()
if err != nil {
return fmt.Errorf("read manifest %q: %w", manifestPath, err)
}

graph, err := manifest.NewModuleGraph(pkg.Modules.Modules)
if err != nil {
return fmt.Errorf("creating module graph: %w", err)
if mustGetBool(cmd, "json") {
res, err := json.MarshalIndent(info, "", " ")
if err != nil {
return err
}
fmt.Println(string(res))
return nil
}

fmt.Println("Package name:", pkg.PackageMeta[0].Name)
fmt.Println("Version:", pkg.PackageMeta[0].Version)
if doc := pkg.PackageMeta[0].Doc; doc != "" {
fmt.Println("Doc: " + strings.Replace(doc, "\n", "\n ", -1))
fmt.Println("Package name:", info.Name)
fmt.Println("Version:", info.Version)
if doc := info.Documentation; doc != nil && *doc != "" {
fmt.Println("Doc: " + strings.Replace(*doc, "\n", "\n ", -1))
}

hashes := manifest.NewModuleHashes()

fmt.Println("Modules:")
fmt.Println("----")
for modIdx, module := range pkg.Modules.Modules {
fmt.Println("Name:", module.Name)
fmt.Println("Initial block:", module.InitialBlock)
kind := module.GetKind()
switch v := kind.(type) {
case *pbsubstreams.Module_KindMap_:
fmt.Println("Kind: map")
fmt.Println("Output Type:", v.KindMap.OutputType)
case *pbsubstreams.Module_KindStore_:
fmt.Println("Kind: store")
fmt.Println("Value Type:", v.KindStore.ValueType)
fmt.Println("Update Policy:", v.KindStore.UpdatePolicy)
for _, mod := range info.Modules {
fmt.Println("Name:", mod.Name)
fmt.Println("Initial block:", mod.InitialBlock)
fmt.Println("Kind:", mod.Kind)

switch mod.Kind {
case "map":
fmt.Println("Output Type:", *mod.OutputType)
case "store":
fmt.Println("Value Type:", *mod.ValueType)
fmt.Println("Update Policy:", *mod.UpdatePolicy)
default:
fmt.Println("Kind: Unknown")
}

hashes.HashModule(pkg.Modules, module, graph)

fmt.Println("Hash:", hashes.Get(module.Name))
moduleMeta := pkg.ModuleMeta[modIdx]
if moduleMeta != nil && moduleMeta.Doc != "" {
fmt.Println("Doc: " + strings.Replace(moduleMeta.Doc, "\n", "\n ", -1))
fmt.Println("Hash:", mod.Hash)
if doc := mod.Documentation; doc != nil && *doc != "" {
fmt.Println("Doc: ", mod.Documentation)
}
fmt.Println("")
}

if outputModule != "" {
outputGraph, err := outputmodules.NewOutputModuleGraph(outputModule, true, pkg.Modules)
if err != nil {
return err
}
for i, layers := range outputGraph.StagedUsedModules() {
stages := info.ExecutionStages
for i, layers := range stages {
var layerDefs []string
for _, l := range layers {
var mods []string
for _, m := range l {
mods = append(mods, m.Name)
mods = append(mods, m)
}
layerDefs = append(layerDefs, fmt.Sprintf(`["%s"]`, strings.Join(mods, `","`)))
}
fmt.Printf("Stage %d: [%s]\n", i, strings.Join(layerDefs, `,`))
}

}

if pkg.SinkConfig != nil {
fmt.Println()
if info.SinkInfo != nil {
fmt.Println("Sink config:")
fmt.Println("----")
fmt.Println("type:", pkg.SinkConfig.TypeUrl)

confs, files, err := manifest.DescribeSinkConfigs(pkg)
if err != nil {
return err
}
fmt.Println("type:", info.SinkInfo.TypeUrl)

fmt.Println("configs:")
fmt.Println(confs)
fmt.Println(info.SinkInfo.Configs)

if outputSinkconfigFilesPath != "" && files != nil {
if outputSinkconfigFilesPath != "" && info.SinkInfo.Files != nil {
if err := os.MkdirAll(outputSinkconfigFilesPath, 0755); err != nil {
return err
}
fmt.Println("output files:")
for k, v := range files {
for k, v := range info.SinkInfo.Files {
filename := filepath.Join(outputSinkconfigFilesPath, k)
f, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644)
if err != nil {
Expand All @@ -150,7 +133,6 @@ func runInfo(cmd *cobra.Command, args []string) error {
fmt.Printf(" - %q written to %q\n", k, filename)
}
}

}

return nil
Expand Down
227 changes: 227 additions & 0 deletions info/info.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
package info

import (
"fmt"
"strings"

"github.com/streamingfast/substreams/manifest"
pbsubstreams "github.com/streamingfast/substreams/pb/sf/substreams/v1"
"github.com/streamingfast/substreams/pipeline/outputmodules"
"google.golang.org/protobuf/types/descriptorpb"
)

type BasicInfo struct {
Name string `json:"name"`
Version string `json:"version"`
Documentation *string `json:"documentation,omitempty"`
Modules []ModulesInfo `json:"modules"`
SinkInfo *SinkInfo `json:"sink_info,omitempty"`
ProtoPackages []string `json:"proto_packages"`
ProtoFileCodeMap map[string]string `json:"proto_file_code_map"`
ProtoMessageCodeMap map[string]map[string]string `json:"proto_message_code_map"`
}

type SinkInfo struct {
Configs string `json:"desc"`
TypeUrl string `json:"type_url"`
Files map[string][]byte `json:"files"`
}

type ExtendedInfo struct {
*BasicInfo

ExecutionStages [][][]string `json:"execution_stages,omitempty"`
}

type ProtoFileInfo struct {
Name *string `json:"name,omitempty"`
Package *string `json:"package,omitempty"`
Dependencies []string `json:"dependencies,omitempty"`
PublicDependencies []int32 `json:"public_dependencies,omitempty"`
MessageType []*descriptorpb.DescriptorProto `json:"message_type,omitempty"`
Services []*descriptorpb.ServiceDescriptorProto `json:"services,omitempty"`
}

type ModulesInfo struct {
Name string `json:"name"`
Kind string `json:"kind"`
Inputs []ModuleInput `json:"inputs"`
OutputType *string `json:"output_type,omitempty"` //for map inputs
ValueType *string `json:"value_type,omitempty"` //for store inputs
UpdatePolicy *string `json:"update_policy,omitempty"` //for store inputs
InitialBlock uint64 `json:"initial_block"`
Documentation *string `json:"documentation,omitempty"`
Hash string `json:"hash"`
}

type ModuleInput struct {
Type string `json:"type"`
Name string `json:"name"`
Mode *string `json:"mode,omitempty"` //for store inputs
}

func Basic(manifestPath string) (*BasicInfo, error) {
reader, err := manifest.NewReader(manifestPath)
if err != nil {
return nil, fmt.Errorf("manifest reader: %w", err)
}

pkg, err := reader.Read()
if err != nil {
return nil, fmt.Errorf("read manifest %q: %w", manifestPath, err)
}

manifestInfo := &BasicInfo{
Name: pkg.PackageMeta[0].Name,
Version: pkg.PackageMeta[0].Version,
}
if pkg.PackageMeta[0].Doc != "" {
manifestInfo.Documentation = strPtr(strings.Replace(pkg.PackageMeta[0].Doc, "\n", "\n ", -1))
}

graph, err := manifest.NewModuleGraph(pkg.Modules.Modules)
if err != nil {
return nil, fmt.Errorf("creating module graph: %w", err)
}

modules := make([]ModulesInfo, 0, len(pkg.Modules.Modules))

hashes := manifest.NewModuleHashes()
for ix, mod := range pkg.Modules.Modules {
modInfo := ModulesInfo{}

_, _ = hashes.HashModule(pkg.Modules, mod, graph)
modInfo.Hash = hashes.Get(mod.Name)

modInfo.Name = mod.Name
modInfo.InitialBlock = mod.InitialBlock

kind := mod.GetKind()
switch v := kind.(type) {
case *pbsubstreams.Module_KindMap_:
modInfo.Kind = "map"
modInfo.OutputType = strPtr(v.KindMap.OutputType)
case *pbsubstreams.Module_KindStore_:
modInfo.Kind = "store"
modInfo.ValueType = strPtr(v.KindStore.ValueType)
modInfo.UpdatePolicy = strPtr(v.KindStore.UpdatePolicy.Pretty())
default:
modInfo.Kind = "unknown"
}

modMeta := pkg.ModuleMeta[ix]
if modMeta != nil && modMeta.Doc != "" {
modInfo.Documentation = strPtr(strings.Replace(modMeta.Doc, "\n", "\n ", -1))
}

inputs := make([]ModuleInput, 0, len(mod.Inputs))
for _, input := range mod.Inputs {
inputInfo := ModuleInput{}

switch v := input.Input.(type) {
case *pbsubstreams.Module_Input_Source_:
inputInfo.Type = "source"
inputInfo.Name = v.Source.Type
case *pbsubstreams.Module_Input_Map_:
inputInfo.Type = "map"
inputInfo.Name = v.Map.ModuleName
case *pbsubstreams.Module_Input_Store_:
inputInfo.Type = "store"
inputInfo.Name = v.Store.ModuleName
if v.Store.Mode > 0 {
inputInfo.Mode = strPtr(v.Store.Mode.Pretty())
}
default:
inputInfo.Type = "unknown"
inputInfo.Name = "unknown"
}

inputs = append(inputs, inputInfo)
}
modInfo.Inputs = inputs

modules = append(modules, modInfo)
}
manifestInfo.Modules = modules

protoPackages := make([]string, 0, len(pkg.ProtoFiles))
protoPackageMap := make(map[string]struct{})
for _, protoFile := range pkg.ProtoFiles {
if _, ok := protoPackageMap[protoFile.GetPackage()]; ok {
continue
} else {
protoPackageMap[protoFile.GetPackage()] = struct{}{}
}

protoPackages = append(protoPackages, protoFile.GetPackage())
}
manifestInfo.ProtoPackages = protoPackages

protoParser, err := NewProtoParser(pkg.ProtoFiles)
err = protoParser.Parse()
if err != nil {
return nil, fmt.Errorf("parse files: %w", err)
}
manifestInfo.ProtoFileCodeMap = protoParser.ProtoFileCodeMap
manifestInfo.ProtoMessageCodeMap = protoParser.ProtoPackageMessageCodeMap

if pkg.SinkConfig != nil {
desc, files, err := manifest.DescribeSinkConfigs(pkg)
if err != nil {
return nil, fmt.Errorf("describe sink configs: %w", err)
}
manifestInfo.SinkInfo = &SinkInfo{
Configs: desc,
TypeUrl: pkg.SinkConfig.TypeUrl,
Files: files,
}
}

return manifestInfo, nil
}

func Extended(manifestPath string, outputModule string) (*ExtendedInfo, error) {
basicInfo, err := Basic(manifestPath)
if err != nil {
return nil, err
}

reader, err := manifest.NewReader(manifestPath)
if err != nil {
return nil, fmt.Errorf("manifest reader: %w", err)
}

pkg, err := reader.Read()
if err != nil {
return nil, fmt.Errorf("read manifest %q: %w", manifestPath, err)
}

var stages [][][]string
if outputModule != "" {
outputGraph, err := outputmodules.NewOutputModuleGraph(outputModule, true, pkg.Modules)
if err != nil {
return nil, fmt.Errorf("creating output module graph: %w", err)
}
stages = make([][][]string, 0, len(outputGraph.StagedUsedModules()))
for _, layers := range outputGraph.StagedUsedModules() {
var layerDefs [][]string
for _, l := range layers {
var mods []string
for _, m := range l {
mods = append(mods, m.Name)
}
layerDefs = append(layerDefs, mods)
}
stages = append(stages, layerDefs)
}
}

return &ExtendedInfo{
BasicInfo: basicInfo,
ExecutionStages: stages,
}, nil
}

func strPtr(s string) *string {
return &s
}
Loading

0 comments on commit bc3267d

Please sign in to comment.