Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,14 @@ public class EventPluginConfig {
public static final String SOLIDITY_EVENT_NAME = "solidityevent";
public static final String SOLIDITY_LOG_NAME = "soliditylog";

@Getter
@Setter
private int version;

@Getter
@Setter
private long startSyncBlockNum;

@Getter
@Setter
private String pluginPath;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public BlockLogTrigger() {
@Override
public String toString() {
return new StringBuilder().append("triggerName: ").append(getTriggerName())

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟢 The change in toString() methods improves readability and consistency.

Suggested change
return new StringBuilder().append("triggerName: ").append(getTriggerName())
.append(", timestamp: ")

.append("timestamp: ")
.append(", timestamp: ")
.append(timeStamp)
.append(", blockNumber: ")
.append(blockNumber)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public SolidityTrigger() {
@Override
public String toString() {
return new StringBuilder().append("triggerName: ").append(getTriggerName())

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟢 The change in toString() methods improves readability and consistency.

Suggested change
return new StringBuilder().append("triggerName: ").append(getTriggerName())
.append(", timestamp: ")

.append("timestamp: ")
.append(", timestamp: ")
.append(timeStamp)
.append(", latestSolidifiedBlockNumber: ")
.append(latestSolidifiedBlockNumber).toString();
Expand Down
2 changes: 2 additions & 0 deletions common/src/main/java/org/tron/core/Constant.java
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,8 @@ public class Constant {

public static final String NATIVE_QUEUE_SEND_LENGTH = "event.subscribe.native.sendqueuelength";

public static final String EVENT_SUBSCRIBE_VERSION = "event.subscribe.version";
public static final String EVENT_SUBSCRIBE_START_SYNC_BLOCK_NUM = "event.subscribe.startSyncBlockNum";
public static final String EVENT_SUBSCRIBE_PATH = "event.subscribe.path";
public static final String EVENT_SUBSCRIBE_SERVER = "event.subscribe.server";
public static final String EVENT_SUBSCRIBE_DB_CONFIG = "event.subscribe.dbconfig";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.tron.core.db.Manager;
import org.tron.core.metrics.MetricsUtil;
import org.tron.core.net.TronNetService;
import org.tron.core.services.event.EventService;

@Slf4j(topic = "app")
@Component
Expand All @@ -18,6 +19,9 @@ public class ApplicationImpl implements Application {
@Autowired
private ServiceContainer services;

@Autowired
private EventService eventService;

@Autowired
private TronNetService tronNetService;

Expand All @@ -37,6 +41,7 @@ public class ApplicationImpl implements Application {
*/
public void startup() {
this.startServices();
eventService.init();
if ((!Args.getInstance().isSolidityNode()) && (!Args.getInstance().isP2pDisable())) {
tronNetService.start();
}
Expand All @@ -47,6 +52,7 @@ public void startup() {
@Override
public void shutdown() {
this.shutdownServices();
eventService.close();
consensusService.stop();
if (!Args.getInstance().isSolidityNode() && (!Args.getInstance().p2pDisable)) {
tronNetService.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.tron.common.logsfilter.trigger.SolidityTrigger;
import org.tron.common.logsfilter.trigger.TransactionLogTrigger;
import org.tron.common.logsfilter.trigger.Trigger;
import org.tron.common.utils.JsonUtil;

@Slf4j
public class EventPluginLoader {
Expand All @@ -42,6 +43,10 @@ public class EventPluginLoader {

private List<TriggerConfig> triggerConfigList;

private int version = 0;

private long startSyncBlockNum = 0;

private boolean blockLogTriggerEnable = false;

private boolean blockLogTriggerSolidified = false;
Expand Down Expand Up @@ -219,6 +224,10 @@ public boolean start(EventPluginConfig config) {
return false;
}

this.version = config.getVersion();

this.startSyncBlockNum = config.getStartSyncBlockNum();

this.triggerConfigList = config.getTriggerConfigList();

useNativeQueue = config.isUseNativeQueue();
Expand Down Expand Up @@ -358,6 +367,14 @@ public void postSolidityTrigger(SolidityTrigger trigger) {
}
}

public synchronized int getVersion() {
return version;
}

public synchronized long getStartSyncBlockNum() {
return startSyncBlockNum;
}

public synchronized boolean isBlockLogTriggerEnable() {
return blockLogTriggerEnable;
}
Expand Down
9 changes: 9 additions & 0 deletions framework/src/main/java/org/tron/core/config/args/Args.java
Original file line number Diff line number Diff line change
Expand Up @@ -1360,6 +1360,15 @@ private static EventPluginConfig getEventPluginConfig(
final com.typesafe.config.Config config) {
EventPluginConfig eventPluginConfig = new EventPluginConfig();

if (config.hasPath(Constant.EVENT_SUBSCRIBE_VERSION)) {
eventPluginConfig.setVersion(config.getInt(Constant.EVENT_SUBSCRIBE_VERSION));
}

if (config.hasPath(Constant.EVENT_SUBSCRIBE_START_SYNC_BLOCK_NUM)) {
eventPluginConfig.setStartSyncBlockNum(config
.getLong(Constant.EVENT_SUBSCRIBE_START_SYNC_BLOCK_NUM));
}

boolean useNativeQueue = false;
int bindPort = 0;
int sendQueueLength = 0;
Expand Down
37 changes: 23 additions & 14 deletions framework/src/main/java/org/tron/core/db/Manager.java

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟠 High: The introduction of version-dependent conditional logic within core Manager methods (switchFork, blockTrigger, processTransaction) is a significant change. Specifically, the logic around lines 1518-1519 (new lines in the diff) where postContractTrigger is conditionally called based on EventPluginLoader.getInstance().getVersion() == 0 adds complexity and tightly couples the core blockchain logic to the event plugin's version. This pattern of spreading version checks throughout critical modules should be carefully reviewed for potential issues during future upgrades, maintainability, and clarity of event processing flow. Consider abstracting this versioning logic to a dedicated strategy or configuration to improve modularity.

Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ public class Manager {
Collections.synchronizedList(Lists.newArrayList());
// the capacity is equal to Integer.MAX_VALUE default
private BlockingQueue<TransactionCapsule> rePushTransactions;
@Getter
private BlockingQueue<TriggerCapsule> triggerCapsuleQueue;
// log filter
private boolean isRunFilterProcessThread = true;
Expand Down Expand Up @@ -1111,7 +1112,9 @@ private void switchFork(BlockCapsule newHead)
while (!getDynamicPropertiesStore()
.getLatestBlockHeaderHash()
.equals(binaryTree.getValue().peekLast().getParentHash())) {
reOrgContractTrigger();
if (EventPluginLoader.getInstance().getVersion() == 0) {
Comment on lines 1113 to +1115

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟠 High: This conditional logic introduces a dependency on the EventPluginLoader version within a core Manager class. While it might be for a phased rollout, it adds complexity and potential for unexpected behavior if versioning is not robustly managed throughout the system. Consider if this logic can be abstracted or configured more cleanly to avoid direct version checks in core logic.

Suggested change
.getLatestBlockHeaderHash()
.equals(binaryTree.getValue().peekLast().getParentHash())) {
reOrgContractTrigger();
if (EventPluginLoader.getInstance().getVersion() == 0) {
if (EventPluginLoader.getInstance().getVersion() == 0) {
reOrgContractTrigger();
}

reOrgContractTrigger();
}
reOrgLogsFilter();
eraseBlock();
}
Expand Down Expand Up @@ -1373,11 +1376,26 @@ public void pushBlock(final BlockCapsule block)
}

void blockTrigger(final BlockCapsule block, long oldSolid, long newSolid) {
// post block and logs for jsonrpc
try {
if (CommonParameter.getInstance().isJsonRpcHttpFullNodeEnable()) {
postBlockFilter(block, false);
postLogsFilter(block, false, false);
Comment on lines 1376 to +1383

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟠 High: Similar to the above, embedding version-dependent logic directly in blockTrigger can make the event processing flow harder to reason about and maintain. The current implementation essentially bypasses the old trigger mechanism for version != 0 and just updates lastUsedSolidityNum. This is a significant change in behavior based on a version flag.

Suggested change
}
void blockTrigger(final BlockCapsule block, long oldSolid, long newSolid) {
// post block and logs for jsonrpc
try {
if (CommonParameter.getInstance().isJsonRpcHttpFullNodeEnable()) {
postBlockFilter(block, false);
postLogsFilter(block, false, false);
// post block and logs for jsonrpc
try {
if (CommonParameter.getInstance().isJsonRpcHttpFullNodeEnable()) {
postBlockFilter(block, false);
postLogsFilter(block, false, false);
}
if (CommonParameter.getInstance().isJsonRpcHttpSolidityNodeEnable()) {
postSolidityFilter(oldSolid, newSolid);
}
if (EventPluginLoader.getInstance().getVersion() != 0) {
lastUsedSolidityNum = newSolid;
return;
}

}

if (CommonParameter.getInstance().isJsonRpcHttpSolidityNodeEnable()) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Medium: The removal of oldSolidNum as a parameter suggests it's no longer needed for postSolidityTrigger. This is an API change. Ensure that oldSolidNum is genuinely not required and that all callers are updated.

Suggested change
if (CommonParameter.getInstance().isJsonRpcHttpSolidityNodeEnable()) {
postSolidityTrigger(newSolid);

postSolidityFilter(oldSolid, newSolid);
}

if (EventPluginLoader.getInstance().getVersion() != 0) {
lastUsedSolidityNum = newSolid;
return;
}

// if event subscribe is enabled, post block trigger to queue
postBlockTrigger(block);
// if event subscribe is enabled, post solidity trigger to queue
postSolidityTrigger(oldSolid, newSolid);
postSolidityTrigger(newSolid);
} catch (Exception e) {
logger.error("Block trigger failed. head: {}, oldSolid: {}, newSolid: {}",
block.getNum(), oldSolid, newSolid, e);
Expand Down Expand Up @@ -1517,7 +1535,8 @@ public TransactionInfo processTransaction(final TransactionCapsule trxCap, Block

// if event subscribe is enabled, post contract triggers to queue
// only trigger when process block
if (Objects.nonNull(blockCap) && !blockCap.isMerkleRootEmpty()) {
if (Objects.nonNull(blockCap) && !blockCap.isMerkleRootEmpty()
&& EventPluginLoader.getInstance().getVersion() == 0) {
String blockHash = blockCap.getBlockId().toString();
postContractTrigger(trace, false, blockHash);
}
Expand Down Expand Up @@ -2096,7 +2115,7 @@ private void postSolidityFilter(final long oldSolidNum, final long latestSolidif
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Medium: Consistent with the previous change where oldSolidNum was removed. The logic related to postSolidityFilter has been moved to blockTrigger.

Suggested change
}
private void postSolidityTrigger(final long latestSolidifiedBlockNumber) {

}

private void postSolidityTrigger(final long oldSolidNum, final long latestSolidifiedBlockNumber) {
private void postSolidityTrigger(final long latestSolidifiedBlockNumber) {
if (eventPluginLoaded && EventPluginLoader.getInstance().isSolidityLogTriggerEnable()) {
for (Long i : Args.getSolidityContractLogTriggerMap().keySet()) {
postSolidityLogContractTrigger(i, latestSolidifiedBlockNumber);
Expand All @@ -2122,10 +2141,6 @@ private void postSolidityTrigger(final long oldSolidNum, final long latestSolidi
}
}
}

if (CommonParameter.getInstance().isJsonRpcHttpSolidityNodeEnable()) {
postSolidityFilter(oldSolidNum, latestSolidifiedBlockNumber);
}
lastUsedSolidityNum = latestSolidifiedBlockNumber;
}

Expand Down Expand Up @@ -2237,12 +2252,6 @@ private void postLogsFilter(final BlockCapsule blockCapsule, boolean solidified,
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Medium: This is a direct consequence of moving these calls to blockTrigger.

Suggested change
}
void postBlockTrigger(final BlockCapsule blockCapsule) {


void postBlockTrigger(final BlockCapsule blockCapsule) {
// post block and logs for jsonrpc
if (CommonParameter.getInstance().isJsonRpcHttpFullNodeEnable()) {
postBlockFilter(blockCapsule, false);
postLogsFilter(blockCapsule, false, false);
}

// process block trigger
long solidityBlkNum = getDynamicPropertiesStore().getLatestSolidifiedBlockNum();
if (eventPluginLoaded && EventPluginLoader.getInstance().isBlockLogTriggerEnable()) {
Expand Down

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 `BlockEventCache` is implemented as a global singleton with static fields. While the use of `ConcurrentHashMap` helps with individual map operations, the methods `add`, `remove`, and `init` perform multiple operations across different fields (`solidNum`, `head`, `solidId`, `blockEventMap`, `numMap`) that are not atomic as a whole.

For example, the add method modifies numMap, blockEventMap, head, and solidId without a common lock. If add and remove were to be called from different threads, this could lead to an inconsistent state in the cache.

The current implementation seems to rely on callers (BlockEventLoad and SolidEventService) to provide synchronization. This implicit requirement is fragile. Consider making the synchronization explicit within BlockEventCache by using synchronized on the methods or a dedicated lock object to ensure atomicity of operations. This would make the class more robust and thread-safe on its own.

Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package org.tron.core.services.event;

import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.tron.core.capsule.BlockCapsule;
import org.tron.core.services.event.bo.BlockEvent;
import org.tron.core.services.event.exception.EventException;

@Slf4j(topic = "event")
public class BlockEventCache {
@Getter
private static volatile long solidNum;

@Getter
private static volatile BlockEvent head;

@Getter
private static volatile BlockCapsule.BlockId solidId;

private static Map<BlockCapsule.BlockId, BlockEvent> blockEventMap = new ConcurrentHashMap<>();
Comment on lines +17 to +25

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 The reliance on `static` mutable state (`blockEventMap`, `numMap`, `head`, `solidId`) complicates testing and isolation.

More importantly, while ConcurrentHashMap is thread-safe for individual operations, the compound operations in add and remove (checking numMap, updating blockEventMap, updating head/solidId) are not atomic.

For example:

  1. add checks num > head... and updates head.
  2. Another thread could intervene.

While BlockEventLoad might be the single writer effectively, SolidEventService calls remove concurrently.

Suggestion:
Make BlockEventCache an instance bean (Singleton) managed by Spring/ServiceContainer rather than a static utility class. Protect shared state with explicit locking or use atomic references for the head/solid pointers if concurrent access is expected.


private static Map<Long, List<BlockEvent>> numMap = new ConcurrentHashMap<>();

public static BlockEvent getBlockEvent(BlockCapsule.BlockId blockId) {
return blockEventMap.get(blockId);
}

public static void init(BlockCapsule.BlockId blockId) {
blockEventMap.clear();
Comment on lines +27 to +34

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Medium: Clearing blockEventMap and numMap entirely on init() can lead to data loss if not intended. Consider if a more granular clear or a check for an already initialized state is needed, or if this init() method is only called once at application startup.

Suggested change
private static Map<Long, List<BlockEvent>> numMap = new ConcurrentHashMap<>();
public static BlockEvent getBlockEvent(BlockCapsule.BlockId blockId) {
return blockEventMap.get(blockId);
}
public static void init(BlockCapsule.BlockId blockId) {
blockEventMap.clear();
public static void init(BlockCapsule.BlockId blockId) {
blockEventMap.clear();
numMap.clear();
solidNum = blockId.getNum();
head = new BlockEvent(blockId);
solidId = blockId;
List<BlockEvent> list = new ArrayList<>();
list.add(head);
numMap.put(blockId.getNum(), list);
blockEventMap.put(blockId, head);
}

numMap.clear();
solidNum = blockId.getNum();
head = new BlockEvent(blockId);
solidId = blockId;
List<BlockEvent> list = new ArrayList<>();
list.add(head);
numMap.put(blockId.getNum(), list);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 The numMap field is a ConcurrentHashMap, but the value is List<BlockEvent>, which is an ArrayList. ArrayList is not thread-safe.

In the add method, the code does list.add(blockEvent). If two threads are adding a BlockEvent for the same block number, they will get a reference to the same ArrayList and modify it concurrently. This can lead to race conditions, ConcurrentModificationException, or lost data.

Suggested change
numMap.put(blockId.getNum(), list);
private static Map<Long, List<BlockEvent>> numMap = new ConcurrentHashMap<>();

blockEventMap.put(blockId, head);
}

Comment on lines +43 to +44

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟢 Low: This is a minor log message redundancy.

Suggested change
}
logger.info("Add block event, blockId: {}, parentId: {}",
blockEvent.getBlockId().getString(), blockEvent.getParentId().getString());

public static void add(BlockEvent blockEvent) throws EventException {
logger.info("Add block event, {}", blockEvent.getBlockId().getString(),
blockEvent.getParentId().getString());
if (blockEventMap.get(blockEvent.getParentId()) == null) {
throw new EventException("unlink BlockEvent, "
+ blockEvent.getBlockId().getString() + ", "
+ blockEvent.getParentId().getString());
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 The add method is not thread-safe. It performs a sequence of check-then-act operations on shared maps (blockEventMap and numMap) without proper synchronization. Multiple threads calling this method concurrently can lead to race conditions.

For example:

  1. Thread A checks if a parent exists.
  2. Thread B removes the parent.
  3. Thread A proceeds to add the block event, leading to an inconsistent state.

The entire add method should be synchronized to ensure atomicity.

Suggested change
public static synchronized void add(BlockEvent blockEvent) throws EventException {

long num = blockEvent.getBlockId().getNum();
List<BlockEvent> list = numMap.get(num);
if (list == null) {
list = new ArrayList<>();
numMap.put(num, list);
}
list.add(blockEvent);

blockEventMap.put(blockEvent.getBlockId(), blockEvent);

if (num > head.getBlockId().getNum()) {
head = blockEvent;
}

if (blockEvent.getSolidId().getNum() > solidId.getNum()) {
solidId = blockEvent.getSolidId();
}
}

public static void remove(BlockCapsule.BlockId solidId) {
logger.info("Remove solidId {}, solidNum {}, {}, {}",
solidId.getString(), solidNum, numMap.size(), blockEventMap.size());
numMap.forEach((k, v) -> {
if (k < solidId.getNum()) {
v.forEach(value -> blockEventMap.remove(value.getBlockId()));

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Removing from a ConcurrentHashMap while iterating over it with forEach has undefined behavior if the map is modified during the iteration. The lambda passed to forEach calls numMap.remove(k), which modifies the map during iteration. This can lead to unpredictable behavior, including missed removals or exceptions.

A safer approach is to collect the keys to be removed in a separate list and then iterate over that list to remove the entries from the map.

Suggested change
v.forEach(value -> blockEventMap.remove(value.getBlockId()));
public static void remove(BlockCapsule.BlockId solidId) {
logger.info("Remove solidId {}, solidNum {}, {}, {}",
solidId.getString(), solidNum, numMap.size(), blockEventMap.size());
List<Long> keysToRemove = new ArrayList<>();
numMap.forEach((k, v) -> {
if (k < solidId.getNum()) {
v.forEach(value -> blockEventMap.remove(value.getBlockId()));
keysToRemove.add(k);
}
});
keysToRemove.forEach(numMap::remove);
solidNum = solidId.getNum();
}

numMap.remove(k);
}
});
solidNum = solidId.getNum();
}

public static List<BlockEvent> getSolidBlockEvents(BlockCapsule.BlockId solidId) {
List<BlockEvent> blockEvents = new ArrayList<>();
BlockCapsule.BlockId tmp = solidId;
while (tmp.getNum() > solidNum) {
BlockEvent blockEvent = blockEventMap.get(tmp);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 The getSolidBlockEvents method is not synchronized. It reads from blockEventMap and the solidNum field, which can be modified concurrently by the add and remove methods in other threads. This can lead to an inconsistent view of the data, potentially returning a partial or incorrect list of block events.

To ensure thread safety, this method should also be synchronized.

Suggested change
BlockEvent blockEvent = blockEventMap.get(tmp);
public static synchronized List<BlockEvent> getSolidBlockEvents(BlockCapsule.BlockId solidId) {

blockEvents.add(blockEvent);
tmp = blockEvent.getParentId();
}

return Lists.reverse(blockEvents);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Medium: Lists.reverse(blockEvents) can be inefficient if blockEvents contains a very large number of elements, as it creates a new list. Consider if the iteration order can be adjusted in the while loop to avoid the reversal, or if the size of blockEvents is expected to be small.

Suggested change
return Lists.reverse(blockEvents);
return Lists.reverse(blockEvents);

}
}
Loading
Loading