Building a Logging Pipeline on AWS

Effective logging is a challenge. Effective logging with cloud deployments is more of a challenge, because your log messages are spread across many hosts and must be collected in one place before they're useful.

Fortunately, centralized logging is a solved problem, and there are many solutions, usually involving agents that read local logfiles and push them to a central server. But “solved” doesn't necessarily mean that you'll find an out-of-the-box solution for your deployment.

This article presents a “home grown” solution to those problems, based on AWS Elasticsearch and other Amazon managed services. Along with application logging, I also describe a pipeline that feeds Elastic Load Balancer logs into Elasticsearch for analysis.

Kibana

I'm going to start by looking at the very end of the pipeline, and work my way backwards from there.

Kibana is an “analytics and visualization platform” for an Elasticsearch cluster: it gives you a browser UI that allows you to search and filter logs, without writing explicit Elasticsearch queries. AWS Elasticsearch provides Kibana as a built-in feature, linked from the cluster's “Overview” tab.

I'm not going to give a complete description of Kibana here; the manual (linked above) does a much better job of that. But to give you a flavor of how it looks and works, here are two screenshots (click to open full-size in a new tab). The one on the left shows the interface immediately after opening the Kibana window: it's a list of the most recent log messages, with all available data. The screenshot on the right shows only error-level messages, and limits the displayed information to timestamp, logger name, and message.

[Screenshots: Kibana default view (left); Kibana filtered view (right)]

For me, a typical Kibana session starts by setting a time range, using the link in the top right corner of the page. The default is messages from the last 15 minutes, which is great for general monitoring; usually, though, I'm investigating an issue from the past. Then I add a filter for the application, and change the output to show just timestamp, classname, and log message. Lastly, I either add a filter for log level (warn or error), or search for words that I think are likely to indicate the issue.

One of the nice features of Kibana is that the page URL includes all of the filters used by the search. So if you want another person to see the same thing that you do, simply send them the URL.

AWS Elasticsearch

Elasticsearch is a distributed search engine: it stores indexed data on multiple nodes, allowing parallel searches and redundant storage. Amazon Elasticsearch Service is a managed Elasticsearch cluster: you tell it how many nodes you want and how much disk space you want per node, and AWS does the rest. Of course, you give up some of the flexibility of a self-managed solution to do this, but for workloads like log management, it's more than sufficient.

OK, configuration does require a bit more thought than “number of nodes and how much disk,” but not much more.

Instance Types and Storage

The size of your cluster will depend on both the amount of log data that it ingests and the type of queries that you'll run against that data. Which means that you won't really know the best configuration until you've been running for a few weeks. Fortunately, you can easily scale your cluster once you have data in it, so you're not locked into the choices you make now.

If you're currently writing logfiles to disk, you can estimate your storage requirements by multiplying the size of your daily logfile(s) by the number of days that you want to keep, adding 20% for indexing overhead, and then doubling to account for replication. Then give yourself an additional 25% to handle situations like a rogue application writing a constant stream of error messages.
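To make that concrete, here's the arithmetic for the workload described in the next paragraph (30 GB of logs per day, kept for a month), written as a few lines of Python:

daily_gb   = 30                     # size of the daily logfiles, in gigabytes
retention  = 30                     # number of days to keep

raw        = daily_gb * retention   # 900 GB of raw log data
indexed    = raw * 1.2              # 1080 GB after 20% indexing overhead
replicated = indexed * 2            # 2160 GB after replication
total      = replicated * 1.25      # 2700 GB with headroom: call it 3 TB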

Or just pick a big number and scale down as needed. In my experience, 3 TB total storage was sufficient for a month of 30GB daily logs, with around 400GB reserved as a buffer. Be aware that your instance type will determine the maximum per-node storage space; see the docs for details.

As long as you have enough disk space, the number of nodes and their instance type largely depend on your query workload. Amazon recommends no fewer than three nodes, and starting with a larger instance type than you think you'll need. In my experience, six m4.large.elasticsearch instances were sufficient to handle infrequent queries that generally examined a day's worth of logging with filter expressions on the application name or classname.

Amazon also recommends using three dedicated master nodes for a production cluster. The master nodes are responsible for monitoring the overall health of a cluster, as well as coordinating index storage and replication. If you don't use dedicated master nodes, that functionality will run on the data nodes, where high utilization may prevent the master node component from functioning correctly. That said, I didn't use dedicated master nodes and never saw an issue.

Lastly, you can enable zone awareness, which replicates data into a different availability zone. Enabling this feature will incur costs for inter-zone network traffic, although those should be small. I think that this option is more relevant for public-facing search clusters than one used for logging, but entire availability zones have gone down in the past; if you need to ensure that your cluster is always available, turn on this option.
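If you'd rather script the domain creation than click through the console, here's a boto3 sketch that reflects the configuration discussed above; the domain name, Elasticsearch version, and per-node volume size are examples, not recommendations:

import boto3

es = boto3.client('es')

es.create_elasticsearch_domain(
    DomainName='example',
    ElasticsearchVersion='6.2',
    ElasticsearchClusterConfig={
        'InstanceType':  'm4.large.elasticsearch',
        'InstanceCount': 6,
        'DedicatedMasterEnabled': False,    # see the discussion of master nodes above
        'ZoneAwarenessEnabled':   False     # turn on if you need multi-AZ redundancy
    },
    EBSOptions={
        'EBSEnabled': True,
        'VolumeType': 'gp2',
        'VolumeSize': 512                   # per node: 6 x 512 GB is roughly 3 TB total
    })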

Networking and Security

Since October 2017, you have a choice of running your Elasticsearch cluster inside a VPC or on the open Internet. However, as of this writing Kinesis Firehose can't write to an in-VPC cluster, so the choice is easy: ignore Amazon's recommendation and choose “Public Access” when asked for network configuration.

The reason that Amazon recommends running inside a VPC is that an Elasticsearch cluster is managed via HTTP calls — not just for queries, but also for management, including deleting indexes. So it's imperative to restrict access to your cluster. This starts with an access policy that's attached to the cluster itself. In addition to the cluster's access policy, you can also use IAM policies to grant access to the cluster.

I don't particularly like splitting the responsibilities between a cluster-attached policy and an IAM policy: this makes it more difficult to identify how a user or application received access (and the AWS limit of 10 policies per role also pushes you toward policies that affect multiple resources). Instead, I prefer a cluster policy that grants access to specific “functional” roles, which are then assigned to the applications that perform those functions.

Here is an example cluster policy that does just that, along with granting access to specific IP addresses (we'll use these roles later in this article).

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::123456789012:role/Elasticsearch_Cleanup_Role"
      },
      "Action": [
        "es:ESHttpDelete",
        "es:ESHttpGet"
      ],
      "Resource": "arn:aws:es:us-east-1:123456789012:domain/example/*"
    },
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "*"
      },
      "Action": [
        "es:ESHttpGet",
        "es:ESHttpHead",
        "es:ESHttpPost"
      ],
      "Resource": "arn:aws:es:us-east-1:123456789012:domain/example/*",
      "Condition": {
        "IpAddress": {
          "aws:SourceIp": [
            "123.45.67.123",
            "123.45.67.456"
          ]
        }
      }
    }
  ]
}

Application Logging: Kinesis Streams, Kinesis Firehose, AWS Elasticsearch

Now that you have an Elasticsearch cluster, let's start putting some data into it.

One of the key features of my logging pipeline is that it doesn't use an agent to parse logfiles. Instead, applications write JSON-formatted messages to a Kinesis stream. Those messages are then picked up by Kinesis Firehose, which aggregates them and writes to both the Elasticsearch cluster and an S3 bucket for archiving.

[Diagram: application logging pipeline]

Since you've already seen (and, I'll assume, configured) the Elasticsearch cluster, I'll work my way through the rest of the pipeline in the order that you'll have to construct it. By the end you'll have a working pipeline (and if not, the last step is troubleshooting).

Kinesis Stream

A Kinesis Stream is a durable log of messages: multiple producers can add messages to the log, those messages remain in the log for a specific time duration (by default, one day), and during that time multiple consumers can read from the log. It occupies a space between a traditional message queue, where each message is consumed by a single consumer, and pub-sub, where consumers must be active at the time of publication in order to receive a message. In our pipeline there is only one consumer, the Firehose, but you could add additional endpoints, such as a Kinesis Analytics application to produce a real-time error dashboard.

One of the key features of a Kinesis stream is that it's scalable: you can use a small stream when you have only a few applications, then increase capacity as your number of applications increases. The unit of scaling is the shard: each shard can accept a total of 1,000 messages per second from any number of producers. That may seem like a large number, but a single production app-server instance could easily produce dozens or even hundreds of log messages per second.

If you don't know your message volume, then start with four shards, monitor, and resize as needed. You can monitor a stream from the AWS Console, and the metric that you should watch is “Write Throughput Exceeded”: while it's OK if you occasionally exceed the stream's throughput, you must give it a chance to catch up.
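If you prefer to script the stream rather than use the console, a minimal boto3 sketch looks like this; the stream name “example” matches the IAM policy below:

import boto3

kinesis = boto3.client('kinesis')

# create the stream with an initial guess at capacity
kinesis.create_stream(StreamName='example', ShardCount=4)

# later, if "Write Throughput Exceeded" shows sustained throttling, scale it up
kinesis.update_shard_count(
    StreamName='example',
    TargetShardCount=8,
    ScalingType='UNIFORM_SCALING')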

The other part of stream configuration is an IAM policy that allows applications to write to the stream. My preference is to include these permissions in a general logging policy, which also includes the permissions to write messages to CloudWatch Logs and CloudWatch Metrics. Note that my example policy differs in its resource specifications for the different services: the CloudWatch permissions grant access to any resource, while the Kinesis permissions are limited to only a few operations and a single stream. This is because Kinesis streams may be used for multiple purposes in your organization, and you don't want a misconfigured client to write to the wrong stream.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "cloudwatch:PutMetricData"
            ],
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogStream",
                "logs:DescribeLogGroups",
                "logs:DescribeLogStreams",
                "logs:CreateLogGroup",
                "logs:PutLogEvents"
            ],
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:PutRecords",
                "kinesis:DescribeStream"
            ],
            "Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/example"
        }
    ]
}

Kinesis Firehose

Kinesis Firehose is a managed service that aggregates streaming data into batches and uploads those batches to other services (as of this writing, Elasticsearch, S3, and Amazon Redshift). You control the size of these batches by configuring Firehose with duration and size: it will accumulate messages for the specified amount of time, unless it hits the size limit first.

For this pipeline, the source of the Firehose will be a Kinesis stream, and the destination will be both the Elasticsearch cluster and an S3 bucket (to provide a permanent archive). While Firehose lets you invoke Lambda functions to transform the data during processing, there's no need to do that, at least for your initial pipeline (you may find it useful, later, to extract information from inside log messages, but that's out of the scope of this article).

You can only pick one destination for the Firehose (with the caveat that S3 may serve as a backup for another destination). When you choose Elasticsearch, you'll need to give the Firehose information about your cluster: the domain, the index name and rotation interval, and the S3 bucket used for backup.

The most important piece of configuration is the buffer size and interval. For Elasticsearch, the overriding concern is latency: you want to see log messages shortly after they're emitted. For S3, however, you generally want to create larger files, so that long-term storage can use the lower-priced “Infrequent Access” storage type.

Unfortunately, the AWS Console only allows you to pick one set of values, which is applied to both destinations. If you create your Firehose via CloudFormation, you can specify independent values. I recommend optimizing for Elasticsearch, choosing the minimum allowed interval of 60 seconds.
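The independent settings aren't unique to CloudFormation; the underlying API exposes them as well. Here's a hedged boto3 sketch showing the two separate BufferingHints blocks; the ARNs, index name, and role name are placeholders that match the examples used elsewhere in this article:

import boto3

firehose = boto3.client('firehose')

firehose.create_delivery_stream(
    DeliveryStreamName='example',
    DeliveryStreamType='KinesisStreamAsSource',
    KinesisStreamSourceConfiguration={
        'KinesisStreamARN': 'arn:aws:kinesis:us-east-1:123456789012:stream/example',
        'RoleARN':          'arn:aws:iam::123456789012:role/firehose_delivery_role'
    },
    ElasticsearchDestinationConfiguration={
        'RoleARN':   'arn:aws:iam::123456789012:role/firehose_delivery_role',
        'DomainARN': 'arn:aws:es:us-east-1:123456789012:domain/example',
        'IndexName': 'logstash',
        'TypeName':  'log',
        'IndexRotationPeriod': 'OneDay',
        'BufferingHints': {'IntervalInSeconds': 60, 'SizeInMBs': 5},        # low latency for search
        'S3BackupMode': 'AllDocuments',
        'S3Configuration': {
            'RoleARN':   'arn:aws:iam::123456789012:role/firehose_delivery_role',
            'BucketARN': 'arn:aws:s3:::example-log-archive',
            'BufferingHints': {'IntervalInSeconds': 900, 'SizeInMBs': 128},  # bigger archive files
            'CompressionFormat': 'GZIP'
        }
    })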

The last configuration item is an IAM role. Firehose needs permission to do its work: reading from the Kinesis stream, writing to the S3 bucket, and writing to the Elasticsearch cluster.

When creating the Firehose via the console, you're given the option of creating this role at the same time; for your first pipeline I recommend doing just that. Note that this auto-created role includes permissions to update the Elasticsearch cluster, which goes against my preference for the cluster to grant its own permissions.

Log4J Appender

Now that you have your pipeline built, you need some way of writing JSON-formatted messages to the Kinesis stream. There are many ways that you could do this, but since I'm writing this article I'll point you toward my log4j-aws-appenders project.

From a systems-design perspective, I think that “pluggable” appenders are one of the nicest features of Log4J: by changing a configuration file you can change where your logs are written. Out of the box, Log4J supports a variety of destinations, including JMS message queues, JDBC databases, and the Unix syslog. There are third-party loggers for just about every destination you can imagine. Yet with all that, most people rely on ConsoleAppender and FileAppender (or, for the really fancy, RollingFileAppender).

To use my appenders, you must first include them as a dependency of your project. I'm going to assume that you're using Maven, so add the following to the dependencies section of your POM (but first check Maven Central for the latest version).

<dependency>
    <groupId>com.kdgregory.log4j</groupId>
    <artifactId>aws-appenders</artifactId>
    <version>1.2.2</version>
</dependency>

You'll also need to add a dependency for the Kinesis SDK. The AWS appenders library is designed to work with any version from the 1.11.x release sequence, and marks the dependency as “provided”: there are a lot of 1.11.x releases, some of them are not backwards compatible, and I didn't want to put anyone in dependency hell. If you're already using AWS then you probably have a preferred version; if not, here's the latest version as of this writing:

<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>aws-java-sdk-kinesis</artifactId>
    <version>1.11.379</version>
</dependency>

Next, you'll need to update your log4j.properties file to configure the appender. Let's assume that you're starting with a file like this, which writes all log messages to the console:

log4j.rootLogger=INFO, console

log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d [%t] %-5p %c - %m%n

To log to Kinesis you need to add an appender definition, and reference that definition in the rootLogger configuration:

log4j.rootLogger=INFO, console, kinesis

log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d [%t] %-5p %c - %m%n

log4j.appender.kinesis=com.kdgregory.log4j.aws.KinesisAppender
log4j.appender.kinesis.streamName=Example
log4j.appender.kinesis.layout=com.kdgregory.log4j.aws.JsonLayout
log4j.appender.kinesis.layout.tags=applicationName=Example,deployedTo={env:ENV},runDate={date}
log4j.appender.kinesis.layout.enableHostname=true

There are actually two parts to the Kinesis configuration: the first is the appender, and the second is the layout. Here we're configuring the appender to write to the Kinesis stream named “Example” and using default values for the rest of its configuration (you'll find the appender configuration documented here).

JsonLayout is responsible for converting the Log4J logging event into a JSON representation, potentially adding metadata (for documentation on the layout, look here). The result looks like this (actually, the result is a single line; this is what it looks like after being passed through a pretty-printer):

{
	"hostname": "peregrine",
	"level": "DEBUG",
	"logger": "com.kdgregory.log4j.aws.example.Main",
	"message": "value is 60",
	"processId": "4260",
	"tags": {
		"applicationName": "Example",
		"deployedTo": "test",
		"runDate": "20180605"
	},
	"thread": "example-0",
	"timestamp": "2018-06-05T11:11:05.936Z"
}

When loaded into Elasticsearch, you can search or filter on any of these values. The “tags” are particularly useful: they are intended to provide metadata about a particular application instance, allowing you to answer questions such as “does this error only happen in production?”
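You also aren't limited to Kibana: the same fields can be queried directly through the Elasticsearch search API. Here's a sketch using the same requests and aws-requests-auth modules that appear in the Lambda functions later in this article; the endpoint is a placeholder, and depending on your index mappings you may not need the “.keyword” suffixes:

import os, requests
from aws_requests_auth.aws_auth import AWSRequestsAuth

es_host = 'search-example-xxxxxxxx.us-east-1.es.amazonaws.com'   # your cluster endpoint

auth = AWSRequestsAuth(aws_access_key=os.environ['AWS_ACCESS_KEY_ID'],
                       aws_secret_access_key=os.environ['AWS_SECRET_ACCESS_KEY'],
                       aws_token=os.environ.get('AWS_SESSION_TOKEN'),
                       aws_region='us-east-1',
                       aws_service='es',
                       aws_host=es_host)

# errors from the "Example" application in the last hour
query = {
    "query": {
        "bool": {
            "filter": [
                {"term":  {"level.keyword": "ERROR"}},
                {"term":  {"tags.applicationName.keyword": "Example"}},
                {"range": {"timestamp": {"gte": "now-1h"}}}
            ]
        }
    },
    "_source": ["timestamp", "logger", "message"],
    "size": 100
}

rsp = requests.post('https://' + es_host + '/logstash-*/_search', auth=auth, json=query)
print(rsp.json())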

Troubleshooting

If you've been following along, setting up the components, you should now see your own log messages in Elasticsearch. If not, here are some steps to figure out why.

  1. Verify that Log4J is using the appender

    Log4J provides a debugging property, log4j.configDebug, that writes internal logging messages to the console. Your first step should be to write a small program that sends log messages, and run it with this property enabled (there's just such a program in the “examples” directory of my appenders project).

    java -Dlog4j.configDebug=true -jar target/aws-appenders-example-1.2.2.jar

    When you run this, you should see messages like the following. If you made any mistakes in the configuration (such as setting the property sreamName rather than streamName), there will be a warning message here.

    log4j: Parsing appender named "kinesis".
    log4j: Parsing layout options for "kinesis".
    log4j: Setting property [enableHostname] to [true].
    log4j: Setting property [enableLocation] to [true].
    log4j: Setting property [tags] to [applicationName=Example,runDate={date}].
    log4j: End of parsing for "kinesis".
    log4j: Setting property [streamName] to [AppenderExample].
    log4j: Parsed "kinesis" options.

    Assuming that log4j.properties has a valid configuration, you should see the following message, which indicates that the appender is running. This message does not, however, indicate that the appender is able to write to the stream, since the actual writing takes place on a background thread.

    log4j: KinesisLogWriter: created client from factory: com.amazonaws.services.kinesis.AmazonKinesisClientBuilder.defaultClient
  2. Verify that your application has permissions to write to the stream

    If the appender isn't able to write to the stream — and this will almost always be due to an incorrectly-named stream or invalid credentials — you'll see a message like the following further down in the console output:

    log4j:ERROR initialization failed: unable to configure stream: AppenderExample
    com.amazonaws.SdkClientException: Unable to load AWS credentials from any provider in the chain

    Once you know that the sample application works, enable the same Log4J debug property for your own application and use its output to verify the appender configuration.

  3. Verify that Firehose can read from Kinesis and write to the Elasticsearch cluster

    If you let the console create firehose_delivery_role then you should have all of the permissions that you need (unless, of course, you configured the Elasticsearch cluster to deny all access). If you manually create the service role, or change the permissions it grants, then the Firehose may not be able to process data.

    You can identify both of these problems from the “Amazon Elasticsearch Service Logs” tab when looking at the Firehose in the AWS console. If unable to read from Kinesis you'll see a message with the error code Kinesis.AccessDenied; if unable to write to Elasticsearch, the error code will be ES.AccessDenied.

What About Non-Java Solutions?

If you're using a JVM-based language, then my appenders library is a good solution for writing directly from the application to a Kinesis stream. But what if you're not using Java? Well, it turns out that I'm not the only person with this idea: Googling turns up libraries for Log4Net, Python, Go, and other languages. I haven't tried any of these, so am not including links.

You should do some due diligence when picking a logging library. It should use the batch API (PutRecords), so that you're not making a web-service call for each message, and retry messages when throttled. You also want the log-writer to run on a background thread, so that calls to Kinesis don't block your application. And finally, ensure that the writer has a mechanism to discard messages if it's unable to write them, so that you don't run out of memory.
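To make those requirements concrete, here's a minimal Python sketch of such a writer. It's not production code (a real library needs more careful retry and shutdown handling), but it shows the batching, background thread, and bounded queue:

import json, queue, threading, time
import boto3

class KinesisLogWriter:
    """Queues JSON log messages and sends them in batches from a background thread."""

    def __init__(self, stream_name, max_queued=10000, batch_size=500, flush_interval=0.25):
        self.client = boto3.client('kinesis')
        self.stream_name = stream_name
        self.batch_size = batch_size                    # PutRecords accepts at most 500 records
        self.flush_interval = flush_interval
        self.queue = queue.Queue(maxsize=max_queued)    # bounded, so we can't exhaust memory
        threading.Thread(target=self._run, daemon=True).start()

    def log(self, message: dict):
        try:
            self.queue.put_nowait(json.dumps(message))
        except queue.Full:
            pass                                        # discard when we can't keep up

    def _run(self):
        while True:
            batch = []
            try:
                batch.append(self.queue.get(timeout=self.flush_interval))
                while len(batch) < self.batch_size:
                    batch.append(self.queue.get_nowait())
            except queue.Empty:
                pass
            if batch:
                self._send(batch)

    def _send(self, batch):
        records = [{'Data': msg.encode('utf-8'), 'PartitionKey': str(hash(msg))} for msg in batch]
        response = self.client.put_records(StreamName=self.stream_name, Records=records)
        if response.get('FailedRecordCount', 0) > 0:
            time.sleep(0.5)                             # simple backoff, then re-send only the failures
            retry = [r for r, res in zip(records, response['Records']) if 'ErrorCode' in res]
            self.client.put_records(StreamName=self.stream_name, Records=retry)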

As an alternative, you can write your logging messages to a file and let the Amazon Kinesis Agent send them on to Kinesis. This approach has two caveats: first, you have to install the agent on every deployment machine (which may mean running the installer script rather than just installing a package). And second, you will need to manage your logfiles (ie, with logrotate) so that you don't run out of disk space.
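For reference, the agent reads its configuration from /etc/aws-kinesis/agent.json. A minimal flow that tails a logfile and forwards each line to the “example” stream looks roughly like this; the file path is hypothetical, and you should check the agent documentation for the current option names:

{
  "flows": [
    {
      "filePattern": "/var/log/myapp/application.json*",
      "kinesisStream": "example"
    }
  ]
}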

Regardless of whether you use an agent or a logging library that writes directly to Kinesis, the key is that your logging output should be JSON. That's what Elasticsearch wants, and starting from any other format means more work and chance for mistakes.

It's also important that your JSON is consistent between applications in order to perform cross-application searches: it should use the same field names, and give them similar values.

Cleaning Up Old Indexes

One of the problems with this pipeline (or any Elasticsearch-based solution) is that Elasticsearch does not automatically clean up old indexes. You can clean up manually — it's a simple HTTP DELETE, after all — but it's more in keeping with the idea of managed services if it happens automatically. My solution is an AWS Lambda function, invoked daily, which retrieves a list of the cluster's indexes and deletes all but a specified number of them.

I originally wrote this section as a blog post. That post tried to cover all possible configurations of Elasticsearch, running the Lambda both inside and outside of a VPC. This version focuses on the Elasticsearch cluster described above, with an outside-the-VPC Lambda.

The Lambda function

Diving right in, here's the Lambda function. It's Python 3, which I find to be the easiest language for Lambda functions.

# Copyright 2018 Keith D Gregory
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Contains example code from https://github.com/DavidMuller/aws-requests-auth

import json
import os
import requests

from aws_requests_auth.aws_auth import AWSRequestsAuth

def lambda_handler(event, context):
    es_host = os.environ['ELASTIC_SEARCH_HOSTNAME']
    num_indexes_to_keep = int(os.environ['NUM_INDEXES_TO_KEEP'])
    index_prefix = os.environ['INDEX_PREFIX']

    auth = AWSRequestsAuth(aws_access_key=os.environ['AWS_ACCESS_KEY_ID'],
                           aws_secret_access_key=os.environ['AWS_SECRET_ACCESS_KEY'],
                           aws_token=os.environ['AWS_SESSION_TOKEN'],
                           aws_region=os.environ['AWS_REGION'],
                           aws_service='es',
                           aws_host=es_host)

    indexResponse = requests.get('https://' + es_host + '/*', auth=auth)
    if (indexResponse.status_code != 200):
        raise Exception('failed to retrieve indexes: ' + indexResponse.text)

    indexData = indexResponse.json()
    indexNames = sorted([x for x in indexData.keys() if x.startswith(index_prefix)])
    indexesToDelete = indexNames[0 : max(0, len(indexNames) - num_indexes_to_keep)]

    for idx in indexesToDelete:
        deleteResponse = requests.delete('https://' + es_host + '/' + idx, auth=auth)
        if deleteResponse.status_code == 200:
            print("deleted " + idx)
        else:
            raise Exception("failed to delete " + idx + ": " + deleteResponse.text)

It's fairly simple: the first get() finds all of the indexes in the cluster. These are then filtered based on the configured prefix, which is the index name that you specified when configuring the Firehose, and sorted, relying on the Firehose-appended timestamp to provide order. The function then identifies the indexes to be discarded, and lastly iterates through them, calling delete() on each.

The tricky part is that it can't use Python's built-in http module, because the cluster requires requests to be signed. So instead I use the requests and aws-requests-auth modules.

Which in turn means that you can't simply paste the code into the AWS Console; you must create an upload package that includes the necessary libraries. The following instructions are written for AWS Linux (and won't work on Ubuntu due to this bug); I recommend spinning up a new t2.micro instance to run them.

First, make sure that you have the development tools installed:

sudo yum install python3 python3-pip wget zip

Next, create a directory for the package, and save the Lambda function in it with the name lambda_function.py. Or, pull it from my Github account, as shown here.

mkdir es-cleanup
cd es-cleanup
wget https://raw.githubusercontent.com/kdgregory/aws-misc/master/es-cleanup-signed/lambda_function.py

Then use pip to install the dependencies into that directory.

pip3 install -t `pwd` requests aws-requests-auth

Lastly, zip up the contents of the directory (from within the directory) so that you can upload it to Lambda.

zip -r /tmp/escleanup.zip .

If you've never worked with Lambda before, I recommend stopping here and working through the AWS tutorial to understand the various pages of the Lambda creation wizard. If you have created Lambda functions before, the highlights for this one are a Python 3 runtime and the Elasticsearch_Cleanup_Role execution role, which the cluster policy shown earlier grants es:ESHttpGet and es:ESHttpDelete.

Moving on to the second page of the wizard, you'll need to upload the ZIP that you created earlier and set the environment variables that the function reads: ELASTIC_SEARCH_HOSTNAME (the cluster's endpoint), NUM_INDEXES_TO_KEEP, and INDEX_PREFIX (the index name that you configured in the Firehose, “logstash” in the examples here).

Scrolling down this page, you want to leave “Networking” as “No VPC” and update the execution timeout to 30 seconds from the default 3. There's no need to change the default memory allotment; this isn't a memory-hungry script.

Testing

Once you've created the Lambda, you need to verify that it actually works — and you probably don't want to wait until you're out of space to do so. Fortunately, Lambda has a “Test” button that will run the function, providing it a predefined payload. For this function the specific payload doesn't matter; just use the default.

What does matter is that you have indexes that follow the specified naming pattern. Rather than loading actual logging data into the cluster, you can create the indexes with a simple curl call:

curl -XPUT https://search-example-5flxfrid9q5q17qqq5qqqqqpqm.us-east-1.es.amazonaws.com/logstash-2018-01-01

Replace the URL with your cluster's URL, and create as many indexes as you like following the naming pattern. Once you've created these indexes, press the “Test” button and verify that only the desired number remain.

If everything works you'll see a green box at the top of the Console, and the log message will show you the indexes that were deleted. If the test failed, you'll see a red box with an error message. The most likely error message is an authorization failure, which means that the cluster isn't configured to trust the Lambda function's role.

failed to retrieve indexes: {"Message":"User: arn:aws:sts::123456789012:assumed-role/Elasticsearch_Cleanup_Role/ElasticsearchCleanup is not authorized to perform: es:ESHttpGet"}

Triggered Invocation

For production use, you don't want to manually invoke the Lambda. Instead, use CloudWatch Events to trigger it on a regular schedule — for example, at midnight every day.

There are two things to remember about scheduled events: first, that times are specified in UTC. So if you want to run at midnight Eastern Daylight Time, which is 4 hours off UTC, you need to use the following expression:

0 4 * * ? *

The second thing to remember is that you can specify day-of-week or day-of-month but not both. As shown above, one of these fields has to be “?”, even if the other is “*”.

Regardless of whether you define the rule in the Lambda Console or the CloudWatch Events Console, AWS will take care of granting the correct permissions so that the rule can invoke the Lambda. Once you set it up you're good to go.
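If you'd rather script the schedule than click through a console, here's a boto3 sketch. Note that, unlike the consoles, the API will not add the invocation permission for you; the rule and function names are examples:

import boto3

events = boto3.client('events')
lambda_client = boto3.client('lambda')

function_arn = 'arn:aws:lambda:us-east-1:123456789012:function:ElasticsearchCleanup'

# midnight EDT, expressed in UTC
rule = events.put_rule(
    Name='ElasticsearchCleanup-daily',
    ScheduleExpression='cron(0 4 * * ? *)')

events.put_targets(
    Rule='ElasticsearchCleanup-daily',
    Targets=[{'Id': 'es-cleanup', 'Arn': function_arn}])

# allow CloudWatch Events to invoke the function (the consoles do this for you)
lambda_client.add_permission(
    FunctionName='ElasticsearchCleanup',
    StatementId='allow-cloudwatch-events',
    Action='lambda:InvokeFunction',
    Principal='events.amazonaws.com',
    SourceArn=rule['RuleArn'])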

Examining archived logfiles with JQ

As I mentioned above, Firehose simply concatenates the input records together. This leaves you with files that contain a single, extremely long line of JSON records, with no separators between them. Fortunately, there are tools that can process files like that, and one is jq.

You should be able to install jq using your platform's package manager; if not, the link above has installation instructions. To run it, you'll need to download the relevant archive files from S3 (which means that you need to know the approximate date that you want to examine). I prefer to keep them GZIPed, and use zcat to uncompress them and further concatenate them before passing to jq.

A full description of jq is beyond the scope of this article (click on the link!). Here's a short example, which takes a set of compressed files, finds all errors and prints their timestamp, logger name, and message.

zcat *.gz | jq -c 'select(.level == "ERROR") | [ .timestamp, .logger, .message ]' | more

While you can build up arbitrarily complex jq expressions, my personal preference is to mimic a traditional text format, typically using the array constructor shown above. Once you have that you can then turn back to tools like grep to examine the results.

Load Balancer Logging: S3, Lambda, AWS Elasticsearch

I'm going to wrap up with a section on using Elasticsearch to analyze logs from an Elastic Load Balancer. This process uses a different pipeline: the load balancer writes logfiles to S3, which triggers a Lambda function that writes their contents to Elasticsearch.

[Diagram: ELB logging pipeline]

While you can use a single Elasticsearch cluster to handle both application and ELB logging, my preference is to use separate clusters: it adds to your users' workload if they have to explicitly specify the type of data for each query.

The only decision that you'll have to make for this pipeline is how frequently the ELB should write its logs to S3. You have two choices: every five minutes or every hour. Which you pick depends on how quickly you want to see updates, versus how big the files will be. As with application log archives, large files are only important if you plan to use “infrequent access” storage on S3. And personally, I prefer the immediate feedback offered by a five minute interval.

Example ELB log line

Each line in the ELB logfile looks like this:

2018-04-30T12:40:34.740628Z example 173.49.123.456:58798 172.30.1.119:80 0.000037 0.001039 0.000046 200 200 0 43862 "GET http://example-1886477337.us-east-1.elb.amazonaws.com:80/index.php?page=scm.git HTTP/1.1" "curl/7.47.0" - -

Breaking it down, we have the following components:

2018-04-30T12:40:34.740628Z
UTC timestamp when the request was received by the ELB, in ISO-8601 format.

example
The name of the load balancer. While this seems redundant, since each ELB writes to its own logging directory, it's useful when preparing records for Elasticsearch.

173.49.123.456:58798 172.30.1.119:80
The source and destination addresses.

The source address is useful to discover clients that are making an unnatural level of requests: they may be probing the system for weaknesses.

The destination address is useful for determining whether your backend instances are receiving equal amounts of work: if one has significantly more requests than another it may indicate that those requests are erroring-out.

0.000037 0.001039 0.000046
Request processing time. This consists of three components:
  • The first is the time that the load balancer takes to pick an instance to handle the request.
  • The second is the time from sending the request to the back-end instance to the receipt of the first response header (note: not the entire response).
  • The last number is the time taken by the load balancer to start sending the response to the client.
These times don't account for large body content or network delays, so are typically lower than the times recorded by a browser.

200 200
The first number is the HTTP status code returned from the ELB to the client, while the second is the status code returned from the instance to the ELB.

Most of the time, they'll be the same. If the instance is unresponsive, however, its status code will be a dash (“-”), and the ELB status code will indicate a gateway error (one of the 5xx codes).

0 43862
These two numbers are the size of the request and response bodies, in bytes.

"GET http://example-1886477337.us-east-1.elb.amazonaws.com:80/index.php?page=scm.git HTTP/1.1"
This is the original request line, including HTTP verb, URL, and HTTP version. We'll break it into pieces before uploading to Elasticsearch.

"curl/7.47.0"
This is the user agent.

- -
These two fields are the HTTPS cipher and protocol for the request. When you see dashes, as here, it means that the request was using plain HTTP.

The Lambda function

This one's a little long, so I'm going to break it into pieces. Skipping over the headers and import statements (you can find the full script here), here's the start of the actual code:

es_hostname = os.environ["ELASTIC_SEARCH_HOSTNAME"]
batch_size = int(os.environ["BATCH_SIZE"])

These are the two environment variables that configure the Lambda. As with the cleanup Lambda, ELASTIC_SEARCH_HOSTNAME lets us construct URLs for the Elasticsearch cluster. BATCH_SIZE will be described below.

baseRE = re.compile(
    (
    r'^(\d{4}-\d{2}-\d{2}T\d+:\d+:\d+\.\d+Z) '    # timestamp
    r'([^ ]+) '                                   # elb_name
    r'(\d+\.\d+\.\d+\.\d+):(\d+) '                # client_ip, client_port
    r'(\d+\.\d+\.\d+\.\d+):(\d+) '                # backend_ip, backend_port
    r'([0-9.-]+) '                                # request_processing_time
    r'([0-9.-]+) '                                # backend_processing_time
    r'([0-9.-]+) '                                # response_processing_time
    r'(\d{3}) '                                   # elb_status_code
    r'(\d{3}) '                                   # backend_status_code
    r'(\d+) '                                     # received_bytes
    r'(\d+) '                                     # sent_bytes
    r'"([A-Z]+) '                                 # http_method
    r'([^ ]+) '                                   # http_url
    r'([^ ]+)" '                                  # http_version
    r'"(.+)" '                                    # user_agent
    r'(.+) '                                      # ssl_cipher
    r'(.+)$'                                      # ssl_protocol
    ))

# this regex extracts the host portion of the url
hostRE = re.compile(r'[Hh][Tt][Tt][Pp][Ss]?://([^:/]+).*')

The heart of the Lambda is a massive regular expression that pulls pieces out of the ELB log line. In addition to breaking apart the log line, I also use regexes to extract additional information, in this case the hostname from the request URL (which is useful if you're exposing the same service under different hostnames).

s3 = boto3.resource("s3")
auth = AWSRequestsAuth(aws_access_key=os.environ["AWS_ACCESS_KEY_ID"],
                       aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
                       aws_token=os.environ["AWS_SESSION_TOKEN"],
                       aws_region=os.environ["AWS_REGION"],
                       aws_service="es",
                       aws_host=es_hostname)

Here I define the AWS resources used by the Lambda: a reference to the S3 service and a signer for Elasticsearch HTTP requests. The environment variables here are provided by the Lambda execution environment.

def lambda_handler(event, context):
    """The entry point for the Lambda function."""
    for rec in event["Records"]:
        s3_bucket = rec["s3"]["bucket"]["name"]
        s3_key = rec["s3"]["object"]["key"]
        process_file(s3_bucket, s3_key)

This is the Lambda entry point. The function is passed an S3-specific event structure that we have to pull apart to find the actual file(s) that were written. In practice, only one file gets written at a time, but we still have to process an array of records.
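If you want to exercise the handler without uploading a real file, a hand-built event with the same shape will do; the bucket and key here are hypothetical, and the module-level environment variables still need to be set:

# point the bucket/key at a real ELB logfile to test end-to-end
test_event = {
    "Records": [
        {
            "s3": {
                "bucket": {"name": "example-elb-logs"},
                "object": {"key": "AWSLogs/123456789012/elasticloadbalancing/us-east-1/2018/04/30/example.log"}
            }
        }
    ]
}

lambda_handler(test_event, None)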

def process_file(s3_bucket, s3_key):
    """Handles a single uploaded file, transforming its contents and writing to Elasticsearch."""
    print("processing: s3://" + s3_bucket + "/" + s3_key)
    base_id = hashlib.sha1(s3_key.encode('utf-8')).hexdigest()
    with tempfile.TemporaryDirectory() as tmpdir:
        srcFile = os.path.join(tmpdir, "elb_log.txt")
        s3.Bucket(s3_bucket).download_file(s3_key, srcFile)
        recnum = 0
        batch = []
        with open(srcFile, "r") as src:
            for s in src:
                recnum += 1
                batch += process_record(base_id, recnum, s)
                if recnum % batch_size == 0:
                    do_upload(batch, recnum)
                    batch = []
            do_upload(batch, recnum)

Each file gets processed independently. The first task is to download the file from S3 into the Lambda execution environment, so that we can open it from within Python. Since temporary space in the Lambda environment is limited, we store the file in a temporary directory that's deleted (along with its contents) as soon as the function is done (if you're storing hourly logfiles with an active server, you may not have enough space to download them — either switch to five-minute intervals or store compressed files and decompress them on the fly).

This function is where the batch size configuration is used: a Lambda function has a limited amount of memory, which may not be enough to hold all of the rows of a file (you can always pay for more memory, but it would be wasted for smaller files). There's also a limit on the size of an Elasticsearch network request, which varies by the size of the cluster's instances. My philosophy is that it's easier to set a limit when processing the file, and adjust that limit based on how much memory the Lambda consumes (a batch size of 5,000 entries is a good starting place).

def process_record(base_id, recnum, s):
    """Parses a single ELB log entry and creates an entry for the bulk upload."""
    data = parse(s)
    index_name = "elb-" + data["timestamp"][0:13].lower()
    record_id = base_id + "-" + str(recnum)
    return [
        json.dumps({ "index": { "_index": index_name, "_type": "elb_access_log", "_id": record_id }}),
        json.dumps(data)
        ]

Each record in the file is passed to this function, which parses the text and writes it as a pair of JSON strings that are combined to form the batch. The first of these strings gives Elasticsearch information about how to index the record; here I'm using hourly indexes based on the record's timestamp. The second is the actual data.
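For the sample log line shown earlier, the pair of lines added to the batch looks something like this (the _id prefix is the SHA-1 of the S3 key, abbreviated here, and the data line is truncated for readability):

{"index": {"_index": "elb-2018-04-30t12", "_type": "elb_access_log", "_id": "d6b0d82cea4269ec-1"}}
{"timestamp": "2018-04-30T12:40:34.740628Z", "elb_name": "example", "client_ip": "173.49.123.456", "elb_status_code": "200", "http_method": "GET", "host": "example-1886477337.us-east-1.elb.amazonaws.com"}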

def do_upload(batch, recnum):
    """Combines a list of updates into a single Elasticsearch request."""
    print("uploading batch ending at record " + str(recnum))
    rsp = requests.post("https://" + es_hostname + "/_bulk",
                        headers={"Content-Type": "application/x-ndjson"},
                        auth=auth,
                        data="\n".join(batch) + "\n")
    if rsp.status_code != 200:
        raise BaseException("unable to upload: " + rsp.text)

The Lambda writes batches of records to the Elasticsearch cluster's bulk update API. This has to be a signed request, using the auth object to create appropriate headers for the requests module.

def parse(s):
    """Extracts fields from an ELB log entry and returns them in a map."""
    m = baseRE.search(s)
    result = {}
    result["timestamp"]                     = m.group(1)
    result["elb_name"]                      = m.group(2)
    result["client_ip"]                     = m.group(3)
    result["client_port"]                   = m.group(4)
    result["backend_ip"]                    = m.group(5)
    result["backend_port"]                  = m.group(6)
    try:
        result["request_processing_time"]   = float(m.group(7))
    except:
        pass
    try:
        result["backend_processing_time"]   = float(m.group(8))
    except:
        pass
    try:
        result["response_processing_time"]  = float(m.group(9))
    except:
        pass
    result["elb_status_code"]               = m.group(10)
    result["backend_status_code"]           = m.group(11)
    result["received_bytes"]                = int(m.group(12))
    result["sent_bytes"]                    = int(m.group(13))
    result["http_method"]                   = m.group(14)
    result["http_url"]                      = m.group(15)
    result["http_version"]                  = m.group(16)
    result["user_agent"]                    = m.group(17)
    result["ssl_cipher"]                    = m.group(18)
    result["ssl_protocol"]                  = m.group(19)

    result["host"] = hostRE.match(m.group(15)).group(1)

    return result

Finally, this is where a log entry gets parsed into fields, using the regex defined at the top of the file. The fields representing processing time get turned into numeric values, which will allow Elasticsearch to sort them in “natural” order; the exception handler is required because the log entry will have a non-parseable “-” if the back-end wasn't accepting requests.

One thing to note here is the separate regex that extracts the hostname from the URL. This particular field is only useful if you have traffic for multiple domains (“www.example.com” and “example.com” count), but it's a technique that's generally useful: pull out site-specific fields that might be used for analysis. My website, for example, uses “page” to identify the article; by extracting the associated value, I can do an easy breakdown of traffic by destination.

Deployment

As with the cleanup Lambda, you have to create a directory and build a ZIPfile from it.

mkdir elb-to-es
cd elb-to-es

wget https://raw.githubusercontent.com/kdgregory/aws-misc/master/elb-to-es/lambda_function.py

pip3 install -t `pwd` requests aws-requests-auth

zip -r /tmp/elb-to-es.zip .

Also like the previous Lambda, you'll need to create an execution role. For this one you need the predefined “AmazonS3ReadOnlyAccess” policy as well as “AWSLambdaBasicExecutionRole”. You also need to grant it “es:ESHttpPost”, either as a policy in the execution role or (as I prefer) in the cluster's policy.
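If you take the cluster-policy approach, the additional statement follows the same pattern as the policy shown earlier; the role name is an example, and the domain ARN should be that of your ELB-logging cluster:

{
  "Effect": "Allow",
  "Principal": {
    "AWS": "arn:aws:iam::123456789012:role/ELB_To_ES_Role"
  },
  "Action": [
    "es:ESHttpPost"
  ],
  "Resource": "arn:aws:es:us-east-1:123456789012:domain/example/*"
}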

For More Information

My appenders library for Log4J is here. Use that link for documentation, but grab the latest released version from Maven Central (you can find the Maven coordinates here).

If you're using Python, here's an example of formatting log output in JSON that's compatible with that produced by JsonLayout.

I also have a library of miscellaneous AWS-related scripts. You'll find both the index cleanup and the ELB log delivery Lambdas there.

I used a lot of Amazon services for this article, and linked to their user guides when introducing each service.

I only touched on the features of Kibana. For more information, see the Kibana User Guide.

In case you missed the link earlier, here's Amazon's documentation on how to size your Elasticsearch cluster.

Sizing Elasticsearch indexes is also something of a black art. You can read the Elastic docs, but ultimately your only control over index size is how frequently you rotate the indexes.

Copyright © Keith D Gregory, all rights reserved
