5#ifndef MCAP_COMPRESSION_NO_LZ4
9#ifndef MCAP_COMPRESSION_NO_ZSTD
11# include <zstd_errors.h>
19 : crc_(internal::CRC32_INIT) {}
50 const auto msg =
internal::StrCat(
"failed to open file \"", filename,
"\" for writing");
60 assert(written ==
size);
148#ifndef MCAP_COMPRESSION_NO_LZ4
159 return LZ4HC_CLEVEL_DEFAULT;
161 return LZ4HC_CLEVEL_OPT_MIN;
163 return LZ4HC_CLEVEL_MAX;
170 : compressionLevel_(compressionLevel) {
179 LZ4F_preferences_t preferences = LZ4F_INIT_PREFERENCES;
186 if (LZ4F_isError(dstSize)) {
187 std::cerr <<
"LZ4F_compressFrame failed: " << LZ4F_getErrorName(dstSize) <<
"\n";
221#ifndef MCAP_COMPRESSION_NO_ZSTD
246 ZSTD_CCtx_setParameter(
zstdContext_, ZSTD_c_compressionLevel,
264 if (ZSTD_isError(dstSize)) {
265 const auto errCode = ZSTD_getErrorCode(dstSize);
266 std::cerr <<
"ZSTD_compress2 failed: " << ZSTD_getErrorName(dstSize) <<
" ("
267 << ZSTD_getErrorString(errCode) <<
")\n";
318#ifndef MCAP_COMPRESSION_NO_LZ4
323#ifndef MCAP_COMPRESSION_NO_ZSTD
331 chunkWriter->crcEnabled = !options.
noChunkCRC;
332 if (chunkWriter->crcEnabled) {
333 chunkWriter->resetCrc();
368 if (chunkWriter && !chunkWriter->empty()) {
393 summaryStart = fileOutput.size();
408 if (channelMessageCounts.find(channel.id) != channelMessageCounts.end()) {
409 write(fileOutput, channel);
414 ByteOffset statisticsStart = fileOutput.size();
420 ByteOffset chunkIndexStart = fileOutput.size();
424 write(fileOutput, chunkIndexRecord);
428 ByteOffset attachmentIndexStart = fileOutput.size();
432 write(fileOutput, attachmentIndexRecord);
436 ByteOffset metadataIndexStart = fileOutput.size();
440 write(fileOutput, metadataIndexRecord);
446 summaryOffsetStart = fileOutput.size();
456 chunkIndexStart - statisticsStart});
460 attachmentIndexStart - chunkIndexStart});
464 metadataIndexStart - attachmentIndexStart});
468 summaryOffsetStart - metadataIndexStart});
470 }
else if (summaryStart == fileOutput.size()) {
491#ifndef MCAP_COMPRESSION_NO_LZ4
494#ifndef MCAP_COMPRESSION_NO_ZSTD
533 if (channelMessageCounts.find(message.
channelId) == channelMessageCounts.end()) {
534 const size_t channelIndex = message.
channelId - 1;
540 const auto& channel =
channels_[channelIndex];
543 if ((channel.schemaId != 0) &&
545 const size_t schemaIndex = channel.schemaId - 1;
546 if (schemaIndex >=
schemas_.size()) {
563 channelMessageCounts.emplace(message.
channelId, 0);
569 if (chunkWriter !=
nullptr &&
592 channelMessageCounts[message.
channelId] += 1;
595 if (chunkWriter !=
nullptr) {
599 messageIndex.channelId = message.
channelId;
600 messageIndex.records.emplace_back(message.
logTime, messageOffset);
625 if (chunkWriter && !chunkWriter->empty()) {
631 uint32_t sizePrefix = 0;
635 sizePrefix = uint32_t(attachment.
name.
size());
649 const uint64_t fileOffset = fileOutput.size();
652 write(fileOutput, attachment);
673 if (chunkWriter && !chunkWriter->empty()) {
677 const uint64_t fileOffset = fileOutput.size();
680 write(fileOutput, metadata);
711#ifndef MCAP_COMPRESSION_NO_ZSTD
715#ifndef MCAP_COMPRESSION_NO_LZ4
731#ifndef MCAP_COMPRESSION_NO_LZ4
735#ifndef MCAP_COMPRESSION_NO_ZSTD
744 constexpr uint64_t MIN_COMPRESSION_SIZE = 1024;
746 constexpr double MIN_COMPRESSION_RATIO = 1.02;
750 uint64_t compressedSize = uncompressedSize;
759 const double compressionRatio = double(uncompressedSize) / double(chunkData.
compressedSize());
768 const uint32_t uncompressedCrc = chunkData.
crc();
771 const uint64_t chunkStartOffset = output.
size();
773 compressionStr, compressedSize, compressedData});
775 const uint64_t chunkLength = output.
size() - chunkStartOffset;
779 auto& chunkIndexRecord =
chunkIndex_.emplace_back();
781 const uint64_t messageIndexOffset = output.
size();
787 if (messageIndex.records.size() > 0) {
788 chunkIndexRecord.messageIndexOffsets.emplace(channelId, output.
size());
789 write(output, messageIndex);
793 messageIndex.records.clear();
797 const uint64_t messageIndexLength = output.
size() - messageIndexOffset;
805 chunkIndexRecord.chunkStartOffset = chunkStartOffset;
806 chunkIndexRecord.chunkLength = chunkLength;
807 chunkIndexRecord.messageIndexLength = messageIndexLength;
808 chunkIndexRecord.compression = compressionStr;
809 chunkIndexRecord.compressedSize = compressedSize;
810 chunkIndexRecord.uncompressedSize = uncompressedSize;
816 if (messageIndex.records.size() > 0) {
817 write(output, messageIndex);
821 messageIndex.records.clear();
846 write(output, recordSize);
850 return 9 + recordSize;
854 const uint64_t recordSize = 8 +
859 write(output, recordSize);
862 uint32_t summaryCrc = 0;
864 summaryCrc = output.
crc();
866 write(output, summaryCrc);
868 return 9 + recordSize;
872 const uint64_t recordSize = 2 +
878 write(output, recordSize);
884 return 9 + recordSize;
889 const uint64_t recordSize = 2 +
896 write(output, recordSize);
903 return 9 + recordSize;
907 return 2 + 4 + 8 + 8 + message.
dataSize;
914 write(output, recordSize);
921 return 9 + recordSize;
929 write(output, recordSize);
938 return 9 + recordSize;
943 const uint64_t recordSize = 4 + metadata.
name.
size() + 4 + metadataSize;
946 write(output, recordSize);
950 return 9 + recordSize;
954 const uint64_t recordSize =
958 write(output, recordSize);
968 return 9 + recordSize;
972 const uint32_t recordsSize = (uint32_t)(index.
records.
size()) * 16;
973 const uint64_t recordSize = 2 + 4 + recordsSize;
976 write(output, recordSize);
979 write(output, recordsSize);
980 for (
const auto& [timestamp, offset] : index.
records) {
981 write(output, timestamp);
982 write(output, offset);
985 return 9 + recordSize;
990 const uint64_t recordSize = 8 +
994 4 + messageIndexOffsetsSize +
1001 write(output, recordSize);
1007 write(output, messageIndexOffsetsSize);
1009 write(output, channelId);
1010 write(output, offset);
1018 return 9 + recordSize;
1022 const uint64_t recordSize = 8 +
1031 write(output, recordSize);
1040 return 9 + recordSize;
1044 const uint64_t recordSize = 8 +
1049 write(output, recordSize);
1054 return 9 + recordSize;
1059 const uint64_t recordSize = 8 +
1067 4 + channelMessageCountsSize;
1070 write(output, recordSize);
1080 write(output, channelMessageCountsSize);
1082 write(output, channelId);
1083 write(output, messageCount);
1086 return 9 + recordSize;
1090 const uint64_t recordSize = 1 +
1095 write(output, recordSize);
1100 return 9 + recordSize;
1104 const uint64_t recordSize = 4;
1107 write(output, recordSize);
1110 return 9 + recordSize;
1132 output.
write(
reinterpret_cast<const std::byte*
>(&value),
sizeof(value));
1136 output.
write(
reinterpret_cast<const std::byte*
>(&value),
sizeof(value));
1140 output.
write(
reinterpret_cast<const std::byte*
>(&value),
sizeof(value));
1144 output.
write(
reinterpret_cast<const std::byte*
>(&value),
sizeof(value));
1155 for (
const auto& [key, value] : map) {
1161 for (
const auto& [key, value] : pairs) {
1163 write(output, value);
uint64_t size() const override
Returns the size in bytes of the uncompressed data.
uint64_t compressedSize() const override
Returns the size in bytes of the compressed data. This will only be called after end().
std::vector< std::byte > buffer_
void handleWrite(const std::byte *data, uint64_t size) override
void handleClear() override
bool empty() const override
Returns true if write() has never been called since initialization or the last call to clear().
const std::byte * data() const override
Returns a pointer to the uncompressed data.
const std::byte * compressedData() const override
Returns a pointer to the compressed data. This will only be called after end().
void end() override
Called when the writer wants to close the current output Chunk. After this call, data() and size() sh...
Status open(std::string_view filename)
void flush() override
Flushes any buffered data to the output. This is called by McapWriter after every completed chunk....
void handleWrite(const std::byte *data, uint64_t size) override
uint64_t size() const override
Returns the current size of the file in bytes. This must be equal to the sum of all size parameters p...
void end() override
Called when the writer is finished writing data to the output MCAP file.
An abstract interface for writing Chunk data. Chunk data is buffered in memory and written to disk as...
virtual void handleClear()=0
virtual uint64_t compressedSize() const =0
Returns the size in bytes of the compressed data. This will only be called after end().
void clear()
Clear the internal state of the writer, discarding any input or output buffers.
virtual void end() override=0
Called when the writer wants to close the current output Chunk. After this call, data() and size() sh...
virtual const std::byte * compressedData() const =0
Returns a pointer to the compressed data. This will only be called after end().
virtual const std::byte * data() const =0
Returns a pointer to the uncompressed data.
An abstract interface for writing MCAP data.
void write(const std::byte *data, uint64_t size)
Called whenever the writer needs to write data to the output MCAP file.
virtual void flush()
Flushes any buffered data to the output. This is called by McapWriter after every completed chunk....
virtual void handleWrite(const std::byte *data, uint64_t size)=0
void resetCrc()
Resets the CRC32 calculation.
uint32_t crc()
Returns the CRC32 of the uncompressed data.
virtual uint64_t size() const =0
Returns the current size of the file in bytes. This must be equal to the sum of all size parameters p...
bool empty() const override
Returns true if write() has never been called since initialization or the last call to clear().
uint64_t compressedSize() const override
Returns the size in bytes of the compressed data. This will only be called after end().
CompressionLevel compressionLevel_
std::vector< std::byte > compressedBuffer_
LZ4Writer(CompressionLevel compressionLevel, uint64_t chunkSize)
const std::byte * data() const override
Returns a pointer to the uncompressed data.
uint64_t size() const override
Returns the size in bytes of the uncompressed data.
void end() override
Called when the writer wants to close the current output Chunk. After this call, data() and size() sh...
std::vector< std::byte > uncompressedBuffer_
void handleWrite(const std::byte *data, uint64_t size) override
const std::byte * compressedData() const override
Returns a pointer to the compressed data. This will only be called after end().
void handleClear() override
std::vector< MetadataIndex > metadataIndex_
std::unique_ptr< StreamWriter > streamOutput_
std::unique_ptr< ZStdWriter > zstdChunk_
std::vector< Schema > schemas_
void writeChunk(IWritable &output, IChunkWriter &chunkData)
std::vector< Channel > channels_
std::vector< AttachmentIndex > attachmentIndex_
static void writeMagic(IWritable &output)
Timestamp currentChunkStart_
void addSchema(Schema &schema)
Add a new schema to the MCAP file and set schema.id to a generated schema id. The schema id is used w...
std::unordered_map< ChannelId, MessageIndex > currentMessageIndex_
IWritable * dataSink()
Returns a pointer to the IWritable data destination backing this writer. Will return nullptr if the w...
Timestamp currentChunkEnd_
std::unique_ptr< FileWriter > fileOutput_
static uint64_t getRecordSize(const Message &message)
std::unordered_set< SchemaId > writtenSchemas_
uint64_t uncompressedSize_
void closeLastChunk()
Finishes the current chunk in progress and writes it to the file, if a chunk is in progress.
std::unique_ptr< BufferWriter > uncompressedChunk_
IChunkWriter * getChunkWriter()
void terminate()
Reset internal state without writing the MCAP footer or flushing pending writes. This should only be ...
Status open(std::string_view filename, const McapWriterOptions &options)
Open a new MCAP file for writing and write the header.
std::unique_ptr< LZ4Writer > lz4Chunk_
void addChannel(Channel &channel)
Add a new channel to the MCAP file and set channel.id to a generated channel id. The channel id is us...
Status write(const Message &message)
Write a message to the output stream.
std::vector< ChunkIndex > chunkIndex_
const Statistics & statistics() const
Current MCAP file-level statistics. This is written as a Statistics record in the Summary section of ...
void close()
Write the MCAP footer, flush pending writes to the output stream, and reset internal state....
McapWriterOptions options_
StreamWriter(std::ostream &stream)
void handleWrite(const std::byte *data, uint64_t size) override
uint64_t size() const override
Returns the current size of the file in bytes. This must be equal to the sum of all size parameters p...
void flush() override
Flushes any buffered data to the output. This is called by McapWriter after every completed chunk....
void end() override
Called when the writer is finished writing data to the output MCAP file.
ZSTD_CCtx_s * zstdContext_
const std::byte * compressedData() const override
Returns a pointer to the compressed data. This will only be called after end().
void handleWrite(const std::byte *data, uint64_t size) override
void end() override
Called when the writer wants to close the current output Chunk. After this call, data() and size() sh...
bool empty() const override
Returns true if write() has never been called since initialization or the last call to clear().
std::vector< std::byte > uncompressedBuffer_
void handleClear() override
uint64_t size() const override
Returns the size in bytes of the uncompressed data.
ZStdWriter(CompressionLevel compressionLevel, uint64_t chunkSize)
const std::byte * data() const override
Returns a pointer to the uncompressed data.
std::vector< std::byte > compressedBuffer_
uint64_t compressedSize() const override
Returns the size in bytes of the compressed data. This will only be called after end().
T emplace_back(T... args)
int ZStdCompressionLevel(CompressionLevel level)
uint32_t crc32Update(const uint32_t prev, const std::byte *const data, const size_t length)
std::string StrCat(T &&... args)
uint32_t KeyValueMapSize(const KeyValueMap &map)
const std::string CompressionString(Compression compression)
uint32_t crc32Final(uint32_t crc)
static constexpr uint32_t CRC32_INIT
int LZ4CompressionLevel(CompressionLevel level)
constexpr Timestamp MaxTime
Compression
Supported MCAP compression algorithms.
CompressionLevel
Compression level to use when compression is enabled. Slower generally produces smaller files,...
constexpr uint8_t Magic[]
Attachment Index records are found in the Summary section, providing summary information for a single...
An Attachment is an arbitrary file embedded in an MCAP file, including a name, media type,...
Describes a Channel that messages are written to. A Channel represents a single connection from a pub...
std::string messageEncoding
Chunk Index records are found in the Summary section, providing summary information for a single Chun...
Timestamp messageStartTime
ByteOffset chunkStartOffset
std::unordered_map< ChannelId, ByteOffset > messageIndexOffsets
ByteOffset messageIndexLength
ByteOffset compressedSize
ByteOffset uncompressedSize
A collection of Schemas, Channels, and Messages that supports compression and indexing.
ByteOffset uncompressedSize
const std::byte * records
ByteOffset compressedSize
Timestamp messageStartTime
The final record in the Data section, signaling the end of Data and beginning of Summary....
Configuration options for McapWriter.
bool forceCompression
By default, Chunks that do not benefit from compression will be written uncompressed....
bool noSummary
Do not write Summary or Summary Offset sections to the file, placing the Footer record immediately af...
CompressionLevel compressionLevel
Compression level to use when writing Chunks. Slower generally produces smaller files,...
bool enableDataCRC
Enable CRC calculations for all records in the data section.
std::string library
A freeform string written by recording libraries. For this library, the default is "libmcap {Major}....
bool noChunking
Do not write Chunks to the file, instead writing Schema, Channel, and Message records directly into t...
bool noChunkCRC
Disable CRC calculations for Chunks.
std::string profile
The recording profile. See https://mcap.dev/spec/registry#well-known-profiles for more information on...
bool noMessageIndex
Do not write Message Index records to the file. If noMessageIndex=true and noChunkIndex=false,...
bool noSummaryCRC
Disable CRC calculations for the summary section.
uint64_t chunkSize
Target uncompressed Chunk payload size in bytes. Once a Chunk's uncompressed data is about to exceed ...
bool noAttachmentCRC
Disable CRC calculations for Attachments.
Compression compression
Compression algorithm to use when writing Chunks. This option is ignored if noChunking=true.
A list of timestamps to byte offsets for a single Channel. This record appears after each Chunk,...
std::vector< std::pair< Timestamp, ByteOffset > > records
A single Message published to a Channel.
Timestamp logTime
Nanosecond timestamp when this message was recorded or received for recording.
uint32_t sequence
An optional sequence number. If non-zero, sequence numbers should be unique per channel and increasin...
const std::byte * data
A pointer to the message payload. For readers, this pointer is only valid for the lifetime of an onMe...
Timestamp publishTime
Nanosecond timestamp when this message was initially published. If not available, this should be set ...
uint64_t dataSize
Size of the message payload in bytes, pointed to via data.
A generic Type-Length-Value record using a uint8 type and uint64 length. This is the generic form of ...
Describes a schema used for message encoding and decoding and/or describing the shape of messages....
The Statistics record is found in the Summary section, providing counts and timestamp ranges for the ...
Timestamp messageStartTime
std::unordered_map< ChannelId, uint64_t > channelMessageCounts
Wraps a status code and string message carrying additional context.
Summary Offset records are found in the Summary Offset section. Records in the Summary section are gr...