Diffstat (limited to 'env/lib/python3.10/site-packages/pikepdf')
41 files changed, 6108 insertions, 0 deletions
diff --git a/env/lib/python3.10/site-packages/pikepdf/__init__.py b/env/lib/python3.10/site-packages/pikepdf/__init__.py new file mode 100644 index 0000000..bca1e92 --- /dev/null +++ b/env/lib/python3.10/site-packages/pikepdf/__init__.py @@ -0,0 +1,100 @@ +# SPDX-FileCopyrightText: 2022 James R. Barlow +# SPDX-License-Identifier: MPL-2.0 + +"""A library for manipulating PDFs. + +isort:skip_file +""" + +try: + from . import _qpdf +except ImportError as _e: # pragma: no cover + _msg = "pikepdf's extension library failed to import" + raise ImportError(_msg) from _e + +try: + from ._version import __version__ +except ImportError as _e: # pragma: no cover + raise ImportError("Failed to determine version") from _e + +from ._qpdf import ( + AccessMode, + Annotation, + AttachedFileSpec, + ContentStreamInlineImage, + ContentStreamInstruction, + DataDecodingError, + ForeignObjectError, + Job, + JobUsageError, + NameTree, + NumberTree, + ObjectHelper, + ObjectStreamMode, + Page, + PasswordError, + Pdf, + PdfError, + Rectangle, + StreamDecodeLevel, + Token, + TokenFilter, + TokenType, +) + +from .objects import ( + Array, + Dictionary, + Name, + Object, + ObjectType, + Operator, + Stream, + String, +) + +from .models import ( + Encryption, + Outline, + OutlineItem, + OutlineStructureError, + PageLocation, + PdfImage, + PdfInlineImage, + PdfMatrix, + Permissions, + UnsupportedImageTypeError, + make_page_destination, + parse_content_stream, + unparse_content_stream, +) + +from . import settings + +# Importing these will monkeypatch classes defined in C++ and register a new +# pdfdoc codec +from . import _methods, codec + +# While _cpphelpers is intended to be called from our C++ code only, explicitly +# importing helps introspection tools like PyInstaller figure out that the module +# is necessary. +from . import _cpphelpers + +__libqpdf_version__ = _qpdf.qpdf_version() + + +# Provide pikepdf.{open, new} -> pikepdf.Pdf.{open, new} +open = Pdf.open # pylint: disable=redefined-builtin +new = Pdf.new + +# Exclude .open, .new here from to make sure from pikepdf import * does not clobber +# builtins.open() +# Exclude codec, objects, jbig2 because we import the interesting bits from them +# directly to here. 
+_exclude_from__all__ = {'open', 'new', 'codec', 'objects', 'jbig2'} + +__all__ = [ + k + for k in locals().keys() + if not k.startswith('_') and k not in _exclude_from__all__ +] diff --git a/env/lib/python3.10/site-packages/pikepdf/__pycache__/__init__.cpython-310.pyc b/env/lib/python3.10/site-packages/pikepdf/__pycache__/__init__.cpython-310.pyc Binary files differnew file mode 100644 index 0000000..71dd313 --- /dev/null +++ b/env/lib/python3.10/site-packages/pikepdf/__pycache__/__init__.cpython-310.pyc diff --git a/env/lib/python3.10/site-packages/pikepdf/__pycache__/_augments.cpython-310.pyc b/env/lib/python3.10/site-packages/pikepdf/__pycache__/_augments.cpython-310.pyc Binary files differnew file mode 100644 index 0000000..4158830 --- /dev/null +++ b/env/lib/python3.10/site-packages/pikepdf/__pycache__/_augments.cpython-310.pyc diff --git a/env/lib/python3.10/site-packages/pikepdf/__pycache__/_cpphelpers.cpython-310.pyc b/env/lib/python3.10/site-packages/pikepdf/__pycache__/_cpphelpers.cpython-310.pyc Binary files differnew file mode 100644 index 0000000..b4fa25a --- /dev/null +++ b/env/lib/python3.10/site-packages/pikepdf/__pycache__/_cpphelpers.cpython-310.pyc diff --git a/env/lib/python3.10/site-packages/pikepdf/__pycache__/_exceptions.cpython-310.pyc b/env/lib/python3.10/site-packages/pikepdf/__pycache__/_exceptions.cpython-310.pyc Binary files differnew file mode 100644 index 0000000..e8622ca --- /dev/null +++ b/env/lib/python3.10/site-packages/pikepdf/__pycache__/_exceptions.cpython-310.pyc diff --git a/env/lib/python3.10/site-packages/pikepdf/__pycache__/_methods.cpython-310.pyc b/env/lib/python3.10/site-packages/pikepdf/__pycache__/_methods.cpython-310.pyc Binary files differnew file mode 100644 index 0000000..9c55237 --- /dev/null +++ b/env/lib/python3.10/site-packages/pikepdf/__pycache__/_methods.cpython-310.pyc diff --git a/env/lib/python3.10/site-packages/pikepdf/__pycache__/_version.cpython-310.pyc b/env/lib/python3.10/site-packages/pikepdf/__pycache__/_version.cpython-310.pyc Binary files differnew file mode 100644 index 0000000..fdfc76d --- /dev/null +++ b/env/lib/python3.10/site-packages/pikepdf/__pycache__/_version.cpython-310.pyc diff --git a/env/lib/python3.10/site-packages/pikepdf/__pycache__/_xml.cpython-310.pyc b/env/lib/python3.10/site-packages/pikepdf/__pycache__/_xml.cpython-310.pyc Binary files differnew file mode 100644 index 0000000..6999039 --- /dev/null +++ b/env/lib/python3.10/site-packages/pikepdf/__pycache__/_xml.cpython-310.pyc diff --git a/env/lib/python3.10/site-packages/pikepdf/__pycache__/codec.cpython-310.pyc b/env/lib/python3.10/site-packages/pikepdf/__pycache__/codec.cpython-310.pyc Binary files differnew file mode 100644 index 0000000..d61f814 --- /dev/null +++ b/env/lib/python3.10/site-packages/pikepdf/__pycache__/codec.cpython-310.pyc diff --git a/env/lib/python3.10/site-packages/pikepdf/__pycache__/jbig2.cpython-310.pyc b/env/lib/python3.10/site-packages/pikepdf/__pycache__/jbig2.cpython-310.pyc Binary files differnew file mode 100644 index 0000000..d8356e2 --- /dev/null +++ b/env/lib/python3.10/site-packages/pikepdf/__pycache__/jbig2.cpython-310.pyc diff --git a/env/lib/python3.10/site-packages/pikepdf/__pycache__/objects.cpython-310.pyc b/env/lib/python3.10/site-packages/pikepdf/__pycache__/objects.cpython-310.pyc Binary files differnew file mode 100644 index 0000000..52714f4 --- /dev/null +++ b/env/lib/python3.10/site-packages/pikepdf/__pycache__/objects.cpython-310.pyc diff --git 
a/env/lib/python3.10/site-packages/pikepdf/__pycache__/settings.cpython-310.pyc b/env/lib/python3.10/site-packages/pikepdf/__pycache__/settings.cpython-310.pyc Binary files differnew file mode 100644 index 0000000..655e576 --- /dev/null +++ b/env/lib/python3.10/site-packages/pikepdf/__pycache__/settings.cpython-310.pyc diff --git a/env/lib/python3.10/site-packages/pikepdf/_augments.py b/env/lib/python3.10/site-packages/pikepdf/_augments.py new file mode 100644 index 0000000..88fc6e5 --- /dev/null +++ b/env/lib/python3.10/site-packages/pikepdf/_augments.py @@ -0,0 +1,151 @@ +# SPDX-FileCopyrightText: 2022 James R. Barlow +# SPDX-License-Identifier: MPL-2.0 + +"""A peculiar method of monkeypatching C++ binding classes with Python methods.""" + +from __future__ import annotations + +import inspect +import platform +import sys +from typing import Any, Callable, TypeVar + +if sys.version_info >= (3, 8): + from typing import Protocol +else: + from typing_extensions import Protocol # pragma: no cover + + +class AugmentedCallable(Protocol): + """Protocol for any method, with attached booleans.""" + + _augment_override_cpp: bool + _augment_if_no_cpp: bool + + def __call__(self, *args, **kwargs) -> Any: + """Any function.""" # pragma: no cover + + +def augment_override_cpp(fn: AugmentedCallable) -> AugmentedCallable: + """Replace the C++ implementation, if there is one.""" + fn._augment_override_cpp = True + return fn + + +def augment_if_no_cpp(fn: AugmentedCallable) -> AugmentedCallable: + """Provide a Python implementation if no C++ implementation exists.""" + fn._augment_if_no_cpp = True + return fn + + +def _is_inherited_method(meth: Callable) -> bool: + # Augmenting a C++ with a method that cls inherits from the Python + # object is never what we want. + return meth.__qualname__.startswith('object.') + + +def _is_augmentable(m: Any) -> bool: + return ( + inspect.isfunction(m) and not _is_inherited_method(m) + ) or inspect.isdatadescriptor(m) + + +Tcpp = TypeVar('Tcpp') +T = TypeVar('T') + + +def augments(cls_cpp: type[Tcpp]): + """Attach methods of a Python support class to an existing class. + + This monkeypatches all methods defined in the support class onto an + existing class. Example: + + .. code-block:: python + + @augments(ClassDefinedInCpp) + class SupportClass: + def foo(self): + pass + + The Python method 'foo' will be monkeypatched on ClassDefinedInCpp. SupportClass + has no meaning on its own and should not be used, but gets returned from + this function so IDE code inspection doesn't get too confused. + + We don't subclass because it's much more convenient to monkeypatch Python + methods onto the existing Python binding of the C++ class. For one thing, + this allows the implementation to be moved from Python to C++ or vice + versa. It saves having to implement an intermediate Python subclass and then + ensures that the C++ superclass never 'leaks' to pikepdf users. Finally, + wrapper classes and subclasses can become problematic if the call stack + crosses the C++/Python boundary multiple times. + + Any existing methods may be used, regardless of whether they are defined + elsewhere in the support class or in the target class. + + For data fields to work, the target class must be + tagged ``py::dynamic_attr`` in pybind11. + + Strictly, the target class does not have to be C++ or derived from pybind11. + This works on pure Python classes too. + + THIS DOES NOT work for class methods. 
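# Illustrative sketch (not from the pikepdf sources): how @augments behaves,
# using a plain Python class in place of a pybind11 binding, which the
# docstring above says is also supported. pikepdf._augments is a private
# module, so this is for understanding only.
from pikepdf._augments import augments

class Target:                      # stands in for a class defined in C++
    def __init__(self, x):
        self.x = x

@augments(Target)
class SupportTarget:
    def doubled(self):             # gets monkeypatched onto Target
        return self.x * 2

t = Target(21)
print(t.doubled())                 # 42 -- the method now lives on Target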
+ + (Alternative ideas: https://github.com/pybind/pybind11/issues/1074) + """ + OVERRIDE_WHITELIST = {'__eq__', '__hash__', '__repr__'} + if platform.python_implementation() == 'PyPy': + # Either PyPy or pybind11's interface to PyPy automatically adds a __getattr__ + OVERRIDE_WHITELIST |= {'__getattr__'} # pragma: no cover + + def class_augment(cls: type[T], cls_cpp: type[Tcpp] = cls_cpp) -> type[T]: + + # inspect.getmembers has different behavior on PyPy - in particular it seems + # that a typical PyPy class like cls will have more methods that it considers + # methods than CPython does. Our predicate should take care of this. + for name, member in inspect.getmembers(cls, predicate=_is_augmentable): + if name == '__weakref__': + continue + if ( + hasattr(cls_cpp, name) + and hasattr(cls, name) + and name not in getattr(cls, '__abstractmethods__', set()) + and name not in OVERRIDE_WHITELIST + and not getattr(getattr(cls, name), '_augment_override_cpp', False) + ): + if getattr(getattr(cls, name), '_augment_if_no_cpp', False): + # If tagged as "augment if no C++", we only want the binding to be + # applied when the primary class does not provide a C++ + # implementation. Usually this would be a function that not is + # provided by pybind11 in some template. + continue + + # If the original C++ class and Python support class both define the + # same name, we generally have a conflict, because this is augmentation + # not inheritance. However, if the method provided by the support class + # is an abstract method, then we can consider the C++ version the + # implementation. Also, pybind11 provides defaults for __eq__, + # __hash__ and __repr__ that we often do want to override directly. + + raise RuntimeError( + f"C++ {cls_cpp} and Python {cls} both define the same " + f"non-abstract method {name}: " + f"{getattr(cls_cpp, name, '')!r}, " + f"{getattr(cls, name, '')!r}" + ) + if inspect.isfunction(member): + setattr(cls_cpp, name, member) + installed_member = getattr(cls_cpp, name) + installed_member.__qualname__ = member.__qualname__.replace( + cls.__name__, cls_cpp.__name__ + ) + elif inspect.isdatadescriptor(member): + setattr(cls_cpp, name, member) + + def disable_init(self): + # Prevent initialization of the support class + raise NotImplementedError(self.__class__.__name__ + '.__init__') + + cls.__init__ = disable_init # type: ignore + return cls + + return class_augment diff --git a/env/lib/python3.10/site-packages/pikepdf/_cpphelpers.py b/env/lib/python3.10/site-packages/pikepdf/_cpphelpers.py new file mode 100644 index 0000000..4dff072 --- /dev/null +++ b/env/lib/python3.10/site-packages/pikepdf/_cpphelpers.py @@ -0,0 +1,104 @@ +# SPDX-FileCopyrightText: 2022 James R. Barlow +# SPDX-License-Identifier: MPL-2.0 + +"""Support functions called by the C++ library binding layer. + +Not intended to be called from Python, and subject to change at any time. 
+""" + +from __future__ import annotations + +from typing import Callable +from warnings import warn + +from pikepdf import Dictionary, Name, Pdf + + +def update_xmp_pdfversion(pdf: Pdf, version: str) -> None: + """Update XMP metadata to specified PDF version.""" + if Name.Metadata not in pdf.Root: + return # Don't create an empty XMP object just to store the version + + with pdf.open_metadata(set_pikepdf_as_editor=False, update_docinfo=False) as meta: + if 'pdf:PDFVersion' in meta: + meta['pdf:PDFVersion'] = version + + +def _alpha(n: int) -> str: + """Excel-style column numbering A..Z, AA..AZ..BA..ZZ.., AAA.""" + if n < 1: + raise ValueError(f"Can't represent {n} in alphabetic numbering") + p = [] + while n > 0: + n, r = divmod(n - 1, 26) + p.append(r) + base = ord('A') + ords = [(base + v) for v in reversed(p)] + return ''.join(chr(o) for o in ords) + + +def _roman(n: int) -> str: + """Convert integer n to Roman numeral representation as a string.""" + if not (1 <= n <= 5000): + raise ValueError(f"Can't represent {n} in Roman numerals") + roman_numerals = ( + (1000, 'M'), + (900, 'CM'), + (500, 'D'), + (400, 'CD'), + (100, 'C'), + (90, 'XC'), + (50, 'L'), + (40, 'XL'), + (10, 'X'), + (9, 'IX'), + (5, 'V'), + (4, 'IV'), + (1, 'I'), + ) + roman = "" + for value, numeral in roman_numerals: + while n >= value: + roman += numeral + n -= value + return roman + + +LABEL_STYLE_MAP: dict[Name, Callable[[int], str]] = { + Name.D: str, + Name.A: _alpha, + Name.a: lambda x: _alpha(x).lower(), + Name.R: _roman, + Name.r: lambda x: _roman(x).lower(), +} + + +def label_from_label_dict(label_dict: int | Dictionary) -> str: + """Convert a label dictionary returned by QPDF into a text string.""" + if isinstance(label_dict, int): + return str(label_dict) + + label = '' + if Name.P in label_dict: + prefix = label_dict[Name.P] + label += str(prefix) + + # If there is no S, return only the P portion + if Name.S in label_dict: + # St defaults to 1 + numeric_value = label_dict[Name.St] if Name.St in label_dict else 1 + if not isinstance(numeric_value, int): + warn( + "Page label dictionary has invalid non-integer start value", UserWarning + ) + numeric_value = 1 + + style = label_dict[Name.S] + if isinstance(style, Name): + style_fn = LABEL_STYLE_MAP[style] + value = style_fn(numeric_value) + label += value + else: + warn("Page label dictionary has invalid page label style", UserWarning) + + return label diff --git a/env/lib/python3.10/site-packages/pikepdf/_exceptions.py b/env/lib/python3.10/site-packages/pikepdf/_exceptions.py new file mode 100644 index 0000000..8f2412f --- /dev/null +++ b/env/lib/python3.10/site-packages/pikepdf/_exceptions.py @@ -0,0 +1,8 @@ +# SPDX-FileCopyrightText: 2022 James R. Barlow +# SPDX-License-Identifier: MPL-2.0 + +from __future__ import annotations + + +class DependencyError(Exception): + """A third party dependency is needed to extract streams of this type.""" diff --git a/env/lib/python3.10/site-packages/pikepdf/_methods.py b/env/lib/python3.10/site-packages/pikepdf/_methods.py new file mode 100644 index 0000000..25e1d95 --- /dev/null +++ b/env/lib/python3.10/site-packages/pikepdf/_methods.py @@ -0,0 +1,1340 @@ +# SPDX-FileCopyrightText: 2022 James R. Barlow +# SPDX-License-Identifier: MPL-2.0 + +"""Implement some features in Python and monkey-patch them onto C++ classes. + +In several cases the implementation of some higher levels features might as +well be in Python. Fortunately we can attach Python methods to C++ class +bindings after the fact. 
+ +We can also move the implementation to C++ if desired. +""" + +from __future__ import annotations + +import datetime +import mimetypes +import shutil +from collections.abc import KeysView, MutableMapping +from decimal import Decimal +from io import BytesIO +from pathlib import Path +from subprocess import run +from tempfile import NamedTemporaryFile +from typing import BinaryIO, Callable, ItemsView, Iterator, TypeVar, ValuesView +from warnings import warn + +from . import Array, Dictionary, Name, Object, Page, Pdf, Stream +from ._augments import augment_override_cpp, augments +from ._qpdf import ( + AccessMode, + AttachedFile, + AttachedFileSpec, + Attachments, + NameTree, + NumberTree, + ObjectStreamMode, + Rectangle, + StreamDecodeLevel, + StreamParser, + Token, + _ObjectMapping, +) +from .models import Encryption, EncryptionInfo, Outline, PdfMetadata, Permissions +from .models.metadata import decode_pdf_date, encode_pdf_date + +# pylint: disable=no-member,unsupported-membership-test,unsubscriptable-object +# mypy: ignore-errors + +__all__ = [] + +Numeric = TypeVar('Numeric', int, float, Decimal) + + +def _single_page_pdf(page) -> bytes: + """Construct a single page PDF from the provided page in memory.""" + pdf = Pdf.new() + pdf.pages.append(page) + bio = BytesIO() + pdf.save(bio) + bio.seek(0) + return bio.read() + + +def _mudraw(buffer, fmt) -> bytes: + """Use mupdf draw to rasterize the PDF in the memory buffer.""" + # mudraw cannot read from stdin so NamedTemporaryFile is required + with NamedTemporaryFile(suffix='.pdf') as tmp_in: + tmp_in.write(buffer) + tmp_in.seek(0) + tmp_in.flush() + + proc = run( + ['mudraw', '-F', fmt, '-o', '-', tmp_in.name], + capture_output=True, + check=True, + ) + return proc.stdout + + +@augments(Object) +class Extend_Object: + def _ipython_key_completions_(self): + if isinstance(self, (Dictionary, Stream)): + return self.keys() + return None + + def emplace(self, other: Object, retain=(Name.Parent,)): + """Copy all items from other without making a new object. + + Particularly when working with pages, it may be desirable to remove all + of the existing page's contents and emplace (insert) a new page on top + of it, in a way that preserves all links and references to the original + page. (Or similarly, for other Dictionary objects in a PDF.) + + Any Dictionary keys in the iterable *retain* are preserved. By default, + /Parent is retained. + + When a page is assigned (``pdf.pages[0] = new_page``), only the + application knows if references to the original the original page are + still valid. For example, a PDF optimizer might restructure a page + object into another visually similar one, and references would be valid; + but for a program that reorganizes page contents such as a N-up + compositor, references may not be valid anymore. + + This method takes precautions to ensure that child objects in common + with ``self`` and ``other`` are not inadvertently deleted. + + Example: + >>> pdf.pages[0].objgen + (16, 0) + >>> pdf.pages[0].emplace(pdf.pages[1]) + >>> pdf.pages[0].objgen + (16, 0) # Same object + + .. versionchanged:: 2.11.1 + Added the *retain* argument. 
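# Illustrative sketch (not from the pikepdf sources): emplace() keeps the
# target's object identity while copying the other object's keys, as described
# above. Plain dictionaries are used here instead of pages for brevity.
from pikepdf import Dictionary, Name, Pdf

pdf = Pdf.new()
d1 = pdf.make_indirect(Dictionary(A=1, B=2))
d2 = pdf.make_indirect(Dictionary(B=20, C=30))

objgen_before = d1.objgen
d1.emplace(d2)                      # keep d1's identity, take d2's contents
assert d1.objgen == objgen_before   # same indirect object as before
assert d1.B == 20 and d1.C == 30    # keys copied from d2
assert Name.A not in d1             # keys d2 lacks are deleted (default keeps /Parent)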
+ """ + if not self.same_owner_as(other): + raise TypeError("Objects must have the same owner for emplace()") + + # .keys() returns strings, so make all strings + retain = {str(k) for k in retain} + self_keys = set(self.keys()) + other_keys = set(other.keys()) + + assert all(isinstance(k, str) for k in (retain | self_keys | other_keys)) + + del_keys = self_keys - other_keys - retain + for k in (k for k in other_keys if k not in retain): + self[k] = other[k] # pylint: disable=unsupported-assignment-operation + for k in del_keys: + del self[k] # pylint: disable=unsupported-delete-operation + + def _type_check_write(self, filter_, decode_parms): + if isinstance(filter_, list): + filter_ = Array(filter_) + filter_ = filter_.wrap_in_array() + + if isinstance(decode_parms, list): + decode_parms = Array(decode_parms) + elif decode_parms is None: + decode_parms = Array([]) + else: + decode_parms = decode_parms.wrap_in_array() + + if not all(isinstance(item, Name) for item in filter_): + raise TypeError( + "filter must be: pikepdf.Name or pikepdf.Array([pikepdf.Name])" + ) + if not all( + (isinstance(item, Dictionary) or item is None) for item in decode_parms + ): + raise TypeError( + "decode_parms must be: pikepdf.Dictionary or " + "pikepdf.Array([pikepdf.Dictionary])" + ) + if len(decode_parms) != 0 and len(filter_) != len(decode_parms): + raise ValueError( + f"filter ({repr(filter_)}) and decode_parms " + f"({repr(decode_parms)}) must be arrays of same length" + ) + if len(filter_) == 1: + filter_ = filter_[0] + if len(decode_parms) == 0: + decode_parms = None + elif len(decode_parms) == 1: + decode_parms = decode_parms[0] + return filter_, decode_parms + + def write( + self, + data: bytes, + *, + filter: Name | Array | None = None, + decode_parms: Dictionary | Array | None = None, + type_check: bool = True, + ): # pylint: disable=redefined-builtin + """ + Replace stream object's data with new (possibly compressed) `data`. + + `filter` and `decode_parms` describe any compression that is already + present on the input `data`. For example, if your data is already + compressed with the Deflate algorithm, you would set + ``filter=Name.FlateDecode``. + + When writing the PDF in :meth:`pikepdf.Pdf.save`, + pikepdf may change the compression or apply compression to data that was + not compressed, depending on the parameters given to that function. It + will never change lossless to lossy encoding. + + PNG and TIFF images, even if compressed, cannot be directly inserted + into a PDF and displayed as images. + + Args: + data: the new data to use for replacement + filter: The filter(s) with which the + data is (already) encoded + decode_parms: Parameters for the + filters with which the object is encode + type_check: Check arguments; use False only if you want to + intentionally create malformed PDFs. + + If only one `filter` is specified, it may be a name such as + `Name('/FlateDecode')`. If there are multiple filters, then array + of names should be given. + + If there is only one filter, `decode_parms` is a Dictionary of + parameters for that filter. If there are multiple filters, then + `decode_parms` is an Array of Dictionary, where each array index + is corresponds to the filter. 
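# Illustrative sketch (not from the pikepdf sources): storing data that is
# already Flate-compressed in a stream, and telling pikepdf which filter
# decodes it, per the write() documentation above.
import zlib
from pikepdf import Name, Pdf

pdf = Pdf.new()
raw = b"BT /F1 12 Tf 72 720 Td (Hello) Tj ET"
stream = pdf.make_stream(b"")                    # empty stream owned by pdf
stream.write(zlib.compress(raw), filter=Name.FlateDecode)
assert stream.read_bytes() == raw                # transparently decoded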
+ """ + if type_check and filter is not None: + filter, decode_parms = self._type_check_write(filter, decode_parms) + + self._write(data, filter=filter, decode_parms=decode_parms) + + +@augments(Pdf) +class Extend_Pdf: + def _repr_mimebundle_( + self, include=None, exclude=None + ): # pylint: disable=unused-argument + """ + Present options to IPython or Jupyter for rich display of this object. + + See https://ipython.readthedocs.io/en/stable/config/integrating.html#rich-display + """ + bio = BytesIO() + self.save(bio) + bio.seek(0) + + data = {'application/pdf': bio.read()} + return data + + @property + def docinfo(self) -> Dictionary: + """ + Access the (deprecated) document information dictionary. + + The document information dictionary is a brief metadata record that can + store some information about the origin of a PDF. It is deprecated and + removed in the PDF 2.0 specification (not deprecated from the + perspective of pikepdf). Use the ``.open_metadata()`` API instead, which + will edit the modern (and unfortunately, more complicated) XMP metadata + object and synchronize changes to the document information dictionary. + + This property simplifies access to the actual document information + dictionary and ensures that it is created correctly if it needs to be + created. + + A new, empty dictionary will be created if this property is accessed + and dictionary does not exist. (This is to ensure that convenient code + like ``pdf.docinfo[Name.Title] = "Title"`` will work when the dictionary + does not exist at all.) + + You can delete the document information dictionary by deleting this property, + ``del pdf.docinfo``. Note that accessing the property after deleting it + will re-create with a new, empty dictionary. + + .. versionchanged: 2.4 + Added support for ``del pdf.docinfo``. + """ + if Name.Info not in self.trailer: + self.trailer.Info = self.make_indirect(Dictionary()) + return self.trailer.Info + + @docinfo.setter + def docinfo(self, new_docinfo: Dictionary): + if not new_docinfo.is_indirect: + raise ValueError( + "docinfo must be an indirect object - use Pdf.make_indirect" + ) + self.trailer.Info = new_docinfo + + @docinfo.deleter + def docinfo(self): + if Name.Info in self.trailer: + del self.trailer.Info + + def open_metadata( + self, + set_pikepdf_as_editor: bool = True, + update_docinfo: bool = True, + strict: bool = False, + ) -> PdfMetadata: + """ + Open the PDF's XMP metadata for editing. + + There is no ``.close()`` function on the metadata object, since this is + intended to be used inside a ``with`` block only. + + For historical reasons, certain parts of PDF metadata are stored in + two different locations and formats. This feature coordinates edits so + that both types of metadata are updated consistently and "atomically" + (assuming single threaded access). It operates on the ``Pdf`` in memory, + not any file on disk. To persist metadata changes, you must still use + ``Pdf.save()``. + + Example: + >>> with pdf.open_metadata() as meta: + meta['dc:title'] = 'Set the Dublic Core Title' + meta['dc:description'] = 'Put the Abstract here' + + Args: + set_pikepdf_as_editor: Automatically update the metadata ``pdf:Producer`` + to show that this version of pikepdf is the most recent software to + modify the metadata, and ``xmp:MetadataDate`` to timestamp the update. + Recommended, except for testing. + + update_docinfo: Update the standard fields of DocumentInfo + (the old PDF metadata dictionary) to match the corresponding + XMP fields. 
The mapping is described in + :attr:`PdfMetadata.DOCINFO_MAPPING`. Nonstandard DocumentInfo + fields and XMP metadata fields with no DocumentInfo equivalent + are ignored. + + strict: If ``False`` (the default), we aggressively attempt + to recover from any parse errors in XMP, and if that fails we + overwrite the XMP with an empty XMP record. If ``True``, raise + errors when either metadata bytes are not valid and well-formed + XMP (and thus, XML). Some trivial cases that are equivalent to + empty or incomplete "XMP skeletons" are never treated as errors, + and always replaced with a proper empty XMP block. Certain + errors may be logged. + """ + return PdfMetadata( + self, + pikepdf_mark=set_pikepdf_as_editor, + sync_docinfo=update_docinfo, + overwrite_invalid_xml=not strict, + ) + + def open_outline(self, max_depth: int = 15, strict: bool = False) -> Outline: + """ + Open the PDF outline ("bookmarks") for editing. + + Recommend for use in a ``with`` block. Changes are committed to the + PDF when the block exits. (The ``Pdf`` must still be opened.) + + Example: + >>> with pdf.open_outline() as outline: + outline.root.insert(0, OutlineItem('Intro', 0)) + + Args: + max_depth: Maximum recursion depth of the outline to be + imported and re-written to the document. ``0`` means only + considering the root level, ``1`` the first-level + sub-outline of each root element, and so on. Items beyond + this depth will be silently ignored. Default is ``15``. + strict: With the default behavior (set to ``False``), + structural errors (e.g. reference loops) in the PDF document + will only cancel processing further nodes on that particular + level, recovering the valid parts of the document outline + without raising an exception. When set to ``True``, any such + error will raise an ``OutlineStructureError``, leaving the + invalid parts in place. + Similarly, outline objects that have been accidentally + duplicated in the ``Outline`` container will be silently + fixed (i.e. reproduced as new objects) or raise an + ``OutlineStructureError``. + """ + return Outline(self, max_depth=max_depth, strict=strict) + + def make_stream(self, data: bytes, d=None, **kwargs) -> Stream: + """ + Create a new pikepdf.Stream object that is attached to this PDF. + + See: + :meth:`pikepdf.Stream.__new__` + + """ + return Stream(self, data, d, **kwargs) + + def add_blank_page( + self, *, page_size: tuple[Numeric, Numeric] = (612.0, 792.0) + ) -> Page: + """ + Add a blank page to this PDF. + + If pages already exist, the page will be added to the end. Pages may be + reordered using ``Pdf.pages``. + + The caller may add content to the page by modifying its objects after creating + it. + + Args: + page_size (tuple): The size of the page in PDF units (1/72 inch or 0.35mm). + Default size is set to a US Letter 8.5" x 11" page. + """ + for dim in page_size: + if not (3 <= dim <= 14400): + raise ValueError('Page size must be between 3 and 14400 PDF units') + + page_dict = Dictionary( + Type=Name.Page, + MediaBox=Array([0, 0, page_size[0], page_size[1]]), + Contents=self.make_stream(b''), + Resources=Dictionary(), + ) + page_obj = self.make_indirect(page_dict) + self._add_page(page_obj, first=False) + return Page(page_obj) + + def close(self) -> None: + """ + Close a ``Pdf`` object and release resources acquired by pikepdf. + + If pikepdf opened the file handle it will close it (e.g. when opened with a file + path). If the caller opened the file for pikepdf, the caller close the file. + ``with`` blocks will call close when exit. 
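# Illustrative sketch (not from the pikepdf sources): the with-block pattern
# described above. close() runs when the block exits, so read everything you
# need from lazily loaded objects while the Pdf is still open.
from pikepdf import Pdf

with Pdf.new() as pdf:
    pdf.add_blank_page()
    page_count = len(pdf.pages)     # touch contents while the Pdf is open
assert page_count == 1              # plain Python values remain usable after close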
+ + pikepdf lazily loads data from PDFs, so some :class:`pikepdf.Object` may + implicitly depend on the :class:`pikepdf.Pdf` being open. This is always the + case for :class:`pikepdf.Stream` but can be true for any object. Do not close + the `Pdf` object if you might still be accessing content from it. + + When an ``Object`` is copied from one ``Pdf`` to another, the ``Object`` is copied into + the destination ``Pdf`` immediately, so after accessing all desired information + from the source ``Pdf`` it may be closed. + + .. versionchanged:: 3.0 + In pikepdf 2.x, this function actually worked by resetting to a very short + empty PDF. Code that relied on this quirk may not function correctly. + """ + self._close() + if getattr(self, '_tmp_stream', None): + self._tmp_stream.close() + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, traceback): + self.close() + + @property + def allow(self) -> Permissions: + """ + Report permissions associated with this PDF. + + By default these permissions will be replicated when the PDF is + saved. Permissions may also only be changed when a PDF is being saved, + and are only available for encrypted PDFs. If a PDF is not encrypted, + all operations are reported as allowed. + + pikepdf has no way of enforcing permissions. + """ + results = {} + for field in Permissions._fields: + results[field] = getattr(self, '_allow_' + field) + return Permissions(**results) + + @property + def encryption(self) -> EncryptionInfo: + """ + Report encryption information for this PDF. + + Encryption settings may only be changed when a PDF is saved. + """ + return EncryptionInfo(self._encryption_data) + + def check(self) -> list[str]: + """ + Check if PDF is well-formed. + + Similar to ``qpdf --check``. + """ + + class DiscardingParser(StreamParser): + def __init__(self): # pylint: disable=useless-super-delegation + super().__init__() # required for C++ + + def handle_object(self, *_args): + pass + + def handle_eof(self): + pass + + problems: list[str] = [] + + self._decode_all_streams_and_discard() + + discarding_parser = DiscardingParser() + for page in self.pages: + page.parse_contents(discarding_parser) + + for warning in self.get_warnings(): + problems.append("WARNING: " + warning) + + return problems + + def save( + self, + filename_or_stream: Path | str | BinaryIO | None = None, + *, + static_id: bool = False, + preserve_pdfa: bool = True, + min_version: str | tuple[str, int] = "", + force_version: str | tuple[str, int] = "", + fix_metadata_version: bool = True, + compress_streams: bool = True, + stream_decode_level: StreamDecodeLevel | None = None, + object_stream_mode: ObjectStreamMode = ObjectStreamMode.preserve, + normalize_content: bool = False, + linearize: bool = False, + qdf: bool = False, + progress: Callable[[int], None] = None, + encryption: Encryption | bool | None = None, + recompress_flate: bool = False, + deterministic_id: bool = False, + ) -> None: + """ + Save all modifications to this :class:`pikepdf.Pdf`. + + Args: + filename_or_stream: Where to write the output. If a file + exists in this location it will be overwritten. + If the file was opened with ``allow_overwriting_input=True``, + then it is permitted to overwrite the original file, and + this parameter may be omitted to implicitly use the original + filename. Otherwise, the filename may not be the same as the + input file, as overwriting the input file would corrupt data + since pikepdf using lazy loading. 
+ + static_id: Indicates that the ``/ID`` metadata, normally + calculated as a hash of certain PDF contents and metadata + including the current time, should instead be set to a static + value. Only use this for debugging and testing. Use + ``deterministic_id`` if you want to get the same ``/ID`` for + the same document contents. + preserve_pdfa: Ensures that the file is generated in a + manner compliant with PDF/A and other stricter variants. + This should be True, the default, in most cases. + + min_version: Sets the minimum version of PDF + specification that should be required. If left alone QPDF + will decide. If a tuple, the second element is an integer, the + extension level. If the version number is not a valid format, + QPDF will decide what to do. + force_version: Override the version recommend by QPDF, + potentially creating an invalid file that does not display + in old versions. See QPDF manual for details. If a tuple, the + second element is an integer, the extension level. + fix_metadata_version: If ``True`` (default) and the XMP metadata + contains the optional PDF version field, ensure the version in + metadata is correct. If the XMP metadata does not contain a PDF + version field, none will be added. To ensure that the field is + added, edit the metadata and insert a placeholder value in + ``pdf:PDFVersion``. If XMP metadata does not exist, it will + not be created regardless of the value of this argument. + + object_stream_mode: + ``disable`` prevents the use of object streams. + ``preserve`` keeps object streams from the input file. + ``generate`` uses object streams wherever possible, + creating the smallest files but requiring PDF 1.5+. + + compress_streams: Enables or disables the compression of + stream objects in the PDF that are created without specifying + any compression setting. Metadata is never compressed. + By default this is set to ``True``, and should be except + for debugging. Existing streams in the PDF or streams will not + be modified. To decompress existing streams, you must set + both ``compress_streams=False`` and ``stream_decode_level`` + to the desired decode level (e.g. ``.generalized`` will + decompress most non-image content). + + stream_decode_level: Specifies how + to encode stream objects. See documentation for + :class:`pikepdf.StreamDecodeLevel`. + + recompress_flate: When disabled (the default), qpdf does not + uncompress and recompress streams compressed with the Flate + compression algorithm. If True, pikepdf will instruct qpdf to + do this, which may be useful if recompressing streams to a + higher compression level. + + normalize_content: Enables parsing and reformatting the + content stream within PDFs. This may debugging PDFs easier. + + linearize: Enables creating linear or "fast web view", + where the file's contents are organized sequentially so that + a viewer can begin rendering before it has the whole file. + As a drawback, it tends to make files larger. + + qdf: Save output QDF mode. QDF mode is a special output + mode in QPDF to allow editing of PDFs in a text editor. Use + the program ``fix-qdf`` to fix convert back to a standard + PDF. + + progress: Specify a callback function that is called + as the PDF is written. The function will be called with an + integer between 0-100 as the sole parameter, the progress + percentage. This function may not access or modify the PDF + while it is being written, or data corruption will almost + certainly occur. 
+ + encryption: If ``False`` + or omitted, existing encryption will be removed. If ``True`` + encryption settings are copied from the originating PDF. + Alternately, an ``Encryption`` object may be provided that + sets the parameters for new encryption. + + deterministic_id: Indicates that the ``/ID`` metadata, normally + calculated as a hash of certain PDF contents and metadata + including the current time, should instead be computed using + only deterministic data like the file contents. At a small + runtime cost, this enables generation of the same ``/ID`` if + the same inputs are converted in the same way multiple times. + Does not work for encrypted files. + + Raises: + PdfError + ForeignObjectError + ValueError + + You may call ``.save()`` multiple times with different parameters + to generate different versions of a file, and you *may* continue + to modify the file after saving it. ``.save()`` does not modify + the ``Pdf`` object in memory, except possibly by updating the XMP + metadata version with ``fix_metadata_version``. + + .. note:: + + :meth:`pikepdf.Pdf.remove_unreferenced_resources` before saving + may eliminate unnecessary resources from the output file if there + are any objects (such as images) that are referenced in a page's + Resources dictionary but never called in the page's content stream. + + .. note:: + + pikepdf can read PDFs with incremental updates, but always + coalesces any incremental updates into a single non-incremental + PDF file when saving. + + .. versionchanged:: 2.7 + Added *recompress_flate*. + + .. versionchanged:: 3.0 + Keyword arguments now mandatory for everything except the first + argument. + """ + if not filename_or_stream and getattr(self, '_original_filename', None): + filename_or_stream = self._original_filename + if not filename_or_stream: + raise ValueError( + "Cannot save to original filename because the original file was " + "not opening using Pdf.open(..., allow_overwriting_input=True). " + "Either specify a new destination filename/file stream or open " + "with allow_overwriting_input=True. If this Pdf was created using " + "Pdf.new(), you must specify a destination object since there is " + "no original filename to save to." + ) + self._save( + filename_or_stream, + static_id=static_id, + preserve_pdfa=preserve_pdfa, + min_version=min_version, + force_version=force_version, + fix_metadata_version=fix_metadata_version, + compress_streams=compress_streams, + stream_decode_level=stream_decode_level, + object_stream_mode=object_stream_mode, + normalize_content=normalize_content, + linearize=linearize, + qdf=qdf, + progress=progress, + encryption=encryption, + samefile_check=getattr(self, '_tmp_stream', None) is None, + recompress_flate=recompress_flate, + deterministic_id=deterministic_id, + ) + + @staticmethod + def open( + filename_or_stream: Path | str | BinaryIO, + *, + password: str | bytes = "", + hex_password: bool = False, + ignore_xref_streams: bool = False, + suppress_warnings: bool = True, + attempt_recovery: bool = True, + inherit_page_attributes: bool = True, + access_mode: AccessMode = AccessMode.default, + allow_overwriting_input: bool = False, + ) -> Pdf: + """ + Open an existing file at *filename_or_stream*. + + If *filename_or_stream* is path-like, the file will be opened for reading. + The file should not be modified by another process while it is open in + pikepdf, or undefined behavior may occur. This is because the file may be + lazily loaded. 
Despite this restriction, pikepdf does not try to use any OS + services to obtain an exclusive lock on the file. Some applications may + want to attempt this or copy the file to a temporary location before + editing. This behaviour changes if *allow_overwriting_input* is set: the whole + file is then read and copied to memory, so that pikepdf can overwrite it + when calling ``.save()``. + + When this function is called with a stream-like object, you must ensure + that the data it returns cannot be modified, or undefined behavior will + occur. + + Any changes to the file must be persisted by using ``.save()``. + + If *filename_or_stream* has ``.read()`` and ``.seek()`` methods, the file + will be accessed as a readable binary stream. pikepdf will read the + entire stream into a private buffer. + + ``.open()`` may be used in a ``with``-block; ``.close()`` will be called when + the block exits, if applicable. + + Whenever pikepdf opens a file, it will close it. If you open the file + for pikepdf or give it a stream-like object to read from, you must + release that object when appropriate. + + Examples: + >>> with Pdf.open("test.pdf") as pdf: + ... + + >>> pdf = Pdf.open("test.pdf", password="rosebud") + + Args: + filename_or_stream: Filename or Python readable and seekable file + stream of PDF to open. + password: User or owner password to open an + encrypted PDF. If the type of this parameter is ``str`` + it will be encoded as UTF-8. If the type is ``bytes`` it will + be saved verbatim. Passwords are always padded or + truncated to 32 bytes internally. Use ASCII passwords for + maximum compatibility. + hex_password: If True, interpret the password as a + hex-encoded version of the exact encryption key to use, without + performing the normal key computation. Useful in forensics. + ignore_xref_streams: If True, ignore cross-reference + streams. See qpdf documentation. + suppress_warnings: If True (default), warnings are not + printed to stderr. Use :meth:`pikepdf.Pdf.get_warnings()` to + retrieve warnings. + attempt_recovery: If True (default), attempt to recover + from PDF parsing errors. + inherit_page_attributes: If True (default), push attributes + set on a group of pages to individual pages + access_mode: If ``.default``, pikepdf will + decide how to access the file. Currently, it will always + selected stream access. To attempt memory mapping and fallback + to stream if memory mapping failed, use ``.mmap``. Use + ``.mmap_only`` to require memory mapping or fail + (this is expected to only be useful for testing). Applications + should be prepared to handle the SIGBUS signal on POSIX in + the event that the file is successfully mapped but later goes + away. + allow_overwriting_input: If True, allows calling ``.save()`` + to overwrite the input file. This is performed by loading the + entire input file into memory at open time; this will use more + memory and may recent performance especially when the opened + file will not be modified. + + Raises: + pikepdf.PasswordError: If the password failed to open the + file. + pikepdf.PdfError: If for other reasons we could not open + the file. + TypeError: If the type of ``filename_or_stream`` is not + usable. + FileNotFoundError: If the file was not found. + + Note: + When *filename_or_stream* is a stream and the stream is located on a + network, pikepdf assumes that the stream using buffering and read caches + to achieve reasonable performance. 
Streams that fetch data over a network + in response to every read or seek request, no matter how small, will + perform poorly. It may be easier to download a PDF from network to + temporary local storage (such as ``io.BytesIO``), manipulate it, and + then re-upload it. + + .. versionchanged:: 3.0 + Keyword arguments now mandatory for everything except the first + argument. + """ + if isinstance(filename_or_stream, bytes) and filename_or_stream.startswith( + b'%PDF-' + ): + warn( + "It looks like you called with Pdf.open(data) with a bytes-like object " + "containing a PDF. This will probably fail because this function " + "expects a filename or opened file-like object. Instead, please use " + "Pdf.open(BytesIO(data))." + ) + + tmp_stream, original_filename = None, False + if allow_overwriting_input: + try: + Path(filename_or_stream) + except TypeError as error: + raise ValueError( + '"allow_overwriting_input=True" requires "open" first argument ' + 'to be a file path' + ) from error + original_filename = Path(filename_or_stream) + with open(original_filename, 'rb') as pdf_file: + tmp_stream = BytesIO() + shutil.copyfileobj(pdf_file, tmp_stream) + pdf = Pdf._open( + tmp_stream or filename_or_stream, + password=password, + hex_password=hex_password, + ignore_xref_streams=ignore_xref_streams, + suppress_warnings=suppress_warnings, + attempt_recovery=attempt_recovery, + inherit_page_attributes=inherit_page_attributes, + access_mode=access_mode, + ) + pdf._tmp_stream = tmp_stream + pdf._original_filename = original_filename + return pdf + + +@augments(_ObjectMapping) +class Extend_ObjectMapping: + def get(self, key, default=None) -> Object: + try: + return self[key] + except KeyError: + return default + + +def check_is_box(obj) -> None: + try: + if obj.is_rectangle: + return + except AttributeError: + pass + + try: + pdfobj = Array(obj) + if pdfobj.is_rectangle: + return + except Exception as e: + raise ValueError("object is not a rectangle") from e + + raise ValueError("object is not a rectangle") + + +@augments(Page) +class Extend_Page: + @property + def mediabox(self): + """Return page's /MediaBox, in PDF units.""" + return self._get_mediabox(True) + + @mediabox.setter + def mediabox(self, value): + check_is_box(value) + self.obj['/MediaBox'] = value + + @property + def cropbox(self): + """Return page's effective /CropBox, in PDF units. + + If the /CropBox is not defined, the /MediaBox is returned. + """ + return self._get_cropbox(True, False) + + @cropbox.setter + def cropbox(self, value): + check_is_box(value) + self.obj['/CropBox'] = value + + @property + def trimbox(self): + """Return page's effective /TrimBox, in PDF units. + + If the /TrimBox is not defined, the /CropBox is returned (and if + /CropBox is not defined, /MediaBox is returned). + """ + return self._get_trimbox(True, False) + + @trimbox.setter + def trimbox(self, value): + check_is_box(value) + self.obj['/TrimBox'] = value + + @property + def images(self) -> _ObjectMapping: + """Return all regular images associated with this page. + + This method does not recurse into Form XObjects and does not + attempt to find inline images. + """ + return self._images + + @property + def resources(self) -> Dictionary: + """Return this page's resources dictionary.""" + return self.obj['/Resources'] + + def add_resource( + self, + res: Object, + res_type: Name, + name: Name | None = None, + *, + prefix: str = '', + replace_existing: bool = True, + ) -> Name: + """Add a new resource to the page's Resources dictionary. 
+ + If the Resources dictionaries do not exist, they will be created. + + Args: + self: The object to add to the resources dictionary. + res: The dictionary object to insert into the resources + dictionary. + res_type: Should be one of the following Resource dictionary types: + ExtGState, ColorSpace, Pattern, Shading, XObject, Font, Properties. + name: The name of the object. If omitted, a random name will be + generated with enough randomness to be globally unique. + prefix: A prefix for the name of the object. Allows conveniently + namespacing when using random names, e.g. prefix="Im" for images. + Mutually exclusive with name parameter. + replace_existing: If the name already exists in one of the resource + dictionaries, remove it. + + Example: + >>> resource_name = pdf.pages[0].add_resource(formxobj, Name.XObject) + + .. versionadded:: 2.3 + + .. versionchanged:: 2.14 + If *res* does not belong to the same `Pdf` that owns this page, + a copy of *res* is automatically created and added instead. In previous + versions, it was necessary to change for this case manually. + + .. versionchanged:: 4.3.0 + Returns the name of the overlay in the resources dictionary instead + of returning None. + """ + if Name.Resources not in self.obj: + self.obj.Resources = Dictionary() + elif not isinstance(self.obj.Resources, Dictionary): + raise TypeError("Page /Resources exists but is not a dictionary") + resources = self.obj.Resources + + if res_type not in resources: + resources[res_type] = Dictionary() + + if name is not None and prefix: + raise ValueError("Must specify one of name= or prefix=") + if name is None: + name = Name.random(prefix=prefix) + + for res_dict in resources.as_dict().values(): + if not isinstance(res_dict, Dictionary): + continue + if name in res_dict: + if replace_existing: + del res_dict[name] + else: + raise ValueError(f"Name {name} already exists in page /Resources") + + resources[res_type][name] = res.with_same_owner_as(self.obj) + return name + + def _over_underlay( + self, + other, + rect: Rectangle | None, + under: bool, + push_stack: bool, + shrink: bool, + expand: bool, + ) -> Name: + formx = None + if isinstance(other, Page): + formx = other.as_form_xobject() + elif isinstance(other, Dictionary) and other.get(Name.Type) == Name.Page: + formx = Page(other).as_form_xobject() + elif ( + isinstance(other, Stream) + and other.get(Name.Type) == Name.XObject + and other.get(Name.Subtype) == Name.Form + ): + formx = other + + if formx is None: + raise TypeError( + "other object is not something we can convert to Form XObject" + ) + + if rect is None: + rect = Rectangle(self.trimbox) + + formx_placed_name = self.add_resource(formx, Name.XObject) + cs = self.calc_form_xobject_placement( + formx, formx_placed_name, rect, allow_shrink=shrink, allow_expand=expand + ) + + if push_stack: + self.contents_add(b'q\n', prepend=True) # prepend q + self.contents_add(b'Q\n', prepend=False) # i.e. append Q + + self.contents_add(cs, prepend=under) + self.contents_coalesce() + return formx_placed_name + + def add_overlay( + self, + other: Object | Page, + rect: Rectangle | None = None, + *, + push_stack: bool = True, + shrink: bool = True, + expand: bool = True, + ) -> Name: + """Overlay another object on this page. + + Overlays will be drawn after all previous content, potentially drawing on top + of existing content. + + Args: + other: A Page or Form XObject to render as an overlay on top of this + page. + rect: The PDF rectangle (in PDF units) in which to draw the overlay. 
+ If omitted, this page's trimbox, cropbox or mediabox (in that order) + will be used. + push_stack: If True (default), push the graphics stack of the existing + content stream to ensure that the overlay is rendered correctly. + Officially PDF limits the graphics stack depth to 32. Most + viewers will tolerate more, but excessive pushes may cause problems. + Multiple content streams may also be coalesced into a single content + stream where this parameter is True, since the PDF specification + permits PDF writers to coalesce streams as they see fit. + shrink: If True (default), allow the object to shrink to fit inside the + rectangle. The aspect ratio will be preserved. + expand: If True (default), allow the object to expand to fit inside the + rectangle. The aspect ratio will be preserved. + + Returns: + The name of the Form XObject that contains the overlay. + + .. versionadded:: 2.14 + + .. versionchanged:: 4.0.0 + Added the *push_stack* parameter. Previously, this method behaved + as if *push_stack* were False. + + .. versionchanged:: 4.2.0 + Added the *shrink* and *expand* parameters. Previously, this method + behaved as if ``shrink=True, expand=False``. + + .. versionchanged:: 4.3.0 + Returns the name of the overlay in the resources dictionary instead + of returning None. + """ + return self._over_underlay( + other, + rect, + under=False, + push_stack=push_stack, + expand=expand, + shrink=shrink, + ) + + def add_underlay( + self, + other: Object | Page, + rect: Rectangle | None = None, + *, + shrink: bool = True, + expand: bool = True, + ) -> Name: + """Underlay another object beneath this page. + + Underlays will be drawn before all other content, so they may be overdrawn + partially or completely. + + There is no *push_stack* parameter for this function, since adding an + underlay can be done without manipulating the graphics stack. + + Args: + other: A Page or Form XObject to render as an underlay underneath this + page. + rect: The PDF rectangle (in PDF units) in which to draw the underlay. + If omitted, this page's trimbox, cropbox or mediabox (in that order) + will be used. + shrink: If True (default), allow the object to shrink to fit inside the + rectangle. The aspect ratio will be preserved. + expand: If True (default), allow the object to expand to fit inside the + rectangle. The aspect ratio will be preserved. + + Returns: + The name of the Form XObject that contains the underlay. + + .. versionadded:: 2.14 + + .. versionchanged:: 4.2.0 + Added the *shrink* and *expand* parameters. Previously, this method + behaved as if ``shrink=True, expand=False``. Fixed issue with wrong + page rect being selected. + """ + return self._over_underlay( + other, rect, under=True, push_stack=False, expand=expand, shrink=shrink + ) + + def contents_add(self, contents: Stream | bytes, *, prepend: bool = False): + """Append or prepend to an existing page's content stream. + + Args: + contents: An existing content stream to append or prepend. + prepend: Prepend if true, append if false (default). + + .. 
versionadded:: 2.14 + """ + return self._contents_add(contents, prepend=prepend) + + def __getattr__(self, name): + return getattr(self.obj, name) + + @augment_override_cpp + def __setattr__(self, name, value): + if hasattr(self.__class__, name): + object.__setattr__(self, name, value) + else: + setattr(self.obj, name, value) + + @augment_override_cpp + def __delattr__(self, name): + if hasattr(self.__class__, name): + object.__delattr__(self, name) + else: + delattr(self.obj, name) + + def __getitem__(self, key): + return self.obj[key] + + def __setitem__(self, key, value): + self.obj[key] = value + + def __delitem__(self, key): + del self.obj[key] + + def __contains__(self, key): + return key in self.obj + + def get(self, key, default=None): + try: + return self[key] + except KeyError: + return default + + def emplace(self, other: Page, retain=(Name.Parent,)): + return self.obj.emplace(other.obj, retain=retain) + + def __repr__(self): + return ( + repr(self.obj) + .replace('Dictionary', 'Page', 1) + .replace('(Type="/Page")', '', 1) + ) + + def _repr_mimebundle_(self, include=None, exclude=None): + data = {} + bundle = {'application/pdf', 'image/png'} + if include: + bundle = {k for k in bundle if k in include} + if exclude: + bundle = {k for k in bundle if k not in exclude} + pagedata = _single_page_pdf(self.obj) + if 'application/pdf' in bundle: + data['application/pdf'] = pagedata + if 'image/png' in bundle: + try: + data['image/png'] = _mudraw(pagedata, 'png') + except (FileNotFoundError, RuntimeError): + pass + return data + + +@augments(Token) +class Extend_Token: + def __repr__(self): + return f'pikepdf.Token({self.type_}, {self.raw_value})' + + +@augments(Rectangle) +class Extend_Rectangle: + def __repr__(self): + return f'pikepdf.Rectangle({self.llx}, {self.lly}, {self.urx}, {self.ury})' + + def __hash__(self): + return hash((self.llx, self.lly, self.urx, self.ury)) + + +@augments(Attachments) +class Extend_Attachments(MutableMapping): + def __getitem__(self, k: str) -> AttachedFileSpec: + filespec = self._get_filespec(k) + if filespec is None: + raise KeyError(k) + return filespec + + def __setitem__(self, k: str, v: AttachedFileSpec) -> None: + if not v.filename: + v.filename = k + return self._add_replace_filespec(k, v) + + def __delitem__(self, k: str) -> None: + return self._remove_filespec(k) + + def __len__(self): + return len(self._get_all_filespecs()) + + def __iter__(self) -> Iterator[str]: + yield from self._get_all_filespecs() + + def __repr__(self): + return f"<pikepdf._qpdf.Attachments with {len(self)} attached files>" + + +@augments(AttachedFileSpec) +class Extend_AttachedFileSpec: + @staticmethod + def from_filepath(pdf: Pdf, path: Path | str, *, description: str = ''): + """Construct a file specification from a file path. + + This function will automatically add a creation and modified date + using the file system, and a MIME type inferred from the file's extension. + + If the data required for the attach is in memory, use + :meth:`pikepdf.AttachedFileSpec` instead. + + Args: + pdf: The Pdf to attach this file specification to. + path: A file path for the file to attach to this Pdf. + description: An optional description. May be shown to the user in + PDF viewers. 
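# Illustrative sketch (not from the pikepdf sources): attaching a local file
# via from_filepath(). 'report.csv' and its contents are made up purely for
# demonstration, and Pdf.attachments is the mapping provided by the C++ layer
# and augmented above.
from pathlib import Path
from pikepdf import AttachedFileSpec, Pdf

Path("report.csv").write_text("month,total\nJan,42\n")

pdf = Pdf.new()
spec = AttachedFileSpec.from_filepath(pdf, "report.csv",
                                      description="Monthly figures")
pdf.attachments["report.csv"] = spec
assert "report.csv" in pdf.attachments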
+ """ + mime, _ = mimetypes.guess_type(str(path)) + if mime is None: + mime = '' + if not isinstance(path, Path): + path = Path(path) + + stat = path.stat() + return AttachedFileSpec( + pdf, + path.read_bytes(), + description=description, + filename=str(path.name), + mime_type=mime, + creation_date=encode_pdf_date( + datetime.datetime.fromtimestamp(stat.st_ctime) + ), + mod_date=encode_pdf_date(datetime.datetime.fromtimestamp(stat.st_mtime)), + ) + + def __repr__(self): + if self.filename: + return ( + f"<pikepdf._qpdf.AttachedFileSpec for {self.filename!r}, " + f"description {self.description!r}>" + ) + return f"<pikepdf._qpdf.AttachedFileSpec description {self.description!r}>" + + +@augments(AttachedFile) +class Extend_AttachedFile: + @property + def creation_date(self) -> datetime.datetime | None: + if not self._creation_date: + return None + return decode_pdf_date(self._creation_date) + + @creation_date.setter + def creation_date(self, value: datetime.datetime): + self._creation_date = encode_pdf_date(value) + + @property + def mod_date(self) -> datetime.datetime | None: + if not self._mod_date: + return None + return decode_pdf_date(self._mod_date) + + @mod_date.setter + def mod_date(self, value: datetime.datetime): + self._mod_date = encode_pdf_date(value) + + def read_bytes(self) -> bytes: + return self.obj.read_bytes() + + def __repr__(self): + return ( + f'<pikepdf._qpdf.AttachedFile objid={self.obj.objgen} size={self.size} ' + f'mime_type={self.mime_type} creation_date={self.creation_date} ' + f'mod_date={self.mod_date}>' + ) + + +@augments(NameTree) +class Extend_NameTree: + def keys(self): + return KeysView(self._as_map()) + + def values(self): + return ValuesView(self._as_map()) + + def items(self): + return ItemsView(self._as_map()) + + get = MutableMapping.get + pop = MutableMapping.pop + popitem = MutableMapping.popitem + clear = MutableMapping.clear + update = MutableMapping.update + setdefault = MutableMapping.setdefault + + +MutableMapping.register(NameTree) + + +@augments(NumberTree) +class Extend_NumberTree: + def keys(self): + return KeysView(self._as_map()) + + def values(self): + return ValuesView(self._as_map()) + + def items(self): + return ItemsView(self._as_map()) + + get = MutableMapping.get + pop = MutableMapping.pop + popitem = MutableMapping.popitem + clear = MutableMapping.clear + update = MutableMapping.update + setdefault = MutableMapping.setdefault + + +MutableMapping.register(NumberTree) diff --git a/env/lib/python3.10/site-packages/pikepdf/_qpdf.cpython-310-x86_64-linux-gnu.so b/env/lib/python3.10/site-packages/pikepdf/_qpdf.cpython-310-x86_64-linux-gnu.so Binary files differnew file mode 100755 index 0000000..31165aa --- /dev/null +++ b/env/lib/python3.10/site-packages/pikepdf/_qpdf.cpython-310-x86_64-linux-gnu.so diff --git a/env/lib/python3.10/site-packages/pikepdf/_qpdf.pyi b/env/lib/python3.10/site-packages/pikepdf/_qpdf.pyi new file mode 100644 index 0000000..828891a --- /dev/null +++ b/env/lib/python3.10/site-packages/pikepdf/_qpdf.pyi @@ -0,0 +1,762 @@ +# SPDX-FileCopyrightText: 2022 James R. Barlow +# SPDX-License-Identifier: MPL-2.0 + +from __future__ import annotations + +# pybind11 does not generate type annotations yet, and mypy doesn't understand +# the way we're augmenting C++ classes with Python methods as in +# pikepdf/_methods.py. Thus, we need to manually spell out the resulting types +# after augmenting. 
+import datetime +import sys +from abc import abstractmethod +from decimal import Decimal +from enum import Enum +from pathlib import Path +from typing import ( + Any, + BinaryIO, + Callable, + ClassVar, + Collection, + Iterable, + Iterator, + KeysView, + Mapping, + MutableMapping, + Sequence, + TypeVar, + overload, +) + +if sys.version_info >= (3, 8): + from typing import Literal +else: + from typing_extensions import Literal # pragma: no cover + +from pikepdf.models.encryption import Encryption, EncryptionInfo, Permissions +from pikepdf.models.image import PdfInlineImage +from pikepdf.models.metadata import PdfMetadata +from pikepdf.models.outlines import Outline +from pikepdf.objects import Array, Dictionary, Name, Stream, String + +# This is the whole point of stub files, but apparently we have to do this... +# pylint: disable=no-method-argument,unused-argument,no-self-use,too-many-public-methods + +T = TypeVar('T', bound='Object') +Numeric = TypeVar('Numeric', int, float, Decimal) + +class Buffer: ... + +# Exceptions + +class DataDecodingError(Exception): ... +class JobUsageError(Exception): ... +class PasswordError(Exception): ... +class PdfError(Exception): ... +class ForeignObjectError(Exception): ... + +# Enums +class AccessMode(Enum): + default: int = ... + mmap: int = ... + mmap_only: int = ... + stream: int = ... + +class EncryptionMethod(Enum): + none: int = ... + unknown: int = ... + rc4: int = ... + aes: int = ... + aesv3: int = ... + +class ObjectStreamMode(Enum): + disable: int = ... + generate: int = ... + preserve: int = ... + +class ObjectType(Enum): + array: int = ... + boolean: int = ... + dictionary: int = ... + inlineimage: int = ... + integer: int = ... + name_: int = ... + null: int = ... + operator: int = ... + real: int = ... + reserved: int = ... + stream: int = ... + string: int = ... + uninitialized: int = ... + +class StreamDecodeLevel(Enum): + all: int = ... + generalized: int = ... + none: int = ... + specialized: int = ... + +class TokenType(Enum): + array_close: int = ... + array_open: int = ... + bad: int = ... + bool: int = ... + brace_close: int = ... + brace_open: int = ... + comment: int = ... + dict_close: int = ... + dict_open: int = ... + eof: int = ... + inline_image: int = ... + integer: int = ... + name_: int = ... + null: int = ... + real: int = ... + space: int = ... + string: int = ... + word: int = ... + +class Object: + def _ipython_key_completions_(self) -> KeysView | None: ... + def _inline_image_raw_bytes(self) -> bytes: ... + def _parse_page_contents(self, callbacks: Callable) -> None: ... + def _parse_page_contents_grouped( + self, whitelist: str + ) -> list[tuple[Collection[Object | PdfInlineImage], Operator]]: ... + @staticmethod + def _parse_stream(stream: Object, parser: StreamParser) -> list: ... + @staticmethod + def _parse_stream_grouped(stream: Object, whitelist: str) -> list: ... + def _repr_mimebundle_(self, include=None, exclude=None) -> dict | None: ... + def _write( + self, + data: bytes, + filter: Object, # pylint: disable=redefined-builtin + decode_parms: Object, + ) -> None: ... + def append(self, pyitem: Any) -> None: ... + def as_dict(self) -> _ObjectMapping: ... + def as_list(self) -> _ObjectList: ... + def emplace(self, other: Object, retain: Iterable[Name] = ...) -> None: ... + def extend(self, arg0: Iterable[Object]) -> None: ... + @overload + def get(self, key: str, default: T | None = ...) -> Object | T | None: ... + @overload + def get(self, key: Name, default: T | None = ...) -> Object | T | None: ... 
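# Illustrative usage of the get() overloads above (assumes `pdf` is an open pikepdf.Pdf):
#     pdf.Root.get(Name.Metadata)          # -> Object, or None if the key is absent
#     pdf.Root.get('/MarkInfo', Dictionary())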
+ def get_raw_stream_buffer(self) -> Buffer: ... + def get_stream_buffer(self, decode_level: StreamDecodeLevel = ...) -> Buffer: ... + def is_owned_by(self, possible_owner: Pdf) -> bool: ... + def items(self) -> Iterable[tuple[str, Object]]: ... + def keys(self) -> set[str]: ... + @staticmethod + def parse(stream: bytes, description: str = ...) -> Object: ... + def read_bytes(self, decode_level: StreamDecodeLevel = ...) -> bytes: ... + def read_raw_bytes(self) -> bytes: ... + def same_owner_as(self, other: Object) -> bool: ... + def to_json(self, dereference: bool = ...) -> bytes: ... + def unparse(self, resolved: bool = ...) -> bytes: ... + def with_same_owner_as(self, arg0: Object) -> Object: ... + def wrap_in_array(self) -> Object: ... + def write( + self, + data: bytes, + *, + filter: Name | Array | None = ..., # pylint: disable=redefined-builtin + decode_parms: Dictionary | Array | None = ..., + type_check: bool = ..., + ) -> None: ... + def __bytes__(self) -> bytes: ... + @overload + def __contains__(self, arg0: Object) -> bool: ... + @overload + def __contains__(self, arg0: str) -> bool: ... + def __copy__(self) -> Object: ... + def __delattr__(self, arg0: str) -> None: ... + @overload + def __delitem__(self, arg0: str) -> None: ... + @overload + def __delitem__(self, arg0: Object) -> None: ... + @overload + def __delitem__(self, arg0: int) -> None: ... + def __dir__(self) -> list: ... + def __eq__(self, other: Any) -> bool: ... + def __getattr__(self, arg0: str) -> Object: ... + @overload + def __getitem__(self, arg0: str) -> Object: ... + @overload + def __getitem__(self, arg0: Object) -> Object: ... + @overload + def __getitem__(self, arg0: int) -> Object: ... + def __hash__(self) -> int: ... + def __iter__(self) -> Iterable[Object]: ... + def __len__(self) -> int: ... + def __setattr__(self, arg0: str, arg1: object) -> None: ... + @overload + def __setitem__(self, arg0: str, arg1: Object) -> None: ... + @overload + def __setitem__(self, arg0: Object, arg1: Object) -> None: ... + @overload + def __setitem__(self, arg0: str, arg1: object) -> None: ... + @overload + def __setitem__(self, arg0: Object, arg1: object) -> None: ... + @overload + def __setitem__(self, arg0: int, arg1: Object) -> None: ... + @overload + def __setitem__(self, arg0: int, arg1: object) -> None: ... + @property + def _objgen(self) -> tuple[int, int]: ... + @property + def _type_code(self) -> ObjectType: ... + @property + def _type_name(self) -> str: ... + @property + def images(self) -> _ObjectMapping: ... + @property + def is_indirect(self) -> bool: ... + @property + def is_rectangle(self) -> bool: ... + @property + def objgen(self) -> tuple[int, int]: ... + @property + def stream_dict(self) -> Object: ... + @stream_dict.setter + def stream_dict(self, val: Object) -> None: ... + +class ObjectHelper: + def __eq__(self, other: Any) -> bool: ... + @property + def obj(self) -> Object: ... + +class _ObjectList: + @overload + def __init__(self) -> None: ... + @overload + def __init__(self, arg0: _ObjectList) -> None: ... + @overload + def __init__(self, arg0: Iterable) -> None: ... + @overload + def __init__(*args, **kwargs) -> None: ... + def append(self, x: Object) -> None: ... + def clear(self) -> None: ... + def count(self, x: Object) -> int: ... + @overload + def extend(self, L: _ObjectList) -> None: ... + @overload + def extend(self, L: Iterable[Object]) -> None: ... + def insert(self, i: int, x: Object) -> None: ... + @overload + def pop(self) -> Object: ... 
+ @overload + def pop(self, i: int) -> Object: ... + @overload + def pop(*args, **kwargs) -> Any: ... + def remove(self, x: Object) -> None: ... + def __bool__(self) -> bool: ... + def __contains__(self, x: Object) -> bool: ... + @overload + def __delitem__(self, arg0: int) -> None: ... + @overload + def __delitem__(self, arg0: slice) -> None: ... + @overload + def __delitem__(*args, **kwargs) -> Any: ... + def __eq__(self, other: Any) -> bool: ... + @overload + def __getitem__(self, s: slice) -> _ObjectList: ... + @overload + def __getitem__(self, arg0: int) -> Object: ... + @overload + def __getitem__(*args, **kwargs) -> Any: ... + def __iter__(self) -> Iterator[Object]: ... + def __len__(self) -> int: ... + def __ne__(self, other: Any) -> bool: ... + @overload + def __setitem__(self, arg0: int, arg1: Object) -> None: ... + @overload + def __setitem__(self, arg0: slice, arg1: _ObjectList) -> None: ... + @overload + def __setitem__(*args, **kwargs) -> Any: ... + +class _ObjectMapping: + get: Any = ... + keys: Any = ... + values: Any = ... + __contains__: Any = ... + def __init__(self) -> None: ... + def items(self) -> Iterator: ... + def __bool__(self) -> bool: ... + def __delitem__(self, arg0: str) -> None: ... + def __getitem__(self, arg0: str) -> Object: ... + def __iter__(self) -> Iterator: ... + def __len__(self) -> int: ... + def __setitem__(self, arg0: str, arg1: Object) -> None: ... + +class Operator(Object): ... + +class Annotation: + def __init__(self, arg0: Object) -> None: ... + @overload + def get_appearance_stream(self, which: Object) -> Object: ... + @overload + def get_appearance_stream(self, which: Object, state: Object) -> Object: ... + def get_page_content_for_appearance( + self, + name: Object, + rotate: int, + required_flags: int = ..., + forbidden_flags: int = ..., + ) -> bytes: ... + @property + def appearance_dict(self) -> Object: ... + @property + def appearance_state(self) -> Object: ... + @property + def flags(self) -> int: ... + @property + def obj(self) -> Object: ... + @property + def subtype(self) -> str: ... + +class AttachedFile: + _creation_date: str + _mod_date: str + creation_date: datetime.datetime | None + mime_type: str + mod_date: datetime.datetime | None + @property + def md5(self) -> bytes: ... + @property + def obj(self) -> Object: ... + def read_bytes(self) -> bytes: ... + @property + def size(self) -> int: ... + +class AttachedFileSpec: + description: str + filename: str + def __init__( + self, + data: bytes, + *, + description: str, + filename: str, + mime_type: str, + creation_date: str, + mod_date: str, + ) -> None: ... + def get_all_filenames(self) -> dict: ... + @overload + def get_file(self) -> AttachedFile: ... + @overload + def get_file(self, name: Name) -> AttachedFile: ... + @property + def obj(self) -> Object: ... + @staticmethod + def from_filepath( + pdf: Pdf, path: Path | str, *, description: str = '' + ) -> AttachedFileSpec: ... + +class Attachments(MutableMapping[str, AttachedFileSpec]): + def __contains__(self, k: object) -> bool: ... + def __delitem__(self, k: str) -> None: ... + def __eq__(self, other: Any) -> bool: ... + def __getitem__(self, k: str) -> AttachedFileSpec: ... + def __iter__(self) -> Iterator[str]: ... + def __len__(self) -> int: ... + def __setitem__(self, k: str, v: AttachedFileSpec): ... + def __init__(self, *args, **kwargs) -> None: ... + def _add_replace_filespec(self, arg0: str, arg1: AttachedFileSpec) -> None: ... + def _get_all_filespecs(self) -> dict[str, AttachedFileSpec]: ... 
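# Illustrative usage of the Attachments mapping interface
# (the filename 'report.pdf' is a placeholder):
#     spec = AttachedFileSpec.from_filepath(pdf, Path('report.pdf'))
#     pdf.attachments['report.pdf'] = spec
#     names = list(pdf.attachments)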
+ def _get_filespec(self, arg0: str) -> AttachedFileSpec: ... + def _remove_filespec(self, arg0: str) -> bool: ... + @property + def _has_embedded_files(self) -> bool: ... + +class Token: + def __init__(self, arg0: TokenType, arg1: bytes) -> None: ... + def __eq__(self, other: Any) -> bool: ... + @property + def error_msg(self) -> str: ... + @property + def raw_value(self) -> bytes: ... + @property + def type_(self) -> TokenType: ... + @property + def value(self) -> str: ... + +class _QPDFTokenFilter: ... + +class TokenFilter(_QPDFTokenFilter): + def __init__(self) -> None: ... + def handle_token(self, token: Token = ...) -> None | list | Token: ... + +class StreamParser: + def __init__(self) -> None: ... + @abstractmethod + def handle_eof(self) -> None: ... + @abstractmethod + def handle_object(self, obj: Object, offset: int, length: int) -> None: ... + +class Page: + _repr_mimebundle_: Any = ... + @overload + def __init__(self, arg0: Object) -> None: ... + @overload + def __init__(self, arg0: Page) -> None: ... + def __contains__(self, key: Any) -> bool: ... + def __delattr__(self, name: Any) -> None: ... + def __eq__(self, other: Any) -> bool: ... + def __getattr__(self, name: Any) -> Object: ... + def __getitem__(self, name: Any) -> Object: ... + def __setattr__(self, name: Any, value: Any): ... + def __setitem__(self, name: Any, value: Any): ... + def _get_cropbox(self, arg0: bool, arg1: bool) -> Object: ... + def _get_mediabox(self, arg0: bool) -> Object: ... + def _get_trimbox(self, arg0: bool, arg1: bool) -> Object: ... + def add_content_token_filter(self, tf: TokenFilter) -> None: ... + def add_overlay( + self, + other: Object | Page, + rect: Rectangle | None, + *, + push_stack: bool | None = ..., + ): ... + def add_underlay(self, other: Object | Page, rect: Rectangle | None): ... + def as_form_xobject(self, handle_transformations: bool = ...) -> Object: ... + def calc_form_xobject_placement( + self, + formx: Object, + name: Name, + rec: Rectangle, + *, + invert_transformations: bool, + allow_shrink: bool, + allow_expand: bool, + ) -> bytes: ... + def contents_add(self, contents: Stream | bytes, *, prepend: bool) -> None: ... + def contents_coalesce(self) -> None: ... + def emplace(self, other: Page, retain: Iterable[Name]) -> None: ... + def externalize_inline_images(self, min_size: int = ...) -> None: ... + def get(self, key: str | Name, default: T | None = ...) -> T | None | Object: ... + def get_filtered_contents(self, tf: TokenFilter) -> bytes: ... + def index(self) -> int: ... + def label(self) -> str: ... + def parse_contents(self, arg0: StreamParser) -> None: ... + def remove_unreferenced_resources(self) -> None: ... + def rotate(self, angle: int, relative: bool) -> None: ... + @property + def images(self) -> _ObjectMapping: ... + @property + def cropbox(self) -> Array: ... + @cropbox.setter + def cropbox(self, val: Array) -> None: ... + @property + def mediabox(self) -> Array: ... + @mediabox.setter + def mediabox(self, val: Array) -> None: ... + @property + def obj(self) -> Dictionary: ... + @property + def trimbox(self) -> Array: ... + @trimbox.setter + def trimbox(self, val: Array) -> None: ... + @property + def resources(self) -> Dictionary: ... + def add_resource( + self, + res: Object, + res_type: Name, + name: Name | None = None, + *, + prefix: str = '', + replace_existing: bool = True, + ) -> Name: ... + +class PageList: + def __init__(self, *args, **kwargs) -> None: ... + def append(self, page: Page) -> None: ... 
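# Illustrative usage of PageList (assumes `pdf` and `other` are open Pdf objects;
# a page taken from another Pdf is copied into this one when appended):
#     pdf.pages.append(other.pages[0])
#     del pdf.pages[0]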
+ @overload + def extend(self, other: PageList) -> None: ... + @overload + def extend(self, iterable: Iterable[Page]) -> None: ... + def insert(self, index: int, obj: Page) -> None: ... + def p(self, pnum: int) -> Page: ... + def remove(self, **kwargs) -> None: ... + def reverse(self) -> None: ... + @overload + def __delitem__(self, arg0: int) -> None: ... + @overload + def __delitem__(self, arg0: slice) -> None: ... + @overload + def __getitem__(self, arg0: int) -> Page: ... + @overload + def __getitem__(self, arg0: slice) -> list[Page]: ... + def __iter__(self) -> PageList: ... + def __len__(self) -> int: ... + def __next__(self) -> Page: ... + @overload + def __setitem__(self, arg0: int, arg1: Page) -> None: ... + @overload + def __setitem__(self, arg0: slice, arg1: Iterable[Page]) -> None: ... + +class Pdf: + _repr_mimebundle_: Any = ... + def add_blank_page(self, *, page_size: tuple[Numeric, Numeric] = ...) -> Page: ... + def __enter__(self) -> Pdf: ... + def __exit__(self, exc_type, exc_value, traceback) -> None: ... + def __init__(self, *args, **kwargs) -> None: ... + def _add_page(self, page: Object, first: bool = ...) -> None: ... + def _decode_all_streams_and_discard(self) -> None: ... + def _get_object_id(self, arg0: int, arg1: int) -> Object: ... + def _process(self, arg0: str, arg1: bytes) -> None: ... + def _remove_page(self, arg0: Object) -> None: ... + def _replace_object(self, arg0: tuple[int, int], arg1: Object) -> None: ... + def _swap_objects(self, arg0: tuple[int, int], arg1: tuple[int, int]) -> None: ... + def check(self) -> list[str]: ... + def check_linearization(self, stream: object = ...) -> bool: ... + def close(self) -> None: ... + def copy_foreign(self, h: Object) -> Object: ... + @overload + def get_object(self, objgen: tuple[int, int]) -> Object: ... + @overload + def get_object(self, objid: int, gen: int) -> Object: ... + def get_warnings(self) -> list: ... + @overload + def make_indirect(self, h: T) -> T: ... + @overload + def make_indirect(self, obj: Any) -> Object: ... + def make_stream(self, data: bytes, d=None, **kwargs) -> Stream: ... + @classmethod + def new(cls) -> Pdf: ... + @staticmethod + def open( + filename_or_stream: Path | str | BinaryIO, + *, + password: str | bytes = "", + hex_password: bool = False, + ignore_xref_streams: bool = False, + suppress_warnings: bool = True, + attempt_recovery: bool = True, + inherit_page_attributes: bool = True, + access_mode: AccessMode = AccessMode.default, + allow_overwriting_input: bool = False, + ) -> Pdf: ... + def open_metadata( + self, + set_pikepdf_as_editor: bool = True, + update_docinfo: bool = True, + strict: bool = False, + ) -> PdfMetadata: ... + def open_outline(self, max_depth: int = 15, strict: bool = False) -> Outline: ... + def remove_unreferenced_resources(self) -> None: ... + def save( + self, + filename_or_stream: Path | str | BinaryIO | None = None, + *, + static_id: bool = False, + preserve_pdfa: bool = True, + min_version: str | tuple[str, int] = "", + force_version: str | tuple[str, int] = "", + fix_metadata_version: bool = True, + compress_streams: bool = True, + stream_decode_level: StreamDecodeLevel | None = None, + object_stream_mode: ObjectStreamMode = ObjectStreamMode.preserve, + normalize_content: bool = False, + linearize: bool = False, + qdf: bool = False, + progress: Callable[[int], None] = None, + encryption: Encryption | bool | None = None, + recompress_flate: bool = False, + deterministic_id: bool = False, + ) -> None: ... + def show_xref_table(self) -> None: ... 
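# Illustrative round trip using the open()/save() signatures above
# ('in.pdf' and 'out.pdf' are placeholder paths):
#     with Pdf.open('in.pdf') as pdf:
#         pdf.save('out.pdf', linearize=True,
#                  object_stream_mode=ObjectStreamMode.generate)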
+ @property + def Root(self) -> Object: ... + @property + def _allow_accessibility(self) -> bool: ... + @property + def _allow_extract(self) -> bool: ... + @property + def _allow_modify_all(self) -> bool: ... + @property + def _allow_modify_annotation(self) -> bool: ... + @property + def _allow_modify_assembly(self) -> bool: ... + @property + def _allow_modify_form(self) -> bool: ... + @property + def _allow_modify_other(self) -> bool: ... + @property + def _allow_print_highres(self) -> bool: ... + @property + def _allow_print_lowres(self) -> bool: ... + @property + def _encryption_data(self) -> dict: ... + @property + def _pages(self) -> Any: ... + @property + def allow(self) -> Permissions: ... + @property + def docinfo(self) -> Object: ... + @docinfo.setter + def docinfo(self, val: Object) -> None: ... + @property + def encryption(self) -> EncryptionInfo: ... + @property + def extension_level(self) -> int: ... + @property + def filename(self) -> str: ... + @property + def is_encrypted(self) -> bool: ... + @property + def is_linearized(self) -> bool: ... + @property + def objects(self) -> Any: ... + @property + def pages(self) -> PageList: ... + @property + def pdf_version(self) -> str: ... + @property + def root(self) -> Object: ... + @property + def trailer(self) -> Object: ... + @property + def user_password_matched(self) -> bool: ... + @property + def owner_password_matched(self) -> bool: ... + def generate_appearance_streams(self) -> None: ... + def flatten_annotations(self, mode: str) -> None: ... + @property + def attachments(self) -> Attachments: ... + +class Rectangle: + llx: float = ... + lly: float = ... + urx: float = ... + ury: float = ... + @overload + def __init__(self, llx: float, lly: float, urx: float, ury: float) -> None: ... + @overload + def __init__(self, a: Array) -> None: ... + @property + def width(self) -> float: ... + @property + def height(self) -> float: ... + @property + def lower_left(self) -> tuple[float, float]: ... + @property + def lower_right(self) -> tuple[float, float]: ... + @property + def upper_left(self) -> tuple[float, float]: ... + @property + def upper_right(self) -> tuple[float, float]: ... + def as_array(self) -> Array: ... + +class NameTree(MutableMapping[str | bytes, Object]): + @staticmethod + def new(pdf: Pdf, auto_repair: bool = True) -> NameTree: ... + def __contains__(self, name: object) -> bool: ... + def __delitem__(self, name: str | bytes) -> None: ... + def __eq__(self, other: Any) -> bool: ... + def __getitem__(self, name: str | bytes) -> Object: ... + def __iter__(self) -> Iterator[bytes]: ... + def __len__(self) -> int: ... + def __setitem__(self, name: str | bytes, o: Object) -> None: ... + def __init__(self, obj: Object, *, auto_repair: bool = ...) -> None: ... + def _as_map(self) -> _ObjectMapping: ... + @property + def obj(self) -> Object: ... + +class NumberTree(MutableMapping[int, Object]): + @staticmethod + def new(pdf: Pdf, auto_repair: bool = True) -> NumberTree: ... + def __contains__(self, key: object) -> bool: ... + def __delitem__(self, key: int) -> None: ... + def __eq__(self, other: Any) -> bool: ... + def __getitem__(self, key: int) -> Object: ... + def __iter__(self) -> Iterator[int]: ... + def __len__(self) -> int: ... + def __setitem__(self, key: int, o: Object) -> None: ... + def __init__(self, obj: Object, pdf: Pdf, *, auto_repair: bool = ...) -> None: ... + def _as_map(self) -> _ObjectMapping: ... + @property + def obj(self) -> Object: ... 
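# Illustrative usage of the tree helpers above (the key 'Attachment1' and the
# Filespec dictionary are placeholders):
#     nt = NameTree.new(pdf)
#     nt['Attachment1'] = pdf.make_indirect(Dictionary(Type=Name.Filespec))
#     entries = dict(nt.items())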
+ +class ContentStreamInstruction: + @property + def operands(self) -> _ObjectList: ... + @property + def operator(self) -> Operator: ... + def __getitem__(self, index: int) -> _ObjectList | Operator: ... + def __len__(self) -> int: ... + +class ContentStreamInlineImage: + @property + def operands(self) -> _ObjectList: ... + @property + def operator(self) -> Operator: ... + def __getitem__(self, index: int) -> _ObjectList | Operator: ... + def __len__(self) -> int: ... + @property + def iimage(self) -> PdfInlineImage: ... + +class Job: + EXIT_ERROR: ClassVar[int] = 2 + EXIT_WARNING: ClassVar[int] = 3 + EXIT_IS_NOT_ENCRYPTED: ClassVar[int] = 2 + EXIT_CORRECT_PASSWORD: ClassVar[int] = 3 + LATEST_JOB_JSON: ClassVar[int] + LATEST_JSON: ClassVar[int] + + @staticmethod + def json_out_schema(*, schema: int) -> str: ... + @staticmethod + def job_json_schema(*, schema: int) -> str: ... + @overload + def __init__(self, json: str) -> None: ... + @overload + def __init__(self, json_dict: Mapping) -> None: ... + @overload + def __init__( + self, args: Sequence[str | bytes], *, progname: str = "pikepdf" + ) -> None: ... + def check_configuration(self) -> None: ... + @property + def creates_output(self) -> bool: ... + @property + def message_prefix(self) -> str: ... + def run(self) -> None: ... + @property + def has_warnings(self) -> bool: ... + @property + def exit_code(self) -> int: ... + @property + def encryption_status(self) -> dict[str, bool]: ... + +def _Null() -> Any: ... +def _encode(handle: Any) -> Object: ... +def _new_array(arg0: Iterable) -> Array: ... +def _new_boolean(arg0: bool) -> Object: ... +def _new_dictionary(arg0: Mapping[Any, Any]) -> Dictionary: ... +def _new_integer(arg0: int) -> Object: ... +def _new_name(arg0: str) -> Name: ... +def _new_operator(op: str) -> Operator: ... +@overload +def _new_real(arg0: str) -> Object: ... +@overload +def _new_real(value: float, places: int = ...) -> Object: ... +def _new_stream(arg0: Pdf, arg1: bytes) -> Stream: ... +def _new_string(s: str | bytes) -> String: ... +def _new_string_utf8(s: str) -> String: ... +def _test_file_not_found(*args, **kwargs) -> Any: ... +def _translate_qpdf_logic_error(arg0: str) -> str: ... +def get_decimal_precision() -> int: ... +def pdf_doc_to_utf8(pdfdoc: bytes) -> str: ... +def qpdf_version() -> str: ... +def set_access_default_mmap(mmap: bool) -> bool: ... +def set_decimal_precision(prec: int) -> int: ... +def unparse(obj: Any) -> bytes: ... +def utf8_to_pdf_doc(utf8: str, unknown: bytes) -> tuple[bool, bytes]: ... +def _unparse_content_stream(contentstream: Iterable[Any]) -> bytes: ... +def set_flate_compression_level( + level: Literal[-1, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9] +) -> None: ... diff --git a/env/lib/python3.10/site-packages/pikepdf/_version.py b/env/lib/python3.10/site-packages/pikepdf/_version.py new file mode 100644 index 0000000..9a084d9 --- /dev/null +++ b/env/lib/python3.10/site-packages/pikepdf/_version.py @@ -0,0 +1,13 @@ +# SPDX-FileCopyrightText: 2022 James R. 
Barlow +# SPDX-License-Identifier: MPL-2.0 + +from __future__ import annotations + +try: + from importlib_metadata import version as _package_version # type: ignore +except ImportError: + from importlib.metadata import version as _package_version + +__version__ = _package_version('pikepdf') + +__all__ = ['__version__'] diff --git a/env/lib/python3.10/site-packages/pikepdf/_xml.py b/env/lib/python3.10/site-packages/pikepdf/_xml.py new file mode 100644 index 0000000..edf811c --- /dev/null +++ b/env/lib/python3.10/site-packages/pikepdf/_xml.py @@ -0,0 +1,28 @@ +# SPDX-FileCopyrightText: 2022 James R. Barlow +# SPDX-License-Identifier: MPL-2.0 + +from __future__ import annotations + +from typing import IO, Any, AnyStr + +from lxml.etree import XMLParser as _UnsafeXMLParser +from lxml.etree import _ElementTree +from lxml.etree import parse as _parse + + +class _XMLParser(_UnsafeXMLParser): + def __init__(self, *args: Any, **kwargs: Any): + # Prevent XXE attacks + # https://rules.sonarsource.com/python/type/Vulnerability/RSPEC-2755 + kwargs['resolve_entities'] = False + kwargs['no_network'] = True + super().__init__(*args, **kwargs) + + +def parse_xml(source: AnyStr | IO[Any], recover: bool = False) -> _ElementTree: + """Wrap lxml's parse to provide protection against XXE attacks.""" + parser = _XMLParser(recover=recover, remove_pis=False) + return _parse(source, parser=parser) + + +__all__ = ['parse_xml'] diff --git a/env/lib/python3.10/site-packages/pikepdf/codec.py b/env/lib/python3.10/site-packages/pikepdf/codec.py new file mode 100644 index 0000000..4290b91 --- /dev/null +++ b/env/lib/python3.10/site-packages/pikepdf/codec.py @@ -0,0 +1,170 @@ +# SPDX-FileCopyrightText: 2022 James R. Barlow +# SPDX-License-Identifier: MPL-2.0 + +"""Implement pdfdoc codec.""" + +from __future__ import annotations + +import codecs +from typing import Container + +from ._qpdf import pdf_doc_to_utf8, utf8_to_pdf_doc + +# pylint: disable=redefined-builtin + +# See PDF Reference Manual 1.7, Table D.2. +# The following generates set of all Unicode code points that can be encoded in +# pdfdoc. Since pdfdoc is 8-bit, the vast majority of code points cannot be. + +# Due to a bug, QPDF <= 10.5 and pikepdf < 5 had some inconsistencies around +# PdfDocEncoding. +PDFDOC_ENCODABLE = frozenset( + list(range(0x00, 0x17 + 1)) + + list(range(0x20, 0x7E + 1)) + + [ + 0x2022, + 0x2020, + 0x2021, + 0x2026, + 0x2014, + 0x2013, + 0x0192, + 0x2044, + 0x2039, + 0x203A, + 0x2212, + 0x2030, + 0x201E, + 0x201C, + 0x201D, + 0x2018, + 0x2019, + 0x201A, + 0x2122, + 0xFB01, + 0xFB02, + 0x0141, + 0x0152, + 0x0160, + 0x0178, + 0x017D, + 0x0131, + 0x0142, + 0x0153, + 0x0161, + 0x017E, + 0x20AC, + ] + + [0x02D8, 0x02C7, 0x02C6, 0x02D9, 0x02DD, 0x02DB, 0x02DA, 0x02DC] + + list(range(0xA1, 0xAC + 1)) + + list(range(0xAE, 0xFF + 1)) +) + + +def _find_first_index(s: str, ordinals: Container[int]) -> int: + for n, char in enumerate(s): + if ord(char) not in ordinals: + return n + raise ValueError("couldn't find the unencodable character") # pragma: no cover + + +def pdfdoc_encode(input: str, errors: str = 'strict') -> tuple[bytes, int]: + error_marker = b'?' 
if errors == 'replace' else b'\xad' + success, pdfdoc = utf8_to_pdf_doc(input, error_marker) + if success: + return pdfdoc, len(input) + + if errors == 'ignore': + pdfdoc = pdfdoc.replace(b'\xad', b'') + return pdfdoc, len(input) + if errors == 'replace': + return pdfdoc, len(input) + if errors == 'strict': + if input.startswith('\xfe\xff') or input.startswith('\xff\xfe'): + raise UnicodeEncodeError( + 'pdfdoc', + input, + 0, + 2, + "strings beginning with byte order marks cannot be encoded in pdfdoc", + ) + + # libqpdf doesn't return what character caused the error, and Python + # needs this, so make an educated guess and raise an exception based + # on that. + offending_index = _find_first_index(input, PDFDOC_ENCODABLE) + raise UnicodeEncodeError( + 'pdfdoc', + input, + offending_index, + offending_index + 1, + "character cannot be represented in pdfdoc encoding", + ) + raise LookupError(errors) + + +def pdfdoc_decode(input: bytes, errors: str = 'strict') -> tuple[str, int]: + if isinstance(input, memoryview): + input = input.tobytes() + s = pdf_doc_to_utf8(input) + if errors == 'strict': + idx = s.find('\ufffd') + if idx >= 0: + raise UnicodeDecodeError( + 'pdfdoc', + input, + idx, + idx + 1, + "no Unicode mapping is defined for this character", + ) + + return s, len(input) + + +class PdfDocCodec(codecs.Codec): + """Implements PdfDocEncoding character map used inside PDFs.""" + + def encode(self, input: str, errors: str = 'strict') -> tuple[bytes, int]: + return pdfdoc_encode(input, errors) + + def decode(self, input: bytes, errors: str = 'strict') -> tuple[str, int]: + return pdfdoc_decode(input, errors) + + +class PdfDocStreamWriter(PdfDocCodec, codecs.StreamWriter): + pass + + +class PdfDocStreamReader(PdfDocCodec, codecs.StreamReader): + def decode(self, input: bytes, errors: str = 'strict') -> tuple[str, int]: + return PdfDocCodec.decode(self, input, errors) + + +class PdfDocIncrementalEncoder(codecs.IncrementalEncoder): + def encode(self, input: str, final: bool = False) -> bytes: + return pdfdoc_encode(input, 'strict')[0] + + +class PdfDocIncrementalDecoder(codecs.IncrementalDecoder): + def decode(self, input: bytes, final: bool = False) -> str: + return pdfdoc_decode(input, 'strict')[0] + + +def find_pdfdoc(encoding: str) -> codecs.CodecInfo | None: + if encoding in ('pdfdoc', 'pdfdoc_pikepdf'): + codec = PdfDocCodec() + return codecs.CodecInfo( + name=encoding, + encode=codec.encode, + decode=codec.decode, + streamwriter=PdfDocStreamWriter, + streamreader=PdfDocStreamReader, + incrementalencoder=PdfDocIncrementalEncoder, + incrementaldecoder=PdfDocIncrementalDecoder, + ) + return None # pragma: no cover + + +codecs.register(find_pdfdoc) + +__all__ = ['utf8_to_pdf_doc', 'pdf_doc_to_utf8'] diff --git a/env/lib/python3.10/site-packages/pikepdf/jbig2.py b/env/lib/python3.10/site-packages/pikepdf/jbig2.py new file mode 100644 index 0000000..28c596b --- /dev/null +++ b/env/lib/python3.10/site-packages/pikepdf/jbig2.py @@ -0,0 +1,108 @@ +# SPDX-FileCopyrightText: 2022 James R. Barlow +# SPDX-License-Identifier: MPL-2.0 + +"""Integrate JBIG2 image decoding. + +Requires third-party JBIG2 decoder in the form of an external program, like +jbig2dec. 
+""" + +from __future__ import annotations + +import os +from abc import ABC, abstractmethod +from pathlib import Path +from subprocess import DEVNULL, PIPE, CalledProcessError, run +from tempfile import TemporaryDirectory + +from packaging.version import Version +from PIL import Image + +from pikepdf._exceptions import DependencyError + + +def _extract_jbig2_bytes(jbig2: bytes, jbig2_globals: bytes) -> bytes: + with TemporaryDirectory(prefix='pikepdf-', suffix='.jbig2') as tmpdir: + image_path = Path(tmpdir) / "image" + global_path = Path(tmpdir) / "global" + output_path = Path(tmpdir) / "outfile" + + args = [ + "jbig2dec", + "--embedded", + "--format", + "png", + "--output", + os.fspath(output_path), + ] + + # Get the raw stream, because we can't decode im_obj - that is why we are here + # (Strictly speaking we should remove any non-JBIG2 filters if double encoded) + image_path.write_bytes(jbig2) + + if len(jbig2_globals) > 0: + global_path.write_bytes(jbig2_globals) + args.append(os.fspath(global_path)) + + args.append(os.fspath(image_path)) + + run(args, stdout=DEVNULL, check=True) + with Image.open(output_path) as im: + return im.tobytes() + + +class JBIG2DecoderInterface(ABC): + """pikepdf's C++ expects this Python interface to be available for JBIG2.""" + + @abstractmethod + def check_available(self) -> None: + """Check if decoder is available. Throws DependencyError if not.""" + + @abstractmethod + def decode_jbig2(self, jbig2: bytes, jbig2_globals: bytes) -> bytes: + """Decode JBIG2 from jbig2 and globals, returning decoded bytes.""" + + def available(self) -> bool: + """Return True if decoder is available.""" + try: + self.check_available() + except DependencyError: + return False + else: + return True + + +class JBIG2Decoder(JBIG2DecoderInterface): + """JBIG2 decoder implementation.""" + + def check_available(self) -> None: + """Check if jbig2dec is installed and usable.""" + version = self._version() + if version < Version('0.15'): + raise DependencyError("jbig2dec is too old (older than version 0.15)") + + def decode_jbig2(self, jbig2: bytes, jbig2_globals: bytes) -> bytes: + """Decode JBIG2 from binary data, returning decode bytes.""" + return _extract_jbig2_bytes(jbig2, jbig2_globals) + + def _version(self) -> Version: + try: + proc = run( + ['jbig2dec', '--version'], stdout=PIPE, check=True, encoding='ascii' + ) + except (CalledProcessError, FileNotFoundError) as e: + raise DependencyError("jbig2dec - not installed or not found") from e + else: + result = proc.stdout + version_str = result.replace( + 'jbig2dec', '' + ).strip() # returns "jbig2dec 0.xx" + return Version(version_str) + + +_jbig2_decoder = JBIG2Decoder() + + +def get_decoder() -> JBIG2DecoderInterface: + """Return an instance of a JBIG2 decoder.""" + return _jbig2_decoder diff --git a/env/lib/python3.10/site-packages/pikepdf/models/__init__.py b/env/lib/python3.10/site-packages/pikepdf/models/__init__.py new file mode 100644 index 0000000..e2e73ba --- /dev/null +++ b/env/lib/python3.10/site-packages/pikepdf/models/__init__.py @@ -0,0 +1,25 @@ +# SPDX-FileCopyrightText: 2022 James R. 
Barlow +# SPDX-License-Identifier: MPL-2.0 + +"""Python implementation of higher level PDF constructs.""" + +from __future__ import annotations + +from ._content_stream import ( + ContentStreamInstructions, + PdfParsingError, + UnparseableContentStreamInstructions, + parse_content_stream, + unparse_content_stream, +) +from .encryption import Encryption, EncryptionInfo, Permissions +from .image import PdfImage, PdfInlineImage, UnsupportedImageTypeError +from .matrix import PdfMatrix +from .metadata import PdfMetadata +from .outlines import ( + Outline, + OutlineItem, + OutlineStructureError, + PageLocation, + make_page_destination, +) diff --git a/env/lib/python3.10/site-packages/pikepdf/models/__pycache__/__init__.cpython-310.pyc b/env/lib/python3.10/site-packages/pikepdf/models/__pycache__/__init__.cpython-310.pyc Binary files differnew file mode 100644 index 0000000..b07eefb --- /dev/null +++ b/env/lib/python3.10/site-packages/pikepdf/models/__pycache__/__init__.cpython-310.pyc diff --git a/env/lib/python3.10/site-packages/pikepdf/models/__pycache__/_content_stream.cpython-310.pyc b/env/lib/python3.10/site-packages/pikepdf/models/__pycache__/_content_stream.cpython-310.pyc Binary files differnew file mode 100644 index 0000000..6e1c1c9 --- /dev/null +++ b/env/lib/python3.10/site-packages/pikepdf/models/__pycache__/_content_stream.cpython-310.pyc diff --git a/env/lib/python3.10/site-packages/pikepdf/models/__pycache__/_transcoding.cpython-310.pyc b/env/lib/python3.10/site-packages/pikepdf/models/__pycache__/_transcoding.cpython-310.pyc Binary files differnew file mode 100644 index 0000000..f9ad743 --- /dev/null +++ b/env/lib/python3.10/site-packages/pikepdf/models/__pycache__/_transcoding.cpython-310.pyc diff --git a/env/lib/python3.10/site-packages/pikepdf/models/__pycache__/encryption.cpython-310.pyc b/env/lib/python3.10/site-packages/pikepdf/models/__pycache__/encryption.cpython-310.pyc Binary files differnew file mode 100644 index 0000000..32e8098 --- /dev/null +++ b/env/lib/python3.10/site-packages/pikepdf/models/__pycache__/encryption.cpython-310.pyc diff --git a/env/lib/python3.10/site-packages/pikepdf/models/__pycache__/image.cpython-310.pyc b/env/lib/python3.10/site-packages/pikepdf/models/__pycache__/image.cpython-310.pyc Binary files differnew file mode 100644 index 0000000..0de94e9 --- /dev/null +++ b/env/lib/python3.10/site-packages/pikepdf/models/__pycache__/image.cpython-310.pyc diff --git a/env/lib/python3.10/site-packages/pikepdf/models/__pycache__/matrix.cpython-310.pyc b/env/lib/python3.10/site-packages/pikepdf/models/__pycache__/matrix.cpython-310.pyc Binary files differnew file mode 100644 index 0000000..ee96c86 --- /dev/null +++ b/env/lib/python3.10/site-packages/pikepdf/models/__pycache__/matrix.cpython-310.pyc diff --git a/env/lib/python3.10/site-packages/pikepdf/models/__pycache__/metadata.cpython-310.pyc b/env/lib/python3.10/site-packages/pikepdf/models/__pycache__/metadata.cpython-310.pyc Binary files differnew file mode 100644 index 0000000..4b97e11 --- /dev/null +++ b/env/lib/python3.10/site-packages/pikepdf/models/__pycache__/metadata.cpython-310.pyc diff --git a/env/lib/python3.10/site-packages/pikepdf/models/__pycache__/outlines.cpython-310.pyc b/env/lib/python3.10/site-packages/pikepdf/models/__pycache__/outlines.cpython-310.pyc Binary files differnew file mode 100644 index 0000000..18dbd1d --- /dev/null +++ b/env/lib/python3.10/site-packages/pikepdf/models/__pycache__/outlines.cpython-310.pyc diff --git 
a/env/lib/python3.10/site-packages/pikepdf/models/_content_stream.py b/env/lib/python3.10/site-packages/pikepdf/models/_content_stream.py new file mode 100644 index 0000000..8976c4c --- /dev/null +++ b/env/lib/python3.10/site-packages/pikepdf/models/_content_stream.py @@ -0,0 +1,136 @@ +# SPDX-FileCopyrightText: 2022 James R. Barlow +# SPDX-License-Identifier: MPL-2.0 + +"""Content stream parsing.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Collection, List, Tuple, Union, cast + +from pikepdf import Object, ObjectType, Operator, Page, PdfError, _qpdf + +if TYPE_CHECKING: + from pikepdf.models.image import PdfInlineImage + +# Operands, Operator +_OldContentStreamOperands = Collection[Union[Object, 'PdfInlineImage']] +_OldContentStreamInstructions = Tuple[_OldContentStreamOperands, Operator] + +ContentStreamInstructions = Union[ + _qpdf.ContentStreamInstruction, _qpdf.ContentStreamInlineImage +] + +UnparseableContentStreamInstructions = Union[ + ContentStreamInstructions, _OldContentStreamInstructions +] + + +class PdfParsingError(Exception): + """Error when parsing a PDF content stream.""" + + def __init__(self, message=None, line=None): + if not message: + message = f"Error encoding content stream at line {line}" + super().__init__(message) + self.line = line + + +def parse_content_stream( + page_or_stream: Object | Page, operators: str = '' +) -> list[ContentStreamInstructions]: + """Parse a PDF content stream into a sequence of instructions. + + A PDF content stream is list of instructions that describe where to render + the text and graphics in a PDF. This is the starting point for analyzing + PDFs. + + If the input is a page and page.Contents is an array, then the content + stream is automatically treated as one coalesced stream. + + Each instruction contains at least one operator and zero or more operands. + + This function does not have anything to do with opening a PDF file itself or + processing data from a whole PDF. It is for processing a specific object inside + a PDF that is already opened. + + Args: + page_or_stream: A page object, or the content + stream attached to another object such as a Form XObject. + operators: A space-separated string of operators to whitelist. + For example 'q Q cm Do' will return only operators + that pertain to drawing images. Use 'BI ID EI' for inline images. + All other operators and associated tokens are ignored. If blank, + all tokens are accepted. + + Example: + >>> with pikepdf.Pdf.open(input_pdf) as pdf: + >>> page = pdf.pages[0] + >>> for operands, command in parse_content_stream(page): + >>> print(command) + + .. versionchanged:: 3.0 + Returns a list of ``ContentStreamInstructions`` instead of a list + of (operand, operator) tuples. The returned items are duck-type compatible + with the previous returned items. 
+ """ + if not isinstance(page_or_stream, (Object, Page)): + raise TypeError("stream must be a pikepdf.Object or pikepdf.Page") + + if ( + isinstance(page_or_stream, Object) + and page_or_stream._type_code != ObjectType.stream + and page_or_stream.get('/Type') != '/Page' + ): + raise TypeError("parse_content_stream called on page or stream object") + + if isinstance(page_or_stream, Page): + page_or_stream = page_or_stream.obj + + try: + if page_or_stream.get('/Type') == '/Page': + page = page_or_stream + instructions = cast( + List[ContentStreamInstructions], + page._parse_page_contents_grouped(operators), + ) + else: + stream = page_or_stream + instructions = cast( + List[ContentStreamInstructions], + Object._parse_stream_grouped(stream, operators), + ) + except PdfError as e: + if 'supposed to be a stream or an array' in str(e): + raise TypeError("parse_content_stream called on non-stream Object") from e + raise e from e + + return instructions + + +def unparse_content_stream( + instructions: Collection[UnparseableContentStreamInstructions], +) -> bytes: + """Convert collection of instructions to bytes suitable for storing in PDF. + + Given a parsed list of instructions/operand-operators, convert to bytes suitable + for embedding in a PDF. In PDF the operator always follows the operands. + + Args: + instructions: collection of instructions such as is returned + by :func:`parse_content_stream()` + + Returns: + A binary content stream, suitable for attaching to a Pdf. + To attach to a Pdf, use :meth:`Pdf.make_stream()``. + + .. versionchanged:: 3.0 + Now accept collections that contain any mixture of + ``ContentStreamInstruction``, ``ContentStreamInlineImage``, and the older + operand-operator tuples from pikepdf 2.x. + """ + try: + return _qpdf._unparse_content_stream(instructions) + except (ValueError, TypeError, RuntimeError) as e: + raise PdfParsingError( + "While unparsing a content stream, an error occurred" + ) from e diff --git a/env/lib/python3.10/site-packages/pikepdf/models/_transcoding.py b/env/lib/python3.10/site-packages/pikepdf/models/_transcoding.py new file mode 100644 index 0000000..e54facf --- /dev/null +++ b/env/lib/python3.10/site-packages/pikepdf/models/_transcoding.py @@ -0,0 +1,243 @@ +# SPDX-FileCopyrightText: 2022 James R. Barlow +# SPDX-License-Identifier: MPL-2.0 + +from __future__ import annotations + +import struct +from typing import Any, Callable, NamedTuple, Union + +from PIL import Image +from PIL.TiffTags import TAGS_V2 as TIFF_TAGS + +BytesLike = Union[bytes, memoryview] +MutableBytesLike = Union[bytearray, memoryview] + + +def _next_multiple(n: int, k: int) -> int: + """Return the multiple of k that is greater than or equal n. + + >>> _next_multiple(101, 4) + 104 + >>> _next_multiple(100, 4) + 100 + """ + div, mod = divmod(n, k) + if mod > 0: + div += 1 + return div * k + + +def unpack_subbyte_pixels( + packed: BytesLike, size: tuple[int, int], bits: int, scale: int = 0 +) -> tuple[BytesLike, int]: + """Unpack subbyte *bits* pixels into full bytes and rescale. + + When scale is 0, the appropriate scale is calculated. + e.g. for 2-bit, the scale is adjusted so that + 0b00 = 0.00 = 0x00 + 0b01 = 0.33 = 0x55 + 0b10 = 0.66 = 0xaa + 0b11 = 1.00 = 0xff + When scale is 1, no scaling is applied, appropriate when + the bytes are palette indexes. 
+ """ + width, height = size + bits_per_byte = 8 // bits + stride = _next_multiple(width, bits_per_byte) + buffer = bytearray(bits_per_byte * stride * height) + max_read = len(buffer) // bits_per_byte + if scale == 0: + scale = 255 / ((2**bits) - 1) + if bits == 4: + _4bit_inner_loop(packed[:max_read], buffer, scale) + elif bits == 2: + _2bit_inner_loop(packed[:max_read], buffer, scale) + # elif bits == 1: + # _1bit_inner_loop(packed[:max_read], buffer, scale) + else: + raise NotImplementedError(bits) + return memoryview(buffer), stride + + +# def _1bit_inner_loop(in_: BytesLike, out: MutableBytesLike, scale: int) -> None: +# """Unpack 1-bit values to their 8-bit equivalents. + +# Thus *out* must be 8x at long as *in*. +# """ +# for n, val in enumerate(in_): +# out[8 * n + 0] = int((val >> 7) & 0b1) * scale +# out[8 * n + 1] = int((val >> 6) & 0b1) * scale +# out[8 * n + 2] = int((val >> 5) & 0b1) * scale +# out[8 * n + 3] = int((val >> 4) & 0b1) * scale +# out[8 * n + 4] = int((val >> 3) & 0b1) * scale +# out[8 * n + 5] = int((val >> 2) & 0b1) * scale +# out[8 * n + 6] = int((val >> 1) & 0b1) * scale +# out[8 * n + 7] = int((val >> 0) & 0b1) * scale + + +def _2bit_inner_loop(in_: BytesLike, out: MutableBytesLike, scale: int) -> None: + """Unpack 2-bit values to their 8-bit equivalents. + + Thus *out* must be 4x at long as *in*. + """ + for n, val in enumerate(in_): + out[4 * n] = int((val >> 6) * scale) + out[4 * n + 1] = int(((val >> 4) & 0b11) * scale) + out[4 * n + 2] = int(((val >> 2) & 0b11) * scale) + out[4 * n + 3] = int((val & 0b11) * scale) + + +def _4bit_inner_loop(in_: BytesLike, out: MutableBytesLike, scale: int) -> None: + """Unpack 4-bit values to their 8-bit equivalents. + + Thus *out* must be 2x at long as *in*. + """ + for n, val in enumerate(in_): + out[2 * n] = int((val >> 4) * scale) + out[2 * n + 1] = int((val & 0b1111) * scale) + + +def image_from_byte_buffer(buffer: BytesLike, size: tuple[int, int], stride: int): + """Use Pillow to create one-component image from a byte buffer. + + *stride* is the number of bytes per row, and is essential for packed bits + with odd image widths. + """ + ystep = 1 # image is top to bottom in memory + return Image.frombuffer('L', size, buffer, "raw", 'L', stride, ystep) + + +def _make_rgb_palette(gray_palette: bytes) -> bytes: + palette = b'' + for entry in gray_palette: + palette += bytes([entry]) * 3 + return palette + + +def _depalettize_cmyk(buffer: BytesLike, palette: BytesLike): + with memoryview(buffer) as mv: + output = bytearray(4 * len(mv)) + for n, pal_idx in enumerate(mv): + output[4 * n : 4 * (n + 1)] = palette[4 * pal_idx : 4 * (pal_idx + 1)] + return output + + +def image_from_buffer_and_palette( + buffer: BytesLike, + size: tuple[int, int], + stride: int, + base_mode: str, + palette: BytesLike, +) -> Image.Image: + """Construct an image from a byte buffer and apply the palette. + + 1/2/4-bit images must be unpacked (no scaling!) to byte buffers first, such + that every 8-bit integer is an index into the palette. + """ + # Reminder Pillow palette byte order unintentionally changed in 8.3.0 + # https://github.com/python-pillow/Pillow/issues/5595 + # 8.2.0: all aligned by channel (very nonstandard) + # 8.3.0: all channels for one color followed by the next color (e.g. RGBRGBRGB) + + if base_mode == 'RGB': + im = image_from_byte_buffer(buffer, size, stride) + im.putpalette(palette, rawmode=base_mode) + elif base_mode == 'L': + # Pillow does not fully support palettes with rawmode='L'. + # Convert to RGB palette. 
+ gray_palette = _make_rgb_palette(palette) + im = image_from_byte_buffer(buffer, size, stride) + im.putpalette(gray_palette, rawmode='RGB') + elif base_mode == 'CMYK': + # Pillow does not support CMYK with palettes; convert manually + output = _depalettize_cmyk(buffer, palette) + im = Image.frombuffer('CMYK', size, data=output, decoder_name='raw') + else: + raise NotImplementedError(f'palette with {base_mode}') + return im + + +def fix_1bit_palette_image( + im: Image.Image, base_mode: str, palette: BytesLike +) -> Image.Image: + """Apply palettes to 1-bit images.""" + im = im.convert('P') + if base_mode == 'RGB' and len(palette) == 6: + # rgbrgb -> rgb000000...rgb + palette = palette[0:3] + (b'\x00\x00\x00' * (256 - 2)) + palette[3:6] + im.putpalette(palette, rawmode='RGB') + elif base_mode == 'L': + try: + im.putpalette(palette, rawmode='L') + except ValueError as e: + if 'unrecognized raw mode' in str(e): + rgb_palette = _make_rgb_palette(palette) + im.putpalette(rgb_palette, rawmode='RGB') + return im + + +def generate_ccitt_header( + size: tuple[int, int], + data_length: int, + ccitt_group: int, + photometry: int, + icc: bytes, +) -> bytes: + """Generate binary CCITT header for image with given parameters.""" + tiff_header_struct = '<' + '2s' + 'H' + 'L' + 'H' + + tag_keys = {tag.name: key for key, tag in TIFF_TAGS.items()} # type: ignore + ifd_struct = '<HHLL' + + class IFD(NamedTuple): + key: int + typecode: Any + count_: int + data: int | Callable[[], int | None] + + ifds: list[IFD] = [] + + def header_length(ifd_count) -> int: + return ( + struct.calcsize(tiff_header_struct) + + struct.calcsize(ifd_struct) * ifd_count + + 4 + ) + + def add_ifd(tag_name: str, data: int | Callable[[], int | None], count: int = 1): + key = tag_keys[tag_name] + typecode = TIFF_TAGS[key].type # type: ignore + ifds.append(IFD(key, typecode, count, data)) + + image_offset = None + width, height = size + add_ifd('ImageWidth', width) + add_ifd('ImageLength', height) + add_ifd('BitsPerSample', 1) + add_ifd('Compression', ccitt_group) + add_ifd('PhotometricInterpretation', int(photometry)) + add_ifd('StripOffsets', lambda: image_offset) + add_ifd('RowsPerStrip', height) + add_ifd('StripByteCounts', data_length) + + icc_offset = 0 + if icc: + add_ifd('ICCProfile', lambda: icc_offset, count=len(icc)) + + icc_offset = header_length(len(ifds)) + image_offset = icc_offset + len(icc) + + ifd_args = [(arg() if callable(arg) else arg) for ifd in ifds for arg in ifd] + tiff_header = struct.pack( + (tiff_header_struct + ifd_struct[1:] * len(ifds) + 'L'), + b'II', # Byte order indication: Little endian + 42, # Version number (always 42) + 8, # Offset to first IFD + len(ifds), # Number of tags in IFD + *ifd_args, + 0, # Last IFD + ) + + if icc: + tiff_header += icc + return tiff_header diff --git a/env/lib/python3.10/site-packages/pikepdf/models/encryption.py b/env/lib/python3.10/site-packages/pikepdf/models/encryption.py new file mode 100644 index 0000000..d6b5036 --- /dev/null +++ b/env/lib/python3.10/site-packages/pikepdf/models/encryption.py @@ -0,0 +1,176 @@ +# SPDX-FileCopyrightText: 2022 James R. 
Barlow +# SPDX-License-Identifier: MPL-2.0 + +"""For managing PDF encryption.""" + +from __future__ import annotations + +import sys +from typing import TYPE_CHECKING, Any, NamedTuple, cast + +if sys.version_info >= (3, 8): + from typing import Literal +else: + from typing_extensions import Literal # pragma: no cover + +if TYPE_CHECKING: + from pikepdf._qpdf import EncryptionMethod + + +class Permissions(NamedTuple): + """ + Stores the user-level permissions for an encrypted PDF. + + A compliant PDF reader/writer should enforce these restrictions on people + who have the user password and not the owner password. In practice, either + password is sufficient to decrypt all document contents. A person who has + the owner password should be allowed to modify the document in any way. + pikepdf does not enforce the restrictions in any way; it is up to application + developers to enforce them as they see fit. + + Unencrypted PDFs implicitly have all permissions allowed. Permissions can + only be changed when a PDF is saved. + """ + + accessibility: bool = True + """Can users use screen readers and accessibility tools to read the PDF?""" + + extract: bool = True + """Can users extract contents?""" + + modify_annotation: bool = True + """Can users modify annotations?""" + + modify_assembly: bool = False + """Can users arrange document contents?""" + + modify_form: bool = True + """Can users fill out forms?""" + + modify_other: bool = True + """Can users modify the document?""" + + print_lowres: bool = True + """Can users print the document at low resolution?""" + + print_highres: bool = True + """Can users print the document at high resolution?""" + + +DEFAULT_PERMISSIONS = Permissions() + + +class EncryptionInfo: + """ + Reports encryption information for an encrypted PDF. + + This information may not be changed, except when a PDF is saved. + This object is not used to specify the encryption settings to save + a PDF, due to non-overlapping information requirements. + """ + + def __init__(self, encdict: dict[str, Any]): + """ + Initialize EncryptionInfo. + + Generally pikepdf will initialize and return it. + + Args: + encdict: Python dictionary containing encryption settings. + """ + self._encdict = encdict + + @property + def R(self) -> int: + """Revision number of the security handler.""" + return int(self._encdict['R']) + + @property + def V(self) -> int: + """Version of PDF password algorithm.""" + return int(self._encdict['V']) + + @property + def P(self) -> int: + """Return encoded permission bits. + + See :meth:`Pdf.allow` instead. + """ + return int(self._encdict['P']) + + @property + def stream_method(self) -> EncryptionMethod: + """Encryption method used to encode streams.""" + return cast('EncryptionMethod', self._encdict['stream']) + + @property + def string_method(self) -> EncryptionMethod: + """Encryption method used to encode strings.""" + return cast('EncryptionMethod', self._encdict['string']) + + @property + def file_method(self) -> EncryptionMethod: + """Encryption method used to encode the whole file.""" + return cast('EncryptionMethod', self._encdict['file']) + + @property + def user_password(self) -> bytes: + """If possible, return the user password. + + The user password can only be retrieved when a PDF is opened + with the owner password and when older versions of the + encryption algorithm are used. + + The password is always returned as ``bytes`` even if it has + a clear Unicode representation. 
+ """ + return bytes(self._encdict['user_passwd']) + + @property + def encryption_key(self) -> bytes: + """Return the RC4 or AES encryption key used for this file.""" + return bytes(self._encdict['encryption_key']) + + @property + def bits(self) -> int: + """Return the number of bits in the encryption algorithm. + + e.g. if the algorithm is AES-256, this returns 256. + """ + return len(self._encdict['encryption_key']) * 8 + + +class Encryption(NamedTuple): + """Specify the encryption settings to apply when a PDF is saved.""" + + owner: str = '' + """The owner password to use. This allows full control + of the file. If blank, the PDF will be encrypted and + present as "(SECURED)" in PDF viewers. If the owner password + is blank, the user password should be as well.""" + + user: str = '' + """The user password to use. With this password, some + restrictions will be imposed by a typical PDF reader. + If blank, the PDF can be opened by anyone, but only modified + as allowed by the permissions in ``allow``.""" + + R: Literal[2, 3, 4, 5, 6] = 6 + """Select the security handler algorithm to use. Choose from: + ``2``, ``3``, ``4`` or ``6``. By default, the highest version of + is selected (``6``). ``5`` is a deprecated algorithm that should + not be used.""" + + allow: Permissions = DEFAULT_PERMISSIONS + """The permissions to set. + If omitted, all permissions are granted to the user.""" + + aes: bool = True + """If True, request the AES algorithm. If False, use RC4. + If omitted, AES is selected whenever possible (R >= 4).""" + + metadata: bool = True + """If True, also encrypt the PDF metadata. If False, + metadata is not encrypted. Reading document metadata without + decryption may be desirable in some cases. Requires ``aes=True``. + If omitted, metadata is encrypted whenever possible.""" diff --git a/env/lib/python3.10/site-packages/pikepdf/models/image.py b/env/lib/python3.10/site-packages/pikepdf/models/image.py new file mode 100644 index 0000000..5981a8e --- /dev/null +++ b/env/lib/python3.10/site-packages/pikepdf/models/image.py @@ -0,0 +1,991 @@ +# SPDX-FileCopyrightText: 2022 James R. Barlow +# SPDX-License-Identifier: MPL-2.0 + +"""Extract images embedded in PDF.""" + +from __future__ import annotations + +from abc import ABC, abstractmethod +from decimal import Decimal +from io import BytesIO +from itertools import zip_longest +from pathlib import Path +from shutil import copyfileobj +from typing import Any, BinaryIO, Callable, NamedTuple, Sequence, TypeVar, cast + +from PIL import Image +from PIL.ImageCms import ImageCmsProfile + +from pikepdf import ( + Array, + Dictionary, + Name, + Object, + Pdf, + PdfError, + Stream, + StreamDecodeLevel, + String, + jbig2, +) +from pikepdf._exceptions import DependencyError +from pikepdf._qpdf import Buffer +from pikepdf._version import __version__ +from pikepdf.models import _transcoding + +T = TypeVar('T') + + +class UnsupportedImageTypeError(Exception): + """This image is formatted in a way pikepdf does not supported.""" + + +class NotExtractableError(Exception): + """Indicates that an image cannot be directly extracted.""" + + +class HifiPrintImageNotTranscodableError(NotExtractableError): + """Image contains high fidelity printing information and cannot be extracted.""" + + +class InvalidPdfImageError(Exception): + """This image is not valid according to the PDF 1.7 specification.""" + + +def _array_str(value: Object | str | list): + """Simplify pikepdf objects to array of str. 
Keep Streams and dictionaries intact.""" + + def _convert(item): + if isinstance(item, (list, Array)): + return [_convert(subitem) for subitem in item] + if isinstance(item, (Stream, Dictionary, bytes, int)): + return item + if isinstance(item, (Name, str)): + return str(item) + if isinstance(item, (String)): + return bytes(item) + raise NotImplementedError(value) + + result = _convert(value) + if not isinstance(result, list): + result = [result] + return result + + +def _ensure_list(value: list[Object] | Dictionary | Array) -> list[Object]: + """Ensure value is a list of pikepdf.Object, if it was not already. + + To support DecodeParms which can be present as either an array of dicts or a single + dict. It's easier to convert to an array of one dict. + """ + if isinstance(value, list): + return value + return list(value.wrap_in_array().as_list()) + + +def _metadata_from_obj( + obj: Dictionary | Stream, name: str, type_: Callable[[Any], T], default: T +) -> T | None: + """Retrieve metadata from a dictionary or stream, and ensure it is the expected type.""" + val = getattr(obj, name, default) + try: + return type_(val) + except TypeError: + if val is None: + return None + raise NotImplementedError('Metadata access for ' + name) + + +class PaletteData(NamedTuple): + """Returns the color space and binary representation of the palette. + + ``base_colorspace`` is typically ``"RGB"`` or ``"L"`` (for grayscale). + + ``palette`` is typically 256 or 256*3=768 bytes, for grayscale and RGB color + respectively, with each unit/triplet being the grayscale/RGB triplet values. + """ + + base_colorspace: str + palette: bytes + + +class PdfImageBase(ABC): + """Abstract base class for images.""" + + SIMPLE_COLORSPACES = {'/DeviceRGB', '/DeviceGray', '/CalRGB', '/CalGray'} + MAIN_COLORSPACES = SIMPLE_COLORSPACES | {'/DeviceCMYK', '/CalCMYK', '/ICCBased'} + PRINT_COLORSPACES = {'/Separation', '/DeviceN'} + + @abstractmethod + def _metadata(self, name: str, type_: Callable[[Any], T], default: T) -> T: + """Get metadata for this image type.""" + + @property + def width(self) -> int: + """Width of the image data in pixels.""" + return self._metadata('Width', int, 0) + + @property + def height(self) -> int: + """Height of the image data in pixels.""" + return self._metadata('Height', int, 0) + + @property + def image_mask(self) -> bool: + """Return ``True`` if this is an image mask.""" + return self._metadata('ImageMask', bool, False) + + @property + def _bpc(self) -> int | None: + """Bits per component for this image (low-level).""" + return self._metadata('BitsPerComponent', int, 0) + + @property + def _colorspaces(self): + """Colorspace (low-level).""" + return self._metadata('ColorSpace', _array_str, []) + + @property + def filters(self): + """List of names of the filters that we applied to encode this image.""" + return self._metadata('Filter', _array_str, []) + + @property + def decode_parms(self): + """List of the /DecodeParms, arguments to filters.""" + return self._metadata('DecodeParms', _ensure_list, []) + + @property + def colorspace(self) -> str | None: + """PDF name of the colorspace that best describes this image.""" + if self.image_mask: + return None # Undefined for image masks + if self._colorspaces: + if self._colorspaces[0] in self.MAIN_COLORSPACES: + return self._colorspaces[0] + if self._colorspaces[0] == '/Indexed': + subspace = self._colorspaces[1] + if isinstance(subspace, str) and subspace in self.MAIN_COLORSPACES: + return subspace + if isinstance(subspace, list) and subspace[0] in ( + 
'/ICCBased', + '/DeviceN', + ): + return subspace[0] + if self._colorspaces[0] == '/DeviceN': + return '/DeviceN' + + raise NotImplementedError( + "not sure how to get colorspace: " + repr(self._colorspaces) + ) + + @property + def bits_per_component(self) -> int: + """Bits per component of this image.""" + if self._bpc is None or self._bpc == 0: + return 1 if self.image_mask else 8 + return self._bpc + + @property + @abstractmethod + def icc(self) -> ImageCmsProfile | None: + """Return ICC profile for this image if one is defined.""" + + @property + def indexed(self) -> bool: + """Check if the image has a defined color palette.""" + return '/Indexed' in self._colorspaces + + def _colorspace_has_name(self, name): + try: + cs = self._colorspaces + if cs[0] == '/Indexed' and cs[1][0] == name: + return True + if cs[0] == name: + return True + except (IndexError, AttributeError, KeyError): + pass + return False + + @property + def is_device_n(self) -> bool: + """Check if image has a /DeviceN (complex printing) colorspace.""" + return self._colorspace_has_name('/DeviceN') + + @property + def is_separation(self) -> bool: + """Check if image has a /DeviceN (complex printing) colorspace.""" + return self._colorspace_has_name('/Separation') + + @property + def size(self) -> tuple[int, int]: + """Size of image as (width, height).""" + return self.width, self.height + + def _approx_mode_from_icc(self): + if self.indexed: + icc_profile = self._colorspaces[1][1] + else: + icc_profile = self._colorspaces[1] + icc_profile_nchannels = int(icc_profile['/N']) + + if icc_profile_nchannels == 1: + return 'L' + + # Multiple channels, need to open the profile and look + mode_from_xcolor_space = {'RGB ': 'RGB', 'CMYK': 'CMYK'} + xcolor_space = self.icc.profile.xcolor_space + return mode_from_xcolor_space.get(xcolor_space, '') + + @property + def mode(self) -> str: + """``PIL.Image.mode`` equivalent for this image, where possible. + + If an ICC profile is attached to the image, we still attempt to resolve a Pillow + mode. + """ + m = '' + if self.is_device_n: + m = 'DeviceN' + elif self.is_separation: + m = 'Separation' + elif self.indexed: + m = 'P' + elif self.colorspace == '/DeviceGray' and self.bits_per_component == 1: + m = '1' + elif self.colorspace == '/DeviceGray' and self.bits_per_component > 1: + m = 'L' + elif self.colorspace == '/DeviceRGB': + m = 'RGB' + elif self.colorspace == '/DeviceCMYK': + m = 'CMYK' + elif self.colorspace == '/ICCBased': + try: + m = self._approx_mode_from_icc() + except (ValueError, TypeError) as e: + raise NotImplementedError( + "Not sure how to handle PDF image of this type" + ) from e + if m == '': + raise NotImplementedError( + "Not sure how to handle PDF image of this type" + ) from None + return m + + @property + def filter_decodeparms(self): + """Return normalized the Filter and DecodeParms data. + + PDF has a lot of possible data structures concerning /Filter and + /DecodeParms. /Filter can be absent or a name or an array, /DecodeParms + can be absent or a dictionary (if /Filter is a name) or an array (if + /Filter is an array). When both are arrays the lengths match. + + Normalize this into: + [(/FilterName, {/DecodeParmName: Value, ...}), ...] + + The order of /Filter matters as indicates the encoding/decoding sequence. 
+ """ + return list(zip_longest(self.filters, self.decode_parms, fillvalue={})) + + @property + def palette(self) -> PaletteData | None: + """Retrieve the color palette for this image if applicable.""" + if not self.indexed: + return None + try: + _idx, base, _hival, lookup = self._colorspaces + except ValueError as e: + raise ValueError('Not sure how to interpret this palette') from e + if self.icc or self.is_device_n or self.is_separation: + base = str(base[0]) + else: + base = str(base) + lookup = bytes(lookup) + if base not in self.MAIN_COLORSPACES and base not in self.PRINT_COLORSPACES: + raise NotImplementedError(f"not sure how to interpret this palette: {base}") + if base == '/DeviceRGB': + base = 'RGB' + elif base == '/DeviceGray': + base = 'L' + elif base == '/DeviceCMYK': + base = 'CMYK' + elif base == '/DeviceN': + base = 'DeviceN' + elif base == '/Separation': + base = 'Separation' + elif base == '/ICCBased': + base = self._approx_mode_from_icc() + return PaletteData(base, lookup) + + @abstractmethod + def as_pil_image(self) -> Image.Image: + """Convert this PDF image to a Python PIL (Pillow) image.""" + + @staticmethod + def _remove_simple_filters(obj: Stream, filters: Sequence[str]): + """Remove simple lossless compression where it appears. + + Args: + obj: the compressed object + filters: all files on the data + """ + COMPLEX_FILTERS = { + '/DCTDecode', + '/JPXDecode', + '/JBIG2Decode', + '/CCITTFaxDecode', + } + + idx = [n for n, item in enumerate(filters) if item in COMPLEX_FILTERS] + if idx: + if len(idx) > 1: + raise NotImplementedError( + f"Object {obj.objgen} has compound complex filters: {filters}. " + "We cannot decompress this." + ) + simple_filters = filters[: idx[0]] + complex_filters = filters[idx[0] :] + else: + simple_filters = filters + complex_filters = [] + + if not simple_filters: + return obj.read_raw_bytes(), complex_filters + + original_filters = obj.Filter + try: + obj.Filter = Array([Name(s) for s in simple_filters]) + data = obj.read_bytes(StreamDecodeLevel.specialized) + finally: + obj.Filter = original_filters + + return data, complex_filters + + +class PdfImage(PdfImageBase): + """Support class to provide a consistent API for manipulating PDF images. + + The data structure for images inside PDFs is irregular and complex, + making it difficult to use without introducing errors for less + typical cases. This class addresses these difficulties by providing a + regular, Pythonic API similar in spirit (and convertible to) the Python + Pillow imaging library. + """ + + obj: Stream + _icc: ImageCmsProfile | None + + def __new__(cls, obj): + """Construct a PdfImage... or a PdfJpxImage if that is what we really are.""" + instance = super().__new__(cls) + instance.__init__(obj) + if '/JPXDecode' in instance.filters: + instance = super().__new__(PdfJpxImage) + instance.__init__(obj) + return instance + + def __init__(self, obj: Stream): + """Construct a PDF image from a Image XObject inside a PDF. + + ``pim = PdfImage(page.Resources.XObject['/ImageNN'])`` + + Args: + obj: an Image XObject + """ + if isinstance(obj, Stream) and obj.stream_dict.get("/Subtype") != "/Image": + raise TypeError("can't construct PdfImage from non-image") + self.obj = obj + self._icc = None + + def __eq__(self, other): + if not isinstance(other, PdfImageBase): + return NotImplemented + return self.obj == other.obj + + @classmethod + def _from_pil_image(cls, *, pdf, page, name, image): # pragma: no cover + """Insert a PIL image into a PDF (rudimentary). 
+ + Args: + pdf (pikepdf.Pdf): the PDF to attach the image to + page (pikepdf.Object): the page to attach the image to + name (str or pikepdf.Name): the name to set the image + image (PIL.Image.Image): the image to insert + """ + data = image.tobytes() + + imstream = Stream(pdf, data) + imstream.Type = Name('/XObject') + imstream.Subtype = Name('/Image') + if image.mode == 'RGB': + imstream.ColorSpace = Name('/DeviceRGB') + elif image.mode in ('1', 'L'): + imstream.ColorSpace = Name('/DeviceGray') + imstream.BitsPerComponent = 1 if image.mode == '1' else 8 + imstream.Width = image.width + imstream.Height = image.height + + page.Resources.XObject[name] = imstream + + return cls(imstream) + + def _metadata(self, name, type_, default): + return _metadata_from_obj(self.obj, name, type_, default) + + @property + def _iccstream(self): + if self.colorspace == '/ICCBased': + if not self.indexed: + return self._colorspaces[1] + assert isinstance(self._colorspaces[1], list) + return self._colorspaces[1][1] + raise NotImplementedError("Don't know how to find ICC stream for image") + + @property + def icc(self) -> ImageCmsProfile | None: + """If an ICC profile is attached, return a Pillow object that describe it. + + Most of the information may be found in ``icc.profile``. + """ + if self.colorspace not in ('/ICCBased', '/Indexed'): + return None + if not self._icc: + iccstream = self._iccstream + iccbuffer = iccstream.get_stream_buffer() + iccbytesio = BytesIO(iccbuffer) + try: + self._icc = ImageCmsProfile(iccbytesio) + except OSError as e: + if str(e) == 'cannot open profile from string': + # ICC profile is corrupt + raise UnsupportedImageTypeError( + "ICC profile corrupt or not readable" + ) from e + return self._icc + + def _extract_direct(self, *, stream: BinaryIO) -> str: + """Attempt to extract the image directly to a usable image file. + + If there is no way to extract the image without decompressing or + transcoding then raise an exception. The type and format of image + generated will vary. + + Args: + stream: Writable file stream to write data to, e.g. an open file + """ + + def normal_dct_rgb() -> bool: + # Normal DCTDecode RGB images have the default value of + # /ColorTransform 1 and are actually in YUV. Such a file can be + # saved as a standard JPEG. RGB JPEGs without YUV conversion can't + # be saved as JPEGs, and are probably bugs. Some software in the + # wild actually produces RGB JPEGs in PDFs (probably a bug). + DEFAULT_CT_RGB = 1 + ct = self.filter_decodeparms[0][1].get('/ColorTransform', DEFAULT_CT_RGB) + return self.mode == 'RGB' and ct == DEFAULT_CT_RGB + + def normal_dct_cmyk() -> bool: + # Normal DCTDecode CMYKs have /ColorTransform 0 and can be saved. 
+ # There is a YUVK colorspace but CMYK JPEGs don't generally use it + DEFAULT_CT_CMYK = 0 + ct = self.filter_decodeparms[0][1].get('/ColorTransform', DEFAULT_CT_CMYK) + return self.mode == 'CMYK' and ct == DEFAULT_CT_CMYK + + data, filters = self._remove_simple_filters(self.obj, self.filters) + + if filters == ['/CCITTFaxDecode']: + if self.colorspace == '/ICCBased': + icc = self._iccstream.read_bytes() + else: + icc = None + stream.write(self._generate_ccitt_header(data, icc=icc)) + stream.write(data) + return '.tif' + if filters == ['/DCTDecode'] and ( + self.mode == 'L' or normal_dct_rgb() or normal_dct_cmyk() + ): + stream.write(data) + return '.jpg' + + raise NotExtractableError() + + def _extract_transcoded_1248bits(self) -> Image.Image: + """Extract an image when there are 1/2/4/8 bits packed in byte data.""" + stride = 0 # tell Pillow to calculate stride from line width + scale = 0 if self.mode == 'L' else 1 + if self.bits_per_component in (2, 4): + buffer, stride = _transcoding.unpack_subbyte_pixels( + self.read_bytes(), self.size, self.bits_per_component, scale + ) + elif self.bits_per_component == 8: + buffer = cast(memoryview, self.get_stream_buffer()) + else: + raise InvalidPdfImageError("BitsPerComponent must be 1, 2, 4, 8, or 16") + + if self.mode == 'P' and self.palette is not None: + base_mode, palette = self.palette + im = _transcoding.image_from_buffer_and_palette( + buffer, + self.size, + stride, + base_mode, + palette, + ) + else: + im = _transcoding.image_from_byte_buffer(buffer, self.size, stride) + return im + + def _extract_transcoded_1bit(self) -> Image.Image: + if self.mode in ('RGB', 'CMYK'): + raise UnsupportedImageTypeError("1-bit RGB and CMYK are not supported") + try: + data = self.read_bytes() + except (RuntimeError, PdfError) as e: + if ( + 'read_bytes called on unfilterable stream' in str(e) + and not jbig2.get_decoder().available() + ): + raise DependencyError( + "jbig2dec - not installed or installed version is too old " + "(older than version 0.15)" + ) from None + raise + + im = Image.frombytes('1', self.size, data) + + if self.palette is not None: + base_mode, palette = self.palette + im = _transcoding.fix_1bit_palette_image(im, base_mode, palette) + + return im + + def _extract_transcoded(self) -> Image.Image: + if self.mode in {'DeviceN', 'Separation'}: + raise HifiPrintImageNotTranscodableError() + + if self.mode == 'RGB' and self.bits_per_component == 8: + # Cannot use the zero-copy .get_stream_buffer here, we have 3-byte + # RGB and Pillow needs RGBX. + im = Image.frombuffer( + 'RGB', self.size, self.read_bytes(), 'raw', 'RGB', 0, 1 + ) + elif self.mode == 'CMYK' and self.bits_per_component == 8: + im = Image.frombuffer( + 'CMYK', self.size, self.get_stream_buffer(), 'raw', 'CMYK', 0, 1 + ) + # elif self.mode == '1': + elif self.bits_per_component == 1: + im = self._extract_transcoded_1bit() + elif self.mode in ('L', 'P') and self.bits_per_component <= 8: + im = self._extract_transcoded_1248bits() + else: + raise UnsupportedImageTypeError(repr(self) + ", " + repr(self.obj)) + + if self.colorspace == '/ICCBased' and self.icc is not None: + im.info['icc_profile'] = self.icc.tobytes() + + return im + + def _extract_to_stream(self, *, stream: BinaryIO) -> str: + """Extract the image to a stream. + + If possible, the compressed data is extracted and inserted into + a compressed image file format without transcoding the compressed + content. If this is not possible, the data will be decompressed + and extracted to an appropriate format. 
+ + Args: + stream: Writable stream to write data to + + Returns: + The file format extension. + """ + try: + return self._extract_direct(stream=stream) + except NotExtractableError: + pass + + im = None + try: + im = self._extract_transcoded() + if im.mode == 'CMYK': + im.save(stream, format='tiff', compression='tiff_adobe_deflate') + return '.tiff' + if im: + im.save(stream, format='png') + return '.png' + except PdfError as e: + if 'called on unfilterable stream' in str(e): + raise UnsupportedImageTypeError(repr(self)) from e + raise + finally: + if im: + im.close() + + raise UnsupportedImageTypeError(repr(self)) + + def extract_to( + self, *, stream: BinaryIO | None = None, fileprefix: str = '' + ) -> str: + """Extract the image directly to a usable image file. + + If possible, the compressed data is extracted and inserted into + a compressed image file format without transcoding the compressed + content. If this is not possible, the data will be decompressed + and extracted to an appropriate format. + + Because it is not known until attempted what image format will be + extracted, users should not assume what format they are getting back. + When saving the image to a file, use a temporary filename, and then + rename the file to its final name based on the returned file extension. + + Images might be saved as any of .png, .jpg, or .tiff. + + Examples: + >>> im.extract_to(stream=bytes_io) + '.png' + + >>> im.extract_to(fileprefix='/tmp/image00') + '/tmp/image00.jpg' + + Args: + stream: Writable stream to write data to. + fileprefix (str or Path): The path to write the extracted image to, + without the file extension. + + Returns: + If *fileprefix* was provided, then the fileprefix with the + appropriate extension. If no *fileprefix*, then an extension + indicating the file type. + """ + if bool(stream) == bool(fileprefix): + raise ValueError("Cannot set both stream and fileprefix") + if stream: + return self._extract_to_stream(stream=stream) + + bio = BytesIO() + extension = self._extract_to_stream(stream=bio) + bio.seek(0) + filepath = Path(str(Path(fileprefix)) + extension) + with filepath.open('wb') as target: + copyfileobj(bio, target) + return str(filepath) + + def read_bytes( + self, decode_level: StreamDecodeLevel = StreamDecodeLevel.specialized + ) -> bytes: + """Decompress this image and return it as unencoded bytes.""" + return self.obj.read_bytes(decode_level=decode_level) + + def get_stream_buffer( + self, decode_level: StreamDecodeLevel = StreamDecodeLevel.specialized + ) -> Buffer: + """Access this image with the buffer protocol.""" + return self.obj.get_stream_buffer(decode_level=decode_level) + + def as_pil_image(self) -> Image.Image: + """Extract the image as a Pillow Image, using decompression as necessary. + + Caller must close the image. 
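[Illustrative aside, not part of the pikepdf source: a sketch of the temporary-name pattern recommended in the extract_to() docstring above, continuing from a PdfImage instance ``pim`` as in the earlier sketch. Paths and the final name are assumptions.]

# Sketch: extract first, then rename once the file extension is known.
from pathlib import Path

tmp = pim.extract_to(fileprefix='/tmp/page1-im0')   # e.g. returns '/tmp/page1-im0.png'
final = Path(tmp).rename(Path(tmp).with_name('figure-1' + Path(tmp).suffix))
print(final)
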
+ """ + try: + bio = BytesIO() + self._extract_direct(stream=bio) + bio.seek(0) + return Image.open(bio) + except NotExtractableError: + pass + + im = self._extract_transcoded() + if not im: + raise UnsupportedImageTypeError(repr(self)) + + return im + + def _generate_ccitt_header(self, data: bytes, icc: bytes | None = None) -> bytes: + """Construct a CCITT G3 or G4 header from the PDF metadata.""" + # https://stackoverflow.com/questions/2641770/ + # https://www.itu.int/itudoc/itu-t/com16/tiff-fx/docs/tiff6.pdf + + if not self.decode_parms: + raise ValueError("/CCITTFaxDecode without /DecodeParms") + if self.decode_parms[0].get("/EncodedByteAlign", False): + raise UnsupportedImageTypeError( + "/CCITTFaxDecode with /EncodedByteAlign true" + ) + + k = self.decode_parms[0].get("/K", 0) + if k < 0: + ccitt_group = 4 # Pure two-dimensional encoding (Group 4) + elif k > 0: + ccitt_group = 3 # Group 3 2-D + else: + ccitt_group = 2 # Group 3 1-D + _black_is_one = self.decode_parms[0].get("/BlackIs1", False) + # PDF spec says: + # BlackIs1: A flag indicating whether 1 bits shall be interpreted as black + # pixels and 0 bits as white pixels, the reverse of the normal + # PDF convention for image data. Default value: false. + # TIFF spec says: + # use 0 for white_is_zero (=> black is 1) MINISWHITE + # use 1 for black_is_zero (=> white is 1) MINISBLACK + # However, despite the documentation, it seems PDF viewers treat + # photometry as 0 when ccitt is involved. + # For example see + # https://gitlab.gnome.org/GNOME/evince/-/blob/main/backend/tiff/tiff2ps.c#L852-865 + photometry = 0 + + img_size = len(data) + if icc is None: + icc = b'' + return _transcoding.generate_ccitt_header( + self.size, img_size, ccitt_group, photometry, icc + ) + + def show(self): # pragma: no cover + """Show the image however PIL wants to.""" + self.as_pil_image().show() + + def __repr__(self): + return ( + f'<pikepdf.PdfImage image mode={self.mode} ' + f'size={self.width}x{self.height} at {hex(id(self))}>' + ) + + def _repr_png_(self) -> bytes: + """Display hook for IPython/Jupyter.""" + b = BytesIO() + with self.as_pil_image() as im: + im.save(b, 'PNG') + return b.getvalue() + + +class PdfJpxImage(PdfImage): + """Support class for JPEG 2000 images. Implements the same API as :class:`PdfImage`. + + If you call PdfImage(object_that_is_actually_jpeg2000_image), pikepdf will return + this class instead, due to the check in PdfImage.__new__. + """ + + def __init__(self, obj): + """Initialize a JPEG 2000 image.""" + super().__init__(obj) + self._jpxpil = self.as_pil_image() + + def __eq__(self, other): + if not isinstance(other, PdfImageBase): + return NotImplemented + return ( + self.obj == other.obj + and isinstance(other, PdfJpxImage) + and self._jpxpil == other._jpxpil + ) + + def _extract_direct(self, *, stream: BinaryIO): + data, filters = self._remove_simple_filters(self.obj, self.filters) + if filters != ['/JPXDecode']: + raise UnsupportedImageTypeError(self.filters) + stream.write(data) + return '.jp2' + + @property + def _colorspaces(self): + """Return the effective colorspace of a JPEG 2000 image. + + If the ColorSpace dictionary is present, the colorspace embedded in the + JPEG 2000 data will be ignored, as required by the specification. + """ + # (PDF 1.7 Table 89) If ColorSpace is present, any colour space + # specifications in the JPEG2000 data shall be ignored. 
+ super_colorspaces = super()._colorspaces + if super_colorspaces: + return super_colorspaces + if self._jpxpil.mode == 'L': + return ['/DeviceGray'] + if self._jpxpil.mode == 'RGB': + return ['/DeviceRGB'] + raise NotImplementedError('Complex JP2 colorspace') + + @property + def _bpc(self) -> int: + """Return 8, since bpc is not meaningful for JPEG 2000 encoding.""" + # (PDF 1.7 Table 89) If the image stream uses the JPXDecode filter, this + # entry is optional and shall be ignored if present. The bit depth is + # determined by the conforming reader in the process of decoding the + # JPEG2000 image. + return 8 + + @property + def indexed(self) -> bool: + """Return False, since JPEG 2000 should not be indexed.""" + # Nothing in the spec precludes an Indexed JPXDecode image, except for + # the fact that doing so is madness. Let's assume it no one is that + # insane. + return False + + def __repr__(self): + return ( + f'<pikepdf.PdfJpxImage JPEG2000 image mode={self.mode} ' + f'size={self.width}x{self.height} at {hex(id(self))}>' + ) + + +class PdfInlineImage(PdfImageBase): + """Support class for PDF inline images. Implements the same API as :class:`PdfImage`.""" + + # Inline images can contain abbreviations that we write automatically + ABBREVS = { + b'/W': b'/Width', + b'/H': b'/Height', + b'/BPC': b'/BitsPerComponent', + b'/IM': b'/ImageMask', + b'/CS': b'/ColorSpace', + b'/F': b'/Filter', + b'/DP': b'/DecodeParms', + b'/G': b'/DeviceGray', + b'/RGB': b'/DeviceRGB', + b'/CMYK': b'/DeviceCMYK', + b'/I': b'/Indexed', + b'/AHx': b'/ASCIIHexDecode', + b'/A85': b'/ASCII85Decode', + b'/LZW': b'/LZWDecode', + b'/RL': b'/RunLengthDecode', + b'/CCF': b'/CCITTFaxDecode', + b'/DCT': b'/DCTDecode', + } + REVERSE_ABBREVS = {v: k for k, v in ABBREVS.items()} + + _data: Object + _image_object: tuple[Object, ...] + + def __init__(self, *, image_data: Object, image_object: tuple): + """Construct wrapper for inline image. + + Args: + image_data: data stream for image, extracted from content stream + image_object: the metadata for image, also from content stream + """ + # Convert the sequence of pikepdf.Object from the content stream into + # a dictionary object by unparsing it (to bytes), eliminating inline + # image abbreviations, and constructing a bytes string equivalent to + # what an image XObject would look like. 
Then retrieve data from there + + self._data = image_data + self._image_object = image_object + + reparse = b' '.join( + self._unparse_obj(obj, remap_names=self.ABBREVS) for obj in image_object + ) + try: + reparsed_obj = Object.parse(b'<< ' + reparse + b' >>') + except PdfError as e: + raise PdfError("parsing inline " + reparse.decode('unicode_escape')) from e + self.obj = reparsed_obj + + def __eq__(self, other): + if not isinstance(other, PdfImageBase): + return NotImplemented + return ( + self.obj == other.obj + and isinstance(other, PdfInlineImage) + and ( + self._data._inline_image_raw_bytes() + == other._data._inline_image_raw_bytes() + ) + ) + + @classmethod + def _unparse_obj(cls, obj, remap_names): + if isinstance(obj, Object): + if isinstance(obj, Name): + name = obj.unparse(resolved=True) + assert isinstance(name, bytes) + return remap_names.get(name, name) + return obj.unparse(resolved=True) + if isinstance(obj, bool): + return b'true' if obj else b'false' # Lower case for PDF spec + if isinstance(obj, (int, Decimal, float)): + return str(obj).encode('ascii') + raise NotImplementedError(repr(obj)) + + def _metadata(self, name, type_, default): + return _metadata_from_obj(self.obj, name, type_, default) + + def unparse(self) -> bytes: + """Create the content stream bytes that reproduce this inline image.""" + + def metadata_tokens(): + for metadata_obj in self._image_object: + unparsed = self._unparse_obj( + metadata_obj, remap_names=self.REVERSE_ABBREVS + ) + assert isinstance(unparsed, bytes) + yield unparsed + + def inline_image_tokens(): + yield b'BI\n' + yield b' '.join(m for m in metadata_tokens()) + yield b'\nID\n' + yield self._data._inline_image_raw_bytes() + yield b'EI' + + return b''.join(inline_image_tokens()) + + @property + def icc(self): # pragma: no cover + """Raise an exception since ICC profiles are not supported on inline images.""" + raise InvalidPdfImageError( + "Inline images with ICC profiles are not supported in the PDF specification" + ) + + def __repr__(self): + try: + mode = self.mode + except NotImplementedError: + mode = '?' + return ( + f'<pikepdf.PdfInlineImage image mode={mode} ' + f'size={self.width}x{self.height} at {hex(id(self))}>' + ) + + def _convert_to_pdfimage(self): + # Construct a temporary PDF that holds this inline image, and... + tmppdf = Pdf.new() + tmppdf.add_blank_page(page_size=(self.width, self.height)) + tmppdf.pages[0].contents_add( + f'{self.width} 0 0 {self.height} 0 0 cm'.encode('ascii'), prepend=True + ) + tmppdf.pages[0].contents_add(self.unparse()) + + # ...externalize it, + tmppdf.pages[0].externalize_inline_images() + raw_img = next(im for im in tmppdf.pages[0].images.values()) + + # ...then use the regular PdfImage API to extract it. + img = PdfImage(raw_img) + return img + + def as_pil_image(self) -> Image.Image: + """Return inline image as a Pillow Image.""" + return self._convert_to_pdfimage().as_pil_image() + + def extract_to(self, *, stream: BinaryIO | None = None, fileprefix: str = ''): + """Extract the inline image directly to a usable image file. + + See: + :meth:`PdfImage.extract_to` + """ + return self._convert_to_pdfimage().extract_to( + stream=stream, fileprefix=fileprefix + ) + + def read_bytes(self): + """Return decompressed image bytes.""" + # QPDF does not have an API to return this directly, so convert it. + return self._convert_to_pdfimage().read_bytes() + + def get_stream_buffer(self): + """Return decompressed stream buffer.""" + # QPDF does not have an API to return this directly, so convert it. 
+ return self._convert_to_pdfimage().get_stream_buffer() diff --git a/env/lib/python3.10/site-packages/pikepdf/models/matrix.py b/env/lib/python3.10/site-packages/pikepdf/models/matrix.py new file mode 100644 index 0000000..c660320 --- /dev/null +++ b/env/lib/python3.10/site-packages/pikepdf/models/matrix.py @@ -0,0 +1,145 @@ +# SPDX-FileCopyrightText: 2022 James R. Barlow +# SPDX-License-Identifier: MPL-2.0 + +"""PDF content matrix support.""" + +from __future__ import annotations + +from math import cos, pi, sin + + +class PdfMatrix: + """ + Support class for PDF content stream matrices. + + PDF content stream matrices are 3x3 matrices summarized by a shorthand + ``(a, b, c, d, e, f)`` which correspond to the first two column vectors. + The final column vector is always ``(0, 0, 1)`` since this is using + `homogenous coordinates <https://en.wikipedia.org/wiki/Homogeneous_coordinates>`_. + + PDF uses row vectors. That is, ``vr @ A'`` gives the effect of transforming + a row vector ``vr=(x, y, 1)`` by the matrix ``A'``. Most textbook + treatments use ``A @ vc`` where the column vector ``vc=(x, y, 1)'``. + + (``@`` is the Python matrix multiplication operator.) + + Addition and other operations are not implemented because they're not that + meaningful in a PDF context (they can be defined and are mathematically + meaningful in general). + + PdfMatrix objects are immutable. All transformations on them produce a new + matrix. + + """ + + def __init__(self, *args): + # fmt: off + if not args: + self.values = ((1, 0, 0), (0, 1, 0), (0, 0, 1)) + elif len(args) == 6: + a, b, c, d, e, f = map(float, args) + self.values = ((a, b, 0), + (c, d, 0), + (e, f, 1)) + elif isinstance(args[0], PdfMatrix): + self.values = args[0].values + elif len(args[0]) == 6: + a, b, c, d, e, f = map(float, args[0]) + self.values = ((a, b, 0), + (c, d, 0), + (e, f, 1)) + elif len(args[0]) == 3 and len(args[0][0]) == 3: + self.values = (tuple(args[0][0]), + tuple(args[0][1]), + tuple(args[0][2])) + else: + raise ValueError('invalid arguments: ' + repr(args)) + # fmt: on + + @staticmethod + def identity(): + """Constructs and returns an identity matrix.""" + return PdfMatrix() + + def __matmul__(self, other): + """Multiply this matrix by another matrix. + + Can be used to concatenate transformations. + """ + a = self.values + b = other.values + return PdfMatrix( + [ + [sum(float(i) * float(j) for i, j in zip(row, col)) for col in zip(*b)] + for row in a + ] + ) + + def scaled(self, x, y): + """Concatenates a scaling matrix on this matrix.""" + return self @ PdfMatrix((x, 0, 0, y, 0, 0)) + + def rotated(self, angle_degrees_ccw): + """Concatenates a rotation matrix on this matrix.""" + angle = angle_degrees_ccw / 180.0 * pi + c, s = cos(angle), sin(angle) + return self @ PdfMatrix((c, s, -s, c, 0, 0)) + + def translated(self, x, y): + """Translates this matrix.""" + return self @ PdfMatrix((1, 0, 0, 1, x, y)) + + @property + def shorthand(self): + """Return the 6-tuple (a,b,c,d,e,f) that describes this matrix.""" + return (self.a, self.b, self.c, self.d, self.e, self.f) + + @property + def a(self): + """Return matrix this value.""" + return self.values[0][0] + + @property + def b(self): + """Return matrix this value.""" + return self.values[0][1] + + @property + def c(self): + """Return matrix this value.""" + return self.values[1][0] + + @property + def d(self): + """Return matrix this value.""" + return self.values[1][1] + + @property + def e(self): + """Return matrix this value. 
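[Illustrative aside, not part of the pikepdf source: a minimal sketch of composing the PdfMatrix transformations defined in this class. Values are arbitrary.]

# Sketch: building a content-stream matrix by chaining transformations.
from pikepdf import PdfMatrix

m = PdfMatrix.identity().scaled(2, 2).rotated(90).translated(10, 20)
print(m.shorthand)   # the (a, b, c, d, e, f) form used in content streams
print(m.encode())    # six numbers as bytes, suitable to precede a 'cm' operator
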
+ + Typically corresponds to translation on the x-axis. + """ + return self.values[2][0] + + @property + def f(self): + """Return matrix this value. + + Typically corresponds to translation on the y-axis. + """ + return self.values[2][1] + + def __eq__(self, other): + if isinstance(other, PdfMatrix): + return self.shorthand == other.shorthand + return False + + def encode(self): + """Encode this matrix in binary suitable for including in a PDF.""" + return '{:.6f} {:.6f} {:.6f} {:.6f} {:.6f} {:.6f}'.format( + self.a, self.b, self.c, self.d, self.e, self.f + ).encode() + + def __repr__(self): + return f"pikepdf.PdfMatrix({repr(self.values)})" diff --git a/env/lib/python3.10/site-packages/pikepdf/models/metadata.py b/env/lib/python3.10/site-packages/pikepdf/models/metadata.py new file mode 100644 index 0000000..62158b1 --- /dev/null +++ b/env/lib/python3.10/site-packages/pikepdf/models/metadata.py @@ -0,0 +1,866 @@ +# SPDX-FileCopyrightText: 2022 James R. Barlow +# SPDX-License-Identifier: MPL-2.0 + +"""PDF metadata handling.""" + +from __future__ import annotations + +import logging +import re +import sys +from abc import ABC, abstractmethod +from datetime import datetime +from functools import wraps +from io import BytesIO +from typing import TYPE_CHECKING, Any, Callable, NamedTuple, Set +from warnings import warn + +from lxml import etree +from lxml.etree import QName, XMLSyntaxError + +from .. import Name, Stream, String +from .. import __version__ as pikepdf_version +from .._xml import parse_xml + +if sys.version_info < (3, 9): # pragma: no cover + from typing import Iterable, MutableMapping +else: + from collections.abc import Iterable, MutableMapping + +if TYPE_CHECKING: # pragma: no cover + from pikepdf import Pdf + + +XMP_NS_DC = "http://purl.org/dc/elements/1.1/" +XMP_NS_PDF = "http://ns.adobe.com/pdf/1.3/" +XMP_NS_PDFA_ID = "http://www.aiim.org/pdfa/ns/id/" +XMP_NS_PDFX_ID = "http://www.npes.org/pdfx/ns/id/" +XMP_NS_PHOTOSHOP = "http://ns.adobe.com/photoshop/1.0/" +XMP_NS_PRISM = "http://prismstandard.org/namespaces/basic/1.0/" +XMP_NS_PRISM2 = "http://prismstandard.org/namespaces/basic/2.0/" +XMP_NS_PRISM3 = "http://prismstandard.org/namespaces/basic/3.0/" +XMP_NS_RDF = "http://www.w3.org/1999/02/22-rdf-syntax-ns#" +XMP_NS_XMP = "http://ns.adobe.com/xap/1.0/" +XMP_NS_XMP_MM = "http://ns.adobe.com/xap/1.0/mm/" +XMP_NS_XMP_RIGHTS = "http://ns.adobe.com/xap/1.0/rights/" + +DEFAULT_NAMESPACES: list[tuple[str, str]] = [ + ('adobe:ns:meta/', 'x'), + (XMP_NS_DC, 'dc'), + (XMP_NS_PDF, 'pdf'), + (XMP_NS_PDFA_ID, 'pdfaid'), + (XMP_NS_PDFX_ID, 'pdfxid'), + (XMP_NS_PHOTOSHOP, 'photoshop'), + (XMP_NS_PRISM, 'prism'), + (XMP_NS_PRISM2, 'prism2'), + (XMP_NS_PRISM3, 'prism3'), + (XMP_NS_RDF, 'rdf'), + (XMP_NS_XMP, 'xmp'), + (XMP_NS_XMP_MM, 'xmpMM'), + (XMP_NS_XMP_RIGHTS, 'xmpRights'), +] + +for _uri, _prefix in DEFAULT_NAMESPACES: + etree.register_namespace(_prefix, _uri) + +# This one should not be registered +XMP_NS_XML = "http://www.w3.org/XML/1998/namespace" + +XPACKET_BEGIN = b"""<?xpacket begin="\xef\xbb\xbf" id="W5M0MpCehiHzreSzNTczkc9d"?>\n""" + +XMP_EMPTY = b"""<x:xmpmeta xmlns:x="adobe:ns:meta/" x:xmptk="pikepdf"> + <rdf:RDF xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#"> + </rdf:RDF> +</x:xmpmeta> +""" + +XPACKET_END = b"""\n<?xpacket end="w"?>\n""" + + +class XmpContainer(NamedTuple): + """Map XMP container object to suitable Python container.""" + + rdf_type: str + py_type: type + insert_fn: Callable[..., None] + + +log = logging.getLogger(__name__) + + +class 
NeverRaise(Exception): + """An exception that is never raised.""" + + +class AltList(list): + """XMP AltList container.""" + + +XMP_CONTAINERS = [ + XmpContainer('Alt', AltList, AltList.append), + XmpContainer('Bag', set, set.add), + XmpContainer('Seq', list, list.append), +] + +LANG_ALTS = frozenset( + [ + str(QName(XMP_NS_DC, 'title')), + str(QName(XMP_NS_DC, 'description')), + str(QName(XMP_NS_DC, 'rights')), + str(QName(XMP_NS_XMP_RIGHTS, 'UsageTerms')), + ] +) + +# These are the illegal characters in XML 1.0. (XML 1.1 is a bit more permissive, +# but we'll be strict to ensure wider compatibility.) +re_xml_illegal_chars = re.compile( + r"(?u)[^\x09\x0A\x0D\x20-\U0000D7FF\U0000E000-\U0000FFFD\U00010000-\U0010FFFF]" +) +re_xml_illegal_bytes = re.compile( + br"[^\x09\x0A\x0D\x20-\xFF]|�" + # br"&#(?:[0-9]|0[0-9]|1[0-9]|2[0-9]|3[0-1]|x[0-9A-Fa-f]|x0[0-9A-Fa-f]|x1[0-9A-Fa-f]);" +) + + +def _parser_basic(xml: bytes): + return parse_xml(BytesIO(xml)) + + +def _parser_strip_illegal_bytes(xml: bytes): + return parse_xml(BytesIO(re_xml_illegal_bytes.sub(b'', xml))) + + +def _parser_recovery(xml: bytes): + return parse_xml(BytesIO(xml), recover=True) + + +def _parser_replace_with_empty_xmp(_xml: bytes = b''): + log.warning("Error occurred parsing XMP, replacing with empty XMP.") + return _parser_basic(XMP_EMPTY) + + +def _clean(s: str | Iterable[str], joiner: str = '; ') -> str: + """Ensure an object can safely be inserted in a XML tag body. + + If we still have a non-str object at this point, the best option is to + join it, because it's apparently calling for a new node in a place that + isn't allowed in the spec or not supported. + """ + if not isinstance(s, str): + if isinstance(s, Iterable): + warn(f"Merging elements of {s}") + if isinstance(s, Set): + s = joiner.join(sorted(s)) + else: + s = joiner.join(s) + else: + raise TypeError("object must be a string or iterable of strings") + return re_xml_illegal_chars.sub('', s) + + +def encode_pdf_date(d: datetime) -> str: + """Encode Python datetime object as PDF date string. + + From Adobe pdfmark manual: + (D:YYYYMMDDHHmmSSOHH'mm') + D: is an optional prefix. YYYY is the year. All fields after the year are + optional. MM is the month (01-12), DD is the day (01-31), HH is the + hour (00-23), mm are the minutes (00-59), and SS are the seconds + (00-59). The remainder of the string defines the relation of local + time to GMT. O is either + for a positive difference (local time is + later than GMT) or - (minus) for a negative difference. HH' is the + absolute value of the offset from GMT in hours, and mm' is the + absolute value of the offset in minutes. If no GMT information is + specified, the relation between the specified time and GMT is + considered unknown. Regardless of whether or not GMT + information is specified, the remainder of the string should specify + the local time. + + 'D:' is required in PDF/A, so we always add it. + """ + # The formatting of %Y is not consistent as described in + # https://bugs.python.org/issue13305 and underspecification in libc. + # So explicitly format the year with leading zeros + s = f"D:{d.year:04d}" + s += d.strftime(r'%m%d%H%M%S') + tz = d.strftime('%z') + if tz: + sign, tz_hours, tz_mins = tz[0], tz[1:3], tz[3:5] + s += f"{sign}{tz_hours}'{tz_mins}'" + return s + + +def decode_pdf_date(s: str) -> datetime: + """Decode a pdfmark date to a Python datetime object. + + A pdfmark date is a string in a paritcular format. See the pdfmark + Reference for the specification. 
+ """ + if isinstance(s, String): + s = str(s) + if s.startswith('D:'): + s = s[2:] + + # Literal Z00'00', is incorrect but found in the wild, + # probably made by OS X Quartz -- standardize + if s.endswith("Z00'00'"): + s = s.replace("Z00'00'", '+0000') + elif s.endswith('Z'): + s = s.replace('Z', '+0000') + s = s.replace("'", "") # Remove apos from PDF time strings + try: + return datetime.strptime(s, r'%Y%m%d%H%M%S%z') + except ValueError: + return datetime.strptime(s, r'%Y%m%d%H%M%S') + + +class Converter(ABC): + """XMP <-> DocumentInfo converter.""" + + @staticmethod + @abstractmethod + def xmp_from_docinfo(docinfo_val: str | None) -> Any: # type: ignore + """Derive XMP metadata from a DocumentInfo string.""" + + @staticmethod + @abstractmethod + def docinfo_from_xmp(xmp_val: Any) -> str | None: + """Derive a DocumentInfo value from equivalent XMP metadata.""" + + +class AuthorConverter(Converter): + """Convert XMP document authors to DocumentInfo.""" + + @staticmethod + def xmp_from_docinfo(docinfo_val: str | None) -> Any: # type: ignore + """Derive XMP authors info from DocumentInfo.""" + return [docinfo_val] + + @staticmethod + def docinfo_from_xmp(xmp_val): + """Derive DocumentInfo authors from XMP. + + XMP supports multiple author values, while DocumentInfo has a string, + so we return the values separated by semi-colons. + """ + if isinstance(xmp_val, str): + return xmp_val + if xmp_val is None or xmp_val == [None]: + return None + return '; '.join(xmp_val) + + +class DateConverter(Converter): + """Convert XMP dates to DocumentInfo.""" + + @staticmethod + def xmp_from_docinfo(docinfo_val): + """Derive XMP date from DocumentInfo.""" + if docinfo_val == '': + return '' + return decode_pdf_date(docinfo_val).isoformat() + + @staticmethod + def docinfo_from_xmp(xmp_val): + """Derive DocumentInfo from XMP.""" + if xmp_val.endswith('Z'): + xmp_val = xmp_val[:-1] + '+00:00' + try: + dateobj = datetime.fromisoformat(xmp_val) + except IndexError: + # PyPy 3.7 may raise IndexError - convert to ValueError + raise ValueError(f"Invalid isoformat string: '{xmp_val}'") from None + return encode_pdf_date(dateobj) + + +class DocinfoMapping(NamedTuple): + """Map DocumentInfo keys to their XMP equivalents, along with converter.""" + + ns: str + key: str + name: Name + converter: type[Converter] | None + + +def ensure_loaded(fn): + """Ensure the XMP has been loaded and parsed. + + TODO: Can this be removed? Why allow the uninit'ed state to even exist? + """ + + @wraps(fn) + def wrapper(self, *args, **kwargs): + if not self._xmp: + self._load() + return fn(self, *args, **kwargs) + + return wrapper + + +class PdfMetadata(MutableMapping): + """Read and edit the metadata associated with a PDF. + + The PDF specification contain two types of metadata, the newer XMP + (Extensible Metadata Platform, XML-based) and older DocumentInformation + dictionary. The PDF 2.0 specification removes the DocumentInformation + dictionary. + + This primarily works with XMP metadata, but includes methods to generate + XMP from DocumentInformation and will also coordinate updates to + DocumentInformation so that the two are kept consistent. + + XMP metadata fields may be accessed using the full XML namespace URI or + the short name. For example ``metadata['dc:description']`` + and ``metadata['{http://purl.org/dc/elements/1.1/}description']`` + both refer to the same field. Several common XML namespaces are registered + automatically. + + See the XMP specification for details of allowable fields. 
+ + To update metadata, use a with block. + + Example: + + >>> with pdf.open_metadata() as records: + records['dc:title'] = 'New Title' + + See Also: + :meth:`pikepdf.Pdf.open_metadata` + """ + + DOCINFO_MAPPING: list[DocinfoMapping] = [ + DocinfoMapping(XMP_NS_DC, 'creator', Name.Author, AuthorConverter), + DocinfoMapping(XMP_NS_DC, 'description', Name.Subject, None), + DocinfoMapping(XMP_NS_DC, 'title', Name.Title, None), + DocinfoMapping(XMP_NS_PDF, 'Keywords', Name.Keywords, None), + DocinfoMapping(XMP_NS_PDF, 'Producer', Name.Producer, None), + DocinfoMapping(XMP_NS_XMP, 'CreateDate', Name.CreationDate, DateConverter), + DocinfoMapping(XMP_NS_XMP, 'CreatorTool', Name.Creator, None), + DocinfoMapping(XMP_NS_XMP, 'ModifyDate', Name.ModDate, DateConverter), + ] + + NS: dict[str, str] = {prefix: uri for uri, prefix in DEFAULT_NAMESPACES} + REVERSE_NS: dict[str, str] = dict(DEFAULT_NAMESPACES) + + _PARSERS_OVERWRITE_INVALID_XML: Iterable[Callable[[bytes], Any]] = [ + _parser_basic, + _parser_strip_illegal_bytes, + _parser_recovery, + _parser_replace_with_empty_xmp, + ] + _PARSERS_STANDARD: Iterable[Callable[[bytes], Any]] = [_parser_basic] + + def __init__( + self, + pdf: Pdf, + pikepdf_mark: bool = True, + sync_docinfo: bool = True, + overwrite_invalid_xml: bool = True, + ): + self._pdf = pdf + self._xmp = None + self.mark = pikepdf_mark + self.sync_docinfo = sync_docinfo + self._updating = False + self.overwrite_invalid_xml = overwrite_invalid_xml + + def load_from_docinfo( + self, docinfo, delete_missing: bool = False, raise_failure: bool = False + ) -> None: + """Populate the XMP metadata object with DocumentInfo. + + Arguments: + docinfo: a DocumentInfo, e.g pdf.docinfo + delete_missing: if the entry is not DocumentInfo, delete the equivalent + from XMP + raise_failure: if True, raise any failure to convert docinfo; + otherwise warn and continue + + A few entries in the deprecated DocumentInfo dictionary are considered + approximately equivalent to certain XMP records. This method copies + those entries into the XMP metadata. 
+ """ + + def warn_or_raise(msg, e=None): + if raise_failure: + raise ValueError(msg) from e + warn(msg) + + for uri, shortkey, docinfo_name, converter in self.DOCINFO_MAPPING: + qname = QName(uri, shortkey) + # docinfo might be a dict or pikepdf.Dictionary, so lookup keys + # by str(Name) + val = docinfo.get(str(docinfo_name)) + if val is None: + if delete_missing and qname in self: + del self[qname] + continue + try: + val = str(val) + if converter: + val = converter.xmp_from_docinfo(val) + if not val: + continue + self._setitem(qname, val, True) + except (ValueError, AttributeError, NotImplementedError) as e: + warn_or_raise( + f"The metadata field {docinfo_name} could not be copied to XMP", e + ) + valid_docinfo_names = { + str(docinfo_name) for _, _, docinfo_name, _ in self.DOCINFO_MAPPING + } + extra_docinfo_names = {str(k) for k in docinfo.keys()} - valid_docinfo_names + for extra in extra_docinfo_names: + warn_or_raise( + f"The metadata field {extra} with value '{repr(docinfo.get(extra))}' " + "has no XMP equivalent, so it was discarded", + ) + + def _load(self) -> None: + try: + data = self._pdf.Root.Metadata.read_bytes() + except AttributeError: + data = b'' + self._load_from(data) + + def _load_from(self, data: bytes) -> None: + if data.strip() == b'': + data = XMP_EMPTY # on some platforms lxml chokes on empty documents + + parsers = ( + self._PARSERS_OVERWRITE_INVALID_XML + if self.overwrite_invalid_xml + else self._PARSERS_STANDARD + ) + + for parser in parsers: + try: + self._xmp = parser(data) + except ( + XMLSyntaxError + if self.overwrite_invalid_xml + else NeverRaise # type: ignore + ) as e: + if str(e).startswith("Start tag expected, '<' not found") or str( + e + ).startswith("Document is empty"): + self._xmp = _parser_replace_with_empty_xmp() + break + else: + break + + if self._xmp is not None: + try: + pis = self._xmp.xpath('/processing-instruction()') + for pi in pis: + etree.strip_tags(self._xmp, pi.tag) + self._get_rdf_root() + except ( + Exception # pylint: disable=broad-except + if self.overwrite_invalid_xml + else NeverRaise + ) as e: + log.warning("Error occurred parsing XMP", exc_info=e) + self._xmp = _parser_replace_with_empty_xmp() + else: + log.warning("Error occurred parsing XMP") + self._xmp = _parser_replace_with_empty_xmp() + + @ensure_loaded + def __enter__(self): + self._updating = True + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + try: + if exc_type is not None: + return + self._apply_changes() + finally: + self._updating = False + + def _update_docinfo(self): + """Update the PDF's DocumentInfo dictionary to match XMP metadata. 
+ + The standard mapping is described here: + https://www.pdfa.org/pdfa-metadata-xmp-rdf-dublin-core/ + """ + # Touch object to ensure it exists + self._pdf.docinfo # pylint: disable=pointless-statement + for uri, element, docinfo_name, converter in self.DOCINFO_MAPPING: + qname = QName(uri, element) + try: + value = self[qname] + except KeyError: + if docinfo_name in self._pdf.docinfo: + del self._pdf.docinfo[docinfo_name] + continue + if converter: + try: + value = converter.docinfo_from_xmp(value) + except ValueError: + warn( + f"The DocumentInfo field {docinfo_name} could not be " + "updated from XMP" + ) + value = None + except Exception as e: + raise ValueError( + "An error occurred while updating DocumentInfo field " + f"{docinfo_name} from XMP {qname} with value {value}" + ) from e + if value is None: + if docinfo_name in self._pdf.docinfo: + del self._pdf.docinfo[docinfo_name] + continue + value = _clean(value) + try: + # Try to save pure ASCII + self._pdf.docinfo[docinfo_name] = value.encode('ascii') + except UnicodeEncodeError: + # qpdf will serialize this as a UTF-16 with BOM string + self._pdf.docinfo[docinfo_name] = value + + def _get_xml_bytes(self, xpacket=True): + data = BytesIO() + if xpacket: + data.write(XPACKET_BEGIN) + self._xmp.write(data, encoding='utf-8', pretty_print=True) + if xpacket: + data.write(XPACKET_END) + data.seek(0) + xml_bytes = data.read() + return xml_bytes + + def _apply_changes(self): + """Serialize our changes back to the PDF in memory. + + Depending how we are initialized, leave our metadata mark and producer. + """ + if self.mark: + # We were asked to mark the file as being edited by pikepdf + self._setitem( + QName(XMP_NS_XMP, 'MetadataDate'), + datetime.now(datetime.utcnow().astimezone().tzinfo).isoformat(), + applying_mark=True, + ) + self._setitem( + QName(XMP_NS_PDF, 'Producer'), + 'pikepdf ' + pikepdf_version, + applying_mark=True, + ) + xml = self._get_xml_bytes() + self._pdf.Root.Metadata = Stream(self._pdf, xml) + self._pdf.Root.Metadata[Name.Type] = Name.Metadata + self._pdf.Root.Metadata[Name.Subtype] = Name.XML + if self.sync_docinfo: + self._update_docinfo() + + @classmethod + def _qname(cls, name: QName | str) -> str: + """Convert name to an XML QName. + + e.g. pdf:Producer -> {http://ns.adobe.com/pdf/1.3/}Producer + """ + if isinstance(name, QName): + return str(name) + if not isinstance(name, str): + raise TypeError(f"{name} must be str") + if name == '': + return name + if name.startswith('{'): + return name + try: + prefix, tag = name.split(':', maxsplit=1) + except ValueError: + # If missing the namespace, put it in the top level namespace + # To do this completely correct we actually need to figure out + # the namespace based on context defined by parent tags. That + # https://www.w3.org/2001/tag/doc/qnameids.html + prefix, tag = 'x', name + uri = cls.NS[prefix] + return str(QName(uri, tag)) + + def _prefix_from_uri(self, uriname): + """Given a fully qualified XML name, find a prefix. + + e.g. {http://ns.adobe.com/pdf/1.3/}Producer -> pdf:Producer + """ + uripart, tag = uriname.split('}', maxsplit=1) + uri = uripart.replace('{', '') + return self.REVERSE_NS[uri] + ':' + tag + + def _get_subelements(self, node): + """Gather the sub-elements attached to a node. + + Gather rdf:Bag and and rdf:Seq into set and list respectively. For + alternate languages values, take the first language only for + simplicity. 
+ """ + items = node.find('rdf:Alt', self.NS) + if items is not None: + try: + return items[0].text + except IndexError: + return '' + + for xmlcontainer, container, insertfn in XMP_CONTAINERS: + items = node.find(f'rdf:{xmlcontainer}', self.NS) + if items is None: + continue + result = container() + for item in items: + insertfn(result, item.text) + return result + return '' + + def _get_rdf_root(self): + rdf = self._xmp.find('.//rdf:RDF', self.NS) + if rdf is None: + rdf = self._xmp.getroot() + if not rdf.tag == '{http://www.w3.org/1999/02/22-rdf-syntax-ns#}RDF': + raise ValueError("Metadata seems to be XML but not XMP") + return rdf + + def _get_elements(self, name: str | QName = ''): + """Get elements from XMP. + + Core routine to find elements matching name within the XMP and yield + them. + + For XMP spec 7.9.2.2, rdf:Description with property attributes, + we yield the node which will have the desired as one of its attributes. + qname is returned so that the node.attrib can be used to locate the + source. + + For XMP spec 7.5, simple valued XMP properties, we yield the node, + None, and the value. For structure or array valued properties we gather + the elements. We ignore qualifiers. + + Args: + name: a prefixed name or QName to look for within the + data section of the XMP; looks for all data keys if omitted + + Yields: + tuple: (node, qname_attrib, value, parent_node) + + """ + qname = self._qname(name) + rdf = self._get_rdf_root() + for rdfdesc in rdf.findall('rdf:Description[@rdf:about=""]', self.NS): + if qname and qname in rdfdesc.keys(): + yield (rdfdesc, qname, rdfdesc.get(qname), rdf) + elif not qname: + for k, v in rdfdesc.items(): + if v: + yield (rdfdesc, k, v, rdf) + xpath = qname if name else '*' + for node in rdfdesc.findall(xpath, self.NS): + if node.text and node.text.strip(): + yield (node, None, node.text, rdfdesc) + continue + values = self._get_subelements(node) + yield (node, None, values, rdfdesc) + + def _get_element_values(self, name=''): + yield from (v[2] for v in self._get_elements(name)) + + @ensure_loaded + def __contains__(self, key: str | QName): + return any(self._get_element_values(key)) + + @ensure_loaded + def __getitem__(self, key: str | QName): + try: + return next(self._get_element_values(key)) + except StopIteration: + raise KeyError(key) from None + + @ensure_loaded + def __iter__(self): + for node, attrib, _val, _parents in self._get_elements(): + if attrib: + yield attrib + else: + yield node.tag + + @ensure_loaded + def __len__(self): + return len(list(iter(self))) + + def _setitem( + self, + key: str | QName, + val: set[str] | list[str] | str, + applying_mark: bool = False, + ): + if not self._updating: + raise RuntimeError("Metadata not opened for editing, use with block") + + qkey = self._qname(key) + self._setitem_check_args(key, val, applying_mark, qkey) + + try: + # Update existing node + self._setitem_update(key, val, qkey) + except StopIteration: + # Insert a new node + self._setitem_insert(key, val) + + def _setitem_check_args(self, key, val, applying_mark: bool, qkey: str) -> None: + if ( + self.mark + and not applying_mark + and qkey + in ( + self._qname('xmp:MetadataDate'), + self._qname('pdf:Producer'), + ) + ): + # Complain if user writes self[pdf:Producer] = ... 
and because it will + # be overwritten on save, unless self._updating_mark, in which case + # the action was initiated internally + log.warning( + f"Update to {key} will be overwritten because metadata was opened " + "with set_pikepdf_as_editor=True" + ) + if isinstance(val, str) and qkey in (self._qname('dc:creator')): + log.error(f"{key} should be set to a list of strings") + + def _setitem_add_array(self, node, items: Iterable) -> None: + rdf_type = next( + c.rdf_type for c in XMP_CONTAINERS if isinstance(items, c.py_type) + ) + seq = etree.SubElement(node, str(QName(XMP_NS_RDF, rdf_type))) + tag_attrib: dict[str, str] | None = None + if rdf_type == 'Alt': + tag_attrib = {str(QName(XMP_NS_XML, 'lang')): 'x-default'} + for item in items: + el = etree.SubElement(seq, str(QName(XMP_NS_RDF, 'li')), attrib=tag_attrib) + el.text = _clean(item) + + def _setitem_update(self, key, val, qkey): + # Locate existing node to replace + node, attrib, _oldval, _parent = next(self._get_elements(key)) + if attrib: + if not isinstance(val, str): + if qkey == self._qname('dc:creator'): + # dc:creator incorrectly created as an attribute - we're + # replacing it anyway, so remove the old one + del node.attrib[qkey] + self._setitem_add_array(node, _clean(val)) + else: + raise TypeError(f"Setting {key} to {val} with type {type(val)}") + else: + node.set(attrib, _clean(val)) + elif isinstance(val, (list, set)): + for child in node.findall('*'): + node.remove(child) + self._setitem_add_array(node, val) + elif isinstance(val, str): + for child in node.findall('*'): + node.remove(child) + if str(self._qname(key)) in LANG_ALTS: + self._setitem_add_array(node, AltList([_clean(val)])) + else: + node.text = _clean(val) + else: + raise TypeError(f"Setting {key} to {val} with type {type(val)}") + + def _setitem_insert(self, key, val): + rdf = self._get_rdf_root() + if str(self._qname(key)) in LANG_ALTS: + val = AltList([_clean(val)]) + if isinstance(val, (list, set)): + rdfdesc = etree.SubElement( + rdf, + str(QName(XMP_NS_RDF, 'Description')), + attrib={str(QName(XMP_NS_RDF, 'about')): ''}, + ) + node = etree.SubElement(rdfdesc, self._qname(key)) + self._setitem_add_array(node, val) + elif isinstance(val, str): + _rdfdesc = etree.SubElement( + rdf, + str(QName(XMP_NS_RDF, 'Description')), + attrib={ + QName(XMP_NS_RDF, 'about'): '', + self._qname(key): _clean(val), + }, + ) + else: + raise TypeError(f"Setting {key} to {val} with type {type(val)}") from None + + @ensure_loaded + def __setitem__(self, key: str | QName, val: set[str] | list[str] | str): + return self._setitem(key, val, False) + + @ensure_loaded + def __delitem__(self, key: str | QName): + if not self._updating: + raise RuntimeError("Metadata not opened for editing, use with block") + try: + node, attrib, _oldval, parent = next(self._get_elements(key)) + if attrib: # Inline + del node.attrib[attrib] + if ( + len(node.attrib) == 1 + and len(node) == 0 + and QName(XMP_NS_RDF, 'about') in node.attrib + ): + # The only thing left on this node is rdf:about="", so remove it + parent.remove(node) + else: + parent.remove(node) + except StopIteration: + raise KeyError(key) from None + + @property + def pdfa_status(self) -> str: + """Return the PDF/A conformance level claimed by this PDF, or False. + + A PDF may claim to PDF/A compliant without this being true. Use an + independent verifier such as veraPDF to test if a PDF is truly + conformant. + + Returns: + The conformance level of the PDF/A, or an empty string if the + PDF does not claim PDF/A conformance. 
Possible valid values + are: 1A, 1B, 2A, 2B, 2U, 3A, 3B, 3U. + """ + # do same as @ensure_loaded - mypy can't handle decorated property + if not self._xmp: + self._load() + + key_part = QName(XMP_NS_PDFA_ID, 'part') + key_conformance = QName(XMP_NS_PDFA_ID, 'conformance') + try: + return self[key_part] + self[key_conformance] + except KeyError: + return '' + + @property + def pdfx_status(self) -> str: + """Return the PDF/X conformance level claimed by this PDF, or False. + + A PDF may claim to PDF/X compliant without this being true. Use an + independent verifier such as veraPDF to test if a PDF is truly + conformant. + + Returns: + The conformance level of the PDF/X, or an empty string if the + PDF does not claim PDF/X conformance. + """ + # do same as @ensure_loaded - mypy can't handle decorated property + if not self._xmp: + self._load() + + pdfx_version = QName(XMP_NS_PDFX_ID, 'GTS_PDFXVersion') + try: + return self[pdfx_version] + except KeyError: + return '' + + @ensure_loaded + def __str__(self): + return self._get_xml_bytes(xpacket=False).decode('utf-8') diff --git a/env/lib/python3.10/site-packages/pikepdf/models/outlines.py b/env/lib/python3.10/site-packages/pikepdf/models/outlines.py new file mode 100644 index 0000000..1143de6 --- /dev/null +++ b/env/lib/python3.10/site-packages/pikepdf/models/outlines.py @@ -0,0 +1,421 @@ +# SPDX-FileCopyrightText: 2022 James R. Barlow, 2020 Matthias Erll + +# SPDX-License-Identifier: MPL-2.0 + +"""Support for document outlines (e.g. table of contents).""" + +from __future__ import annotations + +from enum import Enum +from itertools import chain +from typing import Iterable, List, cast + +from pikepdf import Array, Dictionary, Name, Object, Page, Pdf, String + + +class PageLocation(Enum): + """Page view location definitions, from PDF spec.""" + + XYZ = 1 + Fit = 2 + FitH = 3 + FitV = 4 + FitR = 5 + FitB = 6 + FitBH = 7 + FitBV = 8 + + +PAGE_LOCATION_ARGS = { + PageLocation.XYZ: ('left', 'top', 'zoom'), + PageLocation.FitH: ('top',), + PageLocation.FitV: ('left',), + PageLocation.FitR: ('left', 'bottom', 'right', 'top'), + PageLocation.FitBH: ('top',), + PageLocation.FitBV: ('left',), +} +ALL_PAGE_LOCATION_KWARGS = set(chain.from_iterable(PAGE_LOCATION_ARGS.values())) + + +def make_page_destination( + pdf: Pdf, + page_num: int, + page_location: PageLocation | str | None = None, + *, + left: float | None = None, + top: float | None = None, + right: float | None = None, + bottom: float | None = None, + zoom: float | None = None, +) -> Array: + """ + Create a destination ``Array`` with reference to a Pdf document's page number. + + Arguments: + pdf: PDF document object. + page_num: Page number (zero-based). + page_location: Optional page location, as a string or :enum:`PageLocation`. + left: Specify page viewport rectangle. + top: Specify page viewport rectangle. + right: Specify page viewport rectangle. + bottom: Specify page viewport rectangle. + zoom: Specify page viewport rectangle's zoom level. + + left, top, right, bottom, zoom are used in conjunction with the page fit style + specified by *page_location*. 
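+
+    Example (an illustrative sketch; ``pdf`` is assumed to be an open
+    :class:`pikepdf.Pdf` with at least one page):
+
+    .. code-block:: python
+
+        # Destination for page 0: fit the page width, with the top of the
+        # viewport at y = 792 (the top edge of a US Letter page).
+        dest = make_page_destination(pdf, 0, 'FitH', top=792)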
+ """ + return _make_page_destination( + pdf, + page_num, + page_location=page_location, + left=left, + top=top, + right=right, + bottom=bottom, + zoom=zoom, + ) + + +def _make_page_destination( + pdf: Pdf, + page_num: int, + page_location: PageLocation | str | None = None, + **kwargs, +) -> Array: + kwargs = {k: v for k, v in kwargs.items() if v is not None} + + res: list[Dictionary | Name] = [pdf.pages[page_num].obj] + if page_location: + if isinstance(page_location, PageLocation): + loc_key = page_location + loc_str = loc_key.name + else: + loc_str = page_location + try: + loc_key = PageLocation[loc_str] + except KeyError: + raise ValueError( + f"Invalid or unsupported page location type {loc_str}" + ) from None + res.append(Name(f'/{loc_str}')) + dest_arg_names = PAGE_LOCATION_ARGS.get(loc_key) + if dest_arg_names: + res.extend(kwargs.get(k, 0) for k in dest_arg_names) + else: + res.append(Name.Fit) + return Array(res) + + +class OutlineStructureError(Exception): + """Indicates an error in the outline data structure.""" + + +class OutlineItem: + """Manage a single item in a PDF document outlines structure. + + Includes nested items. + + Arguments: + title: Title of the outlines item. + destination: Page number, destination name, or any other PDF object + to be used as a reference when clicking on the outlines entry. Note + this should be ``None`` if an action is used instead. If set to a + page number, it will be resolved to a reference at the time of + writing the outlines back to the document. + page_location: Supplemental page location for a page number + in ``destination``, e.g. ``PageLocation.Fit``. May also be + a simple string such as ``'FitH'``. + action: Action to perform when clicking on this item. Will be ignored + during writing if ``destination`` is also set. + obj: ``Dictionary`` object representing this outlines item in a ``Pdf``. + May be ``None`` for creating a new object. If present, an existing + object is modified in-place during writing and original attributes + are retained. + left, top, bottom, right, zoom: Describes the viewport position associated + with a destination. + + This object does not contain any information about higher-level or + neighboring elements. 
+ + Valid destination arrays: + [page /XYZ left top zoom] + generally + [page, PageLocationEntry, 0 to 4 ints] + """ + + def __init__( + self, + title: str, + destination: Array | String | Name | int | None = None, + page_location: PageLocation | str | None = None, + action: Dictionary | None = None, + obj: Dictionary | None = None, + *, + left: float | None = None, + top: float | None = None, + right: float | None = None, + bottom: float | None = None, + zoom: float | None = None, + ): + self.title = title + self.destination = destination + self.page_location = page_location + self.page_location_kwargs = {} + self.action = action + if self.destination is not None and self.action is not None: + raise ValueError("Only one of destination and action may be set") + self.obj = obj + kwargs = dict(left=left, top=top, right=right, bottom=bottom, zoom=zoom) + self.page_location_kwargs = {k: v for k, v in kwargs.items() if v is not None} + self.is_closed = False + self.children: list[OutlineItem] = [] + + def __str__(self): + if self.children: + if self.is_closed: + oc_indicator = '[+]' + else: + oc_indicator = '[-]' + else: + oc_indicator = '[ ]' + if self.destination is not None: + if isinstance(self.destination, Array): + # 12.3.2.2 Explicit destination + # [raw_page, /PageLocation.SomeThing, integer parameters for viewport] + raw_page = self.destination[0] + page = Page(raw_page) + dest = page.label + elif isinstance(self.destination, String): + # 12.3.2.2 Named destination, byte string reference to Names + dest = f'<Named Destination in document .Root.Names dictionary: {self.destination}>' + elif isinstance(self.destination, Name): + # 12.3.2.2 Named destination, name object (PDF 1.1) + dest = f'<Named Destination in document .Root.Dests dictionary: {self.destination}>' + elif isinstance(self.destination, int): + # Page number + dest = f'<Page {self.destination}>' + else: + dest = '<Action>' + return f'{oc_indicator} {self.title} -> {dest}' + + def __repr__(self): + return f'<pikepdf.{self.__class__.__name__}: "{self.title}">' + + @classmethod + def from_dictionary_object(cls, obj: Dictionary): + """Creates a ``OutlineItem`` from a ``Dictionary``. + + Does not process nested items. + + Arguments: + obj: ``Dictionary`` object representing a single outline node. + """ + title = str(obj.Title) + destination = obj.get(Name.Dest) + if destination is not None and not isinstance( + destination, (Array, String, Name) + ): + # 12.3.3: /Dest may be a name, byte string or array + raise OutlineStructureError( + f"Unexpected object type in Outline's /Dest: {destination!r}" + ) + action = obj.get(Name.A) + if action is not None and not isinstance(action, Dictionary): + raise OutlineStructureError( + f"Unexpected object type in Outline's /A: {action!r}" + ) + return cls(title, destination=destination, action=action, obj=obj) + + def to_dictionary_object(self, pdf: Pdf, create_new: bool = False) -> Dictionary: + """Creates/updates a ``Dictionary`` object from this outline node. + + Page numbers are resolved to a page reference on the input + ``Pdf`` object. + + Arguments: + pdf: PDF document object. + create_new: If set to ``True``, creates a new object instead of + modifying an existing one in-place. 
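+
+        Example (an illustrative sketch; ``pdf`` is assumed to be an open
+        :class:`pikepdf.Pdf`):
+
+        .. code-block:: python
+
+            item = OutlineItem('Chapter 1', 0)
+            # The page number 0 is resolved to a reference into pdf.pages here.
+            obj = item.to_dictionary_object(pdf)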
+ """ + if create_new or self.obj is None: + self.obj = obj = pdf.make_indirect(Dictionary()) + else: + obj = self.obj + obj.Title = self.title + if self.destination is not None: + if isinstance(self.destination, int): + self.destination = make_page_destination( + pdf, + self.destination, + self.page_location, + **self.page_location_kwargs, + ) + obj.Dest = self.destination + if Name.A in obj: + del obj.A + elif self.action is not None: + obj.A = self.action + if Name.Dest in obj: + del obj.Dest + return obj + + +class Outline: + """Maintains a intuitive interface for creating and editing PDF document outlines. + + See |pdfrm| section 12.3. + + Arguments: + pdf: PDF document object. + max_depth: Maximum recursion depth to consider when reading the outline. + strict: If set to ``False`` (default) silently ignores structural errors. + Setting it to ``True`` raises a + :class:`pikepdf.OutlineStructureError` + if any object references re-occur while the outline is being read or + written. + + See Also: + :meth:`pikepdf.Pdf.open_outline` + """ + + def __init__(self, pdf: Pdf, max_depth: int = 15, strict: bool = False): + self._root: list[OutlineItem] | None = None + self._pdf = pdf + self._max_depth = max_depth + self._strict = strict + self._updating = False + + def __str__(self): + return str(self.root) + + def __repr__(self): + return f'<pikepdf.{self.__class__.__name__}: {len(self.root)} items>' + + def __enter__(self): + self._updating = True + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + try: + if exc_type is not None: + return + self._save() + finally: + self._updating = False + + def _save_level_outline( + self, + parent: Dictionary, + outline_items: Iterable[OutlineItem], + level: int, + visited_objs: set[tuple[int, int]], + ): + count = 0 + prev: Dictionary | None = None + first: Dictionary | None = None + for item in outline_items: + out_obj = item.to_dictionary_object(self._pdf) + objgen = out_obj.objgen + if objgen in visited_objs: + if self._strict: + raise OutlineStructureError( + f"Outline object {objgen} reoccurred in structure" + ) + out_obj = item.to_dictionary_object(self._pdf, create_new=True) + else: + visited_objs.add(objgen) + + out_obj.Parent = parent + count += 1 + if prev is not None: + prev.Next = out_obj + out_obj.Prev = prev + else: + first = out_obj + if Name.Prev in out_obj: + del out_obj.Prev + prev = out_obj + if level < self._max_depth: + sub_items: Iterable[OutlineItem] = item.children + else: + sub_items = () + self._save_level_outline(out_obj, sub_items, level + 1, visited_objs) + if item.is_closed: + out_obj.Count = -cast(int, out_obj.Count) + else: + count += cast(int, out_obj.Count) + if count: + assert prev is not None and first is not None + if Name.Next in prev: + del prev.Next + parent.First = first + parent.Last = prev + else: + if Name.First in parent: + del parent.First + if Name.Last in parent: + del parent.Last + parent.Count = count + + def _load_level_outline( + self, + first_obj: Dictionary, + outline_items: list[Object], + level: int, + visited_objs: set[tuple[int, int]], + ): + current_obj: Dictionary | None = first_obj + while current_obj: + objgen = current_obj.objgen + if objgen in visited_objs: + if self._strict: + raise OutlineStructureError( + f"Outline object {objgen} reoccurred in structure" + ) + return + visited_objs.add(objgen) + + item = OutlineItem.from_dictionary_object(current_obj) + first_child = current_obj.get(Name.First) + if isinstance(first_child, Dictionary) and level < self._max_depth: + 
self._load_level_outline( + first_child, item.children, level + 1, visited_objs + ) + count = current_obj.get(Name.Count) + if isinstance(count, int) and count < 0: + item.is_closed = True + outline_items.append(item) + next_obj = current_obj.get(Name.Next) + if next_obj is None or isinstance(next_obj, Dictionary): + current_obj = next_obj + else: + raise OutlineStructureError( + f"Outline object {objgen} points to non-dictionary" + ) + + def _save(self): + if self._root is None: + return + if Name.Outlines in self._pdf.Root: + outlines = self._pdf.Root.Outlines + else: + self._pdf.Root.Outlines = outlines = self._pdf.make_indirect( + Dictionary(Type=Name.Outlines) + ) + self._save_level_outline(outlines, self._root, 0, set()) + + def _load(self): + self._root = root = [] + if Name.Outlines not in self._pdf.Root: + return + outlines = self._pdf.Root.Outlines or {} + first_obj = outlines.get(Name.First) + if first_obj: + self._load_level_outline(first_obj, root, 0, set()) + + @property + def root(self) -> list[OutlineItem]: + """Return the root node of the outline.""" + if self._root is None: + self._load() + return cast(List[OutlineItem], self._root) diff --git a/env/lib/python3.10/site-packages/pikepdf/objects.py b/env/lib/python3.10/site-packages/pikepdf/objects.py new file mode 100644 index 0000000..338d9f0 --- /dev/null +++ b/env/lib/python3.10/site-packages/pikepdf/objects.py @@ -0,0 +1,300 @@ +# SPDX-FileCopyrightText: 2022 James R. Barlow +# SPDX-License-Identifier: MPL-2.0 + +"""Provide classes to stand in for PDF objects. + +The purpose of these is to provide nice-looking classes to allow explicit +construction of PDF objects and more pythonic idioms and facilitate discovery +by documentation generators and linters. + +It's also a place to narrow the scope of input types to those more easily +converted to C++. + +There is some deliberate "smoke and mirrors" here: all of the objects are truly +instances of ``pikepdf.Object``, which is a variant container object. The +``__new__`` constructs a ``pikepdf.Object`` in each case, and the rest of the +class definition is present as an aide for code introspection. +""" + +from __future__ import annotations + +# pylint: disable=unused-import, abstract-method +from secrets import token_urlsafe +from typing import TYPE_CHECKING, Any, Iterable, Mapping, cast +from warnings import warn + +from . import _qpdf +from ._qpdf import Object, ObjectType, Rectangle + +if TYPE_CHECKING: # pragma: no cover + from pikepdf import Pdf + +# By default pikepdf.Object will identify itself as pikepdf._qpdf.Object +# Here we change the module to discourage people from using that internal name +# Instead it will become pikepdf.objects.Object +Object.__module__ = __name__ +ObjectType.__module__ = __name__ + + +# type(Object) is the metaclass that pybind11 defines; we wish to extend that +# pylint cannot see the C++ metaclass definition and is thoroughly confused. 
+# pylint: disable=invalid-metaclass + + +class _ObjectMeta(type(Object)): # type: ignore + """Support instance checking.""" + + def __instancecheck__(self, instance: Any) -> bool: + # Note: since this class is a metaclass, self is a class object + if type(instance) != Object: + return False + return self.object_type == instance._type_code + + +class _NameObjectMeta(_ObjectMeta): + """Support usage pikepdf.Name.Whatever -> Name('/Whatever').""" + + def __getattr__(self, attr: str) -> Any: + if attr.startswith('_') or attr == 'object_type': + return getattr(_ObjectMeta, attr) + return Name('/' + attr) + + def __setattr__(self, attr: str, value: Any) -> None: + # No need for a symmetric .startswith('_'). To prevent user error, we + # simply don't allow mucking with the pikepdf.Name class's attributes. + # There is no reason to ever assign to them. + raise AttributeError( + "Attributes may not be set on pikepdf.Name. Perhaps you meant to " + "modify a Dictionary rather than a Name?" + ) + + def __getitem__(self, item: str) -> Name: + if item.startswith('/'): + item = item[1:] + raise TypeError( + "pikepdf.Name is not subscriptable. You probably meant:\n" + f" pikepdf.Name.{item}\n" + "or\n" + f" pikepdf.Name('/{item}')\n" + ) + + +class Name(Object, metaclass=_NameObjectMeta): + """Construct a PDF Name object. + + Names can be constructed with two notations: + + 1. ``Name.Resources`` + + 2. ``Name('/Resources')`` + + The two are semantically equivalent. The former is preferred for names + that are normally expected to be in a PDF. The latter is preferred for + dynamic names and attributes. + """ + + object_type = ObjectType.name_ + + def __new__(cls, name: str | Name) -> Name: + """Construct a PDF Name.""" + # QPDF_Name::unparse ensures that names are always saved in a UTF-8 + # compatible way, so we only need to guard the input. + if isinstance(name, bytes): + raise TypeError("Name should be str") + if isinstance(name, Name): + return name # Names are immutable so we can return a reference + return _qpdf._new_name(name) + + @classmethod + def random(cls, len_: int = 16, prefix: str = '') -> Name: + """Generate a cryptographically strong random, valid PDF Name. + + This function uses Python's secrets.token_urlsafe, which returns a + URL-safe encoded random number of the desired length. An optional + *prefix* may be prepended. (The encoding is ultimately done with + :func:`base64.urlsafe_b64encode`.) Serendipitously, URL-safe is also + PDF-safe. + + When the length parameter is 16 (16 random bytes or 128 bits), the result + is probably globally unique and can be treated as never colliding with + other names. + + The length of the string may vary because it is encoded. + """ + random_string = token_urlsafe(len_) + return _qpdf._new_name(f"/{prefix}{random_string}") + + +class Operator(Object, metaclass=_ObjectMeta): + """Construct an operator for use in a content stream. + + An Operator is one of a limited set of commands that can appear in PDF content + streams (roughly the mini-language that draws objects, lines and text on a + virtual PDF canvas). The commands :func:`parse_content_stream` and + :func:`unparse_content_stream` create and expect Operators respectively, along + with their operands. + + pikepdf uses the special Operator "INLINE IMAGE" to denote an inline image + in a content stream. 
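+
+    Example (an illustrative sketch of building a small content stream from
+    raw operators; the coordinates are arbitrary):
+
+    .. code-block:: python
+
+        from pikepdf import Operator, unparse_content_stream
+
+        # Save graphics state, stroke a diagonal line, restore state.
+        instructions = [
+            ([], Operator('q')),
+            ([72, 72], Operator('m')),
+            ([144, 144], Operator('l')),
+            ([], Operator('S')),
+            ([], Operator('Q')),
+        ]
+        data = unparse_content_stream(instructions)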
+ """ + + object_type = ObjectType.operator + + def __new__(cls, name: str) -> Operator: + """Construct an operator.""" + return cast('Operator', _qpdf._new_operator(name)) + + +class String(Object, metaclass=_ObjectMeta): + """Construct a PDF String object.""" + + object_type = ObjectType.string + + def __new__(cls, s: str | bytes) -> String: + """ + Construct a PDF String. + + Args: + s: The string to use. String will be encoded for + PDF, bytes will be constructed without encoding. + + Return type: + pikepdf.Object + """ + if isinstance(s, bytes): + return _qpdf._new_string(s) + return _qpdf._new_string_utf8(s) + + +class Array(Object, metaclass=_ObjectMeta): + """Construct a PDF Array object.""" + + object_type = ObjectType.array + + def __new__(cls, a: Iterable | Rectangle | None = None) -> Array: + """ + Construct a PDF Array. + + Args: + a: An iterable of objects. All objects must be either + `pikepdf.Object` or convertible to `pikepdf.Object`. + + Return type: + pikepdf.Array + """ + if isinstance(a, (str, bytes)): + raise TypeError('Strings cannot be converted to arrays of chars') + + if a is None: + a = [] + elif isinstance(a, Rectangle): + return a.as_array() + elif isinstance(a, Array): + return cast(Array, a.__copy__()) + return _qpdf._new_array(a) + + +class Dictionary(Object, metaclass=_ObjectMeta): + """Construct a PDF Dictionary object.""" + + object_type = ObjectType.dictionary + + def __new__(cls, d: Mapping | None = None, **kwargs) -> Dictionary: + """ + Construct a PDF Dictionary. + + Works from either a Python ``dict`` or keyword arguments. + + These two examples are equivalent: + + .. code-block:: python + + pikepdf.Dictionary({'/NameOne': 1, '/NameTwo': 'Two'}) + + pikepdf.Dictionary(NameOne=1, NameTwo='Two') + + In either case, the keys must be strings, and the strings + correspond to the desired Names in the PDF Dictionary. The values + must all be convertible to `pikepdf.Object`. + + Return type: + pikepdf.Dictionary + """ + if kwargs and d is not None: + raise ValueError('Cannot use both a mapping object and keyword args') + if kwargs: + # Add leading slash + # Allows Dictionary(MediaBox=(0,0,1,1), Type=Name('/Page')... + return _qpdf._new_dictionary({('/' + k): v for k, v in kwargs.items()}) + if isinstance(d, Dictionary): + # Already a dictionary + return d.__copy__() + if not d: + d = {} + if d and any(key == '/' or not key.startswith('/') for key in d.keys()): + raise KeyError("Dictionary created from strings must begin with '/'") + return _qpdf._new_dictionary(d) + + +class Stream(Object, metaclass=_ObjectMeta): + """Construct a PDF Stream object.""" + + object_type = ObjectType.stream + + def __new__(cls, owner: Pdf, data: bytes | None = None, d=None, **kwargs) -> Stream: + """ + Create a new stream object. + + Streams stores arbitrary binary data and may or may not be compressed. + It also may or may not be a page or Form XObject's content stream. + + A stream dictionary is like a pikepdf.Dictionary or Python dict, except + it has a binary payload of data attached. The dictionary describes + how the data is compressed or encoded. + + The dictionary may be initialized just like pikepdf.Dictionary is initialized, + using a mapping object or keyword arguments. + + Args: + owner: The Pdf to which this stream shall be attached. + data: The data bytes for the stream. + d: An optional mapping object that will be used to construct the stream's + dictionary. + kwargs: Keyword arguments that will define the stream dictionary. 
Do not set + /Length here as pikepdf will manage this value. Set /Filter + if the data is already encoded in some format. + + Examples: + Using kwargs: + >>> s1 = pikepdf.Stream( + pdf, + b"uncompressed image data", + BitsPerComponent=8, + ColorSpace=Name.DeviceRGB, + ... + ) + Using dict: + >>> d = pikepdf.Dictionary(...) + >>> s2 = pikepdf.Stream( + pdf, + b"data", + d + ) + + .. versionchanged:: 2.2 + Support creation of ``pikepdf.Stream`` from existing dictionary. + + .. versionchanged:: 3.0 + Deprecated ``obj`` argument was removed; use ``data``. + """ + if data is None: + raise TypeError("Must make Stream from binary data") + + stream_dict = None + if d or kwargs: + stream_dict = Dictionary(d, **kwargs) + + stream = _qpdf._new_stream(owner, data) + if stream_dict: + stream.stream_dict = stream_dict + return stream diff --git a/env/lib/python3.10/site-packages/pikepdf/py.typed b/env/lib/python3.10/site-packages/pikepdf/py.typed new file mode 100644 index 0000000..2f90bdd --- /dev/null +++ b/env/lib/python3.10/site-packages/pikepdf/py.typed @@ -0,0 +1,3 @@ +# SPDX-FileCopyrightText: 2022 James R. Barlow +# SPDX-License-Identifier: MPL-2.0 +# pikepdf is typed diff --git a/env/lib/python3.10/site-packages/pikepdf/settings.py b/env/lib/python3.10/site-packages/pikepdf/settings.py new file mode 100644 index 0000000..2e0d058 --- /dev/null +++ b/env/lib/python3.10/site-packages/pikepdf/settings.py @@ -0,0 +1,18 @@ +# SPDX-FileCopyrightText: 2022 James R. Barlow +# SPDX-License-Identifier: MPL-2.0 + +"""pikepdf global settings.""" + +from __future__ import annotations + +from ._qpdf import ( + get_decimal_precision, + set_decimal_precision, + set_flate_compression_level, +) + +__all__ = [ + 'get_decimal_precision', + 'set_decimal_precision', + 'set_flate_compression_level', +] |