mcdreforged.command.builder.nodes.basic 源代码

import collections
import dataclasses
from abc import ABC, abstractmethod
from typing import List, Callable, Iterable, Set, Dict, Type, Any, Union, Optional, TypedDict, TypeVar, NoReturn

from typing_extensions import Self, override, NotRequired

from mcdreforged.command.builder import command_builder_utils as utils
from mcdreforged.command.builder.callback import CallbackError, ScheduledCallback, DirectCallbackInvoker
from mcdreforged.command.builder.common import ParseResult, CommandContext, CommandSuggestions, CommandSuggestion, CommandExecutions, CommandExecution
from mcdreforged.command.builder.exception import LiteralNotMatch, UnknownCommand, UnknownArgument, CommandSyntaxError, \
	UnknownRootArgument, RequirementNotMet, IllegalNodeOperation, \
	CommandError
from mcdreforged.command.command_source import CommandSource
from mcdreforged.utils import tree_printer, class_utils, collection_utils
from mcdreforged.utils.types.message import MessageText

_T = TypeVar('_T')
__SOURCE_CONTEXT_CALLBACK = Union[
	Callable[[], Any],
	Callable[[CommandSource], Any],
	Callable[[CommandSource, CommandContext], Any]
]
__SOURCE_CONTEXT_CALLBACK_BOOL = Union[
	Callable[[], bool],
	Callable[[CommandSource], bool],
	Callable[[CommandSource, CommandContext], bool]
]
__SOURCE_CONTEXT_CALLBACK_MSG = Union[
	Callable[[], MessageText],
	Callable[[CommandSource], MessageText],
	Callable[[CommandSource, CommandContext], MessageText]
]
__SOURCE_CONTEXT_CALLBACK_STR_ITERABLE = Union[
	Callable[[], Iterable[str]],
	Callable[[CommandSource], Iterable[str]],
	Callable[[CommandSource, CommandContext], Iterable[str]]
]
__SOURCE_ERROR_CONTEXT_CALLBACK = Union[
	Callable[[], Any],
	Callable[[CommandSource], Any],
	Callable[[CommandSource, CommandError], Any],
	Callable[[CommandSource, CommandError, CommandContext], Any]
]

RUNS_CALLBACK = __SOURCE_CONTEXT_CALLBACK
ERROR_HANDLER_CALLBACK = __SOURCE_ERROR_CONTEXT_CALLBACK
FAIL_MSG_CALLBACK = __SOURCE_CONTEXT_CALLBACK_MSG
SUGGESTS_CALLBACK = __SOURCE_CONTEXT_CALLBACK_STR_ITERABLE
REQUIRES_CALLBACK = __SOURCE_CONTEXT_CALLBACK_BOOL
PRECONDITION_CALLBACK = __SOURCE_CONTEXT_CALLBACK_BOOL


@dataclasses.dataclass(frozen=True)
class _ErrorHandler:
	callback: ERROR_HANDLER_CALLBACK
	handled: bool


@dataclasses.dataclass(frozen=True)
class _Requirement:
	requirement: REQUIRES_CALLBACK
	failure_message_getter: Optional[FAIL_MSG_CALLBACK]


_ERROR_HANDLER_TYPE = Dict[Type[CommandError], _ErrorHandler]


[文档] class AbstractNode(ABC): """ :class:`AbstractNode` is base class of all command nodes. It's also an abstract class. It provides several methods for building up the command tree """ def __init__(self): self._children_literal: Dict[str, List[Literal]] = collections.defaultdict(list) # mapping from literal text to related Literal nodes self._children: List[AbstractNode] = [] self._callback: Optional[RUNS_CALLBACK] = None self._error_handlers: _ERROR_HANDLER_TYPE = {} self._child_error_handlers: _ERROR_HANDLER_TYPE = {} self._preconditions: List[PRECONDITION_CALLBACK] = [] self._requirements: List[_Requirement] = [] self._redirect_node: Optional[AbstractNode] = None self._suggestion_getter: SUGGESTS_CALLBACK = lambda: [] # -------------- # Interfaces # --------------
[文档] def then(self, node: 'AbstractNode') -> Self: """ Attach a child node to its children list, and then return itself It's used for building the command tree structure :param node: A node instance to be added to current node's children list Example:: Literal('!!email'). \\ then(Literal('list')). \\ then(Literal('remove'). \\ then(Integer('email_id')) ). \\ then(Literal('send'). \\ then(Text('player'). \\ then(GreedyText('message')) ) ) """ if self._redirect_node is not None: raise IllegalNodeOperation('Redirected node is not allowed to add child nodes') class_utils.check_type(node, AbstractNode) if isinstance(node, Literal): for literal in node.literals: self._children_literal[literal].append(node) else: self._children.append(node) return self
[文档] def runs(self, func: RUNS_CALLBACK) -> Self: """ Set the callback function of this node. When the command parsing finished at this node, the callback function will be executed The callback function is allowed to accept 0 to 2 arguments (a :class:`~mcdreforged.command.command_source.CommandSource` as command source and a :class:`dict` (:class:`~mcdreforged.command.builder.common.CommandContext`) as context). For example, the following 4 functions are available callbacks:: def callback1(): pass def callback2(source: CommandSource): pass def callback3(source: CommandSource, context: dict): pass callback4 = lambda src: src.reply('pong') node1.runs(callback1) node2.runs(callback2) node3.runs(callback3) node4.runs(callback4) Both of them can be used as the argument of the ``runs`` method This dynamic callback argument adaptation is used in all callback invoking of the command nodes :param func: A callable that accepts up to 2 arguments. Argument list: :class:`~mcdreforged.command.command_source.CommandSource`, :class:`dict` (:class:`~mcdreforged.command.builder.common.CommandContext`) """ class_utils.check_type(func, Callable) # type: ignore # see also: python/mypy#14928 self._callback = func return self
[文档] def requires(self, requirement: REQUIRES_CALLBACK, failure_message_getter: Optional[FAIL_MSG_CALLBACK] = None) -> Self: """ Set the requirement tester callback of the node. When entering this node, MCDR will invoke the requirement tester to see if the current command source and context meet the specified condition If the tester callback return True, nothing will happen, MCDR will continue parsing the rest of the command If the tester callback return False, a ``RequirementNotMet`` exception will be risen. At this time if the *failure_message_getter* parameter is available, MCDR will invoke *failure_message_getter* to get the message string of the ``RequirementNotMet`` exception, otherwise a default message will be used .. versionadded:: v2.7.0 Multiple :meth:`requires` call results in an "and" combination of all given requirements, i.e. the current command context is satisfied iif. all given requirements are satisfied :param requirement: A callable that accepts up to 2 arguments and returns a bool. Argument list: :class:`~mcdreforged.command.command_source.CommandSource`, :class:`dict` (:class:`~mcdreforged.command.builder.common.CommandContext`) :param failure_message_getter: An optional callable that accepts up to 2 arguments and returns a str or a :class:`~mcdreforged.minecraft.rtext.text.RTextBase`. Argument list: :class:`~mcdreforged.command.command_source.CommandSource`, :class:`dict` (:class:`~mcdreforged.command.builder.common.CommandContext`) Example usages:: node1.requires(lambda src: src.has_permission(3)) # Permission check, error if the permission is not enough node2.requires(lambda src, ctx: ctx['page_count'] <= get_max_page()) # Dynamic range check node3.requires(lambda src, ctx: is_legal(ctx['target']), lambda src, ctx: 'target {} is illegal'.format(ctx['target'])) # Customized failure message """ class_utils.check_type(requirement, Callable) # type: ignore # see also: python/mypy#14928 class_utils.check_type(failure_message_getter, (Callable, None)) # type: ignore # see also: python/mypy#14928 self._requirements.append(_Requirement(requirement, failure_message_getter)) return self
[文档] def precondition(self, precondition: PRECONDITION_CALLBACK) -> Self: """ Set the precondition callback for this node. Before attempting to enter this node, MCDR will invoke the precondition callback to see if the current command source and context meet the specified condition If the precondition callback returns True, the node will remain accessible, and MCDR will continue entering the node If the precondition callback returns False, the node will be filtered out and treated as if it does not exist .. versionadded:: v2.14.0 :param precondition: A callable that accepts up to 2 arguments and returns a bool. Argument list: :class:`~mcdreforged.command.command_source.CommandSource`, :class:`dict` (:class:`~mcdreforged.command.builder.common.CommandContext`) Example usages:: node1.precondition(lambda src: src.has_permission(3)) # Permission check, but hide if the permission is not enough node2.precondition(lambda src, ctx: 'foo' in ctx) # Avoid re-assigning the "foo" argument """ class_utils.check_type(precondition, Callable) # type: ignore # see also: python/mypy#14928 self._preconditions.append(precondition) return self
[文档] def redirects(self, redirect_node: 'AbstractNode') -> Self: """ Redirect all further child nodes command parsing to another given node Redirected stuffs: * Children node traversal * Command callback, if current node doesn't have a callback Unredirected stuffs: * Command parsing, i.e. parsing the literal / argument value of the node from command * Requirement testing * Precondition testing * Suggestion fetching Example use cases: * You want a short command and full-path command that will all execute the same commands * You want to repeatedly re-enter a command node's children when parsing commands Pay attention to the difference between :meth:`redirects` and :meth:`then`. :meth:`redirects` is to redirect the child nodes, and :meth:`then` is to add a child node :param redirect_node: A node instance which current node is redirecting to """ if self.has_children(): raise IllegalNodeOperation('Node with children nodes is not allowed to be redirected') class_utils.check_type(redirect_node, AbstractNode) self._redirect_node = redirect_node return self
[文档] def suggests(self, suggestion: SUGGESTS_CALLBACK) -> Self: """ Set the provider for command suggestions of this node :class:`Literal` node does not support this method Examples:: Literal('!!whereis'). \\ then( Text('player_name'). suggests(lambda: ['Steve', 'Alex']). runs(lambda src, ctx: find_player(src, ctx['player_name'])) ) When the user input ``!!whereis`` in the console and a space character, MCDR will show the suggestions ``"Steve"`` and ``"Alex"`` :param suggestion: A callable function which accepts up to 2 parameters and return an iterable of str indicating the current command suggestions. Argument list: :class:`~mcdreforged.command.command_source.CommandSource`, :class:`dict` (:class:`~mcdreforged.command.builder.common.CommandContext`) """ class_utils.check_type(suggestion, Callable) # type: ignore # see also: python/mypy#14928 self._suggestion_getter = suggestion return self
[文档] def on_error(self, error_type: Type[CommandError], handler: ERROR_HANDLER_CALLBACK, *, handled: bool = False) -> Self: """ When a command error occurs, the given will invoke the given handler to handle with the error :param error_type: A class that is subclass of :class:`CommandError` :param handler: A callable that accepts up to 3 arguments. Argument list: :class:`~mcdreforged.command.builder.exception.CommandError`, :class:`~mcdreforged.command.command_source.CommandSource`, :class:`dict` (:class:`~mcdreforged.command.builder.common.CommandContext`) :keyword handled: If handled is set to True, :meth:`CommandError.set_handled<mcdreforged.command.builder.exception.CommandError.set_handled>` is called automatically when invoking the handler callback """ if not issubclass(error_type, CommandError): raise TypeError('error_type parameter should be a class inherited from CommandError, but class {} found'.format(error_type)) class_utils.check_type(error_type, type) class_utils.check_type(handler, Callable) # type: ignore # see also: python/mypy#14928 self._error_handlers[error_type] = _ErrorHandler(handler, handled) return self
[文档] def on_child_error(self, error_type: Type[CommandError], handler: ERROR_HANDLER_CALLBACK, *, handled: bool = False) -> Self: """ Similar to :meth:`on_error`, but it gets triggered only when the node receives a command error from one of the node's direct or indirect child """ if not issubclass(error_type, CommandError): raise TypeError('error_type parameter should be a class inherited from CommandError, but class {} found'.format(error_type)) class_utils.check_type(error_type, type) class_utils.check_type(handler, Callable) # type: ignore # see also: python/mypy#14928 self._child_error_handlers[error_type] = _ErrorHandler(handler, handled) return self
[文档] def print_tree(self, line_writer: tree_printer.LineWriter = print): """ Print the command tree in a read-able format. Mostly used in debugging :param line_writer: A printer function that accepts a str .. versionadded:: v2.6.0 """ def children_getter(node: AbstractNode) -> List[AbstractNode]: return node.get_children() tree_printer.print_tree(self, children_getter, str, line_writer)
# ------------------- # Interfaces ends # ------------------- @abstractmethod def _get_usage(self) -> str: raise NotImplementedError() def has_children(self): return len(self._children) + len(self._children_literal) > 0 def get_children(self) -> List['AbstractNode']: children: List[AbstractNode] = [] for literal_list in self._children_literal.values(): children.extend(literal_list) children.extend(self._children) return collection_utils.unique_list(children) def _on_visited(self, context: CommandContext, parsed_result: ParseResult): """ Invoked when this node is visited, right after the node successfully parses a command segment A node can use the context dict to store its provided value here """ pass @abstractmethod def parse(self, text: str) -> ParseResult: """ Try to parse the text and get an argument * ``ParseResult.value``: The value to store in the context dict * ``ParseResult.remaining``: The remaining unparsed text :param text: the text to be parsed. It's supposed to not be started with DIVIDER character :meta private: """ raise NotImplementedError() @staticmethod def __smart_callback(callback: Callable, args: tuple, callback_error_factory: CallbackError.Builder): # make sure all passed CommandContext are copies adjusted_args = tuple( arg.copy() if isinstance(arg, CommandContext) else arg for arg in args ) return ScheduledCallback(callback, adjusted_args, callback_error_factory).invoke(DirectCallbackInvoker()) def __handle_error(self, error: CommandError, context: CommandContext, error_handlers: _ERROR_HANDLER_TYPE): for error_type, handler in error_handlers.items(): if isinstance(error, error_type): self.__smart_callback(handler.callback, (context.source, error, context), CallbackError.builder(context, 'error handling')) if handler.handled: error.set_handled() def __raise_error(self, error: CommandError, context: CommandContext): self.__handle_error(error, context, self._error_handlers) raise error def __check_requirements(self, context: CommandContext) -> Optional[_Requirement]: """ :return: None: requirement check passed; otherwise, the unsatisfied requirement """ for req in self._requirements: ok = self.__smart_callback(req.requirement, (context.source, context), CallbackError.builder(context, 'requirements check')) if not ok: return req return None def __check_preconditions(self, context: CommandContext) -> bool: for precondition in self._preconditions: ok = self.__smart_callback(precondition, (context.source, context), CallbackError.builder(context, 'preconditions check')) if not ok: return False return True def _get_suggestions(self, context: CommandContext) -> Iterable[str]: return self.__smart_callback(self._suggestion_getter, (context.source, context), CallbackError.builder(context, 'suggestions getting')) def _execute_command(self, context: CommandContext) -> CommandExecutions: command = context.command executions = CommandExecutions() try: parse_result = self.parse(context.command_remaining) except CommandSyntaxError as error: error.set_parsed_command(context.command_read) error.set_failed_command(context.command_read + context.command_remaining[:error.char_read]) self.__raise_error(error, context) else: next_remaining = utils.remove_divider_prefix(context.command_remaining[parse_result.char_read:]) total_read = len(command) - len(next_remaining) with context.visit_node(self, parse_result, total_read): req = self.__check_requirements(context) if req is not None: # requirement check failed if req.failure_message_getter is not None: failure_message = self.__smart_callback(req.failure_message_getter, (context.source, context), CallbackError.builder(context, 'failure message getting')) else: failure_message = None self.__raise_error(RequirementNotMet(context.command_read, context.command_read, failure_message), context) # Parsing finished if len(next_remaining) == 0: callback = self._callback if callback is None and self._redirect_node is not None: callback = self._redirect_node._callback if callback is not None: context2 = context.copy() executions.append(CommandExecution( context=context2, scheduled_callback=ScheduledCallback(callback, (context2.source, context2), CallbackError.builder(context2, 'command callback')), )) else: self.__raise_error(UnknownCommand(context.command_read, context.command_read), context) # Un-parsed command string remains else: # Redirecting node = self if self._redirect_node is None else self._redirect_node argument_unknown = False # No child at all if not node.has_children(): argument_unknown = True else: # Pass the remaining command string to the children next_literal = utils.get_element(next_remaining) try: # Check literal children first literal_error = None child_literal: AbstractNode # satisfy pycharm's static checker on the __check_preconditions() call for child_literal in node._children_literal.get(next_literal, []): if not child_literal.__check_preconditions(context): continue try: with context.enter_child(child_literal): executions.extend(child_literal._execute_command(context)) break except CommandError as e: # it's ok for a direct literal node to fail # other literal might still have a chance to consume this command literal_error = e else: # All literal children fails if literal_error is not None: raise literal_error for child in node._children: if not child.__check_preconditions(context): continue try: with context.enter_child(child): executions.extend(child._execute_command(context)) except UnknownArgument: continue break else: # No argument child argument_unknown = True except CommandError as error: self.__handle_error(error, context, self._child_error_handlers) raise error from None if argument_unknown: self.__raise_error(UnknownArgument(context.command_read, command), context) return executions def _generate_suggestions(self, context: CommandContext) -> CommandSuggestions: """ Return a list of tuple (suggested command, suggested argument) """ def self_suggestions(): return CommandSuggestions([CommandSuggestion(command_read_at_the_beginning, s) for s in self._get_suggestions(context)]) suggestions = CommandSuggestions() # [!!aa bb cc] dd # read suggested command_read_at_the_beginning = context.command_read if len(context.command_remaining) == 0: return self_suggestions() try: result = self.parse(context.command_remaining) except CommandSyntaxError: return self_suggestions() else: success_read = len(context.command) - len(context.command_remaining) + result.char_read next_remaining = utils.remove_divider_prefix(context.command_remaining[result.char_read:]) total_read = len(context.command) - len(next_remaining) with context.visit_node(self, result, total_read): if self.__check_requirements(context) is not None: return CommandSuggestions() # Parsing finished if len(next_remaining) == 0: # total_read == success_read means DIVIDER does not exist at the end of the input string # in that case, ends at this current node if success_read == total_read: return self_suggestions() node = self if self._redirect_node is None else self._redirect_node # Check literal children first children_literal = node._children_literal.get(utils.get_element(next_remaining), []) child_literal: AbstractNode # satisfy pycharm's static checker on the __check_preconditions() call for child_literal in children_literal: if not child_literal.__check_preconditions(context): continue with context.enter_child(child_literal): suggestions.extend(child_literal._generate_suggestions(context)) if len(children_literal) == 0: for literal_list in node._children_literal.values(): for child_literal in literal_list: if not child_literal.__check_preconditions(context): continue with context.enter_child(child_literal): suggestions.extend(child_literal._generate_suggestions(context)) usages = [] for child in node._children: if not child.__check_preconditions(context): continue with context.enter_child(child): suggestions.extend(child._generate_suggestions(context)) if len(next_remaining) == 0: usages.append(child._get_usage()) if len(next_remaining) == 0: suggestions.complete_hint = '|'.join(usages) return suggestions
class EntryNode(AbstractNode, ABC): def _entry_execute(self, source: CommandSource, command: str) -> CommandExecutions: """ Parse and execute this command :param source: the source that executes this command :param command: the command string to execute :raise CommandError: if parsing fails :meta private: """ try: context = CommandContext(source, command) with context.enter_child(self): return self._execute_command(context) except LiteralNotMatch as error: # the root literal node fails to parse the first element raise UnknownRootArgument(error.get_parsed_command(), error.get_failed_command()) from error def _entry_generate_suggestions(self, source: CommandSource, command: str) -> CommandSuggestions: """ Get a list of command suggestion of given command Return an empty list if parsing fails :param source: the source that executes this command :param command: the command string to execute :meta private: """ context = CommandContext(source, command) with context.enter_child(self): return self._generate_suggestions(context)
[文档] class Literal(EntryNode): """ Literal node is a special node. It doesn't output any value. It's more like a command branch carrier Literal node can accept a str as its literal in its constructor. A literal node accepts the parsing command only when the next element of the parsing command exactly matches the literal of the node Literal node is the only node that can start a command execution """ def __init__(self, literal: Union[str, Iterable[str]]): super().__init__() if isinstance(literal, str): literals = {literal} elif isinstance(literal, Iterable): literals = set(literal) else: raise TypeError('Only str or Iterable[str] is accepted') for literal in literals: if not isinstance(literal, str): raise TypeError('Literal node only accepts str but {} found'.format(type(literal))) if utils.DIVIDER in literal: raise TypeError('DIVIDER character {!r} cannot be inside a literal'.format(utils.DIVIDER)) self.literals: Set[str] = literals self._suggestion_getter = lambda: self.literals @override def _get_usage(self) -> str: return '|'.join(sorted(self.literals)) @override def suggests(self, suggestion: SUGGESTS_CALLBACK) -> NoReturn: raise IllegalNodeOperation('Literal node does not support suggests') @override def parse(self, text): arg = utils.get_element(text) if arg in self.literals: return ParseResult(None, len(arg)) else: raise LiteralNotMatch('Invalid Argument', len(arg)) def __str__(self): return 'Literal {}'.format(repr(tuple(self.literals)[0]) if len(self.literals) == 1 else set(self.literals)) def __repr__(self): return class_utils.represent(self, {'literals': self.literals})
[文档] class ArgumentNode(AbstractNode, ABC): """ Argument node is an abstract base class for all nodes which store parsed values It has a str field ``name`` which is used as the key used in storing parsed value in context """ class _InitKwargs(TypedDict): accumulate: NotRequired[bool] metavar: NotRequired[str]
[文档] def __init__( self, name: str, *, accumulate: Optional[bool] = None, metavar: Optional[str] = None, ): """ :param name: The name of the argument node. Should be unique within the command tree path :keyword accumulate: If set to True, then the parsed value will be stored in a list in the command context. Re-visiting of this node appends new value to the end of the list :keyword metavar: Optional str. It behaves like the *metavar* kwarg of :external:meth:`argparse.ArgumentParser.add_argument`, which overrides the name displayed of the command suggestion usage placeholder. If not provided, the *name* argument will be used as the default placeholder .. versionadded:: v2.13.0 The *accumulate* parameter .. versionadded:: v2.14.0 The *metavar* parameter """ super().__init__() self.__name = name self.__accumulate: bool = bool(accumulate) self.__metavar = metavar
@override def _on_visited(self, context: CommandContext, parsed_result: ParseResult): if self.__accumulate: if self.__name not in context: context[self.__name] = [] context[self.__name].append(parsed_result.value) else: context[self.__name] = parsed_result.value def get_name(self) -> str: return self.__name @override def _get_usage(self) -> str: return '<{}>'.format(self.__metavar or self.__name) def __str__(self): return '{} <{}>'.format(self.__class__.__name__, self.get_name()) def __repr__(self): fields: dict = {'name': self.__name} if self.__accumulate: fields['accumulate'] = True if self.__metavar: fields['metavar'] = True return class_utils.represent(self, fields)