Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: multi source port #497

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions firewall.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,21 @@ func (fp FirewallPacket) MarshalJSON() ([]byte, error) {
})
}

const (
prime1 uint16 = 173
prime2 uint16 = 3313
prime3 uint16 = 5387
)

func (fp FirewallPacket) HashPorts(numBuckets int) int {
if numBuckets <= 1 {
return 0
}

// super lame but fast mixing
return int((fp.LocalPort^prime1)+(fp.RemotePort^prime2)+prime3) % numBuckets
}

// NewFirewall creates a new Firewall object. A TimerWheel is created for you from the provided timeouts.
func NewFirewall(l *logrus.Logger, tcpTimeout, UDPTimeout, defaultTimeout time.Duration, c *cert.NebulaCertificate) *Firewall {
//TODO: error on 0 duration
Expand Down
2 changes: 2 additions & 0 deletions header.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ var typeMap = map[NebulaMessageType]string{
const (
testRequest NebulaMessageSubType = 0
testReply NebulaMessageSubType = 1

nonCanonicalSource NebulaMessageSubType = 1
)

var eHeaderTooShort = errors.New("header is too short")
Expand Down
15 changes: 10 additions & 5 deletions inside.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,12 @@ func (f *Interface) consumeInsidePacket(packet []byte, fwPacket *FirewallPacket,

dropReason := f.firewall.Drop(packet, *fwPacket, false, hostinfo, f.caPool, localCache)
if dropReason == nil {
f.sendNoMetrics(message, 0, ci, hostinfo, hostinfo.remote, packet, nb, out, q)
s := fwPacket.HashPorts(f.sendPorts)
var subType NebulaMessageSubType
if s != 0 {
subType = nonCanonicalSource
}
f.sendNoMetrics(message, subType, ci, hostinfo, hostinfo.remote, packet, nb, out, q, s)

} else if f.l.Level >= logrus.DebugLevel {
hostinfo.logger(f.l).
Expand Down Expand Up @@ -145,7 +150,7 @@ func (f *Interface) sendMessageNow(t NebulaMessageType, st NebulaMessageSubType,
return
}

f.sendNoMetrics(message, st, hostInfo.ConnectionState, hostInfo, hostInfo.remote, p, nb, out, 0)
f.sendNoMetrics(message, st, hostInfo.ConnectionState, hostInfo, hostInfo.remote, p, nb, out, 0, 0)
}

// SendMessageToVpnIp handles real ip:port lookup and sends to the current best known address for vpnIp
Expand Down Expand Up @@ -181,10 +186,10 @@ func (f *Interface) sendMessageToVpnIp(t NebulaMessageType, st NebulaMessageSubT

func (f *Interface) send(t NebulaMessageType, st NebulaMessageSubType, ci *ConnectionState, hostinfo *HostInfo, remote *udpAddr, p, nb, out []byte) {
f.messageMetrics.Tx(t, st, 1)
f.sendNoMetrics(t, st, ci, hostinfo, remote, p, nb, out, 0)
f.sendNoMetrics(t, st, ci, hostinfo, remote, p, nb, out, 0, 0)
}

func (f *Interface) sendNoMetrics(t NebulaMessageType, st NebulaMessageSubType, ci *ConnectionState, hostinfo *HostInfo, remote *udpAddr, p, nb, out []byte, q int) {
func (f *Interface) sendNoMetrics(t NebulaMessageType, st NebulaMessageSubType, ci *ConnectionState, hostinfo *HostInfo, remote *udpAddr, p, nb, out []byte, q, s int) {
if ci.eKey == nil {
//TODO: log warning
return
Expand Down Expand Up @@ -222,7 +227,7 @@ func (f *Interface) sendNoMetrics(t NebulaMessageType, st NebulaMessageSubType,
return
}

err = f.writers[q].WriteTo(out, remote)
err = f.writers[q][s].WriteTo(out, remote)
if err != nil {
hostinfo.logger(f.l).WithError(err).
WithField("udpAddr", remote).Error("Failed to write outgoing packet")
Expand Down
17 changes: 11 additions & 6 deletions interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,10 @@ type Interface struct {

conntrackCacheTimeout time.Duration

writers []*udpConn
// writers is a 2 level table, indexed first by the routine and then by the sending port
writers [][]*udpConn
sendPorts int

readers []io.ReadWriteCloser

metricHandshakes metrics.Histogram
Expand Down Expand Up @@ -115,7 +118,7 @@ func NewInterface(c *InterfaceConfig) (*Interface, error) {
udpBatchSize: c.UDPBatchSize,
routines: c.routines,
version: c.version,
writers: make([]*udpConn, c.routines),
writers: make([][]*udpConn, c.routines),
readers: make([]io.ReadWriteCloser, c.routines),
caPool: c.caPool,
myVpnIp: ip2int(c.certState.certificate.Details.Ips[0].IP),
Expand Down Expand Up @@ -189,7 +192,7 @@ func (f *Interface) listenOut(i int) {
var li *udpConn
// TODO clean this up with a coherent interface for each outside connection
if i > 0 {
li = f.writers[i]
li = f.writers[i][0]
} else {
li = f.outside
}
Expand Down Expand Up @@ -222,8 +225,10 @@ func (f *Interface) RegisterConfigChangeCallbacks(c *Config) {
c.RegisterReloadCallback(f.reloadCA)
c.RegisterReloadCallback(f.reloadCertKey)
c.RegisterReloadCallback(f.reloadFirewall)
for _, udpConn := range f.writers {
c.RegisterReloadCallback(udpConn.reloadConfig)
for _, udpConns := range f.writers {
for _, udpConn := range udpConns {
c.RegisterReloadCallback(udpConn.reloadConfig)
}
}
}

Expand Down Expand Up @@ -302,7 +307,7 @@ func (f *Interface) reloadFirewall(c *Config) {
func (f *Interface) emitStats(i time.Duration) {
ticker := time.NewTicker(i)

udpStats := NewUDPStatsEmitter(f.writers)
udpStats := NewUDPStatsEmitter(f.writers[0])

for range ticker.C {
f.firewall.EmitStats()
Expand Down
27 changes: 25 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func Main(config *Config, configTest bool, buildVersion string, logger *logrus.L
}
}

// set up our UDP listener
// set up our UDP listeners
udpConns := make([]*udpConn, routines)
port := config.GetInt("listen.port", 0)

Expand All @@ -183,6 +183,29 @@ func Main(config *Config, configTest bool, buildVersion string, logger *logrus.L
}
}

// set up conns used for sending, including optionally binding to extra sender ports
// the listener socket is always the 0th element used to send.
sendPorts := config.GetInt("send.ports", 1)
sendConns := make([][]*udpConn, routines)
for i := 0; i < routines; i++ {
sendConns[i] = make([]*udpConn, sendPorts)
sendConns[i][0] = udpConns[i]
}

if sendPorts > 1 {
for i := 0; i < routines; i++ {
for offset := 1; offset < sendPorts; offset++ {
udpConn, err := NewListener(l, config.GetString("listen.host", "0.0.0.0"), port+offset, routines > 1)
if err != nil {
return nil, NewContextualError("Failed to open udp listener", m{"queue": i}, err)
}
udpConn.reloadConfig(config)

sendConns[i][offset] = udpConn
}
}
}

// Set up my internal host map
var preferredRanges []*net.IPNet
rawPreferredRanges := config.GetStringSlice("preferred_ranges", []string{})
Expand Down Expand Up @@ -394,7 +417,7 @@ func Main(config *Config, configTest bool, buildVersion string, logger *logrus.L

// TODO: Better way to attach these, probably want a new interface in InterfaceConfig
// I don't want to make this initial commit too far-reaching though
ifce.writers = udpConns
ifce.writers = sendConns

ifce.RegisterConfigChangeCallbacks(config)

Expand Down
4 changes: 3 additions & 1 deletion outside.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,9 @@ func (f *Interface) readOutsidePackets(addr *udpAddr, out []byte, packet []byte,
return
}

f.handleHostRoaming(hostinfo, addr)
if !(header.Type == message && header.Subtype == nonCanonicalSource) {
f.handleHostRoaming(hostinfo, addr)
}

f.connectionManager.In(hostinfo.hostId)
}
Expand Down