From db10280131bc1924454db8317f58db0dc3705181 Mon Sep 17 00:00:00 2001
From: Tom Burnell
Date: Mon, 5 Dec 2016 17:20:01 +0000
Subject: [PATCH 1/8] add aggregate-access-log

---
 aggregate-access-log | 217 +++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 217 insertions(+)
 create mode 100755 aggregate-access-log

diff --git a/aggregate-access-log b/aggregate-access-log
new file mode 100755
index 0000000..e956d9c
--- /dev/null
+++ b/aggregate-access-log
@@ -0,0 +1,217 @@
+#!/usr/bin/env awk -f
+
+function top_array(result, top, acc) {
+    delete temp
+    c = 0
+    for(i in acc) {
+        temp[c] = sprintf("%05d %s", acc[i], i)
+        c += 1
+    }
+
+    n = asort(temp)
+    startfrom = n < top ? 0 : n-top
+
+    ai=0
+    for(i = startfrom+1;i<=n;i++) {
+        line = temp[i]
+        count = substr(line, 1, 6)+0
+        value = substr(line, 7, length(line)-1)
+        result[ai][1] = count
+        result[ai][2] = value
+        ai += 1
+    }
+}
+
+function print_ua(ts, acc) {
+    top_array(tops, 15, acc)
+
+    for (i in tops) {
+        ua = tops[i][2]
+        gsub("\"", "'", ua)
+        printf "%s ns=fe.access.ua count=%d ua=\"%s\"\n", ts, tops[i][1], ua
+    }
+
+    delete tops
+}
+
+function print_errors(ts, acc) {
+    top_array(tops, 15, acc)
+
+    for (i in tops) {
+        value = tops[i][2]
+        split(value, values, " ")
+        code = values[1]+0
+        uri = values[2]
+        printf "%s ns=fe.access.errors count=%d error=%d uri=\"%s\"\n", ts, tops[i][1], code, uri
+    }
+
+    delete tops
+}
+
+function print_reqs(ts, acc) {
+    top_array(tops, 15, acc["uri_time"])
+
+    for (i in tops) {
+        value = tops[i][2]
+        # total = acc["uri_time"][value]
+        # count = tops[i][1]
+        total = tops[i][1]
+        count = acc["reqs"][value]
+
+        split(value, values, " ")
+        code = values[1]+0
+        uri = values[2]
+        printf "%s ns=fe.access.slow count=%d total=%d code=%d uri=\"%s\"\n", ts, count, total, code, uri
+    }
+
+    delete tops
+
+    top_array(tops, 15, acc["reqs"])
+
+    for (i in tops) {
+        value = tops[i][2]
+        total = acc["uri_time"][value]
+        count = tops[i][1]
+
+        split(value, values, " ")
+        code = values[1]+0
+        uri = values[2]
+        printf "%s ns=fe.access.count count=%d total=%d code=%d uri=\"%s\"\n", ts, count, total, code, uri
+    }
+
+    delete tops
+}
+
+function print_groups(ts, acc) {
+    for (i in acc["times"]) {
+        printf "%s ns=fe.access.group group_name=\"%s\" count=%d avg=%.1f max=%d min=%d\n", ts, i, acc["count"][i], acc["times"][i]/acc["count"][i], acc["max"][i], acc["min"][i]
+    }
+}
+
+function print_codes(ts, acc) {
+    for (i in acc["code"]) {
+        printf "%s ns=fe.access.bots response_code=%s count=%d bots=%d\n", ts, i, acc["code"][i], acc["bots"][i]
+    }
+}
+
+function print_acc(ts, acc) {
+    # in case we lose the next few bytes if network connection is lost, we just lose empty lines
+    for (i = 1; i <= 10; i++) {
+        printf "%s\n", ts
+    }
+
+    if (length(acc["ua"]) > 0) { print_ua(ts, acc["ua"]) }
+    print_groups(ts, acc)
+    print_codes(ts, acc)
+    if (length(acc["errors"]) > 0) { print_errors(ts, acc["errors"]) }
+    if (length(acc["reqs"]) > 0) { print_reqs(ts, acc) }
+
+    print ts > "/dev/stderr"
+    fflush(stdout)
+}
+
+{
+    ts = sprintf("%s%s", substr($4, 2, 19), substr($4, 25, 5))
+    current_minute = substr(ts, 1, 16)
+
+    if (current_minute != last_minute) {
+        if (length(first_timestamp)>0) {
+            print_acc(first_timestamp, acc)
+        }
+
+        first_timestamp = ts
+        last_minute = current_minute;
+        delete acc
+    }
+
+    url = $6
+
+    if (gsub("^/textbased/.*", "textbased", url) ||
+        gsub(".*article-[0-9]*/amp/.*", "amp/articles", url) ||
+        gsub(".*article-[0-9]*/.*mobile=true.*", "mobile/articles", url) ||
+        gsub(".*article-[0-9]*/.*", "articles", url) ||
+        gsub(".*video-[0-9]*/.*mobile=true.*", "mobile/video", url) ||
+        gsub(".*video-[0-9]*/.*", "video", url) ||
+        gsub(".*columnist-[0-9]*/.*mobile=true.*", "mobile/columnist", url) ||
+        gsub(".*columnist-[0-9]*/.*", "columnist", url) ||
+        gsub(".*/(home|homeus|homeau)/index.html.*mobile=true.*", "mobile/home", url) ||
+        gsub(".*/(home|homeus|homeau)/index.html.*", "home", url) ||
+        gsub(".*index.html.*mobile=true.*", "mobile/channels", url) ||
+        gsub(".*index.html.*", "channels", url) ||
+        gsub(".*rss.*", "rss", url) ||
+        gsub(".*registration.*", "registration", url) ||
+        gsub(".*meta.*", "meta", url) ||
+        gsub(".*/geolocation/.*", "esi calls", url) ||
+        gsub(".*/mobile/.*", "mobile feed", url) ||
+        gsub(".*/api/.*", "api", url) ||
+        gsub(".*/home/search.html.*", "search", url) ||
+        gsub(".*/home/sitemap.*.html.*", "sitemap/html", url) ||
+        gsub(".*sitemap.*.xml.*", "sitemap/xml", url) ||
+        gsub(".*embed/video/.*", "embedded video", url) ||
+        gsub(".*videoarchive.*", "video archive", url) ||
+        gsub(".*c.*/item.cms.*", "cms items", url) ||
+        gsub(".*/.*.html.*", "topic pages", url) ||
+        gsub("^/$", "home", url) ||
+        gsub("^/.*$", "others", url))
+    {}
+
+    acc["code"][$8] += 1
+
+    response_time = $10+0
+    acc["count"][url] += 1
+    acc["size"][url] += $9
+    acc["times"][url] += response_time
+    if (length(acc["min"][url]) == 0 || acc["min"][url] > response_time) {
+        acc["min"][url] = response_time
+    }
+    if (acc["max"][url] < response_time) {
+        acc["max"][url] = response_time
+    }
+
+    a=""
+    for (i=12;i<=NF;i++) {
+        a=a " " $i
+    }
+    ua = substr(a,3,length(a)-3)
+    acc["ua"][ua] += 1
+    IGNORECASE = 1
+    if (match(ua, /bot|google|crawler|spider|robot|crawling|wget|http|slurp|analyzer|sitecon|@/) || ua == "-") {
+        acc["bots"][$8] += 1
+    }
+
+    code = $8+0
+    uri = $6
+    gsub("\\?.*", "", uri)
+    code_uri = sprintf("%03d %s", code, uri)
+    acc["reqs"][code_uri] += 1
+    acc["uri_time"][code_uri] += response_time
+
+    if (code >= 400) {
+        acc["errors"][code_uri] += 1
+    }
+}
+END {
+    print_acc(first_timestamp, acc)
+}
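Note (PATCH 1/8): the script is gawk-specific — asort() and the result[ai][1] multidimensional arrays are GNU awk extensions, so the `#!/usr/bin/env awk -f` shebang assumes awk resolves to gawk 4+. top_array() gets a top-15 without a numeric sort: each count is formatted as a zero-padded prefix ("%05d %s"), the strings are sorted lexically, and the last N entries are kept (counts above 99999 would overflow the fixed width and break the ordering). A minimal sketch of the same idea, in Python for illustration only:

    import heapq
    from collections import Counter

    def top_n(counts, n=15):
        # counts: mapping of key -> count, e.g. user agent -> hits.
        # Equivalent to the zero-pad-and-string-sort trick above, without
        # the fixed-width limit on counts.
        return heapq.nlargest(n, counts.items(), key=lambda kv: kv[1])

    hits = Counter({"Mozilla/5.0": 120, "curl/7.47": 3, "Googlebot": 48})
    for ua, count in top_n(hits, 2):
        print('ns=fe.access.ua count=%d ua="%s"' % (count, ua))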
gsub(".*article-[0-9]*/.*", "articles", url) || + gsub(".*video-[0-9]*/.*mobile=true.*", "mobile/video", url) || + gsub(".*video-[0-9]*/.*", "video", url) || + gsub(".*columnist-[0-9]*/.*mobile=true.*", "mobile/columnist", url) || + gsub(".*columnist-[0-9]*/.*", "columnist", url) || + gsub(".*/(home|homeus|homeau)/index.html.*mobile=true.*", "mobile/home", url) || + gsub(".*/(home|homeus|homeau)/index.html.*", "home", url) || + gsub(".*index.html.*mobile=true.*", "mobile/channels", url) || + gsub(".*index.html.*", "channels", url) || + gsub(".*rss.*", "rss", url) || + gsub(".*registration.*", "registration", url) || + gsub(".*meta.*", "meta", url) || + gsub(".*/geolocation/.*", "esi calls", url) || + gsub(".*/mobile/.*", "mobile feed", url) || + gsub(".*/api/.*", "api", url) || + gsub(".*/home/search.html.*", "search", url) || + gsub(".*/home/sitemap.*.html.*", "sitemap/html", url) || + gsub(".*sitemap.*.xml.*", "sitemap/xml", url) || + gsub(".*embed/video/.*", "embedded video", url) || + gsub(".*videoarchive.*", "video archive", url) || + gsub(".*c.*/item.cms.*", "cms items", url) || + gsub(".*/.*.html.*", "topic pages", url) || + gsub("^/$", "home", url) || + gsub("^/.*$", "others", url)) + {} + + acc["code"][$8] +=1 + + + response_time = $10+0 + acc["count"][url] += 1 + acc["size"][url] += $9 + acc["times"][url] += response_time + if (length(acc["min"][url]) == 0 || acc["min"][url] > response_time) { + acc["min"][url] = response_time + } + if (acc["max"][url] < response_time) { + acc["max"][url] = response_time + } + + a="" + for (i=12;i<=NF;i++) { + a=a " " $i + } + ua = substr(a,3,length(a)-3) + acc["ua"][ua] += 1 + IGNORECASE = 1 + if (match(ua, /bot|google|crawler|spider|robot|crawling|wget|http|slurp|analyzer|sitecon|@/) || ua == "-") { + acc["bots"][$8] += 1 + } + + code = $8+0 + uri = $6 + gsub("\\?.*", "", uri) + code_uri = sprintf("%03d %s", code, uri) + acc["reqs"][code_uri] += 1 + acc["uri_time"][code_uri] += response_time + + if (code >= 400) { + acc["errors"][code_uri] += 1 + } + + + + +} +END { + print_acc(first_timestamp, acc) + +} From dd4f54d57d87c0e3e0950066eab8091401166b94 Mon Sep 17 00:00:00 2001 From: Tom Burnell Date: Fri, 3 Feb 2017 12:18:42 +0000 Subject: [PATCH 2/8] Exclude files with matching sigs --- sender | 29 +++++++++++++++-------------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/sender b/sender index f1790f7..fe4b173 100755 --- a/sender +++ b/sender @@ -376,17 +376,10 @@ class Tail(object): finally: f.close() - def processFile(self, f, fn, existing): + def processFile(self, f, fn, sig): debug("processFile: %s" % fn) - sig = self.generateSignature(f) - if not sig: - return - - if sig in existing and os.path.getsize(fn) != os.path.getsize(existing[sig]): - log("WARN Files '%s' and '%s' have same signature and different sizes" % (fn, existing[sig])) info = self.offsets.get(sig, Info(sig=sig, name=fn)) - lastOffset = info.offset info.name = fn @@ -408,7 +401,6 @@ class Tail(object): self.copy(f) info.offset = f.tell() - existing[sig] = fn if lastOffset != info.offset: self.offsets[sig] = info @@ -422,6 +414,7 @@ class Tail(object): if not filter_stopped.empty(): sys.exit(1) existing = {} + for fnpattern in self.fnpatterns: for fn in glob.glob(fnpattern): try: @@ -430,11 +423,19 @@ class Tail(object): continue if os.path.getsize(fn) < self.signatureLength and not self.isCompressed(fn): continue - self.processFileByName(fn, existing) - except Exception, e: - log("Exception: %s" % e) - exc_type, exc_value, exc_traceback = sys.exc_info() - 
From 59be27ecd25fd2c2a08c38aa67816c98988b8c84 Mon Sep 17 00:00:00 2001
From: Tom Burnell
Date: Fri, 3 Feb 2017 16:08:23 +0000
Subject: [PATCH 3/8] Ignore files when duplicate sigs detected. Log Metrics
 for errors, duplicates and truncations

---
 sender | 63 +++++++++++++++++++++++++++++++++++++++++++++-----------
 1 file changed, 51 insertions(+), 12 deletions(-)

diff --git a/sender b/sender
index fe4b173..48f0dc2 100755
--- a/sender
+++ b/sender
@@ -17,12 +17,14 @@ import threading
 import Queue
 import signal
 import traceback
+import json
 
 DEFAULT_SIGNAGURE_LENGTH=256
 
 filter_stopped = Queue.Queue()
 stop_signal = Queue.Queue()
 running_lock = threading.Lock()
+debug_on = False
 
 def should_stop():
     return not stop_signal.empty()
@@ -36,6 +38,7 @@ def main():
             "IMPORTANT: only files with size >= signature-length (default %d) bytes will be processed. "
             "Zip files will be open recursively, and only once." % DEFAULT_SIGNAGURE_LENGTH)
 
+    parser.add_option('-D', '--debug', action="store_true", dest="debug", help="Enable debug output", default=False)
    parser.add_option('-f', '--follow', action="store_true", dest="follow", help="Pools the file very second for changes in an infinite loop.", default=False)
    parser.add_option('-p', '--offsets', action="store", dest="offsets", help="File to persist the last offsets read for each file. If it doesn't exist, the files are read from beginning.", default='/dev/null')
    parser.add_option('-t', '--tcp', action="store", dest="host", help="Sends the output to the host:port via TCP.", metavar="HOST:PORT")
@@ -49,6 +52,11 @@ def main():
 
     options, args = parser.parse_args()
 
+    if options.debug:
+        log("DEBUG ON")
+        global debug_on
+        debug_on = True
+
     if options.dump_pid:
         f = open(options.dump_pid,'w')
         f.write("%d" % os.getpid())
@@ -153,6 +161,8 @@ def log(msg):
     sys.stderr.flush()
 
 def debug(msg):
+    if debug_on:
+        log("DEBUG: "+ msg);
 #    sys.stderr.write(str(msg))
 #    sys.stderr.write("\n")
 #    sys.stderr.flush()
@@ -265,6 +275,7 @@ class Tail(object):
         self.start_from_tail = options.start_from_tail
         self.readOffsets()
 
+
     def persistOffsets(self):
         if self.offsetsfn == '/dev/null':
             return
@@ -319,6 +330,7 @@ class Tail(object):
         if should_stop():
             return
         for line in f:
+            debug("line %s" % line.replace("\n",""))
             if not self.starting or line >= self.starting:
                 self.output.write(line)
             if should_stop():
@@ -375,13 +387,13 @@ class Tail(object):
                 self.processFile(f, fn, existing)
             finally:
                 f.close()
+                debug("processFileByName, close file");
 
     def processFile(self, f, fn, sig):
-        debug("processFile: %s" % fn)
-
         info = self.offsets.get(sig, Info(sig=sig, name=fn))
+        debug("processFile %s %s" % (fn, info.dump()))
 
-        lastOffset = info.offset
+        lastOffset = info.offset
         info.name = fn
 
         if self.isCompressed(fn):
@@ -392,14 +404,17 @@ class Tail(object):
                 info.offset = -1
         else:
             if self.start_from_tail:
+                debug("starting from tail %s" % fn)
                 info.offsets = os.path.getsize(fn)
             if os.path.exists(fn) and os.path.getsize(fn) < info.offset:
-                log("WARN file %s was truncated" % fn)
+                log("METRIC ns=forwarder.truncated file=%s filesize=%s offset=%s" % (fn, os.path.getsize(fn), info.offset))
                 info.offset = os.path.getsize(fn)
             else:
+                debug("Seeking to %s in %s, currently at %s" % (info.offset, fn, f.tell()))
                 f.seek(info.offset)
             self.copy(f)
             info.offset = f.tell()
+            debug("Setting offset for: %s to %s (info: %s)" % (fn, info.offset, info.dump()))
 
         if lastOffset != info.offset:
             self.offsets[sig] = info
@@ -414,28 +429,52 @@ class Tail(object):
             if not filter_stopped.empty():
                 sys.exit(1)
             existing = {}
-
+            filehandles = {}
             for fnpattern in self.fnpatterns:
                 for fn in glob.glob(fnpattern):
+                    debug("Checking fn %s" % fn)
                     try:
                         if not os.path.isfile(fn):
                             log("File no longer exists: %s" % fn)
                             continue
                         if os.path.getsize(fn) < self.signatureLength and not self.isCompressed(fn):
+                            log("Skipping as sig too short or compressed: %s" % fn)
                             continue
+                        f = open(fn, 'rb')
                         sig = self.generateSignature(f)
-                        if existing[sig]:
-                            log("Multiple files with same sig exist, excluding both: %s" % fn)
-                            delete file_names[sig]
-                        else
+                        debug("Sig for fn %s is %s" % (fn, sig))
+                        if sig in existing:
+                            log("METRIC ns=forwarder.duplicatesig file=%s sig=%s" % (fn, sig))
+                            del existing[sig]
+                            f.close();
+                        else:
+                            debug("Adding file %s %s" % (fn, sig))
                             existing[sig] = fn
+                            filehandles[sig] = f
+
+                    except Exception, e:
+                        log("METRIC ns=forwarder.error.preprocess file=\"%s\" exception=\"%s\"" % (fn, str(e).replace("\n", "")))
+                        log("Exception=\"%s\"" % e)
+                        f.close();
+                        exc_type, exc_value, exc_traceback = sys.exc_info()
+                        traceback.print_tb(exc_traceback)
 
-                        for fn in existing.values():
-                            self.processFileByName(fn, sig)
+            sigs = existing.keys()
+            for sig in sigs:
+                try:
+                    fn = existing[sig]
+                    debug("Processing file %s %s" % (fn, sig))
+                    f = filehandles[sig]
+                    self.processFile(f, fn, sig)
                 except Exception, e:
-                    log("Exception: %s" % e)
+                    log("METRIC ns=forwarder.error.process file=\"%s\" exception=\"%s\"" % (fn, str(e).replace("\n", "")))
+                    log("Exception=\"%s\"" % e)
                     exc_type, exc_value, exc_traceback = sys.exc_info()
                     traceback.print_tb(exc_traceback)
+                    f.close();
+
+                finally:
+                    f.close();
 
 
             if len(existing) != len(self.offsets):
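Note (PATCH 3/8): the WARN-style messages become METRIC lines with a fixed `METRIC ns=<namespace> key=value` shape, so truncations, duplicate signatures, and per-file errors can be counted by whatever scrapes stderr. A tiny helper in that shape (illustrative only; the patch inlines the log() calls instead):

    import sys

    def metric(ns, **fields):
        # Emit one "METRIC ns=... key=value" line on stderr, mirroring
        # the convention the forwarder uses above.
        kv = " ".join('%s="%s"' % (k, v) for k, v in sorted(fields.items()))
        sys.stderr.write("METRIC ns=%s %s\n" % (ns, kv))

    metric("forwarder.duplicatesig", file="myapp.log.2", sig="0f3e2a1b")  # dummy values

One quirk worth noting: `import json` is added here but nothing in the visible hunks uses it (perhaps it supports info.dump(), which is not shown).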
From 4b5893ff80ffdba70522eca92a6caa85612a93ed Mon Sep 17 00:00:00 2001
From: Tom Burnell
Date: Fri, 3 Feb 2017 16:27:54 +0000
Subject: [PATCH 4/8] handle cases where there are more than 2 files with the
 same signature

---
 sender | 20 ++++++++++++++------
 1 file changed, 14 insertions(+), 6 deletions(-)

diff --git a/sender b/sender
index 48f0dc2..9e69d31 100755
--- a/sender
+++ b/sender
@@ -274,6 +274,7 @@ class Tail(object):
         self.starting = options.starting
         self.start_from_tail = options.start_from_tail
         self.readOffsets()
+        self.duplicates = {}
 
 
     def persistOffsets(self):
@@ -429,6 +430,7 @@ class Tail(object):
             if not filter_stopped.empty():
                 sys.exit(1)
             existing = {}
+            to_process = {}
             filehandles = {}
             for fnpattern in self.fnpatterns:
                 for fn in glob.glob(fnpattern):
@@ -443,13 +445,18 @@ class Tail(object):
                         f = open(fn, 'rb')
                         sig = self.generateSignature(f)
                         debug("Sig for fn %s is %s" % (fn, sig))
+                        sig_fn = sig+fn;
                         if sig in existing:
-                            log("METRIC ns=forwarder.duplicatesig file=%s sig=%s" % (fn, sig))
-                            del existing[sig]
+                            if not sig_fn in self.duplicates:
+                                log("METRIC ns=forwarder.duplicatesig file=%s dupe_of=%s sig=%s" % (fn, existing[sig], sig))
+                                self.duplicates[sig_fn] = True;
+                            if sig in to_process: #take original duplicate out of to_process
+                                del to_process[sig]
                             f.close();
                         else:
                             debug("Adding file %s %s" % (fn, sig))
-                            existing[sig] = fn
+                            existing[sig] = fn #leave in existing in case more than 2 duplicates
+                            to_process[sig] = fn
                             filehandles[sig] = f
 
                     except Exception, e:
@@ -459,10 +466,11 @@ class Tail(object):
                         exc_type, exc_value, exc_traceback = sys.exc_info()
                         traceback.print_tb(exc_traceback)
 
-            sigs = existing.keys()
+            debug("To Process %s" % to_process)
+            sigs = to_process.keys()
             for sig in sigs:
                 try:
-                    fn = existing[sig]
+                    fn = to_process[sig]
                     debug("Processing file %s %s" % (fn, sig))
                     f = filehandles[sig]
                     self.processFile(f, fn, sig)
@@ -478,7 +486,7 @@ class Tail(object):
 
 
             if len(existing) != len(self.offsets):
-                self.purgeOffsetsNotIn(existing)
+                self.purgeOffsetsNotIn(to_process)
             if not self.follow:
                 break
             time.sleep(1)
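Note (PATCH 4/8): the bookkeeping now distinguishes `existing` (every signature seen, kept so later duplicates are still detected) from `to_process` (signatures held by exactly one file). The first holder of a contested signature is evicted from `to_process`, so when N files share a signature none of them is forwarded and each file after the first logs one duplicatesig metric. A self-contained restatement of that logic (names are illustrative, not the sender's):

    def partition(files):
        # files: (filename, sig) pairs in glob order.
        existing, to_process = {}, {}
        for fn, sig in files:
            if sig in existing:
                # Duplicate: report it, and evict the original if it is
                # still queued, so no contested file gets forwarded.
                print("METRIC ns=forwarder.duplicatesig file=%s dupe_of=%s sig=%s"
                      % (fn, existing[sig], sig))
                to_process.pop(sig, None)
            else:
                existing[sig] = fn
                to_process[sig] = fn
        return to_process

    # Four files, one sig: nothing left to process, three metric lines.
    partition([("a.log", "s1"), ("b.log", "s1"), ("c.log", "s1"), ("d.log", "s1")])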
exception=\"%s\"" % (fn, str(e).replace("\n", ""))) + log("Exception=\"%s\"" % e) exc_type, exc_value, exc_traceback = sys.exc_info() traceback.print_tb(exc_traceback) + f.close(); + + finally: + f.close(); if len(existing) != len(self.offsets): From 4b5893ff80ffdba70522eca92a6caa85612a93ed Mon Sep 17 00:00:00 2001 From: Tom Burnell Date: Fri, 3 Feb 2017 16:27:54 +0000 Subject: [PATCH 4/8] handle cases where there is more than 2 files with the same signature --- sender | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/sender b/sender index 48f0dc2..9e69d31 100755 --- a/sender +++ b/sender @@ -274,6 +274,7 @@ class Tail(object): self.starting = options.starting self.start_from_tail = options.start_from_tail self.readOffsets() + self.duplicates = {} def persistOffsets(self): @@ -429,6 +430,7 @@ class Tail(object): if not filter_stopped.empty(): sys.exit(1) existing = {} + to_process = {} filehandles = {} for fnpattern in self.fnpatterns: for fn in glob.glob(fnpattern): @@ -443,13 +445,18 @@ class Tail(object): f = open(fn, 'rb') sig = self.generateSignature(f) debug("Sig for fn %s is %s" % (fn, sig)) + sig_fn = sig+fn; if sig in existing: - log("METRIC ns=forwarder.duplicatesig file=%s sig=%s" % (fn, sig)) - del existing[sig] + if not sig_fn in self.duplicates: + log("METRIC ns=forwarder.duplicatesig file=%s dupe_of=%s sig=%s" % (fn, existing[sig], sig)) + self.duplicates[sig_fn] = True; + if sig in to_process: #take original duplicate out of to_process + del to_process[sig] f.close(); else: debug("Adding file %s %s" % (fn, sig)) - existing[sig] = fn + existing[sig] = fn #leave in existing in case more than 2 duplicates + to_process[sig] = fn filehandles[sig] = f except Exception, e: @@ -459,10 +466,11 @@ class Tail(object): exc_type, exc_value, exc_traceback = sys.exc_info() traceback.print_tb(exc_traceback) - sigs = existing.keys() + debug("To Process %s" % to_process) + sigs = to_process.keys() for sig in sigs: try: - fn = existing[sig] + fn = to_process[sig] debug("Processing file %s %s" % (fn, sig)) f = filehandles[sig] self.processFile(f, fn, sig) @@ -478,7 +486,7 @@ class Tail(object): if len(existing) != len(self.offsets): - self.purgeOffsetsNotIn(existing) + self.purgeOffsetsNotIn(to_process) if not self.follow: break time.sleep(1) From 768c2711a1786e7354490cfa6cb0bc20ccf94cb5 Mon Sep 17 00:00:00 2001 From: Tom Burnell Date: Fri, 3 Feb 2017 17:16:57 +0000 Subject: [PATCH 5/8] fix tests --- run_tests.sh | 3 +-- sender | 1 - tests/test-kill-int-forwarder.sh | 3 ++- tests/test-kill-term-forwarder.sh | 2 +- tests/test-signature-size.sh | 2 +- tests/test-tcp-raw.sh | 4 ++-- 6 files changed, 7 insertions(+), 8 deletions(-) diff --git a/run_tests.sh b/run_tests.sh index 2f51771..1233d2e 100755 --- a/run_tests.sh +++ b/run_tests.sh @@ -58,7 +58,7 @@ export PATH=$base:$PATH function run_tests() { current_dir=$(pwd) - for test in $(find tests -name 'test*.sh' | sort); do + for test in $(find tests -name 'test*.sh' | sort | grep "$ONLY"); do mkdir -p $workdir/$test cd $workdir/$test echo Running tests/$test ... 
From 6657cf6398ab3fe16072fd01414bed830d7d506f Mon Sep 17 00:00:00 2001
From: Tom Burnell
Date: Fri, 3 Feb 2017 17:23:07 +0000
Subject: [PATCH 6/8] add test for METRIC forwarder.duplicatesig

---
 tests/test-signature-size.sh | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/tests/test-signature-size.sh b/tests/test-signature-size.sh
index dba3bdd..17df4f0 100644
--- a/tests/test-signature-size.sh
+++ b/tests/test-signature-size.sh
@@ -3,3 +3,12 @@ echo "AAAAAAAAAAAAAAAAAA CCCCCCCCCCCCCCC" > myapp.log.1
 assert "sender -s 15 'myapp.*' | count_lines" 0
 
 assert "sender -s 30 'myapp.*' | count_lines" 2
+
+echo "AAAAAAAAAAAAAAAAAA DDDDDDDDDDDDDDD" > myapp.log.2
+echo "AAAAAAAAAAAAAAAAAA DDDDDDDDDDDDDDD" > myapp.log.3
+echo "AAAAAAAAAAAAAAAAAA DDDDDDDDDDDDDDD" > myapp.log.4
+echo "AAAAAAAAAAAAAAAAAA DDDDDDDDDDDDDDD" > myapp.log.5
+
+assert "sender -s 30 'myapp.*' | count_lines" 2
+
+assert "sender -s 30 'myapp.*' 2>&1 | grep forwarder.duplicatesig | count_lines" 3
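Note (PATCH 6/8): the expected numbers follow from the partition logic sketched after patch 4. myapp.log.2 through .5 share one signature, so the first one seen is registered and each of the other three logs exactly one duplicatesig metric (3 lines), while none of the four is forwarded; only myapp.log and myapp.log.1, whose signatures differ, still produce output (2 lines).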
From 80b0b1ea8135a0e1f66fc698bbcbe3c057367ac3 Mon Sep 17 00:00:00 2001
From: Tom Burnell
Date: Mon, 6 Feb 2017 10:06:10 +0000
Subject: [PATCH 7/8] only debug if debug_on

---
 sender | 30 +++++++++++++++---------------
 1 file changed, 15 insertions(+), 15 deletions(-)

diff --git a/sender b/sender
index db0b58e..eef1ffb 100755
--- a/sender
+++ b/sender
@@ -331,7 +331,7 @@ class Tail(object):
         if should_stop():
             return
         for line in f:
-            debug("line %s" % line.replace("\n",""))
+            if debug_on: debug("line %s" % line.replace("\n",""))
             if not self.starting or line >= self.starting:
                 self.output.write(line)
             if should_stop():
@@ -368,7 +368,7 @@ class Tail(object):
             self.processZipFile(f, fn)
 
     def processGzipFile(self, fn):
-        debug("gz: %s" % fn)
+        if debug_on: debug("gz: %s" % fn)
         f = gzip.open(fn)
         try:
             self.processFile(f, '/var/tmp/fake.log', {})
@@ -382,7 +382,7 @@ class Tail(object):
         shutil.rmtree(path)
 
     def processFileByName(self, fn, existing):
-        debug("processFileByName: %s" % fn)
+        if debug_on: debug("processFileByName: %s" % fn)
         f = open(fn, 'rb')
         try:
             self.processFile(f, fn, existing)
@@ -392,30 +392,30 @@ class Tail(object):
 
     def processFile(self, f, fn, sig):
         info = self.offsets.get(sig, Info(sig=sig, name=fn))
-        debug("processFile %s %s" % (fn, info.dump()))
+        if debug_on: debug("processFile %s %s" % (fn, info.dump()))
 
         lastOffset = info.offset
         info.name = fn
 
         if self.isCompressed(fn):
-            debug("compressed: %s" % fn)
+            if debug_on: debug("compressed: %s" % fn)
             if info.offset == 0:
                 if not self.start_from_tail:
                     self.processCompressedFile(f, fn)
                 info.offset = -1
         else:
             if self.start_from_tail:
-                debug("starting from tail %s" % fn)
+                if debug_on: debug("starting from tail %s" % fn)
                 info.offsets = os.path.getsize(fn)
             if os.path.exists(fn) and os.path.getsize(fn) < info.offset:
                 log("METRIC ns=forwarder.truncated file=%s filesize=%s offset=%s" % (fn, os.path.getsize(fn), info.offset))
                 info.offset = os.path.getsize(fn)
             else:
-                debug("Seeking to %s in %s, currently at %s" % (info.offset, fn, f.tell()))
+                if debug_on: debug("Seeking to %s in %s, currently at %s" % (info.offset, fn, f.tell()))
                 f.seek(info.offset)
             self.copy(f)
             info.offset = f.tell()
-            debug("Setting offset for: %s to %s (info: %s)" % (fn, info.offset, info.dump()))
+            if debug_on: debug("Setting offset for: %s to %s (info: %s)" % (fn, info.offset, info.dump()))
 
         if lastOffset != info.offset:
@@ -434,17 +434,17 @@ class Tail(object):
             filehandles = {}
             for fnpattern in self.fnpatterns:
                 for fn in glob.glob(fnpattern):
-                    debug("Checking fn %s" % fn)
+                    if debug_on: debug("Checking fn %s" % fn)
                     try:
                         if not os.path.isfile(fn):
                             log("File no longer exists: %s" % fn)
                             continue
                         if os.path.getsize(fn) < self.signatureLength and not self.isCompressed(fn):
-                            log("Skipping as sig too short or compressed: %s" % fn)
+                            log("Skipping as file too short to generate sig or file is compressed: %s" % fn)
                             continue
                         f = open(fn, 'rb')
                         sig = self.generateSignature(f)
-                        debug("Sig for fn %s is %s" % (fn, sig))
+                        if debug_on: debug("Sig for fn %s is %s" % (fn, sig))
                         sig_fn = sig+fn;
                         if sig in existing:
                             if not sig_fn in self.duplicates:
@@ -454,7 +454,7 @@ class Tail(object):
                                 del to_process[sig]
                             f.close();
                         else:
-                            debug("Adding file %s %s" % (fn, sig))
+                            if debug_on: debug("Adding file %s %s" % (fn, sig))
                             existing[sig] = fn #leave in existing in case more than 2 duplicates
                             to_process[sig] = fn
                             filehandles[sig] = f
@@ -465,13 +465,13 @@ class Tail(object):
                         f.close();
                         exc_type, exc_value, exc_traceback = sys.exc_info()
                         traceback.print_tb(exc_traceback)
-
-            debug("To Process %s" % to_process)
+
+            if debug_on: debug("To Process %s" % to_process)
             sigs = to_process.keys()
             for sig in sigs:
                 try:
                     fn = to_process[sig]
-                    debug("Processing file %s %s" % (fn, sig))
+                    if debug_on: debug("Processing file %s %s" % (fn, sig))
                     f = filehandles[sig]
                     self.processFile(f, fn, sig)
                 except Exception, e:
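Note (PATCH 7/8): gating every call site with `if debug_on:` avoids building the message string (and calling info.dump()) when debug is off — debug() already checks the flag, but only after its argument has been formatted. The stdlib logging module gets much of the same effect by deferring interpolation; a sketch of that alternative:

    import logging

    logging.basicConfig(level=logging.INFO)
    log = logging.getLogger("sender")

    # The %-interpolation happens only if the record is actually emitted,
    # so a disabled DEBUG level costs one level check, not a string build.
    log.debug("Sig for fn %s is %s", "myapp.log", "0f3e2a1b")

Expensive arguments (an info.dump() call, say) are still evaluated before logging sees them, so those call sites would need an explicit `if log.isEnabledFor(logging.DEBUG):` guard — which is essentially what this patch does by hand.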
fn %s" % fn) + if debug_on: debug("Checking fn %s" % fn) try: if not os.path.isfile(fn): log("File no longer exists: %s" % fn) continue if os.path.getsize(fn) < self.signatureLength and not self.isCompressed(fn): - log("Skipping as sig too short or compressed: %s" % fn) + log("Skipping as file too short to generate sig or file is compressed: %s" % fn) continue f = open(fn, 'rb') sig = self.generateSignature(f) - debug("Sig for fn %s is %s" % (fn, sig)) + if debug_on: debug("Sig for fn %s is %s" % (fn, sig)) sig_fn = sig+fn; if sig in existing: if not sig_fn in self.duplicates: @@ -454,7 +454,7 @@ class Tail(object): del to_process[sig] f.close(); else: - debug("Adding file %s %s" % (fn, sig)) + if debug_on: debug("Adding file %s %s" % (fn, sig)) existing[sig] = fn #leave in existing in case more than 2 duplicates to_process[sig] = fn filehandles[sig] = f @@ -465,13 +465,13 @@ class Tail(object): f.close(); exc_type, exc_value, exc_traceback = sys.exc_info() traceback.print_tb(exc_traceback) - - debug("To Process %s" % to_process) + + if debug_on: debug("To Process %s" % to_process) sigs = to_process.keys() for sig in sigs: try: fn = to_process[sig] - debug("Processing file %s %s" % (fn, sig)) + if debug_on: debug("Processing file %s %s" % (fn, sig)) f = filehandles[sig] self.processFile(f, fn, sig) except Exception, e: From ca3687fbe3fa05d310e5a75e75e435039c7f9e69 Mon Sep 17 00:00:00 2001 From: Tom Burnell Date: Mon, 6 Feb 2017 11:09:31 +0000 Subject: [PATCH 8/8] only f.close() if defined and not closed --- sender | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/sender b/sender index eef1ffb..c99bae1 100755 --- a/sender +++ b/sender @@ -435,6 +435,7 @@ class Tail(object): for fnpattern in self.fnpatterns: for fn in glob.glob(fnpattern): if debug_on: debug("Checking fn %s" % fn) + f = None try: if not os.path.isfile(fn): log("File no longer exists: %s" % fn) @@ -452,7 +453,7 @@ class Tail(object): self.duplicates[sig_fn] = True; if sig in to_process: #take original duplicate out of to_process del to_process[sig] - f.close(); + f.close() else: if debug_on: debug("Adding file %s %s" % (fn, sig)) existing[sig] = fn #leave in existing in case more than 2 duplicates @@ -462,13 +463,15 @@ class Tail(object): except Exception, e: log("METRIC ns=forwarder.error.preprocess file=\"%s\" exception=\"%s\"" % (fn, str(e).replace("\n", ""))) log("Exception=\"%s\"" % e) - f.close(); + if f: f.close() exc_type, exc_value, exc_traceback = sys.exc_info() traceback.print_tb(exc_traceback) - + + if debug_on: debug("To Process %s" % to_process) sigs = to_process.keys() for sig in sigs: + f = None try: fn = to_process[sig] if debug_on: debug("Processing file %s %s" % (fn, sig)) @@ -479,10 +482,9 @@ class Tail(object): log("Exception=\"%s\"" % e) exc_type, exc_value, exc_traceback = sys.exc_info() traceback.print_tb(exc_traceback) - f.close(); - + f.close() finally: - f.close(); + if f and not f.closed: f.close() if len(existing) != len(self.offsets): self.purgeOffsetsNotIn(to_process)