Monitor Microsoft Exchange Server mailflow using ELK

It’s been a while, but today I thought it was time to finish my ELK input for monitoring Microsoft Exchange Server.

Exchange Server logs the mailflow to logfiles in \Program Files\Microsoft\Exchange Server\V14\TransportRoles\Logs\MessageTracking by default. At least in the 2010 version I am using, I was told in Exchange Server 2013 you have to turn it on manually. Also the fields in the log files may differ between versions, so once again, the setup I post here is for Exchange Server 2010. Feel free to adjust the filters to any other version. More about Message Tracking can be found here.

To send these files to ELK, I used Filebeat. I used a standard installation, and this is what my filebeat.yml looks like:

filebeat:
  prospectors:
    -
      paths:
       - E:\Program Files\Microsoft\Exchange Server\V14\TransportRoles\Logs\MessageTracking\MSGTRK2*.LOG
      input_type: log
      document_type: ex_msg_trk

output:
  logstash:
    hosts: ["elkserver:5044"]

Leave the Filebeat service stopped until we configured Logstash to accept the logs.

It was a bit of a puzzle, trial and error (many thanks to Andrew Haynes for providing the final piece of the puzzle for the grok pattern), but this is what my input finally looks like:

input {
  beats {
    port => 5044
    type => "ex_msg_trk"
  }
}

filter {
 if [type] == "ex_msg_trk" {

  grok {
    match => { "message" => "(%{TIMESTAMP_ISO8601:date-time})?,(%{IPORHOST:client-ip})?,(%{IPORHOST:client-hostname})?,(%{IPORHOST:server-ip})?,(%{IPORHOST:server-hostname})?,(%{GREEDYDATA:source-context})?,(%{GREEDYDATA:connector-id})?,(%{WORD:source})?,(%{WORD:event-id})?,(%{NUMBER:internal-message-id})?,(%{GREEDYDATA:message-id})?,(%{GREEDYDATA:recipient-address})?,(%{GREEDYDATA:recipient-status})?,(%{NUMBER:total-bytes})?,(%{NUMBER:recipient-count})?,(%{GREEDYDATA:related-recipient-address})?,(%{GREEDYDATA:reference})?,(%{GREEDYDATA:message-subject})?,(%{GREEDYDATA:sender-address})?,(%{GREEDYDATA:return-path})?,(%{GREEDYDATA:message-info})?,(%{WORD:directionality})?,(%{GREEDYDATA:tenant-id})?,(%{IPORHOST:original-client-ip})?,(%{IPORHOST:original-server-ip})?,(%{GREEDYDATA:custom-data})?" }

    }

    mutate {
        convert => [ "total-bytes", "integer" ]
        convert => [ "recipient-count", "integer" ]
        split => ["recipient-address", ";"]
        split => [ "source-context", ";" ]
        split => [ "custom_data", ";" ]
        }

 }
}

output {
 if [type] == "ex_msg_trk" {
  elasticsearch {
     hosts => localhost
     index => "logstash_exch-%{+YYYY.MM.dd}"
     }
   }
 }

It contains a lot of “if” clauses, but that is because I found out that if you have multiple config files for multiple inputs, Logstash processes them as being one big file. So every filter and every output that is destined for a particular type of data, you need to explicitly define that.

Well, after inserting this input into logstash, restart Logstash and start the Filebeat service on the Exchange server. Your message tracking logs should be coming in now!

Edit: after using this input, I noticed it ain’t processing all records the right way. Because of that, I had to add the following code to the grok filter:

if "_grokparsefailure" in [tags] {
  drop { }
}

Meanwhile I’ll try to find out what causes this. It seems it has something to do with the way the traffic is flowing. All incoming messages are processed correctly, the outgoing ain’t…

Thanks Andrew for fixing this!

I’ll provide some dashboards soon.

 

3 thoughts on “Monitor Microsoft Exchange Server mailflow using ELK”

  1. “Meanwhile I’ll try to find out what causes this. It seems it has something to do with the way the traffic is flowing. All incoming messages are processed correctly, the outgoing ain’t…”

    Had the same problem. It has to do with the non event-id=Receive entries containing multiple null fields.
    If you change the grok match to the below it will catch everything. Putting ( and )? around the matches makes them optional, otherwise the first , will make the Grok match try to match against an “,”

    match => { “message” => “(%{TIMESTAMP_ISO8601:date-time})?,(%{IPORHOST:client-ip})?,(%{IPORHOST:client-hostname})?,(%{IPORHOST:server-ip})?,(%{IPORHOST:server-hostname})?,(%{GREEDYDATA:source-context})?,(%{GREEDYDATA:connector-id})?,(%{WORD:source})?,(%{WORD:event-id})?,(%{NUMBER:internal-message-id})?,(%{GREEDYDATA:message-id})?,(%{GREEDYDATA:recipient-address})?,(%{GREEDYDATA:recipient-status})?,(%{NUMBER:total-bytes})?,(%{NUMBER:recipient-count})?,(%{GREEDYDATA:related-recipient-address})?,(%{GREEDYDATA:reference})?,(%{GREEDYDATA:message-subject})?,(%{GREEDYDATA:sender-address})?,(%{GREEDYDATA:return-path})?,(%{GREEDYDATA:message-info})?,(%{WORD:directionality})?,(%{GREEDYDATA:tenant-id})?,(%{IPORHOST:original-client-ip})?,(%{IPORHOST:original-server-ip})?,(%{GREEDYDATA:custom-data})?”

    1. Hi,

      Thanks for your reply! I didn’t spent much time anymore on figuring out why this didn’t work. I had in mind it had someting to do with not all fields having values all the time, but I didn’t do any research on how to make the fields all optional. Many thanks to you, I just tried this config out, and it works smoothly! 🙂 Keep in mind a } is missing at the end of your pattern, but who cares 😉

      I will update the article soon!

  2. Hi Rene, are you available for an informal chat about ingesting Exchange Server email logs? We might need some light consulting help.

Leave a Reply

Your email address will not be published. Required fields are marked *