diff --git a/README.md b/README.md
index 8f04476..1aecc89 100644
--- a/README.md
+++ b/README.md
@@ -1,5 +1,5 @@
 # Log2Array
-A command-line tool that automates the detection and extraction of log templates, leveraging regular expressions generated by GPT-4. Say goodbye to the stress of manually writing regex.
+A command-line tool that automates the detection and extraction of log templates, leveraging regular expressions generated by GPT-4. Escape the stress of writing regexes by hand.
 ## What it does?
 If you have several kinds of logs are mixed together:
diff --git a/log2array/__init__.py b/log2array/__init__.py
index a76b932..986f7ad 100644
--- a/log2array/__init__.py
+++ b/log2array/__init__.py
@@ -12,62 +12,69 @@ from .util import Log, Memory
 
 
-def sink(path: str, max_size=512) -> Generator[str | None, List[str] | None, None]:
-    files = glob.glob(path)
-    logging.info(f"detect log files: {files}")
-    buffer = []
-
-    for file in files:
-        file = open(path, "r")
-        with file as f:
-            for line in f:
-                if buffer is not None:
-                    while len(buffer) != 0:
-                        l = buffer.pop()
-                        yield l
-
-                b = yield line[:-1][:max_size]
-                if b is not None:
-                    buffer += b
-
-    b = yield None
-    if b is not None:
-        buffer += b
-    while len(buffer) != 0:
-        l = buffer.pop()
-        yield l
-
+class Sink:
+    def __init__(self, path: str, max_size: int = 512):
+        self.path = path
+        self.max_size = max_size
+        self.buffer = []
+
+    def __iter__(self):
+        files = glob.glob(self.path)
+        logging.info(f"detect log files: {files}")
+
+        def f():
+            for file in files:
+                with open(file, "r") as f:
+                    for line in f:
+                        yield line[:-1][: self.max_size]
+
+        def b():
+            while len(self.buffer) != 0:
+                line = self.buffer.pop()
+                yield line
+
+        file = f()
+
+        while True:
+            buf = b()
+            yield from buf
+            yield from file
+            yield None
+            if len(self.buffer) == 0:
+                break
 
-Memory().serialize(re.Pattern, lambda p: p.pattern)
+    def send(self, lines):
+        self.buffer += lines
 
-@Memory.remember
-def match(
-    memory: Memory,
-    log_stream: Generator[str | None, List[str] | None, None],
-) -> Generator[Log | str | None, re.Pattern | None, None]:
-    patterns = memory.load("patterns", [])
-    for offset in range(0, len(patterns)):
-        patterns[offset] = re.compile(patterns[offset])
+Memory().serialize(re.Pattern, lambda p: p.pattern)
 
-    for line in log_stream:
-        if line is None:
-            yield None
-            continue
 
-        pattern = None
+class Match:
+    def __init__(self, memory: Memory, sink: Sink):
+        self.sink = sink
+        self.patterns = memory.load("patterns", [])
+        for offset in range(0, len(self.patterns)):
+            self.patterns[offset] = re.compile(self.patterns[offset])
 
-        for regex in patterns:
-            match = regex.match(line)
-            if match is None:
+    def __iter__(self):
+        for line in self.sink:
+            if line is None:
+                yield None
                 continue
 
-            pattern = yield Log(regex.pattern, line, match.groups())
-            break
-        else:
-            pattern = yield line
 
-        if pattern is not None and pattern not in patterns:
-            patterns.append(pattern)
+            for regex in self.patterns:
+                match = regex.match(line)
+                if match is None:
+                    continue
 
+                yield Log(regex.pattern, line, match.groups())
+                break
+            else:
+                yield line
+
+    def send(self, pattern: re.Pattern):
+        # keep the dedup guard the old generator had, so repeated GPT answers
+        # don't grow the cached pattern list
+        if pattern not in self.patterns:
+            self.patterns.append(pattern)
 
 
 def collect(
diff --git a/log2array/__main__.py b/log2array/__main__.py
index ac28d25..7bfee5b 100644
--- a/log2array/__main__.py
+++ b/log2array/__main__.py
@@ -7,8 +7,8 @@ from IPython import embed
 from rich.logging import RichHandler
 
-from log2array import match, sink, collect
-from log2array.cluster import cluster
+from log2array import Match, Sink, collect
+from log2array.cluster import Cluster
 from log2array.extract import extract
 from log2array.util import Memory, parse_size
 
@@ -18,7 +18,9 @@ def main():
     pass
 
-@main.command()
+@main.command(
+    help="log2array caches all extracted patterns for each file by default; clean the cache as needed."
+)
 def clean():
     files = glob(f"{Memory.PATH}/*")
     for f in files:
@@ -28,7 +30,10 @@ def clean():
 @main.command()
 @click.argument("file")
 @click.option(
-    "--gpt-base", default=openai.api_base, help="OpenAI API base.", show_default=True
+    "--gpt-base",
+    default=openai.api_base,
+    help="OpenAI API base.",
+    show_default=True,
 )
 @click.option(
     "--max-lines",
@@ -65,14 +70,17 @@ def run(file, gpt_base, max_lines, buf_size, max_len, threshold):
         return
 
     logging.basicConfig(
-        level="INFO", format="%(message)s", datefmt="[%X]", handlers=[RichHandler()]
+        level="INFO",
+        format="%(message)s",
+        datefmt="[%X]",
+        handlers=[RichHandler()],
     )
 
     buf_size = parse_size(buf_size)
 
    with Memory().current(file):
-        f = sink(file, max_size=max_len)
-        m = match(f)
-        c = cluster(f, m, buf_size=buf_size, threshold=threshold)
+        f = Sink(file, max_size=max_len)
+        m = Match(Memory(), f)
+        c = Cluster(f, m, buf_size=buf_size, threshold=threshold)
         e = extract(
             c,
             m,
diff --git a/log2array/cluster.py b/log2array/cluster.py
index 0358ca8..b0a5be4 100644
--- a/log2array/cluster.py
+++ b/log2array/cluster.py
@@ -8,7 +8,7 @@ import numpy
 import torch
 
-from . import Log
+from . import Log, Match, Sink
 
 
 @contextlib.contextmanager
@@ -20,7 +20,7 @@ def pool(model):
     model.stop_multi_process_pool(pool)
 
 
-class Cluster:
+class Community:
     texts: List[str]
     embeddings: List[torch.Tensor]
 
@@ -73,70 +73,91 @@ def sample(community: torch.Tensor) -> List[int]:
     return [min[0], sort[0][0], min1[0]]
 
 
-def cluster(
-    log_stream: Generator[str | None, List[str] | None, None],
-    regex_match: Generator[Log | str | None, re.Pattern | None, None],
-    model="./all-MiniLM-L6-v2",
-    buf_size=8 * 1024 * 1024,
-    threshold=0.7,
-    min_community_size=3,
-) -> Generator[Log | List[str], None, None]:
-    import torch
-    from sentence_transformers import SentenceTransformer as Embedder
-
-    from .util import community_detection
-
-    model = Embedder(model)
-    # sample_clusters: List[Cluster] = []
-    buffer: List[str] = []
-    size = 0
-
-    with pool(model) as p:
-        for line in regex_match:
-            if line is None:
-                send, buffer = buffer, []
-                log_stream.send(send)
-                continue
-
-            if isinstance(line, Log):
-                yield line
-                continue
-
-            buffer.append(line)
-            size += len(line)
-            if size < buf_size:
-                continue
-
-            logging.info(
-                f"embedding {format(size / 1024, '.2f')}KB / {len(buffer)} logs, it might take a while."
-            )
-            embeddings = torch.from_numpy(model.encode_multi_process(buffer, p))
+class Cluster:
+    def __init__(
+        self,
+        sink: Sink,
+        match: Match,
+        model="./all-MiniLM-L6-v2",
+        buf_size=8 * 1024 * 1024,
+        threshold=0.7,
+        min_community_size=3,
+    ):
+        from sentence_transformers import SentenceTransformer as Embedder
+
+        self.buf_size = buf_size
+        self.threshold = threshold
+        self.min_community_size = min_community_size
+        self.sink = sink
+        self.match = match
+        self.model = Embedder(model)
+        self.buffer = []
+        self.size = 0
 
-            logging.info("analyze log communities, it might take a while.")
-            clusters = community_detection(
-                embeddings,
-                min_community_size=min_community_size,
-                threshold=threshold,
-            )
-            logging.info(f"get {len(clusters)} log communities.")
-            for cluster in clusters:
-                vecs = [embeddings[i] for i in cluster]
-                ids = sample(torch.from_numpy(numpy.array(vecs)))
-                s0, s1, s2 = (
-                    cluster[ids[0]],
-                    cluster[ids[1]],
-                    cluster[ids[2]],
+    def __iter__(self):
+        import torch
+        from .util import community_detection
+
+        with pool(self.model) as p:
+            for line in self.match:
+                if isinstance(line, Log):
+                    yield line
+                    continue
+
+                if line is not None:
+                    self.buffer.append(line)
+                    self.size += len(line)
+
+                if self.size < self.buf_size and line is not None:
+                    continue
+
+                if self.size == 0:
+                    continue
+
+                logging.info(
+                    f"embedding {format(self.size / 1024, '.2f')}KB / {len(self.buffer)} logs, it might take a while."
+                )
+                embeddings = torch.from_numpy(
+                    self.model.encode_multi_process(self.buffer, p)
                 )
-                samples = Cluster.from_list(
-                    [buffer[s0], buffer[s1], buffer[s2]],
-                    [embeddings[s0], embeddings[s1], embeddings[s2]],
+                logging.info("analyze log communities, it might take a while.")
+                clusters = community_detection(
+                    embeddings,
+                    min_community_size=self.min_community_size,
+                    threshold=self.threshold,
                 )
-                # sample_clusters.append(samples)
+                logging.info(f"get {len(clusters)} log communities.")
+
+                if len(clusters) > 0:
+                    yield from self._sample(clusters, embeddings)
+                else:
+                    logging.warning(
+                        "no cluster detected; consider decreasing the threshold."
+                    )
+
+                self._recycle()
+
+    def _sample(self, clusters, embeddings):
+        for cluster, _ in zip(clusters, range(0, 3)):
+            vecs = [embeddings[i] for i in cluster]
+            ids = sample(torch.from_numpy(numpy.array(vecs)))
+            s0, s1, s2 = (
+                cluster[ids[0]],
+                cluster[ids[1]],
+                cluster[ids[2]],
+            )
+
+            samples = Community.from_list(
+                [self.buffer[s0], self.buffer[s1], self.buffer[s2]],
+                [embeddings[s0], embeddings[s1], embeddings[s2]],
+            )
+            # sample_clusters.append(samples)
 
-                logging.info(f"yield samples {samples.texts}")
-                yield samples.texts
+            logging.info(f"yield samples {samples.texts}")
+            yield samples.texts
 
-            send, buffer = buffer, []
-            size = 0
-            log_stream.send(send)
+    def _recycle(self):
+        send, self.buffer = self.buffer, []
+        self.size = 0
+        self.sink.send(send)
diff --git a/log2array/extract.py b/log2array/extract.py
index a0f4845..834e754 100644
--- a/log2array/extract.py
+++ b/log2array/extract.py
@@ -7,12 +7,14 @@ import openai
 
-from log2array.util import Log
+from . import Match
+from .cluster import Cluster
+from .util import Log
 
 
 def extract(
-    log_cluster: Generator[Log | List[str], None, None],
-    regex_match: Generator[Log | str | None, re.Pattern | None, None],
+    cluster: Cluster,
+    match: Match,
     api_base: str = openai.api_base,
     model: str = "gpt-4",
     top_p: float = 0.1,
@@ -50,8 +52,9 @@ def get_messages(samples):
         {
             "role": "system",
             "content": "You are a senior regular expression developer."
- "Create a regular expression which could match, group and extract the log template of several logs below." - "Take care of character escaping.", + "Create a regular expression which could match, group and extract the log template of several logs below, each line is a single log." + "Each group prefer use `(.*?)`." + "pattern should start with `^` and end with `$`", }, { "role": "user", @@ -59,7 +62,7 @@ def get_messages(samples): }, ] - for message in log_cluster: + for message in cluster: if isinstance(message, Log): yield message continue @@ -80,10 +83,12 @@ def get_messages(samples): logging.info( f"extractd regex: {pattern.pattern}", ) - regex_match.send(pattern) + match.send(pattern) continue - except Exception: - logging.warning(f"faild to compile regex: {completion}, guessing...") + except Exception as e: + logging.warning( + f"faild to compile regex: {completion}, error: {e}, guessing..." + ) try: pattern = re.compile( @@ -92,6 +97,6 @@ def get_messages(samples): logging.info( f"guessing success: {pattern.pattern}", ) - regex_match.send(pattern) - except Exception: - logging.error("failed to compile regex: {}".format(completion)) + match.send(pattern) + except Exception as e: + logging.error(f"failed to compile regex: {completion}, error: {e}") diff --git a/log2array/util.py b/log2array/util.py index 870891a..e1a3cde 100644 --- a/log2array/util.py +++ b/log2array/util.py @@ -70,15 +70,6 @@ def current(self, file: str): f.truncate() f.write(json.dumps(all, cls=Serializer)) - @classmethod - def remember(cls, func): - memory = cls() - - def inner(*args, **kwargs): - return func(memory, *args, **kwargs) - - return inner - @classmethod def serialize(cls, type, serializer): Serializer().register(type, serializer) @@ -133,6 +124,8 @@ def community_detection( top_val_large, top_idx_large = cos_scores[i].topk( k=sort_max_size, largest=True ) + if min(2 * sort_max_size, len(embeddings)) <= sort_max_size: + break for idx, val in zip(top_idx_large.tolist(), top_val_large): if val < threshold: