diff --git a/integration-tests/modules/pse_test.go b/integration-tests/modules/pse_test.go index dad5b19f..b0e8b57b 100644 --- a/integration-tests/modules/pse_test.go +++ b/integration-tests/modules/pse_test.go @@ -251,9 +251,7 @@ func TestPSEDistribution(t *testing.T) { requireT.NoError(err) t.Logf("Distribution 1 at height: %d", height) - scheduledDistributions, err := getScheduledDistribution(ctx, chain) - requireT.NoError(err) - requireT.Len(scheduledDistributions, 2) + awaitScheduleCount(ctx, t, chain, 2) balancesBefore, scoresBefore, totalScore := getAllDelegatorInfo(ctx, t, chain, height-1) balancesAfter, _, _ := getAllDelegatorInfo(ctx, t, chain, height) @@ -298,9 +296,7 @@ func TestPSEDistribution(t *testing.T) { requireT.NoError(err) t.Logf("Distribution 2 at height: %d", height) - scheduledDistributions, err = getScheduledDistribution(ctx, chain) - requireT.NoError(err) - requireT.Len(scheduledDistributions, 1) + awaitScheduleCount(ctx, t, chain, 1) balancesBefore, scoresBefore, totalScore = getAllDelegatorInfo(ctx, t, chain, height-1) balancesAfter, _, _ = getAllDelegatorInfo(ctx, t, chain, height) @@ -336,9 +332,7 @@ func TestPSEDistribution(t *testing.T) { requireT.NoError(err) t.Logf("Distribution 3 at height: %d", height) - scheduledDistributions, err = getScheduledDistribution(ctx, chain) - requireT.NoError(err) - requireT.Empty(scheduledDistributions) + awaitScheduleCount(ctx, t, chain, 0) balancesBefore, scoresBefore, totalScore = getAllDelegatorInfo(ctx, t, chain, height-1) balancesAfter, _, _ = getAllDelegatorInfo(ctx, t, chain, height) @@ -836,7 +830,7 @@ func awaitScheduledDistributionEvent( ) (int64, communityDistributedEvent, error) { var observedHeight int64 err := chain.AwaitState(ctx, func(ctx context.Context) error { - query := fmt.Sprintf("tx.pse.v1.EventAllocationDistributed.mode='EndBlock' AND block.height>%d", startHeight) + query := fmt.Sprintf("tx.pse.v1.EventCommunityDistributed.mode='EndBlock' AND block.height>%d", startHeight) 
blocks, err := chain.ClientContext.RPCClient().BlockSearch(ctx, query, nil, nil, "") if err != nil { return err @@ -880,6 +874,22 @@ func getScheduledDistribution( return pseResponse.ScheduledDistributions, nil } +func awaitScheduleCount(ctx context.Context, t *testing.T, chain integration.TXChain, expectedCount int) { + t.Helper() + requireT := require.New(t) + err := chain.AwaitState(ctx, func(ctx context.Context) error { + dist, err := getScheduledDistribution(ctx, chain) + if err != nil { + return err + } + if len(dist) != expectedCount { + return fmt.Errorf("expected %d scheduled distributions, got %d", expectedCount, len(dist)) + } + return nil + }, integration.WithAwaitStateTimeout(10*time.Second)) + requireT.NoError(err) +} + func removeAttributeFromEvent(events []tmtypes.Event, key string) []tmtypes.Event { newEvents := make([]tmtypes.Event, 0, len(events)) for _, event := range events { diff --git a/x/pse/keeper/delegation.go b/x/pse/keeper/delegation.go index 07acd176..7350fb3e 100644 --- a/x/pse/keeper/delegation.go +++ b/x/pse/keeper/delegation.go @@ -57,10 +57,7 @@ func (k Keeper) GetDelegatorScore( // SetDelegatorScore sets the score for a delegator. func (k Keeper) SetDelegatorScore( - ctx context.Context, - distributionID uint64, - delAddr sdk.AccAddress, - score sdkmath.Int, + ctx context.Context, distributionID uint64, delAddr sdk.AccAddress, score sdkmath.Int, ) error { key := collections.Join(distributionID, delAddr) return k.AccountScoreSnapshot.Set(ctx, key, score) @@ -72,6 +69,22 @@ func (k Keeper) RemoveDelegatorScore(ctx context.Context, distributionID uint64, return k.AccountScoreSnapshot.Remove(ctx, key) } +// addToScore atomically adds a score value to a delegator's score snapshot. 
+func (k Keeper) addToScore( + ctx context.Context, distributionID uint64, delAddr sdk.AccAddress, score sdkmath.Int, +) error { + if score.IsZero() { + return nil + } + lastScore, err := k.GetDelegatorScore(ctx, distributionID, delAddr) + if errors.Is(err, collections.ErrNotFound) { + lastScore = sdkmath.NewInt(0) + } else if err != nil { + return err + } + return k.SetDelegatorScore(ctx, distributionID, delAddr, lastScore.Add(score)) +} + // CalculateDelegatorScore calculates the current total score for a delegator. // This includes both the accumulated score snapshot (from previous periods) // and the current period score calculated on-demand from active delegations. @@ -82,7 +95,7 @@ func (k Keeper) CalculateDelegatorScore(ctx context.Context, delAddr sdk.AccAddr if err != nil { return sdkmath.Int{}, err } - distributionID := distribution.ID // TODO update to handle distribution ID properly. + distributionID := distribution.ID // Start with the accumulated score from the snapshot (previous periods) accumulatedScore, err := k.GetDelegatorScore(ctx, distributionID, delAddr) diff --git a/x/pse/keeper/distribute.go b/x/pse/keeper/distribute.go index 091f1a51..8990d63c 100644 --- a/x/pse/keeper/distribute.go +++ b/x/pse/keeper/distribute.go @@ -2,6 +2,7 @@ package keeper import ( "context" + "errors" "cosmossdk.io/collections" sdkmath "cosmossdk.io/math" @@ -11,113 +12,281 @@ import ( "github.com/tokenize-x/tx-chain/v7/x/pse/types" ) -// DistributeCommunityPSE distributes the total community PSE amount to all delegators based on their score. -func (k Keeper) DistributeCommunityPSE( - ctx context.Context, - bondDenom string, - totalPSEAmount sdkmath.Int, - scheduledDistribution types.ScheduledDistribution, -) error { - scheduledAt := scheduledDistribution.Timestamp - // TODO update to use distribution ID, also consider period splits - distributionID := scheduledDistribution.ID - // iterate all delegation time entries and calculate uncalculated score. 
- params, err := k.GetParams(ctx) +// defaultBatchSize is the number of entries processed per EndBlock during multi-block distribution. +const defaultBatchSize = 100 // TODO: make configurable + +// ProcessPhase1ScoreConversion processes a batch of DelegationTimeEntries from the ongoing distribution (ongoingID), +// converting each entry into a score snapshot and migrating it to nextID (ongoingID + 1). +// +// For each entry in the batch: +// 1. Calculate score from lastChanged to distribution timestamp -> addToScore(ongoingID) +// 2. Create new entry under nextID with same shares, lastChanged = distTimestamp +// 3. Remove entry from ongoingID +// +// Returns true when all ongoingID entries have been processed and TotalScore is computed. +func (k Keeper) ProcessPhase1ScoreConversion(ctx context.Context, ongoing types.ScheduledDistribution) (bool, error) { + ongoingID := ongoing.ID + nextID := ongoing.ID + 1 + distTimestamp := int64(ongoing.Timestamp) + + // Collect a batch of entries from ongoingID. + iter, err := k.DelegationTimeEntries.Iterate( + ctx, + collections.NewPrefixedTripleRange[uint64, sdk.AccAddress, sdk.ValAddress](ongoingID), + ) if err != nil { - return err + return false, err } - finalScoreMap, err := newScoreMap(distributionID, k.addressCodec, params.ExcludedAddresses) - if err != nil { - return err + + type entryKV struct { + delAddr sdk.AccAddress + valAddr sdk.ValAddress + entry types.DelegationTimeEntry } - allDelegationTimeEntries, err := finalScoreMap.iterateDelegationTimeEntries(ctx, k) - if err != nil { - return err + batch := make([]entryKV, 0, defaultBatchSize) + for ; iter.Valid() && len(batch) < defaultBatchSize; iter.Next() { + kv, err := iter.KeyValue() + if err != nil { + iter.Close() + return false, err + } + batch = append(batch, entryKV{ + delAddr: kv.Key.K2(), + valAddr: kv.Key.K3(), + entry: kv.Value, + }) } + iter.Close() - // add uncalculated score to account score snapshot and total score per delegator. 
- // it calculates the score from the last delegation time entry up to the current block time, which - // is not included in the score snapshot calculations. - err = finalScoreMap.iterateAccountScoreSnapshot(ctx, k) - if err != nil { - return err + // Compute TotalScore from all accumulated snapshots. + if len(batch) == 0 { + if err := k.computeTotalScore(ctx, ongoingID); err != nil { + return false, err + } + return true, nil } - // Clear all account score snapshots. - // Excluded addresses should not have snapshots (cleared when added to exclusion list), - // but we clear unconditionally for all addresses. - // TODO review all the logic for score reset - if err := k.AccountScoreSnapshot.Clear( - ctx, - collections.NewPrefixedPairRange[uint64, sdk.AccAddress](distributionID), - ); err != nil { - return err + for _, item := range batch { + isExcluded, err := k.IsExcludedAddress(ctx, item.delAddr) + if err != nil { + return false, err + } + + if !isExcluded { + score, err := calculateScoreAtTimestamp(ctx, k, item.valAddr, item.entry, distTimestamp) + if err != nil { + return false, err + } + if err := k.addToScore(ctx, ongoingID, item.delAddr, score); err != nil { + return false, err + } + } + + // Migrate entry to nextID with same shares, reset lastChanged to distribution timestamp. + if err := k.SetDelegationTimeEntry(ctx, nextID, item.valAddr, item.delAddr, types.DelegationTimeEntry{ + LastChangedUnixSec: distTimestamp, + Shares: item.entry.Shares, + }); err != nil { + return false, err + } + + // Remove from ongoingID. + if err := k.RemoveDelegationTimeEntry(ctx, ongoingID, item.valAddr, item.delAddr); err != nil { + return false, err + } } - // reset all delegation time entries LastChangedUnixSec to the current block time. - err = k.DelegationTimeEntries.Clear( + return false, nil +} + +// computeTotalScore sums all AccountScoreSnapshot entries for a distribution and stores the result in TotalScore. 
+func (k Keeper) computeTotalScore(ctx context.Context, distributionID uint64) error { + iter, err := k.AccountScoreSnapshot.Iterate( ctx, - collections.NewPrefixedTripleRange[uint64, sdk.AccAddress, sdk.ValAddress](distributionID), + collections.NewPrefixedPairRange[uint64, sdk.AccAddress](distributionID), ) if err != nil { return err } - currentBlockTime := sdk.UnwrapSDKContext(ctx).BlockTime().Unix() - for _, kv := range allDelegationTimeEntries { - kv.Value.LastChangedUnixSec = currentBlockTime - // TODO review all the logic for score reset - key := collections.Join3(distributionID+1, kv.Key.K2(), kv.Key.K3()) - err = k.DelegationTimeEntries.Set(ctx, key, kv.Value) + defer iter.Close() + + totalScore := sdkmath.NewInt(0) + for ; iter.Valid(); iter.Next() { + kv, err := iter.KeyValue() if err != nil { return err } + totalScore = totalScore.Add(kv.Value) } - // distribute total pse coin based on per delegator score. - totalPSEScore := finalScoreMap.totalScore + return k.TotalScore.Set(ctx, distributionID, totalScore) +} - // leftover is the amount of pse coin that is not distributed to any delegator. - // It will be sent to CommunityPool. - // there are 2 sources of leftover: - // 1. rounding errors due to division. - // 2. some delegators have no delegation. 
- leftover := totalPSEAmount - sdkCtx := sdk.UnwrapSDKContext(ctx) - if totalPSEScore.IsPositive() { - err = finalScoreMap.walk(func(addr sdk.AccAddress, score sdkmath.Int) error { - userAmount := totalPSEAmount.Mul(score).Quo(totalPSEScore) - distributedAmount, err := k.distributeToDelegator(ctx, addr, userAmount, bondDenom) - if err != nil { - return err - } - leftover = leftover.Sub(distributedAmount) - if err := sdkCtx.EventManager().EmitTypedEvent(&types.EventCommunityDistributed{ - DelegatorAddress: addr.String(), - Score: score, - TotalPseScore: totalPSEScore, - Amount: userAmount, - ScheduledAt: scheduledAt, - }); err != nil { - sdkCtx.Logger().Error("failed to emit community distributed event", "error", err) +// ProcessPhase2TokenDistribution distributes tokens to delegators in batches based on their computed scores. +// Uses TotalScore[ongoingID] for proportion calculation and iterates AccountScoreSnapshot[ongoingID]. +// +// For each delegator in the batch: +// 1. Compute share: userAmount = totalPSEAmount × score / totalScore +// 2. Distribute via distributeToDelegator (send tokens + auto-delegate) +// 3. Track cumulative distributed amount +// 4. Remove the processed snapshot entry +// +// When all delegators have been processed, sends leftover (rounding errors + undelegated users) to the community pool. +// Returns true when distribution is complete and all state has been cleaned up. +func (k Keeper) ProcessPhase2TokenDistribution( + ctx context.Context, ongoing types.ScheduledDistribution, bondDenom string, +) (bool, error) { + ongoingID := ongoing.ID + totalPSEAmount := getCommunityAllocationAmount(ongoing) + + totalScore, err := k.TotalScore.Get(ctx, ongoingID) + if err != nil { + return false, err + } + + // No score or no amount: send everything to community pool and clean up. 
+ if !totalScore.IsPositive() || totalPSEAmount.IsZero() { + if totalPSEAmount.IsPositive() { + if err := k.sendLeftoverToCommunityPool(ctx, totalPSEAmount, bondDenom); err != nil { + return false, err } - return nil + } + return true, k.cleanupDistribution(ctx, ongoingID) + } + + // Collect a batch of score snapshots. + iter, err := k.AccountScoreSnapshot.Iterate( + ctx, + collections.NewPrefixedPairRange[uint64, sdk.AccAddress](ongoingID), + ) + if err != nil { + return false, err + } + + type scoreEntry struct { + delAddr sdk.AccAddress + score sdkmath.Int + } + + batch := make([]scoreEntry, 0, defaultBatchSize) + for ; iter.Valid() && len(batch) < defaultBatchSize; iter.Next() { + kv, err := iter.KeyValue() + if err != nil { + iter.Close() + return false, err + } + batch = append(batch, scoreEntry{ + delAddr: kv.Key.K2(), + score: kv.Value, }) + } + iter.Close() + + // Only triggered when all distributions of this round are completed. + // Send leftover to community pool and clean up. + if len(batch) == 0 { + distributedSoFar, err := k.getDistributedAmount(ctx, ongoingID) if err != nil { - return err + return false, err + } + leftover := totalPSEAmount.Sub(distributedSoFar) + if leftover.IsPositive() { + if err := k.sendLeftoverToCommunityPool(ctx, leftover, bondDenom); err != nil { + return false, err + } } + return true, k.cleanupDistribution(ctx, ongoingID) } - // send leftover to CommunityPool. - if leftover.IsPositive() { - pseModuleAddress := k.accountKeeper.GetModuleAddress(types.ClearingAccountCommunity) - err = k.distributionKeeper.FundCommunityPool(ctx, sdk.NewCoins(sdk.NewCoin(bondDenom, leftover)), pseModuleAddress) + sdkCtx := sdk.UnwrapSDKContext(ctx) + + // Distribute rewards to each delegator in the batch proportional to their score. 
+ for _, item := range batch { + userAmount := totalPSEAmount.Mul(item.score).Quo(totalScore) + distributedAmount, err := k.distributeToDelegator(ctx, item.delAddr, userAmount, bondDenom) if err != nil { - return err + return false, err + } + + if err := k.addToDistributedAmount(ctx, ongoingID, distributedAmount); err != nil { + return false, err + } + + if err := sdkCtx.EventManager().EmitTypedEvent(&types.EventCommunityDistributed{ + DelegatorAddress: item.delAddr.String(), + Score: item.score, + TotalPseScore: totalScore, + Amount: userAmount, + ScheduledAt: ongoing.Timestamp, + }); err != nil { + sdkCtx.Logger().Error("failed to emit community distributed event", "error", err) + } + + // Remove processed snapshot. + if err := k.RemoveDelegatorScore(ctx, ongoingID, item.delAddr); err != nil { + return false, err } } - return nil + return false, nil +} + +// getCommunityAllocationAmount extracts the community clearing account allocation from a distribution. +func getCommunityAllocationAmount(dist types.ScheduledDistribution) sdkmath.Int { + for _, alloc := range dist.Allocations { + if alloc.ClearingAccount == types.ClearingAccountCommunity { + return alloc.Amount + } + } + return sdkmath.NewInt(0) +} + +// sendLeftoverToCommunityPool sends remaining undistributed tokens to the community pool. +func (k Keeper) sendLeftoverToCommunityPool(ctx context.Context, amount sdkmath.Int, bondDenom string) error { + pseModuleAddress := k.accountKeeper.GetModuleAddress(types.ClearingAccountCommunity) + return k.distributionKeeper.FundCommunityPool(ctx, sdk.NewCoins(sdk.NewCoin(bondDenom, amount)), pseModuleAddress) +} + +// cleanupDistribution removes all state associated with a completed distribution. 
+func (k Keeper) cleanupDistribution(ctx context.Context, distributionID uint64) error { + if err := k.AccountScoreSnapshot.Clear( + ctx, + collections.NewPrefixedPairRange[uint64, sdk.AccAddress](distributionID), + ); err != nil { + return err + } + if err := k.TotalScore.Remove(ctx, distributionID); err != nil { + return err + } + if err := k.DistributedAmount.Remove(ctx, distributionID); err != nil { + return err + } + if err := k.AllocationSchedule.Remove(ctx, distributionID); err != nil { + return err + } + return k.OngoingDistribution.Remove(ctx) +} + +// getDistributedAmount returns the cumulative distributed amount for a distribution, or zero if not set. +func (k Keeper) getDistributedAmount(ctx context.Context, distributionID uint64) (sdkmath.Int, error) { + amount, err := k.DistributedAmount.Get(ctx, distributionID) + if errors.Is(err, collections.ErrNotFound) { + return sdkmath.NewInt(0), nil + } + return amount, err +} + +// addToDistributedAmount atomically adds to the cumulative distributed amount. +func (k Keeper) addToDistributedAmount(ctx context.Context, distributionID uint64, amount sdkmath.Int) error { + if amount.IsZero() { + return nil + } + current, err := k.getDistributedAmount(ctx, distributionID) + if err != nil { + return err + } + return k.DistributedAmount.Set(ctx, distributionID, current.Add(amount)) } func (k Keeper) distributeToDelegator( @@ -144,10 +313,8 @@ func (k Keeper) distributeToDelegator( totalDelegationAmount = totalDelegationAmount.Add(delegation.Balance.Amount) } - if len(delegations) == 0 { - return sdkmath.NewInt(0), nil - } - + // Send earned tokens to delegator's wallet regardless of active delegations. + // Score was earned by staking during the scoring period, so the reward is always honored. 
if err = k.bankKeeper.SendCoinsFromModuleToAccount( ctx, types.ClearingAccountCommunity, @@ -156,6 +323,13 @@ func (k Keeper) distributeToDelegator( ); err != nil { return sdkmath.NewInt(0), err } + + // Auto-delegate proportionally to active validators. If no active delegations + // (e.g., user undelegated during multi-block distribution), skip auto-delegation + if len(delegations) == 0 || totalDelegationAmount.IsZero() { + return amount, nil + } + for _, delegation := range delegations { // NOTE: this division will have rounding errors up to 1 subunit, which is acceptable and will be ignored. // if that one subunit exists, it will remain in user balance as undelegated. diff --git a/x/pse/keeper/distribute_test.go b/x/pse/keeper/distribute_test.go index 675ba11b..509d9335 100644 --- a/x/pse/keeper/distribute_test.go +++ b/x/pse/keeper/distribute_test.go @@ -33,8 +33,8 @@ func TestKeeper_Distribute(t *testing.T) { func(r *runEnv) { distributeAction(r, sdkmath.NewInt(1000)) }, func(r *runEnv) { assertDistributionAction(r, map[*sdk.AccAddress]sdkmath.Int{ - &r.delegators[0]: sdkmath.NewInt(1_100_366), // + 1000 * 1.1 / 3 - &r.delegators[1]: sdkmath.NewInt(900_299), // + 1000 * 0.9 / 3 + &r.delegators[0]: sdkmath.NewInt(1_100_366), // + 1000 * 1.1 / 2 + &r.delegators[1]: sdkmath.NewInt(900_299), // + 1000 * 0.9 / 2 }) }, func(r *runEnv) { assertScoreResetAction(r) }, @@ -132,13 +132,18 @@ func TestKeeper_Distribute(t *testing.T) { func(r *runEnv) { undelegateAction(r, r.delegators[0], r.validators[0], 1_100_000) }, func(r *runEnv) { distributeAction(r, sdkmath.NewInt(1000)) }, func(r *runEnv) { + // delegators[0] fully undelegated — no auto-delegation, but earned reward sent as liquid tokens assertDistributionAction(r, map[*sdk.AccAddress]sdkmath.Int{ - &r.delegators[0]: sdkmath.NewInt(0), // + 1000 * 1.1 / 3 - &r.delegators[1]: sdkmath.NewInt(900_299), // + 1000 * 0.9 / 3 + &r.delegators[0]: sdkmath.NewInt(0), // staking balance 0 (no active delegation for 
auto-delegate) + &r.delegators[1]: sdkmath.NewInt(900_299), // 900k original + 1000 * 0.9 / 3 ≈ 299 auto-delegated }) + // delegators[0] receives 366 as liquid: 1000 (FundAccount) + 366 (PSE reward) = 1366 + // undelegated 1,100,000 tokens are in unbonding queue, not liquid + balance := r.testApp.BankKeeper.GetBalance(r.ctx, r.delegators[0], sdk.DefaultBondDenom) + r.requireT.Equal(sdkmath.NewInt(1366), balance.Amount) }, - // + 1000 * 1.1 / 3 (from user's share) + 2 (from rounding) - func(r *runEnv) { assertCommunityPoolBalanceAction(r, sdkmath.NewInt(366+2)) }, + // only rounding leftover goes to community pool (no forfeited rewards) + func(r *runEnv) { assertCommunityPoolBalanceAction(r, sdkmath.NewInt(2)) }, func(r *runEnv) { assertScoreResetAction(r) }, }, }, @@ -193,9 +198,10 @@ func TestKeeper_Distribute(t *testing.T) { ctx, _, err := testApp.BeginNextBlockAtTime(startTime) requireT.NoError(err) runContext := &runEnv{ - testApp: testApp, - ctx: ctx, - requireT: requireT, + testApp: testApp, + ctx: ctx, + requireT: requireT, + currentDistID: tempDistributionID, } // add validators. 
@@ -349,16 +355,42 @@ func Test_ExcludedAddress_FullLifecycle(t *testing.T) { scheduledDistribution := types.ScheduledDistribution{ ID: distributionID, Timestamp: uint64(ctx.BlockTime().Unix()), + Allocations: []types.ClearingAccountAllocation{{ + ClearingAccount: types.ClearingAccountCommunity, + Amount: amount, + }}, } - balanceBefore := testApp.BankKeeper.GetBalance(ctx, delAddr, bondDenom) - err = pseKeeper.DistributeCommunityPSE(ctx, bondDenom, amount, scheduledDistribution) + err = pseKeeper.OngoingDistribution.Set(ctx, scheduledDistribution) requireT.NoError(err) + balanceBefore := testApp.BankKeeper.GetBalance(ctx, delAddr, bondDenom) + for { + done, err := pseKeeper.ProcessPhase1ScoreConversion(ctx, scheduledDistribution) + requireT.NoError(err) + if done { + break + } + } + for { + done, err := pseKeeper.ProcessPhase2TokenDistribution(ctx, scheduledDistribution, bondDenom) + requireT.NoError(err) + if done { + break + } + } balanceAfter := testApp.BankKeeper.GetBalance(ctx, delAddr, bondDenom) requireT.Equal( balanceBefore.Amount.String(), balanceAfter.Amount.String(), "Excluded address should receive no rewards", ) + // After distribution, entries migrated from distributionID to distributionID+1. + // Save a new schedule so hooks and UpdateExcludedAddresses can find it. 
+ distributionID++ + err = pseKeeper.SaveDistributionSchedule(ctx, []types.ScheduledDistribution{ + {Timestamp: distributionID, ID: distributionID}, + }) + requireT.NoError(err) + // Step 6: Verify excluded delegator can fully undelegate after distribution msgUndel := &stakingtypes.MsgUndelegate{ DelegatorAddress: delAddr.String(), diff --git a/x/pse/keeper/distribution.go b/x/pse/keeper/distribution.go index 19e47ca2..d9203e52 100644 --- a/x/pse/keeper/distribution.go +++ b/x/pse/keeper/distribution.go @@ -2,7 +2,9 @@ package keeper import ( "context" + "errors" + "cosmossdk.io/collections" errorsmod "cosmossdk.io/errors" sdkmath "cosmossdk.io/math" sdk "github.com/cosmos/cosmos-sdk/types" @@ -10,56 +12,114 @@ import ( "github.com/tokenize-x/tx-chain/v7/x/pse/types" ) -// ProcessNextDistribution processes the next due distribution from the schedule. -// Checks the earliest scheduled distribution and processes it if the current block time has passed its timestamp. -// Only one distribution is processed per call. Should be called from EndBlock. +// ProcessNextDistribution is the EndBlock entry point for distribution processing. +// It either resumes an ongoing multi-block distribution or starts a new one if due. +// 1. If OngoingDistribution exists -> resume (Phase 1 or Phase 2) +// 2. If no ongoing -> peek schedule -> if due: +// a. Process non-community allocations immediately (single-block) +// b. If community allocation exists -> set OngoingDistribution (Phase 1 starts next block) +// c. Else, no community allocation, non-community distribution is already done, remove from AllocationSchedule func (k Keeper) ProcessNextDistribution(ctx context.Context) error { - sdkCtx := sdk.UnwrapSDKContext(ctx) + // Resume ongoing multi-block distribution if one is in progress. 
+ ongoing, found, err := k.getOngoingDistribution(ctx) + if err != nil { + return err + } + if found { + return k.resumeOngoingDistribution(ctx, ongoing) + } - // Peek at the next scheduled distribution + // No ongoing distribution — check if next scheduled distribution is due. scheduledDistribution, shouldProcess, err := k.PeekNextAllocationSchedule(ctx) if err != nil { return err } - - // Return early if schedule is empty or not ready to process if !shouldProcess { return nil } - timestamp := scheduledDistribution.Timestamp - - // Get bond denom from staking params - //nolint:contextcheck // this is correct context passing bondDenom, err := k.stakingKeeper.BondDenom(ctx) if err != nil { return err } - // Get params containing clearing account to recipient address mappings params, err := k.GetParams(ctx) if err != nil { return err } - // Process all allocations scheduled for this timestamp - if err := k.distributeAllocatedTokens( - ctx, timestamp, bondDenom, params.ClearingAccountMappings, scheduledDistribution, + // Process non-community allocations immediately (single-block). + if err := k.distributeNonCommunityAllocations( + ctx, scheduledDistribution, bondDenom, params.ClearingAccountMappings, ); err != nil { return err } - // Remove the completed distribution from the schedule + sdkCtx := sdk.UnwrapSDKContext(ctx) + + // If community allocation exists, start multi-block processing. 
+ communityAmount := getCommunityAllocationAmount(scheduledDistribution) + if communityAmount.IsPositive() { + if err := k.OngoingDistribution.Set(ctx, scheduledDistribution); err != nil { + return err + } + sdkCtx.Logger().Info("started multi-block community distribution", + "distribution_id", scheduledDistribution.ID, + "timestamp", scheduledDistribution.Timestamp) + return nil + } + + // No community allocation — remove from schedule if err := k.AllocationSchedule.Remove(ctx, scheduledDistribution.ID); err != nil { return err } - sdkCtx.Logger().Info("processed and removed allocation from schedule", - "timestamp", timestamp) + sdkCtx.Logger().Info("processed single-block distribution", + "distribution_id", scheduledDistribution.ID, + "timestamp", scheduledDistribution.Timestamp) return nil } +// resumeOngoingDistribution continues a multi-block community distribution. +// Phase is determined by TotalScore existence: absent -> Phase 1, present -> Phase 2. +func (k Keeper) resumeOngoingDistribution(ctx context.Context, ongoing types.ScheduledDistribution) error { + sdkCtx := sdk.UnwrapSDKContext(ctx) + ongoingID := ongoing.ID + + // TotalScore absent -> Phase 1 (score conversion still in progress). + _, err := k.TotalScore.Get(ctx, ongoingID) + if errors.Is(err, collections.ErrNotFound) { + done, err := k.ProcessPhase1ScoreConversion(ctx, ongoing) + if err != nil { + return err + } + if done { + sdkCtx.Logger().Info("phase 1 complete, TotalScore computed", + "distribution_id", ongoingID) + } + return nil + } + if err != nil { + return err + } + + // TotalScore present -> Phase 2 (token distribution). 
+ bondDenom, err := k.stakingKeeper.BondDenom(ctx) + if err != nil { + return err + } + done, err := k.ProcessPhase2TokenDistribution(ctx, ongoing, bondDenom) + if err != nil { + return err + } + if done { + sdkCtx.Logger().Info("multi-block community distribution complete", + "distribution_id", ongoingID) + } + return nil +} + // PeekNextAllocationSchedule returns the earliest scheduled distribution and whether it should be processed. func (k Keeper) PeekNextAllocationSchedule(ctx context.Context) (types.ScheduledDistribution, bool, error) { sdkCtx := sdk.UnwrapSDKContext(ctx) @@ -92,38 +152,25 @@ func (k Keeper) PeekNextAllocationSchedule(ctx context.Context) (types.Scheduled return scheduledDist, shouldProcess, nil } -// distributeAllocatedTokens transfers tokens from clearing accounts to their mapped recipients. -// Processes all allocations within a single scheduled distribution. -// Any transfer failure indicates a state invariant violation (insufficient balance or invalid recipient). -func (k Keeper) distributeAllocatedTokens( +// distributeNonCommunityAllocations processes all non-community allocations in a single block. +func (k Keeper) distributeNonCommunityAllocations( ctx context.Context, - timestamp uint64, + scheduledDistribution types.ScheduledDistribution, bondDenom string, clearingAccountMappings []types.ClearingAccountMapping, - scheduledDistribution types.ScheduledDistribution, ) error { sdkCtx := sdk.UnwrapSDKContext(ctx) - // Transfer tokens for each allocation in this distribution period + for _, allocation := range scheduledDistribution.Allocations { if allocation.Amount.IsZero() { continue } - // Community clearing account has different distribution logic + // Community allocation handled separately via multi-block distribution. 
if allocation.ClearingAccount == types.ClearingAccountCommunity { - if err := k.DistributeCommunityPSE(ctx, bondDenom, allocation.Amount, scheduledDistribution); err != nil { - return errorsmod.Wrapf( - types.ErrTransferFailed, - "failed to distribute Community clearing account allocation: %v", - err, - ) - } continue } - // Find the recipient addresses mapped to this clearing account - // Note: Community clearing account is handled above and doesn't need a mapping. - // Mappings are validated on update and genesis, so they are guaranteed to exist. var recipientAddrs []string for _, mapping := range clearingAccountMappings { if mapping.ClearingAccount == allocation.ClearingAccount { @@ -132,12 +179,6 @@ func (k Keeper) distributeAllocatedTokens( } } - // Distribution Precision Handling: - // The allocation amount is split equally among all recipients using integer division. - // Any remainder from division is sent to the community pool to ensure: - // - Each recipient receives exactly: allocation.Amount / numRecipients (base amount) - // - Remainder (if any) goes to community pool for ecosystem benefit - // This guarantees fair distribution and no tokens are lost numRecipients := sdkmath.NewInt(int64(len(recipientAddrs))) if numRecipients.IsZero() { return errorsmod.Wrapf( @@ -149,16 +190,10 @@ func (k Keeper) distributeAllocatedTokens( amountPerRecipient := allocation.Amount.Quo(numRecipients) remainder := allocation.Amount.Mod(numRecipients) - // Transfer tokens to each recipient for _, recipientAddr := range recipientAddrs { - // Convert recipient address string to SDK account address - // Safe to use Must* because addresses are validated at genesis/update time recipient := sdk.MustAccAddressFromBech32(recipientAddr) - - // Each recipient gets equal base amount coinsToSend := sdk.NewCoins(sdk.NewCoin(bondDenom, amountPerRecipient)) - // Transfer tokens from clearing account to recipient if err := k.bankKeeper.SendCoinsFromModuleToAccount( ctx, 
allocation.ClearingAccount, @@ -175,7 +210,6 @@ func (k Keeper) distributeAllocatedTokens( } } - // Send any remainder to community pool if !remainder.IsZero() { clearingAccountAddr := k.accountKeeper.GetModuleAddress(allocation.ClearingAccount) remainderCoins := sdk.NewCoins(sdk.NewCoin(bondDenom, remainder)) @@ -187,30 +221,18 @@ func (k Keeper) distributeAllocatedTokens( err, ) } - - sdkCtx.Logger().Info("sent distribution remainder to community pool", - "clearing_account", allocation.ClearingAccount, - "remainder", remainder.String()) } - // Emit single allocation completed event with recipient list, per-recipient amount, and community pool amount if err := sdkCtx.EventManager().EmitTypedEvent(&types.EventAllocationDistributed{ ClearingAccount: allocation.ClearingAccount, RecipientAddresses: recipientAddrs, AmountPerRecipient: amountPerRecipient, CommunityPoolAmount: remainder, - ScheduledAt: timestamp, + ScheduledAt: scheduledDistribution.Timestamp, TotalAmount: allocation.Amount, }); err != nil { sdkCtx.Logger().Error("failed to emit allocation completed event", "error", err) } - - sdkCtx.Logger().Info("allocated tokens", - "clearing_account", allocation.ClearingAccount, - "recipients", recipientAddrs, - "total_amount", allocation.Amount.String(), - "amount_per_recipient", amountPerRecipient.String(), - "community_pool_amount", remainder.String()) } return nil @@ -266,6 +288,18 @@ func (k Keeper) UpdateDistributionSchedule( return errorsmod.Wrapf(types.ErrInvalidAuthority, "expected %s, got %s", k.authority, authority) } + // Reject if a multi-block distribution is in progress. 
+ ongoing, ongoingFound, err := k.getOngoingDistribution(ctx) + if err != nil { + return err + } + if ongoingFound { + return errorsmod.Wrapf( + types.ErrOngoingDistribution, + "cannot update schedule while distribution %d is in progress", ongoing.ID, + ) + } + // Validate minimum gap between distributions params, err := k.GetParams(ctx) if err != nil { diff --git a/x/pse/keeper/distribution_test.go b/x/pse/keeper/distribution_test.go index fa386454..ee786de4 100644 --- a/x/pse/keeper/distribution_test.go +++ b/x/pse/keeper/distribution_test.go @@ -4,9 +4,13 @@ import ( "testing" "time" + "cosmossdk.io/collections" sdkmath "cosmossdk.io/math" "github.com/cosmos/cosmos-sdk/crypto/keys/ed25519" sdk "github.com/cosmos/cosmos-sdk/types" + minttypes "github.com/cosmos/cosmos-sdk/x/mint/types" + stakingkeeper "github.com/cosmos/cosmos-sdk/x/staking/keeper" + stakingtypes "github.com/cosmos/cosmos-sdk/x/staking/types" "github.com/stretchr/testify/require" "github.com/tokenize-x/tx-chain/v7/testutil/simapp" @@ -97,8 +101,14 @@ func TestDistribution_GenesisRebuild(t *testing.T) { // Process first distribution ctx = ctx.WithBlockTime(time.Unix(int64(time1)+10, 0)) ctx = ctx.WithBlockHeight(100) - err = pseKeeper.ProcessNextDistribution(ctx) - requireT.NoError(err) + for range 10 { + err = pseKeeper.ProcessNextDistribution(ctx) + requireT.NoError(err) + _, oErr := pseKeeper.OngoingDistribution.Get(ctx) + if oErr != nil { + break + } + } // Export genesis genesisState, err := pseKeeper.ExportGenesis(ctx) @@ -260,6 +270,468 @@ func TestDistribution_PrecisionWithMultipleRecipients(t *testing.T) { "community pool should have received the distribution remainders") } +// TestDistribution_MultiBlockEndBlockerRouting tests the full EndBlocker routing logic +// across multiple calls to ProcessNextDistribution, verifying phase transitions: +// +// Call 1 (idle -> start): non-community allocations distributed, OngoingDistribution set +// Call 2 (Phase 1): score conversion batch 
processed +// Call 3 (Phase 1 -> done): empty batch, TotalScore computed +// Call 4 (Phase 2): tokens distributed to delegators +// Call 5 (Phase 2 -> cleanup): empty batch, cleanup runs, OngoingDistribution removed +// Call 6 (idle): no ongoing, no due schedule, nothing happens +func TestDistribution_MultiBlockEndBlockerRouting(t *testing.T) { + requireT := require.New(t) + + startTime := time.Now().Round(time.Second) + testApp := simapp.New(simapp.WithStartTime(startTime)) + ctx, _, err := testApp.BeginNextBlockAtTime(startTime) + requireT.NoError(err) + + pseKeeper := testApp.PSEKeeper + bankKeeper := testApp.BankKeeper + + bondDenom, err := testApp.StakingKeeper.BondDenom(ctx) + requireT.NoError(err) + + // Create validator + valOp, _ := testApp.GenAccount(ctx) + requireT.NoError(testApp.FundAccount(ctx, valOp, sdk.NewCoins(sdk.NewCoin(bondDenom, sdkmath.NewInt(1000))))) + val, err := testApp.AddValidator(ctx, valOp, sdk.NewInt64Coin(bondDenom, 10), nil) + requireT.NoError(err) + valAddr := sdk.MustValAddressFromBech32(val.GetOperator()) + + // Create two delegators with delegations + del1, _ := testApp.GenAccount(ctx) + del2, _ := testApp.GenAccount(ctx) + requireT.NoError(testApp.FundAccount(ctx, del1, sdk.NewCoins(sdk.NewCoin(bondDenom, sdkmath.NewInt(10_000))))) + requireT.NoError(testApp.FundAccount(ctx, del2, sdk.NewCoins(sdk.NewCoin(bondDenom, sdkmath.NewInt(10_000))))) + + distributionID := uint64(1) + recipientAddr := sdk.AccAddress(ed25519.GenPrivKey().PubKey().Address()).String() + + // Save initial schedule for hooks to find the distribution ID + err = pseKeeper.SaveDistributionSchedule(ctx, []types.ScheduledDistribution{ + {ID: distributionID, Timestamp: distributionID}, + }) + requireT.NoError(err) + + // Delegate + for _, del := range []sdk.AccAddress{del1, del2} { + msg := &stakingtypes.MsgDelegate{ + DelegatorAddress: del.String(), + ValidatorAddress: valAddr.String(), + Amount: sdk.NewInt64Coin(bondDenom, 500), + } + _, err = 
stakingkeeper.NewMsgServerImpl(testApp.StakingKeeper).Delegate(ctx, msg) + requireT.NoError(err) + } + + // Advance time for score accumulation + ctx, _, err = testApp.BeginNextBlockAtTime(ctx.BlockTime().Add(10 * time.Second)) + requireT.NoError(err) + + // Set up clearing account mappings + params, err := pseKeeper.GetParams(ctx) + requireT.NoError(err) + params.ClearingAccountMappings = []types.ClearingAccountMapping{ + {ClearingAccount: types.ClearingAccountFoundation, RecipientAddresses: []string{recipientAddr}}, + {ClearingAccount: types.ClearingAccountAlliance, RecipientAddresses: []string{recipientAddr}}, + {ClearingAccount: types.ClearingAccountPartnership, RecipientAddresses: []string{recipientAddr}}, + {ClearingAccount: types.ClearingAccountInvestors, RecipientAddresses: []string{recipientAddr}}, + {ClearingAccount: types.ClearingAccountTeam, RecipientAddresses: []string{recipientAddr}}, + } + err = pseKeeper.SetParams(ctx, params) + requireT.NoError(err) + + // Fund all clearing accounts + communityAmount := sdkmath.NewInt(1000) + nonCommunityAmount := sdkmath.NewInt(100) + for _, clearingAccount := range types.GetAllClearingAccounts() { + amount := nonCommunityAmount + if clearingAccount == types.ClearingAccountCommunity { + amount = communityAmount + } + coins := sdk.NewCoins(sdk.NewCoin(bondDenom, amount)) + err = bankKeeper.MintCoins(ctx, minttypes.ModuleName, coins) + requireT.NoError(err) + err = bankKeeper.SendCoinsFromModuleToModule(ctx, minttypes.ModuleName, clearingAccount, coins) + requireT.NoError(err) + } + + // Update schedule with the actual distribution (due now) + distTimestamp := uint64(ctx.BlockTime().Unix()) + err = pseKeeper.AllocationSchedule.Remove(ctx, distributionID) + requireT.NoError(err) + err = pseKeeper.SaveDistributionSchedule(ctx, []types.ScheduledDistribution{ + { + ID: distributionID, + Timestamp: distTimestamp, + Allocations: []types.ClearingAccountAllocation{ + {ClearingAccount: types.ClearingAccountCommunity, Amount: 
communityAmount}, + {ClearingAccount: types.ClearingAccountFoundation, Amount: nonCommunityAmount}, + {ClearingAccount: types.ClearingAccountAlliance, Amount: nonCommunityAmount}, + {ClearingAccount: types.ClearingAccountPartnership, Amount: nonCommunityAmount}, + {ClearingAccount: types.ClearingAccountInvestors, Amount: nonCommunityAmount}, + {ClearingAccount: types.ClearingAccountTeam, Amount: nonCommunityAmount}, + }, + }, + }) + requireT.NoError(err) + + // --- Call 1: Start distribution --- + err = pseKeeper.ProcessNextDistribution(ctx) + requireT.NoError(err) + + // Verify: OngoingDistribution should be set + ongoing, err := pseKeeper.OngoingDistribution.Get(ctx) + requireT.NoError(err) + requireT.Equal(distributionID, ongoing.ID) + + // Verify: non-community recipient should have received tokens + recipientBalance := bankKeeper.GetBalance(ctx, sdk.MustAccAddressFromBech32(recipientAddr), bondDenom) + requireT.Equal(nonCommunityAmount.MulRaw(5).String(), recipientBalance.Amount.String(), + "recipient should have received all 5 non-community allocations") + + // Verify: TotalScore should NOT exist yet (Phase 1 hasn't run) + _, err = pseKeeper.TotalScore.Get(ctx, distributionID) + requireT.ErrorIs(err, collections.ErrNotFound) + + // --- Call 2: Phase 1 (process score entries) --- + err = pseKeeper.ProcessNextDistribution(ctx) + requireT.NoError(err) + + // TotalScore still not set (entries processed but empty-batch call needed to compute it) + _, err = pseKeeper.TotalScore.Get(ctx, distributionID) + requireT.ErrorIs(err, collections.ErrNotFound) + + // Verify entries migrated from distributionID to distributionID+1 + hasEntries := false + err = pseKeeper.DelegationTimeEntries.Walk(ctx, + collections.NewPrefixedTripleRange[uint64, sdk.AccAddress, sdk.ValAddress](distributionID+1), + func(key collections.Triple[uint64, sdk.AccAddress, sdk.ValAddress], value types.DelegationTimeEntry) (bool, error) { + hasEntries = true + return true, nil + }) + 
requireT.NoError(err) + requireT.True(hasEntries, "entries should be migrated to next distribution ID") + + // --- Call 3: Phase 1 done (empty batch -> compute TotalScore) --- + err = pseKeeper.ProcessNextDistribution(ctx) + requireT.NoError(err) + + // TotalScore should now exist + totalScore, err := pseKeeper.TotalScore.Get(ctx, distributionID) + requireT.NoError(err) + requireT.True(totalScore.IsPositive(), "TotalScore should be positive") + + // OngoingDistribution should still exist + _, err = pseKeeper.OngoingDistribution.Get(ctx) + requireT.NoError(err) + + // --- Call 4: Phase 2 (distribute tokens) --- + err = pseKeeper.ProcessNextDistribution(ctx) + requireT.NoError(err) + + // OngoingDistribution should still exist (cleanup hasn't run yet) + _, err = pseKeeper.OngoingDistribution.Get(ctx) + requireT.NoError(err) + + // --- Call 5: Phase 2 done (empty batch -> cleanup) --- + err = pseKeeper.ProcessNextDistribution(ctx) + requireT.NoError(err) + + // OngoingDistribution should be removed + _, err = pseKeeper.OngoingDistribution.Get(ctx) + requireT.ErrorIs(err, collections.ErrNotFound, "OngoingDistribution should be removed after cleanup") + + // Schedule entry should be removed + _, err = pseKeeper.AllocationSchedule.Get(ctx, distributionID) + requireT.ErrorIs(err, collections.ErrNotFound, "schedule entry should be removed after cleanup") + + // TotalScore should be cleaned up + _, err = pseKeeper.TotalScore.Get(ctx, distributionID) + requireT.ErrorIs(err, collections.ErrNotFound, "TotalScore should be removed after cleanup") + + // Delegators should have received community tokens (auto-delegated) + stakingQuerier := stakingkeeper.NewQuerier(testApp.StakingKeeper) + for _, del := range []sdk.AccAddress{del1, del2} { + resp, err := stakingQuerier.DelegatorDelegations(ctx, &stakingtypes.QueryDelegatorDelegationsRequest{ + DelegatorAddr: del.String(), + }) + requireT.NoError(err) + totalDelegated := sdkmath.NewInt(0) + for _, d := range 
resp.DelegationResponses { + totalDelegated = totalDelegated.Add(d.Balance.Amount) + } + requireT.True(totalDelegated.GT(sdkmath.NewInt(500)), + "delegator should have more than initial 500 after community distribution") + } + + // --- Call 6: Idle (nothing to do) --- + err = pseKeeper.ProcessNextDistribution(ctx) + requireT.NoError(err) + + // Still no ongoing + _, err = pseKeeper.OngoingDistribution.Get(ctx) + requireT.ErrorIs(err, collections.ErrNotFound) +} + +// TestDistribution_NonCommunityOnlySingleBlock tests that a distribution with +// no community allocation completes in a single call to ProcessNextDistribution. +func TestDistribution_NonCommunityOnlySingleBlock(t *testing.T) { + requireT := require.New(t) + + testApp := simapp.New() + ctx, _, err := testApp.BeginNextBlock() + requireT.NoError(err) + + pseKeeper := testApp.PSEKeeper + bankKeeper := testApp.BankKeeper + + bondDenom, err := testApp.StakingKeeper.BondDenom(ctx) + requireT.NoError(err) + + recipientAddr := sdk.AccAddress(ed25519.GenPrivKey().PubKey().Address()).String() + + // Set up mappings + params, err := pseKeeper.GetParams(ctx) + requireT.NoError(err) + params.ClearingAccountMappings = []types.ClearingAccountMapping{ + {ClearingAccount: types.ClearingAccountFoundation, RecipientAddresses: []string{recipientAddr}}, + {ClearingAccount: types.ClearingAccountAlliance, RecipientAddresses: []string{recipientAddr}}, + {ClearingAccount: types.ClearingAccountPartnership, RecipientAddresses: []string{recipientAddr}}, + {ClearingAccount: types.ClearingAccountInvestors, RecipientAddresses: []string{recipientAddr}}, + {ClearingAccount: types.ClearingAccountTeam, RecipientAddresses: []string{recipientAddr}}, + } + err = pseKeeper.SetParams(ctx, params) + requireT.NoError(err) + + // Fund non-community clearing accounts only + amount := sdkmath.NewInt(100) + for _, clearingAccount := range types.GetNonCommunityClearingAccounts() { + coins := sdk.NewCoins(sdk.NewCoin(bondDenom, amount)) + err = 
bankKeeper.MintCoins(ctx, minttypes.ModuleName, coins) + requireT.NoError(err) + err = bankKeeper.SendCoinsFromModuleToModule(ctx, minttypes.ModuleName, clearingAccount, coins) + requireT.NoError(err) + } + + // Schedule with zero community allocation + distTime := uint64(ctx.BlockTime().Unix()) - 1 + err = pseKeeper.SaveDistributionSchedule(ctx, []types.ScheduledDistribution{ + { + ID: 1, + Timestamp: distTime, + Allocations: []types.ClearingAccountAllocation{ + {ClearingAccount: types.ClearingAccountCommunity, Amount: sdkmath.NewInt(0)}, + {ClearingAccount: types.ClearingAccountFoundation, Amount: amount}, + {ClearingAccount: types.ClearingAccountAlliance, Amount: amount}, + {ClearingAccount: types.ClearingAccountPartnership, Amount: amount}, + {ClearingAccount: types.ClearingAccountInvestors, Amount: amount}, + {ClearingAccount: types.ClearingAccountTeam, Amount: amount}, + }, + }, + }) + requireT.NoError(err) + + // Single call should complete everything + err = pseKeeper.ProcessNextDistribution(ctx) + requireT.NoError(err) + + // No OngoingDistribution should be set (no community allocation) + _, err = pseKeeper.OngoingDistribution.Get(ctx) + requireT.ErrorIs(err, collections.ErrNotFound, "no OngoingDistribution for non-community-only distribution") + + // Schedule entry should be removed + _, err = pseKeeper.AllocationSchedule.Get(ctx, 1) + requireT.ErrorIs(err, collections.ErrNotFound, "schedule should be removed after single-block distribution") + + // Recipient should have received all non-community tokens + recipientBalance := bankKeeper.GetBalance(ctx, sdk.MustAccAddressFromBech32(recipientAddr), bondDenom) + requireT.Equal(amount.MulRaw(5).String(), recipientBalance.Amount.String()) +} + +// TestDistribution_EndBlockerWithScenarios mirrors TestKeeper_Distribute scenarios but routes +// through ProcessNextDistribution (the actual EndBlocker entry point) instead of calling +// Phase1/Phase2 directly. 
This validates the full EndBlocker routing with real delegation flows. +func TestDistribution_EndBlockerWithScenarios(t *testing.T) { + cases := []struct { + name string + actions []func(*runEnv) + }{ + { + name: "unaccumulated score via EndBlocker", + actions: []func(*runEnv){ + func(r *runEnv) { delegateAction(r, r.delegators[0], r.validators[0], 1_100_000) }, + func(r *runEnv) { delegateAction(r, r.delegators[1], r.validators[0], 900_000) }, + func(r *runEnv) { waitAction(r, time.Second*8) }, + func(r *runEnv) { endBlockerDistributeAction(r, sdkmath.NewInt(1000)) }, + func(r *runEnv) { + assertDistributionAction(r, map[*sdk.AccAddress]sdkmath.Int{ + &r.delegators[0]: sdkmath.NewInt(1_100_366), + &r.delegators[1]: sdkmath.NewInt(900_299), + }) + }, + func(r *runEnv) { assertScoreResetAction(r) }, + }, + }, + { + name: "accumulated + unaccumulated score via EndBlocker", + actions: []func(*runEnv){ + func(r *runEnv) { delegateAction(r, r.delegators[0], r.validators[0], 1_100_000) }, + func(r *runEnv) { delegateAction(r, r.delegators[1], r.validators[0], 900_000) }, + func(r *runEnv) { waitAction(r, time.Second*8) }, + func(r *runEnv) { delegateAction(r, r.delegators[0], r.validators[0], 900_000) }, + func(r *runEnv) { delegateAction(r, r.delegators[1], r.validators[0], 1_100_000) }, + func(r *runEnv) { waitAction(r, time.Second*8) }, + func(r *runEnv) { endBlockerDistributeAction(r, sdkmath.NewInt(1000)) }, + func(r *runEnv) { + assertDistributionAction(r, map[*sdk.AccAddress]sdkmath.Int{ + &r.delegators[0]: sdkmath.NewInt(2_000_387), + &r.delegators[1]: sdkmath.NewInt(2_000_362), + }) + }, + func(r *runEnv) { assertScoreResetAction(r) }, + }, + }, + { + name: "unbonding delegation via EndBlocker", + actions: []func(*runEnv){ + func(r *runEnv) { delegateAction(r, r.delegators[0], r.validators[0], 1_100_000) }, + func(r *runEnv) { delegateAction(r, r.delegators[1], r.validators[0], 900_000) }, + func(r *runEnv) { waitAction(r, time.Second*8) }, + func(r *runEnv) { 
undelegateAction(r, r.delegators[0], r.validators[0], 900_000) }, + func(r *runEnv) { undelegateAction(r, r.delegators[1], r.validators[0], 700_000) }, + func(r *runEnv) { waitAction(r, time.Second*8) }, + func(r *runEnv) { endBlockerDistributeAction(r, sdkmath.NewInt(1000)) }, + func(r *runEnv) { + assertDistributionAction(r, map[*sdk.AccAddress]sdkmath.Int{ + &r.delegators[0]: sdkmath.NewInt(200_295), + &r.delegators[1]: sdkmath.NewInt(200_249), + }) + }, + func(r *runEnv) { assertCommunityPoolBalanceAction(r, sdkmath.NewInt(2)) }, + func(r *runEnv) { assertScoreResetAction(r) }, + }, + }, + { + name: "redelegation via EndBlocker", + actions: []func(*runEnv){ + func(r *runEnv) { delegateAction(r, r.delegators[0], r.validators[0], 1_100_000) }, + func(r *runEnv) { delegateAction(r, r.delegators[1], r.validators[0], 900_000) }, + func(r *runEnv) { waitAction(r, time.Second*8) }, + func(r *runEnv) { redelegateAction(r, r.delegators[0], r.validators[0], r.validators[2], 900_000) }, + func(r *runEnv) { redelegateAction(r, r.delegators[1], r.validators[0], r.validators[2], 700_000) }, + func(r *runEnv) { waitAction(r, time.Second*8) }, + func(r *runEnv) { endBlockerDistributeAction(r, sdkmath.NewInt(1000)) }, + func(r *runEnv) { + assertDistributionAction(r, map[*sdk.AccAddress]sdkmath.Int{ + &r.delegators[0]: sdkmath.NewInt(1_100_365), + &r.delegators[1]: sdkmath.NewInt(900_298), + }) + }, + func(r *runEnv) { assertCommunityPoolBalanceAction(r, sdkmath.NewInt(2)) }, + func(r *runEnv) { assertScoreResetAction(r) }, + }, + }, + { + name: "zero score via EndBlocker", + actions: []func(*runEnv){ + func(r *runEnv) { endBlockerDistributeAction(r, sdkmath.NewInt(1000)) }, + func(r *runEnv) { + assertDistributionAction(r, map[*sdk.AccAddress]sdkmath.Int{ + &r.delegators[0]: sdkmath.NewInt(0), + &r.delegators[1]: sdkmath.NewInt(0), + }) + }, + func(r *runEnv) { assertCommunityPoolBalanceAction(r, sdkmath.NewInt(1000)) }, + func(r *runEnv) { assertScoreResetAction(r) }, + }, + 
}, + { + name: "multiple distributions via EndBlocker", + actions: []func(*runEnv){ + func(r *runEnv) { delegateAction(r, r.delegators[0], r.validators[0], 1_100_000) }, + func(r *runEnv) { delegateAction(r, r.delegators[1], r.validators[0], 900_000) }, + func(r *runEnv) { waitAction(r, time.Second*8) }, + func(r *runEnv) { endBlockerDistributeAction(r, sdkmath.NewInt(1000)) }, + func(r *runEnv) { + assertDistributionAction(r, map[*sdk.AccAddress]sdkmath.Int{ + &r.delegators[0]: sdkmath.NewInt(1_100_366), + &r.delegators[1]: sdkmath.NewInt(900_299), + }) + }, + func(r *runEnv) { assertCommunityPoolBalanceAction(r, sdkmath.NewInt(2)) }, + func(r *runEnv) { assertScoreResetAction(r) }, + func(r *runEnv) { waitAction(r, time.Second*8) }, + func(r *runEnv) { endBlockerDistributeAction(r, sdkmath.NewInt(1000)) }, + func(r *runEnv) { + assertDistributionAction(r, map[*sdk.AccAddress]sdkmath.Int{ + &r.delegators[0]: sdkmath.NewInt(1_100_732), + &r.delegators[1]: sdkmath.NewInt(900_598), + }) + }, + func(r *runEnv) { assertCommunityPoolBalanceAction(r, sdkmath.NewInt(4)) }, + func(r *runEnv) { assertScoreResetAction(r) }, + }, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + requireT := require.New(t) + startTime := time.Now().Round(time.Second) + testApp := simapp.New(simapp.WithStartTime(startTime)) + ctx, _, err := testApp.BeginNextBlockAtTime(startTime) + requireT.NoError(err) + runContext := &runEnv{ + testApp: testApp, + ctx: ctx, + requireT: requireT, + currentDistID: tempDistributionID, + } + + // add validators. 
+ for range 3 { + validatorOperator, _ := testApp.GenAccount(ctx) + requireT.NoError(testApp.FundAccount( + ctx, validatorOperator, sdk.NewCoins(sdk.NewCoin(sdk.DefaultBondDenom, sdkmath.NewInt(1000)))), + ) + validator, err := testApp.AddValidator( + ctx, validatorOperator, sdk.NewInt64Coin(sdk.DefaultBondDenom, 10), nil, + ) + requireT.NoError(err) + runContext.validators = append( + runContext.validators, + sdk.MustValAddressFromBech32(validator.GetOperator()), + ) + } + + // add delegators. + for range 3 { + delegator, _ := testApp.GenAccount(ctx) + requireT.NoError(testApp.FundAccount( + ctx, delegator, sdk.NewCoins(sdk.NewCoin(sdk.DefaultBondDenom, sdkmath.NewInt(1000))), + )) + runContext.delegators = append(runContext.delegators, delegator) + } + + err = testApp.PSEKeeper.SaveDistributionSchedule(ctx, []types.ScheduledDistribution{ + { + Timestamp: tempDistributionID, + ID: tempDistributionID, + }, + }) + requireT.NoError(err) + + // run actions. + for _, action := range tc.actions { + action(runContext) + } + }) + } +} + func TestDistribution_EndBlockFailure(t *testing.T) { requireT := require.New(t) diff --git a/x/pse/keeper/genesis.go b/x/pse/keeper/genesis.go index 9147ca6b..70b1b9fe 100644 --- a/x/pse/keeper/genesis.go +++ b/x/pse/keeper/genesis.go @@ -28,30 +28,25 @@ func (k Keeper) InitGenesis(ctx context.Context, genState types.GenesisState) er } } - // Populate delegation time entries from genesis state - for _, delegationTimeEntryExported := range genState.DelegationTimeEntries { - valAddr, err := k.valAddressCodec.StringToBytes(delegationTimeEntryExported.ValidatorAddress) + // Populate delegation time entries from genesis state. 
+ for _, entry := range genState.DelegationTimeEntries { + valAddr, err := k.valAddressCodec.StringToBytes(entry.ValidatorAddress) if err != nil { return err } - delAddr, err := k.addressCodec.StringToBytes(delegationTimeEntryExported.DelegatorAddress) + delAddr, err := k.addressCodec.StringToBytes(entry.DelegatorAddress) if err != nil { return err } - if err = k.SetDelegationTimeEntry( - ctx, - delegationTimeEntryExported.DistributionID, - valAddr, - delAddr, - types.DelegationTimeEntry{ - Shares: delegationTimeEntryExported.Shares, - LastChangedUnixSec: delegationTimeEntryExported.LastChangedUnixSec, - }); err != nil { + if err = k.SetDelegationTimeEntry(ctx, entry.DistributionID, valAddr, delAddr, types.DelegationTimeEntry{ + Shares: entry.Shares, + LastChangedUnixSec: entry.LastChangedUnixSec, + }); err != nil { return err } } - // Populate account scores from genesis state + // Populate account scores from genesis state. for _, accountScore := range genState.AccountScores { addr, err := k.addressCodec.StringToBytes(accountScore.Address) if err != nil { diff --git a/x/pse/keeper/hooks.go b/x/pse/keeper/hooks.go index 5c00b8ec..e5d5a124 100644 --- a/x/pse/keeper/hooks.go +++ b/x/pse/keeper/hooks.go @@ -24,35 +24,54 @@ func (k Keeper) Hooks() Hooks { return Hooks{k} } +// getOngoingDistribution returns the ongoing distribution if one exists. +func (k Keeper) getOngoingDistribution(ctx context.Context) (types.ScheduledDistribution, bool, error) { + ongoing, err := k.OngoingDistribution.Get(ctx) + if errors.Is(err, collections.ErrNotFound) { + return types.ScheduledDistribution{}, false, nil + } + if err != nil { + return types.ScheduledDistribution{}, false, err + } + return ongoing, true, nil +} + +// getNextDistributionID returns the next distribution ID that new entries should be written to. +// If an ongoing distribution exists (ongoingID=N is being processed), returns N+1. 
+// Otherwise returns the next scheduled distribution's ID (zero-value ID when no schedule exists). +// TODO: handle empty distribution schedule — currently returns 0 when no schedule exists. +func (k Keeper) getNextDistributionID(ctx context.Context) (uint64, error) { + ongoing, found, err := k.getOngoingDistribution(ctx) + if err != nil { + return 0, err + } + if found { + return ongoing.ID + 1, nil + } + + distribution, _, err := k.PeekNextAllocationSchedule(ctx) + if err != nil { + return 0, err + } + return distribution.ID, nil +} + // AfterDelegationModified implements the staking hooks interface. +// Handles 3 scenarios based on where the delegator's entry exists: +// - Scenario 1: Entry in ongoingID (ongoing distribution in progress). +// - Scenario 2: Entry in nextID — normal score calculation. +// - Scenario 3: No entry — create new entry, no score. func (h Hooks) AfterDelegationModified(ctx context.Context, delAddr sdk.AccAddress, valAddr sdk.ValAddress) error { delegation, err := h.k.stakingKeeper.GetDelegation(ctx, delAddr, valAddr) if err != nil { return err } - // TODO handle empty distribution schedule in all places. - distribution, _, err := h.k.PeekNextAllocationSchedule(ctx) + nextID, err := h.k.getNextDistributionID(ctx) if err != nil { return err } - // TODO update to handle distribution ID properly. 
- // We should update the logic to find the active distribution (probably via a new store called OngoingDistribution) - // and check for period splits - distributionID := distribution.ID - - blockTimeUnixSeconds := sdk.UnwrapSDKContext(ctx).BlockTime().Unix() - delegationTimeEntry, err := h.k.GetDelegationTimeEntry(ctx, distributionID, valAddr, delAddr) - if errors.Is(err, collections.ErrNotFound) { - delegationTimeEntry = types.DelegationTimeEntry{ - LastChangedUnixSec: blockTimeUnixSeconds, - Shares: delegation.Shares, - } - } else if err != nil { - return err - } - // Stop score addition for excluded addresses isExcluded, err := h.k.IsExcludedAddress(ctx, delAddr) if err != nil { return err @@ -61,48 +80,60 @@ func (h Hooks) AfterDelegationModified(ctx context.Context, delAddr sdk.AccAddre return nil } - // Only update AccountScoreSnapshot for non-excluded addresses - lastScore, err := h.k.GetDelegatorScore(ctx, distributionID, delAddr) - if errors.Is(err, collections.ErrNotFound) { - lastScore = sdkmath.NewInt(0) - } else if err != nil { - return err - } + blockTime := sdk.UnwrapSDKContext(ctx).BlockTime().Unix() - addedScore, err := calculateAddedScore(ctx, h.k, valAddr, delegationTimeEntry) + // Scenario 1: Entry exists in previous distribution (ongoing distribution in progress). + // Split score at distribution timestamp, move entry to nextID. 
+ ongoing, ongoingFound, err := h.k.getOngoingDistribution(ctx) if err != nil { return err } - newScore := lastScore.Add(addedScore) + if ongoingFound { + handled, err := h.migrateOngoingEntry(ctx, ongoing, nextID, delAddr, valAddr, blockTime) + if err != nil { + return err + } + if handled { + return h.k.SetDelegationTimeEntry(ctx, nextID, valAddr, delAddr, types.DelegationTimeEntry{ + LastChangedUnixSec: blockTime, + Shares: delegation.Shares, + }) + } + } - // Update DelegationTimeEntry for non-excluded addresses - if err := h.k.SetDelegationTimeEntry(ctx, distributionID, valAddr, delAddr, types.DelegationTimeEntry{ - LastChangedUnixSec: blockTimeUnixSeconds, - Shares: delegation.Shares, - }); err != nil { + // Scenario 2: Entry exists in next distribution. + currentEntry, err := h.k.GetDelegationTimeEntry(ctx, nextID, valAddr, delAddr) + if err == nil { + score, err := calculateAddedScore(ctx, h.k, valAddr, currentEntry) + if err != nil { + return err + } + if err := h.k.addToScore(ctx, nextID, delAddr, score); err != nil { + return err + } + return h.k.SetDelegationTimeEntry(ctx, nextID, valAddr, delAddr, types.DelegationTimeEntry{ + LastChangedUnixSec: blockTime, + Shares: delegation.Shares, + }) + } + if !errors.Is(err, collections.ErrNotFound) { return err } - return h.k.SetDelegatorScore(ctx, distributionID, delAddr, newScore) + // Scenario 3: No entry — create new under nextID (no score, duration = 0). + return h.k.SetDelegationTimeEntry(ctx, nextID, valAddr, delAddr, types.DelegationTimeEntry{ + LastChangedUnixSec: blockTime, + Shares: delegation.Shares, + }) } // BeforeDelegationRemoved implements the staking hooks interface. 
func (h Hooks) BeforeDelegationRemoved(ctx context.Context, delAddr sdk.AccAddress, valAddr sdk.ValAddress) error { - distribution, _, err := h.k.PeekNextAllocationSchedule(ctx) + nextID, err := h.k.getNextDistributionID(ctx) if err != nil { return err } - distributionID := distribution.ID // TODO update to handle distribution ID properly. - delegationTimeEntry, err := h.k.GetDelegationTimeEntry(ctx, distributionID, valAddr, delAddr) - if err != nil { - if errors.Is(err, collections.ErrNotFound) { - return nil - } - return err - } - - // Stop score addition for excluded addresses isExcluded, err := h.k.IsExcludedAddress(ctx, delAddr) if err != nil { return err @@ -111,44 +142,69 @@ func (h Hooks) BeforeDelegationRemoved(ctx context.Context, delAddr sdk.AccAddre return nil } - // Only update AccountScoreSnapshot for non-excluded addresses - lastScore, err := h.k.GetDelegatorScore(ctx, distributionID, delAddr) - if errors.Is(err, collections.ErrNotFound) { - lastScore = sdkmath.NewInt(0) - } else if err != nil { - return err - } + blockTime := sdk.UnwrapSDKContext(ctx).BlockTime().Unix() - addedScore, err := calculateAddedScore(ctx, h.k, valAddr, delegationTimeEntry) + // Scenario 1: Entry exists in previous distribution (ongoing). + ongoing, ongoingFound, err := h.k.getOngoingDistribution(ctx) if err != nil { return err } - newScore := lastScore.Add(addedScore) + if ongoingFound { + if _, err := h.migrateOngoingEntry(ctx, ongoing, nextID, delAddr, valAddr, blockTime); err != nil { + return err + } + } - // Remove DelegationTimeEntry for non-excluded addresses - if err := h.k.RemoveDelegationTimeEntry(ctx, distributionID, valAddr, delAddr); err != nil { + // Scenario 2: Entry exists in next distribution. 
+ currentEntry, err := h.k.GetDelegationTimeEntry(ctx, nextID, valAddr, delAddr) + if err == nil { + score, err := calculateAddedScore(ctx, h.k, valAddr, currentEntry) + if err != nil { + return err + } + if err := h.k.addToScore(ctx, nextID, delAddr, score); err != nil { + return err + } + return h.k.RemoveDelegationTimeEntry(ctx, nextID, valAddr, delAddr) + } + if !errors.Is(err, collections.ErrNotFound) { return err } - return h.k.SetDelegatorScore(ctx, distributionID, delAddr, newScore) + // Scenario 3: No entry. + return nil } -func calculateAddedScore( +// calculateScoreAtTimestamp calculates the score for a delegation entry up to a specific timestamp. +// score = tokens × (atTimestamp - lastChanged). +func calculateScoreAtTimestamp( ctx context.Context, keeper Keeper, valAddr sdk.ValAddress, - delegationTimeEntry types.DelegationTimeEntry, + entry types.DelegationTimeEntry, + atTimestamp int64, ) (sdkmath.Int, error) { val, err := keeper.stakingKeeper.GetValidator(ctx, valAddr) if err != nil { return sdkmath.NewInt(0), err } + duration := atTimestamp - entry.LastChangedUnixSec + if duration <= 0 { + return sdkmath.NewInt(0), nil + } + tokens := val.TokensFromShares(entry.Shares).TruncateInt() + return tokens.MulRaw(duration), nil +} - blockTimeUnixSeconds := sdk.UnwrapSDKContext(ctx).BlockTime().Unix() - delegationDuration := blockTimeUnixSeconds - delegationTimeEntry.LastChangedUnixSec - previousDelegatedTokens := val.TokensFromShares(delegationTimeEntry.Shares).TruncateInt() - delegationScore := previousDelegatedTokens.MulRaw(delegationDuration) - return delegationScore, nil +// calculateAddedScore calculates the score for a delegation entry up to the current block time. 
+func calculateAddedScore( + ctx context.Context, + keeper Keeper, + valAddr sdk.ValAddress, + delegationTimeEntry types.DelegationTimeEntry, +) (sdkmath.Int, error) { + blockTime := sdk.UnwrapSDKContext(ctx).BlockTime().Unix() + return calculateScoreAtTimestamp(ctx, keeper, valAddr, delegationTimeEntry, blockTime) } // BeforeValidatorSlashed implements the staking hooks interface. @@ -199,3 +255,55 @@ func (h Hooks) AfterValidatorBeginUnbonding(_ context.Context, _ sdk.ConsAddress func (h Hooks) AfterUnbondingInitiated(_ context.Context, _ uint64) error { return nil } + +// migrateOngoingEntry handles a delegation entry that still lives under the ongoing distribution. +// It calculates score for both the ongoing and next periods, removes the entry from ongoingID, +// and returns true if the entry was found and processed. +func (h Hooks) migrateOngoingEntry( + ctx context.Context, + ongoing types.ScheduledDistribution, + nextID uint64, + delAddr sdk.AccAddress, + valAddr sdk.ValAddress, + blockTime int64, +) (bool, error) { + ongoingID := ongoing.ID + ongoingEntry, err := h.k.GetDelegationTimeEntry(ctx, ongoingID, valAddr, delAddr) + if errors.Is(err, collections.ErrNotFound) { + return false, nil + } + if err != nil { + return false, err + } + + distTimestamp := int64(ongoing.Timestamp) + + // Score for ongoing period: lastChanged -> distribution timestamp. + ongoingScore, err := calculateScoreAtTimestamp(ctx, h.k, valAddr, ongoingEntry, distTimestamp) + if err != nil { + return false, err + } + if err := h.k.addToScore(ctx, ongoingID, delAddr, ongoingScore); err != nil { + return false, err + } + + // Score for next period: distribution timestamp -> now. 
+ nextPeriodEntry := types.DelegationTimeEntry{ + LastChangedUnixSec: distTimestamp, + Shares: ongoingEntry.Shares, + } + nextScore, err := calculateScoreAtTimestamp(ctx, h.k, valAddr, nextPeriodEntry, blockTime) + if err != nil { + return false, err + } + if err := h.k.addToScore(ctx, nextID, delAddr, nextScore); err != nil { + return false, err + } + + // Remove the old entry from ongoingID to prevent double scoring in Phase 1 batch processing. + if err := h.k.RemoveDelegationTimeEntry(ctx, ongoingID, valAddr, delAddr); err != nil { + return false, err + } + + return true, nil +} diff --git a/x/pse/keeper/hooks_test.go b/x/pse/keeper/hooks_test.go index b5171b02..ed8a4875 100644 --- a/x/pse/keeper/hooks_test.go +++ b/x/pse/keeper/hooks_test.go @@ -1,6 +1,7 @@ package keeper_test import ( + "errors" "testing" "time" @@ -199,9 +200,10 @@ func TestKeeper_Hooks(t *testing.T) { testApp := simapp.New() ctx := testApp.NewContext(false) runContext := &runEnv{ - testApp: testApp, - ctx: ctx, - requireT: requireT, + testApp: testApp, + ctx: ctx, + requireT: requireT, + currentDistID: tempDistributionID, } err := testApp.PSEKeeper.SaveDistributionSchedule(ctx, []types.ScheduledDistribution{ @@ -247,16 +249,17 @@ func TestKeeper_Hooks(t *testing.T) { } type runEnv struct { - testApp *simapp.App - ctx sdk.Context - delegators []sdk.AccAddress - validators []sdk.ValAddress - requireT *require.Assertions + testApp *simapp.App + ctx sdk.Context + delegators []sdk.AccAddress + validators []sdk.ValAddress + requireT *require.Assertions + currentDistID uint64 } func assertScoreAction(r *runEnv, delAddr sdk.AccAddress, expectedScore sdkmath.Int) { score, err := r.testApp.PSEKeeper.GetDelegatorScore( - r.ctx, tempDistributionID, delAddr, + r.ctx, r.currentDistID, delAddr, ) r.requireT.NoError(err) r.requireT.Equal(expectedScore, score) @@ -285,7 +288,9 @@ func assertCommunityPoolBalanceAction(r *runEnv, expectedBalance sdkmath.Int) { } func assertScoreResetAction(r *runEnv) { - 
scoreRanger := collections.NewPrefixedPairRange[uint64, sdk.AccAddress](tempDistributionID) + // After cleanup, score snapshots at the ongoing distribution ID should be cleared. + ongoingID := r.currentDistID - 1 + scoreRanger := collections.NewPrefixedPairRange[uint64, sdk.AccAddress](ongoingID) err := r.testApp.PSEKeeper.AccountScoreSnapshot.Walk(r.ctx, scoreRanger, func(key collections.Pair[uint64, sdk.AccAddress], value sdkmath.Int) (bool, error) { r.requireT.Equal(sdkmath.NewInt(0), value) @@ -293,8 +298,9 @@ func assertScoreResetAction(r *runEnv) { }) r.requireT.NoError(err) + // Entries should exist at the current distribution ID (migrated during Phase 1). blockTimeUnixSeconds := r.ctx.BlockTime().Unix() - entriesRanger := collections.NewPrefixedTripleRange[uint64, sdk.AccAddress, sdk.ValAddress](tempDistributionID) + entriesRanger := collections.NewPrefixedTripleRange[uint64, sdk.AccAddress, sdk.ValAddress](r.currentDistID) err = r.testApp.PSEKeeper.DelegationTimeEntries.Walk(r.ctx, entriesRanger, func( key collections.Triple[uint64, sdk.AccAddress, sdk.ValAddress], value types.DelegationTimeEntry, @@ -372,10 +378,75 @@ func distributeAction(r *runEnv, amount sdkmath.Int) { r.requireT.NoError(err) scheduledDistribution := types.ScheduledDistribution{ Timestamp: uint64(r.ctx.BlockTime().Unix()), - ID: tempDistributionID, + ID: r.currentDistID, + Allocations: []types.ClearingAccountAllocation{{ + ClearingAccount: types.ClearingAccountCommunity, + Amount: amount, + }}, } - err = r.testApp.PSEKeeper.DistributeCommunityPSE(r.ctx, bondDenom, amount, scheduledDistribution) + + // Set OngoingDistribution to simulate EndBlocker starting multi-block processing. + err = r.testApp.PSEKeeper.OngoingDistribution.Set(r.ctx, scheduledDistribution) + r.requireT.NoError(err) + + // Run Phase 1 until done. 
+ for { + done, err := r.testApp.PSEKeeper.ProcessPhase1ScoreConversion(r.ctx, scheduledDistribution) + r.requireT.NoError(err) + if done { + break + } + } + + // Run Phase 2 until done. + for { + done, err := r.testApp.PSEKeeper.ProcessPhase2TokenDistribution(r.ctx, scheduledDistribution, bondDenom) + r.requireT.NoError(err) + if done { + break + } + } + + // Advance to next distribution ID (Phase 1 migrated entries to currentDistID+1). + r.currentDistID++ +} + +// endBlockerDistributeAction runs distribution through ProcessNextDistribution (the actual EndBlocker entry point) +// instead of directly calling Phase1/Phase2. This validates the full routing logic. +func endBlockerDistributeAction(r *runEnv, amount sdkmath.Int) { + mintAndSendToPSECommunityClearingAccount(r, amount) + + // Update the AllocationSchedule so ProcessNextDistribution picks it up as due. + scheduledDistribution := types.ScheduledDistribution{ + Timestamp: uint64(r.ctx.BlockTime().Unix()), + ID: r.currentDistID, + Allocations: []types.ClearingAccountAllocation{{ + ClearingAccount: types.ClearingAccountCommunity, + Amount: amount, + }}, + } + err := r.testApp.PSEKeeper.AllocationSchedule.Set(r.ctx, scheduledDistribution.ID, scheduledDistribution) r.requireT.NoError(err) + + // Call ProcessNextDistribution repeatedly until distribution completes. + for i := range 20 { + err = r.testApp.PSEKeeper.ProcessNextDistribution(r.ctx) + r.requireT.NoError(err, "ProcessNextDistribution failed at iteration %d", i) + + _, ongoingErr := r.testApp.PSEKeeper.OngoingDistribution.Get(r.ctx) + if errors.Is(ongoingErr, collections.ErrNotFound) { + // Cleanup done — distribution complete. + break + } + r.requireT.NoError(ongoingErr) + } + + // Verify cleanup completed. 
+ _, err = r.testApp.PSEKeeper.OngoingDistribution.Get(r.ctx) + r.requireT.ErrorIs(err, collections.ErrNotFound, "OngoingDistribution should be removed after distribution") + + // Advance to next distribution ID (Phase 1 migrated entries to currentDistID+1). + r.currentDistID++ } func mintAndSendCoin(r *runEnv, recipient sdk.AccAddress, coins sdk.Coins) { diff --git a/x/pse/keeper/keeper.go b/x/pse/keeper/keeper.go index a66c1d20..cc8be6bc 100644 --- a/x/pse/keeper/keeper.go +++ b/x/pse/keeper/keeper.go @@ -37,7 +37,10 @@ type Keeper struct { types.DelegationTimeEntry, ] AccountScoreSnapshot collections.Map[collections.Pair[uint64, sdk.AccAddress], sdkmath.Int] - AllocationSchedule collections.Map[uint64, types.ScheduledDistribution] // Map: id -> ScheduledDistribution + AllocationSchedule collections.Map[uint64, types.ScheduledDistribution] // Map: ID -> ScheduledDistribution + TotalScore collections.Map[uint64, sdkmath.Int] // Map: ID -> total accumulated score + OngoingDistribution collections.Item[types.ScheduledDistribution] // Currently processing distribution + DistributedAmount collections.Map[uint64, sdkmath.Int] // Map: ID -> cumulative distributed amount DistributionDisabled collections.Item[bool] } @@ -92,6 +95,26 @@ func NewKeeper( collections.Uint64Key, codec.CollValue[types.ScheduledDistribution](cdc), ), + TotalScore: collections.NewMap( + sb, + types.TotalScoreKey, + "total_score", + collections.Uint64Key, + sdk.IntValue, + ), + OngoingDistribution: collections.NewItem( + sb, + types.OngoingDistributionKey, + "ongoing_distribution", + codec.CollValue[types.ScheduledDistribution](cdc), + ), + DistributedAmount: collections.NewMap( + sb, + types.DistributedAmountKey, + "distributed_amount", + collections.Uint64Key, + sdk.IntValue, + ), DistributionDisabled: collections.NewItem( sb, types.DistributionDisabledKey, diff --git a/x/pse/keeper/migrations.go b/x/pse/keeper/migrations.go index 47974b8e..fde99103 100644 --- a/x/pse/keeper/migrations.go +++ 
b/x/pse/keeper/migrations.go @@ -1,10 +1,25 @@ package keeper -// Migrator is a struct for handling in-place store migrations. +import ( + sdk "github.com/cosmos/cosmos-sdk/types" + + v2 "github.com/tokenize-x/tx-chain/v7/x/pse/migrations/v2" +) + +// Migrator handles in-place store migrations for the PSE module. type Migrator struct { + keeper Keeper } // NewMigrator returns a new Migrator. -func NewMigrator() Migrator { - return Migrator{} +func NewMigrator(keeper Keeper) Migrator { + return Migrator{keeper: keeper} +} + +// Migrate1to2 migrates the store from v1 to v2. +// Key changes: +// - DelegationTimeEntries: Pair[AccAddress, ValAddress] -> Triple[uint64, AccAddress, ValAddress]. +// - AccountScoreSnapshot: AccAddress -> Pair[uint64, AccAddress]. +func (m Migrator) Migrate1to2(ctx sdk.Context) error { + return v2.MigrateStore(ctx, m.keeper.storeService, m.keeper.cdc) } diff --git a/x/pse/keeper/params.go b/x/pse/keeper/params.go index 52c1fe2a..86bc4161 100644 --- a/x/pse/keeper/params.go +++ b/x/pse/keeper/params.go @@ -62,7 +62,7 @@ func (k Keeper) UpdateExcludedAddresses( if err != nil { return err } - distributionID := distribution.ID // TODO update to handle distribution ID properly. 
+ distributionID := distribution.ID for _, addrStr := range addressesToRemove { addr, err := k.addressCodec.StringToBytes(addrStr) if err != nil { diff --git a/x/pse/keeper/score_map.go b/x/pse/keeper/score_map.go deleted file mode 100644 index 25dc6664..00000000 --- a/x/pse/keeper/score_map.go +++ /dev/null @@ -1,166 +0,0 @@ -package keeper - -import ( - "context" - - "cosmossdk.io/collections" - addresscodec "cosmossdk.io/core/address" - sdkmath "cosmossdk.io/math" - sdk "github.com/cosmos/cosmos-sdk/types" - - "github.com/tokenize-x/tx-chain/v7/x/pse/types" -) - -type scoreMap struct { - items []struct { - addr sdk.AccAddress - score sdkmath.Int - } - distributionID uint64 - indexMap map[string]int - addressCodec addresscodec.Codec - totalScore sdkmath.Int - excludedAddresses []sdk.AccAddress -} - -func newScoreMap( - distributionID uint64, - addressCodec addresscodec.Codec, - excludedAddressesStr []string, -) (*scoreMap, error) { - excludedAddresses := make([]sdk.AccAddress, len(excludedAddressesStr)) - for i, addr := range excludedAddressesStr { - var err error - excludedAddresses[i], err = addressCodec.StringToBytes(addr) - if err != nil { - return nil, err - } - } - return &scoreMap{ - items: make([]struct { - addr sdk.AccAddress - score sdkmath.Int - }, 0), - distributionID: distributionID, - indexMap: make(map[string]int), - addressCodec: addressCodec, - totalScore: sdkmath.NewInt(0), - excludedAddresses: excludedAddresses, - }, nil -} - -func (m *scoreMap) addScore(addr sdk.AccAddress, value sdkmath.Int) error { - if value.IsZero() { - return nil - } - key, err := m.addressCodec.BytesToString(addr) - if err != nil { - return err - } - idx, found := m.indexMap[key] - if !found { - m.items = append(m.items, struct { - addr sdk.AccAddress - score sdkmath.Int - }{ - addr: addr, - score: value, - }) - m.indexMap[key] = len(m.items) - 1 - } else { - m.items[idx].score = m.items[idx].score.Add(value) - } - - m.totalScore = m.totalScore.Add(value) - return nil -} 
- -func (m *scoreMap) walk(fn func(addr sdk.AccAddress, score sdkmath.Int) error) error { - for _, pair := range m.items { - if m.isExcludedAddress(pair.addr) { - continue - } - if err := fn(pair.addr, pair.score); err != nil { - return err - } - } - return nil -} - -func (m *scoreMap) iterateAccountScoreSnapshot(ctx context.Context, k Keeper) error { - iter, err := k.AccountScoreSnapshot.Iterate( - ctx, - collections.NewPrefixedPairRange[uint64, sdk.AccAddress](m.distributionID), - ) - if err != nil { - return err - } - defer iter.Close() - for ; iter.Valid(); iter.Next() { - kv, err := iter.KeyValue() - if err != nil { - return err - } - score := kv.Value - delAddr := kv.Key.K2() - if m.isExcludedAddress(delAddr) { - continue - } - err = m.addScore(delAddr, score) - if err != nil { - return err - } - } - - return nil -} - -func (m *scoreMap) iterateDelegationTimeEntries(ctx context.Context, k Keeper) ( - []collections.KeyValue[collections.Triple[uint64, sdk.AccAddress, sdk.ValAddress], types.DelegationTimeEntry], error, -) { - var allDelegationTimeEntries []collections.KeyValue[ - collections.Triple[uint64, sdk.AccAddress, sdk.ValAddress], - types.DelegationTimeEntry, - ] - delegationTimeEntriesIterator, err := k.DelegationTimeEntries.Iterate( - ctx, - collections.NewPrefixedTripleRange[uint64, sdk.AccAddress, sdk.ValAddress](m.distributionID), - ) - if err != nil { - return nil, err - } - defer delegationTimeEntriesIterator.Close() - - for ; delegationTimeEntriesIterator.Valid(); delegationTimeEntriesIterator.Next() { - kv, err := delegationTimeEntriesIterator.KeyValue() - if err != nil { - return nil, err - } - allDelegationTimeEntries = append(allDelegationTimeEntries, kv) - delAddr := kv.Key.K2() - valAddr := kv.Key.K3() - if m.isExcludedAddress(delAddr) { - continue - } - - delegationTimeEntry := kv.Value - delegationScore, err := calculateAddedScore(ctx, k, valAddr, delegationTimeEntry) - if err != nil { - return nil, err - } - err = m.addScore(delAddr, 
delegationScore) - if err != nil { - return nil, err - } - } - return allDelegationTimeEntries, nil -} - -func (m *scoreMap) isExcludedAddress(addr sdk.AccAddress) bool { - for _, excludedAddress := range m.excludedAddresses { - if excludedAddress.Equals(addr) { - return true - } - } - return false -} diff --git a/x/pse/migrations/v2/migrate.go b/x/pse/migrations/v2/migrate.go new file mode 100644 index 00000000..fee57e94 --- /dev/null +++ b/x/pse/migrations/v2/migrate.go @@ -0,0 +1,193 @@ +package v2 + +import ( + "context" + + "cosmossdk.io/collections" + sdkstore "cosmossdk.io/core/store" + sdkmath "cosmossdk.io/math" + "github.com/cosmos/cosmos-sdk/codec" + sdk "github.com/cosmos/cosmos-sdk/types" + + "github.com/tokenize-x/tx-chain/v7/x/pse/types" +) + +// MigrateStore migrates the PSE module state from v1 to v2. +// - DelegationTimeEntries key: Pair[AccAddress, ValAddress] -> Triple[uint64, AccAddress, ValAddress]. +// - AccountScoreSnapshot key: AccAddress -> Pair[uint64, AccAddress]. +func MigrateStore( + ctx context.Context, + storeService sdkstore.KVStoreService, + cdc codec.BinaryCodec, +) error { + distributionID, err := getFirstDistributionID(ctx, storeService, cdc) + if err != nil { + return err + } + + if err := migrateDelegationTimeEntries(ctx, storeService, cdc, distributionID); err != nil { + return err + } + + return migrateAccountScoreSnapshot(ctx, storeService, distributionID) +} + +// TODO: Currently assigns the first distribution ID to all entries. Implement proper mapping +// of entries to correct distribution IDs based on timestamps when multiple distributions exist. 
+func getFirstDistributionID( + ctx context.Context, + storeService sdkstore.KVStoreService, + cdc codec.BinaryCodec, +) (uint64, error) { + sb := collections.NewSchemaBuilder(storeService) + schedule := collections.NewMap( + sb, + types.AllocationScheduleKey, + "allocation_schedule", + collections.Uint64Key, + codec.CollValue[types.ScheduledDistribution](cdc), + ) + if _, err := sb.Build(); err != nil { + return 0, err + } + + iter, err := schedule.Iterate(ctx, nil) + if err != nil { + return 0, err + } + defer iter.Close() + + if !iter.Valid() { + return 0, nil + } + + kv, err := iter.KeyValue() + if err != nil { + return 0, err + } + + return kv.Value.ID, nil +} + +func migrateDelegationTimeEntries( + ctx context.Context, + storeService sdkstore.KVStoreService, + cdc codec.BinaryCodec, + distributionID uint64, +) error { + oldSB := collections.NewSchemaBuilder(storeService) + oldMap := collections.NewMap( + oldSB, + types.StakingTimeKey, + "delegation_time_entries", + collections.PairKeyCodec(sdk.AccAddressKey, sdk.ValAddressKey), + codec.CollValue[types.DelegationTimeEntry](cdc), + ) + if _, err := oldSB.Build(); err != nil { + return err + } + + type entry struct { + delAddr sdk.AccAddress + valAddr sdk.ValAddress + value types.DelegationTimeEntry + } + + var entries []entry + err := oldMap.Walk(ctx, nil, func( + key collections.Pair[sdk.AccAddress, sdk.ValAddress], + value types.DelegationTimeEntry, + ) (bool, error) { + entries = append(entries, entry{ + delAddr: key.K1(), + valAddr: key.K2(), + value: value, + }) + return false, nil + }) + if err != nil { + return err + } + + if err := oldMap.Clear(ctx, nil); err != nil { + return err + } + + newSB := collections.NewSchemaBuilder(storeService) + newMap := collections.NewMap( + newSB, + types.StakingTimeKey, + "delegation_time_entries", + collections.TripleKeyCodec(collections.Uint64Key, sdk.AccAddressKey, sdk.ValAddressKey), + codec.CollValue[types.DelegationTimeEntry](cdc), + ) + if _, err := 
newSB.Build(); err != nil { + return err + } + + for _, e := range entries { + key := collections.Join3(distributionID, e.delAddr, e.valAddr) + if err := newMap.Set(ctx, key, e.value); err != nil { + return err + } + } + + return nil +} + +func migrateAccountScoreSnapshot( + ctx context.Context, + storeService sdkstore.KVStoreService, + distributionID uint64, +) error { + oldSB := collections.NewSchemaBuilder(storeService) + oldMap := collections.NewMap( + oldSB, + types.AccountScoreKey, + "account_score", + sdk.AccAddressKey, + sdk.IntValue, + ) + if _, err := oldSB.Build(); err != nil { + return err + } + + type entry struct { + addr sdk.AccAddress + score sdkmath.Int + } + + var entries []entry + err := oldMap.Walk(ctx, nil, func(key sdk.AccAddress, value sdkmath.Int) (bool, error) { + entries = append(entries, entry{addr: key, score: value}) + return false, nil + }) + if err != nil { + return err + } + + if err := oldMap.Clear(ctx, nil); err != nil { + return err + } + + newSB := collections.NewSchemaBuilder(storeService) + newMap := collections.NewMap( + newSB, + types.AccountScoreKey, + "account_score", + collections.PairKeyCodec(collections.Uint64Key, sdk.AccAddressKey), + sdk.IntValue, + ) + if _, err := newSB.Build(); err != nil { + return err + } + + for _, e := range entries { + key := collections.Join(distributionID, e.addr) + if err := newMap.Set(ctx, key, e.score); err != nil { + return err + } + } + + return nil +} diff --git a/x/pse/module.go b/x/pse/module.go index 38aa3b5f..dcc12687 100644 --- a/x/pse/module.go +++ b/x/pse/module.go @@ -100,6 +100,11 @@ func NewAppModule(keeper keeper.Keeper) AppModule { func (am AppModule) RegisterServices(cfg module.Configurator) { types.RegisterMsgServer(cfg.MsgServer(), keeper.NewMsgServer(am.keeper)) types.RegisterQueryServer(cfg.QueryServer(), keeper.NewQueryService(am.keeper)) + + m := keeper.NewMigrator(am.keeper) + if err := cfg.RegisterMigration(types.ModuleName, 1, m.Migrate1to2); err != nil { + 
panic(err) + } } // Name returns the module's name. @@ -132,7 +137,7 @@ func (am AppModule) IsAppModule() {} func (am AppModule) IsOnePerModuleType() {} // ConsensusVersion implements AppModule/ConsensusVersion. -func (AppModule) ConsensusVersion() uint64 { return 1 } +func (AppModule) ConsensusVersion() uint64 { return 2 } // EndBlock returns the end blocker for the module. It returns no validator // updates. diff --git a/x/pse/types/errors.go b/x/pse/types/errors.go index eff36811..d279f870 100644 --- a/x/pse/types/errors.go +++ b/x/pse/types/errors.go @@ -22,4 +22,7 @@ var ( // ErrInvalidParam is returned when a parameter is invalid. ErrInvalidParam = sdkerrors.Register(ModuleName, 7, "invalid parameter") + + // ErrOngoingDistribution is returned when a schedule update is attempted during an ongoing distribution. + ErrOngoingDistribution = sdkerrors.Register(ModuleName, 8, "distribution is currently in progress") ) diff --git a/x/pse/types/key.go b/x/pse/types/key.go index be6e50fe..040e8de3 100644 --- a/x/pse/types/key.go +++ b/x/pse/types/key.go @@ -15,6 +15,9 @@ var ( ParamsKey = collections.NewPrefix(0) StakingTimeKey = collections.NewPrefix(1) AccountScoreKey = collections.NewPrefix(2) - AllocationScheduleKey = collections.NewPrefix(3) // Map: timestamp -> ScheduledDistribution + AllocationScheduleKey = collections.NewPrefix(3) // Map: ID -> ScheduledDistribution DistributionDisabledKey = collections.NewPrefix(4) + TotalScoreKey = collections.NewPrefix(5) // Map: ID -> total accumulated score + OngoingDistributionKey = collections.NewPrefix(6) // Item: currently processing ScheduledDistribution + DistributedAmountKey = collections.NewPrefix(7) // Map: ID -> cumulative distributed amount ) diff --git a/x/pse/types/params_test.go b/x/pse/types/params_test.go index ef09c3d3..d048e3df 100644 --- a/x/pse/types/params_test.go +++ b/x/pse/types/params_test.go @@ -364,7 +364,7 @@ func TestValidateAllocationSchedule(t *testing.T) { Allocations: 
createAllModuleAllocations(sdkmath.NewInt(1000)), }, }, - expectErr: false, // TODO: this should be true or removed, bases on the decision on the id zero check. + expectErr: false, // TODO: this should be handled based on the decision on the id zero check. errMsg: "id cannot be zero", }, {