
Commit

fix bugs when logs have many kinds of clusters
ethe committed Oct 5, 2023
1 parent 22f6e0f commit 29963b4
Showing 6 changed files with 174 additions and 141 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -1,5 +1,5 @@
 # Log2Array
-A command-line tool that automates the detection and extraction of log templates, leveraging regular expressions generated by GPT-4. Say goodbye to the stress of manually writing regex.
+A command-line tool that automates the detection and extraction of log templates, leveraging regular expressions generated by GPT-4. Escape from the stress of manually writing regex.
 
 ## What it does?
 If several kinds of logs are mixed together:
102 changes: 54 additions & 48 deletions log2array/__init__.py
@@ -12,62 +12,68 @@
 from .util import Log, Memory
 
 
-def sink(path: str, max_size=512) -> Generator[str | None, List[str] | None, None]:
-    files = glob.glob(path)
-    logging.info(f"detect log files: {files}")
-    buffer = []
-
-    for file in files:
-        file = open(path, "r")
-        with file as f:
-            for line in f:
-                if buffer is not None:
-                    while len(buffer) != 0:
-                        l = buffer.pop()
-                        yield l
-
-                b = yield line[:-1][:max_size]
-                if b is not None:
-                    buffer += b
-
-    b = yield None
-    if b is not None:
-        buffer += b
-        while len(buffer) != 0:
-            l = buffer.pop()
-            yield l
+class Sink:
+    def __init__(self, path: str, max_size: int = 512):
+        self.path = path
+        self.max_size = max_size
+        self.buffer = []
+
+    def __iter__(self):
+        files = glob.glob(self.path)
+        logging.info(f"detect log files: {files}")
+
+        def f():
+            for file in files:
+                with open(file, "r") as f:
+                    for line in f:
+                        yield line[:-1][: self.max_size]
+
+        def b():
+            while len(self.buffer) != 0:
+                line = self.buffer.pop()
+                yield line
+
+        file = f()
+
+        while True:
+            buf = b()
+            yield from buf
+            yield from file
+            yield None
+            if len(self.buffer) == 0:
+                break
+
+    def send(self, lines):
+        self.buffer += lines
 
 
 Memory().serialize(re.Pattern, lambda p: p.pattern)
 
 
-@Memory.remember
-def match(
-    memory: Memory,
-    log_stream: Generator[str | None, List[str] | None, None],
-) -> Generator[Log | str | None, re.Pattern | None, None]:
-    patterns = memory.load("patterns", [])
-    for offset in range(0, len(patterns)):
-        patterns[offset] = re.compile(patterns[offset])
-
-    for line in log_stream:
-        if line is None:
-            yield None
-            continue
-
-        pattern = None
-
-        for regex in patterns:
-            match = regex.match(line)
-            if match is None:
-                continue
-            pattern = yield Log(regex.pattern, line, match.groups())
-            break
-        else:
-            pattern = yield line
-
-        if pattern is not None and pattern not in patterns:
-            patterns.append(pattern)
+class Match:
+    def __init__(self, memory: Memory, sink: Sink):
+        self.sink = sink
+        self.patterns = memory.load("patterns", [])
+        for offset in range(0, len(self.patterns)):
+            self.patterns[offset] = re.compile(self.patterns[offset])
+
+    def __iter__(self):
+        for line in self.sink:
+            if line is None:
+                yield None
+                continue
+
+            for regex in self.patterns:
+                match = regex.match(line)
+                if match is None:
+                    continue
+                yield Log(regex.pattern, line, match.groups())
+                break
+            else:
+                yield line
+
+    def send(self, pattern: re.Pattern):
+        self.patterns.append(pattern)
 
 
 def collect(
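
The refactor above replaces the coroutine-style sink/match generators, which received feedback through generator .send(), with plain iterable classes that expose an explicit send() method. A minimal sketch of how the new classes compose; the log path and the regex are hypothetical, and the Memory usage mirrors run() in __main__.py below:

    import re

    from log2array import Match, Sink
    from log2array.util import Memory

    path = "app.log"  # hypothetical log file
    with Memory().current(path):
        sink = Sink(path, max_size=512)
        match = Match(Memory(), sink)

        for item in match:
            if item is None:
                continue  # end-of-pass marker emitted by the Sink
            if isinstance(item, str):
                # Unmatched raw line: in the real pipeline GPT-4 proposes a
                # regex, registered through Match.send() so that later lines
                # yield structured Log records instead (a real run would add
                # each pattern only once).
                match.send(re.compile(r"^\S+ (GET|POST) \S+$"))  # illustrative
            else:
                print(item)  # a Log(pattern, line, groups) record
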
24 changes: 16 additions & 8 deletions log2array/__main__.py
@@ -7,8 +7,8 @@
 from IPython import embed
 from rich.logging import RichHandler
 
-from log2array import match, sink, collect
-from log2array.cluster import cluster
+from log2array import Match, Sink, collect
+from log2array.cluster import Cluster
 from log2array.extract import extract
 from log2array.util import Memory, parse_size
 
@@ -18,7 +18,9 @@ def main():
     pass
 
 
-@main.command()
+@main.command(
+    help="log2array caches all extracted patterns per file by default; clean the cache as needed."
+)
 def clean():
     files = glob(f"{Memory.PATH}/*")
     for f in files:
@@ -28,7 +30,10 @@ def clean():
 @main.command()
 @click.argument("file")
 @click.option(
-    "--gpt-base", default=openai.api_base, help="OpenAI API base.", show_default=True
+    "--gpt-base",
+    default=openai.api_base,
+    help="OpenAI API base.",
+    show_default=True,
 )
 @click.option(
     "--max-lines",
@@ -65,14 +70,17 @@ def run(file, gpt_base, max_lines, buf_size, max_len, threshold):
         return
 
     logging.basicConfig(
-        level="INFO", format="%(message)s", datefmt="[%X]", handlers=[RichHandler()]
+        level="INFO",
+        format="%(message)s",
+        datefmt="[%X]",
+        handlers=[RichHandler()],
    )
    buf_size = parse_size(buf_size)
 
     with Memory().current(file):
-        f = sink(file, max_size=max_len)
-        m = match(f)
-        c = cluster(f, m, buf_size=buf_size, threshold=threshold)
+        f = Sink(file, max_size=max_len)
+        m = Match(Memory(), f)
+        c = Cluster(f, m, buf_size=buf_size, threshold=threshold)
         e = extract(
             c,
             m,
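
The new wiring in run() hands each stage a reference to the stage it feeds back into: Match reads from the Sink, and Cluster receives both the Sink (to recycle buffered lines) and the Match (to consume its output). A condensed sketch of that data flow with illustrative values in place of the CLI options, assuming the default local all-MiniLM-L6-v2 model directory exists and leaving out the extract stage:

    from log2array import Match, Sink
    from log2array.cluster import Cluster
    from log2array.util import Memory

    file = "app.log"  # illustrative; normally the CLI argument
    with Memory().current(file):
        f = Sink(file, max_size=256)   # raw lines, plus any re-queued ones
        m = Match(Memory(), f)         # yields Log records or unmatched strings
        c = Cluster(f, m, buf_size=8 * 1024 * 1024, threshold=0.7)

        # Cluster recycles its buffer into the Sink after every pass, so lines
        # that gain a pattern via Match.send() can match on the next pass.
        for item in c:
            print(item)  # Log records, or sampled texts destined for GPT-4
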
147 changes: 84 additions & 63 deletions log2array/cluster.py
@@ -8,7 +8,7 @@
 import numpy
 import torch
 
-from . import Log
+from . import Log, Match, Sink
 
 
 @contextlib.contextmanager
@@ -20,7 +20,7 @@ def pool(model):
     model.stop_multi_process_pool(pool)
 
 
-class Cluster:
+class Community:
     texts: List[str]
     embeddings: List[torch.Tensor]
 
@@ -73,70 +73,91 @@ def sample(community: torch.Tensor) -> List[int]:
     return [min[0], sort[0][0], min1[0]]
 
 
-def cluster(
-    log_stream: Generator[str | None, List[str] | None, None],
-    regex_match: Generator[Log | str | None, re.Pattern | None, None],
-    model="./all-MiniLM-L6-v2",
-    buf_size=8 * 1024 * 1024,
-    threshold=0.7,
-    min_community_size=3,
-) -> Generator[Log | List[str], None, None]:
-    import torch
-    from sentence_transformers import SentenceTransformer as Embedder
-
-    from .util import community_detection
-
-    model = Embedder(model)
-    # sample_clusters: List[Cluster] = []
-    buffer: List[str] = []
-    size = 0
-
-    with pool(model) as p:
-        for line in regex_match:
-            if line is None:
-                send, buffer = buffer, []
-                log_stream.send(send)
-                continue
-
-            if isinstance(line, Log):
-                yield line
-                continue
-
-            buffer.append(line)
-            size += len(line)
-            if size < buf_size:
-                continue
-
-            logging.info(
-                f"embedding {format(size / 1024, '.2f')}KB / {len(buffer)} logs, it might take a while."
-            )
-            embeddings = torch.from_numpy(model.encode_multi_process(buffer, p))
-
-            logging.info("analyze log communities, it might take a while.")
-            clusters = community_detection(
-                embeddings,
-                min_community_size=min_community_size,
-                threshold=threshold,
-            )
-            logging.info(f"get {len(clusters)} log communities.")
-            for cluster in clusters:
-                vecs = [embeddings[i] for i in cluster]
-                ids = sample(torch.from_numpy(numpy.array(vecs)))
-                s0, s1, s2 = (
-                    cluster[ids[0]],
-                    cluster[ids[1]],
-                    cluster[ids[2]],
-                )
-
-                samples = Cluster.from_list(
-                    [buffer[s0], buffer[s1], buffer[s2]],
-                    [embeddings[s0], embeddings[s1], embeddings[s2]],
-                )
-                # sample_clusters.append(samples)
-
-                logging.info(f"yield samples {samples.texts}")
-                yield samples.texts
-
-            send, buffer = buffer, []
-            size = 0
-            log_stream.send(send)
+class Cluster:
+    def __init__(
+        self,
+        sink: Sink,
+        match: Match,
+        model="./all-MiniLM-L6-v2",
+        buf_size=8 * 1024 * 1024,
+        threshold=0.7,
+        min_community_size=3,
+    ):
+        from sentence_transformers import SentenceTransformer as Embedder
+
+        self.buf_size = buf_size
+        self.threshold = threshold
+        self.min_community_size = min_community_size
+        self.sink = sink
+        self.match = match
+        self.model = Embedder(model)
+        self.buffer = []
+        self.size = 0
+
+    def __iter__(self):
+        import torch
+        from .util import community_detection
+
+        with pool(self.model) as p:
+            for line in self.match:
+                if isinstance(line, Log):
+                    yield line
+                    continue
+
+                if line is not None:
+                    self.buffer.append(line)
+                    self.size += len(line)
+
+                if self.size < self.buf_size and line is not None:
+                    continue
+
+                if self.size == 0:
+                    continue
+
+                logging.info(
+                    f"embedding {format(self.size / 1024, '.2f')}KB / {len(self.buffer)} logs, it might take a while."
+                )
+                embeddings = torch.from_numpy(
+                    self.model.encode_multi_process(self.buffer, p)
+                )
+
+                logging.info("analyze log communities, it might take a while.")
+                clusters = community_detection(
+                    embeddings,
+                    min_community_size=self.min_community_size,
+                    threshold=self.threshold,
+                )
+                logging.info(f"get {len(clusters)} log communities.")
+
+                if len(clusters) > 0:
+                    yield from self._sample(clusters, embeddings)
+                else:
+                    logging.warning(
+                        f"no cluster is detected, maybe you should decrease the threshold."
+                    )
+
+                self._recycle()
+
+    def _sample(self, clusters, embeddings):
+        for cluster, _ in zip(clusters, range(0, 3)):
+            vecs = [embeddings[i] for i in cluster]
+            ids = sample(torch.from_numpy(numpy.array(vecs)))
+            s0, s1, s2 = (
+                cluster[ids[0]],
+                cluster[ids[1]],
+                cluster[ids[2]],
+            )
+
+            samples = Community.from_list(
+                [self.buffer[s0], self.buffer[s1], self.buffer[s2]],
+                [embeddings[s0], embeddings[s1], embeddings[s2]],
+            )
+            # sample_clusters.append(samples)
+
+            logging.info(f"yield samples {samples.texts}")
+            yield samples.texts
+
+    def _recycle(self):
+        send, self.buffer = self.buffer, []
+        self.size = 0
+        self.sink.send(send)
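
Two behavioral fixes stand out in this rewrite: __iter__ now flushes whatever is buffered once the stream is exhausted (line is None) even if buf_size was never reached, and _sample iterates over at most the first three communities, so a stream with many kinds of clusters no longer floods the extractor. The drain-and-recycle handshake between Cluster and Sink can be illustrated standalone; this is a toy stand-in, not the library's API:

    # Toy stand-in for the Sink/Cluster handshake; not the library's API.
    class ToySink:
        def __init__(self, lines):
            self.lines = list(lines)
            self.buffer = []

        def __iter__(self):
            while True:
                while self.buffer:        # drain re-queued lines first
                    yield self.buffer.pop()
                while self.lines:
                    yield self.lines.pop(0)
                yield None                # end-of-pass marker
                if not self.buffer:       # nothing recycled: stop
                    break

        def send(self, lines):
            self.buffer += lines

    sink = ToySink(["GET /a 200", "GET /b 200", "oom-killer invoked"])
    batch, passes = [], 0
    for line in sink:
        if line is not None:
            batch.append(line)
            continue
        passes += 1
        # Flush even though the batch is small (the case this commit fixes),
        # then recycle the lines for at most one more pass.
        print(f"pass {passes}: clustering {len(batch)} lines")
        if passes < 2:
            sink.send(batch)
        batch = []
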