Skip to content

Commit

Permalink
Merge PR #1487 (Release 2022-10: Alpina) into master
Browse files Browse the repository at this point in the history
  • Loading branch information
eugeneia committed Oct 12, 2022
2 parents 11154da + 68c338e commit 94e4f19
Show file tree
Hide file tree
Showing 77 changed files with 805,661 additions and 1,745 deletions.
2 changes: 1 addition & 1 deletion src/apps/basic/basic_apps.lua
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ end
function Truncate:push ()
for _ = 1, link.nreadable(self.input.input) do
local p = receive(self.input.input)
ffi.fill(p.data, math.min(0, self.n - p.length))
ffi.fill(p.data, math.max(0, self.n - p.length))
p.length = self.n
transmit(self.output.output,p)
end
Expand Down
2 changes: 1 addition & 1 deletion src/apps/intel_mp/loadgen.lua
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ local register = require("lib.hardware.register")
local receive, empty = link.receive, link.empty

local can_transmit, transmit
local num_descriptors = 1024
local num_descriptors = 4096

LoadGen = {}

Expand Down
4 changes: 2 additions & 2 deletions src/apps/interlink/receiver.lua
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ end
-- process termination.
function Receiver.shutdown (pid)
for _, queue in ipairs(shm.children("/"..pid.."/interlink/receiver")) do
local backlink = "/"..pid.."/interlink/receiver/"..queue..".interlink"
local shm_name = "/"..pid.."/group/interlink/"..queue..".interlink"
local backlink = "/"..pid.."/interlink/receiver/"..queue
local shm_name = "/"..pid.."/group/interlink/"..queue
-- Call protected in case /<pid>/group is already unlinked.
local ok, r = pcall(interlink.open, shm_name)
if ok then interlink.detach_receiver(r, shm_name) end
Expand Down
4 changes: 2 additions & 2 deletions src/apps/interlink/transmitter.lua
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ end
-- process termination.
function Transmitter.shutdown (pid)
for _, queue in ipairs(shm.children("/"..pid.."/interlink/transmitter")) do
local backlink = "/"..pid.."/interlink/transmitter/"..queue..".interlink"
local shm_name = "/"..pid.."/group/interlink/"..queue..".interlink"
local backlink = "/"..pid.."/interlink/transmitter/"..queue
local shm_name = "/"..pid.."/group/interlink/"..queue
-- Call protected in case /<pid>/group is already unlinked.
local ok, r = pcall(interlink.open, shm_name)
if ok then interlink.detach_transmitter(r, shm_name) end
Expand Down
4 changes: 2 additions & 2 deletions src/apps/ipfix/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ packets.

— Key **templates**

*Optional*. The templates for flows being collected. See the source
code for more information.
*Optional*. The templates for flows being collected.
See `apps/ipfix/README.templates.md` for more information.

### To-do list

Expand Down
20 changes: 20 additions & 0 deletions src/apps/ipfix/README.templates.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<!--- DO NOT EDIT, this file is generated via
sudo ./snabb snsh -e 'require("apps.ipfix.template").templatesMarkdown()' > apps/ipfix/README.templates.md
--->

## IPFIX templates (apps.ipfix.template)

- `value=n` means the value stores up to *n* bytes.
- `tcpControlBitsReduced` are the `tcpControlBits` defined in [RFC 5102](https://datatracker.ietf.org/doc/html/rfc5102#section-5.8.7)
- The `dns*` keys are private enterprise extensions courtesy by SWITCH

| Name | Id | Type | Filter | Keys | Values | Required Maps
| --- | --- | --- | --- | --- | --- | ---
| v4 | 256 | v4 | ip | `sourceIPv4Address`, `destinationIPv4Address`, `protocolIdentifier`, `sourceTransportPort`, `destinationTransportPort` | `flowStartMilliseconds`, `flowEndMilliseconds`, `packetDeltaCount`, `octetDeltaCount`, `tcpControlBitsReduced` |
| v4_DNS | 258 | v4 | ip and udp port 53 | `sourceIPv4Address`, `destinationIPv4Address`, `protocolIdentifier`, `sourceTransportPort`, `destinationTransportPort`, `dnsFlagsCodes`, `dnsQuestionCount`, `dnsAnswerCount`, `dnsQuestionName=64`, `dnsQuestionType`, `dnsQuestionClass`, `dnsAnswerName=64`, `dnsAnswerType`, `dnsAnswerClass`, `dnsAnswerTtl`, `dnsAnswerRdata=64`, `dnsAnswerRdataLen` | `flowStartMilliseconds`, `flowEndMilliseconds`, `packetDeltaCount`, `octetDeltaCount` |
| v4_HTTP | 257 | v4 | ip and tcp dst port 80 | `sourceIPv4Address`, `destinationIPv4Address`, `protocolIdentifier`, `sourceTransportPort`, `destinationTransportPort` | `flowStartMilliseconds`, `flowEndMilliseconds`, `packetDeltaCount`, `octetDeltaCount`, `tcpControlBitsReduced`, `httpRequestMethod=8`, `httpRequestHost=32`, `httpRequestTarget=64` |
| v4_extended | 1256 | v4 | ip | `sourceIPv4Address`, `destinationIPv4Address`, `protocolIdentifier`, `sourceTransportPort`, `destinationTransportPort` | `flowStartMilliseconds`, `flowEndMilliseconds`, `packetDeltaCount`, `octetDeltaCount`, `sourceMacAddress`, `postDestinationMacAddress`, `vlanId`, `ipClassOfService`, `bgpSourceAsNumber`, `bgpDestinationAsNumber`, `bgpPrevAdjacentAsNumber`, `bgpNextAdjacentAsNumber`, `tcpControlBitsReduced`, `icmpTypeCodeIPv4`, `ingressInterface`, `egressInterface` | `mac_to_as`, `vlan_to_ifindex`, `pfx4_to_as`
| v6 | 512 | v6 | ip6 | `sourceIPv6Address`, `destinationIPv6Address`, `protocolIdentifier`, `sourceTransportPort`, `destinationTransportPort` | `flowStartMilliseconds`, `flowEndMilliseconds`, `packetDeltaCount`, `octetDeltaCount`, `tcpControlBitsReduced` |
| v6_DNS | 514 | v6 | ip6 and udp port 53 | `sourceIPv6Address`, `destinationIPv6Address`, `protocolIdentifier`, `sourceTransportPort`, `destinationTransportPort`, `dnsFlagsCodes`, `dnsQuestionCount`, `dnsAnswerCount`, `dnsQuestionName=64`, `dnsQuestionType`, `dnsQuestionClass`, `dnsAnswerName=64`, `dnsAnswerType`, `dnsAnswerClass`, `dnsAnswerTtl`, `dnsAnswerRdata=64`, `dnsAnswerRdataLen` | `flowStartMilliseconds`, `flowEndMilliseconds`, `packetDeltaCount`, `octetDeltaCount` |
| v6_HTTP | 513 | v6 | ip6 and tcp dst port 80 | `sourceIPv6Address`, `destinationIPv6Address`, `protocolIdentifier`, `sourceTransportPort`, `destinationTransportPort` | `flowStartMilliseconds`, `flowEndMilliseconds`, `packetDeltaCount`, `octetDeltaCount`, `tcpControlBitsReduced`, `httpRequestMethod=8`, `httpRequestHost=32`, `httpRequestTarget=64` |
| v6_extended | 1512 | v6 | ip6 | `sourceIPv6Address`, `destinationIPv6Address`, `protocolIdentifier`, `sourceTransportPort`, `destinationTransportPort` | `flowStartMilliseconds`, `flowEndMilliseconds`, `packetDeltaCount`, `octetDeltaCount`, `sourceMacAddress`, `postDestinationMacAddress`, `vlanId`, `ipClassOfService`, `bgpSourceAsNumber`, `bgpDestinationAsNumber`, `bgpNextAdjacentAsNumber`, `bgpPrevAdjacentAsNumber`, `tcpControlBitsReduced`, `icmpTypeCodeIPv6`, `ingressInterface`, `egressInterface` | `mac_to_as`, `vlan_to_ifindex`, `pfx6_to_as`
210 changes: 119 additions & 91 deletions src/apps/ipfix/ipfix.lua
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ function FlowSet:new (spec, args)
end
add_table_counters('table', o.table)
add_table_counters('rate_table', o.sp.table)
assert(not shm.exists(shm_name.."/packets_in"))
o.shm = shm.create_frame(shm_name, frame_init)

-- Template-specific counters
Expand Down Expand Up @@ -563,38 +564,40 @@ function FlowSet:sync_stats()
end
end

IPFIX = {}
local ipfix_config_params = {
idle_timeout = { default = 300 },
active_timeout = { default = 120 },
flush_timeout = { default = 10 },
cache_size = { default = 20000 },
max_load_factor = { default = 0.4 },
scan_protection = { default = {} },
scan_time = { default = 10 },
-- RFC 5153 §6.2 recommends a 10-minute template refresh
-- configurable from 1 minute to 1 day.
template_refresh_interval = { default = 600 },
-- Valid values: 9 or 10.
ipfix_version = { default = 10 },
-- RFC 7011 §10.3.3 specifies that if the PMTU is unknown, a
-- maximum of 512 octets should be used for UDP transmission.
mtu = { default = 512 },
observation_domain = { default = 256 },
exporter_ip = { required = true },
exporter_eth_src = { default = '00:00:00:00:00:00' },
exporter_eth_dst = { default = '00:00:00:00:00:00' },
collector_ip = { required = true },
collector_port = { required = true },
templates = { default = { "v4", "v6" } },
maps = { default = {} },
maps_log_fh = { default = nil },
-- Used to distinguish instances of the app running in the same
-- process
instance = { default = 1 },
add_packet_metadata = { default = true },
log_date = { default = true }
IPFIX = {
config = {
idle_timeout = { default = 300 },
active_timeout = { default = 120 },
flush_timeout = { default = 10 },
cache_size = { default = 20000 },
max_load_factor = { default = 0.4 },
scan_protection = { default = {} },
scan_time = { default = 10 },
-- RFC 5153 §6.2 recommends a 10-minute template refresh
-- configurable from 1 minute to 1 day.
template_refresh_interval = { default = 600 },
-- Valid values: 9 or 10.
ipfix_version = { default = 10 },
-- RFC 7011 §10.3.3 specifies that if the PMTU is unknown, a
-- maximum of 512 octets should be used for UDP transmission.
mtu = { default = 512 },
observation_domain = { default = 256 },
exporter_ip = { required = true },
exporter_eth_src = { default = '00:00:00:00:00:00' },
exporter_eth_dst = { default = '00:00:00:00:00:00' },
collector_ip = { required = true },
collector_port = { required = true },
templates = { default = { "v4", "v6" } },
maps = { default = {} },
maps_logfile = { default = nil },
-- Used to distinguish instances of the app running in the same
-- process
instance = { default = 1 },
add_packet_metadata = { default = true },
log_date = { default = true }
}
}
local ipfix_config_params = IPFIX.config

local scan_protection_params = {
enable = { default = false },
Expand Down Expand Up @@ -641,78 +644,100 @@ local function setup_transport_header(self, config)
end

function IPFIX:new(config)
config = lib.parse(config, ipfix_config_params)
local o = { boot_time = engine.now(),
template_refresh_interval = config.template_refresh_interval,
next_template_refresh = -1,
version = config.ipfix_version,
observation_domain = config.observation_domain,
instance = config.instance,
add_packet_metadata = config.add_packet_metadata,
logger = logger.new({ date = config.log_date,
module = ("[%5d]"):format(S.getpid())
.." IPFIX exporter"} ) }
o.shm = {
-- Total number of packets received
received_packets = { counter },
-- Packets not matched by any flow set
ignored_packets = { counter },
-- Number of template packets sent
template_packets = { counter },
-- Non-wrapping sequence number (see add_ipfix_header() for a
-- brief description of the semantics for IPFIX and Netflowv9)
sequence_number = { counter, 1 },
version = { counter, o.version },
observation_domain = { counter, o.observation_domain },
stats_timer = lib.throttle(5),
templates = {},
flow_sets = {},
shm = {
-- Total number of packets received
received_packets = { counter },
-- Packets not matched by any flow set
ignored_packets = { counter },
-- Number of template packets sent
template_packets = { counter },
-- Non-wrapping sequence number (see add_ipfix_header() for a
-- brief description of the semantics for IPFIX and Netflowv9)
sequence_number = { counter, 1 },
version = { counter, config.ipfix_version },
observation_domain = { counter, config.observation_domain },
}
}
o = setmetatable(o, { __index = IPFIX })
o:reconfig(config)
return o
end

function IPFIX:reconfig(config)
self.template_refresh_interval = config.template_refresh_interval
self.version = config.ipfix_version
self.observation_domain = config.observation_domain
self.instance = config.instance
self.add_packet_metadata = config.add_packet_metadata
self.logger = logger.new({ date = config.log_date,
module = ("[%5d]"):format(S.getpid())
.." IPFIX exporter"})

if o.version == 9 then
o.header_t = netflow_v9_packet_header_t
elseif o.version == 10 then
o.header_t = ipfix_packet_header_t
if self.shm.path then -- shm frame initialized?
counter.set(self.shm.version, self.version)
counter.set(self.shm.observation_domain, self.observation_domain)
end

if self.version == 9 then
self.header_t = netflow_v9_packet_header_t
elseif self.version == 10 then
self.header_t = ipfix_packet_header_t
else
error('unsupported ipfix version: '..o.version)
error('unsupported ipfix version: '..self.version)
end
o.header_ptr_t = ptr_to(o.header_t)
o.header_size = ffi.sizeof(o.header_t)
self.header_ptr_t = ptr_to(self.header_t)
self.header_size = ffi.sizeof(self.header_t)

setup_transport_header(o, config)
setup_transport_header(self, config)

-- FIXME: Assuming we export to IPv4 address.
local l3_header_len = 20
local l4_header_len = 8
local ipfix_header_len = o.header_size
local ipfix_header_len = self.header_size
local total_header_len = l4_header_len + l3_header_len + ipfix_header_len
local flow_set_args = { mtu = config.mtu - total_header_len,
version = config.ipfix_version,
cache_size = config.cache_size,
max_load_factor = config.max_load_factor,
max_load_factor = config.max_load_factor,
scan_protection = lib.parse(config.scan_protection,
scan_protection_params),
idle_timeout = config.idle_timeout,
active_timeout = config.active_timeout,
scan_time = config.scan_time,
flush_timeout = config.flush_timeout,
parent = o,
parent = self,
maps = config.maps,
maps_log_fh = config.maps_log_fh,
maps_log_fh = config.maps_logfile and
assert(io.open(config.maps_logfile, "a")) or nil,
instance = config.instance,
log_date = config.log_date }

o.flow_sets = {}
for _, template in ipairs(config.templates) do
table.insert(o.flow_sets, FlowSet:new(template, flow_set_args))
o.logger:log("Added template "..o.flow_sets[#o.flow_sets]:id())
end

o.stats_timer = lib.throttle(5)
return setmetatable(o, { __index = self })
end
local flow_set_args_changed = not lib.equal(self.flow_set_args, flow_set_args)
self.flow_set_args = flow_set_args

function IPFIX:reconfig(config)
-- Only support reconfiguration of the transport header for now
config = lib.parse(config, ipfix_config_params)
setup_transport_header(self, config)
for i, template in ipairs(self.templates) do
if template ~= config.templates[i] or flow_set_args_changed then
self.flow_sets[i] = nil
end
end
for i, template in ipairs(config.templates) do
if not self.flow_sets[i] then
self.flow_sets[i] = FlowSet:new(template, flow_set_args)
if self.templates[i] then
self.logger:log("Updated template "..self.flow_sets[i]:id())
else
self.logger:log("Added template "..self.flow_sets[i]:id())
end
else
self.logger:log("Kept template "..self.flow_sets[i]:id())
end
end
self.templates = config.templates
end

function IPFIX:send_template_records(out)
Expand Down Expand Up @@ -840,20 +865,23 @@ function selftest()
local consts = require("apps.lwaftr.constants")
local ethertype_ipv4 = consts.ethertype_ipv4
local ethertype_ipv6 = consts.ethertype_ipv6
local ipfix = IPFIX:new({ exporter_ip = "192.168.1.2",
collector_ip = "192.168.1.1",
collector_port = 4739,
flush_timeout = 0,
scan_time = 1,
templates = {
'v4_extended', 'v6_extended'
},
maps = {
mac_to_as = "apps/ipfix/test/mac_to_as",
vlan_to_ifindex = "apps/ipfix/test/vlan_to_ifindex",
pfx4_to_as = "apps/ipfix/test/pfx4_to_as.csv",
pfx6_to_as = "apps/ipfix/test/pfx6_to_as.csv"
}})
local conf = {
exporter_ip = "192.168.1.2",
collector_ip = "192.168.1.1",
collector_port = 4739,
flush_timeout = 0,
scan_time = 1,
templates = {
'v4_extended', 'v6_extended'
},
maps = {
mac_to_as = "apps/ipfix/test/mac_to_as",
vlan_to_ifindex = "apps/ipfix/test/vlan_to_ifindex",
pfx4_to_as = "apps/ipfix/test/pfx4_to_as.csv",
pfx6_to_as = "apps/ipfix/test/pfx6_to_as.csv"
}
}
local ipfix = IPFIX:new(lib.parse(conf, IPFIX.config))
ipfix.shm = shm.create_frame("apps/ipfix", ipfix.shm)

-- Mock input and output.
Expand Down
45 changes: 45 additions & 0 deletions src/apps/ipfix/template.lua
Original file line number Diff line number Diff line change
Expand Up @@ -871,6 +871,51 @@ templates = {
},
}

local templates_legend = [[
- `value=n` means the value stores up to *n* bytes.
- `tcpControlBitsReduced` are the `tcpControlBits` defined in [RFC 5102](https://datatracker.ietf.org/doc/html/rfc5102#section-5.8.7)
- The `dns*` keys are private enterprise extensions courtesy by SWITCH
]]

-- sudo ./snabb snsh -e 'require("apps.ipfix.template").templatesMarkdown()' > apps/ipfix/README.templates.md
function templatesMarkdown (out)
out = out or io.stdout
local names = {}
for name in pairs(templates) do
table.insert(names, name)
end
table.sort(names)
out:write([[<!--- DO NOT EDIT, this file is generated via
sudo ./snabb snsh -e 'require("apps.ipfix.template").templatesMarkdown()' > apps/ipfix/README.templates.md
--->
]])
out:write("## IPFIX templates (apps.ipfix.template)\n\n")
out:write(templates_legend.."\n")
out:write("| Name | Id | Type | Filter | Keys | Values | Required Maps\n")
out:write("| --- | --- | --- | --- | --- | --- | ---\n")
local function listCell (xs)
out:write("| ")
if not xs then return end
for i, x in ipairs(xs) do
if i < #xs then
out:write(("`%s`, "):format(x))
else
out:write(("`%s` "):format(x))
end
end
end
for _, name in ipairs(names) do
local t = templates[name]
out:write(("| %s | %d | %s | %s "):format(
name, t.id, t.aggregation_type, t.filter))
listCell(t.keys)
listCell(t.values)
listCell(t.require_maps)
out:write("\n")
end
end

function selftest()
print('selftest: apps.ipfix.template')
local datagram = require("lib.protocol.datagram")
Expand Down
Loading

0 comments on commit 94e4f19

Please sign in to comment.