Home > Storage > ObjectScale and ECS > Product Documentation > Deploying the Elastic Stack with Searchable Snapshots and Frozen Tier > Create a Logstash pipeline
The data being ingested into Logstash from the two ECS VDCs is tagged with a VDC indicator based on the port (5005 or 5006) on which the syslog data was received. This allows an additional identifier filter to be applied to incoming data streams, so that customers can decide whether to search across all VDC data or only data from a specific VDC.
The second portion of the Logstash pipeline is the GROK filter which accurately parses the access log data and assigns the data to the correct key / value pairs.
Certain fields in the access logs are either redundant or not needed and are subsequently removed in the mutate section.
The last portion of the pipeline shows how the output of the received data is sent to the appropriate VDC Index.
# Two TCP listeners, one per ECS VDC; the tag applied at ingest is what the
# filter and output sections later use to route each event.
input {
  tcp {
    port => 5005
    type => syslog
    tags => ["VDC1"]   # syslog traffic arriving on 5005 is attributed to VDC1
  }
  tcp {
    port => 5006
    type => syslog
    tags => ["VDC2"]   # syslog traffic arriving on 5006 is attributed to VDC2
  }
}
# Parse ECS access-log lines, normalise byte-count fields, set @timestamp from
# the log's own timestamp, strip scaffolding fields, and stamp a "site" field.
#
# FIX: the original had an extra closing brace after each grok block, which
# closed the VDC conditional early and made the final "}" close the whole
# filter section before the VDC2 branch — an unbalanced config that Logstash
# cannot load. The date/mutate stages now nest inside each VDC conditional.
filter {
  if "VDC1" in [tags] {
    # Break the raw access-log line into named key/value fields.
    grok {
      match => { "message" => ["%{NOTSPACE:field1}%{MONTH:monthh}%{SPACE}+%{MONTHDAY:dayy} %{TIME:timee}%{SPACE}%{IPORHOST:hostt}%{SPACE}%{WORD:wordd}%{SPACE}%{TIMESTAMP_ISO8601:ecstimestamp}%{SPACE}%{NOTSPACE:request_id}%{SPACE}%{IPORHOST:ecs_ip}:%{NUMBER:ecs_port}%{SPACE}%{IPORHOST:client_ip}:%{NUMBER:client_port}%{SPACE}((?<user>[^-]{1}[^\s]*)|-)%{SPACE}((?<agent>[^-]{1}[^\s]*)|-)%{SPACE}%{WORD:method}%{SPACE}((?<namespace>[^-]{1}[^\s]*)|-)%{SPACE}((?<bucket>[^-]{1}[^\s]*)|-)%{SPACE}((?<key>[^-]{1}[^\s]*)|-)%{SPACE}((?<query>[^-]{1}[^\s]*)|-)%{SPACE}HTTP/%{NUMBER:http_version}%{SPACE}%{NUMBER:response_code}%{SPACE}%{NUMBER:duration}%{SPACE}%{NOTSPACE:upload_bytes}%{SPACE}%{NOTSPACE:download_bytes}%{SPACE}%{NUMBER:ecs_latency}%{SPACE}((?<range>[^-]{1}[^\s]*)|-)%{SPACE}((?<copy>[^-]{1}[^\s]*)|-)%{SPACE}%{NOTSPACE:deep_copy_size}%{SPACE}('X-Forwarded-For: (%{IPORHOST:xforwardedfor}|-')| )"]
      }
    }
    # Use the timestamp embedded in the access log (UTC) as the event time.
    date {
      match => [ "ecstimestamp", "YYYY-MM-dd HH:mm:ss,SSS" ]
      timezone => "UTC"
      target => "@timestamp"
    }
    # The log uses "-" for absent byte counts; normalise to "0" so the
    # fields stay numeric-friendly in Elasticsearch.
    if [upload_bytes] == "-" {
      mutate {
        replace => { "upload_bytes" => "0" }
      }
    }
    if [download_bytes] == "-" {
      mutate {
        replace => { "download_bytes" => "0" }
      }
    }
    if [deep_copy_size] == "-" {
      mutate {
        replace => { "deep_copy_size" => "0" }
      }
    }
    # Drop redundant scaffolding fields and record the originating VDC.
    mutate {
      remove_field => ["field1", "monthh", "dayy", "timee", "hostt", "wordd", "message", "type", "host", "port", "@version"]
      add_field => {"site" => "VDC1"}
    }
  }
  if "VDC2" in [tags] {
    # Same parsing pipeline as VDC1, differing only in the "site" value.
    grok {
      match => { "message" => ["%{NOTSPACE:field1}%{MONTH:monthh}%{SPACE}+%{MONTHDAY:dayy} %{TIME:timee}%{SPACE}%{IPORHOST:hostt}%{SPACE}%{WORD:wordd}%{SPACE}%{TIMESTAMP_ISO8601:ecstimestamp}%{SPACE}%{NOTSPACE:request_id}%{SPACE}%{IPORHOST:ecs_ip}:%{NUMBER:ecs_port}%{SPACE}%{IPORHOST:client_ip}:%{NUMBER:client_port}%{SPACE}((?<user>[^-]{1}[^\s]*)|-)%{SPACE}((?<agent>[^-]{1}[^\s]*)|-)%{SPACE}%{WORD:method}%{SPACE}((?<namespace>[^-]{1}[^\s]*)|-)%{SPACE}((?<bucket>[^-]{1}[^\s]*)|-)%{SPACE}((?<key>[^-]{1}[^\s]*)|-)%{SPACE}((?<query>[^-]{1}[^\s]*)|-)%{SPACE}HTTP/%{NUMBER:http_version}%{SPACE}%{NUMBER:response_code}%{SPACE}%{NUMBER:duration}%{SPACE}%{NOTSPACE:upload_bytes}%{SPACE}%{NOTSPACE:download_bytes}%{SPACE}%{NUMBER:ecs_latency}%{SPACE}((?<range>[^-]{1}[^\s]*)|-)%{SPACE}((?<copy>[^-]{1}[^\s]*)|-)%{SPACE}%{NOTSPACE:deep_copy_size}%{SPACE}('X-Forwarded-For: (%{IPORHOST:xforwardedfor}|-')| )"]
      }
    }
    date {
      match => [ "ecstimestamp", "YYYY-MM-dd HH:mm:ss,SSS" ]
      timezone => "UTC"
      target => "@timestamp"
    }
    if [upload_bytes] == "-" {
      mutate {
        replace => { "upload_bytes" => "0" }
      }
    }
    if [download_bytes] == "-" {
      mutate {
        replace => { "download_bytes" => "0" }
      }
    }
    if [deep_copy_size] == "-" {
      mutate {
        replace => { "deep_copy_size" => "0" }
      }
    }
    mutate {
      remove_field => ["field1", "monthh", "dayy", "timee", "hostt", "wordd", "message", "type", "host", "port", "@version"]
      add_field => {"site" => "VDC2"}
    }
  }
}
# Route each event to its per-VDC index on the same three-node ES cluster.
#
# FIX: VDC1's first host was "10.246.145.180" while VDC2's was
# "10.246.156.180"; the other two nodes match (.156.181/.156.182), so the
# .145. address looks like a typo that would point VDC1 output at a
# non-existent node. Corrected to .156.180 — NOTE(review): verify against
# the actual Elasticsearch node addresses.
output {
  if "VDC1" in [tags] {
    elasticsearch {
      index => "ecs-vdc1-index"
      hosts => ["http://10.246.156.180:9200","http://10.246.156.181:9200","http://10.246.156.182:9200"]
    }
  }
  if "VDC2" in [tags] {
    elasticsearch {
      index => "ecs-vdc2-index"
      hosts => ["http://10.246.156.180:9200","http://10.246.156.181:9200","http://10.246.156.182:9200"]
    }
  }
}
Our ECS VDC pipeline is now created at /etc/logstash/conf.d/ecs-vdc-pipeline.conf on the Logstash server.
We can see in the Logstash logs that our pipeline has been picked up as a source.
[2022-05-19T19:27:11,107][INFO ][logstash.javapipeline][main] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>500, "pipeline.sources"=>["/etc/logstash/conf.d/ecs-vdc-pipeline.conf"], :thread=>"#<Thread:0x6dcdcde1 run>"}