| Viewing file:  commit.py (28.42 KB)      -rw-r--r-- Select action/file-type:
 
  (+) |  (+) |  (+) | Code (+) | Session (+) |  (+) | SDB (+) |  (+) |  (+) |  (+) |  (+) |  (+) | 
 
# commit.py# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors
 #
 # This module is part of GitPython and is released under
 # the BSD License: http://www.opensource.org/licenses/bsd-license.php
 import datetime
 import re
 from subprocess import Popen, PIPE
 from gitdb import IStream
 from git.util import hex_to_bin, Actor, Stats, finalize_process
 from git.diff import Diffable
 from git.cmd import Git
 
 from .tree import Tree
 from . import base
 from .util import (
 Serializable,
 TraversableIterableObj,
 parse_date,
 altz_to_utctz_str,
 parse_actor_and_date,
 from_timestamp,
 )
 
 from time import time, daylight, altzone, timezone, localtime
 import os
 from io import BytesIO
 import logging
 from collections import defaultdict
 
 
 # typing ------------------------------------------------------------------
 
 from typing import (
 Any,
 IO,
 Iterator,
 List,
 Sequence,
 Tuple,
 Union,
 TYPE_CHECKING,
 cast,
 Dict,
 )
 
 from git.types import PathLike, Literal
 
 if TYPE_CHECKING:
 from git.repo import Repo
 from git.refs import SymbolicReference
 
 # ------------------------------------------------------------------------
 
 log = logging.getLogger("git.objects.commit")
 log.addHandler(logging.NullHandler())
 
 __all__ = ("Commit",)
 
 
 class Commit(base.Object, TraversableIterableObj, Diffable, Serializable):
 
 """Wraps a git Commit object.
 
 This class will act lazily on some of its attributes and will query the
 value on demand only if it involves calling the git binary."""
 
 # ENVIRONMENT VARIABLES
 # read when creating new commits
 env_author_date = "GIT_AUTHOR_DATE"
 env_committer_date = "GIT_COMMITTER_DATE"
 
 # CONFIGURATION KEYS
 conf_encoding = "i18n.commitencoding"
 
 # INVARIANTS
 default_encoding = "UTF-8"
 
 # object configuration
 type: Literal["commit"] = "commit"
 __slots__ = (
 "tree",
 "author",
 "authored_date",
 "author_tz_offset",
 "committer",
 "committed_date",
 "committer_tz_offset",
 "message",
 "parents",
 "encoding",
 "gpgsig",
 )
 _id_attribute_ = "hexsha"
 
 def __init__(
 self,
 repo: "Repo",
 binsha: bytes,
 tree: Union[Tree, None] = None,
 author: Union[Actor, None] = None,
 authored_date: Union[int, None] = None,
 author_tz_offset: Union[None, float] = None,
 committer: Union[Actor, None] = None,
 committed_date: Union[int, None] = None,
 committer_tz_offset: Union[None, float] = None,
 message: Union[str, bytes, None] = None,
 parents: Union[Sequence["Commit"], None] = None,
 encoding: Union[str, None] = None,
 gpgsig: Union[str, None] = None,
 ) -> None:
 """Instantiate a new Commit. All keyword arguments taking None as default will
 be implicitly set on first query.
 
 :param binsha: 20 byte sha1
 :param parents: tuple( Commit, ... )
 is a tuple of commit ids or actual Commits
 :param tree: Tree object
 :param author: Actor
 is the author Actor object
 :param authored_date: int_seconds_since_epoch
 is the authored DateTime - use time.gmtime() to convert it into a
 different format
 :param author_tz_offset: int_seconds_west_of_utc
 is the timezone that the authored_date is in
 :param committer: Actor
 is the committer string
 :param committed_date: int_seconds_since_epoch
 is the committed DateTime - use time.gmtime() to convert it into a
 different format
 :param committer_tz_offset: int_seconds_west_of_utc
 is the timezone that the committed_date is in
 :param message: string
 is the commit message
 :param encoding: string
 encoding of the message, defaults to UTF-8
 :param parents:
 List or tuple of Commit objects which are our parent(s) in the commit
 dependency graph
 :return: git.Commit
 
 :note:
 Timezone information is in the same format and in the same sign
 as what time.altzone returns. The sign is inverted compared to git's
 UTC timezone."""
 super(Commit, self).__init__(repo, binsha)
 self.binsha = binsha
 if tree is not None:
 assert isinstance(tree, Tree), "Tree needs to be a Tree instance, was %s" % type(tree)
 if tree is not None:
 self.tree = tree
 if author is not None:
 self.author = author
 if authored_date is not None:
 self.authored_date = authored_date
 if author_tz_offset is not None:
 self.author_tz_offset = author_tz_offset
 if committer is not None:
 self.committer = committer
 if committed_date is not None:
 self.committed_date = committed_date
 if committer_tz_offset is not None:
 self.committer_tz_offset = committer_tz_offset
 if message is not None:
 self.message = message
 if parents is not None:
 self.parents = parents
 if encoding is not None:
 self.encoding = encoding
 if gpgsig is not None:
 self.gpgsig = gpgsig
 
 @classmethod
 def _get_intermediate_items(cls, commit: "Commit") -> Tuple["Commit", ...]:
 return tuple(commit.parents)
 
 @classmethod
 def _calculate_sha_(cls, repo: "Repo", commit: "Commit") -> bytes:
 """Calculate the sha of a commit.
 
 :param repo: Repo object the commit should be part of
 :param commit: Commit object for which to generate the sha
 """
 
 stream = BytesIO()
 commit._serialize(stream)
 streamlen = stream.tell()
 stream.seek(0)
 
 istream = repo.odb.store(IStream(cls.type, streamlen, stream))
 return istream.binsha
 
 def replace(self, **kwargs: Any) -> "Commit":
 """Create new commit object from existing commit object.
 
 Any values provided as keyword arguments will replace the
 corresponding attribute in the new object.
 """
 
 attrs = {k: getattr(self, k) for k in self.__slots__}
 
 for attrname in kwargs:
 if attrname not in self.__slots__:
 raise ValueError("invalid attribute name")
 
 attrs.update(kwargs)
 new_commit = self.__class__(self.repo, self.NULL_BIN_SHA, **attrs)
 new_commit.binsha = self._calculate_sha_(self.repo, new_commit)
 
 return new_commit
 
 def _set_cache_(self, attr: str) -> None:
 if attr in Commit.__slots__:
 # read the data in a chunk, its faster - then provide a file wrapper
 _binsha, _typename, self.size, stream = self.repo.odb.stream(self.binsha)
 self._deserialize(BytesIO(stream.read()))
 else:
 super(Commit, self)._set_cache_(attr)
 # END handle attrs
 
 @property
 def authored_datetime(self) -> datetime.datetime:
 return from_timestamp(self.authored_date, self.author_tz_offset)
 
 @property
 def committed_datetime(self) -> datetime.datetime:
 return from_timestamp(self.committed_date, self.committer_tz_offset)
 
 @property
 def summary(self) -> Union[str, bytes]:
 """:return: First line of the commit message"""
 if isinstance(self.message, str):
 return self.message.split("\n", 1)[0]
 else:
 return self.message.split(b"\n", 1)[0]
 
 def count(self, paths: Union[PathLike, Sequence[PathLike]] = "", **kwargs: Any) -> int:
 """Count the number of commits reachable from this commit
 
 :param paths:
 is an optional path or a list of paths restricting the return value
 to commits actually containing the paths
 
 :param kwargs:
 Additional options to be passed to git-rev-list. They must not alter
 the output style of the command, or parsing will yield incorrect results
 :return: int defining the number of reachable commits"""
 # yes, it makes a difference whether empty paths are given or not in our case
 # as the empty paths version will ignore merge commits for some reason.
 if paths:
 return len(self.repo.git.rev_list(self.hexsha, "--", paths, **kwargs).splitlines())
 return len(self.repo.git.rev_list(self.hexsha, **kwargs).splitlines())
 
 @property
 def name_rev(self) -> str:
 """
 :return:
 String describing the commits hex sha based on the closest Reference.
 Mostly useful for UI purposes"""
 return self.repo.git.name_rev(self)
 
 @classmethod
 def iter_items(
 cls,
 repo: "Repo",
 rev: Union[str, "Commit", "SymbolicReference"],  # type: ignore
 paths: Union[PathLike, Sequence[PathLike]] = "",
 **kwargs: Any,
 ) -> Iterator["Commit"]:
 """Find all commits matching the given criteria.
 
 :param repo: is the Repo
 :param rev: revision specifier, see git-rev-parse for viable options
 :param paths:
 is an optional path or list of paths, if set only Commits that include the path
 or paths will be considered
 :param kwargs:
 optional keyword arguments to git rev-list where
 ``max_count`` is the maximum number of commits to fetch
 ``skip`` is the number of commits to skip
 ``since`` all commits since i.e. '1970-01-01'
 :return: iterator yielding Commit items"""
 if "pretty" in kwargs:
 raise ValueError("--pretty cannot be used as parsing expects single sha's only")
 # END handle pretty
 
 # use -- in any case, to prevent possibility of ambiguous arguments
 # see https://github.com/gitpython-developers/GitPython/issues/264
 
 args_list: List[PathLike] = ["--"]
 
 if paths:
 paths_tup: Tuple[PathLike, ...]
 if isinstance(paths, (str, os.PathLike)):
 paths_tup = (paths,)
 else:
 paths_tup = tuple(paths)
 
 args_list.extend(paths_tup)
 # END if paths
 
 proc = repo.git.rev_list(rev, args_list, as_process=True, **kwargs)
 return cls._iter_from_process_or_stream(repo, proc)
 
 def iter_parents(self, paths: Union[PathLike, Sequence[PathLike]] = "", **kwargs: Any) -> Iterator["Commit"]:
 """Iterate _all_ parents of this commit.
 
 :param paths:
 Optional path or list of paths limiting the Commits to those that
 contain at least one of the paths
 :param kwargs: All arguments allowed by git-rev-list
 :return: Iterator yielding Commit objects which are parents of self"""
 # skip ourselves
 skip = kwargs.get("skip", 1)
 if skip == 0:  # skip ourselves
 skip = 1
 kwargs["skip"] = skip
 
 return self.iter_items(self.repo, self, paths, **kwargs)
 
 @property
 def stats(self) -> Stats:
 """Create a git stat from changes between this commit and its first parent
 or from all changes done if this is the very first commit.
 
 :return: git.Stats"""
 if not self.parents:
 text = self.repo.git.diff_tree(self.hexsha, "--", numstat=True, no_renames=True, root=True)
 text2 = ""
 for line in text.splitlines()[1:]:
 (insertions, deletions, filename) = line.split("\t")
 text2 += "%s\t%s\t%s\n" % (insertions, deletions, filename)
 text = text2
 else:
 text = self.repo.git.diff(self.parents[0].hexsha, self.hexsha, "--", numstat=True, no_renames=True)
 return Stats._list_from_string(self.repo, text)
 
 @property
 def trailers(self) -> Dict[str, str]:
 """Get the trailers of the message as a dictionary
 
 :note: This property is deprecated, please use either ``Commit.trailers_list`` or ``Commit.trailers_dict``.
 
 :return:
 Dictionary containing whitespace stripped trailer information.
 Only contains the latest instance of each trailer key.
 """
 return {
 k: v[0] for k, v in self.trailers_dict.items()
 }
 
 @property
 def trailers_list(self) -> List[Tuple[str, str]]:
 """Get the trailers of the message as a list
 
 Git messages can contain trailer information that are similar to RFC 822
 e-mail headers (see: https://git-scm.com/docs/git-interpret-trailers).
 
 This functions calls ``git interpret-trailers --parse`` onto the message
 to extract the trailer information, returns the raw trailer data as a list.
 
 Valid message with trailer::
 
 Subject line
 
 some body information
 
 another information
 
 key1: value1.1
 key1: value1.2
 key2 :    value 2 with inner spaces
 
 
 Returned list will look like this::
 
 [
 ("key1", "value1.1"),
 ("key1", "value1.2"),
 ("key2", "value 2 with inner spaces"),
 ]
 
 
 :return:
 List containing key-value tuples of whitespace stripped trailer information.
 """
 cmd = ["git", "interpret-trailers", "--parse"]
 proc: Git.AutoInterrupt = self.repo.git.execute(cmd, as_process=True, istream=PIPE)  # type: ignore
 trailer: str = proc.communicate(str(self.message).encode())[0].decode("utf8")
 trailer = trailer.strip()
 
 if not trailer:
 return []
 
 trailer_list = []
 for t in trailer.split("\n"):
 key, val = t.split(":", 1)
 trailer_list.append((key.strip(), val.strip()))
 
 return trailer_list
 
 @property
 def trailers_dict(self) -> Dict[str, List[str]]:
 """Get the trailers of the message as a dictionary
 
 Git messages can contain trailer information that are similar to RFC 822
 e-mail headers (see: https://git-scm.com/docs/git-interpret-trailers).
 
 This functions calls ``git interpret-trailers --parse`` onto the message
 to extract the trailer information. The key value pairs are stripped of
 leading and trailing whitespaces before they get saved into a dictionary.
 
 Valid message with trailer::
 
 Subject line
 
 some body information
 
 another information
 
 key1: value1.1
 key1: value1.2
 key2 :    value 2 with inner spaces
 
 
 Returned dictionary will look like this::
 
 {
 "key1": ["value1.1", "value1.2"],
 "key2": ["value 2 with inner spaces"],
 }
 
 
 :return:
 Dictionary containing whitespace stripped trailer information.
 Mapping trailer keys to a list of their corresponding values.
 """
 d = defaultdict(list)
 for key, val in self.trailers_list:
 d[key].append(val)
 return dict(d)
 
 @classmethod
 def _iter_from_process_or_stream(cls, repo: "Repo", proc_or_stream: Union[Popen, IO]) -> Iterator["Commit"]:
 """Parse out commit information into a list of Commit objects
 We expect one-line per commit, and parse the actual commit information directly
 from our lighting fast object database
 
 :param proc: git-rev-list process instance - one sha per line
 :return: iterator returning Commit objects"""
 
 # def is_proc(inp) -> TypeGuard[Popen]:
 #     return hasattr(proc_or_stream, 'wait') and not hasattr(proc_or_stream, 'readline')
 
 # def is_stream(inp) -> TypeGuard[IO]:
 #     return hasattr(proc_or_stream, 'readline')
 
 if hasattr(proc_or_stream, "wait"):
 proc_or_stream = cast(Popen, proc_or_stream)
 if proc_or_stream.stdout is not None:
 stream = proc_or_stream.stdout
 elif hasattr(proc_or_stream, "readline"):
 proc_or_stream = cast(IO, proc_or_stream)
 stream = proc_or_stream
 
 readline = stream.readline
 while True:
 line = readline()
 if not line:
 break
 hexsha = line.strip()
 if len(hexsha) > 40:
 # split additional information, as returned by bisect for instance
 hexsha, _ = line.split(None, 1)
 # END handle extra info
 
 assert len(hexsha) == 40, "Invalid line: %s" % hexsha
 yield cls(repo, hex_to_bin(hexsha))
 # END for each line in stream
 # TODO: Review this - it seems process handling got a bit out of control
 # due to many developers trying to fix the open file handles issue
 if hasattr(proc_or_stream, "wait"):
 proc_or_stream = cast(Popen, proc_or_stream)
 finalize_process(proc_or_stream)
 
 @classmethod
 def create_from_tree(
 cls,
 repo: "Repo",
 tree: Union[Tree, str],
 message: str,
 parent_commits: Union[None, List["Commit"]] = None,
 head: bool = False,
 author: Union[None, Actor] = None,
 committer: Union[None, Actor] = None,
 author_date: Union[None, str, datetime.datetime] = None,
 commit_date: Union[None, str, datetime.datetime] = None,
 ) -> "Commit":
 """Commit the given tree, creating a commit object.
 
 :param repo: Repo object the commit should be part of
 :param tree: Tree object or hex or bin sha
 the tree of the new commit
 :param message: Commit message. It may be an empty string if no message is provided.
 It will be converted to a string , in any case.
 :param parent_commits:
 Optional Commit objects to use as parents for the new commit.
 If empty list, the commit will have no parents at all and become
 a root commit.
 If None , the current head commit will be the parent of the
 new commit object
 :param head:
 If True, the HEAD will be advanced to the new commit automatically.
 Else the HEAD will remain pointing on the previous commit. This could
 lead to undesired results when diffing files.
 :param author: The name of the author, optional. If unset, the repository
 configuration is used to obtain this value.
 :param committer: The name of the committer, optional. If unset, the
 repository configuration is used to obtain this value.
 :param author_date: The timestamp for the author field
 :param commit_date: The timestamp for the committer field
 
 :return: Commit object representing the new commit
 
 :note:
 Additional information about the committer and Author are taken from the
 environment or from the git configuration, see git-commit-tree for
 more information"""
 if parent_commits is None:
 try:
 parent_commits = [repo.head.commit]
 except ValueError:
 # empty repositories have no head commit
 parent_commits = []
 # END handle parent commits
 else:
 for p in parent_commits:
 if not isinstance(p, cls):
 raise ValueError(f"Parent commit '{p!r}' must be of type {cls}")
 # end check parent commit types
 # END if parent commits are unset
 
 # retrieve all additional information, create a commit object, and
 # serialize it
 # Generally:
 # * Environment variables override configuration values
 # * Sensible defaults are set according to the git documentation
 
 # COMMITTER AND AUTHOR INFO
 cr = repo.config_reader()
 env = os.environ
 
 committer = committer or Actor.committer(cr)
 author = author or Actor.author(cr)
 
 # PARSE THE DATES
 unix_time = int(time())
 is_dst = daylight and localtime().tm_isdst > 0
 offset = altzone if is_dst else timezone
 
 author_date_str = env.get(cls.env_author_date, "")
 if author_date:
 author_time, author_offset = parse_date(author_date)
 elif author_date_str:
 author_time, author_offset = parse_date(author_date_str)
 else:
 author_time, author_offset = unix_time, offset
 # END set author time
 
 committer_date_str = env.get(cls.env_committer_date, "")
 if commit_date:
 committer_time, committer_offset = parse_date(commit_date)
 elif committer_date_str:
 committer_time, committer_offset = parse_date(committer_date_str)
 else:
 committer_time, committer_offset = unix_time, offset
 # END set committer time
 
 # assume utf8 encoding
 enc_section, enc_option = cls.conf_encoding.split(".")
 conf_encoding = cr.get_value(enc_section, enc_option, cls.default_encoding)
 if not isinstance(conf_encoding, str):
 raise TypeError("conf_encoding could not be coerced to str")
 
 # if the tree is no object, make sure we create one - otherwise
 # the created commit object is invalid
 if isinstance(tree, str):
 tree = repo.tree(tree)
 # END tree conversion
 
 # CREATE NEW COMMIT
 new_commit = cls(
 repo,
 cls.NULL_BIN_SHA,
 tree,
 author,
 author_time,
 author_offset,
 committer,
 committer_time,
 committer_offset,
 message,
 parent_commits,
 conf_encoding,
 )
 
 new_commit.binsha = cls._calculate_sha_(repo, new_commit)
 
 if head:
 # need late import here, importing git at the very beginning throws
 # as well ...
 import git.refs
 
 try:
 repo.head.set_commit(new_commit, logmsg=message)
 except ValueError:
 # head is not yet set to the ref our HEAD points to
 # Happens on first commit
 master = git.refs.Head.create(
 repo,
 repo.head.ref,
 new_commit,
 logmsg="commit (initial): %s" % message,
 )
 repo.head.set_reference(master, logmsg="commit: Switching to %s" % master)
 # END handle empty repositories
 # END advance head handling
 
 return new_commit
 
 # { Serializable Implementation
 
 def _serialize(self, stream: BytesIO) -> "Commit":
 write = stream.write
 write(("tree %s\n" % self.tree).encode("ascii"))
 for p in self.parents:
 write(("parent %s\n" % p).encode("ascii"))
 
 a = self.author
 aname = a.name
 c = self.committer
 fmt = "%s %s <%s> %s %s\n"
 write(
 (
 fmt
 % (
 "author",
 aname,
 a.email,
 self.authored_date,
 altz_to_utctz_str(self.author_tz_offset),
 )
 ).encode(self.encoding)
 )
 
 # encode committer
 aname = c.name
 write(
 (
 fmt
 % (
 "committer",
 aname,
 c.email,
 self.committed_date,
 altz_to_utctz_str(self.committer_tz_offset),
 )
 ).encode(self.encoding)
 )
 
 if self.encoding != self.default_encoding:
 write(("encoding %s\n" % self.encoding).encode("ascii"))
 
 try:
 if self.__getattribute__("gpgsig"):
 write(b"gpgsig")
 for sigline in self.gpgsig.rstrip("\n").split("\n"):
 write((" " + sigline + "\n").encode("ascii"))
 except AttributeError:
 pass
 
 write(b"\n")
 
 # write plain bytes, be sure its encoded according to our encoding
 if isinstance(self.message, str):
 write(self.message.encode(self.encoding))
 else:
 write(self.message)
 # END handle encoding
 return self
 
 def _deserialize(self, stream: BytesIO) -> "Commit":
 """
 :param from_rev_list: if true, the stream format is coming from the rev-list command
 Otherwise it is assumed to be a plain data stream from our object
 """
 readline = stream.readline
 self.tree = Tree(self.repo, hex_to_bin(readline().split()[1]), Tree.tree_id << 12, "")
 
 self.parents = []
 next_line = None
 while True:
 parent_line = readline()
 if not parent_line.startswith(b"parent"):
 next_line = parent_line
 break
 # END abort reading parents
 self.parents.append(type(self)(self.repo, hex_to_bin(parent_line.split()[-1].decode("ascii"))))
 # END for each parent line
 self.parents = tuple(self.parents)
 
 # we don't know actual author encoding before we have parsed it, so keep the lines around
 author_line = next_line
 committer_line = readline()
 
 # we might run into one or more mergetag blocks, skip those for now
 next_line = readline()
 while next_line.startswith(b"mergetag "):
 next_line = readline()
 while next_line.startswith(b" "):
 next_line = readline()
 # end skip mergetags
 
 # now we can have the encoding line, or an empty line followed by the optional
 # message.
 self.encoding = self.default_encoding
 self.gpgsig = ""
 
 # read headers
 enc = next_line
 buf = enc.strip()
 while buf:
 if buf[0:10] == b"encoding ":
 self.encoding = buf[buf.find(b" ") + 1 :].decode(self.encoding, "ignore")
 elif buf[0:7] == b"gpgsig ":
 sig = buf[buf.find(b" ") + 1 :] + b"\n"
 is_next_header = False
 while True:
 sigbuf = readline()
 if not sigbuf:
 break
 if sigbuf[0:1] != b" ":
 buf = sigbuf.strip()
 is_next_header = True
 break
 sig += sigbuf[1:]
 # end read all signature
 self.gpgsig = sig.rstrip(b"\n").decode(self.encoding, "ignore")
 if is_next_header:
 continue
 buf = readline().strip()
 # decode the authors name
 
 try:
 (
 self.author,
 self.authored_date,
 self.author_tz_offset,
 ) = parse_actor_and_date(author_line.decode(self.encoding, "replace"))
 except UnicodeDecodeError:
 log.error(
 "Failed to decode author line '%s' using encoding %s",
 author_line,
 self.encoding,
 exc_info=True,
 )
 
 try:
 (
 self.committer,
 self.committed_date,
 self.committer_tz_offset,
 ) = parse_actor_and_date(committer_line.decode(self.encoding, "replace"))
 except UnicodeDecodeError:
 log.error(
 "Failed to decode committer line '%s' using encoding %s",
 committer_line,
 self.encoding,
 exc_info=True,
 )
 # END handle author's encoding
 
 # a stream from our data simply gives us the plain message
 # The end of our message stream is marked with a newline that we strip
 self.message = stream.read()
 try:
 self.message = self.message.decode(self.encoding, "replace")
 except UnicodeDecodeError:
 log.error(
 "Failed to decode message '%s' using encoding %s",
 self.message,
 self.encoding,
 exc_info=True,
 )
 # END exception handling
 
 return self
 
 # } END serializable implementation
 
 @property
 def co_authors(self) -> List[Actor]:
 """
 Search the commit message for any co-authors of this commit.
 Details on co-authors: https://github.blog/2018-01-29-commit-together-with-co-authors/
 
 :return: List of co-authors for this commit (as Actor objects).
 """
 co_authors = []
 
 if self.message:
 results = re.findall(
 r"^Co-authored-by: (.*) <(.*?)>$",
 self.message,
 re.MULTILINE,
 )
 for author in results:
 co_authors.append(Actor(*author))
 
 return co_authors
 
 |