Exclude files with matching sigs #6

Open

wants to merge 10 commits into base: master
217 changes: 217 additions & 0 deletions aggregate-access-log
@@ -0,0 +1,217 @@
#!/usr/bin/gawk -f
# Aggregates an access log into per-minute metrics. Requires gawk: the
# script uses asort() and arrays of arrays.
Contributor: is this file part of this PR?


# top_array(result, top, acc): copy the `top` highest-count entries of acc
# into result, as result[i][1] = count and result[i][2] = key.
function top_array(result, top, acc) {
    delete temp
    c = 0
    for (i in acc) {
        # zero-pad the count so a lexical sort orders numerically
        temp[c] = sprintf("%05d %s", acc[i], i)
        c += 1
    }

    n = asort(temp)
    startfrom = n < top ? 0 : n - top

    ai = 0
    for (i = startfrom + 1; i <= n; i++) {
        line = temp[i]
        count = substr(line, 1, 5) + 0
        value = substr(line, 7)
        result[ai][1] = count
        result[ai][2] = value
        ai += 1
    }
}


# print the top user agents seen in the minute
function print_ua(ts, acc) {
    top_array(tops, 15, acc)

    for (i in tops) {
        ua = tops[i][2]
        gsub("\"", "'", ua)  # avoid nested double quotes in the output
        printf "%s ns=fe.access.ua count=%d ua=\"%s\"\n", ts, tops[i][1], ua
    }

    delete tops
}

# print the top error responses (code + uri) seen in the minute
function print_errors(ts, acc) {
    top_array(tops, 15, acc)

    for (i in tops) {
        value = tops[i][2]
        split(value, values, " ")
        code = values[1] + 0
        uri = values[2]
        printf "%s ns=fe.access.errors count=%d error=%d uri=\"%s\"\n", ts, tops[i][1], code, uri
    }

    delete tops
}

# print the slowest URIs (ranked by total response time) and the busiest
# URIs (ranked by request count) for the minute
function print_reqs(ts, acc) {
    top_array(tops, 15, acc["uri_time"])

    for (i in tops) {
        value = tops[i][2]
        total = tops[i][1]          # total response time for this code+uri
        count = acc["reqs"][value]  # request count for this code+uri

        split(value, values, " ")
        code = values[1] + 0
        uri = values[2]
        printf "%s ns=fe.access.slow count=%d total=%d code=%d uri=\"%s\"\n", ts, count, total, code, uri
    }

    delete tops

    top_array(tops, 15, acc["reqs"])

    for (i in tops) {
        value = tops[i][2]
        total = acc["uri_time"][value]
        count = tops[i][1]

        split(value, values, " ")
        code = values[1] + 0
        uri = values[2]
        printf "%s ns=fe.access.count count=%d total=%d code=%d uri=\"%s\"\n", ts, count, total, code, uri
    }

    delete tops
}


# print per-group request timings for the minute
function print_groups(ts, acc) {
    for (i in acc["times"]) {
        printf "%s ns=fe.access.group group_name=\"%s\" count=%d avg=%.1f max=%d min=%d\n", ts, i, acc["count"][i], acc["times"][i] / acc["count"][i], acc["max"][i], acc["min"][i]
    }
}

# print per-response-code counts, and how many of those requests looked like bots
function print_codes(ts, acc) {
    for (i in acc["code"]) {
        printf "%s ns=fe.access.bots response_code=%s count=%d bots=%d\n", ts, i, acc["code"][i], acc["bots"][i]
    }
}

# flush all accumulated metrics for one minute
function print_acc(ts, acc) {

    # pad with bare-timestamp lines: if the network connection drops and the
    # next few bytes are lost, we only lose these empty lines
    for (i = 1; i <= 10; i++) {
        printf "%s\n", ts
    }

    if (length(acc["ua"]) > 0) { print_ua(ts, acc["ua"]) }
    print_groups(ts, acc)
    print_codes(ts, acc)
    if (length(acc["errors"]) > 0) { print_errors(ts, acc["errors"]) }
    if (length(acc["reqs"]) > 0) { print_reqs(ts, acc) }

    print ts > "/dev/stderr"
    fflush(stdout)
}

{
    # $4 holds the request timestamp; keep the date-time and timezone parts
    ts = sprintf("%s%s", substr($4, 2, 19), substr($4, 25, 5))
    current_minute = substr(ts, 1, 16)

    # flush and reset the accumulator whenever the minute rolls over
    if (current_minute != last_minute) {
        if (length(first_timestamp) > 0) {
            print_acc(first_timestamp, acc)
        }

        first_timestamp = ts
        last_minute = current_minute
        delete acc
    }

    url = $6

    # classify the request URL into a traffic group; the chain of gsub
    # calls short-circuits on the first pattern that matches
    if (gsub("^/textbased/.*", "textbased", url) ||
        gsub(".*article-[0-9]*/amp/.*", "amp/articles", url) ||
        gsub(".*article-[0-9]*/.*mobile=true.*", "mobile/articles", url) ||
        gsub(".*article-[0-9]*/.*", "articles", url) ||
        gsub(".*video-[0-9]*/.*mobile=true.*", "mobile/video", url) ||
        gsub(".*video-[0-9]*/.*", "video", url) ||
        gsub(".*columnist-[0-9]*/.*mobile=true.*", "mobile/columnist", url) ||
        gsub(".*columnist-[0-9]*/.*", "columnist", url) ||
        gsub(".*/(home|homeus|homeau)/index.html.*mobile=true.*", "mobile/home", url) ||
        gsub(".*/(home|homeus|homeau)/index.html.*", "home", url) ||
        gsub(".*index.html.*mobile=true.*", "mobile/channels", url) ||
        gsub(".*index.html.*", "channels", url) ||
        gsub(".*rss.*", "rss", url) ||
        gsub(".*registration.*", "registration", url) ||
        gsub(".*meta.*", "meta", url) ||
        gsub(".*/geolocation/.*", "esi calls", url) ||
        gsub(".*/mobile/.*", "mobile feed", url) ||
        gsub(".*/api/.*", "api", url) ||
        gsub(".*/home/search.html.*", "search", url) ||
        gsub(".*/home/sitemap.*.html.*", "sitemap/html", url) ||
        gsub(".*sitemap.*.xml.*", "sitemap/xml", url) ||
        gsub(".*embed/video/.*", "embedded video", url) ||
        gsub(".*videoarchive.*", "video archive", url) ||
        gsub(".*c.*/item.cms.*", "cms items", url) ||
        gsub(".*/.*.html.*", "topic pages", url) ||
        gsub("^/$", "home", url) ||
        gsub("^/.*$", "others", url))
    {}

acc["code"][$8] +=1


response_time = $10+0
acc["count"][url] += 1
acc["size"][url] += $9
acc["times"][url] += response_time
if (length(acc["min"][url]) == 0 || acc["min"][url] > response_time) {
acc["min"][url] = response_time
}
if (acc["max"][url] < response_time) {
acc["max"][url] = response_time
}

a=""
for (i=12;i<=NF;i++) {
a=a " " $i
}
ua = substr(a,3,length(a)-3)
acc["ua"][ua] += 1
IGNORECASE = 1
if (match(ua, /bot|google|crawler|spider|robot|crawling|wget|http|slurp|analyzer|sitecon|@/) || ua == "-") {
acc["bots"][$8] += 1
}

code = $8+0
uri = $6
gsub("\\?.*", "", uri)
code_uri = sprintf("%03d %s", code, uri)
acc["reqs"][code_uri] += 1
acc["uri_time"][code_uri] += response_time

if (code >= 400) {
acc["errors"][code_uri] += 1
}




}
END {
    print_acc(first_timestamp, acc)
}
78 changes: 63 additions & 15 deletions sender
@@ -17,12 +17,14 @@ import threading
import Queue
import signal
import traceback
import json

DEFAULT_SIGNAGURE_LENGTH=256

filter_stopped = Queue.Queue()
stop_signal = Queue.Queue()
running_lock = threading.Lock()
debug_on = False

def should_stop():
    return not stop_signal.empty()
@@ -36,6 +38,7 @@ def main():
        "IMPORTANT: only files with size >= signature-length (default %d) bytes will be processed. "
        "Zip files will be open recursively, and only once." % DEFAULT_SIGNAGURE_LENGTH)

    parser.add_option('-D', '--debug', action="store_true", dest="debug", help="Enable debug output", default=False)
    parser.add_option('-f', '--follow', action="store_true", dest="follow", help="Polls the file every second for changes in an infinite loop.", default=False)
    parser.add_option('-p', '--offsets', action="store", dest="offsets", help="File to persist the last offsets read for each file. If it doesn't exist, the files are read from the beginning.", default='/dev/null')
    parser.add_option('-t', '--tcp', action="store", dest="host", help="Sends the output to the host:port via TCP.", metavar="HOST:PORT")
@@ -49,6 +52,11 @@ def main():

    options, args = parser.parse_args()

    if options.debug:
        log("DEBUG ON")
        global debug_on
        debug_on = True

    if options.dump_pid:
        f = open(options.dump_pid, 'w')
        f.write("%d" % os.getpid())
@@ -153,6 +161,8 @@ def log(msg):
    sys.stderr.flush()

def debug(msg):
    if debug_on:
        log("DEBUG: " + msg)
    # sys.stderr.write(str(msg))
    # sys.stderr.write("\n")
    # sys.stderr.flush()
@@ -264,6 +274,8 @@ class Tail(object):
        self.starting = options.starting
        self.start_from_tail = options.start_from_tail
        self.readOffsets()
        self.duplicates = {}

    def persistOffsets(self):
        if self.offsetsfn == '/dev/null':
@@ -319,6 +331,7 @@
        if should_stop():
            return
        for line in f:
            debug("line %s" % line.replace("\n", ""))
Contributor: calls like this in production will first resolve the string template, only to find out that debug is OFF. So for performance in production, the ideal is:

    if debug_on: debug(...)
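A minimal sketch of the reviewer's point, with hypothetical names (`lazy_debug` is illustrative, not part of this PR): pass the template and its arguments separately, so the string is only formatted after the flag check.

    import sys

    debug_on = False  # stand-in for the module-level flag this patch adds

    def lazy_debug(fmt, *args):
        # the string is rendered only when the flag is on, so a disabled
        # call costs one branch instead of a string format
        if debug_on:
            sys.stderr.write("DEBUG: " + (fmt % args if args else fmt) + "\n")

    lazy_debug("line %s", "GET /index.html")  # no-op while debug_on is False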

            if not self.starting or line >= self.starting:
                self.output.write(line)
            if should_stop():
@@ -375,20 +388,13 @@ class Tail(object):
            self.processFile(f, fn, existing)
        finally:
            f.close()
            debug("processFileByName, close file")

    def processFile(self, f, fn, existing):
        debug("processFile: %s" % fn)
        sig = self.generateSignature(f)
        if not sig:
            return

        if sig in existing and os.path.getsize(fn) != os.path.getsize(existing[sig]):
            log("WARN Files '%s' and '%s' have same signature and different sizes" % (fn, existing[sig]))

    def processFile(self, f, fn, sig):
        info = self.offsets.get(sig, Info(sig=sig, name=fn))
        debug("processFile %s %s" % (fn, info.dump()))

        lastOffset = info.offset

        info.name = fn

        if self.isCompressed(fn):
@@ -399,16 +405,18 @@
            info.offset = -1
        else:
            if self.start_from_tail:
                debug("starting from tail %s" % fn)
                info.offset = os.path.getsize(fn)
            if os.path.exists(fn) and os.path.getsize(fn) < info.offset:
                log("WARN file %s was truncated" % fn)
                log("METRIC ns=forwarder.truncated file=%s filesize=%s offset=%s" % (fn, os.path.getsize(fn), info.offset))
                info.offset = os.path.getsize(fn)
            else:
                debug("Seeking to %s in %s, currently at %s" % (info.offset, fn, f.tell()))
                f.seek(info.offset)
                self.copy(f)
                info.offset = f.tell()
                debug("Setting offset for: %s to %s (info: %s)" % (fn, info.offset, info.dump()))

        existing[sig] = fn

        if lastOffset != info.offset:
            self.offsets[sig] = info
@@ -422,23 +430,63 @@
            if not filter_stopped.empty():
                sys.exit(1)
            existing = {}
            to_process = {}
            filehandles = {}
            for fnpattern in self.fnpatterns:
                for fn in glob.glob(fnpattern):
                    debug("Checking fn %s" % fn)
                    try:
                        if not os.path.isfile(fn):
                            log("File no longer exists: %s" % fn)
                            continue
                        if os.path.getsize(fn) < self.signatureLength and not self.isCompressed(fn):
                            log("Skipping as sig too short or compressed: %s" % fn)
Contributor: isn't the message here 'skipping as file too short and not compressed'?
                            continue
                        self.processFileByName(fn, existing)
                        f = open(fn, 'rb')
                        sig = self.generateSignature(f)
                        debug("Sig for fn %s is %s" % (fn, sig))
                        sig_fn = sig + fn
                        if sig in existing:
                            if sig_fn not in self.duplicates:
                                log("METRIC ns=forwarder.duplicatesig file=%s dupe_of=%s sig=%s" % (fn, existing[sig], sig))
                                self.duplicates[sig_fn] = True
                            if sig in to_process:  # take the original duplicate out of to_process
                                del to_process[sig]
                            f.close()
                        else:
                            debug("Adding file %s %s" % (fn, sig))
                            existing[sig] = fn  # leave it in existing in case there are more than 2 duplicates
                            to_process[sig] = fn
                            filehandles[sig] = f

                    except Exception, e:
                        log("Exception: %s" % e)
                        log("METRIC ns=forwarder.error.preprocess file=\"%s\" exception=\"%s\"" % (fn, str(e).replace("\n", "")))
                        log("Exception=\"%s\"" % e)
                        f.close()
Contributor: the file might not be open at this point. To guarantee it is, put the f = open before the try:
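A sketch of that suggestion (`preprocess` and the 256-byte read are illustrative stand-ins, not the PR's code): with open() before the try, the except block can only ever run with a valid handle, so closing it there is always safe.

    import traceback

    def preprocess(fn):
        # open() sits before the try: if it fails, there is no handle to
        # close and no except/finally cleanup to worry about
        f = open(fn, 'rb')
        try:
            f.read(256)  # stand-in for generateSignature(f)
            return f     # on success the handle stays open for later processing
        except Exception:
            traceback.print_exc()
            f.close()    # f is guaranteed to be a valid, open handle here
            raise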

                        exc_type, exc_value, exc_traceback = sys.exc_info()
                        traceback.print_tb(exc_traceback)

debug("To Process %s" % to_process)
sigs = to_process.keys()
for sig in sigs:
try:
fn = to_process[sig]
debug("Processing file %s %s" % (fn, sig))
f = filehandles[sig]
self.processFile(f, fn, sig)
except Exception, e:
log("METRIC ns=forwarder.error.process file=\"%s\" exception=\"%s\"" % (fn, str(e).replace("\n", "")))
log("Exception=\"%s\"" % e)
exc_type, exc_value, exc_traceback = sys.exc_info()
traceback.print_tb(exc_traceback)
f.close();

finally:
f.close();
Contributor: the file might be closed twice here, possibly resulting in an exception.
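A sketch of one way to guarantee a single close (`process_one` and `consume` are illustrative names, not the PR's code): the except block only logs, and cleanup is left to finally alone.

    import traceback

    def process_one(f, consume):
        try:
            consume(f)             # stand-in for self.processFile(f, fn, sig)
        except Exception:
            traceback.print_exc()  # log the failure, but do not close here
        finally:
            f.close()              # runs exactly once, on success and on error

    process_one(open("sender", "rb"), lambda f: f.read(64))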



            if len(existing) != len(self.offsets):
                self.purgeOffsetsNotIn(existing)
                self.purgeOffsetsNotIn(to_process)
            if not self.follow:
                break
            time.sleep(1)