aboutsummaryrefslogtreecommitdiffstats
path: root/env/lib/python3.10/site-packages/pikepdf/models
diff options
context:
space:
mode:
Diffstat (limited to 'env/lib/python3.10/site-packages/pikepdf/models')
-rw-r--r--env/lib/python3.10/site-packages/pikepdf/models/__init__.py25
-rw-r--r--env/lib/python3.10/site-packages/pikepdf/models/__pycache__/__init__.cpython-310.pycbin0 -> 918 bytes
-rw-r--r--env/lib/python3.10/site-packages/pikepdf/models/__pycache__/_content_stream.cpython-310.pycbin0 -> 4907 bytes
-rw-r--r--env/lib/python3.10/site-packages/pikepdf/models/__pycache__/_transcoding.cpython-310.pycbin0 -> 8037 bytes
-rw-r--r--env/lib/python3.10/site-packages/pikepdf/models/__pycache__/encryption.cpython-310.pycbin0 -> 5098 bytes
-rw-r--r--env/lib/python3.10/site-packages/pikepdf/models/__pycache__/image.cpython-310.pycbin0 -> 32029 bytes
-rw-r--r--env/lib/python3.10/site-packages/pikepdf/models/__pycache__/matrix.cpython-310.pycbin0 -> 5540 bytes
-rw-r--r--env/lib/python3.10/site-packages/pikepdf/models/__pycache__/metadata.cpython-310.pycbin0 -> 26845 bytes
-rw-r--r--env/lib/python3.10/site-packages/pikepdf/models/__pycache__/outlines.cpython-310.pycbin0 -> 12353 bytes
-rw-r--r--env/lib/python3.10/site-packages/pikepdf/models/_content_stream.py136
-rw-r--r--env/lib/python3.10/site-packages/pikepdf/models/_transcoding.py243
-rw-r--r--env/lib/python3.10/site-packages/pikepdf/models/encryption.py176
-rw-r--r--env/lib/python3.10/site-packages/pikepdf/models/image.py991
-rw-r--r--env/lib/python3.10/site-packages/pikepdf/models/matrix.py145
-rw-r--r--env/lib/python3.10/site-packages/pikepdf/models/metadata.py866
-rw-r--r--env/lib/python3.10/site-packages/pikepdf/models/outlines.py421
16 files changed, 3003 insertions, 0 deletions
diff --git a/env/lib/python3.10/site-packages/pikepdf/models/__init__.py b/env/lib/python3.10/site-packages/pikepdf/models/__init__.py
new file mode 100644
index 0000000..e2e73ba
--- /dev/null
+++ b/env/lib/python3.10/site-packages/pikepdf/models/__init__.py
@@ -0,0 +1,25 @@
+# SPDX-FileCopyrightText: 2022 James R. Barlow
+# SPDX-License-Identifier: MPL-2.0
+
+"""Python implementation of higher level PDF constructs."""
+
+from __future__ import annotations
+
+from ._content_stream import (
+ ContentStreamInstructions,
+ PdfParsingError,
+ UnparseableContentStreamInstructions,
+ parse_content_stream,
+ unparse_content_stream,
+)
+from .encryption import Encryption, EncryptionInfo, Permissions
+from .image import PdfImage, PdfInlineImage, UnsupportedImageTypeError
+from .matrix import PdfMatrix
+from .metadata import PdfMetadata
+from .outlines import (
+ Outline,
+ OutlineItem,
+ OutlineStructureError,
+ PageLocation,
+ make_page_destination,
+)
diff --git a/env/lib/python3.10/site-packages/pikepdf/models/__pycache__/__init__.cpython-310.pyc b/env/lib/python3.10/site-packages/pikepdf/models/__pycache__/__init__.cpython-310.pyc
new file mode 100644
index 0000000..b07eefb
--- /dev/null
+++ b/env/lib/python3.10/site-packages/pikepdf/models/__pycache__/__init__.cpython-310.pyc
Binary files differ
diff --git a/env/lib/python3.10/site-packages/pikepdf/models/__pycache__/_content_stream.cpython-310.pyc b/env/lib/python3.10/site-packages/pikepdf/models/__pycache__/_content_stream.cpython-310.pyc
new file mode 100644
index 0000000..6e1c1c9
--- /dev/null
+++ b/env/lib/python3.10/site-packages/pikepdf/models/__pycache__/_content_stream.cpython-310.pyc
Binary files differ
diff --git a/env/lib/python3.10/site-packages/pikepdf/models/__pycache__/_transcoding.cpython-310.pyc b/env/lib/python3.10/site-packages/pikepdf/models/__pycache__/_transcoding.cpython-310.pyc
new file mode 100644
index 0000000..f9ad743
--- /dev/null
+++ b/env/lib/python3.10/site-packages/pikepdf/models/__pycache__/_transcoding.cpython-310.pyc
Binary files differ
diff --git a/env/lib/python3.10/site-packages/pikepdf/models/__pycache__/encryption.cpython-310.pyc b/env/lib/python3.10/site-packages/pikepdf/models/__pycache__/encryption.cpython-310.pyc
new file mode 100644
index 0000000..32e8098
--- /dev/null
+++ b/env/lib/python3.10/site-packages/pikepdf/models/__pycache__/encryption.cpython-310.pyc
Binary files differ
diff --git a/env/lib/python3.10/site-packages/pikepdf/models/__pycache__/image.cpython-310.pyc b/env/lib/python3.10/site-packages/pikepdf/models/__pycache__/image.cpython-310.pyc
new file mode 100644
index 0000000..0de94e9
--- /dev/null
+++ b/env/lib/python3.10/site-packages/pikepdf/models/__pycache__/image.cpython-310.pyc
Binary files differ
diff --git a/env/lib/python3.10/site-packages/pikepdf/models/__pycache__/matrix.cpython-310.pyc b/env/lib/python3.10/site-packages/pikepdf/models/__pycache__/matrix.cpython-310.pyc
new file mode 100644
index 0000000..ee96c86
--- /dev/null
+++ b/env/lib/python3.10/site-packages/pikepdf/models/__pycache__/matrix.cpython-310.pyc
Binary files differ
diff --git a/env/lib/python3.10/site-packages/pikepdf/models/__pycache__/metadata.cpython-310.pyc b/env/lib/python3.10/site-packages/pikepdf/models/__pycache__/metadata.cpython-310.pyc
new file mode 100644
index 0000000..4b97e11
--- /dev/null
+++ b/env/lib/python3.10/site-packages/pikepdf/models/__pycache__/metadata.cpython-310.pyc
Binary files differ
diff --git a/env/lib/python3.10/site-packages/pikepdf/models/__pycache__/outlines.cpython-310.pyc b/env/lib/python3.10/site-packages/pikepdf/models/__pycache__/outlines.cpython-310.pyc
new file mode 100644
index 0000000..18dbd1d
--- /dev/null
+++ b/env/lib/python3.10/site-packages/pikepdf/models/__pycache__/outlines.cpython-310.pyc
Binary files differ
diff --git a/env/lib/python3.10/site-packages/pikepdf/models/_content_stream.py b/env/lib/python3.10/site-packages/pikepdf/models/_content_stream.py
new file mode 100644
index 0000000..8976c4c
--- /dev/null
+++ b/env/lib/python3.10/site-packages/pikepdf/models/_content_stream.py
@@ -0,0 +1,136 @@
+# SPDX-FileCopyrightText: 2022 James R. Barlow
+# SPDX-License-Identifier: MPL-2.0
+
+"""Content stream parsing."""
+
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, Collection, List, Tuple, Union, cast
+
+from pikepdf import Object, ObjectType, Operator, Page, PdfError, _qpdf
+
+if TYPE_CHECKING:
+ from pikepdf.models.image import PdfInlineImage
+
+# Operands, Operator
+_OldContentStreamOperands = Collection[Union[Object, 'PdfInlineImage']]
+_OldContentStreamInstructions = Tuple[_OldContentStreamOperands, Operator]
+
+ContentStreamInstructions = Union[
+ _qpdf.ContentStreamInstruction, _qpdf.ContentStreamInlineImage
+]
+
+UnparseableContentStreamInstructions = Union[
+ ContentStreamInstructions, _OldContentStreamInstructions
+]
+
+
+class PdfParsingError(Exception):
+ """Error when parsing a PDF content stream."""
+
+ def __init__(self, message=None, line=None):
+ if not message:
+ message = f"Error encoding content stream at line {line}"
+ super().__init__(message)
+ self.line = line
+
+
+def parse_content_stream(
+ page_or_stream: Object | Page, operators: str = ''
+) -> list[ContentStreamInstructions]:
+ """Parse a PDF content stream into a sequence of instructions.
+
+ A PDF content stream is list of instructions that describe where to render
+ the text and graphics in a PDF. This is the starting point for analyzing
+ PDFs.
+
+ If the input is a page and page.Contents is an array, then the content
+ stream is automatically treated as one coalesced stream.
+
+ Each instruction contains at least one operator and zero or more operands.
+
+ This function does not have anything to do with opening a PDF file itself or
+ processing data from a whole PDF. It is for processing a specific object inside
+ a PDF that is already opened.
+
+ Args:
+ page_or_stream: A page object, or the content
+ stream attached to another object such as a Form XObject.
+ operators: A space-separated string of operators to whitelist.
+ For example 'q Q cm Do' will return only operators
+ that pertain to drawing images. Use 'BI ID EI' for inline images.
+ All other operators and associated tokens are ignored. If blank,
+ all tokens are accepted.
+
+ Example:
+ >>> with pikepdf.Pdf.open(input_pdf) as pdf:
+ >>> page = pdf.pages[0]
+ >>> for operands, command in parse_content_stream(page):
+ >>> print(command)
+
+ .. versionchanged:: 3.0
+ Returns a list of ``ContentStreamInstructions`` instead of a list
+ of (operand, operator) tuples. The returned items are duck-type compatible
+ with the previous returned items.
+ """
+ if not isinstance(page_or_stream, (Object, Page)):
+ raise TypeError("stream must be a pikepdf.Object or pikepdf.Page")
+
+ if (
+ isinstance(page_or_stream, Object)
+ and page_or_stream._type_code != ObjectType.stream
+ and page_or_stream.get('/Type') != '/Page'
+ ):
+ raise TypeError("parse_content_stream called on page or stream object")
+
+ if isinstance(page_or_stream, Page):
+ page_or_stream = page_or_stream.obj
+
+ try:
+ if page_or_stream.get('/Type') == '/Page':
+ page = page_or_stream
+ instructions = cast(
+ List[ContentStreamInstructions],
+ page._parse_page_contents_grouped(operators),
+ )
+ else:
+ stream = page_or_stream
+ instructions = cast(
+ List[ContentStreamInstructions],
+ Object._parse_stream_grouped(stream, operators),
+ )
+ except PdfError as e:
+ if 'supposed to be a stream or an array' in str(e):
+ raise TypeError("parse_content_stream called on non-stream Object") from e
+ raise e from e
+
+ return instructions
+
+
+def unparse_content_stream(
+ instructions: Collection[UnparseableContentStreamInstructions],
+) -> bytes:
+ """Convert collection of instructions to bytes suitable for storing in PDF.
+
+ Given a parsed list of instructions/operand-operators, convert to bytes suitable
+ for embedding in a PDF. In PDF the operator always follows the operands.
+
+ Args:
+ instructions: collection of instructions such as is returned
+ by :func:`parse_content_stream()`
+
+ Returns:
+ A binary content stream, suitable for attaching to a Pdf.
+ To attach to a Pdf, use :meth:`Pdf.make_stream()``.
+
+ .. versionchanged:: 3.0
+ Now accept collections that contain any mixture of
+ ``ContentStreamInstruction``, ``ContentStreamInlineImage``, and the older
+ operand-operator tuples from pikepdf 2.x.
+ """
+ try:
+ return _qpdf._unparse_content_stream(instructions)
+ except (ValueError, TypeError, RuntimeError) as e:
+ raise PdfParsingError(
+ "While unparsing a content stream, an error occurred"
+ ) from e
diff --git a/env/lib/python3.10/site-packages/pikepdf/models/_transcoding.py b/env/lib/python3.10/site-packages/pikepdf/models/_transcoding.py
new file mode 100644
index 0000000..e54facf
--- /dev/null
+++ b/env/lib/python3.10/site-packages/pikepdf/models/_transcoding.py
@@ -0,0 +1,243 @@
+# SPDX-FileCopyrightText: 2022 James R. Barlow
+# SPDX-License-Identifier: MPL-2.0
+
+from __future__ import annotations
+
+import struct
+from typing import Any, Callable, NamedTuple, Union
+
+from PIL import Image
+from PIL.TiffTags import TAGS_V2 as TIFF_TAGS
+
+BytesLike = Union[bytes, memoryview]
+MutableBytesLike = Union[bytearray, memoryview]
+
+
+def _next_multiple(n: int, k: int) -> int:
+ """Return the multiple of k that is greater than or equal n.
+
+ >>> _next_multiple(101, 4)
+ 104
+ >>> _next_multiple(100, 4)
+ 100
+ """
+ div, mod = divmod(n, k)
+ if mod > 0:
+ div += 1
+ return div * k
+
+
+def unpack_subbyte_pixels(
+ packed: BytesLike, size: tuple[int, int], bits: int, scale: int = 0
+) -> tuple[BytesLike, int]:
+ """Unpack subbyte *bits* pixels into full bytes and rescale.
+
+ When scale is 0, the appropriate scale is calculated.
+ e.g. for 2-bit, the scale is adjusted so that
+ 0b00 = 0.00 = 0x00
+ 0b01 = 0.33 = 0x55
+ 0b10 = 0.66 = 0xaa
+ 0b11 = 1.00 = 0xff
+ When scale is 1, no scaling is applied, appropriate when
+ the bytes are palette indexes.
+ """
+ width, height = size
+ bits_per_byte = 8 // bits
+ stride = _next_multiple(width, bits_per_byte)
+ buffer = bytearray(bits_per_byte * stride * height)
+ max_read = len(buffer) // bits_per_byte
+ if scale == 0:
+ scale = 255 / ((2**bits) - 1)
+ if bits == 4:
+ _4bit_inner_loop(packed[:max_read], buffer, scale)
+ elif bits == 2:
+ _2bit_inner_loop(packed[:max_read], buffer, scale)
+ # elif bits == 1:
+ # _1bit_inner_loop(packed[:max_read], buffer, scale)
+ else:
+ raise NotImplementedError(bits)
+ return memoryview(buffer), stride
+
+
+# def _1bit_inner_loop(in_: BytesLike, out: MutableBytesLike, scale: int) -> None:
+# """Unpack 1-bit values to their 8-bit equivalents.
+
+# Thus *out* must be 8x at long as *in*.
+# """
+# for n, val in enumerate(in_):
+# out[8 * n + 0] = int((val >> 7) & 0b1) * scale
+# out[8 * n + 1] = int((val >> 6) & 0b1) * scale
+# out[8 * n + 2] = int((val >> 5) & 0b1) * scale
+# out[8 * n + 3] = int((val >> 4) & 0b1) * scale
+# out[8 * n + 4] = int((val >> 3) & 0b1) * scale
+# out[8 * n + 5] = int((val >> 2) & 0b1) * scale
+# out[8 * n + 6] = int((val >> 1) & 0b1) * scale
+# out[8 * n + 7] = int((val >> 0) & 0b1) * scale
+
+
+def _2bit_inner_loop(in_: BytesLike, out: MutableBytesLike, scale: int) -> None:
+ """Unpack 2-bit values to their 8-bit equivalents.
+
+ Thus *out* must be 4x at long as *in*.
+ """
+ for n, val in enumerate(in_):
+ out[4 * n] = int((val >> 6) * scale)
+ out[4 * n + 1] = int(((val >> 4) & 0b11) * scale)
+ out[4 * n + 2] = int(((val >> 2) & 0b11) * scale)
+ out[4 * n + 3] = int((val & 0b11) * scale)
+
+
+def _4bit_inner_loop(in_: BytesLike, out: MutableBytesLike, scale: int) -> None:
+ """Unpack 4-bit values to their 8-bit equivalents.
+
+ Thus *out* must be 2x at long as *in*.
+ """
+ for n, val in enumerate(in_):
+ out[2 * n] = int((val >> 4) * scale)
+ out[2 * n + 1] = int((val & 0b1111) * scale)
+
+
+def image_from_byte_buffer(buffer: BytesLike, size: tuple[int, int], stride: int):
+ """Use Pillow to create one-component image from a byte buffer.
+
+ *stride* is the number of bytes per row, and is essential for packed bits
+ with odd image widths.
+ """
+ ystep = 1 # image is top to bottom in memory
+ return Image.frombuffer('L', size, buffer, "raw", 'L', stride, ystep)
+
+
+def _make_rgb_palette(gray_palette: bytes) -> bytes:
+ palette = b''
+ for entry in gray_palette:
+ palette += bytes([entry]) * 3
+ return palette
+
+
+def _depalettize_cmyk(buffer: BytesLike, palette: BytesLike):
+ with memoryview(buffer) as mv:
+ output = bytearray(4 * len(mv))
+ for n, pal_idx in enumerate(mv):
+ output[4 * n : 4 * (n + 1)] = palette[4 * pal_idx : 4 * (pal_idx + 1)]
+ return output
+
+
+def image_from_buffer_and_palette(
+ buffer: BytesLike,
+ size: tuple[int, int],
+ stride: int,
+ base_mode: str,
+ palette: BytesLike,
+) -> Image.Image:
+ """Construct an image from a byte buffer and apply the palette.
+
+ 1/2/4-bit images must be unpacked (no scaling!) to byte buffers first, such
+ that every 8-bit integer is an index into the palette.
+ """
+ # Reminder Pillow palette byte order unintentionally changed in 8.3.0
+ # https://github.com/python-pillow/Pillow/issues/5595
+ # 8.2.0: all aligned by channel (very nonstandard)
+ # 8.3.0: all channels for one color followed by the next color (e.g. RGBRGBRGB)
+
+ if base_mode == 'RGB':
+ im = image_from_byte_buffer(buffer, size, stride)
+ im.putpalette(palette, rawmode=base_mode)
+ elif base_mode == 'L':
+ # Pillow does not fully support palettes with rawmode='L'.
+ # Convert to RGB palette.
+ gray_palette = _make_rgb_palette(palette)
+ im = image_from_byte_buffer(buffer, size, stride)
+ im.putpalette(gray_palette, rawmode='RGB')
+ elif base_mode == 'CMYK':
+ # Pillow does not support CMYK with palettes; convert manually
+ output = _depalettize_cmyk(buffer, palette)
+ im = Image.frombuffer('CMYK', size, data=output, decoder_name='raw')
+ else:
+ raise NotImplementedError(f'palette with {base_mode}')
+ return im
+
+
+def fix_1bit_palette_image(
+ im: Image.Image, base_mode: str, palette: BytesLike
+) -> Image.Image:
+ """Apply palettes to 1-bit images."""
+ im = im.convert('P')
+ if base_mode == 'RGB' and len(palette) == 6:
+ # rgbrgb -> rgb000000...rgb
+ palette = palette[0:3] + (b'\x00\x00\x00' * (256 - 2)) + palette[3:6]
+ im.putpalette(palette, rawmode='RGB')
+ elif base_mode == 'L':
+ try:
+ im.putpalette(palette, rawmode='L')
+ except ValueError as e:
+ if 'unrecognized raw mode' in str(e):
+ rgb_palette = _make_rgb_palette(palette)
+ im.putpalette(rgb_palette, rawmode='RGB')
+ return im
+
+
+def generate_ccitt_header(
+ size: tuple[int, int],
+ data_length: int,
+ ccitt_group: int,
+ photometry: int,
+ icc: bytes,
+) -> bytes:
+ """Generate binary CCITT header for image with given parameters."""
+ tiff_header_struct = '<' + '2s' + 'H' + 'L' + 'H'
+
+ tag_keys = {tag.name: key for key, tag in TIFF_TAGS.items()} # type: ignore
+ ifd_struct = '<HHLL'
+
+ class IFD(NamedTuple):
+ key: int
+ typecode: Any
+ count_: int
+ data: int | Callable[[], int | None]
+
+ ifds: list[IFD] = []
+
+ def header_length(ifd_count) -> int:
+ return (
+ struct.calcsize(tiff_header_struct)
+ + struct.calcsize(ifd_struct) * ifd_count
+ + 4
+ )
+
+ def add_ifd(tag_name: str, data: int | Callable[[], int | None], count: int = 1):
+ key = tag_keys[tag_name]
+ typecode = TIFF_TAGS[key].type # type: ignore
+ ifds.append(IFD(key, typecode, count, data))
+
+ image_offset = None
+ width, height = size
+ add_ifd('ImageWidth', width)
+ add_ifd('ImageLength', height)
+ add_ifd('BitsPerSample', 1)
+ add_ifd('Compression', ccitt_group)
+ add_ifd('PhotometricInterpretation', int(photometry))
+ add_ifd('StripOffsets', lambda: image_offset)
+ add_ifd('RowsPerStrip', height)
+ add_ifd('StripByteCounts', data_length)
+
+ icc_offset = 0
+ if icc:
+ add_ifd('ICCProfile', lambda: icc_offset, count=len(icc))
+
+ icc_offset = header_length(len(ifds))
+ image_offset = icc_offset + len(icc)
+
+ ifd_args = [(arg() if callable(arg) else arg) for ifd in ifds for arg in ifd]
+ tiff_header = struct.pack(
+ (tiff_header_struct + ifd_struct[1:] * len(ifds) + 'L'),
+ b'II', # Byte order indication: Little endian
+ 42, # Version number (always 42)
+ 8, # Offset to first IFD
+ len(ifds), # Number of tags in IFD
+ *ifd_args,
+ 0, # Last IFD
+ )
+
+ if icc:
+ tiff_header += icc
+ return tiff_header
diff --git a/env/lib/python3.10/site-packages/pikepdf/models/encryption.py b/env/lib/python3.10/site-packages/pikepdf/models/encryption.py
new file mode 100644
index 0000000..d6b5036
--- /dev/null
+++ b/env/lib/python3.10/site-packages/pikepdf/models/encryption.py
@@ -0,0 +1,176 @@
+# SPDX-FileCopyrightText: 2022 James R. Barlow
+# SPDX-License-Identifier: MPL-2.0
+
+"""For managing PDF encryption."""
+
+from __future__ import annotations
+
+import sys
+from typing import TYPE_CHECKING, Any, NamedTuple, cast
+
+if sys.version_info >= (3, 8):
+ from typing import Literal
+else:
+ from typing_extensions import Literal # pragma: no cover
+
+if TYPE_CHECKING:
+ from pikepdf._qpdf import EncryptionMethod
+
+
+class Permissions(NamedTuple):
+ """
+ Stores the user-level permissions for an encrypted PDF.
+
+ A compliant PDF reader/writer should enforce these restrictions on people
+ who have the user password and not the owner password. In practice, either
+ password is sufficient to decrypt all document contents. A person who has
+ the owner password should be allowed to modify the document in any way.
+ pikepdf does not enforce the restrictions in any way; it is up to application
+ developers to enforce them as they see fit.
+
+ Unencrypted PDFs implicitly have all permissions allowed. Permissions can
+ only be changed when a PDF is saved.
+ """
+
+ accessibility: bool = True
+ """Can users use screen readers and accessibility tools to read the PDF?"""
+
+ extract: bool = True
+ """Can users extract contents?"""
+
+ modify_annotation: bool = True
+ """Can users modify annotations?"""
+
+ modify_assembly: bool = False
+ """Can users arrange document contents?"""
+
+ modify_form: bool = True
+ """Can users fill out forms?"""
+
+ modify_other: bool = True
+ """Can users modify the document?"""
+
+ print_lowres: bool = True
+ """Can users print the document at low resolution?"""
+
+ print_highres: bool = True
+ """Can users print the document at high resolution?"""
+
+
+DEFAULT_PERMISSIONS = Permissions()
+
+
+class EncryptionInfo:
+ """
+ Reports encryption information for an encrypted PDF.
+
+ This information may not be changed, except when a PDF is saved.
+ This object is not used to specify the encryption settings to save
+ a PDF, due to non-overlapping information requirements.
+ """
+
+ def __init__(self, encdict: dict[str, Any]):
+ """
+ Initialize EncryptionInfo.
+
+ Generally pikepdf will initialize and return it.
+
+ Args:
+ encdict: Python dictionary containing encryption settings.
+ """
+ self._encdict = encdict
+
+ @property
+ def R(self) -> int:
+ """Revision number of the security handler."""
+ return int(self._encdict['R'])
+
+ @property
+ def V(self) -> int:
+ """Version of PDF password algorithm."""
+ return int(self._encdict['V'])
+
+ @property
+ def P(self) -> int:
+ """Return encoded permission bits.
+
+ See :meth:`Pdf.allow` instead.
+ """
+ return int(self._encdict['P'])
+
+ @property
+ def stream_method(self) -> EncryptionMethod:
+ """Encryption method used to encode streams."""
+ return cast('EncryptionMethod', self._encdict['stream'])
+
+ @property
+ def string_method(self) -> EncryptionMethod:
+ """Encryption method used to encode strings."""
+ return cast('EncryptionMethod', self._encdict['string'])
+
+ @property
+ def file_method(self) -> EncryptionMethod:
+ """Encryption method used to encode the whole file."""
+ return cast('EncryptionMethod', self._encdict['file'])
+
+ @property
+ def user_password(self) -> bytes:
+ """If possible, return the user password.
+
+ The user password can only be retrieved when a PDF is opened
+ with the owner password and when older versions of the
+ encryption algorithm are used.
+
+ The password is always returned as ``bytes`` even if it has
+ a clear Unicode representation.
+ """
+ return bytes(self._encdict['user_passwd'])
+
+ @property
+ def encryption_key(self) -> bytes:
+ """Return the RC4 or AES encryption key used for this file."""
+ return bytes(self._encdict['encryption_key'])
+
+ @property
+ def bits(self) -> int:
+ """Return the number of bits in the encryption algorithm.
+
+ e.g. if the algorithm is AES-256, this returns 256.
+ """
+ return len(self._encdict['encryption_key']) * 8
+
+
+class Encryption(NamedTuple):
+ """Specify the encryption settings to apply when a PDF is saved."""
+
+ owner: str = ''
+ """The owner password to use. This allows full control
+ of the file. If blank, the PDF will be encrypted and
+ present as "(SECURED)" in PDF viewers. If the owner password
+ is blank, the user password should be as well."""
+
+ user: str = ''
+ """The user password to use. With this password, some
+ restrictions will be imposed by a typical PDF reader.
+ If blank, the PDF can be opened by anyone, but only modified
+ as allowed by the permissions in ``allow``."""
+
+ R: Literal[2, 3, 4, 5, 6] = 6
+ """Select the security handler algorithm to use. Choose from:
+ ``2``, ``3``, ``4`` or ``6``. By default, the highest version of
+ is selected (``6``). ``5`` is a deprecated algorithm that should
+ not be used."""
+
+ allow: Permissions = DEFAULT_PERMISSIONS
+ """The permissions to set.
+ If omitted, all permissions are granted to the user."""
+
+ aes: bool = True
+ """If True, request the AES algorithm. If False, use RC4.
+ If omitted, AES is selected whenever possible (R >= 4)."""
+
+ metadata: bool = True
+ """If True, also encrypt the PDF metadata. If False,
+ metadata is not encrypted. Reading document metadata without
+ decryption may be desirable in some cases. Requires ``aes=True``.
+ If omitted, metadata is encrypted whenever possible."""
diff --git a/env/lib/python3.10/site-packages/pikepdf/models/image.py b/env/lib/python3.10/site-packages/pikepdf/models/image.py
new file mode 100644
index 0000000..5981a8e
--- /dev/null
+++ b/env/lib/python3.10/site-packages/pikepdf/models/image.py
@@ -0,0 +1,991 @@
+# SPDX-FileCopyrightText: 2022 James R. Barlow
+# SPDX-License-Identifier: MPL-2.0
+
+"""Extract images embedded in PDF."""
+
+from __future__ import annotations
+
+from abc import ABC, abstractmethod
+from decimal import Decimal
+from io import BytesIO
+from itertools import zip_longest
+from pathlib import Path
+from shutil import copyfileobj
+from typing import Any, BinaryIO, Callable, NamedTuple, Sequence, TypeVar, cast
+
+from PIL import Image
+from PIL.ImageCms import ImageCmsProfile
+
+from pikepdf import (
+ Array,
+ Dictionary,
+ Name,
+ Object,
+ Pdf,
+ PdfError,
+ Stream,
+ StreamDecodeLevel,
+ String,
+ jbig2,
+)
+from pikepdf._exceptions import DependencyError
+from pikepdf._qpdf import Buffer
+from pikepdf._version import __version__
+from pikepdf.models import _transcoding
+
+T = TypeVar('T')
+
+
+class UnsupportedImageTypeError(Exception):
+ """This image is formatted in a way pikepdf does not supported."""
+
+
+class NotExtractableError(Exception):
+ """Indicates that an image cannot be directly extracted."""
+
+
+class HifiPrintImageNotTranscodableError(NotExtractableError):
+ """Image contains high fidelity printing information and cannot be extracted."""
+
+
+class InvalidPdfImageError(Exception):
+ """This image is not valid according to the PDF 1.7 specification."""
+
+
+def _array_str(value: Object | str | list):
+ """Simplify pikepdf objects to array of str. Keep Streams and dictionaries intact."""
+
+ def _convert(item):
+ if isinstance(item, (list, Array)):
+ return [_convert(subitem) for subitem in item]
+ if isinstance(item, (Stream, Dictionary, bytes, int)):
+ return item
+ if isinstance(item, (Name, str)):
+ return str(item)
+ if isinstance(item, (String)):
+ return bytes(item)
+ raise NotImplementedError(value)
+
+ result = _convert(value)
+ if not isinstance(result, list):
+ result = [result]
+ return result
+
+
+def _ensure_list(value: list[Object] | Dictionary | Array) -> list[Object]:
+ """Ensure value is a list of pikepdf.Object, if it was not already.
+
+ To support DecodeParms which can be present as either an array of dicts or a single
+ dict. It's easier to convert to an array of one dict.
+ """
+ if isinstance(value, list):
+ return value
+ return list(value.wrap_in_array().as_list())
+
+
+def _metadata_from_obj(
+ obj: Dictionary | Stream, name: str, type_: Callable[[Any], T], default: T
+) -> T | None:
+ """Retrieve metadata from a dictionary or stream, and ensure it is the expected type."""
+ val = getattr(obj, name, default)
+ try:
+ return type_(val)
+ except TypeError:
+ if val is None:
+ return None
+ raise NotImplementedError('Metadata access for ' + name)
+
+
+class PaletteData(NamedTuple):
+ """Returns the color space and binary representation of the palette.
+
+ ``base_colorspace`` is typically ``"RGB"`` or ``"L"`` (for grayscale).
+
+ ``palette`` is typically 256 or 256*3=768 bytes, for grayscale and RGB color
+ respectively, with each unit/triplet being the grayscale/RGB triplet values.
+ """
+
+ base_colorspace: str
+ palette: bytes
+
+
+class PdfImageBase(ABC):
+ """Abstract base class for images."""
+
+ SIMPLE_COLORSPACES = {'/DeviceRGB', '/DeviceGray', '/CalRGB', '/CalGray'}
+ MAIN_COLORSPACES = SIMPLE_COLORSPACES | {'/DeviceCMYK', '/CalCMYK', '/ICCBased'}
+ PRINT_COLORSPACES = {'/Separation', '/DeviceN'}
+
+ @abstractmethod
+ def _metadata(self, name: str, type_: Callable[[Any], T], default: T) -> T:
+ """Get metadata for this image type."""
+
+ @property
+ def width(self) -> int:
+ """Width of the image data in pixels."""
+ return self._metadata('Width', int, 0)
+
+ @property
+ def height(self) -> int:
+ """Height of the image data in pixels."""
+ return self._metadata('Height', int, 0)
+
+ @property
+ def image_mask(self) -> bool:
+ """Return ``True`` if this is an image mask."""
+ return self._metadata('ImageMask', bool, False)
+
+ @property
+ def _bpc(self) -> int | None:
+ """Bits per component for this image (low-level)."""
+ return self._metadata('BitsPerComponent', int, 0)
+
+ @property
+ def _colorspaces(self):
+ """Colorspace (low-level)."""
+ return self._metadata('ColorSpace', _array_str, [])
+
+ @property
+ def filters(self):
+ """List of names of the filters that we applied to encode this image."""
+ return self._metadata('Filter', _array_str, [])
+
+ @property
+ def decode_parms(self):
+ """List of the /DecodeParms, arguments to filters."""
+ return self._metadata('DecodeParms', _ensure_list, [])
+
+ @property
+ def colorspace(self) -> str | None:
+ """PDF name of the colorspace that best describes this image."""
+ if self.image_mask:
+ return None # Undefined for image masks
+ if self._colorspaces:
+ if self._colorspaces[0] in self.MAIN_COLORSPACES:
+ return self._colorspaces[0]
+ if self._colorspaces[0] == '/Indexed':
+ subspace = self._colorspaces[1]
+ if isinstance(subspace, str) and subspace in self.MAIN_COLORSPACES:
+ return subspace
+ if isinstance(subspace, list) and subspace[0] in (
+ '/ICCBased',
+ '/DeviceN',
+ ):
+ return subspace[0]
+ if self._colorspaces[0] == '/DeviceN':
+ return '/DeviceN'
+
+ raise NotImplementedError(
+ "not sure how to get colorspace: " + repr(self._colorspaces)
+ )
+
+ @property
+ def bits_per_component(self) -> int:
+ """Bits per component of this image."""
+ if self._bpc is None or self._bpc == 0:
+ return 1 if self.image_mask else 8
+ return self._bpc
+
+ @property
+ @abstractmethod
+ def icc(self) -> ImageCmsProfile | None:
+ """Return ICC profile for this image if one is defined."""
+
+ @property
+ def indexed(self) -> bool:
+ """Check if the image has a defined color palette."""
+ return '/Indexed' in self._colorspaces
+
+ def _colorspace_has_name(self, name):
+ try:
+ cs = self._colorspaces
+ if cs[0] == '/Indexed' and cs[1][0] == name:
+ return True
+ if cs[0] == name:
+ return True
+ except (IndexError, AttributeError, KeyError):
+ pass
+ return False
+
+ @property
+ def is_device_n(self) -> bool:
+ """Check if image has a /DeviceN (complex printing) colorspace."""
+ return self._colorspace_has_name('/DeviceN')
+
+ @property
+ def is_separation(self) -> bool:
+ """Check if image has a /DeviceN (complex printing) colorspace."""
+ return self._colorspace_has_name('/Separation')
+
+ @property
+ def size(self) -> tuple[int, int]:
+ """Size of image as (width, height)."""
+ return self.width, self.height
+
+ def _approx_mode_from_icc(self):
+ if self.indexed:
+ icc_profile = self._colorspaces[1][1]
+ else:
+ icc_profile = self._colorspaces[1]
+ icc_profile_nchannels = int(icc_profile['/N'])
+
+ if icc_profile_nchannels == 1:
+ return 'L'
+
+ # Multiple channels, need to open the profile and look
+ mode_from_xcolor_space = {'RGB ': 'RGB', 'CMYK': 'CMYK'}
+ xcolor_space = self.icc.profile.xcolor_space
+ return mode_from_xcolor_space.get(xcolor_space, '')
+
+ @property
+ def mode(self) -> str:
+ """``PIL.Image.mode`` equivalent for this image, where possible.
+
+ If an ICC profile is attached to the image, we still attempt to resolve a Pillow
+ mode.
+ """
+ m = ''
+ if self.is_device_n:
+ m = 'DeviceN'
+ elif self.is_separation:
+ m = 'Separation'
+ elif self.indexed:
+ m = 'P'
+ elif self.colorspace == '/DeviceGray' and self.bits_per_component == 1:
+ m = '1'
+ elif self.colorspace == '/DeviceGray' and self.bits_per_component > 1:
+ m = 'L'
+ elif self.colorspace == '/DeviceRGB':
+ m = 'RGB'
+ elif self.colorspace == '/DeviceCMYK':
+ m = 'CMYK'
+ elif self.colorspace == '/ICCBased':
+ try:
+ m = self._approx_mode_from_icc()
+ except (ValueError, TypeError) as e:
+ raise NotImplementedError(
+ "Not sure how to handle PDF image of this type"
+ ) from e
+ if m == '':
+ raise NotImplementedError(
+ "Not sure how to handle PDF image of this type"
+ ) from None
+ return m
+
+ @property
+ def filter_decodeparms(self):
+ """Return normalized the Filter and DecodeParms data.
+
+ PDF has a lot of possible data structures concerning /Filter and
+ /DecodeParms. /Filter can be absent or a name or an array, /DecodeParms
+ can be absent or a dictionary (if /Filter is a name) or an array (if
+ /Filter is an array). When both are arrays the lengths match.
+
+ Normalize this into:
+ [(/FilterName, {/DecodeParmName: Value, ...}), ...]
+
+ The order of /Filter matters as indicates the encoding/decoding sequence.
+ """
+ return list(zip_longest(self.filters, self.decode_parms, fillvalue={}))
+
+ @property
+ def palette(self) -> PaletteData | None:
+ """Retrieve the color palette for this image if applicable."""
+ if not self.indexed:
+ return None
+ try:
+ _idx, base, _hival, lookup = self._colorspaces
+ except ValueError as e:
+ raise ValueError('Not sure how to interpret this palette') from e
+ if self.icc or self.is_device_n or self.is_separation:
+ base = str(base[0])
+ else:
+ base = str(base)
+ lookup = bytes(lookup)
+ if base not in self.MAIN_COLORSPACES and base not in self.PRINT_COLORSPACES:
+ raise NotImplementedError(f"not sure how to interpret this palette: {base}")
+ if base == '/DeviceRGB':
+ base = 'RGB'
+ elif base == '/DeviceGray':
+ base = 'L'
+ elif base == '/DeviceCMYK':
+ base = 'CMYK'
+ elif base == '/DeviceN':
+ base = 'DeviceN'
+ elif base == '/Separation':
+ base = 'Separation'
+ elif base == '/ICCBased':
+ base = self._approx_mode_from_icc()
+ return PaletteData(base, lookup)
+
    @abstractmethod
    def as_pil_image(self) -> Image.Image:
        """Convert this PDF image to a Python PIL (Pillow) image."""
        # Abstract: concrete subclasses (PdfImage, PdfJpxImage, PdfInlineImage)
        # supply the actual conversion.
+
+ @staticmethod
+ def _remove_simple_filters(obj: Stream, filters: Sequence[str]):
+ """Remove simple lossless compression where it appears.
+
+ Args:
+ obj: the compressed object
+ filters: all files on the data
+ """
+ COMPLEX_FILTERS = {
+ '/DCTDecode',
+ '/JPXDecode',
+ '/JBIG2Decode',
+ '/CCITTFaxDecode',
+ }
+
+ idx = [n for n, item in enumerate(filters) if item in COMPLEX_FILTERS]
+ if idx:
+ if len(idx) > 1:
+ raise NotImplementedError(
+ f"Object {obj.objgen} has compound complex filters: {filters}. "
+ "We cannot decompress this."
+ )
+ simple_filters = filters[: idx[0]]
+ complex_filters = filters[idx[0] :]
+ else:
+ simple_filters = filters
+ complex_filters = []
+
+ if not simple_filters:
+ return obj.read_raw_bytes(), complex_filters
+
+ original_filters = obj.Filter
+ try:
+ obj.Filter = Array([Name(s) for s in simple_filters])
+ data = obj.read_bytes(StreamDecodeLevel.specialized)
+ finally:
+ obj.Filter = original_filters
+
+ return data, complex_filters
+
+
class PdfImage(PdfImageBase):
    """Support class to provide a consistent API for manipulating PDF images.

    The data structure for images inside PDFs is irregular and complex,
    making it difficult to use without introducing errors for less
    typical cases. This class addresses these difficulties by providing a
    regular, Pythonic API similar in spirit (and convertible to) the Python
    Pillow imaging library.
    """

    obj: Stream  # the image XObject stream this instance wraps
    _icc: ImageCmsProfile | None  # cached ICC profile; created lazily by .icc

    def __new__(cls, obj):
        """Construct a PdfImage... or a PdfJpxImage if that is what we really are."""
        instance = super().__new__(cls)
        instance.__init__(obj)
        if '/JPXDecode' in instance.filters:
            # JPEG 2000 data needs different handling: discard the instance
            # just built and construct the subclass instead.
            instance = super().__new__(PdfJpxImage)
            instance.__init__(obj)
        return instance

    def __init__(self, obj: Stream):
        """Construct a PDF image from a Image XObject inside a PDF.

        ``pim = PdfImage(page.Resources.XObject['/ImageNN'])``

        Args:
            obj: an Image XObject

        Raises:
            TypeError: if obj is a stream whose /Subtype is not /Image.
        """
        if isinstance(obj, Stream) and obj.stream_dict.get("/Subtype") != "/Image":
            raise TypeError("can't construct PdfImage from non-image")
        self.obj = obj
        self._icc = None

    def __eq__(self, other):
        # Images are equal when their underlying stream objects are equal.
        if not isinstance(other, PdfImageBase):
            return NotImplemented
        return self.obj == other.obj

    @classmethod
    def _from_pil_image(cls, *, pdf, page, name, image):  # pragma: no cover
        """Insert a PIL image into a PDF (rudimentary).

        Args:
            pdf (pikepdf.Pdf): the PDF to attach the image to
            page (pikepdf.Object): the page to attach the image to
            name (str or pikepdf.Name): the name to set the image
            image (PIL.Image.Image): the image to insert
        """
        data = image.tobytes()

        imstream = Stream(pdf, data)
        imstream.Type = Name('/XObject')
        imstream.Subtype = Name('/Image')
        # Only RGB, 1-bit, and 8-bit grayscale modes are handled here.
        if image.mode == 'RGB':
            imstream.ColorSpace = Name('/DeviceRGB')
        elif image.mode in ('1', 'L'):
            imstream.ColorSpace = Name('/DeviceGray')
        imstream.BitsPerComponent = 1 if image.mode == '1' else 8
        imstream.Width = image.width
        imstream.Height = image.height

        page.Resources.XObject[name] = imstream

        return cls(imstream)

    def _metadata(self, name, type_, default):
        """Read one metadata entry from the image XObject dictionary."""
        return _metadata_from_obj(self.obj, name, type_, default)

    @property
    def _iccstream(self):
        """Return the stream object that holds the raw ICC profile bytes."""
        if self.colorspace == '/ICCBased':
            if not self.indexed:
                return self._colorspaces[1]
            # Indexed /ICCBased: the base colorspace array holds the profile.
            assert isinstance(self._colorspaces[1], list)
            return self._colorspaces[1][1]
        raise NotImplementedError("Don't know how to find ICC stream for image")

    @property
    def icc(self) -> ImageCmsProfile | None:
        """If an ICC profile is attached, return a Pillow object that describe it.

        Most of the information may be found in ``icc.profile``.
        """
        if self.colorspace not in ('/ICCBased', '/Indexed'):
            return None
        if not self._icc:
            iccstream = self._iccstream
            iccbuffer = iccstream.get_stream_buffer()
            iccbytesio = BytesIO(iccbuffer)
            try:
                self._icc = ImageCmsProfile(iccbytesio)
            except OSError as e:
                if str(e) == 'cannot open profile from string':
                    # ICC profile is corrupt
                    raise UnsupportedImageTypeError(
                        "ICC profile corrupt or not readable"
                    ) from e
                # NOTE(review): other OSErrors are silently swallowed here,
                # leaving self._icc as None — confirm this is intended.
        return self._icc

    def _extract_direct(self, *, stream: BinaryIO) -> str:
        """Attempt to extract the image directly to a usable image file.

        If there is no way to extract the image without decompressing or
        transcoding then raise an exception. The type and format of image
        generated will vary.

        Args:
            stream: Writable file stream to write data to, e.g. an open file

        Returns:
            The file extension ('.tif' or '.jpg') matching the written data.

        Raises:
            NotExtractableError: if direct extraction is not possible.
        """

        def normal_dct_rgb() -> bool:
            # Normal DCTDecode RGB images have the default value of
            # /ColorTransform 1 and are actually in YUV. Such a file can be
            # saved as a standard JPEG. RGB JPEGs without YUV conversion can't
            # be saved as JPEGs, and are probably bugs. Some software in the
            # wild actually produces RGB JPEGs in PDFs (probably a bug).
            DEFAULT_CT_RGB = 1
            ct = self.filter_decodeparms[0][1].get('/ColorTransform', DEFAULT_CT_RGB)
            return self.mode == 'RGB' and ct == DEFAULT_CT_RGB

        def normal_dct_cmyk() -> bool:
            # Normal DCTDecode CMYKs have /ColorTransform 0 and can be saved.
            # There is a YUVK colorspace but CMYK JPEGs don't generally use it
            DEFAULT_CT_CMYK = 0
            ct = self.filter_decodeparms[0][1].get('/ColorTransform', DEFAULT_CT_CMYK)
            return self.mode == 'CMYK' and ct == DEFAULT_CT_CMYK

        data, filters = self._remove_simple_filters(self.obj, self.filters)

        if filters == ['/CCITTFaxDecode']:
            if self.colorspace == '/ICCBased':
                icc = self._iccstream.read_bytes()
            else:
                icc = None
            # CCITT data needs a TIFF header prepended to form a valid file.
            stream.write(self._generate_ccitt_header(data, icc=icc))
            stream.write(data)
            return '.tif'
        if filters == ['/DCTDecode'] and (
            self.mode == 'L' or normal_dct_rgb() or normal_dct_cmyk()
        ):
            stream.write(data)
            return '.jpg'

        raise NotExtractableError()

    def _extract_transcoded_1248bits(self) -> Image.Image:
        """Extract an image when there are 1/2/4/8 bits packed in byte data."""
        # (1-bit images are handled by _extract_transcoded_1bit; this method
        # is reached for 2/4/8 bits per component.)
        stride = 0  # tell Pillow to calculate stride from line width
        scale = 0 if self.mode == 'L' else 1
        if self.bits_per_component in (2, 4):
            # Sub-byte pixels must be unpacked before Pillow can read them.
            buffer, stride = _transcoding.unpack_subbyte_pixels(
                self.read_bytes(), self.size, self.bits_per_component, scale
            )
        elif self.bits_per_component == 8:
            buffer = cast(memoryview, self.get_stream_buffer())
        else:
            raise InvalidPdfImageError("BitsPerComponent must be 1, 2, 4, 8, or 16")

        if self.mode == 'P' and self.palette is not None:
            base_mode, palette = self.palette
            im = _transcoding.image_from_buffer_and_palette(
                buffer,
                self.size,
                stride,
                base_mode,
                palette,
            )
        else:
            im = _transcoding.image_from_byte_buffer(buffer, self.size, stride)
        return im

    def _extract_transcoded_1bit(self) -> Image.Image:
        """Extract a 1-bit-per-component image (grayscale or paletted)."""
        if self.mode in ('RGB', 'CMYK'):
            raise UnsupportedImageTypeError("1-bit RGB and CMYK are not supported")
        try:
            data = self.read_bytes()
        except (RuntimeError, PdfError) as e:
            # A JBIG2 stream cannot be decoded without an external decoder;
            # translate the opaque error into an actionable one.
            if (
                'read_bytes called on unfilterable stream' in str(e)
                and not jbig2.get_decoder().available()
            ):
                raise DependencyError(
                    "jbig2dec - not installed or installed version is too old "
                    "(older than version 0.15)"
                ) from None
            raise

        im = Image.frombytes('1', self.size, data)

        if self.palette is not None:
            base_mode, palette = self.palette
            im = _transcoding.fix_1bit_palette_image(im, base_mode, palette)

        return im

    def _extract_transcoded(self) -> Image.Image:
        """Decompress and convert the image data into a Pillow image."""
        if self.mode in {'DeviceN', 'Separation'}:
            # High fidelity printing colorspaces cannot be transcoded.
            raise HifiPrintImageNotTranscodableError()

        if self.mode == 'RGB' and self.bits_per_component == 8:
            # Cannot use the zero-copy .get_stream_buffer here, we have 3-byte
            # RGB and Pillow needs RGBX.
            im = Image.frombuffer(
                'RGB', self.size, self.read_bytes(), 'raw', 'RGB', 0, 1
            )
        elif self.mode == 'CMYK' and self.bits_per_component == 8:
            im = Image.frombuffer(
                'CMYK', self.size, self.get_stream_buffer(), 'raw', 'CMYK', 0, 1
            )
        # elif self.mode == '1':
        elif self.bits_per_component == 1:
            im = self._extract_transcoded_1bit()
        elif self.mode in ('L', 'P') and self.bits_per_component <= 8:
            im = self._extract_transcoded_1248bits()
        else:
            raise UnsupportedImageTypeError(repr(self) + ", " + repr(self.obj))

        if self.colorspace == '/ICCBased' and self.icc is not None:
            # Carry the ICC profile over so Pillow embeds it on save.
            im.info['icc_profile'] = self.icc.tobytes()

        return im

    def _extract_to_stream(self, *, stream: BinaryIO) -> str:
        """Extract the image to a stream.

        If possible, the compressed data is extracted and inserted into
        a compressed image file format without transcoding the compressed
        content. If this is not possible, the data will be decompressed
        and extracted to an appropriate format.

        Args:
            stream: Writable stream to write data to

        Returns:
            The file format extension.
        """
        try:
            return self._extract_direct(stream=stream)
        except NotExtractableError:
            # Fall through to the transcoding path.
            pass

        im = None
        try:
            im = self._extract_transcoded()
            if im.mode == 'CMYK':
                # PNG has no CMYK mode; use TIFF with lossless compression.
                im.save(stream, format='tiff', compression='tiff_adobe_deflate')
                return '.tiff'
            if im:
                im.save(stream, format='png')
                return '.png'
        except PdfError as e:
            if 'called on unfilterable stream' in str(e):
                raise UnsupportedImageTypeError(repr(self)) from e
            raise
        finally:
            if im:
                im.close()

        raise UnsupportedImageTypeError(repr(self))

    def extract_to(
        self, *, stream: BinaryIO | None = None, fileprefix: str = ''
    ) -> str:
        """Extract the image directly to a usable image file.

        If possible, the compressed data is extracted and inserted into
        a compressed image file format without transcoding the compressed
        content. If this is not possible, the data will be decompressed
        and extracted to an appropriate format.

        Because it is not known until attempted what image format will be
        extracted, users should not assume what format they are getting back.
        When saving the image to a file, use a temporary filename, and then
        rename the file to its final name based on the returned file extension.

        Images might be saved as any of .png, .jpg, or .tiff.

        Examples:
            >>> im.extract_to(stream=bytes_io)
            '.png'

            >>> im.extract_to(fileprefix='/tmp/image00')
            '/tmp/image00.jpg'

        Args:
            stream: Writable stream to write data to.
            fileprefix (str or Path): The path to write the extracted image to,
                without the file extension.

        Returns:
            If *fileprefix* was provided, then the fileprefix with the
            appropriate extension. If no *fileprefix*, then an extension
            indicating the file type.

        Raises:
            ValueError: if both or neither of *stream* and *fileprefix* are set.
        """
        # NOTE(review): this also raises when neither argument is provided,
        # although the message only mentions the "both" case.
        if bool(stream) == bool(fileprefix):
            raise ValueError("Cannot set both stream and fileprefix")
        if stream:
            return self._extract_to_stream(stream=stream)

        # Extract to memory first so a failed extraction leaves no partial file.
        bio = BytesIO()
        extension = self._extract_to_stream(stream=bio)
        bio.seek(0)
        filepath = Path(str(Path(fileprefix)) + extension)
        with filepath.open('wb') as target:
            copyfileobj(bio, target)
        return str(filepath)

    def read_bytes(
        self, decode_level: StreamDecodeLevel = StreamDecodeLevel.specialized
    ) -> bytes:
        """Decompress this image and return it as unencoded bytes."""
        return self.obj.read_bytes(decode_level=decode_level)

    def get_stream_buffer(
        self, decode_level: StreamDecodeLevel = StreamDecodeLevel.specialized
    ) -> Buffer:
        """Access this image with the buffer protocol."""
        return self.obj.get_stream_buffer(decode_level=decode_level)

    def as_pil_image(self) -> Image.Image:
        """Extract the image as a Pillow Image, using decompression as necessary.

        Caller must close the image.
        """
        try:
            bio = BytesIO()
            self._extract_direct(stream=bio)
            bio.seek(0)
            return Image.open(bio)
        except NotExtractableError:
            # Not directly extractable; transcode instead.
            pass

        im = self._extract_transcoded()
        if not im:
            raise UnsupportedImageTypeError(repr(self))

        return im

    def _generate_ccitt_header(self, data: bytes, icc: bytes | None = None) -> bytes:
        """Construct a CCITT G3 or G4 header from the PDF metadata."""
        # https://stackoverflow.com/questions/2641770/
        # https://www.itu.int/itudoc/itu-t/com16/tiff-fx/docs/tiff6.pdf

        if not self.decode_parms:
            raise ValueError("/CCITTFaxDecode without /DecodeParms")
        if self.decode_parms[0].get("/EncodedByteAlign", False):
            raise UnsupportedImageTypeError(
                "/CCITTFaxDecode with /EncodedByteAlign true"
            )

        # /K selects the CCITT encoding scheme (PDF /CCITTFaxDecode parameter).
        k = self.decode_parms[0].get("/K", 0)
        if k < 0:
            ccitt_group = 4  # Pure two-dimensional encoding (Group 4)
        elif k > 0:
            ccitt_group = 3  # Group 3 2-D
        else:
            ccitt_group = 2  # Group 3 1-D
        _black_is_one = self.decode_parms[0].get("/BlackIs1", False)
        # PDF spec says:
        # BlackIs1: A flag indicating whether 1 bits shall be interpreted as black
        # pixels and 0 bits as white pixels, the reverse of the normal
        # PDF convention for image data. Default value: false.
        # TIFF spec says:
        # use 0 for white_is_zero (=> black is 1) MINISWHITE
        # use 1 for black_is_zero (=> white is 1) MINISBLACK
        # However, despite the documentation, it seems PDF viewers treat
        # photometry as 0 when ccitt is involved.
        # For example see
        # https://gitlab.gnome.org/GNOME/evince/-/blob/main/backend/tiff/tiff2ps.c#L852-865
        photometry = 0

        img_size = len(data)
        if icc is None:
            icc = b''
        return _transcoding.generate_ccitt_header(
            self.size, img_size, ccitt_group, photometry, icc
        )

    def show(self):  # pragma: no cover
        """Show the image however PIL wants to."""
        self.as_pil_image().show()

    def __repr__(self):
        return (
            f'<pikepdf.PdfImage image mode={self.mode} '
            f'size={self.width}x{self.height} at {hex(id(self))}>'
        )

    def _repr_png_(self) -> bytes:
        """Display hook for IPython/Jupyter."""
        b = BytesIO()
        with self.as_pil_image() as im:
            im.save(b, 'PNG')
        return b.getvalue()
+
+
class PdfJpxImage(PdfImage):
    """Support class for JPEG 2000 images. Implements the same API as :class:`PdfImage`.

    If you call PdfImage(object_that_is_actually_jpeg2000_image), pikepdf will return
    this class instead, due to the check in PdfImage.__new__.
    """

    def __init__(self, obj):
        """Initialize a JPEG 2000 image."""
        super().__init__(obj)
        # Decode eagerly; _colorspaces may need the decoded image's mode.
        self._jpxpil = self.as_pil_image()

    def __eq__(self, other):
        # Equal only to another JPX image with the same stream object and
        # identical decoded pixels.
        if not isinstance(other, PdfImageBase):
            return NotImplemented
        return (
            self.obj == other.obj
            and isinstance(other, PdfJpxImage)
            and self._jpxpil == other._jpxpil
        )

    def _extract_direct(self, *, stream: BinaryIO):
        """Write the JPEG 2000 codestream directly as a .jp2 file."""
        data, filters = self._remove_simple_filters(self.obj, self.filters)
        if filters != ['/JPXDecode']:
            raise UnsupportedImageTypeError(self.filters)
        stream.write(data)
        return '.jp2'

    @property
    def _colorspaces(self):
        """Return the effective colorspace of a JPEG 2000 image.

        If the ColorSpace dictionary is present, the colorspace embedded in the
        JPEG 2000 data will be ignored, as required by the specification.
        """
        # (PDF 1.7 Table 89) If ColorSpace is present, any colour space
        # specifications in the JPEG2000 data shall be ignored.
        super_colorspaces = super()._colorspaces
        if super_colorspaces:
            return super_colorspaces
        # No /ColorSpace entry: infer from the decoded image's mode.
        if self._jpxpil.mode == 'L':
            return ['/DeviceGray']
        if self._jpxpil.mode == 'RGB':
            return ['/DeviceRGB']
        raise NotImplementedError('Complex JP2 colorspace')

    @property
    def _bpc(self) -> int:
        """Return 8, since bpc is not meaningful for JPEG 2000 encoding."""
        # (PDF 1.7 Table 89) If the image stream uses the JPXDecode filter, this
        # entry is optional and shall be ignored if present. The bit depth is
        # determined by the conforming reader in the process of decoding the
        # JPEG2000 image.
        return 8

    @property
    def indexed(self) -> bool:
        """Return False, since JPEG 2000 should not be indexed."""
        # Nothing in the spec precludes an Indexed JPXDecode image, except for
        # the fact that doing so is madness. Let's assume that no one is that
        # insane.
        return False

    def __repr__(self):
        return (
            f'<pikepdf.PdfJpxImage JPEG2000 image mode={self.mode} '
            f'size={self.width}x{self.height} at {hex(id(self))}>'
        )
+
+
class PdfInlineImage(PdfImageBase):
    """Support class for PDF inline images. Implements the same API as :class:`PdfImage`."""

    # Inline images can contain abbreviations that we write automatically
    # (the PDF specification's abbreviated keys and values for inline images).
    ABBREVS = {
        b'/W': b'/Width',
        b'/H': b'/Height',
        b'/BPC': b'/BitsPerComponent',
        b'/IM': b'/ImageMask',
        b'/CS': b'/ColorSpace',
        b'/F': b'/Filter',
        b'/DP': b'/DecodeParms',
        b'/G': b'/DeviceGray',
        b'/RGB': b'/DeviceRGB',
        b'/CMYK': b'/DeviceCMYK',
        b'/I': b'/Indexed',
        b'/AHx': b'/ASCIIHexDecode',
        b'/A85': b'/ASCII85Decode',
        b'/LZW': b'/LZWDecode',
        b'/RL': b'/RunLengthDecode',
        b'/CCF': b'/CCITTFaxDecode',
        b'/DCT': b'/DCTDecode',
    }
    # Inverse mapping, used when writing an inline image back out in unparse().
    REVERSE_ABBREVS = {v: k for k, v in ABBREVS.items()}

    _data: Object  # raw inline image data from the content stream
    _image_object: tuple[Object, ...]  # metadata tokens from the content stream

    def __init__(self, *, image_data: Object, image_object: tuple):
        """Construct wrapper for inline image.

        Args:
            image_data: data stream for image, extracted from content stream
            image_object: the metadata for image, also from content stream

        Raises:
            PdfError: if the reconstructed dictionary cannot be parsed.
        """
        # Convert the sequence of pikepdf.Object from the content stream into
        # a dictionary object by unparsing it (to bytes), eliminating inline
        # image abbreviations, and constructing a bytes string equivalent to
        # what an image XObject would look like. Then retrieve data from there

        self._data = image_data
        self._image_object = image_object

        reparse = b' '.join(
            self._unparse_obj(obj, remap_names=self.ABBREVS) for obj in image_object
        )
        try:
            reparsed_obj = Object.parse(b'<< ' + reparse + b' >>')
        except PdfError as e:
            raise PdfError("parsing inline " + reparse.decode('unicode_escape')) from e
        self.obj = reparsed_obj

    def __eq__(self, other):
        # Equal only to another inline image with identical metadata and
        # identical raw data bytes.
        if not isinstance(other, PdfImageBase):
            return NotImplemented
        return (
            self.obj == other.obj
            and isinstance(other, PdfInlineImage)
            and (
                self._data._inline_image_raw_bytes()
                == other._data._inline_image_raw_bytes()
            )
        )

    @classmethod
    def _unparse_obj(cls, obj, remap_names):
        """Serialize one metadata token to bytes, applying name remapping."""
        if isinstance(obj, Object):
            if isinstance(obj, Name):
                name = obj.unparse(resolved=True)
                assert isinstance(name, bytes)
                return remap_names.get(name, name)
            return obj.unparse(resolved=True)
        if isinstance(obj, bool):
            return b'true' if obj else b'false'  # Lower case for PDF spec
        if isinstance(obj, (int, Decimal, float)):
            return str(obj).encode('ascii')
        raise NotImplementedError(repr(obj))

    def _metadata(self, name, type_, default):
        """Read one metadata entry from the reconstructed image dictionary."""
        return _metadata_from_obj(self.obj, name, type_, default)

    def unparse(self) -> bytes:
        """Create the content stream bytes that reproduce this inline image."""

        def metadata_tokens():
            # Re-abbreviate metadata keys/values for compact output.
            for metadata_obj in self._image_object:
                unparsed = self._unparse_obj(
                    metadata_obj, remap_names=self.REVERSE_ABBREVS
                )
                assert isinstance(unparsed, bytes)
                yield unparsed

        def inline_image_tokens():
            # BI ... ID ... EI is the inline image content stream syntax.
            yield b'BI\n'
            yield b' '.join(m for m in metadata_tokens())
            yield b'\nID\n'
            yield self._data._inline_image_raw_bytes()
            yield b'EI'

        return b''.join(inline_image_tokens())

    @property
    def icc(self):  # pragma: no cover
        """Raise an exception since ICC profiles are not supported on inline images."""
        raise InvalidPdfImageError(
            "Inline images with ICC profiles are not supported in the PDF specification"
        )

    def __repr__(self):
        try:
            mode = self.mode
        except NotImplementedError:
            mode = '?'
        return (
            f'<pikepdf.PdfInlineImage image mode={mode} '
            f'size={self.width}x{self.height} at {hex(id(self))}>'
        )

    def _convert_to_pdfimage(self):
        """Copy this inline image into a throwaway PDF as a regular image XObject."""
        # Construct a temporary PDF that holds this inline image, and...
        tmppdf = Pdf.new()
        tmppdf.add_blank_page(page_size=(self.width, self.height))
        tmppdf.pages[0].contents_add(
            f'{self.width} 0 0 {self.height} 0 0 cm'.encode('ascii'), prepend=True
        )
        tmppdf.pages[0].contents_add(self.unparse())

        # ...externalize it,
        tmppdf.pages[0].externalize_inline_images()
        raw_img = next(im for im in tmppdf.pages[0].images.values())

        # ...then use the regular PdfImage API to extract it.
        img = PdfImage(raw_img)
        return img

    def as_pil_image(self) -> Image.Image:
        """Return inline image as a Pillow Image."""
        return self._convert_to_pdfimage().as_pil_image()

    def extract_to(self, *, stream: BinaryIO | None = None, fileprefix: str = ''):
        """Extract the inline image directly to a usable image file.

        See:
            :meth:`PdfImage.extract_to`
        """
        return self._convert_to_pdfimage().extract_to(
            stream=stream, fileprefix=fileprefix
        )

    def read_bytes(self):
        """Return decompressed image bytes."""
        # QPDF does not have an API to return this directly, so convert it.
        return self._convert_to_pdfimage().read_bytes()

    def get_stream_buffer(self):
        """Return decompressed stream buffer."""
        # QPDF does not have an API to return this directly, so convert it.
        return self._convert_to_pdfimage().get_stream_buffer()
diff --git a/env/lib/python3.10/site-packages/pikepdf/models/matrix.py b/env/lib/python3.10/site-packages/pikepdf/models/matrix.py
new file mode 100644
index 0000000..c660320
--- /dev/null
+++ b/env/lib/python3.10/site-packages/pikepdf/models/matrix.py
@@ -0,0 +1,145 @@
+# SPDX-FileCopyrightText: 2022 James R. Barlow
+# SPDX-License-Identifier: MPL-2.0
+
+"""PDF content matrix support."""
+
+from __future__ import annotations
+
+from math import cos, pi, sin
+
+
class PdfMatrix:
    """
    Support class for PDF content stream matrices.

    PDF content stream matrices are 3x3 matrices summarized by a shorthand
    ``(a, b, c, d, e, f)`` which correspond to the first two column vectors.
    The final column vector is always ``(0, 0, 1)`` since this is using
    `homogeneous coordinates <https://en.wikipedia.org/wiki/Homogeneous_coordinates>`_.

    PDF uses row vectors. That is, ``vr @ A'`` gives the effect of transforming
    a row vector ``vr=(x, y, 1)`` by the matrix ``A'``. Most textbook
    treatments use ``A @ vc`` where the column vector ``vc=(x, y, 1)'``.

    (``@`` is the Python matrix multiplication operator.)

    Addition and other operations are not implemented because they're not that
    meaningful in a PDF context (they can be defined and are mathematically
    meaningful in general).

    PdfMatrix objects are immutable. All transformations on them produce a new
    matrix.

    """

    def __init__(self, *args):
        """Initialize a matrix.

        Accepts: no arguments (identity matrix), six numbers
        ``(a, b, c, d, e, f)``, another PdfMatrix (copy), a 6-element
        sequence, or a 3x3 nested sequence.

        Raises:
            ValueError: if the arguments do not match any accepted form.
        """
        # fmt: off
        if not args:
            self.values = ((1, 0, 0), (0, 1, 0), (0, 0, 1))
        elif len(args) == 6:
            a, b, c, d, e, f = map(float, args)
            self.values = ((a, b, 0),
                           (c, d, 0),
                           (e, f, 1))
        elif isinstance(args[0], PdfMatrix):
            self.values = args[0].values
        elif len(args[0]) == 6:
            a, b, c, d, e, f = map(float, args[0])
            self.values = ((a, b, 0),
                           (c, d, 0),
                           (e, f, 1))
        elif len(args[0]) == 3 and len(args[0][0]) == 3:
            self.values = (tuple(args[0][0]),
                           tuple(args[0][1]),
                           tuple(args[0][2]))
        else:
            raise ValueError('invalid arguments: ' + repr(args))
        # fmt: on

    @staticmethod
    def identity():
        """Constructs and returns an identity matrix."""
        return PdfMatrix()

    def __matmul__(self, other):
        """Multiply this matrix by another matrix.

        Can be used to concatenate transformations.
        """
        a = self.values
        b = other.values
        return PdfMatrix(
            [
                [sum(float(i) * float(j) for i, j in zip(row, col)) for col in zip(*b)]
                for row in a
            ]
        )

    def scaled(self, x, y):
        """Concatenates a scaling matrix on this matrix."""
        return self @ PdfMatrix((x, 0, 0, y, 0, 0))

    def rotated(self, angle_degrees_ccw):
        """Concatenates a rotation matrix on this matrix."""
        angle = angle_degrees_ccw / 180.0 * pi
        c, s = cos(angle), sin(angle)
        return self @ PdfMatrix((c, s, -s, c, 0, 0))

    def translated(self, x, y):
        """Translates this matrix."""
        return self @ PdfMatrix((1, 0, 0, 1, x, y))

    @property
    def shorthand(self):
        """Return the 6-tuple (a,b,c,d,e,f) that describes this matrix."""
        return (self.a, self.b, self.c, self.d, self.e, self.f)

    @property
    def a(self):
        """Return matrix value ``a`` (row 1, column 1)."""
        return self.values[0][0]

    @property
    def b(self):
        """Return matrix value ``b`` (row 1, column 2)."""
        return self.values[0][1]

    @property
    def c(self):
        """Return matrix value ``c`` (row 2, column 1)."""
        return self.values[1][0]

    @property
    def d(self):
        """Return matrix value ``d`` (row 2, column 2)."""
        return self.values[1][1]

    @property
    def e(self):
        """Return matrix value ``e``.

        Typically corresponds to translation on the x-axis.
        """
        return self.values[2][0]

    @property
    def f(self):
        """Return matrix value ``f``.

        Typically corresponds to translation on the y-axis.
        """
        return self.values[2][1]

    def __eq__(self, other):
        if isinstance(other, PdfMatrix):
            return self.shorthand == other.shorthand
        return False

    def __hash__(self):
        # Defining __eq__ alone would make instances unhashable; since this
        # class is documented as immutable, hash on the same data __eq__ uses.
        return hash(self.shorthand)

    def encode(self):
        """Encode this matrix in binary suitable for including in a PDF."""
        return '{:.6f} {:.6f} {:.6f} {:.6f} {:.6f} {:.6f}'.format(
            self.a, self.b, self.c, self.d, self.e, self.f
        ).encode()

    def __repr__(self):
        return f"pikepdf.PdfMatrix({repr(self.values)})"
diff --git a/env/lib/python3.10/site-packages/pikepdf/models/metadata.py b/env/lib/python3.10/site-packages/pikepdf/models/metadata.py
new file mode 100644
index 0000000..62158b1
--- /dev/null
+++ b/env/lib/python3.10/site-packages/pikepdf/models/metadata.py
@@ -0,0 +1,866 @@
+# SPDX-FileCopyrightText: 2022 James R. Barlow
+# SPDX-License-Identifier: MPL-2.0
+
+"""PDF metadata handling."""
+
+from __future__ import annotations
+
+import logging
+import re
+import sys
+from abc import ABC, abstractmethod
+from datetime import datetime
+from functools import wraps
+from io import BytesIO
+from typing import TYPE_CHECKING, Any, Callable, NamedTuple, Set
+from warnings import warn
+
+from lxml import etree
+from lxml.etree import QName, XMLSyntaxError
+
+from .. import Name, Stream, String
+from .. import __version__ as pikepdf_version
+from .._xml import parse_xml
+
+if sys.version_info < (3, 9): # pragma: no cover
+ from typing import Iterable, MutableMapping
+else:
+ from collections.abc import Iterable, MutableMapping
+
+if TYPE_CHECKING: # pragma: no cover
+ from pikepdf import Pdf
+
+
+XMP_NS_DC = "http://purl.org/dc/elements/1.1/"
+XMP_NS_PDF = "http://ns.adobe.com/pdf/1.3/"
+XMP_NS_PDFA_ID = "http://www.aiim.org/pdfa/ns/id/"
+XMP_NS_PDFX_ID = "http://www.npes.org/pdfx/ns/id/"
+XMP_NS_PHOTOSHOP = "http://ns.adobe.com/photoshop/1.0/"
+XMP_NS_PRISM = "http://prismstandard.org/namespaces/basic/1.0/"
+XMP_NS_PRISM2 = "http://prismstandard.org/namespaces/basic/2.0/"
+XMP_NS_PRISM3 = "http://prismstandard.org/namespaces/basic/3.0/"
+XMP_NS_RDF = "http://www.w3.org/1999/02/22-rdf-syntax-ns#"
+XMP_NS_XMP = "http://ns.adobe.com/xap/1.0/"
+XMP_NS_XMP_MM = "http://ns.adobe.com/xap/1.0/mm/"
+XMP_NS_XMP_RIGHTS = "http://ns.adobe.com/xap/1.0/rights/"
+
+DEFAULT_NAMESPACES: list[tuple[str, str]] = [
+ ('adobe:ns:meta/', 'x'),
+ (XMP_NS_DC, 'dc'),
+ (XMP_NS_PDF, 'pdf'),
+ (XMP_NS_PDFA_ID, 'pdfaid'),
+ (XMP_NS_PDFX_ID, 'pdfxid'),
+ (XMP_NS_PHOTOSHOP, 'photoshop'),
+ (XMP_NS_PRISM, 'prism'),
+ (XMP_NS_PRISM2, 'prism2'),
+ (XMP_NS_PRISM3, 'prism3'),
+ (XMP_NS_RDF, 'rdf'),
+ (XMP_NS_XMP, 'xmp'),
+ (XMP_NS_XMP_MM, 'xmpMM'),
+ (XMP_NS_XMP_RIGHTS, 'xmpRights'),
+]
+
+for _uri, _prefix in DEFAULT_NAMESPACES:
+ etree.register_namespace(_prefix, _uri)
+
+# This one should not be registered
+XMP_NS_XML = "http://www.w3.org/XML/1998/namespace"
+
+XPACKET_BEGIN = b"""<?xpacket begin="\xef\xbb\xbf" id="W5M0MpCehiHzreSzNTczkc9d"?>\n"""
+
+XMP_EMPTY = b"""<x:xmpmeta xmlns:x="adobe:ns:meta/" x:xmptk="pikepdf">
+ <rdf:RDF xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#">
+ </rdf:RDF>
+</x:xmpmeta>
+"""
+
+XPACKET_END = b"""\n<?xpacket end="w"?>\n"""
+
+
class XmpContainer(NamedTuple):
    """Map XMP container object to suitable Python container."""

    # RDF container element name: 'Alt', 'Bag' or 'Seq'.
    rdf_type: str
    # Python container type used to collect the items.
    py_type: type
    # Unbound method that inserts one item into an instance of py_type.
    insert_fn: Callable[..., None]
+
+
log = logging.getLogger(__name__)


class NeverRaise(Exception):
    """An exception that is never raised.

    Used in ``except`` clauses that should deliberately catch nothing.
    """


class AltList(list):
    """XMP AltList container (rdf:Alt language-alternative array)."""
+
+
# The XMP array containers we understand and their Python representations.
XMP_CONTAINERS = [
    XmpContainer('Alt', AltList, AltList.append),
    XmpContainer('Bag', set, set.add),
    XmpContainer('Seq', list, list.append),
]

# Properties the XMP spec defines as language alternatives (rdf:Alt).
LANG_ALTS = frozenset(
    [
        str(QName(XMP_NS_DC, 'title')),
        str(QName(XMP_NS_DC, 'description')),
        str(QName(XMP_NS_DC, 'rights')),
        str(QName(XMP_NS_XMP_RIGHTS, 'UsageTerms')),
    ]
)
+
# These are the illegal characters in XML 1.0. (XML 1.1 is a bit more permissive,
# but we'll be strict to ensure wider compatibility.)
re_xml_illegal_chars = re.compile(
    r"(?u)[^\x09\x0A\x0D\x20-\U0000D7FF\U0000E000-\U0000FFFD\U00010000-\U0010FFFF]"
)
# Byte-level variant, applied before the XML is decoded.
re_xml_illegal_bytes = re.compile(
    br"[^\x09\x0A\x0D\x20-\xFF]|&#0;"
    # br"&#(?:[0-9]|0[0-9]|1[0-9]|2[0-9]|3[0-1]|x[0-9A-Fa-f]|x0[0-9A-Fa-f]|x1[0-9A-Fa-f]);"
)


def _parser_basic(xml: bytes):
    """Parse the XMP as-is, with no error recovery."""
    return parse_xml(BytesIO(xml))


def _parser_strip_illegal_bytes(xml: bytes):
    """Parse the XMP after stripping bytes that are illegal in XML 1.0."""
    return parse_xml(BytesIO(re_xml_illegal_bytes.sub(b'', xml)))


def _parser_recovery(xml: bytes):
    """Parse the XMP in recovery mode, tolerating malformed XML."""
    return parse_xml(BytesIO(xml), recover=True)


def _parser_replace_with_empty_xmp(_xml: bytes = b''):
    """Discard the unparseable input and return an empty XMP document."""
    log.warning("Error occurred parsing XMP, replacing with empty XMP.")
    return _parser_basic(XMP_EMPTY)
+
+
def _clean(s: str | Iterable[str], joiner: str = '; ') -> str:
    """Ensure an object can safely be inserted in a XML tag body.

    If we still have a non-str object at this point, the best option is to
    join it, because it's apparently calling for a new node in a place that
    isn't allowed in the spec or not supported.
    """
    if not isinstance(s, str):
        if not isinstance(s, Iterable):
            raise TypeError("object must be a string or iterable of strings")
        warn(f"Merging elements of {s}")
        # Sets have no stable ordering, so sort them for deterministic output
        parts = sorted(s) if isinstance(s, Set) else s
        s = joiner.join(parts)
    return re_xml_illegal_chars.sub('', s)
+
+
def encode_pdf_date(d: datetime) -> str:
    """Encode Python datetime object as PDF date string.

    From Adobe pdfmark manual:
        (D:YYYYMMDDHHmmSSOHH'mm')
    D: is an optional prefix. YYYY is the year. All fields after the year are
    optional. MM is the month (01-12), DD is the day (01-31), HH is the
    hour (00-23), mm are the minutes (00-59), and SS are the seconds
    (00-59). O is + or - depending on the sign of the offset from GMT, and
    HH'/mm' are the absolute hour and minute offsets. If no GMT information
    is given, the relation between the specified time and GMT is considered
    unknown; either way, the rest of the string specifies local time.

    'D:' is required in PDF/A, so we always add it.
    """
    # The formatting of %Y is not consistent as described in
    # https://bugs.python.org/issue13305 and underspecification in libc,
    # so the year is explicitly formatted with leading zeros.
    pieces = [f"D:{d.year:04d}", d.strftime(r'%m%d%H%M%S')]
    utc_offset = d.strftime('%z')
    if utc_offset:
        sign, offset_hours, offset_mins = (
            utc_offset[0],
            utc_offset[1:3],
            utc_offset[3:5],
        )
        pieces.append(f"{sign}{offset_hours}'{offset_mins}'")
    return ''.join(pieces)
+
+
def decode_pdf_date(s: str) -> datetime:
    """Decode a pdfmark date to a Python datetime object.

    A pdfmark date is a string in a particular format; see the pdfmark
    Reference for the specification.
    """
    if isinstance(s, String):
        s = str(s)
    if s.startswith('D:'):
        s = s[2:]

    # A literal Z00'00' suffix is incorrect but found in the wild
    # (probably produced by OS X Quartz) -- normalize to a +0000 offset.
    for zulu_suffix in ("Z00'00'", 'Z'):
        if s.endswith(zulu_suffix):
            s = s.replace(zulu_suffix, '+0000')
            break
    s = s.replace("'", "")  # Remove apostrophes from PDF time strings
    try:
        return datetime.strptime(s, r'%Y%m%d%H%M%S%z')
    except ValueError:
        # No (parseable) timezone information; parse as a naive datetime
        return datetime.strptime(s, r'%Y%m%d%H%M%S')
+
+
class Converter(ABC):
    """XMP <-> DocumentInfo converter.

    Subclasses translate one metadata field between its XMP representation
    and its (deprecated) DocumentInfo representation.
    """

    @staticmethod
    @abstractmethod
    def xmp_from_docinfo(docinfo_val: str | None) -> Any:  # type: ignore
        """Derive XMP metadata from a DocumentInfo string."""

    @staticmethod
    @abstractmethod
    def docinfo_from_xmp(xmp_val: Any) -> str | None:
        """Derive a DocumentInfo value from equivalent XMP metadata."""
+
+
class AuthorConverter(Converter):
    """Convert XMP document authors to and from DocumentInfo."""

    @staticmethod
    def xmp_from_docinfo(docinfo_val: str | None) -> Any:  # type: ignore
        """Wrap the single DocumentInfo author string in an XMP list."""
        return [docinfo_val]

    @staticmethod
    def docinfo_from_xmp(xmp_val):
        """Derive DocumentInfo authors from XMP.

        XMP supports multiple author values, while DocumentInfo has a string,
        so we return the values separated by semi-colons.
        """
        if isinstance(xmp_val, str):
            return xmp_val
        if xmp_val in (None, [None]):
            return None
        return '; '.join(xmp_val)
+
+
class DateConverter(Converter):
    """Convert XMP dates (ISO 8601) to and from DocumentInfo (PDF format)."""

    @staticmethod
    def xmp_from_docinfo(docinfo_val):
        """Derive an ISO 8601 XMP date from a PDF-format DocumentInfo date."""
        if docinfo_val == '':
            return ''
        return decode_pdf_date(docinfo_val).isoformat()

    @staticmethod
    def docinfo_from_xmp(xmp_val):
        """Derive a PDF-format DocumentInfo date from an ISO 8601 XMP date."""
        # datetime.fromisoformat does not accept a trailing 'Z' on older
        # Pythons, so rewrite it as an explicit UTC offset first
        if xmp_val.endswith('Z'):
            xmp_val = xmp_val[:-1] + '+00:00'
        try:
            dateobj = datetime.fromisoformat(xmp_val)
        except IndexError:
            # PyPy 3.7 may raise IndexError - convert to ValueError
            raise ValueError(f"Invalid isoformat string: '{xmp_val}'") from None
        return encode_pdf_date(dateobj)
+
+
class DocinfoMapping(NamedTuple):
    """Map DocumentInfo keys to their XMP equivalents, along with converter."""

    # XML namespace URI of the XMP property.
    ns: str
    # Local name of the XMP property within that namespace.
    key: str
    # Corresponding DocumentInfo dictionary key, e.g. Name.Author.
    name: Name
    # Converter class for translating values, or None to copy verbatim.
    converter: type[Converter] | None
+
+
+def ensure_loaded(fn):
+ """Ensure the XMP has been loaded and parsed.
+
+ TODO: Can this be removed? Why allow the uninit'ed state to even exist?
+ """
+
+ @wraps(fn)
+ def wrapper(self, *args, **kwargs):
+ if not self._xmp:
+ self._load()
+ return fn(self, *args, **kwargs)
+
+ return wrapper
+
+
class PdfMetadata(MutableMapping):
    """Read and edit the metadata associated with a PDF.

    The PDF specification contain two types of metadata, the newer XMP
    (Extensible Metadata Platform, XML-based) and older DocumentInformation
    dictionary. The PDF 2.0 specification removes the DocumentInformation
    dictionary.

    This primarily works with XMP metadata, but includes methods to generate
    XMP from DocumentInformation and will also coordinate updates to
    DocumentInformation so that the two are kept consistent.

    XMP metadata fields may be accessed using the full XML namespace URI or
    the short name. For example ``metadata['dc:description']``
    and ``metadata['{http://purl.org/dc/elements/1.1/}description']``
    both refer to the same field. Several common XML namespaces are registered
    automatically.

    See the XMP specification for details of allowable fields.

    To update metadata, use a with block.

    Example:

        >>> with pdf.open_metadata() as records:
        ...     records['dc:title'] = 'New Title'

    See Also:
        :meth:`pikepdf.Pdf.open_metadata`
    """

    # How DocumentInfo keys map to XMP properties, with the converter
    # (if any) used to translate values between the two forms.
    DOCINFO_MAPPING: list[DocinfoMapping] = [
        DocinfoMapping(XMP_NS_DC, 'creator', Name.Author, AuthorConverter),
        DocinfoMapping(XMP_NS_DC, 'description', Name.Subject, None),
        DocinfoMapping(XMP_NS_DC, 'title', Name.Title, None),
        DocinfoMapping(XMP_NS_PDF, 'Keywords', Name.Keywords, None),
        DocinfoMapping(XMP_NS_PDF, 'Producer', Name.Producer, None),
        DocinfoMapping(XMP_NS_XMP, 'CreateDate', Name.CreationDate, DateConverter),
        DocinfoMapping(XMP_NS_XMP, 'CreatorTool', Name.Creator, None),
        DocinfoMapping(XMP_NS_XMP, 'ModifyDate', Name.ModDate, DateConverter),
    ]

    # prefix -> URI and URI -> prefix lookup tables for registered namespaces
    NS: dict[str, str] = {prefix: uri for uri, prefix in DEFAULT_NAMESPACES}
    REVERSE_NS: dict[str, str] = dict(DEFAULT_NAMESPACES)

    # Parsers tried in order, each more forgiving than the last.
    _PARSERS_OVERWRITE_INVALID_XML: Iterable[Callable[[bytes], Any]] = [
        _parser_basic,
        _parser_strip_illegal_bytes,
        _parser_recovery,
        _parser_replace_with_empty_xmp,
    ]
    _PARSERS_STANDARD: Iterable[Callable[[bytes], Any]] = [_parser_basic]
+
    def __init__(
        self,
        pdf: Pdf,
        pikepdf_mark: bool = True,
        sync_docinfo: bool = True,
        overwrite_invalid_xml: bool = True,
    ):
        """Construct a PdfMetadata wrapper for *pdf*.

        Arguments:
            pdf: the Pdf whose metadata is managed.
            pikepdf_mark: if True, update xmp:MetadataDate and pdf:Producer
                when changes are applied.
            sync_docinfo: if True, propagate XMP changes to the
                DocumentInfo dictionary on save.
            overwrite_invalid_xml: if True, replace unparseable XMP with
                an empty XMP document instead of raising.
        """
        self._pdf = pdf
        self._xmp = None  # parsed XMP tree; loaded lazily via _load()
        self.mark = pikepdf_mark
        self.sync_docinfo = sync_docinfo
        self._updating = False  # True only inside a `with` editing block
        self.overwrite_invalid_xml = overwrite_invalid_xml
+
    def load_from_docinfo(
        self, docinfo, delete_missing: bool = False, raise_failure: bool = False
    ) -> None:
        """Populate the XMP metadata object with DocumentInfo.

        Arguments:
            docinfo: a DocumentInfo, e.g pdf.docinfo
            delete_missing: if the entry is not DocumentInfo, delete the equivalent
                from XMP
            raise_failure: if True, raise any failure to convert docinfo;
                otherwise warn and continue

        A few entries in the deprecated DocumentInfo dictionary are considered
        approximately equivalent to certain XMP records. This method copies
        those entries into the XMP metadata.
        """

        def warn_or_raise(msg, e=None):
            if raise_failure:
                raise ValueError(msg) from e
            warn(msg)

        for uri, shortkey, docinfo_name, converter in self.DOCINFO_MAPPING:
            qname = QName(uri, shortkey)
            # docinfo might be a dict or pikepdf.Dictionary, so lookup keys
            # by str(Name)
            val = docinfo.get(str(docinfo_name))
            if val is None:
                if delete_missing and qname in self:
                    del self[qname]
                continue
            try:
                val = str(val)
                if converter:
                    val = converter.xmp_from_docinfo(val)
                if not val:
                    continue
                # applying_mark=True suppresses the "will be overwritten"
                # warning, since this write is internally initiated
                self._setitem(qname, val, True)
            except (ValueError, AttributeError, NotImplementedError) as e:
                warn_or_raise(
                    f"The metadata field {docinfo_name} could not be copied to XMP", e
                )
        valid_docinfo_names = {
            str(docinfo_name) for _, _, docinfo_name, _ in self.DOCINFO_MAPPING
        }
        extra_docinfo_names = {str(k) for k in docinfo.keys()} - valid_docinfo_names
        for extra in extra_docinfo_names:
            warn_or_raise(
                f"The metadata field {extra} with value '{repr(docinfo.get(extra))}' "
                "has no XMP equivalent, so it was discarded",
            )
+
    def _load(self) -> None:
        """Load and parse XMP from the PDF's /Root /Metadata stream."""
        try:
            data = self._pdf.Root.Metadata.read_bytes()
        except AttributeError:
            # No /Metadata entry in the document catalog
            data = b''
        self._load_from(data)
+
    def _load_from(self, data: bytes) -> None:
        """Parse raw XMP bytes with progressively more forgiving parsers.

        When ``overwrite_invalid_xml`` is False, only the strict parser is
        used and parse errors propagate; the ``except NeverRaise`` clauses
        below then deliberately catch nothing.
        """
        if data.strip() == b'':
            data = XMP_EMPTY  # on some platforms lxml chokes on empty documents

        parsers = (
            self._PARSERS_OVERWRITE_INVALID_XML
            if self.overwrite_invalid_xml
            else self._PARSERS_STANDARD
        )

        for parser in parsers:
            try:
                self._xmp = parser(data)
            except (
                XMLSyntaxError
                if self.overwrite_invalid_xml
                else NeverRaise  # type: ignore
            ) as e:
                # Errors indicating no XML content at all: skip the rest of
                # the parser chain and go straight to an empty XMP document
                if str(e).startswith("Start tag expected, '<' not found") or str(
                    e
                ).startswith("Document is empty"):
                    self._xmp = _parser_replace_with_empty_xmp()
                    break
            else:
                break

        if self._xmp is not None:
            try:
                # Remove processing instructions (e.g. xpacket wrappers)
                pis = self._xmp.xpath('/processing-instruction()')
                for pi in pis:
                    etree.strip_tags(self._xmp, pi.tag)
                self._get_rdf_root()
            except (
                Exception  # pylint: disable=broad-except
                if self.overwrite_invalid_xml
                else NeverRaise
            ) as e:
                log.warning("Error occurred parsing XMP", exc_info=e)
                self._xmp = _parser_replace_with_empty_xmp()
        else:
            log.warning("Error occurred parsing XMP")
            self._xmp = _parser_replace_with_empty_xmp()
+
    @ensure_loaded
    def __enter__(self):
        """Open the metadata for editing; changes apply on clean exit."""
        self._updating = True
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Apply accumulated changes, unless the block raised an exception."""
        try:
            if exc_type is not None:
                return  # discard changes on error
            self._apply_changes()
        finally:
            self._updating = False
+
    def _update_docinfo(self):
        """Update the PDF's DocumentInfo dictionary to match XMP metadata.

        The standard mapping is described here:
        https://www.pdfa.org/pdfa-metadata-xmp-rdf-dublin-core/
        """
        # Touch object to ensure it exists
        self._pdf.docinfo  # pylint: disable=pointless-statement
        for uri, element, docinfo_name, converter in self.DOCINFO_MAPPING:
            qname = QName(uri, element)
            try:
                value = self[qname]
            except KeyError:
                # Not present in XMP: remove the stale DocumentInfo entry
                if docinfo_name in self._pdf.docinfo:
                    del self._pdf.docinfo[docinfo_name]
                continue
            if converter:
                try:
                    value = converter.docinfo_from_xmp(value)
                except ValueError:
                    warn(
                        f"The DocumentInfo field {docinfo_name} could not be "
                        "updated from XMP"
                    )
                    value = None
                except Exception as e:
                    raise ValueError(
                        "An error occurred while updating DocumentInfo field "
                        f"{docinfo_name} from XMP {qname} with value {value}"
                    ) from e
            if value is None:
                if docinfo_name in self._pdf.docinfo:
                    del self._pdf.docinfo[docinfo_name]
                continue
            value = _clean(value)
            try:
                # Try to save pure ASCII
                self._pdf.docinfo[docinfo_name] = value.encode('ascii')
            except UnicodeEncodeError:
                # qpdf will serialize this as a UTF-16 with BOM string
                self._pdf.docinfo[docinfo_name] = value
+
+ def _get_xml_bytes(self, xpacket=True):
+ data = BytesIO()
+ if xpacket:
+ data.write(XPACKET_BEGIN)
+ self._xmp.write(data, encoding='utf-8', pretty_print=True)
+ if xpacket:
+ data.write(XPACKET_END)
+ data.seek(0)
+ xml_bytes = data.read()
+ return xml_bytes
+
    def _apply_changes(self):
        """Serialize our changes back to the PDF in memory.

        Depending how we are initialized, leave our metadata mark and producer.
        """
        if self.mark:
            # We were asked to mark the file as being edited by pikepdf
            self._setitem(
                QName(XMP_NS_XMP, 'MetadataDate'),
                datetime.now(datetime.utcnow().astimezone().tzinfo).isoformat(),
                applying_mark=True,
            )
            self._setitem(
                QName(XMP_NS_PDF, 'Producer'),
                'pikepdf ' + pikepdf_version,
                applying_mark=True,
            )
        xml = self._get_xml_bytes()
        # Replace the /Metadata stream wholesale and retag it
        self._pdf.Root.Metadata = Stream(self._pdf, xml)
        self._pdf.Root.Metadata[Name.Type] = Name.Metadata
        self._pdf.Root.Metadata[Name.Subtype] = Name.XML
        if self.sync_docinfo:
            self._update_docinfo()
+
    @classmethod
    def _qname(cls, name: QName | str) -> str:
        """Convert name to an XML QName.

        e.g. pdf:Producer -> {http://ns.adobe.com/pdf/1.3/}Producer

        Raises:
            TypeError: if name is neither a str nor a QName.
            KeyError: if the prefix is not a registered namespace.
        """
        if isinstance(name, QName):
            return str(name)
        if not isinstance(name, str):
            raise TypeError(f"{name} must be str")
        if name == '':
            return name
        if name.startswith('{'):
            return name  # already fully qualified
        try:
            prefix, tag = name.split(':', maxsplit=1)
        except ValueError:
            # If missing the namespace, put it in the top level namespace
            # To do this completely correct we actually need to figure out
            # the namespace based on context defined by parent tags. That
            # https://www.w3.org/2001/tag/doc/qnameids.html
            prefix, tag = 'x', name
        uri = cls.NS[prefix]
        return str(QName(uri, tag))
+
+ def _prefix_from_uri(self, uriname):
+ """Given a fully qualified XML name, find a prefix.
+
+ e.g. {http://ns.adobe.com/pdf/1.3/}Producer -> pdf:Producer
+ """
+ uripart, tag = uriname.split('}', maxsplit=1)
+ uri = uripart.replace('{', '')
+ return self.REVERSE_NS[uri] + ':' + tag
+
    def _get_subelements(self, node):
        """Gather the sub-elements attached to a node.

        Gather rdf:Bag and and rdf:Seq into set and list respectively. For
        alternate languages values, take the first language only for
        simplicity.
        """
        # rdf:Alt is checked first: return only the first language variant
        items = node.find('rdf:Alt', self.NS)
        if items is not None:
            try:
                return items[0].text
            except IndexError:
                return ''

        for xmlcontainer, container, insertfn in XMP_CONTAINERS:
            items = node.find(f'rdf:{xmlcontainer}', self.NS)
            if items is None:
                continue
            result = container()
            for item in items:
                insertfn(result, item.text)
            return result
        # No recognized container found under this node
        return ''
+
    def _get_rdf_root(self):
        """Return the rdf:RDF element of the XMP.

        Raises:
            ValueError: if the document is XML but does not contain rdf:RDF.
        """
        rdf = self._xmp.find('.//rdf:RDF', self.NS)
        if rdf is None:
            # Perhaps rdf:RDF is the document root itself
            rdf = self._xmp.getroot()
            if not rdf.tag == '{http://www.w3.org/1999/02/22-rdf-syntax-ns#}RDF':
                raise ValueError("Metadata seems to be XML but not XMP")
        return rdf
+
    def _get_elements(self, name: str | QName = ''):
        """Get elements from XMP.

        Core routine to find elements matching name within the XMP and yield
        them.

        For XMP spec 7.9.2.2, rdf:Description with property attributes,
        we yield the node which will have the desired as one of its attributes.
        qname is returned so that the node.attrib can be used to locate the
        source.

        For XMP spec 7.5, simple valued XMP properties, we yield the node,
        None, and the value. For structure or array valued properties we gather
        the elements. We ignore qualifiers.

        Args:
            name: a prefixed name or QName to look for within the
                data section of the XMP; looks for all data keys if omitted

        Yields:
            tuple: (node, qname_attrib, value, parent_node)

        """
        qname = self._qname(name)
        rdf = self._get_rdf_root()
        for rdfdesc in rdf.findall('rdf:Description[@rdf:about=""]', self.NS):
            # Case 1: the property is stored as an attribute of rdf:Description
            if qname and qname in rdfdesc.keys():
                yield (rdfdesc, qname, rdfdesc.get(qname), rdf)
            elif not qname:
                for k, v in rdfdesc.items():
                    if v:
                        yield (rdfdesc, k, v, rdf)
            # Case 2: the property is a child element of rdf:Description
            xpath = qname if name else '*'
            for node in rdfdesc.findall(xpath, self.NS):
                if node.text and node.text.strip():
                    # Simple valued property
                    yield (node, None, node.text, rdfdesc)
                    continue
                # Structured/array valued property: collect its children
                values = self._get_subelements(node)
                yield (node, None, values, rdfdesc)
+
+ def _get_element_values(self, name=''):
+ yield from (v[2] for v in self._get_elements(name))
+
    @ensure_loaded
    def __contains__(self, key: str | QName):
        """Return True if the XMP has any truthy value for *key*."""
        return any(self._get_element_values(key))

    @ensure_loaded
    def __getitem__(self, key: str | QName):
        """Return the first value for *key*, or raise KeyError."""
        try:
            return next(self._get_element_values(key))
        except StopIteration:
            raise KeyError(key) from None

    @ensure_loaded
    def __iter__(self):
        """Iterate the qualified names of all XMP properties."""
        for node, attrib, _val, _parents in self._get_elements():
            if attrib:
                yield attrib
            else:
                yield node.tag

    @ensure_loaded
    def __len__(self):
        """Return the number of XMP properties."""
        return len(list(iter(self)))
+
    def _setitem(
        self,
        key: str | QName,
        val: set[str] | list[str] | str,
        applying_mark: bool = False,
    ):
        """Set the XMP property *key* to *val*.

        Arguments:
            key: short or fully qualified property name.
            val: string for simple properties, set/list for array properties.
            applying_mark: True when pikepdf itself writes its editor mark,
                suppressing the overwrite warning.

        Raises:
            RuntimeError: if not inside a ``with`` editing block.
        """
        if not self._updating:
            raise RuntimeError("Metadata not opened for editing, use with block")

        qkey = self._qname(key)
        self._setitem_check_args(key, val, applying_mark, qkey)

        try:
            # Update existing node
            self._setitem_update(key, val, qkey)
        except StopIteration:
            # Insert a new node
            self._setitem_insert(key, val)
+
+ def _setitem_check_args(self, key, val, applying_mark: bool, qkey: str) -> None:
+ if (
+ self.mark
+ and not applying_mark
+ and qkey
+ in (
+ self._qname('xmp:MetadataDate'),
+ self._qname('pdf:Producer'),
+ )
+ ):
+ # Complain if user writes self[pdf:Producer] = ... and because it will
+ # be overwritten on save, unless self._updating_mark, in which case
+ # the action was initiated internally
+ log.warning(
+ f"Update to {key} will be overwritten because metadata was opened "
+ "with set_pikepdf_as_editor=True"
+ )
+ if isinstance(val, str) and qkey in (self._qname('dc:creator')):
+ log.error(f"{key} should be set to a list of strings")
+
    def _setitem_add_array(self, node, items: Iterable) -> None:
        """Attach *items* under *node* as the matching rdf container.

        The container element (rdf:Alt/Bag/Seq) is chosen by the Python
        type of *items* (AltList/set/list respectively).
        """
        rdf_type = next(
            c.rdf_type for c in XMP_CONTAINERS if isinstance(items, c.py_type)
        )
        seq = etree.SubElement(node, str(QName(XMP_NS_RDF, rdf_type)))
        tag_attrib: dict[str, str] | None = None
        if rdf_type == 'Alt':
            # Language alternatives require an xml:lang on each item
            tag_attrib = {str(QName(XMP_NS_XML, 'lang')): 'x-default'}
        for item in items:
            el = etree.SubElement(seq, str(QName(XMP_NS_RDF, 'li')), attrib=tag_attrib)
            el.text = _clean(item)
+
    def _setitem_update(self, key, val, qkey):
        """Replace the value of an existing XMP node.

        Raises:
            StopIteration: if no node for *key* exists (caller inserts then).
            TypeError: if *val* has an unsupported type.
        """
        # Locate existing node to replace
        node, attrib, _oldval, _parent = next(self._get_elements(key))
        if attrib:
            # Property stored as an attribute of rdf:Description
            if not isinstance(val, str):
                if qkey == self._qname('dc:creator'):
                    # dc:creator incorrectly created as an attribute - we're
                    # replacing it anyway, so remove the old one
                    del node.attrib[qkey]
                    self._setitem_add_array(node, _clean(val))
                else:
                    raise TypeError(f"Setting {key} to {val} with type {type(val)}")
            else:
                node.set(attrib, _clean(val))
        elif isinstance(val, (list, set)):
            # Array valued property: clear children and rebuild the container
            for child in node.findall('*'):
                node.remove(child)
            self._setitem_add_array(node, val)
        elif isinstance(val, str):
            for child in node.findall('*'):
                node.remove(child)
            if str(self._qname(key)) in LANG_ALTS:
                # Language-alternative properties must be stored as rdf:Alt
                self._setitem_add_array(node, AltList([_clean(val)]))
            else:
                node.text = _clean(val)
        else:
            raise TypeError(f"Setting {key} to {val} with type {type(val)}")
+
    def _setitem_insert(self, key, val):
        """Insert a new XMP node for *key* under a fresh rdf:Description.

        Raises:
            TypeError: if *val* has an unsupported type.
        """
        rdf = self._get_rdf_root()
        if str(self._qname(key)) in LANG_ALTS:
            # Language-alternative properties must be stored as rdf:Alt
            val = AltList([_clean(val)])
        if isinstance(val, (list, set)):
            rdfdesc = etree.SubElement(
                rdf,
                str(QName(XMP_NS_RDF, 'Description')),
                attrib={str(QName(XMP_NS_RDF, 'about')): ''},
            )
            node = etree.SubElement(rdfdesc, self._qname(key))
            self._setitem_add_array(node, val)
        elif isinstance(val, str):
            # Simple values are stored as attributes of rdf:Description
            _rdfdesc = etree.SubElement(
                rdf,
                str(QName(XMP_NS_RDF, 'Description')),
                attrib={
                    QName(XMP_NS_RDF, 'about'): '',
                    self._qname(key): _clean(val),
                },
            )
        else:
            raise TypeError(f"Setting {key} to {val} with type {type(val)}") from None
+
    @ensure_loaded
    def __setitem__(self, key: str | QName, val: set[str] | list[str] | str):
        """Set XMP property *key*; requires an open editing block."""
        return self._setitem(key, val, False)

    @ensure_loaded
    def __delitem__(self, key: str | QName):
        """Delete XMP property *key*; requires an open editing block.

        Raises:
            KeyError: if *key* is not present.
            RuntimeError: if not inside a ``with`` editing block.
        """
        if not self._updating:
            raise RuntimeError("Metadata not opened for editing, use with block")
        try:
            node, attrib, _oldval, parent = next(self._get_elements(key))
            if attrib:  # Inline
                del node.attrib[attrib]
                if (
                    len(node.attrib) == 1
                    and len(node) == 0
                    and QName(XMP_NS_RDF, 'about') in node.attrib
                ):
                    # The only thing left on this node is rdf:about="", so remove it
                    parent.remove(node)
            else:
                parent.remove(node)
        except StopIteration:
            raise KeyError(key) from None
+
    @property
    def pdfa_status(self) -> str:
        """Return the PDF/A conformance level claimed by this PDF, or ''.

        A PDF may claim to PDF/A compliant without this being true. Use an
        independent verifier such as veraPDF to test if a PDF is truly
        conformant.

        Returns:
            The conformance level of the PDF/A, or an empty string if the
            PDF does not claim PDF/A conformance. Possible valid values
            are: 1A, 1B, 2A, 2B, 2U, 3A, 3B, 3U.
        """
        # do same as @ensure_loaded - mypy can't handle decorated property
        if not self._xmp:
            self._load()

        key_part = QName(XMP_NS_PDFA_ID, 'part')
        key_conformance = QName(XMP_NS_PDFA_ID, 'conformance')
        try:
            return self[key_part] + self[key_conformance]
        except KeyError:
            return ''
+
    @property
    def pdfx_status(self) -> str:
        """Return the PDF/X conformance level claimed by this PDF, or ''.

        A PDF may claim to PDF/X compliant without this being true. Use an
        independent verifier such as veraPDF to test if a PDF is truly
        conformant.

        Returns:
            The conformance level of the PDF/X, or an empty string if the
            PDF does not claim PDF/X conformance.
        """
        # do same as @ensure_loaded - mypy can't handle decorated property
        if not self._xmp:
            self._load()

        pdfx_version = QName(XMP_NS_PDFX_ID, 'GTS_PDFXVersion')
        try:
            return self[pdfx_version]
        except KeyError:
            return ''
+
+ @ensure_loaded
+ def __str__(self):
+ return self._get_xml_bytes(xpacket=False).decode('utf-8')
diff --git a/env/lib/python3.10/site-packages/pikepdf/models/outlines.py b/env/lib/python3.10/site-packages/pikepdf/models/outlines.py
new file mode 100644
index 0000000..1143de6
--- /dev/null
+++ b/env/lib/python3.10/site-packages/pikepdf/models/outlines.py
@@ -0,0 +1,421 @@
+# SPDX-FileCopyrightText: 2022 James R. Barlow, 2020 Matthias Erll
+
+# SPDX-License-Identifier: MPL-2.0
+
+"""Support for document outlines (e.g. table of contents)."""
+
+from __future__ import annotations
+
+from enum import Enum
+from itertools import chain
+from typing import Iterable, List, cast
+
+from pikepdf import Array, Dictionary, Name, Object, Page, Pdf, String
+
+
# Page view location definitions, from the PDF spec. Built with the Enum
# functional API; member values are auto-assigned 1 through 8 in order.
PageLocation = Enum(
    'PageLocation',
    ['XYZ', 'Fit', 'FitH', 'FitV', 'FitR', 'FitB', 'FitBH', 'FitBV'],
)
PageLocation.__doc__ = """Page view location definitions, from PDF spec."""
+
+
# Positional viewport arguments that accompany each page fit style in an
# explicit destination array. Styles absent from this mapping (Fit, FitB)
# take no arguments.
PAGE_LOCATION_ARGS = {
    PageLocation.XYZ: ('left', 'top', 'zoom'),
    PageLocation.FitH: ('top',),
    PageLocation.FitV: ('left',),
    PageLocation.FitR: ('left', 'bottom', 'right', 'top'),
    PageLocation.FitBH: ('top',),
    PageLocation.FitBV: ('left',),
}
# Every keyword name accepted by any page location style, flattened.
ALL_PAGE_LOCATION_KWARGS = set(chain.from_iterable(PAGE_LOCATION_ARGS.values()))
+
+
def make_page_destination(
    pdf: Pdf,
    page_num: int,
    page_location: PageLocation | str | None = None,
    *,
    left: float | None = None,
    top: float | None = None,
    right: float | None = None,
    bottom: float | None = None,
    zoom: float | None = None,
) -> Array:
    """
    Create a destination ``Array`` with reference to a Pdf document's page number.

    Arguments:
        pdf: PDF document object.
        page_num: Page number (zero-based).
        page_location: Optional page location, as a string or :enum:`PageLocation`.
        left: Specify page viewport rectangle.
        top: Specify page viewport rectangle.
        right: Specify page viewport rectangle.
        bottom: Specify page viewport rectangle.
        zoom: Specify page viewport rectangle's zoom level.

    left, top, right, bottom, zoom are used in conjunction with the page fit style
    specified by *page_location*.
    """
    # Gather viewport keywords once and forward them; the private helper
    # discards any that were left as None.
    viewport = dict(left=left, top=top, right=right, bottom=bottom, zoom=zoom)
    return _make_page_destination(
        pdf, page_num, page_location=page_location, **viewport
    )
+
+
def _make_page_destination(
    pdf: Pdf,
    page_num: int,
    page_location: PageLocation | str | None = None,
    **kwargs,
) -> Array:
    """Build the destination array; ``None``-valued keyword args are ignored."""
    supplied = {name: value for name, value in kwargs.items() if value is not None}

    dest: list[Dictionary | Name] = [pdf.pages[page_num].obj]
    if not page_location:
        # No fit style requested - default to /Fit.
        dest.append(Name.Fit)
        return Array(dest)

    if isinstance(page_location, PageLocation):
        loc_key = page_location
        loc_str = loc_key.name
    else:
        loc_str = page_location
        try:
            loc_key = PageLocation[loc_str]
        except KeyError:
            raise ValueError(
                f"Invalid or unsupported page location type {loc_str}"
            ) from None
    dest.append(Name(f'/{loc_str}'))
    arg_names = PAGE_LOCATION_ARGS.get(loc_key)
    if arg_names:
        # Any viewport value the caller omitted defaults to 0.
        dest.extend(supplied.get(name, 0) for name in arg_names)
    return Array(dest)
+
+
class OutlineStructureError(Exception):
    """Indicates an error in the outline data structure.

    Raised when an outline node has an unexpected object type, and (in
    strict mode) when the same object reference reoccurs while the outline
    is being read or written.
    """
+
+
class OutlineItem:
    """Manage a single item in a PDF document outlines structure.

    Includes nested items.

    Arguments:
        title: Title of the outlines item.
        destination: Page number, destination name, or any other PDF object
            to be used as a reference when clicking on the outlines entry. Note
            this should be ``None`` if an action is used instead. If set to a
            page number, it will be resolved to a reference at the time of
            writing the outlines back to the document.
        page_location: Supplemental page location for a page number
            in ``destination``, e.g. ``PageLocation.Fit``. May also be
            a simple string such as ``'FitH'``.
        action: Action to perform when clicking on this item. Will be ignored
            during writing if ``destination`` is also set.
        obj: ``Dictionary`` object representing this outlines item in a ``Pdf``.
            May be ``None`` for creating a new object. If present, an existing
            object is modified in-place during writing and original attributes
            are retained.
        left, top, bottom, right, zoom: Describes the viewport position associated
            with a destination.

    Raises:
        ValueError: If both ``destination`` and ``action`` are set.

    This object does not contain any information about higher-level or
    neighboring elements.

    Valid destination arrays:
        [page /XYZ left top zoom]
    generally
        [page, PageLocationEntry, 0 to 4 ints]
    """

    def __init__(
        self,
        title: str,
        destination: Array | String | Name | int | None = None,
        page_location: PageLocation | str | None = None,
        action: Dictionary | None = None,
        obj: Dictionary | None = None,
        *,
        left: float | None = None,
        top: float | None = None,
        right: float | None = None,
        bottom: float | None = None,
        zoom: float | None = None,
    ):
        self.title = title
        self.destination = destination
        self.page_location = page_location
        self.action = action
        if self.destination is not None and self.action is not None:
            raise ValueError("Only one of destination and action may be set")
        self.obj = obj
        # Keep only the viewport values the caller actually supplied.
        # (Fix: the original also assigned page_location_kwargs = {} earlier,
        # a dead assignment immediately overwritten here.)
        kwargs = dict(left=left, top=top, right=right, bottom=bottom, zoom=zoom)
        self.page_location_kwargs = {k: v for k, v in kwargs.items() if v is not None}
        self.is_closed = False
        self.children: list[OutlineItem] = []

    def __str__(self):
        # Open/closed indicator: only meaningful when there are children.
        if self.children:
            oc_indicator = '[+]' if self.is_closed else '[-]'
        else:
            oc_indicator = '[ ]'
        if self.destination is not None:
            if isinstance(self.destination, Array):
                # 12.3.2.2 Explicit destination
                # [raw_page, /PageLocation.SomeThing, integer parameters for viewport]
                raw_page = self.destination[0]
                page = Page(raw_page)
                dest = page.label
            elif isinstance(self.destination, String):
                # 12.3.2.2 Named destination, byte string reference to Names
                dest = f'<Named Destination in document .Root.Names dictionary: {self.destination}>'
            elif isinstance(self.destination, Name):
                # 12.3.2.2 Named destination, name object (PDF 1.1)
                dest = f'<Named Destination in document .Root.Dests dictionary: {self.destination}>'
            elif isinstance(self.destination, int):
                # Page number
                dest = f'<Page {self.destination}>'
            else:
                # Bug fix: previously `dest` was left unbound here, raising
                # NameError for unexpected destination types.
                dest = f'<Unknown destination: {self.destination!r}>'
        else:
            dest = '<Action>'
        return f'{oc_indicator} {self.title} -> {dest}'

    def __repr__(self):
        return f'<pikepdf.{self.__class__.__name__}: "{self.title}">'

    @classmethod
    def from_dictionary_object(cls, obj: Dictionary):
        """Create an ``OutlineItem`` from a ``Dictionary``.

        Does not process nested items.

        Arguments:
            obj: ``Dictionary`` object representing a single outline node.

        Raises:
            OutlineStructureError: If /Dest or /A has an unexpected type.
        """
        title = str(obj.Title)
        destination = obj.get(Name.Dest)
        if destination is not None and not isinstance(
            destination, (Array, String, Name)
        ):
            # 12.3.3: /Dest may be a name, byte string or array
            raise OutlineStructureError(
                f"Unexpected object type in Outline's /Dest: {destination!r}"
            )
        action = obj.get(Name.A)
        if action is not None and not isinstance(action, Dictionary):
            raise OutlineStructureError(
                f"Unexpected object type in Outline's /A: {action!r}"
            )
        return cls(title, destination=destination, action=action, obj=obj)

    def to_dictionary_object(self, pdf: Pdf, create_new: bool = False) -> Dictionary:
        """Create or update a ``Dictionary`` object from this outline node.

        Page numbers are resolved to a page reference on the input
        ``Pdf`` object.

        Arguments:
            pdf: PDF document object.
            create_new: If set to ``True``, creates a new object instead of
                modifying an existing one in-place.
        """
        if create_new or self.obj is None:
            self.obj = obj = pdf.make_indirect(Dictionary())
        else:
            obj = self.obj
        obj.Title = self.title
        if self.destination is not None:
            if isinstance(self.destination, int):
                # Resolve a plain page number to an explicit destination array.
                self.destination = make_page_destination(
                    pdf,
                    self.destination,
                    self.page_location,
                    **self.page_location_kwargs,
                )
            obj.Dest = self.destination
            # /Dest and /A are mutually exclusive; destination wins.
            if Name.A in obj:
                del obj.A
        elif self.action is not None:
            obj.A = self.action
            if Name.Dest in obj:
                del obj.Dest
        return obj
+
+
class Outline:
    """Maintain an intuitive interface for creating and editing PDF document outlines.

    See |pdfrm| section 12.3.

    Arguments:
        pdf: PDF document object.
        max_depth: Maximum recursion depth to consider when reading the outline.
        strict: If set to ``False`` (default) silently ignores structural errors.
            Setting it to ``True`` raises a
            :class:`pikepdf.OutlineStructureError`
            if any object references re-occur while the outline is being read or
            written.

    See Also:
        :meth:`pikepdf.Pdf.open_outline`
    """

    def __init__(self, pdf: Pdf, max_depth: int = 15, strict: bool = False):
        # Lazily populated list of top-level OutlineItems; None until loaded.
        self._root: list[OutlineItem] | None = None
        self._pdf = pdf
        self._max_depth = max_depth
        self._strict = strict
        # True while inside the context manager (__enter__/__exit__).
        self._updating = False

    def __str__(self):
        return str(self.root)

    def __repr__(self):
        # Note: accessing self.root triggers a lazy load of the outline.
        return f'<pikepdf.{self.__class__.__name__}: {len(self.root)} items>'

    def __enter__(self):
        self._updating = True
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        try:
            # Only write the outline back if the block exited cleanly;
            # returning None never suppresses the exception.
            if exc_type is not None:
                return
            self._save()
        finally:
            self._updating = False

    def _save_level_outline(
        self,
        parent: Dictionary,
        outline_items: Iterable[OutlineItem],
        level: int,
        visited_objs: set[tuple[int, int]],
    ):
        """Write one sibling level of the outline tree under *parent*.

        Maintains the doubly-linked sibling list (/Prev, /Next), the /First
        and /Last pointers on *parent*, and the /Count totals, recursing into
        children. *visited_objs* guards against the same object reoccurring.
        """
        count = 0
        prev: Dictionary | None = None
        first: Dictionary | None = None
        for item in outline_items:
            out_obj = item.to_dictionary_object(self._pdf)
            objgen = out_obj.objgen
            if objgen in visited_objs:
                if self._strict:
                    raise OutlineStructureError(
                        f"Outline object {objgen} reoccurred in structure"
                    )
                # Non-strict mode: emit a fresh copy instead of failing.
                out_obj = item.to_dictionary_object(self._pdf, create_new=True)
            else:
                visited_objs.add(objgen)

            out_obj.Parent = parent
            count += 1
            if prev is not None:
                # Wire the sibling links in both directions.
                prev.Next = out_obj
                out_obj.Prev = prev
            else:
                first = out_obj
                # The first sibling must not carry a stale /Prev.
                if Name.Prev in out_obj:
                    del out_obj.Prev
            prev = out_obj
            if level < self._max_depth:
                sub_items: Iterable[OutlineItem] = item.children
            else:
                # Depth limit reached: deeper levels are not written.
                sub_items = ()
            self._save_level_outline(out_obj, sub_items, level + 1, visited_objs)
            if item.is_closed:
                # PDF spec: a negative /Count marks a closed (collapsed) item.
                out_obj.Count = -cast(int, out_obj.Count)
            else:
                # Open items contribute their visible descendants to the total.
                count += cast(int, out_obj.Count)
        if count:
            assert prev is not None and first is not None
            # The last sibling must not carry a stale /Next.
            if Name.Next in prev:
                del prev.Next
            parent.First = first
            parent.Last = prev
        else:
            # No children at this level: drop any leftover pointers.
            if Name.First in parent:
                del parent.First
            if Name.Last in parent:
                del parent.Last
        parent.Count = count

    def _load_level_outline(
        self,
        first_obj: Dictionary,
        outline_items: list[Object],
        level: int,
        visited_objs: set[tuple[int, int]],
    ):
        """Read a sibling chain starting at *first_obj* into *outline_items*.

        Follows /Next links, recursing into /First children up to the
        configured maximum depth. *visited_objs* breaks reference cycles.
        """
        current_obj: Dictionary | None = first_obj
        while current_obj:
            objgen = current_obj.objgen
            if objgen in visited_objs:
                if self._strict:
                    raise OutlineStructureError(
                        f"Outline object {objgen} reoccurred in structure"
                    )
                # Non-strict mode: stop following a cyclic/duplicated chain.
                return
            visited_objs.add(objgen)

            item = OutlineItem.from_dictionary_object(current_obj)
            first_child = current_obj.get(Name.First)
            if isinstance(first_child, Dictionary) and level < self._max_depth:
                self._load_level_outline(
                    first_child, item.children, level + 1, visited_objs
                )
            count = current_obj.get(Name.Count)
            if isinstance(count, int) and count < 0:
                # Negative /Count means the item is displayed collapsed.
                item.is_closed = True
            outline_items.append(item)
            next_obj = current_obj.get(Name.Next)
            if next_obj is None or isinstance(next_obj, Dictionary):
                current_obj = next_obj
            else:
                raise OutlineStructureError(
                    f"Outline object {objgen} points to non-dictionary"
                )

    def _save(self):
        """Write the in-memory outline back to /Root/Outlines, creating it if needed."""
        if self._root is None:
            return
        if Name.Outlines in self._pdf.Root:
            outlines = self._pdf.Root.Outlines
        else:
            self._pdf.Root.Outlines = outlines = self._pdf.make_indirect(
                Dictionary(Type=Name.Outlines)
            )
        self._save_level_outline(outlines, self._root, 0, set())

    def _load(self):
        """Populate the root list from the document's /Outlines dictionary."""
        self._root = root = []
        if Name.Outlines not in self._pdf.Root:
            return
        outlines = self._pdf.Root.Outlines or {}
        first_obj = outlines.get(Name.First)
        if first_obj:
            self._load_level_outline(first_obj, root, 0, set())

    @property
    def root(self) -> list[OutlineItem]:
        """Return the root node of the outline."""
        if self._root is None:
            self._load()
        return cast(List[OutlineItem], self._root)