Skip to content

mode.utils.graphs

DependencyGraph

Bases: DependencyGraphT

A directed acyclic graph of objects and their dependencies.

Supports a robust topological sort to detect the order in which they must be handled.

Takes an optional iterator of (obj, dependencies) tuples to build the graph from.

Warning

Does not support cycle detection.

Source code in mode/utils/graphs/graph.py
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
class DependencyGraph(DependencyGraphT):
    """A directed acyclic graph of objects and their dependencies.

    Supports a robust topological sort
    to detect the order in which they must be handled.

    Takes an optional iterator of ``(obj, dependencies)``
    tuples to build the graph from.

    Warning:
        Does not support cycle detection.
    """

    # Maps each node to the list of nodes it depends on (its successors).
    adjacent: MutableMapping

    def __init__(
        self,
        it: Optional[Iterable] = None,
        formatter: Optional[GraphFormatterT[_T]] = None,
    ) -> None:
        """Create the graph, optionally populating it from ``it``.

        Arguments:
            it: Optional iterable of ``(obj, dependencies)`` tuples,
                passed to :meth:`update`.
            formatter: Formatter used by :meth:`to_dot` when none is
                given there.  A default ``GraphFormatter()`` is created
                when this is falsy.
        """
        self.formatter = formatter or GraphFormatter()
        self.adjacent = {}
        if it is not None:
            self.update(it)

    def add_arc(self, obj: _T) -> None:
        """Add an object to the graph.

        An object that is already present keeps its existing edges.
        """
        self.adjacent.setdefault(obj, [])

    def add_edge(self, A: _T, B: _T) -> None:
        """Add an edge from object ``A`` to object ``B``.

        I.e. ``A`` depends on ``B``.

        Raises:
            KeyError: If ``A`` has not been added to the graph.
        """
        self[A].append(B)

    def connect(self, graph: DependencyGraphT[_T]) -> None:
        """Add nodes from another graph.

        Entries of ``graph.adjacent`` replace entries with the same key
        here, and the dependency lists are shared (not copied) between
        the two graphs.
        """
        self.adjacent.update(graph.adjacent)

    def topsort(self) -> Sequence:
        """Sort the graph topologically.

        The strongly connected components are computed first and sorted
        as units; only the first member of each component appears in
        the result.

        Returns:
            List: of objects in the order in which they must be handled.

        Raises:
            KeyError: If a dependency was never added as a node itself.
        """
        graph = DependencyGraph()
        components = self._tarjan72()

        # Node -> the component (tuple of nodes) that contains it.
        NC = {
            node: component for component in components for node in component
        }
        for component in components:
            graph.add_arc(component)
        for node in self:
            node_c = NC[node]
            for successor in self[node]:
                successor_c = NC[successor]
                # Edges inside a single component are dropped.
                if node_c != successor_c:
                    graph.add_edge(node_c, successor_c)
        return [t[0] for t in graph._khan62()]

    def valency_of(self, obj: _T) -> int:
        """Return the valency (degree) of a vertex in the graph.

        This is the number of direct dependencies of ``obj`` plus,
        recursively, the valency of each of them.  Objects that are not
        in the graph count as ``0``.  The recursion has no visited-set,
        so it does not terminate on a cyclic graph.
        """
        try:
            sizes = [len(self[obj])]
        except KeyError:
            return 0
        for node in self[obj]:
            sizes.append(self.valency_of(node))
        return sum(sizes)

    def update(self, it: Iterable) -> None:
        """Update graph with data from a list of ``(obj, deps)`` tuples."""
        # Materialize first: the input is walked twice and may be a
        # one-shot iterator.
        tups = list(it)
        # Register every object before adding edges, because add_edge()
        # requires the source node to exist.
        for obj, _ in tups:
            self.add_arc(obj)
        for obj, deps in tups:
            for dep in deps:
                self.add_edge(obj, dep)

    def edges(self) -> Iterable:
        """Return generator yielding every node with at least one edge.

        Only the source node of the edges is yielded, not the
        ``(source, target)`` pairs.
        """
        return (obj for obj, adj in self.items() if adj)

    def _khan62(self) -> Sequence:
        """Perform Kahn's simple topological sort algorithm from '62.

        See https://en.wikipedia.org/wiki/Topological_sorting

        Returns:
            The nodes ordered so that dependencies come before the
            nodes that depend on them.
        """
        # Number of not-yet-emitted nodes that depend on each node.
        count: Counter[Any] = Counter()
        result = []

        for node in self:
            for successor in self[node]:
                count[successor] += 1
        # Start from the nodes nothing else depends on.
        ready = [node for node in self if not count[node]]

        while ready:
            node = ready.pop()
            result.append(node)

            for successor in self[node]:
                count[successor] -= 1
                if count[successor] == 0:
                    ready.append(successor)
        # Dependents were emitted first; flip so dependencies lead.
        result.reverse()
        return result

    def _tarjan72(self) -> Sequence:
        """Perform Tarjan's algorithm to find strongly connected components.

        See Also:
            :wikipedia:`Tarjan%27s_strongly_connected_components_algorithm`

        Returns:
            A list of components, each a tuple of nodes.

        Raises:
            KeyError: If a dependency was never added as a node itself.
        """
        result: List = []
        stack: List = []
        # Node -> lowest discovery index reachable from it.  This must be
        # a mapping: it is keyed by arbitrary node objects.
        low: MutableMapping[Any, int] = {}

        def visit(node: Any) -> None:
            if node in low:
                return
            num = len(low)
            low[node] = num
            stack_pos = len(stack)
            stack.append(node)

            for successor in self[node]:
                visit(successor)
                low[node] = min(low[node], low[successor])

            # The node is the root of a component when nothing reachable
            # from it leads back to an earlier node.
            if num == low[node]:
                component = tuple(stack[stack_pos:])
                stack[stack_pos:] = []
                result.append(component)
                # Park finished nodes at a value no discovery index can
                # undercut, so they no longer lower anyone's ``low``.
                for item in component:
                    low[item] = len(self)

        for node in self:
            visit(node)

        return result

    def to_dot(
        self, fh: IO, *, formatter: Optional[GraphFormatterT[_T]] = None
    ) -> None:
        """Convert the graph to DOT format.

        Arguments:
            fh (IO): A file, or a file-like object to write the graph to.
            formatter (GraphFormatterT): Custom graph formatter to use.
                Defaults to the formatter the graph was created with.
        """
        # Labels of nodes whose declaration has already been written.
        seen: Set = set()
        draw = formatter or self.formatter
        write = partial(print, file=fh)

        def if_not_seen(fun: Callable[[Any], str], obj: Any) -> None:
            label = draw.label(obj)
            if label not in seen:
                write(fun(obj))
                seen.add(label)

        write(draw.head())
        for obj, adjacent in self.items():
            if not adjacent:
                if_not_seen(draw.terminal_node, obj)
            for req in adjacent:
                if_not_seen(draw.node, obj)
                write(draw.edge(obj, req))
        write(draw.tail())

    def __iter__(self) -> Iterator:
        """Iterate over the nodes of the graph."""
        return iter(self.adjacent)

    def __getitem__(self, node: _T) -> Any:
        """Return the list of direct dependencies of ``node``."""
        return self.adjacent[node]

    def __len__(self) -> int:
        """Return the number of nodes in the graph."""
        return len(self.adjacent)

    def __contains__(self, obj: _T) -> bool:
        """Return True if ``obj`` is a node of the graph."""
        return obj in self.adjacent

    def items(self) -> ItemsView:
        """Return a view of ``(node, dependencies)`` pairs."""
        return cast(ItemsView, self.adjacent.items())

    def __repr__(self) -> str:
        """Return an indented dependency tree for every node."""
        return "\n".join(self._repr_node(N) for N in self)

    def _repr_node(
        self, obj: _T, level: int = 1, fmt: str = "{0}({1})"
    ) -> str:
        """Render ``obj`` and, indented below it, its dependency tree.

        Each line is ``fmt`` applied to a node and its valency.
        """
        output = [fmt.format(obj, self.valency_of(obj))]
        if obj in self:
            for other in self[obj]:
                d = fmt.format(other, self.valency_of(other))
                output.append("     " * level + d)
                # The recursive call's first line repeats ``other``,
                # which was just written with indentation, so skip it.
                output.extend(
                    self._repr_node(other, level + 1).split("\n")[1:]
                )
        return "\n".join(output)

add_arc(obj)

Add an object to the graph.

Source code in mode/utils/graphs/graph.py
54
55
56
def add_arc(self, obj: _T) -> None:
    """Register ``obj`` as a node; an existing node keeps its edges."""
    adjacency = self.adjacent
    adjacency.setdefault(obj, [])

add_edge(A, B)

Add an edge from object A to object B.

I.e. A depends on B.

Source code in mode/utils/graphs/graph.py
58
59
60
61
62
63
def add_edge(self, A: _T, B: _T) -> None:
    """Record that ``A`` depends on ``B``.

    ``B`` is appended to whatever ``self[A]`` returns, i.e. the list of
    objects ``A`` points at.
    """
    dependencies = self[A]
    dependencies.append(B)

connect(graph)

Add nodes from another graph.

Source code in mode/utils/graphs/graph.py
65
66
67
def connect(self, graph: DependencyGraphT[_T]) -> None:
    """Merge the adjacency mapping of ``graph`` into this graph."""
    own_adjacency = self.adjacent
    own_adjacency.update(graph.adjacent)

edges()

Return a generator that yields every node in the graph that has at least one outgoing edge.

Source code in mode/utils/graphs/graph.py
110
111
112
def edges(self) -> Iterable:
    """Return a generator over the nodes that have at least one edge.

    Only the node on the left-hand side of each ``(node, adjacent)``
    pair is yielded, and only when its adjacency collection is truthy.
    """
    pairs = self.items()
    return (node for node, successors in pairs if successors)

to_dot(fh, *, formatter=None)

Convert the graph to DOT format.

Parameters:

Name Type Description Default
fh IO

A file, or a file-like object to write the graph to.

required
formatter GraphFormatter

Custom graph formatter to use.

None
Source code in mode/utils/graphs/graph.py
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
def to_dot(self, fh: IO, *, formatter: GraphFormatterT[_T] = None) -> None:
    """Write the graph to ``fh`` in DOT format.

    Arguments:
        fh (IO): A file, or a file-like object to write the graph to.
        formatter: Custom graph formatter to use; falls back to
            ``self.formatter`` when not given.
    """
    draw = formatter or self.formatter
    write = partial(print, file=fh)
    # Labels whose node declaration has already been written out.
    declared: Set = set()

    def if_not_seen(fun: Callable[[Any], str], obj: Any) -> None:
        # Declare each node at most once, keyed by its label.
        key = draw.label(obj)
        if key in declared:
            return
        write(fun(obj))
        declared.add(key)

    write(draw.head())
    for source, targets in self.items():
        if not targets:
            # Nothing to point at: draw it as a terminal node.
            if_not_seen(draw.terminal_node, source)
        for target in targets:
            if_not_seen(draw.node, source)
            write(draw.edge(source, target))
    write(draw.tail())

topsort()

Sort the graph topologically.

Returns:

Name Type Description
List Sequence

The objects, in the order in which they must be handled.

Source code in mode/utils/graphs/graph.py
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
def topsort(self) -> Sequence:
    """Return the graph's objects in topological order.

    The strongly connected components are sorted as units, and the
    first member of each component is what ends up in the result.

    Returns:
        List: of objects in the order in which they must be handled.
    """
    condensed = DependencyGraph()
    components = self._tarjan72()

    # Look-up table from a node to the component that contains it.
    component_of = {}
    for component in components:
        for member in component:
            component_of[member] = component

    for component in components:
        condensed.add_arc(component)

    for node in self:
        source = component_of[node]
        for successor in self[node]:
            target = component_of[successor]
            # Only edges that cross component boundaries are kept.
            if source != target:
                condensed.add_edge(source, target)

    ordered = condensed._khan62()
    return [component[0] for component in ordered]

update(it)

Update graph with data from a list of (obj, deps) tuples.

Source code in mode/utils/graphs/graph.py
101
102
103
104
105
106
107
108
def update(self, it: Iterable) -> None:
    """Add every ``(obj, deps)`` pair from ``it`` to the graph."""
    # Snapshot the input: it is traversed twice below.
    pairs = list(it)

    # Pass 1: register every object as a node.
    for node, _deps in pairs:
        self.add_arc(node)

    # Pass 2: connect each object to its dependencies.
    for node, dependencies in pairs:
        for dependency in dependencies:
            self.add_edge(node, dependency)

valency_of(obj)

Return the valency (degree) of a vertex in the graph.

Source code in mode/utils/graphs/graph.py
91
92
93
94
95
96
97
98
99
def valency_of(self, obj: _T) -> int:
    """Return the valency (degree) of a vertex in the graph.

    Counts the entries of ``self[obj]`` and adds the valency of each of
    them in turn; an object whose lookup raises ``KeyError`` counts 0.
    """
    try:
        direct = len(self[obj])
    except KeyError:
        return 0
    return direct + sum(self.valency_of(node) for node in self[obj])

GraphFormatter

Bases: GraphFormatterT

Format dependency graphs.

Source code in mode/utils/graphs/formatter.py
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
class GraphFormatter(GraphFormatterT):
    """Format dependency graphs.

    Produces the text fragments (head, nodes, edges, tail) of a graph
    description from the ``DOT`` templates.
    """

    # Format templates taken from ``DOT``, with surrounding whitespace
    # stripped.
    _attr = DOT.ATTR.strip()
    _node = DOT.NODE.strip()
    _edge = DOT.EDGE.strip()
    _head = DOT.HEAD.strip()
    _tail = DOT.TAIL.strip()
    # Separator placed between rendered attributes.
    _attrsep = DOT.ATTRSEP
    # Maps a graph type (e.g. the default "digraph") to its edge direction.
    _dirs: ClassVar[Dict] = dict(DOT.DIRS)

    # Base attributes applied to everything that is drawn.
    scheme: Mapping[str, Any] = {
        "shape": "box",
        "arrowhead": "vee",
        "style": "filled",
        "fontname": "HelveticaNeue",
    }
    # Extra attributes for edges.
    edge_scheme: Mapping[str, Any] = {
        "color": "darkseagreen4",
        "arrowcolor": "black",
        "arrowsize": 0.7,
    }
    # Extra attributes for nodes drawn with :meth:`node`.
    node_scheme: Mapping[str, Any] = {
        "fillcolor": "palegreen3",
        "color": "palegreen4",
    }
    # Extra attributes for nodes drawn with :meth:`terminal_node`.
    term_scheme: Mapping[str, Any] = {
        "fillcolor": "palegreen1",
        "color": "palegreen2",
    }
    # Extra attributes for the graph header.
    graph_scheme: Mapping[str, Any] = {"bgcolor": "mintcream"}

    def __init__(
        self,
        root: Any = None,
        type: Optional[str] = None,
        id: Optional[str] = None,
        indent: int = 0,
        inw: str = " " * 4,
        **scheme: Any,
    ) -> None:
        """Configure the formatter.

        Arguments:
            root: Object whose label becomes the ``root`` graph attribute.
            type: Graph type; defaults to ``"digraph"``.
            id: Graph identifier; defaults to ``"dependencies"``.
            indent: Number of ``inw`` units used as base indentation.
            inw: The string that makes up one indentation unit.
            **scheme: Overrides merged on top of the class ``scheme``.

        Raises:
            KeyError: If ``type`` is not a key of ``DOT.DIRS``.
        """
        self.id = id or "dependencies"
        self.root = root
        self.type = type or "digraph"
        self.direction = self._dirs[self.type]
        # IN is the base indentation, INp is one level deeper; both are
        # handed to every template by FMT().
        self.IN = inw * (indent or 0)
        self.INp = self.IN + inw
        # Per-instance copies, so the class-level defaults stay untouched.
        self.scheme = dict(self.scheme, **scheme)
        self.graph_scheme = dict(self.graph_scheme, root=self.label(self.root))

    def attr(self, name: str, value: Any) -> str:
        """Render a single attribute, with the value in double quotes."""
        return self.FMT(self._attr, name=name, value=f'"{value}"')

    def attrs(
        self, d: Optional[Mapping] = None, scheme: Optional[Mapping] = None
    ) -> str:
        """Render an attribute list.

        Later sources win: ``self.scheme``, then ``scheme``, then ``d``.
        """
        scheme = {**self.scheme, **scheme} if scheme else self.scheme
        d = {**scheme, **d} if d else scheme
        return self._attrsep.join(str(self.attr(k, v)) for k, v in d.items())

    def head(self, **attrs: Any) -> str:
        """Render the graph header, using ``graph_scheme`` plus ``attrs``."""
        return self.FMT(
            self._head,
            id=self.id,
            type=self.type,
            attrs=self.attrs(attrs, self.graph_scheme),
        )

    def tail(self) -> str:
        """Render the graph footer."""
        return self.FMT(self._tail)

    def label(self, obj: _T) -> str:
        """Return the label for ``obj`` (delegates to ``label()``)."""
        return label(obj)

    def node(self, obj: _T, **attrs: Any) -> str:
        """Render ``obj`` as a regular node, using ``node_scheme``."""
        return self.draw_node(obj, self.node_scheme, attrs)

    def terminal_node(self, obj: _T, **attrs: Any) -> str:
        """Render ``obj`` as a terminal node, using ``term_scheme``."""
        return self.draw_node(obj, self.term_scheme, attrs)

    def edge(self, a: _T, b: _T, **attrs: Any) -> str:
        """Render the edge from ``a`` to ``b``.

        ``attrs`` are extra attributes for this edge.  They are handed
        to :meth:`draw_edge` as one mapping, the same way :meth:`node`
        and :meth:`terminal_node` hand theirs to :meth:`draw_node`;
        unpacking them as keyword arguments would fail for any name
        other than ``scheme`` or ``attrs``.
        """
        return self.draw_edge(a, b, self.edge_scheme, attrs)

    def _enc(self, s: str) -> str:
        """Return ``s`` without the characters UTF-8 cannot encode."""
        return s.encode("utf-8", "ignore").decode()

    def FMT(self, fmt: str, *args: Any, **kwargs: Any) -> str:
        """Fill in the template ``fmt``.

        ``IN`` and ``INp`` are always supplied from the instance and
        take precedence over same-named entries in ``kwargs``.
        """
        return self._enc(
            fmt.format(*args, **dict(kwargs, IN=self.IN, INp=self.INp))
        )

    def draw_edge(
        self,
        a: _T,
        b: _T,
        scheme: Optional[Mapping] = None,
        attrs: Optional[Mapping] = None,
    ) -> str:
        """Render the edge between the labels of ``a`` and ``b``.

        Note:
            ``scheme`` is accepted but not used: ``self.edge_scheme`` is
            always applied.
        """
        return self.FMT(
            self._edge,
            self.label(a),
            self.label(b),
            dir=self.direction,
            attrs=self.attrs(attrs, self.edge_scheme),
        )

    def draw_node(
        self,
        obj: _T,
        scheme: Optional[Mapping] = None,
        attrs: Optional[Mapping] = None,
    ) -> str:
        """Render the node for ``obj`` using ``scheme`` plus ``attrs``."""
        return self.FMT(
            self._node, self.label(obj), attrs=self.attrs(attrs, scheme)
        )