4#ifndef MCAP_COMPRESSION_NO_LZ4
7#ifndef MCAP_COMPRESSION_NO_ZSTD
9# include <zstd_errors.h>
21 (void)uncompressedSize;
22 assert(
size == uncompressedSize);
32 const auto available =
size_ - offset;
64 if (offset >=
size_) {
103 if (offset >=
size_) {
126#ifndef MCAP_COMPRESSION_NO_LZ4
128 const LZ4F_errorCode_t err =
130 if (LZ4F_isError(err)) {
132 internal::StrCat(
"failed to create lz4 decompression context: ", LZ4F_getErrorName(err));
172 uint64_t uncompressedSize,
ByteArray* output) {
178 output->
resize(uncompressedSize);
180 size_t dstSize = uncompressedSize;
181 size_t srcSize = compressedSize;
184 data, &srcSize,
nullptr);
186 if (LZ4F_isError(
status)) {
187 const auto msg =
internal::StrCat(
"lz4 decompression of ", compressedSize,
" bytes into ",
188 uncompressedSize,
" output bytes failed with error ",
193 internal::StrCat(
"lz4 decompression of ", compressedSize,
" bytes into ", uncompressedSize,
194 " incomplete: consumed ", srcSize,
" and produced ", dstSize,
195 " bytes so far, expect ",
status,
" more input bytes");
199 }
else if (srcSize != compressedSize) {
201 internal::StrCat(
"lz4 decompression of ", compressedSize,
" bytes into ", uncompressedSize,
202 " output bytes only consumed ", srcSize,
" bytes");
205 }
else if (dstSize != uncompressedSize) {
207 internal::StrCat(
"lz4 decompression of ", compressedSize,
" bytes into ", uncompressedSize,
208 " output bytes only produced ", dstSize,
" bytes");
218#ifndef MCAP_COMPRESSION_NO_ZSTD
242 uint64_t uncompressedSize,
ByteArray* output) {
246 output->
resize(uncompressedSize);
248 const auto status = ZSTD_decompress(output->
data(), uncompressedSize, data, compressedSize);
249 if (
status != uncompressedSize) {
250 if (ZSTD_isError(
status)) {
252 internal::StrCat(
"zstd decompression of ", compressedSize,
" bytes into ", uncompressedSize,
253 " output bytes failed with error ", ZSTD_getErrorName(
status));
257 internal::StrCat(
"zstd decompression of ", compressedSize,
" bytes into ", uncompressedSize,
258 " output bytes only produced ",
status,
" bytes");
276 const uint64_t fileSize = reader.
size();
286 bytesRead = reader.
read(&data, 0,
sizeof(
Magic) + 1 + 8 + 4);
287 if (bytesRead !=
sizeof(
Magic) + 1 + 8 + 4) {
300 if (
auto status =
ReadRecord(reader,
sizeof(
Magic), &record); !status.ok()) {
405 chunkIntervals.
emplace_back(chunkIndex.messageStartTime, chunkIndex.messageEndTime, chunkIndex);
414 const uint64_t fileSize = reader.
size();
429 if (summaryOffsetStart < summaryStart) {
430 const auto msg =
internal::StrCat(
"summary_offset_start ", summaryOffsetStart,
431 " < summary_start ", summaryStart);
440 bool readStatistics =
false;
443 schemas_.try_emplace(schemaPtr->id, schemaPtr);
446 channels_.try_emplace(channelPtr->id, channelPtr);
462 const bool needsSorting =
473 readStatistics =
true;
476 while (typedReader.next()) {
477 const auto& status = typedReader.status();
500 schemas_.try_emplace(schemaPtr->id, schemaPtr);
503 channels_.try_emplace(channelPtr->id, channelPtr);
517 chunkIndex.chunkStartOffset = fileOffset;
518 chunkIndex.chunkLength =
520 chunkIndex.messageIndexLength = 0;
542 while (!done && typedReader.next()) {
543 const auto& status = typedReader.status();
563 const auto onProblem = [](
const Status&) {};
595 chunkRanges_.visit_overlapping(startTime, endTime, [&](
const auto& interval) {
596 const auto& chunkIndex = interval.value;
597 dataStart =
std::min(dataStart, chunkIndex.chunkStartOffset);
598 dataEnd =
std::max(dataEnd, chunkIndex.chunkStartOffset + chunkIndex.chunkLength);
600 dataEnd =
std::max(dataEnd, dataStart);
602 if (dataStart == dataEnd) {
605 return {dataStart, dataEnd};
633 const auto& maybeChannel =
channels_.find(channelId);
634 return (maybeChannel ==
channels_.end()) ? nullptr : maybeChannel->second;
638 const auto& maybeSchema =
schemas_.find(schemaId);
639 return (maybeSchema ==
schemas_.end()) ? nullptr : maybeSchema->second;
656 auto maxSize = reader.
size() - offset;
659 internal::StrCat(
"cannot read record at offset ", offset,
", ", maxSize,
" bytes remaining");
665 uint64_t bytesRead = reader.
read(&data, offset, 9);
666 if (bytesRead != 9) {
676 if (maxSize < record->dataSize) {
678 " at offset ", offset,
" has length ", record->
dataSize,
679 " but only ", maxSize,
" bytes remaining");
683 if (bytesRead != record->
dataSize) {
687 " but only read ", bytesRead,
" bytes");
718 if (length != 8 + 8 + 4) {
730 constexpr uint64_t MinSize = 4 + 4;
742 const size_t libraryOffset = 4 +
header->profile.size();
743 const std::byte* libraryData = &(record.
data[libraryOffset]);
744 const size_t maxSize = record.
dataSize - libraryOffset;
753 constexpr uint64_t FooterSize = 8 + 8 + 4;
756 if (record.
dataSize != FooterSize) {
769 constexpr uint64_t MinSize = 2 + 4 + 4 + 4;
788 offset += 4 +
schema->name.size();
795 offset += 4 +
schema->encoding.size();
807 constexpr uint64_t MinSize = 2 + 4 + 4 + 2 + 4;
829 offset += 4 +
channel->topic.size();
836 offset += 4 +
channel->messageEncoding.size();
847 constexpr uint64_t MessagePreambleSize = 2 + 4 + 8 + 8;
850 if (record.
dataSize < MessagePreambleSize) {
860 message->
data = record.
data + MessagePreambleSize;
865 constexpr uint64_t ChunkPreambleSize = 8 + 8 + 8 + 4 + 4;
868 if (record.
dataSize < ChunkPreambleSize) {
878 size_t offset = 8 + 8 + 8 + 4;
905 constexpr uint64_t PreambleSize = 2 + 4;
908 if (record.
dataSize < PreambleSize) {
916 if (recordsSize % 16 != 0 || recordsSize > record.
dataSize - PreambleSize) {
917 const auto msg =
internal::StrCat(
"invalid MessageIndex.records length: ", recordsSize);
921 const size_t recordsCount = size_t(recordsSize / 16);
923 for (
size_t i = 0; i < recordsCount; ++i) {
932 constexpr uint64_t PreambleSize = 8 + 8 + 8 + 8 + 4;
935 if (record.
dataSize < PreambleSize) {
946 if (messageIndexOffsetsSize % 10 != 0 ||
947 messageIndexOffsetsSize > record.
dataSize - PreambleSize) {
949 internal::StrCat(
"invalid ChunkIndex.message_index_offsets length:", messageIndexOffsetsSize);
953 const size_t messageIndexOffsetsCount = size_t(messageIndexOffsetsSize / 10);
955 for (
size_t i = 0; i < messageIndexOffsetsCount; ++i) {
961 uint64_t offset = PreambleSize + messageIndexOffsetsSize;
994 constexpr uint64_t MinSize = 8 +
1007 uint32_t offset = 0;
1028 offset += 4 + (uint32_t)(attachment->
name.
size());
1048 attachment->
data = record.
data + offset;
1049 offset += (uint32_t)(attachment->
dataSize);
1061 constexpr uint64_t PreambleSize = 8 + 8 + 8 + 8 + 8 + 4;
1064 if (record.
dataSize < PreambleSize) {
1075 uint32_t offset = 8 + 8 + 8 + 8 + 8;
1079 &attachmentIndex->
name);
1083 offset += 4 + (uint32_t)(attachmentIndex->
name.
size());
1095 constexpr uint64_t PreambleSize = 8 + 2 + 4 + 4 + 4 + 4 + 8 + 8 + 4;
1098 if (record.
dataSize < PreambleSize) {
1112 const uint32_t channelMessageCountsSize =
1114 if (channelMessageCountsSize % 10 != 0 ||
1115 channelMessageCountsSize > record.
dataSize - PreambleSize) {
1117 internal::StrCat(
"invalid Statistics.channelMessageCounts length:", channelMessageCountsSize);
1121 const size_t channelMessageCountsCount = size_t(channelMessageCountsSize / 10);
1122 statistics->channelMessageCounts.reserve(channelMessageCountsCount);
1123 for (
size_t i = 0; i < channelMessageCountsCount; ++i) {
1126 statistics->channelMessageCounts.emplace(channelId, messageCount);
1133 constexpr uint64_t MinSize = 4 + 4;
1146 uint64_t offset = 4 + metadata->
name.
size();
1158 constexpr uint64_t PreambleSize = 8 + 8 + 4;
1161 if (record.
dataSize < PreambleSize) {
1168 uint64_t offset = 8 + 8;
1179 constexpr uint64_t MinSize = 1 + 8 + 8;
1195 constexpr uint64_t MinSize = 4;
1208 if (compression ==
"") {
1210 }
else if (compression ==
"lz4") {
1212 }
else if (compression ==
"zstd") {
1215 return std::nullopt;
1222 : offset(startOffset)
1223 , endOffset(endOffset)
1224 , dataSource_(&dataSource)
1230 this->
offset = startOffset;
1238 return std::nullopt;
1243 return std::nullopt;
1260 : reader_{uncompressedReader_, 0, 0}
1266 switch (compression) {
1267#ifndef MCAP_COMPRESSION_NO_LZ4
1272#ifndef MCAP_COMPRESSION_NO_ZSTD
1293 if (!maybeRecord.has_value()) {
1296 const Record& record = maybeRecord.value();
1300 SchemaPtr schemaPtr = std::make_shared<Schema>();
1310 ChannelPtr channelPtr = std::make_shared<Channel>();
1370 : reader_(dataSource, startOffset,
std::min(endOffset, dataSource.size()))
1372 , parsingChunk_(false) {
1399 if (!chunkInProgress) {
1410 if (!maybeRecord.has_value()) {
1413 const Record& record = maybeRecord.value();
1437 SchemaPtr schemaPtr = std::make_shared<Schema>();
1446 ChannelPtr channelPtr = std::make_shared<Channel>();
1474 if (!maybeCompression.has_value()) {
1591 : mcapReader_(mcapReader)
1594 , onProblem_(onProblem) {}
1599 : mcapReader_(mcapReader)
1600 , dataStart_(dataStart)
1602 , readMessageOptions_(startTime, endTime)
1603 , onProblem_(onProblem) {}
1608 : mcapReader_(mcapReader)
1609 , dataStart_(dataStart)
1611 , readMessageOptions_(options)
1612 , onProblem_(onProblem) {}
1628 : impl_(
std::make_unique<
Impl>(view)) {
1629 if (!
impl_->has_value()) {
1654 offset.
offset = messageStartOffset;
1660 std::placeholders::_1, std::placeholders::_2));
1672 if (message.
logTime < view_.readMessageOptions_.startTime) {
1675 if (message.
logTime >= view_.readMessageOptions_.endTime) {
1678 auto maybeChannel = view_.mcapReader_.channel(message.
channelId);
1679 if (!maybeChannel) {
1683 ") references missing channel id ", message.
channelId)});
1687 auto& channel = *maybeChannel;
1689 if (view_.readMessageOptions_.topicFilter &&
1690 !view_.readMessageOptions_.topicFilter(channel.topic)) {
1694 if (channel.schemaId != 0) {
1695 maybeSchema = view_.mcapReader_.schema(channel.schemaId);
1700 ") references missing schema id ", channel.schemaId)});
1705 curMessage_ = message;
1706 curMessageView_.emplace(curMessage_, maybeChannel, maybeSchema, offset);
1710 curMessageView_ = std::nullopt;
1712 if (recordReader_.has_value()) {
1713 while (!curMessageView_.has_value()) {
1715 const bool found = recordReader_->next();
1718 auto& status = recordReader_->status();
1720 view_.onProblem_(status);
1724 recordReader_ = std::nullopt;
1728 }
else if (indexedMessageReader_.has_value()) {
1729 while (!curMessageView_.has_value()) {
1731 if (!indexedMessageReader_->next()) {
1734 auto status = indexedMessageReader_->status();
1736 view_.onProblem_(status);
1738 indexedMessageReader_ = std::nullopt;
1746 return *curMessageView_;
1750 return curMessageView_.has_value();
1754 return impl_->dereference();
1758 return &
impl_->dereference();
1764 if (!
impl_->has_value()) {
1775 if (a.
impl_ ==
nullptr || b.
impl_ ==
nullptr) {
1782 return &(a.
impl_->view_) == &(b.
impl_->view_);
1785 return &(a) == &(b);
1793 if (startTime > endTime) {
1806 , onMessage_(onMessage)
1809 if (chunkIndexes.size() == 0) {
1816 if (chunkIndexes.size() == 0 ||
1818 return ci.messageIndexLength == 0;
1821 "cannot read MCAP in time order with no message indexes");
1840 if (chunkIndex.messageIndexOffsets.find(channelId) != chunkIndex.messageIndexOffsets.end()) {
1844 chunkIndex.chunkStartOffset + chunkIndex.chunkLength + chunkIndex.messageIndexLength;
1855 for (
size_t chunkReaderIndex = 0; chunkReaderIndex <
chunkSlots_.size(); chunkReaderIndex++) {
1856 if (
chunkSlots_[chunkReaderIndex].unreadMessages == 0) {
1857 return chunkReaderIndex;
1867 if (!compression.has_value()) {
1877#ifndef MCAP_COMPRESSION_NO_LZ4
1883#ifndef MCAP_COMPRESSION_NO_ZSTD
1898 if (std::holds_alternative<internal::DecompressChunkJob>(nextItem)) {
1899 const auto& decompressChunkJob = std::get<internal::DecompressChunkJob>(nextItem);
1907 chunkSlot.chunkStartOffset = decompressChunkJob.chunkStartOffset;
1910 decompressChunkJob.messageIndexEndOffset);
1913 switch (record->opcode) {
1932 for (
const auto& [timestamp, byteOffset] : messageIndex.
records) {
1940 chunkSlot.unreadMessages++;
1952 }
else if (std::holds_alternative<internal::ReadMessageJob>(nextItem)) {
1954 const auto& readMessageJob = std::get<internal::ReadMessageJob>(nextItem);
1955 auto& chunkSlot =
chunkSlots_[readMessageJob.chunkReaderIndex];
1956 assert(chunkSlot.unreadMessages > 0);
1957 chunkSlot.unreadMessages--;
1959 reader.
reset(chunkSlot.decompressedChunk.data(), chunkSlot.decompressedChunk.size(),
1960 chunkSlot.decompressedChunk.size());
T binary_search(T... args)
A "null" compressed reader that directly passes through uncompressed data. No internal buffers are al...
void reset(const std::byte *data, uint64_t size, uint64_t uncompressedSize) override
Reset the reader state, clearing any internal buffers and state, and initialize with new compressed d...
uint64_t read(std::byte **output, uint64_t offset, uint64_t size) override
This method is called by MCAP reader classes when they need to read a portion of the file.
Status status() const override
Report the current status of decompression. A StatusCode other than StatusCode::Success after reset()...
uint64_t size() const override
Returns the size of the file in bytes.
uint64_t size() const override
Returns the size of the file in bytes.
uint64_t read(std::byte **output, uint64_t offset, uint64_t size) override
This method is called by MCAP reader classes when they need to read a portion of the file.
FileReader(std::FILE *file)
std::vector< std::byte > buffer_
uint64_t read(std::byte **output, uint64_t offset, uint64_t size) override
This method is called by MCAP reader classes when they need to read a portion of the file.
FileStreamReader(std::ifstream &stream)
uint64_t size() const override
Returns the size of the file in bytes.
std::vector< std::byte > buffer_
An abstract interface for compressed readers.
virtual Status status() const =0
Report the current status of decompression. A StatusCode other than StatusCode::Success after reset()...
virtual void reset(const std::byte *data, uint64_t size, uint64_t uncompressedSize)=0
Reset the reader state, clearing any internal buffers and state, and initialize with new compressed d...
Status status() const override
Report the current status of decompression. A StatusCode other than StatusCode::Success after reset()...
uint64_t size() const override
Returns the size of the file in bytes.
ByteArray uncompressedData_
void * decompressionContext_
Status decompressAll(const std::byte *data, uint64_t size, uint64_t uncompressedSize, ByteArray *output)
Decompresses an entire LZ4-encoded chunk into output.
const std::byte * compressedData_
void reset(const std::byte *data, uint64_t size, uint64_t uncompressedSize) override
Reset the reader state, clearing any internal buffers and state, and initialize with new compressed d...
uint64_t uncompressedSize_
uint64_t read(std::byte **output, uint64_t offset, uint64_t size) override
This method is called by MCAP reader classes when they need to read a portion of the file.
reference dereference() const
std::optional< TypedRecordReader > recordReader_
Impl(LinearMessageView &view)
LinearMessageView & view_
std::optional< IndexedMessageReader > indexedMessageReader_
void onMessage(const Message &message, RecordOffset offset)
Receives a message from either the linear TypedRecordReader or IndexedMessageReader....
Provides a read interface to an MCAP file.
std::unique_ptr< FileReader > fileInput_
Status readSummarySection_(IReadable &reader)
std::vector< ChunkIndex > chunkIndexes_
std::unordered_map< SchemaId, SchemaPtr > schemas_
const std::optional< Footer > & footer() const
Returns the parsed Footer record, if it has been encountered.
static Status ParseSchema(const Record &record, Schema *schema)
static Status ParseDataEnd(const Record &record, DataEnd *dataEnd)
std::pair< ByteOffset, ByteOffset > byteRange(Timestamp startTime, Timestamp endTime=MaxTime) const
Returns starting and ending byte offsets that must be read to iterate all messages in the given time ...
static Status ParseFooter(const Record &record, Footer *footer)
static Status ParseStatistics(const Record &record, Statistics *statistics)
static Status ParseChunkIndex(const Record &record, ChunkIndex *chunkIndex)
std::optional< Footer > footer_
std::optional< Header > header_
const std::unordered_map< SchemaId, SchemaPtr > schemas() const
Returns all of the parsed Schema records. Call readSummary() first to fully populate this data struct...
std::unique_ptr< FileStreamReader > fileStreamInput_
internal::IntervalTree< ByteOffset, ChunkIndex > chunkRanges_
const std::multimap< std::string, AttachmentIndex > & attachmentIndexes() const
Returns all of the parsed AttachmentIndex records. Call readSummary() first to fully populate this da...
std::multimap< std::string, MetadataIndex > metadataIndexes_
static Status ParseAttachmentIndex(const Record &record, AttachmentIndex *attachmentIndex)
static Status ParseHeader(const Record &record, Header *header)
static Status ParseMetadata(const Record &record, Metadata *metadata)
static Status ParseMessage(const Record &record, Message *message)
ChannelPtr channel(ChannelId channelId) const
Look up a Channel record by channel ID. If the Channel has not been encountered yet or does not exist...
static Status ParseMessageIndex(const Record &record, MessageIndex *messageIndex)
static Status ParseSummaryOffset(const Record &record, SummaryOffset *summaryOffset)
std::optional< Statistics > statistics_
static Status ParseChunk(const Record &record, Chunk *chunk)
std::unordered_map< ChannelId, ChannelPtr > channels_
IReadable * dataSource()
Returns a pointer to the IReadable data source backing this reader. Will return nullptr if the reader...
const std::unordered_map< ChannelId, ChannelPtr > channels() const
Returns all of the parsed Channel records. Call readSummary() first to fully populate this data struc...
SchemaPtr schema(SchemaId schemaId) const
Look up a Schema record by schema ID. If the Schema has not been encountered yet or does not exist in...
static Status ParseMetadataIndex(const Record &record, MetadataIndex *metadataIndex)
const std::vector< ChunkIndex > & chunkIndexes() const
Returns all of the parsed ChunkIndex records. Call readSummary() first to fully populate this data st...
static Status ReadFooter(IReadable &reader, uint64_t offset, Footer *footer)
Status readSummaryFromScan_(IReadable &reader)
void close()
Closes the MCAP file, clearing any internal data structures and state and dropping the data source re...
static std::optional< Compression > ParseCompression(const std::string_view compression)
Converts a compression string ("", "zstd", "lz4") to the Compression enum.
const std::optional< Statistics > & statistics() const
Returns the parsed Statistics record, if it has been encountered.
static Status ParseChannel(const Record &record, Channel *channel)
static Status ParseAttachment(const Record &record, Attachment *attachment)
Status open(IReadable &reader)
Opens an MCAP file for reading from an already constructed IReadable implementation.
LinearMessageView readMessages(Timestamp startTime=0, Timestamp endTime=MaxTime)
Returns an iterable view with begin() and end() methods for iterating Messages in the MCAP file....
std::multimap< std::string, AttachmentIndex > attachmentIndexes_
static Status ReadRecord(IReadable &reader, uint64_t offset, Record *record)
Status readSummary(ReadSummaryMethod method, const ProblemCallback &onProblem=[](const Status &) {})
Read and parse the Summary section at the end of the MCAP file, if available. This will populate inte...
const std::optional< Header > & header() const
Returns the parsed Header record, if it has been encountered.
const std::multimap< std::string, MetadataIndex > & metadataIndexes() const
Returns all of the parsed MetadataIndex records. Call readSummary() first to fully populate this data...
Status status() const override
Report the current status of decompression. A StatusCode other than StatusCode::Success after reset()...
ByteArray uncompressedData_
void reset(const std::byte *data, uint64_t size, uint64_t uncompressedSize) override
Reset the reader state, clearing any internal buffers and state, and initialize with new compressed d...
uint64_t read(std::byte **output, uint64_t offset, uint64_t size) override
This method is called by MCAP reader classes when they need to read a portion of the file.
static Status DecompressAll(const std::byte *data, uint64_t compressedSize, uint64_t uncompressedSize, ByteArray *output)
Decompresses an entire Zstd-compressed chunk into output.
uint64_t size() const override
Returns the size of the file in bytes.
T emplace_back(T... args)
Status ParseByteArray(const std::byte *data, uint64_t maxSize, ByteArray *output)
uint64_t ParseUint64(const std::byte *data)
constexpr uint64_t FooterLength
std::string MagicToHex(const std::byte *data)
std::string StrCat(T &&... args)
constexpr uint64_t MinHeaderLength
uint32_t ParseUint32(const std::byte *data)
std::string ToHex(uint8_t byte)
uint16_t ParseUint16(const std::byte *data)
Status ParseString(const std::byte *data, uint64_t maxSize, std::string *output)
Status ParseKeyValueMap(const std::byte *data, uint64_t maxSize, KeyValueMap *output)
StatusCode
Status codes for MCAP readers and writers.
@ NoMessageIndexesAvailable
@ DecompressionSizeMismatch
@ InvalidMessageReadOptions
@ UnrecognizedCompression
Compression
Supported MCAP compression algorithms.
bool CompareChunkIndexes(const ChunkIndex &a, const ChunkIndex &b)
constexpr ByteOffset EndOffset
@ AllowFallbackScan
If the Summary section is missing or incomplete, allow falling back to reading the file sequentially ...
@ ForceScan
Read the file sequentially from Header to DataEnd to produce seeking indexes and summary statistics.
@ NoFallbackScan
Parse the Summary section to produce seeking indexes and summary statistics. If the Summary section i...
MCAP_PUBLIC constexpr std::string_view OpCodeString(OpCode opcode)
Get the string representation of an OpCode.
constexpr uint8_t Magic[]
Attachment Index records are found in the Summary section, providing summary information for a single...
An Attachment is an arbitrary file embedded in an MCAP file, including a name, media type,...
Describes a Channel that messages are written to. A Channel represents a single connection from a pub...
Chunk Index records are found in the Summary section, providing summary information for a single Chun...
Timestamp messageStartTime
ByteOffset chunkStartOffset
std::unordered_map< ChannelId, ByteOffset > messageIndexOffsets
ByteOffset messageIndexLength
ByteOffset compressedSize
ByteOffset uncompressedSize
A collection of Schemas, Channels, and Messages that supports compression and indexing.
ByteOffset uncompressedSize
const std::byte * records
ByteOffset compressedSize
Timestamp messageStartTime
The final record in the Data section, signaling the end of Data and beginning of Summary....
An abstract interface for reading MCAP data.
virtual uint64_t read(std::byte **output, uint64_t offset, uint64_t size)=0
This method is called by MCAP reader classes when they need to read a portion of the file.
virtual uint64_t size() const =0
Returns the size of the file in bytes.
ByteArray decompressedChunk
size_t findFreeChunkSlot()
RecordReader recordReader_
IndexedMessageReader(McapReader &reader, const ReadMessageOptions &options, const std::function< void(const Message &, RecordOffset)> onMessage)
std::vector< ChunkSlot > chunkSlots_
std::function< void(const Message &, RecordOffset)> onMessage_
Status status() const
gets the status of the reader.
internal::ReadJobQueue queue_
void decompressChunk(const Chunk &chunk, ChunkSlot &slot)
ReadMessageOptions options_
std::unordered_set< ChannelId > selectedChannels_
bool next()
reads the next message out of the MCAP.
reference operator*() const
pointer operator->() const
MCAP_PUBLIC friend bool operator!=(const Iterator &a, const Iterator &b)
std::unique_ptr< Impl > impl_
MCAP_PUBLIC friend bool operator==(const Iterator &a, const Iterator &b)
An iterable view of Messages in an MCAP file.
ReadMessageOptions readMessageOptions_
LinearMessageView(McapReader &mcapReader, const ProblemCallback &onProblem)
A list of timestamps to byte offsets for a single Channel. This record appears after each Chunk,...
std::vector< std::pair< Timestamp, ByteOffset > > records
Returned when iterating over Messages in a file, MessageView contains a reference to one Message,...
A single Message published to a Channel.
Timestamp logTime
Nanosecond timestamp when this message was recorded or received for recording.
uint32_t sequence
An optional sequence number. If non-zero, sequence numbers should be unique per channel and increasin...
const std::byte * data
A pointer to the message payload. For readers, this pointer is only valid for the lifetime of an onMe...
Timestamp publishTime
Nanosecond timestamp when this message was initially published. If not available, this should be set ...
uint64_t dataSize
Size of the message payload in bytes, pointed to via data.
Options for reading messages out of an MCAP file.
Timestamp startTime
Only messages with log timestamps greater or equal to startTime will be included.
std::function< bool(std::string_view)> topicFilter
If provided, topicFilter is called on all topics found in the MCAP file. If topicFilter returns true ...
Status validate() const
validate the configuration.
Timestamp endTime
Only messages with log timestamps less than endTime will be included.
std::optional< ByteOffset > chunkOffset
RecordReader(IReadable &dataSource, ByteOffset startOffset, ByteOffset endOffset=EndOffset)
const Status & status() const
void reset(IReadable &dataSource, ByteOffset startOffset, ByteOffset endOffset)
std::optional< Record > next()
ByteOffset curRecordOffset() const
A generic Type-Length-Value record using a uint8 type and uint64 length. This is the generic form of ...
uint64_t recordSize() const
Describes a schema used for message encoding and decoding and/or describing the shape of messages....
The Statistics record is found in the Summary section, providing counts and timestamp ranges for the ...
Timestamp messageStartTime
Wraps a status code and string message carrying additional context.
Summary Offset records are found in the Summary Offset section. Records in the Summary section are gr...
const Status & status() const
void reset(const Chunk &chunk, Compression compression)
std::function< void(const Message &, ByteOffset)> onMessage
std::function< void(const SchemaPtr, ByteOffset)> onSchema
std::function< void(const Record &, ByteOffset)> onUnknownRecord
std::function< void(const ChannelPtr, ByteOffset)> onChannel
BufferReader uncompressedReader_
ByteOffset offset() const
A mid-level interface for parsing and validating MCAP records from a data source.
std::function< void(const Message &, ByteOffset, std::optional< ByteOffset >)> onMessage
std::function< void(const Metadata &, ByteOffset)> onMetadata
TypedRecordReader(IReadable &dataSource, ByteOffset startOffset, ByteOffset endOffset=EndOffset)
ByteOffset offset() const
std::function< void(ByteOffset)> onChunkEnd
std::function< void(const Record &, ByteOffset, std::optional< ByteOffset >)> onUnknownRecord
std::function< void(const SummaryOffset &, ByteOffset)> onSummaryOffset
std::function< void(const Footer &, ByteOffset)> onFooter
std::function< void(const DataEnd &, ByteOffset)> onDataEnd
std::function< void(const ChannelPtr, ByteOffset, std::optional< ByteOffset >)> onChannel
std::function< void(const Attachment &, ByteOffset)> onAttachment
std::function< void(const ChunkIndex &, ByteOffset)> onChunkIndex
std::function< void(const MetadataIndex &, ByteOffset)> onMetadataIndex
const Status & status() const
std::function< void(const Chunk &, ByteOffset)> onChunk
TypedChunkReader chunkReader_
std::function< void(const Statistics &, ByteOffset)> onStatistics
std::function< void(const AttachmentIndex &, ByteOffset)> onAttachmentIndex
std::function< void(const SchemaPtr, ByteOffset, std::optional< ByteOffset >)> onSchema
std::function< void(const Header &, ByteOffset)> onHeader
std::function< void(const MessageIndex &, ByteOffset)> onMessageIndex
A job to decompress the chunk starting at chunkStartOffset. The message indices starting directly aft...
ByteOffset messageIndexEndOffset
Timestamp messageStartTime
ByteOffset chunkStartOffset
void push(DecompressChunkJob &&decompressChunkJob)
A job to read a specific message at offset offset from the decompressed chunk stored in chunkReaderIn...