How to create a MongoDB pipeline for .NET applications

The aggregation pipeline is a versatile and powerful MongoDB mechanism for running multiple operations on the data in a collection. It is called a pipeline because the output of each stage is passed as the input to the next stage, much like water flowing through a pipe.

MongoDB's aggregation framework lets you perform complex data processing and manipulation directly within the database. You can filter, transform, sort, and group documents and calculate aggregated values over a set of data. You control how these operations execute by arranging them as a sequence of stages.
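To make the stage-by-stage idea concrete, here is a small in-memory analogy in C#. It is not MongoDB code: the hypothetical `InMemoryPipeline.Run` helper applies a list of LINQ-based stages in order and feeds each stage's output into the next one, the way the server passes documents from one aggregation stage to the following stage.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// In-memory analogy only (not the MongoDB driver): each stage is a function
// from a sequence to a sequence, and stages run strictly in the order given.
public static class InMemoryPipeline
{
    public static IEnumerable<T> Run<T>(
        IEnumerable<T> input,
        params Func<IEnumerable<T>, IEnumerable<T>>[] stages)
    {
        IEnumerable<T> current = input;
        foreach (var stage in stages)
        {
            // The output of this stage becomes the input of the next one.
            current = stage(current);
        }
        return current;
    }
}
```

For example, running a filter (`Where`, comparable to `$match`), a transform (`Select`), and a sort (`OrderByDescending`, comparable to `$sort`) over 5, 2, 8, 3, 4 keeps 2, 8, 4, multiplies them by 10, and returns 80, 40, 20. MongoDB does the equivalent work on the server, which is why the order of stages matters.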

Over a series of posts, I will demonstrate how to develop MongoDB pipelines for .NET applications. In this post, I will start with a simple aggregation pipeline that runs the following stages:

  • $match
  • $project

I will use a collection named TransactionActivity whose documents have the following structure:

    {
        "_id":"",
        "Actor":"",
        "ActivityLocation":"",
        "ActivityDevice":"",
        "ActivityAtUtc":{datetime},
        "ActivityEntries": [
            {
                "Quantity":"",
                "Amount":""
            }
        ]
    }
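If you prefer typed access over raw BsonDocument, the document structure can be mirrored by a C# class. The sketch below is hypothetical: the class names are illustrative, and the types for Quantity (int) and Amount (decimal) are assumptions, because the sample shows only empty placeholders; choose types that match how the values are actually stored. By default, the MongoDB C# driver maps a property named Id to the _id field and maps other public properties by name.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical typed model of a TransactionActivity document.
// Property names match the field names in the sample document.
public class ActivityEntry
{
    public int Quantity { get; set; }     // assumed numeric type
    public decimal Amount { get; set; }   // assumed numeric type
}

public class TransactionActivity
{
    public string Id { get; set; } = "";               // maps to "_id" by default convention
    public string Actor { get; set; } = "";
    public string ActivityLocation { get; set; } = "";
    public string ActivityDevice { get; set; } = "";
    public DateTime ActivityAtUtc { get; set; }         // stored as a BSON date
    public List<ActivityEntry> ActivityEntries { get; set; } = new();
}
```

With such a class, the collection could be opened as a typed collection of TransactionActivity instead of BsonDocument; this post keeps using BsonDocument.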

This document represents activity monitoring in an eCommerce application. The monitoring application performs real-time analysis to detect anomalies.

I will build a simple pipeline that matches the activity records within a time range. After matching the records, the pipeline projects a single field, "ActivityAtUtc".

    // Requires the MongoDB.Driver NuGet package.
    using System;
    using System.Threading.Tasks;
    using MongoDB.Bson;
    using MongoDB.Driver;

    public async Task GetTransactionActivityTrendsAsync()
    {
        // DataStoreProvider is an application helper, assumed here to return an IMongoDatabase.
        var dataStoreProvider = new DataStoreProvider();
        var dataStore = dataStoreProvider.GetDatabase("SecurityMonitor");
        // GetCollection needs a document type; BsonDocument works without a mapped class.
        var collection = dataStore.GetCollection<BsonDocument>("TransactionActivity");

        var startTime = new DateTime(2023, 7, 4, 12, 0, 0, DateTimeKind.Utc);
        var endTime = startTime.AddHours(3);

        // Create a pipeline to process data in stages.
        // A pipeline is an array of stages that are executed in sequence.
        var pipeline = new BsonDocument[]
        {
            // Stage 1: keep documents whose ActivityAtUtc is in [startTime, endTime).
            new BsonDocument("$match", new BsonDocument
            {
                {
                    "ActivityAtUtc", new BsonDocument
                    {
                        { "$gte", startTime },
                        { "$lt", endTime }
                    }
                }
            }),
            // Stage 2: project only ActivityAtUtc; _id is included by default.
            new BsonDocument("$project", new BsonDocument("ActivityAtUtc", 1))
        };

        // The result type cannot be inferred from a BsonDocument[] pipeline, so it is given explicitly.
        var cursor = await collection.AggregateAsync<BsonDocument>(pipeline);
        var results = await cursor.ToListAsync();
        foreach (var result in results)
        {
            Console.WriteLine(result.ToJson());
        }
    }

Here is a short explanation of the code. First, I define the time range for which I want to analyze the activity. In this example the range is hard-coded; in a real application, you would pass the time range to the function as a run-time parameter.
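As a minimal sketch of passing the range at run time, the hypothetical ActivityTimeRange record below builds a window from a UTC start time and a positive duration, and rejects invalid input before any query is built. Requiring DateTimeKind.Utc is a choice of this sketch, made so the window is unambiguous; it is not a requirement of the driver.

```csharp
using System;

// Hypothetical helper (not part of the MongoDB driver): builds the time window
// at run time instead of hard-coding it. EndUtc is exclusive, matching the
// $lt operator used in the $match stage.
public sealed record ActivityTimeRange(DateTime StartUtc, DateTime EndUtc)
{
    public static ActivityTimeRange Create(DateTime startUtc, TimeSpan length)
    {
        // Reject local or unspecified times so the window always means UTC.
        if (startUtc.Kind != DateTimeKind.Utc)
            throw new ArgumentException("Start time must be in UTC.", nameof(startUtc));
        if (length <= TimeSpan.Zero)
            throw new ArgumentOutOfRangeException(nameof(length), "Length must be positive.");

        // Adding a TimeSpan preserves DateTimeKind.Utc.
        return new ActivityTimeRange(startUtc, startUtc.Add(length));
    }
}
```

A caller could then pass range.StartUtc and range.EndUtc to the method that builds the pipeline, for example ActivityTimeRange.Create(startTime, TimeSpan.FromHours(3)) for the three-hour window used above.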

Next, I construct the MongoDB pipeline. A pipeline is an array of stages that are executed in sequence, so the variable pipeline is defined as an array of BsonDocument, one document per stage. The first stage is "$match", which filters the documents to those whose ActivityAtUtc is greater than or equal to the start time and less than the end time. Placing "$match" at the beginning of the pipeline lets MongoDB use an index on the matched field (ActivityAtUtc here, if such an index exists) and reduces the data set to only the documents that later stages need for aggregation.
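The filtering rule of this "$match" stage can be illustrated in memory. This is an analogy, not driver code, and ActivityDoc and MatchStage are hypothetical names: the filter keeps only documents with start <= ActivityAtUtc < end, so a document stamped exactly at the end time is excluded and would belong to the next window instead.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Minimal stand-in for a TransactionActivity document (hypothetical).
public record ActivityDoc(string Id, DateTime ActivityAtUtc);

public static class MatchStage
{
    // Same semantics as { ActivityAtUtc: { $gte: start, $lt: end } }:
    // the start bound is inclusive and the end bound is exclusive.
    public static List<ActivityDoc> MatchTimeRange(
        IEnumerable<ActivityDoc> docs, DateTime start, DateTime end) =>
        docs.Where(d => d.ActivityAtUtc >= start && d.ActivityAtUtc < end).ToList();
}
```

Because the end bound is exclusive, consecutive windows such as 12:00 to 15:00 and 15:00 to 18:00 never count the same document twice.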

Once the documents are matched, the next stage is "$project". This stage takes the output of the "$match" stage and produces documents that contain only the "ActivityAtUtc" field. The output also contains the "_id" field, because "$project" includes "_id" by default. To exclude it, you must exclude it explicitly in the projection, for example { "ActivityAtUtc": 1, "_id": 0 }.
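The inclusion rules for "$project" can be modeled in a few lines. The hypothetical ProjectStage.Project helper below works on an in-memory dictionary rather than on MongoDB: it keeps the requested top-level fields and, unless includeId is false, the _id field, mirroring { "ActivityAtUtc": 1 } versus { "ActivityAtUtc": 1, "_id": 0 }. Nested paths and projection expressions are deliberately out of scope.

```csharp
using System;
using System.Collections.Generic;

// In-memory model of an inclusion projection (not driver code).
public static class ProjectStage
{
    public static Dictionary<string, object> Project(
        IReadOnlyDictionary<string, object> doc,
        IEnumerable<string> includeFields,
        bool includeId = true)
    {
        var output = new Dictionary<string, object>();

        // _id is kept by default, just as $project keeps it unless "_id": 0 is given.
        if (includeId && doc.TryGetValue("_id", out var id))
            output["_id"] = id;

        foreach (var field in includeFields)
        {
            // A field missing from the input document is simply absent from the output.
            if (doc.TryGetValue(field, out var value))
                output[field] = value;
        }
        return output;
    }
}
```

Projecting ActivityAtUtc from a document that also has Actor therefore yields _id and ActivityAtUtc, or only ActivityAtUtc when includeId is false.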

This is an example of a very simple MongoDB pipeline. In subsequent posts, I will build on it to perform more complex analysis and aggregation.
