Data Streams

Data streams enable efficient retrieval of large datasets by delivering objects one at a time as they become available, rather than loading everything into memory at once. This is essential for ETL workloads, bulk exports, and memory-constrained environments.

When to Use Streaming:

  • Retrieving more than 1,000 objects
  • Processing data in memory-constrained environments
  • Building ETL pipelines or data exports
  • Real-time processing where you can handle objects as they arrive

Quick Start

Installation

dotnet new console
dotnet add package Feenics.Keep.WebApi.Wrapper.Standard --source https://nuget-secure.feenics.com/nuget

Basic Search Stream

using Feenics.Keep.WebApi.Model;
using Feenics.Keep.WebApi.Wrapper;

var client = new Client("https://api.us.acresecurity.cloud", userAgent: "myAppName/myAppVersion.0.0");
var (loggedIn, error, message) = await client.LoginAsync("myinstance", "username", "password");

if (!loggedIn)
{
    Console.Error.WriteLine($"Login failed: {message}");
    return;
}

var instance = await client.GetCurrentInstanceAsync();

// Stream people matching the query
using var stream = await SearchStream.Create(client, instance, "{'_t':'Person'}");
while (await stream.ReadAsync())
{
    if (stream.BaseInfo is PersonInfo person)
    {
        Console.WriteLine($"Processing: {person.GivenName} {person.Surname}");
        // Process one person at a time - memory efficient!
    }
}

Stream Types

acre Access Control provides three streaming types:

Stream Type     | Collection            | Use Case
SearchStream    | KeepObjects           | Search for any object type (Person, Controller, etc.)
EventStream     | Events                | Query historical events
AggregateStream | KeepObjects or Events | Execute custom aggregation pipelines

SearchStream

Search for objects with streaming results:

Basic Usage

// Find all people with surname "Smith"
using var stream = await SearchStream.Create(
    client, 
    instance, 
    "{'_t':'Person','Surname':'Smith'}"
);

var count = 0;
while (await stream.ReadAsync())
{
    if (stream.BaseInfo is PersonInfo person)
    {
        Console.WriteLine($"{++count}. {person.CommonName}");
    }
}
Console.WriteLine($"Total: {count} people");

Type-Safe Processing

using var stream = await SearchStream.Create(client, instance, "{'_t':'MercuryController'}");

while (await stream.ReadAsync())
{
    // Handle different controller types
    switch (stream.BaseInfo)
    {
        case Lp1502Info lp1502:
            Console.WriteLine($"LP1502: {lp1502.CommonName} - {lp1502.MacAddress}");
            break;
        case Ep1501Info ep1501:
            Console.WriteLine($"EP1501: {ep1501.CommonName} - {ep1501.MacAddress}");
            break;
        default:
            Console.WriteLine($"Other: {stream.BaseInfo.CommonName}");
            break;
    }
}

Including Child Instances

// Search across entire instance hierarchy
using var stream = await SearchStream.Create(
    client, 
    instance, 
    "{'_t':'Person'}",
    includeChildInstances: true  // Include all child instances
);

while (await stream.ReadAsync())
{
    // Pattern match so a non-person result cannot cause a NullReferenceException
    if (stream.BaseInfo is PersonInfo person)
    {
        Console.WriteLine($"{person.CommonName} in instance {person.InFolderKey}");
    }
}

EventStream

Stream historical events from the Events collection:

// Get all access granted events from the last 24 hours
var yesterday = DateTime.UtcNow.AddDays(-1).ToString("o");
var query = $"{{'EventTypeId':{{'$oid':'61cb71500000000000000000'}},'OccurredOn.Date':{{'$gte':ISODate('{yesterday}')}}}}";

using var stream = await EventStream.Create(client, instance, query);

while (await stream.ReadAsync())
{
    var evt = stream.EventMessage;
    Console.WriteLine($"{evt.OccurredOn}: {evt.MessageLong}");
}

Event Filtering Examples

// Access denied events for a specific reader
var readerKey = "5bcde6dba3ac160048509333";
var query = $@"{{
    'EventTypeId':{{'$oid':'61cb71500000000000000000'}},
    'ObjectLinks.LinkedObjectId':ObjectId('{readerKey}'),
    'ObjectLinks.Relation':'Reader'
}}";

using var stream = await EventStream.Create(client, instance, query);
while (await stream.ReadAsync())
{
    Console.WriteLine($"Denied: {stream.EventMessage.MessageLong}");
}
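
Building Event Queries Without Brace Escaping

Literal braces inside an interpolated string must be doubled, which is easy to get wrong in nested queries like the ones above. The following is a minimal sketch of a helper that assembles the same two query shapes with plain concatenation. EventQueries, Since, ForReader, and RequireObjectId are hypothetical names and are not part of the Feenics wrapper; the field names are copied from the examples above.

```csharp
using System;
using System.Globalization;

// Hypothetical helper, not part of the Feenics wrapper: builds the query
// strings from the examples above with plain concatenation, so no brace
// escaping is needed.
public static class EventQueries
{
    // Events of one type that occurred at or after sinceUtc.
    // A DateTime whose Kind is Unspecified is treated as local time here.
    public static string Since(string eventTypeId, DateTime sinceUtc)
    {
        RequireObjectId(eventTypeId, nameof(eventTypeId));
        var iso = sinceUtc.ToUniversalTime().ToString("o", CultureInfo.InvariantCulture);
        return "{'EventTypeId':{'$oid':'" + eventTypeId + "'},"
             + "'OccurredOn.Date':{'$gte':ISODate('" + iso + "')}}";
    }

    // Events of one type linked to a specific reader.
    public static string ForReader(string eventTypeId, string readerKey)
    {
        RequireObjectId(eventTypeId, nameof(eventTypeId));
        RequireObjectId(readerKey, nameof(readerKey));
        return "{'EventTypeId':{'$oid':'" + eventTypeId + "'},"
             + "'ObjectLinks.LinkedObjectId':ObjectId('" + readerKey + "'),"
             + "'ObjectLinks.Relation':'Reader'}";
    }

    // Rejects anything that is not 24 hexadecimal characters, which keeps
    // stray quotes or braces out of the query text.
    private static void RequireObjectId(string value, string paramName)
    {
        if (value == null || value.Length != 24)
            throw new ArgumentException("Expected a 24-character hex ID.", paramName);
        foreach (var c in value)
        {
            if (!Uri.IsHexDigit(c))
                throw new ArgumentException("Expected a 24-character hex ID.", paramName);
        }
    }
}
```

The returned string can be passed as the query argument to EventStream.Create, for example EventQueries.Since(eventTypeId, DateTime.UtcNow.AddDays(-1)).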

AggregateStream

Execute aggregation pipelines and stream results:

using MongoDB.Bson;
using MongoDB.Bson.Serialization;

var instance = await client.GetCurrentInstanceAsync();

// Project the given name and surname of every person
var pipeline = new[]
{
    new BsonDocument("$match", new BsonDocument("_t", "Person")),
    new BsonDocument("$project", new BsonDocument
    {
        { "GivenName", 1 },
        { "Surname", 1 }
    })
};

using var stream = await AggregateStream.Create(client, instance, "KeepObjects", pipeline);

while (await stream.ReadAsync())
{
    // Access raw BsonDocument
    var doc = stream.Document;
    Console.WriteLine($"Found: {doc["GivenName"]} {doc["Surname"]}");
}

Using Anonymous Types

For cleaner code, deserialize to anonymous types:

var pipeline = new[]
{
    new BsonDocument("$match", new BsonDocument("_t", "Person")),
    new BsonDocument("$group", new BsonDocument
    {
        { "_id", new BsonDocument("$substr", new BsonArray { "$Surname", 0, 1 }) },
        { "count", new BsonDocument("$sum", 1) }
    }),
    new BsonDocument("$sort", new BsonDocument("count", -1))
};

// Define expected shape
var template = new { _id = "", count = 0 };

using var stream = await AggregateStream.Create(client, instance, "KeepObjects", pipeline);

while (await stream.ReadAsync())
{
    var result = stream.Document.To(template);
    Console.WriteLine($"Surnames starting with '{result._id}': {result.count}");
}

BSON Extension Helper

Add this extension method for convenient deserialization:

public static class BsonExtensions
{
    public static T To<T>(this BsonDocument source)
    {
        return BsonSerializer.Deserialize<T>(source);
    }
    
    public static T To<T>(this BsonDocument source, T nominalType)
    {
        return source.To<T>();
    }
}

Converting to Lists

ToListAsync Extension

For convenience when you need all results in memory:

// Get all results as a list (one line)
var allPeople = await SearchStream.Create(
    client, 
    instance, 
    "{'_t':'Person'}"
).ToListAsync();

Console.WriteLine($"Retrieved {allPeople.Count} people");

⚠️ Warning: ToListAsync() loads all results into memory, negating the memory benefits of streaming. Only use for small result sets.

Manual List Building

For more control:

var people = new List<PersonInfo>();

using var stream = await SearchStream.Create(client, instance, "{'_t':'Person'}");
while (await stream.ReadAsync())
{
    if (stream.BaseInfo is PersonInfo person)
    {
        people.Add(person);
        
        // Optional: Limit results
        if (people.Count >= 10000)
        {
            Console.WriteLine("Reached limit, stopping...");
            break;
        }
    }
}
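
Bounded Batches

When a downstream step works on groups of records (a bulk insert, for instance), a bounded buffer keeps memory flat without collecting the whole result set. This is a minimal sketch; BatchBuffer is a hypothetical helper, not part of the Feenics wrapper, and the flush callback is whatever the caller supplies.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical helper: buffers up to 'size' items, then hands the full
// batch to the flush callback and starts a new one.
public sealed class BatchBuffer<T>
{
    private readonly int _size;
    private readonly Func<IReadOnlyList<T>, Task> _flush;
    private List<T> _items;

    public BatchBuffer(int size, Func<IReadOnlyList<T>, Task> flush)
    {
        if (size <= 0) throw new ArgumentOutOfRangeException(nameof(size));
        _size = size;
        _flush = flush ?? throw new ArgumentNullException(nameof(flush));
        _items = new List<T>(size);
    }

    // Adds one item and flushes automatically once the batch is full.
    public async Task AddAsync(T item)
    {
        _items.Add(item);
        if (_items.Count >= _size) await FlushAsync();
    }

    // Flushes whatever is buffered; does nothing when the buffer is empty.
    public async Task FlushAsync()
    {
        if (_items.Count == 0) return;
        var batch = _items;
        _items = new List<T>(_size);
        await _flush(batch);
    }
}
```

Inside the ReadAsync loop, call AddAsync for each object, then call FlushAsync once after the loop so the final partial batch is not lost.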

Best Practices

✅ DO: Dispose Streams Properly

// Use 'using' statement for automatic disposal
using var stream = await SearchStream.Create(client, instance, query);
while (await stream.ReadAsync())
{
    // Process...
}
// Stream automatically disposed here

✅ DO: Process Objects Incrementally

// Good: Process and discard each object
using var stream = await SearchStream.Create(client, instance, "{'_t':'Person'}");
var count = 0;
while (await stream.ReadAsync())
{
    await ProcessPersonAsync(stream.BaseInfo as PersonInfo);
    count++;
    
    if (count % 1000 == 0)
        Console.WriteLine($"Processed {count} records...");
}

✅ DO: Handle Errors Gracefully

try
{
    using var stream = await SearchStream.Create(client, instance, query);
    while (await stream.ReadAsync())
    {
        try
        {
            await ProcessAsync(stream.BaseInfo);
        }
        catch (Exception ex)
        {
            // Log but continue processing
            Console.Error.WriteLine($"Error processing {stream.BaseInfo.Key}: {ex.Message}");
        }
    }
}
catch (FailedOutcomeException e)
{
    Console.Error.WriteLine($"API Error: {e.HttpStatus} - {e.ResponseString}");
}

❌ DON’T: Store All Results Unless Necessary

// Bad: Accumulates all results in memory
var allResults = new List<BaseInfo>();
using var stream = await SearchStream.Create(client, instance, "{'_t':'Person'}");
while (await stream.ReadAsync())
{
    allResults.Add(stream.BaseInfo);  // Memory grows unbounded!
}

❌ DON’T: Access BaseInfo Before Calling ReadAsync

using var stream = await SearchStream.Create(client, instance, query);

// Bad: Accessing before ReadAsync
var first = stream.BaseInfo;  // Will be null or throw!

// Good: Always call ReadAsync first
while (await stream.ReadAsync())
{
    var current = stream.BaseInfo;  // Now safe to access
}

Complete Examples

Example 1: Export People to CSV

using System.Text;
using Feenics.Keep.WebApi.Model;
using Feenics.Keep.WebApi.Wrapper;

async Task ExportPeopleToCsv(Client client, InstanceInfo instance, string outputPath)
{
    using var writer = new StreamWriter(outputPath, false, Encoding.UTF8);
    await writer.WriteLineAsync("Key,GivenName,Surname,Email,CardCount");
    
    using var stream = await SearchStream.Create(client, instance, "{'_t':'Person'}");
    var count = 0;
    
    while (await stream.ReadAsync())
    {
        if (stream.BaseInfo is PersonInfo person)
        {
            var email = person.Addresses?
                .OfType<EmailAddressInfo>()
                .FirstOrDefault()?.MailTo ?? "";
            
            var cardCount = person.CardAssignments?.Count ?? 0;
            
            await writer.WriteLineAsync(
                $"{person.Key},{person.GivenName},{person.Surname},{email},{cardCount}"
            );
            
            if (++count % 1000 == 0)
                Console.WriteLine($"Exported {count} people...");
        }
    }
    
    Console.WriteLine($"Exported {count} people to {outputPath}");
}
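
Escaping CSV Fields

Names and email addresses can contain commas or quotes, and writing them unescaped shifts the columns of the exported file. The sketch below shows one common way to quote fields, following the usual CSV convention of wrapping the field in double quotes and doubling any embedded quote. Csv, Escape, and Row are hypothetical names introduced for illustration.

```csharp
using System;
using System.Linq;

// Hypothetical helper for writing CSV rows safely.
public static class Csv
{
    // Quotes a field only when it contains a comma, quote, or line break.
    public static string Escape(string field)
    {
        if (string.IsNullOrEmpty(field)) return "";
        var needsQuotes = field.IndexOfAny(new[] { ',', '"', '\r', '\n' }) >= 0;
        var escaped = field.Replace("\"", "\"\"");
        return needsQuotes ? "\"" + escaped + "\"" : escaped;
    }

    // Joins already-stringified fields into one CSV line (no trailing newline).
    public static string Row(params string[] fields)
    {
        return string.Join(",", fields.Select(f => Escape(f)));
    }
}
```

In the export loop, a line could then be written as Csv.Row(person.Key, person.GivenName, person.Surname, email, cardCount.ToString()).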

Example 2: Event Statistics Report

async Task GenerateEventReport(Client client, InstanceInfo instance, DateTime startDate)
{
    var stats = new Dictionary<string, int>();
    var isoDate = startDate.ToString("o");
    var query = $"{{'OccurredOn':{{'$gte':ISODate('{isoDate}')}}}}";
    
    using var stream = await EventStream.Create(client, instance, query);
    
    while (await stream.ReadAsync())
    {
        var eventType = stream.EventMessage.EventTypeKey ?? "Unknown";
        stats[eventType] = stats.GetValueOrDefault(eventType, 0) + 1;
    }
    
    Console.WriteLine($"Event Statistics since {startDate:d}:");
    Console.WriteLine(new string('-', 40));
    
    foreach (var (type, count) in stats.OrderByDescending(x => x.Value))
    {
        Console.WriteLine($"{type,-25} {count,10:N0}");
    }
}

Example 3: Hardware Inventory with Aggregation

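using MongoDB.Bson;
using Feenics.Keep.WebApi.Model;
using Feenics.Keep.WebApi.Wrapper;

// Uses the BsonExtensions.To helper from the BSON Extension Helper section
async Task GenerateHardwareInventory(Client client, InstanceInfo instance)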
{
    var pipeline = new[]
    {
        new BsonDocument("$match", new BsonDocument
        {
            { "_t", new BsonDocument("$in", new BsonArray 
                { "MercuryController", "MercuryReader", "MercuryDoor" }) 
            }
        }),
        new BsonDocument("$group", new BsonDocument
        {
            { "_id", "$_t" },
            { "count", new BsonDocument("$sum", 1) },
            { "onlineCount", new BsonDocument("$sum", 
                new BsonDocument("$cond", new BsonArray { "$IsOnline", 1, 0 })) 
            }
        }),
        new BsonDocument("$sort", new BsonDocument("count", -1))
    };
    
    var template = new { _id = "", count = 0, onlineCount = 0 };
    
    Console.WriteLine("Hardware Inventory:");
    Console.WriteLine(new string('-', 50));
    Console.WriteLine($"{"Type",-20} {"Total",-10} {"Online",-10} {"Offline",-10}");
    Console.WriteLine(new string('-', 50));
    
    using var stream = await AggregateStream.Create(client, instance, "KeepObjects", pipeline);
    
    while (await stream.ReadAsync())
    {
        var result = stream.Document.To(template);
        var offline = result.count - result.onlineCount;
        Console.WriteLine($"{result._id,-20} {result.count,-10} {result.onlineCount,-10} {offline,-10}");
    }
}

Comparison: Stream vs Standard API

Aspect               | Standard API (SearchAsync)   | Streaming (SearchStream)
Memory Usage         | High (all results in memory) | Low (one object at a time)
Time to First Result | Slow (wait for all)          | Fast (immediate)
Pagination           | Manual (page by page)        | Automatic (handled internally)
Best For             | Small result sets (<1,000)   | Large result sets (>1,000)
Error Recovery       | Restart from beginning       | Restart from beginning
Processing Model     | Batch                        | Incremental
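
Restarting After a Failure

Because a failed stream has to be restarted from the beginning, per-object processing should be safe to repeat. The sketch below shows one possible retry wrapper under that assumption. StreamRetry and RunAsync are hypothetical names, not part of the Feenics wrapper, and the catch-all filter is for illustration only; in practice it would likely be narrowed to the exceptions you expect, such as FailedOutcomeException.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper: re-runs an entire streaming operation from the start
// when it fails, up to maxAttempts times, pausing between attempts.
public static class StreamRetry
{
    public static async Task RunAsync(Func<int, Task> attempt, int maxAttempts, TimeSpan delay)
    {
        if (attempt == null) throw new ArgumentNullException(nameof(attempt));
        if (maxAttempts < 1) throw new ArgumentOutOfRangeException(nameof(maxAttempts));

        for (var n = 1; ; n++)
        {
            try
            {
                // The callback receives the 1-based attempt number.
                await attempt(n);
                return;
            }
            catch (Exception) when (n < maxAttempts)
            {
                // Not the last attempt: wait, then start over.
                await Task.Delay(delay);
            }
            // On the last attempt the filter is false and the exception propagates.
        }
    }
}
```

The callback would create the stream and run the full ReadAsync loop, so each attempt begins with a fresh stream.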