Skip to content

Latest commit

 

History

History
235 lines (195 loc) · 9.04 KB

kafka-logger.md

File metadata and controls

235 lines (195 loc) · 9.04 KB
title
kafka-logger

Summary

Name

kafka-logger is a plugin which works as a Kafka client driver for the ngx_lua nginx module.

This will provide the ability to send Log data requests as JSON objects to external Kafka clusters.

This plugin provides the ability to push Log data as a batch to you're external Kafka topics. In case if you did not receive the log data don't worry give it some time it will automatically send the logs after the timer function expires in our Batch Processor.

For more info on Batch-Processor in Apache APISIX please refer. Batch-Processor

Attributes

Name Type Requirement Default Valid Description
broker_list object required An array of Kafka brokers.
kafka_topic string required Target topic to push data.
producer_type string optional async ["async", "sync"] Producer's mode of sending messages.
key string optional Used for partition allocation of messages.
timeout integer optional 3 [1,...] Timeout for the upstream to send data.
name string optional "kafka logger" A unique identifier to identity the batch processor.
meta_format enum optional "default" ["default","origin"] default: collect the request information with default JSON way. origin: collect the request information with original HTTP request. example
batch_max_size integer optional 1000 [1,...] Set the maximum number of logs sent in each batch. When the number of logs reaches the set maximum, all logs will be automatically pushed to the Kafka service.
inactive_timeout integer optional 5 [1,...] The maximum time to refresh the buffer (in seconds). When the maximum refresh time is reached, all logs will be automatically pushed to the Kafka service regardless of whether the number of logs in the buffer reaches the set maximum number.
buffer_duration integer optional 60 [1,...] Maximum age in seconds of the oldest entry in a batch before the batch must be processed.
max_retry_count integer optional 0 [0,...] Maximum number of retries before removing from the processing pipe line.
retry_delay integer optional 1 [0,...] Number of seconds the process execution should be delayed if the execution fails.
include_req_body boolean optional false [false, true] Whether to include the request body. false: indicates that the requested body is not included; true: indicates that the requested body is included.

examples of meta_format

  • default:

    {
     "upstream": "127.0.0.1:1980",
     "start_time": 1619414294760,
     "client_ip": "127.0.0.1",
     "service_id": "",
     "route_id": "1",
     "request": {
       "querystring": {
         "ab": "cd"
       },
       "size": 90,
       "uri": "/hello?ab=cd",
       "url": "http://localhost:1984/hello?ab=cd",
       "headers": {
         "host": "localhost",
         "content-length": "6",
         "connection": "close"
       },
       "body": "abcdef",
       "method": "GET"
     },
     "response": {
       "headers": {
         "connection": "close",
         "content-type": "text/plain; charset=utf-8",
         "date": "Mon, 26 Apr 2021 05:18:14 GMT",
         "server": "APISIX/2.5",
         "transfer-encoding": "chunked"
       },
       "size": 190,
       "status": 200
     },
     "server": {
       "hostname": "localhost",
       "version": "2.5"
     },
     "latency": 0
    }
  • origin:

    GET /hello?ab=cd HTTP/1.1
    host: localhost
    content-length: 6
    connection: close
    
    abcdef

Info

The message will write to the buffer first. It will send to the kafka server when the buffer exceed the batch_max_size, or every buffer_duration flush the buffer.

In case of success, returns true. In case of errors, returns nil with a string describing the error (buffer overflow).

Sample broker list

This plugin supports to push in to more than one broker at a time. Specify the brokers of the external kafka servers as below sample to take effect of this functionality.

{
    "127.0.0.1":9092,
    "127.0.0.1":9093
}

How To Enable

The following is an example on how to enable the kafka-logger for a specific route.

curl http://127.0.0.1:9080/apisix/admin/routes/5 -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
{
    "plugins": {
       "kafka-logger": {
           "broker_list" :
             {
               "127.0.0.1":9092
             },
           "kafka_topic" : "test2",
           "key" : "key1",
           "batch_max_size": 1,
           "name": "kafka logger"
       }
    },
    "upstream": {
       "nodes": {
           "127.0.0.1:1980": 1
       },
       "type": "roundrobin"
    },
    "uri": "/hello"
}'

Test Plugin

*success:

$ curl -i http://127.0.0.1:9080/hello
HTTP/1.1 200 OK
...
hello, world

Metadata

Name Type Requirement Default Valid Description
log_format object optional {"host": "$host", "@timestamp": "$time_iso8601", "client_ip": "$remote_addr"} Log format declared as key value pair in JSON format. Only string is supported in the value part. If the value starts with $, it means to get APISIX variables or Nginx variable.

Note that the metadata configuration is applied in global scope, which means it will take effect on all Route or Service which use kafka-logger plugin.

APISIX Variables

Variable Name Description Usage Example
route_id id of route $route_id
route_name name of route $route_name
service_id id of service $service_id
service_name name of service $service_name
consumer_name username of consumer $consumer_name

Example

curl http://127.0.0.1:9080/apisix/admin/plugin_metadata/kafka-logger -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
{
    "log_format": {
        "host": "$host",
        "@timestamp": "$time_iso8601",
        "client_ip": "$remote_addr"
    }
}'

It is expected to see some logs like that:

{"host":"localhost","@timestamp":"2020-09-23T19:05:05-04:00","client_ip":"127.0.0.1","route_id":"1"}
{"host":"localhost","@timestamp":"2020-09-23T19:05:05-04:00","client_ip":"127.0.0.1","route_id":"1"}

Disable Plugin

Remove the corresponding json configuration in the plugin configuration to disable the kafka-logger. APISIX plugins are hot-reloaded, therefore no need to restart APISIX.

$ curl http://127.0.0.1:2379/apisix/admin/routes/1  -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d value='
{
    "methods": ["GET"],
    "uri": "/hello",
    "plugins": {},
    "upstream": {
        "type": "roundrobin",
        "nodes": {
            "127.0.0.1:1980": 1
        }
    }
}'