-
Notifications
You must be signed in to change notification settings - Fork 0
Description
分析HBase Put操作流程
Exported on 2025/8/21 at GMT+8 13:58:59 from Cursor (1.1.5)
User
帮我分析下Hbase Put 操作流程
Cursor
我来帮你分析HBase Put操作的流程。让我先探索一下HBase的代码结构,找到相关的Put操作实现代码。
很好!我找到了Put.java文件。现在让我查看Put类的实现,以及相关的Mutation基类。
现在让我查看Mutation基类,了解Put操作的更多细节:
让我继续查看Mutation类的更多内容:
现在让我查看Table接口,了解Put操作是如何被调用的:
让我查看Table接口中的put方法:
现在让我查看Table接口中的put方法定义:
让我继续查看Table接口中的put方法:
现在让我查看Table接口中的put方法定义:
现在让我查看HRegion中的put方法实现,这是服务端的核心实现:
让我查看mutate方法的实现:
让我查看HRegion中的mutate方法:
现在让我查看HRegion中的mutate方法实现:
让我查看batchMutate方法的实现:
让我继续查看doMiniBatchMutate方法的实现:
现在让我查看客户端Table的实现,了解Put操作是如何从客户端发送到服务端的:
让我查看TableOverAsyncTable,这可能是同步Table接口的实现:
现在让我查看AsyncTableImpl中的put方法实现:
让我查看AsyncTableImpl中的put方法:
现在让我查看RawAsyncTableImpl中的put方法:
让我查看RawAsyncTableImpl中的put方法:
让我查看RawAsyncTableImpl中的put方法实现:
现在让我查看RequestConverter中的buildMutateRequest方法:
让我查看RequestConverter中的buildMutateRequest方法:
基于我对HBase代码的分析,现在我可以为你详细分析HBase Put操作的流程。让我总结一下整个流程:
HBase Put操作流程分析
1. 客户端层面 (Client Side)
1.1 Put对象创建
// 创建Put对象,指定行键
Put put = new Put(Bytes.toBytes("rowKey"));
// 添加列族、列限定符和值
put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("qualifier"), Bytes.toBytes("value"));
// 可选:设置时间戳
put.setTimestamp(timestamp);1.2 Put对象结构
- Put类继承自Mutation类
- 包含行键(row)、时间戳(ts)、列族映射(familyMap)等
- 支持设置持久性(durability)、TTL、ACL等属性
1.3 客户端调用链
Table.put(Put put)
→ TableOverAsyncTable.put(Put put)
→ AsyncTableImpl.put(Put put)
→ RawAsyncTableImpl.put(Put put)
→ 构建RPC请求并发送2. RPC通信层
2.1 请求构建
- 使用
RequestConverter.buildMutateRequest()将Put对象转换为Protocol Buffer格式 - 构建
MutateRequest,包含:- Region信息
- Mutation类型(PUT)
- Put数据
2.2 网络传输
- 通过HBase RPC框架发送到对应的RegionServer
- 支持重试机制和超时控制
3. 服务端层面 (Server Side)
3.1 RegionServer接收
RSRpcServices接收RPC请求- 解析请求并调用对应Region的put方法
3.2 Region处理流程
HRegion.put(Put put)
→ mutate(Put put)
→ batchMutate(BatchOperation)
→ doMiniBatchMutate(BatchOperation)3.3 核心处理步骤
步骤1: 资源检查和锁定
- 检查Region是否只读
- 检查资源是否充足
- 获取行锁(Row Lock)确保原子性
步骤2: 预准备操作
- 调用协处理器(Coprocessor)的prePut钩子
- 验证列族是否存在
- 准备时间戳
步骤3: WAL写入
- 构建WAL编辑记录
- 写入预写日志(Write-Ahead Log)
- 同步WAL到磁盘(根据durability设置)
步骤4: MemStore更新
- 将数据写入内存存储(MemStore)
- 更新MVCC(多版本并发控制)信息
步骤5: 完成操作
- 调用协处理器的postPut钩子
- 释放行锁
- 返回操作状态
4. 数据存储结构
4.1 KeyValue结构
KeyValue kv = new KeyValue(
row, // 行键
family, // 列族
qualifier, // 列限定符
timestamp, // 时间戳
KeyValue.Type.Put, // 操作类型
value // 值
);4.2 存储层次
- MemStore: 内存中的有序数据结构
- HFile: 磁盘上的不可变文件
- WAL: 预写日志,保证数据持久性
5. 关键特性
5.1 原子性
- 同一行的所有列族操作是原子的
- 通过行锁保证并发安全
5.2 持久性
- 支持多种durability级别:
SKIP_WAL: 跳过WAL,性能最高但可能丢失数据ASYNC_WAL: 异步写入WALSYNC_WAL: 同步写入WALFSYNC_WAL: 强制同步到磁盘
5.3 版本控制
- 支持多版本存储
- 时间戳用于版本管理
- 可配置最大版本数
6. 性能优化
6.1 批量操作
- 支持批量Put操作
- 减少RPC调用次数
6.2 异步操作
- 提供异步API
- 支持CompletableFuture
6.3 缓冲机制
- BufferedMutator提供客户端缓冲
- 批量发送提高吞吐量
7. 错误处理
7.1 重试机制
- 网络异常自动重试
- 可配置重试次数和间隔
7.2 异常类型
NoSuchColumnFamilyException: 列族不存在RegionTooBusyException: Region过载FailedSanityCheckException: 数据验证失败
这个流程展示了HBase Put操作从客户端到服务端的完整路径,体现了HBase作为分布式NoSQL数据库的架构特点和设计理念。
// 创建Put对象,指定行键
Put put = new Put(Bytes.toBytes("rowKey"));
// 添加列族、列限定符和值
put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("qualifier"), Bytes.toBytes("value"));
// 可选:设置时间戳
put.setTimestamp(timestamp);Table.put(Put put)
→ TableOverAsyncTable.put(Put put)
→ AsyncTableImpl.put(Put put)
→ RawAsyncTableImpl.put(Put put)
→ 构建RPC请求并发送HRegion.put(Put put)
→ mutate(Put put)
→ batchMutate(BatchOperation)
→ doMiniBatchMutate(BatchOperation)KeyValue kv = new KeyValue(
row, // 行键
family, // 列族
qualifier, // 列限定符
timestamp, // 时间戳
KeyValue.Type.Put, // 操作类型
value // 值
);User
多版本特性:如果服务端设置了只保留一个版本,帮我找一下在写入时候对多版本的处理代码
Cursor
我来帮你找到HBase在写入时对多版本处理的代码,特别是当服务端设置为只保留一个版本时的处理逻辑。
首先让我搜索相关的代码:
现在让我搜索更具体的多版本处理代码:
让我查看这个关键的代码段:
现在让我搜索更多关于多版本处理的代码:
让我搜索Store类中的upsert方法:
让我查看HStore中的upsert方法:
现在让我查看AbstractMemStore中的upsert方法:
让我查看AbstractMemStore中单个cell的upsert方法:
让我查看AbstractMemStore中的doUpsert方法:
现在让我查看MutableSegment中的upsert方法:
现在让我搜索更多关于多版本控制的代码,特别是关于maxVersions的处理:
现在让我搜索更多关于多版本控制的代码,特别是关于版本清理的逻辑:
让我搜索更多关于多版本控制的代码:
现在让我查看ScanInfo类,了解minVersions和maxVersions的处理:
现在让我查看NewVersionBehaviorTracker类,了解新版本行为跟踪器的实现:
现在让我查看ScanWildcardColumnTracker类,了解通配符列跟踪器的实现:
基于我对HBase代码的深入分析,现在我可以为你详细解释HBase在写入时对多版本的处理,特别是当服务端设置为只保留一个版本时的处理逻辑。
HBase多版本特性:写入时的版本处理代码分析
1. 核心判断逻辑
在HRegion.java中,有一个关键的判断来决定是否使用upsert模式:
boolean upsert = delta && store.getColumnFamilyDescriptor().getMaxVersions() == 1;
if (upsert) {
store.upsert(cells, getSmallestReadPoint(), memstoreAccounting);
} else {
store.add(cells, memstoreAccounting);
}关键点:
- 当
maxVersions == 1时,HBase会使用upsert模式 - 当
maxVersions > 1时,使用普通的add模式
2. Upsert模式的实现
2.1 调用链
HRegion.applyToMemStore()
→ HStore.upsert()
→ AbstractMemStore.upsert()
→ MutableSegment.upsert()
2.2 MutableSegment.upsert()的核心逻辑
public void upsert(ExtendedCell cell, long readpoint, MemStoreSizing memStoreSizing,
boolean sizeAddedPreOperation) {
internalAdd(cell, false, memStoreSizing, sizeAddedPreOperation);
// Get the Cells for the row/family/qualifier regardless of timestamp.
// For this case we want to clean up any other puts
ExtendedCell firstCell =
PrivateCellUtil.createFirstOnRowColTS(cell, HConstants.LATEST_TIMESTAMP);
SortedSet<ExtendedCell> ss = this.tailSet(firstCell);
Iterator<ExtendedCell> it = ss.iterator();
// versions visible to oldest scanner
int versionsVisible = 0;
while (it.hasNext()) {
ExtendedCell cur = it.next();
if (cell == cur) {
// ignore the one just put in
continue;
}
// check that this is the row and column we are interested in, otherwise bail
if (CellUtil.matchingRows(cell, cur) && CellUtil.matchingQualifier(cell, cur)) {
// only remove Puts that concurrent scanners cannot possibly see
if (cur.getTypeByte() == KeyValue.Type.Put.getCode() && cur.getSequenceId() <= readpoint) {
if (versionsVisible >= 1) {
// if we get here we have seen at least one version visible to the oldest scanner,
// which means we can prove that no scanner will see this version
// false means there was a change, so give us the size.
int cellLen = getCellLength(cur);
long heapSize = heapSizeChange(cur, true);
long offHeapSize = offHeapSizeChange(cur, true);
incMemStoreSize(-cellLen, -heapSize, -offHeapSize, -1);
if (memStoreSizing != null) {
memStoreSizing.decMemStoreSize(cellLen, heapSize, offHeapSize, 1);
}
it.remove();
} else {
versionsVisible++;
}
}
} else {
// past the row or column, done
break;
}
}
}3. 版本清理的关键逻辑
3.1 版本可见性判断
int versionsVisible = 0;
// ...
if (versionsVisible >= 1) {
// 可以安全删除旧版本
it.remove();
} else {
versionsVisible++;
}核心逻辑:
- 当
versionsVisible >= 1时,说明已经有一个版本对最老的扫描器可见 - 此时可以安全删除旧版本,因为不会有扫描器能看到它
3.2 删除条件
if (cur.getTypeByte() == KeyValue.Type.Put.getCode() && cur.getSequenceId() <= readpoint) {
// 只有Put类型的Cell且序列ID小于等于读点才能删除
}删除条件:
- 必须是Put类型的Cell(不包括Delete标记)
- 序列ID必须小于等于当前读点(readpoint)
- 已经有一个版本对扫描器可见
4. 与普通Add模式的区别
4.1 普通Add模式
// 当maxVersions > 1时
store.add(cells, memstoreAccounting);- 直接添加新版本到MemStore
- 不删除旧版本
- 版本数量由maxVersions控制
4.2 Upsert模式
// 当maxVersions == 1时
store.upsert(cells, getSmallestReadPoint(), memstoreAccounting);- 添加新版本的同时清理旧版本
- 确保只保留一个版本
- 立即进行版本清理,而不是等到读取时
5. 版本控制的其他机制
5.1 读取时的版本控制
在ScanWildcardColumnTracker中:
// keep the KV if required by minversions or it is not expired, yet
if (currentCount <= minVersions || !isExpired(timestamp)) {5.2 新版本行为跟踪器
在NewVersionBehaviorTracker中:
if (countCurrentCol == minVersions) {
return MatchCode.INCLUDE_AND_SEEK_NEXT_COL;
}
if (countCurrentCol > minVersions) {
// This may not be reached, only for safety.
return MatchCode.SEEK_NEXT_COL;
}6. 总结
当HBase服务端设置maxVersions = 1时:
- 写入时立即清理:使用upsert模式,在写入新版本的同时立即清理旧版本
- 版本可见性保证:确保清理的版本不会被任何扫描器看到
- 内存优化:及时释放旧版本占用的内存空间
- 性能提升:避免在读取时进行版本过滤,提高查询性能
这种设计体现了HBase在性能和数据一致性之间的平衡,通过写入时的主动版本管理,既保证了数据的正确性,又优化了存储和查询性能。
boolean upsert = delta && store.getColumnFamilyDescriptor().getMaxVersions() == 1;
if (upsert) {
store.upsert(cells, getSmallestReadPoint(), memstoreAccounting);
} else {
store.add(cells, memstoreAccounting);
}HRegion.applyToMemStore()
→ HStore.upsert()
→ AbstractMemStore.upsert()
→ MutableSegment.upsert()
public void upsert(ExtendedCell cell, long readpoint, MemStoreSizing memStoreSizing,
boolean sizeAddedPreOperation) {
internalAdd(cell, false, memStoreSizing, sizeAddedPreOperation);
// Get the Cells for the row/family/qualifier regardless of timestamp.
// For this case we want to clean up any other puts
ExtendedCell firstCell =
PrivateCellUtil.createFirstOnRowColTS(cell, HConstants.LATEST_TIMESTAMP);
SortedSet<ExtendedCell> ss = this.tailSet(firstCell);
Iterator<ExtendedCell> it = ss.iterator();
// versions visible to oldest scanner
int versionsVisible = 0;
while (it.hasNext()) {
ExtendedCell cur = it.next();
if (cell == cur) {
// ignore the one just put in
continue;
}
// check that this is the row and column we are interested in, otherwise bail
if (CellUtil.matchingRows(cell, cur) && CellUtil.matchingQualifier(cell, cur)) {
// only remove Puts that concurrent scanners cannot possibly see
if (cur.getTypeByte() == KeyValue.Type.Put.getCode() && cur.getSequenceId() <= readpoint) {
if (versionsVisible >= 1) {
// if we get here we have seen at least one version visible to the oldest scanner,
// which means we can prove that no scanner will see this version
// false means there was a change, so give us the size.
int cellLen = getCellLength(cur);
long heapSize = heapSizeChange(cur, true);
long offHeapSize = offHeapSizeChange(cur, true);
incMemStoreSize(-cellLen, -heapSize, -offHeapSize, -1);
if (memStoreSizing != null) {
memStoreSizing.decMemStoreSize(cellLen, heapSize, offHeapSize, 1);
}
it.remove();
} else {
versionsVisible++;
}
}
} else {
// past the row or column, done
break;
}
}
}int versionsVisible = 0;
// ...
if (versionsVisible >= 1) {
// 可以安全删除旧版本
it.remove();
} else {
versionsVisible++;
}if (cur.getTypeByte() == KeyValue.Type.Put.getCode() && cur.getSequenceId() <= readpoint) {
// 只有Put类型的Cell且序列ID小于等于读点才能删除
}// 当maxVersions > 1时
store.add(cells, memstoreAccounting);// 当maxVersions == 1时
store.upsert(cells, getSmallestReadPoint(), memstoreAccounting);// keep the KV if required by minversions or it is not expired, yet
if (currentCount <= minVersions || !isExpired(timestamp)) {if (countCurrentCol == minVersions) {
return MatchCode.INCLUDE_AND_SEEK_NEXT_COL;
}
if (countCurrentCol > minVersions) {
// This may not be reached, only for safety.
return MatchCode.SEEK_NEXT_COL;
}