Skip to content

Code reference

This page aims to provide an API overview of the different functions and classes you may need when implementing an SSoT job.

nautobot_ssot.contrib.CustomFieldAnnotation dataclass

Map a model field to an arbitrary custom field name.

For usage with typing.Annotated.

This exists to map model fields to their corresponding custom fields. This serves to explicitly differentiate normal fields from custom fields.

Note that for backwards compatibility purposes it is also possible to use CustomFieldAnnotation.name instead of CustomFieldAnnotation.key.

Example

Given a boolean custom field with label "Is Global" and key "is_global" on the Provider model:

class ProviderModel(NautobotModel):
    _model: Provider
    _identifiers = ("name",)
    _attributes = ("is_global",)

    name: str
    is_global: Annotated[bool, CustomFieldAnnotation(key="is_global")

This then maps the model field 'is_global' to the custom field with the key 'is_global'.

Source code in nautobot_ssot/contrib/types.py
@dataclass
class CustomFieldAnnotation:
    """Map a model field to an arbitrary custom field name.

    For usage with `typing.Annotated`.

    This exists to map model fields to their corresponding custom fields. This serves to explicitly differentiate
    normal fields from custom fields.

    Note that for backwards compatibility purposes it is also possible to use `CustomFieldAnnotation.name` instead of
    `CustomFieldAnnotation.key`.

    Example:
        Given a boolean custom field with label "Is Global" and key "is_global" on the Provider model:

        ```python
        class ProviderModel(NautobotModel):
            _model: Provider
            _identifiers = ("name",)
            _attributes = ("is_global",)

            name: str
            is_global: Annotated[bool, CustomFieldAnnotation(key="is_global")
        ```

        This then maps the model field 'is_global' to the custom field with the key 'is_global'.
    """

    # TODO: Delete on 3.0, keep around for backwards compatibility for now
    name: Optional[str] = None

    key: Optional[str] = None

    def __post_init__(self):
        """Compatibility layer with using 'name' instead of 'key'.

        If `self.key` isn't set, fall back to the old behaviour.
        """
        if not self.key:
            if self.name:
                self.key = self.name
            else:
                raise ValueError("The 'key' field on CustomFieldAnnotation needs to be set.")

__post_init__()

Compatibility layer with using 'name' instead of 'key'.

If self.key isn't set, fall back to the old behaviour.

Source code in nautobot_ssot/contrib/types.py
def __post_init__(self):
    """Compatibility layer with using 'name' instead of 'key'.

    If `self.key` isn't set, fall back to the old behaviour.
    """
    if not self.key:
        if self.name:
            self.key = self.name
        else:
            raise ValueError("The 'key' field on CustomFieldAnnotation needs to be set.")

nautobot_ssot.contrib.NautobotModel

Bases: DiffSyncModel

Base model for any diffsync models interfacing with Nautobot through the ORM.

This provides the create, update and delete operations in a generic fashion, meaning you don't have to implement them yourself.

In order to accomplish this, the _model field has to be set on subclasses to map them to the corresponding ORM model class.

Source code in nautobot_ssot/contrib/model.py
class NautobotModel(DiffSyncModel):
    """
    Base model for any diffsync models interfacing with Nautobot through the ORM.

    This provides the `create`, `update` and `delete` operations in a generic fashion, meaning you don't have to
    implement them yourself.

    In order to accomplish this, the `_model` field has to be set on subclasses to map them to the corresponding ORM
    model class.
    """

    _model: Model

    pk: Optional[UUID]

    @classmethod
    def _get_queryset(cls):
        """Get the queryset used to load the models data from Nautobot."""
        available_fields = {field.name for field in cls._model._meta.get_fields()}
        parameter_names = [
            parameter for parameter in list(cls._identifiers) + list(cls._attributes) if parameter in available_fields
        ]
        # Here we identify any foreign keys (i.e. fields with '__' in them) so that we can load them directly in the
        # first query if this function hasn't been overridden.
        prefetch_related_parameters = [parameter.split("__")[0] for parameter in parameter_names if "__" in parameter]
        qs = cls.get_queryset()
        return qs.prefetch_related(*prefetch_related_parameters)

    @classmethod
    def get_queryset(cls):
        """Get the queryset used to load the models data from Nautobot."""
        return cls._model.objects.all()

    @classmethod
    def _check_field(cls, name):
        """Check whether the given field name is defined on the diffsync (pydantic) model."""
        if name not in cls.__fields__:
            raise ObjectCrudException(f"Field {name} is not defined on the model.")

    def get_from_db(self):
        """Get the ORM object for this diffsync object from the database using the primary key."""
        try:
            return self.diffsync.get_from_orm_cache({"pk": self.pk}, self._model)
        except self._model.DoesNotExist as error:
            raise ObjectCrudException(f"No such {self._model._meta.verbose_name} instance with PK {self.pk}") from error

    def update(self, attrs):
        """Update the ORM object corresponding to this diffsync object."""
        try:
            obj = self.get_from_db()
            self._update_obj_with_parameters(obj, attrs, self.diffsync)
        except ObjectCrudException as error:
            raise ObjectNotUpdated(error) from error
        return super().update(attrs)

    def delete(self):
        """Delete the ORM object corresponding to this diffsync object."""
        try:
            obj = self.get_from_db()
        except ObjectCrudException as error:
            raise ObjectNotDeleted(error) from error
        try:
            obj.delete()
        except ProtectedError as error:
            raise ObjectNotDeleted(f"Couldn't delete {obj} as it is referenced by another object") from error
        return super().delete()

    @classmethod
    def create(cls, diffsync, ids, attrs):
        """Create the ORM object corresponding to this diffsync object."""
        # Only diffsync cares about the distinction between ids and attrs, we do not.
        # Therefore, we merge the two into parameters.
        parameters = ids.copy()
        parameters.update(attrs)

        # This is in fact callable, because it is a model
        obj = cls._model()  # pylint: disable=not-callable

        try:
            cls._update_obj_with_parameters(obj, parameters, diffsync)
        except ObjectCrudException as error:
            raise ObjectNotCreated(error) from error

        return super().create(diffsync, ids, attrs)

    @classmethod
    def _handle_single_field(
        cls, field, obj, value, relationship_fields, diffsync
    ):  # pylint: disable=too-many-arguments,too-many-locals, too-many-branches
        """Set a single field on a Django object to a given value, or, for relationship fields, prepare setting.

        :param field: The name of the field to set.
        :param obj: The Django ORM object to set the field on.
        :param value: The value to set the field to.
        :param relationship_fields: Helper dictionary containing information on relationship fields.
            This is mutated over the course of this function.
        :param diffsync: The related diffsync adapter used for looking up things in the cache.
        """
        # Use type hints at runtime to determine which fields are custom fields
        type_hints = get_type_hints(cls, include_extras=True)

        cls._check_field(field)

        # Handle custom fields. See CustomFieldAnnotation docstring for more details.
        custom_relationship_annotation = None
        metadata_for_this_field = getattr(type_hints[field], "__metadata__", [])
        for metadata in metadata_for_this_field:
            if isinstance(metadata, CustomFieldAnnotation):
                obj.cf[metadata.key] = value
                return
            if isinstance(metadata, CustomRelationshipAnnotation):
                custom_relationship_annotation = metadata
                break

        # Prepare handling of foreign keys and custom relationship foreign keys.
        # Example: If field is `tenant__group__name`, then
        # `foreign_keys["tenant"]["group__name"] = value` or
        # `custom_relationship_foreign_keys["tenant"]["group__name"] = value`
        # Also, the model class will be added to the dictionary for normal foreign keys, so we can later use it
        # for querying:
        # `foreign_keys["tenant"]["_model_class"] = nautobot.tenancy.models.Tenant
        # For custom relationship foreign keys, we add the annotation instead:
        # `custom_relationship_foreign_keys["tenant"]["_annotation"] = CustomRelationshipAnnotation(...)
        if "__" in field:
            related_model, lookup = field.split("__", maxsplit=1)
            # Custom relationship foreign keys
            if custom_relationship_annotation:
                relationship_fields["custom_relationship_foreign_keys"][related_model][lookup] = value
                relationship_fields["custom_relationship_foreign_keys"][related_model][
                    "_annotation"
                ] = custom_relationship_annotation
            # Normal foreign keys
            else:
                django_field = cls._model._meta.get_field(related_model)
                relationship_fields["foreign_keys"][related_model][lookup] = value
                # Add a special key to the dictionary to point to the related model's class
                relationship_fields["foreign_keys"][related_model]["_model_class"] = django_field.related_model
            return

        # Prepare handling of custom relationship many-to-many fields.
        if custom_relationship_annotation:
            relationship = diffsync.get_from_orm_cache({"label": custom_relationship_annotation.name}, Relationship)
            if custom_relationship_annotation.side == RelationshipSideEnum.DESTINATION:
                related_object_content_type = relationship.source_type
            else:
                related_object_content_type = relationship.destination_type
            related_model_class = related_object_content_type.model_class()
            if (
                relationship.type == RelationshipTypeChoices.TYPE_ONE_TO_MANY
                and custom_relationship_annotation.side == RelationshipSideEnum.DESTINATION
            ):
                relationship_fields["custom_relationship_foreign_keys"][field] = {
                    **value,
                    "_annotation": custom_relationship_annotation,
                }
            else:
                relationship_fields["custom_relationship_many_to_many_fields"][field] = {
                    "objects": [diffsync.get_from_orm_cache(parameters, related_model_class) for parameters in value],
                    "annotation": custom_relationship_annotation,
                }

            return

        django_field = cls._model._meta.get_field(field)

        # Prepare handling of many-to-many fields. If we are dealing with a many-to-many field,
        # we get all the related objects here to later set them once the object has been saved.
        if django_field.many_to_many or django_field.one_to_many:
            try:
                relationship_fields["many_to_many_fields"][field] = [
                    diffsync.get_from_orm_cache(parameters, django_field.related_model) for parameters in value
                ]
            except django_field.related_model.DoesNotExist as error:
                raise ObjectCrudException(
                    f"Unable to populate many to many relationship '{django_field.name}' with parameters {value}, at least one related object not found."
                ) from error
            except MultipleObjectsReturned as error:
                raise ObjectCrudException(
                    f"Unable to populate many to many relationship '{django_field.name}' with parameters {value}, at least one related object found twice."
                ) from error
            return

        # As the default case, just set the attribute directly
        setattr(obj, field, value)

    @classmethod
    def _update_obj_with_parameters(cls, obj, parameters, diffsync):
        """Update a given Nautobot ORM object with the given parameters."""
        relationship_fields = {
            # Example: {"group": {"name": "Group Name", "_model_class": TenantGroup}}
            "foreign_keys": defaultdict(dict),
            # Example: {"tags": [Tag-1, Tag-2]}
            "many_to_many_fields": defaultdict(list),
            # Example: TODO
            "custom_relationship_foreign_keys": defaultdict(dict),
            # Example: TODO
            "custom_relationship_many_to_many_fields": defaultdict(dict),
        }
        for field, value in parameters.items():
            cls._handle_single_field(field, obj, value, relationship_fields, diffsync)

        # Set foreign keys
        cls._lookup_and_set_foreign_keys(relationship_fields["foreign_keys"], obj, diffsync=diffsync)

        # Save the object to the database
        try:
            obj.validated_save()
        except ValidationError as error:
            raise ObjectCrudException(f"Validated save failed for Django object. Parameters: {parameters}") from error

        # Handle relationship association creation. This needs to be after object creation, because relationship
        # association objects rely on both sides already existing.
        cls._lookup_and_set_custom_relationship_foreign_keys(
            relationship_fields["custom_relationship_foreign_keys"], obj, diffsync
        )
        cls._set_custom_relationship_to_many_fields(
            relationship_fields["custom_relationship_many_to_many_fields"], obj, diffsync
        )

        # Set many-to-many fields after saving.
        cls._set_many_to_many_fields(relationship_fields["many_to_many_fields"], obj)

    @classmethod
    def _set_custom_relationship_to_many_fields(cls, custom_relationship_many_to_many_fields, obj, diffsync):
        for _, dictionary in custom_relationship_many_to_many_fields.items():
            annotation = dictionary.pop("annotation")
            objects = dictionary.pop("objects")
            # TODO: Deduplicate this code
            relationship = diffsync.get_from_orm_cache({"label": annotation.name}, Relationship)
            parameters = {
                "relationship": relationship,
                "source_type": relationship.source_type,
                "destination_type": relationship.destination_type,
            }
            associations = []
            if annotation.side == RelationshipSideEnum.SOURCE:
                parameters["source_id"] = obj.id
                for object_to_relate in objects:
                    association_parameters = parameters.copy()
                    association_parameters["destination_id"] = object_to_relate.id
                    try:
                        association = diffsync.get_from_orm_cache(association_parameters, RelationshipAssociation)
                    except RelationshipAssociation.DoesNotExist:
                        association = RelationshipAssociation(**parameters, destination_id=object_to_relate.id)
                        association.validated_save()
                    associations.append(association)
            else:
                parameters["destination_id"] = obj.id
                for object_to_relate in objects:
                    association_parameters = parameters.copy()
                    association_parameters["source_id"] = object_to_relate.id
                    try:
                        association = diffsync.get_from_orm_cache(association_parameters, RelationshipAssociation)
                    except RelationshipAssociation.DoesNotExist:
                        association = RelationshipAssociation(**parameters, source_id=object_to_relate.id)
                        association.validated_save()
                    associations.append(association)
            # Now we need to clean up any associations that we're not `get_or_create`'d in order to achieve
            # declarativeness.
            # TODO: This may benefit from an ORM cache with `filter` capabilities, but I guess the gain in most cases
            # would be fairly minor.
            for existing_association in RelationshipAssociation.objects.filter(**parameters):
                if existing_association not in associations:
                    existing_association.delete()

    @classmethod
    def _set_many_to_many_fields(cls, many_to_many_fields, obj):
        """
        Given a dictionary, set many-to-many relationships on an object.

        This will always use `set`, thereby replacing any elements that are already part of the relationship.

        Example dictionary:
        {
            "relationship_field_name": [<Object 1>, <Object 2>],
            ...
        }
        """
        for field_name, related_objects in many_to_many_fields.items():
            many_to_many_field = getattr(obj, field_name)
            many_to_many_field.set(related_objects)

    @classmethod
    def _lookup_and_set_custom_relationship_foreign_keys(cls, custom_relationship_foreign_keys, obj, diffsync):
        for _, related_model_dict in custom_relationship_foreign_keys.items():
            annotation = related_model_dict.pop("_annotation")
            # TODO: Deduplicate this code
            try:
                relationship = diffsync.get_from_orm_cache({"label": annotation.name}, Relationship)
            except Relationship.DoesNotExist as error:
                raise ObjectCrudException(f"No such relationship with label '{annotation.name}'") from error
            parameters = {
                "relationship": relationship,
                "source_type": relationship.source_type,
                "destination_type": relationship.destination_type,
            }
            if annotation.side == RelationshipSideEnum.SOURCE:
                parameters["source_id"] = obj.id
                related_model_class = relationship.destination_type.model_class()
                try:
                    destination_object = diffsync.get_from_orm_cache(related_model_dict, related_model_class)
                except related_model_class.DoesNotExist as error:
                    raise ObjectCrudException(
                        f"Couldn't resolve custom relationship {relationship.name}, no such {related_model_class._meta.verbose_name} object with parameters {related_model_dict}."
                    ) from error
                except related_model_class.MultipleObjectsReturned as error:
                    raise ObjectCrudException(
                        f"Couldn't resolve custom relationship {relationship.name}, multiple {related_model_class._meta.verbose_name} objects with parameters {related_model_dict}."
                    ) from error
                RelationshipAssociation.objects.update_or_create(
                    **parameters,
                    defaults={"destination_id": destination_object.id},
                )
            else:
                parameters["destination_id"] = obj.id
                source_object = diffsync.get_from_orm_cache(related_model_dict, relationship.source_type.model_class())
                RelationshipAssociation.objects.update_or_create(**parameters, defaults={"source_id": source_object.id})

    @classmethod
    def _lookup_and_set_foreign_keys(cls, foreign_keys, obj, diffsync):
        """
        Given a list of foreign keys as dictionaries, look up and set foreign keys on an object.

        Dictionary should be in the form of:
        [
          {"field_1": "value_1", "field_2": "value_2"},
          ...
        ]
        where each item in the list corresponds to the parameters needed to uniquely identify a foreign key object.
        """
        for field_name, related_model_dict in foreign_keys.items():
            related_model = related_model_dict.pop("_model_class")
            # Generic foreign keys will not have this dictionary field. As such, we need to retrieve the appropriate
            # model class through other means.
            if not related_model:
                try:
                    app_label = related_model_dict.pop("app_label")
                    model = related_model_dict.pop("model")
                except KeyError as error:
                    raise ValueError(
                        f"Missing annotation for '{field_name}__app_label' or '{field_name}__model - this is required"
                        f"for generic foreign keys."
                    ) from error
                try:
                    related_model_content_type = diffsync.get_from_orm_cache(
                        {"app_label": app_label, "model": model}, ContentType
                    )
                    related_model = related_model_content_type.model_class()
                except ContentType.DoesNotExist as error:
                    raise ObjectCrudException(f"Unknown content type '{app_label}.{model}'.") from error
            # Set the foreign key to 'None' when none of the fields are set to anything
            if not any(related_model_dict.values()):
                setattr(obj, field_name, None)
                continue
            try:
                related_object = diffsync.get_from_orm_cache(related_model_dict, related_model)
            except related_model.DoesNotExist as error:
                raise ObjectCrudException(
                    f"Couldn't find '{related_model._meta.verbose_name}' instance behind '{field_name}' with: {related_model_dict}."
                ) from error
            except MultipleObjectsReturned as error:
                raise ObjectCrudException(
                    f"Found multiple instances for {field_name} wit: {related_model_dict}"
                ) from error
            setattr(obj, field_name, related_object)

create(diffsync, ids, attrs) classmethod

Create the ORM object corresponding to this diffsync object.

Source code in nautobot_ssot/contrib/model.py
@classmethod
def create(cls, diffsync, ids, attrs):
    """Create the ORM object corresponding to this diffsync object."""
    # Only diffsync cares about the distinction between ids and attrs, we do not.
    # Therefore, we merge the two into parameters.
    parameters = ids.copy()
    parameters.update(attrs)

    # This is in fact callable, because it is a model
    obj = cls._model()  # pylint: disable=not-callable

    try:
        cls._update_obj_with_parameters(obj, parameters, diffsync)
    except ObjectCrudException as error:
        raise ObjectNotCreated(error) from error

    return super().create(diffsync, ids, attrs)

delete()

Delete the ORM object corresponding to this diffsync object.

Source code in nautobot_ssot/contrib/model.py
def delete(self):
    """Delete the ORM object corresponding to this diffsync object."""
    try:
        obj = self.get_from_db()
    except ObjectCrudException as error:
        raise ObjectNotDeleted(error) from error
    try:
        obj.delete()
    except ProtectedError as error:
        raise ObjectNotDeleted(f"Couldn't delete {obj} as it is referenced by another object") from error
    return super().delete()

get_from_db()

Get the ORM object for this diffsync object from the database using the primary key.

Source code in nautobot_ssot/contrib/model.py
def get_from_db(self):
    """Get the ORM object for this diffsync object from the database using the primary key."""
    try:
        return self.diffsync.get_from_orm_cache({"pk": self.pk}, self._model)
    except self._model.DoesNotExist as error:
        raise ObjectCrudException(f"No such {self._model._meta.verbose_name} instance with PK {self.pk}") from error

get_queryset() classmethod

Get the queryset used to load the models data from Nautobot.

Source code in nautobot_ssot/contrib/model.py
@classmethod
def get_queryset(cls):
    """Get the queryset used to load the models data from Nautobot."""
    return cls._model.objects.all()

update(attrs)

Update the ORM object corresponding to this diffsync object.

Source code in nautobot_ssot/contrib/model.py
def update(self, attrs):
    """Update the ORM object corresponding to this diffsync object."""
    try:
        obj = self.get_from_db()
        self._update_obj_with_parameters(obj, attrs, self.diffsync)
    except ObjectCrudException as error:
        raise ObjectNotUpdated(error) from error
    return super().update(attrs)

nautobot_ssot.contrib.NautobotAdapter

Bases: DiffSync

Adapter for loading data from Nautobot through the ORM.

This adapter is able to infer how to load data from Nautobot based on how the models attached to it are defined.

Source code in nautobot_ssot/contrib/adapter.py
class NautobotAdapter(DiffSync):
    """
    Adapter for loading data from Nautobot through the ORM.

    This adapter is able to infer how to load data from Nautobot based on how the models attached to it are defined.
    """

    # This dictionary acts as an ORM cache.
    _cache: DefaultDict[str, Dict[ParameterSet, Model]]
    _cache_hits: DefaultDict[str, int] = defaultdict(int)

    def __init__(self, *args, job, sync=None, **kwargs):
        """Instantiate this class, but do not load data immediately from the local system."""
        super().__init__(*args, **kwargs)
        self.job = job
        self.sync = sync
        self.invalidate_cache()

    def invalidate_cache(self, zero_out_hits=True):
        """Invalidates all the objects in the ORM cache."""
        self._cache = defaultdict(dict)
        if zero_out_hits:
            self._cache_hits = defaultdict(int)

    def get_from_orm_cache(self, parameters: Dict, model_class: Type[Model]):
        """Retrieve an object from the ORM or the cache."""
        parameter_set = frozenset(parameters.items())
        content_type = ContentType.objects.get_for_model(model_class)
        model_cache_key = f"{content_type.app_label}.{content_type.model}"
        if cached_object := self._cache[model_cache_key].get(parameter_set):
            self._cache_hits[model_cache_key] += 1
            return cached_object
        # As we are using `get` here, this will error if there is not exactly one object that corresponds to the
        # parameter set. We intentionally pass these errors through.
        self._cache[model_cache_key][parameter_set] = model_class.objects.get(**dict(parameter_set))
        return self._cache[model_cache_key][parameter_set]

    @staticmethod
    def _get_parameter_names(diffsync_model):
        """Ignore the differences between identifiers and attributes, because at this point they don't matter to us."""
        return list(diffsync_model._identifiers) + list(diffsync_model._attributes)  # pylint: disable=protected-access

    def _load_objects(self, diffsync_model):
        """Given a diffsync model class, load a list of models from the database and return them."""
        parameter_names = self._get_parameter_names(diffsync_model)
        for database_object in diffsync_model._get_queryset():
            self._load_single_object(database_object, diffsync_model, parameter_names)

    def _handle_single_parameter(self, parameters, parameter_name, database_object, diffsync_model):
        type_hints = get_type_hints(diffsync_model, include_extras=True)
        # Handle custom fields and custom relationships. See CustomFieldAnnotation and CustomRelationshipAnnotation
        # docstrings for more details.
        is_custom_field = False
        custom_relationship_annotation = None
        metadata_for_this_field = getattr(type_hints[parameter_name], "__metadata__", [])
        for metadata in metadata_for_this_field:
            if isinstance(metadata, CustomFieldAnnotation):
                if metadata.name in database_object.cf:
                    parameters[parameter_name] = database_object.cf[metadata.key]
                is_custom_field = True
                break
            if isinstance(metadata, CustomRelationshipAnnotation):
                custom_relationship_annotation = metadata
                break
        if is_custom_field:
            return

        # Handling of foreign keys where the local side is the many and the remote side the one.
        # Note: This includes the side of a generic foreign key that has the foreign key, i.e.
        # the 'many' side.
        if "__" in parameter_name:
            if custom_relationship_annotation:
                parameters[parameter_name] = self._handle_custom_relationship_foreign_key(
                    database_object, parameter_name, custom_relationship_annotation
                )
            else:
                parameters[parameter_name] = self._handle_foreign_key(database_object, parameter_name)
            return

        # Handling of one- and many-to custom relationship fields:
        if custom_relationship_annotation:
            parameters[parameter_name] = self._handle_custom_relationship_to_many_relationship(
                database_object, diffsync_model, parameter_name, custom_relationship_annotation
            )
            return

        database_field = diffsync_model._model._meta.get_field(parameter_name)

        # Handling of one- and many-to-many non-custom relationship fields.
        # Note: This includes the side of a generic foreign key that constitutes the foreign key,
        # i.e. the 'one' side.
        if database_field.many_to_many or database_field.one_to_many:
            parameters[parameter_name] = self._handle_to_many_relationship(
                database_object, diffsync_model, parameter_name
            )
            return

        # Handling of normal fields - as this is the default case, set the attribute directly.
        if hasattr(self, f"load_param_{parameter_name}"):
            parameters[parameter_name] = getattr(self, f"load_param_{parameter_name}")(parameter_name, database_object)
        else:
            parameters[parameter_name] = getattr(database_object, parameter_name)

    def _load_single_object(self, database_object, diffsync_model, parameter_names):
        """Load a single diffsync object from a single database object."""
        parameters = {}
        for parameter_name in parameter_names:
            self._handle_single_parameter(parameters, parameter_name, database_object, diffsync_model)
        parameters["pk"] = database_object.pk
        try:
            diffsync_model = diffsync_model(**parameters)
        except pydantic.ValidationError as error:
            raise ValueError(f"Parameters: {parameters}") from error
        self.add(diffsync_model)

        self._handle_children(database_object, diffsync_model)
        return diffsync_model

    def _handle_children(self, database_object, diffsync_model):
        """Recurse through all the children for this model."""
        for children_parameter, children_field in diffsync_model._children.items():
            children = getattr(database_object, children_field).all()
            diffsync_model_child = self._get_diffsync_class(model_name=children_parameter)
            for child in children:
                parameter_names = self._get_parameter_names(diffsync_model_child)
                child_diffsync_object = self._load_single_object(child, diffsync_model_child, parameter_names)
                diffsync_model.add_child(child_diffsync_object)

    def load(self):
        """Generic implementation of the load function."""
        if not hasattr(self, "top_level") or not self.top_level:
            raise ValueError("'top_level' needs to be set on the class.")

        for model_name in self.top_level:
            diffsync_model = self._get_diffsync_class(model_name)

            # This function directly mutates the diffsync store, i.e. it will create and load the objects
            # for this specific model class as well as its children without returning anything.
            self._load_objects(diffsync_model)

    def _get_diffsync_class(self, model_name):
        """Given a model name, return the diffsync class."""
        try:
            diffsync_model = getattr(self, model_name)
        except AttributeError as error:
            raise AttributeError(
                f"Please define {model_name} to be the diffsync model on this adapter as a class level attribute."
            ) from error
        return diffsync_model

    def _handle_custom_relationship_to_many_relationship(
        self, database_object, diffsync_model, parameter_name, annotation
    ):
        # Introspect type annotations to deduce which fields are of interest
        # for this many-to-many relationship.
        diffsync_field_type = get_type_hints(diffsync_model)[parameter_name]
        inner_type = get_args(diffsync_field_type)[0]
        related_objects_list = []
        # TODO: Allow for filtering, i.e. not taking into account all the objects behind the relationship.
        relationship = self.get_from_orm_cache({"label": annotation.name}, Relationship)
        relationship_association_parameters = self._construct_relationship_association_parameters(
            annotation, database_object
        )
        relationship_associations = RelationshipAssociation.objects.filter(**relationship_association_parameters)

        field_name = ""
        field_name += "source" if annotation.side == RelationshipSideEnum.DESTINATION else "destination"
        field_name += "_"
        field_name += (
            relationship.source_type.app_label.lower()
            if annotation.side == RelationshipSideEnum.DESTINATION
            else relationship.destination_type.app_label.lower()
        )
        field_name += "_"
        field_name += (
            relationship.source_type.model.lower()
            if annotation.side == RelationshipSideEnum.DESTINATION
            else relationship.destination_type.model.lower()
        )

        for association in relationship_associations:
            related_object = getattr(
                association, "source" if annotation.side == RelationshipSideEnum.DESTINATION else "destination"
            )
            dictionary_representation = self._handle_typed_dict(inner_type, related_object)
            # Only use those where there is a single field defined, all 'None's will not help us.
            if any(dictionary_representation.values()):
                related_objects_list.append(dictionary_representation)

        # For one-to-many, we need to return an object, not a list of objects
        if (
            relationship.type == RelationshipTypeChoices.TYPE_ONE_TO_MANY
            and annotation.side == RelationshipSideEnum.DESTINATION
        ):
            if not related_objects_list:
                return None

            if len(related_objects_list) == 1:
                return related_objects_list[0]

            raise ObjectCrudException(
                f"More than one related objects for a {RelationshipTypeChoices.TYPE_ONE_TO_MANY} relationship: {related_objects_list}"
            )

        return related_objects_list

    @classmethod
    def _handle_typed_dict(cls, inner_type, related_object):
        """Handle a typed dict for many to many relationships.

        Args:
            inner_type: The typed dict.
            related_object: The related object
        Returns: The dictionary representation of `related_object` as described by `inner_type`.
        """
        dictionary_representation = {}
        for field_name in get_type_hints(inner_type):
            if "__" in field_name:
                dictionary_representation[field_name] = cls._handle_foreign_key(related_object, field_name)
                continue
            dictionary_representation[field_name] = getattr(related_object, field_name)
        return dictionary_representation

    def _construct_relationship_association_parameters(self, annotation, database_object):
        relationship = self.get_from_orm_cache({"label": annotation.name}, Relationship)
        relationship_association_parameters = {
            "relationship": relationship,
            "source_type": relationship.source_type,
            "destination_type": relationship.destination_type,
        }
        if annotation.side == RelationshipSideEnum.SOURCE:
            relationship_association_parameters["source_id"] = database_object.id
        else:
            relationship_association_parameters["destination_id"] = database_object.id
        return relationship_association_parameters

    @staticmethod
    def _handle_to_many_relationship(database_object, diffsync_model, parameter_name):
        """Handle a single one- or many-to-many relationship field.

        one- or many-to-many relationships are type annotated as a list of typed dictionaries. The typed
        dictionary type expresses, which attributes we are interested in for diffsync.

        :param database_object: The Django ORM database object
        :param diffsync_model: The diffsync model class (not specific object) for this ORM object
        :param parameter_name: The field name of the specific relationship to handle
        :return: A list of dictionaries which represent the related objects.

        :example:

        Example parameters:
        - a `nautobot.dcim.models.Interface` instance with two IP addresses assigned
          through the `ip_addresses` many-to-many relationship as `database_object`
        - an InterfaceModel class like the following `NautobotInterface` as `diffsync_model`

        ```python
        class IPAddressDict(TypedDict):
            host: str
            prefix_length: int


        class NautobotInterface(NautobotModel):
            _model = Interface
            _modelname = "interface"
            _identifiers = (
                "name",
                "device__name",
            )
            _attributes = ("ip_addresses",)

            name: str
            device__name: str
            ip_addresses: List[IPAddressDict] = []
        ```

        - a field name like `ip_addresses` as the `parameter_name`

        Example return list within the above input example:

        ```python
        [
          {"host": "192.0.2.0/25", "prefix_length": 25},
          {"host": "192.0.2.128/25", "prefix_length": 25},
        ]
        ```
        """
        # Introspect type annotations to deduce which fields are of interest
        # for this many-to-many relationship.
        inner_type = get_args(get_type_hints(diffsync_model)[parameter_name])[0]
        related_objects_list = []
        # TODO: Allow for filtering, i.e. not taking into account all the objects behind the relationship.
        for related_object in getattr(database_object, parameter_name).all():
            dictionary_representation = NautobotAdapter._handle_typed_dict(inner_type, related_object)
            # Only use those where there is a single field defined, all 'None's will not help us.
            if any(dictionary_representation.values()):
                related_objects_list.append(dictionary_representation)
        return related_objects_list

    def _handle_custom_relationship_foreign_key(
        self, database_object, parameter_name: str, annotation: CustomRelationshipAnnotation
    ):
        """Handle a single custom relationship foreign key field."""
        relationship_association_parameters = self._construct_relationship_association_parameters(
            annotation, database_object
        )

        relationship_association = RelationshipAssociation.objects.filter(**relationship_association_parameters)
        amount_of_relationship_associations = relationship_association.count()
        if amount_of_relationship_associations == 0:
            return None
        if amount_of_relationship_associations == 1:
            association = relationship_association.first()
            related_object = getattr(
                association, "source" if annotation.side == RelationshipSideEnum.DESTINATION else "destination"
            )
            # Discard the first part as there is no actual field on the model corresponding to that part.
            _, *lookups = parameter_name.split("__")
            for lookup in lookups[:-1]:
                related_object = getattr(related_object, lookup)
            return getattr(related_object, lookups[-1])
        raise ValueError("Foreign key custom relationship matched two associations - this shouldn't happen.")

    @staticmethod
    def _handle_foreign_key(database_object, parameter_name):
        """Handle a single foreign key field.

        Given the object from the database as well as the name of parameter in the form of
        f'{foreign_key_field_name}__{remote_field_name}'
        return the field at 'remote_field_name' on the object behind the foreign key at 'foreign_key_field_name'.

        Furthermore, 'remote_field_name' may be a series of '__' delimited lookups.

        :param database_object: The Django ORM database object
        :param parameter_name: The field name of the specific relationship to handle
        :return: If present, the object behind the (generic) foreign key, else None
        """
        related_model, *lookups = parameter_name.split("__")
        related_object = getattr(database_object, related_model)
        # If the foreign key does not point to anything, return None
        if not related_object:
            return None
        for lookup in lookups[:-1]:
            related_object = getattr(related_object, lookup)
            # If the foreign key does not point to anything, return None
            if not related_object:
                return None
        # Return the result of the last lookup directly.
        try:
            return getattr(related_object, lookups[-1])
        # If the lookup doesn't point anywhere, check whether it is using the convention for generic foreign keys.
        except AttributeError:
            if lookups[-1] in ["app_label", "model"]:
                return getattr(ContentType.objects.get_for_model(related_object), lookups[-1])
        return None

__init__(*args, job, sync=None, **kwargs)

Instantiate this class, but do not load data immediately from the local system.

Source code in nautobot_ssot/contrib/adapter.py
def __init__(self, *args, job, sync=None, **kwargs):
    """Instantiate this class, but do not load data immediately from the local system."""
    super().__init__(*args, **kwargs)
    self.job = job
    self.sync = sync
    self.invalidate_cache()

get_from_orm_cache(parameters, model_class)

Retrieve an object from the ORM or the cache.

Source code in nautobot_ssot/contrib/adapter.py
def get_from_orm_cache(self, parameters: Dict, model_class: Type[Model]):
    """Retrieve an object from the ORM or the cache."""
    parameter_set = frozenset(parameters.items())
    content_type = ContentType.objects.get_for_model(model_class)
    model_cache_key = f"{content_type.app_label}.{content_type.model}"
    if cached_object := self._cache[model_cache_key].get(parameter_set):
        self._cache_hits[model_cache_key] += 1
        return cached_object
    # As we are using `get` here, this will error if there is not exactly one object that corresponds to the
    # parameter set. We intentionally pass these errors through.
    self._cache[model_cache_key][parameter_set] = model_class.objects.get(**dict(parameter_set))
    return self._cache[model_cache_key][parameter_set]

invalidate_cache(zero_out_hits=True)

Invalidates all the objects in the ORM cache.

Source code in nautobot_ssot/contrib/adapter.py
def invalidate_cache(self, zero_out_hits=True):
    """Invalidates all the objects in the ORM cache."""
    self._cache = defaultdict(dict)
    if zero_out_hits:
        self._cache_hits = defaultdict(int)

load()

Generic implementation of the load function.

Source code in nautobot_ssot/contrib/adapter.py
def load(self):
    """Generic implementation of the load function."""
    if not hasattr(self, "top_level") or not self.top_level:
        raise ValueError("'top_level' needs to be set on the class.")

    for model_name in self.top_level:
        diffsync_model = self._get_diffsync_class(model_name)

        # This function directly mutates the diffsync store, i.e. it will create and load the objects
        # for this specific model class as well as its children without returning anything.
        self._load_objects(diffsync_model)

nautobot_ssot.jobs.base.DataSyncBaseJob

Bases: Job

Common base class for data synchronization jobs.

Works mostly as per the BaseJob API, with the following changes:

  • Concrete subclasses are responsible for implementing self.sync_data() (or related hooks), not self.run().
  • Subclasses may optionally define any Meta field supported by Jobs, as well as the following:
  • dryrun_default - defaults to True if unspecified
  • data_source and data_target as labels (by default, will use the name and/or "Nautobot" as appropriate)
  • data_source_icon and data_target_icon
Source code in nautobot_ssot/jobs/base.py
class DataSyncBaseJob(Job):  # pylint: disable=too-many-instance-attributes
    """Common base class for data synchronization jobs.

    Works mostly as per the BaseJob API, with the following changes:

    - Concrete subclasses are responsible for implementing `self.sync_data()` (or related hooks), **not** `self.run()`.
    - Subclasses may optionally define any Meta field supported by Jobs, as well as the following:
      - `dryrun_default` - defaults to True if unspecified
      - `data_source` and `data_target` as labels (by default, will use the `name` and/or "Nautobot" as appropriate)
      - `data_source_icon` and `data_target_icon`
    """

    dryrun = DryRunVar(description="Perform a dry-run, making no actual changes to Nautobot data.", default=True)
    memory_profiling = BooleanVar(description="Perform a memory profiling analysis.", default=False)

    def load_source_adapter(self):
        """Method to instantiate and load the SOURCE adapter into `self.source_adapter`.

        Relevant available instance attributes include:

        - self.job_result (as per Job API)
        """
        raise NotImplementedError

    def load_target_adapter(self):
        """Method to instantiate and load the TARGET adapter into `self.target_adapter`.

        Relevant available instance attributes include:

        - self.job_result (as per Job API)
        """
        raise NotImplementedError

    def calculate_diff(self):
        """Method to calculate the difference from SOURCE to TARGET adapter and store in `self.diff`.

        This is a generic implementation that you could overwrite completely in your custom logic.
        """
        if self.source_adapter is not None and self.target_adapter is not None:
            self.diff = self.source_adapter.diff_to(self.target_adapter, flags=self.diffsync_flags)
            self.sync.diff = {}
            self.sync.summary = self.diff.summary()
            self.sync.save()
            try:
                self.sync.diff = self.diff.dict()
                self.sync.save()
            except OperationalError:
                self.logger.warning("Unable to save JSON diff to the database; likely the diff is too large.")
                self.sync.refresh_from_db()
            self.logger.info(self.diff.summary())
        else:
            self.logger.warning("Not both adapters were properly initialized prior to diff calculation.")

    def execute_sync(self):
        """Method to synchronize the difference from `self.diff`, from SOURCE to TARGET adapter.

        This is a generic implementation that you could overwrite completely in your custom logic.
        """
        if self.source_adapter is not None and self.target_adapter is not None:
            self.source_adapter.sync_to(self.target_adapter, flags=self.diffsync_flags)
        else:
            self.logger.warning("Not both adapters were properly initialized prior to synchronization.")

    def sync_data(self, memory_profiling):
        """Method to load data from adapters, calculate diffs and sync (if not dry-run).

        It is composed by 4 methods:
        - self.load_source_adapter: instantiates the source adapter (self.source_adapter) and loads its data
        - self.load_target_adapter: instantiates the target adapter (self.target_adapter) and loads its data
        - self.calculate_diff: generates the diff from source to target adapter and stores it in self.diff
        - self.execute_sync: if not dry-run, uses the self.diff to synchronize from source to target

        This is a generic implementation that you could overwrite completely in you custom logic.
        Available instance attributes include:

        - self.sync       (Sync instance tracking this job execution)
        - self.job_result (as per Job API)
        """

        def record_memory_trace(step: str):
            """Helper function to record memory usage and reset tracemalloc stats."""
            memory_final, memory_peak = tracemalloc.get_traced_memory()
            setattr(self.sync, f"{step}_memory_final", memory_final)
            setattr(self.sync, f"{step}_memory_peak", memory_peak)
            self.sync.save()
            self.logger.info("Traced memory for %s (Final, Peak): %s bytes, %s bytes", step, memory_final, memory_peak)
            tracemalloc.clear_traces()

        if not self.sync:
            return

        if memory_profiling:
            tracemalloc.start()

        start_time = datetime.now()

        self.logger.info("Loading current data from source adapter...")
        self.load_source_adapter()
        load_source_adapter_time = datetime.now()
        self.sync.source_load_time = load_source_adapter_time - start_time
        self.sync.save()
        self.logger.info("Source Load Time from %s: %s", self.source_adapter, self.sync.source_load_time)
        if memory_profiling:
            record_memory_trace("source_load")

        self.logger.info("Loading current data from target adapter...")
        self.load_target_adapter()
        load_target_adapter_time = datetime.now()
        self.sync.target_load_time = load_target_adapter_time - load_source_adapter_time
        self.sync.save()
        self.logger.info("Target Load Time from %s: %s", self.target_adapter, self.sync.target_load_time)
        if memory_profiling:
            record_memory_trace("target_load")

        self.logger.info("Calculating diffs...")
        self.calculate_diff()
        calculate_diff_time = datetime.now()
        self.sync.diff_time = calculate_diff_time - load_target_adapter_time
        self.sync.save()
        self.logger.info("Diff Calculation Time: %s", self.sync.diff_time)
        if memory_profiling:
            record_memory_trace("diff")

        if self.dryrun:
            self.logger.info("As `dryrun` is set, skipping the actual data sync.")
        else:
            self.logger.info("Syncing from %s to %s...", self.source_adapter, self.target_adapter)
            self.execute_sync()
            execute_sync_time = datetime.now()
            self.sync.sync_time = execute_sync_time - calculate_diff_time
            self.sync.save()
            self.logger.info("Sync complete")
            self.logger.info("Sync Time: %s", self.sync.sync_time)
            if memory_profiling:
                record_memory_trace("sync")

    def lookup_object(self, model_name, unique_id) -> Optional[BaseModel]:  # pylint: disable=unused-argument
        """Look up the Nautobot record, if any, identified by the args.

        Optional helper method used to build more detailed/accurate SyncLogEntry records from DiffSync logs.

        Args:
            model_name (str): DiffSyncModel class name or similar class/model label.
            unique_id (str): DiffSyncModel unique_id or similar unique identifier.

        Returns:
            Optional[BaseModel]: Nautobot model instance, or None
        """
        return None

    @classmethod
    def data_mappings(cls) -> Iterable[DataMapping]:
        """List the data mappings involved in this sync job."""
        return []

    @classmethod
    def config_information(cls):
        """Return a dict of user-facing configuration information {property: value}.

        Note that this will be rendered 'as-is' in the UI, so as a general practice this
        should NOT include sensitive information such as passwords!
        """
        return {}

    def sync_log(  # pylint: disable=too-many-arguments
        self,
        action,
        status,
        message="",
        diff=None,
        synced_object=None,
        object_repr="",
    ):
        """Log a action message as a SyncLogEntry."""
        if synced_object and not object_repr:
            object_repr = repr(synced_object)

        SyncLogEntry.objects.create(
            sync=self.sync,
            action=action,
            status=status,
            message=message,
            diff=diff,
            synced_object=synced_object,
            object_repr=object_repr,
        )

    def _structlog_to_sync_log_entry(self, _logger, _log_method, event_dict):
        """Capture certain structlog messages from DiffSync into the Nautobot database."""
        if all(key in event_dict for key in ("src", "dst", "action", "model", "unique_id", "diffs", "status")):
            # The DiffSync log gives us a model name (string) and unique_id (string).
            # Try to look up the actual Nautobot object that this describes.
            synced_object = self.lookup_object(  # pylint: disable=assignment-from-none
                event_dict["model"], event_dict["unique_id"]
            )
            object_repr = repr(synced_object) if synced_object else f"{event_dict['model']} {event_dict['unique_id']}"
            self.sync_log(
                action=event_dict["action"] or SyncLogEntryActionChoices.ACTION_NO_CHANGE,
                diff=event_dict["diffs"] if event_dict["action"] else None,
                status=event_dict["status"],
                message=event_dict["event"],
                synced_object=synced_object,
                object_repr=object_repr,
            )

        return event_dict

    @classmethod
    def _get_vars(cls):
        """Extend Job._get_vars to include `dryrun` variable.

        Workaround for https://github.com/netbox-community/netbox/issues/5529
        """
        got_vars = super()._get_vars()
        if hasattr(cls, "dryrun"):
            got_vars["dryrun"] = cls.dryrun

        if hasattr(cls, "memory_profiling"):
            got_vars["memory_profiling"] = cls.memory_profiling
        return got_vars

    def __init__(self):
        """Initialize a Job."""
        super().__init__()
        self.sync = None
        self.diff = None
        self.source_adapter = None
        self.target_adapter = None
        # Default diffsync flags. You can overwrite them at any time.
        self.diffsync_flags = DiffSyncFlags.CONTINUE_ON_FAILURE | DiffSyncFlags.LOG_UNCHANGED_RECORDS

    @classmethod
    def as_form(cls, data=None, files=None, initial=None, approval_view=False):
        """Render this instance as a Django form for user inputs, including a "Dry run" field."""
        form = super().as_form(data=data, files=files, initial=initial, approval_view=approval_view)
        # Set the "dryrun" widget's initial value based on our Meta attribute, if any
        form.fields["dryrun"].initial = getattr(cls.Meta, "dryrun_default", True)
        return form

    @classproperty
    def data_source(cls):
        """The system or data source providing input data for this sync."""
        return getattr(cls.Meta, "data_source", cls.name)

    @classproperty
    def data_target(cls):
        """The system or data source being modified by this sync."""
        return getattr(cls.Meta, "data_target", cls.name)

    @classproperty
    def data_source_icon(cls):
        """Icon corresponding to the data_source."""
        return getattr(cls.Meta, "data_source_icon", None)

    @classproperty
    def data_target_icon(cls):
        """Icon corresponding to the data_target."""
        return getattr(cls.Meta, "data_target_icon", None)

    def run(self, dryrun, memory_profiling, *args, **kwargs):  # pylint:disable=arguments-differ
        """Job entry point from Nautobot - do not override!"""
        self.sync = Sync.objects.create(
            source=self.data_source,
            target=self.data_target,
            dry_run=dryrun,
            job_result=self.job_result,
            start_time=timezone.now(),
            diff={},
        )

        # Add _structlog_to_sync_log_entry as a processor for structlog calls from DiffSync
        structlog.configure(
            processors=[self._structlog_to_sync_log_entry, structlog.stdlib.render_to_log_kwargs],
            context_class=dict,
            logger_factory=structlog.stdlib.LoggerFactory(),
            wrapper_class=structlog.stdlib.BoundLogger,
            cache_logger_on_first_use=True,
        )
        self.sync_data(memory_profiling)

__init__()

Initialize a Job.

Source code in nautobot_ssot/jobs/base.py
def __init__(self):
    """Initialize a Job."""
    super().__init__()
    self.sync = None
    self.diff = None
    self.source_adapter = None
    self.target_adapter = None
    # Default diffsync flags. You can overwrite them at any time.
    self.diffsync_flags = DiffSyncFlags.CONTINUE_ON_FAILURE | DiffSyncFlags.LOG_UNCHANGED_RECORDS

as_form(data=None, files=None, initial=None, approval_view=False) classmethod

Render this instance as a Django form for user inputs, including a "Dry run" field.

Source code in nautobot_ssot/jobs/base.py
@classmethod
def as_form(cls, data=None, files=None, initial=None, approval_view=False):
    """Render this instance as a Django form for user inputs, including a "Dry run" field."""
    form = super().as_form(data=data, files=files, initial=initial, approval_view=approval_view)
    # Set the "dryrun" widget's initial value based on our Meta attribute, if any
    form.fields["dryrun"].initial = getattr(cls.Meta, "dryrun_default", True)
    return form

calculate_diff()

Method to calculate the difference from SOURCE to TARGET adapter and store in self.diff.

This is a generic implementation that you could overwrite completely in your custom logic.

Source code in nautobot_ssot/jobs/base.py
def calculate_diff(self):
    """Method to calculate the difference from SOURCE to TARGET adapter and store in `self.diff`.

    This is a generic implementation that you could overwrite completely in your custom logic.
    """
    if self.source_adapter is not None and self.target_adapter is not None:
        self.diff = self.source_adapter.diff_to(self.target_adapter, flags=self.diffsync_flags)
        self.sync.diff = {}
        self.sync.summary = self.diff.summary()
        self.sync.save()
        try:
            self.sync.diff = self.diff.dict()
            self.sync.save()
        except OperationalError:
            self.logger.warning("Unable to save JSON diff to the database; likely the diff is too large.")
            self.sync.refresh_from_db()
        self.logger.info(self.diff.summary())
    else:
        self.logger.warning("Not both adapters were properly initialized prior to diff calculation.")

config_information() classmethod

Return a dict of user-facing configuration information {property: value}.

Note that this will be rendered 'as-is' in the UI, so as a general practice this should NOT include sensitive information such as passwords!

Source code in nautobot_ssot/jobs/base.py
@classmethod
def config_information(cls):
    """Return a dict of user-facing configuration information {property: value}.

    Note that this will be rendered 'as-is' in the UI, so as a general practice this
    should NOT include sensitive information such as passwords!
    """
    return {}

data_mappings() classmethod

List the data mappings involved in this sync job.

Source code in nautobot_ssot/jobs/base.py
@classmethod
def data_mappings(cls) -> Iterable[DataMapping]:
    """List the data mappings involved in this sync job."""
    return []

data_source()

The system or data source providing input data for this sync.

Source code in nautobot_ssot/jobs/base.py
@classproperty
def data_source(cls):
    """The system or data source providing input data for this sync."""
    return getattr(cls.Meta, "data_source", cls.name)

data_source_icon()

Icon corresponding to the data_source.

Source code in nautobot_ssot/jobs/base.py
@classproperty
def data_source_icon(cls):
    """Icon corresponding to the data_source."""
    return getattr(cls.Meta, "data_source_icon", None)

data_target()

The system or data source being modified by this sync.

Source code in nautobot_ssot/jobs/base.py
@classproperty
def data_target(cls):
    """The system or data source being modified by this sync."""
    return getattr(cls.Meta, "data_target", cls.name)

data_target_icon()

Icon corresponding to the data_target.

Source code in nautobot_ssot/jobs/base.py
@classproperty
def data_target_icon(cls):
    """Icon corresponding to the data_target."""
    return getattr(cls.Meta, "data_target_icon", None)

execute_sync()

Method to synchronize the difference from self.diff, from SOURCE to TARGET adapter.

This is a generic implementation that you could overwrite completely in your custom logic.

Source code in nautobot_ssot/jobs/base.py
def execute_sync(self):
    """Method to synchronize the difference from `self.diff`, from SOURCE to TARGET adapter.

    This is a generic implementation that you could overwrite completely in your custom logic.
    """
    if self.source_adapter is not None and self.target_adapter is not None:
        self.source_adapter.sync_to(self.target_adapter, flags=self.diffsync_flags)
    else:
        self.logger.warning("Not both adapters were properly initialized prior to synchronization.")

load_source_adapter()

Method to instantiate and load the SOURCE adapter into self.source_adapter.

Relevant available instance attributes include:

  • self.job_result (as per Job API)
Source code in nautobot_ssot/jobs/base.py
def load_source_adapter(self):
    """Method to instantiate and load the SOURCE adapter into `self.source_adapter`.

    Relevant available instance attributes include:

    - self.job_result (as per Job API)
    """
    raise NotImplementedError

load_target_adapter()

Method to instantiate and load the TARGET adapter into self.target_adapter.

Relevant available instance attributes include:

  • self.job_result (as per Job API)
Source code in nautobot_ssot/jobs/base.py
def load_target_adapter(self):
    """Method to instantiate and load the TARGET adapter into `self.target_adapter`.

    Relevant available instance attributes include:

    - self.job_result (as per Job API)
    """
    raise NotImplementedError

lookup_object(model_name, unique_id)

Look up the Nautobot record, if any, identified by the args.

Optional helper method used to build more detailed/accurate SyncLogEntry records from DiffSync logs.

Parameters:

Name Type Description Default
model_name str

DiffSyncModel class name or similar class/model label.

required
unique_id str

DiffSyncModel unique_id or similar unique identifier.

required

Returns:

Type Description
Optional[BaseModel]

Optional[BaseModel]: Nautobot model instance, or None

Source code in nautobot_ssot/jobs/base.py
def lookup_object(self, model_name, unique_id) -> Optional[BaseModel]:  # pylint: disable=unused-argument
    """Look up the Nautobot record, if any, identified by the args.

    Optional helper method used to build more detailed/accurate SyncLogEntry records from DiffSync logs.

    Args:
        model_name (str): DiffSyncModel class name or similar class/model label.
        unique_id (str): DiffSyncModel unique_id or similar unique identifier.

    Returns:
        Optional[BaseModel]: Nautobot model instance, or None
    """
    return None

run(dryrun, memory_profiling, *args, **kwargs)

Job entry point from Nautobot - do not override!

Source code in nautobot_ssot/jobs/base.py
def run(self, dryrun, memory_profiling, *args, **kwargs):  # pylint:disable=arguments-differ
    """Job entry point from Nautobot - do not override!"""
    self.sync = Sync.objects.create(
        source=self.data_source,
        target=self.data_target,
        dry_run=dryrun,
        job_result=self.job_result,
        start_time=timezone.now(),
        diff={},
    )

    # Add _structlog_to_sync_log_entry as a processor for structlog calls from DiffSync
    structlog.configure(
        processors=[self._structlog_to_sync_log_entry, structlog.stdlib.render_to_log_kwargs],
        context_class=dict,
        logger_factory=structlog.stdlib.LoggerFactory(),
        wrapper_class=structlog.stdlib.BoundLogger,
        cache_logger_on_first_use=True,
    )
    self.sync_data(memory_profiling)

sync_data(memory_profiling)

Method to load data from adapters, calculate diffs and sync (if not dry-run).

It is composed by 4 methods: - self.load_source_adapter: instantiates the source adapter (self.source_adapter) and loads its data - self.load_target_adapter: instantiates the target adapter (self.target_adapter) and loads its data - self.calculate_diff: generates the diff from source to target adapter and stores it in self.diff - self.execute_sync: if not dry-run, uses the self.diff to synchronize from source to target

This is a generic implementation that you could overwrite completely in you custom logic. Available instance attributes include:

  • self.sync (Sync instance tracking this job execution)
  • self.job_result (as per Job API)
Source code in nautobot_ssot/jobs/base.py
def sync_data(self, memory_profiling):
    """Method to load data from adapters, calculate diffs and sync (if not dry-run).

    It is composed by 4 methods:
    - self.load_source_adapter: instantiates the source adapter (self.source_adapter) and loads its data
    - self.load_target_adapter: instantiates the target adapter (self.target_adapter) and loads its data
    - self.calculate_diff: generates the diff from source to target adapter and stores it in self.diff
    - self.execute_sync: if not dry-run, uses the self.diff to synchronize from source to target

    This is a generic implementation that you could overwrite completely in you custom logic.
    Available instance attributes include:

    - self.sync       (Sync instance tracking this job execution)
    - self.job_result (as per Job API)
    """

    def record_memory_trace(step: str):
        """Helper function to record memory usage and reset tracemalloc stats."""
        memory_final, memory_peak = tracemalloc.get_traced_memory()
        setattr(self.sync, f"{step}_memory_final", memory_final)
        setattr(self.sync, f"{step}_memory_peak", memory_peak)
        self.sync.save()
        self.logger.info("Traced memory for %s (Final, Peak): %s bytes, %s bytes", step, memory_final, memory_peak)
        tracemalloc.clear_traces()

    if not self.sync:
        return

    if memory_profiling:
        tracemalloc.start()

    start_time = datetime.now()

    self.logger.info("Loading current data from source adapter...")
    self.load_source_adapter()
    load_source_adapter_time = datetime.now()
    self.sync.source_load_time = load_source_adapter_time - start_time
    self.sync.save()
    self.logger.info("Source Load Time from %s: %s", self.source_adapter, self.sync.source_load_time)
    if memory_profiling:
        record_memory_trace("source_load")

    self.logger.info("Loading current data from target adapter...")
    self.load_target_adapter()
    load_target_adapter_time = datetime.now()
    self.sync.target_load_time = load_target_adapter_time - load_source_adapter_time
    self.sync.save()
    self.logger.info("Target Load Time from %s: %s", self.target_adapter, self.sync.target_load_time)
    if memory_profiling:
        record_memory_trace("target_load")

    self.logger.info("Calculating diffs...")
    self.calculate_diff()
    calculate_diff_time = datetime.now()
    self.sync.diff_time = calculate_diff_time - load_target_adapter_time
    self.sync.save()
    self.logger.info("Diff Calculation Time: %s", self.sync.diff_time)
    if memory_profiling:
        record_memory_trace("diff")

    if self.dryrun:
        self.logger.info("As `dryrun` is set, skipping the actual data sync.")
    else:
        self.logger.info("Syncing from %s to %s...", self.source_adapter, self.target_adapter)
        self.execute_sync()
        execute_sync_time = datetime.now()
        self.sync.sync_time = execute_sync_time - calculate_diff_time
        self.sync.save()
        self.logger.info("Sync complete")
        self.logger.info("Sync Time: %s", self.sync.sync_time)
        if memory_profiling:
            record_memory_trace("sync")

sync_log(action, status, message='', diff=None, synced_object=None, object_repr='')

Log a action message as a SyncLogEntry.

Source code in nautobot_ssot/jobs/base.py
def sync_log(  # pylint: disable=too-many-arguments
    self,
    action,
    status,
    message="",
    diff=None,
    synced_object=None,
    object_repr="",
):
    """Log a action message as a SyncLogEntry."""
    if synced_object and not object_repr:
        object_repr = repr(synced_object)

    SyncLogEntry.objects.create(
        sync=self.sync,
        action=action,
        status=status,
        message=message,
        diff=diff,
        synced_object=synced_object,
        object_repr=object_repr,
    )

nautobot_ssot.jobs.DataSource

Bases: DataSyncBaseJob

Base class for Jobs that sync data from another data source to Nautobot.

Source code in nautobot_ssot/jobs/base.py
class DataSource(DataSyncBaseJob):
    """Base class for Jobs that sync data **from** another data source **to** Nautobot."""

    @classproperty
    def data_target(cls):
        """For a DataSource this is always Nautobot."""
        return "Nautobot"

    @classproperty
    def data_target_icon(cls):
        """For a DataSource this is always the Nautobot logo."""
        return static("img/nautobot_logo.png")

data_target()

For a DataSource this is always Nautobot.

Source code in nautobot_ssot/jobs/base.py
@classproperty
def data_target(cls):
    """For a DataSource this is always Nautobot."""
    return "Nautobot"

data_target_icon()

For a DataSource this is always the Nautobot logo.

Source code in nautobot_ssot/jobs/base.py
@classproperty
def data_target_icon(cls):
    """For a DataSource this is always the Nautobot logo."""
    return static("img/nautobot_logo.png")

nautobot_ssot.jobs.DataTarget

Bases: DataSyncBaseJob

Base class for Jobs that sync data to another data target from Nautobot.

Source code in nautobot_ssot/jobs/base.py
class DataTarget(DataSyncBaseJob):
    """Base class for Jobs that sync data **to** another data target **from** Nautobot."""

    @classproperty
    def data_source(cls):
        """For a DataTarget this is always Nautobot."""
        return "Nautobot"

    @classproperty
    def data_source_icon(cls):
        """For a DataTarget this is always the Nautobot logo."""
        return static("img/nautobot_logo.png")

data_source()

For a DataTarget this is always Nautobot.

Source code in nautobot_ssot/jobs/base.py
@classproperty
def data_source(cls):
    """For a DataTarget this is always Nautobot."""
    return "Nautobot"

data_source_icon()

For a DataTarget this is always the Nautobot logo.

Source code in nautobot_ssot/jobs/base.py
@classproperty
def data_source_icon(cls):
    """For a DataTarget this is always the Nautobot logo."""
    return static("img/nautobot_logo.png")