Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add server-side action cable tests #5108

Merged
merged 3 commits into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ jobs:
bundler-cache: true
env:
BUNDLE_GEMFILE: ./spec/dummy/Gemfile
- run: bundle exec rails test:system
- run: bundle exec rails test:all
working-directory: ./spec/dummy
# Some coverage goals of these tests:
# - Test once without Rails at all
Expand Down
3 changes: 2 additions & 1 deletion lib/graphql/subscriptions/action_cable_subscriptions.rb
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,8 @@ def write_subscription(query, events)
#
def setup_stream(channel, initial_event)
topic = initial_event.topic
channel.stream_from(stream_event_name(initial_event), coder: @action_cable_coder) do |message|
event_stream = stream_event_name(initial_event)
channel.stream_from(event_stream, coder: @action_cable_coder) do |message|
events_by_fingerprint = @events[topic]
object = nil
events_by_fingerprint.each do |_fingerprint, events|
Expand Down
159 changes: 159 additions & 0 deletions spec/dummy/test/channels/graphql_channel_test.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
# frozen_string_literal: true
require "test_helper"

class GraphqlChannelTest < ActionCable::Channel::TestCase
module RealChannelStub
def confirmed?
subscription_confirmation_sent?
end

def real_streams
streams
end
end

def assert_has_real_stream(stream_name)
assert subscription.real_streams.key?(stream_name), "Expected Stream #{stream_name.inspect} to be present in #{subscription.real_streams.keys}"
end

def setup
@prev_server = ActionCable.server
@server = TestServer.new(subscription_adapter: ActionCable::SubscriptionAdapter::Async)
@server.config.allowed_request_origins = [ 'http://rubyonrails.com' ]

ActionCable.instance_variable_set(:@server, @server)
end

def teardown
ActionCable.instance_variable_set(:@server, @prev_server)
end

def wait_for_async
wait_for_executor Concurrent.global_io_executor
end

def run_in_eventmachine
yield
wait_for_async
end

def wait_for_executor(executor)
# do not wait forever, wait 2s
timeout = 2
until executor.completed_task_count == executor.scheduled_task_count
sleep 0.1
timeout -= 0.1
raise "Executor could not complete all tasks in 2 seconds" unless timeout > 0
end
end

class Connection < ActionCable::Connection::Base
attr_reader :websocket

def send_async(method, *args)
send method, *args
end

public :handle_close
end


module InterceptTransmit
def transmit(msg)
intercepted_messages << JSON.parse(msg)
super
end

def intercepted_messages
@intercepted_messages ||= []
end
end

test "it subscribes and unsubscribes" do
run_in_eventmachine do
env = Rack::MockRequest.env_for "/test", "HTTP_HOST" => "localhost", "HTTP_CONNECTION" => "upgrade", "HTTP_UPGRADE" => "websocket", "HTTP_ORIGIN" => "http://rubyonrails.com"

@connection = Connection.new(ActionCable.server, env).tap do |connection|
connection.process
assert_predicate connection.websocket, :possible?

wait_for_async
assert_predicate connection.websocket, :alive?
connection.websocket.singleton_class.prepend(InterceptTransmit)
end

@connection.subscriptions.add({"identifier" => "{\"channel\": \"GraphqlChannel\"}"})

@subscription = @connection.subscriptions.instance_variable_get(:@subscriptions).values.first
@subscription.singleton_class.prepend(RealChannelStub)
assert subscription.confirmed?

subscription.execute({
"query" => "subscription { payload(id: \"abc\") { value } }"
})
wait_for_async

sub_id = subscription.instance_variable_get(:@subscription_ids).first
subscription_stream = "graphql-subscription:#{sub_id}"
assert_has_real_stream subscription_stream
topic_stream = "graphql-event::payload:id:abc"
assert_has_real_stream topic_stream

subscription.make_trigger({ "field" => "payload", "arguments" => { "id" => "abc"}, "value" => 19 })

wait_for_async

@connection.handle_close
wait_for_async

expected_data = [
{"identifier"=>"{\"channel\": \"GraphqlChannel\"}", "type"=>"confirm_subscription"},
{"identifier"=>"{\"channel\": \"GraphqlChannel\"}", "message"=>{"result"=>{"data"=>{}}, "more"=>true}},
{"identifier"=>"{\"channel\": \"GraphqlChannel\"}", "message"=>{"result"=>{"data"=>{"payload"=>{"value"=>19}}}, "more"=>true}},
{"identifier"=>"{\"channel\": \"GraphqlChannel\"}", "message"=>{"more"=>false}},
]
assert_equal expected_data, @connection.websocket.intercepted_messages
end
end

class TestServer
include ActionCable::Server::Connections
include ActionCable::Server::Broadcasting

attr_reader :logger, :config, :mutex

class FakeConfiguration < ActionCable::Server::Configuration
attr_accessor :subscription_adapter, :log_tags, :filter_parameters

def initialize(subscription_adapter:)
@log_tags = []
@filter_parameters = []
@subscription_adapter = subscription_adapter
end

def pubsub_adapter
@subscription_adapter
end
end

def initialize(subscription_adapter: SuccessAdapter)
@logger = ActiveSupport::TaggedLogging.new ActiveSupport::Logger.new(StringIO.new)
@config = FakeConfiguration.new(subscription_adapter: subscription_adapter)
@mutex = Monitor.new
end

def pubsub
@pubsub ||= @config.subscription_adapter.new(self)
end

def event_loop
@event_loop ||= ActionCable::Connection::StreamEventLoop.new.tap do |loop|
loop.instance_variable_set(:@executor, Concurrent.global_io_executor)
end
end

def worker_pool
@worker_pool ||= ActionCable::Server::Worker.new(max_size: 5)
end
end
end