Doris Write Path

Doris storage hierarchy#

Doris first buffers writes in an in-memory memtable; the data then flows down through memtable -> rowset -> segment -> column writer.

  • rowset: a rowset describes the data produced by one write (one import version)

  • segment: a rowset may consist of multiple segments

  • tablet: a tablet contains multiple rowsets, and each rowset in turn contains multiple segments

  • column: a row of data consists of multiple columns

Load code path#

As the first step of the whole flow, here is the overall storage layout:

Table: user_orders
├── Partition: p202401 (range partition)
│   ├── Tablet 167 (hash bucket)
│   │   ├── Rowset v1-v5 (base versions)
│   │   │   ├── Segment 0 (256MB, 2M rows)
│   │   │   │   ├── Page 0-4095: user_id column
│   │   │   │   ├── Page 4096-8191: amount column
│   │   │   │   └── Page 8192-12287: order_time column
│   │   │   └── Segment 1 (256MB, 2M rows)
│   │   ├── Rowset v6-v9 (incremental versions)
│   │   └── Rowset v10 (latest version)
│   ├── Tablet 168
│   └── ...
└── Partition: p202402
Logical layer: Table
Physical layer: Tablet (data shard)
Version layer: Rowset (row set, the unit of version management)
File layer: Segment (physical file)
Storage layer: Page (data block)
Data layer: Column (columnar storage format)
doris/be/src/vec/exprs/vexpr.cpp

// Expression evaluation yields column data
ColumnPtr VExpr::get_const_col(const Block& block, size_t row_idx) {
    // Compute the value of this column for this row.
    // e.g. for the constant 'Alice', directly return a column containing 'Alice'
}

// Location: doris/be/src/vec/columns/column.h
// The actual column data insertion
template <typename T>
void ColumnVector<T>::insert_range_from(const IColumn& src, size_t start, size_t length) {
    const auto& src_vec = static_cast<const ColumnVector<T>&>(src);
    // Append the source column's data to the current column
    size_t old_size = data.size();
    data.resize(old_size + length);
    // Key point: a data copy happens here, from the source column to the destination column
    memcpy(&data[old_size], &src_vec.data[start], length * sizeof(T));
}

// String columns (chars + offsets layout)
void insert_data(const char* pos, size_t length) override {
    const size_t old_size = chars.size();
    const size_t new_size = old_size + length;
    if (length) {
        check_chars_length(new_size, offsets.size() + 1);
        chars.resize(new_size);
        memcpy(chars.data() + old_size, pos, length);
    }
    offsets.push_back(new_size);
    sanity_check_simple();
}
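The same append pattern can be reproduced in a minimal standalone form. This is a sketch mirroring the `insert_range_from` memcpy and the chars/offsets layout above, not the real `IColumn` API:

```cpp
#include <cstring>
#include <string>
#include <string_view>
#include <vector>

// Minimal fixed-width column: appending memcpy's raw values, like ColumnVector<T>.
template <typename T>
struct SimpleColumnVector {
    std::vector<T> data;
    void insert_range_from(const SimpleColumnVector& src, size_t start, size_t length) {
        size_t old_size = data.size();
        data.resize(old_size + length);
        // the copy from source column to destination column
        std::memcpy(data.data() + old_size, src.data.data() + start, length * sizeof(T));
    }
};

// Minimal string column: one flat byte buffer plus end offsets, like ColumnString.
struct SimpleColumnString {
    std::vector<char> chars;
    std::vector<size_t> offsets; // offsets[i] = end of the i-th string in chars
    void insert_data(const char* pos, size_t length) {
        size_t old_size = chars.size();
        chars.resize(old_size + length);
        if (length) std::memcpy(chars.data() + old_size, pos, length);
        offsets.push_back(old_size + length);
    }
    std::string_view at(size_t i) const {
        size_t begin = (i == 0) ? 0 : offsets[i - 1];
        return {chars.data() + begin, offsets[i] - begin};
    }
};
```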

Taking Stream Load as an example#

Stream Load uses a producer-consumer model: scanner threads parse the input and produce blocks, while the writer side consumes them and writes into memtables.
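The producer-consumer handoff can be sketched with a bounded blocking queue (illustrative only; Doris actually moves blocks through its scanner scheduler and RPC-fed tablet channels, as the stack traces below show):

```cpp
#include <condition_variable>
#include <deque>
#include <mutex>

// Bounded blocking queue: producers push parsed blocks, consumers pop and write them.
template <typename T>
class BlockingQueue {
public:
    explicit BlockingQueue(size_t cap) : _cap(cap) {}

    void push(T v) {
        std::unique_lock<std::mutex> lk(_mu);
        _not_full.wait(lk, [&] { return _q.size() < _cap; }); // backpressure on producers
        _q.push_back(std::move(v));
        _not_empty.notify_one();
    }

    T pop() {
        std::unique_lock<std::mutex> lk(_mu);
        _not_empty.wait(lk, [&] { return !_q.empty(); });
        T v = std::move(_q.front());
        _q.pop_front();
        _not_full.notify_one();
        return v;
    }

private:
    size_t _cap;
    std::mutex _mu;
    std::condition_variable _not_empty, _not_full;
    std::deque<T> _q;
};
```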

Producer#

Below is the producer's call stack:

* thread #431, name = 'rs_normal [work', stop reason = breakpoint 6.1
* frame #0: 0x0000556a4b123c4b doris_be`doris::vectorized::NewJsonReader::_simdjson_set_column_value(this=0x00007f9433b9a180, value=0x00007f96db7721f8, block=0x00007f97c81a8ce8, slot_descs=size=4, valid=0x00007f96db7721ef) at new_json_reader.cpp:935:13
frame #1: 0x0000556a4b1234c4 doris_be`doris::vectorized::NewJsonReader::_simdjson_handle_simple_json_write_columns(this=0x00007f9433b9a180, block=0x00007f97c81a8ce8, slot_descs=size=4, is_empty_row=0x00007f96db7723ef, eof=0x00007f9433b9a388) at new_json_reader.cpp:705:17
frame #2: 0x0000556a4b11ffea doris_be`doris::vectorized::NewJsonReader::_simdjson_handle_simple_json(this=0x00007f9433b9a180, (null)=0x00007f953b52ba00, block=0x00007f97c81a8ce8, slot_descs=size=4, is_empty_row=0x00007f96db7723ef, eof=0x00007f9433b9a388) at new_json_reader.cpp:668:9
frame #3: 0x0000556a4b11e737 doris_be`doris::vectorized::NewJsonReader::_read_json_column(this=0x00007f9433b9a180, state=0x00007f953b52ba00, block=0x00007f97c81a8ce8, slot_descs=size=4, is_empty_row=0x00007f96db7723ef, eof=0x00007f9433b9a388) at new_json_reader.cpp:502:12
frame #4: 0x0000556a4b11bcee doris_be`doris::vectorized::NewJsonReader::get_next_block(this=0x00007f9433b9a180, block=0x00007f97c81a8ce8, read_rows=0x00007f96db7724b8, eof=0x00007f97c81a8a30) at new_json_reader.cpp:217:9
frame #5: 0x0000556a4b60e080 doris_be`doris::vectorized::FileScanner::_get_block_wrapped(this=0x00007f97c81a8000, state=0x00007f953b52ba00, block=0x00007f978f75cfe0, eof=0x00007f96db7730d7) at file_scanner.cpp:465:13
frame #6: 0x0000556a4b607e20 doris_be`doris::vectorized::FileScanner::_get_block_impl(this=0x00007f97c81a8000, state=0x00007f953b52ba00, block=0x00007f978f75cfe0, eof=0x00007f96db7730d7) at file_scanner.cpp:402:17
frame #7: 0x0000556a4b6eeaab doris_be`doris::vectorized::Scanner::get_block(this=0x00007f97c81a8000, state=0x00007f953b52ba00, block=0x00007f978f75cfe0, eof=0x00007f96db7730d7) at scanner.cpp:143:17
frame #8: 0x0000556a4b6ee67e doris_be`doris::vectorized::Scanner::get_block_after_projects(this=0x00007f97c81a8000, state=0x00007f953b52ba00, block=0x00007f978f75cfe0, eos=0x00007f96db7730d7) at scanner.cpp:119:16
frame #9: 0x0000556a4b6f67a3 doris_be`doris::vectorized::ScannerScheduler::_scanner_scan(ctx=std::__shared_ptr<doris::vectorized::ScannerContext, __gnu_cxx::_S_atomic>::element_type @ 0x00007f953b0f8210, scan_task=std::__shared_ptr<doris::vectorized::ScanTask, __gnu_cxx::_S_atomic>::element_type @ 0x00007f953b542290) at scanner_scheduler.cpp:177:5
frame #10: 0x0000556a4b6f569d doris_be`doris::vectorized::ScannerScheduler::submit(this=0x00007f96db773620)::$_0::operator()() const::'lambda'()::operator()() const::'lambda'()::operator()() const at scanner_scheduler.cpp:75:17
frame #11: 0x0000556a4b6f54b7 doris_be`doris::vectorized::ScannerScheduler::submit(this=0x00007f97cafd0300)::$_0::operator()() const::'lambda'()::operator()() const at scanner_scheduler.cpp:74:27
frame #12: 0x0000556a4b6f5475 doris_be`bool std::__invoke_impl<bool, doris::vectorized::ScannerScheduler::submit(std::shared_ptr<doris::vectorized::ScannerContext>, std::shared_ptr<doris::vectorized::ScanTask>)::$_0::operator()() const::'lambda'()&>((null)=__invoke_other @ 0x00007f96db77366f, __f=0x00007f97cafd0300) at invoke.h:61:14
frame #13: 0x0000556a4b6f5435 doris_be`std::enable_if<is_invocable_r_v<bool, doris::vectorized::ScannerScheduler::submit(std::shared_ptr<doris::vectorized::ScannerContext>, std::shared_ptr<doris::vectorized::ScanTask>)::$_0::operator()() const::'lambda'()&>, bool>::type std::__invoke_r<bool, doris::vectorized::ScannerScheduler::submit(std::shared_ptr<doris::vectorized::ScannerContext>, std::shared_ptr<doris::vectorized::ScanTask>)::$_0::operator()() const::'lambda'()&>(__fn=0x00007f97cafd0300) at invoke.h:114:9
frame #14: 0x0000556a4b6f52ed doris_be`std::_Function_handler<bool (), doris::vectorized::ScannerScheduler::submit(std::shared_ptr<doris::vectorized::ScannerContext>, std::shared_ptr<doris::vectorized::ScanTask>)::$_0::operator()() const::'lambda'()>::_M_invoke(__functor=0x00007f97c8daab38) at std_function.h:290:9
frame #15: 0x0000556a43737e1e doris_be`std::function<bool ()>::operator()(this=0x00007f97c8daab38) const at std_function.h:591:9
frame #16: 0x0000556a4b6f4782 doris_be`doris::vectorized::ScannerSplitRunner::process_for(this=0x00007f97c8daab10, (null)=(__r = 1000000000)) at scanner_scheduler.cpp:414:25
frame #17: 0x0000556a4b75987c doris_be`doris::vectorized::PrioritizedSplitRunner::process(this=0x00007f97ce996b90) at prioritized_split_runner.cpp:103:35
frame #18: 0x0000556a4b73fac0 doris_be`doris::vectorized::TimeSharingTaskExecutor::_dispatch_thread(this=0x00007f97651fe810) at time_sharing_task_executor.cpp:566:77
frame #19: 0x0000556a4b74e5c2 doris_be`void std::__invoke_impl<void, void (doris::vectorized::TimeSharingTaskExecutor::*&)(), doris::vectorized::TimeSharingTaskExecutor*&>((null)=__invoke_memfun_deref @ 0x00007f96db7746cf, __f=0x00007f978f80af80, __t=0x00007f978f80af90) at invoke.h:74:14
frame #20: 0x0000556a4b74e50d doris_be`std::__invoke_result<void (doris::vectorized::TimeSharingTaskExecutor::*&)(), doris::vectorized::TimeSharingTaskExecutor*&>::type std::__invoke<void (doris::vectorized::TimeSharingTaskExecutor::*&)(), doris::vectorized::TimeSharingTaskExecutor*&>(__fn=0x00007f978f80af80, __args=0x00007f978f80af90) at invoke.h:96:14
frame #21: 0x0000556a4b74e4dd doris_be`void std::_Bind<void (doris::vectorized::TimeSharingTaskExecutor::* (doris::vectorized::TimeSharingTaskExecutor*))()>::__call<void, 0ul>(this=0x00007f978f80af80, __args=0x00007f96db774767, (null)=_Index_tuple<0UL> @ 0x00007f96db77473f) at functional:513:11
frame #22: 0x0000556a4b74e496 doris_be`void std::_Bind<void (doris::vectorized::TimeSharingTaskExecutor::* (doris::vectorized::TimeSharingTaskExecutor*))()>::operator()<void>(this=0x00007f978f80af80) at functional:598:17
frame #23: 0x0000556a4b74e465 doris_be`void std::__invoke_impl<void, std::_Bind<void (doris::vectorized::TimeSharingTaskExecutor::* (doris::vectorized::TimeSharingTaskExecutor*))()>&>((null)=__invoke_other @ 0x00007f96db77478f, __f=0x00007f978f80af80) at invoke.h:61:14
frame #24: 0x0000556a4b74e425 doris_be`std::enable_if<is_invocable_r_v<void, std::_Bind<void (doris::vectorized::TimeSharingTaskExecutor::* (doris::vectorized::TimeSharingTaskExecutor*))()>&>, void>::type std::__invoke_r<void, std::_Bind<void (doris::vectorized::TimeSharingTaskExecutor::* (doris::vectorized::TimeSharingTaskExecutor*))()>&>(__fn=0x00007f978f80af80) at invoke.h:111:2
frame #25: 0x0000556a4b74e28d doris_be`std::_Function_handler<void (), std::_Bind<void (doris::vectorized::TimeSharingTaskExecutor::* (doris::vectorized::TimeSharingTaskExecutor*))()>>::_M_invoke(__functor=0x00007f978f821620) at std_function.h:290:9
frame #26: 0x0000556a436da65e doris_be`std::function<void ()>::operator()(this=0x00007f978f821620) const at std_function.h:591:9
frame #27: 0x0000556a4587d962 doris_be`doris::Thread::supervise_thread(arg=0x00007f978f821610) at thread.cpp:460:5
frame #28: 0x00007f9898d66b7b libc.so.6`___lldb_unnamed_symbol3696 + 667
frame #29: 0x00007f9898de47b8 libc.so.6`___lldb_unnamed_symbol4129 + 7

Consumer#

Below is the consumer's call stack:

* thread #519, name = 'brpc_heavy', stop reason = breakpoint 4.1
* frame #0: 0x000055ad89c6a1e6 doris_be`doris::MemTable::insert(this=0x00007f62caea3600, input_block=0x00007f64cb71fe98, row_idxs=size=1) at memtable.cpp:199:5
frame #1: 0x000055ad89cacc15 doris_be`doris::MemTableWriter::write(this=0x00007f6579f88000, block=0x00007f64cb71fe98, row_idxs=size=1) at memtable_writer.cpp:118:27
frame #2: 0x000055ad8a76e50b doris_be`doris::DeltaWriter::write(this=0x00007f6579f08e00, block=0x00007f64cb71fe98, row_idxs=size=1) at delta_writer.cpp:160:30
frame #3: 0x000055ad8aad6744 doris_be`doris::BaseTabletsChannel::_write_block_data(doris::PTabletWriterAddBlockRequest const&, long, std::unordered_map<long, std::vector<unsigned int, doris::CustomStdAllocator<unsigned int, doris::Allocator<false, false, false, doris::DefaultMemoryAllocator, true>>>, std::hash<long>, std::equal_to<long>, std::allocator<std::pair<long const, std::vector<unsigned int, doris::CustomStdAllocator<unsigned int, doris::Allocator<false, false, false, doris::DefaultMemoryAllocator, true>>>>>>&, doris::PTabletWriterAddBlockResult*)::$_0::operator()(this=0x00007f64cb71fd60, writer=0x00007f6579f08e00) const at tablets_channel.cpp:619:9
frame #4: 0x000055ad8aad66fb doris_be`doris::Status std::__invoke_impl<doris::Status, doris::BaseTabletsChannel::_write_block_data(doris::PTabletWriterAddBlockRequest const&, long, std::unordered_map<long, std::vector<unsigned int, doris::CustomStdAllocator<unsigned int, doris::Allocator<false, false, false, doris::DefaultMemoryAllocator, true>>>, std::hash<long>, std::equal_to<long>, std::allocator<std::pair<long const, std::vector<unsigned int, doris::CustomStdAllocator<unsigned int, doris::Allocator<false, false, false, doris::DefaultMemoryAllocator, true>>>>>>&, doris::PTabletWriterAddBlockResult*)::$_0&, doris::BaseDeltaWriter*>((null)=__invoke_other @ 0x00007f64cb71f927, __f=0x00007f64cb71fd60, __args=0x00007f64cb71f9d8) at invoke.h:61:14
frame #5: 0x000055ad8aad6688 doris_be`std::enable_if<is_invocable_r_v<doris::Status, doris::BaseTabletsChannel::_write_block_data(doris::PTabletWriterAddBlockRequest const&, long, std::unordered_map<long, std::vector<unsigned int, doris::CustomStdAllocator<unsigned int, doris::Allocator<false, false, false, doris::DefaultMemoryAllocator, true>>>, std::hash<long>, std::equal_to<long>, std::allocator<std::pair<long const, std::vector<unsigned int, doris::CustomStdAllocator<unsigned int, doris::Allocator<false, false, false, doris::DefaultMemoryAllocator, true>>>>>>&, doris::PTabletWriterAddBlockResult*)::$_0&, doris::BaseDeltaWriter*>, doris::Status>::type std::__invoke_r<doris::Status, doris::BaseTabletsChannel::_write_block_data(doris::PTabletWriterAddBlockRequest const&, long, std::unordered_map<long, std::vector<unsigned int, doris::CustomStdAllocator<unsigned int, doris::Allocator<false, false, false, doris::DefaultMemoryAllocator, true>>>, std::hash<long>, std::equal_to<long>, std::allocator<std::pair<long const, std::vector<unsigned int, doris::CustomStdAllocator<unsigned int, doris::Allocator<false, false, false, doris::DefaultMemoryAllocator, true>>>>>>&, doris::PTabletWriterAddBlockResult*)::$_0&, doris::BaseDeltaWriter*>(__fn=0x00007f64cb71fd60, __args=0x00007f64cb71f9d8) at invoke.h:114:9
frame #6: 0x000055ad8aad6588 doris_be`std::_Function_handler<doris::Status (doris::BaseDeltaWriter*), doris::BaseTabletsChannel::_write_block_data(doris::PTabletWriterAddBlockRequest const&, long, std::unordered_map<long, std::vector<unsigned int, doris::CustomStdAllocator<unsigned int, doris::Allocator<false, false, false, doris::DefaultMemoryAllocator, true>>>, std::hash<long>, std::equal_to<long>, std::allocator<std::pair<long const, std::vector<unsigned int, doris::CustomStdAllocator<unsigned int, doris::Allocator<false, false, false, doris::DefaultMemoryAllocator, true>>>>>>&, doris::PTabletWriterAddBlockResult*)::$_0>::_M_invoke(__functor=0x00007f64cb71fd60, __args=0x00007f64cb71f9d8) at std_function.h:290:9
frame #7: 0x000055ad8aae0b59 doris_be`std::function<doris::Status (doris::BaseDeltaWriter*)>::operator()(this=0x00007f64cb71fd60, __args=0x00007f6579f08e00) const at std_function.h:591:9
frame #8: 0x000055ad8aad5ca5 doris_be`doris::BaseTabletsChannel::_write_block_data(doris::PTabletWriterAddBlockRequest const&, long, std::unordered_map<long, std::vector<unsigned int, doris::CustomStdAllocator<unsigned int, doris::Allocator<false, false, false, doris::DefaultMemoryAllocator, true>>>, std::hash<long>, std::equal_to<long>, std::allocator<std::pair<long const, std::vector<unsigned int, doris::CustomStdAllocator<unsigned int, doris::Allocator<false, false, false, doris::DefaultMemoryAllocator, true>>>>>>&, doris::PTabletWriterAddBlockResult*)::$_2::operator()(this=0x00007f64cb71fde0, tablet_id=1772371123463, write_func=function<doris::Status (doris::BaseDeltaWriter *)> @ 0x00007f64cb71fd60) const at tablets_channel.cpp:599:21
frame #9: 0x000055ad8aad5855 doris_be`doris::BaseTabletsChannel::_write_block_data(this=0x00007f6579d54b10, request=0x00007f631fa65d00, cur_seq=0, tablet_to_rowidxs=size=1, response=0x00007f61bef4f9c0) at tablets_channel.cpp:619:9
frame #10: 0x000055ad8aad05ad doris_be`doris::TabletsChannel::add_batch(this=0x00007f6579d54b10, request=0x00007f631fa65d00, response=0x00007f61bef4f9c0) at tablets_channel.cpp:657:12
frame #11: 0x000055ad8a919e7c doris_be`doris::LoadChannel::add_batch(this=0x00007f6579c1a600, request=0x00007f631fa65d00, response=0x00007f61bef4f9c0) at load_channel.cpp:195:9
frame #12: 0x000055ad8a90e6c1 doris_be`doris::LoadChannelMgr::add_batch(this=0x00007f65e4f0d280, request=0x00007f631fa65d00, response=0x00007f61bef4f9c0) at load_channel_mgr.cpp:178:26
frame #13: 0x000055ad8ac5dc47 doris_be`doris::PInternalService::tablet_writer_add_block(google::protobuf::RpcController*, doris::PTabletWriterAddBlockRequest const*, doris::PTabletWriterAddBlockResult*, google::protobuf::Closure*)::$_0::operator()(this=0x00007f65e2726640) const at internal_service.cpp:502:54
frame #14: 0x000055ad8ac5db85 doris_be`void std::__invoke_impl<void, doris::PInternalService::tablet_writer_add_block(google::protobuf::RpcController*, doris::PTabletWriterAddBlockRequest const*, doris::PTabletWriterAddBlockResult*, google::protobuf::Closure*)::$_0&>((null)=__invoke_other @ 0x00007f64cb72057f, __f=0x00007f65e2726640) at invoke.h:61:14
frame #15: 0x000055ad8ac5db45 doris_be`std::enable_if<is_invocable_r_v<void, doris::PInternalService::tablet_writer_add_block(google::protobuf::RpcController*, doris::PTabletWriterAddBlockRequest const*, doris::PTabletWriterAddBlockResult*, google::protobuf::Closure*)::$_0&>, void>::type std::__invoke_r<void, doris::PInternalService::tablet_writer_add_block(google::protobuf::RpcController*, doris::PTabletWriterAddBlockRequest const*, doris::PTabletWriterAddBlockResult*, google::protobuf::Closure*)::$_0&>(__fn=0x00007f65e2726640) at invoke.h:111:2
frame #16: 0x000055ad8ac5da2d doris_be`std::_Function_handler<void (), doris::PInternalService::tablet_writer_add_block(google::protobuf::RpcController*, doris::PTabletWriterAddBlockRequest const*, doris::PTabletWriterAddBlockResult*, google::protobuf::Closure*)::$_0>::_M_invoke(__functor=0x00007f64cb7206d0) at std_function.h:290:9
frame #17: 0x000055ad88d2265e doris_be`std::function<void ()>::operator()(this=0x00007f64cb7206d0) const at std_function.h:591:9
frame #18: 0x000055ad8ac7e6e2 doris_be`doris::WorkThreadPool<false>::work_thread(this=0x00007f658ae2ac10, thread_id=1) at work_thread_pool.hpp:159:17
frame #19: 0x000055ad8ac7f1cc doris_be`void std::__invoke_impl<void, void (doris::WorkThreadPool<false>::* const&)(int), doris::WorkThreadPool<false>*&, int&>((null)=__invoke_memfun_deref @ 0x00007f64cb7207df, __f=0x00007f658108b2a8, __t=0x00007f658108b2c0, __args=0x00007f658108b2b8) at invoke.h:74:14
frame #20: 0x000055ad8ac7f155 doris_be`std::__invoke_result<void (doris::WorkThreadPool<false>::* const&)(int), doris::WorkThreadPool<false>*&, int&>::type std::__invoke<void (doris::WorkThreadPool<false>::* const&)(int), doris::WorkThreadPool<false>*&, int&>(__fn=0x00007f658108b2a8, __args=0x00007f658108b2c0, __args=0x00007f658108b2b8) at invoke.h:96:14
frame #21: 0x000055ad8ac7f125 doris_be`decltype(std::__invoke((*this)._M_pmf, std::forward<doris::WorkThreadPool<false>*&>(fp), std::forward<int&>(fp))) std::_Mem_fn_base<void (doris::WorkThreadPool<false>::*)(int), true>::operator()<doris::WorkThreadPool<false>*&, int&>(this=0x00007f658108b2a8, __args=0x00007f658108b2c0, __args=0x00007f658108b2b8) const at functional:177:11
frame #22: 0x000055ad8ac7f0f5 doris_be`void std::__invoke_impl<void, std::_Mem_fn<void (doris::WorkThreadPool<false>::*)(int)>&, doris::WorkThreadPool<false>*&, int&>((null)=__invoke_other @ 0x00007f64cb72086f, __f=0x00007f658108b2a8, __args=0x00007f658108b2c0, __args=0x00007f658108b2b8) at invoke.h:61:14
frame #23: 0x000055ad8ac7f065 doris_be`std::enable_if<is_invocable_r_v<void, std::_Mem_fn<void (doris::WorkThreadPool<false>::*)(int)>&, doris::WorkThreadPool<false>*&, int&>, void>::type std::__invoke_r<void, std::_Mem_fn<void (doris::WorkThreadPool<false>::*)(int)>&, doris::WorkThreadPool<false>*&, int&>(__fn=0x00007f658108b2a8, __args=0x00007f658108b2c0, __args=0x00007f658108b2b8) at invoke.h:111:2
frame #24: 0x000055ad8ac7f032 doris_be`void std::_Bind_result<void, std::_Mem_fn<void (doris::WorkThreadPool<false>::*)(int)> (doris::WorkThreadPool<false>*, int)>::__call<void, 0ul, 1ul>(this=0x00007f658108b2a8, __args=0x00007f64cb720907, (null)=_Index_tuple<0UL, 1UL> @ 0x00007f64cb7208df) at functional:661:11
frame #25: 0x000055ad8ac7efc6 doris_be`void std::_Bind_result<void, std::_Mem_fn<void (doris::WorkThreadPool<false>::*)(int)> (doris::WorkThreadPool<false>*, int)>::operator()<>(this=0x00007f658108b2a8) at functional:720:17
frame #26: 0x000055ad8ac7ef95 doris_be`void std::__invoke_impl<void, std::_Bind_result<void, std::_Mem_fn<void (doris::WorkThreadPool<false>::*)(int)> (doris::WorkThreadPool<false>*, int)>>((null)=__invoke_other @ 0x00007f64cb72092f, __f=0x00007f658108b2a8) at invoke.h:61:14
frame #27: 0x000055ad8ac7ef55 doris_be`std::__invoke_result<std::_Bind_result<void, std::_Mem_fn<void (doris::WorkThreadPool<false>::*)(int)> (doris::WorkThreadPool<false>*, int)>>::type std::__invoke<std::_Bind_result<void, std::_Mem_fn<void (doris::WorkThreadPool<false>::*)(int)> (doris::WorkThreadPool<false>*, int)>>(__fn=0x00007f658108b2a8) at invoke.h:96:14
frame #28: 0x000055ad8ac7ef2d doris_be`void std::thread::_Invoker<std::tuple<std::_Bind_result<void, std::_Mem_fn<void (doris::WorkThreadPool<false>::*)(int)> (doris::WorkThreadPool<false>*, int)>>>::_M_invoke<0ul>(this=0x00007f658108b2a8, (null)=_Index_tuple<0UL> @ 0x00007f64cb72096f) at std_thread.h:301:13
frame #29: 0x000055ad8ac7ef05 doris_be`std::thread::_Invoker<std::tuple<std::_Bind_result<void, std::_Mem_fn<void (doris::WorkThreadPool<false>::*)(int)> (doris::WorkThreadPool<false>*, int)>>>::operator()(this=0x00007f658108b2a8) at std_thread.h:308:11
frame #30: 0x000055ad8ac7ee49 doris_be`std::thread::_State_impl<std::thread::_Invoker<std::tuple<std::_Bind_result<void, std::_Mem_fn<void (doris::WorkThreadPool<false>::*)(int)> (doris::WorkThreadPool<false>*, int)>>>>::_M_run(this=0x00007f658108b2a0) at std_thread.h:253:13
frame #31: 0x00007f66b50e1224 libstdc++.so.6`___lldb_unnamed_symbol8036 + 20
frame #32: 0x00007f66b4d66b7b libc.so.6`___lldb_unnamed_symbol3696 + 667
frame #33: 0x00007f66b4de47b8 libc.so.6`___lldb_unnamed_symbol4129 + 7

When a memtable fills up, it is submitted to the flush thread pool, which eventually runs `FlushToken::_flush_memtable`:

void FlushToken::_flush_memtable(std::shared_ptr<MemTable> memtable_ptr, int32_t segment_id,
                                 int64_t submit_task_time) {
    signal::set_signal_task_id(_rowset_writer->load_id());
    signal::tablet_id = memtable_ptr->tablet_id();
    Defer defer {[&]() {
        std::lock_guard<std::mutex> lock(_mutex);
        _stats.flush_submit_count--;
        if (_stats.flush_submit_count == 0) {
            _submit_task_finish_cond.notify_one();
        }
        _stats.flush_running_count--;
        if (_stats.flush_running_count == 0) {
            _running_task_finish_cond.notify_one();
        }
    }};
    DBUG_EXECUTE_IF("FlushToken.flush_memtable.wait_before_first_shutdown",
                    { std::this_thread::sleep_for(std::chrono::milliseconds(10 * 1000)); });
    if (_is_shutdown()) {
        return;
    }
    DBUG_EXECUTE_IF("FlushToken.flush_memtable.wait_after_first_shutdown",
                    { std::this_thread::sleep_for(std::chrono::milliseconds(10 * 1000)); });
    _stats.flush_running_count++;
    // double check if shutdown to avoid wait running task finish count not accurate
    if (_is_shutdown()) {
        return;
    }
    DBUG_EXECUTE_IF("FlushToken.flush_memtable.wait_after_second_shutdown",
                    { std::this_thread::sleep_for(std::chrono::milliseconds(10 * 1000)); });
    uint64_t flush_wait_time_ns = MonotonicNanos() - submit_task_time;
    _stats.flush_wait_time_ns += flush_wait_time_ns;
    // If previous flush has failed, return directly
    {
        std::shared_lock rdlk(_flush_status_lock);
        if (!_flush_status.ok()) {
            return;
        }
    }
    MonotonicStopWatch timer;
    timer.start();
    size_t memory_usage = memtable_ptr->memory_usage();
    int64_t flush_size;
    Status s = _do_flush_memtable(memtable_ptr.get(), segment_id, &flush_size);
    {
        std::shared_lock rdlk(_flush_status_lock);
        if (!_flush_status.ok()) {
            return;
        }
    }
    if (!s.ok()) {
        std::lock_guard wrlk(_flush_status_lock);
        LOG(WARNING) << "Flush memtable failed with res = " << s
                     << ", load_id: " << print_id(_rowset_writer->load_id());
        _flush_status = s;
        return;
    }
    VLOG_CRITICAL << "flush memtable wait time: "
                  << PrettyPrinter::print(flush_wait_time_ns, TUnit::TIME_NS)
                  << ", flush memtable cost: "
                  << PrettyPrinter::print(timer.elapsed_time(), TUnit::TIME_NS)
                  << ", submit count: " << _stats.flush_submit_count
                  << ", running count: " << _stats.flush_running_count
                  << ", finish count: " << _stats.flush_finish_count
                  << ", mem size: " << PrettyPrinter::print_bytes(memory_usage)
                  << ", disk size: " << PrettyPrinter::print_bytes(flush_size);
    _stats.flush_time_ns += timer.elapsed_time();
    _stats.flush_finish_count++;
    _stats.flush_size_bytes += memtable_ptr->memory_usage();
    _stats.flush_disk_size_bytes += flush_size;
}
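The `Defer` block in `_flush_memtable` implements a small completion-counting protocol: every task decrements the counters on exit and wakes a waiter once they reach zero, so shutdown can wait for all in-flight flushes. The pattern in isolation (a sketch, not the real `FlushToken`):

```cpp
#include <condition_variable>
#include <mutex>

// Counts outstanding tasks; wait_all() blocks until every task_done() has arrived.
class TaskCounter {
public:
    void task_submitted() {
        std::lock_guard<std::mutex> lk(_mu);
        ++_outstanding;
    }
    void task_done() {
        std::lock_guard<std::mutex> lk(_mu);
        if (--_outstanding == 0) _all_done.notify_all(); // wake waiters on the last task
    }
    void wait_all() {
        std::unique_lock<std::mutex> lk(_mu);
        _all_done.wait(lk, [&] { return _outstanding == 0; }); // predicate guards spurious wakeups
    }
    int outstanding() {
        std::lock_guard<std::mutex> lk(_mu);
        return _outstanding;
    }

private:
    std::mutex _mu;
    std::condition_variable _all_done;
    int _outstanding = 0;
};
```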

The task the thread pool ultimately executes is a `MemtableFlushTask`:

class MemtableFlushTask final : public Runnable {
    ENABLE_FACTORY_CREATOR(MemtableFlushTask);

public:
    MemtableFlushTask(std::shared_ptr<FlushToken> flush_token, std::shared_ptr<MemTable> memtable,
                      int32_t segment_id, int64_t submit_task_time)
            : _flush_token(flush_token),
              _memtable(memtable),
              _segment_id(segment_id),
              _submit_task_time(submit_task_time) {
        g_flush_task_num << 1;
    }

    ~MemtableFlushTask() override { g_flush_task_num << -1; }

    void run() override {
        auto token = _flush_token.lock();
        if (token) {
            token->_flush_memtable(_memtable, _segment_id, _submit_task_time); // the core call
        } else {
            LOG(WARNING) << "flush token is deconstructed, ignore the flush task";
        }
    }

private:
    std::weak_ptr<FlushToken> _flush_token;
    std::shared_ptr<MemTable> _memtable;
    int32_t _segment_id;
    int64_t _submit_task_time;
};
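Note that the task holds a `std::weak_ptr<FlushToken>`: if the token is destroyed before the pooled task gets to run, `lock()` returns null and the task is skipped instead of touching freed memory. A minimal illustration of the pattern:

```cpp
#include <memory>

struct Token { int id = 42; }; // stand-in for FlushToken

// Returns the token's id if the owner is still alive, or -1 if it is gone.
inline int run_task(const std::weak_ptr<Token>& weak) {
    if (auto token = weak.lock()) { // promotes to shared_ptr iff still alive
        return token->id;
    }
    return -1; // owner destroyed: skip the work
}
```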


The actual flush work is done in `FlushToken::_do_flush_memtable`:

Status FlushToken::_do_flush_memtable(MemTable* memtable, int32_t segment_id, int64_t* flush_size) {
    VLOG_CRITICAL << "begin to flush memtable for tablet: " << memtable->tablet_id()
                  << ", memsize: " << PrettyPrinter::print_bytes(memtable->memory_usage())
                  << ", rows: " << memtable->stat().raw_rows;
    memtable->update_mem_type(MemType::FLUSH);
    int64_t duration_ns = 0;
    {
        SCOPED_RAW_TIMER(&duration_ns);
        SCOPED_ATTACH_TASK(memtable->resource_ctx());
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
                memtable->resource_ctx()->memory_context()->mem_tracker()->write_tracker());
        SCOPED_CONSUME_MEM_TRACKER(memtable->mem_tracker());
        // DEFER_RELEASE_RESERVED();
        // auto reserve_size = memtable->get_flush_reserve_memory_size();
        // if (memtable->resource_ctx()->task_controller()->is_enable_reserve_memory() &&
        //     reserve_size > 0) {
        //     RETURN_IF_ERROR(_try_reserve_memory(memtable->resource_ctx(), reserve_size));
        // }
        // Defer defer {[&]() {
        //     ExecEnv::GetInstance()->storage_engine().memtable_flush_executor()->dec_flushing_task();
        // }};
        std::unique_ptr<Block> block;
        RETURN_IF_ERROR(memtable->to_block(&block));
        RETURN_IF_ERROR(_rowset_writer->flush_memtable(block.get(), segment_id, flush_size));
        memtable->set_flush_success();
    }
    _memtable_stat += memtable->stat();
    DorisMetrics::instance()->memtable_flush_total->increment(1);
    DorisMetrics::instance()->memtable_flush_duration_us->increment(duration_ns / 1000);
    VLOG_CRITICAL << "after flush memtable for tablet: " << memtable->tablet_id()
                  << ", flushsize: " << PrettyPrinter::print_bytes(*flush_size);
    return Status::OK();
}
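`SCOPED_RAW_TIMER(&duration_ns)` is an RAII timer: it adds the elapsed nanoseconds of the enclosing scope to the target counter when the scope exits. A sketch of the idea (not Doris's actual macro):

```cpp
#include <chrono>
#include <cstdint>

// Adds the scope's elapsed nanoseconds to *target on destruction.
class ScopedRawTimer {
public:
    explicit ScopedRawTimer(int64_t* target)
            : _target(target), _start(std::chrono::steady_clock::now()) {}
    ~ScopedRawTimer() {
        auto elapsed = std::chrono::steady_clock::now() - _start;
        *_target += std::chrono::duration_cast<std::chrono::nanoseconds>(elapsed).count();
    }

private:
    int64_t* _target;
    std::chrono::steady_clock::time_point _start;
};
```

Because the accumulation happens in the destructor, every exit path out of the scope (early `return`, exception, normal fall-through) is timed without extra code.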
The key call inside is `_rowset_writer->flush_memtable`:

Status FlushToken::_do_flush_memtable(MemTable* memtable, int32_t segment_id, int64_t* flush_size) {
    ...
    RETURN_IF_ERROR(_rowset_writer->flush_memtable(block.get(), segment_id, flush_size));
    ...
}

Let's look at its implementation:

Status BetaRowsetWriterV2::flush_memtable(vectorized::Block* block, int32_t segment_id,
                                          int64_t* flush_size) {
    if (block->rows() == 0) {
        return Status::OK();
    }
    {
        SCOPED_RAW_TIMER(&_segment_writer_ns);
        RETURN_IF_ERROR(_segment_creator.flush_single_block(block, segment_id, flush_size));
    }
    // delete bitmap and seg compaction are done on the destination BE.
    return Status::OK();
}
As an aside, once the resulting rowsets are committed, `Tablet::add_rowsets` registers them in the tablet's version map:

void Tablet::add_rowsets(const std::vector<RowsetSharedPtr>& to_add) {
    if (to_add.empty()) {
        return;
    }
    std::vector<RowsetMetaSharedPtr> rs_metas;
    rs_metas.reserve(to_add.size());
    for (auto& rs : to_add) {
        _rs_version_map.emplace(rs->version(), rs);
        _timestamped_version_tracker.add_version(rs->version());
        rs_metas.push_back(rs->rowset_meta());
    }
    _tablet_meta->modify_rs_metas(rs_metas, {});
}
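The version-map idea — rowsets keyed by their version range, with the newest version readable off the end — can be sketched like this (illustrative; the real `Version` type and the timestamped tracker are richer):

```cpp
#include <cstdint>
#include <map>
#include <string>
#include <utility>

using Version = std::pair<int64_t, int64_t>; // [start_version, end_version]

// Toy tablet metadata: rowsets indexed by version range, ordered by start version.
struct VersionMap {
    std::map<Version, std::string> rowsets; // the value stands in for RowsetSharedPtr

    void add(Version v, std::string rowset) { rowsets.emplace(v, std::move(rowset)); }

    // Highest end-version currently visible, or -1 when empty.
    int64_t max_version() const {
        return rowsets.empty() ? -1 : rowsets.rbegin()->first.second;
    }
};
```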

This calls `SegmentFlusher::flush_single_block`:

Status SegmentFlusher::flush_single_block(const vectorized::Block* block, int32_t segment_id,
                                          int64_t* flush_size) {
    if (block->rows() == 0) {
        return Status::OK();
    }
    vectorized::Block flush_block(*block);
    if (_context.write_type != DataWriteType::TYPE_COMPACTION &&
        _context.tablet_schema->num_variant_columns() > 0) {
        RETURN_IF_ERROR(_parse_variant_columns(flush_block));
    }
    bool no_compression = flush_block.bytes() <= config::segment_compression_threshold_kb * 1024;
    if (config::enable_vertical_segment_writer) {
        std::unique_ptr<segment_v2::VerticalSegmentWriter> writer;
        RETURN_IF_ERROR(_create_segment_writer(writer, segment_id, no_compression));
        RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_add_rows(writer, &flush_block, 0, flush_block.rows()));
        RETURN_IF_ERROR(_flush_segment_writer(writer, flush_size));
    } else {
        std::unique_ptr<segment_v2::SegmentWriter> writer;
        RETURN_IF_ERROR(_create_segment_writer(writer, segment_id, no_compression));
        RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_add_rows(writer, &flush_block, 0, flush_block.rows()));
        RETURN_IF_ERROR(_flush_segment_writer(writer, flush_size));
    }
    return Status::OK();
}

Further down the chain, `SegmentWriter::append_block` is called:

Status SegmentWriter::append_block(const Block* block, size_t row_pos, size_t num_rows) {
if (_opts.rowset_ctx->partial_update_info &&
_opts.rowset_ctx->partial_update_info->is_partial_update() &&
_opts.write_type == DataWriteType::TYPE_DIRECT &&
!_opts.rowset_ctx->is_transient_rowset_writer) {
if (_opts.rowset_ctx->partial_update_info->is_fixed_partial_update()) {
RETURN_IF_ERROR(append_block_with_partial_content(block, row_pos, num_rows));
} else {
return Status::NotSupported<false>(
"SegmentWriter doesn't support flexible partial update, please set "
"enable_vertical_segment_writer=true in be.conf on all BEs to use "
"VerticalSegmentWriter.");
}
return Status::OK();
}
if (block->columns() < _column_writers.size()) {
return Status::InternalError(
"block->columns() < _column_writers.size(), block->columns()=" +
std::to_string(block->columns()) +
", _column_writers.size()=" + std::to_string(_column_writers.size()) +
", _tablet_schema->dump_structure()=" + _tablet_schema->dump_structure());
}
CHECK(block->columns() >= _column_writers.size())
<< ", block->columns()=" << block->columns()
<< ", _column_writers.size()=" << _column_writers.size()
<< ", _tablet_schema->dump_structure()=" << _tablet_schema->dump_structure();
// Row column should be filled here when it's a direct write from memtable
// or it's a schema change write (since column data types may have changed, we should rebuild)
if (_opts.write_type == DataWriteType::TYPE_DIRECT ||
_opts.write_type == DataWriteType::TYPE_SCHEMA_CHANGE) {
_serialize_block_to_row_column(*block);
}
if (_opts.rowset_ctx->write_type != DataWriteType::TYPE_COMPACTION &&
_tablet_schema->num_variant_columns() > 0) {
RETURN_IF_ERROR(variant_util::parse_and_materialize_variant_columns(
const_cast<Block&>(*block), *_tablet_schema, _column_ids));
}
_olap_data_convertor->set_source_content(block, row_pos, num_rows);
// find all row pos for short key indexes
std::vector<size_t> short_key_pos;
if (_has_key) {
// We build a short key index every `_opts.num_rows_per_block` rows. Specifically, we
// build a short key index using 1st rows for first block and `_short_key_row_pos - _row_count`
// for next blocks.
// Ensure we build a short key index using 1st rows only for the first block (ISSUE-9766).
if (UNLIKELY(_short_key_row_pos == 0 && _num_rows_written == 0)) {
short_key_pos.push_back(0);
}
while (_short_key_row_pos + _opts.num_rows_per_block < _num_rows_written + num_rows) {
_short_key_row_pos += _opts.num_rows_per_block;
short_key_pos.push_back(_short_key_row_pos - _num_rows_written);
}
}
// convert column data from engine format to storage layer format
std::vector<IOlapColumnDataAccessor*> key_columns;
IOlapColumnDataAccessor* seq_column = nullptr;
for (size_t id = 0; id < _column_writers.size(); ++id) {
// olap data convertor always starts from id = 0
auto converted_result = _olap_data_convertor->convert_column_data(id);
if (!converted_result.first.ok()) {
return converted_result.first;
}
auto cid = _column_ids[id];
if (_has_key && cid < _tablet_schema->num_key_columns()) {
key_columns.push_back(converted_result.second);
} else if (_has_key && _tablet_schema->has_sequence_col() &&
cid == _tablet_schema->sequence_col_idx()) {
seq_column = converted_result.second;
}
RETURN_IF_ERROR(_column_writers[id]->append(converted_result.second->get_nullmap(),
converted_result.second->get_data(), num_rows));
}
if (_opts.write_type == DataWriteType::TYPE_COMPACTION) {
RETURN_IF_ERROR(
_variant_stats_calculator->calculate_variant_stats(block, row_pos, num_rows));
}
if (_has_key) {
if (_is_mow_with_cluster_key()) {
// for now we don't need to query short key index for CLUSTER BY feature,
// but we still write the index for future usage.
// 1. generate primary key index, the key_columns is primary_key_columns
RETURN_IF_ERROR(_generate_primary_key_index(_primary_key_coders, key_columns,
seq_column, num_rows, true));
// 2. generate short key index (use cluster key)
key_columns.clear();
for (const auto& cid : _tablet_schema->cluster_key_uids()) {
// find cluster key index in tablet schema
auto cluster_key_index = _tablet_schema->field_index(cid);
if (cluster_key_index == -1) {
return Status::InternalError(
"could not find cluster key column with unique_id=" +
std::to_string(cid) + " in tablet schema");
}
bool found = false;
for (auto i = 0; i < _column_ids.size(); ++i) {
if (_column_ids[i] == cluster_key_index) {
auto converted_result = _olap_data_convertor->convert_column_data(i);
if (!converted_result.first.ok()) {
return converted_result.first;
}
key_columns.push_back(converted_result.second);
found = true;
break;
}
}
if (!found) {
return Status::InternalError(
"could not found cluster key column with unique_id=" +
std::to_string(cid) +
", tablet schema index=" + std::to_string(cluster_key_index));
}
}
RETURN_IF_ERROR(_generate_short_key_index(key_columns, num_rows, short_key_pos));
} else if (_is_mow()) {
RETURN_IF_ERROR(_generate_primary_key_index(_key_coders, key_columns, seq_column,
num_rows, false));
} else {
RETURN_IF_ERROR(_generate_short_key_index(key_columns, num_rows, short_key_pos));
}
}
_num_rows_written += num_rows;
_olap_data_convertor->clear_source_content();
return Status::OK();
}
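The short-key index logic in `append_block` emits one index entry every `_opts.num_rows_per_block` rows, plus one entry for the very first row of the first block (ISSUE-9766); the emitted positions are relative to the current block. A self-contained sketch of just that computation (names are illustrative, not the Doris members):

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Sketch of the short_key_pos computation in append_block:
// short_key_row_pos tracks the absolute row of the last index entry,
// num_rows_written counts rows from earlier blocks, and returned
// positions are offsets within the current block of num_rows rows.
std::vector<size_t> short_key_positions(size_t& short_key_row_pos,
                                        size_t num_rows_written,
                                        size_t num_rows_per_block,
                                        size_t num_rows) {
    std::vector<size_t> pos;
    if (short_key_row_pos == 0 && num_rows_written == 0) {
        pos.push_back(0);  // first row of the very first block
    }
    while (short_key_row_pos + num_rows_per_block < num_rows_written + num_rows) {
        short_key_row_pos += num_rows_per_block;
        pos.push_back(short_key_row_pos - num_rows_written);
    }
    return pos;
}
```

For a first block of 5000 rows with 1024 rows per index entry, this yields positions 0, 1024, 2048, 3072, 4096; a following block continues from the carried-over `short_key_row_pos`.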

The core conversion logic, which turns engine-format (row-oriented) data into the columnar OLAP storage format:

std::pair<Status, IOlapColumnDataAccessor*> OlapBlockDataConvertor::convert_column_data(
size_t cid) {
assert(cid < _convertors.size());
auto convert_func = [&]() -> Status {
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_convertors[cid]->convert_to_olap());
return Status::OK();
};
auto status = convert_func();
return {status, _convertors[cid].get()};
}
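Conceptually, the convertor materializes one column at a time out of the incoming block. A toy illustration of row-to-column conversion (these structs are illustrative, not Doris types):

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Toy row-to-column conversion: a batch of row-structured records is
// split into per-column contiguous arrays, which is the shape that
// convert_column_data hands to each column writer.
struct Row { int64_t user_id; int64_t amount; };

struct Columns {
    std::vector<int64_t> user_id;
    std::vector<int64_t> amount;
};

Columns rows_to_columns(const std::vector<Row>& rows) {
    Columns c;
    c.user_id.reserve(rows.size());
    c.amount.reserve(rows.size());
    for (const auto& r : rows) {
        c.user_id.push_back(r.user_id);
        c.amount.push_back(r.amount);
    }
    return c;
}
```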

Finally, the segment writer persists everything to the segment file in SegmentWriter::finalize:

Status SegmentWriter::finalize(uint64_t* segment_file_size, uint64_t* index_size) {
MonotonicStopWatch timer;
timer.start();
// check disk capacity
if (_data_dir != nullptr && _data_dir->reach_capacity_limit((int64_t)estimate_segment_size())) {
return Status::Error<DISK_REACH_CAPACITY_LIMIT>("disk {} exceed capacity limit, path: {}",
_data_dir->path_hash(), _data_dir->path());
}
// write data
RETURN_IF_ERROR(finalize_columns_data());
// Get the index start before finalize_footer since this function would write new data.
uint64_t index_start = _file_writer->bytes_appended();
// write index
RETURN_IF_ERROR(finalize_columns_index(index_size));
// write footer
RETURN_IF_ERROR(finalize_footer(segment_file_size));
if (timer.elapsed_time() > 5000000000l) {
LOG(INFO) << "segment flush consumes a lot time_ns " << timer.elapsed_time()
<< ", segmemt_size " << *segment_file_size;
}
// When the cache type is not ttl(expiration time == 0), the data should be split into normal cache queue
// and index cache queue
if (auto* cache_builder = _file_writer->cache_builder(); cache_builder != nullptr &&
cache_builder->_expiration_time == 0 &&
config::is_cloud_mode()) {
auto size = *index_size + *segment_file_size;
auto holder = cache_builder->allocate_cache_holder(index_start, size, _tablet->tablet_id());
for (auto& segment : holder->file_blocks) {
static_cast<void>(segment->change_cache_type(io::FileCacheType::INDEX));
}
}
return Status::OK();
}
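`finalize` implies a simple on-disk layout: column data pages first, then the index region (whose start offset is captured as `bytes_appended()` before index writing), then the footer. A sketch of those offset relationships, with illustrative field names:

```cpp
#include <cassert>
#include <cstdint>

// Sketch of the segment file layout written by finalize():
// [column data pages][index region][footer]. Offsets are just
// running byte counts, mirroring _file_writer->bytes_appended().
struct SegmentLayout {
    uint64_t data_size = 0;   // written by finalize_columns_data
    uint64_t index_size = 0;  // written by finalize_columns_index
    uint64_t footer_size = 0; // written by finalize_footer

    uint64_t index_start() const { return data_size; }
    uint64_t total_size() const { return data_size + index_size + footer_size; }
};
```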

The actual call stack at flush time:

* thread #439, name = 'mf_normal [work', stop reason = breakpoint 2.1
* frame #0: 0x000055ddfe4eaabc doris_be`doris::SegmentFlusher::flush_single_block(this=0x00007fbbbce0e450, block=0x00007fbc60901420, segment_id=0, flush_size=0x00007fbe639f4b50) at segment_creator.cpp:64:9
frame #1: 0x000055ddfe4ef61a doris_be`doris::SegmentCreator::flush_single_block(this=0x00007fbbbce0e448, block=0x00007fbc60901420, segment_id=0, flush_size=0x00007fbe639f4b50) at segment_creator.cpp:404:5
frame #2: 0x000055ddfe46a3c5 doris_be`doris::BaseBetaRowsetWriter::flush_memtable(this=0x00007fbbbce0e000, block=0x00007fbc60901420, segment_id=0, flush_size=0x00007fbe639f4b50) at beta_rowset_writer.cpp:817:9
frame #3: 0x000055ddfe311da3 doris_be`doris::FlushToken::_do_flush_memtable(this=0x00007fbbbce22010, memtable=0x00007fbbbcce9200, segment_id=0, flush_size=0x00007fbe639f4b50) at memtable_flush_executor.cpp:210:9
frame #4: 0x000055ddfe30fed4 doris_be`doris::FlushToken::_flush_memtable(this=0x00007fbbbce22010, memtable_ptr=std::__shared_ptr<doris::MemTable, __gnu_cxx::_S_atomic>::element_type @ 0x00007fbbbcce9200, segment_id=0, submit_task_time=120805632954059) at memtable_flush_executor.cpp:265:16
frame #5: 0x000055ddfe3177b2 doris_be`doris::MemtableFlushTask::run(this=0x00007fbbbcdd70f0) at memtable_flush_executor.cpp:63:20
frame #6: 0x000055ddff559fc8 doris_be`doris::ThreadPool::dispatch_thread(this=0x00007fbef9d2e980) at threadpool.cpp:616:24
frame #7: 0x000055ddff568d42 doris_be`void std::__invoke_impl<void, void (doris::ThreadPool::*&)(), doris::ThreadPool*&>((null)=__invoke_memfun_deref @ 0x00007fbe639f56cf, __f=0x00007fbef9e46b20, __t=0x00007fbef9e46b30) at invoke.h:74:14
frame #8: 0x000055ddff568c8d doris_be`std::__invoke_result<void (doris::ThreadPool::*&)(), doris::ThreadPool*&>::type std::__invoke<void (doris::ThreadPool::*&)(), doris::ThreadPool*&>(__fn=0x00007fbef9e46b20, __args=0x00007fbef9e46b30) at invoke.h:96:14
frame #9: 0x000055ddff568c5d doris_be`void std::_Bind<void (doris::ThreadPool::* (doris::ThreadPool*))()>::__call<void, 0ul>(this=0x00007fbef9e46b20, __args=0x00007fbe639f5767, (null)=_Index_tuple<0UL> @ 0x00007fbe639f573f) at functional:513:11
frame #10: 0x000055ddff568c16 doris_be`void std::_Bind<void (doris::ThreadPool::* (doris::ThreadPool*))()>::operator()<void>(this=0x00007fbef9e46b20) at functional:598:17
frame #11: 0x000055ddff568be5 doris_be`void std::__invoke_impl<void, std::_Bind<void (doris::ThreadPool::* (doris::ThreadPool*))()>&>((null)=__invoke_other @ 0x00007fbe639f578f, __f=0x00007fbef9e46b20) at invoke.h:61:14
frame #12: 0x000055ddff568ba5 doris_be`std::enable_if<is_invocable_r_v<void, std::_Bind<void (doris::ThreadPool::* (doris::ThreadPool*))()>&>, void>::type std::__invoke_r<void, std::_Bind<void (doris::ThreadPool::* (doris::ThreadPool*))()>&>(__fn=0x00007fbef9e46b20) at invoke.h:111:2
frame #13: 0x000055ddff568a0d doris_be`std::_Function_handler<void (), std::_Bind<void (doris::ThreadPool::* (doris::ThreadPool*))()>>::_M_invoke(__functor=0x00007fbef9e4b520) at std_function.h:290:9
frame #14: 0x000055ddfd3a265e doris_be`std::function<void ()>::operator()(this=0x00007fbef9e4b520) const at std_function.h:591:9
frame #15: 0x000055ddff545962 doris_be`doris::Thread::supervise_thread(arg=0x00007fbef9e4b510) at thread.cpp:460:5
frame #16: 0x00007fc02369db7b libc.so.6`___lldb_unnamed_symbol3696 + 667
frame #17: 0x00007fc02371b7b8 libc.so.6`___lldb_unnamed_symbol4129 + 7

rowset / tablet / segment / SST / row relationships#

So how do rowset, tablet, segment, and SST relate to each other?

  • A tablet contains multiple rowsets.
  • A rowset represents a single write (import); one write contains one or more rows.
  • A rowset contains one or more segments.
  • A segment contains one or more rows; as the immutable on-disk file unit, it plays a role roughly analogous to an SSTable in an LSM tree.
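The containment hierarchy above can be modeled as nested containers (toy types for illustration, not Doris classes):

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Toy model of the hierarchy: a tablet holds versioned rowsets,
// a rowset holds segments, a segment holds rows.
struct Segment { uint64_t num_rows; };
struct Rowset  { int64_t version; std::vector<Segment> segments; };
struct Tablet  { std::vector<Rowset> rowsets; };

// Total rows in a tablet = sum over all rowsets and their segments.
uint64_t tablet_row_count(const Tablet& t) {
    uint64_t n = 0;
    for (const auto& rs : t.rowsets)
        for (const auto& seg : rs.segments) n += seg.num_rows;
    return n;
}
```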

Related reading#

Segment format

Relationship between rowset and segment

Relationship between rowset and segment (2)

Relationship between rowset and segment (3)

Re-flushing Doris

Doris Write (Doris 写入)
https://tatamagic.com/posts/doris/
Author: dinosaur
Published: 2026-01-03
License: CC BY-NC-SA 4.0