# a simple PDF parser # copyright 2012 Brecht Machiels import re, struct, time, hashlib, time from binascii import hexlify, unhexlify from collections import OrderedDict from datetime import datetime from io import BytesIO, SEEK_CUR, SEEK_END PDF_VERSION = '1.4' class Object(object): def __init__(self, indirect=False): self.indirect = indirect def bytes(self, document): if self.indirect: reference = document._by_object_id[id(self)] out = reference.bytes(document) else: out = self._bytes(document) return out def delete(self, document): try: reference = document._by_object_id[self] reference.delete() except KeyError: pass def short_repr(self): return repr(self) def register_indirect(self, document): if self.indirect: document.register(self) class Reference(object): def __init__(self, document, identifier, generation): self.document = document self.identifier = identifier self.generation = generation def bytes(self, document): return '{} {} R'.format(self.identifier, self.generation).encode('utf_8') @property def target(self): return self.document[self.identifier][0] def delete(self, document=None): if document == self.document: del self.document[self.identifier] def __repr__(self): return '{}<{} {}>'.format(self.target.__class__.__name__, self.identifier, self.generation) class Boolean(Object): def __init__(self, value, indirect=False): super().__init__(indirect) self.value = value def __repr__(self): return '{}({})'.format(self.__class__.__name__, self.value) def _bytes(self, document): return b'true' if self.value else b'false' class Integer(Object, int): def __new__(cls, value, base=10, indirect=False): #print('Integer.__new__(', cls, value, base, indirect, ')') try: obj = int.__new__(cls, value, base) except TypeError: obj = int.__new__(cls, value) return obj def __init__(self, value, base=10, indirect=False): Object.__init__(self, indirect) def __repr__(self): return '{}({})'.format(self.__class__.__name__, int.__repr__(self)) def _bytes(self, document): return int.__str__(self).encode('utf_8') class Real(Object, float): def __new__(cls, value, indirect=False): return float.__new__(cls, value) def __init__(self, value, indirect=False): Object.__init__(self, indirect) def __repr__(self): return '{}({})'.format(self.__class__.__name__, float.__repr__(self)) def _bytes(self, document): return float.__repr__(self).encode('utf_8') class String(Object): def __init__(self, string, indirect=False): super().__init__(indirect) self.string = string def __repr__(self): return "{}('{}')".format(self.__class__.__name__, self.string) def _bytes(self, document): escaped = self.string.replace('\n', r'\n') escaped = escaped.replace('\r', r'\r') escaped = escaped.replace('\t', r'\t') escaped = escaped.replace('\b', r'\b') escaped = escaped.replace('\f', r'\f') for char in '\\()': escaped = escaped.replace(char, '\\{}'.format(char)) out = '({})'.format(escaped) return out.encode('utf_8') class HexString(Object): def __init__(self, byte_string, indirect=False): super().__init__(indirect) self.byte_string = byte_string def __repr__(self): return "{}('{}')".format(self.__class__.__name__, hexlify(self.byte_string).decode()) def _bytes(self, document): return b'<' + hexlify(self.byte_string) + b'>' class Date(String): def __init__(self, timestamp, indirect=False): local_time = datetime.fromtimestamp(timestamp) utc_time = datetime.utcfromtimestamp(timestamp) utc_offset = local_time - utc_time utc_offset_minutes, utc_offset_seconds = divmod(utc_offset.seconds, 60) utc_offset_hours, utc_offset_minutes = divmod(utc_offset_minutes, 60) string = local_time.strftime('D:%Y%m%d%H%M%S') string += "{:+03d}'{:02d}'".format(utc_offset_hours, utc_offset_minutes) super().__init__(string, indirect) class Name(Object, str): # TODO: names should be unique (per document), so check def __new__(cls, value, indirect=False): return str.__new__(cls, value) def __init__(self, name, indirect=False): Object.__init__(self, indirect) def __repr__(self): return '{}({})'.format(self.__class__.__name__, str.__repr__(self)) def _bytes(self, document): # TODO: # escaping return '/{}'.format(self).encode('utf_8') class Array(Object, list): # TODO: not all methods of list are overridden, so funny # behavior is to be expected def __init__(self, items=[], indirect=False): Object.__init__(self, indirect) list.__init__(self, items) def __repr__(self): contents = ', '.join([item.short_repr() for item in self]) return '{}({})'.format(self.__class__.__name__, contents) def _bytes(self, document): return b'[' + b' '.join([elem.bytes(document) for elem in self]) + b']' def short_repr(self): return '<{} {}>'.format(self.__class__.__name__, id(self)) def register_indirect(self, document): register_children = True if self.indirect: register_children = id(self) not in document._by_object_id document.register(self) if register_children: for item in self: item.register_indirect(document) class Dictionary(Object, OrderedDict): def __init__(self, indirect=False): Object.__init__(self, indirect) OrderedDict.__init__(self) def __repr__(self): contents = ', '.join(['{}: {}'.format(key, value.short_repr()) for key, value in self.items()]) return '{}({})'.format(self.__class__.__name__, contents) def _bytes(self, document): return b'<< ' + b' '.join([Name(key).bytes(document) + b' ' + value.bytes(document) for key, value in self.items()]) + b' >>' def short_repr(self): return '<{} {}>'.format(self.__class__.__name__, id(self)) def register_indirect(self, document): register_children = True if self.indirect: register_children = id(self) not in document._by_object_id document.register(self) if register_children: for item in self.values(): item.register_indirect(document) class Stream(Dictionary): def __init__(self): # (Streams are always indirectly referenced) super().__init__(indirect=True) self.data = BytesIO() def _bytes(self, document): if 'Length' in self: self['Length'].delete(document) self['Length'] = Integer(self.size) out = super()._bytes(document) out += b'\nstream\n' out += self.data.getvalue() out += b'\nendstream' return out def read(self, *args, **kwargs): return self.data.read(*args, **kwargs) def write(self, *args, **kwargs): return self.data.write(*args, **kwargs) def tell(self, *args, **kwargs): return self.data.tell(*args, **kwargs) def seek(self, *args, **kwargs): return self.data.seek(*args, **kwargs) def getvalue(self): return self.data.getvalue() @property def size(self): restore_pos = self.tell() self.seek(0, SEEK_END) size = self.tell() self.seek(restore_pos) return size class XObjectForm(Stream): def __init__(self, bounding_box): super().__init__() self['Type'] = Name('XObject') self['Subtype'] = Name('Form') self['BBox'] = bounding_box class Null(Object): def __init__(self, indirect=False): super().__init__(indirect) def __repr__(self): return self.__class__.__name__ def _bytes(self, document): return b'null' class Document(dict): def __init__(self): self.catalog = Catalog() self.info = Dictionary(indirect=True) self.timestamp = time.time() self.info['CreationDate'] = Date(self.timestamp) self.id = None self._by_object_id = {} def register(self, obj): if id(obj) not in self._by_object_id: identifier, generation = self.max_identifier + 1, 0 reference = Reference(self, identifier, generation) self._by_object_id[id(obj)] = reference self[identifier] = obj @property def max_identifier(self): try: identifier = max(self.keys()) except ValueError: identifier = 0 return identifier def _write_xref_table(self, file, addresses): def out(string): file.write(string + b'\n') out(b'xref') out('0 {}'.format(self.max_identifier + 1).encode('utf_8')) out(b'0000000000 65535 f ') last_free = 0 for identifier in range(1, self.max_identifier + 1): try: address = addresses[identifier] out('{:010d} {:05d} n '.format(address, 0).encode('utf_8')) except KeyError: out(b'0000000000 65535 f ') last_free = identifier def write(self, file_or_filename): def out(string): file.write(string + b'\n') try: file = open(file_or_filename, 'wb') close_file = True except TypeError: file = file_or_filename close_file = False self.catalog.register_indirect(self) self.info.register_indirect(self) if 'Producer' in self.info: self.info['Producer'].delete(self) if 'ModDate' in self.info: self.info['ModDate'].delete(self) self.info['Producer'] = String('pyte PDF backend') self.info['ModDate'] = Date(self.timestamp) out('%PDF-{}'.format(PDF_VERSION).encode('utf_8')) file.write(b'%\xDC\xE1\xD8\xB7\n') addresses = {} for identifier in range(1, self.max_identifier + 1): try: obj = self[identifier] addresses[identifier] = file.tell() out('{} 0 obj'.format(identifier).encode('utf_8')) out(obj._bytes(self)) out(b'endobj') except KeyError: pass xref_table_address = file.tell() self._write_xref_table(file, addresses) out(b'trailer') trailer = Dictionary() trailer['Size'] = Integer(self.max_identifier + 1) trailer['Root'] = self.catalog trailer['Info'] = self.info md5sum = hashlib.md5() md5sum.update(str(self.timestamp).encode()) md5sum.update(str(file.tell()).encode()) for value in self.info.values(): md5sum.update(value._bytes(self)) new_id = HexString(md5sum.digest()) if self.id: self.id[1] = new_id else: self.id = Array([new_id, new_id]) trailer['ID'] = self.id out(trailer.bytes(self)) out(b'startxref') out(str(xref_table_address).encode('utf_8')) out(b'%%EOF') if close_file: file.close() class Catalog(Dictionary): def __init__(self): super().__init__(indirect=True) self['Type'] = Name('Catalog') self['Pages'] = Pages() class Pages(Dictionary): def __init__(self): super().__init__(indirect=True) self['Type'] = Name('Pages') self['Count'] = Integer(0) self['Kids'] = Array() def new_page(self, width, height): page = Page(self, width, height) self['Kids'].append(page) self['Count'] = Integer(self['Count'] + 1) return page class Page(Dictionary): def __init__(self, parent, width, height): super().__init__(indirect=True) self['Type'] = Name('Page') self['Parent'] = parent self['Resources'] = Dictionary() self['MediaBox'] = Array([Integer(0), Integer(0), Real(width), Real(height)]) def to_xobject_form(self): content_stream = self['Contents'] xobject = XObjectForm(self['MediaBox']) if 'Filter' in content_stream: xobject['Filter'] = content_stream['Filter'] if 'Resources' in self: xobject['Resources'] = self['Resources'] xobject.write(content_stream.getvalue()) return xobject class Font(Dictionary): def __init__(self, indirect): super().__init__(indirect) self['Type'] = Name('Font') class PDFReader(Document): DICT_BEGIN = b'<<' DICT_END = b'>>' STRING_BEGIN = b'(' STRING_END = b')' ARRAY_BEGIN = b'[' ARRAY_END = b']' HEXSTRING_BEGIN = b'<' HEXSTRING_END = b'>' NAME_BEGIN = b'/' COMMENT_BEGIN = b'%' def __init__(self, file_or_filename): try: self.file = open(file_or_filename, 'rb') except TypeError: self.file = file_or_filename self.timestamp = time.time() xref_offset = self.find_xref_offset() self._xref = self.parse_xref_tables(xref_offset) self._by_object_id = {} trailer = self.parse_trailer() if 'Info' in trailer: self.info = trailer['Info'] else: self.info = Dictionary(self) self.id = trailer['ID'] if 'ID' in trailer else None self._max_identifier_in_file = int(trailer['Size']) - 1 self.catalog = trailer['Root'] self.file.close() @property def max_identifier(self): return max(super().max_identifier, self._max_identifier_in_file) def __getitem__(self, identifier): try: obj = super().__getitem__(identifier) except KeyError: address = self._xref[identifier] obj = self.parse_indirect_object(address) self[identifier] = obj return obj def __delitem__(self, identifier): del self._xref[identifier] super().__delitem__(identifier) def jump_to_next_line(self): while True: char = self.file.read(1) if char == b'\n': break elif char == b'\r': next_char = self.file.read(1) if next_char != b'\n': self.file.seek(-1, SEEK_CUR) break whitespace = b'\0\t\n\f\r ' delimiters = b'()<>[]{}/%' def eat_whitespace(self): while True: char = self.file.read(1) if char not in self.whitespace: self.file.seek(-1, SEEK_CUR) break def next_token(self): token = self.file.read(1) if token in (self.HEXSTRING_BEGIN, self.HEXSTRING_END): # check for dict begin/end char = self.file.read(1) if char == token: token += char else: self.file.seek(-1, SEEK_CUR) elif token in self.delimiters + self.whitespace: pass else: while True: char = self.file.read(1) if char in self.delimiters + self.whitespace: self.file.seek(-1, SEEK_CUR) break token += char return token def next_item(self, identifier=None): indirect = identifier is not None self.eat_whitespace() restore_pos = self.file.tell() token = self.next_token() if token == self.STRING_BEGIN: item = self.read_string(identifier) elif token == self.HEXSTRING_BEGIN: item = self.read_hex_string(identifier) elif token == self.ARRAY_BEGIN: item = self.read_array(identifier) elif token == self.NAME_BEGIN: item = self.read_name(identifier) elif token == self.DICT_BEGIN: item = self.read_dictionary_or_stream(identifier) elif token == b'true': item = Boolean(True, indirect=indirect) elif token == b'false': item = Boolean(False, indirect=indirect) elif token == b'null': item = Null(indirect=indirect) else: # number or indirect reference self.file.seek(restore_pos) item = self.read_number(identifier) restore_pos = self.file.tell() if isinstance(item, Integer): try: generation = self.read_number() self.eat_whitespace() r = self.next_token() if isinstance(generation, Integer) and r == b'R': item = self[int(item)] else: raise ValueError except ValueError: self.file.seek(restore_pos) return item def peek(self, length=50): restore_pos = self.file.tell() print(self.file.read(length)) self.file.seek(restore_pos) def read_array(self, identifier=None): array = Array(indirect=identifier is not None) if identifier: self[identifier] = array while True: self.eat_whitespace() token = self.file.read(1) if token == self.ARRAY_END: break self.file.seek(-1, SEEK_CUR) item = self.next_item() array.append(item) return array re_name_escape = re.compile(r'#\d\d') def read_name(self, identifier=None): indirect = identifier is not None name = '' while True: char = self.file.read(1) if char in self.delimiters + self.whitespace: self.file.seek(-1, SEEK_CUR) break name += char.decode('utf_8') for group in set(self.re_name_escape.findall(name)): number = int(group[1:], 16) name.replace(group, chr(number)) return Name(name, indirect) def read_dictionary_or_stream(self, identifier=None): dictionary = Dictionary(indirect=identifier is not None) if identifier: self[identifier] = dictionary while True: self.eat_whitespace() token = self.next_token() if token == self.DICT_END: break key, value = self.read_name(), self.next_item() dictionary[key] = value self.eat_whitespace() dict_pos = self.file.tell() if self.next_token() == b'stream': self.eat_whitespace() length = int(dictionary['Length']) stream = Stream() stream.update(dictionary) stream.write(self.file.read(length)) self.eat_whitespace() assert self.next_token() == b'endstream' dictionary = stream else: self.file.seek(dict_pos) # try to map to specific Dictionary sub-class def recursive_subclasses(cls): for subcls in cls.__subclasses__(): yield subcls for subsubcls in recursive_subclasses(subcls): yield subsubcls subclasses = {subcls.__name__: subcls for subcls in recursive_subclasses(Dictionary)} if 'Type' in dictionary: cls_name = dictionary['Type'] if 'Subtype' in dictionary: cls_name += dictionary['Subtype'] if cls_name in subclasses: dictionary.__class__ = subclasses[cls_name] return dictionary newline_chars = b'\n\r' escape_chars = b'nrtbf()\\' def read_string(self, identifier=None): indirect = identifier is not None string = b'' escape = False parenthesis_level = 0 while True: char = self.file.read(1) if escape: if char in self.escape_chars: string += char elif char == b'\n': pass elif char == b'\r' and self.file.read(1) != '\n': self.file.seek(-1, SEEK_CUR) elif char.isdigit(): for i in range(2): extra = self.file.read(1) if extra.isdigit(): char += extra else: self.file.seek(-1, SEEK_CUR) break string += struct.pack('B', int(char, 8)) else: string += b'\\' + char escape = False elif char == b'\\': escape = True elif char == b'(': parenthesis_level += 1 elif char == b')' and parenthesis_level > 0: parenthesis_level -= 1 elif char == self.STRING_END: break else: string += char return String(string.decode('utf_8'), indirect) def read_hex_string(self, identifier=None): indirect = identifier is not None hex_string = b'' while True: self.eat_whitespace() char = self.file.read(1) if char == self.HEXSTRING_END: break hex_string += char if len(hex_string) % 2 > 0: hex_string += b'0' return HexString(unhexlify(hex_string), indirect) def read_number(self, identifier=None): indirect = identifier is not None self.eat_whitespace() number_string = b'' while True: char = self.file.read(1) if char not in b'+-.0123456789': self.file.seek(-1, SEEK_CUR) break number_string += char try: number = Integer(number_string, indirect=indirect) except ValueError: number = Real(number_string, indirect=indirect) if number_string == b'0' and int(number) > 0: #print(id(number)) print(number_string, number, repr(number), int(number), int(str(number)), number.bytes(None), id(number)) ## print('=>', int(number), ## int(number), ## int(number), ## int(number), ## int(number), ## int(number), ## int(number), ## int(number), ## int(number), ## int(number), ## int(number), ## int(number)) ## print('->', int(Integer(number_string, indirect=indirect)), ## int(Integer(number_string, indirect=indirect)), ## int(Integer(number_string, indirect=indirect)), ## int(Integer(number_string, indirect=indirect)), ## int(Integer(number_string, indirect=indirect)), ## int(Integer(number_string, indirect=indirect)), ## int(Integer(number_string, indirect=indirect)), ## int(Integer(number_string, indirect=indirect)), ## int(Integer(number_string, indirect=indirect)), ## int(Integer(number_string, indirect=indirect)), ## int(Integer(number_string, indirect=indirect)), ## int(Integer(number_string, indirect=indirect))) ## print('+>', int(Integer(b'0', indirect=True)), ## int(Integer(b'0')), ## int(Integer(b'0')), ## int(Integer(b'0')), ## int(Integer(b'0')), ## int(Integer(b'0')), ## int(Integer(b'0')), ## int(Integer(b'0')), ## int(Integer(b'0')), ## int(Integer(b'0')), ## int(Integer(b'0')), ## int(Integer(b'0'))) return number def parse_trailer(self): assert self.next_token() == b'trailer' self.jump_to_next_line() trailer_dict = self.next_item() return trailer_dict ##/Size: (Required; must not be an indirect reference) The total number of entries in the file's ##cross-reference table, as defined by the combination of the original section and all ##update sections. Equivalently, this value is 1 greater than the highest object number ##used in the file. ##Note: Any object in a cross-reference section whose number is greater than this value is ##ignored and considered missing. def parse_indirect_object(self, address): # save file state restore_pos = self.file.tell() self.file.seek(address) identifier = int(self.read_number()) generation = int(self.read_number()) self.eat_whitespace() assert self.next_token() == b'obj' self.eat_whitespace() obj = self.next_item(identifier=identifier) reference = Reference(self, identifier, generation) self._by_object_id[id(obj)] = reference self.eat_whitespace() assert self.next_token() == b'endobj' self.file.seek(restore_pos) return obj def parse_xref_tables(self, offset): xref = {} self.file.seek(offset) assert self.next_token() == b'xref' self.jump_to_next_line() while True: try: identifier, entries = self.read_number(), self.read_number() self.jump_to_next_line() for i in range(entries): line = self.file.read(20) if line[17] == ord(b'n'): address, generation = int(line[:10]), int(line[11:16]) xref[identifier] = address identifier += 1 except ValueError: break return xref def find_xref_offset(self): self.file.seek(0, SEEK_END) offset = self.file.tell() - len('%%EOF') while True: self.file.seek(offset) value = self.file.read(len('startxref')) if value == b'startxref': self.jump_to_next_line() xref_offset = self.read_number() self.jump_to_next_line() if self.file.read(5) != b'%%EOF': raise ValueError('Invalid PDF file: missing %%EOF') break offset -= 1 return int(xref_offset) if __name__ == '__main__': pdf = PDFReader('vieth2008.pdf')