857 words
4 minutes
Doris 写入
Doris 存储层次
Doris 的写入路径:数据先写入内存中的 MemTable,flush 时再依次经过 Rowset → Segment → Column Writer 写出。
- Rowset:一个 Rowset 描述一次写入(导入)产生的数据,也是版本管理的单元。
- Segment:一个 Rowset 可能由多个 Segment 组成,Segment 对应物理文件。
- Tablet:一个 Tablet 包含多个 Rowset,一个 Rowset 又包含多个 Segment。
- Column:一行数据由多个 Column 组成,Segment 内的数据按列存储。
导入代码
整个流程第一步:
// First step of the load path: flush one MemTable through the rowset writer.
// NOTE(review): the post elides most of this function ("..."), including the code that
// produces `block` and the function's closing brace; only the visible statements are
// documented here.
Status FlushToken::_do_flush_memtable(MemTable* memtable, int32_t segment_id, int64_t* flush_size) {
    // Trace the tablet being flushed together with the memtable's memory usage and raw row count.
    VLOG_CRITICAL << "begin to flush memtable for tablet: " << memtable->tablet_id()
                  << ", memsize: " << PrettyPrinter::print_bytes(memtable->memory_usage())
                  << ", rows: " << memtable->stat().raw_rows;
    // Switch the memtable's mem type to FLUSH (presumably for memory accounting — verify).
    memtable->update_mem_type(MemType::FLUSH);
    // NOTE(review): presumably filled by a timer in the elided code — confirm.
    int64_t duration_ns = 0;
    {
        ...
        // `block` comes from the elided code. It is handed to the rowset writer together with
        // the target segment id; the written size is reported back through `flush_size`.
        RETURN_IF_ERROR(_rowset_writer->flush_memtable(block.get(), segment_id, flush_size));
        ...
    }
我们看看实现
Status BetaRowsetWriterV2::flush_memtable(vectorized::Block* block, int32_t segment_id, int64_t* flush_size) { if (block->rows() == 0) { return Status::OK(); }
{ SCOPED_RAW_TIMER(&_segment_writer_ns); RETURN_IF_ERROR(_segment_creator.flush_single_block(block, segment_id, flush_size)); } // delete bitmap and seg compaction are done on the destination BE. return Status::OK();}void Tablet::add_rowsets(const std::vector<RowsetSharedPtr>& to_add) { if (to_add.empty()) { return; } std::vector<RowsetMetaSharedPtr> rs_metas; rs_metas.reserve(to_add.size()); for (auto& rs : to_add) { _rs_version_map.emplace(rs->version(), rs); _timestamped_version_tracker.add_version(rs->version()); rs_metas.push_back(rs->rowset_meta()); } _tablet_meta->modify_rs_metas(rs_metas, {});}Status BetaRowsetWriterV2::flush_memtable(vectorized::Block* block, int32_t segment_id, int64_t* flush_size) { if (block->rows() == 0) { return Status::OK(); }
{ SCOPED_RAW_TIMER(&_segment_writer_ns); RETURN_IF_ERROR(_segment_creator.flush_single_block(block, segment_id, flush_size)); } // delete bitmap and seg compaction are done on the destination BE. return Status::OK();}调用:
// Writes `block` out as the single segment `segment_id`.
// config::enable_vertical_segment_writer selects VerticalSegmentWriter; otherwise
// SegmentWriter is used. Empty blocks are skipped. Blocks whose byte size is at most
// config::segment_compression_threshold_kb KiB are written with no_compression set.
// @param flush_size forwarded to _flush_segment_writer, which reports the written size.
// @return the first non-OK status from parsing, writer creation, row appends or flush.
Status SegmentFlusher::flush_single_block(const vectorized::Block* block, int32_t segment_id,
                                          int64_t* flush_size) {
    if (block->rows() == 0) {
        return Status::OK();
    }

    // Work on a copy: variant-column parsing below mutates flush_block, never *block.
    vectorized::Block flush_block(*block);
    const bool is_compaction = _context.write_type == DataWriteType::TYPE_COMPACTION;
    if (!is_compaction && _context.tablet_schema->num_variant_columns() > 0) {
        RETURN_IF_ERROR(_parse_variant_columns(flush_block));
    }

    const bool no_compression =
            flush_block.bytes() <= config::segment_compression_threshold_kb * 1024;

    // Both writer flavours follow the same create -> add rows -> flush sequence; the
    // generic lambda resolves the overloads matching the concrete writer type.
    auto write_segment = [&](auto& seg_writer) -> Status {
        RETURN_IF_ERROR(_create_segment_writer(seg_writer, segment_id, no_compression));
        RETURN_IF_ERROR_OR_CATCH_EXCEPTION(
                _add_rows(seg_writer, &flush_block, 0, flush_block.rows()));
        RETURN_IF_ERROR(_flush_segment_writer(seg_writer, flush_size));
        return Status::OK();
    };

    if (config::enable_vertical_segment_writer) {
        std::unique_ptr<segment_v2::VerticalSegmentWriter> vertical_writer;
        return write_segment(vertical_writer);
    }
    std::unique_ptr<segment_v2::SegmentWriter> default_writer;
    return write_segment(default_writer);
}
后面调用:
// Finalizes the segment held by `writer` and reports it to the rowset's segment collector.
//
// Steps, most of them timed separately for the INFO log at the end:
//   1. fold the writer's row counters into this flusher's running totals;
//   2. return early (OK) if the writer wrote no rows;
//   3. finalize the segment file (its size goes to segment_file_size);
//   4. close the inverted index (its size goes to inverted_index_file_size);
//   5. record the min/max encoded keys as the segment's key bounds;
//   6. release the writer, then hand the SegmentStatistics to _context.segment_collector.
//
// @param writer     vertical segment writer to finalize; reset once steps 3-5 succeed.
// @param flush_size optional out-param; when non-null it receives the segment data file
//                   size (the inverted index size is not included). It is not written on
//                   the 0-row early return or on any error path.
// @return a non-OK status if finalize, inverted-index close, or the collector fails.
Status SegmentFlusher::_flush_segment_writer(
        std::unique_ptr<segment_v2::VerticalSegmentWriter>& writer, int64_t* flush_size) {
    MonotonicStopWatch total_timer;
    total_timer.start();

    // Counters are accumulated before the early return below, so they are added even when
    // row_num is 0.
    uint32_t row_num = writer->num_rows_written();
    _num_rows_updated += writer->num_rows_updated();
    _num_rows_deleted += writer->num_rows_deleted();
    _num_rows_new_added += writer->num_rows_new_added();
    _num_rows_filtered += writer->num_rows_filtered();

    // Nothing to finalize; *flush_size is left untouched.
    if (row_num == 0) {
        return Status::OK();
    }

    MonotonicStopWatch finalize_timer;
    finalize_timer.start();
    uint64_t segment_file_size;
    // Output of finalize(); not used further in this function.
    uint64_t common_index_size;
    Status s = writer->finalize(&segment_file_size, &common_index_size);
    finalize_timer.stop();

    // Add context to the error while keeping the original status code.
    if (!s.ok()) {
        return Status::Error(s.code(), "failed to finalize segment: {}", s.to_string());
    }

    MonotonicStopWatch inverted_index_timer;
    inverted_index_timer.start();
    int64_t inverted_index_file_size = 0;
    RETURN_IF_ERROR(writer->close_inverted_index(&inverted_index_file_size));
    inverted_index_timer.stop();

    VLOG_DEBUG << "tablet_id:" << _context.tablet_id
               << " flushing filename: " << writer->data_dir_path()
               << " rowset_id:" << _context.rowset_id;

    // Key range covered by this segment; min must not exceed max (asserted via DCHECK_LE).
    KeyBoundsPB key_bounds;
    Slice min_key = writer->min_encoded_key();
    Slice max_key = writer->max_encoded_key();
    DCHECK_LE(min_key.compare(max_key), 0);
    key_bounds.set_min_key(min_key.to_string());
    key_bounds.set_max_key(max_key.to_string());

    uint32_t segment_id = writer->segment_id();
    SegmentStatistics segstat;
    segstat.row_num = row_num;
    segstat.data_size = segment_file_size;
    segstat.index_size = inverted_index_file_size;
    segstat.key_bounds = key_bounds;

    // Everything needed from the writer (segment_id, sizes, key bounds) has been copied out
    // above, so it is released before the segment is handed to the collector.
    writer.reset();

    MonotonicStopWatch collector_timer;
    collector_timer.start();
    RETURN_IF_ERROR(_context.segment_collector->add(segment_id, segstat));
    collector_timer.stop();

    total_timer.stop();

    LOG(INFO) << "tablet_id:" << _context.tablet_id
              << ", flushing rowset_dir: " << _context.tablet_path
              << ", rowset_id:" << _context.rowset_id
              << ", data size:" << PrettyPrinter::print_bytes(segstat.data_size)
              << ", index size:" << PrettyPrinter::print_bytes(segstat.index_size)
              << ", timing breakdown: total=" << total_timer.elapsed_time_milliseconds() << "ms"
              << ", finalize=" << finalize_timer.elapsed_time_milliseconds() << "ms"
              << ", inverted_index=" << inverted_index_timer.elapsed_time_milliseconds() << "ms"
              << ", collector=" << collector_timer.elapsed_time_milliseconds() << "ms";

    // Reported size is the segment data file only.
    if (flush_size) {
        *flush_size = segment_file_size;
    }
    return Status::OK();
}
核心转换逻辑,将行转换成列:
std::pair<Status, IOlapColumnDataAccessor*> OlapBlockDataConvertor::convert_column_data( size_t cid) { assert(cid < _convertors.size()); auto convert_func = [&]() -> Status { RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_convertors[cid]->convert_to_olap()); return Status::OK(); }; auto status = convert_func(); return {status, _convertors[cid].get()};}Table: user_orders├── Partition: p202401 (范围分区)│ ├── Tablet 167 (Hash分桶)│ │ ├── Rowset v1-v5 (基础版本)│ │ │ ├── Segment 0 (256MB, 200万行)│ │ │ │ ├── Page 0-4095: user_id 列│ │ │ │ ├── Page 4096-8191: amount 列│ │ │ │ └── Page 8192-12287: order_time 列│ │ │ └── Segment 1 (256MB, 200万行)│ │ ├── Rowset v6-v9 (增量版本)│ │ └── Rowset v10 (最新版本)│ ├── Tablet 168│ └── ...└── Partition: p202402逻辑层: 表 (Table) │ ▼物理层: Tablet (数据分片) │ ▼版本层: Rowset (行集,版本管理单元) │ ▼文件层: Segment (段,物理文件) │ ▼存储层: Page (页,数据块) │ ▼数据层: Column (列,存储格式)// 表达式计算得到列数据
// Illustrative sketch: evaluates the expression for row `row_idx` of `block` and returns
// the result as a column. For a constant literal such as 'Alice', it would directly return
// a column containing 'Alice'.
// NOTE(review): this looks like a simplified stand-in rather than the real Doris VExpr API;
// the computation itself is not implemented here.
ColumnPtr VExpr::get_const_col(const Block& block, size_t row_idx) {
    // Compute the value of this column for this row.
    // e.g. for the constant 'Alice', return a column that contains 'Alice' directly.

    // Placeholder result. Without an explicit return, flowing off the end of this non-void
    // function would be undefined behavior.
    return {};
}
// 位置: doris/be/src/vec/columns/column_vector.cpp(ColumnVector<T>::insert_range_from 的实现)
// 实际的列数据插入
template <typename T>void ColumnVector<T>::insert_range_from(const IColumn& src, size_t start, size_t length) { const auto& src_vec = static_cast<const ColumnVector<T>&>(src);
// 将源列的数据追加到当前列 size_t old_size = data.size(); data.resize(old_size + length);
// 关键:这里发生了数据复制,从源列到目标列 memcpy(&data[old_size], &src_vec.data[start], length * sizeof(T));} void insert_data(const char* pos, size_t length) override { const size_t old_size = chars.size(); const size_t new_size = old_size + length;
if (length) { check_chars_length(new_size, offsets.size() + 1); chars.resize(new_size); memcpy(chars.data() + old_size, pos, length); } offsets.push_back(new_size); sanity_check_simple(); } Doris 写入
https://tatamagic.com/posts/doris/