diff --git a/cmd/node/node.go b/cmd/node/node.go index 463a0893e2..e8a5388eb5 100644 --- a/cmd/node/node.go +++ b/cmd/node/node.go @@ -32,6 +32,7 @@ import ( "github.com/wavesplatform/gowaves/pkg/logging" "github.com/wavesplatform/gowaves/pkg/metrics" "github.com/wavesplatform/gowaves/pkg/miner" + "github.com/wavesplatform/gowaves/pkg/miner/endorsementpool" "github.com/wavesplatform/gowaves/pkg/miner/scheduler" "github.com/wavesplatform/gowaves/pkg/miner/utxpool" "github.com/wavesplatform/gowaves/pkg/node" @@ -480,7 +481,7 @@ func runNode(ctx context.Context, nc *config) (_ io.Closer, retErr error) { return nil, errors.Wrap(apiErr, "failed to run APIs") } - return startNode(ctx, nc, svs, features, minerScheduler, parent, declAddr, nl), nil + return startNode(ctx, nc, svs, features, minerScheduler, parent, declAddr, nl, cfg.GenerationPeriod), nil } func startNode( @@ -492,6 +493,7 @@ func startNode( parent peer.Parent, declAddr proto.TCPAddr, nl *slog.Logger, + periodGeneration uint64, ) *node.Node { bindAddr := proto.NewTCPAddrFromString(nc.bindAddress) @@ -504,7 +506,7 @@ func startNode( go ntw.Run(ctx) n := node.NewNode(svs, declAddr, bindAddr, nc.microblockInterval, nc.enableLightMode, nl, fl) - go n.Run(ctx, parent, svs.InternalChannel, networkInfoCh, ntw.SyncPeer()) + go n.Run(ctx, parent, svs.InternalChannel, networkInfoCh, ntw.SyncPeer(), periodGeneration) return n } @@ -813,6 +815,7 @@ func createServices( Scheduler: scheduler, BlocksApplier: blocks_applier.NewBlocksApplier(), UtxPool: utxpool.New(utxPoolMaxSizeBytes, utxValidator, cfg), + EndorsementPool: endorsementpool.NewEndorsementPool(cfg.MaxEndorsements), Scheme: cfg.AddressSchemeCharacter, Time: ntpTime, Wallet: wal, diff --git a/pkg/client/transactions.go b/pkg/client/transactions.go index 3727c59e88..290eb8f5e1 100644 --- a/pkg/client/transactions.go +++ b/pkg/client/transactions.go @@ -5,6 +5,7 @@ import ( "context" "encoding/json" "fmt" + "log/slog" "net/http" "github.com/pkg/errors" @@ -176,6 +177,7 @@ func (a *Transactions) Broadcast(ctx context.Context, transaction proto.Transact } } + slog.Debug("Broadcasting transaction", "transaction", string(bts)) req, err := http.NewRequest("POST", url.String(), bytes.NewReader(bts)) if err != nil { return nil, err diff --git a/pkg/miner/endorsementpool/endorsement_pool.go b/pkg/miner/endorsementpool/endorsement_pool.go new file mode 100644 index 0000000000..8275874616 --- /dev/null +++ b/pkg/miner/endorsementpool/endorsement_pool.go @@ -0,0 +1,238 @@ +package endorsementpool + +import ( + "bytes" + "container/heap" + "errors" + "sync" + + "github.com/wavesplatform/gowaves/pkg/crypto/bls" + "github.com/wavesplatform/gowaves/pkg/proto" +) + +type key struct { + blockID proto.BlockID + endorserIndex int32 +} + +func makeKey(blockID proto.BlockID, idx int32) key { + return key{blockID: blockID, endorserIndex: idx} +} + +type heapItemEndorsement struct { + eb *proto.EndorseBlock + endorserPK bls.PublicKey + balance uint64 + seq uint64 +} + +type endorsementMinHeap []*heapItemEndorsement + +func (h endorsementMinHeap) Len() int { return len(h) } + +func (h endorsementMinHeap) Less(i, j int) bool { + if h[i].balance == h[j].balance { + // Late (Higher seq), lower priority. + return h[i].seq > h[j].seq + } + // Lower balance, lower priority. + return h[i].balance < h[j].balance +} + +func (h endorsementMinHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } + +func (h *endorsementMinHeap) Push(x any) { + item, ok := x.(*heapItemEndorsement) + if !ok { + return // Impossible, but silences errcheck. + } + *h = append(*h, item) +} + +func (h *endorsementMinHeap) Pop() any { + old := *h + n := len(old) + item := old[n-1] + *h = old[:n-1] + return item +} + +type EndorsementPool struct { + mu sync.Mutex + seq uint64 + byKey map[key]*heapItemEndorsement + h endorsementMinHeap + conflicts []proto.EndorseBlock + maxEndorsements int +} + +func NewEndorsementPool(maxGenerators int) *EndorsementPool { + return &EndorsementPool{ + byKey: make(map[key]*heapItemEndorsement), + maxEndorsements: maxGenerators, + } +} + +// Add inserts an endorsement into the heap with priority based on balance desc, seq asc. +func (p *EndorsementPool) Add(e *proto.EndorseBlock, pk bls.PublicKey, + lastFinalizedHeight proto.Height, lastFinalizedBlockID proto.BlockID, balance uint64) error { + if e == nil { + return errors.New("invalid endorsement") + } + + k := makeKey(e.EndorsedBlockID, e.EndorserIndex) + + p.mu.Lock() + defer p.mu.Unlock() + if _, exists := p.byKey[k]; exists { + p.conflicts = append(p.conflicts, *e) + return nil + } + if proto.Height(e.FinalizedBlockHeight) <= lastFinalizedHeight && + e.FinalizedBlockID != lastFinalizedBlockID { + p.conflicts = append(p.conflicts, *e) + return nil + } + + p.seq++ + item := &heapItemEndorsement{ + eb: e, + endorserPK: pk, + balance: balance, + seq: p.seq, + } + + // If heap is not filled yet. + if len(p.h) < p.maxEndorsements { + heap.Push(&p.h, item) + p.byKey[k] = item + return nil + } + + // If heap is full — check min (root). + minItem := p.h[0] + // If priority is lower or equal the min, throw the new one away. + if balance < minItem.balance || (balance == minItem.balance && item.seq > minItem.seq) { + return nil + } + + // Otherwise remove min and insert the new one. + r := heap.Pop(&p.h) + removed, _ := r.(*heapItemEndorsement) + delete(p.byKey, makeKey(removed.eb.EndorsedBlockID, removed.eb.EndorserIndex)) + + heap.Push(&p.h, item) + p.byKey[k] = item + return nil +} + +func (p *EndorsementPool) GetAll() []proto.EndorseBlock { + p.mu.Lock() + defer p.mu.Unlock() + + out := make([]proto.EndorseBlock, len(p.h)) + for i, it := range p.h { + out[i] = *it.eb + } + return out +} + +func (p *EndorsementPool) FormFinalization(finalizationHeight proto.Height) (proto.FinalizationVoting, error) { + p.mu.Lock() + defer p.mu.Unlock() + + signatures := make([]bls.Signature, 0, len(p.h)) + endorsersIndexes := make([]int32, 0, len(p.h)) + var aggregatedSignature bls.Signature + + for _, it := range p.h { + signatures = append(signatures, it.eb.Signature) + endorsersIndexes = append(endorsersIndexes, it.eb.EndorserIndex) + } + if len(signatures) != 0 { + aggregatedSignatureBytes, err := bls.AggregateSignatures(signatures) + if err != nil { + return proto.FinalizationVoting{}, err + } + var errCnvrt error + aggregatedSignature, errCnvrt = bls.NewSignatureFromBytes(aggregatedSignatureBytes) + if errCnvrt != nil { + return proto.FinalizationVoting{}, errCnvrt + } + } + + return proto.FinalizationVoting{ + AggregatedEndorsementSignature: aggregatedSignature, + FinalizedBlockHeight: finalizationHeight, + EndorserIndexes: endorsersIndexes, + ConflictEndorsements: p.conflicts, + }, nil +} + +func (p *EndorsementPool) GetEndorsers() []bls.PublicKey { + p.mu.Lock() + defer p.mu.Unlock() + + out := make([]bls.PublicKey, len(p.h)) + for i, it := range p.h { + out[i] = it.endorserPK + } + return out +} + +func (p *EndorsementPool) Len() int { + p.mu.Lock() + defer p.mu.Unlock() + return len(p.h) +} + +func (p *EndorsementPool) CleanAll() { + p.mu.Lock() + defer p.mu.Unlock() + + p.byKey = make(map[key]*heapItemEndorsement) + p.h = nil + p.conflicts = nil +} + +func (p *EndorsementPool) Verify() (bool, error) { + p.mu.Lock() + defer p.mu.Unlock() + + n := len(p.h) + if n == 0 { + return false, errors.New("failed to verify endorsements: pool is empty") + } + + sigs := make([]bls.Signature, 0, n) + pks := make([]bls.PublicKey, 0, n) + msg, err := p.h[0].eb.EndorsementMessage() + if err != nil { + return false, err + } + for _, it := range p.h { + sigs = append(sigs, it.eb.Signature) + pks = append(pks, it.endorserPK) + nextMsg, msgErr := it.eb.EndorsementMessage() + if msgErr != nil { + return false, msgErr + } + if bytes.Equal(nextMsg, msg) { + return false, errors.New("failed to verify endorsements: inconsistent endorsement messages") + } + } + agg, err := bls.AggregateSignatures(sigs) + if err != nil { + return false, err + } + return bls.VerifyAggregate(pks, msg, agg), nil +} + +func (p *EndorsementPool) ConflictEndorsements() []proto.EndorseBlock { + p.mu.Lock() + defer p.mu.Unlock() + + out := make([]proto.EndorseBlock, len(p.conflicts)) + copy(out, p.conflicts) + return out +} diff --git a/pkg/miner/endorsementpool/endorsementpool_test.go b/pkg/miner/endorsementpool/endorsementpool_test.go new file mode 100644 index 0000000000..c1299ea007 --- /dev/null +++ b/pkg/miner/endorsementpool/endorsementpool_test.go @@ -0,0 +1,143 @@ +package endorsementpool_test + +import ( + "testing" + + "github.com/stretchr/testify/require" + "github.com/wavesplatform/gowaves/pkg/crypto" + "github.com/wavesplatform/gowaves/pkg/crypto/bls" + "github.com/wavesplatform/gowaves/pkg/miner/endorsementpool" + "github.com/wavesplatform/gowaves/pkg/proto" +) + +func newDummyEndorsement(t *testing.T, idx int32, sig string) *proto.EndorseBlock { + b := make([]byte, crypto.DigestSize) + b[0] = byte(idx) + id, err := proto.NewBlockIDFromBytes(b) + require.NoError(t, err) + blsSignature, err := bls.NewSignatureFromBase58(sig) + require.NoError(t, err) + return &proto.EndorseBlock{ + EndorserIndex: idx, + EndorsedBlockID: id, + Signature: blsSignature, + FinalizedBlockHeight: 1, + FinalizedBlockID: id, + } +} + +const sigOne = "nBWfaRLW7EdcwxhDMaXuZZFMhHyowAxY7476rkBsUUeguTXrMSNuTVkuWLmZjRmRfgMXEGuvdHiu1V7joRFSLz3X6MQBF8m88kHJE" + + "j6Tc2ktBnMTzihh2JMGpuuWBLSK8rv" +const sigTwo = "RNMTkL736x3TmXfjQufKnxSgySaaoec3WYnxmujcum9BHEmCdjmwvjoUehghqYCWJcNj5CNfb9QdnujV9o2DRitbLgq2bnLdTU5s" + + "1DLBWBkVx8mBayvdfx7rPZ3mtUWeh5L" +const sigThree = "U8GEty7F58p7QZrNAxRYrfMSU4z6CwtiukBu9hGDP9rLx3VmF9ZYy8bHWBCTDTYW7s2juqRHU3aERUJfgx3KhxBdv57UFb34" + + "evuW9wYQKKoCTbfasfZENM4GDbPdL2nQYKY" +const sigFour = "2F4sw8YzXpSf93ACAngoTnNxCaYWoGL4vY88RYgEs3BeSsnAmMGmVSfe8h6hybkfb6CYoUwV1prRbYWo6umrL9evmTPeksdaQ" + + "rp19eTcwxZLBtPzbwqonCbEX8eDJVTydRBo" + +const finalizedHeightEndorsement = 0 + +func TestEndorsementPool_PriorityByBalance(t *testing.T) { + pool := endorsementpool.NewEndorsementPool(5) + + e1 := newDummyEndorsement(t, 1, sigOne) + e2 := newDummyEndorsement(t, 2, sigTwo) + e3 := newDummyEndorsement(t, 3, sigThree) + + require.NoError(t, pool.Add(e1, bls.PublicKey{}, finalizedHeightEndorsement, proto.BlockID{}, + 10)) + require.NoError(t, pool.Add(e2, bls.PublicKey{}, finalizedHeightEndorsement, proto.BlockID{}, + 20)) + require.NoError(t, pool.Add(e3, bls.PublicKey{}, finalizedHeightEndorsement, proto.BlockID{}, + 30)) + + all := pool.GetAll() + require.Len(t, all, 3) + + minBalance := uint64(0) + for _, e := range all { + if e.EndorserIndex == 1 { + minBalance = 10 + } + } + require.Equal(t, uint64(10), minBalance) +} + +func TestEndorsementPool_PriorityBySeqWhenEqualBalance(t *testing.T) { + pool := endorsementpool.NewEndorsementPool(3) + + e1 := newDummyEndorsement(t, 1, sigOne) + e2 := newDummyEndorsement(t, 2, sigTwo) + + require.NoError(t, pool.Add(e1, bls.PublicKey{}, finalizedHeightEndorsement, proto.BlockID{}, + 100)) + require.NoError(t, pool.Add(e2, bls.PublicKey{}, finalizedHeightEndorsement, proto.BlockID{}, + 100)) + + all := pool.GetAll() + require.Len(t, all, 2) + + // Balance e1 and e2 are equal, so we check by seq. + e3 := newDummyEndorsement(t, 3, sigThree) + require.NoError(t, pool.Add(e3, bls.PublicKey{}, finalizedHeightEndorsement, proto.BlockID{}, + 100)) + + require.Equal(t, 3, pool.Len()) +} + +func TestEndorsementPool_RemoveLowPriorityWhenFull(t *testing.T) { + pool := endorsementpool.NewEndorsementPool(3) + + require.NoError(t, pool.Add(newDummyEndorsement(t, 1, sigOne), bls.PublicKey{}, + finalizedHeightEndorsement, proto.BlockID{}, 10)) + require.NoError(t, pool.Add(newDummyEndorsement(t, 2, sigTwo), bls.PublicKey{}, + finalizedHeightEndorsement, proto.BlockID{}, 20)) + require.NoError(t, pool.Add(newDummyEndorsement(t, 3, sigThree), bls.PublicKey{}, + finalizedHeightEndorsement, proto.BlockID{}, 30)) + + require.Equal(t, 3, pool.Len()) + + require.NoError(t, pool.Add(newDummyEndorsement(t, 4, sigFour), bls.PublicKey{}, + finalizedHeightEndorsement, proto.BlockID{}, 40)) + + all := pool.GetAll() + require.Equal(t, 3, len(all), "pool size must remain constant when full") + + // Low priority (balance=10) should be evicted. + found10 := false + for _, e := range all { + if e.EndorserIndex == 1 { + found10 = true + } + } + require.False(t, found10, "low priority (balance=10) should be evicted") +} + +func TestEndorsementPool_RejectLowBalanceWhenFull(t *testing.T) { + pool := endorsementpool.NewEndorsementPool(2) + + require.NoError(t, pool.Add(newDummyEndorsement(t, 1, sigOne), bls.PublicKey{}, + finalizedHeightEndorsement, proto.BlockID{}, 50)) + require.NoError(t, pool.Add(newDummyEndorsement(t, 2, sigTwo), bls.PublicKey{}, + finalizedHeightEndorsement, proto.BlockID{}, 60)) + require.Equal(t, 2, pool.Len()) + + // Low balance (30) shouldn't get added. + require.NoError(t, pool.Add(newDummyEndorsement(t, 3, sigThree), bls.PublicKey{}, + finalizedHeightEndorsement, proto.BlockID{}, 30)) + require.Equal(t, 2, pool.Len(), "low-priority endorsement should be rejected") + + // High balance (100) should evict the lowest (50). + require.NoError(t, pool.Add(newDummyEndorsement(t, 4, sigFour), bls.PublicKey{}, + finalizedHeightEndorsement, proto.BlockID{}, 100)) + require.Equal(t, 2, pool.Len()) + + all := pool.GetAll() + found50 := false + for _, e := range all { + if e.EndorserIndex == 1 { + found50 = true + } + } + require.False(t, found50, "element with lowest balance should be evicted") +} diff --git a/pkg/miner/micro_miner.go b/pkg/miner/micro_miner.go index a6d4c5b5e9..679a44c8fd 100644 --- a/pkg/miner/micro_miner.go +++ b/pkg/miner/micro_miner.go @@ -37,16 +37,18 @@ func NewMicroMiner(services services.Services) *MicroMiner { logger: slog.Default().With(logging.NamespaceKey, "MICRO MINER"), } } - -func (a *MicroMiner) Micro(minedBlock *proto.Block, rest proto.MiningLimits, keyPair proto.KeyPair) (*proto.Block, *proto.MicroBlock, proto.MiningLimits, error) { - // way to stop mine microblocks +func (a *MicroMiner) Micro( + minedBlock *proto.Block, + rest proto.MiningLimits, + keyPair proto.KeyPair, + partialFinalization *proto.FinalizationVoting, +) (*proto.Block, *proto.MicroBlock, proto.MiningLimits, error) { if minedBlock == nil { return nil, nil, rest, errors.New("no block provided") } topBlock := a.state.TopBlock() if topBlock.BlockSignature != minedBlock.BlockSignature { - // block changed, exit return nil, nil, rest, ErrStateChanged } @@ -56,23 +58,77 @@ func (a *MicroMiner) Micro(minedBlock *proto.Block, rest proto.MiningLimits, key } a.logger.Debug("Generating micro block", "TopBlockID", topBlock.BlockID(), "height", height) - parentTimestamp := topBlock.Timestamp + parentTimestamp, err := a.getParentTimestamp(height) + if err != nil { + return nil, nil, rest, err + } + + appliedTransactions, txSnapshots, binSize, + txCount, droppedTxCount, inapplicable := a.collectTransactions(minedBlock, rest, parentTimestamp) + + a.logger.Debug("Transaction validation for micro block finished", + slog.Int("transactions", len(appliedTransactions)), + slog.Int("inapplicable", len(inapplicable)), + slog.Int("dropped", droppedTxCount), + ) + + if txCount == 0 { + if len(inapplicable) > 0 || rest.MaxTxsSizeInBytes-binSize < 40 { + return nil, nil, rest, ErrBlockIsFull + } + return nil, nil, rest, ErrNoTransactions + } + + transactions := make([]proto.Transaction, len(appliedTransactions)) + for i, appliedTx := range appliedTransactions { + if a.logger.Enabled(context.Background(), slog.LevelDebug) { + a.logger.Debug("Appending transaction", logging.TxID(appliedTx.T, a.scheme)) + } + transactions[i] = appliedTx.T + } + + newBlock, sh, err := a.createNewBlock(minedBlock, keyPair, transactions, txSnapshots, height) + if err != nil { + return nil, nil, rest, err + } + + micro, err := a.createMicroBlock(newBlock, keyPair, transactions, partialFinalization, sh, txCount) + if err != nil { + return nil, nil, rest, err + } + + newRest := proto.MiningLimits{ + MaxScriptRunsInBlock: rest.MaxScriptRunsInBlock, + MaxScriptsComplexityInBlock: rest.MaxScriptsComplexityInBlock, + ClassicAmountOfTxsInBlock: rest.ClassicAmountOfTxsInBlock, + MaxTxsSizeInBytes: rest.MaxTxsSizeInBytes - binSize, + } + return newBlock, µ, newRest, nil +} + +// --- helpers --- + +func (a *MicroMiner) getParentTimestamp(height uint64) (uint64, error) { + parentTimestamp := a.state.TopBlock().Timestamp if height > 1 { parent, err := a.state.BlockByHeight(height - 1) if err != nil { - return nil, nil, rest, err + return 0, err } parentTimestamp = parent.Timestamp } + return parentTimestamp, nil +} - txCount := 0 // counter for successfully applied transactions - binSize := 0 - droppedTxCount := 0 - - var appliedTransactions []*types.TransactionWithBytes - var inapplicable []*types.TransactionWithBytes +func (a *MicroMiner) collectTransactions( + minedBlock *proto.Block, + rest proto.MiningLimits, + parentTimestamp uint64, +) ([]*types.TransactionWithBytes, [][]proto.AtomicSnapshot, int, int, int, []*types.TransactionWithBytes) { + const minTransactionSize = 40 + txCount, binSize, droppedTxCount := 0, 0, 0 + var appliedTransactions, inapplicable []*types.TransactionWithBytes var txSnapshots [][]proto.AtomicSnapshot - const minTransactionSize = 40 // Roughly estimated minimal transaction size. _ = a.state.MapUnsafe(func(s state.NonThreadSafeState) error { defer s.ResetValidationList() @@ -84,153 +140,159 @@ func (a *MicroMiner) Micro(minedBlock *proto.Block, rest proto.MiningLimits, key } t := a.utx.Pop() if t == nil { - a.logger.Debug("No more transactions in UTX", - slog.Int("transactions", len(appliedTransactions)), - slog.Int("inapplicable", len(inapplicable)), - slog.Int("dropped", droppedTxCount), - ) + a.logNoMoreTransactions(appliedTransactions, inapplicable, droppedTxCount) break } - txSizeWithLen := len(t.B) + uint32SizeBytes - if newTxsSize := binSize + txSizeWithLen; newTxsSize > rest.MaxTxsSizeInBytes { + + if shouldSkip, _ := a.checkTxSize(t, binSize, rest.MaxTxsSizeInBytes, uint32SizeBytes); shouldSkip { inapplicable = append(inapplicable, t) continue } - // In the miner we pack transactions from UTX into new block. - // We should accept failed transactions here. - // Validate and apply tx to state. snapshot, errVal := s.ValidateNextTx(t.T, minedBlock.Timestamp, parentTimestamp, minedBlock.Version, true) if stateerr.IsTxCommitmentError(errVal) { - a.logger.Error("Failed to validate a transaction from UTX, returning applied transactions to UTX", - logging.Error(errVal), logging.TxID(t.T, a.scheme), - slog.Int("transactions", len(appliedTransactions)), - slog.Int("inapplicable", len(inapplicable)), - slog.Int("dropped", droppedTxCount), - ) - droppedTxCount++ // drop this tx - // This should not happen in practice. - // Reset state, tx count, return applied transactions to UTX. - s.ResetValidationList() - txCount = 0 - for _, appliedTx := range appliedTransactions { - // transactions were validated before, so no need to validate them with state again - uErr := a.utx.AddWithBytesRaw(appliedTx.T, appliedTx.B) - if uErr != nil { - droppedTxCount++ // drop this tx - a.logger.Warn("Failed to return a successfully applied transaction to UTX, throwing tx away", - logging.Error(uErr), logging.TxID(t.T, a.scheme), - ) - } - } - a.logger.Debug("Applied transactions returned to UTX, resetting applied list, continuing", - slog.Int("returned", len(appliedTransactions)), - slog.Int("inapplicable", len(inapplicable)), - slog.Int("dropped", droppedTxCount), - ) + droppedTxCount += a.resetOnCommitmentError(s, t, appliedTransactions, droppedTxCount) appliedTransactions = nil txSnapshots = nil + txCount = 0 continue } if errVal != nil { - a.logger.Debug("Transaction from UTX is not applicable to state, skipping", - logging.Error(errVal), logging.TxID(t.T, a.scheme), - ) + a.logInapplicableTx(t, errVal) inapplicable = append(inapplicable, t) continue } - txCount += 1 - binSize += txSizeWithLen + txCount++ + binSize += len(t.B) + uint32SizeBytes appliedTransactions = append(appliedTransactions, t) txSnapshots = append(txSnapshots, snapshot) } - // return inapplicable transactions to utx - for _, tx := range inapplicable { - uErr := a.utx.AddWithBytes(s, tx.T, tx.B) // validate with state while adding back - if uErr != nil { - droppedTxCount++ // drop this tx - a.logger.Debug("Failed to return an inapplicable transaction to UTX, throwing tx away", - logging.Error(uErr), logging.TxID(tx.T, a.scheme), - ) - } - } + a.returnInapplicableTxs(s, inapplicable, &droppedTxCount) return nil }) - a.logger.Debug("Transaction validation for micro block finished", - slog.Int("transactions", len(appliedTransactions)), + return appliedTransactions, txSnapshots, binSize, txCount, droppedTxCount, inapplicable +} + +func (a *MicroMiner) logNoMoreTransactions(applied, inapplicable []*types.TransactionWithBytes, dropped int) { + a.logger.Debug("No more transactions in UTX", + slog.Int("transactions", len(applied)), slog.Int("inapplicable", len(inapplicable)), - slog.Int("dropped", droppedTxCount), + slog.Int("dropped", dropped), ) +} - // no transactions applied, skip - if txCount == 0 { - // TODO: we should distinguish between block is full because of size and because or because of complexity - // limit reached. For now we return the same error. - if len(inapplicable) > 0 || rest.MaxTxsSizeInBytes-binSize < minTransactionSize { - return nil, nil, rest, ErrBlockIsFull +func (a *MicroMiner) logInapplicableTx(t *types.TransactionWithBytes, err error) { + a.logger.Debug("Transaction from UTX is not applicable", + logging.Error(err), + logging.TxID(t.T, a.scheme), + ) +} + +func (a *MicroMiner) checkTxSize( + t *types.TransactionWithBytes, + binSize, maxSize int, + uint32SizeBytes int, +) (bool, int) { + txSizeWithLen := len(t.B) + uint32SizeBytes + if binSize+txSizeWithLen > maxSize { + return true, txSizeWithLen + } + return false, txSizeWithLen +} + +func (a *MicroMiner) returnInapplicableTxs( + s state.NonThreadSafeState, + inapplicable []*types.TransactionWithBytes, + dropped *int, +) { + for _, tx := range inapplicable { + if uErr := a.utx.AddWithBytes(s, tx.T, tx.B); uErr != nil { + (*dropped)++ + a.logger.Debug("Failed to return inapplicable tx", + logging.Error(uErr), + logging.TxID(tx.T, a.scheme), + ) } - return nil, nil, rest, ErrNoTransactions } +} - transactions := make([]proto.Transaction, len(appliedTransactions)) - for i, appliedTx := range appliedTransactions { - if a.logger.Enabled(context.Background(), slog.LevelDebug) { - a.logger.Debug("Appending transaction", logging.TxID(appliedTx.T, a.scheme)) +func (a *MicroMiner) resetOnCommitmentError( + s state.NonThreadSafeState, + t *types.TransactionWithBytes, + applied []*types.TransactionWithBytes, + dropped int, +) int { + a.logger.Error("Tx commitment error, returning applied txs", logging.TxID(t.T, a.scheme)) + s.ResetValidationList() + for _, appliedTx := range applied { + if uErr := a.utx.AddWithBytesRaw(appliedTx.T, appliedTx.B); uErr != nil { + dropped++ + a.logger.Warn("Failed to return applied tx", logging.Error(uErr), logging.TxID(appliedTx.T, a.scheme)) } - transactions[i] = appliedTx.T } + return dropped +} + +func (a *MicroMiner) createNewBlock( + minedBlock *proto.Block, + keyPair proto.KeyPair, + transactions []proto.Transaction, + txSnapshots [][]proto.AtomicSnapshot, + height uint64, +) (*proto.Block, *crypto.Digest, error) { lightNodeNewBlockActivated, err := a.state.IsActiveLightNodeNewBlocksFields(height) if err != nil { - return nil, nil, rest, err + return nil, nil, err } var sh *crypto.Digest if lightNodeNewBlockActivated { prevSh, ok := minedBlock.GetStateHash() if !ok { - return nil, nil, rest, errors.New("mined block should have a state hash field") + return nil, nil, errors.New("mined block should have a state hash field") } newSh, errSh := state.CalculateSnapshotStateHash(a.scheme, height, prevSh, transactions, txSnapshots) if errSh != nil { - return nil, nil, rest, errSh + return nil, nil, errSh } sh = &newSh } newTransactions := minedBlock.Transactions.Join(transactions) - newBlock, err := proto.CreateBlock( - newTransactions, - minedBlock.Timestamp, - minedBlock.Parent, - minedBlock.GeneratorPublicKey, - minedBlock.NxtConsensus, - minedBlock.Version, - minedBlock.Features, - minedBlock.RewardVote, - a.scheme, - sh, + newTransactions, minedBlock.Timestamp, minedBlock.Parent, + minedBlock.GeneratorPublicKey, minedBlock.NxtConsensus, + minedBlock.Version, minedBlock.Features, minedBlock.RewardVote, + a.scheme, sh, ) if err != nil { - return nil, nil, rest, err + return nil, nil, err } - sk := keyPair.Secret - err = newBlock.SetTransactionsRootIfPossible(a.scheme) - if err != nil { - return nil, nil, rest, err + if err = newBlock.SetTransactionsRootIfPossible(a.scheme); err != nil { + return nil, nil, err } - err = newBlock.Sign(a.scheme, keyPair.Secret) - if err != nil { - return nil, nil, rest, err + if err = newBlock.Sign(a.scheme, keyPair.Secret); err != nil { + return nil, nil, err } - err = newBlock.GenerateBlockID(a.scheme) - if err != nil { - return nil, nil, rest, err + if err = newBlock.GenerateBlockID(a.scheme); err != nil { + return nil, nil, err } + + return newBlock, sh, nil +} + +func (a *MicroMiner) createMicroBlock( + newBlock *proto.Block, + keyPair proto.KeyPair, + transactions []proto.Transaction, + partialFinalization *proto.FinalizationVoting, + sh *crypto.Digest, + txCount int, +) (proto.MicroBlock, error) { micro := proto.MicroBlock{ VersionField: byte(newBlock.Version), SenderPK: keyPair.Public, @@ -240,20 +302,11 @@ func (a *MicroMiner) Micro(minedBlock *proto.Block, rest proto.MiningLimits, key TotalResBlockSigField: newBlock.BlockSignature, TotalBlockID: newBlock.BlockID(), StateHash: sh, + PartialFinalization: partialFinalization, } - - err = micro.Sign(a.scheme, sk) - if err != nil { - return nil, nil, rest, err + if err := micro.Sign(a.scheme, keyPair.Secret); err != nil { + return proto.MicroBlock{}, err } - a.logger.Debug("Micro block mined", "micro", micro) - - newRest := proto.MiningLimits{ - MaxScriptRunsInBlock: rest.MaxScriptRunsInBlock, - MaxScriptsComplexityInBlock: rest.MaxScriptsComplexityInBlock, - ClassicAmountOfTxsInBlock: rest.ClassicAmountOfTxsInBlock, - MaxTxsSizeInBytes: rest.MaxTxsSizeInBytes - binSize, - } - return newBlock, µ, newRest, nil + return micro, nil } diff --git a/pkg/mock/state.go b/pkg/mock/state.go index 8641d2d869..05ad7e0628 100644 --- a/pkg/mock/state.go +++ b/pkg/mock/state.go @@ -10,6 +10,7 @@ import ( gomock "github.com/golang/mock/gomock" crypto "github.com/wavesplatform/gowaves/pkg/crypto" + bls "github.com/wavesplatform/gowaves/pkg/crypto/bls" proto "github.com/wavesplatform/gowaves/pkg/proto" ast "github.com/wavesplatform/gowaves/pkg/ride/ast" settings "github.com/wavesplatform/gowaves/pkg/settings" @@ -328,6 +329,36 @@ func (mr *MockStateInfoMockRecorder) BlockchainSettings() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BlockchainSettings", reflect.TypeOf((*MockStateInfo)(nil).BlockchainSettings)) } +// CalculateVotingFinalization mocks base method. +func (m *MockStateInfo) CalculateVotingFinalization(endorsers []proto.WavesAddress, height proto.Height, allGenerators []proto.WavesAddress) (bool, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CalculateVotingFinalization", endorsers, height, allGenerators) + ret0, _ := ret[0].(bool) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CalculateVotingFinalization indicates an expected call of CalculateVotingFinalization. +func (mr *MockStateInfoMockRecorder) CalculateVotingFinalization(endorsers, height, allGenerators interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CalculateVotingFinalization", reflect.TypeOf((*MockStateInfo)(nil).CalculateVotingFinalization), endorsers, height, allGenerators) +} + +// CommittedGenerators mocks base method. +func (m *MockStateInfo) CommittedGenerators(periodStart uint32) ([]proto.WavesAddress, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CommittedGenerators", periodStart) + ret0, _ := ret[0].([]proto.WavesAddress) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CommittedGenerators indicates an expected call of CommittedGenerators. +func (mr *MockStateInfoMockRecorder) CommittedGenerators(periodStart interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CommittedGenerators", reflect.TypeOf((*MockStateInfo)(nil).CommittedGenerators), periodStart) +} + // CurrentScore mocks base method. func (m *MockStateInfo) CurrentScore() (*big.Int, error) { m.ctrl.T.Helper() @@ -373,6 +404,36 @@ func (mr *MockStateInfoMockRecorder) EstimatorVersion() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EstimatorVersion", reflect.TypeOf((*MockStateInfo)(nil).EstimatorVersion)) } +// FindEndorserPKByIndex mocks base method. +func (m *MockStateInfo) FindEndorserPKByIndex(periodStart uint32, index int) (bls.PublicKey, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "FindEndorserPKByIndex", periodStart, index) + ret0, _ := ret[0].(bls.PublicKey) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// FindEndorserPKByIndex indicates an expected call of FindEndorserPKByIndex. +func (mr *MockStateInfoMockRecorder) FindEndorserPKByIndex(periodStart, index interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FindEndorserPKByIndex", reflect.TypeOf((*MockStateInfo)(nil).FindEndorserPKByIndex), periodStart, index) +} + +// FindGeneratorPKByEndorserPK mocks base method. +func (m *MockStateInfo) FindGeneratorPKByEndorserPK(periodStart uint32, endorserPK bls.PublicKey) (crypto.PublicKey, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "FindGeneratorPKByEndorserPK", periodStart, endorserPK) + ret0, _ := ret[0].(crypto.PublicKey) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// FindGeneratorPKByEndorserPK indicates an expected call of FindGeneratorPKByEndorserPK. +func (mr *MockStateInfoMockRecorder) FindGeneratorPKByEndorserPK(periodStart, endorserPK interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FindGeneratorPKByEndorserPK", reflect.TypeOf((*MockStateInfo)(nil).FindGeneratorPKByEndorserPK), periodStart, endorserPK) +} + // FullAssetInfo mocks base method. func (m *MockStateInfo) FullAssetInfo(assetID proto.AssetID) (*proto.FullAssetInfo, error) { m.ctrl.T.Helper() @@ -613,6 +674,36 @@ func (mr *MockStateInfoMockRecorder) IsAssetExist(assetID interface{}) *gomock.C return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsAssetExist", reflect.TypeOf((*MockStateInfo)(nil).IsAssetExist), assetID) } +// LastFinalizedBlock mocks base method. +func (m *MockStateInfo) LastFinalizedBlock() (*proto.BlockHeader, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "LastFinalizedBlock") + ret0, _ := ret[0].(*proto.BlockHeader) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// LastFinalizedBlock indicates an expected call of LastFinalizedBlock. +func (mr *MockStateInfoMockRecorder) LastFinalizedBlock() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LastFinalizedBlock", reflect.TypeOf((*MockStateInfo)(nil).LastFinalizedBlock)) +} + +// LastFinalizedHeight mocks base method. +func (m *MockStateInfo) LastFinalizedHeight() (proto.Height, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "LastFinalizedHeight") + ret0, _ := ret[0].(proto.Height) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// LastFinalizedHeight indicates an expected call of LastFinalizedHeight. +func (mr *MockStateInfoMockRecorder) LastFinalizedHeight() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LastFinalizedHeight", reflect.TypeOf((*MockStateInfo)(nil).LastFinalizedHeight)) +} + // LegacyStateHashAtHeight mocks base method. func (m *MockStateInfo) LegacyStateHashAtHeight(height proto.Height) (proto.StateHash, error) { m.ctrl.T.Helper() @@ -1747,6 +1838,21 @@ func (mr *MockStateMockRecorder) BlockchainSettings() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BlockchainSettings", reflect.TypeOf((*MockState)(nil).BlockchainSettings)) } +// CalculateVotingFinalization mocks base method. +func (m *MockState) CalculateVotingFinalization(endorsers []proto.WavesAddress, height proto.Height, allGenerators []proto.WavesAddress) (bool, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CalculateVotingFinalization", endorsers, height, allGenerators) + ret0, _ := ret[0].(bool) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CalculateVotingFinalization indicates an expected call of CalculateVotingFinalization. +func (mr *MockStateMockRecorder) CalculateVotingFinalization(endorsers, height, allGenerators interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CalculateVotingFinalization", reflect.TypeOf((*MockState)(nil).CalculateVotingFinalization), endorsers, height, allGenerators) +} + // Close mocks base method. func (m *MockState) Close() error { m.ctrl.T.Helper() @@ -1761,6 +1867,21 @@ func (mr *MockStateMockRecorder) Close() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockState)(nil).Close)) } +// CommittedGenerators mocks base method. +func (m *MockState) CommittedGenerators(periodStart uint32) ([]proto.WavesAddress, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CommittedGenerators", periodStart) + ret0, _ := ret[0].([]proto.WavesAddress) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CommittedGenerators indicates an expected call of CommittedGenerators. +func (mr *MockStateMockRecorder) CommittedGenerators(periodStart interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CommittedGenerators", reflect.TypeOf((*MockState)(nil).CommittedGenerators), periodStart) +} + // CreateNextSnapshotHash mocks base method. func (m *MockState) CreateNextSnapshotHash(block *proto.Block) (crypto.Digest, error) { m.ctrl.T.Helper() @@ -1821,6 +1942,36 @@ func (mr *MockStateMockRecorder) EstimatorVersion() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EstimatorVersion", reflect.TypeOf((*MockState)(nil).EstimatorVersion)) } +// FindEndorserPKByIndex mocks base method. +func (m *MockState) FindEndorserPKByIndex(periodStart uint32, index int) (bls.PublicKey, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "FindEndorserPKByIndex", periodStart, index) + ret0, _ := ret[0].(bls.PublicKey) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// FindEndorserPKByIndex indicates an expected call of FindEndorserPKByIndex. +func (mr *MockStateMockRecorder) FindEndorserPKByIndex(periodStart, index interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FindEndorserPKByIndex", reflect.TypeOf((*MockState)(nil).FindEndorserPKByIndex), periodStart, index) +} + +// FindGeneratorPKByEndorserPK mocks base method. +func (m *MockState) FindGeneratorPKByEndorserPK(periodStart uint32, endorserPK bls.PublicKey) (crypto.PublicKey, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "FindGeneratorPKByEndorserPK", periodStart, endorserPK) + ret0, _ := ret[0].(crypto.PublicKey) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// FindGeneratorPKByEndorserPK indicates an expected call of FindGeneratorPKByEndorserPK. +func (mr *MockStateMockRecorder) FindGeneratorPKByEndorserPK(periodStart, endorserPK interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FindGeneratorPKByEndorserPK", reflect.TypeOf((*MockState)(nil).FindGeneratorPKByEndorserPK), periodStart, endorserPK) +} + // FullAssetInfo mocks base method. func (m *MockState) FullAssetInfo(assetID proto.AssetID) (*proto.FullAssetInfo, error) { m.ctrl.T.Helper() @@ -2061,6 +2212,36 @@ func (mr *MockStateMockRecorder) IsAssetExist(assetID interface{}) *gomock.Call return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsAssetExist", reflect.TypeOf((*MockState)(nil).IsAssetExist), assetID) } +// LastFinalizedBlock mocks base method. +func (m *MockState) LastFinalizedBlock() (*proto.BlockHeader, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "LastFinalizedBlock") + ret0, _ := ret[0].(*proto.BlockHeader) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// LastFinalizedBlock indicates an expected call of LastFinalizedBlock. +func (mr *MockStateMockRecorder) LastFinalizedBlock() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LastFinalizedBlock", reflect.TypeOf((*MockState)(nil).LastFinalizedBlock)) +} + +// LastFinalizedHeight mocks base method. +func (m *MockState) LastFinalizedHeight() (proto.Height, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "LastFinalizedHeight") + ret0, _ := ret[0].(proto.Height) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// LastFinalizedHeight indicates an expected call of LastFinalizedHeight. +func (mr *MockStateMockRecorder) LastFinalizedHeight() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LastFinalizedHeight", reflect.TypeOf((*MockState)(nil).LastFinalizedHeight)) +} + // LegacyStateHashAtHeight mocks base method. func (m *MockState) LegacyStateHashAtHeight(height proto.Height) (proto.StateHash, error) { m.ctrl.T.Helper() diff --git a/pkg/node/actions_by_type.go b/pkg/node/actions_by_type.go index 9e994f8da0..a5cab6e853 100644 --- a/pkg/node/actions_by_type.go +++ b/pkg/node/actions_by_type.go @@ -362,6 +362,27 @@ func MicroBlockSnapshotAction( return fsm.MicroBlockSnapshot(mess.ID, blockID, blockSnapshot) } +func EndorseBlockAction( + _ services.Services, mess peer.ProtoMessage, fsm *fsm.FSM, nl *slog.Logger, +) (fsm.Async, error) { + protoMess := g.EndorseBlock{} + endorseMsg, ok := mess.Message.(*proto.EndorseBlockMessage) + if !ok { + nl.Debug("unexpected message type", slog.String("type", fmt.Sprintf("%T", mess.Message))) + return nil, fmt.Errorf("unexpected message type %T, expected *proto.EndorseBlockMessage", mess.Message) + } + err := protoMess.UnmarshalVT(endorseMsg.Bytes) + if err != nil { + return nil, err + } + var c proto.ProtobufConverter + endorseBlock, err := c.EndorseBlock(&protoMess) + if err != nil { + return nil, err + } + return fsm.BlockEndorsement(&endorseBlock) +} + func createActions() map[reflect.Type]Action { return map[reflect.Type]Action{ reflect.TypeFor[*proto.ScoreMessage](): ScoreAction, @@ -384,5 +405,6 @@ func createActions() map[reflect.Type]Action { reflect.TypeFor[*proto.MicroBlockSnapshotRequestMessage](): MicroSnapshotRequestAction, reflect.TypeFor[*proto.BlockSnapshotMessage](): BlockSnapshotAction, reflect.TypeFor[*proto.MicroBlockSnapshotMessage](): MicroBlockSnapshotAction, + reflect.TypeFor[*proto.EndorseBlockMessage](): EndorseBlockAction, } } diff --git a/pkg/node/fsm/action.go b/pkg/node/fsm/action.go index 982c4a28ec..b01ef816ed 100644 --- a/pkg/node/fsm/action.go +++ b/pkg/node/fsm/action.go @@ -18,6 +18,7 @@ type currentScorer interface { type Actions interface { SendScore(currentScorer) SendBlock(block *proto.Block) + SendEndorseBlock(endorse *proto.EndorseBlock) } type ActionsImpl struct { @@ -74,3 +75,25 @@ func (a *ActionsImpl) SendBlock(block *proto.Block) { a.logger.Debug("Network message sent to peers", logging.Type(msg), slog.Int("count", cnt), slog.Any("blockID", block.BlockID())) } + +func (a *ActionsImpl) SendEndorseBlock(endorse *proto.EndorseBlock) { + bts, err := endorse.Marshal() + if err != nil { + a.logger.Error("Failed to marshal endorse block", logging.Error(err)) + return + } + + msg := &proto.EndorseBlockMessage{ + Bytes: bts, + } + + var cnt int + a.services.Peers.EachConnected(func(p peer.Peer, _ *proto.Score) { + p.SendMessage(msg) + cnt++ + }) + a.logger.Debug("Network message sent to peers", + logging.Type(msg), + slog.Int("count", cnt), + slog.Any("endorse", endorse)) +} diff --git a/pkg/node/fsm/fsm.go b/pkg/node/fsm/fsm.go index cd0bdfeec0..34ce67b00f 100644 --- a/pkg/node/fsm/fsm.go +++ b/pkg/node/fsm/fsm.go @@ -79,6 +79,8 @@ type BaseInfo struct { utx types.UtxPool + endorsements types.EndorsementPool + minPeersMining int skipMessageList *messages.SkipMessageList @@ -90,6 +92,8 @@ type BaseInfo struct { cleanCancel context.CancelFunc logger *slog.Logger netLogger *slog.Logger + + generationPeriod uint64 } func (a *BaseInfo) BroadcastTransaction(t proto.Transaction, receivedFrom peer.Peer) { @@ -195,6 +199,7 @@ const ( StartMiningEvent = "StartMining" ChangeSyncPeerEvent = "ChangeSyncPeer" BlockSnapshotEvent = "BlockSnapshotEvent" + BlockEndorsementEvent = "EndorseBlock" MicroBlockSnapshotEvent = "MicroBlockSnapshotEvent" ) @@ -220,6 +225,7 @@ func NewFSM( syncPeer *network.SyncPeer, enableLightMode bool, logger, netLogger *slog.Logger, + generationPeriod uint64, ) (*FSM, Async, error) { if microblockInterval <= 0 { return nil, nil, errors.New("microblock interval must be positive") @@ -245,16 +251,18 @@ func NewFSM( actions: &ActionsImpl{services: services, logger: logger}, - utx: services.UtxPool, + utx: services.UtxPool, + endorsements: services.EndorsementPool, minPeersMining: services.MinPeersMining, - skipMessageList: services.SkipMessageList, - syncPeer: syncPeer, - enableLightMode: enableLightMode, - cleanUtxRunning: &atomic.Bool{}, - logger: logger, - netLogger: netLogger, + skipMessageList: services.SkipMessageList, + syncPeer: syncPeer, + enableLightMode: enableLightMode, + cleanUtxRunning: &atomic.Bool{}, + logger: logger, + netLogger: netLogger, + generationPeriod: generationPeriod, } info.scheduler.Reschedule() // Reschedule mining just before starting the FSM (i.e. before starting the node). @@ -396,6 +404,12 @@ func (f *FSM) BlockSnapshot(p peer.Peer, blockID proto.BlockID, snapshots proto. return *asyncRes, err } +func (f *FSM) BlockEndorsement(endorseBlock *proto.EndorseBlock) (Async, error) { + asyncRes := &Async{} + err := f.fsm.Fire(BlockEndorsementEvent, asyncRes, endorseBlock) + return *asyncRes, err +} + func (f *FSM) MicroBlockSnapshot(p peer.Peer, blockID proto.BlockID, snapshots proto.BlockSnapshot) (Async, error) { asyncRes := &Async{} err := f.fsm.Fire(MicroBlockSnapshotEvent, asyncRes, p, blockID, snapshots) diff --git a/pkg/node/fsm/ng_state.go b/pkg/node/fsm/ng_state.go index e198f39e0e..026d743f90 100644 --- a/pkg/node/fsm/ng_state.go +++ b/pkg/node/fsm/ng_state.go @@ -2,6 +2,7 @@ package fsm import ( "context" + "fmt" "log/slog" "github.com/pkg/errors" @@ -14,9 +15,12 @@ import ( "github.com/wavesplatform/gowaves/pkg/p2p/peer" "github.com/wavesplatform/gowaves/pkg/p2p/peer/extension" "github.com/wavesplatform/gowaves/pkg/proto" + "github.com/wavesplatform/gowaves/pkg/settings" "github.com/wavesplatform/gowaves/pkg/state" ) +var errNoFinalization = errors.New("no finalization available") + type NGState struct { baseInfo BaseInfo blocksCache blockStatesCache @@ -205,6 +209,146 @@ func (a *NGState) Block(peer peer.Peer, block *proto.Block) (State, Async, error return newNGState(a.baseInfo), nil, nil } +func (a *NGState) BlockEndorsement(blockEndorsement *proto.EndorseBlock) (State, Async, error) { + endorsedBlockID := blockEndorsement.EndorsedBlockID + endorsedMicroBlock, found := a.baseInfo.MicroBlockCache.GetBlock(endorsedBlockID) + if !found || endorsedMicroBlock == nil { + return a, nil, a.Errorf(fmt.Errorf( + "failed to find the microblock that was endorsed (block ID: %s)", + endorsedBlockID.String(), + )) + } + top := a.baseInfo.storage.TopBlock() + if top.Parent != endorsedMicroBlock.Reference { + err := errors.Errorf("endorsed Block ID '%s' must match the parent's block ID '%s'", + endorsedMicroBlock.Reference.String(), top.BlockID().String()) + return a, nil, proto.NewInfoMsg(err) + } + + activationHeight, actErr := a.baseInfo.storage.ActivationHeight(int16(settings.DeterministicFinality)) + if actErr != nil { + return a, nil, + proto.NewInfoMsg(errors.Errorf("failed to get DeterministicFinality activation height, %v", actErr)) + } + height, heightErr := a.baseInfo.storage.Height() + if heightErr != nil { + return a, nil, a.Errorf(heightErr) + } + periodStart, err := state.CurrentGenerationPeriodStart(activationHeight, height, a.baseInfo.generationPeriod) + if err != nil { + return nil, nil, err + } + + endorserPK, err := a.baseInfo.storage.FindEndorserPKByIndex(periodStart, int(blockEndorsement.EndorserIndex)) + if err != nil { + return nil, nil, err + } + endorserWavesPK, findErr := a.baseInfo.storage.FindGeneratorPKByEndorserPK(periodStart, endorserPK) + if findErr != nil { + return nil, nil, findErr + } + endorserAddress := proto.MustAddressFromPublicKey(a.baseInfo.scheme, endorserWavesPK) + endorserRec := proto.NewRecipientFromAddress(endorserAddress) + balance, err := a.baseInfo.storage.GeneratingBalance(endorserRec, height) + if err != nil { + return nil, nil, err + } + localFinalizedHeight, err := a.baseInfo.storage.LastFinalizedHeight() + if err != nil { + return nil, nil, err + } + localFinalizedBlockHeader, err := a.baseInfo.storage.LastFinalizedBlock() + if err != nil { + return nil, nil, err + } + addErr := a.baseInfo.endorsements.Add(blockEndorsement, endorserPK, + localFinalizedHeight, localFinalizedBlockHeader.BlockID(), balance) + if addErr != nil { + return a, nil, errors.Errorf("failed to add an endorsement, %v", addErr) + } + + a.baseInfo.actions.SendEndorseBlock(blockEndorsement) // TODO should we send it out if conflicting? + return newNGState(a.baseInfo), nil, nil +} + +func (a *NGState) getPartialFinalization(height proto.Height) (*proto.FinalizationVoting, error) { + if a.baseInfo.endorsements.Len() == 0 { + return nil, errNoFinalization + } + fin, err := a.baseInfo.endorsements.FormFinalization(height) + if err != nil { + return nil, fmt.Errorf("failed to finalize endorsements for microblock: %w", err) + } + return &fin, nil +} + +func (a *NGState) getBlockFinalization(height proto.Height) (*proto.FinalizationVoting, error) { + blockFinalization, err := a.tryFinalize(height) + if err != nil { + if !errors.Is(err, errNoFinalization) { + return nil, a.Errorf(errors.Wrap(err, "failed to try finalize last block")) + } + return nil, errNoFinalization + } + return blockFinalization, nil +} + +func (a *NGState) tryFinalize(height proto.Height) (*proto.FinalizationVoting, error) { + // No finalization since nobody endorsed the last block. + if a.baseInfo.endorsements.Len() == 0 { + return nil, errNoFinalization + } + + activationHeight, err := a.baseInfo.storage.ActivationHeight(int16(settings.DeterministicFinality)) + if err != nil { + return nil, fmt.Errorf("failed to get DeterministicFinality activation height: %w", err) + } + + ok, err := a.baseInfo.endorsements.Verify() + if err != nil { + return nil, err + } + if !ok { + return nil, fmt.Errorf("endorsement verification failed at height %d", height) + } + + periodStart, err := state.CurrentGenerationPeriodStart(activationHeight, height, a.baseInfo.generationPeriod) + if err != nil { + return nil, err + } + + allEndorsers := a.baseInfo.endorsements.GetEndorsers() + endorsersAddresses := make([]proto.WavesAddress, 0, len(allEndorsers)) + for _, endorser := range allEndorsers { + pk, findErr := a.baseInfo.storage.FindGeneratorPKByEndorserPK(periodStart, endorser) + if findErr != nil { + return nil, findErr + } + addr := proto.MustAddressFromPublicKey(a.baseInfo.scheme, pk) + endorsersAddresses = append(endorsersAddresses, addr) + } + + generators, err := a.baseInfo.storage.CommittedGenerators(periodStart) + if err != nil { + return nil, err + } + + canFinalize, err := a.baseInfo.storage.CalculateVotingFinalization(endorsersAddresses, height, generators) + if err != nil { + return nil, fmt.Errorf("failed to calculate finalization voting: %w", err) + } + + if canFinalize { + finalization, finErr := a.baseInfo.endorsements.FormFinalization(height) + if finErr != nil { + return nil, finErr + } + return &finalization, nil + } + + return nil, errNoFinalization +} + func (a *NGState) MinedBlock( block *proto.Block, limits proto.MiningLimits, keyPair proto.KeyPair, vrf []byte, ) (State, Async, error) { @@ -219,6 +363,7 @@ func (a *NGState) MinedBlock( if heightErr != nil { return a, nil, a.Errorf(heightErr) } + metrics.BlockMined(block) err := a.baseInfo.storage.Map(func(state state.NonThreadSafeState) error { var err error @@ -239,6 +384,7 @@ func (a *NGState) MinedBlock( slog.Info("Generated key block successfully applied to state", "state", a.String(), "blockID", block.ID.String()) + a.baseInfo.endorsements.CleanAll() a.blocksCache.Clear() a.blocksCache.AddBlockState(block) a.baseInfo.actions.SendBlock(block) @@ -256,6 +402,7 @@ func (a *NGState) MicroBlock(p peer.Peer, micro *proto.MicroBlock) (State, Async metrics.MicroBlockDeclined(micro) return a, nil, a.Errorf(err) } + a.baseInfo.logger.Debug("Received microblock successfully applied to state", "state", a.String(), "blockID", block.BlockID(), "ref", micro.Reference) a.baseInfo.MicroBlockCache.AddMicroBlock(block.BlockID(), micro) @@ -278,16 +425,35 @@ func (a *NGState) MicroBlock(p peer.Peer, micro *proto.MicroBlock) (State, Async return st, tasks.Tasks(timeoutTask), nil } -func (a *NGState) microMine(minedBlock *proto.Block, - rest proto.MiningLimits, keyPair proto.KeyPair) (*proto.Block, *proto.MicroBlock, proto.MiningLimits, error) { - return a.baseInfo.microMiner.Micro(minedBlock, rest, keyPair) -} - // mineMicro handles a new microblock generated by miner. func (a *NGState) mineMicro( minedBlock *proto.Block, rest proto.MiningLimits, keyPair proto.KeyPair, vrf []byte, ) (State, Async, error) { - block, micro, rest, err := a.microMine(minedBlock, rest, keyPair) + height, heightErr := a.baseInfo.storage.Height() + if heightErr != nil { + return a, nil, a.Errorf(heightErr) + } + finalityActivated, err := a.baseInfo.storage.IsActiveAtHeight(int16(settings.DeterministicFinality), height+1) + if err != nil { + return a, nil, a.Errorf(err) + } + var partialFinalization *proto.FinalizationVoting + var blockFinalization *proto.FinalizationVoting + if finalityActivated { + partialFinalization, err = a.getPartialFinalization(height) + if err != nil && !errors.Is(err, errNoFinalization) { + return a, nil, a.Errorf(err) + } + blockFinalization, err = a.getBlockFinalization(height) + if err != nil && !errors.Is(err, errNoFinalization) { + return a, nil, a.Errorf(err) + } + } + block, micro, rest, err := a.baseInfo.microMiner.Micro(minedBlock, rest, keyPair, partialFinalization) + if block != nil { + block.FinalizationVoting = blockFinalization + } + switch { case errors.Is(err, miner.ErrNoTransactions) || errors.Is(err, miner.ErrBlockIsFull): // no txs to include in micro a.baseInfo.logger.Debug( @@ -303,6 +469,7 @@ func (a *NGState) mineMicro( return a, nil, a.Errorf(errors.Wrap(err, "failed to generate microblock")) } metrics.MicroBlockMined(micro, block.TransactionCount) + err = a.baseInfo.storage.Map(func(s state.NonThreadSafeState) error { _, er := a.baseInfo.blocksApplier.ApplyMicro(s, block) return er @@ -532,6 +699,19 @@ func initNGStateInFSM(state *StateData, fsm *stateless.StateMachine, info BaseIn } return a.Block(convertToInterface[peer.Peer](args[0]), args[1].(*proto.Block)) })). + PermitDynamic(BlockEndorsementEvent, + createPermitDynamicCallback(BlockEndorsementEvent, state, func(args ...any) (State, Async, error) { + a, ok := state.State.(*NGState) + if !ok { + return a, nil, a.Errorf(errors.Errorf( + "unexpected type '%T' expected '*NGState'", state.State)) + } + endorse, ok := args[0].(*proto.EndorseBlock) + if !ok { + return a, nil, a.Errorf(errors.Errorf("unexpected type %T, expected *proto.EndorseBlock", args[0])) + } + return a.BlockEndorsement(endorse) + })). PermitDynamic(MinedBlockEvent, createPermitDynamicCallback(MinedBlockEvent, state, func(args ...any) (State, Async, error) { a, ok := state.State.(*NGState) diff --git a/pkg/node/node.go b/pkg/node/node.go index bab789b750..bf87215a47 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -155,6 +155,7 @@ func (a *Node) logErrors(err error) { func (a *Node) Run( ctx context.Context, p peer.Parent, internalMessageCh <-chan messages.InternalMessage, networkMsgCh <-chan network.InfoMessage, syncPeer *network.SyncPeer, + generationPeriod uint64, ) { messageCh, protoMessagesLenProvider, wg := deduplicateProtoTxMessages(ctx, p.MessageCh) defer wg.Wait() @@ -167,7 +168,7 @@ func (a *Node) Run( // TODO: Consider using context `ctx` in FSM, for now FSM works in the background context. m, async, err := fsm.NewFSM(a.services, a.microblockInterval, a.obsolescence, syncPeer, a.enableLightMode, - a.fsmLogger, a.netLogger) + a.fsmLogger, a.netLogger, generationPeriod) if err != nil { slog.Error("Failed to create FSM", logging.Error(err)) return diff --git a/pkg/proto/block.go b/pkg/proto/block.go index 00f40d78dc..0cc22f14a0 100644 --- a/pkg/proto/block.go +++ b/pkg/proto/block.go @@ -340,15 +340,16 @@ type BlockHeader struct { RewardVote int64 `json:"desiredReward"` ConsensusBlockLength uint32 `json:"-"` NxtConsensus `json:"nxt-consensus"` - TransactionBlockLength uint32 `json:"transactionBlockLength,omitempty"` - TransactionCount int `json:"transactionCount"` - GeneratorPublicKey crypto.PublicKey `json:"generatorPublicKey"` - BlockSignature crypto.Signature `json:"signature"` - TransactionsRoot B58Bytes `json:"transactionsRoot,omitempty"` - StateHash *crypto.Digest `json:"stateHash,omitempty"` // is nil before protocol version 1.5 - ChallengedHeader *ChallengedHeader `json:"challengedHeader,omitempty"` // is nil before protocol version 1.5 - - ID BlockID `json:"id"` // this field must be generated and set after Block unmarshalling + TransactionBlockLength uint32 `json:"transactionBlockLength,omitempty"` + TransactionCount int `json:"transactionCount"` + GeneratorPublicKey crypto.PublicKey `json:"generatorPublicKey"` + BlockSignature crypto.Signature `json:"signature"` + TransactionsRoot B58Bytes `json:"transactionsRoot,omitempty"` + StateHash *crypto.Digest `json:"stateHash,omitempty"` // is nil before protocol version 1.5 + ChallengedHeader *ChallengedHeader `json:"challengedHeader,omitempty"` // is nil before protocol version 1.5 + FinalizationVoting *FinalizationVoting `json:"finalizationVoting,omitempty"` + // This field must be generated and set after Block unmarshalling. + ID BlockID `json:"id"` } func (b *BlockHeader) GetStateHash() (crypto.Digest, bool) { @@ -362,6 +363,17 @@ func (b *BlockHeader) GetStateHash() (crypto.Digest, bool) { return sh, present } +func (b *BlockHeader) GetFinalizationVoting() (FinalizationVoting, bool) { + var ( + fv FinalizationVoting + present = b.FinalizationVoting != nil + ) + if present { + fv = *b.FinalizationVoting + } + return fv, present +} + func (b *BlockHeader) GetChallengedHeader() (ChallengedHeader, bool) { var ( ch ChallengedHeader @@ -469,6 +481,14 @@ func (b *BlockHeader) HeaderToProtobufHeader(scheme Scheme) (*g.Block_Header, er if sh, present := b.GetStateHash(); present { stateHash = sh.Bytes() } + var finalizationVoting *g.FinalizationVoting + if fv, present := b.GetFinalizationVoting(); present { + var err error + finalizationVoting, err = fv.ToProtobuf() + if err != nil { + return nil, err + } + } return &g.Block_Header{ ChainId: int32(scheme), Reference: b.Parent.Bytes(), @@ -482,6 +502,7 @@ func (b *BlockHeader) HeaderToProtobufHeader(scheme Scheme) (*g.Block_Header, er TransactionsRoot: b.TransactionsRoot, StateHash: stateHash, ChallengedHeader: challengedHeader, + FinalizationVoting: finalizationVoting, }, nil } @@ -564,7 +585,6 @@ func (b *BlockHeader) MarshalHeaderToBinary() ([]byte, error) { } res = append(res, b.GeneratorPublicKey[:]...) res = append(res, b.BlockSignature[:]...) - return res, nil } diff --git a/pkg/proto/endorse_test.go b/pkg/proto/endorse_test.go new file mode 100644 index 0000000000..2248ec14d8 --- /dev/null +++ b/pkg/proto/endorse_test.go @@ -0,0 +1,54 @@ +package proto_test + +import ( + "encoding/base64" + "encoding/binary" + "testing" + + "github.com/stretchr/testify/require" + "github.com/wavesplatform/gowaves/pkg/proto" +) + +func TestEndorsementMessage(t *testing.T) { + finalizedIDBase64 := "ZUUlMmISFnQqWlFgZmxAXW4lNUUnWF15YQp+IHd3TnYCIA8BWmRUfh4VNnYREQBXAARPcgkcYQ8FRw87BB4uMw==" + endorsedIDBase64 := "GR40QGtzGDh+KWwjaUICbnV9Y28CFBwDeHFhXTMEHzxWPGxWe2IMbBtTYH0fE2gSal5CB1s2TRQoQ09PJVghHQ==" + finalizedHeight := uint32(5) + + finalizedIDBytes, err := base64.StdEncoding.DecodeString(finalizedIDBase64) + require.NoError(t, err) + + endorsedIDBytes, err := base64.StdEncoding.DecodeString(endorsedIDBase64) + require.NoError(t, err) + + finalizedID, err := proto.NewBlockIDFromBytes(finalizedIDBytes) + require.NoError(t, err) + + endorsedID, err := proto.NewBlockIDFromBytes(endorsedIDBytes) + require.NoError(t, err) + + e := &proto.EndorseBlock{ + FinalizedBlockID: finalizedID, + FinalizedBlockHeight: finalizedHeight, + EndorsedBlockID: endorsedID, + } + + got, err := e.EndorsementMessage() + require.NoError(t, err) + + // Rebuild using the same concatenation as Scala + expected := make([]byte, 0, len(finalizedIDBytes)+4+len(endorsedIDBytes)) + expected = append(expected, finalizedIDBytes...) + h := make([]byte, 4) + binary.BigEndian.PutUint32(h, finalizedHeight) + expected = append(expected, h...) + expected = append(expected, endorsedIDBytes...) + + require.Equal(t, expected, got, "endorsement message bytes must match Scala version") + + expectedBase64 := base64.StdEncoding.EncodeToString(expected) + require.Equal(t, + "ZUUlMmISFnQqWlFgZmxAXW4lNUUnWF15YQp+IHd3TnYCIA8BWmRUfh4VNnYREQBXAARPcgkcYQ8FRw87BB4uMwAAAAUZHjRAa"+ + "3MYOH4pbCNpQgJudX1jbwIUHAN4cWFdMwQfPFY8bFZ7YgxsG1NgfR8TaBJqXkIHWzZNFChDT08lWCEd", + expectedBase64, + ) +} diff --git a/pkg/proto/finalization.go b/pkg/proto/finalization.go new file mode 100644 index 0000000000..761bacf9f5 --- /dev/null +++ b/pkg/proto/finalization.go @@ -0,0 +1,151 @@ +package proto + +import ( + "encoding/binary" + + "github.com/ccoveille/go-safecast/v2" + "github.com/pkg/errors" + "github.com/wavesplatform/gowaves/pkg/crypto/bls" + g "github.com/wavesplatform/gowaves/pkg/grpc/generated/waves" +) + +type EndorseBlock struct { + EndorserIndex int32 `json:"endorserIndex"` + FinalizedBlockID BlockID `json:"finalizedBlockID"` + FinalizedBlockHeight uint32 `json:"finalizedBlockHeight"` + EndorsedBlockID BlockID `json:"endorsedBlockId"` + Signature bls.Signature `json:"signature"` +} + +func (e *EndorseBlock) Marshal() ([]byte, error) { + endBlockProto := e.ToProtobuf() + return endBlockProto.MarshalVTStrict() +} + +func (e *EndorseBlock) EndorsementMessage() ([]byte, error) { + const heightSize = uint32Size + + finalizedID := e.FinalizedBlockID.Bytes() + endorsedID := e.EndorsedBlockID.Bytes() + + size := len(finalizedID) + heightSize + len(endorsedID) + buf := make([]byte, size) + + // finalizedBlockId + copy(buf[0:len(finalizedID)], finalizedID) + + // finalizedBlockHeight (4 bytes big-endian, same as Scala Ints.toByteArray) + binary.BigEndian.PutUint32(buf[len(finalizedID):len(finalizedID)+heightSize], e.FinalizedBlockHeight) + + // endorsedBlockId + copy(buf[len(finalizedID)+heightSize:], endorsedID) + + return buf, nil +} + +func EndorsementMessage(finalizedBlockID BlockID, endorsedBlockID BlockID, + finalizedBlockHeight Height) ([]byte, error) { + const heightSize = uint32Size + + finalizedID := finalizedBlockID.Bytes() + endorsedID := endorsedBlockID.Bytes() + + size := len(finalizedID) + heightSize + len(endorsedID) + buf := make([]byte, size) + + // finalizedBlockId + copy(buf[0:len(finalizedID)], finalizedID) + + finalizedBlockHeightUint, err := safecast.Convert[uint32](finalizedBlockHeight) + if err != nil { + return nil, errors.Errorf("finalized block height conversion error: %v", err) + } + // finalizedBlockHeight (4 bytes big-endian, same as Scala Ints.toByteArray) + binary.BigEndian.PutUint32(buf[len(finalizedID):len(finalizedID)+heightSize], finalizedBlockHeightUint) + + // endorsedBlockId + copy(buf[len(finalizedID)+heightSize:], endorsedID) + + return buf, nil +} + +func (e *EndorseBlock) UnmarshalFromProtobuf(data []byte) error { + var pbEndorsement = &g.EndorseBlock{} + err := pbEndorsement.UnmarshalVT(data) + if err != nil { + return err + } + var c ProtobufConverter + res, err := c.EndorseBlock(pbEndorsement) + if err != nil { + return err + } + *e = res + return nil +} + +func (e *EndorseBlock) ToProtobuf() *g.EndorseBlock { + endBlockProto := g.EndorseBlock{ + EndorserIndex: e.EndorserIndex, + FinalizedBlockId: e.FinalizedBlockID.Bytes(), + FinalizedBlockHeight: e.FinalizedBlockHeight, + EndorsedBlockId: e.EndorsedBlockID.Bytes(), + Signature: e.Signature.Bytes(), + } + return &endBlockProto +} + +type FinalizationVoting struct { + EndorserIndexes []int32 `json:"endorserIndexes"` + FinalizedBlockHeight Height `json:"finalizedBlockHeight"` + AggregatedEndorsementSignature bls.Signature `json:"aggregatedEndorsementSignature"` + ConflictEndorsements []EndorseBlock `json:"conflictEndorsements"` +} + +func (f *FinalizationVoting) Marshal() ([]byte, error) { + endBlockProto, err := f.ToProtobuf() + if err != nil { + return nil, err + } + return endBlockProto.MarshalVTStrict() +} + +func (f *FinalizationVoting) UnmarshalFromProtobuf(data []byte) error { + var pbFinalization = &g.FinalizationVoting{} + err := pbFinalization.UnmarshalVT(data) + if err != nil { + return err + } + var c ProtobufConverter + res, err := c.FinalizationVoting(pbFinalization) + if err != nil { + return err + } + *f = res + return nil +} + +func (f *FinalizationVoting) ToProtobuf() (*g.FinalizationVoting, error) { + conflictEndorsements := make([]*g.EndorseBlock, len(f.ConflictEndorsements)) + for i, ce := range f.ConflictEndorsements { + conflictEndorsements[i] = ce.ToProtobuf() + } + + finalizedBlockHeight, err := safecast.Convert[int32](f.FinalizedBlockHeight) + if err != nil { + return nil, errors.Errorf("finalized block height conversion error: %v", err) + } + finalizationVoting := g.FinalizationVoting{ + EndorserIndexes: f.EndorserIndexes, + FinalizedBlockHeight: finalizedBlockHeight, + AggregatedEndorsementSignature: f.AggregatedEndorsementSignature.Bytes(), + ConflictEndorsements: conflictEndorsements, + } + return &finalizationVoting, nil +} + +func CalculateLastFinalizedHeight(currentHeight Height) Height { + const genesisHeight = 1 + const maxRollbackDeltaHeight = 100 + return max(genesisHeight, currentHeight-maxRollbackDeltaHeight) +} diff --git a/pkg/proto/microblock.go b/pkg/proto/microblock.go index 85a1be7e4f..0f10d3ee72 100644 --- a/pkg/proto/microblock.go +++ b/pkg/proto/microblock.go @@ -32,6 +32,7 @@ type MicroBlock struct { SenderPK crypto.PublicKey Signature crypto.Signature StateHash *crypto.Digest // is nil before protocol version 1.5 + PartialFinalization *FinalizationVoting } type MicroblockTotalSig = crypto.Signature @@ -47,6 +48,17 @@ func (a *MicroBlock) GetStateHash() (crypto.Digest, bool) { return sh, present } +func (a *MicroBlock) GetPartialFinalization() (FinalizationVoting, bool) { + var ( + fin FinalizationVoting + present = a.PartialFinalization != nil + ) + if present { + fin = *a.PartialFinalization + } + return fin, present +} + func (a *MicroBlock) UnmarshalFromProtobuf(b []byte) error { var pbMicroBlock = &g.SignedMicroBlock{} if err := pbMicroBlock.UnmarshalVT(b); err != nil { @@ -87,6 +99,14 @@ func (a *MicroBlock) ToProtobuf(scheme Scheme) (*g.SignedMicroBlock, error) { if sh, present := a.GetStateHash(); present { stateHash = sh.Bytes() } + var finalizationProtobuf *g.FinalizationVoting + if fin, present := a.GetPartialFinalization(); present { + var cnvrtErr error + finalizationProtobuf, cnvrtErr = fin.ToProtobuf() + if cnvrtErr != nil { + return nil, cnvrtErr + } + } return &g.SignedMicroBlock{ MicroBlock: &g.MicroBlock{ Version: int32(a.VersionField), @@ -95,6 +115,7 @@ func (a *MicroBlock) ToProtobuf(scheme Scheme) (*g.SignedMicroBlock, error) { SenderPublicKey: a.SenderPK.Bytes(), Transactions: txs, StateHash: stateHash, + FinalizationVoting: finalizationProtobuf, }, Signature: sig, TotalBlockId: a.TotalBlockID.Bytes(), @@ -226,6 +247,17 @@ func (a *MicroBlock) WriteWithoutSignature(scheme Scheme, w io.Writer) (int64, e stateHash = sh.Bytes() } s.Bytes(stateHash) + if proto && a.PartialFinalization != nil { // Write finalization only for protobuf micro blocks. + finalizationProtobuf, err := a.PartialFinalization.ToProtobuf() + if err != nil { + return 0, err + } + finalizationBytes, err := finalizationProtobuf.MarshalVTStrict() + if err != nil { + return 0, err + } + s.Bytes(finalizationBytes) + } return s.N(), nil } diff --git a/pkg/proto/microblock_test.go b/pkg/proto/microblock_test.go index e3567a8b46..b8761f7ad0 100644 --- a/pkg/proto/microblock_test.go +++ b/pkg/proto/microblock_test.go @@ -7,12 +7,23 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/wavesplatform/gowaves/pkg/crypto" + "github.com/wavesplatform/gowaves/pkg/crypto/bls" ) +func testTxBytes() []byte { + return []byte{0, 0, 0, 152, 4, 76, 252, 177, 12, 123, 169, 56, 92, 8, 85, 82, 118, + 1, 166, 228, 57, 52, 84, 161, 19, 144, 247, 9, 93, 114, 88, 198, 123, 123, + 210, 188, 95, 177, 170, 229, 15, 176, 248, 128, 112, 121, 201, 53, 221, 15, 55, + 231, 118, 113, 192, 201, 113, 251, 55, 6, 95, 207, 47, 24, 71, 240, 162, 206, 6, 4, + 236, 89, 77, 54, 8, 236, 240, 30, 10, 87, 121, 139, 23, 7, 114, 121, 45, 177, 69, 50, + 132, 55, 119, 224, 172, 245, 68, 95, 44, 28, 243, 4, 0, 0, 0, 0, 1, 107, 137, 11, 41, 201, + 0, 0, 0, 10, 247, 96, 247, 0, 0, 0, 0, 0, 0, 1, 134, 160, 1, 87, 126, 90, 125, 49, 243, 210, + 18, 83, 195, 130, 223, 30, 209, 178, 95, 17, 186, 108, 63, 172, 209, 224, 228, 138, 0, 0} +} + func TestMicroBlock_Marshaling(t *testing.T) { - txBytes := []byte{0, 0, 0, 152, 4, 76, 252, 177, 12, 123, 169, 56, 92, 8, 85, 82, 118, 1, 166, 228, 57, 52, 84, 161, 19, 144, 247, 9, 93, 114, 88, 198, 123, 123, 210, 188, 95, 177, 170, 229, 15, 176, 248, 128, 112, 121, 201, 53, 221, 15, 55, 231, 118, 113, 192, 201, 113, 251, 55, 6, 95, 207, 47, 24, 71, 240, 162, 206, 6, 4, 236, 89, 77, 54, 8, 236, 240, 30, 10, 87, 121, 139, 23, 7, 114, 121, 45, 177, 69, 50, 132, 55, 119, 224, 172, 245, 68, 95, 44, 28, 243, 4, 0, 0, 0, 0, 1, 107, 137, 11, 41, 201, 0, 0, 0, 10, 247, 96, 247, 0, 0, 0, 0, 0, 0, 1, 134, 160, 1, 87, 126, 90, 125, 49, 243, 210, 18, 83, 195, 130, 223, 30, 209, 178, 95, 17, 186, 108, 63, 172, 209, 224, 228, 138, 0, 0} + txBytes := testTxBytes() txs, err := NewTransactionsFromBytes(txBytes, 1, TestNetScheme) require.NoError(t, err) refSig := crypto.MustSignatureFromBase58("37ex9gonRZtUddDHgSzSes5Ds9UeQyS74DyAXtGFrDpJnEg7sjGdi2ncaV4rVpZnLboQmid3whcbZUWS49FV3ZCs") @@ -41,7 +52,7 @@ func TestMicroBlock_Marshaling(t *testing.T) { } func TestMicroBlockProtobufRoundTrip(t *testing.T) { - txBytes := []byte{0, 0, 0, 152, 4, 76, 252, 177, 12, 123, 169, 56, 92, 8, 85, 82, 118, 1, 166, 228, 57, 52, 84, 161, 19, 144, 247, 9, 93, 114, 88, 198, 123, 123, 210, 188, 95, 177, 170, 229, 15, 176, 248, 128, 112, 121, 201, 53, 221, 15, 55, 231, 118, 113, 192, 201, 113, 251, 55, 6, 95, 207, 47, 24, 71, 240, 162, 206, 6, 4, 236, 89, 77, 54, 8, 236, 240, 30, 10, 87, 121, 139, 23, 7, 114, 121, 45, 177, 69, 50, 132, 55, 119, 224, 172, 245, 68, 95, 44, 28, 243, 4, 0, 0, 0, 0, 1, 107, 137, 11, 41, 201, 0, 0, 0, 10, 247, 96, 247, 0, 0, 0, 0, 0, 0, 1, 134, 160, 1, 87, 126, 90, 125, 49, 243, 210, 18, 83, 195, 130, 223, 30, 209, 178, 95, 17, 186, 108, 63, 172, 209, 224, 228, 138, 0, 0} + txBytes := testTxBytes() txs, err := NewTransactionsFromBytes(txBytes, 1, MainNetScheme) require.NoError(t, err) refSig := crypto.MustSignatureFromBase58("37ex9gonRZtUddDHgSzSes5Ds9UeQyS74DyAXtGFrDpJnEg7sjGdi2ncaV4rVpZnLboQmid3whcbZUWS49FV3ZCs") @@ -174,3 +185,85 @@ func TestMicroBlockV5VerifySignatureWithFilledStateHash(t *testing.T) { require.NoError(t, err) assert.True(t, ok) } + +const blsAggregatedSig = "nBWfaRLW7EdcwxhDMaXuZZFMhHyowAxY7476rkBsUUeguTXrMSNuTVkuWLmZjRmRfgMXEGuvdHiu1V7joRFSLz3" + + "X6MQBF8m88kHJEj6Tc2ktBnMTzihh2JMGpuuWBLSK8rv" +const blsConflictSig = "RNMTkL736x3TmXfjQufKnxSgySaaoec3WYnxmujcum9BHEmCdjmwvjoUehghqYCWJcNj5CNfb9QdnujV9o2DRitbLg" + + "q2bnLdTU5s1DLBWBkVx8mBayvdfx7rPZ3mtUWeh5L" + +func TestMicroBlockSignature(t *testing.T) { + txBytes := testTxBytes() + txs, err := NewTransactionsFromBytes(txBytes, 1, TestNetScheme) + require.NoError(t, err) + + seed := make([]byte, 32) + sk, pk, err := crypto.GenerateKeyPair(seed) + require.NoError(t, err) + + refSig := crypto.MustSignatureFromBase58( + "37ex9gonRZtUddDHgSzSes5Ds9UeQyS74DyAXtGFrDpJnEg7sjGdi2ncaV4rVpZnLboQmid3whcbZUWS49FV3ZCs") + ref := NewBlockIDFromSignature(refSig) + + endorsedSig := crypto.MustSignatureFromBase58( + "5GszB5vY2KTxLvYq4zAFQvRkJxv5Rt5BcuTGHZrxgSLTzPtni7eY5k1DN1mJ7mY4ixP5fiHD9z1AfM99AA8yxhjg") + endorsedID := NewBlockIDFromSignature(endorsedSig) + + aggSig, err := bls.NewSignatureFromBase58(blsAggregatedSig) + require.NoError(t, err) + conflictSig, err := bls.NewSignatureFromBase58(blsConflictSig) + require.NoError(t, err) + + finalization := FinalizationVoting{ + EndorserIndexes: []int32{1, 2, 3}, + AggregatedEndorsementSignature: aggSig, + FinalizedBlockHeight: 1, + ConflictEndorsements: []EndorseBlock{ + { + EndorserIndex: 1, + FinalizedBlockID: ref, + FinalizedBlockHeight: 12345, + EndorsedBlockID: endorsedID, + Signature: conflictSig, + }, + }, + } + finalizationVotingExpected := "CgMBAgMQARpggyjkX2gT2YmzoqT+gCY7zgdxeJ75Sa+EtYjQy6qfDfIKLnJ6SCRCC8fsD8C8+wAiFmd4kW" + + "ccRfX8pk/1PFgUjGZtfmFwIQJ5G4pVexxDURku8z4evXcse64vV2XLxb6LIusBCAESQGnFvj8CErOF62bQ6KthEkYLJjwHfER97mTynkydHH" + + "c4/snMkWT+BSNdniltRtW24p82GYZyGWbFPdE1ARnRgIAYuWAiQNXC8WrfOjQIpVQ2uBsNsPL5E5jzxlNj8p81bvr3d1wPKFjE4rJc4ASXV5" + + "PalnIEHuT+YB5fApSdfHv6lRMU54MqYEa9cH8UoVCJUqToKlo2aqh6sYXYb9TzGYYph1cDrsbd3IDZWqNEq0glrbvEdKxIoW+1yHsWszKiSQ" + + "MAEvxrpsfydh6PhGOvEYDlB3YZv49Vhmj8Wr8ZNnU8CHqv0Rjn0w==" + finalizationActualProto, err := finalization.Marshal() + require.NoError(t, err) + + require.Equal(t, finalizationVotingExpected, base64.StdEncoding.EncodeToString(finalizationActualProto)) + + microBlock := MicroBlock{ + VersionField: 5, + Reference: ref, + TotalResBlockSigField: crypto.MustSignatureFromBase58( + "3ta68P5LdLHWKuKcDvASsjcCMEQsm1ySrpxYZwqmzCHiAWHgrYJE1ZmaTsh3ytPqY73545EUPDaGfVdrguTqVTHg"), + SenderPK: pk, + Transactions: txs, + TransactionCount: 1, + PartialFinalization: &finalization, + } + + // Serialize without signature + buf := new(bytes.Buffer) + _, err = microBlock.WriteWithoutSignature(TestNetScheme, buf) + require.NoError(t, err) + + expectedBytesWithoutSignature := "BWnFvj8CErOF62bQ6KthEkYLJjwHfER97mTynkydHHc4/snMkWT+BSNdniltRtW24p82GYZyGWbFPdE1" + + "ARnRgICQgaVdHK4QxQyEYihza6fh1tiQTYDXp9blQTt7S97AiU5A38jSKWoMXr4Q/80NLX0tqB7bHpBBMSzTM5ac6MKPAAAAowAAAAEAAACbC" + + "lcIVBIg7FlNNgjs8B4KV3mLFwdyeS2xRTKEN3fgrPVEXywc8wQaBBCgjQYgydOsyLgtKAHCBiEKFgoUflp9MfPSElPDgt8e0bJfEbpsP6wSBx" + + "CA7oO7rwESQEz8sQx7qThcCFVSdgGm5Dk0VKETkPcJXXJYxnt70rxfsarlD7D4gHB5yTXdDzfndnHAyXH7NwZfzy8YR/CizgbElKnkSNWP6a/" + + "gfVPTrZ62oVuqwNg37tT6xi6ELp94YgoDAQIDEAEaYIMo5F9oE9mJs6Kk/oAmO84HcXie+UmvhLWI0Muqnw3yCi5yekgkQgvH7A/AvPsAIhZn" + + "eJFnHEX1/KZP9TxYFIxmbX5hcCECeRuKVXscQ1EZLvM+Hr13LHuuL1dly8W+iyLrAQgBEkBpxb4/AhKzhetm0OirYRJGCyY8B3xEfe5k8p5Mn" + + "Rx3OP7JzJFk/gUjXZ4pbUbVtuKfNhmGchlmxT3RNQEZ0YCAGLlgIkDVwvFq3zo0CKVUNrgbDbDy+ROY88ZTY/KfNW7693dcDyhYxOKyXOAEl1" + + "eT2pZyBB7k/mAeXwKUnXx7+pUTFOeDKmBGvXB/FKFQiVKk6CpaNmqoerGF2G/U8xmGKYdXA67G3dyA2VqjRKtIJa27xHSsSKFvtch7FrMyokk" + + "DABL8a6bH8nYej4RjrxGA5Qd2Gb+PVYZo/Fq/GTZ1PAh6r9EY59M=" + require.Equal(t, expectedBytesWithoutSignature, base64.StdEncoding.EncodeToString(buf.Bytes())) + + err = microBlock.Sign(TestNetScheme, sk) + require.NoError(t, err) +} diff --git a/pkg/proto/proto.go b/pkg/proto/proto.go index fb31035cc4..312f78347f 100644 --- a/pkg/proto/proto.go +++ b/pkg/proto/proto.go @@ -81,6 +81,7 @@ const ( ContentIDMicroBlockSnapshotRequest PeerMessageID = 35 ContentIDBlockSnapshot PeerMessageID = 36 ContentIDMicroBlockSnapshot PeerMessageID = 37 + ContentIDEndorseBlock PeerMessageID = 38 ) func ProtocolVersion() Version { @@ -2145,6 +2146,8 @@ func CreateMessageByContentID(contentID PeerMessageID) (Message, error) { return &BlockSnapshotMessage{}, nil case ContentIDMicroBlockSnapshot: return &MicroBlockSnapshotMessage{}, nil + case ContentIDEndorseBlock: + return &EndorseBlockMessage{}, nil default: return nil, fmt.Errorf("unexpected content ID %d", contentID) } @@ -2222,3 +2225,61 @@ func unmarshalBlockIDs(data []byte) ([]BlockID, error) { } return ids, nil } + +type EndorseBlockMessage struct { + Bytes BytesPayload +} + +// MarshalBinary encodes EndorseBlockMessage to binary form. +func (m *EndorseBlockMessage) MarshalBinary() ([]byte, error) { + h, err := NewHeader(ContentIDEndorseBlock, m.Bytes) + if err != nil { + return nil, err + } + hdr, err := h.MarshalBinary() + if err != nil { + return nil, err + } + hdr = append(hdr, m.Bytes...) + return hdr, nil +} + +// UnmarshalBinary decodes EndorseBlockMessage from binary form. +func (m *EndorseBlockMessage) UnmarshalBinary(data []byte) error { + var h Header + if err := h.UnmarshalBinary(data); err != nil { + return err + } + if h.Magic != headerMagic { + return fmt.Errorf("wrong magic in Header: %x", h.Magic) + } + if h.ContentID != ContentIDEndorseBlock { + return fmt.Errorf("wrong ContentID in Header: %x", h.ContentID) + } + if common.SafeIntToUint32(len(data)) < maxHeaderLength+h.payloadLength { + return errors.New("EndorseBlockMessage UnmarshalBinary: invalid data size") + } + m.Bytes = make([]byte, h.payloadLength) + copy(m.Bytes, data[maxHeaderLength:maxHeaderLength+h.payloadLength]) + return nil +} + +// ReadFrom reads EndorseBlockMessage from io.Reader. +func (m *EndorseBlockMessage) ReadFrom(r io.Reader) (int64, error) { + return ReadMessage(r, ContentIDEndorseBlock, "EndorseBlockMessage", &m.Bytes) +} + +// WriteTo writes EndorseBlockMessage to io.Writer. +func (m *EndorseBlockMessage) WriteTo(w io.Writer) (int64, error) { + return WriteMessage(w, ContentIDEndorseBlock, "EndorseBlockMessage", &m.Bytes) +} + +func (m *EndorseBlockMessage) IsMessage() {} + +func (m *EndorseBlockMessage) SetPayload(payload Payload) (Message, error) { + if p, ok := payload.(*BytesPayload); ok { + m.Bytes = *p + return m, nil + } + return nil, fmt.Errorf("invalid payload type %T", payload) +} diff --git a/pkg/proto/protobuf_converters.go b/pkg/proto/protobuf_converters.go index cc4a65fd43..6e44b2fbbb 100644 --- a/pkg/proto/protobuf_converters.go +++ b/pkg/proto/protobuf_converters.go @@ -3,7 +3,6 @@ package proto import ( "github.com/ccoveille/go-safecast/v2" "github.com/pkg/errors" - "github.com/wavesplatform/gowaves/pkg/crypto" "github.com/wavesplatform/gowaves/pkg/crypto/bls" g "github.com/wavesplatform/gowaves/pkg/grpc/generated/waves" @@ -1726,6 +1725,71 @@ func (c *ProtobufConverter) Block(block *g.Block) (Block, error) { }, nil } +func (c *ProtobufConverter) EndorseBlock(endorsement *g.EndorseBlock) (EndorseBlock, error) { + if endorsement == nil { + return EndorseBlock{}, errors.New("empty endorsement") + } + finalizedBlockID, err := NewBlockIDFromBytes(endorsement.FinalizedBlockId) + if err != nil { + return EndorseBlock{}, errors.Errorf("failed to parse finalized block ID: %v", err) + } + endorsedBlockID, err := NewBlockIDFromBytes(endorsement.EndorsedBlockId) + if err != nil { + return EndorseBlock{}, errors.Errorf("failed to parse endorsed block ID: %v", err) + } + sig, err := bls.NewSignatureFromBytes(endorsement.Signature) + if err != nil { + return EndorseBlock{}, errors.Errorf("failed to parse bls signature: %v", err) + } + return EndorseBlock{ + EndorserIndex: endorsement.EndorserIndex, + FinalizedBlockID: finalizedBlockID, + FinalizedBlockHeight: endorsement.FinalizedBlockHeight, + EndorsedBlockID: endorsedBlockID, + Signature: sig, + }, nil +} + +func (c *ProtobufConverter) FinalizationVoting(finalizationVoting *g.FinalizationVoting) (FinalizationVoting, error) { + if finalizationVoting == nil { + return FinalizationVoting{}, errors.New("empty finalization voting") + } + conflictEndorsements := make([]EndorseBlock, 0, len(finalizationVoting.ConflictEndorsements)) + for i, ce := range finalizationVoting.ConflictEndorsements { + if ce == nil { + continue + } + finalizedBlockID, err := NewBlockIDFromBytes(ce.FinalizedBlockId) + if err != nil { + return FinalizationVoting{}, errors.Errorf("failed to parse finalized block ID at index %d: %v", i, err) + } + endorsedBlockID, err := NewBlockIDFromBytes(ce.EndorsedBlockId) + if err != nil { + return FinalizationVoting{}, errors.Errorf("failed to parse endorsed block ID at index %d: %v", i, err) + } + sig, err := bls.NewSignatureFromBytes(ce.Signature) + if err != nil { + return FinalizationVoting{}, errors.Errorf("failed to parse bls signature: %v", err) + } + conflictEndorsements = append(conflictEndorsements, EndorseBlock{ + EndorserIndex: ce.EndorserIndex, + FinalizedBlockID: finalizedBlockID, + FinalizedBlockHeight: ce.FinalizedBlockHeight, + EndorsedBlockID: endorsedBlockID, + Signature: sig, + }) + } + aggregatedSignature, err := bls.NewSignatureFromBytes(finalizationVoting.AggregatedEndorsementSignature) + if err != nil { + return FinalizationVoting{}, errors.Errorf("failed to parse aggregated bls signature: %v", err) + } + return FinalizationVoting{ + EndorserIndexes: finalizationVoting.EndorserIndexes, + AggregatedEndorsementSignature: aggregatedSignature, + ConflictEndorsements: conflictEndorsements, + }, nil +} + func (c *ProtobufConverter) SignedTransactions(txs []*g.SignedTransaction) ([]Transaction, error) { res := make([]Transaction, len(txs)) for i, stx := range txs { diff --git a/pkg/services/services.go b/pkg/services/services.go index b582fd255b..36b66822b9 100644 --- a/pkg/services/services.go +++ b/pkg/services/services.go @@ -49,6 +49,7 @@ type Services struct { Scheduler types.Scheduler BlocksApplier BlocksApplier UtxPool types.UtxPool + EndorsementPool types.EndorsementPool Scheme proto.Scheme InvRequester types.InvRequester Time types.Time diff --git a/pkg/state/api.go b/pkg/state/api.go index 130bd3feed..b5d134a425 100644 --- a/pkg/state/api.go +++ b/pkg/state/api.go @@ -8,6 +8,7 @@ import ( "github.com/pkg/errors" "github.com/wavesplatform/gowaves/pkg/crypto" + "github.com/wavesplatform/gowaves/pkg/crypto/bls" "github.com/wavesplatform/gowaves/pkg/keyvalue" "github.com/wavesplatform/gowaves/pkg/libs/ntptime" "github.com/wavesplatform/gowaves/pkg/proto" @@ -155,6 +156,17 @@ type StateInfo interface { // SnapshotsAtHeight returns block snapshots at the given height. SnapshotsAtHeight(height proto.Height) (proto.BlockSnapshot, error) + + // CalculateVotingFinalization calculates whether the generating balance of endorsers at the block with the + // Given height exceeds the total generating balance of all committed generators for that block. + CalculateVotingFinalization(endorsers []proto.WavesAddress, height proto.Height, + allGenerators []proto.WavesAddress) (bool, error) + + FindEndorserPKByIndex(periodStart uint32, index int) (bls.PublicKey, error) + FindGeneratorPKByEndorserPK(periodStart uint32, endorserPK bls.PublicKey) (crypto.PublicKey, error) + CommittedGenerators(periodStart uint32) ([]proto.WavesAddress, error) + LastFinalizedHeight() (proto.Height, error) + LastFinalizedBlock() (*proto.BlockHeader, error) } // StateModifier contains all the methods needed to modify node's state. diff --git a/pkg/state/appender.go b/pkg/state/appender.go index 01c3420dc6..51e0bd780b 100644 --- a/pkg/state/appender.go +++ b/pkg/state/appender.go @@ -3,6 +3,8 @@ package state import ( stderrs "errors" "fmt" + "github.com/wavesplatform/gowaves/pkg/crypto/bls" + "github.com/wavesplatform/gowaves/pkg/util/common" "log/slog" "github.com/ccoveille/go-safecast/v2" @@ -60,6 +62,8 @@ type txAppender struct { buildApiData bool bUpdatesPluginInfo *proto.BlockchainUpdatesPluginInfo + + finalizer *finalizationProcessor } func newTxAppender( @@ -103,6 +107,7 @@ func newTxAppender( } ia := newInvokeApplier(state, sc, txHandler, stor, settings, blockDiffer, diffStorInvoke, diffApplier) ethKindResolver := proto.NewEthereumTransactionKindResolver(state, settings.AddressSchemeCharacter) + finalizer := newFinalizationProcessor(stor, rw, settings) return &txAppender{ sc: sc, ia: ia, @@ -120,6 +125,7 @@ func newTxAppender( buildApiData: buildAPIData, ethTxKindResolver: ethKindResolver, bUpdatesPluginInfo: bUpdatesPluginInfo, + finalizer: finalizer, }, nil } @@ -804,6 +810,19 @@ func (a *txAppender) appendBlock(params *appendBlockParams) error { if hasParent { checkerInfo.parentTimestamp = params.parent.Timestamp } + // Process parent's block finalization. + if hasParent { + err = a.finalizer.updateFinalization(params.parent.FinalizationVoting, params.parent, params.blockchainHeight) + if err != nil { + return err + } + } + // Process current block finalization. + err = a.finalizer.validateCurrentGenerators(params.blockchainHeight, + params.block.FinalizationVoting, params.block.ID) + if err != nil { + return err + } snapshotApplierInfo := newBlockSnapshotsApplierInfo(checkerInfo, a.settings.AddressSchemeCharacter) cleanup := a.txHandler.sa.SetApplierInfo(snapshotApplierInfo) @@ -1333,3 +1352,285 @@ func (a *txAppender) reset() { a.diffStor.reset() a.blockDiffer.reset() } + +type finalizationProcessor struct { + stor *blockchainEntitiesStorage + rw *blockReadWriter + settings *settings.BlockchainSettings +} + +func newFinalizationProcessor( + stor *blockchainEntitiesStorage, + rw *blockReadWriter, + settings *settings.BlockchainSettings, +) *finalizationProcessor { + return &finalizationProcessor{ + stor: stor, + rw: rw, + settings: settings, + } +} + +func (f *finalizationProcessor) votingFinalization( + endorsers []proto.WavesAddress, + height proto.Height, + allGenerators []proto.WavesAddress, +) (bool, error) { + var totalGeneratingBalance uint64 + var endorsersGeneratingBalance uint64 + + for _, gen := range allGenerators { + balance, err := f.stor.balances.generatingBalance(gen.ID(), height) + if err != nil { + return false, err + } + totalGeneratingBalance, err = common.AddInt(totalGeneratingBalance, balance) + if err != nil { + return false, errors.Wrap(err, "totalGeneratingBalance overflow") + } + } + + for _, endorser := range endorsers { + balance, err := f.stor.balances.generatingBalance(endorser.ID(), height) + if err != nil { + return false, err + } + endorsersGeneratingBalance, err = common.AddInt(endorsersGeneratingBalance, balance) + if err != nil { + return false, errors.Wrap(err, "endorsersGeneratingBalance overflow") + } + } + + if totalGeneratingBalance == 0 { + return false, nil + } + if endorsersGeneratingBalance == 0 { + return false, nil + } + + // endorsersBalance >= 2/3 totalGeneratingBalance + if endorsersGeneratingBalance*3 >= totalGeneratingBalance*2 { + return true, nil + } + return false, nil +} + +func (f *finalizationProcessor) loadLastFinalizedHeight( + height proto.Height, + block *proto.BlockHeader, + finalityActivated bool, +) (proto.Height, error) { + h, err := f.stor.finalizations.newest() + if err == nil { + return h, nil + } + if !errors.Is(err, ErrNoFinalization) && !errors.Is(err, ErrNoFinalizationHistory) { + return 0, err + } + + // No finalization found, calculate it, and, if finality activated - initialize it. + initH := proto.CalculateLastFinalizedHeight(height) + if finalityActivated { + if storErr := f.stor.finalizations.store(initH, block.BlockID()); storErr != nil { + return 0, storErr + } + } + return initH, nil +} + +func (f *finalizationProcessor) loadEndorsersPK( + fv *proto.FinalizationVoting, + periodStart uint32, +) ([]bls.PublicKey, error) { + endorsersPK := make([]bls.PublicKey, 0, len(fv.EndorserIndexes)) + for _, idx := range fv.EndorserIndexes { + pk, err := f.stor.commitments.FindEndorserPKByIndex(periodStart, int(idx)) + if err != nil { + return nil, fmt.Errorf("failed to find endorser PK by index %d: %w", idx, err) + } + endorsersPK = append(endorsersPK, pk) + } + if len(endorsersPK) == 0 { + return nil, fmt.Errorf("finalization has no endorsers") + } + return endorsersPK, nil +} + +func (f *finalizationProcessor) mapEndorsersToAddresses( + endorsersPK []bls.PublicKey, + periodStart uint32, +) ([]proto.WavesAddress, error) { + addrs := make([]proto.WavesAddress, 0, len(endorsersPK)) + for _, endPK := range endorsersPK { + gpk, err := f.stor.commitments.FindGeneratorPKByEndorserPK(periodStart, endPK) + if err != nil { + return nil, fmt.Errorf("failed to map endorser PK to generator PK: %w", err) + } + addr, cnvrtErr := proto.NewAddressFromPublicKey(f.settings.AddressSchemeCharacter, gpk) + if cnvrtErr != nil { + return nil, errors.Wrapf(cnvrtErr, "failed to convert public key %q to address", gpk.String()) + } + addrs = append(addrs, addr) + } + return addrs, nil +} + +func (f *finalizationProcessor) verifyFinalizationSignature( + fv *proto.FinalizationVoting, + msg []byte, + endorsersPK []bls.PublicKey, +) error { + aggBytes := fv.AggregatedEndorsementSignature[:] + if !bls.VerifyAggregate(endorsersPK, msg, aggBytes) { + return fmt.Errorf("invalid aggregated BLS signature") + } + return nil +} + +func (f *finalizationProcessor) calcPeriodStart(height proto.Height) (uint32, error) { + activation, err := f.stor.features.activationHeight(int16(settings.DeterministicFinality)) + if err != nil { + return 0, fmt.Errorf("failed to load activation height: %w", err) + } + return CurrentGenerationPeriodStart(activation, height, f.settings.GenerationPeriod) +} + +func (f *finalizationProcessor) removeGeneratorDeposit(periodStart uint32, badEndorserIndex int32, + blockID proto.BlockID) error { + badEndorserPK, err := f.stor.commitments.FindEndorserPKByIndex(periodStart, int(badEndorserIndex)) + if err != nil { + return fmt.Errorf("failed to find endorser PK by index %d: %w", badEndorserIndex, err) + } + badGeneratorPK, err := f.stor.commitments.FindGeneratorPKByEndorserPK(periodStart, badEndorserPK) + if err != nil { + return fmt.Errorf("failed to map endorser PK to generator PK: %w", err) + } + badEndorserAddress, cnvrtErr := proto.NewAddressFromPublicKey(f.settings.AddressSchemeCharacter, badGeneratorPK) + if cnvrtErr != nil { + return errors.Wrapf(cnvrtErr, + "failed to convert public key %q to address", badGeneratorPK.String()) + } + // Remove the deposit from the endorser's balance. + profile, err := f.stor.balances.newestWavesBalance(badEndorserAddress.ID()) + if err != nil { + return errors.Wrapf(err, + "failed to get newest waves balance profile for address %q", badEndorserAddress.String()) + } + newProfile := profile + newProfile.Deposit, err = common.SubInt(profile.Deposit, Deposit) + if err != nil { + return errors.Wrapf(err, "failed to sub deposit from profile for address %q", badEndorserAddress.String()) + } + value := newWavesValue(profile, newProfile) + if err = f.stor.balances.setWavesBalance(badEndorserAddress.ID(), value, blockID); err != nil { + return errors.Wrapf(err, "failed to get set balance profile for address %q", badEndorserAddress.String()) + } + return nil +} + +func (f *finalizationProcessor) validateCurrentGenerators(height proto.Height, + finalizationVoting *proto.FinalizationVoting, + blockID proto.BlockID) error { + if finalizationVoting == nil { + return nil + } + periodStart, calcErr := f.calcPeriodStart(height) + if calcErr != nil { + return calcErr + } + for _, conflictingEndorsement := range finalizationVoting.ConflictEndorsements { + badEndorserIndex := conflictingEndorsement.EndorserIndex + badEndorserPK, err := f.stor.commitments.FindEndorserPKByIndex(periodStart, int(badEndorserIndex)) + if err != nil { + return fmt.Errorf("failed to find endorser PK by index %d: %w", badEndorserIndex, err) + } + badGeneratorPK, err := f.stor.commitments.FindGeneratorPKByEndorserPK(periodStart, badEndorserPK) + if err != nil { + return fmt.Errorf("failed to map endorser PK to generator PK: %w", err) + } + // Remove the generator from the generator list. + generatorExists, err := f.stor.commitments.generatorExists(periodStart, badGeneratorPK) + if err != nil { + return fmt.Errorf("failed to check if generator exists: %w", err) + } + if generatorExists { + err = f.stor.commitments.removeGenerator(periodStart, badGeneratorPK, blockID) + if err != nil { + return err + } + } + } + + return nil +} + +func (f *finalizationProcessor) updateFinalization( + finalizationVoting *proto.FinalizationVoting, + parent *proto.BlockHeader, + height proto.Height, +) error { + if finalizationVoting == nil { + return nil + } + finalityActivated, err := f.stor.features.newestIsActivated(int16(settings.DeterministicFinality)) + if err != nil { + return err + } + periodStart, err := f.calcPeriodStart(height) + if err != nil { + return err + } + for _, conflictingEndorsement := range finalizationVoting.ConflictEndorsements { + conflictErr := f.removeGeneratorDeposit(periodStart, conflictingEndorsement.EndorserIndex, parent.BlockID()) + if conflictErr != nil { + return conflictErr + } + } + lastFinalizedHeight, err := f.loadLastFinalizedHeight(height, parent, finalityActivated) + if err != nil { + return err + } + lastFinalizedBlockID, err := f.rw.blockIDByHeight(lastFinalizedHeight) + if err != nil { + return fmt.Errorf("failed to load last finalized block ID: %w", err) + } + msg, err := proto.EndorsementMessage( + lastFinalizedBlockID, + parent.ID, + lastFinalizedHeight, + ) + if err != nil { + return fmt.Errorf("failed to build endorsement message: %w", err) + } + + endorsersPK, err := f.loadEndorsersPK(finalizationVoting, periodStart) + if err != nil { + return err + } + + if verifyErr := f.verifyFinalizationSignature(finalizationVoting, msg, endorsersPK); verifyErr != nil { + return verifyErr + } + + endorserAddresses, err := f.mapEndorsersToAddresses(endorsersPK, periodStart) + if err != nil { + return err + } + + generators, err := f.stor.commitments.CommittedGenerators(periodStart, f.settings.AddressSchemeCharacter) + if err != nil { + return fmt.Errorf("failed to load committed generators: %w", err) + } + + canFinalize, err := f.votingFinalization(endorserAddresses, height, generators) + if err != nil { + return fmt.Errorf("failed to calculate 2/3 voting: %w", err) + } + + if canFinalize { + if storErr := f.stor.finalizations.store(height, parent.BlockID()); storErr != nil { + return storErr + } + } + return nil +} diff --git a/pkg/state/commitments.go b/pkg/state/commitments.go index 58f0664f59..c8eb9ec2fe 100644 --- a/pkg/state/commitments.go +++ b/pkg/state/commitments.go @@ -6,9 +6,10 @@ import ( "io" "github.com/fxamacker/cbor/v2" - + "github.com/pkg/errors" "github.com/wavesplatform/gowaves/pkg/crypto" "github.com/wavesplatform/gowaves/pkg/crypto/bls" + "github.com/wavesplatform/gowaves/pkg/keyvalue" "github.com/wavesplatform/gowaves/pkg/proto" ) @@ -165,6 +166,19 @@ func (c *commitments) generators(periodStart uint32) ([]crypto.PublicKey, error) return generators, nil } +func (c *commitments) generatorExists(periodStart uint32, generatorTarget crypto.PublicKey) (bool, error) { + generators, err := c.newestGenerators(periodStart) + if err != nil { + return false, err + } + for _, g := range generators { + if bytes.Equal(generatorTarget.Bytes(), g.Bytes()) { + return true, nil + } + } + return false, nil +} + // newestGenerators returns public keys of generators commited to the given period. func (c *commitments) newestGenerators(periodStart uint32) ([]crypto.PublicKey, error) { key := commitmentKey{periodStart: periodStart} @@ -203,3 +217,158 @@ func checkCommitments(data []byte, generatorPK crypto.PublicKey, endorserPK bls. } return false, nil } + +// size returns the number of commitments for the given period start. +func (c *commitments) size(periodStart uint32) (int, error) { + key := commitmentKey{periodStart: periodStart} + data, err := c.hs.topEntryData(key.bytes()) + if err != nil { + if isNotFoundInHistoryOrDBErr(err) { + return 0, nil + } + return 0, fmt.Errorf("failed to retrieve commitment record: %w", err) + } + var rec commitmentsRecord + if umErr := rec.unmarshalBinary(data); umErr != nil { + return 0, fmt.Errorf("failed to unmarshal commitment record: %w", umErr) + } + return len(rec.Commitments), nil +} + +func (c *commitments) newestSize(periodStart uint32) (int, error) { + key := commitmentKey{periodStart: periodStart} + data, err := c.hs.newestTopEntryData(key.bytes()) + if err != nil { + if isNotFoundInHistoryOrDBErr(err) { + return 0, nil + } + return 0, fmt.Errorf("failed to retrieve commitment newest record: %w", err) + } + var rec commitmentsRecord + if umErr := rec.unmarshalBinary(data); umErr != nil { + return 0, fmt.Errorf("failed to unmarshal commitment record: %w", umErr) + } + return len(rec.Commitments), nil +} + +// FindEndorserPKByIndex returns BLS endorser public keys using +// commitment indexes stored in FinalizationVoting.EndorserIndexes. +func (c *commitments) FindEndorserPKByIndex( + periodStart uint32, index int, +) (bls.PublicKey, error) { + var empty bls.PublicKey + key := commitmentKey{periodStart: periodStart} + data, err := c.hs.newestTopEntryData(key.bytes()) + if err != nil { + if isNotFoundInHistoryOrDBErr(err) { + return empty, fmt.Errorf("no commitments found for period %d", periodStart) + } + return empty, fmt.Errorf("failed to retrieve commitments record: %w", err) + } + + var rec commitmentsRecord + if unmarshalErr := rec.unmarshalBinary(data); unmarshalErr != nil { + return empty, fmt.Errorf("failed to unmarshal commitments: %w", unmarshalErr) + } + + if index < 0 || index >= len(rec.Commitments) { + return empty, fmt.Errorf("index %d out of range (size %d)", index, len(rec.Commitments)) + } + + return rec.Commitments[index].EndorserPK, nil +} + +func (c *commitments) FindGeneratorPKByEndorserPK(periodStart uint32, + endorserPK bls.PublicKey) (crypto.PublicKey, error) { + key := commitmentKey{periodStart: periodStart} + data, err := c.hs.newestTopEntryData(key.bytes()) + if err != nil { + if errors.Is(err, keyvalue.ErrNotFound) { + return crypto.PublicKey{}, errors.Errorf("no commitments found for period %d, %v", periodStart, err) + } + return crypto.PublicKey{}, errors.Errorf("failed to retrieve commitments record: %v", err) + } + + var rec commitmentsRecord + if umErr := rec.unmarshalBinary(data); umErr != nil { + return crypto.PublicKey{}, fmt.Errorf("failed to unmarshal commitments record: %w", umErr) + } + + endPKb := endorserPK[:] + for _, cm := range rec.Commitments { + if bytes.Equal(endPKb, cm.EndorserPK[:]) { + return cm.GeneratorPK, nil + } + } + return crypto.PublicKey{}, fmt.Errorf("endorser public key not found in commitments for period %d", periodStart) +} + +func (c *commitments) CommittedGenerators(periodStart uint32, scheme proto.Scheme) ([]proto.WavesAddress, error) { + pks, err := c.newestGenerators(periodStart) + if err != nil { + return nil, err + } + addresses := make([]proto.WavesAddress, len(pks)) + for i, pk := range pks { + addr, cnvrtErr := proto.NewAddressFromPublicKey(scheme, pk) + if cnvrtErr != nil { + return nil, cnvrtErr + } + addresses[i] = addr + } + return addresses, nil +} + +func (c *commitments) removeGenerator( + periodStart uint32, + generatorPK crypto.PublicKey, + blockID proto.BlockID, +) error { + key := commitmentKey{periodStart: periodStart} + keyBytes := key.bytes() + + data, err := c.hs.newestTopEntryData(keyBytes) + if err != nil { + if isNotFoundInHistoryOrDBErr(err) { + return fmt.Errorf("no commitments found for period %d", periodStart) + } + return fmt.Errorf("failed to retrieve commitments record: %w", err) + } + + var rec commitmentsRecord + if umErr := rec.unmarshalBinary(data); umErr != nil { + return fmt.Errorf("failed to unmarshal commitments record: %w", umErr) + } + + newCommitmentRecords := make([]commitmentItem, 0, len(rec.Commitments)) + var removed *commitmentItem + for _, cm := range rec.Commitments { + if bytes.Equal(cm.GeneratorPK[:], generatorPK[:]) { + removed = &cm + continue + } + newCommitmentRecords = append(newCommitmentRecords, cm) + } + if removed == nil { + return fmt.Errorf( + "endorser public key not found in commitments for period %d", + periodStart, + ) + } + rec.Commitments = newCommitmentRecords + newData, mErr := rec.marshalBinary() + if mErr != nil { + return fmt.Errorf("failed to marshal updated commitments record: %w", mErr) + } + + if c.calculateHashes { + if pErr := c.hasher.pop(string(keyBytes), blockID); pErr != nil { + return fmt.Errorf("failed to update commitment state hash: %w", pErr) + } + } + if addErr := c.hs.addNewEntry(commitment, keyBytes, newData, blockID); addErr != nil { + return fmt.Errorf("failed to add updated commitment record: %w", addErr) + } + + return nil +} diff --git a/pkg/state/commitments_internal_test.go b/pkg/state/commitments_internal_test.go index 945ee2db35..ad84bc6af3 100644 --- a/pkg/state/commitments_internal_test.go +++ b/pkg/state/commitments_internal_test.go @@ -144,11 +144,17 @@ func TestCommitments_Size(t *testing.T) { gs, err := to.entities.commitments.newestGenerators(test.periodStart) require.NoError(t, err) assert.Equal(t, j+1, len(gs)) + newestSize, err := to.entities.commitments.newestSize(test.periodStart) + require.NoError(t, err) + assert.Equal(t, newestSize, len(gs)) // Check after flush. to.flush(t) gs, err = to.entities.commitments.generators(test.periodStart) require.NoError(t, err) assert.Equal(t, j+1, len(gs)) + regularSize, err := to.entities.commitments.size(test.periodStart) + require.NoError(t, err) + assert.Equal(t, regularSize, len(gs)) } }) } diff --git a/pkg/state/finalization.go b/pkg/state/finalization.go new file mode 100644 index 0000000000..5853d55c69 --- /dev/null +++ b/pkg/state/finalization.go @@ -0,0 +1,79 @@ +package state + +import ( + "fmt" + + "github.com/fxamacker/cbor/v2" + "github.com/pkg/errors" + "github.com/wavesplatform/gowaves/pkg/proto" +) + +var ErrNoFinalization = errors.New("no finalized block recorded") +var ErrNoFinalizationHistory = errors.New("no finalization in history") + +// finalizationRecord stores only last finalized height. +type finalizationRecord struct { + FinalizedBlockHeight proto.Height `cbor:"0,keyasint,omitempty"` +} + +func (fr *finalizationRecord) marshalBinary() ([]byte, error) { + return cbor.Marshal(fr) +} + +func (fr *finalizationRecord) unmarshalBinary(data []byte) error { + return cbor.Unmarshal(data, fr) +} + +type finalizations struct { + hs *historyStorage +} + +func newFinalizations(hs *historyStorage) *finalizations { + return &finalizations{hs: hs} +} + +// store replaces existing finalization with a new height. +func (f *finalizations) store( + finalizedBlockHeight proto.Height, + currentBlockID proto.BlockID, +) error { + key := finalizationKey{} + + rec := finalizationRecord{ + FinalizedBlockHeight: finalizedBlockHeight, + } + + newData, err := rec.marshalBinary() + if err != nil { + return fmt.Errorf("failed to marshal finalization record: %w", err) + } + + if addErr := f.hs.addNewEntry(finalization, key.bytes(), newData, currentBlockID); addErr != nil { + return fmt.Errorf("failed to add finalization record: %w", addErr) + } + + return nil +} + +// newest returns the last finalized height. +func (f *finalizations) newest() (proto.Height, error) { + key := finalizationKey{} + data, err := f.hs.newestTopEntryData(key.bytes()) + if err != nil { + if isNotFoundInHistoryOrDBErr(err) { + return 0, ErrNoFinalizationHistory + } + return 0, fmt.Errorf("failed to retrieve finalization record: %w", err) + } + + var rec finalizationRecord + if unmarshalErr := rec.unmarshalBinary(data); unmarshalErr != nil { + return 0, fmt.Errorf("failed to unmarshal finalization record: %w", unmarshalErr) + } + + if rec.FinalizedBlockHeight == 0 { + return 0, ErrNoFinalization + } + + return rec.FinalizedBlockHeight, nil +} diff --git a/pkg/state/history_storage.go b/pkg/state/history_storage.go index 8dffe12330..bb53756bb2 100644 --- a/pkg/state/history_storage.go +++ b/pkg/state/history_storage.go @@ -46,6 +46,7 @@ const ( patches challengedAddress commitment + finalization ) type blockchainEntityProperties struct { @@ -216,6 +217,11 @@ var properties = map[blockchainEntity]blockchainEntityProperties{ needToCut: true, fixedSize: false, }, + finalization: { + needToFilter: true, + needToCut: true, + fixedSize: true, // TODO double check this. + }, } type historyEntry struct { diff --git a/pkg/state/keys.go b/pkg/state/keys.go index 9a0fbffe84..ceb69b283b 100644 --- a/pkg/state/keys.go +++ b/pkg/state/keys.go @@ -137,6 +137,8 @@ const ( challengedAddressKeyPrefix // Key to store and retrieve generator's commitments for a specific generation period. commitmentKeyPrefix + // Key to store and retrieve last finalization record. + finalizationKeyPrefix ) var ( @@ -774,3 +776,9 @@ func (k *commitmentKey) bytes() []byte { binary.BigEndian.PutUint32(buf[1:], k.periodStart) return buf } + +type finalizationKey struct{} + +func (k finalizationKey) bytes() []byte { + return []byte{finalizationKeyPrefix} +} diff --git a/pkg/state/state.go b/pkg/state/state.go index bf59bb9f2e..4426cc8ce2 100644 --- a/pkg/state/state.go +++ b/pkg/state/state.go @@ -20,6 +20,7 @@ import ( "github.com/wavesplatform/gowaves/pkg/consensus" "github.com/wavesplatform/gowaves/pkg/crypto" + "github.com/wavesplatform/gowaves/pkg/crypto/bls" "github.com/wavesplatform/gowaves/pkg/errs" "github.com/wavesplatform/gowaves/pkg/keyvalue" "github.com/wavesplatform/gowaves/pkg/logging" @@ -72,6 +73,7 @@ type blockchainEntitiesStorage struct { snapshots *snapshotsAtHeight patches *patchesStorage commitments *commitments + finalizations *finalizations calculateHashes bool } @@ -107,6 +109,7 @@ func newBlockchainEntitiesStorage(hs *historyStorage, sets *settings.BlockchainS newSnapshotsAtHeight(hs, sets.AddressSchemeCharacter), newPatchesStorage(hs, sets.AddressSchemeCharacter), newCommitments(hs, calcHashes), + newFinalizations(hs), calcHashes, }, nil } @@ -1548,8 +1551,7 @@ func (s *stateManager) AddBlocksWithSnapshots(blockBytes [][]byte, snapshots []* } func (s *stateManager) AddDeserializedBlocks( - blocks []*proto.Block, -) (*proto.Block, error) { + blocks []*proto.Block) (*proto.Block, error) { s.newBlocks.setNew(blocks) lastBlock, err := s.addBlocks() if err != nil { @@ -1864,7 +1866,7 @@ func (s *stateManager) resetDeposits(nextBlockID proto.BlockID, lastBlockHeight if err != nil { return fmt.Errorf("failed to reset deposits: %w", err) } - start, err := currentGenerationPeriodStart(activationHeight, lastBlockHeight, s.settings.GenerationPeriod) + start, err := CurrentGenerationPeriodStart(activationHeight, lastBlockHeight, s.settings.GenerationPeriod) if err != nil { return fmt.Errorf("failed to reset deposits: %w", err) } @@ -3354,6 +3356,92 @@ func (s *stateManager) Close() error { return nil } +func (s *stateManager) CalculateVotingFinalization(endorsers []proto.WavesAddress, height proto.Height, + allGenerators []proto.WavesAddress) (bool, error) { + var totalGeneratingBalance uint64 + var endorsersGeneratingBalance uint64 + for _, gen := range allGenerators { + genRecipient := proto.NewRecipientFromAddress(gen) + balance, err := s.GeneratingBalance(genRecipient, height) + if err != nil { + return false, err + } + totalGeneratingBalance, err = common.AddInt(totalGeneratingBalance, balance) + if err != nil { + return false, errors.Wrap(err, "totalGeneratingBalance overflow") + } + } + for _, endorser := range endorsers { + endorserRecipient := proto.NewRecipientFromAddress(endorser) + balance, err := s.GeneratingBalance(endorserRecipient, height) + if err != nil { + return false, err + } + endorsersGeneratingBalance, err = common.AddInt(endorsersGeneratingBalance, balance) + if err != nil { + return false, errors.Wrap(err, "endorsersGeneratingBalance overflow") + } + } + if totalGeneratingBalance == 0 { + return false, nil + } + if endorsersGeneratingBalance == 0 { + return false, nil + } + + // If endorsersBalance >= 2/3 totalGeneratingBalance + if endorsersGeneratingBalance*3 >= totalGeneratingBalance*2 { + return true, nil + } + return false, nil +} + +// FindEndorserPKByIndex retrieves the BLS endorser public key by its index +// in the commitments list for the given period. +func (s *stateManager) FindEndorserPKByIndex(periodStart uint32, index int) (bls.PublicKey, error) { + return s.stor.commitments.FindEndorserPKByIndex(periodStart, index) +} + +// FindGeneratorPKByEndorserPK finds the generator's Waves public key corresponding +// to the given BLS endorser public key in the commitments record for the given period. +func (s *stateManager) FindGeneratorPKByEndorserPK(periodStart uint32, + endorserPK bls.PublicKey) (crypto.PublicKey, error) { + return s.stor.commitments.FindGeneratorPKByEndorserPK(periodStart, endorserPK) +} + +// CommittedGenerators returns the list of Waves addresses of committed generators. +func (s *stateManager) CommittedGenerators(periodStart uint32) ([]proto.WavesAddress, error) { + return s.stor.commitments.CommittedGenerators(periodStart, s.settings.AddressSchemeCharacter) +} + +func (s *stateManager) LastFinalizedHeight() (proto.Height, error) { + height, err := s.stor.finalizations.newest() + if err == nil { + return height, nil + } + if !errors.Is(err, ErrNoFinalization) && !errors.Is(err, ErrNoFinalizationHistory) { + return 0, err + } + // No finalization found, calculate it. + return proto.CalculateLastFinalizedHeight(height), nil +} + +func (s *stateManager) LastFinalizedBlock() (*proto.BlockHeader, error) { + lastFinalizedHeight, err := s.LastFinalizedHeight() + if err != nil { + return nil, err + } + lastFinalizedBlockID, err := s.rw.blockIDByHeight(lastFinalizedHeight) + if err != nil { + return nil, fmt.Errorf("failed to load last finalized block ID: %w", err) + } + header, err := s.rw.readBlockHeader(lastFinalizedBlockID) + if err != nil { + return nil, err + } + return header, nil +} + // MinimalGeneratingBalanceAtHeight returns minimal generating balance at given height and timestamp. // It checks feature activation using newestIsActivatedAtHeight function. func (s *stateManager) NewestMinimalGeneratingBalanceAtHeight(height proto.Height, ts uint64) uint64 { diff --git a/pkg/state/state_hasher.go b/pkg/state/state_hasher.go index c0c00d64ba..1483ef54f7 100644 --- a/pkg/state/state_hasher.go +++ b/pkg/state/state_hasher.go @@ -141,3 +141,11 @@ func (s *stateHasher) reset() { s.hashes = make(map[proto.BlockID]crypto.Digest) s.storage.reset() } + +func (s *stateHasher) pop(key string, blockID proto.BlockID) error { + if err := s.checkNewBlock(blockID); err != nil { + return err + } + s.storage.remove(key) + return nil +} diff --git a/pkg/state/threadsafe_wrapper.go b/pkg/state/threadsafe_wrapper.go index fb7fb01dee..87bca1c8ab 100644 --- a/pkg/state/threadsafe_wrapper.go +++ b/pkg/state/threadsafe_wrapper.go @@ -6,6 +6,7 @@ import ( "sync/atomic" "github.com/wavesplatform/gowaves/pkg/crypto" + "github.com/wavesplatform/gowaves/pkg/crypto/bls" "github.com/wavesplatform/gowaves/pkg/proto" "github.com/wavesplatform/gowaves/pkg/ride/ast" "github.com/wavesplatform/gowaves/pkg/settings" @@ -418,6 +419,44 @@ func (a *ThreadSafeReadWrapper) IsActiveLightNodeNewBlocksFields(blockHeight pro return a.s.IsActiveLightNodeNewBlocksFields(blockHeight) } +func (a *ThreadSafeReadWrapper) CalculateVotingFinalization(endorsers []proto.WavesAddress, height proto.Height, + allGenerators []proto.WavesAddress) (bool, error) { + a.mu.RLock() + defer a.mu.RUnlock() + return a.s.CalculateVotingFinalization(endorsers, height, allGenerators) +} + +func (a *ThreadSafeReadWrapper) FindEndorserPKByIndex(periodStart uint32, index int) (bls.PublicKey, error) { + a.mu.RLock() + defer a.mu.RUnlock() + return a.s.FindEndorserPKByIndex(periodStart, index) +} + +func (a *ThreadSafeReadWrapper) FindGeneratorPKByEndorserPK(periodStart uint32, + endorserPK bls.PublicKey) (crypto.PublicKey, error) { + a.mu.RLock() + defer a.mu.RUnlock() + return a.s.FindGeneratorPKByEndorserPK(periodStart, endorserPK) +} + +func (a *ThreadSafeReadWrapper) CommittedGenerators(periodStart uint32) ([]proto.WavesAddress, error) { + a.mu.RLock() + defer a.mu.RUnlock() + return a.s.CommittedGenerators(periodStart) +} + +func (a *ThreadSafeReadWrapper) LastFinalizedHeight() (proto.Height, error) { + a.mu.RLock() + defer a.mu.RUnlock() + return a.s.LastFinalizedHeight() +} + +func (a *ThreadSafeReadWrapper) LastFinalizedBlock() (*proto.BlockHeader, error) { + a.mu.RLock() + defer a.mu.RUnlock() + return a.s.LastFinalizedBlock() +} + func NewThreadSafeReadWrapper(mu *sync.RWMutex, s StateInfo) StateInfo { return &ThreadSafeReadWrapper{ mu: mu, diff --git a/pkg/state/transaction_checker.go b/pkg/state/transaction_checker.go index 64c5c5ae90..9b0aab8650 100644 --- a/pkg/state/transaction_checker.go +++ b/pkg/state/transaction_checker.go @@ -1813,7 +1813,7 @@ func nextGenerationPeriodStart(activationHeight, blockHeight, periodLength uint6 return safecast.Convert[uint32](s) } -func currentGenerationPeriodStart(activationHeight, blockHeight, periodLength uint64) (uint32, error) { +func CurrentGenerationPeriodStart(activationHeight, blockHeight, periodLength uint64) (uint32, error) { s, err := generationPeriodStart(activationHeight, blockHeight, periodLength, 0) if err != nil { return 0, err diff --git a/pkg/state/transaction_checker_test.go b/pkg/state/transaction_checker_test.go index 19c7d7e31e..df38a8d506 100644 --- a/pkg/state/transaction_checker_test.go +++ b/pkg/state/transaction_checker_test.go @@ -1695,7 +1695,7 @@ func TestGenerationPeriodStart(t *testing.T) { {activation: 7, height: 14, length: 3, currStart: 14, nextStart: 17, failed: false, err: ""}, } { t.Run(fmt.Sprintf("%d", i+1), func(t *testing.T) { - currStart, currErr := currentGenerationPeriodStart(test.activation, test.height, test.length) + currStart, currErr := CurrentGenerationPeriodStart(test.activation, test.height, test.length) nextStart, nextErr := nextGenerationPeriodStart(test.activation, test.height, test.length) err := errors.Join(currErr, nextErr) if test.failed { @@ -1883,7 +1883,6 @@ func TestCheckCommitToGenerationWithProofs(t *testing.T) { to.stor.setWavesBalance(t, testGlobal.senderInfo.addr, balanceProfile{Balance: 1_100_10000000}, info.blockID) to.stor.addBlocks(t, int(test.start)) } - tx := createCommitToGenerationWithProofs(t, test.start, test.opts...) _, err := to.tc.checkCommitToGenerationWithProofs(tx, info) if test.valid { diff --git a/pkg/state/transaction_differ_test.go b/pkg/state/transaction_differ_test.go index a0e77bc89c..fb94df43a6 100644 --- a/pkg/state/transaction_differ_test.go +++ b/pkg/state/transaction_differ_test.go @@ -19,7 +19,8 @@ const ( priceConstant = 10e7 ) -var defaultTimestamp = settings.MustMainNetSettings().CheckTempNegativeAfterTime //nolint:gochecknoglobals // no writes +//nolint:gochecknoglobals // no writes +var defaultTimestamp = settings.MustMainNetSettings().CheckTempNegativeAfterTime const ( defaultAmount = uint64(100) diff --git a/pkg/types/types.go b/pkg/types/types.go index bf558992fc..558e4d2ba0 100644 --- a/pkg/types/types.go +++ b/pkg/types/types.go @@ -6,6 +6,7 @@ import ( "time" "github.com/wavesplatform/gowaves/pkg/crypto" + "github.com/wavesplatform/gowaves/pkg/crypto/bls" "github.com/wavesplatform/gowaves/pkg/proto" "github.com/wavesplatform/gowaves/pkg/ride/ast" "github.com/wavesplatform/gowaves/pkg/util/common" @@ -224,3 +225,15 @@ type EmbeddedWallet interface { Load(password []byte) error AccountSeeds() [][]byte } + +// EndorsementPool storage interface. +type EndorsementPool interface { + Add(e *proto.EndorseBlock, endorserPublicKey bls.PublicKey, + lastFinalizedHeight proto.Height, lastFinalizedBlockID proto.BlockID, balance uint64) error + GetAll() []proto.EndorseBlock + GetEndorsers() []bls.PublicKey + FormFinalization(finalizationHeight proto.Height) (proto.FinalizationVoting, error) + Verify() (bool, error) + Len() int + CleanAll() +}