Skip to content

Commit

Permalink
Merge pull request #244 from sematext/sc-8931-k8s-containerd-hotfix
Browse files Browse the repository at this point in the history
  • Loading branch information
Adnan Rahić authored Jul 23, 2020
2 parents 832da5c + 2c79a38 commit f2d9921
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 21 deletions.
6 changes: 6 additions & 0 deletions config/examples/kubernetes-containerd-log-routing.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,16 @@ inputFilter:
# include: !!js/regexp /failed|error|exception/i # include errors
# exclude: !!js/regexp /super noisy error messages/i # exclude noise

outputFilter:
kubernetesEnrichment:
module: kuberntes-enrichment

output:
# stdout: ldjson
elasticsearch:
module: elasticsearch
url: https://logsene-receiver.sematext.com
# index: de1135be-xxxx-xxxx-xxxx-365c63d5aff2
indices:
c332463a-xxxx-xxxx-xxxx-535d18521418:
- app.*\.log
Expand Down
45 changes: 24 additions & 21 deletions lib/plugins/input-filter/kubernetesContainerd.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
const containerdSplitRegexp = /^(.+[stdout|stderr] [F|P]) /
// const containerdSplitRegexp = /^(.+[stdout|stderr] [F|P]) / // old
const containerdSplitRegexp = /^(.+)\s(stdout|stderr)\s(F|P)\s(.*)/ // new

// Dictionary to store sources and log lines
/**
Expand Down Expand Up @@ -64,22 +65,26 @@ function parseK8sFileName (sourceName) {
module.exports = function (context, config, data, callback) {
try {
const sections = data.split(containerdSplitRegexp)
if (sections && sections.length === 3) {

if (sections && sections.length === 6) {
const k8sInfo = parseK8sFileName(context.sourceName)
const meta = sections[1].split(' ')
const logLine = sections[2]
const timestamp = sections[1]
const streamName = sections[2]
const streamFlag = sections[3]
const logLine = sections[4]

if (meta.length === 3 && meta[0]) {
k8sInfo['@timestamp'] = new Date(meta[0])
k8sInfo.streamName = meta[1]
k8sInfo.streamFlag = meta[2]
if (timestamp && streamName && streamFlag) {
k8sInfo['@timestamp'] = new Date(timestamp)
k8sInfo.streamName = streamName
k8sInfo.streamFlag = streamFlag

const sourceName = context.sourceName
if (sources[sourceName] === undefined) {
sources[sourceName] = {}
}
sources[sourceName].streamFlag = k8sInfo.streamFlag

// if it is a partial return and wait for the next chunk
if (sources[sourceName].streamFlag === 'P') {
if (sources[sourceName].logLines === undefined) {
sources[sourceName].logLines = []
Expand All @@ -91,6 +96,7 @@ module.exports = function (context, config, data, callback) {
return callback(null, null)
}

// if it is the final chunk of the partial log join it and ship it
if (
sources[sourceName].streamFlag === 'F' &&
sources[sourceName].previousStreamFlag === 'P'
Expand All @@ -104,6 +110,16 @@ module.exports = function (context, config, data, callback) {
context.enrichEvent = k8sInfo
return callback(null, joinedLogLine)
}

// if it is a full log line, enrich it, and send as is
if (
sources[sourceName].streamFlag === 'F'
) {
// a special property in context object to propagate fields to
// the parsed object after parsing -> all logs will be enriched k8s metadata
context.enrichEvent = k8sInfo
return callback(null, logLine)
}
}

return callback(null, logLine)
Expand All @@ -114,16 +130,3 @@ module.exports = function (context, config, data, callback) {
return callback(null, data)
}
}

// test function
// if (require.main === module) {
// module.exports(
// {
// sourceName:
// '/var/log/containers/busybox2_default_busybox-5f03725b871fe3f2cbfdde7864100a12aed2708d759bea14f8d41656accba8f6.log'
// },
// {},
// '2019-03-28T23:13:41.945317977Z stdout F Mar 28 23:13:41 kube-mil01-pa1934121294964badacb5ddd3753d504c-w1 local1.notice haproxy[8]: Proxy masteretcdfrontend started',
// console.log
// )
// }

0 comments on commit f2d9921

Please sign in to comment.