Develop a MongoDB pipeline to transform data into time buckets

In the previous post, "How to create a MongoDB pipeline", I discussed a very simple MongoDB pipeline that executed two basic stages, $match and $project. That post covered the basics of how a MongoDB pipeline works.

In this post, I am going to expand on that simple example and build a more realistic MongoDB pipeline that I use to identify anomalies in transaction activity patterns. For example, during certain periods of the day I expect activity to follow a pattern: a few minutes of intense activity followed by a few minutes of inactivity. If I observe a deviation from this pattern, it raises a red flag for whoever is monitoring the behavior.

Let me first state the problem that I will solve by constructing a MongoDB pipeline for data analysis. I want to review transaction activity in 15-second buckets over a specified time range. This statement translates to: "filter/match the transaction data within the specified date range, group the data into 15-second buckets, count the transaction activities in each bucket, and report the count for every bucket".

It is important to write down your problem statement and the output you expect; doing so helps when formulating the MongoDB pipeline.

The following code snippet is the solution to the requirements stated above.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using MongoDB.Bson;
using MongoDB.Driver;

public async Task GetTransactionActivityTrendsAsync()
{
    // DataStoreProvider is an application helper (not part of the MongoDB
    // .NET driver) that returns a database by name.
    var dataStoreProvider = new DataStoreProvider();
    var dataStore = dataStoreProvider.GetDatabase("SecurityMonitor");
    var collection = dataStore.GetCollection<BsonDocument>("TransactionActivity");

    var unixEpoch = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc);
    var startTime = new DateTime(2023, 7, 4, 12, 0, 0, DateTimeKind.Utc);
    var endTime = startTime.AddHours(3);

    // First create the complete list of 15s boundaries. These boundaries are
    // required to create all the buckets, including intervals with no entries.
    // Each boundary is the number of 15s intervals since the Unix epoch.
    var startOfBoundaries =
        Math.Floor((startTime - unixEpoch).TotalMilliseconds / (1000 * 15));
    var totalBuckets = (int)((endTime - startTime).TotalSeconds / 15);
    var allBoundaries = new BsonArray();
    // $bucket treats adjacent boundaries as [lower, upper), so N buckets need
    // N + 1 boundaries. Without the extra upper boundary, activity in the
    // last 15s interval would land in the "Other" (default) bucket.
    for (var i = 0; i <= totalBuckets; i++)
    {
        allBoundaries.Add(startOfBoundaries + i);
    }

    // Create a pipeline to process data in stages.
    // A pipeline is an array of stages that are executed in sequence.
    var pipeline = new BsonDocument[]
    {
        // stage 1: match the documents within [startTime, endTime)
        new ("$match", new BsonDocument
        {
            {"$and", new BsonArray
                {
                    new BsonDocument("ActivityAtUtc", new BsonDocument("$gte", startTime)),
                    new BsonDocument("ActivityAtUtc", new BsonDocument("$lt", endTime))
                }}
        }),
        // stage 2: project the fields for the output documents
        new ("$project", new BsonDocument
            {
                {
                    "ActivityAt",
                        // floor to a whole 15s interval number
                        new BsonDocument("$floor",
                            // divide milliseconds since the epoch by 15,000
                            new BsonDocument("$divide",
                                new BsonArray{
                                    // subtracting two dates yields milliseconds
                                    new BsonDocument("$subtract",
                                        new BsonArray
                                        {
                                            "$ActivityAtUtc",
                                            unixEpoch
                                        }),
                                        1000*15
                                    }
                                )
                            )
                }
            }
        ),
        // stage 3: create buckets of entries against the time boundaries
        new ("$bucket", new BsonDocument
        {
            {"groupBy", "$ActivityAt"},
            {"boundaries", allBoundaries},
            // catch-all bucket for values outside the boundaries
            {"default", "Other"},
            { "output", new BsonDocument
                {
                    { "Count", new BsonDocument("$sum", 1) }
                }
            }
        }),
        // stage 4: project the output to use a friendly name for the time
        // bucket value rather than _id
        new ("$project", new BsonDocument
        {
            {"_id", 0},
            {"TimeBucket", "$_id"},
            {"Count", 1}
        }),
        // stage 5: sort the records by time bucket
        new ("$sort", new BsonDocument("TimeBucket", 1))
    };

    var cursor = await collection.AggregateAsync<BsonDocument>(pipeline);
    var results = await cursor.ToListAsync();

    // The $bucket stage does not return any entry for intervals with no
    // activity. Handle this by iterating over all the buckets and
    // inserting a zero count where a bucket is missing.
    var finalResult = new List<BsonDocument>();
    for (var i = 0; i < totalBuckets; i++)
    {
        var timeBucket = startOfBoundaries + i;
        var doc = results.FirstOrDefault(d => d["TimeBucket"] == timeBucket);
        if (doc != null)
        {
            finalResult.Add(doc);
        }
        else
        {
            finalResult.Add(new BsonDocument
            {
                { "TimeBucket", timeBucket },
                { "Count", 0 }
            });
        }
    }

    foreach (var result in finalResult)
    {
        Console.WriteLine(result.ToJson());
    }
}
```

Let's look at what is going on in the code above. The pipeline uses five MongoDB stages.

  • $match: This is the first step in processing the data. In this stage, the records are filtered to the specified time range. Unless you have special circumstances, I strongly recommend matching the records as early as possible in the pipeline: a $match at the start of the pipeline can take advantage of MongoDB indexes, such as an index on ActivityAtUtc.

  • $project: In this stage, I take the matched records and create output documents that contain only the fields required for further processing. The important step here is converting the activity time into a 15s bucket number. Subtracting the Unix epoch (January 1, 1970 UTC) from ActivityAtUtc gives the number of milliseconds since the epoch, i.e. a Unix timestamp in milliseconds. Dividing that value by 1000*15 (15,000 milliseconds, or 15 seconds) gives the number of 15s intervals since the epoch, and $floor drops the fractional part. In simple terms, all the activity times that fall in the same 15s interval get the same normalized value.

  • $bucket: In this stage, the projected records are placed in the appropriate 15s buckets. The bucket edges are specified in the boundaries field; each bucket includes its lower boundary and excludes its upper boundary. At the beginning of the function, I computed all the 15s boundaries between the start and end of the specified range. This stage also counts the number of entries in each 15s bucket and places that value in the Count field of the output.

  • $project: This is the second time I project the output of a previous stage. It was not strictly necessary for my use case, but I included it here for demonstration. In this stage, I drop the _id field from the output and place its value in a new field named TimeBucket.

  • $sort: This is the last stage in the pipeline; it sorts the results by TimeBucket in ascending order.
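To make the arithmetic in the $project and $bucket stages concrete, here is a small, self-contained C# sketch that reproduces it outside MongoDB. It is an illustration under my own assumptions: the helper names ToBucketIndex, FromBucketIndex and BuildBoundaries are hypothetical, and the sketch uses long bucket numbers rather than the doubles that $floor produces in the pipeline. It also shows that covering N buckets requires N + 1 boundary values, because each bucket includes its lower boundary and excludes its upper one.

```csharp
using System;
using System.Collections.Generic;

// Unix epoch and bucket width, mirroring the constants used in the pipeline.
var epoch = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc);
const long BucketMs = 1000 * 15;

// Same arithmetic as the $project stage: milliseconds since the epoch,
// divided by 15,000 and floored, is the number of the 15s bucket.
long ToBucketIndex(DateTime utc) =>
    (long)Math.Floor((utc - epoch).TotalMilliseconds / BucketMs);

// Inverse mapping (hypothetical helper): the UTC instant at which a bucket starts.
DateTime FromBucketIndex(long index) => epoch.AddMilliseconds(index * BucketMs);

// Boundaries from the first bucket up to and including the upper edge of the
// last bucket, so every interval in [startUtc, endUtc) has a lower and upper bound.
List<long> BuildBoundaries(DateTime startUtc, DateTime endUtc)
{
    var first = ToBucketIndex(startUtc);
    var last = (long)Math.Ceiling((endUtc - epoch).TotalMilliseconds / BucketMs);
    var result = new List<long>();
    for (var b = first; b <= last; b++)
    {
        result.Add(b);
    }
    return result;
}

var startTime = new DateTime(2023, 7, 4, 12, 0, 0, DateTimeKind.Utc);
var endTime = startTime.AddHours(3);
var boundaries = BuildBoundaries(startTime, endTime);

Console.WriteLine(ToBucketIndex(startTime));                        // 112564800
Console.WriteLine(ToBucketIndex(startTime.AddMilliseconds(14999))); // 112564800 (same bucket)
Console.WriteLine(ToBucketIndex(startTime.AddSeconds(15)));         // 112564801 (next bucket)
Console.WriteLine(boundaries.Count);                                // 721 boundaries for 720 buckets
Console.WriteLine(FromBucketIndex(boundaries[1]).ToString("o"));    // 2023-07-04T12:00:15.0000000Z
```

FromBucketIndex can be handy when a TimeBucket value needs to be shown as a readable UTC timestamp, for example when plotting the results.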

There is one last step, performed after the pipeline has completed: filling in data for empty buckets. The $bucket stage does not produce any output for boundaries that have no matching data. Let me explain this with the following example.

Let's say that you specify the boundaries as [0,10,20,30,40,50] for a data set. The data set contains values in the ranges 0-10, 20-30 and 40-50, but none in 10-20 or 30-40. $bucket will not return any documents for the 10-20 and 30-40 buckets. In some situations this is acceptable, but for use cases where I want to know whether there was no activity in certain time buckets, the $bucket output alone is not sufficient.
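The following C# sketch models this behavior in memory with the same boundaries. It is a simplified illustration, not MongoDB's implementation, and SimulateBucket is a hypothetical helper. Each value is counted in the bucket whose half-open range [lower, upper) contains it, keyed by the lower boundary the way $bucket uses it as _id, and only buckets that received at least one value appear in the result.

```csharp
using System;
using System.Collections.Generic;

// Simplified model of $bucket grouping (illustration only).
SortedDictionary<int, int> SimulateBucket(int[] bounds, IEnumerable<int> input)
{
    var counts = new SortedDictionary<int, int>();
    foreach (var v in input)
    {
        for (var i = 0; i < bounds.Length - 1; i++)
        {
            if (v >= bounds[i] && v < bounds[i + 1])
            {
                counts[bounds[i]] = counts.TryGetValue(bounds[i], out var c) ? c + 1 : 1;
                break;
            }
        }
        // Values outside every range are ignored here; MongoDB would put them
        // in the "default" bucket, or fail if no default is specified.
    }
    return counts;
}

var boundaries = new[] { 0, 10, 20, 30, 40, 50 };
var values = new[] { 3, 7, 21, 29, 45 }; // nothing in [10, 20) or [30, 40)
var buckets = SimulateBucket(boundaries, values);

foreach (var (lower, count) in buckets)
{
    Console.WriteLine($"_id: {lower}, count: {count}");
}
// _id: 0, count: 2
// _id: 20, count: 2
// _id: 40, count: 1
```

Note that nothing is printed for 10 or 30: an empty bucket is simply absent, not reported with a count of zero.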

To address this gap, the last step in the process iterates over all the possible boundaries. For boundaries that have no data, I fill in 0 as the Count.
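Here is a minimal sketch of that gap-filling step in plain C#, using hypothetical (TimeBucket, Count) tuples in place of the BsonDocument results returned by the driver. One design choice differs from the code earlier in this post: the sparse results are indexed once in a Dictionary, so each lookup is constant time instead of a FirstOrDefault scan over all results for every boundary.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sparse pipeline output: only buckets that had activity.
var pipelineResults = new List<(long TimeBucket, int Count)>
{
    (112564800, 4),
    (112564802, 1),
};

// Index the sparse results once, so each lookup below is O(1).
var countsByBucket = new Dictionary<long, int>();
foreach (var (bucket, count) in pipelineResults)
{
    countsByBucket[bucket] = count;
}

// Walk every expected bucket and use 0 where the pipeline returned nothing.
const long firstBucket = 112564800;
const int bucketCount = 4;
var filled = new List<(long TimeBucket, int Count)>();
for (var i = 0; i < bucketCount; i++)
{
    var timeBucket = firstBucket + i;
    filled.Add((timeBucket, countsByBucket.TryGetValue(timeBucket, out var c) ? c : 0));
}

foreach (var (timeBucket, count) in filled)
{
    Console.WriteLine($"TimeBucket: {timeBucket}, Count: {count}");
}
// TimeBucket: 112564800, Count: 4
// TimeBucket: 112564801, Count: 0
// TimeBucket: 112564802, Count: 1
// TimeBucket: 112564803, Count: 0
```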

Now I have complete information about all the time buckets within the specified time range, and I can use it for many kinds of data analysis. I have included a sample time series display of this data.
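As one example of such analysis, tied to the burst-then-quiet pattern described at the beginning of this post, the sketch below scans a gap-filled series for runs of consecutive zero-count buckets. The FindQuietRuns helper, the sample counts and the one-minute threshold are all hypothetical; in practice you would feed it the Count values from the final result.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical gap-filled counts for consecutive 15-second buckets.
var counts = new[] { 12, 9, 0, 0, 0, 0, 7, 0, 11 };

// Returns (start index, length) for every run of consecutive zero-count
// buckets that is at least minLength buckets long.
List<(int Start, int Length)> FindQuietRuns(int[] series, int minLength)
{
    var runs = new List<(int Start, int Length)>();
    var runStart = -1;
    // Iterate one step past the end so a run that reaches the last bucket is closed.
    for (var i = 0; i <= series.Length; i++)
    {
        var isZero = i < series.Length && series[i] == 0;
        if (isZero && runStart < 0)
        {
            runStart = i; // a quiet run begins
        }
        else if (!isZero && runStart >= 0)
        {
            if (i - runStart >= minLength) runs.Add((runStart, i - runStart));
            runStart = -1; // the run ended at i - 1
        }
    }
    return runs;
}

// With 15-second buckets, 4 buckets correspond to one minute of inactivity.
foreach (var (start, length) in FindQuietRuns(counts, minLength: 4))
{
    Console.WriteLine($"quiet from bucket {start} for {length * 15} seconds");
}
// quiet from bucket 2 for 60 seconds
```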
