diff --git a/src/netlink/cnetlink.cpp b/src/netlink/cnetlink.cpp index fe852c916..c590b183b 100644 --- a/src/netlink/cnetlink.cpp +++ b/src/netlink/cnetlink.cpp @@ -33,9 +33,8 @@ namespace basebox { cnetlink::cnetlink() : swi(nullptr), thread(1), caches(NL_MAX_CACHE, nullptr), nl_proc_max(10), - running(false), rfd_scheduled(false), bridge(nullptr), - bond(new nl_bond(this)), vlan(new nl_vlan(this)), - l3(new nl_l3(vlan, this)), config_lo(false) { + state(NL_STATE_STOPPED), bridge(nullptr), bond(new nl_bond(this)), + vlan(new nl_vlan(this)), l3(new nl_l3(vlan, this)) { sock_tx = nl_socket_alloc(); if (sock_tx == nullptr) { @@ -157,7 +156,16 @@ void cnetlink::init_caches() { LOG(FATAL) << __FUNCTION__ << ": add route/neigh to cache mngr"; } - thread.wakeup(this); + try { + thread.add_read_fd(this, nl_cache_mngr_get_fd(mngr), true, false); + } catch (std::exception &e) { + LOG(FATAL) << "caught " << e.what(); + } +} + +void cnetlink::init_subsystems() noexcept { + assert(swi); + l3->init(); } int cnetlink::set_nl_socket_buffer_sizes(nl_sock *sk) { @@ -350,17 +358,24 @@ int cnetlink::get_port_id(int ifindex) const { void cnetlink::handle_wakeup(rofl::cthread &thread) { bool do_wakeup = false; - if (not rfd_scheduled) { - try { - thread.add_read_fd(this, nl_cache_mngr_get_fd(mngr), true, false); - rfd_scheduled = true; - } catch (std::exception &e) { - LOG(FATAL) << "caught " << e.what(); - } + if (!swi) + return; + + switch (state) { + case NL_STATE_INIT: + init_subsystems(); + state = NL_STATE_RUNNING; + break; + case NL_STATE_RUNNING: + break; + case NL_STATE_STOPPED: + return; } // loop through nl_objs - for (int cnt = 0; cnt < nl_proc_max && nl_objs.size() && running; cnt++) { + for (int cnt = 0; + cnt < nl_proc_max && nl_objs.size() && state == NL_STATE_RUNNING; + cnt++) { auto obj = nl_objs.front(); nl_objs.pop_front(); @@ -400,16 +415,9 @@ void cnetlink::handle_wakeup(rofl::cthread &thread) { do_wakeup = true; } - if (swi && swi->is_connected()) { - if (!config_lo) { - config_lo_addr(); - config_lo = true; - } - } else if (swi && !swi->is_connected()) { - config_lo = false; - } - if (do_wakeup || nl_objs.size()) { + VLOG(3) << __FUNCTION__ + << ": calling wakeup nl_objs.size()=" << nl_objs.size(); this->thread.wakeup(this); } } @@ -421,7 +429,7 @@ void cnetlink::handle_read_event(rofl::cthread &thread, int fd) { int rv = nl_cache_mngr_data_ready(mngr); VLOG(1) << __FUNCTION__ << ": #processed=" << rv; // notify update - if (running) { + if (state != NL_STATE_STOPPED) { this->thread.wakeup(this); } } @@ -468,7 +476,11 @@ void cnetlink::nl_cb_v2(struct nl_cache *cache, struct nl_object *old_obj, << " new_obj=" << static_cast(new_obj); assert(data); - static_cast(data)->nl_objs.emplace_back(action, old_obj, new_obj); + auto nl = static_cast(data); + + // only enqueue nl msgs if not in stopped state + if (nl->state != NL_STATE_STOPPED) + nl->nl_objs.emplace_back(action, old_obj, new_obj); } void cnetlink::set_tapmanager(std::shared_ptr tm) { @@ -497,7 +509,9 @@ int cnetlink::handle_source_mac_learn() { _packet_in.swap(packet_in); } - for (int cnt = 0; cnt < nl_proc_max && _packet_in.size() && running; cnt++) { + for (int cnt = 0; + cnt < nl_proc_max && _packet_in.size() && state == NL_STATE_RUNNING; + cnt++) { auto p = _packet_in.front(); int ifindex = tap_man->get_ifindex(p.port_id); @@ -521,6 +535,7 @@ int cnetlink::handle_source_mac_learn() { int size = _packet_in.size(); if (size) { + VLOG(3) << __FUNCTION__ << ": " << size << " packets not processed"; std::lock_guard scoped_lock(pi_mutex); std::copy(make_move_iterator(_packet_in.rbegin()), make_move_iterator(_packet_in.rend()), @@ -551,7 +566,8 @@ int cnetlink::handle_fdb_timeout() { _fdb_evts.swap(fdb_evts); } - for (int cnt = 0; cnt < nl_proc_max && _fdb_evts.size() && bridge && running; + for (int cnt = 0; cnt < nl_proc_max && _fdb_evts.size() && bridge && + state == NL_STATE_RUNNING; cnt++) { auto fdbev = _fdb_evts.front(); @@ -567,6 +583,7 @@ int cnetlink::handle_fdb_timeout() { int size = _fdb_evts.size(); if (size) { + VLOG(3) << __FUNCTION__ << ": " << size << " events not processed"; std::lock_guard scoped_lock(fdb_ev_mutex); std::copy(make_move_iterator(_fdb_evts.rbegin()), make_move_iterator(_fdb_evts.rend()), @@ -1075,6 +1092,20 @@ void cnetlink::unregister_switch(__attribute__((unused)) stop(); } +void cnetlink::start() noexcept { + if (state == NL_STATE_RUNNING) + return; + VLOG(1) << __FUNCTION__ << ": started netlink processing"; + state = NL_STATE_INIT; + thread.wakeup(this); +} + +void cnetlink::stop() noexcept { + VLOG(1) << __FUNCTION__ << ": stopped netlink processing"; + state = NL_STATE_STOPPED; + thread.wakeup(this); +} + void cnetlink::port_status_changed(uint32_t port_no, enum nbi::port_status ps) noexcept { try { @@ -1093,8 +1124,6 @@ int cnetlink::handle_port_status_events() { std::deque> _pc_changes; std::deque> _pc_retry; - VLOG(2) << __FUNCTION__; - { std::lock_guard scoped_lock(pc_mutex); _pc_changes.swap(port_status_changes); @@ -1169,6 +1198,7 @@ int cnetlink::handle_port_status_events() { int size = _pc_retry.size(); if (size) { + VLOG(3) << __FUNCTION__ << ": " << size << " changes not processed"; std::lock_guard scoped_lock(pc_mutex); std::copy(make_move_iterator(_pc_retry.begin()), make_move_iterator(_pc_retry.end()), @@ -1178,34 +1208,6 @@ int cnetlink::handle_port_status_events() { return size; } -int cnetlink::config_lo_addr() noexcept { - std::list lo_addr; - std::unique_ptr addr_filter( - rtnl_addr_alloc(), &rtnl_addr_put); - - rtnl_addr_set_ifindex(addr_filter.get(), 1); - rtnl_addr_set_family(addr_filter.get(), AF_INET); - - nl_cache_foreach_filter( - caches[NL_ADDR_CACHE], OBJ_CAST(addr_filter.get()), - [](struct nl_object *obj, void *arg) { - VLOG(3) << __FUNCTION__ << " : found configured loopback " << obj; - - std::list *add_list = - static_cast *>(arg); - - add_list->emplace_back(ADDR_CAST(obj)); - }, - &lo_addr); - - for (auto addr : lo_addr) { - if (l3->add_l3_addr(addr) < 0) - return -EINVAL; - } - - return 0; -} - bool cnetlink::is_bridge_configured(rtnl_link *l) { assert(l); diff --git a/src/netlink/cnetlink.hpp b/src/netlink/cnetlink.hpp index f4064377a..eba849d47 100644 --- a/src/netlink/cnetlink.hpp +++ b/src/netlink/cnetlink.hpp @@ -25,7 +25,7 @@ class nl_vlan; class tap_manager; class cnetlink final : public rofl::cthread_env { - +public: enum nl_cache_t { NL_ADDR_CACHE, NL_LINK_CACHE, @@ -34,7 +34,6 @@ class cnetlink final : public rofl::cthread_env { NL_MAX_CACHE, }; -public: cnetlink(); ~cnetlink() override; @@ -52,10 +51,14 @@ class cnetlink final : public rofl::cthread_env { int get_port_id(rtnl_link *l) const; int get_port_id(int ifindex) const; + nl_cache *get_cache(enum nl_cache_t id) { return caches[id]; } + void resend_state() noexcept; void register_switch(switch_interface *) noexcept; void unregister_switch(switch_interface *) noexcept; + void start() noexcept; + void stop() noexcept; void port_status_changed(uint32_t, enum nbi::port_status) noexcept; @@ -71,18 +74,6 @@ class cnetlink final : public rofl::cthread_env { void fdb_timeout(uint32_t port_id, uint16_t vid, const rofl::caddress_ll &mac); - void start() { - if (running) - return; - running = true; - thread.wakeup(this); - } - - void stop() { - running = false; - thread.wakeup(this); - } - private: // non copyable cnetlink(const cnetlink &other) = delete; @@ -93,6 +84,12 @@ class cnetlink final : public rofl::cthread_env { NL_TIMER_RESYNC, }; + enum nl_state { + NL_STATE_RUNNING, + NL_STATE_INIT, + NL_STATE_STOPPED, + }; + switch_interface *swi; rofl::cthread thread; @@ -105,8 +102,7 @@ class cnetlink final : public rofl::cthread_env { std::mutex pc_mutex; int nl_proc_max; - bool running; - bool rfd_scheduled; + enum nl_state state; std::deque nl_objs; std::shared_ptr tap_man; @@ -136,7 +132,6 @@ class cnetlink final : public rofl::cthread_env { std::mutex fdb_ev_mutex; std::deque fdb_evts; - bool config_lo; int handle_port_status_events(); int handle_source_mac_learn(); @@ -155,6 +150,7 @@ class cnetlink final : public rofl::cthread_env { int load_from_file(const std::string &path); void init_caches(); + void init_subsystems() noexcept; int set_nl_socket_buffer_sizes(nl_sock *sk); @@ -175,8 +171,6 @@ class cnetlink final : public rofl::cthread_env { void neigh_ll_created(rtnl_neigh *neigh) noexcept; void neigh_ll_updated(rtnl_neigh *old_neigh, rtnl_neigh *new_neigh) noexcept; void neigh_ll_deleted(rtnl_neigh *neigh) noexcept; - - int config_lo_addr() noexcept; }; } // end of namespace basebox diff --git a/src/netlink/nbi_impl.cpp b/src/netlink/nbi_impl.cpp index 90a7e2c04..06dd83e22 100644 --- a/src/netlink/nbi_impl.cpp +++ b/src/netlink/nbi_impl.cpp @@ -17,7 +17,6 @@ nbi_impl::nbi_impl(std::shared_ptr nl, std::shared_ptr tap_man) : nl(nl), tap_man(tap_man) { nl->set_tapmanager(tap_man); - nl->start(); } nbi_impl::~nbi_impl() { nl->stop(); } @@ -29,6 +28,22 @@ void nbi_impl::register_switch(switch_interface *swi) noexcept { nl->register_switch(swi); } +void nbi_impl::switch_state_notification(enum switch_state state) noexcept { + switch (state) { + case SWITCH_STATE_UP: + nl->start(); + break; + case SWITCH_STATE_DOWN: + case SWITCH_STATE_FAILED: + case SWITCH_STATE_UNKNOWN: + nl->stop(); + break; + default: + LOG(FATAL) << __FUNCTION__ << ": invalid state"; + break; + } +} + void nbi_impl::port_notification( std::deque ¬ifications) noexcept { diff --git a/src/netlink/nbi_impl.hpp b/src/netlink/nbi_impl.hpp index 6970f5954..f99c9911f 100644 --- a/src/netlink/nbi_impl.hpp +++ b/src/netlink/nbi_impl.hpp @@ -25,6 +25,7 @@ class nbi_impl : public nbi, public switch_callback { // nbi void register_switch(switch_interface *) noexcept override; + void switch_state_notification(enum switch_state) noexcept override; void resend_state() noexcept override; void port_notification(std::deque &) noexcept override; diff --git a/src/netlink/nl_l3.cpp b/src/netlink/nl_l3.cpp index 5ccbb15a2..a74d85ad3 100644 --- a/src/netlink/nl_l3.cpp +++ b/src/netlink/nl_l3.cpp @@ -79,6 +79,34 @@ rofl::caddress_in6 libnl_in6addr_2_rofl(struct nl_addr *addr, int *rv) { return rofl::caddress_in6(&sin, salen); } +int nl_l3::init() noexcept { + std::list lo_addr; + std::unique_ptr addr_filter( + rtnl_addr_alloc(), &rtnl_addr_put); + + rtnl_addr_set_ifindex(addr_filter.get(), 1); + rtnl_addr_set_family(addr_filter.get(), AF_INET); + + nl_cache_foreach_filter( + nl->get_cache(cnetlink::NL_ADDR_CACHE), OBJ_CAST(addr_filter.get()), + [](struct nl_object *obj, void *arg) { + VLOG(3) << __FUNCTION__ << " : found configured loopback " << obj; + + std::list *add_list = + static_cast *>(arg); + + add_list->emplace_back(ADDR_CAST(obj)); + }, + &lo_addr); + + for (auto addr : lo_addr) { + if (add_l3_addr(addr) < 0) + return -EINVAL; + } + + return 0; +} + // XXX separate function to make it possible to add lo addresses more directly int nl_l3::add_l3_addr(struct rtnl_addr *a) { assert(sw); diff --git a/src/netlink/nl_l3.hpp b/src/netlink/nl_l3.hpp index b3984195c..176b50394 100644 --- a/src/netlink/nl_l3.hpp +++ b/src/netlink/nl_l3.hpp @@ -33,6 +33,8 @@ class nl_l3 { nl_l3(std::shared_ptr vlan, cnetlink *nl); ~nl_l3() {} + int init() noexcept; + int add_l3_addr(struct rtnl_addr *a); int add_l3_addr_v6(struct rtnl_addr *a); int del_l3_addr(struct rtnl_addr *a); diff --git a/src/of-dpa/controller.cpp b/src/of-dpa/controller.cpp index 7df0a6a4a..5937554fe 100644 --- a/src/of-dpa/controller.cpp +++ b/src/of-dpa/controller.cpp @@ -39,9 +39,8 @@ void controller::handle_dpt_open(rofl::crofdpt &dpt) { // set max queue size in rofl dpt.set_conn(rofl::cauxid(0)).set_txqueue_max_size(128 * 1024); - dpt.send_features_request(rofl::cauxid(0)); - dpt.send_desc_stats_request(rofl::cauxid(0), 0); - dpt.send_port_desc_stats_request(rofl::cauxid(0), 0); + dpt.send_features_request(rofl::cauxid(0), 1); + dpt.send_desc_stats_request(rofl::cauxid(0), 0, 1); if (flags) subscribe_to(flags); @@ -55,6 +54,7 @@ void controller::handle_dpt_close(const rofl::cdptid &dptid) { LOG(INFO) << __FUNCTION__ << ": closing connection to dptid=" << std::showbase << std::hex << dptid; + connected = false; std::deque ntfys; try { { @@ -72,7 +72,6 @@ void controller::handle_dpt_close(const rofl::cdptid &dptid) { nbi::PORT_EVENT_DEL, port.get_port_no(), port.get_name()}); } - connected = false; nb->port_notification(ntfys); } catch (rofl::eRofDptNotFound &e) { @@ -85,6 +84,8 @@ void controller::handle_dpt_close(const rofl::cdptid &dptid) { } this->dptid = rofl::cdptid(0); + + nb->switch_state_notification(nbi::SWITCH_STATE_DOWN); } void controller::handle_conn_terminated(rofl::crofdpt &dpt, @@ -126,15 +127,20 @@ void controller::handle_conn_congestion_solved(rofl::crofdpt &dpt, void controller::handle_features_reply( rofl::crofdpt &dpt, const rofl::cauxid &auxid, rofl::openflow::cofmsg_features_reply &msg) { - VLOG(1) << __FUNCTION__ << ": dpt=" << dpt << " on auxid=" << auxid; - VLOG(3) << __FUNCTION__ << ": dpid=" << dpt.get_dpid() << std::endl << msg; + VLOG(1) << __FUNCTION__ << ": dpt=" << dpt << " on auxid=" << auxid + << ", msg: " << msg; } void controller::handle_desc_stats_reply( rofl::crofdpt &dpt, const rofl::cauxid &auxid, rofl::openflow::cofmsg_desc_stats_reply &msg) { - VLOG(1) << __FUNCTION__ << ": dpt=" << dpt << " on auxid=" << auxid; - VLOG(3) << __FUNCTION__ << ": dpt=" << std::endl << dpt << std::endl << msg; + VLOG(1) << __FUNCTION__ << ": dpt=" << dpt << " on auxid=" << auxid + << " msg: " << msg; + + // TODO evaluate switch here? + + nb->switch_state_notification(nbi::SWITCH_STATE_UP); + dpt.send_port_desc_stats_request(rofl::cauxid(0), 0, 2); } void controller::handle_packet_in(rofl::crofdpt &dpt, const rofl::cauxid &auxid, @@ -332,7 +338,8 @@ void controller::handle_timeout(rofl::cthread &thread, uint32_t timer_id) { thread.add_timer( this, TIMER_port_stats_request, rofl::ctimespec().expire_in(port_stats_request_interval)); - request_port_stats(); + if (connected) + request_port_stats(); break; default: rofl::crofbase::handle_timeout(thread, timer_id); diff --git a/src/sai.hpp b/src/sai.hpp index c873200e0..0ff4b3825 100644 --- a/src/sai.hpp +++ b/src/sai.hpp @@ -143,6 +143,13 @@ class nbi { port_type_lag = 2, }; + enum switch_state { + SWITCH_STATE_UNKNOWN, + SWITCH_STATE_UP, + SWITCH_STATE_DOWN, + SWITCH_STATE_FAILED, + }; + static enum port_type get_port_type(uint32_t port_id) { return static_cast(port_id >> 16); } @@ -152,6 +159,7 @@ class nbi { } virtual void register_switch(switch_interface *) noexcept = 0; + virtual void switch_state_notification(enum switch_state) noexcept = 0; virtual void resend_state() noexcept = 0; virtual void port_notification(std::deque &) noexcept = 0;