2 # -*- coding: utf-8 eval: (blacken-mode 1) -*-
4 # December 22 2021, Christian Hopps <chopps@labn.net>
6 # Copyright 2021, LabN Consulting, L.L.C.
8 # This program is free software; you can redistribute it and/or
9 # modify it under the terms of the GNU General Public License
10 # as published by the Free Software Foundation; either version 2
11 # of the License, or (at your option) any later version.
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU General Public License for more details.
18 # You should have received a copy of the GNU General Public License along
19 # with this program; see the file COPYING; if not, write to the Free Software
20 # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
30 from asyncio
import Event
, Lock
31 from ipaddress
import ip_address
as ip
33 FMT_APIMSGHDR
= ">BBHL"
34 FMT_APIMSGHDR_SIZE
= struct
.calcsize(FMT_APIMSGHDR
)
36 FMT_LSA_FILTER
= ">HBB" # + plus x"I" areas
37 LSAF_ORIGIN_NON_SELF
= 0
41 FMT_LSA_HEADER
= ">HBBIILHH"
42 FMT_LSA_HEADER_SIZE
= struct
.calcsize(FMT_LSA_HEADER
)
44 # ------------------------
45 # Messages to OSPF daemon.
46 # ------------------------
48 MSG_REGISTER_OPAQUETYPE
= 1
49 MSG_UNREGISTER_OPAQUETYPE
= 2
50 MSG_REGISTER_EVENT
= 3
52 MSG_ORIGINATE_REQUEST
= 5
53 MSG_DELETE_REQUEST
= 6
54 MSG_SYNC_REACHABLE
= 7
59 MSG_REGISTER_OPAQUETYPE
: ("REGISTER_OPAQUETYPE", "BBxx"),
60 MSG_UNREGISTER_OPAQUETYPE
: ("UNREGISTER_OPAQUETYPE", "BBxx"),
61 MSG_REGISTER_EVENT
: ("REGISTER_EVENT", FMT_LSA_FILTER
),
62 MSG_SYNC_LSDB
: ("SYNC_LSDB", FMT_LSA_FILTER
),
63 MSG_ORIGINATE_REQUEST
: ("ORIGINATE_REQUEST", ">II" + FMT_LSA_HEADER
[1:]),
64 MSG_DELETE_REQUEST
: ("DELETE_REQUEST", ">IBBxxL"),
65 MSG_SYNC_REACHABLE
: ("MSG_SYNC_REACHABLE", ""),
66 MSG_SYNC_ISM
: ("MSG_SYNC_ISM", ""),
67 MSG_SYNC_NSM
: ("MSG_SYNC_NSM", ""),
70 # --------------------------
71 # Messages from OSPF daemon.
72 # --------------------------
76 MSG_LSA_UPDATE_NOTIFY
= 12
77 MSG_LSA_DELETE_NOTIFY
= 13
82 MSG_REACHABLE_CHANGE
= 18
85 MSG_REPLY
: ("REPLY", "bxxx"),
86 MSG_READY_NOTIFY
: ("READY_NOTIFY", ">BBxxI"),
87 MSG_LSA_UPDATE_NOTIFY
: ("LSA_UPDATE_NOTIFY", ">IIBxxx" + FMT_LSA_HEADER
[1:]),
88 MSG_LSA_DELETE_NOTIFY
: ("LSA_DELETE_NOTIFY", ">IIBxxx" + FMT_LSA_HEADER
[1:]),
89 MSG_NEW_IF
: ("NEW_IF", ">II"),
90 MSG_DEL_IF
: ("DEL_IF", ">I"),
91 MSG_ISM_CHANGE
: ("ISM_CHANGE", ">IIBxxx"),
92 MSG_NSM_CHANGE
: ("NSM_CHANGE", ">IIIBxxx"),
93 MSG_REACHABLE_CHANGE
: ("REACHABLE_CHANGE", ">HH"),
97 OSPF_API_NOSUCHINTERFACE
= -1
98 OSPF_API_NOSUCHAREA
= -2
99 OSPF_API_NOSUCHLSA
= -3
100 OSPF_API_ILLEGALLSATYPE
= -4
101 OSPF_API_OPAQUETYPEINUSE
= -5
102 OSPF_API_OPAQUETYPENOTREGISTERED
= -6
103 OSPF_API_NOTREADY
= -7
104 OSPF_API_NOMEMORY
= -8
109 OSPF_API_OK
: "OSPF_API_OK",
110 OSPF_API_NOSUCHINTERFACE
: "OSPF_API_NOSUCHINTERFACE",
111 OSPF_API_NOSUCHAREA
: "OSPF_API_NOSUCHAREA",
112 OSPF_API_NOSUCHLSA
: "OSPF_API_NOSUCHLSA",
113 OSPF_API_ILLEGALLSATYPE
: "OSPF_API_ILLEGALLSATYPE",
114 OSPF_API_OPAQUETYPEINUSE
: "OSPF_API_OPAQUETYPEINUSE",
115 OSPF_API_OPAQUETYPENOTREGISTERED
: "OSPF_API_OPAQUETYPENOTREGISTERED",
116 OSPF_API_NOTREADY
: "OSPF_API_NOTREADY",
117 OSPF_API_NOMEMORY
: "OSPF_API_NOMEMORY",
118 OSPF_API_ERROR
: "OSPF_API_ERROR",
119 OSPF_API_UNDEF
: "OSPF_API_UNDEF",
122 # msg_info = {**smsg_info, **amsg_info}
124 msg_info
.update(smsg_info
)
125 msg_info
.update(amsg_info
)
126 msg_name
= {k
: v
[0] for k
, v
in msg_info
.items()}
127 msg_fmt
= {k
: v
[1] for k
, v
in msg_info
.items()}
128 msg_size
= {k
: struct
.calcsize(v
) for k
, v
in msg_fmt
.items()}
132 return msg_name
.get(mt
, str(mt
))
135 def api_errname(ecode
):
136 return msg_errname
.get(ecode
, str(ecode
))
139 # -------------------
140 # API Semantic Errors
141 # -------------------
144 class APIError(Exception):
148 class MsgTypeError(Exception):
152 class SeqNumError(Exception):
164 LSA_TYPE_ASBR_SUMMARY
= 4
165 LSA_TYPE_AS_EXTERNAL
= 5
166 LSA_TYPE_GROUP_MEMBER
= 6
168 LSA_TYPE_EXTERNAL_ATTRIBUTES
= 8
169 LSA_TYPE_OPAQUE_LINK
= 9
170 LSA_TYPE_OPAQUE_AREA
= 10
171 LSA_TYPE_OPAQUE_AS
= 11
174 def lsa_typename(lsa_type
):
176 LSA_TYPE_ROUTER
: "LSA:ROUTER",
177 LSA_TYPE_NETWORK
: "LSA:NETWORK",
178 LSA_TYPE_SUMMARY
: "LSA:SUMMARY",
179 LSA_TYPE_ASBR_SUMMARY
: "LSA:ASBR_SUMMARY",
180 LSA_TYPE_AS_EXTERNAL
: "LSA:AS_EXTERNAL",
181 LSA_TYPE_GROUP_MEMBER
: "LSA:GROUP_MEMBER",
182 LSA_TYPE_AS_NSSA
: "LSA:AS_NSSA",
183 LSA_TYPE_EXTERNAL_ATTRIBUTES
: "LSA:EXTERNAL_ATTRIBUTES",
184 LSA_TYPE_OPAQUE_LINK
: "LSA:OPAQUE_LINK",
185 LSA_TYPE_OPAQUE_AREA
: "LSA:OPAQUE_AREA",
186 LSA_TYPE_OPAQUE_AS
: "LSA:OPAQUE_AS",
188 return names
.get(lsa_type
, str(lsa_type
))
191 # ------------------------------
192 # Interface State Machine States
193 # ------------------------------
207 ISM_DEPENDUPON
: "ISM_DEPENDUPON",
208 ISM_DOWN
: "ISM_DOWN",
209 ISM_LOOPBACK
: "ISM_LOOPBACK",
210 ISM_WAITING
: "ISM_WAITING",
211 ISM_POINTTOPOINT
: "ISM_POINTTOPOINT",
212 ISM_DROTHER
: "ISM_DROTHER",
213 ISM_BACKUP
: "ISM_BACKUP",
216 return names
.get(state
, str(state
))
219 # -----------------------------
220 # Neighbor State Machine States
221 # -----------------------------
237 NSM_DEPENDUPON
: "NSM_DEPENDUPON",
238 NSM_DELETED
: "NSM_DELETED",
239 NSM_DOWN
: "NSM_DOWN",
240 NSM_ATTEMPT
: "NSM_ATTEMPT",
241 NSM_INIT
: "NSM_INIT",
242 NSM_TWOWAY
: "NSM_TWOWAY",
243 NSM_EXSTART
: "NSM_EXSTART",
244 NSM_EXCHANGE
: "NSM_EXCHANGE",
245 NSM_LOADING
: "NSM_LOADING",
246 NSM_FULL
: "NSM_FULL",
248 return names
.get(state
, str(state
))
258 return "OspfApiClient({})".format(self
.server
)
261 def _get_bound_sockets(port
):
262 s1
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
, 0)
264 s1
.setsockopt(socket
.SOL_SOCKET
, socket
.SO_REUSEADDR
, 1)
265 # s1.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
267 s2
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
, 0)
269 s2
.setsockopt(socket
.SOL_SOCKET
, socket
.SO_REUSEADDR
, 1)
270 # s2.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
271 s2
.bind(("", port
+ 1))
280 def __init__(self
, server
="localhost", handlers
=None):
281 """A client connection to OSPF Daemon using the OSPF API
283 The client object is not created in a connected state. To connect to the server
284 the `connect` method should be called. If an error is encountered when sending
285 messages to the server an exception will be raised and the connection will be
286 closed. When this happens `connect` may be called again to restore the
290 server: hostname or IP address of server default is "localhost"
291 handlers: dict of message handlers, the key is the API message
292 type, the value is a function. The functions signature is:
293 `handler(msg_type, msg, msg_extra, *params)`, where `msg` is the
294 message data after the API header, `*params` will be the
295 unpacked message values, and msg_extra are any bytes beyond the
296 fixed parameters of the message.
298 Will raise exceptions for failures with various `socket` modules
299 functions such as `socket.socket`, `socket.setsockopt`, `socket.bind`.
305 self
._ar
= self
._r
= self
._w
= None
307 self
.handlers
= handlers
if handlers
is not None else dict()
308 self
.write_lock
= Lock()
310 # try and get consecutive 2 ports
313 for port
in range(PORTSTART
, PORTEND
+ 2, 2):
315 logging
.debug("%s: binding to ports %s, %s", self
, port
, port
+ 1)
316 self
._s
, self
._ls
= self
._get
_bound
_sockets
(port
)
318 except OSError as error
:
319 if error
.errno
!= errno
.EADDRINUSE
or port
== PORTEND
:
320 logging
.warning("%s: binding port %s error %s", self
, port
, error
)
322 logging
.debug("%s: ports %s, %s in use.", self
, port
, port
+ 1)
324 assert False, "Should not reach this code execution point"
326 async def _connect_locked(self
):
327 logging
.debug("%s: connect to OSPF API", self
)
329 loop
= asyncio
.get_event_loop()
333 logging
.debug("%s: connecting sync socket to server", self
)
334 await loop
.sock_connect(self
._s
, (self
.server
, 2607))
336 logging
.debug("%s: accepting connect from server", self
)
337 self
._as
, _
= await loop
.sock_accept(self
._ls
)
339 await self
._close
_locked
()
342 logging
.debug("%s: success", self
)
343 self
._r
, self
._w
= await asyncio
.open_connection(sock
=self
._s
)
344 self
._ar
, _
= await asyncio
.open_connection(sock
=self
._as
)
347 async def connect(self
):
348 async with self
.write_lock
:
349 await self
._connect
_locked
()
353 "True if the connection is closed."
354 return self
._seq
== 0
356 async def _close_locked(self
):
357 logging
.debug("%s: closing", self
)
361 await self
._w
.wait_closed()
367 assert self
._w
is None
377 async def close(self
):
378 async with self
.write_lock
:
379 await self
._close
_locked
()
382 async def _msg_read(r
, expseq
=-1):
383 """Read an OSPF API message from the socket `r`
386 r: socket to read msg from
387 expseq: sequence number to expect or -1 for any.
389 Will raise exceptions for failures with various `socket` modules,
390 Additionally may raise SeqNumError if unexpected seqnum is received.
393 mh
= await r
.readexactly(FMT_APIMSGHDR_SIZE
)
394 v
, mt
, l
, seq
= struct
.unpack(FMT_APIMSGHDR
, mh
)
396 raise Exception("received unexpected OSPF API version {}".format(v
))
398 logging
.debug("_msg_read: got seq: 0x%x on async read", seq
)
400 raise SeqNumError("rx {} != {}".format(seq
, expseq
))
401 msg
= await r
.readexactly(l
) if l
else b
""
403 except asyncio
.IncompleteReadError
:
406 async def msg_read(self
):
407 """Read a message from the async notify channel.
410 May raise exceptions for failures with various `socket` modules.
412 return await OspfApiClient
._msg
_read
(self
._ar
, -1)
414 async def msg_send(self
, mt
, mp
):
415 """Send a message to OSPF API and wait for error code reply.
418 mt: the messaage type
419 mp: the message payload
421 error: an OSPF_API_XXX error code, 0 for OK.
423 Raises SeqNumError if the synchronous reply is the wrong sequence number;
424 MsgTypeError if the synchronous reply is not MSG_REPLY. Also,
425 may raise exceptions for failures with various `socket` modules,
427 The connection will be closed.
429 logging
.debug("SEND: %s: sending %s seq 0x%x", self
, api_msgname(mt
), self
._seq
)
430 mh
= struct
.pack(FMT_APIMSGHDR
, 1, mt
, len(mp
), self
._seq
)
436 async with self
.write_lock
:
437 self
._w
.write(mh
+ mp
)
438 await self
._w
.drain()
439 mt
, mp
= await OspfApiClient
._msg
_read
(self
._r
, seq
)
443 "rx {} != {}".format(api_msgname(mt
), api_msgname(MSG_REPLY
))
446 return struct
.unpack(msg_fmt
[MSG_REPLY
], mp
)[0]
448 # We've written data with a sequence number
452 async def msg_send_raises(self
, mt
, mp
=b
"\x00" * 4):
453 """Send a message to OSPF API and wait for error code reply.
456 mt: the messaage type
457 mp: the message payload
459 APIError if the server replies with an error.
461 Also may raise exceptions for failures with various `socket` modules,
462 as well as MsgTypeError if the synchronous reply is incorrect.
463 The connection will be closed for these non-API error exceptions.
465 ecode
= await self
.msg_send(mt
, mp
)
467 raise APIError("{} error {}".format(api_msgname(mt
), api_errname(ecode
)))
469 async def handle_async_msg(self
, mt
, msg
):
470 if mt
not in msg_fmt
:
471 logging
.debug("RECV: %s: unknown async msg type %s", self
, mt
)
476 tup
= struct
.unpack(fmt
, msg
[:sz
])
479 if mt
not in self
.handlers
:
481 "RECV: %s: no handlers for msg type %s", self
, api_msgname(mt
)
485 logging
.debug("RECV: %s: calling handler for %s", self
, api_msgname(mt
))
486 await self
.handlers
[mt
](mt
, msg
, extra
, *tup
)
489 # Client to Server Messaging
492 def lsa_type_mask(*lsa_types
):
493 "Return a 16 bit mask for each LSA type passed."
498 assert 0 < t
< 16, "LSA type {} out of range [1, 15]".format(t
)
503 def lsa_filter(origin
, areas
, lsa_types
):
504 """Return an LSA filter.
506 Return the filter message bytes based on `origin` the `areas` list and the LSAs
507 types in the `lsa_types` list.
509 mask
= OspfApiClient
.lsa_type_mask(*lsa_types
)
511 fmt
= FMT_LSA_FILTER
+ ("{}I".format(narea
) if narea
else "")
512 # lsa type mask, origin, number of areas, each area
513 return struct
.pack(fmt
, mask
, origin
, narea
, *areas
)
515 async def req_lsdb_sync(self
):
516 "Register for all LSA notifications and request an LSDB synchronoization."
517 logging
.debug("SEND: %s: request LSDB events", self
)
518 mp
= OspfApiClient
.lsa_filter(LSAF_ORIGIN_ANY
, [], [])
519 await self
.msg_send_raises(MSG_REGISTER_EVENT
, mp
)
521 logging
.debug("SEND: %s: request LSDB sync", self
)
522 await self
.msg_send_raises(MSG_SYNC_LSDB
, mp
)
524 async def req_reachable_routers(self
):
525 "Request a dump of all reachable routers."
526 logging
.debug("SEND: %s: request reachable changes", self
)
527 await self
.msg_send_raises(MSG_SYNC_REACHABLE
)
529 async def req_ism_states(self
):
530 "Request a dump of the current ISM states of all interfaces."
531 logging
.debug("SEND: %s: request ISM changes", self
)
532 await self
.msg_send_raises(MSG_SYNC_ISM
)
534 async def req_nsm_states(self
):
535 "Request a dump of the current NSM states of all neighbors."
536 logging
.debug("SEND: %s: request NSM changes", self
)
537 await self
.msg_send_raises(MSG_SYNC_NSM
)
540 class OspfOpaqueClient(OspfApiClient
):
541 """A client connection to OSPF Daemon for manipulating Opaque LSA data.
543 The client object is not created in a connected state. To connect to the server
544 the `connect` method should be called. If an error is encountered when sending
545 messages to the server an exception will be raised and the connection will be
546 closed. When this happens `connect` may be called again to restore the
550 server: hostname or IP address of server default is "localhost"
553 Will raise exceptions for failures with various `socket` modules
554 functions such as `socket.socket`, `socket.setsockopt`, `socket.bind`.
557 def __init__(self
, server
="localhost"):
559 MSG_READY_NOTIFY
: self
._ready
_msg
,
560 MSG_LSA_UPDATE_NOTIFY
: self
._lsa
_change
_msg
,
561 MSG_LSA_DELETE_NOTIFY
: self
._lsa
_change
_msg
,
562 MSG_NEW_IF
: self
._if
_msg
,
563 MSG_DEL_IF
: self
._if
_msg
,
564 MSG_ISM_CHANGE
: self
._if
_change
_msg
,
565 MSG_NSM_CHANGE
: self
._nbr
_change
_msg
,
566 MSG_REACHABLE_CHANGE
: self
._reachable
_msg
,
568 super().__init
__(server
, handlers
)
570 self
.ready_lock
= Lock()
572 LSA_TYPE_OPAQUE_LINK
: {},
573 LSA_TYPE_OPAQUE_AREA
: {},
574 LSA_TYPE_OPAQUE_AS
: {},
576 self
.lsid_seq_num
= {}
578 self
.lsa_change_cb
= None
579 self
.opaque_change_cb
= {}
581 self
.reachable_routers
= set()
582 self
.reachable_change_cb
= None
586 self
.ism_change_cb
= None
589 self
.nsm_change_cb
= None
591 async def _register_opaque_data(self
, lsa_type
, otype
):
592 async with self
.ready_lock
:
593 cond
= self
.ready_cond
[lsa_type
].get(otype
)
594 assert cond
is None, "multiple registers for {} opaque-type {}".format(
595 lsa_typename(lsa_type
), otype
598 logging
.debug("register %s opaque-type %s", lsa_typename(lsa_type
), otype
)
600 mt
= MSG_REGISTER_OPAQUETYPE
601 mp
= struct
.pack(msg_fmt
[mt
], lsa_type
, otype
)
602 await self
.msg_send_raises(mt
, mp
)
604 async def _assure_opaque_ready(self
, lsa_type
, otype
):
605 async with self
.ready_lock
:
606 if self
.ready_cond
[lsa_type
].get(otype
) is True:
609 await self
._register
_opaque
_data
(lsa_type
, otype
)
610 await self
.wait_opaque_ready(lsa_type
, otype
)
612 async def _handle_msg_loop(self
):
614 logging
.debug("entering async msg handling loop")
616 mt
, msg
= await self
.msg_read()
618 await self
.handle_async_msg(mt
, msg
)
620 mts
= api_msgname(mt
)
622 "ignoring unexpected msg: %s len: %s", mts
, len(msg
)
625 logging
.info("Got EOF from OSPF API server on async notify socket")
629 def _opaque_args(lsa_type
, otype
, oid
, mp
):
630 lsid
= (otype
<< 24) | oid
631 return 0, 0, lsa_type
, lsid
, 0, 0, 0, FMT_LSA_HEADER_SIZE
+ len(mp
)
634 def _make_opaque_lsa(lsa_type
, otype
, oid
, mp
):
635 # /* Make a new LSA from parameters */
637 FMT_LSA_HEADER
, *OspfOpaqueClient
._opaque
_args
(lsa_type
, otype
, oid
, mp
)
642 async def _ready_msg(self
, mt
, msg
, extra
, lsa_type
, otype
, addr
):
643 if lsa_type
== LSA_TYPE_OPAQUE_LINK
:
644 e
= "ifaddr {}".format(ip(addr
))
645 elif lsa_type
== LSA_TYPE_OPAQUE_AREA
:
646 e
= "area {}".format(ip(addr
))
650 "RECV: %s ready notify for %s opaque-type %s%s",
652 lsa_typename(lsa_type
),
657 # Signal all waiting senders they can send now.
658 async with self
.ready_lock
:
659 cond
= self
.ready_cond
[lsa_type
].get(otype
)
660 self
.ready_cond
[lsa_type
][otype
] = True
664 "RECV: dup ready received for %s opaque-type %s",
665 lsa_typename(lsa_type
),
672 async def _if_msg(self
, mt
, msg
, extra
, *args
):
676 assert mt
== MSG_DEL_IF
677 ifaddr
, aid
= args
[0], 0
679 "RECV: %s ifaddr %s areaid %s", api_msgname(mt
), ip(ifaddr
), ip(aid
)
682 async def _if_change_msg(self
, mt
, msg
, extra
, ifaddr
, aid
, state
):
687 "RECV: %s ifaddr %s areaid %s state %s",
694 self
.if_area
[ifaddr
] = aid
695 self
.ism_states
[ifaddr
] = state
697 if self
.ism_change_cb
:
698 self
.ism_change_cb(ifaddr
, aid
, state
)
700 async def _nbr_change_msg(self
, mt
, msg
, extra
, ifaddr
, nbraddr
, router_id
, state
):
702 nbraddr
= ip(nbraddr
)
703 router_id
= ip(router_id
)
706 "RECV: %s ifaddr %s nbraddr %s router_id %s state %s",
714 if ifaddr
not in self
.nsm_states
:
715 self
.nsm_states
[ifaddr
] = {}
716 self
.nsm_states
[ifaddr
][(nbraddr
, router_id
)] = state
718 if self
.nsm_change_cb
:
719 self
.nsm_change_cb(ifaddr
, nbraddr
, router_id
, state
)
721 async def _lsa_change_msg(self
, mt
, msg
, extra
, ifaddr
, aid
, is_self
, *ls_header
):
733 otype
= (ls_id
>> 24) & 0xFF
735 if mt
== MSG_LSA_UPDATE_NOTIFY
:
738 assert mt
== MSG_LSA_DELETE_NOTIFY
742 "RECV: LSA %s msg for LSA %s in area %s seq 0x%x len %s age %s",
750 idx
= (lsa_type
, otype
)
752 pre_lsa_size
= msg_size
[mt
] - FMT_LSA_HEADER_SIZE
753 lsa
= msg
[pre_lsa_size
:]
755 if idx
in self
.opaque_change_cb
:
756 self
.opaque_change_cb
[idx
](mt
, ifaddr
, aid
, ls_header
, extra
, lsa
)
758 if self
.lsa_change_cb
:
759 self
.lsa_change_cb(mt
, ifaddr
, aid
, ls_header
, extra
, lsa
)
761 async def _reachable_msg(self
, mt
, msg
, extra
, nadd
, nremove
):
762 router_ids
= struct
.unpack(">{}I".format(nadd
+ nremove
), extra
)
763 router_ids
= [ip(x
) for x
in router_ids
]
765 "RECV: %s added %s removed %s",
770 self
.reachable_routers |
= set(router_ids
[:nadd
])
771 self
.reachable_routers
-= set(router_ids
[nadd
:])
772 logging
.info("RECV: %s new set %s", api_msgname(mt
), self
.reachable_routers
)
774 if self
.reachable_change_cb
:
775 logging
.info("RECV: %s calling callback", api_msgname(mt
))
776 await self
.reachable_change_cb(router_ids
[:nadd
], router_ids
[nadd
:])
778 async def add_opaque_data(self
, addr
, lsa_type
, otype
, oid
, data
):
779 """Add an instance of opaque data.
781 Add an instance of opaque data. This call will register for the given
782 LSA and opaque type if not already done.
785 addr: depends on lsa_type, LINK => ifaddr, AREA => area ID, AS => ignored
786 lsa_type: LSA_TYPE_OPAQUE_{LINK,AREA,AS}
787 otype: (octet) opaque type
788 oid: (3 octets) ID of this opaque data
789 data: the opaque data
791 See `msg_send_raises`
794 if lsa_type
== LSA_TYPE_OPAQUE_LINK
:
795 ifaddr
, aid
= int(addr
), 0
796 elif lsa_type
== LSA_TYPE_OPAQUE_AREA
:
797 ifaddr
, aid
= 0, int(addr
)
799 assert lsa_type
== LSA_TYPE_OPAQUE_AS
802 mt
= MSG_ORIGINATE_REQUEST
807 *OspfOpaqueClient
._opaque
_args
(lsa_type
, otype
, oid
, data
),
810 await self
._assure
_opaque
_ready
(lsa_type
, otype
)
811 await self
.msg_send_raises(mt
, msg
)
813 async def delete_opaque_data(self
, addr
, lsa_type
, otype
, oid
):
814 """Delete an instance of opaque data.
816 Delete an instance of opaque data. This call will register for the given
817 LSA and opaque type if not already done.
820 addr: depends on lsa_type, LINK => ifaddr, AREA => area ID, AS => ignored
821 lsa_type: LSA_TYPE_OPAQUE_{LINK,AREA,AS}
822 otype: (octet) opaque type. Note: the type will be registered if the user
823 has not explicity done that yet with `register_opaque_data`.
824 oid: (3 octets) ID of this opaque data
826 See `msg_send_raises`
828 if (lsa_type
, otype
) in self
.opaque_change_cb
:
829 del self
.opaque_change_cb
[(lsa_type
, otype
)]
831 mt
= MSG_DELETE_REQUEST
832 await self
._assure
_opaque
_ready
(lsa_type
, otype
)
833 mp
= struct
.pack(msg_fmt
[mt
], int(addr
), lsa_type
, otype
, oid
)
834 await self
.msg_send_raises(mt
, mp
)
836 async def register_opaque_data(self
, lsa_type
, otype
, callback
=None):
837 """Register intent to advertise opaque data.
839 The application should wait for the async notificaiton that the server is
840 ready to advertise the given opaque data type. The API currently only allows
841 a single "owner" of each unique (lsa_type,otype). To wait call `wait_opaque_ready`
844 lsa_type: LSA_TYPE_OPAQUE_{LINK,AREA,AS}
845 otype: (octet) opaque type. Note: the type will be registered if the user
846 has not explicity done that yet with `register_opaque_data`.
847 callback: if given, callback will be called when changes are received for
848 LSA of the given (lsa_type, otype). The callbacks signature is:
850 `callback(msg_type, ifaddr, area_id, lsa_header, data, lsa)`
853 msg_type: MSG_LSA_UPDATE_NOTIFY or MSG_LSA_DELETE_NOTIFY
854 ifaddr: integer identifying an interface (by IP address)
855 area_id: integer identifying an area
856 lsa_header: the LSA header as an unpacked tuple (fmt: ">HBBIILHH")
857 data: the opaque data that follows the LSA header
858 lsa: the octets of the full lsa
860 See `msg_send_raises`
863 self
.opaque_change_cb
[(lsa_type
, otype
)] = callback
864 elif (lsa_type
, otype
) in self
.opaque_change_cb
:
866 "OSPFCLIENT: register: removing callback for %s opaque-type %s",
867 lsa_typename(lsa_type
),
870 del self
.opaque_change_cb
[(lsa_type
, otype
)]
872 await self
._register
_opaque
_data
(lsa_type
, otype
)
874 async def wait_opaque_ready(self
, lsa_type
, otype
):
875 async with self
.ready_lock
:
876 cond
= self
.ready_cond
[lsa_type
].get(otype
)
881 "waiting for ready %s opaque-type %s", lsa_typename(lsa_type
), otype
885 cond
= self
.ready_cond
[lsa_type
][otype
] = []
891 logging
.debug("READY for %s opaque-type %s", lsa_typename(lsa_type
), otype
)
893 async def register_opaque_data_wait(self
, lsa_type
, otype
, callback
=None):
894 """Register intent to advertise opaque data and wait for ready.
896 The API currently only allows a single "owner" of each unique (lsa_type,otype).
899 lsa_type: LSA_TYPE_OPAQUE_{LINK,AREA,AS}
900 otype: (octet) opaque type. Note: the type will be registered if the user
901 has not explicity done that yet with `register_opaque_data`.
902 callback: if given, callback will be called when changes are received for
903 LSA of the given (lsa_type, otype). The callbacks signature is:
905 `callback(msg_type, ifaddr, area_id, lsa_header, data, lsa)`
908 msg_type: MSG_LSA_UPDATE_NOTIFY or MSG_LSA_DELETE_NOTIFY
909 ifaddr: integer identifying an interface (by IP address)
910 area_id: integer identifying an area
911 lsa_header: the LSA header as an unpacked tuple (fmt: ">HBBIILHH")
912 data: the opaque data that follows the LSA header
913 lsa: the octets of the full lsa
916 See `msg_send_raises`
919 self
.opaque_change_cb
[(lsa_type
, otype
)] = callback
920 elif (lsa_type
, otype
) in self
.opaque_change_cb
:
922 "OSPFCLIENT: register: removing callback for %s opaque-type %s",
923 lsa_typename(lsa_type
),
926 del self
.opaque_change_cb
[(lsa_type
, otype
)]
928 return await self
._assure
_opaque
_ready
(lsa_type
, otype
)
930 async def unregister_opaque_data(self
, lsa_type
, otype
):
931 """Unregister intent to advertise opaque data.
933 This will also cause the server to flush/delete all opaque data of
934 the given (lsa_type,otype).
937 lsa_type: LSA_TYPE_OPAQUE_{LINK,AREA,AS}
938 otype: (octet) opaque type. Note: the type will be registered if the user
939 has not explicity done that yet with `register_opaque_data`.
941 See `msg_send_raises`
944 if (lsa_type
, otype
) in self
.opaque_change_cb
:
945 del self
.opaque_change_cb
[(lsa_type
, otype
)]
947 mt
= MSG_UNREGISTER_OPAQUETYPE
948 mp
= struct
.pack(msg_fmt
[mt
], lsa_type
, otype
)
949 await self
.msg_send_raises(mt
, mp
)
951 async def monitor_lsa(self
, callback
=None):
952 """Monitor changes to LSAs.
955 callback: if given, callback will be called when changes are received for
956 any LSA. The callback signature is:
958 `callback(msg_type, ifaddr, area_id, lsa_header, extra, lsa)`
961 msg_type: MSG_LSA_UPDATE_NOTIFY or MSG_LSA_DELETE_NOTIFY
962 ifaddr: integer identifying an interface (by IP address)
963 area_id: integer identifying an area
964 lsa_header: the LSA header as an unpacked tuple (fmt: ">HBBIILHH")
965 extra: the octets that follow the LSA header
966 lsa: the octets of the full lsa
968 self
.lsa_change_cb
= callback
969 await self
.req_lsdb_sync()
971 async def monitor_reachable(self
, callback
=None):
972 """Monitor the set of reachable routers.
974 The property `reachable_routers` contains the set() of reachable router IDs
975 as integers. This set is updated prior to calling the `callback`
978 callback: callback will be called when the set of reachable
979 routers changes. The callback signature is:
981 `callback(added, removed)`
984 added: list of integer router IDs being added
985 removed: list of integer router IDs being removed
987 self
.reachable_change_cb
= callback
988 await self
.req_reachable_routers()
990 async def monitor_ism(self
, callback
=None):
991 """Monitor the state of OSPF enabled interfaces.
994 callback: callback will be called when an interface changes state.
995 The callback signature is:
997 `callback(ifaddr, area_id, state)`
1000 ifaddr: integer identifying an interface (by IP address)
1001 area_id: integer identifying an area
1004 self
.ism_change_cb
= callback
1005 await self
.req_ism_states()
1007 async def monitor_nsm(self
, callback
=None):
1008 """Monitor the state of OSPF neighbors.
1011 callback: callback will be called when a neighbor changes state.
1012 The callback signature is:
1014 `callback(ifaddr, nbr_addr, router_id, state)`
1017 ifaddr: integer identifying an interface (by IP address)
1018 nbr_addr: integer identifying neighbor by IP address
1019 router_id: integer identifying neighbor router ID
1022 self
.nsm_change_cb
= callback
1023 await self
.req_nsm_states()
1031 async def async_main(args
):
1032 c
= OspfOpaqueClient(args
.server
)
1036 # Start handling async messages from server.
1037 if sys
.version_info
[1] > 6:
1038 asyncio
.create_task(c
._handle
_msg
_loop
())
1040 asyncio
.get_event_loop().create_task(c
._handle
_msg
_loop
())
1042 await c
.req_lsdb_sync()
1043 await c
.req_reachable_routers()
1044 await c
.req_ism_states()
1045 await c
.req_nsm_states()
1048 for action
in args
.actions
:
1049 _s
= action
.split(",")
1050 what
= _s
.pop(False)
1051 ltype
= int(_s
.pop(False))
1055 aval
= _s
.pop(False)
1057 addr
= ip(int(aval
))
1060 oargs
= [addr
, ltype
, int(_s
.pop(False)), int(_s
.pop(False))]
1061 assert len(_s
) <= 1, "Bad format for action argument"
1063 b
= bytes
.fromhex(_s
.pop(False))
1066 logging
.info("opaque data is %s octets", len(b
))
1067 # Needs to be multiple of 4 in length
1070 b
+= b
"\x00" * (4 - mod
)
1071 logging
.info("opaque padding to %s octets", len(b
))
1073 if what
.casefold() == "add":
1074 await c
.add_opaque_data(*oargs
, b
)
1076 assert what
.casefold().startswith("del")
1077 await c
.delete_opaque_data(*oargs
)
1080 except Exception as error
:
1081 logging
.error("async_main: unexpected error: %s", error
, exc_info
=True)
1085 logging
.info("Sleeping forever")
1087 await asyncio
.sleep(120)
1089 logging
.info("Got EOF from OSPF API server on async notify socket")
1094 ap
= argparse
.ArgumentParser(args
)
1095 ap
.add_argument("--exit", action
="store_true", help="Exit after commands")
1096 ap
.add_argument("--server", default
="localhost", help="OSPF API server")
1097 ap
.add_argument("-v", "--verbose", action
="store_true", help="be verbose")
1099 "actions", nargs
="*", help="(ADD|DEL),LSATYPE,[ADDR,],OTYPE,OID,[HEXDATA]"
1101 args
= ap
.parse_args()
1103 level
= logging
.DEBUG
if args
.verbose
else logging
.INFO
1104 logging
.basicConfig(
1105 level
=level
, format
="%(asctime)s %(levelname)s: CLIENT: %(name)s %(message)s"
1108 logging
.info("ospfclient: starting")
1112 if sys
.version_info
[1] > 6:
1114 status
= asyncio
.run(async_main(args
))
1116 loop
= asyncio
.get_event_loop()
1118 status
= loop
.run_until_complete(async_main(args
))
1121 except KeyboardInterrupt:
1122 logging
.info("Exiting, received KeyboardInterrupt in main")
1123 except Exception as error
:
1124 logging
.info("Exiting, unexpected exception %s", error
, exc_info
=True)
1126 logging
.info("ospfclient: clean exit")
1131 if __name__
== "__main__":
1132 exit_status
= main()
1133 sys
.exit(exit_status
)