Usage¶
sgn-arrakis provides two SGN pipeline elements for working with Arrakis data streams: a source for reading data, and a sink for writing it.
Both elements integrate with SGN pipelines via the sgn-ts timeseries framework.
ArrakisSource¶
ArrakisSource streams channel data from an
Arrakis server into an SGN pipeline. Source pads are named after the channels
they stream.
Key parameters¶
- source_pad_names -- list of channel names to stream (e.g., ["L1:GDS-CALIB_STRAIN"])
- start -- GPS start time in seconds, or current time if None
- end or duration -- GPS end time or total duration in seconds; omit both for an endless stream
- in_queue_timeout -- seconds to wait for a block from the server before timing out (default: 60)
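The interaction between start, end, and duration can be sketched in plain Python. This is only an illustration of the rules listed above, not the library's implementation: the helper name resolve_span and its now argument are hypothetical, and rejecting end and duration together is an assumption.

```python
from typing import Optional, Tuple


def resolve_span(
    now: float,
    start: Optional[float] = None,
    end: Optional[float] = None,
    duration: Optional[float] = None,
) -> Tuple[float, Optional[float]]:
    """Return (start, end) in GPS seconds; end is None for an endless stream."""
    # Assumption: end and duration are alternatives, so giving both is an error.
    if end is not None and duration is not None:
        raise ValueError("specify end or duration, not both")
    # A missing start falls back to the current time.
    t0 = now if start is None else start
    if duration is not None:
        return t0, t0 + duration
    # end may still be None here, which means "stream forever".
    return t0, end


print(resolve_span(now=1400000000, start=1187008882, duration=64))
# (1187008882, 1187008946)
print(resolve_span(now=1400000000))
# (1400000000, None)
```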
Stream from the current time¶
When start is not specified, ArrakisSource begins streaming from the current
GPS time:
from sgn_arrakis import ArrakisSource
from sgn import Pipeline, SignalEOS
from sgnts.sinks import NullSeriesSink

channels = ["L1:GDS-CALIB_STRAIN"]

# No start is given, so streaming begins at the current GPS time.
src = ArrakisSource(
    source_pad_names=channels,
)
sink = NullSeriesSink(
    sink_pad_names=channels,
    verbose=True,
)

pipeline = Pipeline()
pipeline.connect(src, sink)

with SignalEOS():
    pipeline.run()
Stream a fixed duration¶
Set start and duration (or end) to stream a bounded segment:
src = ArrakisSource(
    source_pad_names=["L1:GDS-CALIB_STRAIN"],
    start=1187008882,
    duration=64,
)
Gap handling¶
When Arrakis delivers blocks containing null data, ArrakisSource creates gap
buffers with no data payload. Downstream elements can detect these via the
buffer's data_valid flag.
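As a rough illustration of what "no data payload" means for a downstream consumer, the sketch below totals the time covered by gaps in a sequence of blocks. The Block class is a hypothetical stand-in written for this example; it is not the sgn-ts buffer class, and it models a gap simply as data being None.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Block:
    """Stand-in for a timeseries buffer; NOT the sgn-ts buffer class."""

    start: float                 # GPS start time in seconds
    duration: float              # length in seconds
    data: Optional[List[float]]  # None models a gap (no payload)


def gap_seconds(blocks: List[Block]) -> float:
    """Total duration covered by blocks that carry no data payload."""
    return sum(b.duration for b in blocks if b.data is None)


blocks = [
    Block(0.0, 0.5, [0.0] * 8),
    Block(0.5, 0.5, None),       # gap
    Block(1.0, 0.5, [0.0] * 8),
]
print(gap_seconds(blocks))  # 0.5
```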
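ArrakisSink¶
ArrakisSink publishes channel data from an SGN pipeline to an Arrakis server. Sink pads are named after the channels they publish.
Key parameters¶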
- publisher_id -- admin-assigned publisher ID (required)
- sink_pad_names -- list of channel names to publish; must match all channels registered to the publisher
- block_duration -- duration of each published block in nanoseconds (default: 62,500,000 ns = 1/16 s)
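To make the default block_duration concrete, the arithmetic below converts it to blocks per second and to samples per block. The 16384 Hz sample rate is taken from the FakeSeriesSource example on this page; the variable names are illustrative only.

```python
from fractions import Fraction

NS_PER_S = 1_000_000_000

block_duration_ns = 62_500_000  # documented default
sample_rate_hz = 16384          # rate used in the FakeSeriesSource example

# Exact rational arithmetic avoids floating-point rounding.
blocks_per_second = Fraction(NS_PER_S, block_duration_ns)
samples_per_block = Fraction(sample_rate_hz * block_duration_ns, NS_PER_S)

print(blocks_per_second)  # 16
print(samples_per_block)  # 1024
```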
Basic usage¶
from sgn_arrakis import ArrakisSink
from sgn import Pipeline, SignalEOS
from sgnts.sources import FakeSeriesSource

channels = ["H1:GDS-CALIB_STRAIN"]

# Generate constant-value test data for the channel.
source = FakeSeriesSource(
    source_pad_names=channels,
    signals={
        "H1:GDS-CALIB_STRAIN": {
            "signal-type": "const",
            "sample-shape": (1,),
            "sample-rate": 16384,
            "value": 0.0,
        },
    },
)
# sink_pad_names must match all channels registered to the publisher.
sink = ArrakisSink(
    publisher_id="my-publisher-id",
    sink_pad_names=channels,
)

pipeline = Pipeline()
pipeline.connect(source, sink)

with SignalEOS():
    pipeline.run()
Channel validation¶
On startup, ArrakisSink registers with the Arrakis server and verifies that
the sink pad names exactly match the channels assigned to the publisher. A
ValueError is raised if there is a mismatch.
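The exact-match rule can be pictured as a set comparison. The function below is a hypothetical sketch of that check, not the code ArrakisSink runs; the name check_channels and the wording of the error message are assumptions.

```python
from typing import Iterable


def check_channels(sink_pad_names: Iterable[str], registered: Iterable[str]) -> None:
    """Raise ValueError unless both channel sets are identical."""
    pads, reg = set(sink_pad_names), set(registered)
    if pads != reg:
        missing = sorted(reg - pads)  # registered but not given as pads
        extra = sorted(pads - reg)    # given as pads but not registered
        raise ValueError(f"channel mismatch: missing={missing}, extra={extra}")


# Identical sets pass silently.
check_channels(["H1:GDS-CALIB_STRAIN"], ["H1:GDS-CALIB_STRAIN"])
```

Under this reading, supplying only a subset of the publisher's channels is a mismatch, just like supplying an unregistered channel.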
Background publishing¶
ArrakisSink publishes data in a background thread so that the main pipeline
execution is not blocked by network I/O. Data blocks are queued internally and
published asynchronously.
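The queue-plus-worker pattern described here can be sketched with the standard library alone. This is a generic illustration, not ArrakisSink's actual implementation: the names published, blocks, STOP, and publish_loop are invented for the example, and appending to a list stands in for network I/O.

```python
import queue
import threading

published = []          # stands in for blocks sent over the network
blocks = queue.Queue()  # internal queue between pipeline and publisher
STOP = object()         # sentinel telling the worker to exit


def publish_loop() -> None:
    """Drain the queue and 'publish' each block until STOP arrives."""
    while True:
        block = blocks.get()
        if block is STOP:
            break
        published.append(block)  # a real sink would do network I/O here


worker = threading.Thread(target=publish_loop, daemon=True)
worker.start()

# The pipeline thread only enqueues, so it never waits on the network.
for i in range(3):
    blocks.put(f"block-{i}")

blocks.put(STOP)
worker.join()
print(published)  # ['block-0', 'block-1', 'block-2']
```

A single worker reading from a FIFO queue preserves block order, which matters for timeseries data.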
CLI¶
sgn-arrakis includes a command-line tool for testing source and sink functionality.
Stream channels¶
Stream channel data from Arrakis and print to the console:
sgn-arrakis source L1:GDS-CALIB_STRAIN H1:GDS-CALIB_STRAIN
Publish test data¶
Publish constant-value test data to all channels registered to a publisher:
sgn-arrakis sink my-publisher-id
Use --include-gaps to include gap buffers in the published stream:
sgn-arrakis sink --include-gaps my-publisher-id
Options¶
- --url/-u -- specify the Arrakis server URL
- --version -- display the version
- LOG_LEVEL environment variable -- set logging verbosity (default: DEBUG)
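For readers unfamiliar with environment-driven logging, the sketch below shows one common way a LOG_LEVEL variable with a DEBUG default can be mapped onto Python's logging module. It is an assumption about the general pattern, not a copy of how the sgn-arrakis CLI reads the variable; level_from_env is a hypothetical helper.

```python
import logging
import os


def level_from_env(env=os.environ) -> int:
    """Map LOG_LEVEL to a logging level, defaulting to DEBUG."""
    name = env.get("LOG_LEVEL", "DEBUG").upper()
    level = logging.getLevelName(name)
    # getLevelName returns a string such as "Level FOO" for unknown names.
    if not isinstance(level, int):
        raise ValueError(f"unknown log level: {name}")
    return level


logging.basicConfig(level=level_from_env())
```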