558 words
3 minutes
Doris 写入
Doris 存储层次
Doris 写入时,数据首先进入内存中的 memtable,然后依次经过 memtable -> rowset -> segment -> column writer。
-
rowset:一个 rowset 描述一次写入
-
segment:一个rowset可能由多个segment组成
-
tablet:一个 tablet 包括多个 rowset,一个 rowset 包括多个 segment
-
column:一行数据有多个 column
void Tablet::add_rowsets(const std::vector<RowsetSharedPtr>& to_add) { if (to_add.empty()) { return; } std::vector<RowsetMetaSharedPtr> rs_metas; rs_metas.reserve(to_add.size()); for (auto& rs : to_add) { _rs_version_map.emplace(rs->version(), rs); _timestamped_version_tracker.add_version(rs->version()); rs_metas.push_back(rs->rowset_meta()); } _tablet_meta->modify_rs_metas(rs_metas, {});}Status BetaRowsetWriterV2::flush_memtable(vectorized::Block* block, int32_t segment_id, int64_t* flush_size) { if (block->rows() == 0) { return Status::OK(); }
{ SCOPED_RAW_TIMER(&_segment_writer_ns); RETURN_IF_ERROR(_segment_creator.flush_single_block(block, segment_id, flush_size)); } // delete bitmap and seg compaction are done on the destination BE. return Status::OK();}调用:
// Writes `block` out as one segment with id `segment_id`.
//
// The input block is copied; the copy is what gets parsed and written.
// `flush_size` is forwarded to `_flush_segment_writer`.
// Returns OK for an empty block, the first failing step's status otherwise,
// or OK when the segment was created, filled and flushed.
Status SegmentFlusher::flush_single_block(const vectorized::Block* block, int32_t segment_id,
                                          int64_t* flush_size) {
    // Nothing to do for a block without rows.
    if (block->rows() == 0) {
        return Status::OK();
    }

    vectorized::Block staged_block(*block);

    // Variant columns are parsed for every write type except compaction.
    if (_context.write_type != DataWriteType::TYPE_COMPACTION) {
        if (_context.tablet_schema->num_variant_columns() > 0) {
            RETURN_IF_ERROR(_parse_variant_columns(staged_block));
        }
    }

    // Blocks at or below the configured threshold (in KB) are written uncompressed.
    bool skip_compression =
            staged_block.bytes() <= config::segment_compression_threshold_kb * 1024;

    if (!config::enable_vertical_segment_writer) {
        // Non-vertical path: plain SegmentWriter.
        std::unique_ptr<segment_v2::SegmentWriter> segment_writer;
        RETURN_IF_ERROR(_create_segment_writer(segment_writer, segment_id, skip_compression));
        RETURN_IF_ERROR_OR_CATCH_EXCEPTION(
                _add_rows(segment_writer, &staged_block, 0, staged_block.rows()));
        RETURN_IF_ERROR(_flush_segment_writer(segment_writer, flush_size));
        return Status::OK();
    }

    // Vertical path: same three steps with a VerticalSegmentWriter.
    std::unique_ptr<segment_v2::VerticalSegmentWriter> vertical_writer;
    RETURN_IF_ERROR(_create_segment_writer(vertical_writer, segment_id, skip_compression));
    RETURN_IF_ERROR_OR_CATCH_EXCEPTION(
            _add_rows(vertical_writer, &staged_block, 0, staged_block.rows()));
    RETURN_IF_ERROR(_flush_segment_writer(vertical_writer, flush_size));
    return Status::OK();
}
后面调用:
// Finalizes a vertical segment writer, closes its inverted index, reports the
// resulting segment statistics to the segment collector, and logs a timing
// breakdown.
//
// `writer`     - the writer to finalize; it is reset (destroyed) inside this
//                function before the collector is called.
// `flush_size` - optional out-parameter; when non-null it receives the segment
//                file size reported by finalize().
// Returns OK when no rows were written or when every step succeeds; otherwise
// the status of the first failing step.
Status SegmentFlusher::_flush_segment_writer(
        std::unique_ptr<segment_v2::VerticalSegmentWriter>& writer, int64_t* flush_size) {
    MonotonicStopWatch total_timer;
    total_timer.start();

    // Row counters are accumulated before the zero-row check below, so they are
    // updated even when this call returns early.
    uint32_t row_num = writer->num_rows_written();
    _num_rows_updated += writer->num_rows_updated();
    _num_rows_deleted += writer->num_rows_deleted();
    _num_rows_new_added += writer->num_rows_new_added();
    _num_rows_filtered += writer->num_rows_filtered();

    // NOTE(review): on this path *flush_size is not written; presumably callers
    // initialize it themselves - confirm.
    if (row_num == 0) {
        return Status::OK();
    }

    MonotonicStopWatch finalize_timer;
    finalize_timer.start();
    uint64_t segment_file_size;
    // Filled by finalize() but not read again in this function.
    uint64_t common_index_size;
    Status s = writer->finalize(&segment_file_size, &common_index_size);
    finalize_timer.stop();

    if (!s.ok()) {
        return Status::Error(s.code(), "failed to finalize segment: {}", s.to_string());
    }

    MonotonicStopWatch inverted_index_timer;
    inverted_index_timer.start();
    int64_t inverted_index_file_size = 0;
    RETURN_IF_ERROR(writer->close_inverted_index(&inverted_index_file_size));
    inverted_index_timer.stop();

    VLOG_DEBUG << "tablet_id:" << _context.tablet_id
               << " flushing filename: " << writer->data_dir_path()
               << " rowset_id:" << _context.rowset_id;

    // Copy the encoded key range out of the writer as strings.
    KeyBoundsPB key_bounds;
    Slice min_key = writer->min_encoded_key();
    Slice max_key = writer->max_encoded_key();
    // Checks that the min key does not compare greater than the max key.
    DCHECK_LE(min_key.compare(max_key), 0);
    key_bounds.set_min_key(min_key.to_string());
    key_bounds.set_max_key(max_key.to_string());

    // Everything still needed from the writer is captured in locals here,
    // because the writer is released right after.
    uint32_t segment_id = writer->segment_id();
    SegmentStatistics segstat;
    segstat.row_num = row_num;
    segstat.data_size = segment_file_size;
    // The index size reported to the collector is the inverted index file size.
    segstat.index_size = inverted_index_file_size;
    segstat.key_bounds = key_bounds;

    // Release the writer before handing the statistics to the collector.
    writer.reset();

    MonotonicStopWatch collector_timer;
    collector_timer.start();
    RETURN_IF_ERROR(_context.segment_collector->add(segment_id, segstat));
    collector_timer.stop();

    total_timer.stop();

    LOG(INFO) << "tablet_id:" << _context.tablet_id
              << ", flushing rowset_dir: " << _context.tablet_path
              << ", rowset_id:" << _context.rowset_id
              << ", data size:" << PrettyPrinter::print_bytes(segstat.data_size)
              << ", index size:" << PrettyPrinter::print_bytes(segstat.index_size)
              << ", timing breakdown: total=" << total_timer.elapsed_time_milliseconds() << "ms"
              << ", finalize=" << finalize_timer.elapsed_time_milliseconds() << "ms"
              << ", inverted_index=" << inverted_index_timer.elapsed_time_milliseconds() << "ms"
              << ", collector=" << collector_timer.elapsed_time_milliseconds() << "ms";

    // flush_size is optional; only written when the caller supplied it.
    if (flush_size) {
        *flush_size = segment_file_size;
    }
    return Status::OK();
}
Table: user_orders
├── Partition: p202401 (范围分区)
│   ├── Tablet 167 (Hash分桶)
│   │   ├── Rowset v1-v5 (基础版本)
│   │   │   ├── Segment 0 (256MB, 200万行)
│   │   │   │   ├── Page 0-4095: user_id 列
│   │   │   │   ├── Page 4096-8191: amount 列
│   │   │   │   └── Page 8192-12287: order_time 列
│   │   │   └── Segment 1 (256MB, 200万行)
│   │   ├── Rowset v6-v9 (增量版本)
│   │   └── Rowset v10 (最新版本)
│   ├── Tablet 168
│   └── ...
└── Partition: p202402
逻辑层: 表 (Table)
    │
    ▼
物理层: Tablet (数据分片)
    │
    ▼
版本层: Rowset (行集,版本管理单元)
    │
    ▼
文件层: Segment (段,物理文件)
    │
    ▼
存储层: Page (页,数据块)
    │
    ▼
数据层: Column (列,存储格式)