🚿 Streaming data with aschannel

Overview

aschannel is a streaming endpoint which allows clients to retrieve the results of each submitted task – in real-time – as they are generated. Streaming APIs differ from regular APIs in that a connection is held open indefinitely and messages are sent to the client immediately on becoming available.

Clients subscribe to one or more accounts using asmaster, or request data directly via asapi, and results will be streamed to the client in real-time once available.

Important

When using the aschannel it is critical to be listening permanently to the stream. Clients who are not listening permanently may miss their task results, or critical messages from the system. If messages on the stream are not heard, there is no way to regenerate them: the tasks must be run again.

Connecting to the channel

A connection to aschannel can be made using the following command:

$ curl \
    -X GET \
    -H "Authorization: Token <TOKEN>" \
    -d "stream=<STREAM>" \
    https://aschannel.reincubate.com/stream/

Where <TOKEN> is the client’s token, and <STREAM> the client’s stream ID. This ID is provided by the asapi endpoint when a task is submitted, and it is a constant value tied to a client’s token.
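The same request can be prepared from Python. What follows is a minimal sketch using only the standard library: build_stream_request is a hypothetical helper name, and the host is the one assumed in the curl example. As in the curl command, the stream ID is sent as form data on a GET request.

```python
import urllib.parse
import urllib.request

# Host assumed, as in the curl example above.
ASCHANNEL_URL = "https://aschannel.reincubate.com/stream/"


def build_stream_request(token, stream, url=ASCHANNEL_URL):
    """Prepare (but do not send) the GET request that opens the stream."""
    body = urllib.parse.urlencode({"stream": stream}).encode("ascii")
    return urllib.request.Request(
        url,
        data=body,  # mirrors curl's -d "stream=<STREAM>"
        headers={"Authorization": "Token " + token},
        method="GET",  # mirrors curl's -X GET
    )


request = build_stream_request("<TOKEN>", "<STREAM>")
# Sending it would hold the connection open:
# response = urllib.request.urlopen(request)
```

The request is only built here, not sent, so the sketch can be run without a valid token.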

Note

asapi is responsible for dynamically allocating endpoint URLs: it indicates the <STREAM> under the JSON key stream. This is provided purely for validation: the client must already be subscribed to the channel when tasks are submitted to asapi, so as not to risk missing the results.

In the example above, the host https://aschannel.reincubate.com is assumed. This may not be so in all cases: see service, action and endpoint discovery.

Troubleshooting HTTP response codes

When connecting to aschannel an HTTP response code will be sent in the response header. The table below describes their meanings.

Status  Text                 Description
200     OK                   Everything is fine.
400     Bad Request          The request is malformed.
401     Unauthorized         Client token is invalid.
503     Service Unavailable  aschannel is currently unavailable: check status.reincubate.com.
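
A client might turn these codes into a decision about what to do next. The sketch below is one possible mapping; the function name and the returned action strings are illustrative and not part of the API.

```python
def action_for_status(status):
    """Map an aschannel HTTP status code to a suggested client action."""
    actions = {
        200: "read-stream",  # OK: start consuming messages
        400: "fix-request",  # Bad Request: the request is malformed
        401: "check-token",  # Unauthorized: the client token is invalid
        503: "retry-later",  # Service Unavailable: see status.reincubate.com
    }
    # Codes outside the documented table are treated as unexpected.
    return actions.get(status, "unexpected")
```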

Interpreting the stream

Each message from aschannel is in two parts: a header and a payload. The header contains metadata about the message, while the payload contains the message itself. This could be a system message, or the data that the client has requested. Both the header and the payload end in a carriage return and a new line, and each is prefixed by a number indicating the length of the following piece of data.

<HEADER_LENGTH>\r\n
<HEADER>\r\n
<PAYLOAD_LENGTH>\r\n
<PAYLOAD>\r\n

Both <HEADER_LENGTH> and <PAYLOAD_LENGTH> are integers and both represent the length of the next field including the \r\n. The header is always JSON encoded, but the payload format varies depending on its type. Consequently, one could consume messages from the aschannel with Python like so:

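buf = response.raw  # raw byte stream of the open HTTP response
while not buf.closed:
    # Each length arrives as ASCII digits on its own line; convert it
    # to an integer before using it as a byte count.
    header_length = int(buf.readline())
    header = buf.read(header_length)  # includes the trailing \r\n
    payload_length = int(buf.readline())
    payload = buf.read(payload_length)  # includes the trailing \r\n

    handle_payload(header, payload)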
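
For a version that can be tried without a live connection, the sketch below wraps the same loop in a generator and feeds it from an in-memory buffer. read_messages and encode_message are hypothetical helper names; encode_message exists only to build sample frames following the layout described above, where each length counts the trailing \r\n. The sketch assumes that read(n) blocks until n bytes are available, as io.BytesIO and buffered readers do.

```python
import io
import json


def encode_message(header, payload):
    """Build one frame as described above (helper for local testing only)."""
    frame = b""
    for part in (json.dumps(header).encode("utf-8"), payload):
        part += b"\r\n"
        # The length line counts the field including its trailing \r\n.
        frame += str(len(part)).encode("ascii") + b"\r\n" + part
    return frame


def read_messages(stream):
    """Yield (header, payload) pairs until the stream is exhausted."""
    while True:
        length_line = stream.readline()
        if not length_line:
            return  # end of stream: the connection was closed
        header_raw = stream.read(int(length_line))
        payload_raw = stream.read(int(stream.readline()))
        # The header is always JSON; strip the payload's trailing \r\n.
        yield json.loads(header_raw), payload_raw[:-2]


sample = encode_message(
    {"type": "system"}, b'{"message": "Message streaming has begun."}'
)
for header, payload in read_messages(io.BytesIO(sample)):
    print(header["type"], json.loads(payload)["message"])
```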

Important

aschannel acts as a firehose of data and does not permit a large buffer to form: content must be consumed by clients in real time. If client consumption is too slow, the server may issue disconnect warnings, and ultimately disconnect the client.
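
One way to keep up with the stream is to do nothing but read on the connection's thread and hand each message to a worker through a queue, so that slow processing never delays the next read. This is a sketch of that general pattern, not something prescribed by aschannel; pump and worker are hypothetical names, and the messages here are stand-ins for parsed (header, payload) pairs.

```python
import queue
import threading


def pump(messages, inbox):
    """Reader side: enqueue each message immediately, then signal the end."""
    for message in messages:
        inbox.put(message)
    inbox.put(None)  # sentinel: no more messages


def worker(inbox, handle):
    """Worker side: process messages at its own pace until the sentinel."""
    while True:
        message = inbox.get()
        if message is None:
            break
        handle(message)


inbox = queue.Queue()  # unbounded, so the reader never blocks on a slow handler
results = []
thread = threading.Thread(target=worker, args=(inbox, results.append))
thread.start()
pump([("header-1", b"payload-1"), ("header-2", b"payload-2")], inbox)
thread.join()
```

An unbounded queue trades memory for read speed: if the handler is persistently slower than the stream, the backlog grows on the client instead of on the server.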

On first connecting, the response will look similar to this, and the connection will stay open:

66
{ "type": "system",
  "id": "cff35e74-c709-4374-348e-03ea9a2cee61"}
45
{ "message": "Message streaming has begun."}

Here, 66 and 45 are the header length and payload length respectively. Reconnecting with the same token from another terminal will give this:

66
{ "type": "system",
  "id": "ad17c5a5-c658-4736-876e-cf7901be7292"}
155
{ "message": "A new HTTP connection has been opened using your your authentication details, culling old connections.",
  "error": "CULLING_OLD_CONNECTIONS"
}

Important

A channel may only be subscribed to with a single connection: each new connection to the channel will disconnect pre-existing connections.

Interpreting chunked payloads

Any payload that is larger than a few kilobytes will be split into smaller chunks, which a client must reassemble to form the original payload. Headers will contain fields named chunk, total_chunks, and chunk_size. These fields may be present even when there is only a single chunk.

As an example, a chunked result for a login task would result in an output as below:

123
{ "chunk": 1,
  "total_chunks": 1,
  "type": "log-in",
  "task_id": "2a24db6c-6870-4a27-a45a-a5c172d8ec98",
  "chunk_size": 16384
}
51
{ "message": "Log-in successful",
  "success": true
}
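
Reassembly could be sketched as follows. The details are not spelled out here, so this assumes that chunk is 1-based (as in the example above), that the chunks of one payload share a task_id and type, and that the original payload is the chunks concatenated in order. ChunkAssembler is a hypothetical name.

```python
class ChunkAssembler:
    """Collect chunks per task and return the full payload once complete."""

    def __init__(self):
        self._pending = {}  # (task_id, type) -> {chunk number: bytes}

    def add(self, header, payload):
        """Return the reassembled payload, or None while chunks are missing."""
        total = header.get("total_chunks", 1)
        if total == 1:
            return payload  # single chunk: nothing to reassemble
        key = (header.get("task_id"), header.get("type"))
        chunks = self._pending.setdefault(key, {})
        chunks[header["chunk"]] = payload
        if len(chunks) < total:
            return None
        del self._pending[key]
        # Assumption: chunk numbers run from 1 to total_chunks.
        return b"".join(chunks[n] for n in range(1, total + 1))


assembler = ChunkAssembler()
base = {"task_id": "2a24db6c-6870-4a27-a45a-a5c172d8ec98", "type": "log-in", "total_chunks": 2}
first = assembler.add(dict(base, chunk=1), b'{"message": ')
second = assembler.add(dict(base, chunk=2), b'"Log-in successful"}')
```

Here first is None because the payload is still incomplete, and second holds the joined bytes, ready to be decoded.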

Receiving task results

The following is an example of a message response from a fetch_data task on the icloud service:

137
{ "stream": "<STREAM>",
  "task_id": "<TASK_ID>",
  "chunk": 1,
  "total_chunks": 1,
  "chunk_size": 16384,
  "type": "<PAYLOAD_TYPE>"
}
288
{ "sms": [{
    "conversation_id": "+447910000000",
    "from_me": false,
    "handle": "vodafone",
    "attachments": [],
    "deleted": false,
    "type": "SMS",
    "date": "2016-09-16 11:52:53.000000",
    "text": "Welcome to Vodafone!",
    "group_handles": null,
    "id": 1
  }]
}

Here, the <TASK_ID> is the same <TASK_ID> given by asapi when the task was submitted. The <PAYLOAD_TYPE> describes the nature of the payload, and will be described using the slug of the action submitted via asapi.

Note

Message types will either be system or a slug matching the name of one of the tasks found in service discovery.
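
Since every header carries a type, a client can route messages with a small lookup table. The following is a sketch with hypothetical handler names; the log-in slug is taken from the earlier example, and any other slugs would come from service discovery.

```python
import json


def on_system(header, payload):
    return "system: " + payload.get("message", "")


def on_login(header, payload):
    return "log-in for task " + header["task_id"]


# Keys are header type values; handlers are illustrative.
HANDLERS = {"system": on_system, "log-in": on_login}


def dispatch(header, raw_payload):
    """Route a message to the handler registered for its type."""
    handler = HANDLERS.get(header["type"])
    if handler is None:
        return "unhandled type: " + header["type"]
    # Assumption: payloads for these types are JSON, as in the examples.
    return handler(header, json.loads(raw_payload))
```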

Receiving system messages

From time to time, ricloud may need to transmit system messages to the client. These are always identified by the header’s type field being set to system. For example:

16
{ "type": "system"
}
97
{ "code": "client-too-slow",
  "message": "The client is reading data too slowly. Disconnecting."
}

Note

System messages are only guaranteed to contain a type value of system in the header and a message field inside the payload. System messages are never chunked.

The following system message code values are possible:

Response                  Summary
reconnect                 Reconnection required
CULLING_OLD_CONNECTIONS   Simultaneous connection
BUFFER_WARNING_THRESHOLD  Buffer warning: threshold
BUFFER_WARNING_TIMEOUT    Buffer warning: timeout
CLOSING_SLOW_CONNECTION   Client too slow

Reconnection required

The channel may instruct the client to reconnect. If so, the client should open a new connection to the supplied endpoint as soon as possible.

Simultaneous connection

The channel only permits a single open connection per client. If a client opens a new connection, the old one will be killed with this message.

Buffer warning: threshold

Clients can be disconnected for failing to consume messages at the rate they are being provided. They will be warned if they are falling behind, starting when the internal buffer reaches a predefined threshold of 40% of their messages being held for them.

Buffer warning: timeout

Once over the threshold, the client will be informed about the current state of the buffer no more than once every second.

If a client does not consume messages in a timely manner, aschannel will drop the connection and purge its buffer of all uncollected task results.

Client too slow

This message is sent as the connection is terminated when consumption of messages is too slow.

If the client has a backlog of tasks and does not consume messages readily, it runs the risk of being repeatedly disconnected, and thus of losing many or all task results. Once a client reconnects, it will only be streamed results from tasks that have completed subsequent to the disconnection.
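
The system message codes listed earlier lend themselves to a simple lookup. The sketch below is one interpretation only: the action names are illustrative, the choice of action for each code is an assumption, and because the examples in this document show the code under both an error key and a code key, it checks both.

```python
# Assumed mapping from documented codes to illustrative client actions.
SYSTEM_ACTIONS = {
    "reconnect": "reconnect",                # open a new connection as instructed
    "CULLING_OLD_CONNECTIONS": "stop",       # a newer connection replaced this one
    "BUFFER_WARNING_THRESHOLD": "speed-up",  # falling behind: consume faster
    "BUFFER_WARNING_TIMEOUT": "speed-up",
    "CLOSING_SLOW_CONNECTION": "reconnect",  # disconnected for being too slow
}


def action_for_system_message(payload):
    """Pick a client action for a parsed system message payload."""
    code = payload.get("error") or payload.get("code")
    if code is None:
        return "log"  # informational, e.g. "Message streaming has begun."
    # Unknown codes are logged rather than acted upon.
    return SYSTEM_ACTIONS.get(code, "log")
```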