pjmsg_mcap_wrapper
Loading...
Searching...
No Matches
reader.inl
Go to the documentation of this file.
1#include "internal.hpp"
2#include <algorithm>
3#include <cassert>
4#ifndef MCAP_COMPRESSION_NO_LZ4
5# include <lz4frame.h>
6#endif
7#ifndef MCAP_COMPRESSION_NO_ZSTD
8# include <zstd.h>
9# include <zstd_errors.h>
10#endif
11
12namespace mcap {
13
14bool CompareChunkIndexes(const ChunkIndex& a, const ChunkIndex& b) {
16}
17
18// BufferReader ////////////////////////////////////////////////////////////////
19
20void BufferReader::reset(const std::byte* data, uint64_t size, uint64_t uncompressedSize) {
21 (void)uncompressedSize;
22 assert(size == uncompressedSize);
23 data_ = data;
24 size_ = size;
25}
26
27uint64_t BufferReader::read(std::byte** output, uint64_t offset, uint64_t size) {
28 if (!data_ || offset >= size_) {
29 return 0;
30 }
31
32 const auto available = size_ - offset;
33 *output = const_cast<std::byte*>(data_) + offset;
34 return std::min(size, available);
35}
36
37uint64_t BufferReader::size() const {
38 return size_;
39}
40
44
45// FileReader //////////////////////////////////////////////////////////////////
46
48 : file_(file)
49 , size_(0)
50 , position_(0) {
51 assert(file_);
52
53 // Determine the size of the file
54 std::fseek(file_, 0, SEEK_END);
56 std::fseek(file_, 0, SEEK_SET);
57}
58
59uint64_t FileReader::size() const {
60 return size_;
61}
62
63uint64_t FileReader::read(std::byte** output, uint64_t offset, uint64_t size) {
64 if (offset >= size_) {
65 return 0;
66 }
67
68 if (offset != position_) {
69 std::fseek(file_, (long)(offset), SEEK_SET);
71 position_ = offset;
72 }
73
74 if (size > buffer_.size()) {
76 }
77
78 const uint64_t bytesRead = uint64_t(std::fread(buffer_.data(), 1, size, file_));
79 *output = buffer_.data();
80
81 position_ += bytesRead;
82 return bytesRead;
83}
84
85// FileStreamReader ////////////////////////////////////////////////////////////
86
88 : stream_(stream)
89 , position_(0) {
90 assert(stream.is_open());
91
92 // Determine the size of the file
93 stream_.seekg(0, stream.end);
95 stream_.seekg(0, stream.beg);
96}
97
98uint64_t FileStreamReader::size() const {
99 return size_;
100}
101
102uint64_t FileStreamReader::read(std::byte** output, uint64_t offset, uint64_t size) {
103 if (offset >= size_) {
104 return 0;
105 }
106
107 if (offset != position_) {
108 stream_.seekg(offset);
109 position_ = offset;
110 }
111
112 if (size > buffer_.size()) {
114 }
115
116 stream_.read(reinterpret_cast<char*>(buffer_.data()), size);
117 *output = buffer_.data();
118
119 const uint64_t bytesRead = stream_.gcount();
120 position_ += bytesRead;
121 return bytesRead;
122}
123
124// LZ4Reader ///////////////////////////////////////////////////////////////////
125
126#ifndef MCAP_COMPRESSION_NO_LZ4
128 const LZ4F_errorCode_t err =
129 LZ4F_createDecompressionContext((LZ4F_dctx**)&decompressionContext_, LZ4F_VERSION);
130 if (LZ4F_isError(err)) {
131 const auto msg =
132 internal::StrCat("failed to create lz4 decompression context: ", LZ4F_getErrorName(err));
134 decompressionContext_ = nullptr;
135 }
136}
137
140 LZ4F_freeDecompressionContext((LZ4F_dctx*)decompressionContext_);
141 }
142}
143
144void LZ4Reader::reset(const std::byte* data, uint64_t size, uint64_t uncompressedSize) {
146 return;
147 }
148 compressedData_ = data;
150 status_ = decompressAll(data, size, uncompressedSize, &uncompressedData_);
152}
153
154uint64_t LZ4Reader::read(std::byte** output, uint64_t offset, uint64_t size) {
155 if (offset >= uncompressedSize_) {
156 return 0;
157 }
158
159 const auto available = uncompressedSize_ - offset;
160 *output = uncompressedData_.data() + offset;
161 return std::min(size, available);
162}
163
164uint64_t LZ4Reader::size() const {
165 return uncompressedSize_;
166}
167
169 return status_;
170}
171Status LZ4Reader::decompressAll(const std::byte* data, uint64_t compressedSize,
172 uint64_t uncompressedSize, ByteArray* output) {
174 return status_;
175 }
176 auto result = Status();
177 // Allocate space for the uncompressed data
178 output->resize(uncompressedSize);
179
180 size_t dstSize = uncompressedSize;
181 size_t srcSize = compressedSize;
182 LZ4F_resetDecompressionContext((LZ4F_dctx*)decompressionContext_);
183 const auto status = LZ4F_decompress((LZ4F_dctx*)decompressionContext_, output->data(), &dstSize,
184 data, &srcSize, nullptr);
185 if (status != 0) {
186 if (LZ4F_isError(status)) {
187 const auto msg = internal::StrCat("lz4 decompression of ", compressedSize, " bytes into ",
188 uncompressedSize, " output bytes failed with error ",
189 (int)status, " (", LZ4F_getErrorName(status), ")");
191 } else {
192 const auto msg =
193 internal::StrCat("lz4 decompression of ", compressedSize, " bytes into ", uncompressedSize,
194 " incomplete: consumed ", srcSize, " and produced ", dstSize,
195 " bytes so far, expect ", status, " more input bytes");
197 }
198 output->clear();
199 } else if (srcSize != compressedSize) {
200 const auto msg =
201 internal::StrCat("lz4 decompression of ", compressedSize, " bytes into ", uncompressedSize,
202 " output bytes only consumed ", srcSize, " bytes");
204 output->clear();
205 } else if (dstSize != uncompressedSize) {
206 const auto msg =
207 internal::StrCat("lz4 decompression of ", compressedSize, " bytes into ", uncompressedSize,
208 " output bytes only produced ", dstSize, " bytes");
210 output->clear();
211 }
212 return result;
213}
214#endif
215
216// ZStdReader //////////////////////////////////////////////////////////////////
217
218#ifndef MCAP_COMPRESSION_NO_ZSTD
219void ZStdReader::reset(const std::byte* data, uint64_t size, uint64_t uncompressedSize) {
220 status_ = DecompressAll(data, size, uncompressedSize, &uncompressedData_);
221}
222
223uint64_t ZStdReader::read(std::byte** output, uint64_t offset, uint64_t size) {
224 if (offset >= uncompressedData_.size()) {
225 return 0;
226 }
227
228 const auto available = uncompressedData_.size() - offset;
229 *output = uncompressedData_.data() + offset;
230 return std::min(size, available);
231}
232
233uint64_t ZStdReader::size() const {
234 return uncompressedData_.size();
235}
236
238 return status_;
239}
240
241Status ZStdReader::DecompressAll(const std::byte* data, uint64_t compressedSize,
242 uint64_t uncompressedSize, ByteArray* output) {
243 auto result = Status();
244
245 // Allocate space for the decompressed data
246 output->resize(uncompressedSize);
247
248 const auto status = ZSTD_decompress(output->data(), uncompressedSize, data, compressedSize);
249 if (status != uncompressedSize) {
250 if (ZSTD_isError(status)) {
251 const auto msg =
252 internal::StrCat("zstd decompression of ", compressedSize, " bytes into ", uncompressedSize,
253 " output bytes failed with error ", ZSTD_getErrorName(status));
255 } else {
256 const auto msg =
257 internal::StrCat("zstd decompression of ", compressedSize, " bytes into ", uncompressedSize,
258 " output bytes only produced ", status, " bytes");
260 }
261 output->clear();
262 }
263 return result;
264}
265#endif
266
267// McapReader //////////////////////////////////////////////////////////////////
268
272
274 reset_();
275
276 const uint64_t fileSize = reader.size();
277
280 }
281
282 std::byte* data = nullptr;
283 uint64_t bytesRead;
284
285 // Read the magic bytes and header up to the first variable length string
286 bytesRead = reader.read(&data, 0, sizeof(Magic) + 1 + 8 + 4);
287 if (bytesRead != sizeof(Magic) + 1 + 8 + 4) {
289 }
290
291 // Check the header magic bytes
292 if (std::memcmp(data, Magic, sizeof(Magic)) != 0) {
293 const auto msg =
294 internal::StrCat("invalid magic bytes in Header: 0x", internal::MagicToHex(data));
296 }
297
298 // Read the Header record
299 Record record;
300 if (auto status = ReadRecord(reader, sizeof(Magic), &record); !status.ok()) {
301 return status;
302 }
303 if (record.opcode != OpCode::Header) {
304 const auto msg = internal::StrCat("invalid opcode, expected Header: 0x",
305 internal::ToHex(uint8_t(record.opcode)));
306 return Status{StatusCode::InvalidFile, msg};
307 }
309 if (auto status = ParseHeader(record, &header); !status.ok()) {
310 return status;
311 }
312 header_ = header;
313
314 // The Data section starts after the magic bytes and Header record
315 dataStart_ = sizeof(Magic) + record.recordSize();
316 // Set dataEnd_ to just before the Footer for now. This will be updated when
317 // the Data End record is encountered and/or the summary section is parsed
318 dataEnd_ = fileSize - internal::FooterLength;
319
320 input_ = &reader;
321
322 return StatusCode::Success;
323}
324
326 if (file_) {
328 file_ = nullptr;
329 }
330 file_ = std::fopen(filename.data(), "rb");
331 if (!file_) {
332 const auto msg = internal::StrCat("failed to open \"", filename, "\"");
333 return Status{StatusCode::OpenFailed, msg};
334 }
335
336 fileInput_ = std::make_unique<FileReader>(file_);
337 return open(*fileInput_);
338}
339
341 fileStreamInput_ = std::make_unique<FileStreamReader>(stream);
342 return open(*fileStreamInput_);
343}
344
346 input_ = nullptr;
347 if (file_) {
349 file_ = nullptr;
350 }
351 fileInput_.reset();
352 fileStreamInput_.reset();
353 reset_();
354}
355
357 header_ = std::nullopt;
358 footer_ = std::nullopt;
359 statistics_ = std::nullopt;
360 chunkIndexes_.clear();
361 attachmentIndexes_.clear();
362 schemas_.clear();
363 channels_.clear();
364 dataStart_ = 0;
366 parsedSummary_ = false;
367}
368
370 if (!input_) {
371 const Status status{StatusCode::NotOpen};
372 onProblem(status);
373 return status;
374 }
375
376 auto& reader = *input_;
377 bool parsed = false;
378
379 if (method != ReadSummaryMethod::ForceScan) {
380 // Build indexes and read stats from the Summary section
381 const auto status = readSummarySection_(reader);
382 if (status.ok()) {
383 // Summary section parsing was successful
384 parsed = true;
385 } else if (method == ReadSummaryMethod::NoFallbackScan) {
386 // No fallback allowed, fail immediately
387 onProblem(status);
388 return status;
389 }
390 }
391
392 if (!parsed) {
393 const auto status = readSummaryFromScan_(reader);
394 if (!status.ok()) {
395 // Scanning failed, fail immediately
396 onProblem(status);
397 return status;
398 }
399 }
400
401 // Convert the list of chunk indexes to an interval tree indexed by message start/end times
402 std::vector<ChunkInterval> chunkIntervals;
403 chunkIntervals.reserve(chunkIndexes_.size());
404 for (const auto& chunkIndex : chunkIndexes_) {
405 chunkIntervals.emplace_back(chunkIndex.messageStartTime, chunkIndex.messageEndTime, chunkIndex);
406 }
408
409 parsedSummary_ = true;
410 return StatusCode::Success;
411}
412
414 const uint64_t fileSize = reader.size();
415
416 // Read the footer
417 auto footer = Footer{};
418 if (auto status = ReadFooter(reader, fileSize - internal::FooterLength, &footer); !status.ok()) {
419 return status;
420 }
421 footer_ = footer;
422
423 // Get summaryStart and summaryOffsetStart, allowing for zeroed values
424 const ByteOffset summaryStart =
425 footer.summaryStart != 0 ? footer.summaryStart : fileSize - internal::FooterLength;
426 const ByteOffset summaryOffsetStart =
427 footer.summaryOffsetStart != 0 ? footer.summaryOffsetStart : fileSize - internal::FooterLength;
428 // Sanity check the ordering
429 if (summaryOffsetStart < summaryStart) {
430 const auto msg = internal::StrCat("summary_offset_start ", summaryOffsetStart,
431 " < summary_start ", summaryStart);
433 }
434
435 attachmentIndexes_.clear();
436 metadataIndexes_.clear();
437 chunkIndexes_.clear();
438
439 // Read the Summary section
440 bool readStatistics = false;
441 TypedRecordReader typedReader{reader, summaryStart, summaryOffsetStart};
442 typedReader.onSchema = [&](SchemaPtr schemaPtr, ByteOffset, std::optional<ByteOffset>) {
443 schemas_.try_emplace(schemaPtr->id, schemaPtr);
444 };
445 typedReader.onChannel = [&](ChannelPtr channelPtr, ByteOffset, std::optional<ByteOffset>) {
446 channels_.try_emplace(channelPtr->id, channelPtr);
447 };
448 typedReader.onAttachmentIndex = [&](const AttachmentIndex& attachmentIndex, ByteOffset) {
449 attachmentIndexes_.emplace(attachmentIndex.name, attachmentIndex);
450 };
451 typedReader.onMetadataIndex = [&](const MetadataIndex& metadataIndex, ByteOffset) {
452 metadataIndexes_.emplace(metadataIndex.name, metadataIndex);
453 };
454 typedReader.onChunkIndex = [&](const ChunkIndex chunkIndex, ByteOffset) {
455 // Check if this chunk index is a duplicate
456 if (std::binary_search(chunkIndexes_.begin(), chunkIndexes_.end(), chunkIndex,
458 return;
459 }
460
461 // Check if this chunk index is out of order
462 const bool needsSorting =
463 !chunkIndexes_.empty() && chunkIndexes_.back().chunkStartOffset > chunkIndex.chunkStartOffset;
464 // Add the new chunk index interval
465 chunkIndexes_.push_back(chunkIndex);
466 // Sort if the new chunk index is out of order
467 if (needsSorting) {
469 }
470 };
471 typedReader.onStatistics = [&](const Statistics& statistics, ByteOffset) {
473 readStatistics = true;
474 };
475
476 while (typedReader.next()) {
477 const auto& status = typedReader.status();
478 if (!status.ok()) {
479 return status;
480 }
481 }
482
483 dataEnd_ = summaryStart;
484 return readStatistics ? StatusCode::Success : StatusCode::MissingStatistics;
485}
486
488 bool done = false;
491
492 schemas_.clear();
493 channels_.clear();
494 attachmentIndexes_.clear();
495 metadataIndexes_.clear();
496 chunkIndexes_.clear();
497
498 TypedRecordReader typedReader{reader, dataStart_, dataEnd_};
499 typedReader.onSchema = [&](SchemaPtr schemaPtr, ByteOffset, std::optional<ByteOffset>) {
500 schemas_.try_emplace(schemaPtr->id, schemaPtr);
501 };
502 typedReader.onChannel = [&](ChannelPtr channelPtr, ByteOffset, std::optional<ByteOffset>) {
503 channels_.try_emplace(channelPtr->id, channelPtr);
504 };
505 typedReader.onAttachment = [&](const Attachment& attachment, ByteOffset fileOffset) {
506 AttachmentIndex attachmentIndex{attachment, fileOffset};
507 attachmentIndexes_.emplace(attachment.name, attachmentIndex);
508 };
509 typedReader.onMetadata = [&](const Metadata& metadata, ByteOffset fileOffset) {
510 MetadataIndex metadataIndex{metadata, fileOffset};
511 metadataIndexes_.emplace(metadata.name, metadataIndex);
512 };
513 typedReader.onChunk = [&](const Chunk& chunk, ByteOffset fileOffset) {
514 ChunkIndex chunkIndex{};
515 chunkIndex.messageStartTime = chunk.messageStartTime;
516 chunkIndex.messageEndTime = chunk.messageEndTime;
517 chunkIndex.chunkStartOffset = fileOffset;
518 chunkIndex.chunkLength =
519 9 + 8 + 8 + 8 + 4 + 4 + chunk.compression.size() + 8 + chunk.compressedSize;
520 chunkIndex.messageIndexLength = 0;
521 chunkIndex.compression = chunk.compression;
522 chunkIndex.compressedSize = chunk.compressedSize;
523 chunkIndex.uncompressedSize = chunk.uncompressedSize;
524
525 chunkIndexes_.emplace_back(std::move(chunkIndex));
526 };
527 typedReader.onMessage = [&](const Message& message, ByteOffset, std::optional<ByteOffset>) {
528 if (message.logTime < statistics.messageStartTime) {
529 statistics.messageStartTime = message.logTime;
530 }
531 if (message.logTime > statistics.messageEndTime) {
532 statistics.messageEndTime = message.logTime;
533 }
534 statistics.messageCount++;
535 statistics.channelMessageCounts[message.channelId]++;
536 };
537 typedReader.onDataEnd = [&](const DataEnd&, ByteOffset fileOffset) {
538 dataEnd_ = fileOffset;
539 done = true;
540 };
541
542 while (!done && typedReader.next()) {
543 const auto& status = typedReader.status();
544 if (!status.ok()) {
545 return status;
546 }
547 }
548
549 if (statistics.messageStartTime == EndOffset) {
550 statistics.messageStartTime = 0;
551 }
552 statistics.schemaCount = (uint16_t)(schemas_.size());
553 statistics.channelCount = (uint32_t)(channels_.size());
554 statistics.attachmentCount = (uint32_t)(attachmentIndexes_.size());
555 statistics.metadataCount = (uint32_t)(metadataIndexes_.size());
556 statistics.chunkCount = (uint32_t)(chunkIndexes_.size());
557 statistics_ = std::move(statistics);
558
559 return StatusCode::Success;
560}
561
563 const auto onProblem = [](const Status&) {};
564 return readMessages(onProblem, startTime, endTime);
565}
566
568 Timestamp endTime) {
569 ReadMessageOptions options;
570 options.startTime = startTime;
571 options.endTime = endTime;
572 return readMessages(onProblem, options);
573}
574
576 const ReadMessageOptions& options) {
577 // Check that open() has been successfully called
578 if (!dataSource() || dataStart_ == 0) {
579 onProblem(StatusCode::NotOpen);
580 return LinearMessageView{*this, onProblem};
581 }
582
583 const auto [startOffset, endOffset] = byteRange(options.startTime, options.endTime);
584 return LinearMessageView{*this, options, startOffset, endOffset, onProblem};
585}
586
588 Timestamp endTime) const {
589 if (!parsedSummary_ || chunkRanges_.empty()) {
590 return {dataStart_, dataEnd_};
591 }
592
593 ByteOffset dataStart = dataEnd_;
594 ByteOffset dataEnd = dataStart_;
595 chunkRanges_.visit_overlapping(startTime, endTime, [&](const auto& interval) {
596 const auto& chunkIndex = interval.value;
597 dataStart = std::min(dataStart, chunkIndex.chunkStartOffset);
598 dataEnd = std::max(dataEnd, chunkIndex.chunkStartOffset + chunkIndex.chunkLength);
599 });
600 dataEnd = std::max(dataEnd, dataStart);
601
602 if (dataStart == dataEnd) {
603 return {0, 0};
604 }
605 return {dataStart, dataEnd};
606}
607
611
613 return header_;
614}
615
617 return footer_;
618}
619
623
627
631
633 const auto& maybeChannel = channels_.find(channelId);
634 return (maybeChannel == channels_.end()) ? nullptr : maybeChannel->second;
635}
636
638 const auto& maybeSchema = schemas_.find(schemaId);
639 return (maybeSchema == schemas_.end()) ? nullptr : maybeSchema->second;
640}
641
645
649
653
654Status McapReader::ReadRecord(IReadable& reader, uint64_t offset, Record* record) {
655 // Check that we can read at least 9 bytes (opcode + length)
656 auto maxSize = reader.size() - offset;
657 if (maxSize < 9) {
658 const auto msg =
659 internal::StrCat("cannot read record at offset ", offset, ", ", maxSize, " bytes remaining");
660 return Status{StatusCode::InvalidFile, msg};
661 }
662
663 // Read opcode and length
664 std::byte* data;
665 uint64_t bytesRead = reader.read(&data, offset, 9);
666 if (bytesRead != 9) {
668 }
669
670 // Parse opcode and length
671 record->opcode = OpCode(data[0]);
672 record->dataSize = internal::ParseUint64(data + 1);
673
674 // Read payload
675 maxSize -= 9;
676 if (maxSize < record->dataSize) {
677 const auto msg = internal::StrCat("record type 0x", internal::ToHex(uint8_t(record->opcode)),
678 " at offset ", offset, " has length ", record->dataSize,
679 " but only ", maxSize, " bytes remaining");
681 }
682 bytesRead = reader.read(&record->data, offset + 9, record->dataSize);
683 if (bytesRead != record->dataSize) {
684 const auto msg =
685 internal::StrCat("attempted to read ", record->dataSize, " bytes for record type 0x",
686 internal::ToHex(uint8_t(record->opcode)), " at offset ", offset,
687 " but only read ", bytesRead, " bytes");
688 return Status{StatusCode::ReadFailed, msg};
689 }
690
691 return StatusCode::Success;
692}
693
694Status McapReader::ReadFooter(IReadable& reader, uint64_t offset, Footer* footer) {
695 std::byte* data;
696 uint64_t bytesRead = reader.read(&data, offset, internal::FooterLength);
697 if (bytesRead != internal::FooterLength) {
699 }
700
701 // Check the footer magic bytes
702 if (std::memcmp(data + internal::FooterLength - sizeof(Magic), Magic, sizeof(Magic)) != 0) {
703 const auto msg =
704 internal::StrCat("invalid magic bytes in Footer: 0x",
707 }
708
709 if (OpCode(data[0]) != OpCode::Footer) {
710 const auto msg =
711 internal::StrCat("invalid opcode, expected Footer: 0x", internal::ToHex(data[0]));
712 return Status{StatusCode::InvalidFile, msg};
713 }
714
715 // Sanity check the record length. This is just an additional safeguard, since the footer has a
716 // fixed length
717 const uint64_t length = internal::ParseUint64(data + 1);
718 if (length != 8 + 8 + 4) {
719 const auto msg = internal::StrCat("invalid Footer length: ", length);
721 }
722
723 footer->summaryStart = internal::ParseUint64(data + 1 + 8);
724 footer->summaryOffsetStart = internal::ParseUint64(data + 1 + 8 + 8);
725 footer->summaryCrc = internal::ParseUint32(data + 1 + 8 + 8 + 8);
726 return StatusCode::Success;
727}
728
730 constexpr uint64_t MinSize = 4 + 4;
731
732 assert(record.opcode == OpCode::Header);
733 if (record.dataSize < MinSize) {
734 const auto msg = internal::StrCat("invalid Header length: ", record.dataSize);
736 }
737
738 if (auto status = internal::ParseString(record.data, record.dataSize, &header->profile);
739 !status.ok()) {
740 return status;
741 }
742 const size_t libraryOffset = 4 + header->profile.size();
743 const std::byte* libraryData = &(record.data[libraryOffset]);
744 const size_t maxSize = record.dataSize - libraryOffset;
745 auto status = internal::ParseString(libraryData, maxSize, &header->library);
746 if (!status.ok()) {
747 return status;
748 }
749 return StatusCode::Success;
750}
751
753 constexpr uint64_t FooterSize = 8 + 8 + 4;
754
755 assert(record.opcode == OpCode::Footer);
756 if (record.dataSize != FooterSize) {
757 const auto msg = internal::StrCat("invalid Footer length: ", record.dataSize);
759 }
760
761 footer->summaryStart = internal::ParseUint64(record.data);
762 footer->summaryOffsetStart = internal::ParseUint64(record.data + 8);
763 footer->summaryCrc = internal::ParseUint32(record.data + 8 + 8);
764
765 return StatusCode::Success;
766}
767
769 constexpr uint64_t MinSize = 2 + 4 + 4 + 4;
770
771 assert(record.opcode == OpCode::Schema);
772 if (record.dataSize < MinSize) {
773 const auto msg = internal::StrCat("invalid Schema length: ", record.dataSize);
775 }
776
777 size_t offset = 0;
778
779 // id
780 schema->id = internal::ParseUint16(record.data);
781 offset += 2;
782 // name
783 if (auto status =
784 internal::ParseString(record.data + offset, record.dataSize - offset, &schema->name);
785 !status.ok()) {
786 return status;
787 }
788 offset += 4 + schema->name.size();
789 // encoding
790 if (auto status =
791 internal::ParseString(record.data + offset, record.dataSize - offset, &schema->encoding);
792 !status.ok()) {
793 return status;
794 }
795 offset += 4 + schema->encoding.size();
796 // data
797 if (auto status =
798 internal::ParseByteArray(record.data + offset, record.dataSize - offset, &schema->data);
799 !status.ok()) {
800 return status;
801 }
802
803 return StatusCode::Success;
804}
805
807 constexpr uint64_t MinSize = 2 + 4 + 4 + 2 + 4;
808
809 assert(record.opcode == OpCode::Channel);
810 if (record.dataSize < MinSize) {
811 const auto msg = internal::StrCat("invalid Channel length: ", record.dataSize);
813 }
814
815 size_t offset = 0;
816
817 // id
818 channel->id = internal::ParseUint16(record.data);
819 offset += 2;
820 // schema_id
821 channel->schemaId = internal::ParseUint16(record.data + offset);
822 offset += 2;
823 // topic
824 if (auto status =
825 internal::ParseString(record.data + offset, record.dataSize - offset, &channel->topic);
826 !status.ok()) {
827 return status;
828 }
829 offset += 4 + channel->topic.size();
830 // message_encoding
831 if (auto status = internal::ParseString(record.data + offset, record.dataSize - offset,
832 &channel->messageEncoding);
833 !status.ok()) {
834 return status;
835 }
836 offset += 4 + channel->messageEncoding.size();
837 // metadata
838 if (auto status = internal::ParseKeyValueMap(record.data + offset, record.dataSize - offset,
839 &channel->metadata);
840 !status.ok()) {
841 return status;
842 }
843 return StatusCode::Success;
844}
845
847 constexpr uint64_t MessagePreambleSize = 2 + 4 + 8 + 8;
848
849 assert(record.opcode == OpCode::Message);
850 if (record.dataSize < MessagePreambleSize) {
851 const auto msg = internal::StrCat("invalid Message length: ", record.dataSize);
853 }
854
855 message->channelId = internal::ParseUint16(record.data);
856 message->sequence = internal::ParseUint32(record.data + 2);
857 message->logTime = internal::ParseUint64(record.data + 2 + 4);
858 message->publishTime = internal::ParseUint64(record.data + 2 + 4 + 8);
859 message->dataSize = record.dataSize - MessagePreambleSize;
860 message->data = record.data + MessagePreambleSize;
861 return StatusCode::Success;
862}
863
865 constexpr uint64_t ChunkPreambleSize = 8 + 8 + 8 + 4 + 4;
866
867 assert(record.opcode == OpCode::Chunk);
868 if (record.dataSize < ChunkPreambleSize) {
869 const auto msg = internal::StrCat("invalid Chunk length: ", record.dataSize);
871 }
872
874 chunk->messageEndTime = internal::ParseUint64(record.data + 8);
875 chunk->uncompressedSize = internal::ParseUint64(record.data + 8 + 8);
876 chunk->uncompressedCrc = internal::ParseUint32(record.data + 8 + 8 + 8);
877
878 size_t offset = 8 + 8 + 8 + 4;
879
880 // compression
881 if (auto status =
882 internal::ParseString(record.data + offset, record.dataSize - offset, &chunk->compression);
883 !status.ok()) {
884 return status;
885 }
886 offset += 4 + chunk->compression.size();
887 // compressed_size
888 if (auto status = internal::ParseUint64(record.data + offset, record.dataSize - offset,
889 &chunk->compressedSize);
890 !status.ok()) {
891 return status;
892 }
893 offset += 8;
894 if (chunk->compressedSize > record.dataSize - offset) {
895 const auto msg = internal::StrCat("invalid Chunk.records length: ", chunk->compressedSize);
897 }
898 // records
899 chunk->records = record.data + offset;
900
901 return StatusCode::Success;
902}
903
905 constexpr uint64_t PreambleSize = 2 + 4;
906
907 assert(record.opcode == OpCode::MessageIndex);
908 if (record.dataSize < PreambleSize) {
909 const auto msg = internal::StrCat("invalid MessageIndex length: ", record.dataSize);
911 }
912
913 messageIndex->channelId = internal::ParseUint16(record.data);
914 const uint32_t recordsSize = internal::ParseUint32(record.data + 2);
915
916 if (recordsSize % 16 != 0 || recordsSize > record.dataSize - PreambleSize) {
917 const auto msg = internal::StrCat("invalid MessageIndex.records length: ", recordsSize);
919 }
920
921 const size_t recordsCount = size_t(recordsSize / 16);
922 messageIndex->records.reserve(recordsCount);
923 for (size_t i = 0; i < recordsCount; ++i) {
924 const auto timestamp = internal::ParseUint64(record.data + PreambleSize + i * 16);
925 const auto offset = internal::ParseUint64(record.data + PreambleSize + i * 16 + 8);
926 messageIndex->records.emplace_back(timestamp, offset);
927 }
928 return StatusCode::Success;
929}
930
932 constexpr uint64_t PreambleSize = 8 + 8 + 8 + 8 + 4;
933
934 assert(record.opcode == OpCode::ChunkIndex);
935 if (record.dataSize < PreambleSize) {
936 const auto msg = internal::StrCat("invalid ChunkIndex length: ", record.dataSize);
938 }
939
940 chunkIndex->messageStartTime = internal::ParseUint64(record.data);
941 chunkIndex->messageEndTime = internal::ParseUint64(record.data + 8);
942 chunkIndex->chunkStartOffset = internal::ParseUint64(record.data + 8 + 8);
943 chunkIndex->chunkLength = internal::ParseUint64(record.data + 8 + 8 + 8);
944 const uint32_t messageIndexOffsetsSize = internal::ParseUint32(record.data + 8 + 8 + 8 + 8);
945
946 if (messageIndexOffsetsSize % 10 != 0 ||
947 messageIndexOffsetsSize > record.dataSize - PreambleSize) {
948 const auto msg =
949 internal::StrCat("invalid ChunkIndex.message_index_offsets length:", messageIndexOffsetsSize);
951 }
952
953 const size_t messageIndexOffsetsCount = size_t(messageIndexOffsetsSize / 10);
954 chunkIndex->messageIndexOffsets.reserve(messageIndexOffsetsCount);
955 for (size_t i = 0; i < messageIndexOffsetsCount; ++i) {
956 const auto channelId = internal::ParseUint16(record.data + PreambleSize + i * 10);
957 const auto offset = internal::ParseUint64(record.data + PreambleSize + i * 10 + 2);
958 chunkIndex->messageIndexOffsets.emplace(channelId, offset);
959 }
960
961 uint64_t offset = PreambleSize + messageIndexOffsetsSize;
962 // message_index_length
963 if (auto status = internal::ParseUint64(record.data + offset, record.dataSize - offset,
964 &chunkIndex->messageIndexLength);
965 !status.ok()) {
966 return status;
967 }
968 offset += 8;
969 // compression
970 if (auto status = internal::ParseString(record.data + offset, record.dataSize - offset,
971 &chunkIndex->compression);
972 !status.ok()) {
973 return status;
974 }
975 offset += 4 + chunkIndex->compression.size();
976 // compressed_size
977 if (auto status = internal::ParseUint64(record.data + offset, record.dataSize - offset,
978 &chunkIndex->compressedSize);
979 !status.ok()) {
980 return status;
981 }
982 offset += 8;
983 // uncompressed_size
984 if (auto status = internal::ParseUint64(record.data + offset, record.dataSize - offset,
985 &chunkIndex->uncompressedSize);
986 !status.ok()) {
987 return status;
988 }
989
990 return StatusCode::Success;
991}
992
994 constexpr uint64_t MinSize = /* log_time */ 8 +
995 /* create_time */ 8 +
996 /* name */ 4 +
997 /* media_type */ 4 +
998 /* data_size */ 8 +
999 /* crc */ 4;
1000
1001 assert(record.opcode == OpCode::Attachment);
1002 if (record.dataSize < MinSize) {
1003 const auto msg = internal::StrCat("invalid Attachment length: ", record.dataSize);
1004 return Status{StatusCode::InvalidRecord, msg};
1005 }
1006
1007 uint32_t offset = 0;
1008 // log_time
1009 if (auto status =
1010 internal::ParseUint64(record.data + offset, record.dataSize - offset, &attachment->logTime);
1011 !status.ok()) {
1012 return status;
1013 }
1014 offset += 8;
1015 // create_time
1016 if (auto status = internal::ParseUint64(record.data + offset, record.dataSize - offset,
1017 &attachment->createTime);
1018 !status.ok()) {
1019 return status;
1020 }
1021 offset += 8;
1022 // name
1023 if (auto status =
1024 internal::ParseString(record.data + offset, record.dataSize - offset, &attachment->name);
1025 !status.ok()) {
1026 return status;
1027 }
1028 offset += 4 + (uint32_t)(attachment->name.size());
1029 // media_type
1030 if (auto status = internal::ParseString(record.data + offset, record.dataSize - offset,
1031 &attachment->mediaType);
1032 !status.ok()) {
1033 return status;
1034 }
1035 offset += 4 + (uint32_t)(attachment->mediaType.size());
1036 // data_size
1037 if (auto status = internal::ParseUint64(record.data + offset, record.dataSize - offset,
1038 &attachment->dataSize);
1039 !status.ok()) {
1040 return status;
1041 }
1042 offset += 8;
1043 // data
1044 if (attachment->dataSize > record.dataSize - offset) {
1045 const auto msg = internal::StrCat("invalid Attachment.data length: ", attachment->dataSize);
1046 return Status{StatusCode::InvalidRecord, msg};
1047 }
1048 attachment->data = record.data + offset;
1049 offset += (uint32_t)(attachment->dataSize);
1050 // crc
1051 if (auto status =
1052 internal::ParseUint32(record.data + offset, record.dataSize - offset, &attachment->crc);
1053 !status.ok()) {
1054 return status;
1055 }
1056
1057 return StatusCode::Success;
1058}
1059
1061 constexpr uint64_t PreambleSize = 8 + 8 + 8 + 8 + 8 + 4;
1062
1063 assert(record.opcode == OpCode::AttachmentIndex);
1064 if (record.dataSize < PreambleSize) {
1065 const auto msg = internal::StrCat("invalid AttachmentIndex length: ", record.dataSize);
1066 return Status{StatusCode::InvalidRecord, msg};
1067 }
1068
1069 attachmentIndex->offset = internal::ParseUint64(record.data);
1070 attachmentIndex->length = internal::ParseUint64(record.data + 8);
1071 attachmentIndex->logTime = internal::ParseUint64(record.data + 8 + 8);
1072 attachmentIndex->createTime = internal::ParseUint64(record.data + 8 + 8 + 8);
1073 attachmentIndex->dataSize = internal::ParseUint64(record.data + 8 + 8 + 8 + 8);
1074
1075 uint32_t offset = 8 + 8 + 8 + 8 + 8;
1076
1077 // name
1078 if (auto status = internal::ParseString(record.data + offset, record.dataSize - offset,
1079 &attachmentIndex->name);
1080 !status.ok()) {
1081 return status;
1082 }
1083 offset += 4 + (uint32_t)(attachmentIndex->name.size());
1084 // media_type
1085 if (auto status = internal::ParseString(record.data + offset, record.dataSize - offset,
1086 &attachmentIndex->mediaType);
1087 !status.ok()) {
1088 return status;
1089 }
1090
1091 return StatusCode::Success;
1092}
1093
1095 constexpr uint64_t PreambleSize = 8 + 2 + 4 + 4 + 4 + 4 + 8 + 8 + 4;
1096
1097 assert(record.opcode == OpCode::Statistics);
1098 if (record.dataSize < PreambleSize) {
1099 const auto msg = internal::StrCat("invalid Statistics length: ", record.dataSize);
1100 return Status{StatusCode::InvalidRecord, msg};
1101 }
1102
1103 statistics->messageCount = internal::ParseUint64(record.data);
1104 statistics->schemaCount = internal::ParseUint16(record.data + 8);
1105 statistics->channelCount = internal::ParseUint32(record.data + 8 + 2);
1106 statistics->attachmentCount = internal::ParseUint32(record.data + 8 + 2 + 4);
1107 statistics->metadataCount = internal::ParseUint32(record.data + 8 + 2 + 4 + 4);
1108 statistics->chunkCount = internal::ParseUint32(record.data + 8 + 2 + 4 + 4 + 4);
1109 statistics->messageStartTime = internal::ParseUint64(record.data + 8 + 2 + 4 + 4 + 4 + 4);
1110 statistics->messageEndTime = internal::ParseUint64(record.data + 8 + 2 + 4 + 4 + 4 + 4 + 8);
1111
1112 const uint32_t channelMessageCountsSize =
1113 internal::ParseUint32(record.data + 8 + 2 + 4 + 4 + 4 + 4 + 8 + 8);
1114 if (channelMessageCountsSize % 10 != 0 ||
1115 channelMessageCountsSize > record.dataSize - PreambleSize) {
1116 const auto msg =
1117 internal::StrCat("invalid Statistics.channelMessageCounts length:", channelMessageCountsSize);
1118 return Status{StatusCode::InvalidRecord, msg};
1119 }
1120
1121 const size_t channelMessageCountsCount = size_t(channelMessageCountsSize / 10);
1122 statistics->channelMessageCounts.reserve(channelMessageCountsCount);
1123 for (size_t i = 0; i < channelMessageCountsCount; ++i) {
1124 const auto channelId = internal::ParseUint16(record.data + PreambleSize + i * 10);
1125 const auto messageCount = internal::ParseUint64(record.data + PreambleSize + i * 10 + 2);
1126 statistics->channelMessageCounts.emplace(channelId, messageCount);
1127 }
1128
1129 return StatusCode::Success;
1130}
1131
1133 constexpr uint64_t MinSize = 4 + 4;
1134
1135 assert(record.opcode == OpCode::Metadata);
1136 if (record.dataSize < MinSize) {
1137 const auto msg = internal::StrCat("invalid Metadata length: ", record.dataSize);
1138 return Status{StatusCode::InvalidRecord, msg};
1139 }
1140
1141 // name
1142 if (auto status = internal::ParseString(record.data, record.dataSize, &metadata->name);
1143 !status.ok()) {
1144 return status;
1145 }
1146 uint64_t offset = 4 + metadata->name.size();
1147 // metadata
1148 if (auto status = internal::ParseKeyValueMap(record.data + offset, record.dataSize - offset,
1149 &metadata->metadata);
1150 !status.ok()) {
1151 return status;
1152 }
1153
1154 return StatusCode::Success;
1155}
1156
1158 constexpr uint64_t PreambleSize = 8 + 8 + 4;
1159
1160 assert(record.opcode == OpCode::MetadataIndex);
1161 if (record.dataSize < PreambleSize) {
1162 const auto msg = internal::StrCat("invalid MetadataIndex length: ", record.dataSize);
1163 return Status{StatusCode::InvalidRecord, msg};
1164 }
1165
1166 metadataIndex->offset = internal::ParseUint64(record.data);
1167 metadataIndex->length = internal::ParseUint64(record.data + 8);
1168 uint64_t offset = 8 + 8;
1169 if (auto status =
1170 internal::ParseString(record.data + offset, record.dataSize - offset, &metadataIndex->name);
1171 !status.ok()) {
1172 return status;
1173 }
1174
1175 return StatusCode::Success;
1176}
1177
1179 constexpr uint64_t MinSize = 1 + 8 + 8;
1180
1181 assert(record.opcode == OpCode::SummaryOffset);
1182 if (record.dataSize < MinSize) {
1183 const auto msg = internal::StrCat("invalid SummaryOffset length: ", record.dataSize);
1184 return Status{StatusCode::InvalidRecord, msg};
1185 }
1186
1187 summaryOffset->groupOpCode = OpCode(record.data[0]);
1188 summaryOffset->groupStart = internal::ParseUint64(record.data + 1);
1189 summaryOffset->groupLength = internal::ParseUint64(record.data + 1 + 8);
1190
1191 return StatusCode::Success;
1192}
1193
1195 constexpr uint64_t MinSize = 4;
1196
1197 assert(record.opcode == OpCode::DataEnd);
1198 if (record.dataSize < MinSize) {
1199 const auto msg = internal::StrCat("invalid DataEnd length: ", record.dataSize);
1200 return Status{StatusCode::InvalidRecord, msg};
1201 }
1202
1203 dataEnd->dataSectionCrc = internal::ParseUint32(record.data);
1204 return StatusCode::Success;
1205}
1206
1208 if (compression == "") {
1209 return Compression::None;
1210 } else if (compression == "lz4") {
1211 return Compression::Lz4;
1212 } else if (compression == "zstd") {
1213 return Compression::Zstd;
1214 } else {
1215 return std::nullopt;
1216 }
1217}
1218
1219// RecordReader ////////////////////////////////////////////////////////////////
1220
1221RecordReader::RecordReader(IReadable& dataSource, ByteOffset startOffset, ByteOffset endOffset)
1222 : offset(startOffset)
1223 , endOffset(endOffset)
1224 , dataSource_(&dataSource)
1225 , status_(StatusCode::Success)
1226 , curRecord_{} {}
1227
1228void RecordReader::reset(IReadable& dataSource, ByteOffset startOffset, ByteOffset _endOffset) {
1229 dataSource_ = &dataSource;
1230 this->offset = startOffset;
1231 this->endOffset = _endOffset;
1233 curRecord_ = {};
1234}
1235
1237 if (!dataSource_ || offset >= endOffset) {
1238 return std::nullopt;
1239 }
1241 if (!status_.ok()) {
1242 offset = EndOffset;
1243 return std::nullopt;
1244 }
1246 return curRecord_;
1247}
1248
1250 return status_;
1251}
1252
1256
1257// TypedChunkReader ////////////////////////////////////////////////////////////
1258
1260 : reader_{uncompressedReader_, 0, 0}
1261 , status_{StatusCode::Success} {}
1262
1263void TypedChunkReader::reset(const Chunk& chunk, Compression compression) {
1264 ICompressedReader* decompressor;
1265
1266 switch (compression) {
1267#ifndef MCAP_COMPRESSION_NO_LZ4
1268 case Compression::Lz4:
1269 decompressor = static_cast<ICompressedReader*>(&lz4Reader_);
1270 break;
1271#endif
1272#ifndef MCAP_COMPRESSION_NO_ZSTD
1273 case Compression::Zstd:
1274 decompressor = static_cast<ICompressedReader*>(&zstdReader_);
1275 break;
1276#endif
1277 case Compression::None:
1278 decompressor = static_cast<ICompressedReader*>(&uncompressedReader_);
1279 break;
1280 default:
1282 internal::StrCat("unsupported compression: ", chunk.compression));
1283 return;
1284 }
1285 decompressor->reset(chunk.records, chunk.compressedSize, chunk.uncompressedSize);
1286 reader_.reset(*decompressor, 0, decompressor->size());
1287 status_ = decompressor->status();
1288}
1289
1291 const auto maybeRecord = reader_.next();
1293 if (!maybeRecord.has_value()) {
1294 return false;
1295 }
1296 const Record& record = maybeRecord.value();
1297 switch (record.opcode) {
1298 case OpCode::Schema: {
1299 if (onSchema) {
1300 SchemaPtr schemaPtr = std::make_shared<Schema>();
1301 status_ = McapReader::ParseSchema(record, schemaPtr.get());
1302 if (status_.ok()) {
1303 onSchema(schemaPtr, reader_.curRecordOffset());
1304 }
1305 }
1306 break;
1307 }
1308 case OpCode::Channel: {
1309 if (onChannel) {
1310 ChannelPtr channelPtr = std::make_shared<Channel>();
1311 status_ = McapReader::ParseChannel(record, channelPtr.get());
1312 if (status_.ok()) {
1313 onChannel(channelPtr, reader_.curRecordOffset());
1314 }
1315 }
1316 break;
1317 }
1318 case OpCode::Message: {
1319 if (onMessage) {
1320 Message message;
1321 status_ = McapReader::ParseMessage(record, &message);
1322 if (status_.ok()) {
1323 onMessage(message, reader_.curRecordOffset());
1324 }
1325 }
1326 break;
1327 }
1328 case OpCode::Header:
1329 case OpCode::Footer:
1330 case OpCode::Chunk:
1332 case OpCode::ChunkIndex:
1333 case OpCode::Attachment:
1335 case OpCode::Statistics:
1336 case OpCode::Metadata:
1339 case OpCode::DataEnd: {
1340 // These opcodes should not appear inside chunks
1341 const auto msg =
1342 internal::StrCat("record type ", uint8_t(record.opcode), " cannot appear in Chunk");
1344 break;
1345 }
1346 default: {
1347 // Unknown opcode
1348 if (onUnknownRecord) {
1350 }
1351 break;
1352 }
1353 }
1354
1355 return true;
1356}
1357
1361
1363 return status_;
1364}
1365
1366// TypedRecordReader ///////////////////////////////////////////////////////////
1367
1369 ByteOffset endOffset)
1370 : reader_(dataSource, startOffset, std::min(endOffset, dataSource.size()))
1371 , status_(StatusCode::Success)
1372 , parsingChunk_(false) {
1373 chunkReader_.onSchema = [&](const SchemaPtr schema, ByteOffset chunkOffset) {
1374 if (onSchema) {
1375 onSchema(schema, reader_.curRecordOffset(), chunkOffset);
1376 }
1377 };
1378 chunkReader_.onChannel = [&](const ChannelPtr channel, ByteOffset chunkOffset) {
1379 if (onChannel) {
1380 onChannel(channel, reader_.curRecordOffset(), chunkOffset);
1381 }
1382 };
1383 chunkReader_.onMessage = [&](const Message& message, ByteOffset chunkOffset) {
1384 if (onMessage) {
1385 onMessage(message, reader_.curRecordOffset(), chunkOffset);
1386 }
1387 };
1388 chunkReader_.onUnknownRecord = [&](const Record& record, ByteOffset chunkOffset) {
1389 if (onUnknownRecord) {
1390 onUnknownRecord(record, reader_.curRecordOffset(), chunkOffset);
1391 }
1392 };
1393}
1394
1396 if (parsingChunk_) {
1397 const bool chunkInProgress = chunkReader_.next();
1399 if (!chunkInProgress) {
1400 parsingChunk_ = false;
1401 if (onChunkEnd) {
1403 }
1404 }
1405 return true;
1406 }
1407
1408 const auto maybeRecord = reader_.next();
1410 if (!maybeRecord.has_value()) {
1411 return false;
1412 }
1413 const Record& record = maybeRecord.value();
1414
1415 switch (record.opcode) {
1416 case OpCode::Header: {
1417 if (onHeader) {
1418 Header header;
1419 if (status_ = McapReader::ParseHeader(record, &header); status_.ok()) {
1420 onHeader(header, reader_.curRecordOffset());
1421 }
1422 }
1423 break;
1424 }
1425 case OpCode::Footer: {
1426 if (onFooter) {
1427 Footer footer;
1428 if (status_ = McapReader::ParseFooter(record, &footer); status_.ok()) {
1429 onFooter(footer, reader_.curRecordOffset());
1430 }
1431 }
1433 break;
1434 }
1435 case OpCode::Schema: {
1436 if (onSchema) {
1437 SchemaPtr schemaPtr = std::make_shared<Schema>();
1438 if (status_ = McapReader::ParseSchema(record, schemaPtr.get()); status_.ok()) {
1439 onSchema(schemaPtr, reader_.curRecordOffset(), std::nullopt);
1440 }
1441 }
1442 break;
1443 }
1444 case OpCode::Channel: {
1445 if (onChannel) {
1446 ChannelPtr channelPtr = std::make_shared<Channel>();
1447 if (status_ = McapReader::ParseChannel(record, channelPtr.get()); status_.ok()) {
1448 onChannel(channelPtr, reader_.curRecordOffset(), std::nullopt);
1449 }
1450 }
1451 break;
1452 }
1453 case OpCode::Message: {
1454 if (onMessage) {
1455 Message message;
1456 if (status_ = McapReader::ParseMessage(record, &message); status_.ok()) {
1457 onMessage(message, reader_.curRecordOffset(), std::nullopt);
1458 }
1459 }
1460 break;
1461 }
1462 case OpCode::Chunk: {
1463 if (onMessage || onChunk || onSchema || onChannel) {
1464 Chunk chunk;
1465 status_ = McapReader::ParseChunk(record, &chunk);
1466 if (!status_.ok()) {
1467 return true;
1468 }
1469 if (onChunk) {
1471 }
1472 if (onMessage || onSchema || onChannel) {
1473 const auto maybeCompression = McapReader::ParseCompression(chunk.compression);
1474 if (!maybeCompression.has_value()) {
1475 const auto msg =
1476 internal::StrCat("unrecognized compression \"", chunk.compression, "\"");
1478 return true;
1479 }
1480
1481 // Start iterating through this chunk
1482 chunkReader_.reset(chunk, maybeCompression.value());
1484 parsingChunk_ = true;
1485 }
1486 }
1487 break;
1488 }
1489 case OpCode::MessageIndex: {
1490 if (onMessageIndex) {
1491 MessageIndex messageIndex;
1492 if (status_ = McapReader::ParseMessageIndex(record, &messageIndex); status_.ok()) {
1493 onMessageIndex(messageIndex, reader_.curRecordOffset());
1494 }
1495 }
1496 break;
1497 }
1498 case OpCode::ChunkIndex: {
1499 if (onChunkIndex) {
1500 ChunkIndex chunkIndex;
1501 if (status_ = McapReader::ParseChunkIndex(record, &chunkIndex); status_.ok()) {
1502 onChunkIndex(chunkIndex, reader_.curRecordOffset());
1503 }
1504 }
1505 break;
1506 }
1507 case OpCode::Attachment: {
1508 if (onAttachment) {
1509 Attachment attachment;
1510 if (status_ = McapReader::ParseAttachment(record, &attachment); status_.ok()) {
1511 onAttachment(attachment, reader_.curRecordOffset());
1512 }
1513 }
1514 break;
1515 }
1517 if (onAttachmentIndex) {
1518 AttachmentIndex attachmentIndex;
1519 if (status_ = McapReader::ParseAttachmentIndex(record, &attachmentIndex); status_.ok()) {
1520 onAttachmentIndex(attachmentIndex, reader_.curRecordOffset());
1521 }
1522 }
1523 break;
1524 }
1525 case OpCode::Statistics: {
1526 if (onStatistics) {
1527 Statistics statistics;
1528 if (status_ = McapReader::ParseStatistics(record, &statistics); status_.ok()) {
1529 onStatistics(statistics, reader_.curRecordOffset());
1530 }
1531 }
1532 break;
1533 }
1534 case OpCode::Metadata: {
1535 if (onMetadata) {
1536 Metadata metadata;
1537 if (status_ = McapReader::ParseMetadata(record, &metadata); status_.ok()) {
1538 onMetadata(metadata, reader_.curRecordOffset());
1539 }
1540 }
1541 break;
1542 }
1543 case OpCode::MetadataIndex: {
1544 if (onMetadataIndex) {
1545 MetadataIndex metadataIndex;
1546 if (status_ = McapReader::ParseMetadataIndex(record, &metadataIndex); status_.ok()) {
1547 onMetadataIndex(metadataIndex, reader_.curRecordOffset());
1548 }
1549 }
1550 break;
1551 }
1552 case OpCode::SummaryOffset: {
1553 if (onSummaryOffset) {
1554 SummaryOffset summaryOffset;
1555 if (status_ = McapReader::ParseSummaryOffset(record, &summaryOffset); status_.ok()) {
1556 onSummaryOffset(summaryOffset, reader_.curRecordOffset());
1557 }
1558 }
1559 break;
1560 }
1561 case OpCode::DataEnd: {
1562 if (onDataEnd) {
1563 DataEnd dataEnd;
1564 if (status_ = McapReader::ParseDataEnd(record, &dataEnd); status_.ok()) {
1565 onDataEnd(dataEnd, reader_.curRecordOffset());
1566 }
1567 }
1568 break;
1569 }
1570 default:
1571 if (onUnknownRecord) {
1572 onUnknownRecord(record, reader_.curRecordOffset(), std::nullopt);
1573 }
1574 break;
1575 }
1576
1577 return true;
1578}
1579
1583
1585 return status_;
1586}
1587
1588// LinearMessageView ///////////////////////////////////////////////////////////
1589
1591 : mcapReader_(mcapReader)
1592 , dataStart_(0)
1593 , dataEnd_(0)
1594 , onProblem_(onProblem) {}
1595
1597 ByteOffset dataEnd, Timestamp startTime, Timestamp endTime,
1598 const ProblemCallback& onProblem)
1599 : mcapReader_(mcapReader)
1600 , dataStart_(dataStart)
1601 , dataEnd_(dataEnd)
1602 , readMessageOptions_(startTime, endTime)
1603 , onProblem_(onProblem) {}
1604
1606 ByteOffset dataStart, ByteOffset dataEnd,
1607 const ProblemCallback& onProblem)
1608 : mcapReader_(mcapReader)
1609 , dataStart_(dataStart)
1610 , dataEnd_(dataEnd)
1611 , readMessageOptions_(options)
1612 , onProblem_(onProblem) {}
1613
1620
1624
1625// LinearMessageView::Iterator /////////////////////////////////////////////////
1626
1628 : impl_(std::make_unique<Impl>(view)) {
1629 if (!impl_->has_value()) {
1630 impl_ = nullptr;
1631 }
1632}
1633
1635 : view_(view) {
1636 auto dataStart = view.dataStart_;
1637 auto dataEnd = view.dataEnd_;
1638 auto readMessageOptions = view.readMessageOptions_;
1639 if (readMessageOptions.readOrder == ReadMessageOptions::ReadOrder::FileOrder) {
1640 recordReader_.emplace(*(view_.mcapReader_.dataSource()), dataStart, dataEnd);
1641
1642 recordReader_->onSchema = [this](const SchemaPtr schema, ByteOffset,
1644 view_.mcapReader_.schemas_.insert_or_assign(schema->id, schema);
1645 };
1646 recordReader_->onChannel = [this](const ChannelPtr channel, ByteOffset,
1648 view_.mcapReader_.channels_.insert_or_assign(channel->id, channel);
1649 };
1650 recordReader_->onMessage = [this](const Message& message, ByteOffset messageStartOffset,
1651 std::optional<ByteOffset> chunkStartOffset) {
1652 RecordOffset offset;
1653 offset.chunkOffset = chunkStartOffset;
1654 offset.offset = messageStartOffset;
1655 onMessage(message, offset);
1656 };
1657 } else {
1658 indexedMessageReader_.emplace(view_.mcapReader_, readMessageOptions,
1660 std::placeholders::_1, std::placeholders::_2));
1661 }
1662
1663 increment();
1664}
1665
1666/**
1667 * @brief Receives a message from either the linear TypedRecordReader or IndexedMessageReader.
1668 * Sets `curMessageView` with the message along with its associated Channel and Schema.
1669 */
1671 // make sure the message is within the expected time range
1672 if (message.logTime < view_.readMessageOptions_.startTime) {
1673 return;
1674 }
1675 if (message.logTime >= view_.readMessageOptions_.endTime) {
1676 return;
1677 }
1678 auto maybeChannel = view_.mcapReader_.channel(message.channelId);
1679 if (!maybeChannel) {
1680 view_.onProblem_(
1682 internal::StrCat("message at log_time ", message.logTime, " (seq ", message.sequence,
1683 ") references missing channel id ", message.channelId)});
1684 return;
1685 }
1686
1687 auto& channel = *maybeChannel;
1688 // make sure the message is on the right topic
1689 if (view_.readMessageOptions_.topicFilter &&
1690 !view_.readMessageOptions_.topicFilter(channel.topic)) {
1691 return;
1692 }
1693 SchemaPtr maybeSchema;
1694 if (channel.schemaId != 0) {
1695 maybeSchema = view_.mcapReader_.schema(channel.schemaId);
1696 if (!maybeSchema) {
1697 view_.onProblem_(
1699 internal::StrCat("channel ", channel.id, " (", channel.topic,
1700 ") references missing schema id ", channel.schemaId)});
1701 return;
1702 }
1703 }
1704
1705 curMessage_ = message; // copy message, which may be a reference to a temporary
1706 curMessageView_.emplace(curMessage_, maybeChannel, maybeSchema, offset);
1707}
1708
1710 curMessageView_ = std::nullopt;
1711
1712 if (recordReader_.has_value()) {
1713 while (!curMessageView_.has_value()) {
1714 // Iterate through records until curMessageView_ gets filled with a value.
1715 const bool found = recordReader_->next();
1716
1717 // Surface any problem that may have occurred while reading
1718 auto& status = recordReader_->status();
1719 if (!status.ok()) {
1720 view_.onProblem_(status);
1721 }
1722
1723 if (!found) {
1724 recordReader_ = std::nullopt;
1725 return;
1726 }
1727 }
1728 } else if (indexedMessageReader_.has_value()) {
1729 while (!curMessageView_.has_value()) {
1730 // Iterate through records until curMessageView_ gets filled with a value.
1731 if (!indexedMessageReader_->next()) {
1732 // No message was found on last iteration - if this was because of an error,
1733 // alert with onProblem_.
1734 auto status = indexedMessageReader_->status();
1735 if (!status.ok()) {
1736 view_.onProblem_(status);
1737 }
1738 indexedMessageReader_ = std::nullopt;
1739 return;
1740 }
1741 }
1742 }
1743}
1744
1748
1750 return curMessageView_.has_value();
1751}
1752
1756
1760
1762 begun_ = true;
1763 impl_->increment();
1764 if (!impl_->has_value()) {
1765 impl_ = nullptr;
1766 }
1767 return *this;
1768}
1769
1771 ++*this;
1772}
1773
1775 if (a.impl_ == nullptr || b.impl_ == nullptr) {
1776 // special case for Iterator::end() == Iterator::end()
1777 return a.impl_ == b.impl_;
1778 }
1779 if (!a.begun_ && !b.begun_) {
1780 // special case for Iterator::begin() == Iterator::begin()
1781 // comparing iterators to the beginning of the same view should return true.
1782 return &(a.impl_->view_) == &(b.impl_->view_);
1783 }
1784 // In all other cases, compare by object identity.
1785 return &(a) == &(b);
1786}
1787
1789 return !(a == b);
1790}
1791
1793 if (startTime > endTime) {
1794 return Status(StatusCode::InvalidMessageReadOptions, "start time must be before end time");
1795 }
1796 return Status();
1797}
1798
1799// IndexedMessageReader ///////////////////////////////////////////////////////////
1801 McapReader& reader, const ReadMessageOptions& options,
1802 const std::function<void(const Message&, RecordOffset)> onMessage)
1803 : mcapReader_(reader)
1804 , recordReader_(*mcapReader_.dataSource(), 0, 0)
1805 , options_(options)
1806 , onMessage_(onMessage)
1807 , queue_(options_.readOrder == ReadMessageOptions::ReadOrder::ReverseLogTimeOrder) {
1808 auto chunkIndexes = mcapReader_.chunkIndexes();
1809 if (chunkIndexes.size() == 0) {
1811 if (!status_.ok()) {
1812 return;
1813 }
1814 chunkIndexes = mcapReader_.chunkIndexes();
1815 }
1816 if (chunkIndexes.size() == 0 ||
1817 std::all_of(chunkIndexes.begin(), chunkIndexes.end(), [](const ChunkIndex& ci) {
1818 return ci.messageIndexLength == 0;
1819 })) {
1821 "cannot read MCAP in time order with no message indexes");
1822 return;
1823 }
1824 for (const auto& [channelId, channel] : mcapReader_.channels()) {
1825 if (!options_.topicFilter || options_.topicFilter(channel->topic)) {
1826 selectedChannels_.insert(channelId);
1827 }
1828 }
1829 // Initialize the read job queue by finding all of the chunks that need to be read from.
1830 for (const auto& chunkIndex : mcapReader_.chunkIndexes()) {
1831 if (chunkIndex.messageStartTime >= options_.endTime) {
1832 // chunk starts after requested time range, skip it.
1833 continue;
1834 }
1835 if (chunkIndex.messageEndTime < options_.startTime) {
1836 // chunk end before requested time range starts, skip it.
1837 continue;
1838 }
1839 for (const auto& channelId : selectedChannels_) {
1840 if (chunkIndex.messageIndexOffsets.find(channelId) != chunkIndex.messageIndexOffsets.end()) {
1842 job.chunkStartOffset = chunkIndex.chunkStartOffset;
1844 chunkIndex.chunkStartOffset + chunkIndex.chunkLength + chunkIndex.messageIndexLength;
1845 job.messageStartTime = chunkIndex.messageStartTime;
1846 job.messageEndTime = chunkIndex.messageEndTime;
1847 queue_.push(std::move(job));
1848 break;
1849 }
1850 }
1851 }
1852}
1853
1855 for (size_t chunkReaderIndex = 0; chunkReaderIndex < chunkSlots_.size(); chunkReaderIndex++) {
1856 if (chunkSlots_[chunkReaderIndex].unreadMessages == 0) {
1857 return chunkReaderIndex;
1858 }
1859 }
1860 chunkSlots_.emplace_back();
1861 return chunkSlots_.size() - 1;
1862}
1863
1866 auto compression = McapReader::ParseCompression(chunk.compression);
1867 if (!compression.has_value()) {
1869 internal::StrCat("unrecognized compression: ", chunk.compression));
1870 return;
1871 }
1872 slot.decompressedChunk.clear();
1873 if (*compression == Compression::None) {
1874 slot.decompressedChunk.insert(slot.decompressedChunk.end(), &chunk.records[0],
1875 &chunk.records[chunk.uncompressedSize]);
1876 }
1877#ifndef MCAP_COMPRESSION_NO_LZ4
1878 else if (*compression == Compression::Lz4) {
1880 &slot.decompressedChunk);
1881 }
1882#endif
1883#ifndef MCAP_COMPRESSION_NO_ZSTD
1884 else if (*compression == Compression::Zstd) {
1886 &slot.decompressedChunk);
1887 }
1888#endif
1889 else {
1891 internal::StrCat("unhandled compression: ", chunk.compression));
1892 }
1893}
1894
1896 while (queue_.len() != 0) {
1897 auto nextItem = queue_.pop();
1898 if (std::holds_alternative<internal::DecompressChunkJob>(nextItem)) {
1899 const auto& decompressChunkJob = std::get<internal::DecompressChunkJob>(nextItem);
1900 // The job here is to decompress the chunk into a slot, then use the message
1901 // indices after the chunk to push ReadMessageJobs onto the queue for every message
1902 // in that chunk that needs to be read.
1903
1904 // First, find a chunk slot to decompress this chunk into.
1905 size_t chunkReaderIndex = findFreeChunkSlot();
1906 auto& chunkSlot = chunkSlots_[chunkReaderIndex];
1907 chunkSlot.chunkStartOffset = decompressChunkJob.chunkStartOffset;
1908 // Point the record reader at the chunk and message indices after it.
1909 recordReader_.reset(*mcapReader_.dataSource(), decompressChunkJob.chunkStartOffset,
1910 decompressChunkJob.messageIndexEndOffset);
1911 for (auto record = recordReader_.next(); record != std::nullopt;
1912 record = recordReader_.next()) {
1913 switch (record->opcode) {
1914 case OpCode::Chunk: {
1915 Chunk chunk;
1916 status_ = McapReader::ParseChunk(*record, &chunk);
1917 if (!status_.ok()) {
1918 return false;
1919 }
1920 decompressChunk(chunk, chunkSlot);
1921 if (!status_.ok()) {
1922 return false;
1923 }
1924 } break;
1925 case OpCode::MessageIndex: {
1926 MessageIndex messageIndex;
1927 status_ = McapReader::ParseMessageIndex(*record, &messageIndex);
1928 if (!status_.ok()) {
1929 return false;
1930 }
1931 if (selectedChannels_.find(messageIndex.channelId) != selectedChannels_.end()) {
1932 for (const auto& [timestamp, byteOffset] : messageIndex.records) {
1933 if (timestamp >= options_.startTime && timestamp < options_.endTime) {
1935 job.chunkReaderIndex = chunkReaderIndex;
1936 job.offset.offset = byteOffset;
1937 job.offset.chunkOffset = decompressChunkJob.chunkStartOffset;
1938 job.timestamp = timestamp;
1939 queue_.push(std::move(job));
1940 chunkSlot.unreadMessages++;
1941 }
1942 }
1943 }
1944 } break;
1945 default:
1947 internal::StrCat("expected only chunks and message indices, found ",
1948 OpCodeString(record->opcode)));
1949 return false;
1950 }
1951 }
1952 } else if (std::holds_alternative<internal::ReadMessageJob>(nextItem)) {
1953 // Read the message out of the already-decompressed chunk.
1954 const auto& readMessageJob = std::get<internal::ReadMessageJob>(nextItem);
1955 auto& chunkSlot = chunkSlots_[readMessageJob.chunkReaderIndex];
1956 assert(chunkSlot.unreadMessages > 0);
1957 chunkSlot.unreadMessages--;
1958 BufferReader reader;
1959 reader.reset(chunkSlot.decompressedChunk.data(), chunkSlot.decompressedChunk.size(),
1960 chunkSlot.decompressedChunk.size());
1961 recordReader_.reset(reader, readMessageJob.offset.offset, chunkSlot.decompressedChunk.size());
1962 auto record = recordReader_.next();
1964 if (!status_.ok()) {
1965 return false;
1966 }
1967 if (record->opcode != OpCode::Message) {
1968 status_ =
1970 internal::StrCat("expected a message record, got ", OpCodeString(record->opcode)));
1971 return false;
1972 }
1973 Message message;
1974 status_ = McapReader::ParseMessage(*record, &message);
1975 if (!status_.ok()) {
1976 return false;
1977 }
1978 onMessage_(message, readMessageJob.offset);
1979 return true;
1980 }
1981 }
1982 return false;
1983}
1984
1986 return status_;
1987}
1988
1989} // namespace mcap
T all_of(T... args)
T binary_search(T... args)
T bind(T... args)
A "null" compressed reader that directly passes through uncompressed data. No internal buffers are al...
Definition reader.hpp:135
const std::byte * data_
Definition reader.hpp:149
void reset(const std::byte *data, uint64_t size, uint64_t uncompressedSize) override
Reset the reader state, clearing any internal buffers and state, and initialize with new compressed d...
Definition reader.inl:20
uint64_t read(std::byte **output, uint64_t offset, uint64_t size) override
This method is called by MCAP reader classes when they need to read a portion of the file.
Definition reader.inl:27
Status status() const override
Report the current status of decompression. A StatusCode other than StatusCode::Success after reset()...
Definition reader.inl:41
uint64_t size() const override
Returns the size of the file in bytes.
Definition reader.inl:37
uint64_t size_
Definition reader.hpp:85
std::FILE * file_
Definition reader.hpp:83
uint64_t size() const override
Returns the size of the file in bytes.
Definition reader.inl:59
uint64_t read(std::byte **output, uint64_t offset, uint64_t size) override
This method is called by MCAP reader classes when they need to read a portion of the file.
Definition reader.inl:63
uint64_t position_
Definition reader.hpp:86
FileReader(std::FILE *file)
Definition reader.inl:47
std::vector< std::byte > buffer_
Definition reader.hpp:84
uint64_t read(std::byte **output, uint64_t offset, uint64_t size) override
This method is called by MCAP reader classes when they need to read a portion of the file.
Definition reader.inl:102
FileStreamReader(std::ifstream &stream)
Definition reader.inl:87
std::ifstream & stream_
Definition reader.hpp:100
uint64_t size() const override
Returns the size of the file in bytes.
Definition reader.inl:98
std::vector< std::byte > buffer_
Definition reader.hpp:101
An abstract interface for compressed readers.
Definition reader.hpp:109
virtual Status status() const =0
Report the current status of decompression. A StatusCode other than StatusCode::Success after reset()...
virtual void reset(const std::byte *data, uint64_t size, uint64_t uncompressedSize)=0
Reset the reader state, clearing any internal buffers and state, and initialize with new compressed d...
~LZ4Reader() override
Definition reader.inl:138
Status status() const override
Report the current status of decompression. A StatusCode other than StatusCode::Success after reset()...
Definition reader.inl:168
uint64_t size() const override
Returns the size of the file in bytes.
Definition reader.inl:164
uint64_t compressedSize_
Definition reader.hpp:225
ByteArray uncompressedData_
Definition reader.hpp:224
void * decompressionContext_
Definition reader.hpp:221
Status decompressAll(const std::byte *data, uint64_t size, uint64_t uncompressedSize, ByteArray *output)
Decompresses an entire LZ4-encoded chunk into output.
Definition reader.inl:171
const std::byte * compressedData_
Definition reader.hpp:223
void reset(const std::byte *data, uint64_t size, uint64_t uncompressedSize) override
Reset the reader state, clearing any internal buffers and state, and initialize with new compressed d...
Definition reader.inl:144
uint64_t uncompressedSize_
Definition reader.hpp:226
uint64_t read(std::byte **output, uint64_t offset, uint64_t size) override
This method is called by MCAP reader classes when they need to read a portion of the file.
Definition reader.inl:154
std::optional< TypedRecordReader > recordReader_
Definition reader.hpp:692
Impl(LinearMessageView &view)
Definition reader.inl:1634
std::optional< IndexedMessageReader > indexedMessageReader_
Definition reader.hpp:693
void onMessage(const Message &message, RecordOffset offset)
Receives a message from either the linear TypedRecordReader or IndexedMessageReader....
Definition reader.inl:1670
Provides a read interface to an MCAP file.
Definition reader.hpp:275
std::unique_ptr< FileReader > fileInput_
Definition reader.hpp:483
Status readSummarySection_(IReadable &reader)
Definition reader.inl:413
std::vector< ChunkIndex > chunkIndexes_
Definition reader.hpp:488
std::unordered_map< SchemaId, SchemaPtr > schemas_
Definition reader.hpp:492
const std::optional< Footer > & footer() const
Returns the parsed Footer record, if it has been encountered.
Definition reader.inl:616
static Status ParseSchema(const Record &record, Schema *schema)
Definition reader.inl:768
static Status ParseDataEnd(const Record &record, DataEnd *dataEnd)
Definition reader.inl:1194
std::pair< ByteOffset, ByteOffset > byteRange(Timestamp startTime, Timestamp endTime=MaxTime) const
Returns starting and ending byte offsets that must be read to iterate all messages in the given time ...
Definition reader.inl:587
static Status ParseFooter(const Record &record, Footer *footer)
Definition reader.inl:752
static Status ParseStatistics(const Record &record, Statistics *statistics)
Definition reader.inl:1094
static Status ParseChunkIndex(const Record &record, ChunkIndex *chunkIndex)
Definition reader.inl:931
std::optional< Footer > footer_
Definition reader.hpp:486
std::optional< Header > header_
Definition reader.hpp:485
const std::unordered_map< SchemaId, SchemaPtr > schemas() const
Returns all of the parsed Schema records. Call readSummary() first to fully populate this data struct...
Definition reader.inl:628
std::unique_ptr< FileStreamReader > fileStreamInput_
Definition reader.hpp:484
internal::IntervalTree< ByteOffset, ChunkIndex > chunkRanges_
Definition reader.hpp:489
const std::multimap< std::string, AttachmentIndex > & attachmentIndexes() const
Returns all of the parsed AttachmentIndex records. Call readSummary() first to fully populate this da...
Definition reader.inl:650
IReadable * input_
Definition reader.hpp:481
std::multimap< std::string, MetadataIndex > metadataIndexes_
Definition reader.hpp:491
static Status ParseAttachmentIndex(const Record &record, AttachmentIndex *attachmentIndex)
Definition reader.inl:1060
static Status ParseHeader(const Record &record, Header *header)
Definition reader.inl:729
static Status ParseMetadata(const Record &record, Metadata *metadata)
Definition reader.inl:1132
static Status ParseMessage(const Record &record, Message *message)
Definition reader.inl:846
ChannelPtr channel(ChannelId channelId) const
Look up a Channel record by channel ID. If the Channel has not been encountered yet or does not exist...
Definition reader.inl:632
static Status ParseMessageIndex(const Record &record, MessageIndex *messageIndex)
Definition reader.inl:904
static Status ParseSummaryOffset(const Record &record, SummaryOffset *summaryOffset)
Definition reader.inl:1178
std::optional< Statistics > statistics_
Definition reader.hpp:487
static Status ParseChunk(const Record &record, Chunk *chunk)
Definition reader.inl:864
std::unordered_map< ChannelId, ChannelPtr > channels_
Definition reader.hpp:493
std::FILE * file_
Definition reader.hpp:482
IReadable * dataSource()
Returns a pointer to the IReadable data source backing this reader. Will return nullptr if the reader...
Definition reader.inl:608
const std::unordered_map< ChannelId, ChannelPtr > channels() const
Returns all of the parsed Channel records. Call readSummary() first to fully populate this data struc...
Definition reader.inl:624
SchemaPtr schema(SchemaId schemaId) const
Look up a Schema record by schema ID. If the Schema has not been encountered yet or does not exist in...
Definition reader.inl:637
static Status ParseMetadataIndex(const Record &record, MetadataIndex *metadataIndex)
Definition reader.inl:1157
const std::vector< ChunkIndex > & chunkIndexes() const
Returns all of the parsed ChunkIndex records. Call readSummary() first to fully populate this data st...
Definition reader.inl:642
static Status ReadFooter(IReadable &reader, uint64_t offset, Footer *footer)
Definition reader.inl:694
ByteOffset dataStart_
Definition reader.hpp:494
Status readSummaryFromScan_(IReadable &reader)
Definition reader.inl:487
void close()
Closes the MCAP file, clearing any internal data structures and state and dropping the data source re...
Definition reader.inl:345
static std::optional< Compression > ParseCompression(const std::string_view compression)
Converts a compression string ("", "zstd", "lz4") to the Compression enum.
Definition reader.inl:1207
const std::optional< Statistics > & statistics() const
Returns the parsed Statistics record, if it has been encountered.
Definition reader.inl:620
static Status ParseChannel(const Record &record, Channel *channel)
Definition reader.inl:806
static Status ParseAttachment(const Record &record, Attachment *attachment)
Definition reader.inl:993
Status open(IReadable &reader)
Opens an MCAP file for reading from an already constructed IReadable implementation.
Definition reader.inl:273
ByteOffset dataEnd_
Definition reader.hpp:495
LinearMessageView readMessages(Timestamp startTime=0, Timestamp endTime=MaxTime)
Returns an iterable view with begin() and end() methods for iterating Messages in the MCAP file....
Definition reader.inl:562
std::multimap< std::string, AttachmentIndex > attachmentIndexes_
Definition reader.hpp:490
static Status ReadRecord(IReadable &reader, uint64_t offset, Record *record)
Definition reader.inl:654
Status readSummary(ReadSummaryMethod method, const ProblemCallback &onProblem=[](const Status &) {})
Read and parse the Summary section at the end of the MCAP file, if available. This will populate inte...
Definition reader.inl:369
const std::optional< Header > & header() const
Returns the parsed Header record, if it has been encountered.
Definition reader.inl:612
const std::multimap< std::string, MetadataIndex > & metadataIndexes() const
Returns all of the parsed MetadataIndex records. Call readSummary() first to fully populate this data...
Definition reader.inl:646
Status status() const override
Report the current status of decompression. A StatusCode other than StatusCode::Success after reset()...
Definition reader.inl:237
ByteArray uncompressedData_
Definition reader.hpp:185
void reset(const std::byte *data, uint64_t size, uint64_t uncompressedSize) override
Reset the reader state, clearing any internal buffers and state, and initialize with new compressed d...
Definition reader.inl:219
uint64_t read(std::byte **output, uint64_t offset, uint64_t size) override
This method is called by MCAP reader classes when they need to read a portion of the file.
Definition reader.inl:223
static Status DecompressAll(const std::byte *data, uint64_t compressedSize, uint64_t uncompressedSize, ByteArray *output)
Decompresses an entire Zstd-compressed chunk into output.
Definition reader.inl:241
uint64_t size() const override
Returns the size of the file in bytes.
Definition reader.inl:233
T clear(T... args)
T data(T... args)
T emplace_back(T... args)
T emplace(T... args)
T end(T... args)
T fclose(T... args)
T fflush(T... args)
T find(T... args)
T fopen(T... args)
T fread(T... args)
T fseek(T... args)
T ftell(T... args)
T gcount(T... args)
T get(T... args)
T insert(T... args)
T is_open(T... args)
T max(T... args)
T memcmp(T... args)
T min(T... args)
Status ParseByteArray(const std::byte *data, uint64_t maxSize, ByteArray *output)
Definition internal.hpp:131
uint64_t ParseUint64(const std::byte *data)
Definition internal.hpp:89
constexpr uint64_t FooterLength
Definition internal.hpp:18
std::string MagicToHex(const std::byte *data)
Definition internal.hpp:185
std::string StrCat(T &&... args)
Definition internal.hpp:45
constexpr uint64_t MinHeaderLength
Definition internal.hpp:13
uint32_t ParseUint32(const std::byte *data)
Definition internal.hpp:75
std::string ToHex(uint8_t byte)
Definition internal.hpp:25
uint16_t ParseUint16(const std::byte *data)
Definition internal.hpp:71
Status ParseString(const std::byte *data, uint64_t maxSize, std::string *output)
Definition internal.hpp:118
Status ParseKeyValueMap(const std::byte *data, uint64_t maxSize, KeyValueMap *output)
Definition internal.hpp:149
Definition crc32.hpp:5
StatusCode
Status codes for MCAP readers and writers.
Definition errors.hpp:10
uint16_t SchemaId
Definition types.hpp:19
Compression
Supported MCAP compression algorithms.
Definition types.hpp:37
bool CompareChunkIndexes(const ChunkIndex &a, const ChunkIndex &b)
Definition reader.inl:14
uint64_t Timestamp
Definition types.hpp:21
constexpr ByteOffset EndOffset
Definition types.hpp:31
ReadSummaryMethod
Definition reader.hpp:19
@ AllowFallbackScan
If the Summary section is missing or incomplete, allow falling back to reading the file sequentially ...
@ ForceScan
Read the file sequentially from Header to DataEnd to produce seeking indexes and summary statistics.
@ NoFallbackScan
Parse the Summary section to produce seeking indexes and summary statistics. If the Summary section i...
uint16_t ChannelId
Definition types.hpp:20
OpCode
MCAP record types.
Definition types.hpp:59
MCAP_PUBLIC constexpr std::string_view OpCodeString(OpCode opcode)
Get the string representation of an OpCode.
Definition types.inl:5
uint64_t ByteOffset
Definition types.hpp:22
constexpr uint8_t Magic[]
Definition types.hpp:29
T read(T... args)
T reserve(T... args)
T resize(T... args)
T seekg(T... args)
T size(T... args)
T sort(T... args)
Attachment Index records are found in the Summary section, providing summary information for a single...
Definition types.hpp:270
std::string mediaType
Definition types.hpp:277
ByteOffset length
Definition types.hpp:272
std::string name
Definition types.hpp:276
ByteOffset offset
Definition types.hpp:271
Timestamp createTime
Definition types.hpp:274
An Attachment is an arbitrary file embedded in an MCAP file, including a name, media type,...
Definition types.hpp:256
Timestamp createTime
Definition types.hpp:258
std::string name
Definition types.hpp:259
const std::byte * data
Definition types.hpp:262
Timestamp logTime
Definition types.hpp:257
uint32_t crc
Definition types.hpp:263
uint64_t dataSize
Definition types.hpp:261
std::string mediaType
Definition types.hpp:260
Describes a Channel that messages are written to. A Channel represents a single connection from a pub...
Definition types.hpp:159
Chunk Index records are found in the Summary section, providing summary information for a single Chun...
Definition types.hpp:239
Timestamp messageStartTime
Definition types.hpp:240
ByteOffset chunkStartOffset
Definition types.hpp:242
std::unordered_map< ChannelId, ByteOffset > messageIndexOffsets
Definition types.hpp:244
ByteOffset messageIndexLength
Definition types.hpp:245
ByteOffset compressedSize
Definition types.hpp:247
ByteOffset uncompressedSize
Definition types.hpp:248
Timestamp messageEndTime
Definition types.hpp:241
std::string compression
Definition types.hpp:246
ByteOffset chunkLength
Definition types.hpp:243
A collection of Schemas, Channels, and Messages that supports compression and indexing.
Definition types.hpp:215
ByteOffset uncompressedSize
Definition types.hpp:218
const std::byte * records
Definition types.hpp:222
ByteOffset compressedSize
Definition types.hpp:221
Timestamp messageEndTime
Definition types.hpp:217
Timestamp messageStartTime
Definition types.hpp:216
std::string compression
Definition types.hpp:220
uint32_t uncompressedCrc
Definition types.hpp:219
The final record in the Data section, signaling the end of Data and beginning of Summary....
Definition types.hpp:350
uint32_t dataSectionCrc
Definition types.hpp:351
The final record in an MCAP file (before the trailing magic byte sequence). Contains byte offsets fro...
Definition types.hpp:115
Appears at the beginning of every MCAP file (after the magic byte sequence) and contains the recordin...
Definition types.hpp:103
An abstract interface for reading MCAP data.
Definition reader.hpp:43
virtual uint64_t read(std::byte **output, uint64_t offset, uint64_t size)=0
This method is called by MCAP reader classes when they need to read a portion of the file.
virtual uint64_t size() const =0
Returns the size of the file in bytes.
RecordReader recordReader_
Definition reader.hpp:642
IndexedMessageReader(McapReader &reader, const ReadMessageOptions &options, const std::function< void(const Message &, RecordOffset)> onMessage)
Definition reader.inl:1800
std::vector< ChunkSlot > chunkSlots_
Definition reader.hpp:650
std::function< void(const Message &, RecordOffset)> onMessage_
Definition reader.hpp:648
Status status() const
Gets the status of the reader.
Definition reader.inl:1985
internal::ReadJobQueue queue_
Definition reader.hpp:649
void decompressChunk(const Chunk &chunk, ChunkSlot &slot)
Definition reader.inl:1864
ReadMessageOptions options_
Definition reader.hpp:646
std::unordered_set< ChannelId > selectedChannels_
Definition reader.hpp:647
bool next()
Reads the next message out of the MCAP.
Definition reader.inl:1895
MCAP_PUBLIC friend bool operator!=(const Iterator &a, const Iterator &b)
Definition reader.inl:1788
std::unique_ptr< Impl > impl_
Definition reader.hpp:702
MCAP_PUBLIC friend bool operator==(const Iterator &a, const Iterator &b)
Definition reader.inl:1774
An iterable view of Messages in an MCAP file.
Definition reader.hpp:656
ReadMessageOptions readMessageOptions_
Definition reader.hpp:723
LinearMessageView(McapReader &mcapReader, const ProblemCallback &onProblem)
Definition reader.inl:1590
McapReader & mcapReader_
Definition reader.hpp:720
A list of timestamps to byte offsets for a single Channel. This record appears after each Chunk,...
Definition types.hpp:229
ChannelId channelId
Definition types.hpp:230
std::vector< std::pair< Timestamp, ByteOffset > > records
Definition types.hpp:231
Returned when iterating over Messages in a file, MessageView contains a reference to one Message,...
Definition types.hpp:388
A single Message published to a Channel.
Definition types.hpp:182
Timestamp logTime
Nanosecond timestamp when this message was recorded or received for recording.
Definition types.hpp:193
uint32_t sequence
An optional sequence number. If non-zero, sequence numbers should be unique per channel and increasin...
Definition types.hpp:188
const std::byte * data
A pointer to the message payload. For readers, this pointer is only valid for the lifetime of an onMe...
Definition types.hpp:208
Timestamp publishTime
Nanosecond timestamp when this message was initially published. If not available, this should be set ...
Definition types.hpp:198
ChannelId channelId
Definition types.hpp:183
uint64_t dataSize
Size of the message payload in bytes, pointed to via data.
Definition types.hpp:202
Metadata Index records are found in the Summary section, providing summary information for a single M...
Definition types.hpp:325
std::string name
Definition types.hpp:328
Holds a named map of key/value strings containing arbitrary user data. Metadata records are found in ...
Definition types.hpp:316
std::string name
Definition types.hpp:317
KeyValueMap metadata
Definition types.hpp:318
Options for reading messages out of an MCAP file.
Definition reader.hpp:235
Timestamp startTime
Only messages with log timestamps greater than or equal to startTime will be included.
Definition reader.hpp:240
std::function< bool(std::string_view)> topicFilter
If provided, topicFilter is called on all topics found in the MCAP file. If topicFilter returns true ...
Definition reader.hpp:250
Status validate() const
Validate the configuration.
Definition reader.inl:1792
Timestamp endTime
Only messages with log timestamps less than endTime will be included.
Definition reader.hpp:244
std::optional< ByteOffset > chunkOffset
Definition types.hpp:356
ByteOffset offset
Definition types.hpp:355
RecordReader(IReadable &dataSource, ByteOffset startOffset, ByteOffset endOffset=EndOffset)
Definition reader.inl:1221
const Status & status() const
Definition reader.inl:1249
void reset(IReadable &dataSource, ByteOffset startOffset, ByteOffset endOffset)
Definition reader.inl:1228
ByteOffset offset
Definition reader.hpp:508
std::optional< Record > next()
Definition reader.inl:1236
IReadable * dataSource_
Definition reader.hpp:522
ByteOffset curRecordOffset() const
Definition reader.inl:1253
ByteOffset endOffset
Definition reader.hpp:509
A generic Type-Length-Value record using a uint8 type and uint64 length. This is the generic form of ...
Definition types.hpp:87
uint64_t dataSize
Definition types.hpp:89
uint64_t recordSize() const
Definition types.hpp:92
std::byte * data
Definition types.hpp:90
OpCode opcode
Definition types.hpp:88
Describes a schema used for message encoding and decoding and/or describing the shape of messages....
Definition types.hpp:132
The Statistics record is found in the Summary section, providing counts and timestamp ranges for the ...
Definition types.hpp:300
Timestamp messageStartTime
Definition types.hpp:307
Wraps a status code and string message carrying additional context.
Definition errors.hpp:36
bool ok() const
Definition errors.hpp:115
Summary Offset records are found in the Summary Offset section. Records in the Summary section are gr...
Definition types.hpp:340
ByteOffset groupLength
Definition types.hpp:343
ByteOffset groupStart
Definition types.hpp:342
const Status & status() const
Definition reader.inl:1362
void reset(const Chunk &chunk, Compression compression)
Definition reader.inl:1263
std::function< void(const Message &, ByteOffset)> onMessage
Definition reader.hpp:530
std::function< void(const SchemaPtr, ByteOffset)> onSchema
Definition reader.hpp:528
std::function< void(const Record &, ByteOffset)> onUnknownRecord
Definition reader.hpp:531
std::function< void(const ChannelPtr, ByteOffset)> onChannel
Definition reader.hpp:529
RecordReader reader_
Definition reader.hpp:548
BufferReader uncompressedReader_
Definition reader.hpp:550
ByteOffset offset() const
Definition reader.inl:1358
ZStdReader zstdReader_
Definition reader.hpp:555
A mid-level interface for parsing and validating MCAP records from a data source.
Definition reader.hpp:563
std::function< void(const Message &, ByteOffset, std::optional< ByteOffset >)> onMessage
Definition reader.hpp:568
std::function< void(const Metadata &, ByteOffset)> onMetadata
Definition reader.hpp:575
TypedRecordReader(IReadable &dataSource, ByteOffset startOffset, ByteOffset endOffset=EndOffset)
Definition reader.inl:1368
ByteOffset offset() const
Definition reader.inl:1580
std::function< void(ByteOffset)> onChunkEnd
Definition reader.hpp:580
std::function< void(const Record &, ByteOffset, std::optional< ByteOffset >)> onUnknownRecord
Definition reader.hpp:579
std::function< void(const SummaryOffset &, ByteOffset)> onSummaryOffset
Definition reader.hpp:577
std::function< void(const Footer &, ByteOffset)> onFooter
Definition reader.hpp:565
std::function< void(const DataEnd &, ByteOffset)> onDataEnd
Definition reader.hpp:578
std::function< void(const ChannelPtr, ByteOffset, std::optional< ByteOffset >)> onChannel
Definition reader.hpp:567
std::function< void(const Attachment &, ByteOffset)> onAttachment
Definition reader.hpp:572
RecordReader reader_
Definition reader.hpp:597
std::function< void(const ChunkIndex &, ByteOffset)> onChunkIndex
Definition reader.hpp:571
std::function< void(const MetadataIndex &, ByteOffset)> onMetadataIndex
Definition reader.hpp:576
const Status & status() const
Definition reader.inl:1584
std::function< void(const Chunk &, ByteOffset)> onChunk
Definition reader.hpp:569
TypedChunkReader chunkReader_
Definition reader.hpp:598
std::function< void(const Statistics &, ByteOffset)> onStatistics
Definition reader.hpp:574
std::function< void(const AttachmentIndex &, ByteOffset)> onAttachmentIndex
Definition reader.hpp:573
std::function< void(const SchemaPtr, ByteOffset, std::optional< ByteOffset >)> onSchema
Definition reader.hpp:566
std::function< void(const Header &, ByteOffset)> onHeader
Definition reader.hpp:564
std::function< void(const MessageIndex &, ByteOffset)> onMessageIndex
Definition reader.hpp:570
A job to decompress the chunk starting at chunkStartOffset. The message indices starting directly aft...
void push(DecompressChunkJob &&decompressChunkJob)
A job to read a specific message at offset offset from the decompressed chunk stored in chunkReaderIn...
T tellg(T... args)