Skip to content

Implement multi-block PSE community distribution#96

Open
metalarm10 wants to merge 26 commits intomasterfrom
john/pse-core-implementation
Open

Implement multi-block PSE community distribution#96
metalarm10 wants to merge 26 commits intomasterfrom
john/pse-core-implementation

Conversation

@metalarm10
Copy link
Contributor

@metalarm10 metalarm10 commented Mar 2, 2026

Closes: https://app.clickup.com/t/868hj375r

Description

Rewrites the PSE community token distribution from a single-block operation to a multi-block batched pipeline. Non-community allocations (foundation, alliance, etc.) still process in a single block.

Multi-block Phases:

  • Phase 1 (score conversion): Iterates DelegationTimeEntries in batches, calculates each delegator's score, migrates entries to the next distribution ID. Computes TotalScore when all entries are processed.
  • Phase 2 (token distribution): Iterates AccountScoreSnapshot in batches, distributes tokens proportionally (amount × score / totalScore), auto-delegates to validators. Sends leftover (rounding errors) to community pool and cleans up state.

EndBlocker routing (ProcessNextDistribution):

At each EndBlock: check OngoingDistribution -> if exists, resume current phase.
If not, check schedule -> if due, process non-community allocations and start multi-block community distribution.

Phase is determined by TotalScore existence:

  • If absent: Phase 1 is still in progress (score conversion not yet complete).
  • If present: Phase 1 is done, proceed with Phase 2 (token distribution).

Hook changes (3-scenario logic)

When a delegator's stake changes, hooks need to find their existing DelegationTimeEntry and calculate accumulated score. With multi-block distribution, an entry might still be in the previous distribution ID (not yet processed by Phase 1). The hooks now check three places:

  1. Entry in prevID (ongoing distribution in progress): The batch processor (EndBlocker Phase-1) hasn't reached this entry yet. The hook handles it early: calculates score up to the distribution timestamp (for prevID), then from distribution timestamp to now (for currentID), and migrates the entry forward. This ensures no score is lost regardless of batch processing order.
  2. Entry in currentID: Normal case - calculate score from lastChanged to now, update entry with new shares.
  3. No entry found: First-time delegator - create a new entry, no score (duration = 0).

Other changes

  • v1 -> v2 state migration: adds distribution ID prefix to DelegationTimeEntries and AccountScoreSnapshot collection keys
  • distribution_id field in genesis export/import so entries keep their distribution ID across chain export and re-import (previously lost during export)
  • Integration test updated to support multi block phases.
  • Rejects schedule updates while a distribution is in progress

Testing

Unit tests added covering: multi-block EndBlocker phase transitions (idle -> Phase 1 -> Phase 2 -> cleanup), single-block path for non-community-only distributions, and end-to-end distribution scenarios (unaccumulated score, accumulated score, unbonding, redelegation, zero score, multiple distributions) to validate the full EndBlocker flow.

Known follow-ups

  1. Replace temporary ID=0 solution with LastProcessedDistributionID tracking to handle missing PSE schedule cases.
  2. Add fairness score adjustment after PSE distribution: Additional_Score = (now - distribution_started_at) × pse_amount
  3. Proper migration mapping of entries to correct distribution IDs by timestamp (currently same IDs are assigned as placeholder).

Reviewers checklist:

  • Try to write more meaningful comments with clear actions to be taken.
  • Nit-picking should be unblocking. Focus on core issues.

Authors checklist

  • Provide a concise and meaningful description
  • Review the code yourself first, before making the PR.
  • Annotate your PR in places that require explanation.
  • Think and try to split the PR to smaller PR if it is big.

This change is Reviewable

@metalarm10 metalarm10 force-pushed the john/pse-core-implementation branch from 7a54706 to 5645f15 Compare March 3, 2026 10:16
@metalarm10 metalarm10 marked this pull request as ready for review March 3, 2026 11:29
@metalarm10 metalarm10 requested a review from a team as a code owner March 3, 2026 11:29
@metalarm10 metalarm10 requested review from TxCorpi0x, masihyeganeh, miladz68 and ysv and removed request for a team March 3, 2026 11:29
@metalarm10 metalarm10 changed the title pse core implementation draft Implement multi-block PSE community distribution Mar 3, 2026
Copy link
Contributor

@TxCorpi0x TxCorpi0x left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@TxCorpi0x made 7 comments.
Reviewable status: 0 of 18 files reviewed, 7 unresolved discussions (waiting on masihyeganeh, metalarm10, miladz68, and ysv).


x/pse/keeper/delegation.go line 80 at r2 (raw file):

	}
	lastScore, err := k.GetDelegatorScore(ctx, distributionID, delAddr)
	if errors.Is(err, collections.ErrNotFound) {

First we need to check for err nil


x/pse/keeper/distribute.go line 71 at r2 (raw file):

	for _, item := range batch {
		isExcluded, err := k.IsExcludedAddress(ctx, item.delAddr)

We can check for excluded addresses in the previous iterator, where we are populating the batches, so all batches have the same defaultBatchSize processed, maybe we can add flag to entryKV for excluded addresses and check in this loop.
What is your opinion @ysv?


x/pse/keeper/distribute.go line 140 at r2 (raw file):

	ctx context.Context, ongoing types.ScheduledDistribution, bondDenom string,
) (bool, error) {
	prevID := ongoing.ID

May be we can having a better naming for prev, ongoing and current. it is a bit confusing for me at least in terms of the timeline.


x/pse/keeper/distribute.go line 149 at r2 (raw file):

	// No score or no amount: send everything to community pool and clean up.
	if !totalScore.IsPositive() || totalPSEAmount.IsZero() {

We should have a resistance mechanism, and disable the distribution if an invariant is not satisfied here. if there is a hidden bug in our process, we can end up with a non-positive score while having a big number in totalPSEAmount, but we transfer all of it to community.


x/pse/keeper/distribution.go line 92 at r2 (raw file):

	// TotalScore absent -> Phase 1 (score conversion still in progress).
	_, err := k.TotalScore.Get(ctx, prevID)
	if errors.Is(err, collections.ErrNotFound) {

It's worth checking the err for nil first, then check for ErrNotFound


x/pse/keeper/distribution.go line 101 at r2 (raw file):

				"distribution_id", prevID)
		}
		return nil

If I understand correctly, we first calculate all of the scores and the receving values in the first blocks after the PSE event, then we go to Phase2 hich is the actual trannsfer and then cleanup.

We can also think about processing these two in parallel, if it does not make things too complicated


x/pse/keeper/migrations.go line 23 at r2 (raw file):

// - DelegationTimeEntries: Pair[AccAddress, ValAddress] -> Triple[uint64, AccAddress, ValAddress].
// - AccountScoreSnapshot: AccAddress -> Pair[uint64, AccAddress].
func (m Migrator) Migrate1to2(ctx sdk.Context) error {

This should fix the issue related to the collections error which we have in the other parallel PRs, this migrator is Cosmos-SDK/IBC-Go style for the store migration (which is good), but i think the plan for TX is to include things in the upgrade handler itself so we can remove it simply in the next releases. this applies to migrate.go as well
What is your opinion @ysv ?

Copy link
Contributor

@TxCorpi0x TxCorpi0x left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@TxCorpi0x made 1 comment.
Reviewable status: 0 of 18 files reviewed, 8 unresolved discussions (waiting on masihyeganeh, metalarm10, miladz68, and ysv).


x/pse/keeper/hooks.go line 92 at r2 (raw file):

	}
	if ongoingFound {
		handled, err := h.migrateOngoingEntry(ctx, ongoing, currentID, delAddr, valAddr, blockTime)

The previous period score will be added to AccountScoreSnapshot[prevID]. and current period to AccountScoreSnapshot[currentID] but it does not remove delegation time entry for prevID, the hook sets the tnetry for currentID and the entry stays under prevID, in the later blocks the same period will be seeen in the score (it seems to me a double spending)

Copy link
Contributor Author

@metalarm10 metalarm10 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@metalarm10 made 5 comments.
Reviewable status: 0 of 18 files reviewed, 8 unresolved discussions (waiting on masihyeganeh, miladz68, TxCorpi0x, and ysv).


x/pse/keeper/delegation.go line 80 at r2 (raw file):

Previously, TxCorpi0x wrote…

First we need to check for err nil

When err is nil, errors.Is(err, collections.ErrNotFound) returns false, and err != nil is also false, so both branches are skipped and the fetched lastScore is used as-is.
Functionally correct, but happy to reorder if you prefer.


x/pse/keeper/distribute.go line 140 at r2 (raw file):

Previously, TxCorpi0x wrote…

May be we can having a better naming for prev, ongoing and current. it is a bit confusing for me at least in terms of the timeline.

Done.


x/pse/keeper/distribution.go line 92 at r2 (raw file):

Previously, TxCorpi0x wrote…

It's worth checking the err for nil first, then check for ErrNotFound

Again, functionally correct but happy to re-order if you prefer.


x/pse/keeper/distribution.go line 101 at r2 (raw file):

Previously, TxCorpi0x wrote…

If I understand correctly, we first calculate all of the scores and the receving values in the first blocks after the PSE event, then we go to Phase2 hich is the actual trannsfer and then cleanup.

We can also think about processing these two in parallel, if it does not make things too complicated

The problem is we can't. We need TotalScore before calculating any individual delegator's token share (tokens = (delegator_score / total_score) * community_amount). We don't know TotalScore until every delegator's score is computed. So the two phases are essentially sequential.


x/pse/keeper/hooks.go line 92 at r2 (raw file):

Previously, TxCorpi0x wrote…

The previous period score will be added to AccountScoreSnapshot[prevID]. and current period to AccountScoreSnapshot[currentID] but it does not remove delegation time entry for prevID, the hook sets the tnetry for currentID and the entry stays under prevID, in the later blocks the same period will be seeen in the score (it seems to me a double spending)

Good catch, thanks. Fixed.

Copy link
Contributor

@miladz68 miladz68 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

partially reviewed, some refactors are needed. will continue the review.

@miladz68 reviewed 5 files and all commit messages, and made 13 comments.
Reviewable status: 5 of 18 files reviewed, 19 unresolved discussions (waiting on masihyeganeh, metalarm10, TxCorpi0x, and ysv).


x/pse/keeper/distribute.go line 149 at r2 (raw file):

Previously, TxCorpi0x wrote…

We should have a resistance mechanism, and disable the distribution if an invariant is not satisfied here. if there is a hidden bug in our process, we can end up with a non-positive score while having a big number in totalPSEAmount, but we transfer all of it to community.

I agree, if any of those conditions happens, we should error out and disable PSE (returning any error to ProcessNextDistribution will disable pse module).


x/pse/keeper/distribute_test.go line 135 at r5 (raw file):

				func(r *runEnv) { distributeAction(r, sdkmath.NewInt(1000)) },
				func(r *runEnv) {
					// delegators[0] fully undelegated — no auto-delegation, but earned reward sent as liquid tokens

this condition must change, in this scenario, user shares must be sent to community pool.


x/pse/keeper/distribute.go line 23 at r5 (raw file):

// For each entry in the batch:
//  1. Calculate score from lastChanged to distribution timestamp -> addToScore(ongoingID)
//  2. Create new entry under nextID with same shares, lastChanged = distTimestamp

In the final solution, the timestamp should the current block time stamp and the score of (current time - distribution time ) should be added to the score of the next id. Add a todo with proper comment to handle it later


x/pse/keeper/distribute.go line 27 at r5 (raw file):

//
// Returns true when all ongoingID entries have been processed and TotalScore is computed.
func (k Keeper) ProcessPhase1ScoreConversion(ctx context.Context, ongoing types.ScheduledDistribution) (bool, error) {

we need a better name for the function, suggestions
ConsumeDelegationTimeEntryToUserScore
ConsumeOngoingDelegationTimeEntry


x/pse/keeper/distribute.go line 64 at r5 (raw file):

	// Compute TotalScore from all accumulated snapshots.
	if len(batch) == 0 {
		if err := k.computeTotalScore(ctx, ongoingID); err != nil {

compute total score should be done in batches as well. meaning that when you iterate TimeDelegationEntry and are converting it to user score, we should add the same score to the total scored. Let me re-phrase it, every time that we add to user score, we should also add to the same amount to total score. It means changes should be done here and also in the hooks.


x/pse/keeper/distribute.go line 137 at r5 (raw file):

// When all delegators have been processed, sends leftover (rounding errors + undelegated users) to the community pool.
// Returns true when distribution is complete and all state has been cleaned up.
func (k Keeper) ProcessPhase2TokenDistribution(

should not contain phase2, something like ProcessOngoingTokenDistribution.


x/pse/keeper/distribute.go line 252 at r5 (raw file):

// cleanupDistribution removes all state associated with a completed distribution.
func (k Keeper) cleanupDistribution(ctx context.Context, distributionID uint64) error {

cleanupOngoingDistribution


x/pse/keeper/distribute.go line 317 at r5 (raw file):

	// Send earned tokens to delegator's wallet regardless of active delegations.
	// Score was earned by staking during the scoring period, so the reward is always honored.

this is not correct, the tokens should be delivered in staked state (except for some rounding errors), so if the user does not have any active validations, their tokens will be sent to community pool.


x/pse/keeper/distribution.go line 136 at r5 (raw file):

		// Distribution Precision Handling:
		// The allocation amount is split equally among all recipients using integer division.

these lines of comments contain some useful information.


x/pse/keeper/distribution.go line 62 at r5 (raw file):

	// If community allocation exists, start multi-block processing.
	communityAmount := getCommunityAllocationAmount(scheduledDistribution)
	if communityAmount.IsPositive() {

non-positive community distribution should be an error which pauses all future distribuitons. we should also take necessary measures to ensure that never happens.


x/pse/keeper/distribution.go line 91 at r5 (raw file):

	// TotalScore absent -> Phase 1 (score conversion still in progress).
	_, err := k.TotalScore.Get(ctx, ongoingID)

total score should be calculated in batches and also as part of the hooks, so this criteria is not a good one to understand if you should move to next phase or not.
suggestion: call ProcessPhase1ScoreConversion and move to next phase if it returns done=true.


x/pse/keeper/distribution.go line 107 at r5 (raw file):

	}

	// TotalScore present -> Phase 2 (token distribution).

phase 2 begins not when total score is present but when all time delgation entries are consumed (i.e phase 1 completed.)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants