Skip to content

Nautobot Secrets Providers Package

nautobot_secrets_providers.providers

Nautobot Secrets Providers.

AWSSecretsManagerSecretsProvider

Bases: SecretsProvider

A secrets provider for AWS Secrets Manager.

Source code in nautobot_secrets_providers/providers/aws.py
class AWSSecretsManagerSecretsProvider(SecretsProvider):
    """A secrets provider for AWS Secrets Manager."""

    slug = "aws-secrets-manager"
    name = "AWS Secrets Manager"
    is_available = boto3 is not None

    # TBD: Remove after pylint-nautobot bump
    # pylint: disable-next=nb-incorrect-base-class
    class ParametersForm(BootstrapMixin, forms.Form):
        """Required parameters for AWS Secrets Manager."""

        name = forms.CharField(
            required=True,
            help_text="The name of the AWS Secrets Manager secret",
        )
        region = forms.CharField(
            required=True,
            help_text="The region name of the AWS Secrets Manager secret",
        )
        key = forms.CharField(
            required=True,
            help_text="The key of the AWS Secrets Manager secret",
        )

    @classmethod
    def get_value_for_secret(cls, secret, obj=None, **kwargs):
        """Return the secret value by name and region."""
        # Extract the parameters from the Secret.
        parameters = secret.rendered_parameters(obj=obj)

        secret_name = parameters.get("name")
        secret_key = parameters.get("key")
        region_name = parameters.get("region")

        # Create a Secrets Manager client.
        session = boto3.session.Session()
        client = session.client(service_name="secretsmanager", region_name=region_name)

        # This is based on sample code to only handle the specific exceptions for the 'GetSecretValue' API.
        # See https://docs.aws.amazon.com/secretsmanager/latest/apireference/API_GetSecretValue.html
        # We rethrow the exception by default.
        try:
            get_secret_value_response = client.get_secret_value(SecretId=secret_name)
        except ClientError as err:
            if err.response["Error"]["Code"] == "DecryptionFailureException":  # pylint: disable=no-else-raise
                # Secrets Manager can't decrypt the protected secret text using the provided KMS key.
                # Deal with the exception here, and/or rethrow at your discretion.
                raise exceptions.SecretProviderError(secret, cls, str(err))
            elif err.response["Error"]["Code"] == "InternalServiceErrorException":
                # An error occurred on the server side.
                # Deal with the exception here, and/or rethrow at your discretion.
                raise exceptions.SecretProviderError(secret, cls, str(err))
            elif err.response["Error"]["Code"] == "InvalidParameterException":
                # You provided an invalid value for a parameter.
                # Deal with the exception here, and/or rethrow at your discretion.
                raise exceptions.SecretParametersError(secret, cls, str(err))
            elif err.response["Error"]["Code"] == "InvalidRequestException":
                # You provided a parameter value that is not valid for the current state of the resource.
                # Deal with the exception here, and/or rethrow at your discretion.
                raise exceptions.SecretProviderError(secret, cls, str(err))
            elif err.response["Error"]["Code"] == "ResourceNotFoundException":
                # We can't find the resource that you asked for.
                # Deal with the exception here, and/or rethrow at your discretion.
                raise exceptions.SecretValueNotFoundError(secret, cls, str(err))
            else:
                # We got an error that isn't defined above
                raise exceptions.SecretProviderError(secret, cls, str(err))
        else:
            # Decrypts secret using the associated KMS CMK.
            # Depending on whether the secret is a string or binary, one of these fields will be populated.
            if "SecretString" in get_secret_value_response:
                secret_value = get_secret_value_response["SecretString"]
            else:
                # TODO(jathan): Do we care about this? Let's figure out what to do about a binary value?
                secret_value = base64.b64decode(get_secret_value_response["SecretBinary"])  # noqa

        # If we get this far it should be valid JSON.
        data = json.loads(secret_value)

        # Retrieve the value using the key or complain loudly.
        try:
            return data[secret_key]
        except KeyError as err:
            msg = f"The secret value could not be retrieved using key {err}"
            raise exceptions.SecretValueNotFoundError(secret, cls, msg) from err

ParametersForm

Bases: BootstrapMixin, Form

Required parameters for AWS Secrets Manager.

Source code in nautobot_secrets_providers/providers/aws.py
class ParametersForm(BootstrapMixin, forms.Form):
    """Required parameters for AWS Secrets Manager."""

    name = forms.CharField(
        required=True,
        help_text="The name of the AWS Secrets Manager secret",
    )
    region = forms.CharField(
        required=True,
        help_text="The region name of the AWS Secrets Manager secret",
    )
    key = forms.CharField(
        required=True,
        help_text="The key of the AWS Secrets Manager secret",
    )

get_value_for_secret(secret, obj=None, **kwargs) classmethod

Return the secret value by name and region.

Source code in nautobot_secrets_providers/providers/aws.py
@classmethod
def get_value_for_secret(cls, secret, obj=None, **kwargs):
    """Return the secret value by name and region."""
    # Extract the parameters from the Secret.
    parameters = secret.rendered_parameters(obj=obj)

    secret_name = parameters.get("name")
    secret_key = parameters.get("key")
    region_name = parameters.get("region")

    # Create a Secrets Manager client.
    session = boto3.session.Session()
    client = session.client(service_name="secretsmanager", region_name=region_name)

    # This is based on sample code to only handle the specific exceptions for the 'GetSecretValue' API.
    # See https://docs.aws.amazon.com/secretsmanager/latest/apireference/API_GetSecretValue.html
    # We rethrow the exception by default.
    try:
        get_secret_value_response = client.get_secret_value(SecretId=secret_name)
    except ClientError as err:
        if err.response["Error"]["Code"] == "DecryptionFailureException":  # pylint: disable=no-else-raise
            # Secrets Manager can't decrypt the protected secret text using the provided KMS key.
            # Deal with the exception here, and/or rethrow at your discretion.
            raise exceptions.SecretProviderError(secret, cls, str(err))
        elif err.response["Error"]["Code"] == "InternalServiceErrorException":
            # An error occurred on the server side.
            # Deal with the exception here, and/or rethrow at your discretion.
            raise exceptions.SecretProviderError(secret, cls, str(err))
        elif err.response["Error"]["Code"] == "InvalidParameterException":
            # You provided an invalid value for a parameter.
            # Deal with the exception here, and/or rethrow at your discretion.
            raise exceptions.SecretParametersError(secret, cls, str(err))
        elif err.response["Error"]["Code"] == "InvalidRequestException":
            # You provided a parameter value that is not valid for the current state of the resource.
            # Deal with the exception here, and/or rethrow at your discretion.
            raise exceptions.SecretProviderError(secret, cls, str(err))
        elif err.response["Error"]["Code"] == "ResourceNotFoundException":
            # We can't find the resource that you asked for.
            # Deal with the exception here, and/or rethrow at your discretion.
            raise exceptions.SecretValueNotFoundError(secret, cls, str(err))
        else:
            # We got an error that isn't defined above
            raise exceptions.SecretProviderError(secret, cls, str(err))
    else:
        # Decrypts secret using the associated KMS CMK.
        # Depending on whether the secret is a string or binary, one of these fields will be populated.
        if "SecretString" in get_secret_value_response:
            secret_value = get_secret_value_response["SecretString"]
        else:
            # TODO(jathan): Do we care about this? Let's figure out what to do about a binary value?
            secret_value = base64.b64decode(get_secret_value_response["SecretBinary"])  # noqa

    # If we get this far it should be valid JSON.
    data = json.loads(secret_value)

    # Retrieve the value using the key or complain loudly.
    try:
        return data[secret_key]
    except KeyError as err:
        msg = f"The secret value could not be retrieved using key {err}"
        raise exceptions.SecretValueNotFoundError(secret, cls, msg) from err

AWSSystemsManagerParameterStore

Bases: SecretsProvider

A secrets provider for AWS Systems Manager Parameter Store.

Documentation: https://docs.aws.amazon.com/systems-manager/latest/userguide/systems-manager-parameter-store.html

Source code in nautobot_secrets_providers/providers/aws.py
class AWSSystemsManagerParameterStore(SecretsProvider):
    """
    A secrets provider for AWS Systems Manager Parameter Store.

    Documentation: https://docs.aws.amazon.com/systems-manager/latest/userguide/systems-manager-parameter-store.html
    """

    slug = "aws-sm-parameter-store"
    name = "AWS Systems Manager Parameter Store"
    is_available = boto3 is not None

    # TBD: Remove after pylint-nautobot bump
    # pylint: disable-next=nb-incorrect-base-class
    class ParametersForm(BootstrapMixin, forms.Form):
        """Required parameters for AWS Parameter Store."""

        name = forms.CharField(
            required=True,
            help_text="The name of the AWS Parameter Store secret",
        )
        region = forms.CharField(
            required=True,
            help_text="The region name of the AWS Parameter Store secret",
        )
        key = forms.CharField(
            required=True,
            help_text="The key name to retrieve from AWS Parameter Store",
        )

    @classmethod
    def get_value_for_secret(cls, secret, obj=None, **kwargs):
        """Return the parameter value by name and region."""
        # Extract the parameters from the Nautobot secret.
        parameters = secret.rendered_parameters(obj=obj)

        # Create a SSM client.
        session = boto3.session.Session()
        client = session.client(service_name="ssm", region_name=parameters.get("region"))
        try:
            get_secret_value_response = client.get_parameter(Name=parameters.get("name"), WithDecryption=True)
        except ClientError as err:
            if err.response["Error"]["Code"] == "ParameterNotFound":
                raise exceptions.SecretParametersError(secret, cls, str(err))

            if err.response["Error"]["Code"] == "ParameterVersionNotFound":
                raise exceptions.SecretValueNotFoundError(secret, cls, str(err))

            raise exceptions.SecretProviderError(secret, cls, str(err))

        try:
            # Fetch the Value field from the parameter which must be a json field.
            data = json.loads(get_secret_value_response["Parameter"]["Value"])
        except ValueError as err:
            msg = "InvalidJson"
            raise exceptions.SecretValueNotFoundError(secret, cls, msg) from err

        try:
            # Return the value of the secret key configured in the nautobot secret.
            return data[parameters.get("key")]
        except KeyError as err:
            msg = f"InvalidKeyName {err}"
            raise exceptions.SecretParametersError(secret, cls, msg) from err

ParametersForm

Bases: BootstrapMixin, Form

Required parameters for AWS Parameter Store.

Source code in nautobot_secrets_providers/providers/aws.py
class ParametersForm(BootstrapMixin, forms.Form):
    """Required parameters for AWS Parameter Store."""

    name = forms.CharField(
        required=True,
        help_text="The name of the AWS Parameter Store secret",
    )
    region = forms.CharField(
        required=True,
        help_text="The region name of the AWS Parameter Store secret",
    )
    key = forms.CharField(
        required=True,
        help_text="The key name to retrieve from AWS Parameter Store",
    )

get_value_for_secret(secret, obj=None, **kwargs) classmethod

Return the parameter value by name and region.

Source code in nautobot_secrets_providers/providers/aws.py
@classmethod
def get_value_for_secret(cls, secret, obj=None, **kwargs):
    """Return the parameter value by name and region."""
    # Extract the parameters from the Nautobot secret.
    parameters = secret.rendered_parameters(obj=obj)

    # Create a SSM client.
    session = boto3.session.Session()
    client = session.client(service_name="ssm", region_name=parameters.get("region"))
    try:
        get_secret_value_response = client.get_parameter(Name=parameters.get("name"), WithDecryption=True)
    except ClientError as err:
        if err.response["Error"]["Code"] == "ParameterNotFound":
            raise exceptions.SecretParametersError(secret, cls, str(err))

        if err.response["Error"]["Code"] == "ParameterVersionNotFound":
            raise exceptions.SecretValueNotFoundError(secret, cls, str(err))

        raise exceptions.SecretProviderError(secret, cls, str(err))

    try:
        # Fetch the Value field from the parameter which must be a json field.
        data = json.loads(get_secret_value_response["Parameter"]["Value"])
    except ValueError as err:
        msg = "InvalidJson"
        raise exceptions.SecretValueNotFoundError(secret, cls, msg) from err

    try:
        # Return the value of the secret key configured in the nautobot secret.
        return data[parameters.get("key")]
    except KeyError as err:
        msg = f"InvalidKeyName {err}"
        raise exceptions.SecretParametersError(secret, cls, msg) from err

AzureKeyVaultSecretsProvider

Bases: SecretsProvider

A secrets provider for Azure Key Vault.

Source code in nautobot_secrets_providers/providers/azure.py
class AzureKeyVaultSecretsProvider(SecretsProvider):
    """A secrets provider for Azure Key Vault."""

    slug = "azure-key-vault"
    name = "Azure Key Vault"
    is_available = azure_available

    # pylint: disable-next=nb-incorrect-base-class
    class ParametersForm(BootstrapMixin, forms.Form):
        """Required parameters for Azure Key Vault."""

        vault_url = forms.CharField(
            required=True,
            help_text="The URL of the Azure Key Vault",
        )
        secret_name = forms.CharField(
            required=True,
            help_text="The name of the secret in the Azure Key Vault",
        )

    @classmethod
    def get_value_for_secret(cls, secret, obj=None, **kwargs):
        """Return the secret value by name from Azure Key Vault."""
        # Extract the parameters from the Secret.
        parameters = secret.rendered_parameters(obj=obj)
        vault_url = parameters.get("vault_url")
        secret_name = parameters.get("secret_name")

        # Authenticate with Azure Key Vault using default credentials.
        # This assumes that environment variables for Azure authentication are set.
        credential = DefaultAzureCredential()
        client = SecretClient(vault_url=vault_url, credential=credential)

        try:
            # Retrieve the secret from Azure Key Vault.
            response = client.get_secret(secret_name)
        except Exception as err:
            # Handle exceptions from the Azure SDK.
            raise exceptions.SecretProviderError(secret, cls, str(err))

        # The value is in the 'value' attribute of the response.
        secret_value = response.value

        # Return the secret value.
        return secret_value

ParametersForm

Bases: BootstrapMixin, Form

Required parameters for Azure Key Vault.

Source code in nautobot_secrets_providers/providers/azure.py
class ParametersForm(BootstrapMixin, forms.Form):
    """Required parameters for Azure Key Vault."""

    vault_url = forms.CharField(
        required=True,
        help_text="The URL of the Azure Key Vault",
    )
    secret_name = forms.CharField(
        required=True,
        help_text="The name of the secret in the Azure Key Vault",
    )

get_value_for_secret(secret, obj=None, **kwargs) classmethod

Return the secret value by name from Azure Key Vault.

Source code in nautobot_secrets_providers/providers/azure.py
@classmethod
def get_value_for_secret(cls, secret, obj=None, **kwargs):
    """Return the secret value by name from Azure Key Vault."""
    # Extract the parameters from the Secret.
    parameters = secret.rendered_parameters(obj=obj)
    vault_url = parameters.get("vault_url")
    secret_name = parameters.get("secret_name")

    # Authenticate with Azure Key Vault using default credentials.
    # This assumes that environment variables for Azure authentication are set.
    credential = DefaultAzureCredential()
    client = SecretClient(vault_url=vault_url, credential=credential)

    try:
        # Retrieve the secret from Azure Key Vault.
        response = client.get_secret(secret_name)
    except Exception as err:
        # Handle exceptions from the Azure SDK.
        raise exceptions.SecretProviderError(secret, cls, str(err))

    # The value is in the 'value' attribute of the response.
    secret_value = response.value

    # Return the secret value.
    return secret_value

DelineaSecretServerSecretsProviderId

Bases: DelineaSecretServerSecretsProviderBase

A secrets provider for Delinea Secret Server.

Source code in nautobot_secrets_providers/providers/delinea.py
class DelineaSecretServerSecretsProviderId(DelineaSecretServerSecretsProviderBase):
    """A secrets provider for Delinea Secret Server."""

    slug = "delinea-tss-id"  # type: ignore
    name = "Delinea Secret Server by ID"  # type: ignore

    # TBD: Remove after pylint-nautobot bump
    # pylint: disable-next=nb-incorrect-base-class
    class ParametersForm(BootstrapMixin, forms.Form):
        """Required parameters for Delinea Secret Server."""

        secret_id = forms.IntegerField(
            label="Secret ID",
            required=True,
            min_value=1,
            help_text="The secret-id used to select the entry in Delinea Secret Server.",
        )
        secret_selected_value = forms.ChoiceField(
            label="Return value",
            required=True,
            choices=DelineaSecretChoices,
            help_text="Select which value to return.",
        )

ParametersForm

Bases: BootstrapMixin, Form

Required parameters for Delinea Secret Server.

Source code in nautobot_secrets_providers/providers/delinea.py
class ParametersForm(BootstrapMixin, forms.Form):
    """Required parameters for Delinea Secret Server."""

    secret_id = forms.IntegerField(
        label="Secret ID",
        required=True,
        min_value=1,
        help_text="The secret-id used to select the entry in Delinea Secret Server.",
    )
    secret_selected_value = forms.ChoiceField(
        label="Return value",
        required=True,
        choices=DelineaSecretChoices,
        help_text="Select which value to return.",
    )

DelineaSecretServerSecretsProviderPath

Bases: DelineaSecretServerSecretsProviderBase

A secrets provider for Delinea Secret Server.

Source code in nautobot_secrets_providers/providers/delinea.py
class DelineaSecretServerSecretsProviderPath(DelineaSecretServerSecretsProviderBase):
    """A secrets provider for Delinea Secret Server."""

    slug = "delinea-tss-path"  # type: ignore
    name = "Delinea Secret Server by Path"  # type: ignore

    # TBD: Remove after pylint-nautobot bump
    # pylint: disable-next=nb-incorrect-base-class
    class ParametersForm(BootstrapMixin, forms.Form):
        """Required parameters for Delinea Secret Server."""

        secret_path = forms.CharField(
            required=True,
            max_length=300,
            min_length=3,
            help_text=r"Enter the secret's path (e.g. \FolderPath\Secret Name).",
        )
        secret_selected_value = forms.ChoiceField(
            label="Return value",
            required=True,
            choices=DelineaSecretChoices,
            help_text="Select which value to return.",
        )

ParametersForm

Bases: BootstrapMixin, Form

Required parameters for Delinea Secret Server.

Source code in nautobot_secrets_providers/providers/delinea.py
class ParametersForm(BootstrapMixin, forms.Form):
    """Required parameters for Delinea Secret Server."""

    secret_path = forms.CharField(
        required=True,
        max_length=300,
        min_length=3,
        help_text=r"Enter the secret's path (e.g. \FolderPath\Secret Name).",
    )
    secret_selected_value = forms.ChoiceField(
        label="Return value",
        required=True,
        choices=DelineaSecretChoices,
        help_text="Select which value to return.",
    )

HashiCorpVaultSecretsProvider

Bases: SecretsProvider

A secrets provider for HashiCorp Vault.

Source code in nautobot_secrets_providers/providers/hashicorp.py
class HashiCorpVaultSecretsProvider(SecretsProvider):
    """A secrets provider for HashiCorp Vault."""

    slug = "hashicorp-vault"
    name = "HashiCorp Vault"
    is_available = hvac is not None

    # TBD: Remove after pylint-nautobot bump
    # pylint: disable-next=nb-incorrect-base-class
    class ParametersForm(BootstrapMixin, forms.Form):
        """Required parameters for HashiCorp Vault."""

        path = forms.CharField(
            required=True,
            help_text="The path to the HashiCorp Vault secret",
        )
        key = forms.CharField(
            required=True,
            help_text="The key of the HashiCorp Vault secret",
        )
        vault = forms.ChoiceField(
            required=False,  # This should be required, but would be a breaking change
            choices=vault_choices,
            help_text="HashiCorp Vault to retrieve the secret from.",
        )
        mount_point = forms.CharField(
            required=False,
            help_text="Override Vault Setting: The path where the secret engine was mounted on.",
            label="Mount Point (override)",
        )
        kv_version = forms.ChoiceField(
            required=False,
            choices=add_blank_choice(HashicorpKVVersionChoices),
            help_text="Override Vault Setting: The version of the kv engine (either v1 or v2).",
            label="KV Version (override)",
        )

    @staticmethod
    def retrieve_vault_settings(name=None):
        """Retrieve the configuration from settings that matches the provided vault name.

        Args:
            name (str, optional): Vault name to retrieve from settings. Defaults to None.

        Returns:
            vault_settings (dict): Hashicorp Vault Settings
        """
        vault_settings = settings.PLUGINS_CONFIG["nautobot_secrets_providers"].get("hashicorp_vault", {})
        if name and "vaults" in vault_settings:
            return vault_settings["vaults"][name]
        return vault_settings

    @classmethod
    def validate_vault_settings(cls, secret=None, vault_name=None):
        """Validate the vault settings."""
        try:
            vault_settings = cls.retrieve_vault_settings(vault_name)
        except KeyError as err:
            raise exceptions.SecretProviderError(
                secret, cls, f"HashiCorp Vault {vault_name} is not configured!"
            ) from err
        if not vault_settings:
            raise exceptions.SecretProviderError(secret, cls, f"HashiCorp Vault {vault_name} is not configured!")

        auth_method = vault_settings.get("auth_method", "token")
        kv_version = vault_settings.get("kv_version", HashicorpKVVersionChoices.KV_VERSION_2)

        if "url" not in vault_settings:
            raise exceptions.SecretProviderError(secret, cls, "HashiCorp Vault configuration is missing a url")

        if auth_method not in AUTH_METHOD_CHOICES:
            raise exceptions.SecretProviderError(secret, cls, f"HashiCorp Vault Auth Method {auth_method} is invalid!")

        if kv_version not in HashicorpKVVersionChoices.as_dict():
            raise exceptions.SecretProviderError(secret, cls, f"HashiCorp Vault KV version {kv_version} is invalid!")

        if auth_method == "aws" and not boto3:
            raise exceptions.SecretProviderError(
                secret, cls, "HashiCorp Vault AWS Authentication Method requires the boto3 library!"
            )
        if auth_method == "token" and "token" not in vault_settings:
            raise exceptions.SecretProviderError(
                secret, cls, "HashiCorp Vault configuration is missing a token for token authentication!"
            )
        if auth_method == "kubernetes" and "role_name" not in vault_settings:
            raise exceptions.SecretProviderError(
                secret, cls, "HashiCorp Vault configuration is missing a role name for kubernetes authentication!"
            )
        if auth_method == "approle" and ("role_id" not in vault_settings or "secret_id" not in vault_settings):
            raise exceptions.SecretProviderError(
                secret, cls, "HashiCorp Vault configuration is missing a role_id and/or secret_id!"
            )

        return vault_settings

    @classmethod
    def get_client(cls, secret=None, vault_name=None):  # pylint: disable-msg=too-many-locals
        """Authenticate and return a hashicorp client."""
        vault_settings = cls.validate_vault_settings(secret, vault_name)
        auth_method = vault_settings.get("auth_method", "token")

        k8s_token_path = vault_settings.get("k8s_token_path", K8S_TOKEN_DEFAULT_PATH)
        login_kwargs = vault_settings.get("login_kwargs", {})

        # According to the docs (https://hvac.readthedocs.io/en/stable/source/hvac_v1.html?highlight=verify#hvac.v1.Client.__init__)
        # the client verify parameter is either a boolean or a path to a ca certificate file to verify.  This is non-intuitive
        # so we use a parameter to specify the path to the ca_cert, if not provided we use the default of None
        ca_cert = vault_settings.get("ca_cert", None)

        namespace = vault_settings.get("namespace", None)

        # Get the client and attempt to retrieve the secret.
        try:
            if auth_method == "token":
                client = hvac.Client(
                    url=vault_settings["url"],
                    token=vault_settings["token"],
                    verify=ca_cert,
                    namespace=namespace,
                )
            else:
                client = hvac.Client(url=vault_settings["url"], verify=ca_cert, namespace=namespace)
                if auth_method == "approle":
                    client.auth.approle.login(
                        role_id=vault_settings["role_id"],
                        secret_id=vault_settings["secret_id"],
                        **login_kwargs,
                    )
                elif auth_method == "kubernetes":
                    with open(k8s_token_path, "r", encoding="utf-8") as token_file:
                        jwt = token_file.read()
                    client.auth.kubernetes.login(role=vault_settings["role_name"], jwt=jwt, **login_kwargs)
                elif auth_method == "aws":
                    session = boto3.Session()
                    aws_creds = session.get_credentials()
                    aws_region = session.region_name or "us-east-1"
                    client.auth.aws.iam_login(
                        access_key=aws_creds.access_key,
                        secret_key=aws_creds.secret_key,
                        session_token=aws_creds.token,
                        region=aws_region,
                        role=vault_settings.get("role_name", None),
                        **login_kwargs,
                    )
        except hvac.exceptions.InvalidRequest as err:
            raise exceptions.SecretProviderError(
                secret, cls, f"HashiCorp Vault Login failed (auth_method: {auth_method}). Error: {err}"
            ) from err
        except hvac.exceptions.Forbidden as err:
            raise exceptions.SecretProviderError(
                secret, cls, f"HashiCorp Vault Access Denied (auth_method: {auth_method}). Error: {err}"
            ) from err

        return client

    @classmethod
    def get_value_for_secret(cls, secret, obj=None, **kwargs):
        """Return the value stored under the secret’s key in the secret’s path."""
        # Try to get parameters and error out early.
        parameters = secret.rendered_parameters(obj=obj)
        try:
            vault_name = parameters.get("vault", "default")
            vault_settings = cls.retrieve_vault_settings(vault_name)
        except KeyError:
            vault_settings = {}
        # Get the mount_point and kv_version from the Vault configuration. These default to the
        # default Vault that HashiCorp provides.
        secret_mount_point = vault_settings.get("default_mount_point", "secret")
        secret_kv_version = vault_settings.get("kv_version", HashicorpKVVersionChoices.KV_VERSION_2)

        try:
            secret_path = parameters["path"]
            secret_key = parameters["key"]
            # If the user does choose to override the Vault settings at their own risk, we will use
            # the settings they provide. These are here to support multiple vaults (vault engines) when
            # that was not allowed by the settings. Ideally these should be deprecated and removed in
            # the future.
            secret_mount_point = parameters.get("mount_point", secret_mount_point) or secret_mount_point
            secret_kv_version = parameters.get("kv_version", secret_kv_version) or secret_kv_version
        except KeyError as err:
            msg = f"The secret parameter could not be retrieved for field {err}"
            raise exceptions.SecretParametersError(secret, cls, msg) from err

        client = cls.get_client(secret, vault_name)

        try:
            if secret_kv_version == HashicorpKVVersionChoices.KV_VERSION_1:
                response = client.secrets.kv.v1.read_secret(path=secret_path, mount_point=secret_mount_point)
            else:
                response = client.secrets.kv.v2.read_secret(path=secret_path, mount_point=secret_mount_point)
        except hvac.exceptions.InvalidPath as err:
            raise exceptions.SecretValueNotFoundError(secret, cls, str(err)) from err

        # Retrieve the value using the key or complain loudly.
        try:
            if secret_kv_version == HashicorpKVVersionChoices.KV_VERSION_1:
                return response["data"][secret_key]
            return response["data"]["data"][secret_key]
        except KeyError as err:
            msg = f"The secret value could not be retrieved using key {err}"
            raise exceptions.SecretValueNotFoundError(secret, cls, msg) from err

ParametersForm

Bases: BootstrapMixin, Form

Required parameters for HashiCorp Vault.

Source code in nautobot_secrets_providers/providers/hashicorp.py
class ParametersForm(BootstrapMixin, forms.Form):
    """Required parameters for HashiCorp Vault."""

    path = forms.CharField(
        required=True,
        help_text="The path to the HashiCorp Vault secret",
    )
    key = forms.CharField(
        required=True,
        help_text="The key of the HashiCorp Vault secret",
    )
    vault = forms.ChoiceField(
        required=False,  # This should be required, but would be a breaking change
        choices=vault_choices,
        help_text="HashiCorp Vault to retrieve the secret from.",
    )
    mount_point = forms.CharField(
        required=False,
        help_text="Override Vault Setting: The path where the secret engine was mounted on.",
        label="Mount Point (override)",
    )
    kv_version = forms.ChoiceField(
        required=False,
        choices=add_blank_choice(HashicorpKVVersionChoices),
        help_text="Override Vault Setting: The version of the kv engine (either v1 or v2).",
        label="KV Version (override)",
    )

get_client(secret=None, vault_name=None) classmethod

Authenticate and return a hashicorp client.

Source code in nautobot_secrets_providers/providers/hashicorp.py
@classmethod
def get_client(cls, secret=None, vault_name=None):  # pylint: disable-msg=too-many-locals
    """Authenticate and return a hashicorp client."""
    vault_settings = cls.validate_vault_settings(secret, vault_name)
    auth_method = vault_settings.get("auth_method", "token")

    k8s_token_path = vault_settings.get("k8s_token_path", K8S_TOKEN_DEFAULT_PATH)
    login_kwargs = vault_settings.get("login_kwargs", {})

    # According to the docs (https://hvac.readthedocs.io/en/stable/source/hvac_v1.html?highlight=verify#hvac.v1.Client.__init__)
    # the client verify parameter is either a boolean or a path to a ca certificate file to verify.  This is non-intuitive
    # so we use a parameter to specify the path to the ca_cert, if not provided we use the default of None
    ca_cert = vault_settings.get("ca_cert", None)

    namespace = vault_settings.get("namespace", None)

    # Get the client and attempt to retrieve the secret.
    try:
        if auth_method == "token":
            client = hvac.Client(
                url=vault_settings["url"],
                token=vault_settings["token"],
                verify=ca_cert,
                namespace=namespace,
            )
        else:
            client = hvac.Client(url=vault_settings["url"], verify=ca_cert, namespace=namespace)
            if auth_method == "approle":
                client.auth.approle.login(
                    role_id=vault_settings["role_id"],
                    secret_id=vault_settings["secret_id"],
                    **login_kwargs,
                )
            elif auth_method == "kubernetes":
                with open(k8s_token_path, "r", encoding="utf-8") as token_file:
                    jwt = token_file.read()
                client.auth.kubernetes.login(role=vault_settings["role_name"], jwt=jwt, **login_kwargs)
            elif auth_method == "aws":
                session = boto3.Session()
                aws_creds = session.get_credentials()
                aws_region = session.region_name or "us-east-1"
                client.auth.aws.iam_login(
                    access_key=aws_creds.access_key,
                    secret_key=aws_creds.secret_key,
                    session_token=aws_creds.token,
                    region=aws_region,
                    role=vault_settings.get("role_name", None),
                    **login_kwargs,
                )
    except hvac.exceptions.InvalidRequest as err:
        raise exceptions.SecretProviderError(
            secret, cls, f"HashiCorp Vault Login failed (auth_method: {auth_method}). Error: {err}"
        ) from err
    except hvac.exceptions.Forbidden as err:
        raise exceptions.SecretProviderError(
            secret, cls, f"HashiCorp Vault Access Denied (auth_method: {auth_method}). Error: {err}"
        ) from err

    return client

get_value_for_secret(secret, obj=None, **kwargs) classmethod

Return the value stored under the secret’s key in the secret’s path.

Source code in nautobot_secrets_providers/providers/hashicorp.py
@classmethod
def get_value_for_secret(cls, secret, obj=None, **kwargs):
    """Return the value stored under the secret’s key in the secret’s path."""
    # Try to get parameters and error out early.
    parameters = secret.rendered_parameters(obj=obj)
    try:
        vault_name = parameters.get("vault", "default")
        vault_settings = cls.retrieve_vault_settings(vault_name)
    except KeyError:
        vault_settings = {}
    # Get the mount_point and kv_version from the Vault configuration. These default to the
    # default Vault that HashiCorp provides.
    secret_mount_point = vault_settings.get("default_mount_point", "secret")
    secret_kv_version = vault_settings.get("kv_version", HashicorpKVVersionChoices.KV_VERSION_2)

    try:
        secret_path = parameters["path"]
        secret_key = parameters["key"]
        # If the user does choose to override the Vault settings at their own risk, we will use
        # the settings they provide. These are here to support multiple vaults (vault engines) when
        # that was not allowed by the settings. Ideally these should be deprecated and removed in
        # the future.
        secret_mount_point = parameters.get("mount_point", secret_mount_point) or secret_mount_point
        secret_kv_version = parameters.get("kv_version", secret_kv_version) or secret_kv_version
    except KeyError as err:
        msg = f"The secret parameter could not be retrieved for field {err}"
        raise exceptions.SecretParametersError(secret, cls, msg) from err

    client = cls.get_client(secret, vault_name)

    try:
        if secret_kv_version == HashicorpKVVersionChoices.KV_VERSION_1:
            response = client.secrets.kv.v1.read_secret(path=secret_path, mount_point=secret_mount_point)
        else:
            response = client.secrets.kv.v2.read_secret(path=secret_path, mount_point=secret_mount_point)
    except hvac.exceptions.InvalidPath as err:
        raise exceptions.SecretValueNotFoundError(secret, cls, str(err)) from err

    # Retrieve the value using the key or complain loudly.
    try:
        if secret_kv_version == HashicorpKVVersionChoices.KV_VERSION_1:
            return response["data"][secret_key]
        return response["data"]["data"][secret_key]
    except KeyError as err:
        msg = f"The secret value could not be retrieved using key {err}"
        raise exceptions.SecretValueNotFoundError(secret, cls, msg) from err

retrieve_vault_settings(name=None) staticmethod

Retrieve the configuration from settings that matches the provided vault name.

Parameters:

Name Type Description Default
name str

Vault name to retrieve from settings. Defaults to None.

None

Returns:

Name Type Description
vault_settings dict

Hashicorp Vault Settings

Source code in nautobot_secrets_providers/providers/hashicorp.py
@staticmethod
def retrieve_vault_settings(name=None):
    """Retrieve the configuration from settings that matches the provided vault name.

    Args:
        name (str, optional): Vault name to retrieve from settings. Defaults to None.

    Returns:
        vault_settings (dict): Hashicorp Vault Settings
    """
    vault_settings = settings.PLUGINS_CONFIG["nautobot_secrets_providers"].get("hashicorp_vault", {})
    if name and "vaults" in vault_settings:
        return vault_settings["vaults"][name]
    return vault_settings

validate_vault_settings(secret=None, vault_name=None) classmethod

Validate the vault settings.

Source code in nautobot_secrets_providers/providers/hashicorp.py
@classmethod
def validate_vault_settings(cls, secret=None, vault_name=None):
    """Validate the vault settings."""
    try:
        vault_settings = cls.retrieve_vault_settings(vault_name)
    except KeyError as err:
        raise exceptions.SecretProviderError(
            secret, cls, f"HashiCorp Vault {vault_name} is not configured!"
        ) from err
    if not vault_settings:
        raise exceptions.SecretProviderError(secret, cls, f"HashiCorp Vault {vault_name} is not configured!")

    auth_method = vault_settings.get("auth_method", "token")
    kv_version = vault_settings.get("kv_version", HashicorpKVVersionChoices.KV_VERSION_2)

    if "url" not in vault_settings:
        raise exceptions.SecretProviderError(secret, cls, "HashiCorp Vault configuration is missing a url")

    if auth_method not in AUTH_METHOD_CHOICES:
        raise exceptions.SecretProviderError(secret, cls, f"HashiCorp Vault Auth Method {auth_method} is invalid!")

    if kv_version not in HashicorpKVVersionChoices.as_dict():
        raise exceptions.SecretProviderError(secret, cls, f"HashiCorp Vault KV version {kv_version} is invalid!")

    if auth_method == "aws" and not boto3:
        raise exceptions.SecretProviderError(
            secret, cls, "HashiCorp Vault AWS Authentication Method requires the boto3 library!"
        )
    if auth_method == "token" and "token" not in vault_settings:
        raise exceptions.SecretProviderError(
            secret, cls, "HashiCorp Vault configuration is missing a token for token authentication!"
        )
    if auth_method == "kubernetes" and "role_name" not in vault_settings:
        raise exceptions.SecretProviderError(
            secret, cls, "HashiCorp Vault configuration is missing a role name for kubernetes authentication!"
        )
    if auth_method == "approle" and ("role_id" not in vault_settings or "secret_id" not in vault_settings):
        raise exceptions.SecretProviderError(
            secret, cls, "HashiCorp Vault configuration is missing a role_id and/or secret_id!"
        )

    return vault_settings

OnePasswordSecretsProvider

Bases: SecretsProvider

A secrets provider for 1Password.

Source code in nautobot_secrets_providers/providers/one_password.py
class OnePasswordSecretsProvider(SecretsProvider):
    """A secrets provider for 1Password."""

    slug = "one-password"
    name = "1Password Vault"
    is_available = Client is not None

    # TBD: Remove after pylint-nautobot bump
    # pylint: disable-next=nb-incorrect-base-class
    class ParametersForm(BootstrapMixin, forms.Form):
        """Required parameters for HashiCorp Vault."""

        vault = forms.ChoiceField(
            required=True,
            choices=vault_choices,
            help_text="1Password Vault to retrieve the secret from.",
        )
        item = forms.CharField(
            required=True,
            help_text="The item in 1Password.",
        )
        section = forms.CharField(
            required=False,
            help_text="The section where the field is a part of.",
        )
        field = forms.CharField(
            required=True,
            help_text="The field where the secret is located. Defaults to 'password'.",
            initial="password",
        )

    @classmethod
    def get_token(cls, secret, vault):
        """Get the token for a vault."""
        plugin_settings = settings.PLUGINS_CONFIG["nautobot_secrets_providers"]
        if "token" in plugin_settings["one_password"]["vaults"][vault]:
            return plugin_settings["one_password"]["vaults"][vault]["token"]
        try:
            return plugin_settings["one_password"]["token"]
        except KeyError as exc:
            raise exceptions.SecretProviderError(secret, cls, "1Password token is not configured!") from exc

    @classmethod
    def get_value_for_secret(cls, secret, obj=None, **kwargs):  # pylint: disable=too-many-locals
        """Get the value for a secret from 1Password."""
        # This is only required for 1Password therefore not defined in
        # `required_settings` for the app config.
        plugin_settings = settings.PLUGINS_CONFIG["nautobot_secrets_providers"]
        if "one_password" not in plugin_settings:
            raise exceptions.SecretProviderError(secret, cls, "1Password is not configured!")

        parameters = secret.rendered_parameters(obj=obj)
        vault = parameters["vault"]

        return get_secret_from_vault(
            vault=vault,
            item=parameters["item"],
            field=parameters["field"],
            token=cls.get_token(secret, vault=vault),
            section=parameters.get("section", None),
        )

ParametersForm

Bases: BootstrapMixin, Form

Required parameters for HashiCorp Vault.

Source code in nautobot_secrets_providers/providers/one_password.py
class ParametersForm(BootstrapMixin, forms.Form):
    """Required parameters for HashiCorp Vault."""

    vault = forms.ChoiceField(
        required=True,
        choices=vault_choices,
        help_text="1Password Vault to retrieve the secret from.",
    )
    item = forms.CharField(
        required=True,
        help_text="The item in 1Password.",
    )
    section = forms.CharField(
        required=False,
        help_text="The section where the field is a part of.",
    )
    field = forms.CharField(
        required=True,
        help_text="The field where the secret is located. Defaults to 'password'.",
        initial="password",
    )

get_token(secret, vault) classmethod

Get the token for a vault.

Source code in nautobot_secrets_providers/providers/one_password.py
@classmethod
def get_token(cls, secret, vault):
    """Get the token for a vault."""
    plugin_settings = settings.PLUGINS_CONFIG["nautobot_secrets_providers"]
    if "token" in plugin_settings["one_password"]["vaults"][vault]:
        return plugin_settings["one_password"]["vaults"][vault]["token"]
    try:
        return plugin_settings["one_password"]["token"]
    except KeyError as exc:
        raise exceptions.SecretProviderError(secret, cls, "1Password token is not configured!") from exc

get_value_for_secret(secret, obj=None, **kwargs) classmethod

Get the value for a secret from 1Password.

Source code in nautobot_secrets_providers/providers/one_password.py
@classmethod
def get_value_for_secret(cls, secret, obj=None, **kwargs):  # pylint: disable=too-many-locals
    """Get the value for a secret from 1Password."""
    # This is only required for 1Password therefore not defined in
    # `required_settings` for the app config.
    plugin_settings = settings.PLUGINS_CONFIG["nautobot_secrets_providers"]
    if "one_password" not in plugin_settings:
        raise exceptions.SecretProviderError(secret, cls, "1Password is not configured!")

    parameters = secret.rendered_parameters(obj=obj)
    vault = parameters["vault"]

    return get_secret_from_vault(
        vault=vault,
        item=parameters["item"],
        field=parameters["field"],
        token=cls.get_token(secret, vault=vault),
        section=parameters.get("section", None),
    )

aws

Secrets Provider for AWS Secrets Manager and Parameter Store.

AWSSecretsManagerSecretsProvider

Bases: SecretsProvider

A secrets provider for AWS Secrets Manager.

Source code in nautobot_secrets_providers/providers/aws.py
class AWSSecretsManagerSecretsProvider(SecretsProvider):
    """A secrets provider for AWS Secrets Manager."""

    slug = "aws-secrets-manager"
    name = "AWS Secrets Manager"
    is_available = boto3 is not None

    # TBD: Remove after pylint-nautobot bump
    # pylint: disable-next=nb-incorrect-base-class
    class ParametersForm(BootstrapMixin, forms.Form):
        """Required parameters for AWS Secrets Manager."""

        name = forms.CharField(
            required=True,
            help_text="The name of the AWS Secrets Manager secret",
        )
        region = forms.CharField(
            required=True,
            help_text="The region name of the AWS Secrets Manager secret",
        )
        key = forms.CharField(
            required=True,
            help_text="The key of the AWS Secrets Manager secret",
        )

    @classmethod
    def get_value_for_secret(cls, secret, obj=None, **kwargs):
        """Return the secret value by name and region."""
        # Extract the parameters from the Secret.
        parameters = secret.rendered_parameters(obj=obj)

        secret_name = parameters.get("name")
        secret_key = parameters.get("key")
        region_name = parameters.get("region")

        # Create a Secrets Manager client.
        session = boto3.session.Session()
        client = session.client(service_name="secretsmanager", region_name=region_name)

        # This is based on sample code to only handle the specific exceptions for the 'GetSecretValue' API.
        # See https://docs.aws.amazon.com/secretsmanager/latest/apireference/API_GetSecretValue.html
        # We rethrow the exception by default.
        try:
            get_secret_value_response = client.get_secret_value(SecretId=secret_name)
        except ClientError as err:
            if err.response["Error"]["Code"] == "DecryptionFailureException":  # pylint: disable=no-else-raise
                # Secrets Manager can't decrypt the protected secret text using the provided KMS key.
                # Deal with the exception here, and/or rethrow at your discretion.
                raise exceptions.SecretProviderError(secret, cls, str(err))
            elif err.response["Error"]["Code"] == "InternalServiceErrorException":
                # An error occurred on the server side.
                # Deal with the exception here, and/or rethrow at your discretion.
                raise exceptions.SecretProviderError(secret, cls, str(err))
            elif err.response["Error"]["Code"] == "InvalidParameterException":
                # You provided an invalid value for a parameter.
                # Deal with the exception here, and/or rethrow at your discretion.
                raise exceptions.SecretParametersError(secret, cls, str(err))
            elif err.response["Error"]["Code"] == "InvalidRequestException":
                # You provided a parameter value that is not valid for the current state of the resource.
                # Deal with the exception here, and/or rethrow at your discretion.
                raise exceptions.SecretProviderError(secret, cls, str(err))
            elif err.response["Error"]["Code"] == "ResourceNotFoundException":
                # We can't find the resource that you asked for.
                # Deal with the exception here, and/or rethrow at your discretion.
                raise exceptions.SecretValueNotFoundError(secret, cls, str(err))
            else:
                # We got an error that isn't defined above
                raise exceptions.SecretProviderError(secret, cls, str(err))
        else:
            # Decrypts secret using the associated KMS CMK.
            # Depending on whether the secret is a string or binary, one of these fields will be populated.
            if "SecretString" in get_secret_value_response:
                secret_value = get_secret_value_response["SecretString"]
            else:
                # TODO(jathan): Do we care about this? Let's figure out what to do about a binary value?
                secret_value = base64.b64decode(get_secret_value_response["SecretBinary"])  # noqa

        # If we get this far it should be valid JSON.
        data = json.loads(secret_value)

        # Retrieve the value using the key or complain loudly.
        try:
            return data[secret_key]
        except KeyError as err:
            msg = f"The secret value could not be retrieved using key {err}"
            raise exceptions.SecretValueNotFoundError(secret, cls, msg) from err
ParametersForm

Bases: BootstrapMixin, Form

Required parameters for AWS Secrets Manager.

Source code in nautobot_secrets_providers/providers/aws.py
class ParametersForm(BootstrapMixin, forms.Form):
    """Required parameters for AWS Secrets Manager."""

    name = forms.CharField(
        required=True,
        help_text="The name of the AWS Secrets Manager secret",
    )
    region = forms.CharField(
        required=True,
        help_text="The region name of the AWS Secrets Manager secret",
    )
    key = forms.CharField(
        required=True,
        help_text="The key of the AWS Secrets Manager secret",
    )
get_value_for_secret(secret, obj=None, **kwargs) classmethod

Return the secret value by name and region.

Source code in nautobot_secrets_providers/providers/aws.py
@classmethod
def get_value_for_secret(cls, secret, obj=None, **kwargs):
    """Return the secret value by name and region."""
    # Extract the parameters from the Secret.
    parameters = secret.rendered_parameters(obj=obj)

    secret_name = parameters.get("name")
    secret_key = parameters.get("key")
    region_name = parameters.get("region")

    # Create a Secrets Manager client.
    session = boto3.session.Session()
    client = session.client(service_name="secretsmanager", region_name=region_name)

    # This is based on sample code to only handle the specific exceptions for the 'GetSecretValue' API.
    # See https://docs.aws.amazon.com/secretsmanager/latest/apireference/API_GetSecretValue.html
    # We rethrow the exception by default.
    try:
        get_secret_value_response = client.get_secret_value(SecretId=secret_name)
    except ClientError as err:
        if err.response["Error"]["Code"] == "DecryptionFailureException":  # pylint: disable=no-else-raise
            # Secrets Manager can't decrypt the protected secret text using the provided KMS key.
            # Deal with the exception here, and/or rethrow at your discretion.
            raise exceptions.SecretProviderError(secret, cls, str(err))
        elif err.response["Error"]["Code"] == "InternalServiceErrorException":
            # An error occurred on the server side.
            # Deal with the exception here, and/or rethrow at your discretion.
            raise exceptions.SecretProviderError(secret, cls, str(err))
        elif err.response["Error"]["Code"] == "InvalidParameterException":
            # You provided an invalid value for a parameter.
            # Deal with the exception here, and/or rethrow at your discretion.
            raise exceptions.SecretParametersError(secret, cls, str(err))
        elif err.response["Error"]["Code"] == "InvalidRequestException":
            # You provided a parameter value that is not valid for the current state of the resource.
            # Deal with the exception here, and/or rethrow at your discretion.
            raise exceptions.SecretProviderError(secret, cls, str(err))
        elif err.response["Error"]["Code"] == "ResourceNotFoundException":
            # We can't find the resource that you asked for.
            # Deal with the exception here, and/or rethrow at your discretion.
            raise exceptions.SecretValueNotFoundError(secret, cls, str(err))
        else:
            # We got an error that isn't defined above
            raise exceptions.SecretProviderError(secret, cls, str(err))
    else:
        # Decrypts secret using the associated KMS CMK.
        # Depending on whether the secret is a string or binary, one of these fields will be populated.
        if "SecretString" in get_secret_value_response:
            secret_value = get_secret_value_response["SecretString"]
        else:
            # TODO(jathan): Do we care about this? Let's figure out what to do about a binary value?
            secret_value = base64.b64decode(get_secret_value_response["SecretBinary"])  # noqa

    # If we get this far it should be valid JSON.
    data = json.loads(secret_value)

    # Retrieve the value using the key or complain loudly.
    try:
        return data[secret_key]
    except KeyError as err:
        msg = f"The secret value could not be retrieved using key {err}"
        raise exceptions.SecretValueNotFoundError(secret, cls, msg) from err

AWSSystemsManagerParameterStore

Bases: SecretsProvider

A secrets provider for AWS Systems Manager Parameter Store.

Documentation: https://docs.aws.amazon.com/systems-manager/latest/userguide/systems-manager-parameter-store.html

Source code in nautobot_secrets_providers/providers/aws.py
class AWSSystemsManagerParameterStore(SecretsProvider):
    """
    A secrets provider for AWS Systems Manager Parameter Store.

    Documentation: https://docs.aws.amazon.com/systems-manager/latest/userguide/systems-manager-parameter-store.html
    """

    slug = "aws-sm-parameter-store"
    name = "AWS Systems Manager Parameter Store"
    is_available = boto3 is not None

    # TBD: Remove after pylint-nautobot bump
    # pylint: disable-next=nb-incorrect-base-class
    class ParametersForm(BootstrapMixin, forms.Form):
        """Required parameters for AWS Parameter Store."""

        name = forms.CharField(
            required=True,
            help_text="The name of the AWS Parameter Store secret",
        )
        region = forms.CharField(
            required=True,
            help_text="The region name of the AWS Parameter Store secret",
        )
        key = forms.CharField(
            required=True,
            help_text="The key name to retrieve from AWS Parameter Store",
        )

    @classmethod
    def get_value_for_secret(cls, secret, obj=None, **kwargs):
        """Return the parameter value by name and region."""
        # Extract the parameters from the Nautobot secret.
        parameters = secret.rendered_parameters(obj=obj)

        # Create a SSM client.
        session = boto3.session.Session()
        client = session.client(service_name="ssm", region_name=parameters.get("region"))
        try:
            get_secret_value_response = client.get_parameter(Name=parameters.get("name"), WithDecryption=True)
        except ClientError as err:
            if err.response["Error"]["Code"] == "ParameterNotFound":
                raise exceptions.SecretParametersError(secret, cls, str(err))

            if err.response["Error"]["Code"] == "ParameterVersionNotFound":
                raise exceptions.SecretValueNotFoundError(secret, cls, str(err))

            raise exceptions.SecretProviderError(secret, cls, str(err))

        try:
            # Fetch the Value field from the parameter which must be a json field.
            data = json.loads(get_secret_value_response["Parameter"]["Value"])
        except ValueError as err:
            msg = "InvalidJson"
            raise exceptions.SecretValueNotFoundError(secret, cls, msg) from err

        try:
            # Return the value of the secret key configured in the nautobot secret.
            return data[parameters.get("key")]
        except KeyError as err:
            msg = f"InvalidKeyName {err}"
            raise exceptions.SecretParametersError(secret, cls, msg) from err
ParametersForm

Bases: BootstrapMixin, Form

Required parameters for AWS Parameter Store.

Source code in nautobot_secrets_providers/providers/aws.py
class ParametersForm(BootstrapMixin, forms.Form):
    """Required parameters for AWS Parameter Store."""

    name = forms.CharField(
        required=True,
        help_text="The name of the AWS Parameter Store secret",
    )
    region = forms.CharField(
        required=True,
        help_text="The region name of the AWS Parameter Store secret",
    )
    key = forms.CharField(
        required=True,
        help_text="The key name to retrieve from AWS Parameter Store",
    )
get_value_for_secret(secret, obj=None, **kwargs) classmethod

Return the parameter value by name and region.

Source code in nautobot_secrets_providers/providers/aws.py
@classmethod
def get_value_for_secret(cls, secret, obj=None, **kwargs):
    """Return the parameter value by name and region."""
    # Extract the parameters from the Nautobot secret.
    parameters = secret.rendered_parameters(obj=obj)

    # Create a SSM client.
    session = boto3.session.Session()
    client = session.client(service_name="ssm", region_name=parameters.get("region"))
    try:
        get_secret_value_response = client.get_parameter(Name=parameters.get("name"), WithDecryption=True)
    except ClientError as err:
        if err.response["Error"]["Code"] == "ParameterNotFound":
            raise exceptions.SecretParametersError(secret, cls, str(err))

        if err.response["Error"]["Code"] == "ParameterVersionNotFound":
            raise exceptions.SecretValueNotFoundError(secret, cls, str(err))

        raise exceptions.SecretProviderError(secret, cls, str(err))

    try:
        # Fetch the Value field from the parameter which must be a json field.
        data = json.loads(get_secret_value_response["Parameter"]["Value"])
    except ValueError as err:
        msg = "InvalidJson"
        raise exceptions.SecretValueNotFoundError(secret, cls, msg) from err

    try:
        # Return the value of the secret key configured in the nautobot secret.
        return data[parameters.get("key")]
    except KeyError as err:
        msg = f"InvalidKeyName {err}"
        raise exceptions.SecretParametersError(secret, cls, msg) from err

azure

Secrets Provider for Azure Key Vault.

AzureKeyVaultSecretsProvider

Bases: SecretsProvider

A secrets provider for Azure Key Vault.

Source code in nautobot_secrets_providers/providers/azure.py
class AzureKeyVaultSecretsProvider(SecretsProvider):
    """A secrets provider for Azure Key Vault."""

    slug = "azure-key-vault"
    name = "Azure Key Vault"
    is_available = azure_available

    # pylint: disable-next=nb-incorrect-base-class
    class ParametersForm(BootstrapMixin, forms.Form):
        """Required parameters for Azure Key Vault."""

        vault_url = forms.CharField(
            required=True,
            help_text="The URL of the Azure Key Vault",
        )
        secret_name = forms.CharField(
            required=True,
            help_text="The name of the secret in the Azure Key Vault",
        )

    @classmethod
    def get_value_for_secret(cls, secret, obj=None, **kwargs):
        """Return the secret value by name from Azure Key Vault."""
        # Extract the parameters from the Secret.
        parameters = secret.rendered_parameters(obj=obj)
        vault_url = parameters.get("vault_url")
        secret_name = parameters.get("secret_name")

        # Authenticate with Azure Key Vault using default credentials.
        # This assumes that environment variables for Azure authentication are set.
        credential = DefaultAzureCredential()
        client = SecretClient(vault_url=vault_url, credential=credential)

        try:
            # Retrieve the secret from Azure Key Vault.
            response = client.get_secret(secret_name)
        except Exception as err:
            # Handle exceptions from the Azure SDK.
            raise exceptions.SecretProviderError(secret, cls, str(err))

        # The value is in the 'value' attribute of the response.
        secret_value = response.value

        # Return the secret value.
        return secret_value
ParametersForm

Bases: BootstrapMixin, Form

Required parameters for Azure Key Vault.

Source code in nautobot_secrets_providers/providers/azure.py
class ParametersForm(BootstrapMixin, forms.Form):
    """Required parameters for Azure Key Vault."""

    vault_url = forms.CharField(
        required=True,
        help_text="The URL of the Azure Key Vault",
    )
    secret_name = forms.CharField(
        required=True,
        help_text="The name of the secret in the Azure Key Vault",
    )
get_value_for_secret(secret, obj=None, **kwargs) classmethod

Return the secret value by name from Azure Key Vault.

Source code in nautobot_secrets_providers/providers/azure.py
@classmethod
def get_value_for_secret(cls, secret, obj=None, **kwargs):
    """Return the secret value by name from Azure Key Vault."""
    # Extract the parameters from the Secret.
    parameters = secret.rendered_parameters(obj=obj)
    vault_url = parameters.get("vault_url")
    secret_name = parameters.get("secret_name")

    # Authenticate with Azure Key Vault using default credentials.
    # This assumes that environment variables for Azure authentication are set.
    credential = DefaultAzureCredential()
    client = SecretClient(vault_url=vault_url, credential=credential)

    try:
        # Retrieve the secret from Azure Key Vault.
        response = client.get_secret(secret_name)
    except Exception as err:
        # Handle exceptions from the Azure SDK.
        raise exceptions.SecretProviderError(secret, cls, str(err))

    # The value is in the 'value' attribute of the response.
    secret_value = response.value

    # Return the secret value.
    return secret_value

choices

Choices for Secrets Providers App.

DelineaSecretChoices

Bases: ChoiceSet

Choices for Delinea Secret Server Result.

Source code in nautobot_secrets_providers/providers/choices.py
class DelineaSecretChoices(ChoiceSet):
    """Choices for Delinea Secret Server Result."""

    SECRET_TOKEN = "token"  # noqa: S105
    SECRET_PASSWORD = "password"  # noqa: S105
    SECRET_USERNAME = "username"  # noqa: S105
    SECRET_URL = "url"  # noqa: S105
    SECRET_NOTES = "notes"  # noqa: S105

    CHOICES = (
        (SECRET_TOKEN, "Token"),
        (SECRET_PASSWORD, "Password"),
        (SECRET_USERNAME, "Username"),
        (SECRET_URL, "URL"),
        (SECRET_NOTES, "Notes"),
    )

HashicorpKVVersionChoices

Bases: ChoiceSet

Choices for Hashicorp KV Version.

Source code in nautobot_secrets_providers/providers/choices.py
class HashicorpKVVersionChoices(ChoiceSet):
    """Choices for Hashicorp KV Version."""

    KV_VERSION_1 = "v1"
    KV_VERSION_2 = "v2"

    CHOICES = (
        (KV_VERSION_1, "V1"),
        (KV_VERSION_2, "V2"),
    )

delinea

Secrets Provider for Delinea Secret Server.

DelineaSecretServerSecretsProviderBase

Bases: SecretsProvider

A secrets provider for Delinea Secret Server.

Source code in nautobot_secrets_providers/providers/delinea.py
class DelineaSecretServerSecretsProviderBase(SecretsProvider):
    """A secrets provider for Delinea Secret Server."""

    is_available = delinea_installed

    @classmethod
    def get_value_for_secret(cls, secret, obj=None, **kwargs):  # pylint: disable=too-many-locals
        """Return the value stored under the secret's key in the secret's path."""
        # This is only required for Delinea Secret Server therefore not defined in
        # `required_settings` for the app config.
        plugin_settings = settings.PLUGINS_CONFIG["nautobot_secrets_providers"]
        if "delinea" not in plugin_settings:
            raise exceptions.SecretProviderError(secret, cls, "Delinea Secret Server is not configured!")

        # Try to get parameters and error out early.
        parameters = secret.rendered_parameters(obj=obj)
        try:
            if "secret_id" in parameters:
                secret_id = parameters["secret_id"]
            else:
                secret_id = None
            if "secret_path" in parameters:
                secret_path = parameters["secret_path"]
            else:
                secret_path = None
            secret_selected_value = parameters["secret_selected_value"]
        except KeyError as err:
            msg = f"The secret parameter could not be retrieved for field {err}"
            raise exceptions.SecretParametersError(secret, cls, msg) from err

        if secret_id is None and secret_path is None:
            msg = "The secret parameter could not be retrieved for field!"
            raise exceptions.SecretParametersError(secret, cls, msg)

        delinea_settings = plugin_settings["delinea"]
        if delinea_settings["base_url"] is None:
            raise exceptions.SecretProviderError(secret, cls, "Delinea Secret Server is not configured!")

        return cls.query_delinea_secret_server(
            secret=secret,
            base_url=delinea_settings["base_url"],
            ca_bundle_path=delinea_settings["ca_bundle_path"],
            cloud_based=delinea_settings["cloud_based"],
            domain=delinea_settings["domain"],
            password=delinea_settings["password"],
            secret_id=secret_id,
            secret_path=secret_path,
            secret_selected_value=secret_selected_value,
            tenant=delinea_settings["tenant"],
            token=delinea_settings["token"],
            username=delinea_settings["username"],
            caller_class=cls,
        )

    @staticmethod
    def query_delinea_secret_server(  # pylint: disable=too-many-boolean-expressions,too-many-locals,too-many-branches,too-many-arguments
        secret,
        base_url,
        ca_bundle_path=None,
        cloud_based=None,
        domain=None,
        password=None,
        secret_id=None,
        secret_path=None,
        secret_selected_value=None,
        tenant=None,
        token=None,
        username=None,
        caller_class=None,
    ):
        """Query Delinea Secret Server."""
        # Ensure required parameters are set
        if any(
            [token is None and not all([username, password]), cloud_based and not all([tenant, username, password])]
        ):
            raise exceptions.SecretProviderError(
                secret,
                caller_class,
                """Delinea Secret Server is not configured!
                See section 'Delinea Secret Server' in `README.md'.
                """,
            )

        must_restore_env = False
        original_env = os.getenv("REQUESTS_CA_BUNDLE", "")
        try:
            if ca_bundle_path is not None:
                # Ensure cerificates file exists if ca_bundle_path is defined
                if not Path(ca_bundle_path).exists():
                    raise exceptions.SecretProviderError(
                        secret,
                        caller_class,
                        (
                            "Delinea Secret Server is not configured properly! "
                            "Trusted certificates file not found: "
                            "Environment variable 'REQUESTS_CA_BUNDLE': "
                            f"{ca_bundle_path}."
                        ),
                    )
                if original_env != ca_bundle_path:
                    os.environ["REQUESTS_CA_BUNDLE"] = ca_bundle_path
                    must_restore_env = True
            # Setup Delinea authorizer
            # Username | Password | Token | Domain | Authorizer
            #   def    |   def    |   *   |   -    | PasswordGrantAuthorizer
            #   def    |   def    |   *   |  def   | DomainPasswordGrantAuthorizer
            #    *     |    *     |  def  |   -    | AccessTokenAuthorizer
            if all([username, password]):
                if domain is not None:
                    delinea_authorizer = DomainPasswordGrantAuthorizer(
                        base_url=base_url,
                        domain=domain,
                        username=username,
                        password=password,
                    )
                else:
                    delinea_authorizer = PasswordGrantAuthorizer(
                        base_url=base_url,
                        username=username,
                        password=password,
                    )
            else:
                delinea_authorizer = AccessTokenAuthorizer(token)

            # Get the client.
            if cloud_based:
                delinea = SecretServerCloud(tenant=tenant, authorizer=delinea_authorizer)
            else:
                delinea = SecretServer(base_url=base_url, authorizer=delinea_authorizer)

            # Attempt to retrieve the secret.
            try:
                if secret_id is not None:
                    secret = ServerSecret(**delinea.get_secret(secret_id))
                else:
                    secret = ServerSecret(**delinea.get_secret_by_path(secret_path))
            except SecretServerError as err:
                raise exceptions.SecretValueNotFoundError(secret, caller_class, str(err)) from err

            # Attempt to return the selected value.
            try:
                return secret.fields[secret_selected_value].value
            except KeyError as err:
                msg = f"The secret value could not be retrieved using key {err}"
                raise exceptions.SecretValueNotFoundError(secret, caller_class, msg) from err
        finally:
            if must_restore_env:
                os.environ["REQUESTS_CA_BUNDLE"] = original_env
get_value_for_secret(secret, obj=None, **kwargs) classmethod

Return the value stored under the secret's key in the secret's path.

Source code in nautobot_secrets_providers/providers/delinea.py
@classmethod
def get_value_for_secret(cls, secret, obj=None, **kwargs):  # pylint: disable=too-many-locals
    """Return the value stored under the secret's key in the secret's path."""
    # This is only required for Delinea Secret Server therefore not defined in
    # `required_settings` for the app config.
    plugin_settings = settings.PLUGINS_CONFIG["nautobot_secrets_providers"]
    if "delinea" not in plugin_settings:
        raise exceptions.SecretProviderError(secret, cls, "Delinea Secret Server is not configured!")

    # Try to get parameters and error out early.
    parameters = secret.rendered_parameters(obj=obj)
    try:
        if "secret_id" in parameters:
            secret_id = parameters["secret_id"]
        else:
            secret_id = None
        if "secret_path" in parameters:
            secret_path = parameters["secret_path"]
        else:
            secret_path = None
        secret_selected_value = parameters["secret_selected_value"]
    except KeyError as err:
        msg = f"The secret parameter could not be retrieved for field {err}"
        raise exceptions.SecretParametersError(secret, cls, msg) from err

    if secret_id is None and secret_path is None:
        msg = "The secret parameter could not be retrieved for field!"
        raise exceptions.SecretParametersError(secret, cls, msg)

    delinea_settings = plugin_settings["delinea"]
    if delinea_settings["base_url"] is None:
        raise exceptions.SecretProviderError(secret, cls, "Delinea Secret Server is not configured!")

    return cls.query_delinea_secret_server(
        secret=secret,
        base_url=delinea_settings["base_url"],
        ca_bundle_path=delinea_settings["ca_bundle_path"],
        cloud_based=delinea_settings["cloud_based"],
        domain=delinea_settings["domain"],
        password=delinea_settings["password"],
        secret_id=secret_id,
        secret_path=secret_path,
        secret_selected_value=secret_selected_value,
        tenant=delinea_settings["tenant"],
        token=delinea_settings["token"],
        username=delinea_settings["username"],
        caller_class=cls,
    )
query_delinea_secret_server(secret, base_url, ca_bundle_path=None, cloud_based=None, domain=None, password=None, secret_id=None, secret_path=None, secret_selected_value=None, tenant=None, token=None, username=None, caller_class=None) staticmethod

Query Delinea Secret Server.

Source code in nautobot_secrets_providers/providers/delinea.py
@staticmethod
def query_delinea_secret_server(  # pylint: disable=too-many-boolean-expressions,too-many-locals,too-many-branches,too-many-arguments
    secret,
    base_url,
    ca_bundle_path=None,
    cloud_based=None,
    domain=None,
    password=None,
    secret_id=None,
    secret_path=None,
    secret_selected_value=None,
    tenant=None,
    token=None,
    username=None,
    caller_class=None,
):
    """Query Delinea Secret Server."""
    # Ensure required parameters are set
    if any(
        [token is None and not all([username, password]), cloud_based and not all([tenant, username, password])]
    ):
        raise exceptions.SecretProviderError(
            secret,
            caller_class,
            """Delinea Secret Server is not configured!
            See section 'Delinea Secret Server' in `README.md'.
            """,
        )

    must_restore_env = False
    original_env = os.getenv("REQUESTS_CA_BUNDLE", "")
    try:
        if ca_bundle_path is not None:
            # Ensure cerificates file exists if ca_bundle_path is defined
            if not Path(ca_bundle_path).exists():
                raise exceptions.SecretProviderError(
                    secret,
                    caller_class,
                    (
                        "Delinea Secret Server is not configured properly! "
                        "Trusted certificates file not found: "
                        "Environment variable 'REQUESTS_CA_BUNDLE': "
                        f"{ca_bundle_path}."
                    ),
                )
            if original_env != ca_bundle_path:
                os.environ["REQUESTS_CA_BUNDLE"] = ca_bundle_path
                must_restore_env = True
        # Setup Delinea authorizer
        # Username | Password | Token | Domain | Authorizer
        #   def    |   def    |   *   |   -    | PasswordGrantAuthorizer
        #   def    |   def    |   *   |  def   | DomainPasswordGrantAuthorizer
        #    *     |    *     |  def  |   -    | AccessTokenAuthorizer
        if all([username, password]):
            if domain is not None:
                delinea_authorizer = DomainPasswordGrantAuthorizer(
                    base_url=base_url,
                    domain=domain,
                    username=username,
                    password=password,
                )
            else:
                delinea_authorizer = PasswordGrantAuthorizer(
                    base_url=base_url,
                    username=username,
                    password=password,
                )
        else:
            delinea_authorizer = AccessTokenAuthorizer(token)

        # Get the client.
        if cloud_based:
            delinea = SecretServerCloud(tenant=tenant, authorizer=delinea_authorizer)
        else:
            delinea = SecretServer(base_url=base_url, authorizer=delinea_authorizer)

        # Attempt to retrieve the secret.
        try:
            if secret_id is not None:
                secret = ServerSecret(**delinea.get_secret(secret_id))
            else:
                secret = ServerSecret(**delinea.get_secret_by_path(secret_path))
        except SecretServerError as err:
            raise exceptions.SecretValueNotFoundError(secret, caller_class, str(err)) from err

        # Attempt to return the selected value.
        try:
            return secret.fields[secret_selected_value].value
        except KeyError as err:
            msg = f"The secret value could not be retrieved using key {err}"
            raise exceptions.SecretValueNotFoundError(secret, caller_class, msg) from err
    finally:
        if must_restore_env:
            os.environ["REQUESTS_CA_BUNDLE"] = original_env

DelineaSecretServerSecretsProviderId

Bases: DelineaSecretServerSecretsProviderBase

A secrets provider for Delinea Secret Server.

Source code in nautobot_secrets_providers/providers/delinea.py
class DelineaSecretServerSecretsProviderId(DelineaSecretServerSecretsProviderBase):
    """A secrets provider for Delinea Secret Server."""

    slug = "delinea-tss-id"  # type: ignore
    name = "Delinea Secret Server by ID"  # type: ignore

    # TBD: Remove after pylint-nautobot bump
    # pylint: disable-next=nb-incorrect-base-class
    class ParametersForm(BootstrapMixin, forms.Form):
        """Required parameters for Delinea Secret Server."""

        secret_id = forms.IntegerField(
            label="Secret ID",
            required=True,
            min_value=1,
            help_text="The secret-id used to select the entry in Delinea Secret Server.",
        )
        secret_selected_value = forms.ChoiceField(
            label="Return value",
            required=True,
            choices=DelineaSecretChoices,
            help_text="Select which value to return.",
        )
ParametersForm

Bases: BootstrapMixin, Form

Required parameters for Delinea Secret Server.

Source code in nautobot_secrets_providers/providers/delinea.py
class ParametersForm(BootstrapMixin, forms.Form):
    """Required parameters for Delinea Secret Server."""

    secret_id = forms.IntegerField(
        label="Secret ID",
        required=True,
        min_value=1,
        help_text="The secret-id used to select the entry in Delinea Secret Server.",
    )
    secret_selected_value = forms.ChoiceField(
        label="Return value",
        required=True,
        choices=DelineaSecretChoices,
        help_text="Select which value to return.",
    )

DelineaSecretServerSecretsProviderPath

Bases: DelineaSecretServerSecretsProviderBase

A secrets provider for Delinea Secret Server.

Source code in nautobot_secrets_providers/providers/delinea.py
class DelineaSecretServerSecretsProviderPath(DelineaSecretServerSecretsProviderBase):
    """A secrets provider for Delinea Secret Server."""

    slug = "delinea-tss-path"  # type: ignore
    name = "Delinea Secret Server by Path"  # type: ignore

    # TBD: Remove after pylint-nautobot bump
    # pylint: disable-next=nb-incorrect-base-class
    class ParametersForm(BootstrapMixin, forms.Form):
        """Required parameters for Delinea Secret Server."""

        secret_path = forms.CharField(
            required=True,
            max_length=300,
            min_length=3,
            help_text=r"Enter the secret's path (e.g. \FolderPath\Secret Name).",
        )
        secret_selected_value = forms.ChoiceField(
            label="Return value",
            required=True,
            choices=DelineaSecretChoices,
            help_text="Select which value to return.",
        )
ParametersForm

Bases: BootstrapMixin, Form

Required parameters for Delinea Secret Server.

Source code in nautobot_secrets_providers/providers/delinea.py
class ParametersForm(BootstrapMixin, forms.Form):
    """Required parameters for Delinea Secret Server."""

    secret_path = forms.CharField(
        required=True,
        max_length=300,
        min_length=3,
        help_text=r"Enter the secret's path (e.g. \FolderPath\Secret Name).",
    )
    secret_selected_value = forms.ChoiceField(
        label="Return value",
        required=True,
        choices=DelineaSecretChoices,
        help_text="Select which value to return.",
    )

hashicorp

Secrets Provider for HashiCorp Vault.

HashiCorpVaultSecretsProvider

Bases: SecretsProvider

A secrets provider for HashiCorp Vault.

Source code in nautobot_secrets_providers/providers/hashicorp.py
class HashiCorpVaultSecretsProvider(SecretsProvider):
    """A secrets provider for HashiCorp Vault."""

    slug = "hashicorp-vault"
    name = "HashiCorp Vault"
    is_available = hvac is not None

    # TBD: Remove after pylint-nautobot bump
    # pylint: disable-next=nb-incorrect-base-class
    class ParametersForm(BootstrapMixin, forms.Form):
        """Required parameters for HashiCorp Vault."""

        path = forms.CharField(
            required=True,
            help_text="The path to the HashiCorp Vault secret",
        )
        key = forms.CharField(
            required=True,
            help_text="The key of the HashiCorp Vault secret",
        )
        vault = forms.ChoiceField(
            required=False,  # This should be required, but would be a breaking change
            choices=vault_choices,
            help_text="HashiCorp Vault to retrieve the secret from.",
        )
        mount_point = forms.CharField(
            required=False,
            help_text="Override Vault Setting: The path where the secret engine was mounted on.",
            label="Mount Point (override)",
        )
        kv_version = forms.ChoiceField(
            required=False,
            choices=add_blank_choice(HashicorpKVVersionChoices),
            help_text="Override Vault Setting: The version of the kv engine (either v1 or v2).",
            label="KV Version (override)",
        )

    @staticmethod
    def retrieve_vault_settings(name=None):
        """Retrieve the configuration from settings that matches the provided vault name.

        Args:
            name (str, optional): Vault name to retrieve from settings. Defaults to None.

        Returns:
            vault_settings (dict): Hashicorp Vault Settings
        """
        vault_settings = settings.PLUGINS_CONFIG["nautobot_secrets_providers"].get("hashicorp_vault", {})
        if name and "vaults" in vault_settings:
            return vault_settings["vaults"][name]
        return vault_settings

    @classmethod
    def validate_vault_settings(cls, secret=None, vault_name=None):
        """Validate the vault settings."""
        try:
            vault_settings = cls.retrieve_vault_settings(vault_name)
        except KeyError as err:
            raise exceptions.SecretProviderError(
                secret, cls, f"HashiCorp Vault {vault_name} is not configured!"
            ) from err
        if not vault_settings:
            raise exceptions.SecretProviderError(secret, cls, f"HashiCorp Vault {vault_name} is not configured!")

        auth_method = vault_settings.get("auth_method", "token")
        kv_version = vault_settings.get("kv_version", HashicorpKVVersionChoices.KV_VERSION_2)

        if "url" not in vault_settings:
            raise exceptions.SecretProviderError(secret, cls, "HashiCorp Vault configuration is missing a url")

        if auth_method not in AUTH_METHOD_CHOICES:
            raise exceptions.SecretProviderError(secret, cls, f"HashiCorp Vault Auth Method {auth_method} is invalid!")

        if kv_version not in HashicorpKVVersionChoices.as_dict():
            raise exceptions.SecretProviderError(secret, cls, f"HashiCorp Vault KV version {kv_version} is invalid!")

        if auth_method == "aws" and not boto3:
            raise exceptions.SecretProviderError(
                secret, cls, "HashiCorp Vault AWS Authentication Method requires the boto3 library!"
            )
        if auth_method == "token" and "token" not in vault_settings:
            raise exceptions.SecretProviderError(
                secret, cls, "HashiCorp Vault configuration is missing a token for token authentication!"
            )
        if auth_method == "kubernetes" and "role_name" not in vault_settings:
            raise exceptions.SecretProviderError(
                secret, cls, "HashiCorp Vault configuration is missing a role name for kubernetes authentication!"
            )
        if auth_method == "approle" and ("role_id" not in vault_settings or "secret_id" not in vault_settings):
            raise exceptions.SecretProviderError(
                secret, cls, "HashiCorp Vault configuration is missing a role_id and/or secret_id!"
            )

        return vault_settings

    @classmethod
    def get_client(cls, secret=None, vault_name=None):  # pylint: disable-msg=too-many-locals
        """Authenticate and return a hashicorp client."""
        vault_settings = cls.validate_vault_settings(secret, vault_name)
        auth_method = vault_settings.get("auth_method", "token")

        k8s_token_path = vault_settings.get("k8s_token_path", K8S_TOKEN_DEFAULT_PATH)
        login_kwargs = vault_settings.get("login_kwargs", {})

        # According to the docs (https://hvac.readthedocs.io/en/stable/source/hvac_v1.html?highlight=verify#hvac.v1.Client.__init__)
        # the client verify parameter is either a boolean or a path to a ca certificate file to verify.  This is non-intuitive
        # so we use a parameter to specify the path to the ca_cert, if not provided we use the default of None
        ca_cert = vault_settings.get("ca_cert", None)

        namespace = vault_settings.get("namespace", None)

        # Get the client and attempt to retrieve the secret.
        try:
            if auth_method == "token":
                client = hvac.Client(
                    url=vault_settings["url"],
                    token=vault_settings["token"],
                    verify=ca_cert,
                    namespace=namespace,
                )
            else:
                client = hvac.Client(url=vault_settings["url"], verify=ca_cert, namespace=namespace)
                if auth_method == "approle":
                    client.auth.approle.login(
                        role_id=vault_settings["role_id"],
                        secret_id=vault_settings["secret_id"],
                        **login_kwargs,
                    )
                elif auth_method == "kubernetes":
                    with open(k8s_token_path, "r", encoding="utf-8") as token_file:
                        jwt = token_file.read()
                    client.auth.kubernetes.login(role=vault_settings["role_name"], jwt=jwt, **login_kwargs)
                elif auth_method == "aws":
                    session = boto3.Session()
                    aws_creds = session.get_credentials()
                    aws_region = session.region_name or "us-east-1"
                    client.auth.aws.iam_login(
                        access_key=aws_creds.access_key,
                        secret_key=aws_creds.secret_key,
                        session_token=aws_creds.token,
                        region=aws_region,
                        role=vault_settings.get("role_name", None),
                        **login_kwargs,
                    )
        except hvac.exceptions.InvalidRequest as err:
            raise exceptions.SecretProviderError(
                secret, cls, f"HashiCorp Vault Login failed (auth_method: {auth_method}). Error: {err}"
            ) from err
        except hvac.exceptions.Forbidden as err:
            raise exceptions.SecretProviderError(
                secret, cls, f"HashiCorp Vault Access Denied (auth_method: {auth_method}). Error: {err}"
            ) from err

        return client

    @classmethod
    def get_value_for_secret(cls, secret, obj=None, **kwargs):
        """Return the value stored under the secret’s key in the secret’s path."""
        # Try to get parameters and error out early.
        parameters = secret.rendered_parameters(obj=obj)
        try:
            vault_name = parameters.get("vault", "default")
            vault_settings = cls.retrieve_vault_settings(vault_name)
        except KeyError:
            vault_settings = {}
        # Get the mount_point and kv_version from the Vault configuration. These default to the
        # default Vault that HashiCorp provides.
        secret_mount_point = vault_settings.get("default_mount_point", "secret")
        secret_kv_version = vault_settings.get("kv_version", HashicorpKVVersionChoices.KV_VERSION_2)

        try:
            secret_path = parameters["path"]
            secret_key = parameters["key"]
            # If the user does choose to override the Vault settings at their own risk, we will use
            # the settings they provide. These are here to support multiple vaults (vault engines) when
            # that was not allowed by the settings. Ideally these should be deprecated and removed in
            # the future.
            secret_mount_point = parameters.get("mount_point", secret_mount_point) or secret_mount_point
            secret_kv_version = parameters.get("kv_version", secret_kv_version) or secret_kv_version
        except KeyError as err:
            msg = f"The secret parameter could not be retrieved for field {err}"
            raise exceptions.SecretParametersError(secret, cls, msg) from err

        client = cls.get_client(secret, vault_name)

        try:
            if secret_kv_version == HashicorpKVVersionChoices.KV_VERSION_1:
                response = client.secrets.kv.v1.read_secret(path=secret_path, mount_point=secret_mount_point)
            else:
                response = client.secrets.kv.v2.read_secret(path=secret_path, mount_point=secret_mount_point)
        except hvac.exceptions.InvalidPath as err:
            raise exceptions.SecretValueNotFoundError(secret, cls, str(err)) from err

        # Retrieve the value using the key or complain loudly.
        try:
            if secret_kv_version == HashicorpKVVersionChoices.KV_VERSION_1:
                return response["data"][secret_key]
            return response["data"]["data"][secret_key]
        except KeyError as err:
            msg = f"The secret value could not be retrieved using key {err}"
            raise exceptions.SecretValueNotFoundError(secret, cls, msg) from err
ParametersForm

Bases: BootstrapMixin, Form

Required parameters for HashiCorp Vault.

Source code in nautobot_secrets_providers/providers/hashicorp.py
class ParametersForm(BootstrapMixin, forms.Form):
    """Required parameters for HashiCorp Vault."""

    path = forms.CharField(
        required=True,
        help_text="The path to the HashiCorp Vault secret",
    )
    key = forms.CharField(
        required=True,
        help_text="The key of the HashiCorp Vault secret",
    )
    vault = forms.ChoiceField(
        required=False,  # This should be required, but would be a breaking change
        choices=vault_choices,
        help_text="HashiCorp Vault to retrieve the secret from.",
    )
    mount_point = forms.CharField(
        required=False,
        help_text="Override Vault Setting: The path where the secret engine was mounted on.",
        label="Mount Point (override)",
    )
    kv_version = forms.ChoiceField(
        required=False,
        choices=add_blank_choice(HashicorpKVVersionChoices),
        help_text="Override Vault Setting: The version of the kv engine (either v1 or v2).",
        label="KV Version (override)",
    )
get_client(secret=None, vault_name=None) classmethod

Authenticate and return a hashicorp client.

Source code in nautobot_secrets_providers/providers/hashicorp.py
@classmethod
def get_client(cls, secret=None, vault_name=None):  # pylint: disable-msg=too-many-locals
    """Authenticate and return a hashicorp client."""
    vault_settings = cls.validate_vault_settings(secret, vault_name)
    auth_method = vault_settings.get("auth_method", "token")

    k8s_token_path = vault_settings.get("k8s_token_path", K8S_TOKEN_DEFAULT_PATH)
    login_kwargs = vault_settings.get("login_kwargs", {})

    # According to the docs (https://hvac.readthedocs.io/en/stable/source/hvac_v1.html?highlight=verify#hvac.v1.Client.__init__)
    # the client verify parameter is either a boolean or a path to a ca certificate file to verify.  This is non-intuitive
    # so we use a parameter to specify the path to the ca_cert, if not provided we use the default of None
    ca_cert = vault_settings.get("ca_cert", None)

    namespace = vault_settings.get("namespace", None)

    # Get the client and attempt to retrieve the secret.
    try:
        if auth_method == "token":
            client = hvac.Client(
                url=vault_settings["url"],
                token=vault_settings["token"],
                verify=ca_cert,
                namespace=namespace,
            )
        else:
            client = hvac.Client(url=vault_settings["url"], verify=ca_cert, namespace=namespace)
            if auth_method == "approle":
                client.auth.approle.login(
                    role_id=vault_settings["role_id"],
                    secret_id=vault_settings["secret_id"],
                    **login_kwargs,
                )
            elif auth_method == "kubernetes":
                with open(k8s_token_path, "r", encoding="utf-8") as token_file:
                    jwt = token_file.read()
                client.auth.kubernetes.login(role=vault_settings["role_name"], jwt=jwt, **login_kwargs)
            elif auth_method == "aws":
                session = boto3.Session()
                aws_creds = session.get_credentials()
                aws_region = session.region_name or "us-east-1"
                client.auth.aws.iam_login(
                    access_key=aws_creds.access_key,
                    secret_key=aws_creds.secret_key,
                    session_token=aws_creds.token,
                    region=aws_region,
                    role=vault_settings.get("role_name", None),
                    **login_kwargs,
                )
    except hvac.exceptions.InvalidRequest as err:
        raise exceptions.SecretProviderError(
            secret, cls, f"HashiCorp Vault Login failed (auth_method: {auth_method}). Error: {err}"
        ) from err
    except hvac.exceptions.Forbidden as err:
        raise exceptions.SecretProviderError(
            secret, cls, f"HashiCorp Vault Access Denied (auth_method: {auth_method}). Error: {err}"
        ) from err

    return client
get_value_for_secret(secret, obj=None, **kwargs) classmethod

Return the value stored under the secret’s key in the secret’s path.

Source code in nautobot_secrets_providers/providers/hashicorp.py
@classmethod
def get_value_for_secret(cls, secret, obj=None, **kwargs):
    """Return the value stored under the secret’s key in the secret’s path."""
    # Try to get parameters and error out early.
    parameters = secret.rendered_parameters(obj=obj)
    try:
        vault_name = parameters.get("vault", "default")
        vault_settings = cls.retrieve_vault_settings(vault_name)
    except KeyError:
        vault_settings = {}
    # Get the mount_point and kv_version from the Vault configuration. These default to the
    # default Vault that HashiCorp provides.
    secret_mount_point = vault_settings.get("default_mount_point", "secret")
    secret_kv_version = vault_settings.get("kv_version", HashicorpKVVersionChoices.KV_VERSION_2)

    try:
        secret_path = parameters["path"]
        secret_key = parameters["key"]
        # If the user does choose to override the Vault settings at their own risk, we will use
        # the settings they provide. These are here to support multiple vaults (vault engines) when
        # that was not allowed by the settings. Ideally these should be deprecated and removed in
        # the future.
        secret_mount_point = parameters.get("mount_point", secret_mount_point) or secret_mount_point
        secret_kv_version = parameters.get("kv_version", secret_kv_version) or secret_kv_version
    except KeyError as err:
        msg = f"The secret parameter could not be retrieved for field {err}"
        raise exceptions.SecretParametersError(secret, cls, msg) from err

    client = cls.get_client(secret, vault_name)

    try:
        if secret_kv_version == HashicorpKVVersionChoices.KV_VERSION_1:
            response = client.secrets.kv.v1.read_secret(path=secret_path, mount_point=secret_mount_point)
        else:
            response = client.secrets.kv.v2.read_secret(path=secret_path, mount_point=secret_mount_point)
    except hvac.exceptions.InvalidPath as err:
        raise exceptions.SecretValueNotFoundError(secret, cls, str(err)) from err

    # Retrieve the value using the key or complain loudly.
    try:
        if secret_kv_version == HashicorpKVVersionChoices.KV_VERSION_1:
            return response["data"][secret_key]
        return response["data"]["data"][secret_key]
    except KeyError as err:
        msg = f"The secret value could not be retrieved using key {err}"
        raise exceptions.SecretValueNotFoundError(secret, cls, msg) from err
retrieve_vault_settings(name=None) staticmethod

Retrieve the configuration from settings that matches the provided vault name.

Parameters:

Name Type Description Default
name str

Vault name to retrieve from settings. Defaults to None.

None

Returns:

Name Type Description
vault_settings dict

Hashicorp Vault Settings

Source code in nautobot_secrets_providers/providers/hashicorp.py
@staticmethod
def retrieve_vault_settings(name=None):
    """Retrieve the configuration from settings that matches the provided vault name.

    Args:
        name (str, optional): Vault name to retrieve from settings. Defaults to None.

    Returns:
        vault_settings (dict): Hashicorp Vault Settings
    """
    vault_settings = settings.PLUGINS_CONFIG["nautobot_secrets_providers"].get("hashicorp_vault", {})
    if name and "vaults" in vault_settings:
        return vault_settings["vaults"][name]
    return vault_settings
validate_vault_settings(secret=None, vault_name=None) classmethod

Validate the vault settings.

Source code in nautobot_secrets_providers/providers/hashicorp.py
@classmethod
def validate_vault_settings(cls, secret=None, vault_name=None):
    """Validate the vault settings."""
    try:
        vault_settings = cls.retrieve_vault_settings(vault_name)
    except KeyError as err:
        raise exceptions.SecretProviderError(
            secret, cls, f"HashiCorp Vault {vault_name} is not configured!"
        ) from err
    if not vault_settings:
        raise exceptions.SecretProviderError(secret, cls, f"HashiCorp Vault {vault_name} is not configured!")

    auth_method = vault_settings.get("auth_method", "token")
    kv_version = vault_settings.get("kv_version", HashicorpKVVersionChoices.KV_VERSION_2)

    if "url" not in vault_settings:
        raise exceptions.SecretProviderError(secret, cls, "HashiCorp Vault configuration is missing a url")

    if auth_method not in AUTH_METHOD_CHOICES:
        raise exceptions.SecretProviderError(secret, cls, f"HashiCorp Vault Auth Method {auth_method} is invalid!")

    if kv_version not in HashicorpKVVersionChoices.as_dict():
        raise exceptions.SecretProviderError(secret, cls, f"HashiCorp Vault KV version {kv_version} is invalid!")

    if auth_method == "aws" and not boto3:
        raise exceptions.SecretProviderError(
            secret, cls, "HashiCorp Vault AWS Authentication Method requires the boto3 library!"
        )
    if auth_method == "token" and "token" not in vault_settings:
        raise exceptions.SecretProviderError(
            secret, cls, "HashiCorp Vault configuration is missing a token for token authentication!"
        )
    if auth_method == "kubernetes" and "role_name" not in vault_settings:
        raise exceptions.SecretProviderError(
            secret, cls, "HashiCorp Vault configuration is missing a role name for kubernetes authentication!"
        )
    if auth_method == "approle" and ("role_id" not in vault_settings or "secret_id" not in vault_settings):
        raise exceptions.SecretProviderError(
            secret, cls, "HashiCorp Vault configuration is missing a role_id and/or secret_id!"
        )

    return vault_settings

vault_choices()

Generate Choices for vault form field.

If vaults is a key in the vault config, then we build a form option for each key in vaults. Otherwise we fall "Default" to make this a non-breaking change.

Source code in nautobot_secrets_providers/providers/hashicorp.py
def vault_choices():
    """Generate Choices for vault form field.

    If `vaults` is a key in the vault config,
    then we build a form option for each key in vaults.
    Otherwise we fall "Default" to make this a non-breaking change.
    """
    plugin_settings = settings.PLUGINS_CONFIG["nautobot_secrets_providers"]
    if "vaults" in plugin_settings["hashicorp_vault"]:
        return [(key, key.replace("_", " ").title()) for key in plugin_settings["hashicorp_vault"]["vaults"].keys()]
    return [("default", "Default")]

one_password

1Password Secrets Provider for Nautobot.

OnePasswordSecretsProvider

Bases: SecretsProvider

A secrets provider for 1Password.

Source code in nautobot_secrets_providers/providers/one_password.py
class OnePasswordSecretsProvider(SecretsProvider):
    """A secrets provider for 1Password."""

    slug = "one-password"
    name = "1Password Vault"
    is_available = Client is not None

    # TBD: Remove after pylint-nautobot bump
    # pylint: disable-next=nb-incorrect-base-class
    class ParametersForm(BootstrapMixin, forms.Form):
        """Required parameters for HashiCorp Vault."""

        vault = forms.ChoiceField(
            required=True,
            choices=vault_choices,
            help_text="1Password Vault to retrieve the secret from.",
        )
        item = forms.CharField(
            required=True,
            help_text="The item in 1Password.",
        )
        section = forms.CharField(
            required=False,
            help_text="The section where the field is a part of.",
        )
        field = forms.CharField(
            required=True,
            help_text="The field where the secret is located. Defaults to 'password'.",
            initial="password",
        )

    @classmethod
    def get_token(cls, secret, vault):
        """Get the token for a vault."""
        plugin_settings = settings.PLUGINS_CONFIG["nautobot_secrets_providers"]
        if "token" in plugin_settings["one_password"]["vaults"][vault]:
            return plugin_settings["one_password"]["vaults"][vault]["token"]
        try:
            return plugin_settings["one_password"]["token"]
        except KeyError as exc:
            raise exceptions.SecretProviderError(secret, cls, "1Password token is not configured!") from exc

    @classmethod
    def get_value_for_secret(cls, secret, obj=None, **kwargs):  # pylint: disable=too-many-locals
        """Get the value for a secret from 1Password."""
        # This is only required for 1Password therefore not defined in
        # `required_settings` for the app config.
        plugin_settings = settings.PLUGINS_CONFIG["nautobot_secrets_providers"]
        if "one_password" not in plugin_settings:
            raise exceptions.SecretProviderError(secret, cls, "1Password is not configured!")

        parameters = secret.rendered_parameters(obj=obj)
        vault = parameters["vault"]

        return get_secret_from_vault(
            vault=vault,
            item=parameters["item"],
            field=parameters["field"],
            token=cls.get_token(secret, vault=vault),
            section=parameters.get("section", None),
        )
ParametersForm

Bases: BootstrapMixin, Form

Required parameters for HashiCorp Vault.

Source code in nautobot_secrets_providers/providers/one_password.py
class ParametersForm(BootstrapMixin, forms.Form):
    """Required parameters for HashiCorp Vault."""

    vault = forms.ChoiceField(
        required=True,
        choices=vault_choices,
        help_text="1Password Vault to retrieve the secret from.",
    )
    item = forms.CharField(
        required=True,
        help_text="The item in 1Password.",
    )
    section = forms.CharField(
        required=False,
        help_text="The section where the field is a part of.",
    )
    field = forms.CharField(
        required=True,
        help_text="The field where the secret is located. Defaults to 'password'.",
        initial="password",
    )
get_token(secret, vault) classmethod

Get the token for a vault.

Source code in nautobot_secrets_providers/providers/one_password.py
@classmethod
def get_token(cls, secret, vault):
    """Get the token for a vault."""
    plugin_settings = settings.PLUGINS_CONFIG["nautobot_secrets_providers"]
    if "token" in plugin_settings["one_password"]["vaults"][vault]:
        return plugin_settings["one_password"]["vaults"][vault]["token"]
    try:
        return plugin_settings["one_password"]["token"]
    except KeyError as exc:
        raise exceptions.SecretProviderError(secret, cls, "1Password token is not configured!") from exc
get_value_for_secret(secret, obj=None, **kwargs) classmethod

Get the value for a secret from 1Password.

Source code in nautobot_secrets_providers/providers/one_password.py
@classmethod
def get_value_for_secret(cls, secret, obj=None, **kwargs):  # pylint: disable=too-many-locals
    """Get the value for a secret from 1Password."""
    # This is only required for 1Password therefore not defined in
    # `required_settings` for the app config.
    plugin_settings = settings.PLUGINS_CONFIG["nautobot_secrets_providers"]
    if "one_password" not in plugin_settings:
        raise exceptions.SecretProviderError(secret, cls, "1Password is not configured!")

    parameters = secret.rendered_parameters(obj=obj)
    vault = parameters["vault"]

    return get_secret_from_vault(
        vault=vault,
        item=parameters["item"],
        field=parameters["field"],
        token=cls.get_token(secret, vault=vault),
        section=parameters.get("section", None),
    )

get_secret_from_vault(vault, item, field, token, section=None) async

Get a secret from a 1Password vault.

Parameters:

Name Type Description Default
vault str

1Password Vault where the secret is located.

required
item str

1Password Item where the secret is located.

required
field str

1Password secret field name.

required
token str

1Password Service Account token.

required
section str

1Password Item Section for the secret. Defaults to None.

None

Returns:

Type Description
str

Value from the secret.

Source code in nautobot_secrets_providers/providers/one_password.py
@async_to_sync
async def get_secret_from_vault(vault, item, field, token, section=None):
    """Get a secret from a 1Password vault.

    Args:
        vault (str): 1Password Vault where the secret is located.
        item (str): 1Password Item where the secret is located.
        field (str): 1Password secret field name.
        token (str): 1Password Service Account token.
        section (str, optional): 1Password Item Section for the secret. Defaults to None.

    Returns:
        (str): Value from the secret.
    """
    client = await Client.authenticate(
        auth=token, integration_name="nautobot-secrets-providers", integration_version=__version__
    )
    reference = f"op://{vault}/{item}/{f'{section}/' if section else ''}{field}"
    return await client.secrets.resolve(reference)

vault_choices()

Generate Choices for vault form field.

Build a form option for each key in vaults.

Source code in nautobot_secrets_providers/providers/one_password.py
def vault_choices():
    """Generate Choices for vault form field.

    Build a form option for each key in vaults.
    """
    plugin_settings = settings.PLUGINS_CONFIG["nautobot_secrets_providers"]
    return [(key, key) for key in plugin_settings["one_password"]["vaults"].keys()]