Development Update: The Flow of a Data Stream

So yesterday I showed how I want to see data requests from the end-to-end perspective, using “attributes” to decorate a class’ fields with the information needed to send a request. I ended with how this works out at the point of usage. My test UI now contains something like the following:

private void TestGetSimState()
    var mgr = RequestManager.Instance;
    log.Info("The simulator is {0}.",
                .AsBoolean() ? "Running" : "Stopped");

    AircraftData data = mgr.RequestData<AircraftData>()
    log.Info("Currently selected aircraft is '{0}'.", data.Title);

    log.Info("Starting stream");
    mgr.RequestData<AircraftData>(ObjectDataPeriod.PerSecond, onlyWhenChanged: true)
       .Subscribe((AircraftData data) => log.Info("[stream] Currently selected aircraft is '{0}'.", data.Title));

The first request is one for the SystemState, which tells us if the user is in control of the aircraft or in a UI dialog. The second request uses the annotated AircraftData class to request the currently selected aircraft, but the third is new. The third asks for the aircraft information every second, but only if it changed. This results in a stream, and we subscribe a lambda to it to tell us every time the user changes the aircraft, and what the new choice is.

So how does this work?

public IMessageStream<T> RequestData<T>(ObjectDataPeriod period, bool onlyWhenChanged = false)
    where T : class
    Type t = typeof(T);
    log.Debug("RequestData<{0}>()", t.FullName);

    ObjectDefinition def = GetObjectDefinition(t);
    MessageStream<ObjectData> data = RequestManager.Instance.RequestObjectData<ObjectData>(def, period, onlyWhenChanged: onlyWhenChanged);

    MessageStream<T> result = new(1);
    data.Subscribe((ObjectData data) => {
        log.Trace("RequestData<{0}> callback called", t.FullName);
        catch (Exception e)
    return result;

The RequestData method, after generating a debug log message, uses GetObjectDefinition(t) to check if we have a DefineId for this class. If not, it scans its fields and properties for our attribute and builds an ObjectDefinition object for it. Then I tell the ObjectDefinition to register itself with SimConnect, so we can start sending requests. Now we send a request for this block and get a stream as result, but this stream contains the SimConnect SIMCONNECT_RECV_SIMOBJECT_DATA message contents, which are useless in a C# context, so I subscribe a new stream on this one, that uses the ObjectDefinition object to translate the buffer into actual C# objects. This new stream is what is returned to the caller.

This approach works well, and make great usage of the reactive design pattern. Note that the second stream is created with a buffer size of 1, so if a second object arrives before the first is consumed, we have a problem. For aircraft data that is not very likely however. Todo items here are making sure that larger buffer sizes work correctly, and that I can create an IEnumerator conforming object for this stream, so you can do foreach loops or even LINQ-style stuff. IAsyncEnumerator may also be a nice one. For high frequency data we may want to prevent new objects for each message, so do an object that is updated rather than created fresh.

Next Step

I think the best next step is to fill out the message parsing and types. It now is able to do INT32, INT64, FLOAT32, FLOAT64, and fixed size strings, so I’ll start with the rest. Also I want the tagged message format, because that is better able to deal parsing data blocks when the simulator didn’t understand all the data items we asked for.

See ya!

Leave a Reply

Fill in your details below or click an icon to log in: Logo

You are commenting using your account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s