Examples of Elastic Logstash pipeline inputs, filters, and outputs

==============================================
Example 1: File → Logstash → Elasticsearch

input {
  file {
    path => "/var/log/apache2/access.log"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}

filter {
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }
  }
  date {
    match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
  }
  geoip {
    source => "clientip"
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
  }
}
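
To try this locally, save the pipeline to a file and point Logstash at it. The filename below is just a placeholder; the first command only validates the syntax without starting the pipeline.

bin/logstash -f apache-to-es.conf --config.test_and_exit
bin/logstash -f apache-to-es.conf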

==============================================

Example 2: Filebeat → Logstash → Kafka

input {
  beats {
    port => "5044"
  }
}

filter {
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }
  }
  date {
    match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
  }
  geoip {
    source => "clientip"
  }
}

output {
  kafka {
    # bootstrap_servers expects host:port pairs; 9092 is the default broker port
    bootstrap_servers => "localhost:9092"
    codec => plain {
      format => "%{message}"
    }
    topic_id => "apache"
  }
}
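
One way to check that events are reaching Kafka is the console consumer that ships with Kafka (this assumes a broker on localhost:9092 and the "apache" topic from the config above):

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic apache --from-beginning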

==============================================

Example 3: Beats → Logstash → Logz.io (TCP)

input {
  beats {
    port => "5044"
    type => "apache_access"
  }
}

filter {
  # add_field is an option of filter plugins, not a filter itself,
  # so it must live inside one such as mutate
  mutate {
    add_field => { "token" => "aaWTINmMspBUetRoGUrxEApzQkkoMWMn" }
  }
}

output {
  tcp {
    host => "listener.logz.io"
    port => 5050
    codec => json_lines
  }
}

==============================================

Example 4: Beats → Logstash → Logz.io (SSL)

input {
  beats {
    port => "5044"
    type => "apache_access"
  }
}

filter {
  mutate {
    add_field => { "token" => "aaWTINmMspBUetRoGUrxEApzQkkoMWMn" }
  }
}

output {
  lumberjack {
    host => "listener.logz.io"
    port => 5006
    ssl_certificate => "/usr/share/logstash/keys/TrustExternalCARoot.crt"
    codec => json_lines
  }
}

==============================================

# Export selected fields as CSV
output {
  file {
    codec => line { format => "%{field1},%{field2}" }
    path => "/path/to/data_export.csv"
  }
}
==============================================

# filebeat.yml
filebeat.inputs:
- type: log
  paths:
    - /path/to/file/logstash-tutorial.log

output.logstash:
  hosts: ["localhost:5044"]
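
With that filebeat.yml in place, Filebeat can be started in the foreground with debug output for the publisher (command from the Elastic advanced-pipeline tutorial referenced at the end of this document):

./filebeat -e -c filebeat.yml -d "publish"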

==============================================

input {
  beats {
    port => "5044"
  }
}

# The filter part of this file is commented out to indicate that it is
# optional.
# filter {
#
# }

output {
  stdout { codec => rubydebug }
}
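
With the stdout/rubydebug output, each event prints as a Ruby-style hash. It looks roughly like this (field values are illustrative, not real output):

{
    "@timestamp" => 2024-01-01T00:00:00.000Z,
      "@version" => "1",
       "message" => "83.149.9.216 - - [04/Jan/2015:05:13:42 +0000] \"GET /presentations/logstash HTTP/1.1\" 200 ...",
          "host" => { "name" => "example-host" },
          "tags" => [ "beats_input_codec_plain_applied" ]
}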

==============================================

filter {
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }
  }
}

==============================================

input {
  beats {
    port => "5044"
  }
}

filter {
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }
  }
}

output {
  stdout { codec => rubydebug }
}

==============================================

input {
  beats {
    port => "5044"
  }
}

filter {
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }
  }
  geoip {
    source => "clientip"
  }
}

output {
  stdout { codec => rubydebug }
}

==============================================

input {
  beats {
    port => "5044"
  }
}

filter {
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }
  }
  geoip {
    source => "clientip"
  }
}

output {
  elasticsearch {
    hosts => [ "localhost:9200" ]
  }
}
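
Once events are indexed, a quick query confirms that the grok and geoip fields made it into Elasticsearch. The index pattern below assumes the default logstash-* naming; adjust the query field and value to match your own data:

curl -XGET 'localhost:9200/logstash-*/_search?pretty&q=geoip.city_name=Buffalo'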

==============================================

input {
  file {
    path => ["/home/logstash/testdata.log"]
    sincedb_path => "/dev/null"
    start_position => "beginning"
  }
}

filter {
}

output {
  stdout {
    codec => rubydebug
  }
}

==============================================

filter {
  dissect {
    mapping => {
      "message" => "%{timestamp->} %{duration} %{client_address} %{cache_result}/%{status_code} %{bytes} %{request_method} %{url} %{user} %{hierarchy_code}/%{server} %{content_type}"
    }
    remove_field => ["message"]
  }
}
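
The dissect mapping above matches whitespace-delimited entries such as a squid access log. An input line in that shape looks like this (values invented for illustration; the -> suffix lets the timestamp field absorb the padding spaces that follow it):

1524206424.034   19395 207.96.0.0 TCP_MISS/304 15363 GET http://elastic.co/android-chrome-192x192.gif - DIRECT/10.0.5.120 -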

==============================================

filter {
  grok {
    match => {
      "message" => "%{NUMBER:timestamp}%{SPACE}%{GREEDYDATA:rest}"
    }
  }
}

==============================================

filter {
  grok {
    match => {
      "message" => "%{NUMBER:timestamp}%{SPACE}%{NUMBER:duration}\s%{IP:client_address}\s%{WORD:cache_result}/%{POSINT:status_code}\s%{NUMBER:bytes}\s%{WORD:request_method}\s%{NOTSPACE:url}\s%{NOTSPACE:user}\s%{WORD:hierarchy_code}/%{NOTSPACE:server}\s%{NOTSPACE:content_type}"
    }
    remove_field => ["message"]
  }
}

==============================================

# Convert the extracted strings to numeric types; goes in the filter block
# after grok (note: mutate uses "integer"/"float", dissect uses "int"/"float")
filter {
  mutate {
    convert => {
      "bytes" => "integer"
      "duration" => "integer"
      "status_code" => "integer"
      "timestamp" => "float"
    }
  }
}

==============================================

filter {
  dissect {
    mapping => {
      "message" => "%{timestamp->} %{duration} %{client_address} %{cache_result}/%{status_code} %{bytes} %{request_method} %{url} %{user} %{hierarchy_code}/%{server} %{content_type}"
    }
    remove_field => ["message"]
    convert_datatype => {
      "bytes" => "int"
      "duration" => "int"
      "status_code" => "int"
      "timestamp" => "float"
    }
  }
}

==============================================

filter {
  grok {
    match => {
      "message" => "%{NUMBER:timestamp:float}%{SPACE}%{NUMBER:duration:int}\s%{IP:client_address}\s%{WORD:cache_result}/%{POSINT:status_code:int}\s%{NUMBER:bytes:int}\s%{WORD:request_method}\s%{NOTSPACE:url}\s%{NOTSPACE:user}\s%{WORD:hierarchy_code}/%{NOTSPACE:server}\s%{NOTSPACE:content_type}"
    }
    remove_field => ["message"]
  }
}

==============================================

# config/pipelines.yml
- pipeline.id: beats-server
  config.string: |
    input { beats { port => 5044 } }
    output {
      if [type] == "apache" {
        pipeline { send_to => weblogs }
      } else if [type] == "system" {
        pipeline { send_to => syslog }
      } else {
        pipeline { send_to => fallback }
      }
    }
- pipeline.id: weblog-processing
  config.string: |
    input { pipeline { address => weblogs } }
    filter {
      # Weblog filter statements here...
    }
    output {
      elasticsearch { hosts => [es_cluster_a_host] }
    }
- pipeline.id: syslog-processing
  config.string: |
    input { pipeline { address => syslog } }
    filter {
      # Syslog filter statements here...
    }
    output {
      elasticsearch { hosts => [es_cluster_b_host] }
    }
- pipeline.id: fallback-processing
  config.string: |
    input { pipeline { address => fallback } }
    output { elasticsearch { hosts => [es_cluster_b_host] } }
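
Pipelines declared in config/pipelines.yml are only loaded when Logstash is started without a -e or -f argument:

bin/logstash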

==============================================

# config/pipelines.yml
- pipeline.id: intake
  queue.type: persisted
  config.string: |
    input { beats { port => 5044 } }
    output { pipeline { send_to => [es, http] } }
- pipeline.id: buffered-es
  queue.type: persisted
  config.string: |
    input { pipeline { address => es } }
    output { elasticsearch { } }
- pipeline.id: buffered-http
  queue.type: persisted
  config.string: |
    input { pipeline { address => http } }
    output { http { } }

==============================================

# config/pipelines.yml
- pipeline.id: intake
  queue.type: persisted
  config.string: |
    input { beats { port => 5044 } }
    output { pipeline { send_to => ["internal-es", "partner-s3"] } }
- pipeline.id: buffered-es
  queue.type: persisted
  config.string: |
    input { pipeline { address => "internal-es" } }
    # Index the full event
    output { elasticsearch { } }
- pipeline.id: partner
  queue.type: persisted
  config.string: |
    input { pipeline { address => "partner-s3" } }
    filter {
      # Remove the sensitive data
      mutate { remove_field => 'sensitive-data' }
    }
    output { s3 { } } # Output to partner's bucket

==============================================

# config/pipelines.yml
- pipeline.id: beats
  config.string: |
    input { beats { port => 5044 } }
    output { pipeline { send_to => [commonOut] } }
- pipeline.id: kafka
  config.string: |
    input { kafka { ... } }
    output { pipeline { send_to => [commonOut] } }
- pipeline.id: partner
  # This common pipeline enforces the same logic whether data comes from Kafka or Beats
  config.string: |
    input { pipeline { address => commonOut } }
    filter {
      # Always remove sensitive data from all input sources
      mutate { remove_field => 'sensitive-data' }
    }
    output { elasticsearch { } }
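
Whichever pattern is used, per-pipeline event counts can be checked at runtime through the Logstash monitoring API on its default port 9600:

curl -XGET 'localhost:9600/_node/stats/pipelines?pretty'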

==============================================

References

https://www.elastic.co/guide/en/logstash/current/advanced-pipeline.html
https://www.elastic.co/blog/a-practical-introduction-to-logstash
https://www.elastic.co/guide/en/logstash/current/pipeline-to-pipeline.html