Source code for ccsdspy.packet_types

"""High-level Object-oriented API for the different types of packets
(FixedLength and VariableLength) supported by the package.
"""

import csv
import os

import numpy as np

from .decode import _decode_fixed_length, _decode_variable_length
from .packet_fields import PacketField, PacketArray


__author__ = "Daniel da Silva <mail@danieldasilva.org>"


class _BasePacket:
    """Base class of FixedLength and VariableLength. Not to be instantiated
    directly.
    """

    def _init(self, fields):
        """
        Parameters
        ----------
        fields : list of `ccsdspy.PacketField`
            Layout of packet fields contained in the definition.
        """
        self._fields = fields[:]

    @classmethod
    def from_file(cls, file):
        """
        Parameters
        ----------
        file : str
           Path to file on the local file system that defines the packet fields.
           Currently only suports csv files.  See :download:`simple_csv_3col.csv <../../ccsdspy/tests/data/packet_def/simple_csv_3col.csv>`
           and :download:`simple_csv_4col.csv <../../ccsdspy/tests/data/packet_def/simple_csv_4col.csv>`

        Returns
        -------
        An instance of FixedLength.
        """
        file_extension = os.path.splitext(file)
        if file_extension[1] == ".csv":
            fields = _get_fields_csv_file(file)
        else:
            raise ValueError(f"File type {file_extension[1]} not supported.")

        return cls(fields)


[docs]class FixedLength(_BasePacket): """Define a fixed length packet to decode binary data. Fixed length packets correspond to packats that are the same length and layout every time. A common example of this is housekeeping or status messages. """ def __init__(self, fields): """ Parameters ---------- fields : list of :py:class:`~ccsdspy.PacketField` or :py:class:`~ccsdspy.PacketArray` Layout of packet fields contained in the definition. Raises ------ ValueError one or more of the arguments are invalid """ if any(field._bit_length == "expand" for field in fields): raise ValueError( "The FixedLength class does not support fields with " "bit_length='expand'. Instead, use the VariableLength " "class." ) self._init(fields)
[docs] def load(self, file, include_primary_header=False): """Decode a file-like object containing a sequence of these packets. Parameters ---------- file : str Path to file on the local file system, or file-like object include_primary_header : bool If True, provides the primary header in the output Returns ------- field_arrays : dict, string to NumPy array dictionary mapping field names to NumPy arrays, with key order matching the order of fields in the packet. """ return _load( file, self._fields, "fixed_length", include_primary_header=include_primary_header, )
[docs]class VariableLength(_BasePacket): """Define a variable length packet to decode binary data. Variable length packets are packets which have a different length each time. Each variable length packet should have a single `PacketArray` with the `array_shape='expand'`, which will grow to fill the packet. Please note that while this class is able to parse fixed length packets, it is much slower. Use the :py:class:`~ccsdspy.FixedLength` class instead. Rules for variable length packets: - Do provide only one one expanding PacketArray with `array_shape='expand'`. - Do not specify the primary header fields manually - Do not specify explicit bit_offsets (they will be computed automatically) """ def __init__(self, fields): """ Parameters ---------- fields : list of :py:class:`~ccsdspy.PacketField` or :py:class:`~ccsdspy.PacketArray` Layout of packet fields contained in the definition. No more than one field should have array_shape="expand". The field must have no bit_offset's. Do not include the primary header fields. Raises ------ ValueError one or more of the arguments are invalid, or do not follow the specified rules. """ expand_arrays = [ field for field in fields if isinstance(field, PacketArray) and field._array_shape == "expand" ] if len(expand_arrays) > 1: raise ValueError( "The VariableLength class does not support more than one field " "with array_shape='expand', as the decoding process becomes " "ambiguous." ) if expand_arrays and expand_arrays[0] is not fields[-1]: raise ValueError( "Expanding array with array_shape='expand' must be the last field" ) if not all(field._bit_offset is None for field in fields): raise ValueError( "The VariableLength class does not support explicit bit " "offsets. You must specify the entire packet so they can be " "determined automatically." ) self._init(fields)
[docs] def load(self, file, include_primary_header=False): """Decode a file-like object containing a sequence of these packets. Parameters ---------- file : str Path to file on the local file system, or file-like object include_primary_header : bool If True, provides the primary header in the output Returns ------- field_arrays : dict, string to NumPy array dictionary mapping field names to NumPy arrays, with key order matching the order of fields in the packet. """ # The variable length decoder requires the full packet definition, so if # they didn't want the primary header fields, we parse for them and then # remove them after. packet_arrays = _load( file, self._fields, "variable_length", include_primary_header=True ) if not include_primary_header: _delete_primary_header_fields(packet_arrays) return packet_arrays
def _delete_primary_header_fields(packet_arrays): """Modifies in place the packet arrays dictionary to delete primary header fields. Parameters ----------- packet_arrays dictionary mapping field names to NumPy arrays, with key order matching the order fields in the packet. Modified in place """ header_fields = _prepend_primary_header_fields([]) for header_field in header_fields: del packet_arrays[header_field._name] def _expand_array_fields(existing_fields): """Expand arrays into multiple fields, one for each element. Returns a new list of fields as well as a data structure which can be used to reverse this process. See the `_unexpand_field_arrays()` function to reverse this process. Parameters ---------- existing_fields : list of `ccsdspy.PacketField` Layout of packet fields contained in the definition, with PacketArray Returns ------- return_fields : list of `ccsdspy.PacketField` Layout of packet fields contained in the definition, without PacketArray's expand_history : dict Dictionary mapping array name with shape/data-type and field expansions """ return_fields = [] expand_history = {} for existing_field in existing_fields: if ( existing_field._field_type != "array" or existing_field._array_shape == "expand" ): return_fields.append(existing_field) continue array_shape = existing_field._array_shape array_order = existing_field._array_order index_vecs = [np.arange(dim) for dim in array_shape] index_grids = np.meshgrid(*index_vecs, indexing="ij") indeces_flat = [ index_grid.flatten(order=array_order) for index_grid in index_grids ] expand_history[existing_field._name] = { "shape": array_shape, "data_type": existing_field._data_type, "fields": {}, } for i, indeces in enumerate(zip(*indeces_flat)): name = f"{existing_field._name}[{','.join(map(str,indeces))}]" if existing_field._bit_offset is None: bit_offset = None else: bit_offset = existing_field._bit_offset + i * existing_field._bit_length return_field = PacketField( name=name, data_type=existing_field._data_type, bit_length=existing_field._bit_length, bit_offset=bit_offset, byte_order=existing_field._byte_order, ) expand_history[existing_field._name]["fields"][name] = indeces return_fields.append(return_field) return return_fields, expand_history def _unexpand_field_arrays(field_arrays, expand_history): """Reverse the array expansion process from `_expand_array_fields`. Parameters ---------- field_arrays : dict, str to numpy array Dictionary mapping field names to NumPy arrays, with key order matching the order fields in the packet. Has a key for each array element. expand_history : dict Dictionary mapping array name with shape/data-type and field expansions Returns ------- return_field_arrays : dict, str to array Dictionary mapping field names to NumPy arrays, with key order matching the order fields in the packet. Has keys mapping to full arrays. """ npackets = list(field_arrays.values())[0].shape[0] return_field_arrays = field_arrays.copy() for array_name, array_details in expand_history.items(): array_shape = (npackets,) + array_details["shape"] array_dtype = field_arrays[list(array_details["fields"].keys())[0]].dtype array = np.zeros(array_shape, dtype=array_dtype) for element_name, indeces in array_details["fields"].items(): array.__setitem__((slice(None),) + indeces, field_arrays[element_name]) del return_field_arrays[element_name] return_field_arrays[array_name] = array return return_field_arrays def _prepend_primary_header_fields(existing_fields): """Helper function that prepends primary header fields to a list of packet fields, to support load(include_primary_header=True) Parameters ---------- existing_fields : list of `ccsdspy.PacketField` Non-primary header fields defined by the packet. Returns ------- New list of fields with the primary header fields prepended. """ return_fields = [ PacketField( name="CCSDS_VERSION_NUMBER", data_type="uint", bit_length=3, bit_offset=0, ), PacketField( name="CCSDS_PACKET_TYPE", data_type="uint", bit_length=1, bit_offset=3, ), PacketField( name="CCSDS_SECONDARY_FLAG", data_type="uint", bit_length=1, bit_offset=4, ), PacketField(name="CCSDS_APID", data_type="uint", bit_length=11, bit_offset=5), PacketField( name="CCSDS_SEQUENCE_FLAG", data_type="uint", bit_length=2, bit_offset=16, ), PacketField( name="CCSDS_SEQUENCE_COUNT", data_type="uint", bit_length=14, bit_offset=18, ), PacketField( name="CCSDS_PACKET_LENGTH", data_type="uint", bit_length=16, bit_offset=32, ), ] return_fields.extend(existing_fields) return return_fields def _get_fields_csv_file(csv_file): """Parse a simple comma-delimited file that defines a packet. Should not include the CCSDS header. The minimum set of columns are (name, data_type, bit_length). An optional bit_offset can also be provided. Parameters ---------- csv_file : str Path to file on the local file system Returns ------- fields : list A list of `PacketField` objects. """ req_columns = ["name", "data_type", "bit_length"] with open(csv_file, "r") as fp: fields = [] reader = csv.DictReader(fp, skipinitialspace=True) headers = reader.fieldnames if headers is None: raise RuntimeError("CSV file must not be empty") if not all((req_col in headers) for req_col in req_columns): raise ValueError(f"Minimum required columns are {req_columns}.") for row in reader: # skip the header row if "bit_offset" not in headers: # 3 col csv file fields.append( PacketField( name=row["name"], data_type=row["data_type"], bit_length=int(row["bit_length"]), ) ) if "bit_offset" in headers: # 4 col csv file provides bit offsets # TODO: Check the consistency of bit_offsets versus previous bit_lengths fields.append( PacketField( name=row["name"], data_type=row["data_type"], bit_length=int(row["bit_length"]), bit_offset=int(row["bit_offset"]), ) ) return fields def _load(file, fields, decoder_name, include_primary_header=False): """Decode a file-like object containing a sequence of these packets. Parameters ---------- file: str Path to file on the local file system, or file-like object fields : list of `ccsdspy.PacketField` Layout of packet fields contained in the definition. decoder_name: {'fixed_length', 'variable_length'} String identifying which decoder to use. include_primary_header: bool If True, provides the primary header in the output Returns ------- dictionary mapping field names to NumPy arrays, with key order matching the order fields in the packet. Raises ------ ValueError the decoder_name is not one of the allowed values """ if hasattr(file, "read"): file_bytes = np.frombuffer(file.read(), "u1") else: file_bytes = np.fromfile(file, "u1") if include_primary_header: fields = _prepend_primary_header_fields(fields) fields, expand_history = _expand_array_fields(fields) if decoder_name == "fixed_length": field_arrays = _decode_fixed_length(file_bytes, fields) elif decoder_name == "variable_length": field_arrays = _decode_variable_length(file_bytes, fields) else: raise ValueError( f"Invalid decoder_name 'f{decoder_name}' specified. Must be " "either 'fixed_length', or 'variable_length'" ) field_arrays = _unexpand_field_arrays(field_arrays, expand_history) return field_arrays