Coverage Report

Created: 2025-02-21 14:37

/root/bitcoin/src/leveldb/db/db_impl.cc
Source listing (the report's line-number and execution-count columns are omitted below; every instrumented line in this report was recorded with execution count 0).
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/db_impl.h"

#include <stdint.h>
#include <stdio.h>

#include <algorithm>
#include <atomic>
#include <set>
#include <string>
#include <vector>

#include "db/builder.h"
#include "db/db_iter.h"
#include "db/dbformat.h"
#include "db/filename.h"
#include "db/log_reader.h"
#include "db/log_writer.h"
#include "db/memtable.h"
#include "db/table_cache.h"
#include "db/version_set.h"
#include "db/write_batch_internal.h"
#include "leveldb/db.h"
#include "leveldb/env.h"
#include "leveldb/status.h"
#include "leveldb/table.h"
#include "leveldb/table_builder.h"
#include "port/port.h"
#include "table/block.h"
#include "table/merger.h"
#include "table/two_level_iterator.h"
#include "util/coding.h"
#include "util/logging.h"
#include "util/mutexlock.h"

namespace leveldb {

const int kNumNonTableCacheFiles = 10;

// Information kept for every waiting writer
struct DBImpl::Writer {
  explicit Writer(port::Mutex* mu)
      : batch(nullptr), sync(false), done(false), cv(mu) {}

  Status status;
  WriteBatch* batch;
  bool sync;
  bool done;
  port::CondVar cv;
};

struct DBImpl::CompactionState {
  // Files produced by compaction
  struct Output {
    uint64_t number;
    uint64_t file_size;
    InternalKey smallest, largest;
  };

  Output* current_output() { return &outputs[outputs.size() - 1]; }

  explicit CompactionState(Compaction* c)
      : compaction(c),
        smallest_snapshot(0),
        outfile(nullptr),
        builder(nullptr),
        total_bytes(0) {}

  Compaction* const compaction;

  // Sequence numbers < smallest_snapshot are not significant since we
  // will never have to service a snapshot below smallest_snapshot.
  // Therefore if we have seen a sequence number S <= smallest_snapshot,
  // we can drop all entries for the same key with sequence numbers < S.
  SequenceNumber smallest_snapshot;

  std::vector<Output> outputs;

  // State kept for output being generated
  WritableFile* outfile;
  TableBuilder* builder;

  uint64_t total_bytes;
};

// Fix user-supplied options to be reasonable
template <class T, class V>
static void ClipToRange(T* ptr, V minvalue, V maxvalue) {
  if (static_cast<V>(*ptr) > maxvalue) *ptr = maxvalue;
  if (static_cast<V>(*ptr) < minvalue) *ptr = minvalue;
}
Unexecuted instantiation: db_impl.cc:_ZN7leveldbL11ClipToRangeIiiEEvPT_T0_S3_
Unexecuted instantiation: db_impl.cc:_ZN7leveldbL11ClipToRangeImiEEvPT_T0_S3_

Options SanitizeOptions(const std::string& dbname,
                        const InternalKeyComparator* icmp,
                        const InternalFilterPolicy* ipolicy,
                        const Options& src) {
  Options result = src;
  result.comparator = icmp;
  result.filter_policy = (src.filter_policy != nullptr) ? ipolicy : nullptr;
  ClipToRange(&result.max_open_files, 64 + kNumNonTableCacheFiles, 50000);
  ClipToRange(&result.write_buffer_size, 64 << 10, 1 << 30);
  ClipToRange(&result.max_file_size, 1 << 20, 1 << 30);
  ClipToRange(&result.block_size, 1 << 10, 4 << 20);
  if (result.info_log == nullptr) {
    // Open a log file in the same directory as the db
    src.env->CreateDir(dbname);  // In case it does not exist
    src.env->RenameFile(InfoLogFileName(dbname), OldInfoLogFileName(dbname));
    Status s = src.env->NewLogger(InfoLogFileName(dbname), &result.info_log);
    if (!s.ok()) {
      // No place suitable for logging
      result.info_log = nullptr;
    }
  }
  if (result.block_cache == nullptr) {
    result.block_cache = NewLRUCache(8 << 20);
  }
  return result;
}
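
Note that out-of-range option values are silently clamped rather than rejected. A minimal standalone replay of the write_buffer_size clamp above (the variable names and requested value are illustrative, not LevelDB API):

#include <cstddef>
#include <cstdio>

// Mirrors ClipToRange(&result.write_buffer_size, 64 << 10, 1 << 30):
// a 32-byte request is raised to the 64 KiB floor, and an over-large
// request would be cut down to the 1 GiB ceiling.
int main() {
  size_t write_buffer_size = 1 << 5;  // requested: 32 bytes
  const size_t minvalue = 64 << 10, maxvalue = 1 << 30;
  if (write_buffer_size > maxvalue) write_buffer_size = maxvalue;
  if (write_buffer_size < minvalue) write_buffer_size = minvalue;
  std::printf("effective write_buffer_size: %zu\n", write_buffer_size);  // 65536
  return 0;
}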

static int TableCacheSize(const Options& sanitized_options) {
  // Reserve ten files or so for other uses and give the rest to TableCache.
  return sanitized_options.max_open_files - kNumNonTableCacheFiles;
}

DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
    : env_(raw_options.env),
      internal_comparator_(raw_options.comparator),
      internal_filter_policy_(raw_options.filter_policy),
      options_(SanitizeOptions(dbname, &internal_comparator_,
                               &internal_filter_policy_, raw_options)),
      owns_info_log_(options_.info_log != raw_options.info_log),
      owns_cache_(options_.block_cache != raw_options.block_cache),
      dbname_(dbname),
      table_cache_(new TableCache(dbname_, options_, TableCacheSize(options_))),
      db_lock_(nullptr),
      shutting_down_(false),
      background_work_finished_signal_(&mutex_),
      mem_(nullptr),
      imm_(nullptr),
      has_imm_(false),
      logfile_(nullptr),
      logfile_number_(0),
      log_(nullptr),
      seed_(0),
      tmp_batch_(new WriteBatch),
      background_compaction_scheduled_(false),
      manual_compaction_(nullptr),
      versions_(new VersionSet(dbname_, &options_, table_cache_,
                               &internal_comparator_)) {}

DBImpl::~DBImpl() {
  // Wait for background work to finish.
  mutex_.Lock();
  shutting_down_.store(true, std::memory_order_release);
  while (background_compaction_scheduled_) {
    background_work_finished_signal_.Wait();
  }
  mutex_.Unlock();

  if (db_lock_ != nullptr) {
    env_->UnlockFile(db_lock_);
  }

  delete versions_;
  if (mem_ != nullptr) mem_->Unref();
  if (imm_ != nullptr) imm_->Unref();
  delete tmp_batch_;
  delete log_;
  delete logfile_;
  delete table_cache_;

  if (owns_info_log_) {
    delete options_.info_log;
  }
  if (owns_cache_) {
    delete options_.block_cache;
  }
}

Status DBImpl::NewDB() {
  VersionEdit new_db;
  new_db.SetComparatorName(user_comparator()->Name());
  new_db.SetLogNumber(0);
  new_db.SetNextFile(2);
  new_db.SetLastSequence(0);

  const std::string manifest = DescriptorFileName(dbname_, 1);
  WritableFile* file;
  Status s = env_->NewWritableFile(manifest, &file);
  if (!s.ok()) {
    return s;
  }
  {
    log::Writer log(file);
    std::string record;
    new_db.EncodeTo(&record);
    s = log.AddRecord(record);
    if (s.ok()) {
      s = file->Close();
    }
  }
  delete file;
  if (s.ok()) {
    // Make "CURRENT" file that points to the new manifest file.
    s = SetCurrentFile(env_, dbname_, 1);
  } else {
    env_->DeleteFile(manifest);
  }
  return s;
}

void DBImpl::MaybeIgnoreError(Status* s) const {
  if (s->ok() || options_.paranoid_checks) {
    // No change needed
  } else {
    Log(options_.info_log, "Ignoring error %s", s->ToString().c_str());
    *s = Status::OK();
  }
}

void DBImpl::DeleteObsoleteFiles() {
  mutex_.AssertHeld();

  if (!bg_error_.ok()) {
    // After a background error, we don't know whether a new version may
    // or may not have been committed, so we cannot safely garbage collect.
    return;
  }

  // Make a set of all of the live files
  std::set<uint64_t> live = pending_outputs_;
  versions_->AddLiveFiles(&live);

  std::vector<std::string> filenames;
  env_->GetChildren(dbname_, &filenames);  // Ignoring errors on purpose
  uint64_t number;
  FileType type;
  std::vector<std::string> files_to_delete;
  for (std::string& filename : filenames) {
    if (ParseFileName(filename, &number, &type)) {
      bool keep = true;
      switch (type) {
        case kLogFile:
          keep = ((number >= versions_->LogNumber()) ||
                  (number == versions_->PrevLogNumber()));
          break;
        case kDescriptorFile:
          // Keep my manifest file, and any newer incarnations'
          // (in case there is a race that allows other incarnations)
          keep = (number >= versions_->ManifestFileNumber());
          break;
        case kTableFile:
          keep = (live.find(number) != live.end());
          break;
        case kTempFile:
          // Any temp files that are currently being written to must
          // be recorded in pending_outputs_, which is inserted into "live"
          keep = (live.find(number) != live.end());
          break;
        case kCurrentFile:
        case kDBLockFile:
        case kInfoLogFile:
          keep = true;
          break;
      }

      if (!keep) {
        files_to_delete.push_back(std::move(filename));
        if (type == kTableFile) {
          table_cache_->Evict(number);
        }
        Log(options_.info_log, "Delete type=%d #%lld\n", static_cast<int>(type),
            static_cast<unsigned long long>(number));
      }
    }
  }

  // While deleting all files unblock other threads. All files being deleted
  // have unique names which will not collide with newly created files and
  // are therefore safe to delete while allowing other threads to proceed.
  mutex_.Unlock();
  for (const std::string& filename : files_to_delete) {
    env_->DeleteFile(dbname_ + "/" + filename);
  }
  mutex_.Lock();
}

Status DBImpl::Recover(VersionEdit* edit, bool* save_manifest) {
  mutex_.AssertHeld();

  // Ignore error from CreateDir since the creation of the DB is
  // committed only when the descriptor is created, and this directory
  // may already exist from a previous failed creation attempt.
  env_->CreateDir(dbname_);
  assert(db_lock_ == nullptr);
  Status s = env_->LockFile(LockFileName(dbname_), &db_lock_);
  if (!s.ok()) {
    return s;
  }

  if (!env_->FileExists(CurrentFileName(dbname_))) {
    if (options_.create_if_missing) {
      s = NewDB();
      if (!s.ok()) {
        return s;
      }
    } else {
      return Status::InvalidArgument(
          dbname_, "does not exist (create_if_missing is false)");
    }
  } else {
    if (options_.error_if_exists) {
      return Status::InvalidArgument(dbname_,
                                     "exists (error_if_exists is true)");
    }
  }

  s = versions_->Recover(save_manifest);
  if (!s.ok()) {
    return s;
  }
  SequenceNumber max_sequence(0);

  // Recover from all newer log files than the ones named in the
  // descriptor (new log files may have been added by the previous
  // incarnation without registering them in the descriptor).
  //
  // Note that PrevLogNumber() is no longer used, but we pay
  // attention to it in case we are recovering a database
  // produced by an older version of leveldb.
  const uint64_t min_log = versions_->LogNumber();
  const uint64_t prev_log = versions_->PrevLogNumber();
  std::vector<std::string> filenames;
  s = env_->GetChildren(dbname_, &filenames);
  if (!s.ok()) {
    return s;
  }
  std::set<uint64_t> expected;
  versions_->AddLiveFiles(&expected);
  uint64_t number;
  FileType type;
  std::vector<uint64_t> logs;
  for (size_t i = 0; i < filenames.size(); i++) {
    if (ParseFileName(filenames[i], &number, &type)) {
      expected.erase(number);
      if (type == kLogFile && ((number >= min_log) || (number == prev_log)))
        logs.push_back(number);
    }
  }
  if (!expected.empty()) {
    char buf[50];
    snprintf(buf, sizeof(buf), "%d missing files; e.g.",
             static_cast<int>(expected.size()));
    return Status::Corruption(buf, TableFileName(dbname_, *(expected.begin())));
  }

  // Recover in the order in which the logs were generated
  std::sort(logs.begin(), logs.end());
  for (size_t i = 0; i < logs.size(); i++) {
    s = RecoverLogFile(logs[i], (i == logs.size() - 1), save_manifest, edit,
                       &max_sequence);
    if (!s.ok()) {
      return s;
    }

    // The previous incarnation may not have written any MANIFEST
    // records after allocating this log number.  So we manually
    // update the file number allocation counter in VersionSet.
    versions_->MarkFileNumberUsed(logs[i]);
  }

  if (versions_->LastSequence() < max_sequence) {
    versions_->SetLastSequence(max_sequence);
  }

  return Status::OK();
}
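
The two open-mode flags handled above are set by callers through the public API. A minimal usage sketch ("/tmp/testdb" is an illustrative path):

#include <cassert>
#include "leveldb/db.h"

int main() {
  leveldb::Options options;
  options.create_if_missing = true;  // takes the NewDB() path when CURRENT is absent
  options.error_if_exists = false;   // set true to refuse an existing DB
  leveldb::DB* db = nullptr;
  leveldb::Status s = leveldb::DB::Open(options, "/tmp/testdb", &db);
  assert(s.ok());
  delete db;  // releases the file lock taken by Recover()
  return 0;
}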

Status DBImpl::RecoverLogFile(uint64_t log_number, bool last_log,
                              bool* save_manifest, VersionEdit* edit,
                              SequenceNumber* max_sequence) {
  struct LogReporter : public log::Reader::Reporter {
    Env* env;
    Logger* info_log;
    const char* fname;
    Status* status;  // null if options_.paranoid_checks==false
    void Corruption(size_t bytes, const Status& s) override {
      Log(info_log, "%s%s: dropping %d bytes; %s",
          (this->status == nullptr ? "(ignoring error) " : ""), fname,
          static_cast<int>(bytes), s.ToString().c_str());
      if (this->status != nullptr && this->status->ok()) *this->status = s;
    }
  };

  mutex_.AssertHeld();

  // Open the log file
  std::string fname = LogFileName(dbname_, log_number);
  SequentialFile* file;
  Status status = env_->NewSequentialFile(fname, &file);
  if (!status.ok()) {
    MaybeIgnoreError(&status);
    return status;
  }

  // Create the log reader.
  LogReporter reporter;
  reporter.env = env_;
  reporter.info_log = options_.info_log;
  reporter.fname = fname.c_str();
  reporter.status = (options_.paranoid_checks ? &status : nullptr);
  // We intentionally make log::Reader do checksumming even if
  // paranoid_checks==false so that corruptions cause entire commits
  // to be skipped instead of propagating bad information (like overly
  // large sequence numbers).
  log::Reader reader(file, &reporter, true /*checksum*/, 0 /*initial_offset*/);
  Log(options_.info_log, "Recovering log #%llu",
      (unsigned long long)log_number);

  // Read all the records and add to a memtable
  std::string scratch;
  Slice record;
  WriteBatch batch;
  int compactions = 0;
  MemTable* mem = nullptr;
  while (reader.ReadRecord(&record, &scratch) && status.ok()) {
    if (record.size() < 12) {
      reporter.Corruption(record.size(),
                          Status::Corruption("log record too small", fname));
      continue;
    }
    WriteBatchInternal::SetContents(&batch, record);

    if (mem == nullptr) {
      mem = new MemTable(internal_comparator_);
      mem->Ref();
    }
    status = WriteBatchInternal::InsertInto(&batch, mem);
    MaybeIgnoreError(&status);
    if (!status.ok()) {
      break;
    }
    const SequenceNumber last_seq = WriteBatchInternal::Sequence(&batch) +
                                    WriteBatchInternal::Count(&batch) - 1;
    if (last_seq > *max_sequence) {
      *max_sequence = last_seq;
    }

    if (mem->ApproximateMemoryUsage() > options_.write_buffer_size) {
      compactions++;
      *save_manifest = true;
      status = WriteLevel0Table(mem, edit, nullptr);
      mem->Unref();
      mem = nullptr;
      if (!status.ok()) {
        // Reflect errors immediately so that conditions like full
        // file-systems cause the DB::Open() to fail.
        break;
      }
    }
  }

  delete file;

  // See if we should keep reusing the last log file.
  if (status.ok() && options_.reuse_logs && last_log && compactions == 0) {
    assert(logfile_ == nullptr);
    assert(log_ == nullptr);
    assert(mem_ == nullptr);
    uint64_t lfile_size;
    if (env_->GetFileSize(fname, &lfile_size).ok() &&
        env_->NewAppendableFile(fname, &logfile_).ok()) {
      Log(options_.info_log, "Reusing old log %s \n", fname.c_str());
      log_ = new log::Writer(logfile_, lfile_size);
      logfile_number_ = log_number;
      if (mem != nullptr) {
        mem_ = mem;
        mem = nullptr;
      } else {
        // mem can be nullptr if lognum exists but was empty.
        mem_ = new MemTable(internal_comparator_);
        mem_->Ref();
      }
    }
  }

  if (mem != nullptr) {
    // mem did not get reused; compact it.
    if (status.ok()) {
      *save_manifest = true;
      status = WriteLevel0Table(mem, edit, nullptr);
    }
    mem->Unref();
  }

  return status;
}

Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
                                Version* base) {
  mutex_.AssertHeld();
  const uint64_t start_micros = env_->NowMicros();
  FileMetaData meta;
  meta.number = versions_->NewFileNumber();
  pending_outputs_.insert(meta.number);
  Iterator* iter = mem->NewIterator();
  Log(options_.info_log, "Level-0 table #%llu: started",
      (unsigned long long)meta.number);

  Status s;
  {
    mutex_.Unlock();
    s = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta);
    mutex_.Lock();
  }

  Log(options_.info_log, "Level-0 table #%llu: %lld bytes %s",
      (unsigned long long)meta.number, (unsigned long long)meta.file_size,
      s.ToString().c_str());
  delete iter;
  pending_outputs_.erase(meta.number);

  // Note that if file_size is zero, the file has been deleted and
  // should not be added to the manifest.
  int level = 0;
  if (s.ok() && meta.file_size > 0) {
    const Slice min_user_key = meta.smallest.user_key();
    const Slice max_user_key = meta.largest.user_key();
    if (base != nullptr) {
      level = base->PickLevelForMemTableOutput(min_user_key, max_user_key);
    }
    edit->AddFile(level, meta.number, meta.file_size, meta.smallest,
                  meta.largest);
  }

  CompactionStats stats;
  stats.micros = env_->NowMicros() - start_micros;
  stats.bytes_written = meta.file_size;
  stats_[level].Add(stats);
  return s;
}

void DBImpl::CompactMemTable() {
  mutex_.AssertHeld();
  assert(imm_ != nullptr);

  // Save the contents of the memtable as a new Table
  VersionEdit edit;
  Version* base = versions_->current();
  base->Ref();
  Status s = WriteLevel0Table(imm_, &edit, base);
  base->Unref();

  if (s.ok() && shutting_down_.load(std::memory_order_acquire)) {
    s = Status::IOError("Deleting DB during memtable compaction");
  }

  // Replace immutable memtable with the generated Table
  if (s.ok()) {
    edit.SetPrevLogNumber(0);
    edit.SetLogNumber(logfile_number_);  // Earlier logs no longer needed
    s = versions_->LogAndApply(&edit, &mutex_);
  }

  if (s.ok()) {
    // Commit to the new state
    imm_->Unref();
    imm_ = nullptr;
    has_imm_.store(false, std::memory_order_release);
    DeleteObsoleteFiles();
  } else {
    RecordBackgroundError(s);
  }
}

void DBImpl::CompactRange(const Slice* begin, const Slice* end) {
  int max_level_with_files = 1;
  {
    MutexLock l(&mutex_);
    Version* base = versions_->current();
    for (int level = 1; level < config::kNumLevels; level++) {
      if (base->OverlapInLevel(level, begin, end)) {
        max_level_with_files = level;
      }
    }
  }
  TEST_CompactMemTable();  // TODO(sanjay): Skip if memtable does not overlap
  for (int level = 0; level < max_level_with_files; level++) {
    TEST_CompactRange(level, begin, end);
  }
}
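
Manual compaction is driven from the public API. A usage sketch, assuming `db` is an open leveldb::DB* (the key names are illustrative):

#include "leveldb/db.h"

// Passing nullptr for both bounds compacts the whole key space; passing
// Slices restricts the work to files overlapping that range.
void CompactExamples(leveldb::DB* db) {
  db->CompactRange(nullptr, nullptr);             // everything
  leveldb::Slice begin("user:a"), end("user:z");  // illustrative keys
  db->CompactRange(&begin, &end);                 // just one key range
}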

void DBImpl::TEST_CompactRange(int level, const Slice* begin,
                               const Slice* end) {
  assert(level >= 0);
  assert(level + 1 < config::kNumLevels);

  InternalKey begin_storage, end_storage;

  ManualCompaction manual;
  manual.level = level;
  manual.done = false;
  if (begin == nullptr) {
    manual.begin = nullptr;
  } else {
    begin_storage = InternalKey(*begin, kMaxSequenceNumber, kValueTypeForSeek);
    manual.begin = &begin_storage;
  }
  if (end == nullptr) {
    manual.end = nullptr;
  } else {
    end_storage = InternalKey(*end, 0, static_cast<ValueType>(0));
    manual.end = &end_storage;
  }

  MutexLock l(&mutex_);
  while (!manual.done && !shutting_down_.load(std::memory_order_acquire) &&
         bg_error_.ok()) {
    if (manual_compaction_ == nullptr) {  // Idle
      manual_compaction_ = &manual;
      MaybeScheduleCompaction();
    } else {  // Running either my compaction or another compaction.
      background_work_finished_signal_.Wait();
    }
  }
  if (manual_compaction_ == &manual) {
    // Cancel my manual compaction since we aborted early for some reason.
    manual_compaction_ = nullptr;
  }
}

Status DBImpl::TEST_CompactMemTable() {
  // nullptr batch means just wait for earlier writes to be done
  Status s = Write(WriteOptions(), nullptr);
  if (s.ok()) {
    // Wait until the compaction completes
    MutexLock l(&mutex_);
    while (imm_ != nullptr && bg_error_.ok()) {
      background_work_finished_signal_.Wait();
    }
    if (imm_ != nullptr) {
      s = bg_error_;
    }
  }
  return s;
}

void DBImpl::RecordBackgroundError(const Status& s) {
  mutex_.AssertHeld();
  if (bg_error_.ok()) {
    bg_error_ = s;
    background_work_finished_signal_.SignalAll();
  }
}

void DBImpl::MaybeScheduleCompaction() {
  mutex_.AssertHeld();
  if (background_compaction_scheduled_) {
    // Already scheduled
  } else if (shutting_down_.load(std::memory_order_acquire)) {
    // DB is being deleted; no more background compactions
  } else if (!bg_error_.ok()) {
    // Already got an error; no more changes
  } else if (imm_ == nullptr && manual_compaction_ == nullptr &&
             !versions_->NeedsCompaction()) {
    // No work to be done
  } else {
    background_compaction_scheduled_ = true;
    env_->Schedule(&DBImpl::BGWork, this);
  }
}

void DBImpl::BGWork(void* db) {
  reinterpret_cast<DBImpl*>(db)->BackgroundCall();
}

void DBImpl::BackgroundCall() {
  MutexLock l(&mutex_);
  assert(background_compaction_scheduled_);
  if (shutting_down_.load(std::memory_order_acquire)) {
    // No more background work when shutting down.
  } else if (!bg_error_.ok()) {
    // No more background work after a background error.
  } else {
    BackgroundCompaction();
  }

  background_compaction_scheduled_ = false;

  // Previous compaction may have produced too many files in a level,
  // so reschedule another compaction if needed.
  MaybeScheduleCompaction();
  background_work_finished_signal_.SignalAll();
}

void DBImpl::BackgroundCompaction() {
  mutex_.AssertHeld();

  if (imm_ != nullptr) {
    CompactMemTable();
    return;
  }

  Compaction* c;
  bool is_manual = (manual_compaction_ != nullptr);
  InternalKey manual_end;
  if (is_manual) {
    ManualCompaction* m = manual_compaction_;
    c = versions_->CompactRange(m->level, m->begin, m->end);
    m->done = (c == nullptr);
    if (c != nullptr) {
      manual_end = c->input(0, c->num_input_files(0) - 1)->largest;
    }
    Log(options_.info_log,
        "Manual compaction at level-%d from %s .. %s; will stop at %s\n",
        m->level, (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
        (m->end ? m->end->DebugString().c_str() : "(end)"),
        (m->done ? "(end)" : manual_end.DebugString().c_str()));
  } else {
    c = versions_->PickCompaction();
  }

  Status status;
  if (c == nullptr) {
    // Nothing to do
  } else if (!is_manual && c->IsTrivialMove()) {
    // Move file to next level
    assert(c->num_input_files(0) == 1);
    FileMetaData* f = c->input(0, 0);
    c->edit()->DeleteFile(c->level(), f->number);
    c->edit()->AddFile(c->level() + 1, f->number, f->file_size, f->smallest,
                       f->largest);
    status = versions_->LogAndApply(c->edit(), &mutex_);
    if (!status.ok()) {
      RecordBackgroundError(status);
    }
    VersionSet::LevelSummaryStorage tmp;
    Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",
        static_cast<unsigned long long>(f->number), c->level() + 1,
        static_cast<unsigned long long>(f->file_size),
        status.ToString().c_str(), versions_->LevelSummary(&tmp));
  } else {
    CompactionState* compact = new CompactionState(c);
    status = DoCompactionWork(compact);
    if (!status.ok()) {
      RecordBackgroundError(status);
    }
    CleanupCompaction(compact);
    c->ReleaseInputs();
    DeleteObsoleteFiles();
  }
  delete c;

  if (status.ok()) {
    // Done
  } else if (shutting_down_.load(std::memory_order_acquire)) {
    // Ignore compaction errors found during shutting down
  } else {
    Log(options_.info_log, "Compaction error: %s", status.ToString().c_str());
  }

  if (is_manual) {
    ManualCompaction* m = manual_compaction_;
    if (!status.ok()) {
      m->done = true;
    }
    if (!m->done) {
      // We only compacted part of the requested range.  Update *m
      // to the range that is left to be compacted.
      m->tmp_storage = manual_end;
      m->begin = &m->tmp_storage;
    }
    manual_compaction_ = nullptr;
  }
}

void DBImpl::CleanupCompaction(CompactionState* compact) {
  mutex_.AssertHeld();
  if (compact->builder != nullptr) {
    // May happen if we get a shutdown call in the middle of compaction
    compact->builder->Abandon();
    delete compact->builder;
  } else {
    assert(compact->outfile == nullptr);
  }
  delete compact->outfile;
  for (size_t i = 0; i < compact->outputs.size(); i++) {
    const CompactionState::Output& out = compact->outputs[i];
    pending_outputs_.erase(out.number);
  }
  delete compact;
}

Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) {
  assert(compact != nullptr);
  assert(compact->builder == nullptr);
  uint64_t file_number;
  {
    mutex_.Lock();
    file_number = versions_->NewFileNumber();
    pending_outputs_.insert(file_number);
    CompactionState::Output out;
    out.number = file_number;
    out.smallest.Clear();
    out.largest.Clear();
    compact->outputs.push_back(out);
    mutex_.Unlock();
  }

  // Make the output file
  std::string fname = TableFileName(dbname_, file_number);
  Status s = env_->NewWritableFile(fname, &compact->outfile);
  if (s.ok()) {
    compact->builder = new TableBuilder(options_, compact->outfile);
  }
  return s;
}

Status DBImpl::FinishCompactionOutputFile(CompactionState* compact,
                                          Iterator* input) {
  assert(compact != nullptr);
  assert(compact->outfile != nullptr);
  assert(compact->builder != nullptr);

  const uint64_t output_number = compact->current_output()->number;
  assert(output_number != 0);

  // Check for iterator errors
  Status s = input->status();
  const uint64_t current_entries = compact->builder->NumEntries();
  if (s.ok()) {
    s = compact->builder->Finish();
  } else {
    compact->builder->Abandon();
  }
  const uint64_t current_bytes = compact->builder->FileSize();
  compact->current_output()->file_size = current_bytes;
  compact->total_bytes += current_bytes;
  delete compact->builder;
  compact->builder = nullptr;

  // Finish and check for file errors
  if (s.ok()) {
    s = compact->outfile->Sync();
  }
  if (s.ok()) {
    s = compact->outfile->Close();
  }
  delete compact->outfile;
  compact->outfile = nullptr;

  if (s.ok() && current_entries > 0) {
    // Verify that the table is usable
    Iterator* iter =
        table_cache_->NewIterator(ReadOptions(), output_number, current_bytes);
    s = iter->status();
    delete iter;
    if (s.ok()) {
      Log(options_.info_log, "Generated table #%llu@%d: %lld keys, %lld bytes",
          (unsigned long long)output_number, compact->compaction->level(),
          (unsigned long long)current_entries,
          (unsigned long long)current_bytes);
    }
  }
  return s;
}

Status DBImpl::InstallCompactionResults(CompactionState* compact) {
  mutex_.AssertHeld();
  Log(options_.info_log, "Compacted %d@%d + %d@%d files => %lld bytes",
      compact->compaction->num_input_files(0), compact->compaction->level(),
      compact->compaction->num_input_files(1), compact->compaction->level() + 1,
      static_cast<long long>(compact->total_bytes));

  // Add compaction outputs
  compact->compaction->AddInputDeletions(compact->compaction->edit());
  const int level = compact->compaction->level();
  for (size_t i = 0; i < compact->outputs.size(); i++) {
    const CompactionState::Output& out = compact->outputs[i];
    compact->compaction->edit()->AddFile(level + 1, out.number, out.file_size,
                                         out.smallest, out.largest);
  }
  return versions_->LogAndApply(compact->compaction->edit(), &mutex_);
}

Status DBImpl::DoCompactionWork(CompactionState* compact) {
  const uint64_t start_micros = env_->NowMicros();
  int64_t imm_micros = 0;  // Micros spent doing imm_ compactions

  Log(options_.info_log, "Compacting %d@%d + %d@%d files",
      compact->compaction->num_input_files(0), compact->compaction->level(),
      compact->compaction->num_input_files(1),
      compact->compaction->level() + 1);

  assert(versions_->NumLevelFiles(compact->compaction->level()) > 0);
  assert(compact->builder == nullptr);
  assert(compact->outfile == nullptr);
  if (snapshots_.empty()) {
    compact->smallest_snapshot = versions_->LastSequence();
  } else {
    compact->smallest_snapshot = snapshots_.oldest()->sequence_number();
  }

  Iterator* input = versions_->MakeInputIterator(compact->compaction);

  // Release mutex while we're actually doing the compaction work
  mutex_.Unlock();

  input->SeekToFirst();
  Status status;
  ParsedInternalKey ikey;
  std::string current_user_key;
  bool has_current_user_key = false;
  SequenceNumber last_sequence_for_key = kMaxSequenceNumber;
  while (input->Valid() && !shutting_down_.load(std::memory_order_acquire)) {
    // Prioritize immutable compaction work
    if (has_imm_.load(std::memory_order_relaxed)) {
      const uint64_t imm_start = env_->NowMicros();
      mutex_.Lock();
      if (imm_ != nullptr) {
        CompactMemTable();
        // Wake up MakeRoomForWrite() if necessary.
        background_work_finished_signal_.SignalAll();
      }
      mutex_.Unlock();
      imm_micros += (env_->NowMicros() - imm_start);
    }

    Slice key = input->key();
    if (compact->compaction->ShouldStopBefore(key) &&
        compact->builder != nullptr) {
      status = FinishCompactionOutputFile(compact, input);
      if (!status.ok()) {
        break;
      }
    }

    // Handle key/value, add to state, etc.
    bool drop = false;
    if (!ParseInternalKey(key, &ikey)) {
      // Do not hide error keys
      current_user_key.clear();
      has_current_user_key = false;
      last_sequence_for_key = kMaxSequenceNumber;
    } else {
      if (!has_current_user_key ||
          user_comparator()->Compare(ikey.user_key, Slice(current_user_key)) !=
              0) {
        // First occurrence of this user key
        current_user_key.assign(ikey.user_key.data(), ikey.user_key.size());
        has_current_user_key = true;
        last_sequence_for_key = kMaxSequenceNumber;
      }

      if (last_sequence_for_key <= compact->smallest_snapshot) {
        // Hidden by a newer entry for the same user key
        drop = true;  // (A)
      } else if (ikey.type == kTypeDeletion &&
                 ikey.sequence <= compact->smallest_snapshot &&
                 compact->compaction->IsBaseLevelForKey(ikey.user_key)) {
        // For this user key:
        // (1) there is no data in higher levels
        // (2) data in lower levels will have larger sequence numbers
        // (3) data in layers that are being compacted here and have
        //     smaller sequence numbers will be dropped in the next
        //     few iterations of this loop (by rule (A) above).
        // Therefore this deletion marker is obsolete and can be dropped.
        drop = true;
      }

      last_sequence_for_key = ikey.sequence;
    }
#if 0
    Log(options_.info_log,
        "  Compact: %s, seq %d, type: %d %d, drop: %d, is_base: %d, "
        "%d smallest_snapshot: %d",
        ikey.user_key.ToString().c_str(),
        (int)ikey.sequence, ikey.type, kTypeValue, drop,
        compact->compaction->IsBaseLevelForKey(ikey.user_key),
        (int)last_sequence_for_key, (int)compact->smallest_snapshot);
#endif

    if (!drop) {
      // Open output file if necessary
      if (compact->builder == nullptr) {
        status = OpenCompactionOutputFile(compact);
        if (!status.ok()) {
          break;
        }
      }
      if (compact->builder->NumEntries() == 0) {
        compact->current_output()->smallest.DecodeFrom(key);
      }
      compact->current_output()->largest.DecodeFrom(key);
      compact->builder->Add(key, input->value());

      // Close output file if it is big enough
      if (compact->builder->FileSize() >=
          compact->compaction->MaxOutputFileSize()) {
        status = FinishCompactionOutputFile(compact, input);
        if (!status.ok()) {
          break;
        }
      }
    }

    input->Next();
  }

  if (status.ok() && shutting_down_.load(std::memory_order_acquire)) {
    status = Status::IOError("Deleting DB during compaction");
  }
  if (status.ok() && compact->builder != nullptr) {
    status = FinishCompactionOutputFile(compact, input);
  }
  if (status.ok()) {
    status = input->status();
  }
  delete input;
  input = nullptr;

  CompactionStats stats;
  stats.micros = env_->NowMicros() - start_micros - imm_micros;
  for (int which = 0; which < 2; which++) {
    for (int i = 0; i < compact->compaction->num_input_files(which); i++) {
      stats.bytes_read += compact->compaction->input(which, i)->file_size;
    }
  }
  for (size_t i = 0; i < compact->outputs.size(); i++) {
    stats.bytes_written += compact->outputs[i].file_size;
  }

  mutex_.Lock();
  stats_[compact->compaction->level() + 1].Add(stats);

  if (status.ok()) {
    status = InstallCompactionResults(compact);
  }
  if (!status.ok()) {
    RecordBackgroundError(status);
  }
  VersionSet::LevelSummaryStorage tmp;
  Log(options_.info_log, "compacted to: %s", versions_->LevelSummary(&tmp));
  return status;
}
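
Rule (A) above can be hard to follow in the abstract. A self-contained sketch (not LevelDB API; the sequence numbers and snapshot value are illustrative) that replays the per-key state machine for one user key, whose entries arrive newest-first from the merged compaction input:

#include <cstdint>
#include <cstdio>

// The first entry at or below smallest_snapshot stays (it is the version the
// oldest live snapshot sees); every older entry for the same user key is
// shadowed for all live snapshots and is dropped.
int main() {
  const uint64_t kMax = ~0ull;
  const uint64_t smallest_snapshot = 100;           // illustrative
  const uint64_t sequences[] = {180, 120, 90, 40};  // one user key, newest first
  uint64_t last_sequence_for_key = kMax;            // reset at first occurrence
  for (uint64_t seq : sequences) {
    bool drop = (last_sequence_for_key <= smallest_snapshot);
    std::printf("seq %3llu: %s\n", (unsigned long long)seq,
                drop ? "drop" : "keep");
    last_sequence_for_key = seq;
  }
  // Output: 180 keep, 120 keep, 90 keep (visible to the oldest snapshot),
  // 40 drop (shadowed by seq 90 for every live snapshot).
  return 0;
}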

namespace {

struct IterState {
  port::Mutex* const mu;
  Version* const version GUARDED_BY(mu);
  MemTable* const mem GUARDED_BY(mu);
  MemTable* const imm GUARDED_BY(mu);

  IterState(port::Mutex* mutex, MemTable* mem, MemTable* imm, Version* version)
      : mu(mutex), version(version), mem(mem), imm(imm) {}
};

static void CleanupIteratorState(void* arg1, void* arg2) {
  IterState* state = reinterpret_cast<IterState*>(arg1);
  state->mu->Lock();
  state->mem->Unref();
  if (state->imm != nullptr) state->imm->Unref();
  state->version->Unref();
  state->mu->Unlock();
  delete state;
}

}  // anonymous namespace

Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
                                      SequenceNumber* latest_snapshot,
                                      uint32_t* seed) {
  mutex_.Lock();
  *latest_snapshot = versions_->LastSequence();

  // Collect together all needed child iterators
  std::vector<Iterator*> list;
  list.push_back(mem_->NewIterator());
  mem_->Ref();
  if (imm_ != nullptr) {
    list.push_back(imm_->NewIterator());
    imm_->Ref();
  }
  versions_->current()->AddIterators(options, &list);
  Iterator* internal_iter =
      NewMergingIterator(&internal_comparator_, &list[0], list.size());
  versions_->current()->Ref();

  IterState* cleanup = new IterState(&mutex_, mem_, imm_, versions_->current());
  internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, nullptr);

  *seed = ++seed_;
  mutex_.Unlock();
  return internal_iter;
}

Iterator* DBImpl::TEST_NewInternalIterator() {
  SequenceNumber ignored;
  uint32_t ignored_seed;
  return NewInternalIterator(ReadOptions(), &ignored, &ignored_seed);
}

int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() {
  MutexLock l(&mutex_);
  return versions_->MaxNextLevelOverlappingBytes();
}

Status DBImpl::Get(const ReadOptions& options, const Slice& key,
                   std::string* value) {
  Status s;
  MutexLock l(&mutex_);
  SequenceNumber snapshot;
  if (options.snapshot != nullptr) {
    snapshot =
        static_cast<const SnapshotImpl*>(options.snapshot)->sequence_number();
  } else {
    snapshot = versions_->LastSequence();
  }

  MemTable* mem = mem_;
  MemTable* imm = imm_;
  Version* current = versions_->current();
  mem->Ref();
  if (imm != nullptr) imm->Ref();
  current->Ref();

  bool have_stat_update = false;
  Version::GetStats stats;

  // Unlock while reading from files and memtables
  {
    mutex_.Unlock();
    // First look in the memtable, then in the immutable memtable (if any).
    LookupKey lkey(key, snapshot);
    if (mem->Get(lkey, value, &s)) {
      // Done
    } else if (imm != nullptr && imm->Get(lkey, value, &s)) {
      // Done
    } else {
      s = current->Get(options, lkey, value, &stats);
      have_stat_update = true;
    }
    mutex_.Lock();
  }

  if (have_stat_update && current->UpdateStats(stats)) {
    MaybeScheduleCompaction();
  }
  mem->Unref();
  if (imm != nullptr) imm->Unref();
  current->Unref();
  return s;
}
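
On the caller side, Get() distinguishes a missing key from a real error through the Status object. A read-path usage sketch, assuming `db` is an open leveldb::DB* and "some-key" is illustrative:

#include <cstdio>
#include <string>
#include "leveldb/db.h"

void ReadExample(leveldb::DB* db) {
  std::string value;
  leveldb::Status s = db->Get(leveldb::ReadOptions(), "some-key", &value);
  if (s.ok()) {
    std::printf("value: %s\n", value.c_str());
  } else if (s.IsNotFound()) {
    std::printf("key absent\n");  // not an error condition
  } else {
    std::printf("read failed: %s\n", s.ToString().c_str());
  }
}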

Iterator* DBImpl::NewIterator(const ReadOptions& options) {
  SequenceNumber latest_snapshot;
  uint32_t seed;
  Iterator* iter = NewInternalIterator(options, &latest_snapshot, &seed);
  return NewDBIterator(this, user_comparator(), iter,
                       (options.snapshot != nullptr
                            ? static_cast<const SnapshotImpl*>(options.snapshot)
                                  ->sequence_number()
                            : latest_snapshot),
                       seed);
}

void DBImpl::RecordReadSample(Slice key) {
  MutexLock l(&mutex_);
  if (versions_->current()->RecordReadSample(key)) {
    MaybeScheduleCompaction();
  }
}

const Snapshot* DBImpl::GetSnapshot() {
  MutexLock l(&mutex_);
  return snapshots_.New(versions_->LastSequence());
}

void DBImpl::ReleaseSnapshot(const Snapshot* snapshot) {
  MutexLock l(&mutex_);
  snapshots_.Delete(static_cast<const SnapshotImpl*>(snapshot));
}
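
A snapshot usage sketch, assuming `db` is an open leveldb::DB* (key and value are illustrative). Reads through a snapshot ignore writes issued after GetSnapshot(); the snapshot also pins shadowed versions during compaction via smallest_snapshot above:

#include <string>
#include "leveldb/db.h"

void SnapshotExample(leveldb::DB* db) {
  const leveldb::Snapshot* snap = db->GetSnapshot();
  db->Put(leveldb::WriteOptions(), "k", "new");  // not visible below
  leveldb::ReadOptions ro;
  ro.snapshot = snap;
  std::string value;
  db->Get(ro, "k", &value);   // sees the pre-snapshot state of "k"
  db->ReleaseSnapshot(snap);  // required, or old versions accumulate
}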

// Convenience methods
Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) {
  return DB::Put(o, key, val);
}

Status DBImpl::Delete(const WriteOptions& options, const Slice& key) {
  return DB::Delete(options, key);
}

Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
  Writer w(&mutex_);
  w.batch = updates;
  w.sync = options.sync;
  w.done = false;

  MutexLock l(&mutex_);
  writers_.push_back(&w);
  while (!w.done && &w != writers_.front()) {
    w.cv.Wait();
  }
  if (w.done) {
    return w.status;
  }

  // May temporarily unlock and wait.
  Status status = MakeRoomForWrite(updates == nullptr);
  uint64_t last_sequence = versions_->LastSequence();
  Writer* last_writer = &w;
  if (status.ok() && updates != nullptr) {  // nullptr batch is for compactions
    WriteBatch* write_batch = BuildBatchGroup(&last_writer);
    WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);
    last_sequence += WriteBatchInternal::Count(write_batch);

    // Add to log and apply to memtable.  We can release the lock
    // during this phase since &w is currently responsible for logging
    // and protects against concurrent loggers and concurrent writes
    // into mem_.
    {
      mutex_.Unlock();
      status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));
      bool sync_error = false;
      if (status.ok() && options.sync) {
        status = logfile_->Sync();
        if (!status.ok()) {
          sync_error = true;
        }
      }
      if (status.ok()) {
        status = WriteBatchInternal::InsertInto(write_batch, mem_);
      }
      mutex_.Lock();
      if (sync_error) {
        // The state of the log file is indeterminate: the log record we
        // just added may or may not show up when the DB is re-opened.
        // So we force the DB into a mode where all future writes fail.
        RecordBackgroundError(status);
      }
    }
    if (write_batch == tmp_batch_) tmp_batch_->Clear();

    versions_->SetLastSequence(last_sequence);
  }

  while (true) {
    Writer* ready = writers_.front();
    writers_.pop_front();
    if (ready != &w) {
      ready->status = status;
      ready->done = true;
      ready->cv.Signal();
    }
    if (ready == last_writer) break;
  }

  // Notify new head of write queue
  if (!writers_.empty()) {
    writers_.front()->cv.Signal();
  }

  return status;
}
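
A write-path usage sketch, assuming `db` is an open leveldb::DB* (key names are illustrative). A batch is applied atomically: either both mutations become visible or neither does, and options.sync trades throughput for durability via the logfile_->Sync() call above:

#include <cassert>
#include "leveldb/db.h"
#include "leveldb/write_batch.h"

void WriteExample(leveldb::DB* db) {
  leveldb::WriteBatch batch;
  batch.Delete("old-key");
  batch.Put("new-key", "value");
  leveldb::WriteOptions options;
  options.sync = true;  // fsync the log record before returning
  leveldb::Status s = db->Write(options, &batch);
  assert(s.ok());
}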

// REQUIRES: Writer list must be non-empty
// REQUIRES: First writer must have a non-null batch
WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) {
  mutex_.AssertHeld();
  assert(!writers_.empty());
  Writer* first = writers_.front();
  WriteBatch* result = first->batch;
  assert(result != nullptr);

  size_t size = WriteBatchInternal::ByteSize(first->batch);

  // Allow the group to grow up to a maximum size, but if the
  // original write is small, limit the growth so we do not slow
  // down the small write too much.
  size_t max_size = 1 << 20;
  if (size <= (128 << 10)) {
    max_size = size + (128 << 10);
  }

  *last_writer = first;
  std::deque<Writer*>::iterator iter = writers_.begin();
  ++iter;  // Advance past "first"
  for (; iter != writers_.end(); ++iter) {
    Writer* w = *iter;
    if (w->sync && !first->sync) {
      // Do not include a sync write into a batch handled by a non-sync write.
      break;
    }

    if (w->batch != nullptr) {
      size += WriteBatchInternal::ByteSize(w->batch);
      if (size > max_size) {
        // Do not make batch too big
        break;
      }

      // Append to *result
      if (result == first->batch) {
        // Switch to temporary batch instead of disturbing caller's batch
        result = tmp_batch_;
        assert(WriteBatchInternal::Count(result) == 0);
        WriteBatchInternal::Append(result, first->batch);
      }
      WriteBatchInternal::Append(result, w->batch);
    }
    *last_writer = w;
  }
  return result;
}
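
A worked example of the group-size cap above (the sample sizes are illustrative): a small leading write gets a small cap (its own size plus 128 KiB) so it is not held hostage by a large group, while anything over 128 KiB gets the flat 1 MiB cap:

#include <cstddef>
#include <cstdio>

int main() {
  const size_t sizes[] = {4 << 10, 128 << 10, 512 << 10};
  for (size_t size : sizes) {
    size_t max_size = 1 << 20;        // 1 MiB default
    if (size <= (128 << 10)) {
      max_size = size + (128 << 10);  // e.g. 4 KiB -> 132 KiB cap
    }
    std::printf("first batch %zu bytes -> group cap %zu bytes\n", size,
                max_size);
  }
  return 0;
}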

// REQUIRES: mutex_ is held
// REQUIRES: this thread is currently at the front of the writer queue
Status DBImpl::MakeRoomForWrite(bool force) {
  mutex_.AssertHeld();
  assert(!writers_.empty());
  bool allow_delay = !force;
  Status s;
  while (true) {
    if (!bg_error_.ok()) {
      // Yield previous error
      s = bg_error_;
      break;
    } else if (allow_delay && versions_->NumLevelFiles(0) >=
                                  config::kL0_SlowdownWritesTrigger) {
      // We are getting close to hitting a hard limit on the number of
      // L0 files.  Rather than delaying a single write by several
      // seconds when we hit the hard limit, start delaying each
      // individual write by 1ms to reduce latency variance.  Also,
      // this delay hands over some CPU to the compaction thread in
      // case it is sharing the same core as the writer.
      mutex_.Unlock();
      env_->SleepForMicroseconds(1000);
      allow_delay = false;  // Do not delay a single write more than once
      mutex_.Lock();
    } else if (!force &&
               (mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) {
      // There is room in current memtable
      break;
    } else if (imm_ != nullptr) {
      // We have filled up the current memtable, but the previous
      // one is still being compacted, so we wait.
      Log(options_.info_log, "Current memtable full; waiting...\n");
      background_work_finished_signal_.Wait();
    } else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {
      // There are too many level-0 files.
      Log(options_.info_log, "Too many L0 files; waiting...\n");
      background_work_finished_signal_.Wait();
    } else {
      // Attempt to switch to a new memtable and trigger compaction of old
      assert(versions_->PrevLogNumber() == 0);
      uint64_t new_log_number = versions_->NewFileNumber();
      WritableFile* lfile = nullptr;
      s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);
      if (!s.ok()) {
        // Avoid chewing through file number space in a tight loop.
        versions_->ReuseFileNumber(new_log_number);
        break;
      }
      delete log_;
      delete logfile_;
      logfile_ = lfile;
      logfile_number_ = new_log_number;
      log_ = new log::Writer(lfile);
      imm_ = mem_;
      has_imm_.store(true, std::memory_order_release);
      mem_ = new MemTable(internal_comparator_);
      mem_->Ref();
      force = false;  // Do not force another compaction if have room
      MaybeScheduleCompaction();
    }
  }
  return s;
}

bool DBImpl::GetProperty(const Slice& property, std::string* value) {
  value->clear();

  MutexLock l(&mutex_);
  Slice in = property;
  Slice prefix("leveldb.");
  if (!in.starts_with(prefix)) return false;
  in.remove_prefix(prefix.size());

  if (in.starts_with("num-files-at-level")) {
    in.remove_prefix(strlen("num-files-at-level"));
    uint64_t level;
    bool ok = ConsumeDecimalNumber(&in, &level) && in.empty();
    if (!ok || level >= config::kNumLevels) {
      return false;
    } else {
      char buf[100];
      snprintf(buf, sizeof(buf), "%d",
               versions_->NumLevelFiles(static_cast<int>(level)));
      *value = buf;
      return true;
    }
  } else if (in == "stats") {
    char buf[200];
    snprintf(buf, sizeof(buf),
             "                               Compactions\n"
             "Level  Files Size(MB) Time(sec) Read(MB) Write(MB)\n"
             "--------------------------------------------------\n");
    value->append(buf);
    for (int level = 0; level < config::kNumLevels; level++) {
      int files = versions_->NumLevelFiles(level);
      if (stats_[level].micros > 0 || files > 0) {
        snprintf(buf, sizeof(buf), "%3d %8d %8.0f %9.0f %8.0f %9.0f\n", level,
                 files, versions_->NumLevelBytes(level) / 1048576.0,
                 stats_[level].micros / 1e6,
                 stats_[level].bytes_read / 1048576.0,
                 stats_[level].bytes_written / 1048576.0);
        value->append(buf);
      }
    }
    return true;
  } else if (in == "sstables") {
    *value = versions_->current()->DebugString();
    return true;
  } else if (in == "approximate-memory-usage") {
    size_t total_usage = options_.block_cache->TotalCharge();
    if (mem_) {
      total_usage += mem_->ApproximateMemoryUsage();
    }
    if (imm_) {
      total_usage += imm_->ApproximateMemoryUsage();
    }
    char buf[50];
    snprintf(buf, sizeof(buf), "%llu",
             static_cast<unsigned long long>(total_usage));
    value->append(buf);
    return true;
  }

  return false;
}
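
An introspection sketch, assuming `db` is an open leveldb::DB*. The property names are exactly the ones parsed by GetProperty() above:

#include <cstdio>
#include <string>
#include "leveldb/db.h"

void PropertyExample(leveldb::DB* db) {
  std::string v;
  if (db->GetProperty("leveldb.num-files-at-level0", &v)) {
    std::printf("L0 files: %s\n", v.c_str());
  }
  if (db->GetProperty("leveldb.stats", &v)) {
    std::printf("%s\n", v.c_str());  // per-level compaction table
  }
  if (db->GetProperty("leveldb.approximate-memory-usage", &v)) {
    std::printf("approx memory: %s bytes\n", v.c_str());
  }
}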

void DBImpl::GetApproximateSizes(const Range* range, int n, uint64_t* sizes) {
  // TODO(opt): better implementation
  MutexLock l(&mutex_);
  Version* v = versions_->current();
  v->Ref();

  for (int i = 0; i < n; i++) {
    // Convert user_key into a corresponding internal key.
    InternalKey k1(range[i].start, kMaxSequenceNumber, kValueTypeForSeek);
    InternalKey k2(range[i].limit, kMaxSequenceNumber, kValueTypeForSeek);
    uint64_t start = versions_->ApproximateOffsetOf(v, k1);
    uint64_t limit = versions_->ApproximateOffsetOf(v, k2);
    sizes[i] = (limit >= start ? limit - start : 0);
  }

  v->Unref();
}
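
A size-estimation sketch, assuming `db` is an open leveldb::DB* (the key ranges are illustrative). Since the estimate is computed from table-file offsets, data still in the memtable may not be counted:

#include <cstdint>
#include <cstdio>
#include "leveldb/db.h"

void SizeExample(leveldb::DB* db) {
  leveldb::Range ranges[2];
  ranges[0] = leveldb::Range("a", "m");  // [start, limit)
  ranges[1] = leveldb::Range("m", "z");
  uint64_t sizes[2];
  db->GetApproximateSizes(ranges, 2, sizes);
  std::printf("a..m ~%llu bytes, m..z ~%llu bytes\n",
              (unsigned long long)sizes[0], (unsigned long long)sizes[1]);
}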

// Default implementations of convenience methods that subclasses of DB
// can call if they wish
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
  WriteBatch batch;
  batch.Put(key, value);
  return Write(opt, &batch);
}

Status DB::Delete(const WriteOptions& opt, const Slice& key) {
  WriteBatch batch;
  batch.Delete(key);
  return Write(opt, &batch);
}

DB::~DB() = default;

Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
  *dbptr = nullptr;

  DBImpl* impl = new DBImpl(options, dbname);
  impl->mutex_.Lock();
  VersionEdit edit;
  // Recover handles create_if_missing, error_if_exists
  bool save_manifest = false;
  Status s = impl->Recover(&edit, &save_manifest);
  if (s.ok() && impl->mem_ == nullptr) {
    // Create new log and a corresponding memtable.
    uint64_t new_log_number = impl->versions_->NewFileNumber();
    WritableFile* lfile;
    s = options.env->NewWritableFile(LogFileName(dbname, new_log_number),
                                     &lfile);
    if (s.ok()) {
      edit.SetLogNumber(new_log_number);
      impl->logfile_ = lfile;
      impl->logfile_number_ = new_log_number;
      impl->log_ = new log::Writer(lfile);
      impl->mem_ = new MemTable(impl->internal_comparator_);
      impl->mem_->Ref();
    }
  }
  if (s.ok() && save_manifest) {
    edit.SetPrevLogNumber(0);  // No older logs needed after recovery.
    edit.SetLogNumber(impl->logfile_number_);
    s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
  }
  if (s.ok()) {
    impl->DeleteObsoleteFiles();
    impl->MaybeScheduleCompaction();
  }
  impl->mutex_.Unlock();
  if (s.ok()) {
    assert(impl->mem_ != nullptr);
    *dbptr = impl;
  } else {
    delete impl;
  }
  return s;
}
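
An end-to-end usage sketch of the public API implemented above ("/tmp/demo-db" is an illustrative path):

#include <cassert>
#include <string>
#include "leveldb/db.h"

int main() {
  leveldb::Options options;
  options.create_if_missing = true;
  leveldb::DB* db = nullptr;
  leveldb::Status s = leveldb::DB::Open(options, "/tmp/demo-db", &db);
  assert(s.ok());

  s = db->Put(leveldb::WriteOptions(), "key", "value");
  assert(s.ok());

  std::string value;
  s = db->Get(leveldb::ReadOptions(), "key", &value);
  assert(s.ok() && value == "value");

  delete db;  // ~DBImpl() waits for background work and releases the lock
  return 0;
}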

Snapshot::~Snapshot() = default;

Status DestroyDB(const std::string& dbname, const Options& options) {
  Env* env = options.env;
  std::vector<std::string> filenames;
  Status result = env->GetChildren(dbname, &filenames);
  if (!result.ok()) {
    // Ignore error in case directory does not exist
    return Status::OK();
  }

  FileLock* lock;
  const std::string lockname = LockFileName(dbname);
  result = env->LockFile(lockname, &lock);
  if (result.ok()) {
    uint64_t number;
    FileType type;
    for (size_t i = 0; i < filenames.size(); i++) {
      if (ParseFileName(filenames[i], &number, &type) &&
          type != kDBLockFile) {  // Lock file will be deleted at end
        Status del = env->DeleteFile(dbname + "/" + filenames[i]);
        if (result.ok() && !del.ok()) {
          result = del;
        }
      }
    }
    env->UnlockFile(lock);  // Ignore error since state is already gone
    env->DeleteFile(lockname);
    env->DeleteDir(dbname);  // Ignore error in case dir contains other files
  }
  return result;
}
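
A teardown sketch: DestroyDB removes every file kind enumerated in DeleteObsoleteFiles() above (tables, logs, MANIFEST, CURRENT, LOCK). It takes the same file lock as an open DB, so the database must be closed first; "/tmp/demo-db" is an illustrative path:

#include <cassert>
#include "leveldb/db.h"

int main() {
  leveldb::Status s = leveldb::DestroyDB("/tmp/demo-db", leveldb::Options());
  assert(s.ok());
  return 0;
}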

}  // namespace leveldb