Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean stream upon connect #576

Merged
merged 5 commits into from
Sep 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 17 additions & 10 deletions caster-back/gencaster/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,7 @@ async def graph(
yield graph # type: ignore

async for graph_update in GenCasterChannel.receive_graph_updates(
info.context.ws, graph_uuid
info.context["ws"], graph_uuid
):
yield await story_graph_models.Graph.objects.aget(uuid=graph_update.uuid) # type: ignore

Expand All @@ -564,7 +564,7 @@ async def node(
yield node # type: ignore

async for node_update in GenCasterChannel.receive_node_updates(
info.context.ws, node_uuid
info.context["ws"], node_uuid
):
yield await story_graph_models.Node.objects.aget(uuid=node_update.uuid) # type: ignore

Expand All @@ -583,15 +583,15 @@ async def stream_info(
if a given stream is free or used.
Upon connection stop this will be decremented again.
"""
consumer: GraphQLWSConsumerInjector = info.context.ws
consumer: GraphQLWSConsumerInjector = info.context["ws"]

graph = await story_graph_models.Graph.objects.aget(uuid=graph_uuid)

graph = await story_graph_models.Graph.objects.filter(uuid=graph_uuid).afirst()
if not graph:
raise Exception("could not find graph!")
try:
stream = await stream_models.Stream.objects.aget_free_stream(graph)
log.info(f"Attached to stream {stream.uuid}")
except NoStreamAvailableException:
log.error(f"No stream is available for graph {graph.name}")
yield NoStreamAvailable()
return

Expand All @@ -610,16 +610,23 @@ async def cleanup_on_stop(**kwargs: Dict[str, str]):
await cleanup()

with db_logging.LogContext(db_logging.LogKeyEnum.STREAM, stream):
await stream.increment_num_listeners()

engine = Engine(
graph=graph,
stream=stream,
)

await stream.increment_num_listeners()

consumer.disconnect_callback = cleanup
consumer.receive_callback = cleanup_on_stop

# send a first stream info response so the front-end has
# received information that streaming has/can be started,
# see https://github.com/Gencaster/gencaster/issues/483
# otherwise this can result in a dead end if we await
# a stream variable which is set from the frontend
yield StreamInfo(stream=stream, stream_instruction=None) # type: ignore

async for instruction in engine.start(max_steps=int(10e4)):
if type(instruction) == Dialog:
yield instruction
Expand All @@ -641,7 +648,7 @@ async def stream_logs(self, info: Info, stream_uuid: Optional[uuid.UUID] = None,
yield stream_log # type: ignore

async for log_update in GenCasterChannel.receive_stream_log_updates(
info.context.ws,
info.context["ws"],
):
if stream_uuid:
if str(log_update.stream_uuid) != str(stream_uuid):
Expand All @@ -665,7 +672,7 @@ async def get_streams() -> List[Stream]:

yield await get_streams()

async for _ in GenCasterChannel.receive_streams_updates(info.context.ws):
async for _ in GenCasterChannel.receive_streams_updates(info.context["ws"]):
yield await get_streams()


Expand Down
1 change: 1 addition & 0 deletions caster-back/osc_server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ def get_dispatcher(self) -> Dispatcher:
port = int(os.environ.get("BACKEND_OSC_PORT", 7000))

logging_level = os.environ.get("BACKEND_OSC_LOG_LEVEL", "INFO")
log.setLevel(logging_level)

server = OSCServer()
server.serve_blocking(port=port)
37 changes: 36 additions & 1 deletion caster-back/story_graph/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,36 @@ class Engine:
The engine runs in an async manner so it is possible to do awaits without
blocking the server, which means execution is halted until a specific
condition is met.

:param graph: The graph to execute
:param stream: The stream where the graph should be executed on
:param raise_exceptions: Decides if an exception within e.g. a Python script cell
can bring down the execution or if it ignores it but logs it.
Defaults to False so an invalid Python script cell does not stop the whole graph.
:param run_cleanup_procedure: If ``True`` it executes ``CmdPeriod.run`` on the SuperCollider
server in order to clear all running sounds, patterns and any left running tasks,
creating a clean environment.
The default is ``None``, which derives the necessary action based on whether
there are already users on the stream (in which case no reset will be executed).
"""

def __init__(
self, graph: Graph, stream: Stream, raise_exceptions: bool = False
self,
graph: Graph,
stream: Stream,
raise_exceptions: bool = False,
run_cleanup_procedure: Optional[bool] = None,
) -> None:
self.graph: Graph = graph
self.stream = stream
self._current_node: Node
self.blocking_time: int = 60 * 60 * 3
self.raise_exceptions = raise_exceptions
self.run_cleanup_procedure: bool
if run_cleanup_procedure is not None:
self.run_cleanup_procedure = run_cleanup_procedure
else:
self.run_cleanup_procedure = self.stream.num_listeners == 0
log.debug(f"Started engine for graph {self.graph.uuid}")

async def get_stream_variables(self) -> Dict[str, str]:
Expand Down Expand Up @@ -396,6 +416,18 @@ async def get_next_node(self) -> Node:
except AttributeError:
raise GraphDeadEnd()

    async def cleanup_sc_procedure(self) -> StreamInstruction:
        """Reset the SuperCollider environment of the stream's stream point.

        Sends a raw instruction which triggers ``CmdPeriod.run`` on the
        SuperCollider server in order to clear all running sounds, patterns
        and any left running tasks, creating a clean environment.

        :returns: The :class:`StreamInstruction` created for the raw command.
        """
        log.debug("Run cleanup procedure on graph")
        # do not wait for the execution because the OSC receiver callback
        # may be down because of CmdPeriod and it takes time to recover
        # from CmdPeriod
        instruction = await sync_to_async(
            self.stream.stream_point.send_raw_instruction
        )("0.01.wait;CmdPeriod.run;0.01.wait;")
        # wait for the CmdPeriod to re-init the OSC receiver callback
        await asyncio.sleep(0.2)
        return instruction

async def start(
self, max_steps: int = 1000
) -> AsyncGenerator[Union[StreamInstruction, Dialog, GraphDeadEnd], None]:
Expand All @@ -411,6 +443,9 @@ async def start(
"""
self._current_node = await self.graph.aget_entry_node()

if self.run_cleanup_procedure:
await self.cleanup_sc_procedure()

for _ in range(max_steps):
async for instruction in self.execute_node(self._current_node):
yield instruction
Expand Down
48 changes: 36 additions & 12 deletions caster-back/story_graph/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,9 @@ async def test_yield_dialog(self):
"yield Dialog(title='Hello', content=[Text(text='Hello World')], buttons=[Button.ok()])"
)

engine = Engine(self.graph, self.stream, raise_exceptions=True)
engine = Engine(
self.graph, self.stream, raise_exceptions=True, run_cleanup_procedure=False
)
x = engine.start().__aiter__()
dialog: Dialog = await asyncio.wait_for(x.__anext__(), 0.2) # type: ignore
with self.assertRaises(StopAsyncIteration):
Expand All @@ -573,7 +575,9 @@ async def test_execute_markdown_code(self, speak_mock: mock.MagicMock):
None,
cell_type=CellType.MARKDOWN,
)
engine = Engine(self.graph, self.stream, raise_exceptions=True)
engine = Engine(
self.graph, self.stream, raise_exceptions=True, run_cleanup_procedure=False
)
with mock.patch.object(engine, "wait_for_finished_instruction") as patch:
x = engine.start().__aiter__()
with self.assertRaises(StopAsyncIteration):
Expand All @@ -593,7 +597,9 @@ async def text_execute_sc_code(self, sc_instruction_mock: mock.MagicMock):
None,
cell_type=CellType.SUPERCOLLIDER,
)
engine = Engine(self.graph, self.stream, raise_exceptions=True)
engine = Engine(
self.graph, self.stream, raise_exceptions=True, run_cleanup_procedure=False
)
with mock.patch.object(engine, "wait_for_finished_instruction") as patch:
x = engine.start().__aiter__()
with self.assertRaises(StopAsyncIteration):
Expand All @@ -612,7 +618,9 @@ async def test_execute_audio_cell(self, play_audio_file_mock: mock.MagicMock):
cell_type=CellType.AUDIO,
cell_kwargs={"audio_cell": audio_cell},
)
engine = Engine(self.graph, self.stream, raise_exceptions=True)
engine = Engine(
self.graph, self.stream, raise_exceptions=True, run_cleanup_procedure=False
)
with mock.patch.object(engine, "wait_for_finished_instruction") as patch:
x = engine.start().__aiter__()
with self.assertRaises(StopAsyncIteration):
Expand All @@ -630,7 +638,9 @@ async def test_wait_for_finished_instruction_timeout(self):
cell_code="2+2",
cell_type=CellType.SUPERCOLLIDER,
)
engine = Engine(self.graph, self.stream, raise_exceptions=True)
engine = Engine(
self.graph, self.stream, raise_exceptions=True, run_cleanup_procedure=False
)
instruction = StreamInstruction(
stream_point=self.stream.stream_point,
state=StreamInstruction.InstructionState.SENT,
Expand Down Expand Up @@ -659,7 +669,9 @@ async def set_instruction_finished_with_delay(
cell_code="2+2",
cell_type=CellType.SUPERCOLLIDER,
)
engine = Engine(self.graph, self.stream, raise_exceptions=True)
engine = Engine(
self.graph, self.stream, raise_exceptions=True, run_cleanup_procedure=False
)
instruction = StreamInstruction(
stream_point=self.stream.stream_point,
state=StreamInstruction.InstructionState.SENT,
Expand All @@ -684,7 +696,9 @@ async def test_evaluate_python_code(self):
cell_code="2+2",
cell_type=CellType.SUPERCOLLIDER,
)
engine = Engine(self.graph, self.stream, raise_exceptions=True)
engine = Engine(
self.graph, self.stream, raise_exceptions=True, run_cleanup_procedure=False
)

with self.assertRaises(InvalidPythonCode):
await engine._evaluate_python_code("2+")
Expand All @@ -701,7 +715,9 @@ async def test_get_next_node(self):
None,
cell_type=CellType.PYTHON,
)
engine = Engine(self.graph, self.stream, raise_exceptions=True)
engine = Engine(
self.graph, self.stream, raise_exceptions=True, run_cleanup_procedure=False
)
# start node
node_a: Node = await Node.objects.afirst() # type: ignore
node_b = await Node.objects.acreate(
Expand All @@ -724,7 +740,9 @@ async def test_get_next_node_with_vars(self):
None,
cell_type=CellType.PYTHON,
)
engine = Engine(self.graph, self.stream, raise_exceptions=True)
engine = Engine(
self.graph, self.stream, raise_exceptions=True, run_cleanup_procedure=False
)
# start node
node_a: Node = await Node.objects.afirst() # type: ignore
node_b = await Node.objects.acreate(
Expand Down Expand Up @@ -763,7 +781,9 @@ async def test_failed_node_door_code(self):
None,
cell_type=CellType.PYTHON,
)
engine = Engine(self.graph, self.stream, raise_exceptions=True)
engine = Engine(
self.graph, self.stream, raise_exceptions=True, run_cleanup_procedure=False
)
# start node
node_a: Node = await Node.objects.afirst() # type: ignore
node_b = await Node.objects.acreate(
Expand Down Expand Up @@ -802,7 +822,9 @@ async def test_node_door_code_false(self):
None,
cell_type=CellType.PYTHON,
)
engine = Engine(self.graph, self.stream, raise_exceptions=True)
engine = Engine(
self.graph, self.stream, raise_exceptions=True, run_cleanup_procedure=False
)
# start node
node_a: Node = await Node.objects.afirst() # type: ignore
node_b = await Node.objects.acreate(
Expand Down Expand Up @@ -847,7 +869,9 @@ async def test_run_into_dead_end(self):
cell_type=CellType.PYTHON,
)
start_node = await Node.objects.afirst()
engine = Engine(self.graph, self.stream, raise_exceptions=True)
engine = Engine(
self.graph, self.stream, raise_exceptions=True, run_cleanup_procedure=False
)
engine._current_node = start_node # type: ignore

with self.assertRaises(GraphDeadEnd):
Expand Down
2 changes: 1 addition & 1 deletion caster-back/stream/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ class Stream(models.Model):
It also allows us to trace past streams.
"""

objects = StreamManager()
objects: StreamManager = StreamManager()

uuid = models.UUIDField(
primary_key=True,
Expand Down
20 changes: 14 additions & 6 deletions caster-sound/GenCaster/classes/GenCaster.sc
Original file line number Diff line number Diff line change
Expand Up @@ -368,17 +368,24 @@ GenCasterServer {
environment[\oscBackendClient] = oscBackendClient;
environment[\this] = this;
this.loadSynthDefs;
beacon = Task({
inf.do({
beacon = SkipJack(
updateFunc: {
this.sendAck(
status: GenCasterStatus.beacon,
uuid: 0,
message: this.serverInfo,
address: "/beacon",
);
5.wait;
});
});
},
dt: 5.0,
name: "gencasterBeacon",
autostart: false,
);
// using SkipJack for the recover of our responder
// is too slow or a too heavy load, therefore we
// add a callback to the CmdPeriod request which
// resurrects the instruction receiver
CmdPeriod.add({this.instructionReceiver.value}.defer(0.01));
}

loadSynthDefs {
Expand Down Expand Up @@ -487,7 +494,8 @@ GenCasterServer {
}

instructionReceiver {
^OSCdef(\instructionReceiver, {|msg, time, addr, recvPort|
"Starting instruction receiver".postln;
^OSCFunc({|msg, time, addr, recvPort|
var uuid = msg[1];
var function = (msg[2] ? "{}").asString;
var manualFinish = msg[3] ? false;
Expand Down
Loading