Stream Logs to OpenSearch via Kinesis

Infrastructure & DevOps Modernization

Data streaming eliminates the need to write custom applications for transferring data. Caylent’s Kennery Serain provides a reference architecture and code examples to showcase how to ingest data on OpenSearch using Kinesis Data Streams in near real-time.

When you hear phrases like data stream or streaming data do you find yourself wondering what they actually mean? In this blog post, we’ll explore these fancy terms and see that they describe simple but powerful processes. For starters, a stream is nothing more than data or a data chunk and streaming is the act of moving the data from one place to another.

Let’s imagine the following very common scenario: you have an old application and you want to modernize it. One of the first requirements is to start redesigning the database, since this old database has duplicate data, poorly maintained indexes, and the tables are not normalized following the Boyce-Codd Normal Form. But there is another important requirement: no data should be lost! The data from the old database needs to be transformed and transferred to the redesigned tables in the new database. Did you know that you don’t need to do this sync manually? There are many tools capable of doing this data transfer. For the given example, you could leverage Amazon Apache Kafka or the AWS Database Migration Service (DMS). All of these types of tools, behind the scenes, are using streams for transferring data from a source to a target. One of my favorite things about data streaming is that you can have ongoing replication from the source to the target.

This feature is very useful during the migration in case you keep receiving new data on your old database during the transition time and need to keep your new database in sync. The ongoing replication will guarantee the replication from every new line written in your old database to the new one, and they don’t even need to have the same database vendor. For example, you can continuously stream data from a Postgres Database to a SQL Server database - and without writing a line of code. That’s incredible! Let’s go deeper into the underlying technology and architecture to see how it all works.

Data Streaming vs Message Queueing

There are different ways to configure asynchronous communication among microservices when using Event Driven Architectures (EDA). A well-known approach is to use message queueing, but this is very different from event data streaming, despite the fact that both are publisher/subscriber systems.

Using message queues to communicate among microservices is often called the Notification Pattern. For example, imagine that we have two microservices: Order and Stock. Every time an order is created the Order microservice dispatches an OrderCreatedEvent message notifying the subscribed microservices, in the case of the Stock microservice, that the order was created. The Stock microservice uses this event to invoke a command that decrements the purchased quantity from the product stock, and also records the OrderId in case a rollback is necessary. You may have noticed that this Event-Driven Design using message queuing is not using a large payload. The OrderCreatedEvent message could have only three fields in the message body: OrderId, ProductId, and ProductQuantity. This example helps demonstrate why the default message size for an Amazon SQS queue is so tiny at 256 Kilobytes. In the said example, we’re not transferring the data itself, we’re simply transferring references to the data. Small payloads are perfect for message queueing. On the other hand, we have data streaming for the opposite purpose and we can use it for transferring large amounts of data. Another difference between data streaming and message queuing is that data streaming is often associated with near real-time data transfer while message queueing is often associated with asynchronous (batch) processing.

Extending the above example, imagine that you need to send the last three years of Orders data to a data warehouse. How would you transfer a 5GB payload? Using data streaming! With data streaming you can decompose a large dataset into smaller chunks and continuously transfer them to a target.

Message queueing and data streaming, both utilize the concepts of topics and the publisher/subscriber pattern but they have very different use cases. Let’s go deeper into an event data streaming example.

Kinesis to OpenSearch: Reference Architecture

Amazon Kinesis is the official streaming data service platform from AWS and Amazon OpenSearch is an analytics engine that is a fork of the popular observability platform ElasticSearch and Kibana. OpenSearch is both fully open source while also being offered by AWS as a managed service. Using OpenSearch you can aggregate logs to collect metrics and create custom dashboards for monitoring and debugging your applications. It’s out of this blog’s scope, but it’s nice to remember that OpenSearch also has a tracing feature, as AWS X-Ray does, which allows the user to get HTTP request metrics from the front-end to the database.

The following architecture design will show how to continually stream application logs to OpenSearch using Kinesis Data Streams and Amazon Kinesis Delivery Streams (Firehose). We’ll publish application logs on Kinesis Data Streams, Kinesis Firehose will then pull the data from the Data Stream and push it to OpenSearch:

All the needed infrastructure will be provisioned using AWS CloudFormation as the Infrastructure as Code (IaC) tool. To implement the above diagram, we’ll follow these steps:

  1. Create the data stream where the producer (instance) will publish the data (application logs).
  2. Create and configure OpenSearch where all the logs will be stored and also visualized using Kibana.
  3. Create the delivery stream that will be the bridge between the data source and target.

Creating a Kinesis Data Stream

Let’s start with the first step: data stream creation. Below is an example showing how to create a Kinesis Data Stream using IaC with CloudFormation. There are two ways of provisioning a Kinesis Data Stream: ON_DEMAND and PROVISIONED. For small and medium applications we should leverage the ON_DEMAND stream mode, which avoids the complexity of calculating capacity (Kinesis Shards). 

“A shard is a uniquely identified sequence of data records in a stream. A stream is composed of one or more shards, each of which provides a fixed unit of capacity. Each shard can support up to 5 transactions per second for reads, up to a maximum total data read rate of 2 MB per second and up to 1,000 records per second for writes, up to a maximum total data write rate of 1 MB per second (including partition keys).” - Kinesis Components Documentation.

Provisioning and Configuring OpenSearch

Going to the next step, we need to provision and configure OpenSearch. Since we’re using OpenSearch Serverless, we don’t need to provision a cluster and it will only be necessary to create an OpenSearch collection. A collection is a logical grouping of one or more indexes where the application logs will be aggregated. Once the collection is created an endpoint will be available to access and manage the streamed data through a UI (with Kibana). 

After creating the OpenSearch collection we need to create three required policies: Data Access, Network, and Encryption Policies. 

Data Access Policy: This policy specifies the resources (users and roles) that can have access to the OpenSearch collection, indexes and dashboard.

Looking at the Principal block in the above CloudFormation a bit closer, we’re specifying the user, ${openSearchUser}, can have access to the dashboard, to all indexes, and to the OpenSearch collection we’re creating, ${openSearchCollectionName}. Adding the role, ${DeliveryRole.Arn}, as a Principal means that we’re allowing that DeliveryRole role, which is assumed by the Kinesis Firehose service to publish data to the OpenSearch collection.

Network Policy: This policy specifies whether OpenSearch resources, the collection, and dashboard are accessible from the public internet or only from the internal AWS network via  Amazon VPC endpoints. We’re explicitly allowing public access with the AllowFromPublic flag set to true.

Encryption Policy: Encryption at rest is required for serverless collections, and you have two options. The first option is to specify an encryption key from AWS Key Management Service (KMS) or if you don’t want to create or use a specific encryption key, you can just set the AWSOwnedKey to true and AWS will manage the encryption key for you.

Having the OpenSearch collection and its required policies set, the next step is to configure the Kinesis Delivery Stream (Firehose). In the CloudFormation script below, we’re specifying a Kinesis Data Stream as the source for our data under KinesisStreamSourceConfiguration, and the OpenSearch collection as the destination using CollectionEndpoint

When configuring the Kinesis Delivery Stream, it’s important to enable Amazon CloudWatch logs for the data ingestion on the OpenSearch collection and also for the Amazon S3 backup errors, otherwise we won’t be able to debug the stream pipeline.

Since we’re configuring an OpenSearch collection as the destination for the delivery stream, it’s necessary to specify an index where all the streamed data will be aggregated; that’s the IndexName field below. The specified index will be created automatically so it isn't necessary to create it using the UI. 

As specified on the diagram, the Firehose service will be polling data from the data stream and pushing it to the OpenSearch collection. We need to configure the periodicity for the data polling interval, which is in the BufferingHints configuration below. It’s defined in seconds and there is a tradeoff: using a lower number for your polling interval can decrease latency and improve real-time processing, but it is also more expensive so be sure to find the right balance for your needs.

The last step is to handle service permissions using roles, policies and permissions. Since Kinesis Delivery Stream (Firehose) is polling data from Kinesis Data Stream, populating an OpenSearch collection, publishing logs on CloudWatch, and saving backup logs on S3, all of the required permissions need to be explicitly declared. My advice is don’t worry about memorizing any permissions names or settings as all of them are well described in the AWS documentation. As a trick to help you assign the correct permissions to your policy, you can create the resource manually using the console and copy the automatically generated permissions to your CloudFormation script.

Once you have the CloudFormation script ready for deployment, you can run it like this from the command line:

NOTE: When you are managing AWS IAM resources through CloudFormation remember that it's necessary to specify the capability value using CAPABILITY_IAM - just like Linux capabilities.

After running the above command, all your resources should be created and you’ll have something like this:

Created Resources:

  • Kinesis Data Stream
  • Kinesis Delivery Stream
  • OpenSearch Collection
  • OpenSearch Data Access, Network, and Encryption Policies
  • S3 bucket for backup logs errors
  • Kinesis Delivery Stream Role and Policy

You can find the complete CloudFormation script here.

Publishing data from Kinesis

Now that the streaming infrastructure is ready, we have what we need to  start publishing data on the Kinesis Data Streams. There are many ways to publish our application logs on the Kinesis Data Streams. We could write an application or an AWS Lambda function  to do it using the Kinesis Software Development Kit (SDK), or we could create a producer using the Amazon Kinesis Producer Library (KPL), but for demonstration purposes we’re going to use the good ol’ AWS Command Line Interface (CLI). This is the command structure:

Below, is the command with all of the parameters set, and by putting it inside a do-while loop we will simulate data being streamed continuously:

Running the above command should generate something like this:

Note: Once you run the above command, use CTRL + C, for breaking the loop and releasing the terminal. 

With that being done, the Kinesis Delivery Stream (Firehose) is now receiving data from the Kinesis Data Stream every 60 seconds (BufferingHints) and sending it to the OpenSearch collection. Next, to see the data on the UI, we can grab the Kibana URL from the collection resource. 

To sign-in, you need to use the access key and secret key from the specified user on the OpenSearch Data Access Policy.

Before having access to the streamed data, you’ll see that Kibana suggests creating an index-pattern. An index-pattern is a regex (regular expression) to aggregate data from multiple indexes. As we have only one, we can put the index name previously specified on the CloudFormation - application-logs-index.

Because we created the index pattern under the Discover menu, we are already able to see the application logs that we streamed from bash. As you can see below, there are more than one thousand log entries.

Now, you are only limited by your imagination! For example, an Observability Engineer could extract many metrics from a data warehouse like that. For demonstration purposes, let’s plot that data on a line graph.

Yes, the magic is happening! We are receiving streamed data and creating visualizations using OpenSearch. The next steps are up to you!

Once you are done, and to avoid any billing surprises, be sure to delete the entire stack by running:

Conclusion

We could have used Kinesis for streaming to different targets as approached here, but OpenSearch it’s a powerful tool that deserves highlight since it can be used for more than just application logs; You can also leverage OpenSearch for your SIEM (Security Information and Event Management) stack - For gathering security insights and metrics from your application on AWS. We hope to share more OpenSearch use cases in the future. 

Understanding how to set up and implement data streaming can open up many new ideas and opportunities. What you learned here about streaming application logs to OpenSearch Serverless is just one example on how to create near real-time integrations without writing a line of code. Data Streaming implementations, gives you the chance to add value to the business faster, since you don’t need to unnecessarily spend time on building custom technical processes and reinventing the wheel. 

Next Steps

If you’re ready to simplify how you ingest, search, visualize, and analyze your data, Caylent's experts can help you leverage Amazon OpenSearch. Get in touch with our team to discuss how we can help you achieve your goals.


Accelerate your GenAI initiatives

Leveraging our accelerators and technical experience

Browse GenAI Offerings
Infrastructure & DevOps Modernization
Kenerry Serain

Kenerry Serain

Kenerry is a Cloud and Software Architect that is passionate about Cloud, Linux, Containers and Programming. Graduated in Computer Science, he is always looking to understand things from a fundamental level. His mission is to deliver sophisticated technical solutions without compromising quality. He loves to contribute to the community through open-source projects, articles, lectures and to guide Caylent's customers through challenging problems.

View Kenerry's articles

Related Blog Posts

Effective AWS Mocking with Moto

Learn how to effectively mock AWS services for better testing and development with Moto to make code testing more effective & efficient.

Infrastructure & DevOps Modernization

Automated Testing with Jest on AWS

Learn how to automate testing and safeguard your JavaScript apps using Jest with AWS CodeBuild and CodePipeline.

Cloud Native App Dev
Infrastructure & DevOps Modernization
Cloud Technology

CI/CD with Bitbucket and Jira Workflows: How to

Master Jira workflows for efficient software development lifecycle (SDLC) management and leverage it in combination with Bitbucket pipelines to optimize your CI/CD process.

Infrastructure & DevOps Modernization
Cloud Technology