[OCC] Add async scheduler (experiment) #366

Draft: wants to merge 65 commits into base occ-main.

Changes from all commits (65 commits):
6bc6c90
Add occ todos / comments (#317)
udpatil Sep 13, 2023
b66d23e
Multiversion Item Implementation and Tests (#318)
udpatil Sep 26, 2023
0048776
[occ] Add incarnation field (#321)
udpatil Sep 29, 2023
5d8941c
[occ] Implement basic multiversion store (#322)
udpatil Oct 6, 2023
dac5f7b
[occ] Add concurrency worker configuration (#324)
stevenlanders Oct 9, 2023
94bb98f
[occ] Occ multiversion store (#326)
udpatil Oct 10, 2023
5f89416
[occ] Add batch tx delivery interface (#327)
stevenlanders Oct 10, 2023
571d00a
[occ] MVKV store implementation and tests (#323)
udpatil Oct 10, 2023
9886602
[occ] Add validation function for transaction state to multiversionst…
udpatil Oct 13, 2023
293ac79
[occ] Add basic worker task and scheduler shell (#328)
stevenlanders Oct 17, 2023
dfb2260
[occ] Implement iterator for mvkv (#329)
udpatil Oct 17, 2023
663716a
fix dependency (#334)
udpatil Oct 17, 2023
b34d61c
[occ] Iterateset tracking and validation implementation (#337)
udpatil Oct 19, 2023
0aebbc9
[occ] Add scheduler logic for validation (#336)
stevenlanders Oct 19, 2023
096041b
[occ] Fix situation where no stores causes a panic (#338)
stevenlanders Oct 20, 2023
0b9193c
Add occ flag check to context (#340)
stevenlanders Oct 23, 2023
27484e4
[occ] Add struct field and helpers for estimate prefills (#341)
udpatil Oct 24, 2023
95ddc84
Fix map access panic (#343)
stevenlanders Oct 30, 2023
be4a4ae
Gen estimates writeset (#344)
udpatil Nov 3, 2023
931e2f6
[OCC] Add trace spans to scheduler (#347)
stevenlanders Nov 6, 2023
eac8657
[occ] Fix parent store readset validation (#348)
udpatil Nov 10, 2023
6260732
[occ] OCC scheduler and validation fixes (#359)
udpatil Nov 22, 2023
c660786
[occ] Add optimizations for multiversion and mvkv (#361)
udpatil Nov 27, 2023
92457bd
add async scheduler
stevenlanders Nov 28, 2023
8f58b8c
small fixes
stevenlanders Nov 28, 2023
ed15067
remove waiting to avoid data race for now
stevenlanders Nov 28, 2023
4119313
remove another datarace
stevenlanders Nov 28, 2023
48969c3
fix hang
stevenlanders Nov 28, 2023
a7a9eaf
avoid status datarace
stevenlanders Nov 28, 2023
9e4a279
fix hang
stevenlanders Nov 28, 2023
014f497
rebase sync version
stevenlanders Nov 29, 2023
0141969
protect task type transition and reads
stevenlanders Nov 29, 2023
ff52c47
refactor before fix
stevenlanders Nov 29, 2023
522ed03
cleanup
stevenlanders Nov 29, 2023
23dfade
save off working version before refactor
stevenlanders Nov 30, 2023
612547d
remove abort limiter
stevenlanders Nov 30, 2023
ab828e0
remove validate all...
stevenlanders Nov 30, 2023
4be4b86
cleanup
stevenlanders Nov 30, 2023
e613b6d
add timer (to remove later)
stevenlanders Nov 30, 2023
4b7cfc1
performance improvements
stevenlanders Nov 30, 2023
62b93ad
add moar speed
stevenlanders Nov 30, 2023
3d6d486
fixes
stevenlanders Nov 30, 2023
b008e13
fix closed channel
stevenlanders Dec 1, 2023
04dd6a1
adjust timer
stevenlanders Dec 2, 2023
05940d5
save off perf improvements
stevenlanders Dec 3, 2023
6351efb
add shards
stevenlanders Dec 3, 2023
d4d8c1e
Improve invalidate writeset (#369)
udpatil Dec 4, 2023
466e81d
Improve invalidate writeset (#370)
udpatil Dec 4, 2023
4265efb
replace loadAndDelete with load
udpatil Dec 4, 2023
6c51897
use slice pointer instead of slice
udpatil Dec 4, 2023
998f98f
fix
udpatil Dec 4, 2023
f843e10
update intset
stevenlanders Dec 4, 2023
fa61b8d
update unit
udpatil Dec 4, 2023
d76f6b6
revalidate situation
stevenlanders Dec 4, 2023
617ca97
notify children
stevenlanders Dec 4, 2023
0af9bab
disable hangDebug
stevenlanders Dec 4, 2023
33edd83
fix validate-after-no-execute case
stevenlanders Dec 4, 2023
16d6465
handle empty txs
stevenlanders Dec 4, 2023
28333f5
put the parent notify back
stevenlanders Dec 4, 2023
1109797
nil -> empty slice
stevenlanders Dec 4, 2023
a3d4f5e
add mx.lock
stevenlanders Dec 4, 2023
41045e3
mx.lock
stevenlanders Dec 4, 2023
3f6b953
add debug log
stevenlanders Dec 4, 2023
cad9b98
add height to log
udpatil Dec 5, 2023
a33813b
fix hang
stevenlanders Dec 5, 2023
1 change: 1 addition & 0 deletions .github/workflows/test.yml
@@ -6,6 +6,7 @@ on:
push:
branches:
- main
- occ-main # TODO: remove after occ work is done

permissions:
contents: read
24 changes: 23 additions & 1 deletion baseapp/abci.go
@@ -12,6 +12,8 @@ import (
"syscall"
"time"

"github.com/cosmos/cosmos-sdk/tasks"

"github.com/armon/go-metrics"
"github.com/gogo/protobuf/proto"
abci "github.com/tendermint/tendermint/abci/types"
@@ -234,11 +236,31 @@ func (app *BaseApp) CheckTx(ctx context.Context, req *abci.RequestCheckTx) (*abc
}, nil
}

// DeliverTxBatch executes multiple txs
func (app *BaseApp) DeliverTxBatch(ctx sdk.Context, req sdk.DeliverTxBatchRequest) (res sdk.DeliverTxBatchResponse) {
scheduler := tasks.NewScheduler(app.concurrencyWorkers, app.TracingInfo, app.DeliverTx)
// This effectively no-ops the prefill when the tx metadata is empty

// process all txs; this also initializes the MVS if prefill estimates are disabled
txRes, err := scheduler.ProcessAll(ctx, req.TxEntries)
if err != nil {
// TODO: handle error
}

responses := make([]*sdk.DeliverTxResult, 0, len(req.TxEntries))
for _, tx := range txRes {
responses = append(responses, &sdk.DeliverTxResult{Response: tx})
}
return sdk.DeliverTxBatchResponse{Results: responses}
}

// DeliverTx implements the ABCI interface and executes a tx in DeliverTx mode.
// State only gets persisted if all messages are valid and get executed successfully.
// Otherwise, the ResponseDeliverTx will contain releveant error information.
// Otherwise, the ResponseDeliverTx will contain relevant error information.
// Regardless of tx execution outcome, the ResponseDeliverTx will contain relevant
// gas execution context.
// TODO: (occ) this is the function called from sei-chain to perform execution of a transaction.
// We'd likely replace this with an execution task that is scheduled by the OCC scheduler
func (app *BaseApp) DeliverTx(ctx sdk.Context, req abci.RequestDeliverTx) (res abci.ResponseDeliverTx) {
defer telemetry.MeasureSince(time.Now(), "abci", "deliver_tx")
defer func() {
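The tasks package introduced by this PR is not shown in this excerpt, but the call site above pins down the shape of the scheduler's API. A minimal sketch of that contract, inferred from usage rather than copied from the PR:

package tasks

import (
	abci "github.com/tendermint/tendermint/abci/types"

	sdk "github.com/cosmos/cosmos-sdk/types"
)

// DeliverTxFunc matches the signature of BaseApp.DeliverTx; the scheduler
// calls it (possibly concurrently, and possibly more than once per tx when
// a transaction is aborted and re-executed) to run each tx in the batch.
type DeliverTxFunc func(ctx sdk.Context, req abci.RequestDeliverTx) abci.ResponseDeliverTx

// Scheduler executes a batch of transactions optimistically and returns one
// response per entry, in the original order. It is constructed via
// NewScheduler(workers, tracingInfo, deliverTx), as seen in DeliverTxBatch.
type Scheduler interface {
	ProcessAll(ctx sdk.Context, reqs []*sdk.DeliverTxEntry) ([]abci.ResponseDeliverTx, error)
}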
39 changes: 31 additions & 8 deletions baseapp/baseapp.go
@@ -15,6 +15,7 @@ import (
"go.opentelemetry.io/otel/trace"

"github.com/armon/go-metrics"
"github.com/cosmos/cosmos-sdk/server/config"
"github.com/cosmos/cosmos-sdk/utils/tracing"
"github.com/gogo/protobuf/proto"
sdbm "github.com/sei-protocol/sei-tm-db/backends"
@@ -60,7 +61,8 @@ const (
FlagArchivalArweaveIndexDBFullPath = "archival-arweave-index-db-full-path"
FlagArchivalArweaveNodeURL = "archival-arweave-node-url"

FlagChainID = "chain-id"
FlagChainID = "chain-id"
FlagConcurrencyWorkers = "concurrency-workers"
)

var (
@@ -168,6 +170,8 @@ type BaseApp struct { //nolint: maligned
TmConfig *tmcfg.Config

TracingInfo *tracing.Info

concurrencyWorkers int
}

type appStore struct {
@@ -294,6 +298,16 @@ func NewBaseApp(
app.cms.(*rootmulti.Store).SetOrphanConfig(app.orphanConfig)
}

// if no option has already set this, initialize it from the flag's value
// this avoids forcing every implementation to pass an option, while still allowing overrides
if app.concurrencyWorkers == 0 {
app.concurrencyWorkers = cast.ToInt(appOpts.Get(FlagConcurrencyWorkers))
}
// fall back to the server-config default if still 0
if app.concurrencyWorkers == 0 {
app.concurrencyWorkers = config.DefaultConcurrencyWorkers
}

return app
}

@@ -307,6 +321,11 @@ func (app *BaseApp) AppVersion() uint64 {
return app.appVersion
}

// ConcurrencyWorkers returns the number of concurrent workers for the BaseApp.
func (app *BaseApp) ConcurrencyWorkers() int {
return app.concurrencyWorkers
}

// Version returns the application's version string.
func (app *BaseApp) Version() string {
return app.version
@@ -821,6 +840,7 @@ func (app *BaseApp) getContextForTx(mode runTxMode, txBytes []byte) sdk.Context

// cacheTxContext returns a new context based off of the provided context with
// a branched multi-store.
// TODO: (occ) This is an example of where we wrap the multistore with a cache multistore, and then return a modified context using that multistore
func (app *BaseApp) cacheTxContext(ctx sdk.Context, txBytes []byte) (sdk.Context, sdk.CacheMultiStore) {
ms := ctx.MultiStore()
// TODO: https://github.com/cosmos/cosmos-sdk/issues/2824
@@ -847,13 +867,13 @@ func (app *BaseApp) cacheTxContext(ctx sdk.Context, txBytes []byte) (sdk.Context
// and execute successfully. An error is returned otherwise.
func (app *BaseApp) runTx(ctx sdk.Context, mode runTxMode, txBytes []byte) (gInfo sdk.GasInfo, result *sdk.Result, anteEvents []abci.Event, priority int64, err error) {

defer telemetry.MeasureThroughputSinceWithLabels(
telemetry.TxCount,
[]metrics.Label{
telemetry.NewLabel("mode", modeKeyToString[mode]),
},
time.Now(),
)
// defer telemetry.MeasureThroughputSinceWithLabels(
// telemetry.TxCount,
// []metrics.Label{
// telemetry.NewLabel("mode", modeKeyToString[mode]),
// },
// time.Now(),
// )

// Reset events after each checkTx or simulateTx or recheckTx
// DeliverTx is garbage collected after FinalizeBlocker
@@ -974,6 +994,7 @@ func (app *BaseApp) runTx(ctx sdk.Context, mode runTxMode, txBytes []byte) (gInf
storeAccessOpEvents := msCache.GetEvents()
accessOps := ctx.TxMsgAccessOps()[acltypes.ANTE_MSG_INDEX]

// TODO: (occ) This is an example of where we do our current validation. Note that this validation operates on the declared dependencies for a TX / antehandler + the utilized dependencies, whereas the OCC validation will be done holistically based on the mvkv
missingAccessOps := ctx.MsgValidator().ValidateAccessOperations(accessOps, storeAccessOpEvents)
if len(missingAccessOps) != 0 {
for op := range missingAccessOps {
@@ -1118,6 +1139,8 @@ func (app *BaseApp) runMsgs(ctx sdk.Context, msgs []sdk.Msg, mode runTxMode) (*s
storeAccessOpEvents := msgMsCache.GetEvents()
accessOps := ctx.TxMsgAccessOps()[i]
missingAccessOps := ctx.MsgValidator().ValidateAccessOperations(accessOps, storeAccessOpEvents)
// TODO: (occ) This is where we are currently validating our per message dependencies,
// whereas validation will be done holistically based on the mvkv for OCC approach
if len(missingAccessOps) != 0 {
for op := range missingAccessOps {
ctx.Logger().Info((fmt.Sprintf("eventMsgName=%s Missing Access Operation:%s ", eventMsgName, op.String())))
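Taken together, the NewBaseApp changes above give the worker count a three-level precedence. A condensed sketch of that resolution (the helper name is ours; the PR inlines this logic directly in NewBaseApp):

package baseapp

import (
	"github.com/spf13/cast"

	"github.com/cosmos/cosmos-sdk/server/config"
	servertypes "github.com/cosmos/cosmos-sdk/server/types"
)

// resolveConcurrencyWorkers condenses the fallback chain from NewBaseApp:
// an explicit functional option wins, then the concurrency-workers app
// option / CLI flag, then the server default.
func resolveConcurrencyWorkers(fromOption int, appOpts servertypes.AppOptions) int {
	if fromOption != 0 {
		return fromOption // set via baseapp.SetConcurrencyWorkers(...)
	}
	if v := cast.ToInt(appOpts.Get(FlagConcurrencyWorkers)); v != 0 {
		return v // read from config / CLI
	}
	return config.DefaultConcurrencyWorkers // 10, per server/config
}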
145 changes: 145 additions & 0 deletions baseapp/deliver_tx_batch_test.go
@@ -0,0 +1,145 @@
package baseapp

import (
"context"
"fmt"
"testing"

"github.com/stretchr/testify/require"
abci "github.com/tendermint/tendermint/abci/types"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"

"github.com/cosmos/cosmos-sdk/codec"
sdk "github.com/cosmos/cosmos-sdk/types"
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
)

func anteHandler(capKey sdk.StoreKey, storeKey []byte) sdk.AnteHandler {
return func(ctx sdk.Context, tx sdk.Tx, simulate bool) (sdk.Context, error) {
store := ctx.KVStore(capKey)
txTest := tx.(txTest)

if txTest.FailOnAnte {
return ctx, sdkerrors.Wrap(sdkerrors.ErrUnauthorized, "ante handler failure")
}

val := getIntFromStore(store, storeKey)
setIntOnStore(store, storeKey, val+1)

ctx.EventManager().EmitEvents(
counterEvent("ante-val", val+1),
)

return ctx, nil
}
}

func handlerKVStore(capKey sdk.StoreKey) sdk.Handler {
return func(ctx sdk.Context, msg sdk.Msg) (*sdk.Result, error) {
ctx = ctx.WithEventManager(sdk.NewEventManager())
res := &sdk.Result{}

// Use the tx's index within the block as its unique ID
txIndex := ctx.TxIndex()

// Use the unique ID to get a specific key for this transaction
sharedKey := []byte("shared")
txKey := []byte(fmt.Sprintf("tx-%d", txIndex))

// Similar steps as before: Get the store, retrieve a value, increment it, store back, emit an event
// Get the store
store := ctx.KVStore(capKey)

// increment per-tx key (no conflict)
val := getIntFromStore(store, txKey)
setIntOnStore(store, txKey, val+1)

// increment shared key
sharedVal := getIntFromStore(store, sharedKey)
setIntOnStore(store, sharedKey, sharedVal+1)

// Emit an event with the incremented value and the unique ID
ctx.EventManager().EmitEvent(
sdk.NewEvent(sdk.EventTypeMessage,
sdk.NewAttribute("shared-val", fmt.Sprintf("%d", sharedVal+1)),
sdk.NewAttribute("tx-val", fmt.Sprintf("%d", val+1)),
sdk.NewAttribute("tx-id", fmt.Sprintf("%d", txIndex)),
),
)

res.Events = ctx.EventManager().Events().ToABCIEvents()
return res, nil
}
}

func requireAttribute(t *testing.T, evts []abci.Event, name string, val string) {
for _, evt := range evts {
for _, att := range evt.Attributes {
if string(att.Key) == name {
require.Equal(t, val, string(att.Value))
return
}
}
}
require.Fail(t, fmt.Sprintf("attribute %s not found via value %s", name, val))
}

func TestDeliverTxBatch(t *testing.T) {
// test increments in the ante
anteKey := []byte("ante-key")

anteOpt := func(bapp *BaseApp) {
bapp.SetAnteHandler(anteHandler(capKey1, anteKey))
}

// test increments in the handler
routerOpt := func(bapp *BaseApp) {
r := sdk.NewRoute(routeMsgCounter, handlerKVStore(capKey1))
bapp.Router().AddRoute(r)
}

app := setupBaseApp(t, anteOpt, routerOpt)
app.InitChain(context.Background(), &abci.RequestInitChain{})

// Create same codec used in txDecoder
codec := codec.NewLegacyAmino()
registerTestCodec(codec)

nBlocks := 3
txPerHeight := 5

for blockN := 0; blockN < nBlocks; blockN++ {
header := tmproto.Header{Height: int64(blockN) + 1}
app.setDeliverState(header)
app.deliverState.ctx = app.deliverState.ctx.WithBlockGasMeter(sdk.NewInfiniteGasMeter())
app.BeginBlock(app.deliverState.ctx, abci.RequestBeginBlock{Header: header})

var requests []*sdk.DeliverTxEntry
for i := 0; i < txPerHeight; i++ {
counter := int64(blockN*txPerHeight + i)
tx := newTxCounter(counter, counter)

txBytes, err := codec.Marshal(tx)
require.NoError(t, err)
requests = append(requests, &sdk.DeliverTxEntry{
Request: abci.RequestDeliverTx{Tx: txBytes},
})
}

responses := app.DeliverTxBatch(app.deliverState.ctx, sdk.DeliverTxBatchRequest{TxEntries: requests})
require.Len(t, responses.Results, txPerHeight)

for idx, deliverTxRes := range responses.Results {
res := deliverTxRes.Response
require.Equal(t, abci.CodeTypeOK, res.Code)
requireAttribute(t, res.Events, "tx-id", fmt.Sprintf("%d", idx))
requireAttribute(t, res.Events, "tx-val", fmt.Sprintf("%d", blockN+1))
requireAttribute(t, res.Events, "shared-val", fmt.Sprintf("%d", blockN*txPerHeight+idx+1))
}

app.EndBlock(app.deliverState.ctx, abci.RequestEndBlock{})
require.Empty(t, app.deliverState.ctx.MultiStore().GetEvents())
app.SetDeliverStateToCommit()
app.Commit(context.Background())
}
}
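A note on the assertions in TestDeliverTxBatch above: each transaction increments its own tx-<txIndex> key, and because txIndex values 0 through 4 recur in every block, tx-val reads blockN+1. The shared key is incremented once by every transaction, so after blockN complete blocks plus idx+1 transactions in the current block it reads blockN*txPerHeight+idx+1. Hitting those exact values is the point of the test: the concurrent scheduler must produce state and events equivalent to executing the batch serially in the original transaction order.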
11 changes: 11 additions & 0 deletions baseapp/options.go
@@ -87,6 +87,10 @@ func SetSnapshotInterval(interval uint64) func(*BaseApp) {
return func(app *BaseApp) { app.SetSnapshotInterval(interval) }
}

func SetConcurrencyWorkers(workers int) func(*BaseApp) {
return func(app *BaseApp) { app.SetConcurrencyWorkers(workers) }
}

// SetSnapshotKeepRecent sets the recent snapshots to keep.
func SetSnapshotKeepRecent(keepRecent uint32) func(*BaseApp) {
return func(app *BaseApp) { app.SetSnapshotKeepRecent(keepRecent) }
Expand Down Expand Up @@ -295,6 +299,13 @@ func (app *BaseApp) SetSnapshotInterval(snapshotInterval uint64) {
app.snapshotInterval = snapshotInterval
}

func (app *BaseApp) SetConcurrencyWorkers(workers int) {
if app.sealed {
panic("SetConcurrencyWorkers() on sealed BaseApp")
}
app.concurrencyWorkers = workers
}

// SetSnapshotKeepRecent sets the number of recent snapshots to keep.
func (app *BaseApp) SetSnapshotKeepRecent(snapshotKeepRecent uint32) {
if app.sealed {
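Like the neighboring setters, SetConcurrencyWorkers panics on a sealed BaseApp, so the option has to be supplied at construction time. A small sketch of composing it with an app's existing options (the helper is illustrative, not part of the PR):

package main

import "github.com/cosmos/cosmos-sdk/baseapp"

// withWorkers appends the new option to whatever baseapp options an app
// already passes to NewBaseApp. Apps can equally pass the option directly.
func withWorkers(n int, opts ...func(*baseapp.BaseApp)) []func(*baseapp.BaseApp) {
	return append(opts, baseapp.SetConcurrencyWorkers(n))
}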
4 changes: 4 additions & 0 deletions proto/cosmos/accesscontrol/constants.proto
@@ -130,7 +130,11 @@ enum ResourceType {
KV_DEX_SHORT_ORDER_COUNT = 92; // child of KV_DEX

KV_BANK_DEFERRED = 93; // child of KV
reserved 94;
KV_BANK_DEFERRED_MODULE_TX_INDEX = 95; // child of KV_BANK_DEFERRED

KV_DEX_MEM_CONTRACTS_TO_PROCESS = 96; // child of KV_DEX_MEM
KV_DEX_MEM_DOWNSTREAM_CONTRACTS = 97; // child of KV_DEX_MEM
}

enum WasmMessageSubtype {
9 changes: 9 additions & 0 deletions server/config/config.go
@@ -21,6 +21,9 @@ const (

// DefaultGRPCWebAddress defines the default address to bind the gRPC-web server to.
DefaultGRPCWebAddress = "0.0.0.0:9091"

// DefaultConcurrencyWorkers defines the default number of workers to use for concurrent transaction execution
DefaultConcurrencyWorkers = 10
)

// BaseConfig defines the server's basic configuration
Expand Down Expand Up @@ -88,6 +91,10 @@ type BaseConfig struct {
SeparateOrphanVersionsToKeep int64 `mapstructure:"separate-orphan-versions-to-keep"`
NumOrphanPerFile int `mapstructure:"num-orphan-per-file"`
OrphanDirectory string `mapstructure:"orphan-dir"`

// ConcurrencyWorkers defines the number of workers to use for concurrent
// transaction execution. A value of -1 means unlimited workers. Default value is 10.
ConcurrencyWorkers int `mapstructure:"concurrency-workers"`
}

// APIConfig defines the API listener configuration.
@@ -236,6 +243,7 @@ func DefaultConfig() *Config {
IAVLDisableFastNode: true,
CompactionInterval: 0,
NoVersioning: false,
ConcurrencyWorkers: DefaultConcurrencyWorkers,
},
Telemetry: telemetry.Config{
Enabled: false,
@@ -310,6 +318,7 @@ func GetConfig(v *viper.Viper) (Config, error) {
SeparateOrphanVersionsToKeep: v.GetInt64("separate-orphan-versions-to-keep"),
NumOrphanPerFile: v.GetInt("num-orphan-per-file"),
OrphanDirectory: v.GetString("orphan-dir"),
ConcurrencyWorkers: v.GetInt("concurrency-workers"),
},
Telemetry: telemetry.Config{
ServiceName: v.GetString("telemetry.service-name"),
5 changes: 5 additions & 0 deletions server/config/config_test.go
@@ -23,3 +23,8 @@ func TestSetSnapshotDirectory(t *testing.T) {
cfg := DefaultConfig()
require.Equal(t, "", cfg.StateSync.SnapshotDirectory)
}

func TestSetConcurrencyWorkers(t *testing.T) {
cfg := DefaultConfig()
require.Equal(t, DefaultConcurrencyWorkers, cfg.ConcurrencyWorkers)
}
3 changes: 3 additions & 0 deletions server/config/toml.go
@@ -101,6 +101,9 @@ num-orphan-per-file = {{ .BaseConfig.NumOrphanPerFile }}
# if separate-orphan-storage is true, where to store orphan data
orphan-dir = "{{ .BaseConfig.OrphanDirectory }}"

# concurrency-workers defines how many workers to run for concurrent transaction execution
# concurrency-workers = {{ .BaseConfig.ConcurrencyWorkers }}

###############################################################################
### Telemetry Configuration ###
###############################################################################
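One note on the toml.go template above: the concurrency-workers line is emitted commented out, so a freshly generated app.toml leaves the key unset and nodes fall back to the flag / NewBaseApp default resolution (currently 10). Presumably this is deliberate while the async scheduler remains experimental.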