Skip to content

Commit

Permalink
Merge pull request bisdn#169 from toanju/init-rework
Browse files Browse the repository at this point in the history
Init rework
  • Loading branch information
DFritzsche authored Jan 4, 2019
2 parents 045595b + 244ef4e commit 57e7e3a
Show file tree
Hide file tree
Showing 8 changed files with 141 additions and 84 deletions.
112 changes: 57 additions & 55 deletions src/netlink/cnetlink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,8 @@ namespace basebox {

cnetlink::cnetlink()
: swi(nullptr), thread(1), caches(NL_MAX_CACHE, nullptr), nl_proc_max(10),
running(false), rfd_scheduled(false), bridge(nullptr),
bond(new nl_bond(this)), vlan(new nl_vlan(this)),
l3(new nl_l3(vlan, this)), config_lo(false) {
state(NL_STATE_STOPPED), bridge(nullptr), bond(new nl_bond(this)),
vlan(new nl_vlan(this)), l3(new nl_l3(vlan, this)) {

sock_tx = nl_socket_alloc();
if (sock_tx == nullptr) {
Expand Down Expand Up @@ -157,7 +156,16 @@ void cnetlink::init_caches() {
LOG(FATAL) << __FUNCTION__ << ": add route/neigh to cache mngr";
}

thread.wakeup(this);
try {
thread.add_read_fd(this, nl_cache_mngr_get_fd(mngr), true, false);
} catch (std::exception &e) {
LOG(FATAL) << "caught " << e.what();
}
}

void cnetlink::init_subsystems() noexcept {
assert(swi);
l3->init();
}

int cnetlink::set_nl_socket_buffer_sizes(nl_sock *sk) {
Expand Down Expand Up @@ -350,17 +358,24 @@ int cnetlink::get_port_id(int ifindex) const {
void cnetlink::handle_wakeup(rofl::cthread &thread) {
bool do_wakeup = false;

if (not rfd_scheduled) {
try {
thread.add_read_fd(this, nl_cache_mngr_get_fd(mngr), true, false);
rfd_scheduled = true;
} catch (std::exception &e) {
LOG(FATAL) << "caught " << e.what();
}
if (!swi)
return;

switch (state) {
case NL_STATE_INIT:
init_subsystems();
state = NL_STATE_RUNNING;
break;
case NL_STATE_RUNNING:
break;
case NL_STATE_STOPPED:
return;
}

// loop through nl_objs
for (int cnt = 0; cnt < nl_proc_max && nl_objs.size() && running; cnt++) {
for (int cnt = 0;
cnt < nl_proc_max && nl_objs.size() && state == NL_STATE_RUNNING;
cnt++) {
auto obj = nl_objs.front();
nl_objs.pop_front();

Expand Down Expand Up @@ -400,16 +415,9 @@ void cnetlink::handle_wakeup(rofl::cthread &thread) {
do_wakeup = true;
}

if (swi && swi->is_connected()) {
if (!config_lo) {
config_lo_addr();
config_lo = true;
}
} else if (swi && !swi->is_connected()) {
config_lo = false;
}

if (do_wakeup || nl_objs.size()) {
VLOG(3) << __FUNCTION__
<< ": calling wakeup nl_objs.size()=" << nl_objs.size();
this->thread.wakeup(this);
}
}
Expand All @@ -421,7 +429,7 @@ void cnetlink::handle_read_event(rofl::cthread &thread, int fd) {
int rv = nl_cache_mngr_data_ready(mngr);
VLOG(1) << __FUNCTION__ << ": #processed=" << rv;
// notify update
if (running) {
if (state != NL_STATE_STOPPED) {
this->thread.wakeup(this);
}
}
Expand Down Expand Up @@ -468,7 +476,11 @@ void cnetlink::nl_cb_v2(struct nl_cache *cache, struct nl_object *old_obj,
<< " new_obj=" << static_cast<void *>(new_obj);

assert(data);
static_cast<cnetlink *>(data)->nl_objs.emplace_back(action, old_obj, new_obj);
auto nl = static_cast<cnetlink *>(data);

// only enqueue nl msgs if not in stopped state
if (nl->state != NL_STATE_STOPPED)
nl->nl_objs.emplace_back(action, old_obj, new_obj);
}

void cnetlink::set_tapmanager(std::shared_ptr<tap_manager> tm) {
Expand Down Expand Up @@ -497,7 +509,9 @@ int cnetlink::handle_source_mac_learn() {
_packet_in.swap(packet_in);
}

for (int cnt = 0; cnt < nl_proc_max && _packet_in.size() && running; cnt++) {
for (int cnt = 0;
cnt < nl_proc_max && _packet_in.size() && state == NL_STATE_RUNNING;
cnt++) {
auto p = _packet_in.front();
int ifindex = tap_man->get_ifindex(p.port_id);

Expand All @@ -521,6 +535,7 @@ int cnetlink::handle_source_mac_learn() {

int size = _packet_in.size();
if (size) {
VLOG(3) << __FUNCTION__ << ": " << size << " packets not processed";
std::lock_guard<std::mutex> scoped_lock(pi_mutex);
std::copy(make_move_iterator(_packet_in.rbegin()),
make_move_iterator(_packet_in.rend()),
Expand Down Expand Up @@ -551,7 +566,8 @@ int cnetlink::handle_fdb_timeout() {
_fdb_evts.swap(fdb_evts);
}

for (int cnt = 0; cnt < nl_proc_max && _fdb_evts.size() && bridge && running;
for (int cnt = 0; cnt < nl_proc_max && _fdb_evts.size() && bridge &&
state == NL_STATE_RUNNING;
cnt++) {

auto fdbev = _fdb_evts.front();
Expand All @@ -567,6 +583,7 @@ int cnetlink::handle_fdb_timeout() {

int size = _fdb_evts.size();
if (size) {
VLOG(3) << __FUNCTION__ << ": " << size << " events not processed";
std::lock_guard<std::mutex> scoped_lock(fdb_ev_mutex);
std::copy(make_move_iterator(_fdb_evts.rbegin()),
make_move_iterator(_fdb_evts.rend()),
Expand Down Expand Up @@ -1075,6 +1092,20 @@ void cnetlink::unregister_switch(__attribute__((unused))
stop();
}

void cnetlink::start() noexcept {
if (state == NL_STATE_RUNNING)
return;
VLOG(1) << __FUNCTION__ << ": started netlink processing";
state = NL_STATE_INIT;
thread.wakeup(this);
}

void cnetlink::stop() noexcept {
VLOG(1) << __FUNCTION__ << ": stopped netlink processing";
state = NL_STATE_STOPPED;
thread.wakeup(this);
}

void cnetlink::port_status_changed(uint32_t port_no,
enum nbi::port_status ps) noexcept {
try {
Expand All @@ -1093,8 +1124,6 @@ int cnetlink::handle_port_status_events() {
std::deque<std::tuple<uint32_t, enum nbi::port_status, int>> _pc_changes;
std::deque<std::tuple<uint32_t, enum nbi::port_status, int>> _pc_retry;

VLOG(2) << __FUNCTION__;

{
std::lock_guard<std::mutex> scoped_lock(pc_mutex);
_pc_changes.swap(port_status_changes);
Expand Down Expand Up @@ -1169,6 +1198,7 @@ int cnetlink::handle_port_status_events() {

int size = _pc_retry.size();
if (size) {
VLOG(3) << __FUNCTION__ << ": " << size << " changes not processed";
std::lock_guard<std::mutex> scoped_lock(pc_mutex);
std::copy(make_move_iterator(_pc_retry.begin()),
make_move_iterator(_pc_retry.end()),
Expand All @@ -1178,34 +1208,6 @@ int cnetlink::handle_port_status_events() {
return size;
}

int cnetlink::config_lo_addr() noexcept {
std::list<struct rtnl_addr *> lo_addr;
std::unique_ptr<struct rtnl_addr, void (*)(rtnl_addr *)> addr_filter(
rtnl_addr_alloc(), &rtnl_addr_put);

rtnl_addr_set_ifindex(addr_filter.get(), 1);
rtnl_addr_set_family(addr_filter.get(), AF_INET);

nl_cache_foreach_filter(
caches[NL_ADDR_CACHE], OBJ_CAST(addr_filter.get()),
[](struct nl_object *obj, void *arg) {
VLOG(3) << __FUNCTION__ << " : found configured loopback " << obj;

std::list<struct rtnl_addr *> *add_list =
static_cast<std::list<struct rtnl_addr *> *>(arg);

add_list->emplace_back(ADDR_CAST(obj));
},
&lo_addr);

for (auto addr : lo_addr) {
if (l3->add_l3_addr(addr) < 0)
return -EINVAL;
}

return 0;
}

bool cnetlink::is_bridge_configured(rtnl_link *l) {
assert(l);

Expand Down
32 changes: 13 additions & 19 deletions src/netlink/cnetlink.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class nl_vlan;
class tap_manager;

class cnetlink final : public rofl::cthread_env {

public:
enum nl_cache_t {
NL_ADDR_CACHE,
NL_LINK_CACHE,
Expand All @@ -34,7 +34,6 @@ class cnetlink final : public rofl::cthread_env {
NL_MAX_CACHE,
};

public:
cnetlink();
~cnetlink() override;

Expand All @@ -52,10 +51,14 @@ class cnetlink final : public rofl::cthread_env {
int get_port_id(rtnl_link *l) const;
int get_port_id(int ifindex) const;

nl_cache *get_cache(enum nl_cache_t id) { return caches[id]; }

void resend_state() noexcept;

void register_switch(switch_interface *) noexcept;
void unregister_switch(switch_interface *) noexcept;
void start() noexcept;
void stop() noexcept;

void port_status_changed(uint32_t, enum nbi::port_status) noexcept;

Expand All @@ -71,18 +74,6 @@ class cnetlink final : public rofl::cthread_env {
void fdb_timeout(uint32_t port_id, uint16_t vid,
const rofl::caddress_ll &mac);

void start() {
if (running)
return;
running = true;
thread.wakeup(this);
}

void stop() {
running = false;
thread.wakeup(this);
}

private:
// non copyable
cnetlink(const cnetlink &other) = delete;
Expand All @@ -93,6 +84,12 @@ class cnetlink final : public rofl::cthread_env {
NL_TIMER_RESYNC,
};

enum nl_state {
NL_STATE_RUNNING,
NL_STATE_INIT,
NL_STATE_STOPPED,
};

switch_interface *swi;

rofl::cthread thread;
Expand All @@ -105,8 +102,7 @@ class cnetlink final : public rofl::cthread_env {
std::mutex pc_mutex;

int nl_proc_max;
bool running;
bool rfd_scheduled;
enum nl_state state;
std::deque<nl_obj> nl_objs;

std::shared_ptr<tap_manager> tap_man;
Expand Down Expand Up @@ -136,7 +132,6 @@ class cnetlink final : public rofl::cthread_env {

std::mutex fdb_ev_mutex;
std::deque<fdb_ev> fdb_evts;
bool config_lo;

int handle_port_status_events();
int handle_source_mac_learn();
Expand All @@ -155,6 +150,7 @@ class cnetlink final : public rofl::cthread_env {
int load_from_file(const std::string &path);

void init_caches();
void init_subsystems() noexcept;

int set_nl_socket_buffer_sizes(nl_sock *sk);

Expand All @@ -175,8 +171,6 @@ class cnetlink final : public rofl::cthread_env {
void neigh_ll_created(rtnl_neigh *neigh) noexcept;
void neigh_ll_updated(rtnl_neigh *old_neigh, rtnl_neigh *new_neigh) noexcept;
void neigh_ll_deleted(rtnl_neigh *neigh) noexcept;

int config_lo_addr() noexcept;
};

} // end of namespace basebox
17 changes: 16 additions & 1 deletion src/netlink/nbi_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ nbi_impl::nbi_impl(std::shared_ptr<cnetlink> nl,
std::shared_ptr<tap_manager> tap_man)
: nl(nl), tap_man(tap_man) {
nl->set_tapmanager(tap_man);
nl->start();
}

nbi_impl::~nbi_impl() { nl->stop(); }
Expand All @@ -29,6 +28,22 @@ void nbi_impl::register_switch(switch_interface *swi) noexcept {
nl->register_switch(swi);
}

void nbi_impl::switch_state_notification(enum switch_state state) noexcept {
switch (state) {
case SWITCH_STATE_UP:
nl->start();
break;
case SWITCH_STATE_DOWN:
case SWITCH_STATE_FAILED:
case SWITCH_STATE_UNKNOWN:
nl->stop();
break;
default:
LOG(FATAL) << __FUNCTION__ << ": invalid state";
break;
}
}

void nbi_impl::port_notification(
std::deque<port_notification_data> &notifications) noexcept {

Expand Down
1 change: 1 addition & 0 deletions src/netlink/nbi_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class nbi_impl : public nbi, public switch_callback {

// nbi
void register_switch(switch_interface *) noexcept override;
void switch_state_notification(enum switch_state) noexcept override;
void resend_state() noexcept override;
void
port_notification(std::deque<port_notification_data> &) noexcept override;
Expand Down
28 changes: 28 additions & 0 deletions src/netlink/nl_l3.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,34 @@ rofl::caddress_in6 libnl_in6addr_2_rofl(struct nl_addr *addr, int *rv) {
return rofl::caddress_in6(&sin, salen);
}

int nl_l3::init() noexcept {
std::list<struct rtnl_addr *> lo_addr;
std::unique_ptr<struct rtnl_addr, void (*)(rtnl_addr *)> addr_filter(
rtnl_addr_alloc(), &rtnl_addr_put);

rtnl_addr_set_ifindex(addr_filter.get(), 1);
rtnl_addr_set_family(addr_filter.get(), AF_INET);

nl_cache_foreach_filter(
nl->get_cache(cnetlink::NL_ADDR_CACHE), OBJ_CAST(addr_filter.get()),
[](struct nl_object *obj, void *arg) {
VLOG(3) << __FUNCTION__ << " : found configured loopback " << obj;

std::list<struct rtnl_addr *> *add_list =
static_cast<std::list<struct rtnl_addr *> *>(arg);

add_list->emplace_back(ADDR_CAST(obj));
},
&lo_addr);

for (auto addr : lo_addr) {
if (add_l3_addr(addr) < 0)
return -EINVAL;
}

return 0;
}

// XXX separate function to make it possible to add lo addresses more directly
int nl_l3::add_l3_addr(struct rtnl_addr *a) {
assert(sw);
Expand Down
2 changes: 2 additions & 0 deletions src/netlink/nl_l3.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ class nl_l3 {
nl_l3(std::shared_ptr<nl_vlan> vlan, cnetlink *nl);
~nl_l3() {}

int init() noexcept;

int add_l3_addr(struct rtnl_addr *a);
int add_l3_addr_v6(struct rtnl_addr *a);
int del_l3_addr(struct rtnl_addr *a);
Expand Down
Loading

0 comments on commit 57e7e3a

Please sign in to comment.