Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test new configs logstash #77

Open
wants to merge 26 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
58c2772
added custom_size_based_buffer class
monishkadas-ms Sep 12, 2024
cbb7444
added custom_size_based_buffer class
monishkadas-ms Sep 12, 2024
5b2af15
added custom_size_based_buffer class
monishkadas-ms Sep 12, 2024
806e57f
added custom_size_based_buffer class
monishkadas-ms Sep 12, 2024
117a2b4
Merge branch 'master' of https://github.com/Azure/logstash-output-kus…
MonishkaDas Sep 23, 2024
0110ae6
Added warning for deprecated path config var
MonishkaDas Sep 24, 2024
f334a4b
Updated kusto_spec.rb and ingestor_spec.rb
MonishkaDas Sep 25, 2024
28fc7a3
Updated kusto_spec.rb and ingestor_spec.rb
MonishkaDas Sep 25, 2024
79b8c3f
Updated kusto_spec.rb and ingestor_spec.rb
MonishkaDas Sep 25, 2024
47a3db2
Updated kusto_spec.rb and ingestor_spec.rb
MonishkaDas Sep 25, 2024
4b66790
Updated kusto_spec.rb and ingestor_spec.rb
MonishkaDas Sep 25, 2024
3b50bdb
Updated kusto_spec.rb and ingestor_spec.rb
MonishkaDas Sep 25, 2024
521c21f
Updated kusto_spec.rb and ingestor_spec.rb
MonishkaDas Sep 25, 2024
531046c
Updated kusto_spec.rb and ingestor_spec.rb
MonishkaDas Sep 25, 2024
c813a5b
Updated kusto_spec.rb and ingestor_spec.rb
MonishkaDas Sep 25, 2024
ab1f66f
Updated kusto_spec.rb and ingestor_spec.rb
MonishkaDas Sep 25, 2024
9994da5
Updated kusto_spec.rb and ingestor_spec.rb
MonishkaDas Sep 25, 2024
649bb0d
Updated max_size config
MonishkaDas Sep 27, 2024
ed85986
Updated max_size config
MonishkaDas Sep 27, 2024
43f9bea
Added tests in kusto_spec.rb
MonishkaDas Oct 2, 2024
4a85b94
Added tests in kusto_spec.rb
MonishkaDas Oct 2, 2024
9c84304
Updated custom_size_based_buffer.rb
MonishkaDas Oct 15, 2024
fce671f
Updated custom_size_based_buffer.rb
MonishkaDas Oct 15, 2024
379da3b
Updated custom_size_based_buffer.rb
MonishkaDas Oct 16, 2024
b7aba1a
Adds temp file buffer used during network downtime
MonishkaDas Oct 17, 2024
ca981ba
Updated custom_size_based_buffer.rb
MonishkaDas Oct 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 39 additions & 33 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,13 @@ apply plugin: 'maven-publish'
// The gemspec contains the gem metadata to build and package the gem. The gradle build serves as a mechanism of getting these "vendor" files required for the gem.
// The alternative is to use ruby-maven gem to package, but this runs into classpath conflicts/issues with the logstash plugin.
group "org.logstash.outputs"
version Files.readAllLines(Paths.get("version")).first()

def versionFile = Paths.get("version")
if (Files.exists(versionFile)) {
version = Files.readAllLines(versionFile).first()
} else {
version = "2.0.7"
}

repositories {
mavenCentral()
Expand All @@ -23,16 +29,16 @@ repositories {
// update dependencies to bom azure-sdk-bom/1.2.24

dependencies {
implementation 'com.microsoft.azure.kusto:kusto-data:5.1.1'
implementation 'com.microsoft.azure.kusto:kusto-ingest:5.1.1'
implementation 'com.azure:azure-core-http-netty:1.15.0'
implementation 'com.azure:azure-core:1.49.0'
implementation 'com.azure:azure-data-tables:12.4.1'
implementation 'com.azure:azure-identity:1.12.1'
implementation 'com.microsoft.azure.kusto:kusto-data:5.2.0'
implementation 'com.microsoft.azure.kusto:kusto-ingest:5.2.0'
implementation 'com.azure:azure-core-http-netty:1.15.1'
implementation 'com.azure:azure-core:1.49.1'
implementation 'com.azure:azure-data-tables:12.4.2'
implementation 'com.azure:azure-identity:1.13.0'
implementation 'com.azure:azure-json:1.1.0'
implementation 'com.azure:azure-storage-blob:12.26.0'
implementation 'com.azure:azure-storage-common:12.25.0'
implementation 'com.azure:azure-storage-queue:12.21.0'
implementation 'com.azure:azure-storage-blob:12.26.1'
implementation 'com.azure:azure-storage-common:12.25.1'
implementation 'com.azure:azure-storage-queue:12.21.1'
implementation 'com.azure:azure-xml:1.0.0'
implementation 'com.fasterxml.jackson.core:jackson-annotations:2.16.0'
implementation 'com.fasterxml.jackson.core:jackson-core:2.16.0'
Expand All @@ -50,30 +56,30 @@ dependencies {
implementation 'commons-logging:commons-logging:1.3.1'
implementation 'io.github.resilience4j:resilience4j-core:1.7.1'
implementation 'io.github.resilience4j:resilience4j-retry:1.7.1'
implementation 'io.netty:netty-buffer:4.1.108.Final'
implementation 'io.netty:netty-codec-dns:4.1.108.Final'
implementation 'io.netty:netty-codec-http2:4.1.108.Final'
implementation 'io.netty:netty-codec-http:4.1.108.Final'
implementation 'io.netty:netty-codec-socks:4.1.108.Final'
implementation 'io.netty:netty-codec:4.1.108.Final'
implementation 'io.netty:netty-common:4.1.108.Final'
implementation 'io.netty:netty-handler-proxy:4.1.108.Final'
implementation 'io.netty:netty-handler:4.1.108.Final'
implementation 'io.netty:netty-resolver-dns-classes-macos:4.1.108.Final'
implementation 'io.netty:netty-resolver-dns-native-macos:4.1.108.Final:osx-x86_64'
implementation 'io.netty:netty-resolver-dns:4.1.108.Final'
implementation 'io.netty:netty-resolver:4.1.108.Final'
implementation 'io.netty:netty-buffer:4.1.110.Final'
implementation 'io.netty:netty-codec-dns:4.1.110.Final'
implementation 'io.netty:netty-codec-http2:4.1.110.Final'
implementation 'io.netty:netty-codec-http:4.1.110.Final'
implementation 'io.netty:netty-codec-socks:4.1.110.Final'
implementation 'io.netty:netty-codec:4.1.110.Final'
implementation 'io.netty:netty-common:4.1.110.Final'
implementation 'io.netty:netty-handler-proxy:4.1.110.Final'
implementation 'io.netty:netty-handler:4.1.110.Final'
implementation 'io.netty:netty-resolver-dns-classes-macos:4.1.110.Final'
implementation 'io.netty:netty-resolver-dns-native-macos:4.1.110.Final:osx-x86_64'
implementation 'io.netty:netty-resolver-dns:4.1.110.Final'
implementation 'io.netty:netty-resolver:4.1.110.Final'
implementation 'io.netty:netty-tcnative-boringssl-static:2.0.65.Final'
implementation 'io.netty:netty-tcnative-classes:2.0.65.Final'
implementation 'io.netty:netty-transport-classes-epoll:4.1.108.Final'
implementation 'io.netty:netty-transport-classes-kqueue:4.1.108.Final'
implementation 'io.netty:netty-transport-native-epoll:4.1.108.Final:linux-x86_64'
implementation 'io.netty:netty-transport-native-kqueue:4.1.108.Final:osx-x86_64'
implementation 'io.netty:netty-transport-native-unix-common:4.1.108.Final'
implementation 'io.netty:netty-transport:4.1.108.Final'
implementation 'io.projectreactor.netty:reactor-netty-core:1.0.43'
implementation 'io.projectreactor.netty:reactor-netty-http:1.0.43'
implementation 'io.projectreactor:reactor-core:3.4.36'
implementation 'io.netty:netty-transport-classes-epoll:4.1.110.Final'
implementation 'io.netty:netty-transport-classes-kqueue:4.1.110.Final'
implementation 'io.netty:netty-transport-native-epoll:4.1.110.Final:linux-x86_64'
implementation 'io.netty:netty-transport-native-kqueue:4.1.110.Final:osx-x86_64'
implementation 'io.netty:netty-transport-native-unix-common:4.1.110.Final'
implementation 'io.netty:netty-transport:4.1.110.Final'
implementation 'io.projectreactor.netty:reactor-netty-core:1.0.45'
implementation 'io.projectreactor.netty:reactor-netty-http:1.0.45'
implementation 'io.projectreactor:reactor-core:3.4.38'
implementation 'io.vavr:vavr:0.10.4'
implementation 'io.vavr:vavr-match:0.10.4'
implementation 'net.java.dev.jna:jna-platform:5.13.0'
Expand Down Expand Up @@ -116,7 +122,7 @@ task vendor {
String vendorPathPrefix = "vendor/jar-dependencies"
configurations.runtimeClasspath.allDependencies.each { dep ->
println("Copying ${dep.group}:${dep.name}:${dep.version}")
File f = configurations.runtimeClasspath.filter { it.absolutePath.contains("${dep.group}/${dep.name}/${dep.version}") }.singleFile
File f = configurations.runtimeClasspath.filter { it.absolutePath.contains("${dep.group}${File.separator}${dep.name}${File.separator}${dep.version}") }.singleFile
String groupPath = dep.group.replaceAll('\\.', '/')
File newJarFile = file("${vendorPathPrefix}/${groupPath}/${dep.name}/${dep.version}/${dep.name}-${dep.version}.jar")
newJarFile.mkdirs()
Expand Down
31 changes: 27 additions & 4 deletions lib/logstash/outputs/kusto.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

require 'logstash/outputs/kusto/ingestor'
require 'logstash/outputs/kusto/interval'
require "logstash/outputs/kusto/custom_size_based_buffer"

##
# This plugin sends messages to Azure Kusto in batches.
Expand Down Expand Up @@ -95,7 +96,6 @@ class LogStash::Outputs::Kusto < LogStash::Outputs::Base
# Mapping name - deprecated, use json_mapping
config :mapping, validate: :string, deprecated: true


# Determines if local files used for temporary storage will be deleted
# after upload is successful
config :delete_temp_files, validate: :boolean, default: true
Expand All @@ -119,10 +119,19 @@ class LogStash::Outputs::Kusto < LogStash::Outputs::Base
# Check Proxy URL can be over http or https. Dowe need it this way or ignore this & remove this
config :proxy_protocol, validate: :string, required: false , default: 'http'

# Maximum size (number of events) of the buffer before it gets flushed, defaults to 1000
config :max_size, validate: :number, default: 1000

# Maximum interval (in seconds) before the buffer gets flushed, defaults to 60
config :max_interval, validate: :number, default: 60

default :codec, 'json_lines'

def register
require 'fileutils' # For mkdir_p
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does it all add up ? It is just a buffer that is added.

Currently there is events written to file and then these files are flushed. With the new approach these files should go away

Right now the code changes are

a) Buffer that is taken from LA plugin
b) We write the events to the buffer and flush. How is this message encoded ?
c) can we remove the file part ? or the file buffers based on size on file ?
d) Where are the Tests ?
e) Time based flushes ?

@buffer = LogStash::Outputs::CustomSizeBasedBuffer.new(@max_size, @max_interval) do |events|
flush(events)
end

@files = {}
@io_mutex = Mutex.new
Expand Down Expand Up @@ -198,9 +207,17 @@ def root_directory

public
def multi_receive_encoded(events_and_encoded)
encoded_by_path = Hash.new { |h, k| h[k] = [] }

events_and_encoded.each do |event, encoded|
@buffer << { event: event, encoded: encoded }
end
end

def flush(events)
encoded_by_path = Hash.new { |h, k| h[k] = [] }

events.each do |event_data|
event = event_data[:event]
encoded = event_data[:encoded]
file_output_path = event_path(event)
encoded_by_path[file_output_path] << encoded
end
Expand All @@ -217,6 +234,12 @@ def multi_receive_encoded(events_and_encoded)
end
end

public
def shutdown
@buffer.shutdown
@ingestor.stop unless @ingestor.nil?
end

def close
@flusher.stop unless @flusher.nil?
@cleaner.stop unless @cleaner.nil?
Expand All @@ -235,7 +258,7 @@ def close
end
end

@ingestor.stop unless @ingestor.nil?
shutdown
end

private
Expand Down
74 changes: 74 additions & 0 deletions lib/logstash/outputs/kusto/custom_size_based_buffer.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
module LogStash
module Outputs
class CustomSizeBasedBuffer
def initialize(max_size, max_interval, &flush_callback)
@max_size = max_size
@max_interval = max_interval
@flush_callback = flush_callback
@buffer = []
@mutex = Mutex.new
@last_flush_time = Time.now
@shutdown = false
@flusher_condition = ConditionVariable.new

start_flusher_thread
end

def <<(event)
@mutex.synchronize do
@buffer << event
flush if @buffer.size >= @max_size
end
end

def shutdown
@mutex.synchronize do
@shutdown = true
@flusher_condition.signal # Wake up the flusher thread
end
@flusher_thread.join
flush # Ensure final flush after shutdown
end

private

def start_flusher_thread
@flusher_thread = Thread.new do
loop do
@mutex.synchronize do
break if @shutdown
if Time.now - @last_flush_time >= @max_interval
flush
end
@flusher_condition.wait(@mutex, @max_interval) # Wait for either the interval or shutdown signal
end
end
end
end


def flush_if_needed
@mutex.synchronize do
if Time.now - @last_flush_time >= @max_interval
flush
end
end
end

def flush
return if @buffer.empty?

begin
@flush_callback.call(@buffer)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, during gracefully shutdown of the buffer, how do we ensure all events are flushed before the application exits? may be add a graceful shutdown

rescue => e
# Log the error and continue,
puts "Error during flush: #{e.message}"
puts e.backtrace.join("\n")
ensure
@buffer.clear
@last_flush_time = Time.now
end
end
end
end
end
Loading