Implement multi-block PSE community distribution #96
metalarm10 wants to merge 26 commits into master from
Conversation
Force-pushed from 7a54706 to 5645f15
TxCorpi0x
left a comment
@TxCorpi0x made 7 comments.
Reviewable status: 0 of 18 files reviewed, 7 unresolved discussions (waiting on masihyeganeh, metalarm10, miladz68, and ysv).
x/pse/keeper/delegation.go line 80 at r2 (raw file):
}
lastScore, err := k.GetDelegatorScore(ctx, distributionID, delAddr)
if errors.Is(err, collections.ErrNotFound) {
First we need to check whether err is nil.
x/pse/keeper/distribute.go line 71 at r2 (raw file):
for _, item := range batch {
	isExcluded, err := k.IsExcludedAddress(ctx, item.delAddr)
We could check for excluded addresses in the previous iterator, where we populate the batches, so that all batches process the same defaultBatchSize. Maybe we can add a flag to entryKV for excluded addresses and check it in this loop.
What is your opinion @ysv?
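To make the suggestion concrete, here is a minimal sketch (all names hypothetical; plain strings stand in for sdk.AccAddress, and a callback stands in for k.IsExcludedAddress) of an entryKV carrying an excluded flag set while the batch is populated:

```go
package main

import "fmt"

// entryKV is a hypothetical batch entry carrying the excluded flag
// suggested in the review: exclusion is decided once, while the batch is
// populated, so every batch keeps its full defaultBatchSize and the
// processing loop only consults the flag.
type entryKV struct {
	delAddr  string
	excluded bool
}

// populateBatch marks excluded addresses up front via the provided check.
func populateBatch(addrs []string, isExcluded func(string) bool) []entryKV {
	batch := make([]entryKV, 0, len(addrs))
	for _, a := range addrs {
		batch = append(batch, entryKV{delAddr: a, excluded: isExcluded(a)})
	}
	return batch
}

func main() {
	excluded := map[string]bool{"module-acct": true}
	batch := populateBatch(
		[]string{"alice", "module-acct", "bob"},
		func(a string) bool { return excluded[a] },
	)
	processed := 0
	for _, e := range batch {
		if e.excluded {
			continue // no store lookup needed inside the loop anymore
		}
		processed++
	}
	fmt.Println(len(batch), processed) // full batch size kept; 2 processed
}
```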
x/pse/keeper/distribute.go line 140 at r2 (raw file):
	ctx context.Context, ongoing types.ScheduledDistribution, bondDenom string,
) (bool, error) {
	prevID := ongoing.ID
Maybe we can find better naming for prev, ongoing, and current; it is a bit confusing, at least for me, in terms of the timeline.
x/pse/keeper/distribute.go line 149 at r2 (raw file):
// No score or no amount: send everything to community pool and clean up.
if !totalScore.IsPositive() || totalPSEAmount.IsZero() {
We should have a resistance mechanism and disable the distribution if an invariant is not satisfied here. If there is a hidden bug in our process, we could end up with a non-positive score while totalPSEAmount holds a large value, yet we would transfer all of it to the community pool.
x/pse/keeper/distribution.go line 92 at r2 (raw file):
// TotalScore absent -> Phase 1 (score conversion still in progress).
_, err := k.TotalScore.Get(ctx, prevID)
if errors.Is(err, collections.ErrNotFound) {
It's worth checking err for nil first, then checking for ErrNotFound.
x/pse/keeper/distribution.go line 101 at r2 (raw file):
		"distribution_id", prevID)
}
return nil
If I understand correctly, we first calculate all of the scores and the receiving values in the first blocks after the PSE event, then we go to Phase 2, which is the actual transfer, and then cleanup.
We could also think about processing these two in parallel, if it does not make things too complicated.
x/pse/keeper/migrations.go line 23 at r2 (raw file):
// - DelegationTimeEntries: Pair[AccAddress, ValAddress] -> Triple[uint64, AccAddress, ValAddress].
// - AccountScoreSnapshot: AccAddress -> Pair[uint64, AccAddress].
func (m Migrator) Migrate1to2(ctx sdk.Context) error {
This should fix the issue related to the collections error which we have in the other parallel PRs. This migrator is Cosmos-SDK/IBC-Go style for the store migration (which is good), but I think the plan for TX is to include things in the upgrade handler itself so we can remove it easily in the next releases. This applies to migrate.go as well.
What is your opinion @ysv ?
TxCorpi0x
left a comment
@TxCorpi0x made 1 comment.
Reviewable status: 0 of 18 files reviewed, 8 unresolved discussions (waiting on masihyeganeh, metalarm10, miladz68, and ysv).
x/pse/keeper/hooks.go line 92 at r2 (raw file):
}
if ongoingFound {
	handled, err := h.migrateOngoingEntry(ctx, ongoing, currentID, delAddr, valAddr, blockTime)
The previous period score will be added to AccountScoreSnapshot[prevID] and the current period to AccountScoreSnapshot[currentID], but it does not remove the delegation time entry for prevID. The hook sets the entry for currentID while the entry stays under prevID, so in later blocks the same period will be seen in the score again (it seems to me a double spending).
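The double count described here can be illustrated with a standalone sketch (a plain map keyed by (distributionID, delegator) stands in for the collections store; all names are illustrative):

```go
package main

import "fmt"

// key is a hypothetical composite key: (distribution ID, delegator).
type key struct {
	distID    uint64
	delegator string
}

// migrateEntry moves a delegation time entry from prevID to currentID.
// The delete is the fix discussed here: without it, the prevID entry
// survives and the same staking period is scored twice in later blocks.
func migrateEntry(entries map[key]int64, prevID, currentID uint64, del string) {
	shares := entries[key{prevID, del}]
	entries[key{currentID, del}] = shares
	delete(entries, key{prevID, del})
}

func main() {
	entries := map[key]int64{{1, "alice"}: 100}
	migrateEntry(entries, 1, 2, "alice")
	fmt.Println(len(entries))             // 1: only the currentID entry remains
	fmt.Println(entries[key{2, "alice"}]) // 100
}
```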
metalarm10
left a comment
@metalarm10 made 5 comments.
Reviewable status: 0 of 18 files reviewed, 8 unresolved discussions (waiting on masihyeganeh, miladz68, TxCorpi0x, and ysv).
x/pse/keeper/delegation.go line 80 at r2 (raw file):
Previously, TxCorpi0x wrote…
First we need to check for err nil
When err is nil, errors.Is(err, collections.ErrNotFound) returns false, and err != nil is also false, so both branches are skipped and the fetched lastScore is used as-is.
Functionally correct, but happy to reorder if you prefer.
x/pse/keeper/distribute.go line 140 at r2 (raw file):
Previously, TxCorpi0x wrote…
May be we can having a better naming for prev, ongoing and current. it is a bit confusing for me at least in terms of the timeline.
Done.
x/pse/keeper/distribution.go line 92 at r2 (raw file):
Previously, TxCorpi0x wrote…
It's worth checking the err for nil first, then check for ErrNotFound
Again, functionally correct but happy to re-order if you prefer.
x/pse/keeper/distribution.go line 101 at r2 (raw file):
Previously, TxCorpi0x wrote…
If I understand correctly, we first calculate all of the scores and the receiving values in the first blocks after the PSE event, then we go to Phase 2, which is the actual transfer, and then cleanup.
We could also think about processing these two in parallel, if it does not make things too complicated.
The problem is we can't. We need TotalScore before calculating any individual delegator's token share (tokens = (delegator_score / total_score) * community_amount). We don't know TotalScore until every delegator's score is computed. So the two phases are essentially sequential.
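A minimal integer-arithmetic sketch of this dependency (hypothetical values; plain int64 in place of sdkmath.Int): every share needs the final total in its denominator, and the integer division produces the rounding leftover that Phase 2 sweeps to the community pool.

```go
package main

import "fmt"

// share computes one delegator's payout with integer division:
// tokens = communityAmount * score / totalScore. Plain int64 stands in
// for sdkmath.Int; real code must also guard against overflow.
func share(communityAmount, score, totalScore int64) int64 {
	return communityAmount * score / totalScore
}

func main() {
	community := int64(1000)
	scores := []int64{3, 3, 3} // hypothetical delegator scores

	// Phase 1 must finish first: no share can be paid before this sum is final.
	var total int64
	for _, s := range scores {
		total += s
	}

	var paid int64
	for _, s := range scores {
		paid += share(community, s, total)
	}
	fmt.Println(paid)             // 999: each delegator gets 333
	fmt.Println(community - paid) // 1: rounding leftover for the community pool
}
```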
x/pse/keeper/hooks.go line 92 at r2 (raw file):
Previously, TxCorpi0x wrote…
The previous period score will be added to AccountScoreSnapshot[prevID] and the current period to AccountScoreSnapshot[currentID], but it does not remove the delegation time entry for prevID. The hook sets the entry for currentID while the entry stays under prevID, so in later blocks the same period will be seen in the score again (it seems to me a double spending).
Good catch, thanks. Fixed.
miladz68
left a comment
Partially reviewed; some refactors are needed. Will continue the review.
@miladz68 reviewed 5 files and all commit messages, and made 13 comments.
Reviewable status: 5 of 18 files reviewed, 19 unresolved discussions (waiting on masihyeganeh, metalarm10, TxCorpi0x, and ysv).
x/pse/keeper/distribute.go line 149 at r2 (raw file):
Previously, TxCorpi0x wrote…
We should have a resistance mechanism, and disable the distribution if an invariant is not satisfied here. if there is a hidden bug in our process, we can end up with a non-positive score while having a big number in totalPSEAmount, but we transfer all of it to community.
I agree. If any of those conditions happens, we should error out and disable PSE (returning any error to ProcessNextDistribution will disable the PSE module).
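A sketch of such a guard, under the assumption stated above that any error surfaced to ProcessNextDistribution halts the module (the helper name and int64 types are made up for illustration):

```go
package main

import "fmt"

// guardDistributionInvariants is a hypothetical pre-transfer check: a
// non-positive total score combined with a positive amount indicates a
// hidden bug, so we return an error instead of sweeping funds to the
// community pool. Per this thread, an error reaching
// ProcessNextDistribution disables the PSE module.
func guardDistributionInvariants(totalScore, totalPSEAmount int64) error {
	if totalScore <= 0 && totalPSEAmount > 0 {
		return fmt.Errorf("pse invariant violated: totalScore=%d, totalPSEAmount=%d",
			totalScore, totalPSEAmount)
	}
	return nil
}

func main() {
	// Hidden-bug scenario: zero score but a large pot. Error out instead
	// of transferring everything to the community pool.
	fmt.Println(guardDistributionInvariants(0, 1000000) != nil) // true
	fmt.Println(guardDistributionInvariants(10, 500) == nil)    // true
}
```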
x/pse/keeper/distribute_test.go line 135 at r5 (raw file):
func(r *runEnv) { distributeAction(r, sdkmath.NewInt(1000)) },
func(r *runEnv) {
	// delegators[0] fully undelegated — no auto-delegation, but earned reward sent as liquid tokens
This condition must change: in this scenario, the user's shares must be sent to the community pool.
x/pse/keeper/distribute.go line 23 at r5 (raw file):
// For each entry in the batch:
// 1. Calculate score from lastChanged to distribution timestamp -> addToScore(ongoingID)
// 2. Create new entry under nextID with same shares, lastChanged = distTimestamp
In the final solution, the timestamp should be the current block timestamp, and the score of (current time - distribution time) should be added to the score of the next ID. Add a TODO with a proper comment to handle it later.
x/pse/keeper/distribute.go line 27 at r5 (raw file):
//
// Returns true when all ongoingID entries have been processed and TotalScore is computed.
func (k Keeper) ProcessPhase1ScoreConversion(ctx context.Context, ongoing types.ScheduledDistribution) (bool, error) {
We need a better name for this function. Suggestions:
ConsumeDelegationTimeEntryToUserScore
ConsumeOngoingDelegationTimeEntry
x/pse/keeper/distribute.go line 64 at r5 (raw file):
// Compute TotalScore from all accumulated snapshots.
if len(batch) == 0 {
	if err := k.computeTotalScore(ctx, ongoingID); err != nil {
Computing the total score should be done in batches as well, meaning that when we iterate TimeDelegationEntry and convert it to a user score, we should add the same score to the total score. Let me re-phrase it: every time we add to a user score, we should also add the same amount to the total score. This means changes are needed here and also in the hooks.
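The proposed invariant, that every per-user score write bumps the running total by the same delta, can be sketched like this (a plain map and int64 stand in for the collections and sdkmath types; names are illustrative):

```go
package main

import "fmt"

// scoreBook accumulates per-user scores and, per the suggestion above,
// bumps the running total in the same operation, so TotalScore never
// needs a separate final pass.
type scoreBook struct {
	userScore  map[string]int64
	totalScore int64
}

func (b *scoreBook) addToScore(user string, delta int64) {
	b.userScore[user] += delta
	b.totalScore += delta // same delta, same call: the invariant holds after every batch
}

func main() {
	b := &scoreBook{userScore: map[string]int64{}}
	// Batched conversion of delegation time entries (hypothetical values).
	for _, e := range []struct {
		user  string
		score int64
	}{{"alice", 40}, {"bob", 25}, {"alice", 10}} {
		b.addToScore(e.user, e.score)
	}
	var sum int64
	for _, s := range b.userScore {
		sum += s
	}
	fmt.Println(b.totalScore == sum) // true
	fmt.Println(b.totalScore)        // 75
}
```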
x/pse/keeper/distribute.go line 137 at r5 (raw file):
// When all delegators have been processed, sends leftover (rounding errors + undelegated users) to the community pool.
// Returns true when distribution is complete and all state has been cleaned up.
func (k Keeper) ProcessPhase2TokenDistribution(
The name should not contain "phase2"; something like ProcessOngoingTokenDistribution.
x/pse/keeper/distribute.go line 252 at r5 (raw file):
// cleanupDistribution removes all state associated with a completed distribution.
func (k Keeper) cleanupDistribution(ctx context.Context, distributionID uint64) error {
cleanupOngoingDistribution
x/pse/keeper/distribute.go line 317 at r5 (raw file):
// Send earned tokens to delegator's wallet regardless of active delegations.
// Score was earned by staking during the scoring period, so the reward is always honored.
This is not correct: the tokens should be delivered in staked state (except for some rounding errors), so if the user does not have any active delegations, their tokens will be sent to the community pool.
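A sketch of the delivery rule as described in this comment (hypothetical helper; a boolean stands in for the real active-delegation query, and int64 for sdkmath.Int):

```go
package main

import "fmt"

// deliverReward sketches the rule stated in this comment: rewards are
// delivered in staked form via auto-delegation, and a delegator with no
// active delegations left forfeits the tokens to the community pool.
func deliverReward(hasActiveDelegation bool, amount int64) (toDelegate, toCommunity int64) {
	if hasActiveDelegation {
		return amount, 0
	}
	return 0, amount
}

func main() {
	fmt.Println(deliverReward(true, 300))  // 300 0
	fmt.Println(deliverReward(false, 300)) // 0 300
}
```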
x/pse/keeper/distribution.go line 136 at r5 (raw file):
// Distribution Precision Handling:
// The allocation amount is split equally among all recipients using integer division.
These comment lines contain some useful information.
x/pse/keeper/distribution.go line 62 at r5 (raw file):
// If community allocation exists, start multi-block processing.
communityAmount := getCommunityAllocationAmount(scheduledDistribution)
if communityAmount.IsPositive() {
A non-positive community distribution should be an error which pauses all future distributions. We should also take the necessary measures to ensure that never happens.
x/pse/keeper/distribution.go line 91 at r5 (raw file):
// TotalScore absent -> Phase 1 (score conversion still in progress).
_, err := k.TotalScore.Get(ctx, ongoingID)
The total score should be calculated in batches and also as part of the hooks, so this criterion is not a good one for deciding whether to move to the next phase.
Suggestion: call ProcessPhase1ScoreConversion and move to the next phase if it returns done=true.
x/pse/keeper/distribution.go line 107 at r5 (raw file):
}
// TotalScore present -> Phase 2 (token distribution).
Phase 2 begins not when the total score is present but when all time delegation entries are consumed (i.e. Phase 1 is completed).
Closes: https://app.clickup.com/t/868hj375r
Description
Rewrites the PSE community token distribution from a single-block operation to a multi-block batched pipeline. Non-community allocations (foundation, alliance, etc.) still process in a single block.
Multi-block Phases:
- Phase 1: iterates DelegationTimeEntries in batches, calculates each delegator's score, and migrates entries to the next distribution ID. Computes TotalScore when all entries are processed.
- Phase 2: iterates AccountScoreSnapshot in batches, distributes tokens proportionally (amount × score / totalScore), and auto-delegates to validators. Sends leftover (rounding errors) to the community pool and cleans up state.

EndBlocker routing (ProcessNextDistribution):
At each EndBlock: check OngoingDistribution -> if it exists, resume the current phase. If not, check the schedule -> if due, process non-community allocations and start the multi-block community distribution.
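The routing above can be sketched as a small decision function (all names are illustrative, not the PR's API):

```go
package main

import "fmt"

// route sketches the EndBlock decision described above: resume an
// ongoing distribution if one exists, otherwise start one when the
// schedule is due.
func route(ongoingExists, scheduleDue bool) string {
	switch {
	case ongoingExists:
		return "resume-current-phase"
	case scheduleDue:
		return "process-non-community-then-start-multi-block"
	default:
		return "idle"
	}
}

func main() {
	fmt.Println(route(true, false))
	fmt.Println(route(false, true))
	fmt.Println(route(false, false))
}
```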
Phase is determined by TotalScore existence: absent -> Phase 1, present -> Phase 2.

Hook changes (3-scenario logic)

When a delegator's stake changes, hooks need to find their existing DelegationTimeEntry and calculate accumulated score. With multi-block distribution, an entry might still be in the previous distribution ID (not yet processed by Phase 1). The hooks now check three places:

- Entry still under prevID: calculate score from lastChanged to the distribution timestamp (for prevID), then from the distribution timestamp to now (for currentID), and migrate the entry forward. This ensures no score is lost regardless of batch processing order.
- Entry under currentID: calculate score from lastChanged to now, update the entry with new shares.
Other changes

- Added the distribution ID to the DelegationTimeEntries and AccountScoreSnapshot collection keys.
- Added a distribution_id field in genesis export/import so entries keep their distribution ID across chain export and re-import (previously lost during export).

Testing
Unit tests added covering: multi-block EndBlocker phase transitions (idle -> Phase 1 -> Phase 2 -> cleanup), single-block path for non-community-only distributions, and end-to-end distribution scenarios (unaccumulated score, accumulated score, unbonding, redelegation, zero score, multiple distributions) to validate the full EndBlocker flow.
Known follow-ups

- LastProcessedDistributionID tracking to handle missing PSE schedule cases.
- Additional_Score = (now - distribution_started_at) × pse_amount

Reviewers checklist:
Authors checklist