forked from tikv/rocksdb
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmodify
More file actions
307 lines (288 loc) · 12.7 KB
/
modify
File metadata and controls
307 lines (288 loc) · 12.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
diff --git a/db/db_impl.h b/db/db_impl.h
index d73fac3..cf70990 100755
--- a/db/db_impl.h
+++ b/db/db_impl.h
@@ -1076,7 +1076,10 @@ Status FlushMemTablesToNvm(
ColumnFamilyData* GetColumnFamilyDataByName(const std::string& cf_name);
- void MaybeScheduleFlushOrCompaction();
+ // Schedules pending background flushes/compactions. When
+ // schedule_compaction is true, flush scheduling is skipped and the
+ // compaction limit is capped at one for this call.
+ void MaybeScheduleFlushOrCompaction(bool schedule_compaction = false);
// A flush request specifies the column families to flush as well as the
// largest memtable id to persist for each column family. Once all the
diff --git a/db/db_impl_compaction_flush.cc b/db/db_impl_compaction_flush.cc
index 3c8626c..5d73236 100755
--- a/db/db_impl_compaction_flush.cc
+++ b/db/db_impl_compaction_flush.cc
@@ -245,6 +245,27 @@ Status DBImpl::FlushMemTableToNvm(ColumnFamilyData *cfd,const MutableCFOptions &
bool *made_progress, JobContext *job_context,
SuperVersionContext *superversion_context,
LogBuffer *log_buffer){
+
+ mutex_.AssertHeld();
+ // Wait until the NVM pager can hand out a whole free table. Meanwhile keep
+ // this CF queued for compaction and request compaction-only scheduling, so
+ // that compaction can reclaim NVM pages for this flush.
+ while(!cfd->nvmcfmodule->FreeTableAvailable()){
+ if(shutting_down_.load(std::memory_order_acquire)){
+ // No new background work is expected to be scheduled once shutdown has
+ // begun, so waiting here could block DB close forever.
+ return Status::ShutdownInProgress();
+ }
+ if(!cfd->queued_for_compaction()){
+ AddToCompactionQueue(cfd);
+ ++unscheduled_compactions_;
+ }
+ MaybeScheduleFlushOrCompaction(true);
+ // Assumes bg_cv_ is bound to mutex_ (as in upstream DBImpl): TimedWait then
+ // releases the mutex like the old Unlock/sleep(1)/Lock, but can wake early.
+ bg_cv_.TimedWait(env_->NowMicros() + 1000000);
+ }
+
mutex_.AssertHeld();
assert(cfd->imm()->NumNotFlushed() != 0);
assert(cfd->imm()->IsFlushPending());
@@ -1890,7 +1911,7 @@ Status DBImpl::EnableAutoCompaction(
return s;
}
-void DBImpl::MaybeScheduleFlushOrCompaction() {
+void DBImpl::MaybeScheduleFlushOrCompaction(bool schedule_compaction) {
mutex_.AssertHeld();
if (!opened_successfully_) {
// Compaction may introduce data race to DB open
@@ -1910,6 +1931,7 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
return;
}
auto bg_job_limits = GetBGJobLimits();
+ if(!schedule_compaction){
bool is_flush_pool_empty =
env_->GetBackgroundThreads(Env::Priority::HIGH) == 0;
while (!is_flush_pool_empty && unscheduled_flushes_ > 0 &&
@@ -1927,6 +1949,10 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
bg_flush_scheduled_++;
env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::LOW, this);
}
+ }else{
+ // Compaction-only request: skip flush scheduling and cap compactions at one.
+ bg_job_limits.max_compactions = 1;
+ }
}
if (bg_compaction_paused_ > 0) {
diff --git a/utilities/nvm_mod/nvm_cf_mod.cc b/utilities/nvm_mod/nvm_cf_mod.cc
index 67c9cf2..04b49c7 100755
--- a/utilities/nvm_mod/nvm_cf_mod.cc
+++ b/utilities/nvm_mod/nvm_cf_mod.cc
@@ -311,7 +311,7 @@ ColumnCompactionItem* NvmCfModule::PickColumnCompaction(VersionStorageInfo* vsto
c->page_end.push_back(page_end);
c->page_start.push_back(page_start);
//c->compact_end.push_back(GetCompactEnd(tmp->keys_meta[end_key].offset + tmp->keys_meta[end_key].size));
- RECORD_LOG("select l0 file index:%d key_index:%d - %d offset start: %x page:%d - %d keysnum:%d\n",tmp->sstable_index,first_key_indexs[index],end_key,tmp->keys_meta[first_key_indexs[index]].offset,page_start,page_end, tmp->keys_num);
+ //RECORD_LOG("select l0 file index:%d key_index:%d - %d offset start: %x page:%d - %d keysnum:%d\n",tmp->sstable_index,first_key_indexs[index],end_key,tmp->keys_meta[first_key_indexs[index]].offset,page_start,page_end, tmp->keys_num);
}
}
@@ -404,7 +404,7 @@ ColumnCompactionItem* NvmCfModule::PickColumnCompaction(VersionStorageInfo* vsto
ptr_sst_->GetCompactPages(file->compact_index_end + 1 == file->compact_index_begin ? start:tmp->keys_meta[file->compact_index_begin].offset, start, tmp->keys_meta[end_key].offset + tmp->keys_meta[end_key].size - 1, page_start, page_end);
c->page_end.push_back(page_end);
c->page_start.push_back(page_start);
- RECORD_LOG("select l0 file index:%d key_index:%d - %d offset start: %x page:%d - %d keysnum:%d\n",tmp->sstable_index,first_key_indexs[index],end_key,start,page_start,page_end,tmp->keys_num);
+ //RECORD_LOG("select l0 file index:%d key_index:%d - %d offset start: %x page:%d - %d keysnum:%d\n",tmp->sstable_index,first_key_indexs[index],end_key,start,page_start,page_end,tmp->keys_num);
// c->compact_end.push_back(tmp->keys_meta[end_key].offset + tmp->keys_meta[end_key].size - 1);
// c->compact_begin.push_back(tmp->keys_meta[first_key_indexs[index]].offset);
@@ -714,9 +714,16 @@ void NvmCfModule::AddIterators(VersionStorageInfo* vstorage,MergeIteratorBuilder
void NvmCfModule::RecoverFromStorageInfo(VersionStorageInfo* vstorage){
auto L0files = vstorage->LevelFiles(0);
- if( L0files.size() == 0) return;
+ RECORD_LOG("recover l0:filenum:%zu \n",L0files.size());
+ if( L0files.size() == 0){
+ // No L0 metadata to restore, but the free-page list must still be built:
+ // NVMPager::AllocSstable no longer triggers RecoverNVMFreePage lazily.
+ ptr_sst_->RecoverNVMFreePage();
+ return;
+ }
if( open_by_creat_ ) {
printf("error:open nvm cf file by creat, can't recover!\n");
+ ptr_sst_->RecoverNVMFreePage();
return;
}
InternalKey* compact_max = nullptr;
@@ -771,22 +778,28 @@ void NvmCfModule::RecoverFromStorageInfo(VersionStorageInfo* vstorage){
GetFixed64(&ptr_size, &file->keys_meta[j].size);
offset += 8;
}
- ptr_sst_->RecoverSstableData(file->sstable_index,filemeta->file_page, file->keys_meta[filemeta->compact_index_begin].offset,file->keys_meta[filemeta->compact_index_end].offset + file->keys_meta[filemeta->compact_index_end].size, filemeta->raw_file_size);
-
- uint64_t cindex = filemeta->compact_index_end;
- if(cindex < filemeta->keys_num - 1){
- if(!compact_max){
- if(user_comparator->Compare(compact_max->Encode(),file->keys_meta[cindex].key.Encode())>0)
- compact_max = &file->keys_meta[cindex].key;
- }else{
- compact_max = &file->keys_meta[cindex].key;
- }
- }
+ // compact_index_end + 1 == 0 (compact_index_end holds the all-ones value) is
+ // handled as "nothing compacted": every page of the file is recovered.
+ if(filemeta->compact_index_end + 1 != 0){
+ ptr_sst_->RecoverSstableData(file->sstable_index,filemeta->file_page, file->keys_meta[filemeta->compact_index_begin].offset,file->keys_meta[filemeta->compact_index_end].offset + file->keys_meta[filemeta->compact_index_end].size, filemeta->raw_file_size);
+ uint64_t cindex = filemeta->compact_index_end;
+ // Track the smallest keys_meta[compact_index_end] key over all L0 files
+ // (compact_max ends up holding the minimum, despite its name).
+ if(compact_max){
+ if(user_comparator->Compare(compact_max->Encode(),file->keys_meta[cindex].key.Encode())>0)
+ compact_max = &file->keys_meta[cindex].key;
+ }else{
+ compact_max = &file->keys_meta[cindex].key;
+ }
+ }else{
+ // compact_end == 0 makes RecoverSstableData keep every page of the file.
+ ptr_sst_->RecoverSstableData(file->sstable_index,filemeta->file_page,file->keys_meta[filemeta->compact_index_begin].offset ,0 , filemeta->raw_file_size);
+ }
RECORD_LOG("recover file:%lu key_point_filenum:%lu keys_num:%lu sstable_index:%lu\n",file->filenum, file->key_point_filenum, filemeta->keys_num, file->sstable_index);
}
ptr_sst_->RecoverNVMFreePage();
-
- sst_meta_->SetCompactMax(compact_max);
+ if(compact_max)
+ sst_meta_->SetCompactMax(compact_max);
}
NvmCfModule* NewNvmCfModule(NvmCfOptions* nvmcfoption,const std::string &cf_name,uint32_t cf_id,const InternalKeyComparator* icmp){
diff --git a/utilities/nvm_mod/nvm_cf_mod.h b/utilities/nvm_mod/nvm_cf_mod.h
index bd6500b..5fad269 100755
--- a/utilities/nvm_mod/nvm_cf_mod.h
+++ b/utilities/nvm_mod/nvm_cf_mod.h
@@ -60,7 +60,9 @@ class NvmCfModule {
const NvmCfOptions* GetNvmCfOptions() { return nvmcfoption_; }
void UpdateCompactMax(InternalKey * key){ sst_meta_->SetCompactMax(key); }
-
+ // True when the NVM pager (ptr_sst_) can allocate another whole table;
+ // FlushMemTableToNvm waits on this before flushing.
+ bool FreeTableAvailable(){ return ptr_sst_->FreeTableAvailable(); }
private:
bool UserKeyInRange(Slice *user_key,InternalKey *start,InternalKey *end);
int UserKeyCompareRange(Slice *user_key,InternalKey *start,InternalKey *end);
diff --git a/utilities/nvm_mod/nvm_pager.cc b/utilities/nvm_mod/nvm_pager.cc
index 0cb4c7c..4a8b794 100755
--- a/utilities/nvm_mod/nvm_pager.cc
+++ b/utilities/nvm_mod/nvm_pager.cc
@@ -43,7 +43,6 @@ namespace rocksdb {
//first_index_.resize(table_num_,-1);
page_bitmap_ = new BitMap((all_size >> NVMPager::PAGE_SIZE_LOG));
table_index_bitmap_ = new BitMap(table_num_);
- init = false;
}
NVMPager::~NVMPager()
@@ -63,10 +62,6 @@ namespace rocksdb {
char* NVMPager::AllocSstable(int& index, FileMetaData *meta)
{
- if(!init){
- init = true;
- RecoverNVMFreePage();
- }
char* table_addr = nullptr;
if (free_list_.size() < (int64_t)page_num_per_table_){
- RECORD_LOG("alloc sst error : l0table num:%d free page num:%ld max table num:%d pagenum per table",l0_table_num_,free_list_.size(),table_num_,page_num_per_table_);
+ RECORD_LOG("alloc sst error : l0table num:%d free page num:%u max table num:%d pagenum per table:%d",(int)l0_table_num_,free_list_.size(),(int)table_num_,(int)page_num_per_table_);
@@ -138,7 +133,11 @@ namespace rocksdb {
{
return offset >> NVMPager::PAGE_SIZE_LOG;
}
-
+ // Returns true if another table can be allocated: the free list holds at
+ // least page_num_per_table_ pages and fewer than table_num_ tables are in use.
+ bool NVMPager::FreeTableAvailable(){
+ return free_list_.size() >= page_num_per_table_ && l0_table_num_ < table_num_;
+ }
void NVMPager::GetCompactPages(uint64_t compact_begin, uint64_t begin, uint64_t end,uint32_t &page_start, uint32_t &page_end) //首次compaction的起始位置,本次compaction的起始和结束位置
{
page_end = (end + 1) >> NVMPager::PAGE_SIZE_LOG;//compaction范围后第一个有效页
@@ -240,7 +239,6 @@ namespace rocksdb {
next_free_ = nullptr;
}
void NVMPager::RecoverNVMFreePage(){ //恢复freelist
- init = true;
//恢复freelist
uint64_t page_num = page_num_total_;
char* page_base = nvm_base_;
@@ -251,7 +249,7 @@ namespace rocksdb {
}
page_base += NVMPager::PAGE_SIZE;
}
- RECORD_LOG("nvm pager : init pager free table:%d \n",free_list_.size());
+ printf("nvm pager : init pager free pages:%u table:%d\n",free_list_.size(),(int)l0_table_num_);
}
void NVMPager::RecoverAddSstable(int index, std::vector<int> &filepage, uint64_t raw_file_size) { //恢复元数据
if ((uint64_t)index >= table_num_) {
@@ -262,29 +260,39 @@ namespace rocksdb {
printf("error:recover file index is used !\n");
return;
}
+ l0_table_num_ ++;
table_index_bitmap_->set(index);
page_table_[index].reserve(page_num_per_table_);
for (uint i = 0; i < raw_file_size >> NVMPager::PAGE_SIZE_LOG; i++)
- page_table_[i].push_back(nullptr);
+ page_table_[index].push_back(nullptr);
for (uint i = raw_file_size >> NVMPager::PAGE_SIZE_LOG; i < page_num_per_table_; i++) {
int page_index = filepage.at(i);
NVMPageDescriptor* tmp = new NVMPageDescriptor(page_index, nvm_base_+NVMPager::PAGE_SIZE*page_index);
- page_table_[i].push_back(tmp);
+ page_table_[index].push_back(tmp);
page_bitmap_->set(tmp->page_num_);
}
+ RECORD_LOG("recover add sst :%d page %d - %d pagenum:%d",index,(int)(raw_file_size >> NVMPager::PAGE_SIZE_LOG),(int)page_num_per_table_,(int)page_table_[index].size());
}
void NVMPager::RecoverSstableData(int index,std::vector<int> &filepage, uint64_t compact_begin,uint64_t compact_end, uint64_t filesize){
- uint page_start, page_end, file_end;
- if(compact_begin > compact_end)
- page_start = 0;
- else if(((compact_begin & NVMPager::PAGE_SIZE) - 1) == 0)
+ // compact_begin/compact_end are byte offsets into the table; compact_end is
+ // exclusive (RecoverFromStorageInfo passes keys_meta[i].offset + size, or 0
+ // when nothing was compacted). Pages lying wholly inside the compacted range
+ // are skipped; every other page up to file_end is re-attached here.
+ int page_start, page_end, file_end;
+ if(compact_begin > compact_end){
+ // Empty compacted range: keep every page.
+ page_start = -1;
+ }else{
page_start = compact_begin >> NVMPager::PAGE_SIZE_LOG;
- else
- page_start = (compact_begin >> NVMPager::PAGE_SIZE_LOG) + 1;
+ // A first page that is only partially compacted still holds live data.
+ if((compact_begin & (NVMPager::PAGE_SIZE - 1)) != 0)
+ page_start++;
+ }
+ // Last page fully covered by [compact_begin, compact_end).
page_end = (compact_end >> NVMPager::PAGE_SIZE_LOG) - 1;
file_end = (filesize >> NVMPager::PAGE_SIZE_LOG) - 1;
- for(uint i = 0; i <= file_end; ++i){
+ for(int i = 0; i <= file_end; ++i){
if(i < page_start || i > page_end){
int page_index = filepage.at(i);
NVMPageDescriptor* tmp = new NVMPageDescriptor(page_index, nvm_base_+NVMPager::PAGE_SIZE*page_index);
@@ -292,6 +300,7 @@ namespace rocksdb {
page_bitmap_->set(tmp->page_num_);
}
}
+ RECORD_LOG("recover file data: %d page:0-%d %d-%d",index,page_start,page_end,file_end);
//first_index_.at(index) = page_end + 1;
}
diff --git a/utilities/nvm_mod/nvm_pager.h b/utilities/nvm_mod/nvm_pager.h
index 5dea59d..6f97d61 100755
--- a/utilities/nvm_mod/nvm_pager.h
+++ b/utilities/nvm_mod/nvm_pager.h
@@ -89,11 +89,11 @@ namespace rocksdb {
end_->next_free_ = nullptr;
page_num_ ++;
}
- int size(){
+ uint size(){
return page_num_;
}
private:
- int page_num_;
+ uint page_num_;
NVMPageDescriptor* begin_;
NVMPageDescriptor* end_;
std::mutex mu_;
@@ -109,6 +109,9 @@ namespace rocksdb {
~NVMPager();
char* AllocSstable(int& index, FileMetaData *meta);
+ // True if another table can be allocated right now (enough free pages for
+ // one table and an unused table slot).
+ bool FreeTableAvailable();
char* GetIndexPtr(int index);
@@ -146,7 +149,6 @@ namespace rocksdb {
BitMap* table_index_bitmap_; // Table 的索引值的分配情况
uint64_t each_size_;
std::mutex mu_;
- bool init;
};