Source code for ccsdspy.utils
"""Utils for the CCSDSPy package."""
__author__ = "Daniel da Silva <mail@danieldasilva.org>"
from io import BytesIO
import warnings
import numpy as np
[docs]def split_by_apid(mixed_file, valid_apids=None):
"""Split a stream of mixed APIDs into separate streams by APID.
Parameters
----------
mixed_file: str, file-like
Path to file on the local file system, or file-like object
valid_apids: list of int, None
Optional list of valid APIDs. If specified, warning will be issued when
an APID is encountered outside this list.
Returns
-------
stream_by_apid : dict, int to :py:class:`~io.BytesIO`
Dictionary mapping integer apid number to BytesIO instance with the file
pointer at the beginning of the stream.
"""
if hasattr(mixed_file, "read"):
file_bytes = np.frombuffer(mixed_file.read(), "u1")
else:
file_bytes = np.fromfile(mixed_file, "u1")
offset = 0
stream_by_apid = {}
while offset < len(file_bytes):
packet_nbytes = file_bytes[offset + 4] * 256 + file_bytes[offset + 5] + 7
apid = np.array([file_bytes[offset], file_bytes[offset + 1]], dtype=np.uint8)
apid.dtype = ">u2"
apid = apid[0]
apid &= 0x07FF
if valid_apids is not None and apid not in valid_apids:
warnings.warn(f"Found unknown APID {apid}")
if apid not in stream_by_apid:
stream_by_apid[apid] = BytesIO()
stream_by_apid[apid].write(file_bytes[offset : offset + packet_nbytes])
offset += packet_nbytes
if offset != len(file_bytes):
missing_bytes = offset - len(file_bytes)
message = (
f"File appears truncated-- missing {missing_bytes} byte (or "
"maybe garbage at end)"
)
warnings.warn(message)
for stream in stream_by_apid.values():
stream.seek(0)
return stream_by_apid