diff --git a/examples/ip_monitor.rs b/examples/ip_monitor.rs index 6b02753..133017d 100644 --- a/examples/ip_monitor.rs +++ b/examples/ip_monitor.rs @@ -3,6 +3,7 @@ use futures::stream::StreamExt; use netlink_packet_route::constants::*; +use netlink_proto::packet::NetlinkEvent; use rtnetlink::{ new_connection, sys::{AsyncSocket, SocketAddr}, @@ -26,8 +27,8 @@ async fn main() -> Result<(), String> { // handle - `Handle` to the `Connection`. Used to send/recv netlink // messages. // - // messages - A channel receiver. - let (mut conn, mut _handle, mut messages) = + // events - A channel receiver. + let (mut conn, mut _handle, mut events) = new_connection().map_err(|e| format!("{}", e))?; // These flags specify what kinds of broadcast messages we want to listen @@ -63,10 +64,17 @@ async fn main() -> Result<(), String> { // Create message to enable }); - // Start receiving events through `messages` channel. - while let Some((message, _)) = messages.next().await { - let payload = message.payload; - println!("{:?}", payload); + // Start receiving events through `events` channel. + while let Some(event) = events.next().await { + match event { + NetlinkEvent::Message((message, _)) => { + let payload = message.payload; + println!("Route change message - {:?}", payload); + } + NetlinkEvent::Overrun => { + println!("Netlink socket overrun. Some messages were lost"); + } + } } Ok(()) } diff --git a/examples/listen.rs b/examples/listen.rs index befb1f2..61b1af0 100644 --- a/examples/listen.rs +++ b/examples/listen.rs @@ -5,6 +5,7 @@ use futures::stream::StreamExt; +use netlink_proto::packet::NetlinkEvent; use rtnetlink::{ constants::{RTMGRP_IPV4_ROUTE, RTMGRP_IPV6_ROUTE}, new_connection, @@ -14,7 +15,7 @@ use rtnetlink::{ #[tokio::main] async fn main() -> Result<(), String> { // Open the netlink socket - let (mut connection, _, mut messages) = + let (mut connection, _, mut events) = new_connection().map_err(|e| format!("{}", e))?; // These flags specify what kinds of broadcast messages we want to listen @@ -32,9 +33,16 @@ async fn main() -> Result<(), String> { .expect("failed to bind"); tokio::spawn(connection); - while let Some((message, _)) = messages.next().await { - let payload = message.payload; - println!("Route change message - {:?}", payload); + while let Some(event) = events.next().await { + match event { + NetlinkEvent::Message((message, _)) => { + let payload = message.payload; + println!("Route change message - {:?}", payload); + } + NetlinkEvent::Overrun => { + println!("Netlink socket overrun. Some messages were lost"); + } + } } Ok(()) } diff --git a/src/connection.rs b/src/connection.rs index 134115a..3f74d32 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -5,7 +5,7 @@ use std::io; use futures::channel::mpsc::UnboundedReceiver; use crate::{ - packet::{NetlinkMessage, RtnlMessage}, + packet::{NetlinkEvent, NetlinkMessage, RtnlMessage}, proto::Connection, sys::{protocols::NETLINK_ROUTE, AsyncSocket, SocketAddr}, Handle, @@ -16,7 +16,7 @@ use crate::{ pub fn new_connection() -> io::Result<( Connection, Handle, - UnboundedReceiver<(NetlinkMessage, SocketAddr)>, + UnboundedReceiver, SocketAddr)>>, )> { new_connection_with_socket() } @@ -25,7 +25,7 @@ pub fn new_connection() -> io::Result<( pub fn new_connection_with_socket() -> io::Result<( Connection, Handle, - UnboundedReceiver<(NetlinkMessage, SocketAddr)>, + UnboundedReceiver, SocketAddr)>>, )> where S: AsyncSocket,