pjmsg_mcap_wrapper
Loading...
Searching...
No Matches
read_job_queue.hpp
Go to the documentation of this file.
1#pragma once
2
3#include "types.hpp"
4#include <algorithm>
5#include <variant>
6
7namespace mcap::internal {
8
// Type-dependent `false`. Using it as a static_assert condition inside the final `else` of an
// `if constexpr` chain makes the assertion fire only when that branch is instantiated, which
// turns a non-exhaustive variant visitor into a compile-time error.
template <typename>
inline constexpr bool always_false_v{false};
12
13/**
14 * @brief A job to read a specific message at offset `offset` from the decompressed chunk
15 * stored in `chunkReaderIndex`. A timestamp is provided to order this job relative to other jobs.
16 */
22
23/**
24 * @brief A job to decompress the chunk starting at `chunkStartOffset`. The message indices
25 * starting directly after the chunk record and ending at `messageIndexEndOffset` will be used to
26 * find specific messages within the chunk.
27 */
34
35/**
36 * @brief A union of jobs that an indexed MCAP reader executes.
37 */
39
40/**
41 * @brief A priority queue of jobs for an indexed MCAP reader to execute.
42 */
44private:
45 bool reverse_ = false;
47
48 /**
49 * @brief return the timestamp key that should be used to compare jobs.
50 */
51 static Timestamp TimeComparisonKey(const ReadJob& job, bool reverse) {
52 Timestamp result = 0;
54 [&](auto&& arg) {
55 using T = std::decay_t<decltype(arg)>;
56 if constexpr (std::is_same_v<T, ReadMessageJob>) {
57 result = arg.timestamp;
58 } else if constexpr (std::is_same_v<T, DecompressChunkJob>) {
59 if (reverse) {
60 result = arg.messageEndTime;
61 } else {
62 result = arg.messageStartTime;
63 }
64 } else {
65 static_assert(always_false_v<T>, "non-exhaustive visitor!");
66 }
67 },
68 job);
69 return result;
70 }
71 static RecordOffset PositionComparisonKey(const ReadJob& job, bool reverse) {
72 RecordOffset result;
74 [&](auto&& arg) {
75 using T = std::decay_t<decltype(arg)>;
76 if constexpr (std::is_same_v<T, ReadMessageJob>) {
77 result = arg.offset;
78 } else if constexpr (std::is_same_v<T, DecompressChunkJob>) {
79 if (reverse) {
80 result.offset = arg.messageIndexEndOffset;
81 } else {
82 result.offset = arg.chunkStartOffset;
83 }
84 } else {
85 static_assert(always_false_v<T>, "non-exhaustive visitor!");
86 }
87 },
88 job);
89 return result;
90 }
91
92 static bool CompareForward(const ReadJob& a, const ReadJob& b) {
93 auto aTimestamp = TimeComparisonKey(a, false);
94 auto bTimestamp = TimeComparisonKey(b, false);
95 if (aTimestamp == bTimestamp) {
96 return PositionComparisonKey(a, false) > PositionComparisonKey(b, false);
97 }
98 return aTimestamp > bTimestamp;
99 }
100
101 static bool CompareReverse(const ReadJob& a, const ReadJob& b) {
102 auto aTimestamp = TimeComparisonKey(a, true);
103 auto bTimestamp = TimeComparisonKey(b, true);
104 if (aTimestamp == bTimestamp) {
105 return PositionComparisonKey(a, true) < PositionComparisonKey(b, true);
106 }
107 return aTimestamp < bTimestamp;
108 }
109
110public:
111 explicit ReadJobQueue(bool reverse)
112 : reverse_(reverse) {}
113 void push(DecompressChunkJob&& decompressChunkJob) {
114 heap_.emplace_back(std::move(decompressChunkJob));
115 if (!reverse_) {
116 std::push_heap(heap_.begin(), heap_.end(), CompareForward);
117 } else {
118 std::push_heap(heap_.begin(), heap_.end(), CompareReverse);
119 }
120 }
121
122 void push(ReadMessageJob&& readMessageJob) {
123 heap_.emplace_back(std::move(readMessageJob));
124 if (!reverse_) {
125 std::push_heap(heap_.begin(), heap_.end(), CompareForward);
126 } else {
127 std::push_heap(heap_.begin(), heap_.end(), CompareReverse);
128 }
129 }
130
132 if (!reverse_) {
133 std::pop_heap(heap_.begin(), heap_.end(), CompareForward);
134 } else {
135 std::pop_heap(heap_.begin(), heap_.end(), CompareReverse);
136 }
137 auto popped = heap_.back();
138 heap_.pop_back();
139 return popped;
140 }
141
142 size_t len() const {
143 return heap_.size();
144 }
145};
146
147} // namespace mcap::internal
constexpr bool always_false_v
uint64_t Timestamp
Definition types.hpp:21
uint64_t ByteOffset
Definition types.hpp:22
T pop_heap(T... args)
T push_heap(T... args)
ByteOffset offset
Definition types.hpp:355
A job to decompress the chunk starting at chunkStartOffset. The message indices starting directly aft...
A priority queue of jobs for an indexed MCAP reader to execute.
static Timestamp TimeComparisonKey(const ReadJob &job, bool reverse)
return the timestamp key that should be used to compare jobs.
void push(ReadMessageJob &&readMessageJob)
void push(DecompressChunkJob &&decompressChunkJob)
static RecordOffset PositionComparisonKey(const ReadJob &job, bool reverse)
static bool CompareForward(const ReadJob &a, const ReadJob &b)
std::vector< ReadJob > heap_
static bool CompareReverse(const ReadJob &a, const ReadJob &b)
A job to read a specific message at offset offset from the decompressed chunk stored in chunkReaderIn...
T visit(T... args)