Data streams enable efficient retrieval of large datasets by delivering objects one at a time as they become available, rather than loading everything into memory at once. This is essential for ETL workloads, bulk exports, and memory-constrained environments.
When to Use Streaming:
- Retrieving more than 1,000 objects
- Processing data in memory-constrained environments
- Building ETL pipelines or data exports
- Real-time processing where you can handle objects as they arrive
dotnet new console
dotnet add package Feenics.Keep.WebApi.Wrapper.Standard --source https://nuget-secure.feenics.com/nuget
using Feenics.Keep.WebApi.Model;
using Feenics.Keep.WebApi.Wrapper;
// Create the API client; a descriptive User-Agent helps server-side diagnostics.
var client = new Client("https://api.us.acresecurity.cloud", userAgent: "myAppName/myAppVersion.0.0");
// LoginAsync returns a tuple; 'message' carries the failure reason when loggedIn is false.
var (loggedIn, error, message) = await client.LoginAsync("myinstance", "username", "password");
if (!loggedIn)
{
Console.Error.WriteLine($"Login failed: {message}");
return;
}
// The current instance scopes all subsequent queries.
var instance = await client.GetCurrentInstanceAsync();
// Stream people matching the query
// The query is a MongoDB-style filter string; '_t' selects the document type.
using var stream = await SearchStream.Create(client, instance, "{'_t':'Person'}");
// ReadAsync advances to the next object, returning false when the stream is exhausted.
while (await stream.ReadAsync())
{
if (stream.BaseInfo is PersonInfo person)
{
Console.WriteLine($"Processing: {person.GivenName} {person.Surname}");
// Process one person at a time - memory efficient!
}
}
acre Access Control provides three streaming types:
| Stream Type | Collection | Use Case |
|---|---|---|
| SearchStream | KeepObjects | Search for any object type (Person, Controller, etc.) |
| EventStream | Events | Query historical events |
| AggregateStream | KeepObjects or Events | Execute custom aggregation pipelines |
Search for objects with streaming results:
// Find all people with surname "Smith"
using var searchStream = await SearchStream.Create(
    client, instance, "{'_t':'Person','Surname':'Smith'}");

var matched = 0;
while (await searchStream.ReadAsync())
{
    // Only person records are counted and printed.
    if (searchStream.BaseInfo is not PersonInfo person)
    {
        continue;
    }

    matched++;
    Console.WriteLine($"{matched}. {person.CommonName}");
}

Console.WriteLine($"Total: {matched} people");
// Enumerate Mercury controllers; the concrete subtype varies per panel model.
using var stream = await SearchStream.Create(client, instance, "{'_t':'MercuryController'}");
while (await stream.ReadAsync())
{
// Handle different controller types
switch (stream.BaseInfo)
{
case Lp1502Info lp1502:
Console.WriteLine($"LP1502: {lp1502.CommonName} - {lp1502.MacAddress}");
break;
case Ep1501Info ep1501:
Console.WriteLine($"EP1501: {ep1501.CommonName} - {ep1501.MacAddress}");
break;
default:
// Any other controller model: fall back to the common base properties.
Console.WriteLine($"Other: {stream.BaseInfo.CommonName}");
break;
}
}
// Search across entire instance hierarchy
using var stream = await SearchStream.Create(
    client,
    instance,
    "{'_t':'Person'}",
    includeChildInstances: true // Include all child instances
);
while (await stream.ReadAsync())
{
    // Pattern match instead of an unguarded 'as' cast: the original would
    // throw NullReferenceException if a non-person object ever came back.
    if (stream.BaseInfo is PersonInfo person)
    {
        Console.WriteLine($"{person.CommonName} in instance {person.InFolderKey}");
    }
}
Stream historical events from the Events collection:
// Get all access granted events from the last 24 hours
var yesterday = DateTime.UtcNow.AddDays(-1).ToString("o");
// Filter on event type id and a time lower bound. Literal braces inside an
// interpolated string must be doubled ({{ }}); the original single-brace
// '{'$oid': ...}' fragment would not compile.
var query = $"{{'EventTypeId':ObjectId('61cb71500000000000000000'),'OccurredOn.Date':{{'$gte':ISODate('{yesterday}')}}}}";
using var stream = await EventStream.Create(client, instance, query);
while (await stream.ReadAsync())
{
    var evt = stream.EventMessage;
    Console.WriteLine($"{evt.OccurredOn}: {evt.MessageLong}");
}
// Access denied events for a specific reader
var readerKey = "5bcde6dba3ac160048509333";
// Verbatim interpolated string: literal braces are doubled ({{ }}). The
// original '{'$oid': ...} }' fragment had an unescaped '{' (compile error)
// and a stray '}'; ObjectId(...) matches the style used elsewhere.
var query = $@"{{
'EventTypeId':ObjectId('61cb71500000000000000000'),
'ObjectLinks.LinkedObjectId':ObjectId('{readerKey}'),
'ObjectLinks.Relation':'Reader'
}}";
using var stream = await EventStream.Create(client, instance, query);
while (await stream.ReadAsync())
{
    Console.WriteLine($"Denied: {stream.EventMessage.MessageLong}");
}
Execute aggregation pipelines and stream results:
using MongoDB.Bson;
using MongoDB.Bson.Serialization;
var instance = await client.GetCurrentInstanceAsync();
// Count people by their first letter of surname
// NOTE(review): despite the comment above, this pipeline only matches people
// and projects GivenName/Surname; the grouping/count version appears later.
var pipeline = new[]
{
new BsonDocument("$match", new BsonDocument("_t", "Person")),
new BsonDocument("$project", new BsonDocument
{
{ "GivenName", 1 },
{ "Surname", 1 }
})
};
// "KeepObjects" selects the collection the pipeline runs against.
using var stream = await AggregateStream.Create(client, instance, "KeepObjects", pipeline);
while (await stream.ReadAsync())
{
// Access raw BsonDocument
var doc = stream.Document;
Console.WriteLine($"Found: {doc["GivenName"]} {doc["Surname"]}");
}
For cleaner code, deserialize to anonymous types:
// Group surnames by first letter, count each group, sort by count descending.
var pipeline = new[]
{
new BsonDocument("$match", new BsonDocument("_t", "Person")),
new BsonDocument("$group", new BsonDocument
{
// $substr takes (string, start, length): the first character of Surname.
{ "_id", new BsonDocument("$substr", new BsonArray { "$Surname", 0, 1 }) },
{ "count", new BsonDocument("$sum", 1) }
}),
new BsonDocument("$sort", new BsonDocument("count", -1))
};
// Define expected shape
// The template's property names/types drive anonymous-type deserialization.
var template = new { _id = "", count = 0 };
using var stream = await AggregateStream.Create(client, instance, "KeepObjects", pipeline);
while (await stream.ReadAsync())
{
// 'To' is the BsonExtensions helper defined elsewhere in this document.
var result = stream.Document.To(template);
Console.WriteLine($"Surnames starting with '{result._id}': {result.count}");
}
Add this extension method for convenient deserialization:
// Extension helpers for deserializing BsonDocument results from AggregateStream.
public static class BsonExtensions
{
// Deserialize the document into T via the MongoDB driver's serializer.
public static T To<T>(this BsonDocument source)
{
return BsonSerializer.Deserialize<T>(source);
}
// Overload whose 'nominalType' argument exists only so the compiler can
// infer T from a sample value; its value is never read. This enables
// anonymous-type targets: doc.To(new { _id = "", count = 0 }).
public static T To<T>(this BsonDocument source, T nominalType)
{
return source.To<T>();
}
}
For convenience when you need all results in memory:
// Get all results as a list (one line)
// NOTE(review): ToListAsync() buffers the entire result set in memory;
// presumably it also disposes the underlying stream — confirm in the wrapper.
var allPeople = await SearchStream.Create(
client,
instance,
"{'_t':'Person'}"
).ToListAsync();
Console.WriteLine($"Retrieved {allPeople.Count} people");
⚠️ Warning:
`ToListAsync()` loads all results into memory, negating the memory benefits of streaming. Only use it for small result sets.
For more control:
// Collect results into a list, but cap the size for predictable memory use.
var people = new List<PersonInfo>();
using var stream = await SearchStream.Create(client, instance, "{'_t':'Person'}");
while (await stream.ReadAsync())
{
if (stream.BaseInfo is PersonInfo person)
{
people.Add(person);
// Optional: Limit results
if (people.Count >= 10000)
{
Console.WriteLine("Reached limit, stopping...");
break;
}
}
}
// Use 'using' statement for automatic disposal
using var stream = await SearchStream.Create(client, instance, query);
while (await stream.ReadAsync())
{
// Process...
}
// Stream automatically disposed here
// Disposal runs even if the loop exits early via break or an exception.
// Good: Process and discard each object
using var stream = await SearchStream.Create(client, instance, "{'_t':'Person'}");
var count = 0;
while (await stream.ReadAsync())
{
    // Pattern match rather than passing a possibly-null 'as' cast result on:
    // the original could hand ProcessPersonAsync a null reference.
    if (stream.BaseInfo is PersonInfo person)
    {
        await ProcessPersonAsync(person);
        count++;
        if (count % 1000 == 0)
            Console.WriteLine($"Processed {count} records...");
    }
}
try
{
using var stream = await SearchStream.Create(client, instance, query);
while (await stream.ReadAsync())
{
try
{
await ProcessAsync(stream.BaseInfo);
}
catch (Exception ex)
{
// Log but continue processing
// A single bad record should not abort the whole stream.
catch (Exception ex) is intentionally broad here for per-item resilience.
Console.Error.WriteLine($"Error processing {stream.BaseInfo.Key}: {ex.Message}");
}
}
}
catch (FailedOutcomeException e)
{
// Raised by the wrapper when the API call itself fails (HTTP-level error).
Console.Error.WriteLine($"API Error: {e.HttpStatus} - {e.ResponseString}");
}
// Bad: Accumulates all results in memory
// (anti-pattern shown intentionally - this defeats the point of streaming)
var allResults = new List<BaseInfo>();
using var stream = await SearchStream.Create(client, instance, "{'_t':'Person'}");
while (await stream.ReadAsync())
{
allResults.Add(stream.BaseInfo); // Memory grows unbounded!
}
using var stream = await SearchStream.Create(client, instance, query);
// Bad: Accessing before ReadAsync
// BaseInfo is only populated after a successful ReadAsync call.
var person = stream.BaseInfo; // Will be null or throw!
// Good: Always call ReadAsync first
while (await stream.ReadAsync())
{
var person = stream.BaseInfo; // Now safe to access
}
using System.Text;
using Feenics.Keep.WebApi.Model;
using Feenics.Keep.WebApi.Wrapper;
// Streams all people in the instance to a CSV file, one row per person.
// Rows are written incrementally, so memory use stays flat regardless of size.
async Task ExportPeopleToCsv(Client client, InstanceInfo instance, string outputPath)
{
    // Quote a field per RFC 4180: the original interpolated raw values, so a
    // comma, quote, or newline in a name/email would corrupt the row layout.
    static string Csv(string field)
    {
        if (string.IsNullOrEmpty(field))
            return "";
        return field.IndexOfAny(new[] { ',', '"', '\n', '\r' }) >= 0
            ? $"\"{field.Replace("\"", "\"\"")}\""
            : field;
    }

    using var writer = new StreamWriter(outputPath, false, Encoding.UTF8);
    await writer.WriteLineAsync("Key,GivenName,Surname,Email,CardCount");

    using var stream = await SearchStream.Create(client, instance, "{'_t':'Person'}");
    var count = 0;
    while (await stream.ReadAsync())
    {
        if (stream.BaseInfo is PersonInfo person)
        {
            // First email address on file, if any.
            var email = person.Addresses?
                .OfType<EmailAddressInfo>()
                .FirstOrDefault()?.MailTo ?? "";
            var cardCount = person.CardAssignments?.Count ?? 0;
            await writer.WriteLineAsync(
                $"{Csv(person.Key)},{Csv(person.GivenName)},{Csv(person.Surname)},{Csv(email)},{cardCount}"
            );
            if (++count % 1000 == 0)
                Console.WriteLine($"Exported {count} people...");
        }
    }
    Console.WriteLine($"Exported {count} people to {outputPath}");
}
// Counts events per event type since startDate and prints a sorted summary.
async Task GenerateEventReport(Client client, InstanceInfo instance, DateTime startDate)
{
    // Tally of event occurrences keyed by event type.
    var tally = new Dictionary<string, int>();

    var sinceIso = startDate.ToString("o");
    var query = $"{{'OccurredOn':{{'$gte':ISODate('{sinceIso}')}}}}";

    using var events = await EventStream.Create(client, instance, query);
    while (await events.ReadAsync())
    {
        var key = events.EventMessage.EventTypeKey ?? "Unknown";
        tally.TryGetValue(key, out var seen);
        tally[key] = seen + 1;
    }

    Console.WriteLine($"Event Statistics since {startDate:d}:");
    Console.WriteLine(new string('-', 40));
    // Most frequent event types first.
    foreach (var pair in tally.OrderByDescending(p => p.Value))
    {
        Console.WriteLine($"{pair.Key,-25} {pair.Value,10:N0}");
    }
}
// Prints total/online/offline counts per Mercury hardware type.
async Task GenerateHardwareInventory(Client client, InstanceInfo instance)
{
var pipeline = new[]
{
new BsonDocument("$match", new BsonDocument
{
{ "_t", new BsonDocument("$in", new BsonArray
{ "MercuryController", "MercuryReader", "MercuryDoor" })
}
}),
new BsonDocument("$group", new BsonDocument
{
// Group by document type so each hardware kind becomes one result row.
{ "_id", "$_t" },
{ "count", new BsonDocument("$sum", 1) },
// $cond yields 1 when IsOnline is truthy, else 0, so $sum counts online units.
{ "onlineCount", new BsonDocument("$sum",
new BsonDocument("$cond", new BsonArray { "$IsOnline", 1, 0 }))
}
}),
new BsonDocument("$sort", new BsonDocument("count", -1))
};
// Anonymous-type template for the To() extension defined elsewhere in this document.
var template = new { _id = "", count = 0, onlineCount = 0 };
Console.WriteLine("Hardware Inventory:");
Console.WriteLine(new string('-', 50));
Console.WriteLine($"{"Type",-20} {"Total",-10} {"Online",-10} {"Offline",-10}");
Console.WriteLine(new string('-', 50));
using var stream = await AggregateStream.Create(client, instance, "KeepObjects", pipeline);
while (await stream.ReadAsync())
{
var result = stream.Document.To(template);
// Offline is derived: units matched minus units reporting online.
var offline = result.count - result.onlineCount;
Console.WriteLine($"{result._id,-20} {result.count,-10} {result.onlineCount,-10} {offline,-10}");
}
}
| Aspect | Standard API (SearchAsync) | Streaming (SearchStream) |
|---|---|---|
| Memory Usage | High (all results in memory) | Low (one object at a time) |
| Time to First Result | Slow (wait for all) | Fast (immediate) |
| Pagination | Manual (page by page) | Automatic (handled internally) |
| Best For | Small result sets (<1,000) | Large result sets (>1,000) |
| Error Recovery | Restart from beginning | Restart from beginning |
| Processing Model | Batch | Incremental |