return 0;
}
-static void mgmt_be_client_process_msg(void *user_ctx, uint8_t *data,
- size_t len)
+static void mgmt_be_client_process_msg(uint8_t version, void *user_ctx,
+ uint8_t *data, size_t len)
{
struct mgmt_be_client_ctx *client_ctx = user_ctx;
Mgmtd__BeMessage *be_msg;
}
int rv = mgmt_msg_send_msg(
- &client_ctx->mstate, be_msg,
+ &client_ctx->mstate, MGMT_MSG_VERSION_PROTOBUF, be_msg,
mgmtd__be_message__get_packed_size(be_msg),
(size_t(*)(void *, void *))mgmtd__be_message__pack,
MGMTD_DBG_BE_CLIENT_CHECK());
}
int rv = mgmt_msg_send_msg(
- &client_ctx->mstate, fe_msg,
+ &client_ctx->mstate, MGMT_MSG_VERSION_PROTOBUF, fe_msg,
mgmtd__fe_message__get_packed_size(fe_msg),
(size_t(*)(void *, void *))mgmtd__fe_message__pack,
MGMTD_DBG_FE_CLIENT_CHECK());
return 0;
}
-static void mgmt_fe_client_process_msg(void *user_ctx, uint8_t *data,
- size_t len)
+static void mgmt_fe_client_process_msg(uint8_t version, void *user_ctx,
+ uint8_t *data, size_t len)
{
struct mgmt_fe_client_ctx *client_ctx = user_ctx;
Mgmtd__FeMessage *fe_msg;
left = stream_get_endp(ms->ins);
while (left > (long)sizeof(struct mgmt_msg_hdr)) {
mhdr = (struct mgmt_msg_hdr *)(STREAM_DATA(ms->ins) + total);
- if (mhdr->marker != MGMT_MSG_MARKER) {
+ if (!MGMT_MSG_IS_MARKER(mhdr->marker)) {
MGMT_MSG_DBG(dbgtag, "recv corrupt buffer, disconnect");
return MSR_DISCONNECT;
}
* true if more to process (so reschedule) else false
*/
bool mgmt_msg_procbufs(struct mgmt_msg_state *ms,
- void (*handle_msg)(void *user, uint8_t *msg,
- size_t msglen),
+ void (*handle_msg)(uint8_t version, void *user,
+ uint8_t *msg, size_t msglen),
void *user, bool debug)
{
const char *dbgtag = debug ? ms->idtag : NULL;
left -= mhdr->len, data += mhdr->len) {
mhdr = (struct mgmt_msg_hdr *)data;
- assert(mhdr->marker == MGMT_MSG_MARKER);
+ assert(MGMT_MSG_IS_MARKER(mhdr->marker));
assert(left >= mhdr->len);
- handle_msg(user, (uint8_t *)(mhdr + 1),
+ handle_msg(MGMT_MSG_MARKER_VERSION(mhdr->marker), user,
+ (uint8_t *)(mhdr + 1),
mhdr->len - sizeof(struct mgmt_msg_hdr));
ms->nrxm++;
nproc++;
*
* Args:
* ms: mgmt_msg_state for this process.
- * fd: socket/file to read data from.
+ * version: version of this message, will be given to receiving side.
+ * msg: the message to be sent.
+ * len: the length of the message.
+ * packf: a function to pack the message.
* debug: true to enable debug logging.
*
* Returns:
* 0 on success, otherwise -1 on failure. The only failure mode is if a
* the message exceeds the maximum message size configured on init.
*/
-int mgmt_msg_send_msg(struct mgmt_msg_state *ms, void *msg, size_t len,
- mgmt_msg_packf packf, bool debug)
+int mgmt_msg_send_msg(struct mgmt_msg_state *ms, uint8_t version, void *msg,
+ size_t len, size_t (*packf)(void *msg, void *buf),
+ bool debug)
{
const char *dbgtag = debug ? ms->idtag : NULL;
struct mgmt_msg_hdr *mhdr;
/* We have a stream with space, pack the message into it. */
mhdr = (struct mgmt_msg_hdr *)(STREAM_DATA(s) + s->endp);
- mhdr->marker = MGMT_MSG_MARKER;
+ mhdr->marker = MGMT_MSG_MARKER(version);
mhdr->len = mlen;
stream_forward_endp(s, sizeof(*mhdr));
endp = stream_get_endp(s);
dstbuf = STREAM_DATA(s) + endp;
- n = packf(msg, dstbuf);
+ if (packf)
+ n = packf(msg, dstbuf);
+ else {
+ memcpy(dstbuf, msg, len);
+ n = len;
+ }
stream_set_endp(s, endp + n);
ms->ntxm++;
#include "stream.h"
#include "frrevent.h"
-#define MGMT_MSG_MARKER (0x4D724B21u) /* ASCII - "MrK!"*/
+/*
+ * Messages on the stream start with a marker that encodes a version octet.
+ */
+#define MGMT_MSG_MARKER_PFX (0x23232300u) /* ASCII - "###\ooo"*/
+#define MGMT_MSG_IS_MARKER(x) (((x)&0xFFFFFF00u) == MGMT_MSG_MARKER_PFX)
+#define MGMT_MSG_MARKER(version) (MGMT_MSG_MARKER_PFX | (version))
+#define MGMT_MSG_MARKER_VERSION(x) (0xFF & (x))
+
+#define MGMT_MSG_VERSION_PROTOBUF 0
+#define MGMT_MSG_VERSION_NATIVE 1
struct mgmt_msg_state {
struct stream *ins;
MSW_DISCONNECT, /* disconnect and start reconnecting */
};
-static inline uint8_t *msg_payload(struct mgmt_msg_hdr *mhdr)
-{
- return (uint8_t *)(mhdr + 1);
-}
-
-typedef size_t (*mgmt_msg_packf)(void *msg, void *data);
-
extern int mgmt_msg_connect(const char *path, size_t sendbuf, size_t recvbuf,
const char *dbgtag);
extern void mgmt_msg_destroy(struct mgmt_msg_state *ms);
size_t max_write_buf, size_t max_msg_sz,
const char *idtag);
extern bool mgmt_msg_procbufs(struct mgmt_msg_state *ms,
- void (*handle_msg)(void *user, uint8_t *msg,
- size_t msglen),
+ void (*handle_msg)(uint8_t version, void *user,
+ uint8_t *msg, size_t msglen),
void *user, bool debug);
extern enum mgmt_msg_rsched mgmt_msg_read(struct mgmt_msg_state *ms, int fd,
bool debug);
extern size_t mgmt_msg_reset_writes(struct mgmt_msg_state *ms);
-extern int mgmt_msg_send_msg(struct mgmt_msg_state *ms, void *msg, size_t len,
+extern int mgmt_msg_send_msg(struct mgmt_msg_state *ms, uint8_t version,
+ void *msg, size_t len,
size_t (*packf)(void *msg, void *buf), bool debug);
extern enum mgmt_msg_wsched mgmt_msg_write(struct mgmt_msg_state *ms, int fd,
bool debug);
}
int rv = mgmt_msg_send_msg(
- &adapter->mstate, be_msg,
+ &adapter->mstate, MGMT_MSG_VERSION_PROTOBUF, be_msg,
mgmtd__be_message__get_packed_size(be_msg),
(size_t(*)(void *, void *))mgmtd__be_message__pack,
MGMT_DEBUG_BE_CHECK());
return mgmt_be_adapter_send_msg(adapter, &be_msg);
}
-static void mgmt_be_adapter_process_msg(void *user_ctx, uint8_t *data,
- size_t len)
+static void mgmt_be_adapter_process_msg(uint8_t version, void *user_ctx,
+ uint8_t *data, size_t len)
{
struct mgmt_be_client_adapter *adapter = user_ctx;
Mgmtd__BeMessage *be_msg;
}
int rv = mgmt_msg_send_msg(
- &adapter->mstate, fe_msg,
+ &adapter->mstate, MGMT_MSG_VERSION_PROTOBUF, fe_msg,
mgmtd__fe_message__get_packed_size(fe_msg),
(size_t(*)(void *, void *))mgmtd__fe_message__pack,
MGMT_DEBUG_FE_CHECK());
return 0;
}
-static void mgmt_fe_adapter_process_msg(void *user_ctx, uint8_t *data,
- size_t len)
+static void mgmt_fe_adapter_process_msg(uint8_t version, void *user_ctx,
+ uint8_t *data, size_t len)
{
struct mgmt_fe_client_adapter *adapter = user_ctx;
Mgmtd__FeMessage *fe_msg;