sink

Arrakis sink element.

ArrakisSink dataclass

ArrakisSink(publisher_id=None, block_duration=16 * Freq.Hz)

Bases: TSSink

Sink element that streams channel data to Arrakis.

Sink pads should be named after the channel that they will publish into Arrakis.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| publisher_id | str | admin-assigned publisher ID | None |
| block_duration | int | the duration (in nanoseconds) of data to publish in a single block. Default is 1/16th of a second. | 16 * Hz |
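As a rough illustration of the units involved, the sketch below converts the documented default of 1/16th of a second into nanoseconds and works out how many samples a single block would cover at a few sample rates. The helper `samples_per_block` and the sample rates are hypothetical and chosen only for illustration; they are not part of sgn_arrakis, and the sink may compute block sizes differently.

```python
NS_PER_SECOND = 1_000_000_000

# Documented default: 1/16th of a second, expressed in nanoseconds.
DEFAULT_BLOCK_DURATION_NS = NS_PER_SECOND // 16


def samples_per_block(sample_rate_hz: int, block_duration_ns: int) -> int:
    """Hypothetical helper: number of samples spanned by one block.

    Uses integer arithmetic so that no floating-point rounding is involved.
    """
    return sample_rate_hz * block_duration_ns // NS_PER_SECOND


if __name__ == "__main__":
    print(f"default block duration: {DEFAULT_BLOCK_DURATION_NS} ns")
    for rate in (16, 256, 16384):
        n = samples_per_block(rate, DEFAULT_BLOCK_DURATION_NS)
        print(f"{rate:>6} Hz -> {n} samples per block")
```

With a 1/16 s block, a 16 Hz channel contributes a single sample per block, which is why sample rates below 16 Hz would not fill a block evenly under this assumption.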

buffers_to_masked_array

buffers_to_masked_array(bufs, dtype)

Convert a list of SeriesBuffer objects into a numpy masked array.

Returns a single masked array containing the data from all buffers in the frame, with the data from gap buffers masked out.

Source code in sgn_arrakis/sink.py
def buffers_to_masked_array(
    bufs: Iterable[SeriesBuffer],
    dtype: numpy.dtype,
) -> numpy.ma.MaskedArray:
    """convert list of SeriesBuffer objects into a numpy masked array

    Returns a single masked array, with the data from all buffers in
    the frame, with the data from gap buffers masked out.

    """
    return numpy.ma.concatenate(
        [
            numpy.ma.array(
                buf.filleddata(),
                mask=buf.data is None,
                dtype=dtype,
            )
            for buf in bufs
        ]
    )
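
The masking behaviour can be tried out without an SGN pipeline. The sketch below is not the library's code path: `FakeBuffer` is a hypothetical stand-in for SeriesBuffer that assumes a gap buffer has `data` set to `None` and that `filleddata()` returns zeros for a gap, and `to_masked_array` simply repeats the concatenation shown above.

```python
from dataclasses import dataclass
from typing import Iterable, Optional

import numpy


@dataclass
class FakeBuffer:
    """Hypothetical stand-in for SeriesBuffer (not the real class)."""

    data: Optional[numpy.ndarray]  # None marks a gap buffer (assumption)
    samples: int

    def filleddata(self) -> numpy.ndarray:
        # Assumed behaviour: real data if present, zeros for a gap.
        if self.data is None:
            return numpy.zeros(self.samples)
        return self.data


def to_masked_array(
    bufs: Iterable[FakeBuffer], dtype: numpy.dtype
) -> numpy.ma.MaskedArray:
    # A scalar boolean mask applies to every element of that buffer,
    # so each gap buffer is masked out as a whole.
    return numpy.ma.concatenate(
        [
            numpy.ma.array(buf.filleddata(), mask=buf.data is None, dtype=dtype)
            for buf in bufs
        ]
    )


bufs = [
    FakeBuffer(numpy.array([1.0, 2.0]), samples=2),
    FakeBuffer(None, samples=3),  # gap buffer
    FakeBuffer(numpy.array([3.0]), samples=1),
]
result = to_masked_array(bufs, numpy.dtype("float32"))

if __name__ == "__main__":
    print(result)
    print(numpy.ma.getmaskarray(result))
```

The three samples from the gap buffer stay in the output, so the array keeps its full length of six, but they are masked and excluded from operations such as `result.count()` or `result.compressed()`.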