pjmsg_mcap_wrapper
Loading...
Searching...
No Matches
writer.cpp
Go to the documentation of this file.
1/**
2 @file
3 @author Alexander Sherikov
4 @copyright 2024 Alexander Sherikov. Licensed under the Apache License,
5 Version 2.0. (see LICENSE or http://www.apache.org/licenses/LICENSE-2.0)
6 @brief
7*/
8
10
11#include "HeaderCdrAux.ipp"
14#include "TimeCdrAux.ipp"
15
16#define MCAP_IMPLEMENTATION
17#define MCAP_COMPRESSION_NO_LZ4
18#define MCAP_COMPRESSION_NO_ZSTD
19#define MCAP_PUBLIC __attribute__((visibility("hidden")))
20
21#pragma GCC diagnostic push
22/// @todo presumably GCC bug
23#pragma GCC diagnostic ignored "-Warray-bounds"
24#include <mcap/writer.hpp>
25#pragma GCC diagnostic pop
26
27#include "messages.h"
28
29#include <random>
30
31
32namespace
33{
/// Concatenates all arguments, left to right, into one std::string.
///
/// Each argument must be appendable to std::string with operator+=.
/// Calling it without arguments yields an empty string.
template <typename... t_String>
std::string str_concat(t_String &&...strings)
{
    std::string joined;
    // Unary fold over the comma operator: appends the pack elements in order.
    ((joined += std::forward<t_String>(strings)), ...);
    return joined;
}
41
/// Returns the time elapsed since the epoch of std::chrono::system_clock,
/// expressed as a count of nanoseconds.
uint64_t now()
{
    using std::chrono::nanoseconds;

    const auto since_epoch = std::chrono::system_clock::now().time_since_epoch();
    return (std::chrono::duration_cast<nanoseconds>(since_epoch).count());
}
48
49 uint32_t getRandomUInt32()
50 {
52
55
56 return (distrib(gen));
57 }
58} // namespace
59
60
61namespace pjmsg_mcap_wrapper
62{
64 {
65 public:
68
70
71 public:
73 {
74 setVersion(getRandomUInt32());
75 }
76
77 void setVersion(const uint32_t version)
78 {
79 if (names_.names_version() != version)
80 {
81 names_.names_version() = version;
83 version_updated_ = true;
84 }
85 }
86 };
87} // namespace pjmsg_mcap_wrapper
88
89
90namespace pjmsg_mcap_wrapper
91{
92 Message::Message() : pimpl_(std::make_unique<Message::Implementation>())
93 {
94 }
95
96 Message::~Message() = default;
97
99 {
100 return (pimpl_->names_.names());
101 }
102
104 {
105 return (pimpl_->values_.values());
106 }
107
109 {
110 if (not pimpl_->version_updated_)
111 {
112 pimpl_->setVersion(pimpl_->names_.names_version() + 1);
113 }
114 }
115
116 void Message::setStamp(const uint64_t timestamp)
117 {
118 const int32_t sec = static_cast<int32_t>(timestamp / std::nano::den);
119 const uint32_t nanosec = timestamp % std::nano::den;
120
121 pimpl_->names_.header().stamp().sec(sec);
122 pimpl_->values_.header().stamp().sec(sec);
123 pimpl_->names_.header().stamp().nanosec(nanosec);
124 pimpl_->values_.header().stamp().nanosec(nanosec);
125 }
126
128 {
129 pimpl_->names_.names().reserve(size);
130 pimpl_->values_.values().reserve(size);
131 }
132
134 {
135 pimpl_->names_.names().resize(size);
136 pimpl_->values_.values().resize(size);
137 }
138
140 {
141 return (names()[index]);
142 }
143
144 double &Message::value(const std::size_t index)
145 {
146 return (values()[index]);
147 }
148
150 {
151 return (pimpl_->names_.names().size());
152 }
153
154 void Message::setVersion(const uint32_t version)
155 {
156 pimpl_->setVersion(version);
157 }
158} // namespace pjmsg_mcap_wrapper
159
160
161namespace pjmsg_mcap_wrapper
162{
164 {
165 protected:
166 template <class t_Message>
168 {
169 protected:
172
173 protected:
174 uint32_t getSize(const t_Message &message)
175 {
176 size_t current_alignment{ 0 };
177 return (cdr_size_calculator_.calculate_serialized_size(message, current_alignment)
178 + 4u /*encapsulation*/);
179 }
180
181 public:
183 {
184 }
185
186 void initialize(mcap::McapWriter &writer, const std::string_view &msg_topic)
187 {
188 mcap::Schema schema(
189 pjmsg_mcap_wrapper_private::pjmsg::Message<t_Message>::type,
190 "ros2msg",
191 pjmsg_mcap_wrapper_private::pjmsg::Message<t_Message>::schema);
192 writer.addSchema(schema);
193
194 mcap::Channel channel(msg_topic, "ros2msg", schema.id);
195 writer.addChannel(channel);
196
197 message_.channelId = channel.id;
198 }
199
200 void write(mcap::McapWriter &writer, std::vector<std::byte> &buffer, const t_Message &message)
201 {
202 buffer.resize(getSize(message));
203 message_.data = buffer.data();
204
205 {
207 reinterpret_cast<char *>(buffer.data()), buffer.size()); // NOLINT
211
213 ser << message;
214 ser.set_dds_cdr_options({ 0, 0 });
215
217 }
218
219 message_.logTime = now();
221
222
223 const mcap::Status res = writer.write(message_);
224 if (not res.ok())
225 {
226 throw std::runtime_error(str_concat("Failed to write a message: ", res.message));
227 }
228 }
229 };
230
231 public:
232 std::tuple<Channel<plotjuggler_msgs::msg::StatisticsNames>, Channel<plotjuggler_msgs::msg::StatisticsValues>>
234
237
238 public:
240 {
241 writer_.close();
242 }
243
244 void initialize(const std::filesystem::path &filename, const std::string &topic_prefix)
245 {
246 {
248 /// @todo needed if compression is used, delays writing
249 options.noChunking = true;
250 const mcap::Status res = writer_.open(filename.native(), options);
251 if (not res.ok())
252 {
253 throw std::runtime_error(
254 str_concat("Failed to open ", filename.native(), " for writing: ", res.message));
255 }
256 }
257
258 std::get<Channel<plotjuggler_msgs::msg::StatisticsNames>>(channels_).initialize(
259 writer_, str_concat(topic_prefix, "/names"));
260
261 std::get<Channel<plotjuggler_msgs::msg::StatisticsValues>>(channels_).initialize(
262 writer_, str_concat(topic_prefix, "/values"));
263 }
264
265 template <class t_Message>
266 void write(const t_Message &message)
267 {
268 std::get<Channel<t_Message>>(channels_).write(writer_, buffer_, message);
269 }
270 };
271} // namespace pjmsg_mcap_wrapper
272
273
274namespace pjmsg_mcap_wrapper
275{
276 Writer::Writer() : pimpl_(std::make_unique<Writer::Implementation>())
277 {
278 }
279
280 Writer::~Writer() = default;
281
282 void Writer::initialize(const std::filesystem::path &filename, const std::string &topic_prefix)
283 {
284 pimpl_->initialize(filename, topic_prefix);
285 }
286
288 {
289 // pimpl_->writer_.closeLastChunk();
290 pimpl_->writer_.dataSink()->flush();
291 }
292
293 void Writer::write(const Message &message)
294 {
295 if (message.pimpl_->version_updated_)
296 {
297 pimpl_->write(message.pimpl_->names_);
298 message.pimpl_->version_updated_ = false;
299 }
300 pimpl_->write(message.pimpl_->values_);
301 }
302} // namespace pjmsg_mcap_wrapper
Sink class.
This class offers an interface to calculate the encoded size of a type serialized using a support enc...
size_t calculate_serialized_size(const _T &data, size_t &current_alignment)
Generic template which calculates the encoded size of an instance of an unknown type.
This class offers an interface to serialize/deserialize some basic types using CDR protocol inside an...
Definition Cdr.h:69
Cdr_DllAPI bool set_encoding_flag(EncodingAlgorithmFlag encoding_flag)
Sets the EncodingAlgorithmFlag for the encapsulation when the CDR type is CdrVersion::DDS_CDR,...
Definition Cdr.cpp:358
Cdr_DllAPI Cdr & serialize_encapsulation()
This function writes the encapsulation of the CDR stream. If the CDR stream should contain an encapsu...
Definition Cdr.cpp:302
static Cdr_DllAPI const Endianness DEFAULT_ENDIAN
Default endianness in the system.
Definition Cdr.h:84
Cdr_DllAPI void set_dds_cdr_options(const std::array< uint8_t, 2 > &options)
This function sets the option flags when the CDR type is eprosima::fastcdr::DDS_CDR.
Definition Cdr.cpp:380
Cdr_DllAPI size_t get_serialized_data_length() const
This function returns the length of the serialized data inside the stream.
Definition Cdr.cpp:445
This class represents a stream of bytes that contains (or will contain) serialized data....
Definition FastBuffer.h:244
Provides a write interface to an MCAP file.
Definition writer.hpp:322
void addSchema(Schema &schema)
Add a new schema to the MCAP file and set schema.id to a generated schema id. The schema id is used w...
Definition writer.inl:515
Status open(std::string_view filename, const McapWriterOptions &options)
Open a new MCAP file for writing and write the header.
Definition writer.inl:342
void addChannel(Channel &channel)
Add a new channel to the MCAP file and set channel.id to a generated channel id. The channel id is us...
Definition writer.inl:520
Status write(const Message &message)
Write a message to the output stream.
Definition writer.inl:525
void close()
Write the MCAP footer, flush pending writes to the output stream, and reset internal state....
Definition writer.inl:373
void setVersion(const uint32_t version)
Definition writer.cpp:77
plotjuggler_msgs::msg::StatisticsNames names_
Definition writer.cpp:66
plotjuggler_msgs::msg::StatisticsValues values_
Definition writer.cpp:67
std::size_t size() const
Definition writer.cpp:149
void resize(const std::size_t size)
Definition writer.cpp:133
std::vector< std::string > & names()
Definition writer.cpp:98
void setVersion(const uint32_t version)
Definition writer.cpp:154
std::vector< double > & values()
Definition writer.cpp:103
std::string & name(const std::size_t index)
Definition writer.cpp:139
double & value(const std::size_t index)
Definition writer.cpp:144
const std::unique_ptr< Implementation > pimpl_
Definition all.h:26
void setStamp(const uint64_t timestamp)
Definition writer.cpp:116
void reserve(const std::size_t size)
Definition writer.cpp:127
uint32_t getSize(const t_Message &message)
Definition writer.cpp:174
void initialize(mcap::McapWriter &writer, const std::string_view &msg_topic)
Definition writer.cpp:186
eprosima::fastcdr::CdrSizeCalculator cdr_size_calculator_
Definition writer.cpp:171
void write(mcap::McapWriter &writer, std::vector< std::byte > &buffer, const t_Message &message)
Definition writer.cpp:200
void write(const t_Message &message)
Definition writer.cpp:266
std::tuple< Channel< plotjuggler_msgs::msg::StatisticsNames >, Channel< plotjuggler_msgs::msg::StatisticsValues > > channels_
Definition writer.cpp:233
void initialize(const std::filesystem::path &filename, const std::string &topic_prefix)
Definition writer.cpp:244
void write(const Message &message)
Definition writer.cpp:293
void initialize(const std::filesystem::path &filename, const std::string &topic_prefix)
Definition writer.cpp:282
const std::unique_ptr< Implementation > pimpl_
Definition all.h:54
T data(T... args)
@ PLAIN_CDR
Specifies that the content is PLAIN_CDR.
CdrVersion
This enumeration represents the kinds of CDR serialization supported by eprosima::fastcdr::CDR.
@ XCDRv1
XCDRv1 encoding defined by standard DDS X-Types 1.3.
Definition Cdr.h:49
T reserve(T... args)
T resize(T... args)
T size(T... args)
Describes a Channel that messages are written to. A Channel represents a single connection from a pub...
Definition types.hpp:159
ChannelId id
Definition types.hpp:160
Configuration options for McapWriter.
Definition writer.hpp:21
bool noChunking
Do not write Chunks to the file, instead writing Schema, Channel, and Message records directly into t...
Definition writer.hpp:42
A single Message published to a Channel.
Definition types.hpp:182
Timestamp logTime
Nanosecond timestamp when this message was recorded or received for recording.
Definition types.hpp:193
const std::byte * data
A pointer to the message payload. For readers, this pointer is only valid for the lifetime of an onMe...
Definition types.hpp:208
Timestamp publishTime
Nanosecond timestamp when this message was initially published. If not available, this should be set ...
Definition types.hpp:198
ChannelId channelId
Definition types.hpp:183
uint64_t dataSize
Size of the message payload in bytes, pointed to via data.
Definition types.hpp:202
Describes a schema used for message encoding and decoding and/or describing the shape of messages....
Definition types.hpp:132
SchemaId id
Definition types.hpp:133
Wraps a status code and string message carrying additional context.
Definition errors.hpp:36
bool ok() const
Definition errors.hpp:115
std::string message
Definition errors.hpp:38
This class represents the structure StatisticsNames defined by the user in the IDL file.
This class represents the structure StatisticsValues defined by the user in the IDL file.