2 # -*- coding: utf-8 eval: (blacken-mode 1) -*-
3 # SPDX-License-Identifier: GPL-2.0-or-later
5 # December 22 2021, Christian Hopps <chopps@labn.net>
7 # Copyright 2021-2022, LabN Consulting, L.L.C.
17 from asyncio
import Event
, Lock
18 from ipaddress
import ip_address
as ip
20 FMT_APIMSGHDR
= ">BBHL"
21 FMT_APIMSGHDR_SIZE
= struct
.calcsize(FMT_APIMSGHDR
)
23 FMT_LSA_FILTER
= ">HBB" # + plus x"I" areas
24 LSAF_ORIGIN_NON_SELF
= 0
28 FMT_LSA_HEADER
= ">HBBIILHH"
29 FMT_LSA_HEADER_SIZE
= struct
.calcsize(FMT_LSA_HEADER
)
31 # ------------------------
32 # Messages to OSPF daemon.
33 # ------------------------
35 MSG_REGISTER_OPAQUETYPE
= 1
36 MSG_UNREGISTER_OPAQUETYPE
= 2
37 MSG_REGISTER_EVENT
= 3
39 MSG_ORIGINATE_REQUEST
= 5
40 MSG_DELETE_REQUEST
= 6
41 MSG_SYNC_REACHABLE
= 7
44 MSG_SYNC_ROUTER_ID
= 19
47 MSG_REGISTER_OPAQUETYPE
: ("REGISTER_OPAQUETYPE", "BBxx"),
48 MSG_UNREGISTER_OPAQUETYPE
: ("UNREGISTER_OPAQUETYPE", "BBxx"),
49 MSG_REGISTER_EVENT
: ("REGISTER_EVENT", FMT_LSA_FILTER
),
50 MSG_SYNC_LSDB
: ("SYNC_LSDB", FMT_LSA_FILTER
),
51 MSG_ORIGINATE_REQUEST
: ("ORIGINATE_REQUEST", ">II" + FMT_LSA_HEADER
[1:]),
52 MSG_DELETE_REQUEST
: ("DELETE_REQUEST", ">IBBxBL"),
53 MSG_SYNC_REACHABLE
: ("MSG_SYNC_REACHABLE", ""),
54 MSG_SYNC_ISM
: ("MSG_SYNC_ISM", ""),
55 MSG_SYNC_NSM
: ("MSG_SYNC_NSM", ""),
56 MSG_SYNC_ROUTER_ID
: ("MSG_SYNC_ROUTER_ID", ""),
59 # OSPF API MSG Delete Flag.
60 OSPF_API_DEL_ZERO_LEN_LSA
= 0x01 # send withdrawal with no LSA data
62 # --------------------------
63 # Messages from OSPF daemon.
64 # --------------------------
68 MSG_LSA_UPDATE_NOTIFY
= 12
69 MSG_LSA_DELETE_NOTIFY
= 13
74 MSG_REACHABLE_CHANGE
= 18
75 MSG_ROUTER_ID_CHANGE
= 20
78 MSG_REPLY
: ("REPLY", "bxxx"),
79 MSG_READY_NOTIFY
: ("READY_NOTIFY", ">BBxxI"),
80 MSG_LSA_UPDATE_NOTIFY
: ("LSA_UPDATE_NOTIFY", ">IIBxxx" + FMT_LSA_HEADER
[1:]),
81 MSG_LSA_DELETE_NOTIFY
: ("LSA_DELETE_NOTIFY", ">IIBxxx" + FMT_LSA_HEADER
[1:]),
82 MSG_NEW_IF
: ("NEW_IF", ">II"),
83 MSG_DEL_IF
: ("DEL_IF", ">I"),
84 MSG_ISM_CHANGE
: ("ISM_CHANGE", ">IIBxxx"),
85 MSG_NSM_CHANGE
: ("NSM_CHANGE", ">IIIBxxx"),
86 MSG_REACHABLE_CHANGE
: ("REACHABLE_CHANGE", ">HH"),
87 MSG_ROUTER_ID_CHANGE
: ("ROUTER_ID_CHANGE", ">I"),
91 OSPF_API_NOSUCHINTERFACE
= -1
92 OSPF_API_NOSUCHAREA
= -2
93 OSPF_API_NOSUCHLSA
= -3
94 OSPF_API_ILLEGALLSATYPE
= -4
95 OSPF_API_OPAQUETYPEINUSE
= -5
96 OSPF_API_OPAQUETYPENOTREGISTERED
= -6
97 OSPF_API_NOTREADY
= -7
98 OSPF_API_NOMEMORY
= -8
103 OSPF_API_OK
: "OSPF_API_OK",
104 OSPF_API_NOSUCHINTERFACE
: "OSPF_API_NOSUCHINTERFACE",
105 OSPF_API_NOSUCHAREA
: "OSPF_API_NOSUCHAREA",
106 OSPF_API_NOSUCHLSA
: "OSPF_API_NOSUCHLSA",
107 OSPF_API_ILLEGALLSATYPE
: "OSPF_API_ILLEGALLSATYPE",
108 OSPF_API_OPAQUETYPEINUSE
: "OSPF_API_OPAQUETYPEINUSE",
109 OSPF_API_OPAQUETYPENOTREGISTERED
: "OSPF_API_OPAQUETYPENOTREGISTERED",
110 OSPF_API_NOTREADY
: "OSPF_API_NOTREADY",
111 OSPF_API_NOMEMORY
: "OSPF_API_NOMEMORY",
112 OSPF_API_ERROR
: "OSPF_API_ERROR",
113 OSPF_API_UNDEF
: "OSPF_API_UNDEF",
116 # msg_info = {**smsg_info, **amsg_info}
118 msg_info
.update(smsg_info
)
119 msg_info
.update(amsg_info
)
120 msg_name
= {k
: v
[0] for k
, v
in msg_info
.items()}
121 msg_fmt
= {k
: v
[1] for k
, v
in msg_info
.items()}
122 msg_size
= {k
: struct
.calcsize(v
) for k
, v
in msg_fmt
.items()}
126 return msg_name
.get(mt
, str(mt
))
def api_errname(ecode):
    """Return the symbolic name of an OSPF API error code.

    Codes that have no entry in `msg_errname` are returned as `str(ecode)`.
    """
    fallback = str(ecode)
    if ecode in msg_errname:
        return msg_errname[ecode]
    return fallback
133 # -------------------
134 # API Semantic Errors
135 # -------------------
138 class APIError(Exception):
142 class MsgTypeError(Exception):
146 class SeqNumError(Exception):
158 LSA_TYPE_ASBR_SUMMARY
= 4
159 LSA_TYPE_AS_EXTERNAL
= 5
160 LSA_TYPE_GROUP_MEMBER
= 6
162 LSA_TYPE_EXTERNAL_ATTRIBUTES
= 8
163 LSA_TYPE_OPAQUE_LINK
= 9
164 LSA_TYPE_OPAQUE_AREA
= 10
165 LSA_TYPE_OPAQUE_AS
= 11
168 def lsa_typename(lsa_type
):
170 LSA_TYPE_ROUTER
: "LSA:ROUTER",
171 LSA_TYPE_NETWORK
: "LSA:NETWORK",
172 LSA_TYPE_SUMMARY
: "LSA:SUMMARY",
173 LSA_TYPE_ASBR_SUMMARY
: "LSA:ASBR_SUMMARY",
174 LSA_TYPE_AS_EXTERNAL
: "LSA:AS_EXTERNAL",
175 LSA_TYPE_GROUP_MEMBER
: "LSA:GROUP_MEMBER",
176 LSA_TYPE_AS_NSSA
: "LSA:AS_NSSA",
177 LSA_TYPE_EXTERNAL_ATTRIBUTES
: "LSA:EXTERNAL_ATTRIBUTES",
178 LSA_TYPE_OPAQUE_LINK
: "LSA:OPAQUE_LINK",
179 LSA_TYPE_OPAQUE_AREA
: "LSA:OPAQUE_AREA",
180 LSA_TYPE_OPAQUE_AS
: "LSA:OPAQUE_AS",
182 return names
.get(lsa_type
, str(lsa_type
))
185 # ------------------------------
186 # Interface State Machine States
187 # ------------------------------
201 ISM_DEPENDUPON
: "ISM_DEPENDUPON",
202 ISM_DOWN
: "ISM_DOWN",
203 ISM_LOOPBACK
: "ISM_LOOPBACK",
204 ISM_WAITING
: "ISM_WAITING",
205 ISM_POINTTOPOINT
: "ISM_POINTTOPOINT",
206 ISM_DROTHER
: "ISM_DROTHER",
207 ISM_BACKUP
: "ISM_BACKUP",
210 return names
.get(state
, str(state
))
213 # -----------------------------
214 # Neighbor State Machine States
215 # -----------------------------
231 NSM_DEPENDUPON
: "NSM_DEPENDUPON",
232 NSM_DELETED
: "NSM_DELETED",
233 NSM_DOWN
: "NSM_DOWN",
234 NSM_ATTEMPT
: "NSM_ATTEMPT",
235 NSM_INIT
: "NSM_INIT",
236 NSM_TWOWAY
: "NSM_TWOWAY",
237 NSM_EXSTART
: "NSM_EXSTART",
238 NSM_EXCHANGE
: "NSM_EXCHANGE",
239 NSM_LOADING
: "NSM_LOADING",
240 NSM_FULL
: "NSM_FULL",
242 return names
.get(state
, str(state
))
246 "An object that does nothing when used with `with` statement."
248 async def __aenter__(self
):
251 async def __aexit__(self
, *args
, **kwargs
):
262 return "OspfApiClient({})".format(self
.server
)
265 def _get_bound_sockets(port
):
266 s1
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
, 0)
268 s1
.setsockopt(socket
.SOL_SOCKET
, socket
.SO_REUSEADDR
, 1)
269 # s1.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
271 s2
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
, 0)
273 s2
.setsockopt(socket
.SOL_SOCKET
, socket
.SO_REUSEADDR
, 1)
274 # s2.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
275 s2
.bind(("", port
+ 1))
284 def __init__(self
, server
="localhost", handlers
=None):
285 """A client connection to OSPF Daemon using the OSPF API
287 The client object is not created in a connected state. To connect to the server
288 the `connect` method should be called. If an error is encountered when sending
289 messages to the server an exception will be raised and the connection will be
290 closed. When this happens `connect` may be called again to restore the
294 server: hostname or IP address of server default is "localhost"
295 handlers: dict of message handlers, the key is the API message
296 type, the value is a function. The functions signature is:
297 `handler(msg_type, msg, msg_extra, *params)`, where `msg` is the
298 message data after the API header, `*params` will be the
299 unpacked message values, and msg_extra are any bytes beyond the
300 fixed parameters of the message.
302 Will raise exceptions for failures with various `socket` modules
303 functions such as `socket.socket`, `socket.setsockopt`, `socket.bind`.
309 self
._ar
= self
._r
= self
._w
= None
311 self
.handlers
= handlers
if handlers
is not None else dict()
312 self
.write_lock
= Lock()
314 # try and get consecutive 2 ports
317 for port
in range(PORTSTART
, PORTEND
+ 2, 2):
319 logging
.debug("%s: binding to ports %s, %s", self
, port
, port
+ 1)
320 self
._s
, self
._ls
= self
._get
_bound
_sockets
(port
)
322 except OSError as error
:
323 if error
.errno
!= errno
.EADDRINUSE
or port
== PORTEND
:
324 logging
.warning("%s: binding port %s error %s", self
, port
, error
)
326 logging
.debug("%s: ports %s, %s in use.", self
, port
, port
+ 1)
328 assert False, "Should not reach this code execution point"
330 async def _connect_locked(self
):
331 logging
.debug("%s: connect to OSPF API", self
)
333 loop
= asyncio
.get_event_loop()
337 logging
.debug("%s: connecting sync socket to server", self
)
338 await loop
.sock_connect(self
._s
, (self
.server
, 2607))
340 logging
.debug("%s: accepting connect from server", self
)
341 self
._as
, _
= await loop
.sock_accept(self
._ls
)
343 await self
._close
_locked
()
346 logging
.debug("%s: success", self
)
347 self
._r
, self
._w
= await asyncio
.open_connection(sock
=self
._s
)
348 self
._ar
, _
= await asyncio
.open_connection(sock
=self
._as
)
async def connect(self):
    """Connect to the server while holding `write_lock`.

    The actual work is done by `_connect_locked`; the lock is released again
    when that coroutine finishes or raises.
    """
    guard = self.write_lock
    async with guard:
        await self._connect_locked()
357 "True if the connection is closed."
358 return self
._seq
== 0
360 async def _close_locked(self
):
361 logging
.debug("%s: closing", self
)
365 await self
._w
.wait_closed()
371 assert self
._w
is None
async def close(self):
    """Close the connection while holding `write_lock`.

    The actual work is done by `_close_locked`; the lock is released again
    when that coroutine finishes or raises.
    """
    guard = self.write_lock
    async with guard:
        await self._close_locked()
386 async def _msg_read(r
, expseq
=-1):
387 """Read an OSPF API message from the socket `r`
390 r: socket to read msg from
391 expseq: sequence number to expect or -1 for any.
393 Will raise exceptions for failures with various `socket` modules,
394 Additionally may raise SeqNumError if unexpected seqnum is received.
397 mh
= await r
.readexactly(FMT_APIMSGHDR_SIZE
)
398 v
, mt
, l
, seq
= struct
.unpack(FMT_APIMSGHDR
, mh
)
400 raise Exception("received unexpected OSPF API version {}".format(v
))
402 logging
.debug("_msg_read: got seq: 0x%x on async read", seq
)
404 raise SeqNumError("rx {} != {}".format(seq
, expseq
))
405 msg
= await r
.readexactly(l
) if l
else b
""
407 except asyncio
.IncompleteReadError
:
async def msg_read(self):
    """Read the next message from the async notify channel.

    Delegates to `OspfApiClient._msg_read` on the async reader (`self._ar`),
    passing -1 so that any sequence number is accepted.

    Raises:
        May raise exceptions for failures with various `socket` modules.
    """
    read_one = OspfApiClient._msg_read
    return await read_one(self._ar, -1)
418 async def msg_send(self
, mt
, mp
):
419 """Send a message to OSPF API and wait for error code reply.
422 mt: the messaage type
423 mp: the message payload
425 error: an OSPF_API_XXX error code, 0 for OK.
427 Raises SeqNumError if the synchronous reply is the wrong sequence number;
428 MsgTypeError if the synchronous reply is not MSG_REPLY. Also,
429 may raise exceptions for failures with various `socket` modules,
431 The connection will be closed.
433 logging
.debug("SEND: %s: sending %s seq 0x%x", self
, api_msgname(mt
), self
._seq
)
434 mh
= struct
.pack(FMT_APIMSGHDR
, 1, mt
, len(mp
), self
._seq
)
440 async with self
.write_lock
:
441 self
._w
.write(mh
+ mp
)
442 await self
._w
.drain()
443 mt
, mp
= await OspfApiClient
._msg
_read
(self
._r
, seq
)
447 "rx {} != {}".format(api_msgname(mt
), api_msgname(MSG_REPLY
))
450 return struct
.unpack(msg_fmt
[MSG_REPLY
], mp
)[0]
452 # We've written data with a sequence number
456 async def msg_send_raises(self
, mt
, mp
=b
"\x00" * 4):
457 """Send a message to OSPF API and wait for error code reply.
460 mt: the messaage type
461 mp: the message payload
463 APIError if the server replies with an error.
465 Also may raise exceptions for failures with various `socket` modules,
466 as well as MsgTypeError if the synchronous reply is incorrect.
467 The connection will be closed for these non-API error exceptions.
469 ecode
= await self
.msg_send(mt
, mp
)
471 raise APIError("{} error {}".format(api_msgname(mt
), api_errname(ecode
)))
473 async def handle_async_msg(self
, mt
, msg
):
474 if mt
not in msg_fmt
:
475 logging
.debug("RECV: %s: unknown async msg type %s", self
, mt
)
480 tup
= struct
.unpack(fmt
, msg
[:sz
])
483 if mt
not in self
.handlers
:
485 "RECV: %s: no handlers for msg type %s", self
, api_msgname(mt
)
489 logging
.debug("RECV: %s: calling handler for %s", self
, api_msgname(mt
))
490 await self
.handlers
[mt
](mt
, msg
, extra
, *tup
)
493 # Client to Server Messaging
496 def lsa_type_mask(*lsa_types
):
497 "Return a 16 bit mask for each LSA type passed."
502 assert 0 < t
< 16, "LSA type {} out of range [1, 15]".format(t
)
507 def lsa_filter(origin
, areas
, lsa_types
):
508 """Return an LSA filter.
510 Return the filter message bytes based on `origin` the `areas` list and the LSAs
511 types in the `lsa_types` list.
513 mask
= OspfApiClient
.lsa_type_mask(*lsa_types
)
515 fmt
= FMT_LSA_FILTER
+ ("{}I".format(narea
) if narea
else "")
516 # lsa type mask, origin, number of areas, each area
517 return struct
.pack(fmt
, mask
, origin
, narea
, *areas
)
async def req_lsdb_sync(self):
    """Register for all LSA notifications and request an LSDB synchronization."""
    logging.debug("SEND: %s: request LSDB events", self)
    # One filter (any origin, empty area and LSA-type lists) is built once
    # and reused as the payload of both requests.
    any_lsa_filter = OspfApiClient.lsa_filter(LSAF_ORIGIN_ANY, [], [])
    await self.msg_send_raises(MSG_REGISTER_EVENT, any_lsa_filter)

    logging.debug("SEND: %s: request LSDB sync", self)
    await self.msg_send_raises(MSG_SYNC_LSDB, any_lsa_filter)
async def req_reachable_routers(self):
    """Request a dump of all reachable routers.

    Sends MSG_SYNC_REACHABLE with the default payload of `msg_send_raises`.
    """
    logging.debug("SEND: %s: request reachable changes", self)
    send = self.msg_send_raises
    await send(MSG_SYNC_REACHABLE)
async def req_ism_states(self):
    """Request a dump of the current ISM states of all interfaces.

    Sends MSG_SYNC_ISM with the default payload of `msg_send_raises`.
    """
    logging.debug("SEND: %s: request ISM changes", self)
    send = self.msg_send_raises
    await send(MSG_SYNC_ISM)
async def req_nsm_states(self):
    """Request a dump of the current NSM states of all neighbors.

    Sends MSG_SYNC_NSM with the default payload of `msg_send_raises`.
    """
    logging.debug("SEND: %s: request NSM changes", self)
    send = self.msg_send_raises
    await send(MSG_SYNC_NSM)
async def req_router_id_sync(self):
    """Request a sync of the current OSPF router ID.

    Sends MSG_SYNC_ROUTER_ID with the default payload of `msg_send_raises`.

    Raises:
        See `msg_send_raises`
    """
    logging.debug("SEND: %s: request router ID sync", self)
    await self.msg_send_raises(MSG_SYNC_ROUTER_ID)
549 class OspfOpaqueClient(OspfApiClient
):
550 """A client connection to OSPF Daemon for manipulating Opaque LSA data.
552 The client object is not created in a connected state. To connect to the server
553 the `connect` method should be called. If an error is encountered when sending
554 messages to the server an exception will be raised and the connection will be
555 closed. When this happens `connect` may be called again to restore the
559 server: hostname or IP address of server default is "localhost"
560 wait_ready: if True then wait for OSPF to signal ready, in newer versions
561 FRR ospfd is always ready so this overhead can be skipped.
565 Will raise exceptions for failures with various `socket` modules
566 functions such as `socket.socket`, `socket.setsockopt`, `socket.bind`.
569 def __init__(self
, server
="localhost", wait_ready
=False):
571 MSG_LSA_UPDATE_NOTIFY
: self
._lsa
_change
_msg
,
572 MSG_LSA_DELETE_NOTIFY
: self
._lsa
_change
_msg
,
573 MSG_NEW_IF
: self
._if
_msg
,
574 MSG_DEL_IF
: self
._if
_msg
,
575 MSG_ISM_CHANGE
: self
._if
_change
_msg
,
576 MSG_NSM_CHANGE
: self
._nbr
_change
_msg
,
577 MSG_REACHABLE_CHANGE
: self
._reachable
_msg
,
578 MSG_ROUTER_ID_CHANGE
: self
._router
_id
_msg
,
581 handlers
[MSG_READY_NOTIFY
] = self
._ready
_msg
583 super().__init
__(server
, handlers
)
585 self
.wait_ready
= wait_ready
586 self
.ready_lock
= Lock() if wait_ready
else WithNothing()
588 LSA_TYPE_OPAQUE_LINK
: {},
589 LSA_TYPE_OPAQUE_AREA
: {},
590 LSA_TYPE_OPAQUE_AS
: {},
592 self
.router_id
= ip(0)
593 self
.router_id_change_cb
= None
595 self
.lsid_seq_num
= {}
597 self
.lsa_change_cb
= None
598 self
.opaque_change_cb
= {}
600 self
.reachable_routers
= set()
601 self
.reachable_change_cb
= None
605 self
.ism_change_cb
= None
608 self
.nsm_change_cb
= None
610 async def _register_opaque_data(self
, lsa_type
, otype
):
611 async with self
.ready_lock
:
612 cond
= self
.ready_cond
[lsa_type
].get(otype
)
613 assert cond
is None, "multiple registers for {} opaque-type {}".format(
614 lsa_typename(lsa_type
), otype
617 logging
.debug("register %s opaque-type %s", lsa_typename(lsa_type
), otype
)
619 mt
= MSG_REGISTER_OPAQUETYPE
620 mp
= struct
.pack(msg_fmt
[mt
], lsa_type
, otype
)
621 await self
.msg_send_raises(mt
, mp
)
623 # If we are not waiting, mark ready for register check
624 if not self
.wait_ready
:
625 self
.ready_cond
[lsa_type
][otype
] = True
627 async def _handle_msg_loop(self
):
629 logging
.debug("entering async msg handling loop")
631 mt
, msg
= await self
.msg_read()
633 await self
.handle_async_msg(mt
, msg
)
635 mts
= api_msgname(mt
)
637 "ignoring unexpected msg: %s len: %s", mts
, len(msg
)
640 logging
.info("Got EOF from OSPF API server on async notify socket")
def _opaque_args(lsa_type, otype, oid, mp):
    """Return the eight header values for an opaque LSA carrying `mp`.

    The result is a tuple of eight values, one per field of FMT_LSA_HEADER.
    Only three are non-zero: the LSA type, the link-state ID (`otype` shifted
    into the top octet, OR-ed with `oid`), and the total length
    (FMT_LSA_HEADER_SIZE plus `len(mp)`).
    """
    link_state_id = (otype << 24) | oid
    total_length = FMT_LSA_HEADER_SIZE + len(mp)
    return (0, 0, lsa_type, link_state_id, 0, 0, 0, total_length)
649 def _make_opaque_lsa(lsa_type
, otype
, oid
, mp
):
650 # /* Make a new LSA from parameters */
652 FMT_LSA_HEADER
, *OspfOpaqueClient
._opaque
_args
(lsa_type
, otype
, oid
, mp
)
657 async def _ready_msg(self
, mt
, msg
, extra
, lsa_type
, otype
, addr
):
658 assert self
.wait_ready
660 if lsa_type
== LSA_TYPE_OPAQUE_LINK
:
661 e
= "ifaddr {}".format(ip(addr
))
662 elif lsa_type
== LSA_TYPE_OPAQUE_AREA
:
663 e
= "area {}".format(ip(addr
))
667 "RECV: %s ready notify for %s opaque-type %s%s",
669 lsa_typename(lsa_type
),
674 # Signal all waiting senders they can send now.
675 async with self
.ready_lock
:
676 cond
= self
.ready_cond
[lsa_type
].get(otype
)
677 self
.ready_cond
[lsa_type
][otype
] = True
681 "RECV: dup ready received for %s opaque-type %s",
682 lsa_typename(lsa_type
),
689 async def _if_msg(self
, mt
, msg
, extra
, *args
):
693 assert mt
== MSG_DEL_IF
694 ifaddr
, aid
= args
[0], 0
696 "RECV: %s ifaddr %s areaid %s", api_msgname(mt
), ip(ifaddr
), ip(aid
)
699 async def _if_change_msg(self
, mt
, msg
, extra
, ifaddr
, aid
, state
):
704 "RECV: %s ifaddr %s areaid %s state %s",
711 self
.if_area
[ifaddr
] = aid
712 self
.ism_states
[ifaddr
] = state
714 if self
.ism_change_cb
:
715 self
.ism_change_cb(ifaddr
, aid
, state
)
717 async def _nbr_change_msg(self
, mt
, msg
, extra
, ifaddr
, nbraddr
, router_id
, state
):
719 nbraddr
= ip(nbraddr
)
720 router_id
= ip(router_id
)
723 "RECV: %s ifaddr %s nbraddr %s router_id %s state %s",
731 if ifaddr
not in self
.nsm_states
:
732 self
.nsm_states
[ifaddr
] = {}
733 self
.nsm_states
[ifaddr
][(nbraddr
, router_id
)] = state
735 if self
.nsm_change_cb
:
736 self
.nsm_change_cb(ifaddr
, nbraddr
, router_id
, state
)
738 async def _lsa_change_msg(self
, mt
, msg
, extra
, ifaddr
, aid
, is_self
, *ls_header
):
750 otype
= (ls_id
>> 24) & 0xFF
752 if mt
== MSG_LSA_UPDATE_NOTIFY
:
755 assert mt
== MSG_LSA_DELETE_NOTIFY
759 "RECV: LSA %s msg for LSA %s in area %s seq 0x%x len %s age %s",
767 idx
= (lsa_type
, otype
)
769 pre_lsa_size
= msg_size
[mt
] - FMT_LSA_HEADER_SIZE
770 lsa
= msg
[pre_lsa_size
:]
772 if idx
in self
.opaque_change_cb
:
773 self
.opaque_change_cb
[idx
](mt
, ifaddr
, aid
, ls_header
, extra
, lsa
)
775 if self
.lsa_change_cb
:
776 self
.lsa_change_cb(mt
, ifaddr
, aid
, ls_header
, extra
, lsa
)
778 async def _reachable_msg(self
, mt
, msg
, extra
, nadd
, nremove
):
779 router_ids
= struct
.unpack(">{}I".format(nadd
+ nremove
), extra
)
780 router_ids
= [ip(x
) for x
in router_ids
]
782 "RECV: %s added %s removed %s",
787 self
.reachable_routers |
= set(router_ids
[:nadd
])
788 self
.reachable_routers
-= set(router_ids
[nadd
:])
789 logging
.info("RECV: %s new set %s", api_msgname(mt
), self
.reachable_routers
)
791 if self
.reachable_change_cb
:
792 logging
.info("RECV: %s calling callback", api_msgname(mt
))
793 await self
.reachable_change_cb(router_ids
[:nadd
], router_ids
[nadd
:])
795 async def _router_id_msg(self
, mt
, msg
, extra
, router_id
):
796 router_id
= ip(router_id
)
797 logging
.info("RECV: %s router ID %s", api_msgname(mt
), router_id
)
798 old_router_id
= self
.router_id
799 if old_router_id
== router_id
:
802 self
.router_id
= router_id
804 "RECV: %s new router ID %s older router ID %s",
810 if self
.router_id_change_cb
:
811 logging
.info("RECV: %s calling callback", api_msgname(mt
))
812 await self
.router_id_change_cb(router_id
, old_router_id
)
814 async def add_opaque_data(self
, addr
, lsa_type
, otype
, oid
, data
):
815 """Add an instance of opaque data.
817 Add an instance of opaque data. This call will register for the given
818 LSA and opaque type if not already done.
821 addr: depends on lsa_type, LINK => ifaddr, AREA => area ID, AS => ignored
822 lsa_type: LSA_TYPE_OPAQUE_{LINK,AREA,AS}
823 otype: (octet) opaque type
824 oid: (3 octets) ID of this opaque data
825 data: the opaque data
827 See `msg_send_raises`
829 assert self
.ready_cond
.get(lsa_type
, {}).get(otype
) is True, "Not Registered!"
831 if lsa_type
== LSA_TYPE_OPAQUE_LINK
:
832 ifaddr
, aid
= int(addr
), 0
833 elif lsa_type
== LSA_TYPE_OPAQUE_AREA
:
834 ifaddr
, aid
= 0, int(addr
)
836 assert lsa_type
== LSA_TYPE_OPAQUE_AS
839 mt
= MSG_ORIGINATE_REQUEST
844 *OspfOpaqueClient
._opaque
_args
(lsa_type
, otype
, oid
, data
),
847 await self
.msg_send_raises(mt
, msg
)
async def delete_opaque_data(self, addr, lsa_type, otype, oid, flags=0):
    """Delete an instance of opaque data.

    The (lsa_type, otype) pair must already be registered and marked ready.
    This call does NOT register it; it asserts instead.

    Args:
        addr: depends on lsa_type, LINK => ifaddr, AREA => area ID, AS => ignored.
            It is always packed as `int(addr)`, so it must be int-convertible
            even for AS.
        lsa_type: LSA_TYPE_OPAQUE_{LINK,AREA,AS}
        otype: (octet) opaque type.
        oid: (3 octets) ID of this opaque data
        flags: (octet) optional flags (e.g., OSPF_API_DEL_ZERO_LEN_LSA,
            defaults to no flags)
    Raises:
        AssertionError: if (lsa_type, otype) is not recorded as ready.
        See `msg_send_raises`
    """
    # Only an entry that is exactly True counts as ready; any other value
    # (including a missing entry) fails the check.
    assert self.ready_cond.get(lsa_type, {}).get(otype) is True, "Not Registered!"

    mt = MSG_DELETE_REQUEST
    mp = struct.pack(msg_fmt[mt], int(addr), lsa_type, otype, flags, oid)
    await self.msg_send_raises(mt, mp)
async def is_registered(self, lsa_type, otype):
    """Determine if an (lsa_type, otype) tuple has been registered with FRR.

    This determines if the type has been registered, but not necessarily if it
    is ready; if that is required use the `wait_opaque_ready` method.

    Args:
        lsa_type: LSA_TYPE_OPAQUE_{LINK,AREA,AS}
        otype: (octet) opaque type.
    Returns:
        True when `ready_cond` holds any non-None entry for the pair.
    """
    async with self.ready_lock:
        per_type = self.ready_cond.get(lsa_type, {})
        entry = per_type.get(otype)
        return entry is not None
883 async def register_opaque_data(self
, lsa_type
, otype
, callback
=None):
884 """Register intent to advertise opaque data.
886 The application should wait for the async notificaiton that the server is
887 ready to advertise the given opaque data type. The API currently only allows
888 a single "owner" of each unique (lsa_type,otype). To wait call `wait_opaque_ready`
891 lsa_type: LSA_TYPE_OPAQUE_{LINK,AREA,AS}
892 otype: (octet) opaque type.
893 callback: if given, callback will be called when changes are received for
894 LSA of the given (lsa_type, otype). The callbacks signature is:
896 `callback(msg_type, ifaddr, area_id, lsa_header, data, lsa)`
899 msg_type: MSG_LSA_UPDATE_NOTIFY or MSG_LSA_DELETE_NOTIFY
900 ifaddr: integer identifying an interface (by IP address)
901 area_id: integer identifying an area
902 lsa_header: the LSA header as an unpacked tuple (fmt: ">HBBIILHH")
903 data: the opaque data that follows the LSA header
904 lsa: the octets of the full lsa
906 See `msg_send_raises`
908 assert not await self
.is_registered(
910 ), "Registering registered type"
913 self
.opaque_change_cb
[(lsa_type
, otype
)] = callback
914 elif (lsa_type
, otype
) in self
.opaque_change_cb
:
916 "OSPFCLIENT: register: removing callback for %s opaque-type %s",
917 lsa_typename(lsa_type
),
920 del self
.opaque_change_cb
[(lsa_type
, otype
)]
922 await self
._register
_opaque
_data
(lsa_type
, otype
)
924 async def wait_opaque_ready(self
, lsa_type
, otype
):
925 async with self
.ready_lock
:
926 cond
= self
.ready_cond
[lsa_type
].get(otype
)
930 assert self
.wait_ready
933 "waiting for ready %s opaque-type %s", lsa_typename(lsa_type
), otype
937 cond
= self
.ready_cond
[lsa_type
][otype
] = []
943 logging
.debug("READY for %s opaque-type %s", lsa_typename(lsa_type
), otype
)
async def register_opaque_data_wait(self, lsa_type, otype, callback=None):
    """Register intent to advertise opaque data and wait for ready.

    The API currently only allows a single "owner" of each unique (lsa_type,otype).

    Args:
        lsa_type: LSA_TYPE_OPAQUE_{LINK,AREA,AS}
        otype: (octet) opaque type.
        callback: if given, callback will be called when changes are received for
            LSA of the given (lsa_type, otype). The callbacks signature is:

            `callback(msg_type, ifaddr, area_id, lsa_header, data, lsa)`

            Where:
                msg_type: MSG_LSA_UPDATE_NOTIFY or MSG_LSA_DELETE_NOTIFY
                ifaddr: integer identifying an interface (by IP address)
                area_id: integer identifying an area
                lsa_header: the LSA header as an unpacked tuple (fmt: ">HBBIILHH")
                data: the opaque data that follows the LSA header
                lsa: the octets of the full lsa
    Raises:
        See `msg_send_raises`
    """
    key = (lsa_type, otype)
    # Register first, then wait for the same (lsa_type, otype) to become ready.
    await self.register_opaque_data(*key, callback)
    await self.wait_opaque_ready(*key)
972 async def unregister_opaque_data(self
, lsa_type
, otype
):
973 """Unregister intent to advertise opaque data.
975 This will also cause the server to flush/delete all opaque data of
976 the given (lsa_type,otype).
979 lsa_type: LSA_TYPE_OPAQUE_{LINK,AREA,AS}
980 otype: (octet) opaque type.
982 See `msg_send_raises`
984 assert await self
.is_registered(
986 ), "Unregistering unregistered type"
988 if (lsa_type
, otype
) in self
.opaque_change_cb
:
989 del self
.opaque_change_cb
[(lsa_type
, otype
)]
991 mt
= MSG_UNREGISTER_OPAQUETYPE
992 mp
= struct
.pack(msg_fmt
[mt
], lsa_type
, otype
)
993 await self
.msg_send_raises(mt
, mp
)
async def monitor_lsa(self, callback=None):
    """Monitor changes to LSAs.

    Stores `callback` as `lsa_change_cb` and then requests an LSDB sync.

    Args:
        callback: if given, callback will be called when changes are received for
            any LSA. The callback signature is:

            `callback(msg_type, ifaddr, area_id, lsa_header, extra, lsa)`

            Where:
                msg_type: MSG_LSA_UPDATE_NOTIFY or MSG_LSA_DELETE_NOTIFY
                ifaddr: integer identifying an interface (by IP address)
                area_id: integer identifying an area
                lsa_header: the LSA header as an unpacked tuple (fmt: ">HBBIILHH")
                extra: the octets that follow the LSA header
                lsa: the octets of the full lsa
    """
    # Install the callback before asking for the sync.
    self.lsa_change_cb = callback
    await self.req_lsdb_sync()
async def monitor_reachable(self, callback=None):
    """Monitor the set of reachable routers.

    The attribute `reachable_routers` contains the set() of reachable router
    IDs. The message handler converts each ID with `ipaddress.ip_address`, so
    the members are address objects rather than plain integers. The set is
    updated prior to calling the `callback`.

    Args:
        callback: callback will be called when the set of reachable
            routers changes. The handler awaits the callback's result, so it
            must be an async callable. The callback signature is:

            `callback(added, removed)`

            Where:
                added: list of router IDs (address objects) being added
                removed: list of router IDs (address objects) being removed
    Raises:
        See `msg_send_raises`
    """
    # Install the callback before asking for the dump.
    self.reachable_change_cb = callback
    await self.req_reachable_routers()
async def monitor_ism(self, callback=None):
    """Monitor the state of OSPF enabled interfaces.

    Stores `callback` as `ism_change_cb` and then requests the current ISM
    states.

    Args:
        callback: callback will be called when an interface changes state.
            The callback signature is:

            `callback(ifaddr, area_id, state)`

            Where:
                ifaddr: identifies an interface (by IP address)
                area_id: identifies an area
                state: the interface's ISM state
    """
    # Install the callback before asking for the dump.
    self.ism_change_cb = callback
    await self.req_ism_states()
async def monitor_nsm(self, callback=None):
    """Monitor the state of OSPF neighbors.

    Stores `callback` as `nsm_change_cb` and then requests the current NSM
    states.

    Args:
        callback: callback will be called when a neighbor changes state.
            It is called synchronously (its result is not awaited).
            The callback signature is:

            `callback(ifaddr, nbr_addr, router_id, state)`

            Where:
                ifaddr: identifies an interface (by IP address)
                nbr_addr: neighbor IP address; the handler converts it with
                    `ipaddress.ip_address`, so it is an address object
                router_id: neighbor router ID, converted the same way
                state: the neighbor's NSM state
    Raises:
        See `msg_send_raises`
    """
    # Install the callback before asking for the dump.
    self.nsm_change_cb = callback
    await self.req_nsm_states()
async def monitor_router_id(self, callback=None):
    """Monitor the OSPF router ID.

    The attribute `router_id` contains the OSPF router ID.
    This value is updated prior to calling the `callback`.

    Args:
        callback: callback will be called when the router ID changes.
            The handler awaits the callback's result.
            The callback signature is:

            `callback(new_router_id, old_router_id)`

            Where:
                new_router_id: the new router ID
                old_router_id: the old router ID
    """
    # Install the callback before asking for the sync.
    self.router_id_change_cb = callback
    await self.req_router_id_sync()
1092 def next_action(action_list
=None):
1093 "Get next action from list or STDIN"
1095 for action
in action_list
:
1102 yield action
.strip()
1105 async def async_main(args
):
1106 c
= OspfOpaqueClient(args
.server
)
1110 # Start handling async messages from server.
1111 if sys
.version_info
[1] > 6:
1112 asyncio
.create_task(c
._handle
_msg
_loop
())
1114 asyncio
.get_event_loop().create_task(c
._handle
_msg
_loop
())
1116 await c
.req_lsdb_sync()
1117 await c
.req_reachable_routers()
1118 await c
.req_ism_states()
1119 await c
.req_nsm_states()
1121 for action
in next_action(args
.actions
):
1122 _s
= action
.split(",")
1123 what
= _s
.pop(False)
1124 if what
.casefold() == "wait":
1125 stime
= int(_s
.pop(False))
1126 logging
.info("waiting %s seconds", stime
)
1127 await asyncio
.sleep(stime
)
1128 logging
.info("wait complete: %s seconds", stime
)
1130 ltype
= int(_s
.pop(False))
1134 aval
= _s
.pop(False)
1136 addr
= ip(int(aval
))
1139 oargs
= [addr
, ltype
, int(_s
.pop(False)), int(_s
.pop(False))]
1141 if not await c
.is_registered(oargs
[1], oargs
[2]):
1142 await c
.register_opaque_data_wait(oargs
[1], oargs
[2])
1144 if what
.casefold() == "add":
1146 b
= bytes
.fromhex(_s
.pop(False))
1149 logging
.info("opaque data is %s octets", len(b
))
1150 # Needs to be multiple of 4 in length
1153 b
+= b
"\x00" * (4 - mod
)
1154 logging
.info("opaque padding to %s octets", len(b
))
1156 await c
.add_opaque_data(*oargs
, b
)
1158 assert what
.casefold().startswith("del")
1162 f
= int(_s
.pop(False))
1165 await c
.delete_opaque_data(*oargs
, f
)
1166 if not args
.actions
or args
.exit
:
1168 except Exception as error
:
1169 logging
.error("async_main: unexpected error: %s", error
, exc_info
=True)
1173 logging
.info("Sleeping forever")
1175 await asyncio
.sleep(120)
1177 logging
.info("Got EOF from OSPF API server on async notify socket")
1182 ap
= argparse
.ArgumentParser(args
)
1183 ap
.add_argument("--logtag", default
="CLIENT", help="tag to identify log messages")
1184 ap
.add_argument("--exit", action
="store_true", help="Exit after commands")
1185 ap
.add_argument("--server", default
="localhost", help="OSPF API server")
1186 ap
.add_argument("-v", "--verbose", action
="store_true", help="be verbose")
1190 help="WAIT,SEC|(ADD|DEL),LSATYPE,[ADDR,],OTYPE,OID,[HEXDATA|DEL_FLAG]",
1192 args
= ap
.parse_args()
1194 level
= logging
.DEBUG
if args
.verbose
else logging
.INFO
1195 logging
.basicConfig(
1197 format
="%(asctime)s %(levelname)s: {}: %(name)s %(message)s".format(
1202 logging
.info("ospfclient: starting")
1206 if sys
.version_info
[1] > 6:
1208 status
= asyncio
.run(async_main(args
))
1210 loop
= asyncio
.get_event_loop()
1212 status
= loop
.run_until_complete(async_main(args
))
1215 except KeyboardInterrupt:
1216 logging
.info("Exiting, received KeyboardInterrupt in main")
1217 except Exception as error
:
1218 logging
.info("Exiting, unexpected exception %s", error
, exc_info
=True)
1220 logging
.info("ospfclient: clean exit")
1225 if __name__
== "__main__":
1226 exit_status
= main()
1227 sys
.exit(exit_status
)