pjmsg_mcap_wrapper
Loading...
Searching...
No Matches
writer.inl
Go to the documentation of this file.
1#include "crc32.hpp"
2#include <algorithm>
3#include <cassert>
4#include <iostream>
5#ifndef MCAP_COMPRESSION_NO_LZ4
6# include <lz4frame.h>
7# include <lz4hc.h>
8#endif
9#ifndef MCAP_COMPRESSION_NO_ZSTD
10# include <zstd.h>
11# include <zstd_errors.h>
12#endif
13
14namespace mcap {
15
16// IWritable ///////////////////////////////////////////////////////////////////
17
19 : crc_(internal::CRC32_INIT) {}
20
21void IWritable::write(const std::byte* data, uint64_t size) {
22 if (crcEnabled) {
24 }
25 handleWrite(data, size);
26}
27
28uint32_t IWritable::crc() {
29 uint32_t crc32 = 0;
30 if (crcEnabled) {
32 }
33 return crc32;
34}
35
39
40// FileWriter //////////////////////////////////////////////////////////////////
41
45
47 end();
48 file_ = std::fopen(filename.data(), "wb");
49 if (!file_) {
50 const auto msg = internal::StrCat("failed to open file \"", filename, "\" for writing");
51 return Status(StatusCode::OpenFailed, msg);
52 }
54}
55
56void FileWriter::handleWrite(const std::byte* data, uint64_t size) {
57 assert(file_);
58 const size_t written = std::fwrite(data, 1, size, file_);
59 (void)written;
60 assert(written == size);
61 size_ += size;
62}
63
65 if (file_) {
67 }
68}
69
71 if (file_) {
73 file_ = nullptr;
74 }
75 size_ = 0;
76}
77
78uint64_t FileWriter::size() const {
79 return size_;
80}
81
82// StreamWriter ////////////////////////////////////////////////////////////////
83
85 : stream_(stream)
86 , size_(0) {}
87
88void StreamWriter::handleWrite(const std::byte* data, uint64_t size) {
89 stream_.write(reinterpret_cast<const char*>(data), std::streamsize(size));
90 size_ += size;
91}
92
96
98 flush();
99}
100
101uint64_t StreamWriter::size() const {
102 return size_;
103}
104
105// IChunkWriter ////////////////////////////////////////////////////////////////
106
108 handleClear();
109 resetCrc();
110}
111
112// BufferWriter //////////////////////////////////////////////////////////////
113
114void BufferWriter::handleWrite(const std::byte* data, uint64_t size) {
116}
117
119 // no-op
120}
121
122uint64_t BufferWriter::size() const {
123 return buffer_.size();
124}
125
127 return buffer_.size();
128}
129
131 return buffer_.empty();
132}
133
137
139 return buffer_.data();
140}
141
143 return buffer_.data();
144}
145
146// LZ4Writer ///////////////////////////////////////////////////////////////////
147
148#ifndef MCAP_COMPRESSION_NO_LZ4
149namespace internal {
150
152 switch (level) {
154 return -1; // "fast acceleration"
156 return 0; // "fast mode"
158 default:
159 return LZ4HC_CLEVEL_DEFAULT;
161 return LZ4HC_CLEVEL_OPT_MIN;
163 return LZ4HC_CLEVEL_MAX;
164 }
165}
166
167} // namespace internal
168
169LZ4Writer::LZ4Writer(CompressionLevel compressionLevel, uint64_t chunkSize)
170 : compressionLevel_(compressionLevel) {
171 uncompressedBuffer_.reserve(chunkSize);
172}
173
177
179 LZ4F_preferences_t preferences = LZ4F_INIT_PREFERENCES;
180 preferences.compressionLevel = internal::LZ4CompressionLevel(compressionLevel_);
181 const auto dstCapacity = LZ4F_compressFrameBound(uncompressedBuffer_.size(), &preferences);
182 compressedBuffer_.resize(dstCapacity);
183 const auto dstSize =
184 LZ4F_compressFrame(compressedBuffer_.data(), dstCapacity, uncompressedBuffer_.data(),
185 uncompressedBuffer_.size(), &preferences);
186 if (LZ4F_isError(dstSize)) {
187 std::cerr << "LZ4F_compressFrame failed: " << LZ4F_getErrorName(dstSize) << "\n";
188 std::abort();
189 }
190 compressedBuffer_.resize(dstSize);
191}
192
193uint64_t LZ4Writer::size() const {
194 return uncompressedBuffer_.size();
195}
196
198 return compressedBuffer_.size();
199}
200
201bool LZ4Writer::empty() const {
203}
204
209
211 return uncompressedBuffer_.data();
212}
213
215 return compressedBuffer_.data();
216}
217#endif
218
219// ZStdWriter //////////////////////////////////////////////////////////////////
220
221#ifndef MCAP_COMPRESSION_NO_ZSTD
222namespace internal {
223
225 switch (level) {
227 return -5;
229 return -3;
231 default:
232 return 1;
234 return 5;
236 return 19;
237 }
238}
239
240} // namespace internal
241
242// ZStdWriter //////////////////////////////////////////////////////////////////
243
244ZStdWriter::ZStdWriter(CompressionLevel compressionLevel, uint64_t chunkSize) {
245 zstdContext_ = ZSTD_createCCtx();
246 ZSTD_CCtx_setParameter(zstdContext_, ZSTD_c_compressionLevel,
247 internal::ZStdCompressionLevel(compressionLevel));
248 uncompressedBuffer_.reserve(chunkSize);
249}
250
252 ZSTD_freeCCtx(zstdContext_);
253}
254
258
260 const auto dstCapacity = ZSTD_compressBound(uncompressedBuffer_.size());
261 compressedBuffer_.resize(dstCapacity);
262 const size_t dstSize = ZSTD_compress2(zstdContext_, compressedBuffer_.data(), dstCapacity,
264 if (ZSTD_isError(dstSize)) {
265 const auto errCode = ZSTD_getErrorCode(dstSize);
266 std::cerr << "ZSTD_compress2 failed: " << ZSTD_getErrorName(dstSize) << " ("
267 << ZSTD_getErrorString(errCode) << ")\n";
268 std::abort();
269 }
270 ZSTD_CCtx_reset(zstdContext_, ZSTD_reset_session_only);
271 compressedBuffer_.resize(dstSize);
272}
273
274uint64_t ZStdWriter::size() const {
275 return uncompressedBuffer_.size();
276}
277
279 return compressedBuffer_.size();
280}
281
285
290
292 return uncompressedBuffer_.data();
293}
294
296 return compressedBuffer_.data();
297}
298#endif
299
300// McapWriter //////////////////////////////////////////////////////////////////
301
305
306void McapWriter::open(IWritable& writer, const McapWriterOptions& options) {
307 // If the writer was opened, close it first
308 close();
309 options_ = options;
310 opened_ = true;
311 chunkSize_ = options.noChunking ? 0 : options.chunkSize;
313 switch (compression_) {
315 default:
316 uncompressedChunk_ = std::make_unique<BufferWriter>();
317 break;
318#ifndef MCAP_COMPRESSION_NO_LZ4
319 case Compression::Lz4:
320 lz4Chunk_ = std::make_unique<LZ4Writer>(options.compressionLevel, chunkSize_);
321 break;
322#endif
323#ifndef MCAP_COMPRESSION_NO_ZSTD
325 zstdChunk_ = std::make_unique<ZStdWriter>(options.compressionLevel, chunkSize_);
326 break;
327#endif
328 }
329 auto* chunkWriter = getChunkWriter();
330 if (chunkWriter) {
331 chunkWriter->crcEnabled = !options.noChunkCRC;
332 if (chunkWriter->crcEnabled) {
333 chunkWriter->resetCrc();
334 }
335 }
336 writer.crcEnabled = options.enableDataCRC;
337 output_ = &writer;
338 writeMagic(writer);
339 write(writer, Header{options.profile, options.library});
340}
341
343 // If the writer was opened, close it first
344 close();
345 fileOutput_ = std::make_unique<FileWriter>();
346 const auto status = fileOutput_->open(filename);
347 if (!status.ok()) {
348 fileOutput_.reset();
349 return status;
350 }
351 open(*fileOutput_, options);
352 return StatusCode::Success;
353}
354
355void McapWriter::open(std::ostream& stream, const McapWriterOptions& options) {
356 // If the writer was opened, close it first
357 close();
358 streamOutput_ = std::make_unique<StreamWriter>(stream);
359 open(*streamOutput_, options);
360}
361
363 if (!opened_ || !output_) {
364 return;
365 }
366 auto& fileOutput = *output_;
367 auto* chunkWriter = getChunkWriter();
368 if (chunkWriter && !chunkWriter->empty()) {
369 writeChunk(fileOutput, *chunkWriter);
370 }
371}
372
374 if (!opened_ || !output_) {
375 return;
376 }
378
379 auto& fileOutput = *output_;
380
381 // Write the Data End record
382 write(fileOutput, DataEnd{fileOutput.crc()});
383 if (!options_.noSummaryCRC) {
384 output_->crcEnabled = true;
385 output_->resetCrc();
386 }
387
388 ByteOffset summaryStart = 0;
389 ByteOffset summaryOffsetStart = 0;
390
391 if (!options_.noSummary) {
392 // Get the offset of the End Of File section
393 summaryStart = fileOutput.size();
394
395 ByteOffset schemaStart = fileOutput.size();
397 // Write all schema records
398 for (const auto& schemaId : writtenSchemas_) {
399 write(fileOutput, schemas_[schemaId - 1]);
400 }
401 }
402
403 ByteOffset channelStart = fileOutput.size();
405 // Write all channel records, but only if they appeared in this file
406 auto& channelMessageCounts = statistics_.channelMessageCounts;
407 for (const auto& channel : channels_) {
408 if (channelMessageCounts.find(channel.id) != channelMessageCounts.end()) {
409 write(fileOutput, channel);
410 }
411 }
412 }
413
414 ByteOffset statisticsStart = fileOutput.size();
415 if (!options_.noStatistics) {
416 // Write the statistics record
417 write(fileOutput, statistics_);
418 }
419
420 ByteOffset chunkIndexStart = fileOutput.size();
421 if (!options_.noChunkIndex) {
422 // Write chunk index records
423 for (const auto& chunkIndexRecord : chunkIndex_) {
424 write(fileOutput, chunkIndexRecord);
425 }
426 }
427
428 ByteOffset attachmentIndexStart = fileOutput.size();
430 // Write attachment index records
431 for (const auto& attachmentIndexRecord : attachmentIndex_) {
432 write(fileOutput, attachmentIndexRecord);
433 }
434 }
435
436 ByteOffset metadataIndexStart = fileOutput.size();
438 // Write metadata index records
439 for (const auto& metadataIndexRecord : metadataIndex_) {
440 write(fileOutput, metadataIndexRecord);
441 }
442 }
443
445 // Write summary offset records
446 summaryOffsetStart = fileOutput.size();
448 write(fileOutput, SummaryOffset{OpCode::Schema, schemaStart, channelStart - schemaStart});
449 }
450 if (!options_.noRepeatedChannels && !channels_.empty()) {
451 write(fileOutput,
452 SummaryOffset{OpCode::Channel, channelStart, statisticsStart - channelStart});
453 }
454 if (!options_.noStatistics) {
455 write(fileOutput, SummaryOffset{OpCode::Statistics, statisticsStart,
456 chunkIndexStart - statisticsStart});
457 }
458 if (!options_.noChunkIndex && !chunkIndex_.empty()) {
459 write(fileOutput, SummaryOffset{OpCode::ChunkIndex, chunkIndexStart,
460 attachmentIndexStart - chunkIndexStart});
461 }
463 write(fileOutput, SummaryOffset{OpCode::AttachmentIndex, attachmentIndexStart,
464 metadataIndexStart - attachmentIndexStart});
465 }
466 if (!options_.noMetadataIndex && !metadataIndex_.empty()) {
467 write(fileOutput, SummaryOffset{OpCode::MetadataIndex, metadataIndexStart,
468 summaryOffsetStart - metadataIndexStart});
469 }
470 } else if (summaryStart == fileOutput.size()) {
471 // No summary records were written
472 summaryStart = 0;
473 }
474 }
475
476 // Write the footer and trailing magic
477 write(fileOutput, Footer{summaryStart, summaryOffsetStart}, !options_.noSummaryCRC);
478 writeMagic(fileOutput);
479
480 // Flush output
481 fileOutput.end();
482
483 terminate();
484}
485
487 output_ = nullptr;
488 fileOutput_.reset();
489 streamOutput_.reset();
490 uncompressedChunk_.reset();
491#ifndef MCAP_COMPRESSION_NO_LZ4
492 lz4Chunk_.reset();
493#endif
494#ifndef MCAP_COMPRESSION_NO_ZSTD
495 zstdChunk_.reset();
496#endif
497
498 attachmentIndex_.clear();
499 metadataIndex_.clear();
500 chunkIndex_.clear();
501 statistics_ = {};
503 currentMessageIndex_.clear();
508
509 // Don't clear schemas or channels, those can be re-used between files
510 // Only the channels and schemas actually referenced in the file will be written to it.
511
512 opened_ = false;
513}
514
516 schema.id = uint16_t(schemas_.size() + 1);
517 schemas_.push_back(schema);
518}
519
521 channel.id = uint16_t(channels_.size() + 1);
522 channels_.push_back(channel);
523}
524
526 if (!output_) {
527 return StatusCode::NotOpen;
528 }
529 auto& output = getOutput();
530 auto& channelMessageCounts = statistics_.channelMessageCounts;
531
532 // Write out Channel if we have not yet done so
533 if (channelMessageCounts.find(message.channelId) == channelMessageCounts.end()) {
534 const size_t channelIndex = message.channelId - 1;
535 if (channelIndex >= channels_.size()) {
536 const auto msg = internal::StrCat("invalid channel id ", message.channelId);
538 }
539
540 const auto& channel = channels_[channelIndex];
541
542 // Check if the Schema record needs to be written
543 if ((channel.schemaId != 0) &&
544 (writtenSchemas_.find(channel.schemaId) == writtenSchemas_.end())) {
545 const size_t schemaIndex = channel.schemaId - 1;
546 if (schemaIndex >= schemas_.size()) {
547 const auto msg = internal::StrCat("invalid schema id ", channel.schemaId);
549 }
550
551 // Write the Schema record
552 uncompressedSize_ += write(output, schemas_[schemaIndex]);
553 writtenSchemas_.insert(channel.schemaId);
554
555 // Update schema statistics
557 }
558
559 // Write the Channel record
560 uncompressedSize_ += write(output, channel);
561
562 // Update channel statistics
563 channelMessageCounts.emplace(message.channelId, 0);
565 }
566
567 // Before writing a message that would overflow the current chunk, close it.
568 auto* chunkWriter = getChunkWriter();
569 if (chunkWriter != nullptr && /* Chunked? */
570 uncompressedSize_ != 0 && /* Current chunk is not empty/new? */
571 9 + getRecordSize(message) + uncompressedSize_ >= chunkSize_ /* Overflowing? */) {
572 auto& fileOutput = *output_;
573 writeChunk(fileOutput, *chunkWriter);
574 }
575
576 // For the chunk-local message index.
577 const uint64_t messageOffset = uncompressedSize_;
578
579 // Write the message
580 uncompressedSize_ += write(output, message);
581
582 // Update message statistics
583 if (!options_.noSummary) {
584 if (statistics_.messageCount == 0) {
587 } else {
590 }
592 channelMessageCounts[message.channelId] += 1;
593 }
594
595 if (chunkWriter != nullptr) {
597 // Update the message index
598 auto& messageIndex = currentMessageIndex_[message.channelId];
599 messageIndex.channelId = message.channelId;
600 messageIndex.records.emplace_back(message.logTime, messageOffset);
601 }
602
603 // Update the chunk index start/end times
606
607 // Check if the current chunk is ready to close
609 auto& fileOutput = *output_;
610 writeChunk(fileOutput, *chunkWriter);
611 }
612 }
613
614 return StatusCode::Success;
615}
616
618 if (!output_) {
619 return StatusCode::NotOpen;
620 }
621 auto& fileOutput = *output_;
622
623 // Check if we have an open chunk that needs to be closed
624 auto* chunkWriter = getChunkWriter();
625 if (chunkWriter && !chunkWriter->empty()) {
626 writeChunk(fileOutput, *chunkWriter);
627 }
628
630 // Calculate the CRC32 of the attachment
631 uint32_t sizePrefix = 0;
632 uint32_t crc = internal::CRC32_INIT;
633 crc = internal::crc32Update(crc, reinterpret_cast<const std::byte*>(&attachment.logTime), 8);
634 crc = internal::crc32Update(crc, reinterpret_cast<const std::byte*>(&attachment.createTime), 8);
635 sizePrefix = uint32_t(attachment.name.size());
636 crc = internal::crc32Update(crc, reinterpret_cast<const std::byte*>(&sizePrefix), 4);
637 crc = internal::crc32Update(crc, reinterpret_cast<const std::byte*>(attachment.name.data()),
638 sizePrefix);
639 sizePrefix = uint32_t(attachment.mediaType.size());
640 crc = internal::crc32Update(crc, reinterpret_cast<const std::byte*>(&sizePrefix), 4);
642 crc, reinterpret_cast<const std::byte*>(attachment.mediaType.data()), sizePrefix);
643 crc = internal::crc32Update(crc, reinterpret_cast<const std::byte*>(&attachment.dataSize), 8);
644 crc = internal::crc32Update(crc, reinterpret_cast<const std::byte*>(attachment.data),
645 attachment.dataSize);
646 attachment.crc = internal::crc32Final(crc);
647 }
648
649 const uint64_t fileOffset = fileOutput.size();
650
651 // Write the attachment
652 write(fileOutput, attachment);
653
654 // Update statistics and attachment index
655 if (!options_.noSummary) {
658 attachmentIndex_.emplace_back(attachment, fileOffset);
659 }
660 }
661
662 return StatusCode::Success;
663}
664
666 if (!output_) {
667 return StatusCode::NotOpen;
668 }
669 auto& fileOutput = *output_;
670
671 // Check if we have an open chunk that needs to be closed
672 auto* chunkWriter = getChunkWriter();
673 if (chunkWriter && !chunkWriter->empty()) {
674 writeChunk(fileOutput, *chunkWriter);
675 }
676
677 const uint64_t fileOffset = fileOutput.size();
678
679 // Write the metadata
680 write(fileOutput, metadata);
681
682 // Update statistics and metadata index
683 if (!options_.noSummary) {
686 metadataIndex_.emplace_back(metadata, fileOffset);
687 }
688 }
689
690 return StatusCode::Success;
691}
692
694 return statistics_;
695}
696
700
701// Private methods /////////////////////////////////////////////////////////////
702
704 if (chunkSize_ == 0) {
705 return *output_;
706 }
707 switch (compression_) {
708 default:
710 return *uncompressedChunk_;
711#ifndef MCAP_COMPRESSION_NO_ZSTD
713 return *zstdChunk_;
714#endif
715#ifndef MCAP_COMPRESSION_NO_LZ4
716 case Compression::Lz4:
717 return *lz4Chunk_;
718#endif
719 }
720}
721
723 if (chunkSize_ == 0) {
724 return nullptr;
725 }
726
727 switch (compression_) {
729 default:
730 return uncompressedChunk_.get();
731#ifndef MCAP_COMPRESSION_NO_LZ4
732 case Compression::Lz4:
733 return lz4Chunk_.get();
734#endif
735#ifndef MCAP_COMPRESSION_NO_ZSTD
737 return zstdChunk_.get();
738#endif
739 }
740}
741
743 // Both LZ4 and ZSTD recommend ~1KB as the minimum size for compressed data
744 constexpr uint64_t MIN_COMPRESSION_SIZE = 1024;
745 // Throw away any compression results that save less than 2% of the original size
746 constexpr double MIN_COMPRESSION_RATIO = 1.02;
747
748 Compression compression = Compression::None;
749 const uint64_t uncompressedSize = uncompressedSize_;
750 uint64_t compressedSize = uncompressedSize;
751 const std::byte* compressedData = chunkData.data();
752
753 if (options_.forceCompression || uncompressedSize >= MIN_COMPRESSION_SIZE) {
754 // Flush any in-progress compression stream
755 chunkData.end();
756
757 // Only use the compressed data if it is materially smaller than the
758 // uncompressed data
759 const double compressionRatio = double(uncompressedSize) / double(chunkData.compressedSize());
760 if (options_.forceCompression || compressionRatio >= MIN_COMPRESSION_RATIO) {
761 compression = compression_;
762 compressedSize = chunkData.compressedSize();
763 compressedData = chunkData.compressedData();
764 }
765 }
766
767 const auto compressionStr = internal::CompressionString(compression);
768 const uint32_t uncompressedCrc = chunkData.crc();
769
770 // Write the chunk
771 const uint64_t chunkStartOffset = output.size();
772 write(output, Chunk{currentChunkStart_, currentChunkEnd_, uncompressedSize, uncompressedCrc,
773 compressionStr, compressedSize, compressedData});
774
775 const uint64_t chunkLength = output.size() - chunkStartOffset;
776
777 if (!options_.noChunkIndex) {
778 // Create a chunk index record
779 auto& chunkIndexRecord = chunkIndex_.emplace_back();
780
781 const uint64_t messageIndexOffset = output.size();
783 // Write the message index records
784 for (auto& [channelId, messageIndex] : currentMessageIndex_) {
785 // currentMessageIndex_ contains entries for every channel ever seen, not just in this
786 // chunk. Only write message index records for channels with messages in this chunk.
787 if (messageIndex.records.size() > 0) {
788 chunkIndexRecord.messageIndexOffsets.emplace(channelId, output.size());
789 write(output, messageIndex);
790 // reset this message index for the next chunk. This allows us to re-use
791 // allocations vs. the alternative strategy of allocating a fresh set of MessageIndex
792 // objects per chunk.
793 messageIndex.records.clear();
794 }
795 }
796 }
797 const uint64_t messageIndexLength = output.size() - messageIndexOffset;
798
799 // Fill in the newly created chunk index record. This will be written into
800 // the summary section when close() is called. Note that currentChunkStart_
801 // may still be initialized to MaxTime if this chunk does not contain any
802 // messages.
803 chunkIndexRecord.messageStartTime = currentChunkStart_ == MaxTime ? 0 : currentChunkStart_;
804 chunkIndexRecord.messageEndTime = currentChunkEnd_;
805 chunkIndexRecord.chunkStartOffset = chunkStartOffset;
806 chunkIndexRecord.chunkLength = chunkLength;
807 chunkIndexRecord.messageIndexLength = messageIndexLength;
808 chunkIndexRecord.compression = compressionStr;
809 chunkIndexRecord.compressedSize = compressedSize;
810 chunkIndexRecord.uncompressedSize = uncompressedSize;
811 } else if (!options_.noMessageIndex) {
812 // Write the message index records
813 for (auto& [channelId, messageIndex] : currentMessageIndex_) {
814 // currentMessageIndex_ contains entries for every channel ever seen, not just in this
815 // chunk. Only write message index records for channels with messages in this chunk.
816 if (messageIndex.records.size() > 0) {
817 write(output, messageIndex);
818 // reset this message index for the next chunk. This allows us to re-use
819 // allocations vs. the alternative strategy of allocating a fresh set of MessageIndex
820 // objects per chunk.
821 messageIndex.records.clear();
822 }
823 }
824 }
825
826 // Reset uncompressedSize and start/end times for the next chunk
830
831 // Update statistics
833
834 // Reset the chunk writer
835 chunkData.clear();
836}
837
839 write(output, reinterpret_cast<const std::byte*>(Magic), sizeof(Magic));
840}
841
842uint64_t McapWriter::write(IWritable& output, const Header& header) {
843 const uint64_t recordSize = 4 + header.profile.size() + 4 + header.library.size();
844
845 write(output, OpCode::Header);
846 write(output, recordSize);
847 write(output, header.profile);
848 write(output, header.library);
849
850 return 9 + recordSize;
851}
852
853uint64_t McapWriter::write(IWritable& output, const Footer& footer, const bool crcEnabled) {
854 const uint64_t recordSize = /* summary_start */ 8 +
855 /* summary_offset_start */ 8 +
856 /* summary_crc */ 4;
857
858 write(output, OpCode::Footer);
859 write(output, recordSize);
860 write(output, footer.summaryStart);
861 write(output, footer.summaryOffsetStart);
862 uint32_t summaryCrc = 0;
863 if (crcEnabled) {
864 summaryCrc = output.crc();
865 }
866 write(output, summaryCrc);
867
868 return 9 + recordSize;
869}
870
871uint64_t McapWriter::write(IWritable& output, const Schema& schema) {
872 const uint64_t recordSize = /* id */ 2 +
873 /* name */ 4 + schema.name.size() +
874 /* encoding */ 4 + schema.encoding.size() +
875 /* data */ 4 + schema.data.size();
876
877 write(output, OpCode::Schema);
878 write(output, recordSize);
879 write(output, schema.id);
880 write(output, schema.name);
881 write(output, schema.encoding);
882 write(output, schema.data);
883
884 return 9 + recordSize;
885}
886
887uint64_t McapWriter::write(IWritable& output, const Channel& channel) {
888 const uint32_t metadataSize = internal::KeyValueMapSize(channel.metadata);
889 const uint64_t recordSize = /* id */ 2 +
890 /* topic */ 4 + channel.topic.size() +
891 /* message_encoding */ 4 + channel.messageEncoding.size() +
892 /* schema_id */ 2 +
893 /* metadata */ 4 + metadataSize;
894
895 write(output, OpCode::Channel);
896 write(output, recordSize);
897 write(output, channel.id);
898 write(output, channel.schemaId);
899 write(output, channel.topic);
900 write(output, channel.messageEncoding);
901 write(output, channel.metadata, metadataSize);
902
903 return 9 + recordSize;
904}
905
906uint64_t McapWriter::getRecordSize(const Message& message) {
907 return 2 + 4 + 8 + 8 + message.dataSize;
908}
909
910uint64_t McapWriter::write(IWritable& output, const Message& message) {
911 const uint64_t recordSize = getRecordSize(message);
912
913 write(output, OpCode::Message);
914 write(output, recordSize);
915 write(output, message.channelId);
916 write(output, message.sequence);
917 write(output, message.logTime);
918 write(output, message.publishTime);
919 write(output, message.data, message.dataSize);
920
921 return 9 + recordSize;
922}
923
924uint64_t McapWriter::write(IWritable& output, const Attachment& attachment) {
925 const uint64_t recordSize = 4 + attachment.name.size() + 8 + 8 + 4 + attachment.mediaType.size() +
926 8 + attachment.dataSize + 4;
927
928 write(output, OpCode::Attachment);
929 write(output, recordSize);
930 write(output, attachment.logTime);
931 write(output, attachment.createTime);
932 write(output, attachment.name);
933 write(output, attachment.mediaType);
934 write(output, attachment.dataSize);
935 write(output, attachment.data, attachment.dataSize);
936 write(output, attachment.crc);
937
938 return 9 + recordSize;
939}
940
941uint64_t McapWriter::write(IWritable& output, const Metadata& metadata) {
942 const uint32_t metadataSize = internal::KeyValueMapSize(metadata.metadata);
943 const uint64_t recordSize = 4 + metadata.name.size() + 4 + metadataSize;
944
945 write(output, OpCode::Metadata);
946 write(output, recordSize);
947 write(output, metadata.name);
948 write(output, metadata.metadata, metadataSize);
949
950 return 9 + recordSize;
951}
952
953uint64_t McapWriter::write(IWritable& output, const Chunk& chunk) {
954 const uint64_t recordSize =
955 8 + 8 + 8 + 4 + 4 + chunk.compression.size() + 8 + chunk.compressedSize;
956
957 write(output, OpCode::Chunk);
958 write(output, recordSize);
959 write(output, chunk.messageStartTime);
960 write(output, chunk.messageEndTime);
961 write(output, chunk.uncompressedSize);
962 write(output, chunk.uncompressedCrc);
963 write(output, chunk.compression);
964 write(output, chunk.compressedSize);
965 write(output, chunk.records, chunk.compressedSize);
966 output.flush();
967
968 return 9 + recordSize;
969}
970
971uint64_t McapWriter::write(IWritable& output, const MessageIndex& index) {
972 const uint32_t recordsSize = (uint32_t)(index.records.size()) * 16;
973 const uint64_t recordSize = 2 + 4 + recordsSize;
974
976 write(output, recordSize);
977 write(output, index.channelId);
978
979 write(output, recordsSize);
980 for (const auto& [timestamp, offset] : index.records) {
981 write(output, timestamp);
982 write(output, offset);
983 }
984
985 return 9 + recordSize;
986}
987
988uint64_t McapWriter::write(IWritable& output, const ChunkIndex& index) {
989 const uint32_t messageIndexOffsetsSize = (uint32_t)(index.messageIndexOffsets.size()) * 10;
990 const uint64_t recordSize = /* start_time */ 8 +
991 /* end_time */ 8 +
992 /* chunk_start_offset */ 8 +
993 /* chunk_length */ 8 +
994 /* message_index_offsets */ 4 + messageIndexOffsetsSize +
995 /* message_index_length */ 8 +
996 /* compression */ 4 + index.compression.size() +
997 /* compressed_size */ 8 +
998 /* uncompressed_size */ 8;
999
1000 write(output, OpCode::ChunkIndex);
1001 write(output, recordSize);
1002 write(output, index.messageStartTime);
1003 write(output, index.messageEndTime);
1004 write(output, index.chunkStartOffset);
1005 write(output, index.chunkLength);
1006
1007 write(output, messageIndexOffsetsSize);
1008 for (const auto& [channelId, offset] : index.messageIndexOffsets) {
1009 write(output, channelId);
1010 write(output, offset);
1011 }
1012
1013 write(output, index.messageIndexLength);
1014 write(output, index.compression);
1015 write(output, index.compressedSize);
1016 write(output, index.uncompressedSize);
1017
1018 return 9 + recordSize;
1019}
1020
1021uint64_t McapWriter::write(IWritable& output, const AttachmentIndex& index) {
1022 const uint64_t recordSize = /* offset */ 8 +
1023 /* length */ 8 +
1024 /* log_time */ 8 +
1025 /* create_time */ 8 +
1026 /* data_size */ 8 +
1027 /* name */ 4 + index.name.size() +
1028 /* media_type */ 4 + index.mediaType.size();
1029
1031 write(output, recordSize);
1032 write(output, index.offset);
1033 write(output, index.length);
1034 write(output, index.logTime);
1035 write(output, index.createTime);
1036 write(output, index.dataSize);
1037 write(output, index.name);
1038 write(output, index.mediaType);
1039
1040 return 9 + recordSize;
1041}
1042
1043uint64_t McapWriter::write(IWritable& output, const MetadataIndex& index) {
1044 const uint64_t recordSize = /* offset */ 8 +
1045 /* length */ 8 +
1046 /* name */ 4 + index.name.size();
1047
1049 write(output, recordSize);
1050 write(output, index.offset);
1051 write(output, index.length);
1052 write(output, index.name);
1053
1054 return 9 + recordSize;
1055}
1056
1057uint64_t McapWriter::write(IWritable& output, const Statistics& stats) {
1058 const uint32_t channelMessageCountsSize = (uint32_t)(stats.channelMessageCounts.size()) * 10;
1059 const uint64_t recordSize = /* message_count */ 8 +
1060 /* schema_count */ 2 +
1061 /* channel_count */ 4 +
1062 /* attachment_count */ 4 +
1063 /* metadata_count */ 4 +
1064 /* chunk_count */ 4 +
1065 /* message_start_time */ 8 +
1066 /* message_end_time */ 8 +
1067 /* channel_message_counts */ 4 + channelMessageCountsSize;
1068
1069 write(output, OpCode::Statistics);
1070 write(output, recordSize);
1071 write(output, stats.messageCount);
1072 write(output, stats.schemaCount);
1073 write(output, stats.channelCount);
1074 write(output, stats.attachmentCount);
1075 write(output, stats.metadataCount);
1076 write(output, stats.chunkCount);
1077 write(output, stats.messageStartTime);
1078 write(output, stats.messageEndTime);
1079
1080 write(output, channelMessageCountsSize);
1081 for (const auto& [channelId, messageCount] : stats.channelMessageCounts) {
1082 write(output, channelId);
1083 write(output, messageCount);
1084 }
1085
1086 return 9 + recordSize;
1087}
1088
1089uint64_t McapWriter::write(IWritable& output, const SummaryOffset& summaryOffset) {
1090 const uint64_t recordSize = /* group_opcode */ 1 +
1091 /* group_start */ 8 +
1092 /* group_length */ 8;
1093
1095 write(output, recordSize);
1096 write(output, summaryOffset.groupOpCode);
1097 write(output, summaryOffset.groupStart);
1098 write(output, summaryOffset.groupLength);
1099
1100 return 9 + recordSize;
1101}
1102
1103uint64_t McapWriter::write(IWritable& output, const DataEnd& dataEnd) {
1104 const uint64_t recordSize = /* data_section_crc */ 4;
1105
1106 write(output, OpCode::DataEnd);
1107 write(output, recordSize);
1108 write(output, dataEnd.dataSectionCrc);
1109
1110 return 9 + recordSize;
1111}
1112
1113uint64_t McapWriter::write(IWritable& output, const Record& record) {
1114 write(output, OpCode(record.opcode));
1115 write(output, record.dataSize);
1116 write(output, record.data, record.dataSize);
1117
1118 return 9 + record.dataSize;
1119}
1120
1122 write(output, uint32_t(str.size()));
1123 output.write(reinterpret_cast<const std::byte*>(str.data()), str.size());
1124}
1125
1126void McapWriter::write(IWritable& output, const ByteArray bytes) {
1127 write(output, uint32_t(bytes.size()));
1128 output.write(bytes.data(), bytes.size());
1129}
1130
1131void McapWriter::write(IWritable& output, OpCode value) {
1132 output.write(reinterpret_cast<const std::byte*>(&value), sizeof(value));
1133}
1134
1135void McapWriter::write(IWritable& output, uint16_t value) {
1136 output.write(reinterpret_cast<const std::byte*>(&value), sizeof(value));
1137}
1138
1139void McapWriter::write(IWritable& output, uint32_t value) {
1140 output.write(reinterpret_cast<const std::byte*>(&value), sizeof(value));
1141}
1142
1143void McapWriter::write(IWritable& output, uint64_t value) {
1144 output.write(reinterpret_cast<const std::byte*>(&value), sizeof(value));
1145}
1146
1147void McapWriter::write(IWritable& output, const std::byte* data, uint64_t size) {
1148 output.write(reinterpret_cast<const std::byte*>(data), size);
1149}
1150
1151void McapWriter::write(IWritable& output, const KeyValueMap& map, uint32_t size) {
1152 // Create a vector of key-value pairs so we can lexicographically sort by key
1154 pairs.reserve(map.size());
1155 for (const auto& [key, value] : map) {
1156 pairs.emplace_back(key, value);
1157 }
1158 std::sort(pairs.begin(), pairs.end());
1159
1160 write(output, size > 0 ? size : internal::KeyValueMapSize(map));
1161 for (const auto& [key, value] : pairs) {
1162 write(output, key);
1163 write(output, value);
1164 }
1165}
1166
1167} // namespace mcap
T abort(T... args)
T begin(T... args)
uint64_t size() const override
Returns the size in bytes of the uncompressed data.
Definition writer.inl:122
uint64_t compressedSize() const override
Returns the size in bytes of the compressed data. This will only be called after end().
Definition writer.inl:126
std::vector< std::byte > buffer_
Definition writer.hpp:265
void handleWrite(const std::byte *data, uint64_t size) override
Definition writer.inl:114
void handleClear() override
Definition writer.inl:134
bool empty() const override
Returns true if write() has never been called since initialization or the last call to clear().
Definition writer.inl:130
const std::byte * data() const override
Returns a pointer to the uncompressed data.
Definition writer.inl:138
const std::byte * compressedData() const override
Returns a pointer to the compressed data. This will only be called after end().
Definition writer.inl:142
void end() override
Called when the writer wants to close the current output Chunk. After this call, data() and size() sh...
Definition writer.inl:118
uint64_t size_
Definition writer.hpp:179
~FileWriter() override
Definition writer.inl:42
std::FILE * file_
Definition writer.hpp:178
Status open(std::string_view filename)
Definition writer.inl:46
void flush() override
flushes any buffered data to the output. This is called by McapWriter after every completed chunk....
Definition writer.inl:64
void handleWrite(const std::byte *data, uint64_t size) override
Definition writer.inl:56
uint64_t size() const override
Returns the current size of the file in bytes. This must be equal to the sum of all size parameters p...
Definition writer.inl:78
void end() override
Called when the writer is finished writing data to the output MCAP file.
Definition writer.inl:70
An abstract interface for writing Chunk data. Chunk data is buffered in memory and written to disk as...
Definition writer.hpp:205
virtual void handleClear()=0
virtual uint64_t compressedSize() const =0
Returns the size in bytes of the compressed data. This will only be called after end().
void clear()
Clear the internal state of the writer, discarding any input or output buffers.
Definition writer.inl:107
virtual void end() override=0
Called when the writer wants to close the current output Chunk. After this call, data() and size() sh...
virtual const std::byte * compressedData() const =0
Returns a pointer to the compressed data. This will only be called after end().
virtual const std::byte * data() const =0
Returns a pointer to the uncompressed data.
An abstract interface for writing MCAP data.
Definition writer.hpp:114
void write(const std::byte *data, uint64_t size)
Called whenever the writer needs to write data to the output MCAP file.
Definition writer.inl:21
virtual void flush()
flushes any buffered data to the output. This is called by McapWriter after every completed chunk....
Definition writer.hpp:153
virtual void handleWrite(const std::byte *data, uint64_t size)=0
uint32_t crc_
Definition writer.hpp:159
void resetCrc()
Resets the CRC32 calculation.
Definition writer.inl:36
uint32_t crc()
Returns the CRC32 of the uncompressed data.
Definition writer.inl:28
IWritable() noexcept
Definition writer.inl:18
virtual uint64_t size() const =0
Returns the current size of the file in bytes. This must be equal to the sum of all size parameters p...
bool empty() const override
Returns true if write() has never been called since initialization or the last call to clear().
Definition writer.inl:201
uint64_t compressedSize() const override
Returns the size in bytes of the compressed data. This will only be called after end().
Definition writer.inl:197
CompressionLevel compressionLevel_
Definition writer.hpp:289
std::vector< std::byte > compressedBuffer_
Definition writer.hpp:288
LZ4Writer(CompressionLevel compressionLevel, uint64_t chunkSize)
Definition writer.inl:169
const std::byte * data() const override
Returns a pointer to the uncompressed data.
Definition writer.inl:210
uint64_t size() const override
Returns the size in bytes of the uncompressed data.
Definition writer.inl:193
void end() override
Called when the writer wants to close the current output Chunk. After this call, data() and size() sh...
Definition writer.inl:178
std::vector< std::byte > uncompressedBuffer_
Definition writer.hpp:287
void handleWrite(const std::byte *data, uint64_t size) override
Definition writer.inl:174
const std::byte * compressedData() const override
Returns a pointer to the compressed data. This will only be called after end().
Definition writer.inl:214
void handleClear() override
Definition writer.inl:205
std::vector< MetadataIndex > metadataIndex_
Definition writer.hpp:494
std::unique_ptr< StreamWriter > streamOutput_
Definition writer.hpp:483
std::unique_ptr< ZStdWriter > zstdChunk_
Definition writer.hpp:489
std::vector< Schema > schemas_
Definition writer.hpp:491
void writeChunk(IWritable &output, IChunkWriter &chunkData)
Definition writer.inl:742
std::vector< Channel > channels_
Definition writer.hpp:492
std::vector< AttachmentIndex > attachmentIndex_
Definition writer.hpp:493
static void writeMagic(IWritable &output)
Definition writer.inl:838
Timestamp currentChunkStart_
Definition writer.hpp:499
void addSchema(Schema &schema)
Add a new schema to the MCAP file and set schema.id to a generated schema id. The schema id is used w...
Definition writer.inl:515
std::unordered_map< ChannelId, MessageIndex > currentMessageIndex_
Definition writer.hpp:498
IWritable * dataSink()
Returns a pointer to the IWritable data destination backing this writer. Will return nullptr if the w...
Definition writer.inl:697
Timestamp currentChunkEnd_
Definition writer.hpp:500
IWritable & getOutput()
Definition writer.inl:703
std::unique_ptr< FileWriter > fileOutput_
Definition writer.hpp:482
uint64_t chunkSize_
Definition writer.hpp:480
static uint64_t getRecordSize(const Message &message)
Definition writer.inl:906
Compression compression_
Definition writer.hpp:501
std::unordered_set< SchemaId > writtenSchemas_
Definition writer.hpp:497
uint64_t uncompressedSize_
Definition writer.hpp:502
void closeLastChunk()
finishes the current chunk in progress and writes it to the file, if a chunk is in progress.
Definition writer.inl:362
std::unique_ptr< BufferWriter > uncompressedChunk_
Definition writer.hpp:484
Statistics statistics_
Definition writer.hpp:496
IChunkWriter * getChunkWriter()
Definition writer.inl:722
void terminate()
Reset internal state without writing the MCAP footer or flushing pending writes. This should only be ...
Definition writer.inl:486
Status open(std::string_view filename, const McapWriterOptions &options)
Open a new MCAP file for writing and write the header.
Definition writer.inl:342
IWritable * output_
Definition writer.hpp:481
std::unique_ptr< LZ4Writer > lz4Chunk_
Definition writer.hpp:486
void addChannel(Channel &channel)
Add a new channel to the MCAP file and set channel.id to a generated channel id. The channel id is us...
Definition writer.inl:520
Status write(const Message &message)
Write a message to the output stream.
Definition writer.inl:525
std::vector< ChunkIndex > chunkIndex_
Definition writer.hpp:495
const Statistics & statistics() const
Current MCAP file-level statistics. This is written as a Statistics record in the Summary section of ...
Definition writer.inl:693
void close()
Write the MCAP footer, flush pending writes to the output stream, and reset internal state....
Definition writer.inl:373
McapWriterOptions options_
Definition writer.hpp:479
StreamWriter(std::ostream &stream)
Definition writer.inl:84
void handleWrite(const std::byte *data, uint64_t size) override
Definition writer.inl:88
std::ostream & stream_
Definition writer.hpp:196
uint64_t size() const override
Returns the current size of the file in bytes. This must be equal to the sum of all size parameters p...
Definition writer.inl:101
void flush() override
flushes any buffered data to the output. This is called by McapWriter after every completed chunk....
Definition writer.inl:93
void end() override
Called when the writer is finished writing data to the output MCAP file.
Definition writer.inl:97
ZSTD_CCtx_s * zstdContext_
Definition writer.hpp:315
const std::byte * compressedData() const override
Returns a pointer to the compressed data. This will only be called after end().
Definition writer.inl:295
void handleWrite(const std::byte *data, uint64_t size) override
Definition writer.inl:255
void end() override
Called when the writer wants to close the current output Chunk. After this call, data() and size() sh...
Definition writer.inl:259
bool empty() const override
Returns true if write() has never been called since initialization or the last call to clear().
Definition writer.inl:282
~ZStdWriter() override
Definition writer.inl:251
std::vector< std::byte > uncompressedBuffer_
Definition writer.hpp:313
void handleClear() override
Definition writer.inl:286
uint64_t size() const override
Returns the size in bytes of the uncompressed data.
Definition writer.inl:274
ZStdWriter(CompressionLevel compressionLevel, uint64_t chunkSize)
Definition writer.inl:244
const std::byte * data() const override
Returns a pointer to the uncompressed data.
Definition writer.inl:291
std::vector< std::byte > compressedBuffer_
Definition writer.hpp:314
uint64_t compressedSize() const override
Returns the size in bytes of the compressed data. This will only be called after end().
Definition writer.inl:278
T clear(T... args)
T data(T... args)
T emplace_back(T... args)
T empty(T... args)
T end(T... args)
T fclose(T... args)
T fflush(T... args)
T find(T... args)
T flush(T... args)
T fopen(T... args)
T fwrite(T... args)
T insert(T... args)
T max(T... args)
T min(T... args)
int ZStdCompressionLevel(CompressionLevel level)
Definition writer.inl:224
uint32_t crc32Update(const uint32_t prev, const std::byte *const data, const size_t length)
Definition crc32.hpp:74
std::string StrCat(T &&... args)
Definition internal.hpp:45
uint32_t KeyValueMapSize(const KeyValueMap &map)
Definition internal.hpp:51
const std::string CompressionString(Compression compression)
Definition internal.hpp:59
uint32_t crc32Final(uint32_t crc)
Definition crc32.hpp:104
static constexpr uint32_t CRC32_INIT
Definition crc32.hpp:66
int LZ4CompressionLevel(CompressionLevel level)
Definition writer.inl:151
Definition crc32.hpp:5
constexpr Timestamp MaxTime
Definition types.hpp:32
Compression
Supported MCAP compression algorithms.
Definition types.hpp:37
OpCode
MCAP record types.
Definition types.hpp:59
CompressionLevel
Compression level to use when compression is enabled. Slower generally produces smaller files,...
Definition types.hpp:48
uint64_t ByteOffset
Definition types.hpp:22
constexpr uint8_t Magic[]
Definition types.hpp:29
T reserve(T... args)
T resize(T... args)
T size(T... args)
T sort(T... args)
Attachment Index records are found in the Summary section, providing summary information for a single...
Definition types.hpp:270
std::string mediaType
Definition types.hpp:277
ByteOffset length
Definition types.hpp:272
std::string name
Definition types.hpp:276
ByteOffset offset
Definition types.hpp:271
Timestamp createTime
Definition types.hpp:274
An Attachment is an arbitrary file embedded in an MCAP file, including a name, media type,...
Definition types.hpp:256
Timestamp createTime
Definition types.hpp:258
std::string name
Definition types.hpp:259
const std::byte * data
Definition types.hpp:262
Timestamp logTime
Definition types.hpp:257
uint32_t crc
Definition types.hpp:263
uint64_t dataSize
Definition types.hpp:261
std::string mediaType
Definition types.hpp:260
Describes a Channel that messages are written to. A Channel represents a single connection from a pub...
Definition types.hpp:159
std::string messageEncoding
Definition types.hpp:162
std::string topic
Definition types.hpp:161
SchemaId schemaId
Definition types.hpp:163
ChannelId id
Definition types.hpp:160
KeyValueMap metadata
Definition types.hpp:164
Chunk Index records are found in the Summary section, providing summary information for a single Chun...
Definition types.hpp:239
Timestamp messageStartTime
Definition types.hpp:240
ByteOffset chunkStartOffset
Definition types.hpp:242
std::unordered_map< ChannelId, ByteOffset > messageIndexOffsets
Definition types.hpp:244
ByteOffset messageIndexLength
Definition types.hpp:245
ByteOffset compressedSize
Definition types.hpp:247
ByteOffset uncompressedSize
Definition types.hpp:248
Timestamp messageEndTime
Definition types.hpp:241
std::string compression
Definition types.hpp:246
ByteOffset chunkLength
Definition types.hpp:243
An collection of Schemas, Channels, and Messages that supports compression and indexing.
Definition types.hpp:215
ByteOffset uncompressedSize
Definition types.hpp:218
const std::byte * records
Definition types.hpp:222
ByteOffset compressedSize
Definition types.hpp:221
Timestamp messageEndTime
Definition types.hpp:217
Timestamp messageStartTime
Definition types.hpp:216
std::string compression
Definition types.hpp:220
uint32_t uncompressedCrc
Definition types.hpp:219
The final record in the Data section, signaling the end of Data and beginning of Summary....
Definition types.hpp:350
uint32_t dataSectionCrc
Definition types.hpp:351
The final record in an MCAP file (before the trailing magic byte sequence). Contains byte offsets fro...
Definition types.hpp:115
ByteOffset summaryOffsetStart
Definition types.hpp:117
ByteOffset summaryStart
Definition types.hpp:116
Appears at the beginning of every MCAP file (after the magic byte sequence) and contains the recordin...
Definition types.hpp:103
std::string profile
Definition types.hpp:104
std::string library
Definition types.hpp:105
Configuration options for McapWriter.
Definition writer.hpp:21
bool forceCompression
By default, Chunks that do not benefit from compression will be written uncompressed....
Definition writer.hpp:83
bool noSummary
Do not write Summary or Summary Offset sections to the file, placing the Footer record immediately af...
Definition writer.hpp:56
CompressionLevel compressionLevel
Compression level to use when writing Chunks. Slower generally produces smaller files,...
Definition writer.hpp:77
bool enableDataCRC
Enable CRC calculations for all records in the data section.
Definition writer.hpp:33
std::string library
A freeform string written by recording libraries. For this library, the default is "libmcap {Major}....
Definition writer.hpp:94
bool noChunking
Do not write Chunks to the file, instead writing Schema, Channel, and Message records directly into t...
Definition writer.hpp:42
bool noChunkCRC
Disable CRC calculations for Chunks.
Definition writer.hpp:25
std::string profile
The recording profile. See https://mcap.dev/spec/registry#well-known-profiles for more information on...
Definition writer.hpp:89
bool noMessageIndex
Do not write Message Index records to the file. If noMessageIndex=true and noChunkIndex=false,...
Definition writer.hpp:48
bool noSummaryCRC
Disable CRC calculations for the summary section.
Definition writer.hpp:37
uint64_t chunkSize
Target uncompressed Chunk payload size in bytes. Once a Chunk's uncompressed data is about to exceed ...
Definition writer.hpp:66
bool noAttachmentCRC
Disable CRC calculations for Attachments.
Definition writer.hpp:29
Compression compression
Compression algorithm to use when writing Chunks. This option is ignored if noChunking=true.
Definition writer.hpp:71
A list of timestamps to byte offsets for a single Channel. This record appears after each Chunk,...
Definition types.hpp:229
ChannelId channelId
Definition types.hpp:230
std::vector< std::pair< Timestamp, ByteOffset > > records
Definition types.hpp:231
A single Message published to a Channel.
Definition types.hpp:182
Timestamp logTime
Nanosecond timestamp when this message was recorded or received for recording.
Definition types.hpp:193
uint32_t sequence
An optional sequence number. If non-zero, sequence numbers should be unique per channel and increasin...
Definition types.hpp:188
const std::byte * data
A pointer to the message payload. For readers, this pointer is only valid for the lifetime of an onMe...
Definition types.hpp:208
Timestamp publishTime
Nanosecond timestamp when this message was initially published. If not available, this should be set ...
Definition types.hpp:198
ChannelId channelId
Definition types.hpp:183
uint64_t dataSize
Size of the message payload in bytes, pointed to via data.
Definition types.hpp:202
Metadata Index records are found in the Summary section, providing summary information for a single M...
Definition types.hpp:325
std::string name
Definition types.hpp:328
Holds a named map of key/value strings containing arbitrary user data. Metadata records are found in ...
Definition types.hpp:316
std::string name
Definition types.hpp:317
KeyValueMap metadata
Definition types.hpp:318
A generic Type-Length-Value record using a uint8 type and uint64 length. This is the generic form of ...
Definition types.hpp:87
uint64_t dataSize
Definition types.hpp:89
std::byte * data
Definition types.hpp:90
OpCode opcode
Definition types.hpp:88
Describes a schema used for message encoding and decoding and/or describing the shape of messages....
Definition types.hpp:132
SchemaId id
Definition types.hpp:133
std::string encoding
Definition types.hpp:135
std::string name
Definition types.hpp:134
ByteArray data
Definition types.hpp:136
The Statistics record is found in the Summary section, providing counts and timestamp ranges for the ...
Definition types.hpp:300
Timestamp messageEndTime
Definition types.hpp:308
uint64_t messageCount
Definition types.hpp:301
Timestamp messageStartTime
Definition types.hpp:307
uint16_t schemaCount
Definition types.hpp:302
uint32_t channelCount
Definition types.hpp:303
uint32_t metadataCount
Definition types.hpp:305
uint32_t attachmentCount
Definition types.hpp:304
uint32_t chunkCount
Definition types.hpp:306
std::unordered_map< ChannelId, uint64_t > channelMessageCounts
Definition types.hpp:309
Wraps a status code and string message carrying additional context.
Definition errors.hpp:36
Summary Offset records are found in the Summary Offset section. Records in the Summary section are gr...
Definition types.hpp:340
ByteOffset groupLength
Definition types.hpp:343
ByteOffset groupStart
Definition types.hpp:342
T write(T... args)