]> git.proxmox.com Git - mirror_frr.git/blob - ospfclient/ospfclient.py
Merge pull request #11408 from donaldsharp/common_config
[mirror_frr.git] / ospfclient / ospfclient.py
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 eval: (blacken-mode 1) -*-
3 #
4 # December 22 2021, Christian Hopps <chopps@labn.net>
5 #
6 # Copyright 2021, LabN Consulting, L.L.C.
7 #
8 # This program is free software; you can redistribute it and/or
9 # modify it under the terms of the GNU General Public License
10 # as published by the Free Software Foundation; either version 2
11 # of the License, or (at your option) any later version.
12 #
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU General Public License for more details.
17 #
18 # You should have received a copy of the GNU General Public License along
19 # with this program; see the file COPYING; if not, write to the Free Software
20 # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
21 #
22
23 import argparse
24 import asyncio
25 import errno
26 import logging
27 import socket
28 import struct
29 import sys
30 from asyncio import Event, Lock
31 from ipaddress import ip_address as ip
32
# OSPF API message header: version (B), message type (B), body length (H),
# sequence number (L); network byte order.
FMT_APIMSGHDR = ">BBHL"
FMT_APIMSGHDR_SIZE = struct.calcsize(FMT_APIMSGHDR)

# LSA filter: type mask (H), origin (B), number of areas (B), followed by one
# "I" per area.
FMT_LSA_FILTER = ">HBB"
LSAF_ORIGIN_NON_SELF = 0
LSAF_ORIGIN_SELF = 1
LSAF_ORIGIN_ANY = 2

# LSA header: age, options, type, id, advertising router, sequence number,
# checksum, length.
FMT_LSA_HEADER = ">HBBIILHH"
FMT_LSA_HEADER_SIZE = struct.calcsize(FMT_LSA_HEADER)

# ------------------------
# Messages to OSPF daemon.
# ------------------------

MSG_REGISTER_OPAQUETYPE = 1
MSG_UNREGISTER_OPAQUETYPE = 2
MSG_REGISTER_EVENT = 3
MSG_SYNC_LSDB = 4
MSG_ORIGINATE_REQUEST = 5
MSG_DELETE_REQUEST = 6
MSG_SYNC_REACHABLE = 7
MSG_SYNC_ISM = 8
MSG_SYNC_NSM = 9

# message type -> (display name, struct format of the fixed message body)
smsg_info = {
    MSG_REGISTER_OPAQUETYPE: ("REGISTER_OPAQUETYPE", "BBxx"),
    MSG_UNREGISTER_OPAQUETYPE: ("UNREGISTER_OPAQUETYPE", "BBxx"),
    MSG_REGISTER_EVENT: ("REGISTER_EVENT", FMT_LSA_FILTER),
    MSG_SYNC_LSDB: ("SYNC_LSDB", FMT_LSA_FILTER),
    # The LSA header format is appended without its byte-order prefix.
    MSG_ORIGINATE_REQUEST: ("ORIGINATE_REQUEST", ">II" + FMT_LSA_HEADER[1:]),
    MSG_DELETE_REQUEST: ("DELETE_REQUEST", ">IBBxxL"),
    MSG_SYNC_REACHABLE: ("MSG_SYNC_REACHABLE", ""),
    MSG_SYNC_ISM: ("MSG_SYNC_ISM", ""),
    MSG_SYNC_NSM: ("MSG_SYNC_NSM", ""),
}

# --------------------------
# Messages from OSPF daemon.
# --------------------------

MSG_REPLY = 10
MSG_READY_NOTIFY = 11
MSG_LSA_UPDATE_NOTIFY = 12
MSG_LSA_DELETE_NOTIFY = 13
MSG_NEW_IF = 14
MSG_DEL_IF = 15
MSG_ISM_CHANGE = 16
MSG_NSM_CHANGE = 17
MSG_REACHABLE_CHANGE = 18

# message type -> (display name, struct format of the fixed message body)
amsg_info = {
    MSG_REPLY: ("REPLY", "bxxx"),
    MSG_READY_NOTIFY: ("READY_NOTIFY", ">BBxxI"),
    MSG_LSA_UPDATE_NOTIFY: ("LSA_UPDATE_NOTIFY", ">IIBxxx" + FMT_LSA_HEADER[1:]),
    MSG_LSA_DELETE_NOTIFY: ("LSA_DELETE_NOTIFY", ">IIBxxx" + FMT_LSA_HEADER[1:]),
    MSG_NEW_IF: ("NEW_IF", ">II"),
    MSG_DEL_IF: ("DEL_IF", ">I"),
    MSG_ISM_CHANGE: ("ISM_CHANGE", ">IIBxxx"),
    MSG_NSM_CHANGE: ("NSM_CHANGE", ">IIIBxxx"),
    MSG_REACHABLE_CHANGE: ("REACHABLE_CHANGE", ">HH"),
}

# Error codes carried in MSG_REPLY.
OSPF_API_OK = 0
OSPF_API_NOSUCHINTERFACE = -1
OSPF_API_NOSUCHAREA = -2
OSPF_API_NOSUCHLSA = -3
OSPF_API_ILLEGALLSATYPE = -4
OSPF_API_OPAQUETYPEINUSE = -5
OSPF_API_OPAQUETYPENOTREGISTERED = -6
OSPF_API_NOTREADY = -7
OSPF_API_NOMEMORY = -8
OSPF_API_ERROR = -9
OSPF_API_UNDEF = -10

msg_errname = {
    OSPF_API_OK: "OSPF_API_OK",
    OSPF_API_NOSUCHINTERFACE: "OSPF_API_NOSUCHINTERFACE",
    OSPF_API_NOSUCHAREA: "OSPF_API_NOSUCHAREA",
    OSPF_API_NOSUCHLSA: "OSPF_API_NOSUCHLSA",
    OSPF_API_ILLEGALLSATYPE: "OSPF_API_ILLEGALLSATYPE",
    OSPF_API_OPAQUETYPEINUSE: "OSPF_API_OPAQUETYPEINUSE",
    OSPF_API_OPAQUETYPENOTREGISTERED: "OSPF_API_OPAQUETYPENOTREGISTERED",
    OSPF_API_NOTREADY: "OSPF_API_NOTREADY",
    OSPF_API_NOMEMORY: "OSPF_API_NOMEMORY",
    OSPF_API_ERROR: "OSPF_API_ERROR",
    OSPF_API_UNDEF: "OSPF_API_UNDEF",
}

# Combined lookup tables covering both message directions.
msg_info = {**smsg_info, **amsg_info}
msg_name = {mtype: info[0] for mtype, info in msg_info.items()}
msg_fmt = {mtype: info[1] for mtype, info in msg_info.items()}
msg_size = {mtype: struct.calcsize(info[1]) for mtype, info in msg_info.items()}
129
130
def api_msgname(mt):
    """Return the display name of API message type `mt`, or `str(mt)` if unknown."""
    try:
        return msg_name[mt]
    except KeyError:
        return str(mt)
133
134
def api_errname(ecode):
    """Return the symbolic name of API error code `ecode`, or `str(ecode)` if unknown."""
    try:
        return msg_errname[ecode]
    except KeyError:
        return str(ecode)
137
138
139 # -------------------
140 # API Semantic Errors
141 # -------------------
142
143
class APIError(Exception):
    """Raised when the OSPF API server replies with a non-zero error code."""
146
147
class MsgTypeError(Exception):
    """Raised when a synchronous reply is not of type MSG_REPLY."""
150
151
class SeqNumError(Exception):
    """Raised when a received message carries an unexpected sequence number."""
154
155
156 # ---------
157 # LSA Types
158 # ---------
159
LSA_TYPE_UNKNOWN = 0
LSA_TYPE_ROUTER = 1
LSA_TYPE_NETWORK = 2
LSA_TYPE_SUMMARY = 3
LSA_TYPE_ASBR_SUMMARY = 4
LSA_TYPE_AS_EXTERNAL = 5
LSA_TYPE_GROUP_MEMBER = 6
LSA_TYPE_AS_NSSA = 7
LSA_TYPE_EXTERNAL_ATTRIBUTES = 8
LSA_TYPE_OPAQUE_LINK = 9
LSA_TYPE_OPAQUE_AREA = 10
LSA_TYPE_OPAQUE_AS = 11

# Display names keyed by LSA type. LSA_TYPE_UNKNOWN has no entry, so it is
# rendered as its number like any other unlisted value.
_LSA_TYPE_NAMES = {
    LSA_TYPE_ROUTER: "LSA:ROUTER",
    LSA_TYPE_NETWORK: "LSA:NETWORK",
    LSA_TYPE_SUMMARY: "LSA:SUMMARY",
    LSA_TYPE_ASBR_SUMMARY: "LSA:ASBR_SUMMARY",
    LSA_TYPE_AS_EXTERNAL: "LSA:AS_EXTERNAL",
    LSA_TYPE_GROUP_MEMBER: "LSA:GROUP_MEMBER",
    LSA_TYPE_AS_NSSA: "LSA:AS_NSSA",
    LSA_TYPE_EXTERNAL_ATTRIBUTES: "LSA:EXTERNAL_ATTRIBUTES",
    LSA_TYPE_OPAQUE_LINK: "LSA:OPAQUE_LINK",
    LSA_TYPE_OPAQUE_AREA: "LSA:OPAQUE_AREA",
    LSA_TYPE_OPAQUE_AS: "LSA:OPAQUE_AS",
}


def lsa_typename(lsa_type):
    """Return the display name of `lsa_type`, or `str(lsa_type)` if unlisted."""
    try:
        return _LSA_TYPE_NAMES[lsa_type]
    except KeyError:
        return str(lsa_type)
189
190
191 # ------------------------------
192 # Interface State Machine States
193 # ------------------------------
194
ISM_DEPENDUPON = 0
ISM_DOWN = 1
ISM_LOOPBACK = 2
ISM_WAITING = 3
ISM_POINTTOPOINT = 4
ISM_DROTHER = 5
ISM_BACKUP = 6
ISM_DR = 7

# State value -> name; the tuple is ordered by state value starting at 0.
_ISM_STATE_NAMES = dict(
    enumerate(
        (
            "ISM_DEPENDUPON",
            "ISM_DOWN",
            "ISM_LOOPBACK",
            "ISM_WAITING",
            "ISM_POINTTOPOINT",
            "ISM_DROTHER",
            "ISM_BACKUP",
            "ISM_DR",
        )
    )
)


def ism_name(state):
    """Return the name of interface state machine `state`, or `str(state)` if unknown."""
    try:
        return _ISM_STATE_NAMES[state]
    except KeyError:
        return str(state)
217
218
219 # -----------------------------
220 # Neighbor State Machine States
221 # -----------------------------
222
NSM_DEPENDUPON = 0
NSM_DELETED = 1
NSM_DOWN = 2
NSM_ATTEMPT = 3
NSM_INIT = 4
NSM_TWOWAY = 5
NSM_EXSTART = 6
NSM_EXCHANGE = 7
NSM_LOADING = 8
NSM_FULL = 9

# State value -> name; the tuple is ordered by state value starting at 0.
_NSM_STATE_NAMES = dict(
    enumerate(
        (
            "NSM_DEPENDUPON",
            "NSM_DELETED",
            "NSM_DOWN",
            "NSM_ATTEMPT",
            "NSM_INIT",
            "NSM_TWOWAY",
            "NSM_EXSTART",
            "NSM_EXCHANGE",
            "NSM_LOADING",
            "NSM_FULL",
        )
    )
)


def nsm_name(state):
    """Return the name of neighbor state machine `state`, or `str(state)` if unknown."""
    try:
        return _NSM_STATE_NAMES[state]
    except KeyError:
        return str(state)
249
250
251 # --------------
252 # Client Classes
253 # --------------
254
255
class OspfApiClient:
    """Asyncio client for the OSPF daemon API.

    Two TCP connections are used: a synchronous channel the client opens to
    the server (requests and their replies) and an asynchronous notify channel
    the server opens back to the client on the client's local port + 1.
    """

    def __str__(self):
        return "OspfApiClient({})".format(self.server)

    @staticmethod
    def _get_bound_sockets(port):
        """Return two TCP sockets bound to `port` and `port + 1`.

        Both sockets are closed again if anything fails, and the exception is
        re-raised.
        """
        s1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
        try:
            s1.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            s1.bind(("", port))
            s2 = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
            try:
                s2.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                s2.bind(("", port + 1))
                return s1, s2
            except Exception:
                s2.close()
                raise
        except Exception:
            s1.close()
            raise

    def __init__(self, server="localhost", handlers=None):
        """A client connection to OSPF Daemon using the OSPF API

        The client object is not created in a connected state. To connect to the server
        the `connect` method should be called. If an error is encountered when sending
        messages to the server an exception will be raised and the connection will be
        closed. When this happens `connect` may be called again to restore the
        connection.

        Args:
            server: hostname or IP address of server default is "localhost"
            handlers: dict of message handlers, the key is the API message
                type, the value is a coroutine function. The function's
                signature is: `handler(msg_type, msg, msg_extra, *params)`,
                where `msg` is the message data after the API header, `*params`
                will be the unpacked message values, and msg_extra are any bytes
                beyond the fixed parameters of the message.
        Raises:
            Will raise exceptions for failures with various `socket` modules
            functions such as `socket.socket`, `socket.setsockopt`, `socket.bind`.
        """
        # A sequence number of 0 means "not connected" (see `closed`).
        self._seq = 0
        self._s = None  # sync channel socket (client -> server)
        self._as = None  # async notify socket accepted from the server
        self._ls = None  # listening socket for the server's reverse connect
        self._ar = self._r = self._w = None  # stream reader/writer objects
        self.server = server
        self.handlers = handlers if handlers is not None else dict()
        self.write_lock = Lock()

        self._bind_sockets()

    def _bind_sockets(self):
        """Bind `self._s` and `self._ls` to a free pair of consecutive ports.

        Raises:
            OSError: if binding fails for a reason other than the ports being
                in use, or if every candidate pair is in use.
        """
        port_start = 49152
        port_end = 65534
        for port in range(port_start, port_end + 2, 2):
            try:
                logging.debug("%s: binding to ports %s, %s", self, port, port + 1)
                self._s, self._ls = self._get_bound_sockets(port)
                return
            except OSError as error:
                if error.errno != errno.EADDRINUSE or port == port_end:
                    logging.warning("%s: binding port %s error %s", self, port, error)
                    raise
                logging.debug("%s: ports %s, %s in use.", self, port, port + 1)
        # The loop either returns or raises on its final iteration.
        raise AssertionError("Should not reach this code execution point")

    async def _connect_locked(self):
        """Establish both channels; the caller must hold `write_lock`."""
        logging.debug("%s: connect to OSPF API", self)

        # Closing the connection releases the bound sockets, so a reconnect
        # needs a fresh pair.
        if self._s is None or self._ls is None:
            self._bind_sockets()

        loop = asyncio.get_event_loop()

        self._ls.listen()
        try:
            logging.debug("%s: connecting sync socket to server", self)
            await loop.sock_connect(self._s, (self.server, 2607))

            logging.debug("%s: accepting connect from server", self)
            self._as, _ = await loop.sock_accept(self._ls)
        except Exception:
            await self._close_locked()
            raise

        logging.debug("%s: success", self)
        self._r, self._w = await asyncio.open_connection(sock=self._s)
        self._ar, _ = await asyncio.open_connection(sock=self._as)
        self._seq = 1

    async def connect(self):
        """Connect to the OSPF API server."""
        async with self.write_lock:
            await self._connect_locked()

    @property
    def closed(self):
        "True if the connection is closed."
        return self._seq == 0

    async def _close_locked(self):
        """Close all sockets and reset state; the caller must hold `write_lock`."""
        logging.debug("%s: closing", self)
        if self._s:
            if self._w:
                # Closing the stream writer also closes the socket it wraps.
                self._w.close()
                await self._w.wait_closed()
                self._w = None
            else:
                self._s.close()
            self._s = None
            self._r = None
        assert self._w is None
        if self._as:
            self._as.close()
            self._as = None
            self._ar = None
        if self._ls:
            self._ls.close()
            self._ls = None
        self._seq = 0

    async def close(self):
        """Close the connection to the OSPF API server."""
        async with self.write_lock:
            await self._close_locked()

    @staticmethod
    async def _msg_read(r, expseq=-1):
        """Read an OSPF API message from the stream reader `r`

        Args:
            r: stream reader to read the message from
            expseq: sequence number to expect or -1 for any.
        Returns:
            A `(msg_type, msg)` tuple where `msg` is the bytes after the API header.
        Raises:
            EOFError if the stream ends before a full message is read.
            SeqNumError if an unexpected sequence number is received.
            Exception if the API version in the header is not 1.
        """
        try:
            mh = await r.readexactly(FMT_APIMSGHDR_SIZE)
            version, mt, length, seq = struct.unpack(FMT_APIMSGHDR, mh)
            if version != 1:
                raise Exception(
                    "received unexpected OSPF API version {}".format(version)
                )
            if expseq == -1:
                logging.debug("_msg_read: got seq: 0x%x on async read", seq)
            elif seq != expseq:
                raise SeqNumError("rx {} != {}".format(seq, expseq))
            msg = await r.readexactly(length) if length else b""
            return mt, msg
        except asyncio.IncompleteReadError as error:
            raise EOFError from error

    async def msg_read(self):
        """Read a message from the async notify channel.

        Raises:
            May raise exceptions for failures with various `socket` modules.
        """
        return await OspfApiClient._msg_read(self._ar, -1)

    async def msg_send(self, mt, mp):
        """Send a message to OSPF API and wait for error code reply.

        Args:
            mt: the message type
            mp: the message payload
        Returns:
            error: an OSPF_API_XXX error code, 0 for OK.
        Raises:
            Raises SeqNumError if the synchronous reply is the wrong sequence number;
            MsgTypeError if the synchronous reply is not MSG_REPLY. Also,
            may raise exceptions for failures with various `socket` modules,

            The connection will be closed when any of these is raised.
        """
        logging.debug("SEND: %s: sending %s seq 0x%x", self, api_msgname(mt), self._seq)
        mh = struct.pack(FMT_APIMSGHDR, 1, mt, len(mp), self._seq)

        seq = self._seq
        self._seq = seq + 1

        try:
            # Hold the lock across the write and the reply read so concurrent
            # senders cannot interleave on the synchronous channel.
            async with self.write_lock:
                self._w.write(mh + mp)
                await self._w.drain()
                reply_mt, reply_mp = await OspfApiClient._msg_read(self._r, seq)

            if reply_mt != MSG_REPLY:
                raise MsgTypeError(
                    "rx {} != {}".format(api_msgname(reply_mt), api_msgname(MSG_REPLY))
                )

            return struct.unpack(msg_fmt[MSG_REPLY], reply_mp)[0]
        except Exception:
            # We've written data with a sequence number
            await self.close()
            raise

    async def msg_send_raises(self, mt, mp=b"\x00" * 4):
        """Send a message to OSPF API and wait for error code reply.

        Args:
            mt: the message type
            mp: the message payload
        Raises:
            APIError if the server replies with an error.

            Also may raise exceptions for failures with various `socket` modules,
            as well as MsgTypeError if the synchronous reply is incorrect.
            The connection will be closed for these non-API error exceptions.
        """
        ecode = await self.msg_send(mt, mp)
        if ecode:
            raise APIError("{} error {}".format(api_msgname(mt), api_errname(ecode)))

    async def handle_async_msg(self, mt, msg):
        """Unpack an async message and pass it to the handler registered for `mt`.

        Messages of unknown type, or with no registered handler, are logged at
        debug level and dropped.
        """
        if mt not in msg_fmt:
            logging.debug("RECV: %s: unknown async msg type %s", self, mt)
            return

        fmt = msg_fmt[mt]
        sz = msg_size[mt]
        # Fixed-size fields first; whatever follows is handed over as `extra`.
        tup = struct.unpack(fmt, msg[:sz])
        extra = msg[sz:]

        if mt not in self.handlers:
            logging.debug(
                "RECV: %s: no handlers for msg type %s", self, api_msgname(mt)
            )
            return

        logging.debug("RECV: %s: calling handler for %s", self, api_msgname(mt))
        await self.handlers[mt](mt, msg, extra, *tup)

    #
    # Client to Server Messaging
    #
    @staticmethod
    def lsa_type_mask(*lsa_types):
        "Return a 16 bit mask for each LSA type passed."
        if not lsa_types:
            return 0xFFFF
        mask = 0
        for t in lsa_types:
            assert 0 < t < 16, "LSA type {} out of range [1, 15]".format(t)
            mask |= 1 << t
        return mask

    @staticmethod
    def lsa_filter(origin, areas, lsa_types):
        """Return an LSA filter.

        Return the filter message bytes based on `origin` the `areas` list and the LSAs
        types in the `lsa_types` list.
        """
        mask = OspfApiClient.lsa_type_mask(*lsa_types)
        narea = len(areas)
        fmt = FMT_LSA_FILTER + ("{}I".format(narea) if narea else "")
        # lsa type mask, origin, number of areas, each area
        return struct.pack(fmt, mask, origin, narea, *areas)

    async def req_lsdb_sync(self):
        "Register for all LSA notifications and request an LSDB synchronization."
        logging.debug("SEND: %s: request LSDB events", self)
        mp = OspfApiClient.lsa_filter(LSAF_ORIGIN_ANY, [], [])
        await self.msg_send_raises(MSG_REGISTER_EVENT, mp)

        logging.debug("SEND: %s: request LSDB sync", self)
        await self.msg_send_raises(MSG_SYNC_LSDB, mp)

    async def req_reachable_routers(self):
        "Request a dump of all reachable routers."
        logging.debug("SEND: %s: request reachable changes", self)
        await self.msg_send_raises(MSG_SYNC_REACHABLE)

    async def req_ism_states(self):
        "Request a dump of the current ISM states of all interfaces."
        logging.debug("SEND: %s: request ISM changes", self)
        await self.msg_send_raises(MSG_SYNC_ISM)

    async def req_nsm_states(self):
        "Request a dump of the current NSM states of all neighbors."
        logging.debug("SEND: %s: request NSM changes", self)
        await self.msg_send_raises(MSG_SYNC_NSM)
538
539
class OspfOpaqueClient(OspfApiClient):
    """A client connection to OSPF Daemon for manipulating Opaque LSA data.

    The client object is not created in a connected state. To connect to the server
    the `connect` method should be called. If an error is encountered when sending
    messages to the server an exception will be raised and the connection will be
    closed. When this happens `connect` may be called again to restore the
    connection.

    Args:
        server: hostname or IP address of server default is "localhost"

    Raises:
        Will raise exceptions for failures with various `socket` modules
        functions such as `socket.socket`, `socket.setsockopt`, `socket.bind`.
    """

    def __init__(self, server="localhost"):
        handlers = {
            MSG_READY_NOTIFY: self._ready_msg,
            MSG_LSA_UPDATE_NOTIFY: self._lsa_change_msg,
            MSG_LSA_DELETE_NOTIFY: self._lsa_change_msg,
            MSG_NEW_IF: self._if_msg,
            MSG_DEL_IF: self._if_msg,
            MSG_ISM_CHANGE: self._if_change_msg,
            MSG_NSM_CHANGE: self._nbr_change_msg,
            MSG_REACHABLE_CHANGE: self._reachable_msg,
        }
        super().__init__(server, handlers)

        # ready_cond[lsa_type][otype] is absent until someone waits, a list of
        # Events while callers wait for the ready notify, and True once the
        # notify has been received. Guarded by ready_lock.
        self.ready_lock = Lock()
        self.ready_cond = {
            LSA_TYPE_OPAQUE_LINK: {},
            LSA_TYPE_OPAQUE_AREA: {},
            LSA_TYPE_OPAQUE_AS: {},
        }
        self.lsid_seq_num = {}

        self.lsa_change_cb = None
        # (lsa_type, otype) -> callback for changes to LSAs of that opaque type
        self.opaque_change_cb = {}

        self.reachable_routers = set()
        self.reachable_change_cb = None

        self.if_area = {}
        self.ism_states = {}
        self.ism_change_cb = None

        self.nsm_states = {}
        self.nsm_change_cb = None

    async def _register_opaque_data(self, lsa_type, otype):
        """Send a register request for `(lsa_type, otype)` to the server."""
        async with self.ready_lock:
            cond = self.ready_cond[lsa_type].get(otype)
            assert cond is None, "multiple registers for {} opaque-type {}".format(
                lsa_typename(lsa_type), otype
            )

            logging.debug("register %s opaque-type %s", lsa_typename(lsa_type), otype)

            mt = MSG_REGISTER_OPAQUETYPE
            mp = struct.pack(msg_fmt[mt], lsa_type, otype)
            await self.msg_send_raises(mt, mp)

    async def _assure_opaque_ready(self, lsa_type, otype):
        """Register `(lsa_type, otype)` and wait for ready unless already ready."""
        async with self.ready_lock:
            if self.ready_cond[lsa_type].get(otype) is True:
                return

        await self._register_opaque_data(lsa_type, otype)
        await self.wait_opaque_ready(lsa_type, otype)

    async def _handle_msg_loop(self):
        """Read and dispatch async notifications until the server closes the channel.

        Returns:
            2 once EOF is read from the async notify channel.
        """
        try:
            logging.debug("entering async msg handling loop")
            while True:
                mt, msg = await self.msg_read()
                if mt in amsg_info:
                    await self.handle_async_msg(mt, msg)
                else:
                    mts = api_msgname(mt)
                    logging.warning(
                        "ignoring unexpected msg: %s len: %s", mts, len(msg)
                    )
        except EOFError:
            logging.info("Got EOF from OSPF API server on async notify socket")
            return 2

    @staticmethod
    def _opaque_args(lsa_type, otype, oid, mp):
        """Return the LSA header field values for an opaque LSA carrying `mp`."""
        # The opaque type is the top octet of the LS ID, the opaque ID the rest.
        lsid = (otype << 24) | oid
        return 0, 0, lsa_type, lsid, 0, 0, 0, FMT_LSA_HEADER_SIZE + len(mp)

    @staticmethod
    def _make_opaque_lsa(lsa_type, otype, oid, mp):
        """Return the bytes of an opaque LSA: a packed header followed by `mp`."""
        lsa = struct.pack(
            FMT_LSA_HEADER, *OspfOpaqueClient._opaque_args(lsa_type, otype, oid, mp)
        )
        lsa += mp
        return lsa

    async def _ready_msg(self, mt, msg, extra, lsa_type, otype, addr):
        """Handle MSG_READY_NOTIFY: mark the opaque type ready and wake waiters."""
        # Leading space keeps the scope apart from the opaque type in the log.
        if lsa_type == LSA_TYPE_OPAQUE_LINK:
            e = " ifaddr {}".format(ip(addr))
        elif lsa_type == LSA_TYPE_OPAQUE_AREA:
            e = " area {}".format(ip(addr))
        else:
            e = ""
        logging.info(
            "RECV: %s ready notify for %s opaque-type %s%s",
            self,
            lsa_typename(lsa_type),
            otype,
            e,
        )

        # Signal all waiting senders they can send now.
        async with self.ready_lock:
            cond = self.ready_cond[lsa_type].get(otype)
            self.ready_cond[lsa_type][otype] = True

        if cond is True:
            logging.warning(
                "RECV: dup ready received for %s opaque-type %s",
                lsa_typename(lsa_type),
                otype,
            )
        elif cond:
            for evt in cond:
                evt.set()

    async def _if_msg(self, mt, msg, extra, *args):
        """Handle MSG_NEW_IF / MSG_DEL_IF by logging the interface."""
        if mt == MSG_NEW_IF:
            ifaddr, aid = args
        else:
            assert mt == MSG_DEL_IF
            # MSG_DEL_IF carries only the interface address.
            ifaddr, aid = args[0], 0
        logging.info(
            "RECV: %s ifaddr %s areaid %s", api_msgname(mt), ip(ifaddr), ip(aid)
        )

    async def _if_change_msg(self, mt, msg, extra, ifaddr, aid, state):
        """Handle MSG_ISM_CHANGE: record the interface state and call the callback."""
        ifaddr = ip(ifaddr)
        aid = ip(aid)

        logging.info(
            "RECV: %s ifaddr %s areaid %s state %s",
            api_msgname(mt),
            ifaddr,
            aid,
            ism_name(state),
        )

        self.if_area[ifaddr] = aid
        self.ism_states[ifaddr] = state

        if self.ism_change_cb:
            self.ism_change_cb(ifaddr, aid, state)

    async def _nbr_change_msg(self, mt, msg, extra, ifaddr, nbraddr, router_id, state):
        """Handle MSG_NSM_CHANGE: record the neighbor state and call the callback."""
        ifaddr = ip(ifaddr)
        nbraddr = ip(nbraddr)
        router_id = ip(router_id)

        logging.info(
            "RECV: %s ifaddr %s nbraddr %s router_id %s state %s",
            api_msgname(mt),
            ifaddr,
            nbraddr,
            router_id,
            nsm_name(state),
        )

        if ifaddr not in self.nsm_states:
            self.nsm_states[ifaddr] = {}
        self.nsm_states[ifaddr][(nbraddr, router_id)] = state

        if self.nsm_change_cb:
            self.nsm_change_cb(ifaddr, nbraddr, router_id, state)

    async def _lsa_change_msg(self, mt, msg, extra, ifaddr, aid, is_self, *ls_header):
        """Handle LSA update/delete notifies and call the registered callbacks."""
        (
            lsa_age,  # ls_age,
            _,  # ls_options,
            lsa_type,
            ls_id,
            _,  # ls_adv_router,
            ls_seq,
            _,  # ls_cksum,
            ls_len,
        ) = ls_header

        otype = (ls_id >> 24) & 0xFF

        if mt == MSG_LSA_UPDATE_NOTIFY:
            ts = "update"
        else:
            assert mt == MSG_LSA_DELETE_NOTIFY
            ts = "delete"

        logging.info(
            "RECV: LSA %s msg for LSA %s in area %s seq 0x%x len %s age %s",
            ts,
            ip(ls_id),
            ip(aid),
            ls_seq,
            ls_len,
            lsa_age,
        )
        idx = (lsa_type, otype)

        # The full LSA starts where the LSA header begins inside the message.
        pre_lsa_size = msg_size[mt] - FMT_LSA_HEADER_SIZE
        lsa = msg[pre_lsa_size:]

        if idx in self.opaque_change_cb:
            self.opaque_change_cb[idx](mt, ifaddr, aid, ls_header, extra, lsa)

        if self.lsa_change_cb:
            self.lsa_change_cb(mt, ifaddr, aid, ls_header, extra, lsa)

    async def _reachable_msg(self, mt, msg, extra, nadd, nremove):
        """Handle MSG_REACHABLE_CHANGE: update `reachable_routers`, call the callback."""
        # `extra` holds the added router IDs followed by the removed ones.
        router_ids = struct.unpack(">{}I".format(nadd + nremove), extra)
        router_ids = [ip(x) for x in router_ids]
        logging.info(
            "RECV: %s added %s removed %s",
            api_msgname(mt),
            router_ids[:nadd],
            router_ids[nadd:],
        )
        self.reachable_routers |= set(router_ids[:nadd])
        self.reachable_routers -= set(router_ids[nadd:])
        logging.info("RECV: %s new set %s", api_msgname(mt), self.reachable_routers)

        if self.reachable_change_cb:
            logging.info("RECV: %s calling callback", api_msgname(mt))
            await self.reachable_change_cb(router_ids[:nadd], router_ids[nadd:])

    async def add_opaque_data(self, addr, lsa_type, otype, oid, data):
        """Add an instance of opaque data.

        Add an instance of opaque data. This call will register for the given
        LSA and opaque type if not already done.

        Args:
            addr: depends on lsa_type, LINK => ifaddr, AREA => area ID, AS => ignored
            lsa_type: LSA_TYPE_OPAQUE_{LINK,AREA,AS}
            otype: (octet) opaque type
            oid: (3 octets) ID of this opaque data
            data: the opaque data
        Raises:
            See `msg_send_raises`
        """

        if lsa_type == LSA_TYPE_OPAQUE_LINK:
            ifaddr, aid = int(addr), 0
        elif lsa_type == LSA_TYPE_OPAQUE_AREA:
            ifaddr, aid = 0, int(addr)
        else:
            assert lsa_type == LSA_TYPE_OPAQUE_AS
            ifaddr, aid = 0, 0

        mt = MSG_ORIGINATE_REQUEST
        msg = struct.pack(
            msg_fmt[mt],
            ifaddr,
            aid,
            *OspfOpaqueClient._opaque_args(lsa_type, otype, oid, data),
        )
        msg += data
        await self._assure_opaque_ready(lsa_type, otype)
        await self.msg_send_raises(mt, msg)

    async def delete_opaque_data(self, addr, lsa_type, otype, oid):
        """Delete an instance of opaque data.

        Delete an instance of opaque data. This call will register for the given
        LSA and opaque type if not already done.

        Args:
            addr: depends on lsa_type, LINK => ifaddr, AREA => area ID, AS => ignored
            lsa_type: LSA_TYPE_OPAQUE_{LINK,AREA,AS}
            otype: (octet) opaque type. Note: the type will be registered if the user
                has not explicitly done that yet with `register_opaque_data`.
            oid: (3 octets) ID of this opaque data
        Raises:
            See `msg_send_raises`
        """
        # NOTE(review): this drops the change callback for the whole
        # (lsa_type, otype), not just for `oid` -- confirm this is intended.
        if (lsa_type, otype) in self.opaque_change_cb:
            del self.opaque_change_cb[(lsa_type, otype)]

        # `addr` is documented as ignored for AS scope, so do not require it
        # to be convertible to an integer there.
        addr_value = 0 if lsa_type == LSA_TYPE_OPAQUE_AS else int(addr)

        mt = MSG_DELETE_REQUEST
        mp = struct.pack(msg_fmt[mt], addr_value, lsa_type, otype, oid)
        await self._assure_opaque_ready(lsa_type, otype)
        await self.msg_send_raises(mt, mp)

    async def register_opaque_data(self, lsa_type, otype, callback=None):
        """Register intent to advertise opaque data.

        The application should wait for the async notification that the server is
        ready to advertise the given opaque data type. The API currently only allows
        a single "owner" of each unique (lsa_type,otype). To wait call `wait_opaque_ready`

        Args:
            lsa_type: LSA_TYPE_OPAQUE_{LINK,AREA,AS}
            otype: (octet) opaque type.
            callback: if given, callback will be called when changes are received for
                LSA of the given (lsa_type, otype). If not given, any callback
                previously set for (lsa_type, otype) is removed. The callbacks
                signature is:

                `callback(msg_type, ifaddr, area_id, lsa_header, data, lsa)`

                Args:
                    msg_type: MSG_LSA_UPDATE_NOTIFY or MSG_LSA_DELETE_NOTIFY
                    ifaddr: integer identifying an interface (by IP address)
                    area_id: integer identifying an area
                    lsa_header: the LSA header as an unpacked tuple (fmt: ">HBBIILHH")
                    data: the opaque data that follows the LSA header
                    lsa: the octets of the full lsa
        Raises:
            See `msg_send_raises`
        """
        if callback:
            self.opaque_change_cb[(lsa_type, otype)] = callback
        elif (lsa_type, otype) in self.opaque_change_cb:
            logging.warning(
                "OSPFCLIENT: register: removing callback for %s opaque-type %s",
                lsa_typename(lsa_type),
                otype,
            )
            del self.opaque_change_cb[(lsa_type, otype)]

        await self._register_opaque_data(lsa_type, otype)

    async def wait_opaque_ready(self, lsa_type, otype):
        """Wait until the ready notify for `(lsa_type, otype)` has been received.

        Returns immediately if the notify was already received.
        """
        async with self.ready_lock:
            cond = self.ready_cond[lsa_type].get(otype)
            if cond is True:
                return

            logging.debug(
                "waiting for ready %s opaque-type %s", lsa_typename(lsa_type), otype
            )

            if not cond:
                cond = self.ready_cond[lsa_type][otype] = []

            evt = Event()
            cond.append(evt)

        # Wait outside the lock so the ready handler can take it.
        await evt.wait()
        logging.debug("READY for %s opaque-type %s", lsa_typename(lsa_type), otype)

    async def register_opaque_data_wait(self, lsa_type, otype, callback=None):
        """Register intent to advertise opaque data and wait for ready.

        The API currently only allows a single "owner" of each unique (lsa_type,otype).
        Nothing is sent if the type is already ready.

        Args:
            lsa_type: LSA_TYPE_OPAQUE_{LINK,AREA,AS}
            otype: (octet) opaque type.
            callback: if given, callback will be called when changes are received for
                LSA of the given (lsa_type, otype). If not given, any callback
                previously set for (lsa_type, otype) is removed. The callbacks
                signature is:

                `callback(msg_type, ifaddr, area_id, lsa_header, data, lsa)`

                Args:
                    msg_type: MSG_LSA_UPDATE_NOTIFY or MSG_LSA_DELETE_NOTIFY
                    ifaddr: integer identifying an interface (by IP address)
                    area_id: integer identifying an area
                    lsa_header: the LSA header as an unpacked tuple (fmt: ">HBBIILHH")
                    data: the opaque data that follows the LSA header
                    lsa: the octets of the full lsa
        Raises:

            See `msg_send_raises`
        """
        if callback:
            self.opaque_change_cb[(lsa_type, otype)] = callback
        elif (lsa_type, otype) in self.opaque_change_cb:
            logging.warning(
                "OSPFCLIENT: register: removing callback for %s opaque-type %s",
                lsa_typename(lsa_type),
                otype,
            )
            del self.opaque_change_cb[(lsa_type, otype)]

        return await self._assure_opaque_ready(lsa_type, otype)

    async def unregister_opaque_data(self, lsa_type, otype):
        """Unregister intent to advertise opaque data.

        This will also cause the server to flush/delete all opaque data of
        the given (lsa_type,otype).

        Args:
            lsa_type: LSA_TYPE_OPAQUE_{LINK,AREA,AS}
            otype: (octet) opaque type.
        Raises:
            See `msg_send_raises`
        """

        if (lsa_type, otype) in self.opaque_change_cb:
            del self.opaque_change_cb[(lsa_type, otype)]

        mt = MSG_UNREGISTER_OPAQUETYPE
        mp = struct.pack(msg_fmt[mt], lsa_type, otype)
        await self.msg_send_raises(mt, mp)

        # Forget the ready state so a later add/register registers again
        # instead of treating the type as still ready.
        async with self.ready_lock:
            ready = self.ready_cond.get(lsa_type)
            if ready is not None and ready.get(otype) is True:
                del ready[otype]

    async def monitor_lsa(self, callback=None):
        """Monitor changes to LSAs.

        Args:
            callback: if given, callback will be called when changes are received for
                any LSA. The callback signature is:

                `callback(msg_type, ifaddr, area_id, lsa_header, extra, lsa)`

                Args:
                    msg_type: MSG_LSA_UPDATE_NOTIFY or MSG_LSA_DELETE_NOTIFY
                    ifaddr: integer identifying an interface (by IP address)
                    area_id: integer identifying an area
                    lsa_header: the LSA header as an unpacked tuple (fmt: ">HBBIILHH")
                    extra: the octets that follow the LSA header
                    lsa: the octets of the full lsa
        """
        self.lsa_change_cb = callback
        await self.req_lsdb_sync()

    async def monitor_reachable(self, callback=None):
        """Monitor the set of reachable routers.

        The property `reachable_routers` contains the set() of reachable router IDs
        as `ipaddress` address objects. This set is updated prior to calling the
        `callback`

        Args:
            callback: coroutine function that will be awaited when the set of
                reachable routers changes. The callback signature is:

                `callback(added, removed)`

                Args:
                    added: list of router IDs (address objects) being added
                    removed: list of router IDs (address objects) being removed
        """
        self.reachable_change_cb = callback
        await self.req_reachable_routers()

    async def monitor_ism(self, callback=None):
        """Monitor the state of OSPF enabled interfaces.

        Args:
            callback: callback will be called when an interface changes state.
                The callback signature is:

                `callback(ifaddr, area_id, state)`

                Args:
                    ifaddr: address object identifying an interface (by IP address)
                    area_id: address object identifying an area
                    state: ISM_*
        """
        self.ism_change_cb = callback
        await self.req_ism_states()

    async def monitor_nsm(self, callback=None):
        """Monitor the state of OSPF neighbors.

        Args:
            callback: callback will be called when a neighbor changes state.
                The callback signature is:

                `callback(ifaddr, nbr_addr, router_id, state)`

                Args:
                    ifaddr: address object identifying an interface (by IP address)
                    nbr_addr: address object identifying neighbor by IP address
                    router_id: address object identifying neighbor router ID
                    state: NSM_*
        """
        self.nsm_change_cb = callback
        await self.req_nsm_states()
1024
1025
1026 # ================
1027 # CLI/Script Usage
1028 # ================
1029
1030
async def async_main(args):
    """Connect to the OSPF API server, run the requested actions, then serve.

    Args:
        args: parsed command line with `server`, `actions` and `exit` attributes.
            Each action is "(ADD|DEL),LSATYPE,[ADDR,],OTYPE,OID,[HEXDATA]".
    Returns:
        0 when `args.exit` is set and all actions succeeded, otherwise 2 after
        an error or once the server closes the async notify channel.
    """
    c = OspfOpaqueClient(args.server)
    await c.connect()

    try:
        # Start handling async messages from server. Keep a reference to the
        # task: the event loop only holds a weak reference to it.
        if sys.version_info >= (3, 7):
            msg_task = asyncio.create_task(c._handle_msg_loop())
        else:
            msg_task = asyncio.get_event_loop().create_task(c._handle_msg_loop())

        await c.req_lsdb_sync()
        await c.req_reachable_routers()
        await c.req_ism_states()
        await c.req_nsm_states()

        if args.actions:
            for action in args.actions:
                fields = action.split(",")
                what = fields.pop(0)
                ltype = int(fields.pop(0))
                if ltype == 11:
                    # AS-scope opaque LSAs carry no address field.
                    addr = ip(0)
                else:
                    aval = fields.pop(0)
                    # The address may be given as an integer or dotted quad.
                    try:
                        addr = ip(int(aval))
                    except ValueError:
                        addr = ip(aval)
                oargs = [addr, ltype, int(fields.pop(0)), int(fields.pop(0))]
                assert len(fields) <= 1, "Bad format for action argument"
                try:
                    b = bytes.fromhex(fields.pop(0))
                except IndexError:
                    b = b""
                logging.info("opaque data is %s octets", len(b))
                # Needs to be multiple of 4 in length
                mod = len(b) % 4
                if mod:
                    b += b"\x00" * (4 - mod)
                    logging.info("opaque padding to %s octets", len(b))

                if what.casefold() == "add":
                    await c.add_opaque_data(*oargs, b)
                else:
                    assert what.casefold().startswith("del")
                    await c.delete_opaque_data(*oargs)
        if args.exit:
            return 0
    except Exception as error:
        logging.error("async_main: unexpected error: %s", error, exc_info=True)
        return 2

    # Run until the message loop ends; it logs the EOF and returns the exit
    # status once the server closes the async notify channel.
    logging.info("Sleeping forever")
    return await msg_task
1091
1092
def main(*args):
    """Command line entry point.

    Args:
        *args: command line arguments to parse; when none are given,
            `sys.argv[1:]` is parsed.
    Returns:
        The status from `async_main`, or 3 if it did not run to completion.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--exit", action="store_true", help="Exit after commands")
    ap.add_argument("--server", default="localhost", help="OSPF API server")
    ap.add_argument("-v", "--verbose", action="store_true", help="be verbose")
    ap.add_argument(
        "actions", nargs="*", help="(ADD|DEL),LSATYPE,[ADDR,],OTYPE,OID,[HEXDATA]"
    )
    # Passing None makes argparse fall back to sys.argv[1:].
    parsed = ap.parse_args(args if args else None)

    level = logging.DEBUG if parsed.verbose else logging.INFO
    logging.basicConfig(
        level=level, format="%(asctime)s %(levelname)s: CLIENT: %(name)s %(message)s"
    )

    logging.info("ospfclient: starting")

    status = 3
    try:
        if sys.version_info >= (3, 7):
            status = asyncio.run(async_main(parsed))
        else:
            # asyncio.run() is not available before python 3.7
            loop = asyncio.get_event_loop()
            try:
                status = loop.run_until_complete(async_main(parsed))
            finally:
                loop.close()
    except KeyboardInterrupt:
        logging.info("Exiting, received KeyboardInterrupt in main")
    except Exception as error:
        logging.info("Exiting, unexpected exception %s", error, exc_info=True)
    else:
        logging.info("ospfclient: clean exit")

    return status
1129
1130
# Script entry point: the process exit status is whatever main() returns.
if __name__ == "__main__":
    sys.exit(main())