Logstash Pipeline Config File Examples

The Grok plugin is one of the cooler Logstash plugins. It lets you parse unstructured log data into something structured and queryable. Grok looks for patterns in the data it receives, so you have to configure it to identify the patterns that interest you. Grok ships with a set of built-in patterns; the one used here, %{COMBINEDAPACHELOG}, matches access logs produced by the Apache HTTP Server in the combined log format.

input {
    beats {
        port => "5044"
    }
}
filter {
    grok {
        match => { "message" => "%{COMBINEDAPACHELOG}"}
    }
}
output {
    elasticsearch {
        hosts => ["http://elasticsearch:9200"]
        index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
    }
    stdout {
        codec => rubydebug
    }
}
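
For reference, a line in the combined Apache log format that %{COMBINEDAPACHELOG} matches looks like the following (an illustrative sample, not real traffic); grok pulls fields such as clientip, verb, request, response, and bytes out of it:

83.149.9.216 - - [04/Jan/2015:05:13:42 +0000] "GET /presentations/logstash-monitorama-2013/images/kibana-search.png HTTP/1.1" 200 203023 "http://semicomplete.com/presentations/logstash-monitorama-2013/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36"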
The next pipeline deals with syslog-style lines: it also listens on port 5044 for Beats input, but parses each message with the %{SYSLOGLINE} pattern.
input {
    beats {
        port => "5044"
    }
}
filter {
    grok {
        match => { "message" => "%{SYSLOGLINE}" }
    }
}
output {
    stdout { codec => rubydebug }
}
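
A syslog line that %{SYSLOGLINE} can parse looks something like this (an illustrative sample); the pattern extracts fields such as timestamp, logsource, program, and message:

Jan 11 14:30:01 webserver sshd[1234]: Failed password for invalid user admin from 10.0.0.5 port 53442 ssh2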
Short Example of Logstash Multiple Pipelines
http://shinaisan.github.io/2018/08/25/short-example-of-logstash-multiple-pipelines.html
https://gist.github.com/shinaisan/78f3a3ad1ab50cab1d3ff32983454987
input {
    file {
        path => "/var/log/apache2/access.log"
        start_position => "beginning"
        sincedb_path => "/dev/null"
    }
}
filter {
    grok {
        match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
    date {
        match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
    }
    geoip {
        source => "clientip"
    }
}
output {
    elasticsearch {
        hosts => ["localhost:9200"]
    }
}
input {
    beats {
        port => "5044"
    }
}
filter {
    grok {
        match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
    geoip {
        source => "clientip"
    }
}
output {
    elasticsearch {
        hosts => [ "localhost:9200" ]
    }
}
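
To run the two configs above side by side in a single Logstash instance, register each one in config/pipelines.yml (a sketch; the pipeline ids and file paths below are assumptions, adjust them to wherever you saved the configs):

- pipeline.id: apache-file
  path.config: "/etc/logstash/conf.d/apache-file.conf"
- pipeline.id: beats-es
  path.config: "/etc/logstash/conf.d/beats-es.conf"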
Examples of Elastic Logstash pipeline input, filter, and output
==============================================
Example 1: File → Logstash → Elasticsearch
input {
    file {
        path => "/var/log/apache2/access.log"
        start_position => "beginning"
        sincedb_path => "/dev/null"
    }
}
filter {
    grok {
        match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
    date {
        match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
    }
    geoip {
        source => "clientip"
    }
}
output {
    elasticsearch {
        hosts => ["localhost:9200"]
    }
}
==============================================
Example 2: Filebeat → Logstash → Kafka
input {
    beats {
        port => "5044"
    }
}
filter {
    grok {
        match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
    date {
        match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
    }
    geoip {
        source => "clientip"
    }
}
output {
    kafka {
        bootstrap_servers => "localhost:9092"
        codec => plain {
            format => "%{message}"
        }
        topic_id => "apache"
    }
}
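
To confirm events are reaching the topic, Kafka's console consumer can tail it (assuming a default local broker listening on 9092):

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic apache --from-beginning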
==============================================
Example 3: Beats → Logstash → Logz.io (TCP)
input {
    beats {
        port => "5044"
        type => "apache_access"
    }
}
filter {
    mutate {
        add_field => { "token" => "aaWTINmMspBUetRoGUrxEApzQkkoMWMn" }
    }
}
output {
    tcp {
        host => "listener.logz.io"
        port => 5050
        codec => json_lines
    }
}
==============================================
Example 4: Beats → Logstash → Logz.io (SSL)
input {
    beats {
        port => "5044"
        type => "apache_access"
    }
}
filter {
    mutate {
        add_field => { "token" => "aaWTINmMspBUetRoGUrxEApzQkkoMWMn" }
    }
}
output {
    lumberjack {
        host => "listener.logz.io"
        port => 5006
        ssl_certificate => "/usr/share/logstash/keys/TrustExternalCARoot.crt"
        codec => json_lines
    }
}
==============================================
input {
    file {
        path => ["/home/logstash/testdata.log"]
        sincedb_path => "/dev/null"
        start_position => "beginning"
    }
}
filter {
}
output {
    stdout {
        codec => rubydebug
    }
}
==============================================
# Output events as CSV lines to a file
output {
    file {
        codec => line { format => "%{field1},%{field2}" }
        path => "/path/to/data_export.csv"
    }
}
==============================================
filebeat.inputs:
- type: log
  paths:
    - /path/to/file/logstash-tutorial.log
output.logstash:
  hosts: ["localhost:5044"]
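
Filebeat can then be started against this config; -e logs to stderr and -d "publish" prints publish-level debug events so you can watch lines being shipped:

./filebeat -e -c filebeat.yml -d "publish"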
==============================================
input {
    beats {
        port => "5044"
    }
}
# The filter part of this file is commented out to indicate that it is
# optional.
# filter {
#
# }
output {
    stdout { codec => rubydebug }
}
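
Before wiring this pipeline to real outputs, it is worth validating the config and then running with automatic reload so edits are picked up without a restart (the file name first-pipeline.conf is an assumption):

bin/logstash -f first-pipeline.conf --config.test_and_exit
bin/logstash -f first-pipeline.conf --config.reload.automatic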
==============================================
filter {
    grok {
        match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
}
==============================================
input {
    beats {
        port => "5044"
    }
}
filter {
    grok {
        match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
}
output {
    stdout { codec => rubydebug }
}
==============================================
input {
    beats {
        port => "5044"
    }
}
filter {
    grok {
        match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
    geoip {
        source => "clientip"
    }
}
output {
    stdout { codec => rubydebug }
}
==============================================
input {
    beats {
        port => "5044"
    }
}
filter {
    grok {
        match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
    geoip {
        source => "clientip"
    }
}
output {
    elasticsearch {
        hosts => [ "localhost:9200" ]
    }
}
==============================================
input {
    file {
        path => ["/home/logstash/testdata.log"]
        sincedb_path => "/dev/null"
        start_position => "beginning"
    }
}
filter {
}
output {
    stdout {
        codec => rubydebug
    }
}
==============================================
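The remaining filter examples parse a Squid cache access log. A line in that format looks roughly like this (an illustrative sample):

1524206424.034   19395 207.96.0.0 TCP_MISS/304 15363 GET http://elastic.co/android-chrome-192x192.gif - DIRECT/10.0.5.120 image/gif
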
filter {
    dissect {
        mapping => {
            "message" => "%{timestamp->} %{duration} %{client_address} %{cache_result}/%{status_code} %{bytes} %{request_method} %{url} %{user} %{hierarchy_code}/%{server} %{content_type}"
        }
        remove_field => ["message"]
    }
}
==============================================
filter {
    grok {
        match => {
            "message" => "%{NUMBER:timestamp}%{SPACE}%{GREEDYDATA:rest}"
        }
    }
}
==============================================
filter {
    grok {
        match => {
            "message" => "%{NUMBER:timestamp}%{SPACE}%{NUMBER:duration}\s%{IP:client_address}\s%{WORD:cache_result}/%{POSINT:status_code}\s%{NUMBER:bytes}\s%{WORD:request_method}\s%{NOTSPACE:url}\s%{NOTSPACE:user}\s%{WORD:hierarchy_code}/%{NOTSPACE:server}\s%{NOTSPACE:content_type}"
        }
        remove_field => ["message"]
    }
}
==============================================
mutate {
    convert => {
        "bytes" => "integer"
        "duration" => "integer"
        "status_code" => "integer"
        "timestamp" => "float"
    }
}
==============================================
filter {
    dissect {
        mapping => {
            "message" => "%{timestamp->} %{duration} %{client_address} %{cache_result}/%{status_code} %{bytes} %{request_method} %{url} %{user} %{hierarchy_code}/%{server} %{content_type}"
        }
        remove_field => ["message"]
        convert_datatype => {
            "bytes" => "int"
            "duration" => "int"
            "status_code" => "int"
            "timestamp" => "float"
        }
    }
}
==============================================
filter {
    grok {
        match => {
            "message" => "%{NUMBER:timestamp:float}%{SPACE}%{NUMBER:duration:int}\s%{IP:client_address}\s%{WORD:cache_result}/%{POSINT:status_code:int}\s%{NUMBER:bytes:int}\s%{WORD:request_method}\s%{NOTSPACE:url}\s%{NOTSPACE:user}\s%{WORD:hierarchy_code}/%{NOTSPACE:server}\s%{NOTSPACE:content_type}"
        }
        remove_field => ["message"]
    }
}
==============================================
# config/pipelines.yml (distributor pattern: one intake routes events to per-type pipelines)
- pipeline.id: beats-server
  config.string: |
    input { beats { port => 5044 } }
    output {
      if [type] == "apache" {
        pipeline { send_to => weblogs }
      } else if [type] == "system" {
        pipeline { send_to => syslog }
      } else {
        pipeline { send_to => fallback }
      }
    }
- pipeline.id: weblog-processing
  config.string: |
    input { pipeline { address => weblogs } }
    filter {
      # Weblog filter statements here...
    }
    output {
      elasticsearch { hosts => [es_cluster_a_host] }
    }
- pipeline.id: syslog-processing
  config.string: |
    input { pipeline { address => syslog } }
    filter {
      # Syslog filter statements here...
    }
    output {
      elasticsearch { hosts => [es_cluster_b_host] }
    }
- pipeline.id: fallback-processing
  config.string: |
    input { pipeline { address => fallback } }
    output { elasticsearch { hosts => [es_cluster_b_host] } }
==============================================
# config/pipelines.yml (output isolator pattern: persistent queues keep a slow output from blocking the other)
- pipeline.id: intake
  queue.type: persisted
  config.string: |
    input { beats { port => 5044 } }
    output { pipeline { send_to => [es, http] } }
- pipeline.id: buffered-es
  queue.type: persisted
  config.string: |
    input { pipeline { address => es } }
    output { elasticsearch { } }
- pipeline.id: buffered-http
  queue.type: persisted
  config.string: |
    input { pipeline { address => http } }
    output { http { } }
==============================================
# config/pipelines.yml (forked path pattern: one event stream, two differently filtered destinations)
- pipeline.id: intake
  queue.type: persisted
  config.string: |
    input { beats { port => 5044 } }
    output { pipeline { send_to => ["internal-es", "partner-s3"] } }
- pipeline.id: buffered-es
  queue.type: persisted
  config.string: |
    input { pipeline { address => "internal-es" } }
    # Index the full event
    output { elasticsearch { } }
- pipeline.id: partner
  queue.type: persisted
  config.string: |
    input { pipeline { address => "partner-s3" } }
    filter {
      # Remove the sensitive data
      mutate { remove_field => 'sensitive-data' }
    }
    output { s3 { } } # Output to partner's bucket
==============================================
# config/pipelines.yml (collector pattern: several inputs funnel into one common pipeline)
- pipeline.id: beats
  config.string: |
    input { beats { port => 5044 } }
    output { pipeline { send_to => [commonOut] } }
- pipeline.id: kafka
  config.string: |
    input { kafka { ... } }
    output { pipeline { send_to => [commonOut] } }
- pipeline.id: partner
  # This common pipeline enforces the same logic whether data comes from Kafka or Beats
  config.string: |
    input { pipeline { address => commonOut } }
    filter {
      # Always remove sensitive data from all input sources
      mutate { remove_field => 'sensitive-data' }
    }
    output { elasticsearch { } }
==============================================
References
https://www.elastic.co/guide/en/logstash/current/advanced-pipeline.html
https://www.elastic.co/blog/a-practical-introduction-to-logstash
https://www.elastic.co/guide/en/logstash/current/pipeline-to-pipeline.html