jbpy

 1from .core import (
 2    Jbp,
 3    available_des_subheaders,
 4    available_tres,
 5    des_subheader_factory,
 6    tre_factory,
 7)
 8
 9__all__ = [
10    "Jbp",
11    "available_des_subheaders",
12    "available_tres",
13    "des_subheader_factory",
14    "tre_factory",
15]
class Jbp(jbpy.core.Group):
class Jbp(Group):
    """Class representing an entire NITF/NSIF file

    Contains the following keys:
    * FileHeader
    * ImageSegments
    * GraphicSegments
    * TextSegments
    * DataExtensionSegments
    * ReservedExtensionSegments

    The file header is constructed with callbacks that forward the values of
    its count and length fields to the corresponding segment lists.
    """

    def __init__(self):
        super().__init__("Root")
        self._append(
            FileHeader(
                "FileHeader",
                numi_callback=self._numi_handler,
                lin_callback=self._lin_handler,
                nums_callback=self._nums_handler,
                lsn_callback=self._lsn_handler,
                numt_callback=self._numt_handler,
                ltn_callback=self._ltn_handler,
                numdes_callback=self._numdes_handler,
                ldn_callback=self._ldn_handler,
                numres_callback=self._numres_handler,
                lreshn_callback=self._lreshn_handler,
                lren_callback=self._lren_handler,
            )
        )
        # One segment list per segment kind, appended in file order.
        for list_name, segment_type in (
            ("ImageSegments", ImageSegment),
            ("GraphicSegments", GraphicSegment),
            ("TextSegments", TextSegment),
            ("DataExtensionSegments", DataExtensionSegment),
            ("ReservedExtensionSegments", ReservedExtensionSegment),
        ):
            self._append(SegmentList(list_name, segment_type, maximum=999))

    def _numi_handler(self, field: Field) -> None:
        """Pass the NUMI value to the image segment list as its count"""
        self["ImageSegments"].set_count(field.value)

    def _lin_handler(self, field: Field) -> None:
        """Set the data size of the image segment numbered by an LInnn field"""
        # field names are 1-based (LI001 is the first segment)
        idx = int(field.name.removeprefix("LI")) - 1
        self["ImageSegments"][idx]["Data"].size = field.value

    def _nums_handler(self, field: Field) -> None:
        """Pass the NUMS value to the graphic segment list as its count"""
        self["GraphicSegments"].set_count(field.value)

    def _lsn_handler(self, field: Field) -> None:
        """Set the data size of the graphic segment numbered by an LSnnn field"""
        idx = int(field.name.removeprefix("LS")) - 1
        self["GraphicSegments"][idx]["Data"].size = field.value

    def _numt_handler(self, field: Field) -> None:
        """Pass the NUMT value to the text segment list as its count"""
        self["TextSegments"].set_count(field.value)

    def _ltn_handler(self, field: Field) -> None:
        """Set the data size of the text segment numbered by an LTnnn field"""
        idx = int(field.name.removeprefix("LT")) - 1
        self["TextSegments"][idx]["Data"].size = field.value

    def _numdes_handler(self, field: Field) -> None:
        """Pass the NUMDES value to the data extension segment list as its count"""
        self["DataExtensionSegments"].set_count(field.value)

    def _ldn_handler(self, field: Field) -> None:
        """Set the DESDATA size of the segment numbered by an LDnnn field"""
        idx = int(field.name.removeprefix("LD")) - 1
        self["DataExtensionSegments"][idx]["DESDATA"].size = field.value

    def _numres_handler(self, field: Field) -> None:
        """Pass the NUMRES value to the reserved extension segment list as its count"""
        self["ReservedExtensionSegments"].set_count(field.value)

    def _lreshn_handler(self, field: Field) -> None:
        """Set the subheader size of the segment numbered by an LRESHnnn field"""
        # this callback should be removed if the Reserved Subheader is implemented
        idx = int(field.name.removeprefix("LRESH")) - 1
        self["ReservedExtensionSegments"][idx]["subheader"].size = field.value

    def _lren_handler(self, field: Field) -> None:
        """Set the RESDATA size of the segment numbered by an LREnnn field"""
        idx = int(field.name.removeprefix("LRE")) - 1
        self["ReservedExtensionSegments"][idx]["RESDATA"].size = field.value

    def update_lengths(self) -> None:
        """Compute and set the segment lengths"""
        header = self["FileHeader"]
        header["FL"]._set_value(self.get_size())
        header["HL"]._set_value(header.get_size())

        # (segment list, subheader-length prefix, data-length prefix, data key)
        layout = (
            ("ImageSegments", "LISH", "LI", "Data"),
            ("GraphicSegments", "LSSH", "LS", "Data"),
            ("TextSegments", "LTSH", "LT", "Data"),
            ("DataExtensionSegments", "LDSH", "LD", "DESDATA"),
            ("ReservedExtensionSegments", "LRESH", "LRE", "RESDATA"),
        )
        for list_name, subheader_prefix, data_prefix, data_key in layout:
            # header length fields are numbered from 001
            for number, seg in enumerate(self[list_name], start=1):
                header[f"{subheader_prefix}{number:03d}"]._set_value(
                    seg["subheader"].get_size()
                )
                header[f"{data_prefix}{number:03d}"]._set_value(
                    seg[data_key].get_size()
                )

    def update_fdt(self) -> None:
        """Set the FDT field to the current time"""
        now = datetime.datetime.now(datetime.timezone.utc)
        self["FileHeader"]["FDT"].value = now.strftime("%Y%m%d%H%M%S")

    def finalize(self) -> None:
        """Compute derived values such as lengths, and CLEVEL"""
        super().finalize()
        self.update_lengths()
        self.update_fdt()
        self.update_clevel()  # must be after lengths

    def _clevel_ccs_extent(self) -> int:
        """Return the CLEVEL implied by the extent spanned by the image segments

        Each image segment's origin is its ILOC offset added to the origin
        recorded for its attachment level (IALVL); level 0 is the (0, 0) origin.
        """
        min_ccs_row = min_ccs_col = 0
        max_ccs_row = max_ccs_col = 0

        level_origin = {0: {"row": 0, "col": 0}}
        for imseg in self["ImageSegments"]:
            subhdr = imseg["subheader"]
            alvl = subhdr["IALVL"].value
            dlvl = subhdr["IDLVL"].value
            iloc_row, iloc_col = subhdr["ILOC"].value
            nrows = subhdr["NROWS"].value
            ncols = subhdr["NCOLS"].value
            # NOTE(review): this raises KeyError when the attachment level has
            # not been recorded by an earlier segment - confirm that segments
            # are always ordered so that parents come first.
            level_origin[dlvl] = {
                "row": level_origin[alvl]["row"] + iloc_row,
                "col": level_origin[alvl]["col"] + iloc_col,
            }

            min_ccs_row = min(min_ccs_row, level_origin[dlvl]["row"])
            min_ccs_col = min(min_ccs_col, level_origin[dlvl]["col"])

            max_ccs_row = max(max_ccs_row, level_origin[dlvl]["row"] + nrows)
            max_ccs_col = max(max_ccs_col, level_origin[dlvl]["col"] + ncols)

        if len(self["GraphicSegments"]):
            logger.warning("CLEVEL of JBPs with Graphic Segments is not supported")

        # NOTE(review): the maxima are origin + size, so a 2048-pixel image at
        # the origin gives an extent of 2048 and fails the <= 2047 test, while
        # _clevel_image_size accepts 2048 at level 3 - confirm which is intended.
        max_extent = max(max_ccs_row - min_ccs_row, max_ccs_col - min_ccs_col)
        if max_extent <= 2047:
            return 3
        if max_extent <= 8191:
            return 5
        if max_extent <= 65535:
            return 6
        if max_extent <= 99_999_999:
            return 7
        return 9

    def _clevel_file_size(self) -> int:
        """Return the CLEVEL implied by the FL (file length) field"""
        file_length = self["FileHeader"]["FL"].value
        if file_length < 50 * (1 << 20):
            return 3
        if file_length < 1 * (1 << 30):
            return 5
        if file_length < 2 * (1 << 30):
            return 6
        if file_length < 10 * (1 << 30):
            return 7
        return 9

    def _clevel_image_size(self) -> int:
        """Return the CLEVEL implied by the largest NROWS/NCOLS of any image segment"""
        clevel = 3
        for imseg in self["ImageSegments"]:
            nrows = imseg["subheader"]["NROWS"].value
            ncols = imseg["subheader"]["NCOLS"].value

            if nrows <= 2048 and ncols <= 2048:
                clevel = max(clevel, 3)
            elif nrows <= 8192 and ncols <= 8192:
                clevel = max(clevel, 5)
            elif nrows <= 65536 and ncols <= 65536:
                clevel = max(clevel, 6)
            elif nrows <= 99_999_999 and ncols <= 99_999_999:
                clevel = max(clevel, 7)
        return clevel

    def _clevel_image_blocking(self) -> int:
        """Return the CLEVEL implied by the block sizes (NPPBH/NPPBV) of the image segments"""
        clevel = 3
        for imseg in self["ImageSegments"]:
            horiz = imseg["subheader"]["NPPBH"].value
            vert = imseg["subheader"]["NPPBV"].value

            if horiz <= 2048 and vert <= 2048:
                clevel = max(clevel, 3)
            elif horiz <= 8192 and vert <= 8192:
                clevel = max(clevel, 5)
        return clevel

    def _clevel_irep(self) -> int:
        """Return the CLEVEL implied by the representation of each image segment

        Each segment is compared against a set of categories built from IREP,
        NBPP, IC, IMODE, the presence of look-up tables and the band count.
        Returns 0 when no segment matches any category.
        """
        clevel = 0
        for imseg in self["ImageSegments"]:
            subhdr = imseg["subheader"]
            has_lut = bool(subhdr.find_all("NLUT.*"))
            # the band count is taken from XBANDS when present, otherwise NBANDS
            num_bands = subhdr.get("XBANDS", subhdr["NBANDS"]).value
            irep = subhdr["IREP"].value
            nbpp = subhdr["NBPP"].value
            ic = subhdr["IC"].value
            imode = subhdr["IMODE"].value

            # Color (RGB) No Compression
            if (
                irep == "RGB"
                and num_bands == 3
                and not has_lut
                and ic in ("NC", "NM")
                and imode in ("B", "P", "R", "S")
            ):
                # 8 bits per pixel only needs level 3; previously it also
                # matched the (8, 16, 32) test and was always raised to 6.
                if nbpp == 8:
                    clevel = max(clevel, 3)
                elif nbpp in (16, 32):
                    clevel = max(clevel, 6)

            # Multiband (MULTI) No Compression
            if (
                irep == "MULTI"
                and nbpp in (1, 8, 16, 32, 64)
                and ic in ("NC", "NM")
                and imode in ("B", "P", "R", "S")
            ):
                if 2 <= num_bands <= 9:
                    clevel = max(clevel, 3)

                if 10 <= num_bands <= 255:
                    clevel = max(clevel, 5)

                # starts at 256 like the other categories; a lower bound of 255
                # overlapped the previous tier and reported 7 for 255 bands
                if 256 <= num_bands <= 999:
                    clevel = max(clevel, 7)

            # JPEG2000 Compression Multiband (MULTI)
            if (
                irep == "MULTI"
                and nbpp <= 32
                and ic in ("C8", "M8")
                and imode == "B"
            ):
                if 1 <= num_bands <= 9:
                    clevel = max(clevel, 3)

                if 10 <= num_bands <= 255:
                    clevel = max(clevel, 5)

                if 256 <= num_bands <= 999:
                    clevel = max(clevel, 7)

            # Multiband (MULTI) Individual Band JPEG Compression
            if (
                irep == "MULTI"
                and nbpp in (8, 12)
                and not has_lut
                and ic in ("C3", "M3")
                and imode in ("B", "S")
            ):
                if 2 <= num_bands <= 9:
                    clevel = max(clevel, 3)

                if 10 <= num_bands <= 255:
                    clevel = max(clevel, 5)

                if 256 <= num_bands <= 999:
                    clevel = max(clevel, 7)

            # Multiband (MULTI) Multi-Component Compression
            if (
                irep == "MULTI"
                and nbpp in (8, 12)
                and not has_lut
                and ic in ("C6", "M6")
                and imode in ("B", "P", "S")
            ):
                if 2 <= num_bands <= 9:
                    clevel = max(clevel, 3)

                if 10 <= num_bands <= 255:
                    clevel = max(clevel, 5)

                if 256 <= num_bands <= 999:
                    clevel = max(clevel, 7)

            # Matrix Data (NODISPLY)
            if (
                irep == "NODISPLY"
                and nbpp in (8, 16, 32, 64)
                and not has_lut
                and imode in ("B", "P", "R", "S")
            ):
                if 2 <= num_bands <= 9:
                    clevel = max(clevel, 3)

                if 10 <= num_bands <= 255:
                    clevel = max(clevel, 5)

                if 256 <= num_bands <= 999:
                    clevel = max(clevel, 7)

        return clevel

    def _clevel_num_imseg(self) -> int:
        """Return the CLEVEL implied by the number of image segments"""
        num_imseg = len(self["ImageSegments"])
        if num_imseg <= 20:
            return 3
        if num_imseg <= 100:
            return 5
        return 9

    def _clevel_aggregate_size_of_graphic_segments(self) -> int:
        """Return the CLEVEL implied by the sum of the header's LSnnn fields"""
        size = 0
        for field in self["FileHeader"].find_all("LS\\d+"):
            size += field.value

        if size <= 1 * (1 << 20):
            return 3
        if size <= 2 * (1 << 20):
            return 5
        return 9

    def _clevel_cl9(self) -> int:
        """Explicit CLEVEL 9 checks

        Returns 9 as soon as any check trips, otherwise 0.
        """
        # 1: file length
        if self["FileHeader"]["FL"].value >= 10 * (1 << 30):
            return 9

        total_num_bands = 0
        for imseg in self["ImageSegments"]:
            subhdr = imseg["subheader"]
            # 2: a zero block dimension
            if subhdr["NPPBH"].value == 0 or subhdr["NPPBV"].value == 0:
                return 9
            # XBANDS lives in the subheader (as in _clevel_irep); looking it up
            # on the segment never found it, so only NBANDS was ever counted.
            total_num_bands += subhdr.get("XBANDS", subhdr["NBANDS"]).value

        # 3: total band count over all image segments
        if total_num_bands > 999:
            return 9

        # 4: number of image segments
        if len(self["ImageSegments"]) > 100:
            return 9

        # 5: number of graphic segments
        if len(self["GraphicSegments"]) > 100:
            return 9

        # 6: aggregate of the header's LSnnn fields
        size = 0
        for field in self["FileHeader"].find_all("LS\\d+"):
            size += field.value
        if size > 2 * (1 << 20):
            return 9

        # 7: number of text segments
        if len(self["TextSegments"]) > 32:
            return 9

        # 8: number of data extension segments
        if len(self["DataExtensionSegments"]) > 100:
            return 9

        return 0

    def update_clevel(self) -> None:
        """Compute and update the CLEVEL field.  See JBP-2025.1 Table G-1

        Every attribute whose name starts with ``_clevel_`` is called with no
        arguments; the largest result, and at least 3, is stored in CLEVEL.
        """
        clevel = 3
        helpers = [attrib for attrib in dir(self) if attrib.startswith("_clevel_")]
        for helper in helpers:
            clevel = max(clevel, getattr(self, helper)())

        self["FileHeader"]["CLEVEL"].value = clevel

Class representing an entire NITF/NSIF file

Contains the following keys:

  • FileHeader
  • ImageSegments
  • GraphicSegments
  • TextSegments
  • DataExtensionSegments
  • ReservedExtensionSegments
def update_lengths(self) -> None:
3069    def update_lengths(self) -> None:
3070        """Compute and set the segment lengths"""
3071        self["FileHeader"]["FL"]._set_value(self.get_size())
3072        self["FileHeader"]["HL"]._set_value(self["FileHeader"].get_size())
3073
3074        for idx, seg in enumerate(self["ImageSegments"]):
3075            self["FileHeader"][f"LISH{idx + 1:03d}"]._set_value(
3076                seg["subheader"].get_size()
3077            )
3078            self["FileHeader"][f"LI{idx + 1:03d}"]._set_value(seg["Data"].get_size())
3079
3080        for idx, seg in enumerate(self["GraphicSegments"]):
3081            self["FileHeader"][f"LSSH{idx + 1:03d}"]._set_value(
3082                seg["subheader"].get_size()
3083            )
3084            self["FileHeader"][f"LS{idx + 1:03d}"]._set_value(seg["Data"].get_size())
3085
3086        for idx, seg in enumerate(self["TextSegments"]):
3087            self["FileHeader"][f"LTSH{idx + 1:03d}"]._set_value(
3088                seg["subheader"].get_size()
3089            )
3090            self["FileHeader"][f"LT{idx + 1:03d}"]._set_value(seg["Data"].get_size())
3091
3092        for idx, seg in enumerate(self["DataExtensionSegments"]):
3093            self["FileHeader"][f"LDSH{idx + 1:03d}"]._set_value(
3094                seg["subheader"].get_size()
3095            )
3096            self["FileHeader"][f"LD{idx + 1:03d}"]._set_value(seg["DESDATA"].get_size())
3097
3098        for idx, seg in enumerate(self["ReservedExtensionSegments"]):
3099            self["FileHeader"][f"LRESH{idx + 1:03d}"]._set_value(
3100                seg["subheader"].get_size()
3101            )
3102            self["FileHeader"][f"LRE{idx + 1:03d}"]._set_value(
3103                seg["RESDATA"].get_size()
3104            )

Compute and set the segment lengths

def update_fdt(self) -> None:
3106    def update_fdt(self) -> None:
3107        """Set the FDT field to the current time"""
3108        now = datetime.datetime.now(datetime.timezone.utc)
3109        self["FileHeader"]["FDT"].value = now.strftime("%Y%m%d%H%M%S")

Set the FDT field to the current time

def finalize(self) -> None:
    def finalize(self) -> None:
        """Compute derived values such as lengths, and CLEVEL

        Runs the parent class's ``finalize`` first, then refreshes the length
        fields, the FDT timestamp and finally the CLEVEL field.
        """
        super().finalize()
        self.update_lengths()
        self.update_fdt()
        # CLEVEL is computed last because it reads length fields set above
        self.update_clevel()  # must be after lengths

Compute derived values such as lengths, and CLEVEL

def update_clevel(self) -> None:
3361    def update_clevel(self) -> None:
3362        """Compute and update the CLEVEL field.  See JBP-2025.1 Table G-1"""
3363        clevel = 3
3364        helpers = [attrib for attrib in dir(self) if attrib.startswith("_clevel_")]
3365        for helper in helpers:
3366            clevel = max(clevel, getattr(self, helper)())
3367
3368        self["FileHeader"]["CLEVEL"].value = clevel

Compute and update the CLEVEL field. See JBP-2025.1 Table G-1

def available_des_subheaders() -> dict[tuple[str, int], Callable[[str], jbpy.core.DataExtensionSubheader]]:
def available_des_subheaders() -> DesSubheaderDefs:
    """All discovered and available Data Extension Segment (DES) subheaders

    Plugins are read from the ``jbpy.extensions.des_subheader`` entry point
    group.  An entry point name must be exactly 27 characters: the first 25 are
    the DESID (trailing whitespace is stripped) and the last 2 are the integer
    DESVER.  Entry points that do not fit this pattern are skipped with a
    warning.

    Returns
    -------
    dict of {(str, int) : callable}
        Mapping of (desid, desver) pairs to a function that accepts a string-valued name and
        instantiates the appropriate DES subheader
    """
    d: DesSubheaderDefs = {}
    for plugin in importlib.metadata.entry_points(
        group="jbpy.extensions.des_subheader"
    ):
        # Explicit check rather than ``assert``: assert statements are removed
        # when Python runs with -O, which would let malformed names through.
        if len(plugin.name) != 27:
            logger.warning(f"Skipping {plugin=}; unable to parse")
            continue
        try:
            desid = plugin.name[:25].rstrip()
            desver = int(plugin.name[-2:])
            d[(desid, desver)] = plugin.load()
        except (AssertionError, ValueError):
            # same exceptions as before, so a failing plugin is still skipped
            logger.warning(f"Skipping {plugin=}; unable to parse")
    return d

All discovered and available Data Extension Segment (DES) subheaders

Returns
  • dict of {(str, int) : callable}: Mapping of (desid, desver) pairs to a function that accepts a string-valued name and instantiates the appropriate DES subheader
def available_tres() -> dict[str, Callable[[], jbpy.core.Tre]]:
def available_tres() -> dict[str, Callable[[], Tre]]:
    """All discovered and available Tagged Record Extensions (TREs)

    Plugins are read from the ``jbpy.extensions.tre`` entry point group.  An
    entry point name must be exactly 6 characters; trailing whitespace is
    stripped to form the TRETAG.  Other entry points are skipped with a warning.

    Returns
    -------
    dict of {str : callable}
        Mapping of TRETAG name to a function with no required arguments that
        instantiates the appropriate TRE
    """
    d = {}
    for plugin in importlib.metadata.entry_points(group="jbpy.extensions.tre"):
        # Explicit check rather than ``assert``: assert statements are removed
        # when Python runs with -O, which would let malformed names through.
        if len(plugin.name) != 6:
            logger.warning(f"Skipping {plugin=}; unable to parse")
            continue
        try:
            tretag = plugin.name.rstrip()
            d[tretag] = plugin.load()
        except AssertionError:
            # same exception as before, so a failing plugin is still skipped
            logger.warning(f"Skipping {plugin=}; unable to parse")
    return d

All discovered and available Tagged Record Extensions (TREs)

Returns
  • dict of {str : callable}: Mapping of TRETAG name to a function with no required arguments that instantiates the appropriate TRE
def des_subheader_factory( desid: str, desver: int, name: str = 'subheader') -> jbpy.core.DataExtensionSubheader:
def des_subheader_factory(
    desid: str, desver: int, name: str = "subheader"
) -> DataExtensionSubheader:
    """Build a Data Extension Segment (DES) subheader for a DESID/DESVER pair

    Parameters
    ----------
    desid : str
        Unique DES type identifier
    desver : int
        Version of the data definition
    name : str, optional
        Name to give component

    Returns
    -------
    DataExtensionSubheader
        The object built by the registered constructor for ``(desid, desver)`` when
        one is available, otherwise a generic DataExtensionSubheader.  In both cases
        its DESID and DESVER fields are set from the arguments.
    """
    # fall back to the generic subheader class when no plugin is registered
    constructor = available_des_subheaders().get(
        (desid, desver), DataExtensionSubheader
    )
    result = constructor(name)
    result["DESID"].value = desid
    result["DESVER"].value = desver
    return result

Create a Data Extension Segment (DES) subheader

Parameters
  • desid (str): Unique DES type identifier
  • desver (int): Version of the data definition
  • name (str, optional): Name to give component
Returns
  • DataExtensionSubheader: If the DES data definition is available, an object of the appropriate DataExtensionSubheader subclass. Otherwise, a DataExtensionSubheader object with generic DES subheader.
def tre_factory(tretag: str) -> jbpy.core.Tre:
def tre_factory(tretag: str) -> Tre:
    """Instantiate the TRE registered under a tag

    Parameters
    ----------
    tretag : str
        The 1-6 character name of the TRE

    Returns
    -------
    Tre
        The object built by the registered constructor for ``tretag``, or an
        UnknownTre created from ``tretag`` when none is registered
    """
    try:
        constructor = available_tres()[tretag]
    except KeyError:
        # no plugin provides this tag
        return UnknownTre(tretag)
    return constructor()

Create a TRE instance

Parameters
  • tretag (str): The 1-6 character name of the TRE
Returns
  • Tre: TRE object