-
Notifications
You must be signed in to change notification settings - Fork 31
/
vfx_render_farm_conversion.py
231 lines (186 loc) · 7.88 KB
/
vfx_render_farm_conversion.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
"""Demonstrating how to convert a flowpipe graph to a render farm job.
This guide expects that your render farm can handle dependencies between tasks.
"""
import json
import logging
import os
from tempfile import gettempdir
from flowpipe import Graph, INode, Node
# -----------------------------------------------------------------------------
#
# Necessary utilities
#
# -----------------------------------------------------------------------------
class JsonDatabase:
"""The Database stores the JSON-serialized nodes.
The storage can also be handled via a database, this is just the easiest
way for demonstrational purposes. In production, a file based storage also
has advantages for debugging and allows for easy hacking by just altering
the JSON files directly.
"""
PATH = os.path.join(gettempdir(), "json-database", "{identifier}.json")
@staticmethod
def set(node):
"""Store the node under it's identifier."""
serialized_json = JsonDatabase.PATH.format(identifier=node.identifier)
if not os.path.exists(os.path.dirname(serialized_json)):
os.makedirs(os.path.dirname(serialized_json))
with open(serialized_json, "w") as f:
json.dump(node.serialize(), f, indent=2)
return serialized_json
@staticmethod
def get(identifier):
"""Retrieve the node behind the given identifier."""
serialized_json = JsonDatabase.PATH.format(identifier=identifier)
with open(serialized_json, "r") as f:
data = json.load(f)
return INode.deserialize(data)
# Command templates to execute a flowpipe node in the terminal.
# Uses different python interpreters and commands based on the host application
# The template just needs the path to the serialized json file and optionally
# a range of frames passed to the node for the implicit batch conversion.
COMMANDS = {
"python": (
"python -c '"
"from my_farm import conversion;"
'conversion.evaluate_on_farm("{serialized_json}", {frames})\''
),
"maya": (
"mayapy -c '"
"import maya.standalone;"
'maya.standalone.initialize(name="python");'
"from my_farm import conversion;"
'conversion.evaluate_on_farm("{serialized_json}", {frames})\''
),
}
def convert_graph_to_job(graph):
"""Convert the graph to a dict representing a typical render farm job."""
job = {"name": graph.name, "tasks": []}
# Turn every node into a farm task
tasks = {}
for node in graph.nodes:
serialized_json = JsonDatabase.set(node)
tasks[node.name] = []
# IMPLICIT BATCHING:
# Create individual tasks for each batch if the batch size is defined
# Feed the calculated frame range to each batch
if node.metadata.get("batch_size") is not None:
batch_size = node.metadata["batch_size"]
frames = node.inputs["frames"].value
i = 0
while i < len(frames) - 1:
end = i + batch_size
if end > len(frames) - 1:
end = len(frames)
f = frames[i:end]
task = {"name": "{0}-{1}".format(node.name, i / batch_size)}
command = COMMANDS.get(
node.metadata.get("interpreter", "python"), None
)
task["command"] = command.format(
serialized_json=serialized_json, frames=f
)
job["tasks"].append(task)
tasks[node.name].append(task)
i += batch_size
else:
task = {"name": node.name}
command = COMMANDS.get(
node.metadata.get("interpreter", "python"), None
)
task["command"] = command.format(
serialized_json=serialized_json, frames=None
)
job["tasks"].append(task)
tasks[node.name].append(task)
# The dependencies between the tasks based on the connections of the Nodes
for node_name in tasks:
for task in tasks[node_name]:
node = graph[node_name]
task["dependencies"] = []
for upstream in [n.name for n in node.upstream_nodes]:
task["dependencies"] += [t["name"] for t in tasks[upstream]]
return job
def evaluate_on_farm(serialized_json, frames=None):
"""Evaluate the node behind the given json file.
1. Deserialize the node
2. Collect any input values from any upstream dependencies
For implicit batching, the given frames are assigned to the node,
overriding whatever might be stored in the json file, becuase all
batches share the same json file.
3. Evaluate the node
4. Serialize the node back into its original file
For implicit farm conversion, the serialization only happens once,
for the 'last' batch, knowing that the last batch in numbers might
not be the 'last' batch actually executed.
"""
# Debug logs might be useful on the farm
logging.baseConfig.setLevel(logging.DEBUG)
# Deserialize the node from the serialized json
with open(serialized_json, "r") as f:
data = json.load(f)
node = INode.deserialize(data)
# Retrieve the upstream output data
for name, input_plug in data["inputs"].items():
for identifier, output_plug in input_plug["connections"].items():
upstream_node = JsonDatabase.get(identifier)
node.inputs[name].value = upstream_node.outputs[output_plug].value
# Specifically assign the batch frames here if applicable
if frames is not None:
all_frames = node.inputs["frames"]
node.inputs["frames"] = frames
# Actually evalute the node
node.evaluate()
# Store the result back into the same file ONLY once
# ALL batch processes access the same json file so the result is only stored
# for the last batch, knowing that the last batch in numbers might not be
# the last batch actually executed
if frames is not None and frames[-1] != all_frames[-1]:
return
with open(serialized_json, "w") as f:
json.dump(node.serialize(), f, indent=2)
# -----------------------------------------------------------------------------
#
# Examples
#
# -----------------------------------------------------------------------------
@Node(outputs=["renderings"], metadata={"interpreter": "maya"})
def MayaRender(frames, scene_file):
"""Render the given frames from the given scene.."""
return {"renderings": "/renderings/file.%04d.exr"}
@Node(outputs=["status"])
def UpdateDatabase(id_, images):
"""Update the database entries of the given asset with the given data."""
return {"status": True}
def implicit_batching(frames, batch_size):
"""Batches are created during the farm conversion."""
graph = Graph(name="Rendering")
render = MayaRender(
graph=graph,
frames=list(range(frames)),
scene_file="/scene/for/rendering.ma",
metadata={"batch_size": batch_size},
)
update = UpdateDatabase(graph=graph, id_=123456)
render.outputs["renderings"].connect(update.inputs["images"])
print(graph)
print(json.dumps(convert_graph_to_job(graph), indent=2))
def explicit_batching(frames, batch_size):
"""Batches are already part of the graph."""
graph = Graph(name="Rendering")
update_database = UpdateDatabase(graph=graph, id_=123456)
for i in range(0, frames, batch_size):
maya_render = MayaRender(
name="MayaRender{0}-{1}".format(i, i + batch_size),
graph=graph,
frames=list(range(i, i + batch_size)),
scene_file="/scene/for/rendering.ma",
)
maya_render.outputs["renderings"].connect(
update_database.inputs["images"][str(i)]
)
print(graph)
print(json.dumps(convert_graph_to_job(graph), indent=2))
if __name__ == "__main__":
implicit_batching(30, 10)
explicit_batching(30, 10)