857 words
4 minutes
Doris 写入

Doris 存储层次

Doris 写入时,数据首先进入内存中的 MemTable,flush 时再按 MemTable -> Rowset -> Segment -> Column Writer 的层次逐级写出。

  • rowset:一个 rowset 描述一次写入(导入)产生的数据,也是版本管理的单元

  • segment:一个rowset可能由多个segment组成

  • tablet:一个 tablet 包含多个 rowset,一个 rowset 又包含多个 segment

  • column:一行数据由多个 column 组成,segment 内的数据按列存储

导入代码

整个流程第一步:

// Abridged excerpt: flushes one MemTable by handing its block to the rowset writer.
// NOTE(review): this listing is truncated with "..." — the code that builds `block`,
// the use of `duration_ns`, and the function's final return/closing brace are not shown.
Status FlushToken::_do_flush_memtable(MemTable* memtable, int32_t segment_id, int64_t* flush_size) {
// Log the tablet id, the memtable's memory usage and its raw row count before flushing.
VLOG_CRITICAL << "begin to flush memtable for tablet: " << memtable->tablet_id()
<< ", memsize: " << PrettyPrinter::print_bytes(memtable->memory_usage())
<< ", rows: " << memtable->stat().raw_rows;
// Switch the memtable's memory type to FLUSH (presumably for memory accounting — confirm).
memtable->update_mem_type(MemType::FLUSH);
// Presumably accumulates the flush duration in the elided code — not visible here.
int64_t duration_ns = 0;
{
...
// Write the memtable's block as segment `segment_id`; flush_size receives the written size.
RETURN_IF_ERROR(_rowset_writer->flush_memtable(block.get(), segment_id, flush_size));
...
}

我们看看 BetaRowsetWriterV2::flush_memtable 的实现:

// Writes one memtable block as segment `segment_id` through the segment creator.
// An empty block is skipped entirely and OK is returned; otherwise the time spent
// in the segment creator is added to _segment_writer_ns.
// @param flush_size out-param forwarded to the segment creator.
Status BetaRowsetWriterV2::flush_memtable(vectorized::Block* block, int32_t segment_id,
                                          int64_t* flush_size) {
    if (block->rows() > 0) {
        // The timer's scope is exactly the segment write.
        SCOPED_RAW_TIMER(&_segment_writer_ns);
        RETURN_IF_ERROR(_segment_creator.flush_single_block(block, segment_id, flush_size));
    }
    // delete bitmap and seg compaction are done on the destination BE.
    return Status::OK();
}
void Tablet::add_rowsets(const std::vector<RowsetSharedPtr>& to_add) {
if (to_add.empty()) {
return;
}
std::vector<RowsetMetaSharedPtr> rs_metas;
rs_metas.reserve(to_add.size());
for (auto& rs : to_add) {
_rs_version_map.emplace(rs->version(), rs);
_timestamped_version_tracker.add_version(rs->version());
rs_metas.push_back(rs->rowset_meta());
}
_tablet_meta->modify_rs_metas(rs_metas, {});
}
// NOTE(review): this listing repeats BetaRowsetWriterV2::flush_memtable exactly as it
// appears earlier in this post; in a real translation unit a second definition would
// be a redefinition error, so one copy should be dropped.
//
// Writes one memtable block as segment `segment_id`; empty blocks are skipped.
Status BetaRowsetWriterV2::flush_memtable(vectorized::Block* block, int32_t segment_id,
                                          int64_t* flush_size) {
    const bool has_rows = block->rows() != 0;
    if (has_rows) {
        // Only the segment creation is timed.
        SCOPED_RAW_TIMER(&_segment_writer_ns);
        RETURN_IF_ERROR(_segment_creator.flush_single_block(block, segment_id, flush_size));
    }
    // delete bitmap and seg compaction are done on the destination BE.
    return Status::OK();
}

flush_memtable 中调用的 flush_single_block 最终会走到 SegmentFlusher::flush_single_block:

// Writes `block` into a new segment `segment_id` and flushes it.
//
// - Empty blocks are ignored (OK is returned, nothing is written).
// - For non-compaction writes to a schema with variant columns, the variant
//   columns of the copied block are parsed first.
// - Blocks no larger than config::segment_compression_threshold_kb are written
//   without compression.
// - config::enable_vertical_segment_writer selects VerticalSegmentWriter,
//   otherwise SegmentWriter is used; both go through the same create/add/flush steps.
// @param flush_size out-param forwarded to _flush_segment_writer.
Status SegmentFlusher::flush_single_block(const vectorized::Block* block, int32_t segment_id,
                                          int64_t* flush_size) {
    if (block->rows() == 0) {
        return Status::OK();
    }

    // The input is const, so work on a copy that variant parsing may modify.
    vectorized::Block flush_block(*block);
    const bool parse_variants = _context.write_type != DataWriteType::TYPE_COMPACTION &&
                                _context.tablet_schema->num_variant_columns() > 0;
    if (parse_variants) {
        RETURN_IF_ERROR(_parse_variant_columns(flush_block));
    }
    const bool no_compression =
            flush_block.bytes() <= config::segment_compression_threshold_kb * 1024;

    // Create, fill, and flush one writer; shared by both writer implementations.
    auto write_segment = [&](auto& writer) -> Status {
        RETURN_IF_ERROR(_create_segment_writer(writer, segment_id, no_compression));
        RETURN_IF_ERROR_OR_CATCH_EXCEPTION(
                _add_rows(writer, &flush_block, 0, flush_block.rows()));
        RETURN_IF_ERROR(_flush_segment_writer(writer, flush_size));
        return Status::OK();
    };

    if (config::enable_vertical_segment_writer) {
        std::unique_ptr<segment_v2::VerticalSegmentWriter> vertical_writer;
        return write_segment(vertical_writer);
    }
    std::unique_ptr<segment_v2::SegmentWriter> segment_writer;
    return write_segment(segment_writer);
}

写完数据后,通过 _flush_segment_writer 完成 segment 的 finalize 并上报统计信息:

// Finalizes a VerticalSegmentWriter, reports the segment's statistics to the
// segment collector, and logs a timing breakdown.
//
// The writer's updated/deleted/new-added/filtered row counters are accumulated
// first, even when the writer wrote no rows; in that case nothing is finalized
// and OK is returned without touching `flush_size`.
//
// @param writer     the writer to finalize; it is reset (destroyed) after its key
//                   bounds and sizes are captured and before the collector is called.
// @param flush_size optional out-param; when non-null receives the segment file size.
// @return the finalize() error (wrapped with "failed to finalize segment"), or any
//         error from close_inverted_index() / the collector, otherwise OK.
Status SegmentFlusher::_flush_segment_writer(
        std::unique_ptr<segment_v2::VerticalSegmentWriter>& writer, int64_t* flush_size) {
    MonotonicStopWatch total_timer;
    total_timer.start();

    // Counters are accumulated before the empty-segment check below.
    uint32_t row_num = writer->num_rows_written();
    _num_rows_updated += writer->num_rows_updated();
    _num_rows_deleted += writer->num_rows_deleted();
    _num_rows_new_added += writer->num_rows_new_added();
    _num_rows_filtered += writer->num_rows_filtered();
    if (row_num == 0) {
        return Status::OK();
    }

    MonotonicStopWatch finalize_timer;
    finalize_timer.start();
    // Zero-initialize the out-params so the statistics never carry indeterminate
    // values if finalize() succeeds without writing one of them.
    uint64_t segment_file_size = 0;
    // NOTE(review): common_index_size is filled by finalize() but not used below — confirm intent.
    uint64_t common_index_size = 0;
    Status s = writer->finalize(&segment_file_size, &common_index_size);
    finalize_timer.stop();
    if (!s.ok()) {
        return Status::Error(s.code(), "failed to finalize segment: {}", s.to_string());
    }

    MonotonicStopWatch inverted_index_timer;
    inverted_index_timer.start();
    int64_t inverted_index_file_size = 0;
    RETURN_IF_ERROR(writer->close_inverted_index(&inverted_index_file_size));
    inverted_index_timer.stop();

    VLOG_DEBUG << "tablet_id:" << _context.tablet_id
               << " flushing filename: " << writer->data_dir_path()
               << " rowset_id:" << _context.rowset_id;

    // Key range of this segment; the min key must not compare greater than the max key.
    KeyBoundsPB key_bounds;
    Slice min_key = writer->min_encoded_key();
    Slice max_key = writer->max_encoded_key();
    DCHECK_LE(min_key.compare(max_key), 0);
    key_bounds.set_min_key(min_key.to_string());
    key_bounds.set_max_key(max_key.to_string());

    uint32_t segment_id = writer->segment_id();
    SegmentStatistics segstat;
    segstat.row_num = row_num;
    segstat.data_size = segment_file_size;
    segstat.index_size = inverted_index_file_size;
    segstat.key_bounds = key_bounds;
    // Everything needed from the writer is now copied out; release it before
    // handing the statistics to the collector.
    writer.reset();

    MonotonicStopWatch collector_timer;
    collector_timer.start();
    RETURN_IF_ERROR(_context.segment_collector->add(segment_id, segstat));
    collector_timer.stop();
    total_timer.stop();

    LOG(INFO) << "tablet_id:" << _context.tablet_id
              << ", flushing rowset_dir: " << _context.tablet_path
              << ", rowset_id:" << _context.rowset_id
              << ", data size:" << PrettyPrinter::print_bytes(segstat.data_size)
              << ", index size:" << PrettyPrinter::print_bytes(segstat.index_size)
              << ", timing breakdown: total=" << total_timer.elapsed_time_milliseconds() << "ms"
              << ", finalize=" << finalize_timer.elapsed_time_milliseconds() << "ms"
              << ", inverted_index=" << inverted_index_timer.elapsed_time_milliseconds() << "ms"
              << ", collector=" << collector_timer.elapsed_time_milliseconds() << "ms";

    if (flush_size != nullptr) {
        *flush_size = segment_file_size;
    }
    return Status::OK();
}

核心转换逻辑:将 Block 中各列的数据转换为 OLAP 存储格式(convert_to_olap):

std::pair<Status, IOlapColumnDataAccessor*> OlapBlockDataConvertor::convert_column_data(
size_t cid) {
assert(cid < _convertors.size());
auto convert_func = [&]() -> Status {
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_convertors[cid]->convert_to_olap());
return Status::OK();
};
auto status = convert_func();
return {status, _convertors[cid].get()};
}
Table: user_orders
├── Partition: p202401 (范围分区)
│ ├── Tablet 167 (Hash分桶)
│ │ ├── Rowset v1-v5 (基础版本)
│ │ │ ├── Segment 0 (256MB, 200万行)
│ │ │ │ ├── Page 0-4095: user_id 列
│ │ │ │ ├── Page 4096-8191: amount 列
│ │ │ │ └── Page 8192-12287: order_time 列
│ │ │ └── Segment 1 (256MB, 200万行)
│ │ ├── Rowset v6-v9 (增量版本)
│ │ └── Rowset v10 (最新版本)
│ ├── Tablet 168
│ └── ...
└── Partition: p202402
逻辑层: 表 (Table)
分区层: Partition (分区,如按范围分区)
物理层: Tablet (数据分片)
版本层: Rowset (行集,版本管理单元)
文件层: Segment (段,物理文件)
存储层: Page (页,数据块)
数据层: Column (列,存储格式)
doris/be/src/vec/exprs/vexpr.cpp
// Evaluates the expression to obtain column data.
// NOTE(review): abridged listing — the real body is elided. As written, this non-void
// function has no return statement, which is undefined behavior if it is ever compiled
// and called. The signature may also differ from the actual vexpr.cpp — confirm.
ColumnPtr VExpr::get_const_col(const Block& block, size_t row_idx) {
// Compute the value of this column for this row.
// e.g. for the constant value 'Alice', directly return a column containing 'Alice'.
}
// Location: doris/be/src/vec/columns/column.h
// NOTE(review): ColumnVector's implementation may live in a column_vector source
// file rather than column.h — confirm the path.
//
// Appends `length` elements of `src`, starting at index `start`, to this column.
// `src` must be a ColumnVector<T>; the downcast below is unchecked.
// The range must lie within `src`; this is checked only by assert (debug builds).
template <typename T>
void ColumnVector<T>::insert_range_from(const IColumn& src, size_t start, size_t length) {
    const auto& src_vec = static_cast<const ColumnVector<T>&>(src);
    // Written as two comparisons so that `start + length` cannot overflow.
    assert(start <= src_vec.data.size() && length <= src_vec.data.size() - start);
    if (length == 0) {
        // Nothing to copy; also avoids forming element addresses past the end.
        return;
    }
    const size_t old_size = data.size();
    data.resize(old_size + length);
    // Key point: the values are copied here, from the source column into this column.
    memcpy(data.data() + old_size, src_vec.data.data() + start, length * sizeof(T));
}
// Appends one value made of the `length` bytes starting at `pos`.
// NOTE(review): presumably a ColumnString member (chars + offsets layout) — the
// enclosing class is not shown; confirm.
// The bytes are appended to `chars` only when non-empty, but `offsets` always gets
// a new entry holding the end offset, so a zero-length value still adds a row.
void insert_data(const char* pos, size_t length) override {
    const size_t begin = chars.size();
    const size_t end = begin + length;
    if (length != 0) {
        // Presumably validates the new byte total / row count before growing.
        check_chars_length(end, offsets.size() + 1);
        chars.resize(end);
        memcpy(chars.data() + begin, pos, length);
    }
    offsets.push_back(end);
    sanity_check_simple();
}
Doris 写入
https://tatamagic.com/posts/doris/
Author
dinosaur
Published at
2026-01-03
License
CC BY-NC-SA 4.0