Skip to content

Nautobot Plugin ChatOps API Package

nautobot_chatops.api

nested_serializers

Nested Serializers for ChatOps Plugin.

NestedAccessGrantSerializer

Bases: WritableNestedSerializer

Nested serializer for AccessGrant objects.

Source code in nautobot_chatops/api/nested_serializers.py
class NestedAccessGrantSerializer(WritableNestedSerializer):
    """Nested serializer for AccessGrant objects.

    Adds a hyperlinked ``url`` field on top of the model fields listed in
    ``Meta.fields``.
    """

    # Hyperlink to the detail endpoint of the serialized AccessGrant.
    url = serializers.HyperlinkedIdentityField(
        view_name="plugins-api:nautobot_chatops-api:accessgrant-detail",
    )

    class Meta:
        """Meta for Nested AccessGrant Serializer."""

        fields = (
            "id",
            "command",
            "subcommand",
            "grant_type",
            "name",
            "value",
            "url",
        )
        model = AccessGrant
Meta

Meta for Nested AccessGrant Serializer.

Source code in nautobot_chatops/api/nested_serializers.py
class Meta:
    """Meta for Nested AccessGrant Serializer."""

    # Fields exposed by the serializer, in output order.
    fields = (
        "id",
        "command",
        "subcommand",
        "grant_type",
        "name",
        "value",
        "url",
    )
    # Model the serializer is bound to.
    model = AccessGrant

NestedCommandTokenSerializer

Bases: WritableNestedSerializer

Nested serializer for CommandToken objects.

Source code in nautobot_chatops/api/nested_serializers.py
class NestedCommandTokenSerializer(WritableNestedSerializer):
    """Nested serializer for CommandToken objects.

    Adds a hyperlinked ``url`` field on top of the model fields listed in
    ``Meta.fields``.
    """

    # Hyperlink to the detail endpoint of the serialized CommandToken.
    url = serializers.HyperlinkedIdentityField(
        view_name="plugins-api:nautobot_chatops-api:commandtoken-detail",
    )

    class Meta:
        """Meta for Nested CommandToken Serializer."""

        fields = (
            "id",
            "comment",
            "platform",
            "token",
            "url",
        )
        model = CommandToken
Meta

Meta for Nested CommandToken Serializer.

Source code in nautobot_chatops/api/nested_serializers.py
class Meta:
    """Meta for Nested CommandToken Serializer."""

    # Fields exposed by the serializer, in output order.
    fields = (
        "id",
        "comment",
        "platform",
        "token",
        "url",
    )
    # Model the serializer is bound to.
    model = CommandToken

serializers

API Serializers for ChatOps Plugin.

AccessGrantSerializer

Bases: ValidatedModelSerializer

API serializer for interacting with AccessGrant objects.

Source code in nautobot_chatops/api/serializers.py
class AccessGrantSerializer(ValidatedModelSerializer):
    """API serializer for interacting with AccessGrant objects.

    Adds a hyperlinked ``url`` field on top of the model fields listed in
    ``Meta.fields``.
    """

    # Hyperlink to the detail endpoint of the serialized AccessGrant.
    url = serializers.HyperlinkedIdentityField(
        view_name="plugins-api:nautobot_chatops-api:accessgrant-detail",
    )

    class Meta:
        """Meta for AccessGrant Serializer."""

        fields = (
            "id",
            "command",
            "subcommand",
            "grant_type",
            "name",
            "value",
            "url",
        )
        model = AccessGrant
Meta

Meta for AccessGrant Serializer.

Source code in nautobot_chatops/api/serializers.py
class Meta:
    """Meta for AccessGrant Serializer."""

    # Fields exposed by the serializer, in output order.
    fields = (
        "id",
        "command",
        "subcommand",
        "grant_type",
        "name",
        "value",
        "url",
    )
    # Model the serializer is bound to.
    model = AccessGrant

CommandTokenSerializer

Bases: ValidatedModelSerializer

API serializer for interacting with CommandToken objects.

Source code in nautobot_chatops/api/serializers.py
class CommandTokenSerializer(ValidatedModelSerializer):
    """API serializer for interacting with CommandToken objects.

    Adds a hyperlinked ``url`` field on top of the model fields listed in
    ``Meta.fields``.
    """

    # Hyperlink to the detail endpoint of the serialized CommandToken.
    url = serializers.HyperlinkedIdentityField(
        view_name="plugins-api:nautobot_chatops-api:commandtoken-detail",
    )

    class Meta:
        """Meta for CommandToken Serializer."""

        fields = (
            "id",
            "comment",
            "platform",
            "token",
            "url",
        )
        model = CommandToken
Meta

Meta for CommandToken Serializer.

Source code in nautobot_chatops/api/serializers.py
class Meta:
    """Meta for CommandToken Serializer."""

    # Fields exposed by the serializer, in output order.
    fields = (
        "id",
        "comment",
        "platform",
        "token",
        "url",
    )
    # Model the serializer is bound to.
    model = CommandToken

urls

Django urlpatterns declaration for nautobot_chatops plugin.

views

API Views module for the nautobot_chatops Nautobot plugin.

The views implemented in this module act as endpoints for various chat platforms to send requests and notifications to.

generic

API Views for Nautobot Chatops.

AccessGrantViewSet

Bases: ModelViewSet

API viewset for interacting with AccessGrant objects.

Source code in nautobot_chatops/api/views/generic.py
class AccessGrantViewSet(ModelViewSet):  # pylint: disable=too-many-ancestors
    """API viewset for interacting with AccessGrant objects."""

    # Purely declarative wiring: how results are filtered, how they are
    # serialized, and which objects the viewset operates on.
    filterset_class = AccessGrantFilterSet
    serializer_class = AccessGrantSerializer
    queryset = AccessGrant.objects.all()
CommandTokenViewSet

Bases: ModelViewSet

API viewset for interacting with CommandToken objects.

Source code in nautobot_chatops/api/views/generic.py
class CommandTokenViewSet(ModelViewSet):  # pylint: disable=too-many-ancestors
    """API viewset for interacting with CommandToken objects."""

    # Purely declarative wiring: how results are filtered, how they are
    # serialized, and which objects the viewset operates on.
    filterset_class = CommandTokenFilterSet
    serializer_class = CommandTokenSerializer
    queryset = CommandToken.objects.all()
NautobotChatopsRootView

Bases: APIRootView

Nautobot Chatops API root view.

Source code in nautobot_chatops/api/views/generic.py
class NautobotChatopsRootView(APIRootView):
    """Nautobot Chatops API root view."""

    def get_view_name(self):
        """Return name for the Nautobot Chatops API Root."""
        # Fixed display name; it does not depend on the request or instance.
        view_name = "Nautobot Chatops"
        return view_name
get_view_name()

Return name for the Nautobot Chatops API Root.

Source code in nautobot_chatops/api/views/generic.py
def get_view_name(self):
    """Return name for the Nautobot Chatops API Root."""
    # Fixed display name; it does not depend on the request or instance.
    view_name = "Nautobot Chatops"
    return view_name

lookup

API views for dynamic lookup of platform-specific data.

AccessLookupView

Bases: View

Look up a given access grant value by name.

Source code in nautobot_chatops/api/views/lookup.py
class AccessLookupView(View):
    """Look up a given access grant value by name."""

    http_method_names = ["get"]

    def get(self, request, *args, **kwargs):
        """Handle an inbound GET request for a specific access grant value.

        The ``grant_type`` and ``name`` query parameters are both mandatory.
        Every available Dispatcher subclass is asked to resolve the pair via
        ``platform_lookup``; the first truthy answer wins.

        Returns:
          HttpResponseBadRequest if a mandatory parameter is missing,
          HttpResponseNotFound if no dispatcher produced a value, otherwise a
          JsonResponse of the form ``{"value": <value>}``.
        """
        # Stdlib helper imported locally so this view does not depend on a new module-level import.
        from html import escape  # pylint: disable=import-outside-toplevel

        for required_param in ("grant_type", "name"):
            if required_param not in request.GET:
                return HttpResponseBadRequest(f"Missing mandatory parameter {required_param}")

        grant_type = request.GET["grant_type"]
        name = request.GET["name"]

        value = None
        # For now we try all available Dispatchers (all supported platforms).
        # In a typical "real" deployment, we would only have one dispatcher_class installed.
        for dispatcher_class in Dispatcher.subclasses():
            try:
                value = dispatcher_class.platform_lookup(grant_type, name)
            except NotImplementedError:
                # This platform does not support lookups; try the next one.
                continue
            if value:
                break

        if not value:
            # The query parameters are attacker-controlled and this response is served with
            # Django's default HTML content type, so escape them before echoing them back.
            return HttpResponseNotFound(f"No {escape(grant_type)} {escape(name)} found")

        return JsonResponse(data={"value": value})
get(request, *args, **kwargs)

Handle an inbound GET request for a specific access grant value.

Source code in nautobot_chatops/api/views/lookup.py
def get(self, request, *args, **kwargs):
    """Handle an inbound GET request for a specific access grant value.

    The ``grant_type`` and ``name`` query parameters are both mandatory.
    Every available Dispatcher subclass is asked to resolve the pair via
    ``platform_lookup``; the first truthy answer wins.

    Returns:
      HttpResponseBadRequest if a mandatory parameter is missing,
      HttpResponseNotFound if no dispatcher produced a value, otherwise a
      JsonResponse of the form ``{"value": <value>}``.
    """
    # Stdlib helper imported locally so this view does not depend on a new module-level import.
    from html import escape  # pylint: disable=import-outside-toplevel

    for required_param in ("grant_type", "name"):
        if required_param not in request.GET:
            return HttpResponseBadRequest(f"Missing mandatory parameter {required_param}")

    grant_type = request.GET["grant_type"]
    name = request.GET["name"]

    value = None
    # For now we try all available Dispatchers (all supported platforms).
    # In a typical "real" deployment, we would only have one dispatcher_class installed.
    for dispatcher_class in Dispatcher.subclasses():
        try:
            value = dispatcher_class.platform_lookup(grant_type, name)
        except NotImplementedError:
            # This platform does not support lookups; try the next one.
            continue
        if value:
            break

    if not value:
        # The query parameters are attacker-controlled and this response is served with
        # Django's default HTML content type, so escape them before echoing them back.
        return HttpResponseNotFound(f"No {escape(grant_type)} {escape(name)} found")

    return JsonResponse(data={"value": value})

mattermost

Views to receive inbound notifications from Mattermost, parse them, and enqueue worker actions.

MattermostInteractionView

Bases: View

Handle notifications resulting from a Mattermost interactive block.

Source code in nautobot_chatops/api/views/mattermost.py
@method_decorator(csrf_exempt, name="dispatch")
class MattermostInteractionView(View):
    """Handle notifications resulting from a Mattermost interactive block."""

    http_method_names = ["post"]

    @staticmethod
    def get_selected_value(cmd):
        """Returns formatted selected value if one exists."""
        return f" '{cmd}'" if cmd else " ''"

    # pylint: disable=too-many-locals,too-many-return-statements,too-many-branches,too-many-statements
    def post(self, request, *args, **kwargs):
        """Handle an inbound HTTP POST request representing a user interaction with a UI element.

        The payload is read from the JSON body and/or the form data, turned into a
        dispatcher context, and translated into a "command subcommand params" string
        that is parsed and enqueued.

        Returns:
          HttpResponse with status 401 if signature verification fails, 400 if the
          callback_id or the resulting command string cannot be parsed, 500 if the
          notification type is not understood, an empty 200 response for a cancel
          action or an unknown command, otherwise the result of
          ``check_and_enqueue_command``.
        """
        valid, reason = verify_signature(request)
        if not valid:
            return HttpResponse(status=401, reason=reason)

        # For some reason Integration Messages from Mattermost do not show up in POST.items()
        # in these cases, we have to load the request.body
        try:
            data = json.loads(request.body)
        except ValueError as err:
            logger.info("No request body to decode, setting data to empty dict. Error: %s", err)
            data = {}
        if request.POST.dict():
            data.update(request.POST)

        context = {
            "org_id": data.get("team_id"),
            "org_name": data.get("team_domain"),
            "channel_id": data.get("channel_id"),
            "channel_name": data.get("channel_name"),
            "user_id": data.get("user_id"),
            "user_name": data.get("user_name"),
            "response_url": data.get("response_url"),
            "trigger_id": data.get("trigger_id"),
            "post_id": data.get("post_id"),
            "request_scheme": request.scheme,
            "request_host": request.get_host(),
            "integration_url": request.build_absolute_uri("/api/plugins/chatops/mattermost/interaction/"),
        }

        mm_url = settings.PLUGINS_CONFIG["nautobot_chatops"]["mattermost_url"]
        token = settings.PLUGINS_CONFIG["nautobot_chatops"]["mattermost_api_token"]

        # Check for channel_name if channel_id is present
        if context["channel_name"] is None and context["channel_id"] is not None:
            # Build a Mattermost Client Object
            mm_client = Driver(
                {
                    "url": mm_url,
                    "token": token,
                }
            )

            # Get the channel information from Mattermost API
            channel_info = mm_client.get(f'/channels/{context["channel_id"]}')

            # Assign the channel name out of the channel information
            context["channel_name"] = channel_info["name"]

        # Check for user_name if user_id is present
        if context["user_name"] is None and context["user_id"] is not None:
            # Build a Mattermost Client Object
            mm_client = Driver(
                {
                    "url": mm_url,
                    "token": token,
                }
            )

            # Get the user information from Mattermost API
            user_info = mm_client.get(f'/users/{context["user_id"]}')

            # Assign the user name out of the user information
            context["user_name"] = user_info["username"]

        # Block action triggered by a non-modal interactive component
        if data.get("context"):
            action = data.get("context")
            action_id = action.get("action_id", "")
            context["token"] = action.get("token", "")
            # A payload without a "type" is handled like any other unsupported type
            # instead of raising KeyError.
            action_type = action.get("type")
            if action_type == "static_select":
                value = action.get("selected_option", "")
            elif action_type == "button":
                value = action.get("value")
            else:
                logger.error("Unhandled action type %s in Mattermost Dispatcher", action_type)
                return HttpResponse(status=500)
            selected_value = f"'{value}'"

        elif data.get("submission"):
            # View submission triggered from a modal dialog
            logger.info("Submission triggered from a modal dialog")
            values = data.get("submission")
            context["token"] = data.get("state")
            callback_id = data.get("callback_id")
            logger.debug(json.dumps(data, indent=2))

            # Handling for multiple fields. This will be used when the multi_input_dialog() method of the Mattermost
            # Dispatcher class is utilized.
            if len(values) > 1:
                # sometimes in the case of back-to-back dialogs there will be
                # parameters included in the callback_id.  Below parses those
                # out and adds them to selected_value.
                try:
                    # A missing callback_id must not reach shlex.split() as None.
                    cmds = shlex.split(callback_id or "")
                except ValueError as err:
                    logger.error("Mattermost: %s", err)
                    return HttpResponse(status=400, reason=f"Error: {err} encountered when processing {callback_id}")
                if len(cmds) < 2:
                    # Without both a command and a subcommand there is nothing to dispatch.
                    logger.error("Mattermost: callback_id %r lacks a command and subcommand", callback_id)
                    return HttpResponse(status=400, reason="Error: callback_id must contain a command and subcommand")
                action_id = f"{cmds[0]} {cmds[1]}"

                # Extra callback_id parameters come first, then the submitted fields in sorted key order.
                pieces = [self.get_selected_value(cmd) for cmd in cmds[2:]]
                pieces.extend(self.get_selected_value(values[blk_id]) for blk_id in sorted(values))

                # Remove leading space
                selected_value = "".join(pieces)[1:]

            # Original un-modified single-field handling below
            else:
                action_id = sorted(values.keys())[0]
                selected_value = values[action_id]
        else:
            return HttpResponse(status=500, reason="I didn't understand that notification.")

        if settings.PLUGINS_CONFIG["nautobot_chatops"].get("delete_input_on_submission"):
            # Delete the interactive element since it's served its purpose
            # Does not work for Ephemeral Posts.
            if context["post_id"] is not None:
                MattermostDispatcher(context).delete_message(context["post_id"])
        if action_id == "action" and selected_value == "cancel":
            # Nothing more to do
            return HttpResponse()

        logger.info("action_id: %s, selected_value: %s", action_id, selected_value)
        try:
            command, subcommand, params = parse_command_string(f"{action_id} {selected_value}")
        except ValueError as err:
            logger.error("%s", err)
            return HttpResponse(
                status=400, reason=f"Error: {err} encountered on command '{action_id} {selected_value}'"
            )
        # Convert empty parameter strings to NoneType
        for idx, param in enumerate(params):
            if not param:
                params[idx] = None

        logger.info("command: %s, subcommand: %s, params: %s", command, subcommand, params)

        registry = get_commands_registry()

        if command not in registry:
            MattermostDispatcher(context).send_markdown(commands_help())
            return HttpResponse()

        MattermostDispatcher(context).send_busy_indicator()

        return check_and_enqueue_command(registry, command, subcommand, params, context, MattermostDispatcher)
get_selected_value(cmd) staticmethod

Returns formatted selected value if one exists.

Source code in nautobot_chatops/api/views/mattermost.py
@staticmethod
def get_selected_value(cmd):
    """Returns formatted selected value if one exists."""
    return f" '{cmd}'" if cmd else " ''"
post(request, *args, **kwargs)

Handle an inbound HTTP POST request representing a user interaction with a UI element.

Source code in nautobot_chatops/api/views/mattermost.py
def post(self, request, *args, **kwargs):
    """Handle an inbound HTTP POST request representing a user interaction with a UI element.

    The payload is read from the JSON body and/or the form data, turned into a
    dispatcher context, and translated into a "command subcommand params" string
    that is parsed and enqueued.

    Returns:
      HttpResponse with status 401 if signature verification fails, 400 if the
      callback_id or the resulting command string cannot be parsed, 500 if the
      notification type is not understood, an empty 200 response for a cancel
      action or an unknown command, otherwise the result of
      ``check_and_enqueue_command``.
    """
    valid, reason = verify_signature(request)
    if not valid:
        return HttpResponse(status=401, reason=reason)

    # For some reason Integration Messages from Mattermost do not show up in POST.items()
    # in these cases, we have to load the request.body
    try:
        data = json.loads(request.body)
    except ValueError as err:
        logger.info("No request body to decode, setting data to empty dict. Error: %s", err)
        data = {}
    if request.POST.dict():
        data.update(request.POST)

    context = {
        "org_id": data.get("team_id"),
        "org_name": data.get("team_domain"),
        "channel_id": data.get("channel_id"),
        "channel_name": data.get("channel_name"),
        "user_id": data.get("user_id"),
        "user_name": data.get("user_name"),
        "response_url": data.get("response_url"),
        "trigger_id": data.get("trigger_id"),
        "post_id": data.get("post_id"),
        "request_scheme": request.scheme,
        "request_host": request.get_host(),
        "integration_url": request.build_absolute_uri("/api/plugins/chatops/mattermost/interaction/"),
    }

    mm_url = settings.PLUGINS_CONFIG["nautobot_chatops"]["mattermost_url"]
    token = settings.PLUGINS_CONFIG["nautobot_chatops"]["mattermost_api_token"]

    # Check for channel_name if channel_id is present
    if context["channel_name"] is None and context["channel_id"] is not None:
        # Build a Mattermost Client Object
        mm_client = Driver(
            {
                "url": mm_url,
                "token": token,
            }
        )

        # Get the channel information from Mattermost API
        channel_info = mm_client.get(f'/channels/{context["channel_id"]}')

        # Assign the channel name out of the channel information
        context["channel_name"] = channel_info["name"]

    # Check for user_name if user_id is present
    if context["user_name"] is None and context["user_id"] is not None:
        # Build a Mattermost Client Object
        mm_client = Driver(
            {
                "url": mm_url,
                "token": token,
            }
        )

        # Get the user information from Mattermost API
        user_info = mm_client.get(f'/users/{context["user_id"]}')

        # Assign the user name out of the user information
        context["user_name"] = user_info["username"]

    # Block action triggered by a non-modal interactive component
    if data.get("context"):
        action = data.get("context")
        action_id = action.get("action_id", "")
        context["token"] = action.get("token", "")
        # A payload without a "type" is handled like any other unsupported type
        # instead of raising KeyError.
        action_type = action.get("type")
        if action_type == "static_select":
            value = action.get("selected_option", "")
        elif action_type == "button":
            value = action.get("value")
        else:
            logger.error("Unhandled action type %s in Mattermost Dispatcher", action_type)
            return HttpResponse(status=500)
        selected_value = f"'{value}'"

    elif data.get("submission"):
        # View submission triggered from a modal dialog
        logger.info("Submission triggered from a modal dialog")
        values = data.get("submission")
        context["token"] = data.get("state")
        callback_id = data.get("callback_id")
        logger.debug(json.dumps(data, indent=2))

        # Handling for multiple fields. This will be used when the multi_input_dialog() method of the Mattermost
        # Dispatcher class is utilized.
        if len(values) > 1:
            # sometimes in the case of back-to-back dialogs there will be
            # parameters included in the callback_id.  Below parses those
            # out and adds them to selected_value.
            try:
                # A missing callback_id must not reach shlex.split() as None.
                cmds = shlex.split(callback_id or "")
            except ValueError as err:
                logger.error("Mattermost: %s", err)
                return HttpResponse(status=400, reason=f"Error: {err} encountered when processing {callback_id}")
            if len(cmds) < 2:
                # Without both a command and a subcommand there is nothing to dispatch.
                logger.error("Mattermost: callback_id %r lacks a command and subcommand", callback_id)
                return HttpResponse(status=400, reason="Error: callback_id must contain a command and subcommand")
            action_id = f"{cmds[0]} {cmds[1]}"

            # Extra callback_id parameters come first, then the submitted fields in sorted key order.
            pieces = [self.get_selected_value(cmd) for cmd in cmds[2:]]
            pieces.extend(self.get_selected_value(values[blk_id]) for blk_id in sorted(values))

            # Remove leading space
            selected_value = "".join(pieces)[1:]

        # Original un-modified single-field handling below
        else:
            action_id = sorted(values.keys())[0]
            selected_value = values[action_id]
    else:
        return HttpResponse(status=500, reason="I didn't understand that notification.")

    if settings.PLUGINS_CONFIG["nautobot_chatops"].get("delete_input_on_submission"):
        # Delete the interactive element since it's served its purpose
        # Does not work for Ephemeral Posts.
        if context["post_id"] is not None:
            MattermostDispatcher(context).delete_message(context["post_id"])
    if action_id == "action" and selected_value == "cancel":
        # Nothing more to do
        return HttpResponse()

    logger.info("action_id: %s, selected_value: %s", action_id, selected_value)
    try:
        command, subcommand, params = parse_command_string(f"{action_id} {selected_value}")
    except ValueError as err:
        logger.error("%s", err)
        return HttpResponse(
            status=400, reason=f"Error: {err} encountered on command '{action_id} {selected_value}'"
        )
    # Convert empty parameter strings to NoneType
    for idx, param in enumerate(params):
        if not param:
            params[idx] = None

    logger.info("command: %s, subcommand: %s, params: %s", command, subcommand, params)

    registry = get_commands_registry()

    if command not in registry:
        MattermostDispatcher(context).send_markdown(commands_help())
        return HttpResponse()

    MattermostDispatcher(context).send_busy_indicator()

    return check_and_enqueue_command(registry, command, subcommand, params, context, MattermostDispatcher)
MattermostSlashCommandView

Bases: View

Handle notifications from a Mattermost /command.

Source code in nautobot_chatops/api/views/mattermost.py
@method_decorator(csrf_exempt, name="dispatch")
class MattermostSlashCommandView(View):
    """Handle notifications from a Mattermost /command."""

    http_method_names = ["post"]

    def post(self, request, *args, **kwargs):
        """Handle an inbound HTTP POST request representing a user-issued /command.

        The command name and its text are read from the form data, combined into a
        single command string, parsed, and enqueued.

        Returns:
          HttpResponse with status 401 if signature verification fails, a plain
          "No command specified" response if the form has no command, status 400
          if the command string cannot be parsed, an empty 200 response after
          sending help for an unknown command, otherwise the result of
          ``check_and_enqueue_command``.
        """
        valid, reason = verify_signature(request)
        if not valid:
            return HttpResponse(status=401, reason=reason)

        command = request.POST.get("command")
        if not command:
            return HttpResponse("No command specified")
        # Drop the slash(es) so only the bare command name remains.
        command = command.replace("/", "")
        params = request.POST.get("text", "")
        context = {
            "request_scheme": request.scheme,
            "request_host": request.get_host(),
            "org_id": request.POST.get("team_id"),
            "org_name": request.POST.get("team_domain"),
            "channel_id": request.POST.get("channel_id"),
            "channel_name": request.POST.get("channel_name"),
            "user_id": request.POST.get("user_id"),
            "user_name": request.POST.get("user_name"),
            "response_url": request.POST.get("response_url"),
            "trigger_id": request.POST.get("trigger_id"),
            "integration_url": request.build_absolute_uri("/api/plugins/chatops/mattermost/interaction/"),
            "token": request.headers.get("Authorization"),
        }

        try:
            command, subcommand, params = parse_command_string(f"{command} {params}")
        except ValueError as err:
            logger.error("%s", err)
            # Closing quote added: the original message left the quoted command unterminated.
            return HttpResponse(status=400, reason=f"'Error: {err}' encountered on '{command} {params}'")

        registry = get_commands_registry()

        if command not in registry:
            MattermostDispatcher(context).send_markdown(commands_help(prefix="/"))
            return HttpResponse()

        MattermostDispatcher(context).send_busy_indicator()

        return check_and_enqueue_command(registry, command, subcommand, params, context, MattermostDispatcher)
post(request, *args, **kwargs)

Handle an inbound HTTP POST request representing a user-issued /command.

Source code in nautobot_chatops/api/views/mattermost.py
def post(self, request, *args, **kwargs):
    """Handle an inbound HTTP POST request representing a user-issued /command.

    The command name and its text are read from the form data, combined into a
    single command string, parsed, and enqueued.

    Returns:
      HttpResponse with status 401 if signature verification fails, a plain
      "No command specified" response if the form has no command, status 400
      if the command string cannot be parsed, an empty 200 response after
      sending help for an unknown command, otherwise the result of
      ``check_and_enqueue_command``.
    """
    valid, reason = verify_signature(request)
    if not valid:
        return HttpResponse(status=401, reason=reason)

    command = request.POST.get("command")
    if not command:
        return HttpResponse("No command specified")
    # Drop the slash(es) so only the bare command name remains.
    command = command.replace("/", "")
    params = request.POST.get("text", "")
    context = {
        "request_scheme": request.scheme,
        "request_host": request.get_host(),
        "org_id": request.POST.get("team_id"),
        "org_name": request.POST.get("team_domain"),
        "channel_id": request.POST.get("channel_id"),
        "channel_name": request.POST.get("channel_name"),
        "user_id": request.POST.get("user_id"),
        "user_name": request.POST.get("user_name"),
        "response_url": request.POST.get("response_url"),
        "trigger_id": request.POST.get("trigger_id"),
        "integration_url": request.build_absolute_uri("/api/plugins/chatops/mattermost/interaction/"),
        "token": request.headers.get("Authorization"),
    }

    try:
        command, subcommand, params = parse_command_string(f"{command} {params}")
    except ValueError as err:
        logger.error("%s", err)
        # Closing quote added: the original message left the quoted command unterminated.
        return HttpResponse(status=400, reason=f"'Error: {err}' encountered on '{command} {params}'")

    registry = get_commands_registry()

    if command not in registry:
        MattermostDispatcher(context).send_markdown(commands_help(prefix="/"))
        return HttpResponse()

    MattermostDispatcher(context).send_busy_indicator()

    return check_and_enqueue_command(registry, command, subcommand, params, context, MattermostDispatcher)
verify_signature(request)

Verify that a given request was legitimately signed by Mattermost.

https://developers.mattermost.com/integrate/slash-commands/

Returns:

| Name | Type | Description |
| --- | --- | --- |
| valid | tuple | (valid, reason) |

Source code in nautobot_chatops/api/views/mattermost.py
def verify_signature(request):
    """Verify that a given request was legitimately signed by Mattermost.

    https://developers.mattermost.com/integrate/slash-commands/

    The signature is taken from the ``Authorization`` header when present,
    otherwise from the ``context.token`` or ``state`` entry of the request
    payload. It must have the form ``Token <value>``, where ``<value>`` matches
    a stored Mattermost CommandToken.

    Returns:
      valid (tuple): (valid, reason)
    """
    if request.headers.get("Authorization"):
        expected_signature = request.headers.get("Authorization")
    else:
        # For some reason Integration Messages from Mattermost do not show up in POST.items()
        # in these cases, we have to load the request.body
        try:
            data = json.loads(request.body)
        except ValueError as err:
            logger.info("No request body to decode, setting data to empty dict. Error: %s", err)
            data = {}
        if request.POST.items():
            data.update(request.POST)
        # For Interactive Messages, the token will be passed in the context.
        if data.get("context"):
            action = data.get("context")
            expected_signature = action.get("token")
        # For Interactive Dialogs, the token will be passed in the state.
        elif data.get("state"):
            expected_signature = data.get("state")

        else:
            signature_error_cntr.labels("mattermost", "missing_signature").inc()
            return False, "Missing Command Token in Body or Header"

    if not expected_signature:
        signature_error_cntr.labels("mattermost", "missing_signature").inc()
        return False, "Missing Command Token"

    # A signature without the "Token " prefix (or one that is not a string at all) can never
    # match; reject it here instead of letting the split below raise and surface as an HTTP 500.
    if not isinstance(expected_signature, str) or "Token " not in expected_signature:
        signature_error_cntr.labels("mattermost", "incorrect_signature").inc()
        return False, "Incorrect signature"
    token_value = expected_signature.split("Token ")[1]

    command_tokens = CommandToken.objects.filter(platform=CommandTokenPlatformChoices.MATTERMOST)

    # exists() only checks for a matching row; the token objects themselves are not needed.
    if not command_tokens.filter(token=token_value).exists():
        signature_error_cntr.labels("mattermost", "incorrect_signature").inc()
        return False, "Incorrect signature"

    return True, "Signature is valid"

ms_teams

Views to receive inbound notifications from Microsoft Teams, parse them, and enqueue worker actions.

MSTeamsMessagesView

Bases: View

Handle notifications from a Microsoft Teams bot.

Source code in nautobot_chatops/api/views/ms_teams.py
@method_decorator(csrf_exempt, name="dispatch")
class MSTeamsMessagesView(View):
    """Handle notifications from a Microsoft Teams bot."""

    http_method_names = ["post"]

    # pylint: disable=too-many-locals,too-many-branches,too-many-statements
    def post(self, request, *args, **kwargs):
        """Process an inbound HTTP POST request.

        The JSON body is verified, turned into a dispatcher context (looking up
        the team and channel names over HTTP when their ids are present), and the
        contained command is parsed and enqueued.

        Returns:
          HttpResponse with status 400 if the body is not a JSON object, 403 if
          token verification fails, 200 for unsupported notification types,
          cancel actions and unknown commands, otherwise the result of
          ``check_and_enqueue_command``.
        """
        # Reject an unparseable body explicitly rather than failing with a server error.
        try:
            body = json.loads(request.body)
        except ValueError:
            return HttpResponse(status=400, reason="Request body is not valid JSON")
        if not isinstance(body, dict):
            return HttpResponse(status=400, reason="Request body is not a JSON object")

        valid, reason = verify_jwt_token(request.headers, body)
        if not valid:
            return HttpResponse(status=403, reason=reason)

        if body["type"] not in ["message", "invoke"]:
            return HttpResponse(status=200, reason=f"No support for {body['type']} notifications")

        context = {
            "request_scheme": request.scheme,
            "request_host": request.get_host(),
            # We don't get a team_id or a channel_id in direct message conversations
            "channel_id": body["channelData"].get("channel", {}).get("id"),
            "org_id": body["channelData"].get("team", {}).get("id"),
            # Note that the default channel in a team has channel_id == org_id
            "user_id": body["from"]["id"],
            "user_name": body["from"]["name"],
            "user_role": body["from"].get("role"),
            "conversation_id": body["conversation"]["id"],
            "conversation_name": body["conversation"].get("name"),
            "bot_id": body["recipient"]["id"],
            "bot_name": body["recipient"]["name"],
            "bot_role": body["recipient"].get("role"),
            "message_id": body["id"],
            "service_url": body["serviceUrl"],
            "tenant_id": body["channelData"]["tenant"]["id"],
            "is_group": body["conversation"].get("isGroup", False),
        }

        if context["org_id"]:
            # Get the organization name as well
            response = requests.get(
                f"{context['service_url']}/v3/teams/{context['org_id']}",
                headers={"Authorization": f"Bearer {MSTeamsDispatcher.get_token()}"},
                # Bound the wait so an unresponsive service cannot block this request indefinitely.
                timeout=15,
            )
            response.raise_for_status()
            context["org_name"] = response.json()["name"]
        else:
            # Direct message - use the user as the "organization" - better than nothing
            context["org_id"] = context["user_id"]
            context["org_name"] = f"direct message with {context['user_name']}"

        if context["channel_id"]:
            # Get the channel name as well
            response = requests.get(
                f"{context['service_url']}/v3/teams/{context['org_id']}/conversations",
                headers={"Authorization": f"Bearer {MSTeamsDispatcher.get_token()}"},
                # Bound the wait so an unresponsive service cannot block this request indefinitely.
                timeout=15,
            )
            response.raise_for_status()
            for conversation in response.json()["conversations"]:
                if conversation["id"] == context["channel_id"]:
                    # The "General" channel has a null name
                    context["channel_name"] = conversation["name"] or "General"
                    break
        else:
            # Direct message - use the user as the "channel" - better than nothing
            context["channel_id"] = context["user_id"]
            context["channel_name"] = f"direct message with {context['user_name']}"

        if "text" in body:
            # A command typed directly by the user
            command = body["text"]

            # If we get @ed in a channel, the message will be "<at>Nautobot</at> command subcommand".
            # Non-greedy match: strip each mention on its own so text between two mentions survives.
            command = re.sub(r"<at>.*?</at>", "", command)

            command, subcommand, params = parse_command_string(command)
        elif "value" in body:
            if body["value"].get("type") == "fileUpload":
                # User either granted or denied permission to upload a file
                if body["value"]["action"] == "accept":
                    command = body["value"]["context"]["action_id"]
                    context["uploadInfo"] = body["value"]["uploadInfo"]
                else:
                    command = "cancel"

                command, subcommand, params = parse_command_string(command)
            else:
                # Content that we got from an interactive card
                command, subcommand, params = parse_command_string(body["value"]["action"])
                # Collect param_0, param_1, ... until the first missing index.
                i = 0
                while True:
                    key = f"param_{i}"
                    if key not in body["value"]:
                        break
                    params.append(body["value"][key])
                    i += 1

            if settings.PLUGINS_CONFIG["nautobot_chatops"].get("delete_input_on_submission"):
                # Delete the card
                MSTeamsDispatcher(context).delete_message(body["replyToId"])
            if command.startswith("cancel"):
                # Nothing more to do
                return HttpResponse(status=200)
        else:
            command = ""
            subcommand = ""
            params = []

        registry = get_commands_registry()

        if command not in registry:
            MSTeamsDispatcher(context).send_markdown(commands_help())
            return HttpResponse(status=200)

        # Send "typing" indicator to the client so they know we received the request
        MSTeamsDispatcher(context).send_busy_indicator()

        return check_and_enqueue_command(registry, command, subcommand, params, context, MSTeamsDispatcher)
post(request, *args, **kwargs)

Process an inbound HTTP POST request.

Source code in nautobot_chatops/api/views/ms_teams.py
def post(self, request, *args, **kwargs):
    """Process an inbound HTTP POST request from the MS Teams Bot Connector.

    The request is authenticated with verify_jwt_token(), a context dict describing the
    org/channel/user is built from the activity body, the command text (typed message, card
    submission, or file-upload consent) is parsed, and the command is handed to
    check_and_enqueue_command().

    Args:
      request (HttpRequest): Inbound request whose body is the JSON activity.

    Returns:
      HttpResponse: 403 if the JWT check fails; 200 for unsupported activity types, cancelled
        cards, and unknown commands (after sending help); otherwise whatever
        check_and_enqueue_command() returns.
    """
    body = json.loads(request.body)

    valid, reason = verify_jwt_token(request.headers, body)
    if not valid:
        return HttpResponse(status=403, reason=reason)

    if body["type"] not in ["message", "invoke"]:
        return HttpResponse(status=200, reason=f"No support for {body['type']} notifications")

    context = {
        "request_scheme": request.scheme,
        "request_host": request.get_host(),
        # We don't get a team_id or a channel_id in direct message conversations
        "channel_id": body["channelData"].get("channel", {}).get("id"),
        "org_id": body["channelData"].get("team", {}).get("id"),
        # Note that the default channel in a team has channel_id == org_id
        "user_id": body["from"]["id"],
        "user_name": body["from"]["name"],
        "user_role": body["from"].get("role"),
        "conversation_id": body["conversation"]["id"],
        "conversation_name": body["conversation"].get("name"),
        "bot_id": body["recipient"]["id"],
        "bot_name": body["recipient"]["name"],
        "bot_role": body["recipient"].get("role"),
        "message_id": body["id"],
        "service_url": body["serviceUrl"],
        "tenant_id": body["channelData"]["tenant"]["id"],
        "is_group": body["conversation"].get("isGroup", False),
    }

    if context["org_id"]:
        # Get the organization name as well
        response = requests.get(
            f"{context['service_url']}/v3/teams/{context['org_id']}",
            headers={"Authorization": f"Bearer {MSTeamsDispatcher.get_token()}"},
        )
        response.raise_for_status()
        context["org_name"] = response.json()["name"]
    else:
        # Direct message - use the user as the "organization" - better than nothing
        context["org_id"] = context["user_id"]
        context["org_name"] = f"direct message with {context['user_name']}"

    if context["channel_id"]:
        # Get the channel name as well
        response = requests.get(
            f"{context['service_url']}/v3/teams/{context['org_id']}/conversations",
            headers={"Authorization": f"Bearer {MSTeamsDispatcher.get_token()}"},
        )
        response.raise_for_status()
        for conversation in response.json()["conversations"]:
            if conversation["id"] == context["channel_id"]:
                # The "General" channel has a null name
                context["channel_name"] = conversation["name"] or "General"
                break
    else:
        # Direct message - use the user as the "channel" - better than nothing
        context["channel_id"] = context["user_id"]
        context["channel_name"] = f"direct message with {context['user_name']}"

    if "text" in body:
        # A command typed directly by the user
        command = body["text"]

        # If we get @ed in a channel, the message will be "<at>Nautobot</at> command subcommand".
        # The match is non-greedy so that a second mention later in the message does not cause
        # everything between the first "<at>" and the last "</at>" (i.e. the command) to be removed.
        command = re.sub(r"<at>.*?</at>", "", command)

        command, subcommand, params = parse_command_string(command)
    elif "value" in body:
        if body["value"].get("type") == "fileUpload":
            # User either granted or denied permission to upload a file
            if body["value"]["action"] == "accept":
                command = body["value"]["context"]["action_id"]
                context["uploadInfo"] = body["value"]["uploadInfo"]
            else:
                command = "cancel"

            command, subcommand, params = parse_command_string(command)
        else:
            # Content that we got from an interactive card
            command, subcommand, params = parse_command_string(body["value"]["action"])
            # Card inputs arrive as consecutive keys param_0, param_1, ...; stop at the first gap
            i = 0
            while True:
                key = f"param_{i}"
                if key not in body["value"]:
                    break
                params.append(body["value"][key])
                i += 1

        if settings.PLUGINS_CONFIG["nautobot_chatops"].get("delete_input_on_submission"):
            # Delete the card
            MSTeamsDispatcher(context).delete_message(body["replyToId"])
        if command.startswith("cancel"):
            # Nothing more to do
            return HttpResponse(status=200)
    else:
        command = ""
        subcommand = ""
        params = []

    registry = get_commands_registry()

    if command not in registry:
        MSTeamsDispatcher(context).send_markdown(commands_help())
        return HttpResponse(status=200)

    # Send "typing" indicator to the client so they know we received the request
    MSTeamsDispatcher(context).send_busy_indicator()

    return check_and_enqueue_command(registry, command, subcommand, params, context, MSTeamsDispatcher)
get_bot_signing_keys(metadata_uri=BOT_CONNECTOR_METADATA_URI)

Get the keys used by the Bot Connector service to sign requests and the associated algorithms.

Source code in nautobot_chatops/api/views/ms_teams.py
def get_bot_signing_keys(metadata_uri=BOT_CONNECTOR_METADATA_URI):
    """Fetch the Bot Connector's request-signing keys and the signing algorithms it advertises.

    Args:
      metadata_uri (str): URL of the metadata document that lists the supported algorithms
        and the location of the JWKS key set.

    Returns:
      tuple: (public_keys, algorithms), where public_keys maps each key id ("kid") to the key
        object built from its JWK and algorithms is the metadata's
        "id_token_signing_alg_values_supported" value.
    """
    metadata = requests.get(metadata_uri).json()
    supported_algorithms = metadata["id_token_signing_alg_values_supported"]
    key_set = requests.get(metadata["jwks_uri"]).json()

    # https://renzo.lucioni.xyz/verifying-jwts-with-jwks-and-pyjwt/
    keys_by_kid = {
        entry["kid"]: jwt.algorithms.RSAAlgorithm.from_jwk(json.dumps(entry)) for entry in key_set["keys"]
    }

    # TODO: we're supposed to be able to cache this for up to 5 days rather than retrieving it every time
    return keys_by_kid, supported_algorithms
verify_jwt_token(request_headers, request_json)

Verify that an inbound HTTP request is appropriately signed with a valid JWT token.

References
  • https://docs.microsoft.com/en-us/azure/bot-service/rest-api/bot-framework-rest-connector-authentication?view=azure-bot-service-4.0#step-4-verify-the-jwt-token
  • https://github.com/microsoft/BotFramework-Emulator/pull/324

Returns:

Name Type Description
valid tuple

(valid, reason)

Source code in nautobot_chatops/api/views/ms_teams.py
def verify_jwt_token(request_headers, request_json):
    """Verify that an inbound HTTP request is appropriately signed with a valid JWT token.

    References:
      - https://docs.microsoft.com/en-us/azure/bot-service/rest-api/bot-framework-rest-connector-authentication?
        view=azure-bot-service-4.0#step-4-verify-the-jwt-token
      - https://github.com/microsoft/BotFramework-Emulator/pull/324

    Args:
      request_headers (Mapping): Headers of the inbound request; must support ``in`` and ``.get()``.
      request_json (dict): Parsed JSON body of the inbound request (its "serviceUrl" is compared
        against the token's claim).

    Returns:
      valid (tuple): (valid, reason) - ``reason`` is None on success, otherwise a short explanation.
    """
    # 1. The token was sent in the HTTP Authorization header with Bearer scheme
    if "authorization" not in request_headers:
        return False, "no Authorization header present"
    # partition() rather than split() so a malformed header is rejected instead of raising ValueError
    auth_type, _, auth_token = (request_headers.get("authorization") or "").partition(" ")
    if auth_type != "Bearer":
        return False, "incorrect authorization scheme"
    auth_token = auth_token.strip()
    if not auth_token:
        return False, "no token present in Authorization header"

    # Which key does the auth_token say we should use?
    try:
        kid = jwt.get_unverified_header(auth_token).get("kid")
    except jwt.exceptions.InvalidTokenError as exc:
        # Not even a parseable JWT - reject rather than letting the exception escape
        return False, str(exc)
    if kid is None:
        return False, "token header does not specify a kid"

    real_connector = True
    public_keys, algorithms = get_bot_signing_keys(BOT_CONNECTOR_METADATA_URI)
    if kid not in public_keys:
        # Maybe it was signed by the emulator instead?
        public_keys, algorithms = get_bot_signing_keys(BOT_EMULATOR_METADATA_URI)
        real_connector = False
        if kid not in public_keys:
            return False, f"unknown/unrecognized kid {kid}"

    try:
        # 2. The token is valid JSON that conforms to the JWT standard.
        token_payload = jwt.decode(
            auth_token,
            # 6. The token has a valid cryptographic signature, with a key listed in the OpenID keys document,
            #    using a signing algorithm specified in the Open ID Metadata
            key=public_keys[kid],
            algorithms=algorithms,
            # 3. The token contains an "issuer" claim with value of https://api.botframework.com (for the real thing)
            # or https://login.microsoftonline.com/d6d49420-f39b-4df7-a1dc-d59a935871db/v2.0 (for the emulator)
            issuer=(
                "https://api.botframework.com"
                if real_connector
                else "https://login.microsoftonline.com/d6d49420-f39b-4df7-a1dc-d59a935871db/v2.0"
            ),
            # 4. The token contains an "audience" claim with a value equal to the bot's Microsoft App ID.
            audience=APP_ID,
            # 5. The token is within its validity period plus or minus 5 minutes.
            leeway=(5 * 60),
            options={
                # I think these default to True but better safe than sorry!
                "verify_iss": True,
                "verify_aud": True,
                "verify_nbf": True,
                "verify_exp": True,
                "verify_signature": True,
            },
        )
    except jwt.exceptions.InvalidTokenError as exc:
        return False, str(exc)

    # 7. The token contains a "serviceUrl" claim with value that matches the incoming request
    # In practice I see this is (sometimes?) labeled as "serviceurl"
    service_url = token_payload.get("serviceUrl", token_payload.get("serviceurl"))
    # The bot emulator doesn't seem to provide this claim, so only test if working with a real connector
    if real_connector and (not service_url or service_url != request_json.get("serviceUrl")):
        return False, f"Missing or incorrect serviceUrl claim ({service_url}) in token"

    return True, None

slack

Views to receive inbound notifications from Slack, parse them, and enqueue worker actions.

SlackInteractionView

Bases: View

Handle notifications resulting from a Slack interactive block or modal.

Source code in nautobot_chatops/api/views/slack.py
@method_decorator(csrf_exempt, name="dispatch")
class SlackInteractionView(View):
    """Handle notifications resulting from a Slack interactive block or modal."""

    http_method_names = ["post"]

    # pylint: disable=too-many-locals,too-many-return-statements,too-many-branches,too-many-statements
    def post(self, request, *args, **kwargs):
        """Handle an inbound HTTP POST request representing a user interaction with a UI element.

        The "payload" form field is parsed as JSON and is expected to describe either a block
        action (select menu or button) or a modal view submission. The action/callback identifier
        plus the submitted values are reassembled into a command string, parsed, and passed to
        check_and_enqueue_command().

        Args:
          request (HttpRequest): Inbound request carrying a JSON "payload" form field.

        Returns:
          HttpResponse: 401 if the signature check fails, 500 for unhandled element types, a plain
            response for cancellations / unparseable input / unknown commands, otherwise whatever
            check_and_enqueue_command() returns.
        """
        valid, reason = verify_signature(request)
        if not valid:
            return HttpResponse(status=401, reason=reason)

        payload = json.loads(request.POST.get("payload", ""))

        context = {
            "request_scheme": request.scheme,
            "request_host": request.get_host(),
            "org_id": payload.get("team", {}).get("id"),
            "org_name": payload.get("team", {}).get("domain"),
            "channel_id": payload.get("channel", {}).get("id"),
            "channel_name": payload.get("channel", {}).get("name"),
            "user_id": payload.get("user", {}).get("id"),
            "user_name": payload.get("user", {}).get("username"),
            "response_url": payload.get("response_url"),
            "trigger_id": payload.get("trigger_id"),
        }

        # Check for channel_name if channel_id is present
        if context["channel_name"] is None and context["channel_id"] is not None:
            # Build a Slack Client Object
            slack_client = WebClient(token=settings.PLUGINS_CONFIG["nautobot_chatops"]["slack_api_token"])

            # Get the channel information from Slack API
            channel_info = slack_client.conversations_info(channel=context["channel_id"])

            # Assign the Channel name out of the conversations info end point
            context["channel_name"] = channel_info["channel"]["name"]

        if "actions" in payload and payload["actions"]:
            # Block action triggered by a non-modal interactive component
            action = payload["actions"][0]
            action_id = action.get("action_id", "")
            if action["type"] == "static_select":
                value = action.get("selected_option", {}).get("value", "")
            elif action["type"] == "button":
                value = action.get("value")
            else:
                logger.error("Unhandled action type %s", action["type"])
                return HttpResponse(status=500)
            # Quote the value so that it survives command parsing as a single parameter
            selected_value = f"'{value}'"

            if settings.PLUGINS_CONFIG["nautobot_chatops"].get("delete_input_on_submission"):
                # Delete the interactive element since it's served its purpose
                SlackDispatcher(context).delete_message(context["response_url"])
            # Compare the raw value: selected_value is wrapped in quotes and would never equal "cancel"
            if action_id == "action" and value == "cancel":
                # Nothing more to do
                return HttpResponse()
        elif "view" in payload and payload["view"]:  # pylint: disable=too-many-nested-blocks
            # View submission triggered from a modal dialog
            logger.info("Submission triggered from a modal dialog")
            logger.info(json.dumps(payload, indent=2))
            values = payload["view"].get("state", {}).get("values", {})

            # Handling for multiple fields. This will be used when the multi_input_dialog() method of the Slack
            # Dispatcher class is utilized.
            if len(values) > 1:
                # A missing callback_id is treated as empty so it is rejected below instead of
                # being handed to shlex.split() as None
                callback_id = payload["view"].get("callback_id") or ""
                # sometimes in the case of back-to-back dialogs there will be
                # parameters included in the callback_id.  Below parses those
                # out and adds them to selected_value.
                try:
                    cmds = shlex.split(callback_id)
                except ValueError as err:
                    logger.error("%s", err)
                    return HttpResponse(f"Error: {err} encountered when processing {callback_id}")
                if len(cmds) < 2:
                    # The first two tokens are needed to form the command and subcommand
                    logger.error("Malformed callback_id %r", callback_id)
                    return HttpResponse(f"Error: malformed callback_id encountered when processing {callback_id}")
                action_id = f"{cmds[0]} {cmds[1]}"
                # Any tokens after the first two are parameters carried over from an earlier dialog
                selected_value = " ".join(f"'{cmd}'" for cmd in cmds[2:])

                # Block IDs are visited in sorted order to build the parameter list
                for blk_id in sorted(values.keys()):
                    for act_id in values[blk_id].values():
                        if act_id["type"] == "static_select":
                            try:
                                value = act_id["selected_option"]["value"]
                            except (AttributeError, TypeError):
                                # Error is thrown if no option selected and field is optional
                                value = None
                        elif act_id["type"] == "plain_text_input":
                            value = act_id["value"]
                        else:
                            logger.error("Unhandled dialog type %s", act_id["type"])
                            return HttpResponse(status=500)

                        # If an optional parameter is passed, it is returned as a NoneType.
                        # We instead want to return an empty string, otherwise 'None' is returned as a string.
                        if value:
                            selected_value += f" '{value}'"
                        else:
                            selected_value += " ''"

            # Original un-modified single-field handling below
            else:
                block_id = sorted(values.keys())[0]
                action_id = sorted(values[block_id].keys())[0]
                action = values[block_id][action_id]
                if action["type"] == "plain_text_input":
                    value = action["value"]
                    selected_value = f"'{value}'"
                else:
                    logger.error("Unhandled action type %s", action["type"])
                    return HttpResponse(status=500)

            # Modal view submissions don't generally contain a channel ID, but we hide one for our convenience:
            if "private_metadata" in payload["view"]:
                private_metadata = json.loads(payload["view"]["private_metadata"])
                if "channel_id" in private_metadata:
                    context["channel_id"] = private_metadata["channel_id"]
        else:
            return HttpResponse("I didn't understand that notification.")

        logger.info("action_id: %s, selected_value: %s", action_id, selected_value)
        try:
            command, subcommand, params = parse_command_string(f"{action_id} {selected_value}")
        except ValueError as err:
            logger.error("%s", err)
            # Tried sending 400 error, but the friendly message never made it to slack.
            return HttpResponse(f"'Error: {err}' encountered on command '{action_id} {selected_value}'.")

        # Convert empty parameter strings to NoneType
        for idx, param in enumerate(params):
            if not param:
                params[idx] = None

        logger.info("command: %s, subcommand: %s, params: %s", command, subcommand, params)

        registry = get_commands_registry()

        if command not in registry:
            SlackDispatcher(context).send_markdown(commands_help(prefix=SLASH_PREFIX))
            return HttpResponse()

        # What we'd like to do here is send a "Nautobot is typing..." to the channel,
        # but unfortunately the API we're using doesn't support that (only the legacy/deprecated RTM API does).
        # SlackDispatcher(context).send_busy_indicator()

        return check_and_enqueue_command(registry, command, subcommand, params, context, SlackDispatcher)
post(request, args, kwargs)

Handle an inbound HTTP POST request representing a user interaction with a UI element.

Source code in nautobot_chatops/api/views/slack.py
def post(self, request, *args, **kwargs):
    """Handle an inbound HTTP POST request representing a user interaction with a UI element.

    The "payload" form field is parsed as JSON and is expected to describe either a block
    action (select menu or button) or a modal view submission. The action/callback identifier
    plus the submitted values are reassembled into a command string, parsed, and passed to
    check_and_enqueue_command().

    Args:
      request (HttpRequest): Inbound request carrying a JSON "payload" form field.

    Returns:
      HttpResponse: 401 if the signature check fails, 500 for unhandled element types, a plain
        response for cancellations / unparseable input / unknown commands, otherwise whatever
        check_and_enqueue_command() returns.
    """
    valid, reason = verify_signature(request)
    if not valid:
        return HttpResponse(status=401, reason=reason)

    payload = json.loads(request.POST.get("payload", ""))

    context = {
        "request_scheme": request.scheme,
        "request_host": request.get_host(),
        "org_id": payload.get("team", {}).get("id"),
        "org_name": payload.get("team", {}).get("domain"),
        "channel_id": payload.get("channel", {}).get("id"),
        "channel_name": payload.get("channel", {}).get("name"),
        "user_id": payload.get("user", {}).get("id"),
        "user_name": payload.get("user", {}).get("username"),
        "response_url": payload.get("response_url"),
        "trigger_id": payload.get("trigger_id"),
    }

    # Check for channel_name if channel_id is present
    if context["channel_name"] is None and context["channel_id"] is not None:
        # Build a Slack Client Object
        slack_client = WebClient(token=settings.PLUGINS_CONFIG["nautobot_chatops"]["slack_api_token"])

        # Get the channel information from Slack API
        channel_info = slack_client.conversations_info(channel=context["channel_id"])

        # Assign the Channel name out of the conversations info end point
        context["channel_name"] = channel_info["channel"]["name"]

    if "actions" in payload and payload["actions"]:
        # Block action triggered by a non-modal interactive component
        action = payload["actions"][0]
        action_id = action.get("action_id", "")
        if action["type"] == "static_select":
            value = action.get("selected_option", {}).get("value", "")
        elif action["type"] == "button":
            value = action.get("value")
        else:
            logger.error("Unhandled action type %s", action["type"])
            return HttpResponse(status=500)
        # Quote the value so that it survives command parsing as a single parameter
        selected_value = f"'{value}'"

        if settings.PLUGINS_CONFIG["nautobot_chatops"].get("delete_input_on_submission"):
            # Delete the interactive element since it's served its purpose
            SlackDispatcher(context).delete_message(context["response_url"])
        # Compare the raw value: selected_value is wrapped in quotes and would never equal "cancel"
        if action_id == "action" and value == "cancel":
            # Nothing more to do
            return HttpResponse()
    elif "view" in payload and payload["view"]:  # pylint: disable=too-many-nested-blocks
        # View submission triggered from a modal dialog
        logger.info("Submission triggered from a modal dialog")
        logger.info(json.dumps(payload, indent=2))
        values = payload["view"].get("state", {}).get("values", {})

        # Handling for multiple fields. This will be used when the multi_input_dialog() method of the Slack
        # Dispatcher class is utilized.
        if len(values) > 1:
            # A missing callback_id is treated as empty so it is rejected below instead of
            # being handed to shlex.split() as None
            callback_id = payload["view"].get("callback_id") or ""
            # sometimes in the case of back-to-back dialogs there will be
            # parameters included in the callback_id.  Below parses those
            # out and adds them to selected_value.
            try:
                cmds = shlex.split(callback_id)
            except ValueError as err:
                logger.error("%s", err)
                return HttpResponse(f"Error: {err} encountered when processing {callback_id}")
            if len(cmds) < 2:
                # The first two tokens are needed to form the command and subcommand
                logger.error("Malformed callback_id %r", callback_id)
                return HttpResponse(f"Error: malformed callback_id encountered when processing {callback_id}")
            action_id = f"{cmds[0]} {cmds[1]}"
            # Any tokens after the first two are parameters carried over from an earlier dialog
            selected_value = " ".join(f"'{cmd}'" for cmd in cmds[2:])

            # Block IDs are visited in sorted order to build the parameter list
            for blk_id in sorted(values.keys()):
                for act_id in values[blk_id].values():
                    if act_id["type"] == "static_select":
                        try:
                            value = act_id["selected_option"]["value"]
                        except (AttributeError, TypeError):
                            # Error is thrown if no option selected and field is optional
                            value = None
                    elif act_id["type"] == "plain_text_input":
                        value = act_id["value"]
                    else:
                        logger.error("Unhandled dialog type %s", act_id["type"])
                        return HttpResponse(status=500)

                    # If an optional parameter is passed, it is returned as a NoneType.
                    # We instead want to return an empty string, otherwise 'None' is returned as a string.
                    if value:
                        selected_value += f" '{value}'"
                    else:
                        selected_value += " ''"

        # Original un-modified single-field handling below
        else:
            block_id = sorted(values.keys())[0]
            action_id = sorted(values[block_id].keys())[0]
            action = values[block_id][action_id]
            if action["type"] == "plain_text_input":
                value = action["value"]
                selected_value = f"'{value}'"
            else:
                logger.error("Unhandled action type %s", action["type"])
                return HttpResponse(status=500)

        # Modal view submissions don't generally contain a channel ID, but we hide one for our convenience:
        if "private_metadata" in payload["view"]:
            private_metadata = json.loads(payload["view"]["private_metadata"])
            if "channel_id" in private_metadata:
                context["channel_id"] = private_metadata["channel_id"]
    else:
        return HttpResponse("I didn't understand that notification.")

    logger.info("action_id: %s, selected_value: %s", action_id, selected_value)
    try:
        command, subcommand, params = parse_command_string(f"{action_id} {selected_value}")
    except ValueError as err:
        logger.error("%s", err)
        # Tried sending 400 error, but the friendly message never made it to slack.
        return HttpResponse(f"'Error: {err}' encountered on command '{action_id} {selected_value}'.")

    # Convert empty parameter strings to NoneType
    for idx, param in enumerate(params):
        if not param:
            params[idx] = None

    logger.info("command: %s, subcommand: %s, params: %s", command, subcommand, params)

    registry = get_commands_registry()

    if command not in registry:
        SlackDispatcher(context).send_markdown(commands_help(prefix=SLASH_PREFIX))
        return HttpResponse()

    # What we'd like to do here is send a "Nautobot is typing..." to the channel,
    # but unfortunately the API we're using doesn't support that (only the legacy/deprecated RTM API does).
    # SlackDispatcher(context).send_busy_indicator()

    return check_and_enqueue_command(registry, command, subcommand, params, context, SlackDispatcher)
SlackSlashCommandView

Bases: View

Handle notifications from a Slack /command.

Source code in nautobot_chatops/api/views/slack.py
@method_decorator(csrf_exempt, name="dispatch")
class SlackSlashCommandView(View):
    """Receive Slack /command notifications and enqueue the matching command."""

    http_method_names = ["post"]

    def post(self, request, *args, **kwargs):
        """Handle an inbound HTTP POST request representing a user-issued /command.

        Returns:
          HttpResponse: 401 when the signature check fails, a plain text response when the
            command is absent, unparseable, or unknown, otherwise whatever
            check_and_enqueue_command() returns.
        """
        signature_ok, failure_reason = verify_signature(request)
        if not signature_ok:
            return HttpResponse(status=401, reason=failure_reason)

        form = request.POST
        slash_command = form.get("command")
        if not slash_command:
            return HttpResponse("No command specified")
        slash_command = slash_command.replace(SLASH_PREFIX, "")
        text = form.get("text", "")

        # Context key -> name of the Slack form field that supplies it
        context = {"request_scheme": request.scheme, "request_host": request.get_host()}
        for context_key, form_field in (
            ("org_id", "team_id"),
            ("org_name", "team_domain"),
            ("channel_id", "channel_id"),
            ("channel_name", "channel_name"),
            ("user_id", "user_id"),
            ("user_name", "user_name"),
            ("response_url", "response_url"),
            ("trigger_id", "trigger_id"),
        ):
            context[context_key] = form.get(form_field)

        try:
            command, subcommand, params = parse_command_string(f"{slash_command} {text}")
        except ValueError as err:
            logger.error("%s", err)
            # Tried sending 400 error, but the friendly message never made it to slack.
            return HttpResponse(f"'Error: {err}' encountered on command '{slash_command} {text}'.")

        registry = get_commands_registry()

        if command in registry:
            # What we'd like to do here is send a "Nautobot is typing..." to the channel,
            # but unfortunately the API we're using doesn't support that (only the legacy/deprecated RTM API does).
            # SlackDispatcher(context).send_busy_indicator()
            return check_and_enqueue_command(registry, command, subcommand, params, context, SlackDispatcher)

        # Unknown command: reply with the list of available commands
        SlackDispatcher(context).send_markdown(commands_help(prefix=SLASH_PREFIX))
        return HttpResponse()
post(request, args, kwargs)

Handle an inbound HTTP POST request representing a user-issued /command.

Source code in nautobot_chatops/api/views/slack.py
def post(self, request, *args, **kwargs):
    """Handle an inbound HTTP POST request representing a user-issued /command.

    Returns:
      HttpResponse: 401 when the signature check fails, a plain text response when the
        command is absent, unparseable, or unknown, otherwise whatever
        check_and_enqueue_command() returns.
    """
    signature_ok, failure_reason = verify_signature(request)
    if not signature_ok:
        return HttpResponse(status=401, reason=failure_reason)

    form = request.POST
    slash_command = form.get("command")
    if not slash_command:
        return HttpResponse("No command specified")
    slash_command = slash_command.replace(SLASH_PREFIX, "")
    text = form.get("text", "")

    # Context key -> name of the Slack form field that supplies it
    context = {"request_scheme": request.scheme, "request_host": request.get_host()}
    for context_key, form_field in (
        ("org_id", "team_id"),
        ("org_name", "team_domain"),
        ("channel_id", "channel_id"),
        ("channel_name", "channel_name"),
        ("user_id", "user_id"),
        ("user_name", "user_name"),
        ("response_url", "response_url"),
        ("trigger_id", "trigger_id"),
    ):
        context[context_key] = form.get(form_field)

    try:
        command, subcommand, params = parse_command_string(f"{slash_command} {text}")
    except ValueError as err:
        logger.error("%s", err)
        # Tried sending 400 error, but the friendly message never made it to slack.
        return HttpResponse(f"'Error: {err}' encountered on command '{slash_command} {text}'.")

    registry = get_commands_registry()

    if command in registry:
        # What we'd like to do here is send a "Nautobot is typing..." to the channel,
        # but unfortunately the API we're using doesn't support that (only the legacy/deprecated RTM API does).
        # SlackDispatcher(context).send_busy_indicator()
        return check_and_enqueue_command(registry, command, subcommand, params, context, SlackDispatcher)

    # Unknown command: reply with the list of available commands
    SlackDispatcher(context).send_markdown(commands_help(prefix=SLASH_PREFIX))
    return HttpResponse()
generate_signature(request)

Calculate the expected signature of a given request.

Source code in nautobot_chatops/api/views/slack.py
def generate_signature(request):
    """Compute the signature that a request should carry, in "v0=<hex digest>" form.

    The digest is an HMAC-SHA256, keyed with the configured "slack_signing_secret", over
    "v0:<X-Slack-Request-Timestamp header>:<request body>".
    """
    scheme_version = "v0"
    secret_key = settings.PLUGINS_CONFIG["nautobot_chatops"].get("slack_signing_secret").encode("utf-8")
    # The existence of this header should already have been checked
    request_timestamp = request.headers.get("X-Slack-Request-Timestamp")
    payload_text = request.body.decode("utf-8")
    signed_content = ":".join((scheme_version, f"{request_timestamp}", payload_text)).encode("utf-8")

    digest = hmac.new(secret_key, signed_content, digestmod=hashlib.sha256).hexdigest()
    return scheme_version + "=" + digest
verify_signature(request)

Verify that a given request was legitimately signed by Slack.

https://api.slack.com/authentication/verifying-requests-from-slack

Returns:

Name Type Description
valid tuple

(valid, reason)

Source code in nautobot_chatops/api/views/slack.py
def verify_signature(request):
    """Verify that a given request was legitimately signed by Slack.

    https://api.slack.com/authentication/verifying-requests-from-slack

    Each failure increments signature_error_cntr with a label naming the cause.

    Args:
      request (HttpRequest): Inbound request carrying the X-Slack-Signature and
        X-Slack-Request-Timestamp headers.

    Returns:
      valid (tuple): (valid, reason)
    """
    expected_signature = request.headers.get("X-Slack-Signature")
    if not expected_signature:
        signature_error_cntr.labels("slack", "missing_signature").inc()
        return False, "Missing X-Slack-Signature header"
    timestamp = request.headers.get("X-Slack-Request-Timestamp")
    if not timestamp:
        signature_error_cntr.labels("slack", "missing_timestamp").inc()
        return False, "Missing X-Slack-Request-Timestamp header"

    computed_signature = generate_signature(request)

    # Compare in constant time so that response timing does not reveal how much of a forged
    # signature was correct. Bytes are compared because compare_digest() rejects non-ASCII str.
    if not hmac.compare_digest(computed_signature.encode("utf-8"), expected_signature.encode("utf-8")):
        signature_error_cntr.labels("slack", "incorrect_signature").inc()
        return False, "Incorrect signature"

    return True, "Signature is valid"

webex

Views to receive inbound notifications from WebEx, parse them, and enqueue worker actions.

WebExView

Bases: View

Handle all supported inbound notifications from WebEx.

Source code in nautobot_chatops/api/views/webex.py
@method_decorator(csrf_exempt, name="dispatch")
class WebExView(View):
    """Handle all supported inbound notifications from WebEx."""

    http_method_names = ["post"]

    # pylint: disable=too-many-locals,too-many-return-statements,too-many-branches
    def post(self, request, *args, **kwargs):
        """Process an inbound HTTP POST request.

        The request signature is verified first; then the notification is turned into a
        command, subcommand and parameter list, which is handed to check_and_enqueue_command().

        Args:
          request: Inbound HTTP request carrying the WebEx webhook notification as a JSON body.
          *args: Unused positional arguments from the URL dispatcher.
          **kwargs: Unused keyword arguments from the URL dispatcher.

        Returns:
          An HttpResponse (status 401 when the signature is invalid), or the result of
          check_and_enqueue_command() when a registered command was recognised.
        """
        if not API:
            return HttpResponse(reason="Incomplete or incorrect bot setup")

        valid, reason = verify_signature(request)
        if not valid:
            return HttpResponse(status=401, reason=reason)

        body = json.loads(request.body)

        if body.get("resource") not in ["messages", "attachmentActions"] or body.get("event") != "created":
            # f-string so the reason reports the actual resource/event instead of the literal placeholders.
            return HttpResponse(reason=f"No support for {body.get('resource')} {body.get('event')} notifications.")

        data = body.get("data", {})
        if data.get("personId") == BOT_ID:
            logger.info("Ignoring message that we are the sender of.")
            # Pass the status by keyword; a positional 200 would be sent as the response content.
            return HttpResponse(status=200)

        context = {
            "request_scheme": request.scheme,
            "request_host": request.get_host(),
            "org_id": body.get("orgId"),
            "channel_id": data.get("roomId"),
            "user_id": data.get("personId"),
            # In an 'attachmentActions' notification, the relevant message ID is 'messageId'.
            # In a 'messages' notification, the relevant message ID is 'id'.
            "message_id": data.get("messageId") or data.get("id"),
        }

        # In WebEx, the webhook doesn't contain the user/channel/org names. We have to call back for them.
        # For whatever reason, API.organizations.get() is only permitted by admin users, which the bot is not.
        # context["org_name"] = API.organizations.get(context["org_id"]).displayName
        context["channel_name"] = API.rooms.get(context["channel_id"]).title
        context["user_name"] = API.people.get(context["user_id"]).displayName

        if body.get("resource") == "messages":
            # In WebEx, the webhook notification doesn't contain the message text. We have to call back for it.
            message = API.messages.get(context["message_id"])
            command = message.text.strip()
            # Check for a mention of the bot in the HTML (i.e., if this is not a direct message), and remove it if so.
            if message.html:
                bot_mention = re.search("<spark-mention.*?>(.+?)</spark-mention>", message.html)
                if bot_mention:
                    # Remove the mention literally: the display name is arbitrary text and must not be
                    # interpreted as a regular expression pattern.
                    command = command.replace(bot_mention.group(1), "").strip()
            command, subcommand, params = parse_command_string(command)
        elif body.get("resource") == "attachmentActions":
            # In WebEx, the webhook notification doesn't contain the action details. We have to call back for it.
            action = API.attachment_actions.get(data.get("id"))
            if settings.PLUGINS_CONFIG["nautobot_chatops"].get("delete_input_on_submission"):
                # Delete the card that this action was triggered from
                WebExDispatcher(context).delete_message(context["message_id"])
            if action.inputs.get("action") == "cancel":
                return HttpResponse(status=200)
            command, subcommand, params = parse_command_string(action.inputs.get("action"))
            # Card inputs are named param_0, param_1, ...; collect them in order until the first gap.
            i = 0
            while True:
                key = f"param_{i}"
                if key not in action.inputs:
                    break
                params.append(action.inputs[key])
                i += 1

        registry = get_commands_registry()

        if command not in registry:
            # Unknown command: reply with the help text rather than enqueueing anything.
            WebExDispatcher(context).send_markdown(commands_help())
            return HttpResponse(status=200)

        return check_and_enqueue_command(registry, command, subcommand, params, context, WebExDispatcher)
post(request, *args, **kwargs)

Process an inbound HTTP POST request.

Source code in nautobot_chatops/api/views/webex.py
def post(self, request, *args, **kwargs):
    """Process an inbound HTTP POST request.

    The request signature is verified first; then the notification is turned into a
    command, subcommand and parameter list, which is handed to check_and_enqueue_command().

    Args:
      request: Inbound HTTP request carrying the WebEx webhook notification as a JSON body.
      *args: Unused positional arguments from the URL dispatcher.
      **kwargs: Unused keyword arguments from the URL dispatcher.

    Returns:
      An HttpResponse (status 401 when the signature is invalid), or the result of
      check_and_enqueue_command() when a registered command was recognised.
    """
    if not API:
        return HttpResponse(reason="Incomplete or incorrect bot setup")

    valid, reason = verify_signature(request)
    if not valid:
        return HttpResponse(status=401, reason=reason)

    body = json.loads(request.body)

    if body.get("resource") not in ["messages", "attachmentActions"] or body.get("event") != "created":
        # f-string so the reason reports the actual resource/event instead of the literal placeholders.
        return HttpResponse(reason=f"No support for {body.get('resource')} {body.get('event')} notifications.")

    data = body.get("data", {})
    if data.get("personId") == BOT_ID:
        logger.info("Ignoring message that we are the sender of.")
        # Pass the status by keyword; a positional 200 would be sent as the response content.
        return HttpResponse(status=200)

    context = {
        "request_scheme": request.scheme,
        "request_host": request.get_host(),
        "org_id": body.get("orgId"),
        "channel_id": data.get("roomId"),
        "user_id": data.get("personId"),
        # In an 'attachmentActions' notification, the relevant message ID is 'messageId'.
        # In a 'messages' notification, the relevant message ID is 'id'.
        "message_id": data.get("messageId") or data.get("id"),
    }

    # In WebEx, the webhook doesn't contain the user/channel/org names. We have to call back for them.
    # For whatever reason, API.organizations.get() is only permitted by admin users, which the bot is not.
    # context["org_name"] = API.organizations.get(context["org_id"]).displayName
    context["channel_name"] = API.rooms.get(context["channel_id"]).title
    context["user_name"] = API.people.get(context["user_id"]).displayName

    if body.get("resource") == "messages":
        # In WebEx, the webhook notification doesn't contain the message text. We have to call back for it.
        message = API.messages.get(context["message_id"])
        command = message.text.strip()
        # Check for a mention of the bot in the HTML (i.e., if this is not a direct message), and remove it if so.
        if message.html:
            bot_mention = re.search("<spark-mention.*?>(.+?)</spark-mention>", message.html)
            if bot_mention:
                # Remove the mention literally: the display name is arbitrary text and must not be
                # interpreted as a regular expression pattern.
                command = command.replace(bot_mention.group(1), "").strip()
        command, subcommand, params = parse_command_string(command)
    elif body.get("resource") == "attachmentActions":
        # In WebEx, the webhook notification doesn't contain the action details. We have to call back for it.
        action = API.attachment_actions.get(data.get("id"))
        if settings.PLUGINS_CONFIG["nautobot_chatops"].get("delete_input_on_submission"):
            # Delete the card that this action was triggered from
            WebExDispatcher(context).delete_message(context["message_id"])
        if action.inputs.get("action") == "cancel":
            return HttpResponse(status=200)
        command, subcommand, params = parse_command_string(action.inputs.get("action"))
        # Card inputs are named param_0, param_1, ...; collect them in order until the first gap.
        i = 0
        while True:
            key = f"param_{i}"
            if key not in action.inputs:
                break
            params.append(action.inputs[key])
            i += 1

    registry = get_commands_registry()

    if command not in registry:
        # Unknown command: reply with the help text rather than enqueueing anything.
        WebExDispatcher(context).send_markdown(commands_help())
        return HttpResponse(status=200)

    return check_and_enqueue_command(registry, command, subcommand, params, context, WebExDispatcher)
generate_signature(request)

Calculate the expected signature of a given request.

Source code in nautobot_chatops/api/views/webex.py
def generate_signature(request):
    """Compute the HMAC-SHA1 hex digest of the request body using the configured signing secret.

    Args:
      request: Inbound HTTP request whose raw ``body`` is signed.

    Returns:
      signature (str): Hexadecimal digest of the request body.

    Raises:
      KeyError: If the 'webex_signing_secret' setting is needed but not present.
    """
    plugin_settings = settings.PLUGINS_CONFIG["nautobot_chatops"]
    legacy_secret = plugin_settings.get("webex_teams_signing_secret")

    # v1.4.0 Deprecation warning: the old setting name is honoured only while the new one is unset.
    use_legacy = bool(legacy_secret) and not plugin_settings.get("webex_signing_secret")

    if not use_legacy:
        try:
            secret_bytes = plugin_settings["webex_signing_secret"].encode("utf-8")
        except KeyError as error:
            error_msg = "The 'webex_signing_secret' setting must be configured"
            logger.error(error_msg)
            raise KeyError(error_msg) from error
    else:
        secret_bytes = legacy_secret.encode("utf-8")
        logger.warning(
            "The 'webex_teams_signing_secret' setting is deprecated, please use 'webex_signing_secret' instead"
        )

    digest = hmac.new(secret_bytes, request.body, digestmod=hashlib.sha1)
    return digest.hexdigest()
verify_signature(request)

Verify that a given request was legitimately signed by WebEx.

https://developer.webex.com/docs/api/guides/webhooks#handling-requests-from-webex-teams

Returns:

Name Type Description
valid tuple

(valid, reason)

Source code in nautobot_chatops/api/views/webex.py
def verify_signature(request):
    """Verify that a given request was legitimately signed by WebEx.

    https://developer.webex.com/docs/api/guides/webhooks#handling-requests-from-webex-teams

    Args:
      request: Inbound HTTP request; its 'X-Spark-Signature' header is checked against the computed signature.

    Returns:
      valid (tuple): (valid, reason)
    """
    expected_signature = request.headers.get("X-Spark-Signature")
    if not expected_signature:
        return False, "Missing X-Spark-Signature header"

    computed_signature = generate_signature(request)

    # Use a constant-time comparison so response timing does not reveal how much of the
    # signature matched. Compare bytes rather than str so that a header containing
    # non-ASCII characters is rejected instead of raising TypeError.
    signatures_match = hmac.compare_digest(
        expected_signature.encode("utf-8"),
        computed_signature.encode("utf-8"),
    )
    if not signatures_match:
        return False, "Incorrect signature"

    return True, "Valid signature"