Sending Nginx JSON logs with Vector to Clickhouse and Elasticsearch



Vector is designed to collect, transform, and send log data, metrics, and events. See its GitHub repository.



Written in Rust, it offers high performance and low memory consumption compared to its counterparts. In addition, a lot of attention is paid to correctness-related features, in particular the ability to buffer unsent events on disk and to handle file rotation.



Architecturally, Vector is an event router that accepts messages from one or more sources, optionally applies transforms to these messages, and sends them to one or more sinks.



Vector is a replacement for Filebeat and Logstash; it can act in both roles (receiving and sending logs). More details are on their website.



Where Logstash builds its chain as input → filter → output, Vector uses sources → transforms → sinks.



Examples can be found in the documentation.
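
As a rough sketch of that chain (not from the original article): the stdin source, add_fields transform and console sink used below are standard Vector components, while the file name and the added field are made up for illustration.


cat <<'EOF' > /tmp/vector-demo.toml
[sources.in]
  type = "stdin"

[transforms.add_tag]
  type            = "add_fields"
  inputs          = ["in"]
  fields.pipeline = "demo"

[sinks.out]
  type     = "console"
  inputs   = ["add_tag"]
  encoding = "json"
EOF

# pipe one line through the pipeline (stop with Ctrl-C if it keeps running)
echo 'hello' | vector --config /tmp/vector-demo.toml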



This guide does not cover geoip. When I tried to use geoip with the current version of Vector, I kept getting messages like this:



Aug 05 06:25:31.889 DEBUG transform{name=nginx_parse_rename_fields type=rename_fields}: vector::transforms::rename_fields: Field did not exist field=Β«geoip.country_nameΒ» rate_limit_secs=30


So geoip was left out of the configuration.



The pipeline is: Nginx (access logs) → Vector (client | Filebeat role) → Vector (server | Logstash role) → separately into Clickhouse and separately into Elasticsearch. Four servers are used in this setup (it could be squeezed into three).








Disabling SELinux



sed -i 's/^SELINUX=.*/SELINUX=disabled/g' /etc/selinux/config
reboot
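
If you prefer not to reboot straight away, SELinux can also be switched to permissive mode for the current session (a common alternative, not part of the original steps):


setenforce 0
getenforce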


Installing the HTTP emulator and utilities



As the HTTP emulator we will use nodejs-stub-server by Maxim Ignatenko.



Nodejs-stub-server does not ship as an rpm, so an rpm has to be built for it. The rpm is built with Fedora Copr.



Add the antonpatsev/nodejs-stub-server Copr repository:



yum -y install yum-plugin-copr epel-release
yes | yum copr enable antonpatsev/nodejs-stub-server


Install nodejs-stub-server, Apache Benchmark and the screen terminal multiplexer:



yum -y install stub_http_server screen mc httpd-tools


Adjust the stub_http_server delay in /var/lib/stub_http_server/stub_http_server.js:



var max_sleep = 10;


Start stub_http_server:



systemctl start stub_http_server
systemctl enable stub_http_server
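
Assuming stub_http_server listens on port 8080 (the same port the nginx proxy_pass directives point to later), a quick check that it answers:


curl -s http://localhost:8080/ | head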


Installing Clickhouse on server 3



ClickHouse uses the SSE 4.2 instruction set extension, so unless specified otherwise, support for it in the processor becomes an additional system requirement. Here is the command to check whether the current processor supports SSE 4.2:



grep -q sse4_2 /proc/cpuinfo && echo "SSE 4.2 supported" || echo "SSE 4.2 not supported"


First, add the official repository:



sudo yum install -y yum-utils
sudo rpm --import https://repo.clickhouse.tech/CLICKHOUSE-KEY.GPG
sudo yum-config-manager --add-repo https://repo.clickhouse.tech/rpm/stable/x86_64


Then install the packages:



sudo yum install -y clickhouse-server clickhouse-client


To make clickhouse-server listen on external interfaces, edit /etc/clickhouse-server/config.xml and set:



<listen_host>0.0.0.0</listen_host>


Change the logging level from trace to debug:



<level>debug</level>


Default compression settings:



min_compress_block_size  65536
max_compress_block_size  1048576


To enable Zstd compression, the advice was not to touch the config but to use DDL instead.





I could not find how to apply zstd compression via DDL, so I left it as is.



If you use zstd compression in Clickhouse, please share how you set it up.
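
For reference, a hedged example (not from the original article): once the server is running, ClickHouse does accept per-column compression codecs directly in DDL, including ZSTD. The table and column names below are purely illustrative.


clickhouse-client --query "
CREATE TABLE default.logs_zstd
(
    timestamp DateTime,
    message   String CODEC(ZSTD(3))
)
ENGINE = MergeTree()
ORDER BY timestamp"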



Start the server:



service clickhouse-server start
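
A quick sanity check, assuming the package defaults (HTTP interface on port 8123):


curl http://localhost:8123/
# should answer: Ok.
clickhouse-client --query "SELECT version()"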


Configuring Clickhouse



Connect to Clickhouse:



clickhouse-client -h 172.26.10.109 -m


172.26.10.109 is the IP address of the Clickhouse server.



Create the vector database:



CREATE DATABASE vector;


Make sure the database has been created:



show databases;


Create the vector.logs table:



/* The table where the logs are stored */

CREATE TABLE vector.logs
(
    `node_name` String,
    `timestamp` DateTime,
    `server_name` String,
    `user_id` String,
    `request_full` String,
    `request_user_agent` String,
    `request_http_host` String,
    `request_uri` String,
    `request_scheme` String,
    `request_method` String,
    `request_length` UInt64,
    `request_time` Float32,
    `request_referrer` String,
    `response_status` UInt16,
    `response_body_bytes_sent` UInt64,
    `response_content_type` String,
    `remote_addr` IPv4,
    `remote_port` UInt32,
    `remote_user` String,
    `upstream_addr` IPv4,
    `upstream_port` UInt32,
    `upstream_bytes_received` UInt64,
    `upstream_bytes_sent` UInt64,
    `upstream_cache_status` String,
    `upstream_connect_time` Float32,
    `upstream_header_time` Float32,
    `upstream_response_length` UInt64,
    `upstream_response_time` Float32,
    `upstream_status` UInt16,
    `upstream_content_type` String,
    INDEX idx_http_host request_http_host TYPE set(0) GRANULARITY 1
)
ENGINE = MergeTree()
PARTITION BY toYYYYMMDD(timestamp)
ORDER BY timestamp
TTL timestamp + toIntervalMonth(1)
SETTINGS index_granularity = 8192;


Check that the table has been created. Connect with clickhouse-client as above.



Switch to the vector database:



use vector;

Ok.

0 rows in set. Elapsed: 0.001 sec.


List the tables:



show tables;

β”Œβ”€name────────────────┐
β”‚ logs                β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜


Installing Elasticsearch on server 4, to send the same data to Elasticsearch for comparison with Clickhouse



Import the public rpm key:



rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch


Create two repository files:



/etc/yum.repos.d/elasticsearch.repo



[elasticsearch]
name=Elasticsearch repository for 7.x packages
baseurl=https://artifacts.elastic.co/packages/7.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md


/etc/yum.repos.d/kibana.repo



[kibana-7.x]
name=Kibana repository for 7.x packages
baseurl=https://artifacts.elastic.co/packages/7.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md


Install elasticsearch and kibana:



yum install -y kibana elasticsearch


Since Elasticsearch runs as a single instance, add the following to /etc/elasticsearch/elasticsearch.yml:



discovery.type: single-node


So that vector can send data to elasticsearch from another server, change network.host:



network.host: 0.0.0.0


To be able to reach kibana from outside, change server.host in /etc/kibana/kibana.yml:



server.host: "0.0.0.0"


Enable and start elasticsearch:



systemctl enable elasticsearch
systemctl start elasticsearch
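
Elasticsearch can take a little while to come up; a simple readiness check using its standard HTTP API:


curl http://localhost:9200/
curl http://localhost:9200/_cluster/health?pretty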


Enable and start kibana:



systemctl enable kibana
systemctl start kibana


Configure Elasticsearch for single-node use: 1 shard, 0 replicas. (If you run a multi-node cluster, skip this step.)



Apply a default template for future indices:



curl -X PUT http://localhost:9200/_template/default -H 'Content-Type: application/json' -d '{"index_patterns": ["*"],"order": -1,"settings": {"number_of_shards": "1","number_of_replicas": "0"}}' 
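
To confirm the template was stored (standard Elasticsearch API, nothing project-specific):


curl http://localhost:9200/_template/default?pretty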


Installing Vector as a replacement for Logstash on server 2



yum install -y https://packages.timber.io/vector/0.9.X/vector-x86_64.rpm mc httpd-tools screen


Configure Vector as a Logstash replacement. Edit /etc/vector/vector.toml:



# /etc/vector/vector.toml

data_dir = "/var/lib/vector"

[sources.nginx_input_vector]
  # General
  type                          = "vector"
  address                       = "0.0.0.0:9876"
  shutdown_timeout_secs         = 30

[transforms.nginx_parse_json]
  inputs                        = [ "nginx_input_vector" ]
  type                          = "json_parser"

[transforms.nginx_parse_add_defaults]
  inputs                        = [ "nginx_parse_json" ]
  type                          = "lua"
  version                       = "2"

  hooks.process = """
  function (event, emit)

    function split_first(s, delimiter)
      result = {};
      for match in (s..delimiter):gmatch("(.-)"..delimiter) do
          table.insert(result, match);
      end
      return result[1];
    end

    function split_last(s, delimiter)
      result = {};
      for match in (s..delimiter):gmatch("(.-)"..delimiter) do
          table.insert(result, match);
      end
      return result[#result];
    end

    event.log.upstream_addr             = split_first(split_last(event.log.upstream_addr, ', '), ':')
    event.log.upstream_bytes_received   = split_last(event.log.upstream_bytes_received, ', ')
    event.log.upstream_bytes_sent       = split_last(event.log.upstream_bytes_sent, ', ')
    event.log.upstream_connect_time     = split_last(event.log.upstream_connect_time, ', ')
    event.log.upstream_header_time      = split_last(event.log.upstream_header_time, ', ')
    event.log.upstream_response_length  = split_last(event.log.upstream_response_length, ', ')
    event.log.upstream_response_time    = split_last(event.log.upstream_response_time, ', ')
    event.log.upstream_status           = split_last(event.log.upstream_status, ', ')

    if event.log.upstream_addr == "" then
        event.log.upstream_addr = "127.0.0.1"
    end

    if (event.log.upstream_bytes_received == "-" or event.log.upstream_bytes_received == "") then
        event.log.upstream_bytes_received = "0"
    end

    if (event.log.upstream_bytes_sent == "-" or event.log.upstream_bytes_sent == "") then
        event.log.upstream_bytes_sent = "0"
    end

    if event.log.upstream_cache_status == "" then
        event.log.upstream_cache_status = "DISABLED"
    end

    if (event.log.upstream_connect_time == "-" or event.log.upstream_connect_time == "") then
        event.log.upstream_connect_time = "0"
    end

    if (event.log.upstream_header_time == "-" or event.log.upstream_header_time == "") then
        event.log.upstream_header_time = "0"
    end

    if (event.log.upstream_response_length == "-" or event.log.upstream_response_length == "") then
        event.log.upstream_response_length = "0"
    end

    if (event.log.upstream_response_time == "-" or event.log.upstream_response_time == "") then
        event.log.upstream_response_time = "0"
    end

    if (event.log.upstream_status == "-" or event.log.upstream_status == "") then
        event.log.upstream_status = "0"
    end

    emit(event)

  end
  """

[transforms.nginx_parse_remove_fields]
    inputs                              = [ "nginx_parse_add_defaults" ]
    type                                = "remove_fields"
    fields                              = ["data", "file", "host", "source_type"]

[transforms.nginx_parse_coercer]

    type                                = "coercer"
    inputs                              = ["nginx_parse_remove_fields"]

    types.request_length = "int"
    types.request_time = "float"

    types.response_status = "int"
    types.response_body_bytes_sent = "int"

    types.remote_port = "int"

    types.upstream_bytes_received = "int"
    types.upstream_bytes_sent = "int"
    types.upstream_connect_time = "float"
    types.upstream_header_time = "float"
    types.upstream_response_length = "int"
    types.upstream_response_time = "float"
    types.upstream_status = "int"

    types.timestamp = "timestamp"

[sinks.nginx_output_clickhouse]
    inputs   = ["nginx_parse_coercer"]
    type     = "clickhouse"

    database = "vector"
    healthcheck = true
    host = "http://172.26.10.109:8123" #   Clickhouse
    table = "logs"

    encoding.timestamp_format = "unix"

    buffer.type = "disk"
    buffer.max_size = 104900000
    buffer.when_full = "block"

    request.in_flight_limit = 20

[sinks.elasticsearch]
    type = "elasticsearch"
    inputs   = ["nginx_parse_coercer"]
    compression = "none"
    healthcheck = true
    # 172.26.10.116 is the address of the elasticsearch server
    host = "http://172.26.10.116:9200" 
    index = "vector-%Y-%m-%d"


A few words about the transforms.nginx_parse_add_defaults section.



These configs were used with a small CDN, so the upstream_* variables can contain several comma-separated values (nginx appends one entry per upstream that was contacted).



For example:



"upstream_addr": "128.66.0.10:443, 128.66.0.11:443, 128.66.0.12:443"
"upstream_bytes_received": "-, -, 123"
"upstream_status": "502, 502, 200"




Create the systemd service file /etc/systemd/system/vector.service:



# /etc/systemd/system/vector.service

[Unit]
Description=Vector
After=network-online.target
Requires=network-online.target

[Service]
User=vector
Group=vector
ExecStart=/usr/bin/vector
ExecReload=/bin/kill -HUP $MAINPID
Restart=no
StandardOutput=syslog
StandardError=syslog
SyslogIdentifier=vector

[Install]
WantedBy=multi-user.target
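
Before starting the service, it can be worth checking from this host that both sinks from vector.toml are reachable (a simple sanity check using the same addresses as in the config above):


curl http://172.26.10.109:8123/
# the Clickhouse HTTP interface should answer: Ok.
curl http://172.26.10.116:9200/
# Elasticsearch should answer with its JSON banner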


Enable and start Vector:



systemctl enable vector
systemctl start vector


Check the vector logs:



journalctl -f -u vector




INFO vector::topology::builder: Healthcheck: Passed.
INFO vector::topology::builder: Healthcheck: Passed.


Setting up Nginx (web server) on server 1



Since nginx can work over IPv6 while the upstream_addr column in the Clickhouse logs table is typed as IPv4, IPv6 addresses will cause errors like:



DB::Exception: Invalid IPv4 value.: (while read the value of key upstream_addr)


Therefore, we disable IPv6.



/etc/sysctl.d/98-disable-ipv6.conf



net.ipv6.conf.all.disable_ipv6 = 1
net.ipv6.conf.default.disable_ipv6 = 1
net.ipv6.conf.lo.disable_ipv6 = 1




Apply the settings:

sysctl --system


Installing nginx.



Add the nginx repository file /etc/yum.repos.d/nginx.repo:



[nginx-stable]
name=nginx stable repo
baseurl=http://nginx.org/packages/centos/$releasever/$basearch/
gpgcheck=1
enabled=1
gpgkey=https://nginx.org/keys/nginx_signing.key
module_hotfixes=true


Install the nginx package:



yum install -y nginx


Configure the log format for Nginx in /etc/nginx/nginx.conf:



user  nginx;
# you must set worker processes based on your CPU cores, nginx does not benefit from setting more than that
worker_processes auto; #some last versions calculate it automatically

# number of file descriptors used for nginx
# the limit for the maximum FDs on the server is usually set by the OS.
# if you don't set FD's then OS settings will be used which is by default 2000
worker_rlimit_nofile 100000;

error_log  /var/log/nginx/error.log warn;
pid        /var/run/nginx.pid;

# provides the configuration file context in which the directives that affect connection processing are specified.
events {
    # determines how much clients will be served per worker
    # max clients = worker_connections * worker_processes
    # max clients is also limited by the number of socket connections available on the system (~64k)
    worker_connections 4000;

    # optimized to serve many clients with each thread, essential for linux -- for testing environment
    use epoll;

    # accept as many connections as possible, may flood worker connections if set too low -- for testing environment
    multi_accept on;
}

http {
    include       /etc/nginx/mime.types;
    default_type  application/octet-stream;

    log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '
                      '$status $body_bytes_sent "$http_referer" '
                      '"$http_user_agent" "$http_x_forwarded_for"';

log_format vector escape=json
    '{'
        '"node_name":"nginx-vector",'
        '"timestamp":"$time_iso8601",'
        '"server_name":"$server_name",'
        '"request_full": "$request",'
        '"request_user_agent":"$http_user_agent",'
        '"request_http_host":"$http_host",'
        '"request_uri":"$request_uri",'
        '"request_scheme": "$scheme",'
        '"request_method":"$request_method",'
        '"request_length":"$request_length",'
        '"request_time": "$request_time",'
        '"request_referrer":"$http_referer",'
        '"response_status": "$status",'
        '"response_body_bytes_sent":"$body_bytes_sent",'
        '"response_content_type":"$sent_http_content_type",'
        '"remote_addr": "$remote_addr",'
        '"remote_port": "$remote_port",'
        '"remote_user": "$remote_user",'
        '"upstream_addr": "$upstream_addr",'
        '"upstream_bytes_received": "$upstream_bytes_received",'
        '"upstream_bytes_sent": "$upstream_bytes_sent",'
        '"upstream_cache_status":"$upstream_cache_status",'
        '"upstream_connect_time":"$upstream_connect_time",'
        '"upstream_header_time":"$upstream_header_time",'
        '"upstream_response_length":"$upstream_response_length",'
        '"upstream_response_time":"$upstream_response_time",'
        '"upstream_status": "$upstream_status",'
        '"upstream_content_type":"$upstream_http_content_type"'
    '}';

    access_log  /var/log/nginx/access.log  main;
    access_log  /var/log/nginx/access.json.log vector;      # new access log in JSON format

    sendfile        on;
    #tcp_nopush     on;

    keepalive_timeout  65;

    #gzip  on;

    include /etc/nginx/conf.d/*.conf;
}


To avoid breaking the existing configuration, Nginx lets you have several access_log directives:



access_log  /var/log/nginx/access.log  main;            # the existing log
access_log  /var/log/nginx/access.json.log vector;      # the new log in JSON format


Don't forget to add a logrotate rule for the new log (if the log file does not end in .log).



Remove default.conf from /etc/nginx/conf.d/:



rm -f /etc/nginx/conf.d/default.conf


/etc/nginx/conf.d/vhost1.conf



server {
    listen 80;
    server_name vhost1;
    location / {
        proxy_pass http://172.26.10.106:8080;
    }
}


/etc/nginx/conf.d/vhost2.conf



server {
    listen 80;
    server_name vhost2;
    location / {
        proxy_pass http://172.26.10.108:8080;
    }
}


/etc/nginx/conf.d/vhost3.conf



server {
    listen 80;
    server_name vhost3;
    location / {
        proxy_pass http://172.26.10.109:8080;
    }
}


/etc/nginx/conf.d/vhost4.conf



server {
    listen 80;
    server_name vhost4;
    location / {
        proxy_pass http://172.26.10.116:8080;
    }
}


Add the virtual hosts to /etc/hosts (172.26.10.106 is the IP of the nginx server):



172.26.10.106 vhost1
172.26.10.106 vhost2
172.26.10.106 vhost3
172.26.10.106 vhost4




nginx -t 
systemctl restart nginx
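
Once nginx is serving requests, you can spot-check that the new access log really contains one JSON object per line (jq is assumed to be installed, e.g. from EPEL; it is not part of the original setup):


curl -s http://vhost1/ > /dev/null
tail -n 1 /var/log/nginx/access.json.log | jq .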


Installing Vector



yum install -y https://packages.timber.io/vector/0.9.X/vector-x86_64.rpm


Create the systemd service file /etc/systemd/system/vector.service:



[Unit]
Description=Vector
After=network-online.target
Requires=network-online.target

[Service]
User=vector
Group=vector
ExecStart=/usr/bin/vector
ExecReload=/bin/kill -HUP $MAINPID
Restart=no
StandardOutput=syslog
StandardError=syslog
SyslogIdentifier=vector

[Install]
WantedBy=multi-user.target


Configure Vector as a Filebeat replacement in /etc/vector/vector.toml. The IP address 172.26.10.108 is the IP of the server we send the logs to (Vector-Server):



data_dir = "/var/lib/vector"

[sources.nginx_file]
  type                          = "file"
  include                       = [ "/var/log/nginx/access.json.log" ]
  start_at_beginning            = false
  fingerprinting.strategy       = "device_and_inode"

[sinks.nginx_output_vector]
  type                          = "vector"
  inputs                        = [ "nginx_file" ]

  address                       = "172.26.10.108:9876"


The vector user must be able to read the log files. On CentOS, nginx creates its logs with adm group ownership, so add the vector user to that group.



usermod -a -G adm vector
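
A quick way to confirm the permissions are sufficient (a generic check, assuming nginx has already written at least one line to the JSON log):


sudo -u vector head -n 1 /var/log/nginx/access.json.log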


Enable and start vector:



systemctl enable vector
systemctl start vector


Check the vector logs:



journalctl -f -u vector




INFO vector::topology::builder: Healthcheck: Passed.




Testing. The load is generated with Apache Benchmark.



It was installed earlier together with the httpd-tools package on all servers.



We run Apache Benchmark from 4 different servers inside screen: first start the screen terminal multiplexer, then start Apache Benchmark inside it.



From the 1st server:



while true; do ab -H "User-Agent: 1server" -c 100 -n 10 -t 10 http://vhost1/; sleep 1; done


From the 2nd server:



while true; do ab -H "User-Agent: 2server" -c 100 -n 10 -t 10 http://vhost2/; sleep 1; done


From the 3rd server:



while true; do ab -H "User-Agent: 3server" -c 100 -n 10 -t 10 http://vhost3/; sleep 1; done


From the 4th server:



while true; do ab -H "User-Agent: 4server" -c 100 -n 10 -t 10 http://vhost4/; sleep 1; done


Checking the data in Clickhouse



Connect to Clickhouse:



clickhouse-client -h 172.26.10.109 -m


Run an SQL query:



SELECT * FROM vector.logs;

β”Œβ”€node_name────┬───────────timestamp─┬─server_name─┬─user_id─┬─request_full───┬─request_user_agent─┬─request_http_host─┬─request_uri─┬─request_scheme─┬─request_method─┬─request_length─┬─request_time─┬─request_referrer─┬─response_status─┬─response_body_bytes_sent─┬─response_content_type─┬───remote_addr─┬─remote_port─┬─remote_user─┬─upstream_addr─┬─upstream_port─┬─upstream_bytes_received─┬─upstream_bytes_sent─┬─upstream_cache_status─┬─upstream_connect_time─┬─upstream_header_time─┬─upstream_response_length─┬─upstream_response_time─┬─upstream_status─┬─upstream_content_type─┐
β”‚ nginx-vector β”‚ 2020-08-07 04:32:42 β”‚ vhost1      β”‚         β”‚ GET / HTTP/1.0 β”‚ 1server            β”‚ vhost1            β”‚ /           β”‚ http           β”‚ GET            β”‚             66 β”‚        0.028 β”‚                  β”‚             404 β”‚                       27 β”‚                       β”‚ 172.26.10.106 β”‚       45886 β”‚             β”‚ 172.26.10.106 β”‚             0 β”‚                     109 β”‚                  97 β”‚ DISABLED              β”‚                     0 β”‚                0.025 β”‚                       27 β”‚                  0.029 β”‚             404 β”‚                       β”‚
└──────────────┴─────────────────────┴─────────────┴─────────┴────────────────┴────────────────────┴───────────────────┴─────────────┴────────────────┴────────────────┴────────────────┴──────────────┴──────────────────┴─────────────────┴──────────────────────────┴───────────────────────┴───────────────┴─────────────┴─────────────┴───────────────┴───────────────┴─────────────────────────┴─────────────────────┴───────────────────────┴───────────────────────┴──────────────────────┴──────────────────────────┴────────────────────────┴─────────────────┴───────────────────────
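
The row above is raw output; for something more readable, a simple aggregate over the columns defined earlier works well (a hedged example, not from the original article):


clickhouse-client -h 172.26.10.109 --query "
SELECT request_user_agent, response_status, count() AS hits
FROM vector.logs
GROUP BY request_user_agent, response_status
ORDER BY hits DESC"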


Find out the size of the tables in Clickhouse:



select concat(database, '.', table)                         as table,
       formatReadableSize(sum(bytes))                       as size,
       sum(rows)                                            as rows,
       max(modification_time)                               as latest_modification,
       sum(bytes)                                           as bytes_size,
       any(engine)                                          as engine,
       formatReadableSize(sum(primary_key_bytes_in_memory)) as primary_keys_size
from system.parts
where active
group by database, table
order by bytes_size desc;


Let's see how much space the logs take up in Clickhouse.





The logs table takes up 857.19 MB.





The same data in the Elasticsearch index takes up 4.5 GB.
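
The Elasticsearch side can be checked with the standard _cat API (the vector-* pattern matches the index name from the sink config):


curl 'http://172.26.10.116:9200/_cat/indices/vector-*?v&h=index,docs.count,store.size'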



So, for the same data from vector, Clickhouse takes 4500 / 857.19 = 5.24 times less space than Elasticsearch.



Note that compression is left at its defaults in the vector config above; it is explicitly set to "none" only for the Elasticsearch sink.



Telegram chat on Clickhouse

Telegram chat on Elasticsearch

Telegram chat on " Collection and analytics of system messages "



