Streaming Search and Aggregate Results

Object Streaming

The API offers new endpoints that return data as a stream of objects, and new methods in the API Wrapper library support them. The goal of these endpoints is to receive each object as soon as it is available, rather than as part of a single API call that returns all matching objects in a JSON array.

A typical use case for this API is an ETL (extract, transform, load) workload. Consider the following simple console application. Create the project and add the wrapper package:

dotnet new console
dotnet add package Feenics.Keep.WebApi.Wrapper.Standard --version 1.9.206 --source https://nuget-secure.feenics.com/nuget

Then replace the contents of Program.cs with:

using System;
using System.Threading.Tasks;
using Feenics.Keep.WebApi.Model;
using Feenics.Keep.WebApi.Wrapper;

namespace StreamSample
{
    class Program
    {
        static async Task Main(string[] args)
        {
            try
            {
                // Between calls, the client retains only the authentication token obtained by LoginAsync
                var client = new Client("https://keepapi.feenicshosting.com");
                var (loggedIn, failedLoginError, failedLoginMessage) = await client.LoginAsync("yourinstance", "username", "password");
                if (!loggedIn)
                {
                    Console.Error.WriteLine($"Failed to login. {failedLoginMessage} ({failedLoginError})");
                    Environment.Exit(1);
                }
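                var currentInstance = await client.GetCurrentInstanceAsync();

                // Open a forward-only stream of Person objects whose Surname is "Shillington";
                // the using declaration disposes the stream (and its connection) at the end of the scope
                using var searchStream = await SearchStream.Create(client, currentInstance, "{'_t':'Person','Surname':'Shillington'}");
                // ReadAsync advances to the next object; the loop ends when it returns false
                while (await searchStream.ReadAsync())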
                {
                    if (searchStream.BaseInfo is PersonInfo person)
                    {
                        Console.WriteLine($"{person.GivenName} {person.Surname}");
                    }
                    else
                    {
                        Console.WriteLine($"Found {searchStream.BaseInfo.CommonName} which is a {searchStream.BaseInfo.GetType().Name} and not a {nameof(PersonInfo)}");
                    }
                }
            }
            catch (FailedOutcomeException e)
            {
                Console.Error.WriteLine($"unexpected error from the API. Status: {e.HttpStatus}, {e.ResponseString}");
                Environment.Exit(2);
            }
            catch (Exception e)
            {
                Console.Error.WriteLine($"unexpected error {e.Message}");
                Environment.Exit(3);
            }

        }
    }
}

Remember that the Client class is largely stateless, meaning the only information that is persisted from one call to the next is the authentication token received from a call to LoginAsync.

Rather than compromise this stateless repository design pattern, a separate stateful class maintains the network connection and provides a forward-only, cursor-style API.
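The cursor idea can be illustrated without the Feenics wrapper. The sketch below is a hypothetical ForwardOnlyCursor<T>, not part of the wrapper and not how SearchStream is actually implemented. It wraps an IAsyncEnumerable<T> that stands in for the network response and mirrors the Create / ReadAsync / current-item shape described in this section. It also throws if the current item is read before the first ReadAsync call.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical forward-only cursor; it only mimics the shape of the wrapper's stream types.
sealed class ForwardOnlyCursor<T> : IDisposable
{
    private readonly IAsyncEnumerator<T> _source;
    private bool _hasCurrent;
    private bool _disposed;

    private ForwardOnlyCursor(IAsyncEnumerator<T> source) => _source = source;

    // Static factory, mirroring the Create pattern; a real client would open its connection here.
    public static Task<ForwardOnlyCursor<T>> Create(IAsyncEnumerable<T> source) =>
        Task.FromResult(new ForwardOnlyCursor<T>(source.GetAsyncEnumerator()));

    // The current item is only valid after ReadAsync() has returned true.
    public T Current => _hasCurrent
        ? _source.Current
        : throw new InvalidOperationException("Call ReadAsync() before accessing Current.");

    // Advances one item; returns false once the source is exhausted.
    public async Task<bool> ReadAsync()
    {
        if (_disposed) throw new ObjectDisposedException("ForwardOnlyCursor");
        _hasCurrent = await _source.MoveNextAsync();
        return _hasCurrent;
    }

    // Releases the underlying source (in a real client, the open network response).
    public void Dispose()
    {
        if (_disposed) return;
        _disposed = true;
        _hasCurrent = false;
        _source.DisposeAsync().AsTask().GetAwaiter().GetResult();
    }
}

static class Program
{
    // Simulated source that yields items one at a time, as a network stream would.
    internal static async IAsyncEnumerable<string> Names()
    {
        foreach (var name in new[] { "Ada", "Grace", "Linus" })
        {
            await Task.Yield();
            yield return name;
        }
    }

    static async Task Main()
    {
        using var cursor = await ForwardOnlyCursor<string>.Create(Names());
        while (await cursor.ReadAsync())
        {
            Console.WriteLine(cursor.Current);
        }
    }
}
```

Only one item is held by the cursor at a time, which is what keeps memory use flat regardless of how many objects the source produces.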

A SearchStream is created with the static Create method, which takes a wrapper Client instance plus the same search parameters as the SearchAsync method.

using var searchStream = await SearchStream.Create(client, currentInstance, "{'_t':'Person','Surname':'Shillington'}");

Note that SearchStream implements IDisposable, so a using declaration is applied; Dispose is called when the searchStream variable goes out of scope.
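Dispose runs at the end of the enclosing block. If the stream is declared near the top of a long method, the connection may stay open longer than needed. One option is to give the stream its own nested block. The sketch below shows the resulting order of events. TrackedResource is a hypothetical stand-in for SearchStream that only records when it is opened and disposed.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical disposable resource that logs its lifetime.
sealed class TrackedResource : IDisposable
{
    private readonly List<string> _log;
    public TrackedResource(List<string> log) { _log = log; _log.Add("opened"); }
    public void Dispose() => _log.Add("disposed");
}

static class UsingDemo
{
    internal static List<string> Run()
    {
        var log = new List<string>();
        {
            using var resource = new TrackedResource(log); // using declaration
            log.Add("reading");
        } // Dispose is called here, at the end of the nested block
        log.Add("after block");
        return log;
    }

    // Prints: opened -> reading -> disposed -> after block
    static void Main() => Console.WriteLine(string.Join(" -> ", Run()));
}
```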

At least one call to ReadAsync must be made before accessing the BaseInfo property.

while (await searchStream.ReadAsync())

Each call to ReadAsync() sets the BaseInfo property to the specific object instance that was returned. Because that instance is of its concrete type (for example PersonInfo), BaseInfo can be cast to the expected type, for example with a type pattern:

if (searchStream.BaseInfo is PersonInfo person)
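A search can return objects of more than one type, as the else branch in the console application anticipates. In that case, a switch expression with type patterns is a compact alternative to an if/else chain. The sketch below is hypothetical. InfoBase, PersonStub and ReaderStub stand in for the wrapper's BaseInfo, PersonInfo and other model types, and are defined here only so the example compiles without the Feenics package.

```csharp
using System;

// Hypothetical stand-ins for the wrapper's model types.
class InfoBase { public string CommonName { get; set; } = ""; }
class PersonStub : InfoBase
{
    public string GivenName { get; set; } = "";
    public string Surname { get; set; } = "";
}
class ReaderStub : InfoBase { }

static class Program
{
    // Each arm tests the runtime type; the first matching arm wins.
    internal static string Describe(InfoBase item) => item switch
    {
        PersonStub p => $"{p.GivenName} {p.Surname}",
        null => "(no current item)",
        _ => $"{item.CommonName} is a {item.GetType().Name}, not a person",
    };

    static void Main()
    {
        // Prints "Ada Shillington"
        Console.WriteLine(Describe(new PersonStub { GivenName = "Ada", Surname = "Shillington" }));
        // Prints "Front Door is a ReaderStub, not a person"
        Console.WriteLine(Describe(new ReaderStub { CommonName = "Front Door" }));
    }
}
```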

Extension Method

To read all the objects into a List, use the extension method ToListAsync(), which extends Task<SearchStream>:

var searchResults = await SearchStream.Create(client, currentInstance, "{'_t':'Person','Surname':'Shillington'}").ToListAsync();

In this case the SearchStream is disposed after the call to ToListAsync(). This convenience method is provided to ease the transition from SearchAsync, which returns a List<BaseInfo>. It is not recommended in the long term, because it negates the real benefit of the streaming API: processing one object at a time, which keeps memory pressure low.
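This section does not show how ToListAsync is implemented. The sketch below shows one way to write an extension on a Task of a cursor, assuming a cursor with ReadAsync() and a current-item property. IReadCursor<T>, ListCursor<T> and Create are hypothetical stand-ins. Extending the Task, rather than the cursor, lets the call chain directly onto Create(...). The using declaration disposes the cursor once every item is buffered, or earlier if reading throws.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical minimal cursor contract with a ReadAsync()/current-item shape.
interface IReadCursor<T> : IDisposable
{
    Task<bool> ReadAsync();
    T Current { get; }
}

// In-memory stand-in for a network-backed stream, so the sketch runs on its own.
sealed class ListCursor<T> : IReadCursor<T>
{
    private readonly IEnumerator<T> _items;
    public bool Disposed { get; private set; }
    public ListCursor(IEnumerable<T> items) => _items = items.GetEnumerator();
    public Task<bool> ReadAsync() => Task.FromResult(_items.MoveNext());
    public T Current => _items.Current;
    public void Dispose() { Disposed = true; _items.Dispose(); }
}

static class CursorExtensions
{
    // Awaits the pending cursor, buffers every item, then disposes the cursor.
    public static async Task<List<T>> ToListAsync<T>(this Task<IReadCursor<T>> cursorTask)
    {
        using var cursor = await cursorTask;
        var results = new List<T>();
        while (await cursor.ReadAsync())
        {
            results.Add(cursor.Current);
        }
        return results;
    }
}

static class Program
{
    internal static ListCursor<string> LastCursor;

    // Hypothetical factory returning a pending cursor, like a Create(...) call would.
    internal static Task<IReadCursor<string>> Create(params string[] items)
    {
        LastCursor = new ListCursor<string>(items);
        return Task.FromResult<IReadCursor<string>>(LastCursor);
    }

    static async Task Main()
    {
        var all = await Create("a", "b", "c").ToListAsync();
        // Prints "3 items, disposed: True"
        Console.WriteLine($"{all.Count} items, disposed: {LastCursor.Disposed}");
    }
}
```

Every item is held in memory at once, which is why reading one object at a time remains the preferred pattern.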

Additional Streaming Types

In addition to SearchStream, the EventStream and AggregateStream types are provided to search the Events collection and to receive aggregation query results, respectively. Note that for AggregateStream the current-item property is called Document, and it is of type BsonDocument.

// Requires using MongoDB.Bson; (BsonDocument, ObjectId) in addition to the usings shown earlier
var currentInstance = await client.GetCurrentInstanceAsync();

// Pipeline: keep only Person documents, then project GivenName and Surname (_id is included by default)
var operations = new[]
{
    new BsonDocument { { "$match", new BsonDocument { { "_t", "Person" } } } },
    new BsonDocument { { "$project", new BsonDocument { { "GivenName", 1 }, { "Surname", 1 } } } }
};
// Anonymous "template" object whose shape matches the projected documents
var projectTemplate = new { _id = ObjectId.Empty, GivenName = "", Surname = "" };
var aggStream = await AggregateStream.Create(client, currentInstance, "KeepObjects", operations);

while (await aggStream.ReadAsync())
{
    var projectedPerson = aggStream.Document.To(projectTemplate);
    Console.WriteLine($"Found {projectedPerson.GivenName} {projectedPerson.Surname} ({projectedPerson._id})");
}

Note the use of a convenience extension method, To, which takes either an explicit type argument T or, when using anonymous types, a nominal-type “example” object from which T is inferred.

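using MongoDB.Bson;
using MongoDB.Bson.Serialization;

static class BsonExtensions
{
    // Deserializes the document into an explicitly named type T
    public static T To<T>(this BsonDocument source)
    {
        var result = BsonSerializer.Deserialize<T>(source);
        return result;
    }

    // Overload for anonymous types: nominalType is never read; it only lets the
    // compiler infer T, because an anonymous type cannot be named in code
    public static T To<T>(this BsonDocument source, T nominalType)
    {
        return source.To<T>();
    }
}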
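The nominal-type overload relies on generic type inference. An anonymous type has no name you can write, so the only way to supply it as T is to pass an instance and let the compiler infer the type. The same trick, often called "cast by example", works without MongoDB. The sketch below uses only the base class library, and CastTo, Fetch and the sample values are hypothetical. It relies on the C# rule that anonymous types with the same property names, types and order in one assembly share a single compiler-generated type.

```csharp
using System;

static class ByExample
{
    // 'example' is never read; it exists only so the compiler can infer T.
    public static T CastTo<T>(object source, T example) => (T)source;
}

static class Program
{
    // Returns an anonymous object typed as object, e.g. from a loosely typed API.
    internal static object Fetch() => new { GivenName = "Ada", Surname = "Lovelace" };

    static void Main()
    {
        var template = new { GivenName = "", Surname = "" };
        var person = ByExample.CastTo(Fetch(), template);
        // Prints "Ada Lovelace"
        Console.WriteLine($"{person.GivenName} {person.Surname}");
    }
}
```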