]> git.proxmox.com Git - ovs.git/blob - lib/sflow_receiver.c
sFlow: add Sun Industry Standards Source License 1.1 as licensing option
[ovs.git] / lib / sflow_receiver.c
1 /* Copyright (c) 2002-2009 InMon Corp. Licensed under the terms of either the
2 * Sun Industry Standards Source License 1.1, that is available at:
3 * http://host-sflow.sourceforge.net/sissl.html
4 * or the InMon sFlow License, that is available at:
5 * http://www.inmon.com/technology/sflowlicense.txt
6 */
7
8 #ifndef __CHECKER__ /* Don't run sparse on anything in this file. */
9
10 #include <assert.h>
11 #include "sflow_api.h"
12
13 static void resetSampleCollector(SFLReceiver *receiver);
14 static void sendSample(SFLReceiver *receiver);
15 static void sflError(SFLReceiver *receiver, char *errm);
16 inline static void putNet32(SFLReceiver *receiver, u_int32_t val);
17 inline static void putAddress(SFLReceiver *receiver, SFLAddress *addr);
18 #ifdef SFLOW_DO_SOCKET
19 static void initSocket(SFLReceiver *receiver);
20 #endif
21
22 /*_________________--------------------------__________________
23 _________________ sfl_receiver_init __________________
24 -----------------__________________________------------------
25 */
26
27 void sfl_receiver_init(SFLReceiver *receiver, SFLAgent *agent)
28 {
29 /* first clear everything */
30 memset(receiver, 0, sizeof(*receiver));
31
32 /* now copy in the parameters */
33 receiver->agent = agent;
34
35 /* set defaults */
36 receiver->sFlowRcvrMaximumDatagramSize = SFL_DEFAULT_DATAGRAM_SIZE;
37 receiver->sFlowRcvrPort = SFL_DEFAULT_COLLECTOR_PORT;
38
39 #ifdef SFLOW_DO_SOCKET
40 /* initialize the socket address */
41 initSocket(receiver);
42 #endif
43
44 /* preset some of the header fields */
45 receiver->sampleCollector.datap = receiver->sampleCollector.data;
46 putNet32(receiver, SFLDATAGRAM_VERSION5);
47 putAddress(receiver, &agent->myIP);
48 putNet32(receiver, agent->subId);
49
50 /* prepare to receive the first sample */
51 resetSampleCollector(receiver);
52 }
53
54 /*_________________---------------------------__________________
55 _________________ reset __________________
56 -----------------___________________________------------------
57
58 called on timeout, or when owner string is cleared
59 */
60
61 static void reset(SFLReceiver *receiver) {
62 // ask agent to tell samplers and pollers to stop sending samples
63 sfl_agent_resetReceiver(receiver->agent, receiver);
64 // reinitialize
65 sfl_receiver_init(receiver, receiver->agent);
66 }
67
68 #ifdef SFLOW_DO_SOCKET
69 /*_________________---------------------------__________________
70 _________________ initSocket __________________
71 -----------------___________________________------------------
72 */
73
74 static void initSocket(SFLReceiver *receiver) {
75 if(receiver->sFlowRcvrAddress.type == SFLADDRESSTYPE_IP_V6) {
76 struct sockaddr_in6 *sa6 = &receiver->receiver6;
77 sa6->sin6_port = htons((u_int16_t)receiver->sFlowRcvrPort);
78 sa6->sin6_family = AF_INET6;
79 sa6->sin6_addr = receiver->sFlowRcvrAddress.address.ip_v6;
80 }
81 else {
82 struct sockaddr_in *sa4 = &receiver->receiver4;
83 sa4->sin_port = htons((u_int16_t)receiver->sFlowRcvrPort);
84 sa4->sin_family = AF_INET;
85 sa4->sin_addr = receiver->sFlowRcvrAddress.address.ip_v4;
86 }
87 }
88 #endif
89
90 /*_________________----------------------------------------_____________
91 _________________ MIB Vars _____________
92 -----------------________________________________________-------------
93 */
94
95 char * sfl_receiver_get_sFlowRcvrOwner(SFLReceiver *receiver) {
96 return receiver->sFlowRcvrOwner;
97 }
98 void sfl_receiver_set_sFlowRcvrOwner(SFLReceiver *receiver, char *sFlowRcvrOwner) {
99 receiver->sFlowRcvrOwner = sFlowRcvrOwner;
100 if(sFlowRcvrOwner == NULL || sFlowRcvrOwner[0] == '\0') {
101 // reset condition! owner string was cleared
102 reset(receiver);
103 }
104 }
105 time_t sfl_receiver_get_sFlowRcvrTimeout(SFLReceiver *receiver) {
106 return receiver->sFlowRcvrTimeout;
107 }
108 void sfl_receiver_set_sFlowRcvrTimeout(SFLReceiver *receiver, time_t sFlowRcvrTimeout) {
109 receiver->sFlowRcvrTimeout =sFlowRcvrTimeout;
110 }
111 u_int32_t sfl_receiver_get_sFlowRcvrMaximumDatagramSize(SFLReceiver *receiver) {
112 return receiver->sFlowRcvrMaximumDatagramSize;
113 }
114 void sfl_receiver_set_sFlowRcvrMaximumDatagramSize(SFLReceiver *receiver, u_int32_t sFlowRcvrMaximumDatagramSize) {
115 u_int32_t mdz = sFlowRcvrMaximumDatagramSize;
116 if(mdz < SFL_MIN_DATAGRAM_SIZE) mdz = SFL_MIN_DATAGRAM_SIZE;
117 receiver->sFlowRcvrMaximumDatagramSize = mdz;
118 }
119 SFLAddress *sfl_receiver_get_sFlowRcvrAddress(SFLReceiver *receiver) {
120 return &receiver->sFlowRcvrAddress;
121 }
122 void sfl_receiver_set_sFlowRcvrAddress(SFLReceiver *receiver, SFLAddress *sFlowRcvrAddress) {
123 if(sFlowRcvrAddress) receiver->sFlowRcvrAddress = *sFlowRcvrAddress; // structure copy
124 #ifdef SFLOW_DO_SOCKET
125 initSocket(receiver);
126 #endif
127 }
128 u_int32_t sfl_receiver_get_sFlowRcvrPort(SFLReceiver *receiver) {
129 return receiver->sFlowRcvrPort;
130 }
131 void sfl_receiver_set_sFlowRcvrPort(SFLReceiver *receiver, u_int32_t sFlowRcvrPort) {
132 receiver->sFlowRcvrPort = sFlowRcvrPort;
133 // update the socket structure
134 #ifdef SFLOW_DO_SOCKET
135 initSocket(receiver);
136 #endif
137 }
138
139 /*_________________---------------------------__________________
140 _________________ sfl_receiver_tick __________________
141 -----------------___________________________------------------
142 */
143
144 void sfl_receiver_tick(SFLReceiver *receiver, time_t now)
145 {
146 // if there are any samples to send, flush them now
147 if(receiver->sampleCollector.numSamples > 0) sendSample(receiver);
148 // check the timeout
149 if(receiver->sFlowRcvrTimeout && (u_int32_t)receiver->sFlowRcvrTimeout != 0xFFFFFFFF) {
150 // count down one tick and reset if we reach 0
151 if(--receiver->sFlowRcvrTimeout == 0) reset(receiver);
152 }
153 }
154
155 /*_________________-----------------------------__________________
156 _________________ receiver write utilities __________________
157 -----------------_____________________________------------------
158 */
159
160 inline static void put32(SFLReceiver *receiver, u_int32_t val)
161 {
162 *receiver->sampleCollector.datap++ = val;
163 }
164
165 inline static void putNet32(SFLReceiver *receiver, u_int32_t val)
166 {
167 *receiver->sampleCollector.datap++ = htonl(val);
168 }
169
170 inline static void putNet32_run(SFLReceiver *receiver, void *obj, size_t quads)
171 {
172 u_int32_t *from = (u_int32_t *)obj;
173 while(quads--) putNet32(receiver, *from++);
174 }
175
176 inline static void putNet64(SFLReceiver *receiver, u_int64_t val64)
177 {
178 u_int32_t *firstQuadPtr = receiver->sampleCollector.datap;
179 // first copy the bytes in
180 memcpy((u_char *)firstQuadPtr, &val64, 8);
181 if(htonl(1) != 1) {
182 // swap the bytes, and reverse the quads too
183 u_int32_t tmp = *receiver->sampleCollector.datap++;
184 *firstQuadPtr = htonl(*receiver->sampleCollector.datap);
185 *receiver->sampleCollector.datap++ = htonl(tmp);
186 }
187 else receiver->sampleCollector.datap += 2;
188 }
189
190 inline static void put128(SFLReceiver *receiver, u_char *val)
191 {
192 memcpy(receiver->sampleCollector.datap, val, 16);
193 receiver->sampleCollector.datap += 4;
194 }
195
196 inline static void putString(SFLReceiver *receiver, SFLString *s)
197 {
198 putNet32(receiver, s->len);
199 memcpy(receiver->sampleCollector.datap, s->str, s->len);
200 receiver->sampleCollector.datap += (s->len + 3) / 4; /* pad to 4-byte boundary */
201 }
202
203 inline static u_int32_t stringEncodingLength(SFLString *s) {
204 // answer in bytes, so remember to mulitply by 4 after rounding up to nearest 4-byte boundary
205 return 4 + (((s->len + 3) / 4) * 4);
206 }
207
208 inline static void putAddress(SFLReceiver *receiver, SFLAddress *addr)
209 {
210 // encode unspecified addresses as IPV4:0.0.0.0 - or should we flag this as an error?
211 if(addr->type == 0) {
212 putNet32(receiver, SFLADDRESSTYPE_IP_V4);
213 put32(receiver, 0);
214 }
215 else {
216 putNet32(receiver, addr->type);
217 if(addr->type == SFLADDRESSTYPE_IP_V4) put32(receiver, addr->address.ip_v4.addr);
218 else put128(receiver, addr->address.ip_v6.addr);
219 }
220 }
221
222 inline static u_int32_t addressEncodingLength(SFLAddress *addr) {
223 return (addr->type == SFLADDRESSTYPE_IP_V6) ? 20 : 8; // type + address (unspecified == IPV4)
224 }
225
226 inline static void putMACAddress(SFLReceiver *receiver, u_int8_t *mac)
227 {
228 memcpy(receiver->sampleCollector.datap, mac, 6);
229 receiver->sampleCollector.datap += 2;
230 }
231
232 inline static void putSwitch(SFLReceiver *receiver, SFLExtended_switch *sw)
233 {
234 putNet32(receiver, sw->src_vlan);
235 putNet32(receiver, sw->src_priority);
236 putNet32(receiver, sw->dst_vlan);
237 putNet32(receiver, sw->dst_priority);
238 }
239
240 inline static void putRouter(SFLReceiver *receiver, SFLExtended_router *router)
241 {
242 putAddress(receiver, &router->nexthop);
243 putNet32(receiver, router->src_mask);
244 putNet32(receiver, router->dst_mask);
245 }
246
247 inline static u_int32_t routerEncodingLength(SFLExtended_router *router) {
248 return addressEncodingLength(&router->nexthop) + 8;
249 }
250
251 inline static void putGateway(SFLReceiver *receiver, SFLExtended_gateway *gw)
252 {
253 putAddress(receiver, &gw->nexthop);
254 putNet32(receiver, gw->as);
255 putNet32(receiver, gw->src_as);
256 putNet32(receiver, gw->src_peer_as);
257 putNet32(receiver, gw->dst_as_path_segments);
258 {
259 u_int32_t seg = 0;
260 for(; seg < gw->dst_as_path_segments; seg++) {
261 putNet32(receiver, gw->dst_as_path[seg].type);
262 putNet32(receiver, gw->dst_as_path[seg].length);
263 putNet32_run(receiver, gw->dst_as_path[seg].as.seq, gw->dst_as_path[seg].length);
264 }
265 }
266 putNet32(receiver, gw->communities_length);
267 putNet32_run(receiver, gw->communities, gw->communities_length);
268 putNet32(receiver, gw->localpref);
269 }
270
271 inline static u_int32_t gatewayEncodingLength(SFLExtended_gateway *gw) {
272 u_int32_t elemSiz = addressEncodingLength(&gw->nexthop);
273 u_int32_t seg = 0;
274 elemSiz += 16; // as, src_as, src_peer_as, dst_as_path_segments
275 for(; seg < gw->dst_as_path_segments; seg++) {
276 elemSiz += 8; // type, length
277 elemSiz += 4 * gw->dst_as_path[seg].length; // set/seq bytes
278 }
279 elemSiz += 4; // communities_length
280 elemSiz += 4 * gw->communities_length; // communities
281 elemSiz += 4; // localpref
282 return elemSiz;
283 }
284
285 inline static void putUser(SFLReceiver *receiver, SFLExtended_user *user)
286 {
287 putNet32(receiver, user->src_charset);
288 putString(receiver, &user->src_user);
289 putNet32(receiver, user->dst_charset);
290 putString(receiver, &user->dst_user);
291 }
292
293 inline static u_int32_t userEncodingLength(SFLExtended_user *user) {
294 return 4
295 + stringEncodingLength(&user->src_user)
296 + 4
297 + stringEncodingLength(&user->dst_user);
298 }
299
300 inline static void putUrl(SFLReceiver *receiver, SFLExtended_url *url)
301 {
302 putNet32(receiver, url->direction);
303 putString(receiver, &url->url);
304 putString(receiver, &url->host);
305 }
306
307 inline static u_int32_t urlEncodingLength(SFLExtended_url *url) {
308 return 4
309 + stringEncodingLength(&url->url)
310 + stringEncodingLength(&url->host);
311 }
312
313 inline static void putLabelStack(SFLReceiver *receiver, SFLLabelStack *labelStack)
314 {
315 putNet32(receiver, labelStack->depth);
316 putNet32_run(receiver, labelStack->stack, labelStack->depth);
317 }
318
319 inline static u_int32_t labelStackEncodingLength(SFLLabelStack *labelStack) {
320 return 4 + (4 * labelStack->depth);
321 }
322
323 inline static void putMpls(SFLReceiver *receiver, SFLExtended_mpls *mpls)
324 {
325 putAddress(receiver, &mpls->nextHop);
326 putLabelStack(receiver, &mpls->in_stack);
327 putLabelStack(receiver, &mpls->out_stack);
328 }
329
330 inline static u_int32_t mplsEncodingLength(SFLExtended_mpls *mpls) {
331 return addressEncodingLength(&mpls->nextHop)
332 + labelStackEncodingLength(&mpls->in_stack)
333 + labelStackEncodingLength(&mpls->out_stack);
334 }
335
336 inline static void putNat(SFLReceiver *receiver, SFLExtended_nat *nat)
337 {
338 putAddress(receiver, &nat->src);
339 putAddress(receiver, &nat->dst);
340 }
341
342 inline static u_int32_t natEncodingLength(SFLExtended_nat *nat) {
343 return addressEncodingLength(&nat->src)
344 + addressEncodingLength(&nat->dst);
345 }
346
347 inline static void putMplsTunnel(SFLReceiver *receiver, SFLExtended_mpls_tunnel *tunnel)
348 {
349 putString(receiver, &tunnel->tunnel_lsp_name);
350 putNet32(receiver, tunnel->tunnel_id);
351 putNet32(receiver, tunnel->tunnel_cos);
352 }
353
354 inline static u_int32_t mplsTunnelEncodingLength(SFLExtended_mpls_tunnel *tunnel) {
355 return stringEncodingLength(&tunnel->tunnel_lsp_name) + 8;
356 }
357
358 inline static void putMplsVc(SFLReceiver *receiver, SFLExtended_mpls_vc *vc)
359 {
360 putString(receiver, &vc->vc_instance_name);
361 putNet32(receiver, vc->vll_vc_id);
362 putNet32(receiver, vc->vc_label_cos);
363 }
364
365 inline static u_int32_t mplsVcEncodingLength(SFLExtended_mpls_vc *vc) {
366 return stringEncodingLength( &vc->vc_instance_name) + 8;
367 }
368
369 inline static void putMplsFtn(SFLReceiver *receiver, SFLExtended_mpls_FTN *ftn)
370 {
371 putString(receiver, &ftn->mplsFTNDescr);
372 putNet32(receiver, ftn->mplsFTNMask);
373 }
374
375 inline static u_int32_t mplsFtnEncodingLength(SFLExtended_mpls_FTN *ftn) {
376 return stringEncodingLength( &ftn->mplsFTNDescr) + 4;
377 }
378
379 inline static void putMplsLdpFec(SFLReceiver *receiver, SFLExtended_mpls_LDP_FEC *ldpfec)
380 {
381 putNet32(receiver, ldpfec->mplsFecAddrPrefixLength);
382 }
383
384 inline static u_int32_t mplsLdpFecEncodingLength(SFLExtended_mpls_LDP_FEC *ldpfec) {
385 return 4;
386 }
387
388 inline static void putVlanTunnel(SFLReceiver *receiver, SFLExtended_vlan_tunnel *vlanTunnel)
389 {
390 putLabelStack(receiver, &vlanTunnel->stack);
391 }
392
393 inline static u_int32_t vlanTunnelEncodingLength(SFLExtended_vlan_tunnel *vlanTunnel) {
394 return labelStackEncodingLength(&vlanTunnel->stack);
395 }
396
397
398 inline static void putGenericCounters(SFLReceiver *receiver, SFLIf_counters *counters)
399 {
400 putNet32(receiver, counters->ifIndex);
401 putNet32(receiver, counters->ifType);
402 putNet64(receiver, counters->ifSpeed);
403 putNet32(receiver, counters->ifDirection);
404 putNet32(receiver, counters->ifStatus);
405 putNet64(receiver, counters->ifInOctets);
406 putNet32(receiver, counters->ifInUcastPkts);
407 putNet32(receiver, counters->ifInMulticastPkts);
408 putNet32(receiver, counters->ifInBroadcastPkts);
409 putNet32(receiver, counters->ifInDiscards);
410 putNet32(receiver, counters->ifInErrors);
411 putNet32(receiver, counters->ifInUnknownProtos);
412 putNet64(receiver, counters->ifOutOctets);
413 putNet32(receiver, counters->ifOutUcastPkts);
414 putNet32(receiver, counters->ifOutMulticastPkts);
415 putNet32(receiver, counters->ifOutBroadcastPkts);
416 putNet32(receiver, counters->ifOutDiscards);
417 putNet32(receiver, counters->ifOutErrors);
418 putNet32(receiver, counters->ifPromiscuousMode);
419 }
420
421
422 /*_________________-----------------------------__________________
423 _________________ computeFlowSampleSize __________________
424 -----------------_____________________________------------------
425 */
426
427 static int computeFlowSampleSize(SFLReceiver *receiver, SFL_FLOW_SAMPLE_TYPE *fs)
428 {
429 SFLFlow_sample_element *elem = fs->elements;
430 #ifdef SFL_USE_32BIT_INDEX
431 u_int siz = 52; /* tag, length, sequence_number, ds_class, ds_index, sampling_rate,
432 sample_pool, drops, inputFormat, input, outputFormat, output, number of elements */
433 #else
434 u_int siz = 40; /* tag, length, sequence_number, source_id, sampling_rate,
435 sample_pool, drops, input, output, number of elements */
436 #endif
437
438 fs->num_elements = 0; /* we're going to count them again even if this was set by the client */
439 for(; elem != NULL; elem = elem->nxt) {
440 u_int elemSiz = 0;
441 fs->num_elements++;
442 siz += 8; /* tag, length */
443 switch(elem->tag) {
444 case SFLFLOW_HEADER:
445 elemSiz = 16; /* header_protocol, frame_length, stripped, header_length */
446 elemSiz += ((elem->flowType.header.header_length + 3) / 4) * 4; /* header, rounded up to nearest 4 bytes */
447 break;
448 case SFLFLOW_ETHERNET: elemSiz = sizeof(SFLSampled_ethernet); break;
449 case SFLFLOW_IPV4: elemSiz = sizeof(SFLSampled_ipv4); break;
450 case SFLFLOW_IPV6: elemSiz = sizeof(SFLSampled_ipv6); break;
451 case SFLFLOW_EX_SWITCH: elemSiz = sizeof(SFLExtended_switch); break;
452 case SFLFLOW_EX_ROUTER: elemSiz = routerEncodingLength(&elem->flowType.router); break;
453 case SFLFLOW_EX_GATEWAY: elemSiz = gatewayEncodingLength(&elem->flowType.gateway); break;
454 case SFLFLOW_EX_USER: elemSiz = userEncodingLength(&elem->flowType.user); break;
455 case SFLFLOW_EX_URL: elemSiz = urlEncodingLength(&elem->flowType.url); break;
456 case SFLFLOW_EX_MPLS: elemSiz = mplsEncodingLength(&elem->flowType.mpls); break;
457 case SFLFLOW_EX_NAT: elemSiz = natEncodingLength(&elem->flowType.nat); break;
458 case SFLFLOW_EX_MPLS_TUNNEL: elemSiz = mplsTunnelEncodingLength(&elem->flowType.mpls_tunnel); break;
459 case SFLFLOW_EX_MPLS_VC: elemSiz = mplsVcEncodingLength(&elem->flowType.mpls_vc); break;
460 case SFLFLOW_EX_MPLS_FTN: elemSiz = mplsFtnEncodingLength(&elem->flowType.mpls_ftn); break;
461 case SFLFLOW_EX_MPLS_LDP_FEC: elemSiz = mplsLdpFecEncodingLength(&elem->flowType.mpls_ldp_fec); break;
462 case SFLFLOW_EX_VLAN_TUNNEL: elemSiz = vlanTunnelEncodingLength(&elem->flowType.vlan_tunnel); break;
463 default:
464 sflError(receiver, "unexpected packet_data_tag");
465 return -1;
466 break;
467 }
468 // cache the element size, and accumulate it into the overall FlowSample size
469 elem->length = elemSiz;
470 siz += elemSiz;
471 }
472
473 return siz;
474 }
475
476 /*_________________-------------------------------__________________
477 _________________ sfl_receiver_writeFlowSample __________________
478 -----------------_______________________________------------------
479 */
480
481 int sfl_receiver_writeFlowSample(SFLReceiver *receiver, SFL_FLOW_SAMPLE_TYPE *fs)
482 {
483 int packedSize;
484 if(fs == NULL) return -1;
485 if((packedSize = computeFlowSampleSize(receiver, fs)) == -1) return -1;
486
487 // check in case this one sample alone is too big for the datagram
488 // in fact - if it is even half as big then we should ditch it. Very
489 // important to avoid overruning the packet buffer.
490 if(packedSize > (int)(receiver->sFlowRcvrMaximumDatagramSize / 2)) {
491 sflError(receiver, "flow sample too big for datagram");
492 return -1;
493 }
494
495 // if the sample pkt is full enough so that this sample might put
496 // it over the limit, then we should send it now before going on.
497 if((receiver->sampleCollector.pktlen + packedSize) >= receiver->sFlowRcvrMaximumDatagramSize)
498 sendSample(receiver);
499
500 receiver->sampleCollector.numSamples++;
501
502 #ifdef SFL_USE_32BIT_INDEX
503 putNet32(receiver, SFLFLOW_SAMPLE_EXPANDED);
504 #else
505 putNet32(receiver, SFLFLOW_SAMPLE);
506 #endif
507
508 putNet32(receiver, packedSize - 8); // don't include tag and len
509 putNet32(receiver, fs->sequence_number);
510
511 #ifdef SFL_USE_32BIT_INDEX
512 putNet32(receiver, fs->ds_class);
513 putNet32(receiver, fs->ds_index);
514 #else
515 putNet32(receiver, fs->source_id);
516 #endif
517
518 putNet32(receiver, fs->sampling_rate);
519 putNet32(receiver, fs->sample_pool);
520 putNet32(receiver, fs->drops);
521
522 #ifdef SFL_USE_32BIT_INDEX
523 putNet32(receiver, fs->inputFormat);
524 putNet32(receiver, fs->input);
525 putNet32(receiver, fs->outputFormat);
526 putNet32(receiver, fs->output);
527 #else
528 putNet32(receiver, fs->input);
529 putNet32(receiver, fs->output);
530 #endif
531
532 putNet32(receiver, fs->num_elements);
533
534 {
535 SFLFlow_sample_element *elem = fs->elements;
536 for(; elem != NULL; elem = elem->nxt) {
537
538 putNet32(receiver, elem->tag);
539 putNet32(receiver, elem->length); // length cached in computeFlowSampleSize()
540
541 switch(elem->tag) {
542 case SFLFLOW_HEADER:
543 putNet32(receiver, elem->flowType.header.header_protocol);
544 putNet32(receiver, elem->flowType.header.frame_length);
545 putNet32(receiver, elem->flowType.header.stripped);
546 putNet32(receiver, elem->flowType.header.header_length);
547 /* the header */
548 memcpy(receiver->sampleCollector.datap, elem->flowType.header.header_bytes, elem->flowType.header.header_length);
549 /* round up to multiple of 4 to preserve alignment */
550 receiver->sampleCollector.datap += ((elem->flowType.header.header_length + 3) / 4);
551 break;
552 case SFLFLOW_ETHERNET:
553 putNet32(receiver, elem->flowType.ethernet.eth_len);
554 putMACAddress(receiver, elem->flowType.ethernet.src_mac);
555 putMACAddress(receiver, elem->flowType.ethernet.dst_mac);
556 putNet32(receiver, elem->flowType.ethernet.eth_type);
557 break;
558 case SFLFLOW_IPV4:
559 putNet32(receiver, elem->flowType.ipv4.length);
560 putNet32(receiver, elem->flowType.ipv4.protocol);
561 put32(receiver, elem->flowType.ipv4.src_ip.addr);
562 put32(receiver, elem->flowType.ipv4.dst_ip.addr);
563 putNet32(receiver, elem->flowType.ipv4.src_port);
564 putNet32(receiver, elem->flowType.ipv4.dst_port);
565 putNet32(receiver, elem->flowType.ipv4.tcp_flags);
566 putNet32(receiver, elem->flowType.ipv4.tos);
567 break;
568 case SFLFLOW_IPV6:
569 putNet32(receiver, elem->flowType.ipv6.length);
570 putNet32(receiver, elem->flowType.ipv6.protocol);
571 put128(receiver, elem->flowType.ipv6.src_ip.addr);
572 put128(receiver, elem->flowType.ipv6.dst_ip.addr);
573 putNet32(receiver, elem->flowType.ipv6.src_port);
574 putNet32(receiver, elem->flowType.ipv6.dst_port);
575 putNet32(receiver, elem->flowType.ipv6.tcp_flags);
576 putNet32(receiver, elem->flowType.ipv6.priority);
577 break;
578 case SFLFLOW_EX_SWITCH: putSwitch(receiver, &elem->flowType.sw); break;
579 case SFLFLOW_EX_ROUTER: putRouter(receiver, &elem->flowType.router); break;
580 case SFLFLOW_EX_GATEWAY: putGateway(receiver, &elem->flowType.gateway); break;
581 case SFLFLOW_EX_USER: putUser(receiver, &elem->flowType.user); break;
582 case SFLFLOW_EX_URL: putUrl(receiver, &elem->flowType.url); break;
583 case SFLFLOW_EX_MPLS: putMpls(receiver, &elem->flowType.mpls); break;
584 case SFLFLOW_EX_NAT: putNat(receiver, &elem->flowType.nat); break;
585 case SFLFLOW_EX_MPLS_TUNNEL: putMplsTunnel(receiver, &elem->flowType.mpls_tunnel); break;
586 case SFLFLOW_EX_MPLS_VC: putMplsVc(receiver, &elem->flowType.mpls_vc); break;
587 case SFLFLOW_EX_MPLS_FTN: putMplsFtn(receiver, &elem->flowType.mpls_ftn); break;
588 case SFLFLOW_EX_MPLS_LDP_FEC: putMplsLdpFec(receiver, &elem->flowType.mpls_ldp_fec); break;
589 case SFLFLOW_EX_VLAN_TUNNEL: putVlanTunnel(receiver, &elem->flowType.vlan_tunnel); break;
590 default:
591 sflError(receiver, "unexpected packet_data_tag");
592 return -1;
593 break;
594 }
595 }
596 }
597
598 // sanity check
599 assert(((u_char *)receiver->sampleCollector.datap
600 - (u_char *)receiver->sampleCollector.data
601 - receiver->sampleCollector.pktlen) == (u_int32_t)packedSize);
602
603 // update the pktlen
604 receiver->sampleCollector.pktlen = (u_char *)receiver->sampleCollector.datap - (u_char *)receiver->sampleCollector.data;
605 return packedSize;
606 }
607
608 /*_________________-----------------------------__________________
609 _________________ computeCountersSampleSize __________________
610 -----------------_____________________________------------------
611 */
612
613 static int computeCountersSampleSize(SFLReceiver *receiver, SFL_COUNTERS_SAMPLE_TYPE *cs)
614 {
615 SFLCounters_sample_element *elem = cs->elements;
616 #ifdef SFL_USE_32BIT_INDEX
617 u_int siz = 24; /* tag, length, sequence_number, ds_class, ds_index, number of elements */
618 #else
619 u_int siz = 20; /* tag, length, sequence_number, source_id, number of elements */
620 #endif
621
622 cs->num_elements = 0; /* we're going to count them again even if this was set by the client */
623 for(; elem != NULL; elem = elem->nxt) {
624 u_int elemSiz = 0;
625 cs->num_elements++;
626 siz += 8; /* tag, length */
627 switch(elem->tag) {
628 case SFLCOUNTERS_GENERIC: elemSiz = sizeof(elem->counterBlock.generic); break;
629 case SFLCOUNTERS_ETHERNET: elemSiz = sizeof(elem->counterBlock.ethernet); break;
630 case SFLCOUNTERS_TOKENRING: elemSiz = sizeof(elem->counterBlock.tokenring); break;
631 case SFLCOUNTERS_VG: elemSiz = sizeof(elem->counterBlock.vg); break;
632 case SFLCOUNTERS_VLAN: elemSiz = sizeof(elem->counterBlock.vlan); break;
633 default:
634 sflError(receiver, "unexpected counters_tag");
635 return -1;
636 break;
637 }
638 // cache the element size, and accumulate it into the overall FlowSample size
639 elem->length = elemSiz;
640 siz += elemSiz;
641 }
642 return siz;
643 }
644
645 /*_________________----------------------------------__________________
646 _________________ sfl_receiver_writeCountersSample __________________
647 -----------------__________________________________------------------
648 */
649
650 int sfl_receiver_writeCountersSample(SFLReceiver *receiver, SFL_COUNTERS_SAMPLE_TYPE *cs)
651 {
652 int packedSize;
653 if(cs == NULL) return -1;
654 // if the sample pkt is full enough so that this sample might put
655 // it over the limit, then we should send it now.
656 if((packedSize = computeCountersSampleSize(receiver, cs)) == -1) return -1;
657
658 // check in case this one sample alone is too big for the datagram
659 // in fact - if it is even half as big then we should ditch it. Very
660 // important to avoid overruning the packet buffer.
661 if(packedSize > (int)(receiver->sFlowRcvrMaximumDatagramSize / 2)) {
662 sflError(receiver, "counters sample too big for datagram");
663 return -1;
664 }
665
666 if((receiver->sampleCollector.pktlen + packedSize) >= receiver->sFlowRcvrMaximumDatagramSize)
667 sendSample(receiver);
668
669 receiver->sampleCollector.numSamples++;
670
671 #ifdef SFL_USE_32BIT_INDEX
672 putNet32(receiver, SFLCOUNTERS_SAMPLE_EXPANDED);
673 #else
674 putNet32(receiver, SFLCOUNTERS_SAMPLE);
675 #endif
676
677 putNet32(receiver, packedSize - 8); // tag and length not included
678 putNet32(receiver, cs->sequence_number);
679
680 #ifdef SFL_USE_32BIT_INDEX
681 putNet32(receiver, cs->ds_class);
682 putNet32(receiver, cs->ds_index);
683 #else
684 putNet32(receiver, cs->source_id);
685 #endif
686
687 putNet32(receiver, cs->num_elements);
688
689 {
690 SFLCounters_sample_element *elem = cs->elements;
691 for(; elem != NULL; elem = elem->nxt) {
692
693 putNet32(receiver, elem->tag);
694 putNet32(receiver, elem->length); // length cached in computeCountersSampleSize()
695
696 switch(elem->tag) {
697 case SFLCOUNTERS_GENERIC:
698 putGenericCounters(receiver, &(elem->counterBlock.generic));
699 break;
700 case SFLCOUNTERS_ETHERNET:
701 // all these counters are 32-bit
702 putNet32_run(receiver, &elem->counterBlock.ethernet, sizeof(elem->counterBlock.ethernet) / 4);
703 break;
704 case SFLCOUNTERS_TOKENRING:
705 // all these counters are 32-bit
706 putNet32_run(receiver, &elem->counterBlock.tokenring, sizeof(elem->counterBlock.tokenring) / 4);
707 break;
708 case SFLCOUNTERS_VG:
709 // mixed sizes
710 putNet32(receiver, elem->counterBlock.vg.dot12InHighPriorityFrames);
711 putNet64(receiver, elem->counterBlock.vg.dot12InHighPriorityOctets);
712 putNet32(receiver, elem->counterBlock.vg.dot12InNormPriorityFrames);
713 putNet64(receiver, elem->counterBlock.vg.dot12InNormPriorityOctets);
714 putNet32(receiver, elem->counterBlock.vg.dot12InIPMErrors);
715 putNet32(receiver, elem->counterBlock.vg.dot12InOversizeFrameErrors);
716 putNet32(receiver, elem->counterBlock.vg.dot12InDataErrors);
717 putNet32(receiver, elem->counterBlock.vg.dot12InNullAddressedFrames);
718 putNet32(receiver, elem->counterBlock.vg.dot12OutHighPriorityFrames);
719 putNet64(receiver, elem->counterBlock.vg.dot12OutHighPriorityOctets);
720 putNet32(receiver, elem->counterBlock.vg.dot12TransitionIntoTrainings);
721 putNet64(receiver, elem->counterBlock.vg.dot12HCInHighPriorityOctets);
722 putNet64(receiver, elem->counterBlock.vg.dot12HCInNormPriorityOctets);
723 putNet64(receiver, elem->counterBlock.vg.dot12HCOutHighPriorityOctets);
724 break;
725 case SFLCOUNTERS_VLAN:
726 // mixed sizes
727 putNet32(receiver, elem->counterBlock.vlan.vlan_id);
728 putNet64(receiver, elem->counterBlock.vlan.octets);
729 putNet32(receiver, elem->counterBlock.vlan.ucastPkts);
730 putNet32(receiver, elem->counterBlock.vlan.multicastPkts);
731 putNet32(receiver, elem->counterBlock.vlan.broadcastPkts);
732 putNet32(receiver, elem->counterBlock.vlan.discards);
733 break;
734 default:
735 sflError(receiver, "unexpected counters_tag");
736 return -1;
737 break;
738 }
739 }
740 }
741 // sanity check
742 assert(((u_char *)receiver->sampleCollector.datap
743 - (u_char *)receiver->sampleCollector.data
744 - receiver->sampleCollector.pktlen) == (u_int32_t)packedSize);
745
746 // update the pktlen
747 receiver->sampleCollector.pktlen = (u_char *)receiver->sampleCollector.datap - (u_char *)receiver->sampleCollector.data;
748 return packedSize;
749 }
750
751 /*_________________---------------------------------__________________
752 _________________ sfl_receiver_samplePacketsSent __________________
753 -----------------_________________________________------------------
754 */
755
756 u_int32_t sfl_receiver_samplePacketsSent(SFLReceiver *receiver)
757 {
758 return receiver->sampleCollector.packetSeqNo;
759 }
760
761 /*_________________---------------------------__________________
762 _________________ sendSample __________________
763 -----------------___________________________------------------
764 */
765
766 static void sendSample(SFLReceiver *receiver)
767 {
768 /* construct and send out the sample, then reset for the next one... */
769 /* first fill in the header with the latest values */
770 /* version, agent_address and sub_agent_id were pre-set. */
771 u_int32_t hdrIdx = (receiver->agent->myIP.type == SFLADDRESSTYPE_IP_V6) ? 7 : 4;
772 receiver->sampleCollector.data[hdrIdx++] = htonl(++receiver->sampleCollector.packetSeqNo); /* seq no */
773 receiver->sampleCollector.data[hdrIdx++] = htonl((receiver->agent->now - receiver->agent->bootTime) * 1000); /* uptime */
774 receiver->sampleCollector.data[hdrIdx++] = htonl(receiver->sampleCollector.numSamples); /* num samples */
775 /* send */
776 if(receiver->agent->sendFn) (*receiver->agent->sendFn)(receiver->agent->magic,
777 receiver->agent,
778 receiver,
779 (u_char *)receiver->sampleCollector.data,
780 receiver->sampleCollector.pktlen);
781 else {
782 #ifdef SFLOW_DO_SOCKET
783 /* send it myself */
784 if (receiver->sFlowRcvrAddress.type == SFLADDRESSTYPE_IP_V6) {
785 u_int32_t soclen = sizeof(struct sockaddr_in6);
786 int result = sendto(receiver->agent->receiverSocket6,
787 receiver->sampleCollector.data,
788 receiver->sampleCollector.pktlen,
789 0,
790 (struct sockaddr *)&receiver->receiver6,
791 soclen);
792 if(result == -1 && errno != EINTR) sfl_agent_sysError(receiver->agent, "receiver", "IPv6 socket sendto error");
793 if(result == 0) sfl_agent_error(receiver->agent, "receiver", "IPv6 socket sendto returned 0");
794 }
795 else {
796 u_int32_t soclen = sizeof(struct sockaddr_in);
797 int result = sendto(receiver->agent->receiverSocket4,
798 receiver->sampleCollector.data,
799 receiver->sampleCollector.pktlen,
800 0,
801 (struct sockaddr *)&receiver->receiver4,
802 soclen);
803 if(result == -1 && errno != EINTR) sfl_agent_sysError(receiver->agent, "receiver", "socket sendto error");
804 if(result == 0) sfl_agent_error(receiver->agent, "receiver", "socket sendto returned 0");
805 }
806 #endif
807 }
808
809 /* reset for the next time */
810 resetSampleCollector(receiver);
811 }
812
813 /*_________________---------------------------__________________
814 _________________ resetSampleCollector __________________
815 -----------------___________________________------------------
816 */
817
818 static void resetSampleCollector(SFLReceiver *receiver)
819 {
820 receiver->sampleCollector.pktlen = 0;
821 receiver->sampleCollector.numSamples = 0;
822 /* point the datap to just after the header */
823 receiver->sampleCollector.datap = (receiver->agent->myIP.type == SFLADDRESSTYPE_IP_V6) ?
824 (receiver->sampleCollector.data + 10) : (receiver->sampleCollector.data + 7);
825
826 receiver->sampleCollector.pktlen = (u_char *)receiver->sampleCollector.datap - (u_char *)receiver->sampleCollector.data;
827 }
828
829 /*_________________---------------------------__________________
830 _________________ sflError __________________
831 -----------------___________________________------------------
832 */
833
834 static void sflError(SFLReceiver *receiver, char *msg)
835 {
836 sfl_agent_error(receiver->agent, "receiver", msg);
837 resetSampleCollector(receiver);
838 }
839
840 #endif /* !__CHECKER__ */