diff --git a/run_tests.sh b/run_tests.sh index 2f51771..1233d2e 100755 --- a/run_tests.sh +++ b/run_tests.sh @@ -58,7 +58,7 @@ export PATH=$base:$PATH function run_tests() { current_dir=$(pwd) - for test in $(find tests -name 'test*.sh' | sort); do + for test in $(find tests -name 'test*.sh' | sort | grep "$ONLY"); do mkdir -p $workdir/$test cd $workdir/$test echo Running tests/$test ... @@ -70,4 +70,3 @@ function run_tests() { } run_tests - diff --git a/sender b/sender index f1790f7..c99bae1 100755 --- a/sender +++ b/sender @@ -17,12 +17,14 @@ import threading import Queue import signal import traceback +import json DEFAULT_SIGNAGURE_LENGTH=256 filter_stopped = Queue.Queue() stop_signal = Queue.Queue() running_lock = threading.Lock() +debug_on = False def should_stop(): return not stop_signal.empty() @@ -36,6 +38,7 @@ def main(): "IMPORTANT: only files with size >= signature-length (default %d) bytes will be processed. " "Zip files will be open recursively, and only once." % DEFAULT_SIGNAGURE_LENGTH) + parser.add_option('-D', '--debug', action="store_true", dest="debug", help="Enable debug output", default=False) parser.add_option('-f', '--follow', action="store_true", dest="follow", help="Pools the file very second for changes in an infinite loop.", default=False) parser.add_option('-p', '--offsets', action="store", dest="offsets", help="File to persist the last offsets read for each file. If it doesn't exist, the files are read from beginning.", default='/dev/null') parser.add_option('-t', '--tcp', action="store", dest="host", help="Sends the output to the host:port via TCP.", metavar="HOST:PORT") @@ -49,6 +52,11 @@ def main(): options, args = parser.parse_args() + if options.debug: + log("DEBUG ON") + global debug_on + debug_on = True + if options.dump_pid: f = open(options.dump_pid,'w') f.write("%d" % os.getpid()) @@ -153,6 +161,8 @@ def log(msg): sys.stderr.flush() def debug(msg): + if debug_on: + log("DEBUG: "+ msg); # sys.stderr.write(str(msg)) # sys.stderr.write("\n") # sys.stderr.flush() @@ -264,6 +274,8 @@ class Tail(object): self.starting = options.starting self.start_from_tail = options.start_from_tail self.readOffsets() + self.duplicates = {} + def persistOffsets(self): if self.offsetsfn == '/dev/null': @@ -319,6 +331,7 @@ class Tail(object): if should_stop(): return for line in f: + if debug_on: debug("line %s" % line.replace("\n","")) if not self.starting or line >= self.starting: self.output.write(line) if should_stop(): @@ -355,7 +368,7 @@ class Tail(object): self.processZipFile(f, fn) def processGzipFile(self, fn): - debug("gz: %s" % fn) + if debug_on: debug("gz: %s" % fn) f = gzip.open(fn) try: self.processFile(f, '/var/tmp/fake.log', {}) @@ -369,46 +382,41 @@ class Tail(object): shutil.rmtree(path) def processFileByName(self, fn, existing): - debug("processFileByName: %s" % fn) + if debug_on: debug("processFileByName: %s" % fn) f = open(fn, 'rb') try: self.processFile(f, fn, existing) finally: f.close() + debug("processFileByName, close file"); - def processFile(self, f, fn, existing): - debug("processFile: %s" % fn) - sig = self.generateSignature(f) - if not sig: - return - - if sig in existing and os.path.getsize(fn) != os.path.getsize(existing[sig]): - log("WARN Files '%s' and '%s' have same signature and different sizes" % (fn, existing[sig])) - + def processFile(self, f, fn, sig): info = self.offsets.get(sig, Info(sig=sig, name=fn)) + if debug_on: debug("processFile %s %s" % (fn, info.dump())) lastOffset = info.offset - info.name = fn if self.isCompressed(fn): - debug("compressed: %s" % fn) + if debug_on: debug("compressed: %s" % fn) if info.offset == 0: if not self.start_from_tail: self.processCompressedFile(f, fn) info.offset = -1 else: if self.start_from_tail: + if debug_on: debug("starting from tail %s" % fn) info.offsets = os.path.getsize(fn) if os.path.exists(fn) and os.path.getsize(fn) < info.offset: - log("WARN file %s was truncated" % fn) + log("METRIC ns=forwarder.truncated file=%s filesize=%s offset=%s" % (fn, os.path.getsize(fn), info.offset)) info.offset = os.path.getsize(fn) else: + if debug_on: debug("Seeking to %s in %s, currently at %s" % (info.offset, fn, f.tell())) f.seek(info.offset) self.copy(f) info.offset = f.tell() + if debug_on: debug("Setting offset for: %s to %s (info: %s)" % (fn, info.offset, info.dump())) - existing[sig] = fn if lastOffset != info.offset: self.offsets[sig] = info @@ -422,23 +430,64 @@ class Tail(object): if not filter_stopped.empty(): sys.exit(1) existing = {} + to_process = {} + filehandles = {} for fnpattern in self.fnpatterns: for fn in glob.glob(fnpattern): + if debug_on: debug("Checking fn %s" % fn) + f = None try: if not os.path.isfile(fn): log("File no longer exists: %s" % fn) continue if os.path.getsize(fn) < self.signatureLength and not self.isCompressed(fn): + log("Skipping as file too short to generate sig or file is compressed: %s" % fn) continue - self.processFileByName(fn, existing) + f = open(fn, 'rb') + sig = self.generateSignature(f) + if debug_on: debug("Sig for fn %s is %s" % (fn, sig)) + sig_fn = sig+fn; + if sig in existing: + if not sig_fn in self.duplicates: + log("METRIC ns=forwarder.duplicatesig file=%s dupe_of=%s sig=%s" % (fn, existing[sig], sig)) + self.duplicates[sig_fn] = True; + if sig in to_process: #take original duplicate out of to_process + del to_process[sig] + f.close() + else: + if debug_on: debug("Adding file %s %s" % (fn, sig)) + existing[sig] = fn #leave in existing in case more than 2 duplicates + to_process[sig] = fn + filehandles[sig] = f + except Exception, e: - log("Exception: %s" % e) + log("METRIC ns=forwarder.error.preprocess file=\"%s\" exception=\"%s\"" % (fn, str(e).replace("\n", ""))) + log("Exception=\"%s\"" % e) + if f: f.close() exc_type, exc_value, exc_traceback = sys.exc_info() traceback.print_tb(exc_traceback) + if debug_on: debug("To Process %s" % to_process) + sigs = to_process.keys() + for sig in sigs: + f = None + try: + fn = to_process[sig] + if debug_on: debug("Processing file %s %s" % (fn, sig)) + f = filehandles[sig] + self.processFile(f, fn, sig) + except Exception, e: + log("METRIC ns=forwarder.error.process file=\"%s\" exception=\"%s\"" % (fn, str(e).replace("\n", ""))) + log("Exception=\"%s\"" % e) + exc_type, exc_value, exc_traceback = sys.exc_info() + traceback.print_tb(exc_traceback) + f.close() + finally: + if f and not f.closed: f.close() + if len(existing) != len(self.offsets): - self.purgeOffsetsNotIn(existing) + self.purgeOffsetsNotIn(to_process) if not self.follow: break time.sleep(1) diff --git a/tests/test-kill-int-forwarder.sh b/tests/test-kill-int-forwarder.sh index 87ae499..ae78c3c 100755 --- a/tests/test-kill-int-forwarder.sh +++ b/tests/test-kill-int-forwarder.sh @@ -5,10 +5,11 @@ uuid=$(last_uuid) sender -f 'myapp.*' -l "grep --line-buffered -v ${uuid}" > /dev/null & sender_pid=$! -sleep 0.1 # to avoid race condition +sleep 1 # to avoid race condition assert "ps ax | grep ${uuid} | grep -v sender | grep -v filter_wrapper | grep line-buffered | count_lines" 1 + kill -SIGINT $sender_pid wait $sender_pid diff --git a/tests/test-kill-term-forwarder.sh b/tests/test-kill-term-forwarder.sh index 1b6b512..4572bae 100755 --- a/tests/test-kill-term-forwarder.sh +++ b/tests/test-kill-term-forwarder.sh @@ -5,7 +5,7 @@ uuid=$(last_uuid) sender -f 'myapp.*' -l "grep --line-buffered -v ${uuid}" > /dev/null & sender_pid=$! -sleep 0.1 # to avoid race condition +sleep 1 # to avoid race condition assert "ps ax | grep ${uuid} | grep -v sender | grep -v filter_wrapper | grep line-buffered | count_lines" 1 diff --git a/tests/test-signature-size.sh b/tests/test-signature-size.sh index 97dce52..17df4f0 100644 --- a/tests/test-signature-size.sh +++ b/tests/test-signature-size.sh @@ -1,5 +1,14 @@ echo "AAAAAAAAAAAAAAAAAA BBBBBBBBBBBBBBB" > myapp.log echo "AAAAAAAAAAAAAAAAAA CCCCCCCCCCCCCCC" > myapp.log.1 -assert "sender -s 15 'myapp.*' | count_lines" 1 +assert "sender -s 15 'myapp.*' | count_lines" 0 assert "sender -s 30 'myapp.*' | count_lines" 2 + +echo "AAAAAAAAAAAAAAAAAA DDDDDDDDDDDDDDD" > myapp.log.2 +echo "AAAAAAAAAAAAAAAAAA DDDDDDDDDDDDDDD" > myapp.log.3 +echo "AAAAAAAAAAAAAAAAAA DDDDDDDDDDDDDDD" > myapp.log.4 +echo "AAAAAAAAAAAAAAAAAA DDDDDDDDDDDDDDD" > myapp.log.5 + +assert "sender -s 30 'myapp.*' | count_lines" 2 + +assert "sender -s 30 'myapp.*' 2>&1 | grep forwarder.duplicatesig | count_lines" 3 diff --git a/tests/test-tcp-raw.sh b/tests/test-tcp-raw.sh index 6c0feb4..03b966c 100644 --- a/tests/test-tcp-raw.sh +++ b/tests/test-tcp-raw.sh @@ -1,10 +1,10 @@ for i in {1..10}; do log_random >> myapp.log; done -nc -l -p 19501 2>/dev/null > received.log || nc -l 19501 > received.log 2>/dev/null & +nc -l -p 19501 2>/dev/null > received.log || nc -l 19501 > received.log 2>/dev/null & server_pid=$! -sleep 0.1 +sleep 1 sender 'myapp.*' -t localhost:19501