pjmsg_mcap_wrapper
Loading...
Searching...
No Matches
writer.hpp
Go to the documentation of this file.
1#pragma once
2
3#include "types.hpp"
4#include "visibility.hpp"
5#include <cstdio>
6#include <memory>
7#include <string>
8#include <unordered_set>
9#include <vector>
10
11// Forward declaration
12#ifndef MCAP_COMPRESSION_NO_ZSTD
13struct ZSTD_CCtx_s;
14#endif
15
16namespace mcap {
17
18/**
19 * @brief Configuration options for McapWriter.
20 */
22 /**
23 * @brief Disable CRC calculations for Chunks.
24 */
25 bool noChunkCRC = false;
26 /**
27 * @brief Disable CRC calculations for Attachments.
28 */
29 bool noAttachmentCRC = false;
30 /**
31 * @brief Enable CRC calculations for all records in the data section.
32 */
33 bool enableDataCRC = false;
34 /**
35 * @brief Disable CRC calculations for the summary section.
36 */
37 bool noSummaryCRC = false;
38 /**
39 * @brief Do not write Chunks to the file, instead writing Schema, Channel,
40 * and Message records directly into the Data section.
41 */
42 bool noChunking = false;
43 /**
44 * @brief Do not write Message Index records to the file. If
45 * `noMessageIndex=true` and `noChunkIndex=false`, Chunk Index records will
46 * still be written to the Summary section, providing a coarse message index.
47 */
48 bool noMessageIndex = false;
49 /**
50 * @brief Do not write Summary or Summary Offset sections to the file, placing
51 * the Footer record immediately after DataEnd. This can provide some speed
52 * boost to file writing and produce smaller files, at the expense of
53 * requiring a conversion process later if fast summarization or indexed
54 * access is desired.
55 */
56 bool noSummary = false;
57 /**
58 * @brief Target uncompressed Chunk payload size in bytes. Once a Chunk's
59 * uncompressed data is about to exceed this size, the Chunk will be
60 * compressed (if enabled) and written to disk. Note that this is a 'soft'
61 * ceiling as some Chunks could exceed this size due to either indexing
62 * data or when a single message is larger than `chunkSize`, in which case,
63 * the Chunk will contain only this one large message.
64 * This option is ignored if `noChunking=true`.
65 */
66 uint64_t chunkSize = DefaultChunkSize;
67 /**
68 * @brief Compression algorithm to use when writing Chunks. This option is
69 * ignored if `noChunking=true`.
70 */
71 Compression compression = Compression::Zstd;
72 /**
73 * @brief Compression level to use when writing Chunks. Slower generally
74 * produces smaller files, at the expense of more CPU time. These levels map
75 * to different internal settings for each compression algorithm.
76 */
77 CompressionLevel compressionLevel = CompressionLevel::Default;
78 /**
79 * @brief By default, Chunks that do not benefit from compression will be
80 * written uncompressed. This option can be used to force compression on all
81 * Chunks. This option is ignored if `noChunking=true`.
82 */
83 bool forceCompression = false;
84 /**
85 * @brief The recording profile. See
86 * https://mcap.dev/spec/registry#well-known-profiles
87 * for more information on well-known profiles.
88 */
90 /**
91 * @brief A freeform string written by recording libraries. For this library,
92 * the default is "libmcap {Major}.{Minor}.{Patch}".
93 */
94 std::string library = "libmcap " MCAP_LIBRARY_VERSION;
95
96 // The following options are less commonly used, providing more fine-grained
97 // control of index records and the Summary section
98
99 bool noRepeatedSchemas = false;
100 bool noRepeatedChannels = false;
101 bool noAttachmentIndex = false;
102 bool noMetadataIndex = false;
103 bool noChunkIndex = false;
104 bool noStatistics = false;
105 bool noSummaryOffsets = false;
106
108 : profile(profile) {}
109};
110
111/**
112 * @brief An abstract interface for writing MCAP data.
113 */
115public:
116 bool crcEnabled = false;
117
118 IWritable() noexcept;
119 virtual ~IWritable() = default;
120
121 /**
122 * @brief Called whenever the writer needs to write data to the output MCAP
123 * file.
124 *
125 * @param data A pointer to the data to write.
126 * @param size Size of the data in bytes.
127 */
128 void write(const std::byte* data, uint64_t size);
129 /**
130 * @brief Called when the writer is finished writing data to the output MCAP
131 * file.
132 */
133 virtual void end() = 0;
134 /**
135 * @brief Returns the current size of the file in bytes. This must be equal to
136 * the sum of all `size` parameters passed to `write()`.
137 */
138 virtual uint64_t size() const = 0;
139 /**
140 * @brief Returns the CRC32 of the uncompressed data.
141 */
142 uint32_t crc();
143 /**
144 * @brief Resets the CRC32 calculation.
145 */
146 void resetCrc();
147
148 /**
149 * @brief flushes any buffered data to the output. This is called by McapWriter after every
150 * completed chunk. Callers may also retain a reference to the writer and call flush() at their
151 * own cadence. Defaults to a no-op.
152 */
153 virtual void flush() {}
154
155protected:
156 virtual void handleWrite(const std::byte* data, uint64_t size) = 0;
157
158private:
159 uint32_t crc_;
160};
161
162/**
163 * @brief Implements the IWritable interface used by McapWriter by wrapping a
164 * FILE* pointer created by fopen().
165 */
166class MCAP_PUBLIC FileWriter final : public IWritable {
167public:
168 ~FileWriter() override;
169
170 Status open(std::string_view filename);
171
172 void handleWrite(const std::byte* data, uint64_t size) override;
173 void end() override;
174 void flush() override;
175 uint64_t size() const override;
176
177private:
178 std::FILE* file_ = nullptr;
179 uint64_t size_ = 0;
180};
181
182/**
183 * @brief Implements the IWritable interface used by McapWriter by wrapping a
184 * std::ostream stream.
185 */
186class MCAP_PUBLIC StreamWriter final : public IWritable {
187public:
188 StreamWriter(std::ostream& stream);
189
190 void handleWrite(const std::byte* data, uint64_t size) override;
191 void end() override;
192 void flush() override;
193 uint64_t size() const override;
194
195private:
197 uint64_t size_ = 0;
198};
199
200/**
201 * @brief An abstract interface for writing Chunk data. Chunk data is buffered
202 * in memory and written to disk as a single record, to support optimal
203 * compression and calculating the final Chunk data size.
204 */
206public:
207 virtual ~IChunkWriter() override = default;
208
209 /**
210 * @brief Called when the writer wants to close the current output Chunk.
211 * After this call, `compressedData()` and `compressedSize()` should return the
212 * data and size of the compressed data.
213 */
214 virtual void end() override = 0;
215 /**
216 * @brief Returns the size in bytes of the uncompressed data.
217 */
218 virtual uint64_t size() const override = 0;
219
220 /**
221 * @brief Returns the size in bytes of the compressed data. This will only be
222 * called after `end()`.
223 */
224 virtual uint64_t compressedSize() const = 0;
225 /**
226 * @brief Returns true if `write()` has never been called since initialization
227 * or the last call to `clear()`.
228 */
229 virtual bool empty() const = 0;
230 /**
231 * @brief Clear the internal state of the writer, discarding any input or
232 * output buffers.
233 */
234 void clear();
235 /**
236 * @brief Returns a pointer to the uncompressed data.
237 */
238 virtual const std::byte* data() const = 0;
239 /**
240 * @brief Returns a pointer to the compressed data. This will only be called
241 * after `end()`.
242 */
243 virtual const std::byte* compressedData() const = 0;
244
245protected:
246 virtual void handleClear() = 0;
247};
248
249/**
250 * @brief An in-memory IChunkWriter implementation backed by a
251 * growable buffer.
252 */
254public:
255 void handleWrite(const std::byte* data, uint64_t size) override;
256 void end() override;
257 uint64_t size() const override;
258 uint64_t compressedSize() const override;
259 bool empty() const override;
260 void handleClear() override;
261 const std::byte* data() const override;
262 const std::byte* compressedData() const override;
263
264private:
266};
267
268#ifndef MCAP_COMPRESSION_NO_LZ4
269/**
270 * @brief An in-memory IChunkWriter implementation that holds data in a
271 * temporary buffer before flushing to an LZ4-compressed buffer.
272 */
273class MCAP_PUBLIC LZ4Writer final : public IChunkWriter {
274public:
275 LZ4Writer(CompressionLevel compressionLevel, uint64_t chunkSize);
276
277 void handleWrite(const std::byte* data, uint64_t size) override;
278 void end() override;
279 uint64_t size() const override;
280 uint64_t compressedSize() const override;
281 bool empty() const override;
282 void handleClear() override;
283 const std::byte* data() const override;
284 const std::byte* compressedData() const override;
285
286private:
290};
291#endif
292
293#ifndef MCAP_COMPRESSION_NO_ZSTD
294/**
295 * @brief An in-memory IChunkWriter implementation that holds data in a
296 * temporary buffer before flushing to a Zstandard-compressed buffer.
297 */
298class MCAP_PUBLIC ZStdWriter final : public IChunkWriter {
299public:
300 ZStdWriter(CompressionLevel compressionLevel, uint64_t chunkSize);
301 ~ZStdWriter() override;
302
303 void handleWrite(const std::byte* data, uint64_t size) override;
304 void end() override;
305 uint64_t size() const override;
306 uint64_t compressedSize() const override;
307 bool empty() const override;
308 void handleClear() override;
309 const std::byte* data() const override;
310 const std::byte* compressedData() const override;
311
312private:
315 ZSTD_CCtx_s* zstdContext_ = nullptr;
316};
317#endif
318
319/**
320 * @brief Provides a write interface to an MCAP file.
321 */
323public:
324 ~McapWriter();
325
326 /**
327 * @brief Open a new MCAP file for writing and write the header.
328 *
329 * If the writer was already opened, this calls `close`() first to reset the state.
330 * A writer may be re-used after being reset via `close`() or `terminate`().
331 *
332 * @param filename Filename of the MCAP file to write.
333 * @param options Options for MCAP writing. `profile` is required.
334 * @return A non-success status if the file could not be opened for writing.
335 */
336 Status open(std::string_view filename, const McapWriterOptions& options);
337
338 /**
339 * @brief Open a new MCAP file for writing and write the header.
340 *
341 * If the writer was already opened, this calls `close`() first to reset the state.
342 * A writer may be re-used after being reset via `close`() or `terminate`().
343 *
344 * @param writer An implementation of the IWritable interface. Output bytes
345 * will be written to this object.
346 * @param options Options for MCAP writing. `profile` is required.
347 */
348 void open(IWritable& writer, const McapWriterOptions& options);
349
350 /**
351 * @brief Open a new MCAP file for writing and write the header.
352 *
353 * @param stream Output stream to write to.
354 * @param options Options for MCAP writing. `profile` is required.
355 */
356 void open(std::ostream& stream, const McapWriterOptions& options);
357
358 /**
359 * @brief Write the MCAP footer, flush pending writes to the output stream,
360 * and reset internal state. The writer may be re-used with another call to open afterwards.
361 */
362 void close();
363
364 /**
365 * @brief Reset internal state without writing the MCAP footer or flushing
366 * pending writes. This should only be used in error cases as the output MCAP
367 * file will be truncated. The writer may be re-used with another call to open afterwards.
368 */
369 void terminate();
370
371 /**
372 * @brief Add a new schema to the MCAP file and set `schema.id` to a generated
373 * schema id. The schema id is used when adding channels to the file.
374 *
375 * Schemas are not cleared when the state is reset via `close`() or `terminate`().
376 * If you're re-using a writer for multiple files in a row, the schemas only need
377 * to be added once, before first use.
378 *
379 * This method does not de-duplicate schemas.
380 *
381 * @param schema Description of the schema to register. The `id` field is
382 * ignored and will be set to a generated schema id.
383 */
384 void addSchema(Schema& schema);
385
386 /**
387 * @brief Add a new channel to the MCAP file and set `channel.id` to a
388 * generated channel id. The channel id is used when adding messages to the
389 * file.
390 *
391 * Channels are not cleared when the state is reset via `close`() or `terminate`().
392 * If you're re-using a writer for multiple files in a row, the channels only need
393 * to be added once, before first use.
394 *
395 * This method does not de-duplicate channels.
396 *
397 * @param channel Description of the channel to register. The `id` value is
398 * ignored and will be set to a generated channel id.
399 */
400 void addChannel(Channel& channel);
401
402 /**
403 * @brief Write a message to the output stream.
404 *
405 * @param message Message to add.
406 * @return A non-success Status on failure.
407 */
408 Status write(const Message& message);
409
410 /**
411 * @brief Write an attachment to the output stream.
412 *
413 * @param attachment Attachment to add. The `attachment.crc` will be
414 * calculated and set if configuration options allow CRC calculation.
415 * @return A non-success Status on failure.
416 */
417 Status write(Attachment& attachment);
418
419 /**
420 * @brief Write a metadata record to the output stream.
421 *
422 * @param metadata Named group of key/value string pairs to add.
423 * @return A non-success Status on failure.
424 */
425 Status write(const Metadata& metadata);
426
427 /**
428 * @brief Current MCAP file-level statistics. This is written as a Statistics
429 * record in the Summary section of the MCAP file.
430 */
431 const Statistics& statistics() const;
432
433 /**
434 * @brief Returns a pointer to the IWritable data destination backing this
435 * writer. Will return nullptr if the writer is not open.
436 */
437 IWritable* dataSink();
438
439 /**
440 * @brief finishes the current chunk in progress and writes it to the file, if a chunk
441 * is in progress.
442 */
443 void closeLastChunk();
444
445 // The following static methods are used for serialization of records and
446 // primitives to an output stream. They are not intended to be used directly
447 // unless you are implementing a lower level writer or tests
448
449 static void writeMagic(IWritable& output);
450
451 static uint64_t write(IWritable& output, const Header& header);
452 static uint64_t write(IWritable& output, const Footer& footer, bool crcEnabled);
453 static uint64_t write(IWritable& output, const Schema& schema);
454 static uint64_t write(IWritable& output, const Channel& channel);
455 static uint64_t getRecordSize(const Message& message);
456 static uint64_t write(IWritable& output, const Message& message);
457 static uint64_t write(IWritable& output, const Attachment& attachment);
458 static uint64_t write(IWritable& output, const Metadata& metadata);
459 static uint64_t write(IWritable& output, const Chunk& chunk);
460 static uint64_t write(IWritable& output, const MessageIndex& index);
461 static uint64_t write(IWritable& output, const ChunkIndex& index);
462 static uint64_t write(IWritable& output, const AttachmentIndex& index);
463 static uint64_t write(IWritable& output, const MetadataIndex& index);
464 static uint64_t write(IWritable& output, const Statistics& stats);
465 static uint64_t write(IWritable& output, const SummaryOffset& summaryOffset);
466 static uint64_t write(IWritable& output, const DataEnd& dataEnd);
467 static uint64_t write(IWritable& output, const Record& record);
468
469 static void write(IWritable& output, const std::string_view str);
470 static void write(IWritable& output, const ByteArray bytes);
471 static void write(IWritable& output, OpCode value);
472 static void write(IWritable& output, uint16_t value);
473 static void write(IWritable& output, uint32_t value);
474 static void write(IWritable& output, uint64_t value);
475 static void write(IWritable& output, const std::byte* data, uint64_t size);
476 static void write(IWritable& output, const KeyValueMap& map, uint32_t size = 0);
477
478private:
479 McapWriterOptions options_{""};
480 uint64_t chunkSize_ = DefaultChunkSize;
481 IWritable* output_ = nullptr;
485#ifndef MCAP_COMPRESSION_NO_LZ4
487#endif
488#ifndef MCAP_COMPRESSION_NO_ZSTD
490#endif
496 Statistics statistics_{};
499 Timestamp currentChunkStart_ = MaxTime;
500 Timestamp currentChunkEnd_ = 0;
501 Compression compression_ = Compression::None;
502 uint64_t uncompressedSize_ = 0;
503 bool opened_ = false;
504
505 IWritable& getOutput();
506 IChunkWriter* getChunkWriter();
507 void writeChunk(IWritable& output, IChunkWriter& chunkData);
508};
509
510} // namespace mcap
511
512#ifdef MCAP_IMPLEMENTATION
513# include "writer.inl"
514#endif
An in-memory IChunkWriter implementation backed by a growable buffer.
Definition writer.hpp:253
std::vector< std::byte > buffer_
Definition writer.hpp:265
Implements the IWritable interface used by McapWriter by wrapping a FILE* pointer created by fopen().
Definition writer.hpp:166
An abstract interface for writing Chunk data. Chunk data is buffered in memory and written to disk as...
Definition writer.hpp:205
virtual void handleClear()=0
virtual bool empty() const =0
Returns true if write() has never been called since initialization or the last call to clear().
virtual ~IChunkWriter() override=default
virtual uint64_t size() const override=0
Returns the size in bytes of the uncompressed data.
virtual uint64_t compressedSize() const =0
Returns the size in bytes of the compressed data. This will only be called after end().
virtual void end() override=0
Called when the writer wants to close the current output Chunk. After this call, compressedData() and compressedSize() should return the data and size of the compressed data.
virtual const std::byte * compressedData() const =0
Returns a pointer to the compressed data. This will only be called after end().
virtual const std::byte * data() const =0
Returns a pointer to the uncompressed data.
An abstract interface for writing MCAP data.
Definition writer.hpp:114
virtual void handleWrite(const std::byte *data, uint64_t size)=0
uint32_t crc_
Definition writer.hpp:159
An in-memory IChunkWriter implementation that holds data in a temporary buffer before flushing to an ...
Definition writer.hpp:273
CompressionLevel compressionLevel_
Definition writer.hpp:289
std::vector< std::byte > compressedBuffer_
Definition writer.hpp:288
std::vector< std::byte > uncompressedBuffer_
Definition writer.hpp:287
Provides a write interface to an MCAP file.
Definition writer.hpp:322
std::vector< MetadataIndex > metadataIndex_
Definition writer.hpp:494
std::unique_ptr< StreamWriter > streamOutput_
Definition writer.hpp:483
std::unique_ptr< ZStdWriter > zstdChunk_
Definition writer.hpp:489
std::vector< Schema > schemas_
Definition writer.hpp:491
std::vector< Channel > channels_
Definition writer.hpp:492
std::vector< AttachmentIndex > attachmentIndex_
Definition writer.hpp:493
std::unordered_map< ChannelId, MessageIndex > currentMessageIndex_
Definition writer.hpp:498
std::unique_ptr< FileWriter > fileOutput_
Definition writer.hpp:482
std::unordered_set< SchemaId > writtenSchemas_
Definition writer.hpp:497
std::unique_ptr< BufferWriter > uncompressedChunk_
Definition writer.hpp:484
std::unique_ptr< LZ4Writer > lz4Chunk_
Definition writer.hpp:486
std::vector< ChunkIndex > chunkIndex_
Definition writer.hpp:495
Implements the IWritable interface used by McapWriter by wrapping a std::ostream stream.
Definition writer.hpp:186
std::ostream & stream_
Definition writer.hpp:196
An in-memory IChunkWriter implementation that holds data in a temporary buffer before flushing to an ...
Definition writer.hpp:298
std::vector< std::byte > uncompressedBuffer_
Definition writer.hpp:313
std::vector< std::byte > compressedBuffer_
Definition writer.hpp:314
Definition crc32.hpp:5
constexpr Timestamp MaxTime
Definition types.hpp:32
constexpr uint64_t DefaultChunkSize
Definition types.hpp:30
Compression
Supported MCAP compression algorithms.
Definition types.hpp:37
uint64_t Timestamp
Definition types.hpp:21
OpCode
MCAP record types.
Definition types.hpp:59
CompressionLevel
Compression level to use when compression is enabled. Slower generally produces smaller files,...
Definition types.hpp:48
Attachment Index records are found in the Summary section, providing summary information for a single...
Definition types.hpp:270
An Attachment is an arbitrary file embedded in an MCAP file, including a name, media type,...
Definition types.hpp:256
Describes a Channel that messages are written to. A Channel represents a single connection from a pub...
Definition types.hpp:159
Chunk Index records are found in the Summary section, providing summary information for a single Chun...
Definition types.hpp:239
A collection of Schemas, Channels, and Messages that supports compression and indexing.
Definition types.hpp:215
The final record in the Data section, signaling the end of Data and beginning of Summary....
Definition types.hpp:350
The final record in an MCAP file (before the trailing magic byte sequence). Contains byte offsets fro...
Definition types.hpp:115
Appears at the beginning of every MCAP file (after the magic byte sequence) and contains the recordin...
Definition types.hpp:103
Configuration options for McapWriter.
Definition writer.hpp:21
McapWriterOptions(const std::string_view profile)
Definition writer.hpp:107
std::string profile
The recording profile. See https://mcap.dev/spec/registry#well-known-profiles for more information on...
Definition writer.hpp:89
A list of timestamps to byte offsets for a single Channel. This record appears after each Chunk,...
Definition types.hpp:229
A single Message published to a Channel.
Definition types.hpp:182
Metadata Index records are found in the Summary section, providing summary information for a single M...
Definition types.hpp:325
Holds a named map of key/value strings containing arbitrary user data. Metadata records are found in ...
Definition types.hpp:316
A generic Type-Length-Value record using a uint8 type and uint64 length. This is the generic form of ...
Definition types.hpp:87
Describes a schema used for message encoding and decoding and/or describing the shape of messages....
Definition types.hpp:132
The Statistics record is found in the Summary section, providing counts and timestamp ranges for the ...
Definition types.hpp:300
Wraps a status code and string message carrying additional context.
Definition errors.hpp:36
Summary Offset records are found in the Summary Offset section. Records in the Summary section are gr...
Definition types.hpp:340
#define MCAP_LIBRARY_VERSION
Definition types.hpp:17
#define MCAP_PUBLIC