]> git.proxmox.com Git - mirror_frr.git/blob - ospfclient/ospfclient.py
Merge pull request #12798 from donaldsharp/rib_match_multicast
[mirror_frr.git] / ospfclient / ospfclient.py
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 eval: (blacken-mode 1) -*-
3 # SPDX-License-Identifier: GPL-2.0-or-later
4 #
5 # December 22 2021, Christian Hopps <chopps@labn.net>
6 #
7 # Copyright 2021-2022, LabN Consulting, L.L.C.
8 #
9
10 import argparse
11 import asyncio
12 import errno
13 import logging
14 import socket
15 import struct
16 import sys
17 from asyncio import Event, Lock
18 from ipaddress import ip_address as ip
19
20 FMT_APIMSGHDR = ">BBHL"
21 FMT_APIMSGHDR_SIZE = struct.calcsize(FMT_APIMSGHDR)
22
23 FMT_LSA_FILTER = ">HBB" # + plus x"I" areas
24 LSAF_ORIGIN_NON_SELF = 0
25 LSAF_ORIGIN_SELF = 1
26 LSAF_ORIGIN_ANY = 2
27
28 FMT_LSA_HEADER = ">HBBIILHH"
29 FMT_LSA_HEADER_SIZE = struct.calcsize(FMT_LSA_HEADER)
30
31 # ------------------------
32 # Messages to OSPF daemon.
33 # ------------------------
34
35 MSG_REGISTER_OPAQUETYPE = 1
36 MSG_UNREGISTER_OPAQUETYPE = 2
37 MSG_REGISTER_EVENT = 3
38 MSG_SYNC_LSDB = 4
39 MSG_ORIGINATE_REQUEST = 5
40 MSG_DELETE_REQUEST = 6
41 MSG_SYNC_REACHABLE = 7
42 MSG_SYNC_ISM = 8
43 MSG_SYNC_NSM = 9
44 MSG_SYNC_ROUTER_ID = 19
45
46 smsg_info = {
47 MSG_REGISTER_OPAQUETYPE: ("REGISTER_OPAQUETYPE", "BBxx"),
48 MSG_UNREGISTER_OPAQUETYPE: ("UNREGISTER_OPAQUETYPE", "BBxx"),
49 MSG_REGISTER_EVENT: ("REGISTER_EVENT", FMT_LSA_FILTER),
50 MSG_SYNC_LSDB: ("SYNC_LSDB", FMT_LSA_FILTER),
51 MSG_ORIGINATE_REQUEST: ("ORIGINATE_REQUEST", ">II" + FMT_LSA_HEADER[1:]),
52 MSG_DELETE_REQUEST: ("DELETE_REQUEST", ">IBBxBL"),
53 MSG_SYNC_REACHABLE: ("MSG_SYNC_REACHABLE", ""),
54 MSG_SYNC_ISM: ("MSG_SYNC_ISM", ""),
55 MSG_SYNC_NSM: ("MSG_SYNC_NSM", ""),
56 MSG_SYNC_ROUTER_ID: ("MSG_SYNC_ROUTER_ID", ""),
57 }
58
59 # OSPF API MSG Delete Flag.
60 OSPF_API_DEL_ZERO_LEN_LSA = 0x01 # send withdrawal with no LSA data
61
62 # --------------------------
63 # Messages from OSPF daemon.
64 # --------------------------
65
66 MSG_REPLY = 10
67 MSG_READY_NOTIFY = 11
68 MSG_LSA_UPDATE_NOTIFY = 12
69 MSG_LSA_DELETE_NOTIFY = 13
70 MSG_NEW_IF = 14
71 MSG_DEL_IF = 15
72 MSG_ISM_CHANGE = 16
73 MSG_NSM_CHANGE = 17
74 MSG_REACHABLE_CHANGE = 18
75 MSG_ROUTER_ID_CHANGE = 20
76
77 amsg_info = {
78 MSG_REPLY: ("REPLY", "bxxx"),
79 MSG_READY_NOTIFY: ("READY_NOTIFY", ">BBxxI"),
80 MSG_LSA_UPDATE_NOTIFY: ("LSA_UPDATE_NOTIFY", ">IIBxxx" + FMT_LSA_HEADER[1:]),
81 MSG_LSA_DELETE_NOTIFY: ("LSA_DELETE_NOTIFY", ">IIBxxx" + FMT_LSA_HEADER[1:]),
82 MSG_NEW_IF: ("NEW_IF", ">II"),
83 MSG_DEL_IF: ("DEL_IF", ">I"),
84 MSG_ISM_CHANGE: ("ISM_CHANGE", ">IIBxxx"),
85 MSG_NSM_CHANGE: ("NSM_CHANGE", ">IIIBxxx"),
86 MSG_REACHABLE_CHANGE: ("REACHABLE_CHANGE", ">HH"),
87 MSG_ROUTER_ID_CHANGE: ("ROUTER_ID_CHANGE", ">I"),
88 }
89
90 OSPF_API_OK = 0
91 OSPF_API_NOSUCHINTERFACE = -1
92 OSPF_API_NOSUCHAREA = -2
93 OSPF_API_NOSUCHLSA = -3
94 OSPF_API_ILLEGALLSATYPE = -4
95 OSPF_API_OPAQUETYPEINUSE = -5
96 OSPF_API_OPAQUETYPENOTREGISTERED = -6
97 OSPF_API_NOTREADY = -7
98 OSPF_API_NOMEMORY = -8
99 OSPF_API_ERROR = -9
100 OSPF_API_UNDEF = -10
101
102 msg_errname = {
103 OSPF_API_OK: "OSPF_API_OK",
104 OSPF_API_NOSUCHINTERFACE: "OSPF_API_NOSUCHINTERFACE",
105 OSPF_API_NOSUCHAREA: "OSPF_API_NOSUCHAREA",
106 OSPF_API_NOSUCHLSA: "OSPF_API_NOSUCHLSA",
107 OSPF_API_ILLEGALLSATYPE: "OSPF_API_ILLEGALLSATYPE",
108 OSPF_API_OPAQUETYPEINUSE: "OSPF_API_OPAQUETYPEINUSE",
109 OSPF_API_OPAQUETYPENOTREGISTERED: "OSPF_API_OPAQUETYPENOTREGISTERED",
110 OSPF_API_NOTREADY: "OSPF_API_NOTREADY",
111 OSPF_API_NOMEMORY: "OSPF_API_NOMEMORY",
112 OSPF_API_ERROR: "OSPF_API_ERROR",
113 OSPF_API_UNDEF: "OSPF_API_UNDEF",
114 }
115
116 # msg_info = {**smsg_info, **amsg_info}
117 msg_info = {}
118 msg_info.update(smsg_info)
119 msg_info.update(amsg_info)
120 msg_name = {k: v[0] for k, v in msg_info.items()}
121 msg_fmt = {k: v[1] for k, v in msg_info.items()}
122 msg_size = {k: struct.calcsize(v) for k, v in msg_fmt.items()}
123
124
125 def api_msgname(mt):
126 return msg_name.get(mt, str(mt))
127
128
129 def api_errname(ecode):
130 return msg_errname.get(ecode, str(ecode))
131
132
133 # -------------------
134 # API Semantic Errors
135 # -------------------
136
137
138 class APIError(Exception):
139 pass
140
141
142 class MsgTypeError(Exception):
143 pass
144
145
146 class SeqNumError(Exception):
147 pass
148
149
150 # ---------
151 # LSA Types
152 # ---------
153
154 LSA_TYPE_UNKNOWN = 0
155 LSA_TYPE_ROUTER = 1
156 LSA_TYPE_NETWORK = 2
157 LSA_TYPE_SUMMARY = 3
158 LSA_TYPE_ASBR_SUMMARY = 4
159 LSA_TYPE_AS_EXTERNAL = 5
160 LSA_TYPE_GROUP_MEMBER = 6
161 LSA_TYPE_AS_NSSA = 7
162 LSA_TYPE_EXTERNAL_ATTRIBUTES = 8
163 LSA_TYPE_OPAQUE_LINK = 9
164 LSA_TYPE_OPAQUE_AREA = 10
165 LSA_TYPE_OPAQUE_AS = 11
166
167
168 def lsa_typename(lsa_type):
169 names = {
170 LSA_TYPE_ROUTER: "LSA:ROUTER",
171 LSA_TYPE_NETWORK: "LSA:NETWORK",
172 LSA_TYPE_SUMMARY: "LSA:SUMMARY",
173 LSA_TYPE_ASBR_SUMMARY: "LSA:ASBR_SUMMARY",
174 LSA_TYPE_AS_EXTERNAL: "LSA:AS_EXTERNAL",
175 LSA_TYPE_GROUP_MEMBER: "LSA:GROUP_MEMBER",
176 LSA_TYPE_AS_NSSA: "LSA:AS_NSSA",
177 LSA_TYPE_EXTERNAL_ATTRIBUTES: "LSA:EXTERNAL_ATTRIBUTES",
178 LSA_TYPE_OPAQUE_LINK: "LSA:OPAQUE_LINK",
179 LSA_TYPE_OPAQUE_AREA: "LSA:OPAQUE_AREA",
180 LSA_TYPE_OPAQUE_AS: "LSA:OPAQUE_AS",
181 }
182 return names.get(lsa_type, str(lsa_type))
183
184
185 # ------------------------------
186 # Interface State Machine States
187 # ------------------------------
188
189 ISM_DEPENDUPON = 0
190 ISM_DOWN = 1
191 ISM_LOOPBACK = 2
192 ISM_WAITING = 3
193 ISM_POINTTOPOINT = 4
194 ISM_DROTHER = 5
195 ISM_BACKUP = 6
196 ISM_DR = 7
197
198
199 def ism_name(state):
200 names = {
201 ISM_DEPENDUPON: "ISM_DEPENDUPON",
202 ISM_DOWN: "ISM_DOWN",
203 ISM_LOOPBACK: "ISM_LOOPBACK",
204 ISM_WAITING: "ISM_WAITING",
205 ISM_POINTTOPOINT: "ISM_POINTTOPOINT",
206 ISM_DROTHER: "ISM_DROTHER",
207 ISM_BACKUP: "ISM_BACKUP",
208 ISM_DR: "ISM_DR",
209 }
210 return names.get(state, str(state))
211
212
213 # -----------------------------
214 # Neighbor State Machine States
215 # -----------------------------
216
217 NSM_DEPENDUPON = 0
218 NSM_DELETED = 1
219 NSM_DOWN = 2
220 NSM_ATTEMPT = 3
221 NSM_INIT = 4
222 NSM_TWOWAY = 5
223 NSM_EXSTART = 6
224 NSM_EXCHANGE = 7
225 NSM_LOADING = 8
226 NSM_FULL = 9
227
228
229 def nsm_name(state):
230 names = {
231 NSM_DEPENDUPON: "NSM_DEPENDUPON",
232 NSM_DELETED: "NSM_DELETED",
233 NSM_DOWN: "NSM_DOWN",
234 NSM_ATTEMPT: "NSM_ATTEMPT",
235 NSM_INIT: "NSM_INIT",
236 NSM_TWOWAY: "NSM_TWOWAY",
237 NSM_EXSTART: "NSM_EXSTART",
238 NSM_EXCHANGE: "NSM_EXCHANGE",
239 NSM_LOADING: "NSM_LOADING",
240 NSM_FULL: "NSM_FULL",
241 }
242 return names.get(state, str(state))
243
244
245 class WithNothing:
246 "An object that does nothing when used with `with` statement."
247
248 async def __aenter__(self):
249 return
250
251 async def __aexit__(self, *args, **kwargs):
252 return
253
254
255 # --------------
256 # Client Classes
257 # --------------
258
259
260 class OspfApiClient:
261 def __str__(self):
262 return "OspfApiClient({})".format(self.server)
263
264 @staticmethod
265 def _get_bound_sockets(port):
266 s1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
267 try:
268 s1.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
269 # s1.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
270 s1.bind(("", port))
271 s2 = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
272 try:
273 s2.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
274 # s2.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
275 s2.bind(("", port + 1))
276 return s1, s2
277 except Exception:
278 s2.close()
279 raise
280 except Exception:
281 s1.close()
282 raise
283
284 def __init__(self, server="localhost", handlers=None):
285 """A client connection to OSPF Daemon using the OSPF API
286
287 The client object is not created in a connected state. To connect to the server
288 the `connect` method should be called. If an error is encountered when sending
289 messages to the server an exception will be raised and the connection will be
290 closed. When this happens `connect` may be called again to restore the
291 connection.
292
293 Args:
294 server: hostname or IP address of server default is "localhost"
295 handlers: dict of message handlers, the key is the API message
296 type, the value is a function. The functions signature is:
297 `handler(msg_type, msg, msg_extra, *params)`, where `msg` is the
298 message data after the API header, `*params` will be the
299 unpacked message values, and msg_extra are any bytes beyond the
300 fixed parameters of the message.
301 Raises:
302 Will raise exceptions for failures with various `socket` modules
303 functions such as `socket.socket`, `socket.setsockopt`, `socket.bind`.
304 """
305 self._seq = 0
306 self._s = None
307 self._as = None
308 self._ls = None
309 self._ar = self._r = self._w = None
310 self.server = server
311 self.handlers = handlers if handlers is not None else dict()
312 self.write_lock = Lock()
313
314 # try and get consecutive 2 ports
315 PORTSTART = 49152
316 PORTEND = 65534
317 for port in range(PORTSTART, PORTEND + 2, 2):
318 try:
319 logging.debug("%s: binding to ports %s, %s", self, port, port + 1)
320 self._s, self._ls = self._get_bound_sockets(port)
321 break
322 except OSError as error:
323 if error.errno != errno.EADDRINUSE or port == PORTEND:
324 logging.warning("%s: binding port %s error %s", self, port, error)
325 raise
326 logging.debug("%s: ports %s, %s in use.", self, port, port + 1)
327 else:
328 assert False, "Should not reach this code execution point"
329
330 async def _connect_locked(self):
331 logging.debug("%s: connect to OSPF API", self)
332
333 loop = asyncio.get_event_loop()
334
335 self._ls.listen()
336 try:
337 logging.debug("%s: connecting sync socket to server", self)
338 await loop.sock_connect(self._s, (self.server, 2607))
339
340 logging.debug("%s: accepting connect from server", self)
341 self._as, _ = await loop.sock_accept(self._ls)
342 except Exception:
343 await self._close_locked()
344 raise
345
346 logging.debug("%s: success", self)
347 self._r, self._w = await asyncio.open_connection(sock=self._s)
348 self._ar, _ = await asyncio.open_connection(sock=self._as)
349 self._seq = 1
350
351 async def connect(self):
352 async with self.write_lock:
353 await self._connect_locked()
354
355 @property
356 def closed(self):
357 "True if the connection is closed."
358 return self._seq == 0
359
360 async def _close_locked(self):
361 logging.debug("%s: closing", self)
362 if self._s:
363 if self._w:
364 self._w.close()
365 await self._w.wait_closed()
366 self._w = None
367 else:
368 self._s.close()
369 self._s = None
370 self._r = None
371 assert self._w is None
372 if self._as:
373 self._as.close()
374 self._as = None
375 self._ar = None
376 if self._ls:
377 self._ls.close()
378 self._ls = None
379 self._seq = 0
380
381 async def close(self):
382 async with self.write_lock:
383 await self._close_locked()
384
385 @staticmethod
386 async def _msg_read(r, expseq=-1):
387 """Read an OSPF API message from the socket `r`
388
389 Args:
390 r: socket to read msg from
391 expseq: sequence number to expect or -1 for any.
392 Raises:
393 Will raise exceptions for failures with various `socket` modules,
394 Additionally may raise SeqNumError if unexpected seqnum is received.
395 """
396 try:
397 mh = await r.readexactly(FMT_APIMSGHDR_SIZE)
398 v, mt, l, seq = struct.unpack(FMT_APIMSGHDR, mh)
399 if v != 1:
400 raise Exception("received unexpected OSPF API version {}".format(v))
401 if expseq == -1:
402 logging.debug("_msg_read: got seq: 0x%x on async read", seq)
403 elif seq != expseq:
404 raise SeqNumError("rx {} != {}".format(seq, expseq))
405 msg = await r.readexactly(l) if l else b""
406 return mt, msg
407 except asyncio.IncompleteReadError:
408 raise EOFError
409
410 async def msg_read(self):
411 """Read a message from the async notify channel.
412
413 Raises:
414 May raise exceptions for failures with various `socket` modules.
415 """
416 return await OspfApiClient._msg_read(self._ar, -1)
417
418 async def msg_send(self, mt, mp):
419 """Send a message to OSPF API and wait for error code reply.
420
421 Args:
422 mt: the messaage type
423 mp: the message payload
424 Returns:
425 error: an OSPF_API_XXX error code, 0 for OK.
426 Raises:
427 Raises SeqNumError if the synchronous reply is the wrong sequence number;
428 MsgTypeError if the synchronous reply is not MSG_REPLY. Also,
429 may raise exceptions for failures with various `socket` modules,
430
431 The connection will be closed.
432 """
433 logging.debug("SEND: %s: sending %s seq 0x%x", self, api_msgname(mt), self._seq)
434 mh = struct.pack(FMT_APIMSGHDR, 1, mt, len(mp), self._seq)
435
436 seq = self._seq
437 self._seq = seq + 1
438
439 try:
440 async with self.write_lock:
441 self._w.write(mh + mp)
442 await self._w.drain()
443 mt, mp = await OspfApiClient._msg_read(self._r, seq)
444
445 if mt != MSG_REPLY:
446 raise MsgTypeError(
447 "rx {} != {}".format(api_msgname(mt), api_msgname(MSG_REPLY))
448 )
449
450 return struct.unpack(msg_fmt[MSG_REPLY], mp)[0]
451 except Exception:
452 # We've written data with a sequence number
453 await self.close()
454 raise
455
456 async def msg_send_raises(self, mt, mp=b"\x00" * 4):
457 """Send a message to OSPF API and wait for error code reply.
458
459 Args:
460 mt: the messaage type
461 mp: the message payload
462 Raises:
463 APIError if the server replies with an error.
464
465 Also may raise exceptions for failures with various `socket` modules,
466 as well as MsgTypeError if the synchronous reply is incorrect.
467 The connection will be closed for these non-API error exceptions.
468 """
469 ecode = await self.msg_send(mt, mp)
470 if ecode:
471 raise APIError("{} error {}".format(api_msgname(mt), api_errname(ecode)))
472
473 async def handle_async_msg(self, mt, msg):
474 if mt not in msg_fmt:
475 logging.debug("RECV: %s: unknown async msg type %s", self, mt)
476 return
477
478 fmt = msg_fmt[mt]
479 sz = msg_size[mt]
480 tup = struct.unpack(fmt, msg[:sz])
481 extra = msg[sz:]
482
483 if mt not in self.handlers:
484 logging.debug(
485 "RECV: %s: no handlers for msg type %s", self, api_msgname(mt)
486 )
487 return
488
489 logging.debug("RECV: %s: calling handler for %s", self, api_msgname(mt))
490 await self.handlers[mt](mt, msg, extra, *tup)
491
492 #
493 # Client to Server Messaging
494 #
495 @staticmethod
496 def lsa_type_mask(*lsa_types):
497 "Return a 16 bit mask for each LSA type passed."
498 if not lsa_types:
499 return 0xFFFF
500 mask = 0
501 for t in lsa_types:
502 assert 0 < t < 16, "LSA type {} out of range [1, 15]".format(t)
503 mask |= 1 << t
504 return mask
505
506 @staticmethod
507 def lsa_filter(origin, areas, lsa_types):
508 """Return an LSA filter.
509
510 Return the filter message bytes based on `origin` the `areas` list and the LSAs
511 types in the `lsa_types` list.
512 """
513 mask = OspfApiClient.lsa_type_mask(*lsa_types)
514 narea = len(areas)
515 fmt = FMT_LSA_FILTER + ("{}I".format(narea) if narea else "")
516 # lsa type mask, origin, number of areas, each area
517 return struct.pack(fmt, mask, origin, narea, *areas)
518
519 async def req_lsdb_sync(self):
520 "Register for all LSA notifications and request an LSDB synchronoization."
521 logging.debug("SEND: %s: request LSDB events", self)
522 mp = OspfApiClient.lsa_filter(LSAF_ORIGIN_ANY, [], [])
523 await self.msg_send_raises(MSG_REGISTER_EVENT, mp)
524
525 logging.debug("SEND: %s: request LSDB sync", self)
526 await self.msg_send_raises(MSG_SYNC_LSDB, mp)
527
528 async def req_reachable_routers(self):
529 "Request a dump of all reachable routers."
530 logging.debug("SEND: %s: request reachable changes", self)
531 await self.msg_send_raises(MSG_SYNC_REACHABLE)
532
533 async def req_ism_states(self):
534 "Request a dump of the current ISM states of all interfaces."
535 logging.debug("SEND: %s: request ISM changes", self)
536 await self.msg_send_raises(MSG_SYNC_ISM)
537
538 async def req_nsm_states(self):
539 "Request a dump of the current NSM states of all neighbors."
540 logging.debug("SEND: %s: request NSM changes", self)
541 await self.msg_send_raises(MSG_SYNC_NSM)
542
543 async def req_router_id_sync(self):
544 "Request a dump of the current NSM states of all neighbors."
545 logging.debug("SEND: %s: request router ID sync", self)
546 await self.msg_send_raises(MSG_SYNC_ROUTER_ID)
547
548
549 class OspfOpaqueClient(OspfApiClient):
550 """A client connection to OSPF Daemon for manipulating Opaque LSA data.
551
552 The client object is not created in a connected state. To connect to the server
553 the `connect` method should be called. If an error is encountered when sending
554 messages to the server an exception will be raised and the connection will be
555 closed. When this happens `connect` may be called again to restore the
556 connection.
557
558 Args:
559 server: hostname or IP address of server default is "localhost"
560 wait_ready: if True then wait for OSPF to signal ready, in newer versions
561 FRR ospfd is always ready so this overhead can be skipped.
562 default is False.
563
564 Raises:
565 Will raise exceptions for failures with various `socket` modules
566 functions such as `socket.socket`, `socket.setsockopt`, `socket.bind`.
567 """
568
569 def __init__(self, server="localhost", wait_ready=False):
570 handlers = {
571 MSG_LSA_UPDATE_NOTIFY: self._lsa_change_msg,
572 MSG_LSA_DELETE_NOTIFY: self._lsa_change_msg,
573 MSG_NEW_IF: self._if_msg,
574 MSG_DEL_IF: self._if_msg,
575 MSG_ISM_CHANGE: self._if_change_msg,
576 MSG_NSM_CHANGE: self._nbr_change_msg,
577 MSG_REACHABLE_CHANGE: self._reachable_msg,
578 MSG_ROUTER_ID_CHANGE: self._router_id_msg,
579 }
580 if wait_ready:
581 handlers[MSG_READY_NOTIFY] = self._ready_msg
582
583 super().__init__(server, handlers)
584
585 self.wait_ready = wait_ready
586 self.ready_lock = Lock() if wait_ready else WithNothing()
587 self.ready_cond = {
588 LSA_TYPE_OPAQUE_LINK: {},
589 LSA_TYPE_OPAQUE_AREA: {},
590 LSA_TYPE_OPAQUE_AS: {},
591 }
592 self.router_id = ip(0)
593 self.router_id_change_cb = None
594
595 self.lsid_seq_num = {}
596
597 self.lsa_change_cb = None
598 self.opaque_change_cb = {}
599
600 self.reachable_routers = set()
601 self.reachable_change_cb = None
602
603 self.if_area = {}
604 self.ism_states = {}
605 self.ism_change_cb = None
606
607 self.nsm_states = {}
608 self.nsm_change_cb = None
609
610 async def _register_opaque_data(self, lsa_type, otype):
611 async with self.ready_lock:
612 cond = self.ready_cond[lsa_type].get(otype)
613 assert cond is None, "multiple registers for {} opaque-type {}".format(
614 lsa_typename(lsa_type), otype
615 )
616
617 logging.debug("register %s opaque-type %s", lsa_typename(lsa_type), otype)
618
619 mt = MSG_REGISTER_OPAQUETYPE
620 mp = struct.pack(msg_fmt[mt], lsa_type, otype)
621 await self.msg_send_raises(mt, mp)
622
623 # If we are not waiting, mark ready for register check
624 if not self.wait_ready:
625 self.ready_cond[lsa_type][otype] = True
626
627 async def _handle_msg_loop(self):
628 try:
629 logging.debug("entering async msg handling loop")
630 while True:
631 mt, msg = await self.msg_read()
632 if mt in amsg_info:
633 await self.handle_async_msg(mt, msg)
634 else:
635 mts = api_msgname(mt)
636 logging.warning(
637 "ignoring unexpected msg: %s len: %s", mts, len(msg)
638 )
639 except EOFError:
640 logging.info("Got EOF from OSPF API server on async notify socket")
641 return 2
642
643 @staticmethod
644 def _opaque_args(lsa_type, otype, oid, mp):
645 lsid = (otype << 24) | oid
646 return 0, 0, lsa_type, lsid, 0, 0, 0, FMT_LSA_HEADER_SIZE + len(mp)
647
648 @staticmethod
649 def _make_opaque_lsa(lsa_type, otype, oid, mp):
650 # /* Make a new LSA from parameters */
651 lsa = struct.pack(
652 FMT_LSA_HEADER, *OspfOpaqueClient._opaque_args(lsa_type, otype, oid, mp)
653 )
654 lsa += mp
655 return lsa
656
657 async def _ready_msg(self, mt, msg, extra, lsa_type, otype, addr):
658 assert self.wait_ready
659
660 if lsa_type == LSA_TYPE_OPAQUE_LINK:
661 e = "ifaddr {}".format(ip(addr))
662 elif lsa_type == LSA_TYPE_OPAQUE_AREA:
663 e = "area {}".format(ip(addr))
664 else:
665 e = ""
666 logging.info(
667 "RECV: %s ready notify for %s opaque-type %s%s",
668 self,
669 lsa_typename(lsa_type),
670 otype,
671 e,
672 )
673
674 # Signal all waiting senders they can send now.
675 async with self.ready_lock:
676 cond = self.ready_cond[lsa_type].get(otype)
677 self.ready_cond[lsa_type][otype] = True
678
679 if cond is True:
680 logging.warning(
681 "RECV: dup ready received for %s opaque-type %s",
682 lsa_typename(lsa_type),
683 otype,
684 )
685 elif cond:
686 for evt in cond:
687 evt.set()
688
689 async def _if_msg(self, mt, msg, extra, *args):
690 if mt == MSG_NEW_IF:
691 ifaddr, aid = args
692 else:
693 assert mt == MSG_DEL_IF
694 ifaddr, aid = args[0], 0
695 logging.info(
696 "RECV: %s ifaddr %s areaid %s", api_msgname(mt), ip(ifaddr), ip(aid)
697 )
698
699 async def _if_change_msg(self, mt, msg, extra, ifaddr, aid, state):
700 ifaddr = ip(ifaddr)
701 aid = ip(aid)
702
703 logging.info(
704 "RECV: %s ifaddr %s areaid %s state %s",
705 api_msgname(mt),
706 ifaddr,
707 aid,
708 ism_name(state),
709 )
710
711 self.if_area[ifaddr] = aid
712 self.ism_states[ifaddr] = state
713
714 if self.ism_change_cb:
715 self.ism_change_cb(ifaddr, aid, state)
716
717 async def _nbr_change_msg(self, mt, msg, extra, ifaddr, nbraddr, router_id, state):
718 ifaddr = ip(ifaddr)
719 nbraddr = ip(nbraddr)
720 router_id = ip(router_id)
721
722 logging.info(
723 "RECV: %s ifaddr %s nbraddr %s router_id %s state %s",
724 api_msgname(mt),
725 ifaddr,
726 nbraddr,
727 router_id,
728 nsm_name(state),
729 )
730
731 if ifaddr not in self.nsm_states:
732 self.nsm_states[ifaddr] = {}
733 self.nsm_states[ifaddr][(nbraddr, router_id)] = state
734
735 if self.nsm_change_cb:
736 self.nsm_change_cb(ifaddr, nbraddr, router_id, state)
737
738 async def _lsa_change_msg(self, mt, msg, extra, ifaddr, aid, is_self, *ls_header):
739 (
740 lsa_age, # ls_age,
741 _, # ls_options,
742 lsa_type,
743 ls_id,
744 _, # ls_adv_router,
745 ls_seq,
746 _, # ls_cksum,
747 ls_len,
748 ) = ls_header
749
750 otype = (ls_id >> 24) & 0xFF
751
752 if mt == MSG_LSA_UPDATE_NOTIFY:
753 ts = "update"
754 else:
755 assert mt == MSG_LSA_DELETE_NOTIFY
756 ts = "delete"
757
758 logging.info(
759 "RECV: LSA %s msg for LSA %s in area %s seq 0x%x len %s age %s",
760 ts,
761 ip(ls_id),
762 ip(aid),
763 ls_seq,
764 ls_len,
765 lsa_age,
766 )
767 idx = (lsa_type, otype)
768
769 pre_lsa_size = msg_size[mt] - FMT_LSA_HEADER_SIZE
770 lsa = msg[pre_lsa_size:]
771
772 if idx in self.opaque_change_cb:
773 self.opaque_change_cb[idx](mt, ifaddr, aid, ls_header, extra, lsa)
774
775 if self.lsa_change_cb:
776 self.lsa_change_cb(mt, ifaddr, aid, ls_header, extra, lsa)
777
778 async def _reachable_msg(self, mt, msg, extra, nadd, nremove):
779 router_ids = struct.unpack(">{}I".format(nadd + nremove), extra)
780 router_ids = [ip(x) for x in router_ids]
781 logging.info(
782 "RECV: %s added %s removed %s",
783 api_msgname(mt),
784 router_ids[:nadd],
785 router_ids[nadd:],
786 )
787 self.reachable_routers |= set(router_ids[:nadd])
788 self.reachable_routers -= set(router_ids[nadd:])
789 logging.info("RECV: %s new set %s", api_msgname(mt), self.reachable_routers)
790
791 if self.reachable_change_cb:
792 logging.info("RECV: %s calling callback", api_msgname(mt))
793 await self.reachable_change_cb(router_ids[:nadd], router_ids[nadd:])
794
795 async def _router_id_msg(self, mt, msg, extra, router_id):
796 router_id = ip(router_id)
797 logging.info("RECV: %s router ID %s", api_msgname(mt), router_id)
798 old_router_id = self.router_id
799 if old_router_id == router_id:
800 return
801
802 self.router_id = router_id
803 logging.info(
804 "RECV: %s new router ID %s older router ID %s",
805 api_msgname(mt),
806 router_id,
807 old_router_id,
808 )
809
810 if self.router_id_change_cb:
811 logging.info("RECV: %s calling callback", api_msgname(mt))
812 await self.router_id_change_cb(router_id, old_router_id)
813
814 async def add_opaque_data(self, addr, lsa_type, otype, oid, data):
815 """Add an instance of opaque data.
816
817 Add an instance of opaque data. This call will register for the given
818 LSA and opaque type if not already done.
819
820 Args:
821 addr: depends on lsa_type, LINK => ifaddr, AREA => area ID, AS => ignored
822 lsa_type: LSA_TYPE_OPAQUE_{LINK,AREA,AS}
823 otype: (octet) opaque type
824 oid: (3 octets) ID of this opaque data
825 data: the opaque data
826 Raises:
827 See `msg_send_raises`
828 """
829 assert self.ready_cond.get(lsa_type, {}).get(otype) is True, "Not Registered!"
830
831 if lsa_type == LSA_TYPE_OPAQUE_LINK:
832 ifaddr, aid = int(addr), 0
833 elif lsa_type == LSA_TYPE_OPAQUE_AREA:
834 ifaddr, aid = 0, int(addr)
835 else:
836 assert lsa_type == LSA_TYPE_OPAQUE_AS
837 ifaddr, aid = 0, 0
838
839 mt = MSG_ORIGINATE_REQUEST
840 msg = struct.pack(
841 msg_fmt[mt],
842 ifaddr,
843 aid,
844 *OspfOpaqueClient._opaque_args(lsa_type, otype, oid, data),
845 )
846 msg += data
847 await self.msg_send_raises(mt, msg)
848
849 async def delete_opaque_data(self, addr, lsa_type, otype, oid, flags=0):
850 """Delete an instance of opaque data.
851
852 Delete an instance of opaque data. This call will register for the given
853 LSA and opaque type if not already done.
854
855 Args:
856 addr: depends on lsa_type, LINK => ifaddr, AREA => area ID, AS => ignored
857 lsa_type: LSA_TYPE_OPAQUE_{LINK,AREA,AS}
858 otype: (octet) opaque type.
859 oid: (3 octets) ID of this opaque data
860 flags: (octet) optional flags (e.g., OSPF_API_DEL_ZERO_LEN_LSA, defaults to no flags)
861 Raises:
862 See `msg_send_raises`
863 """
864 assert self.ready_cond.get(lsa_type, {}).get(otype) is True, "Not Registered!"
865
866 mt = MSG_DELETE_REQUEST
867 mp = struct.pack(msg_fmt[mt], int(addr), lsa_type, otype, flags, oid)
868 await self.msg_send_raises(mt, mp)
869
870 async def is_registered(self, lsa_type, otype):
871 """Determine if an (lsa_type, otype) tuple has been registered with FRR
872
873 This determines if the type has been registered, but not necessarily if it is
874 ready, if that is required use the `wait_opaque_ready` metheod.
875
876 Args:
877 lsa_type: LSA_TYPE_OPAQUE_{LINK,AREA,AS}
878 otype: (octet) opaque type.
879 """
880 async with self.ready_lock:
881 return self.ready_cond.get(lsa_type, {}).get(otype) is not None
882
883 async def register_opaque_data(self, lsa_type, otype, callback=None):
884 """Register intent to advertise opaque data.
885
886 The application should wait for the async notificaiton that the server is
887 ready to advertise the given opaque data type. The API currently only allows
888 a single "owner" of each unique (lsa_type,otype). To wait call `wait_opaque_ready`
889
890 Args:
891 lsa_type: LSA_TYPE_OPAQUE_{LINK,AREA,AS}
892 otype: (octet) opaque type.
893 callback: if given, callback will be called when changes are received for
894 LSA of the given (lsa_type, otype). The callbacks signature is:
895
896 `callback(msg_type, ifaddr, area_id, lsa_header, data, lsa)`
897
898 Args:
899 msg_type: MSG_LSA_UPDATE_NOTIFY or MSG_LSA_DELETE_NOTIFY
900 ifaddr: integer identifying an interface (by IP address)
901 area_id: integer identifying an area
902 lsa_header: the LSA header as an unpacked tuple (fmt: ">HBBIILHH")
903 data: the opaque data that follows the LSA header
904 lsa: the octets of the full lsa
905 Raises:
906 See `msg_send_raises`
907 """
908 assert not await self.is_registered(
909 lsa_type, otype
910 ), "Registering registered type"
911
912 if callback:
913 self.opaque_change_cb[(lsa_type, otype)] = callback
914 elif (lsa_type, otype) in self.opaque_change_cb:
915 logging.warning(
916 "OSPFCLIENT: register: removing callback for %s opaque-type %s",
917 lsa_typename(lsa_type),
918 otype,
919 )
920 del self.opaque_change_cb[(lsa_type, otype)]
921
922 await self._register_opaque_data(lsa_type, otype)
923
924 async def wait_opaque_ready(self, lsa_type, otype):
925 async with self.ready_lock:
926 cond = self.ready_cond[lsa_type].get(otype)
927 if cond is True:
928 return
929
930 assert self.wait_ready
931
932 logging.debug(
933 "waiting for ready %s opaque-type %s", lsa_typename(lsa_type), otype
934 )
935
936 if not cond:
937 cond = self.ready_cond[lsa_type][otype] = []
938
939 evt = Event()
940 cond.append(evt)
941
942 await evt.wait()
943 logging.debug("READY for %s opaque-type %s", lsa_typename(lsa_type), otype)
944
945 async def register_opaque_data_wait(self, lsa_type, otype, callback=None):
946 """Register intent to advertise opaque data and wait for ready.
947
948 The API currently only allows a single "owner" of each unique (lsa_type,otype).
949
950 Args:
951 lsa_type: LSA_TYPE_OPAQUE_{LINK,AREA,AS}
952 otype: (octet) opaque type.
953 callback: if given, callback will be called when changes are received for
954 LSA of the given (lsa_type, otype). The callbacks signature is:
955
956 `callback(msg_type, ifaddr, area_id, lsa_header, data, lsa)`
957
958 Args:
959 msg_type: MSG_LSA_UPDATE_NOTIFY or MSG_LSA_DELETE_NOTIFY
960 ifaddr: integer identifying an interface (by IP address)
961 area_id: integer identifying an area
962 lsa_header: the LSA header as an unpacked tuple (fmt: ">HBBIILHH")
963 data: the opaque data that follows the LSA header
964 lsa: the octets of the full lsa
965 Raises:
966
967 See `msg_send_raises`
968 """
969 await self.register_opaque_data(lsa_type, otype, callback)
970 await self.wait_opaque_ready(lsa_type, otype)
971
972 async def unregister_opaque_data(self, lsa_type, otype):
973 """Unregister intent to advertise opaque data.
974
975 This will also cause the server to flush/delete all opaque data of
976 the given (lsa_type,otype).
977
978 Args:
979 lsa_type: LSA_TYPE_OPAQUE_{LINK,AREA,AS}
980 otype: (octet) opaque type.
981 Raises:
982 See `msg_send_raises`
983 """
984 assert await self.is_registered(
985 lsa_type, otype
986 ), "Unregistering unregistered type"
987
988 if (lsa_type, otype) in self.opaque_change_cb:
989 del self.opaque_change_cb[(lsa_type, otype)]
990
991 mt = MSG_UNREGISTER_OPAQUETYPE
992 mp = struct.pack(msg_fmt[mt], lsa_type, otype)
993 await self.msg_send_raises(mt, mp)
994
995 async def monitor_lsa(self, callback=None):
996 """Monitor changes to LSAs.
997
998 Args:
999 callback: if given, callback will be called when changes are received for
1000 any LSA. The callback signature is:
1001
1002 `callback(msg_type, ifaddr, area_id, lsa_header, extra, lsa)`
1003
1004 Args:
1005 msg_type: MSG_LSA_UPDATE_NOTIFY or MSG_LSA_DELETE_NOTIFY
1006 ifaddr: integer identifying an interface (by IP address)
1007 area_id: integer identifying an area
1008 lsa_header: the LSA header as an unpacked tuple (fmt: ">HBBIILHH")
1009 extra: the octets that follow the LSA header
1010 lsa: the octets of the full lsa
1011 """
1012 self.lsa_change_cb = callback
1013 await self.req_lsdb_sync()
1014
1015 async def monitor_reachable(self, callback=None):
1016 """Monitor the set of reachable routers.
1017
1018 The property `reachable_routers` contains the set() of reachable router IDs
1019 as integers. This set is updated prior to calling the `callback`
1020
1021 Args:
1022 callback: callback will be called when the set of reachable
1023 routers changes. The callback signature is:
1024
1025 `callback(added, removed)`
1026
1027 Args:
1028 added: list of integer router IDs being added
1029 removed: list of integer router IDs being removed
1030 """
1031 self.reachable_change_cb = callback
1032 await self.req_reachable_routers()
1033
1034 async def monitor_ism(self, callback=None):
1035 """Monitor the state of OSPF enabled interfaces.
1036
1037 Args:
1038 callback: callback will be called when an interface changes state.
1039 The callback signature is:
1040
1041 `callback(ifaddr, area_id, state)`
1042
1043 Args:
1044 ifaddr: integer identifying an interface (by IP address)
1045 area_id: integer identifying an area
1046 state: ISM_*
1047 """
1048 self.ism_change_cb = callback
1049 await self.req_ism_states()
1050
1051 async def monitor_nsm(self, callback=None):
1052 """Monitor the state of OSPF neighbors.
1053
1054 Args:
1055 callback: callback will be called when a neighbor changes state.
1056 The callback signature is:
1057
1058 `callback(ifaddr, nbr_addr, router_id, state)`
1059
1060 Args:
1061 ifaddr: integer identifying an interface (by IP address)
1062 nbr_addr: integer identifying neighbor by IP address
1063 router_id: integer identifying neighbor router ID
1064 state: NSM_*
1065 """
1066 self.nsm_change_cb = callback
1067 await self.req_nsm_states()
1068
1069 async def monitor_router_id(self, callback=None):
1070 """Monitor the OSPF router ID.
1071
1072 The property `router_id` contains the OSPF urouter ID.
1073 This value is updated prior to calling the `callback`
1074
1075 Args:
1076 callback: callback will be called when the router ID changes.
1077 The callback signature is:
1078
1079 `callback(new_router_id, old_router_id)`
1080
1081 Args:
1082 new_router_id: the new router ID
1083 old_router_id: the old router ID
1084 """
1085 self.router_id_change_cb = callback
1086 await self.req_router_id_sync()
1087
1088
1089 # ================
1090 # CLI/Script Usage
1091 # ================
1092 def next_action(action_list=None):
1093 "Get next action from list or STDIN"
1094 if action_list:
1095 for action in action_list:
1096 yield action
1097 else:
1098 while True:
1099 action = input("")
1100 if not action:
1101 break
1102 yield action.strip()
1103
1104
1105 async def async_main(args):
1106 c = OspfOpaqueClient(args.server)
1107 await c.connect()
1108
1109 try:
1110 # Start handling async messages from server.
1111 if sys.version_info[1] > 6:
1112 asyncio.create_task(c._handle_msg_loop())
1113 else:
1114 asyncio.get_event_loop().create_task(c._handle_msg_loop())
1115
1116 await c.req_lsdb_sync()
1117 await c.req_reachable_routers()
1118 await c.req_ism_states()
1119 await c.req_nsm_states()
1120
1121 for action in next_action(args.actions):
1122 _s = action.split(",")
1123 what = _s.pop(False)
1124 if what.casefold() == "wait":
1125 stime = int(_s.pop(False))
1126 logging.info("waiting %s seconds", stime)
1127 await asyncio.sleep(stime)
1128 logging.info("wait complete: %s seconds", stime)
1129 continue
1130 ltype = int(_s.pop(False))
1131 if ltype == 11:
1132 addr = ip(0)
1133 else:
1134 aval = _s.pop(False)
1135 try:
1136 addr = ip(int(aval))
1137 except ValueError:
1138 addr = ip(aval)
1139 oargs = [addr, ltype, int(_s.pop(False)), int(_s.pop(False))]
1140
1141 if not await c.is_registered(oargs[1], oargs[2]):
1142 await c.register_opaque_data_wait(oargs[1], oargs[2])
1143
1144 if what.casefold() == "add":
1145 try:
1146 b = bytes.fromhex(_s.pop(False))
1147 except IndexError:
1148 b = b""
1149 logging.info("opaque data is %s octets", len(b))
1150 # Needs to be multiple of 4 in length
1151 mod = len(b) % 4
1152 if mod:
1153 b += b"\x00" * (4 - mod)
1154 logging.info("opaque padding to %s octets", len(b))
1155
1156 await c.add_opaque_data(*oargs, b)
1157 else:
1158 assert what.casefold().startswith("del")
1159 f = 0
1160 if len(_s) >= 1:
1161 try:
1162 f = int(_s.pop(False))
1163 except IndexError:
1164 f = 0
1165 await c.delete_opaque_data(*oargs, f)
1166 if not args.actions or args.exit:
1167 return 0
1168 except Exception as error:
1169 logging.error("async_main: unexpected error: %s", error, exc_info=True)
1170 return 2
1171
1172 try:
1173 logging.info("Sleeping forever")
1174 while True:
1175 await asyncio.sleep(120)
1176 except EOFError:
1177 logging.info("Got EOF from OSPF API server on async notify socket")
1178 return 2
1179
1180
1181 def main(*args):
1182 ap = argparse.ArgumentParser(args)
1183 ap.add_argument("--logtag", default="CLIENT", help="tag to identify log messages")
1184 ap.add_argument("--exit", action="store_true", help="Exit after commands")
1185 ap.add_argument("--server", default="localhost", help="OSPF API server")
1186 ap.add_argument("-v", "--verbose", action="store_true", help="be verbose")
1187 ap.add_argument(
1188 "actions",
1189 nargs="*",
1190 help="WAIT,SEC|(ADD|DEL),LSATYPE,[ADDR,],OTYPE,OID,[HEXDATA|DEL_FLAG]",
1191 )
1192 args = ap.parse_args()
1193
1194 level = logging.DEBUG if args.verbose else logging.INFO
1195 logging.basicConfig(
1196 level=level,
1197 format="%(asctime)s %(levelname)s: {}: %(name)s %(message)s".format(
1198 args.logtag
1199 ),
1200 )
1201
1202 logging.info("ospfclient: starting")
1203
1204 status = 3
1205 try:
1206 if sys.version_info[1] > 6:
1207 # python >= 3.7
1208 status = asyncio.run(async_main(args))
1209 else:
1210 loop = asyncio.get_event_loop()
1211 try:
1212 status = loop.run_until_complete(async_main(args))
1213 finally:
1214 loop.close()
1215 except KeyboardInterrupt:
1216 logging.info("Exiting, received KeyboardInterrupt in main")
1217 except Exception as error:
1218 logging.info("Exiting, unexpected exception %s", error, exc_info=True)
1219 else:
1220 logging.info("ospfclient: clean exit")
1221
1222 return status
1223
1224
1225 if __name__ == "__main__":
1226 exit_status = main()
1227 sys.exit(exit_status)