1import asyncio
2
3import pytest
4
5from webdriver.error import TimeoutException
6
7from tests.support.sync import AsyncPoll
8from . import assert_browsing_context
9
10pytestmark = pytest.mark.asyncio
11
12CONTEXT_CREATED_EVENT = "browsingContext.contextCreated"
13
14
15async def test_not_unsubscribed(bidi_session, current_session):
16    await bidi_session.session.subscribe(events=[CONTEXT_CREATED_EVENT])
17    await bidi_session.session.unsubscribe(events=[CONTEXT_CREATED_EVENT])
18
19    # Track all received browsingContext.contextCreated events in the events array
20    events = []
21
22    async def on_event(method, data):
23        events.append(data)
24
25    remove_listener = bidi_session.add_event_listener(CONTEXT_CREATED_EVENT, on_event)
26
27    current_session.new_window(type_hint="tab")
28
29    wait = AsyncPoll(current_session, timeout=.5)
30    with pytest.raises(TimeoutException):
31        await wait.until(lambda _: len(events) > 0)
32
33    remove_listener()
34
35
36@pytest.mark.parametrize("type_hint", ["tab", "window"])
37async def test_new_context(bidi_session, current_session, wait_for_event, type_hint):
38    # Unsubscribe in case a previous tests subscribed to the event
39    await bidi_session.session.unsubscribe(events=[CONTEXT_CREATED_EVENT])
40
41    await bidi_session.session.subscribe(events=[CONTEXT_CREATED_EVENT])
42
43    on_entry = wait_for_event(CONTEXT_CREATED_EVENT)
44    top_level_context_id = current_session.new_window(type_hint=type_hint)
45    context_info = await on_entry
46
47    assert_browsing_context(
48        context_info,
49        children=None,
50        context=top_level_context_id,
51        url="about:blank",
52        parent=None,
53    )
54
55
56async def test_evaluate_window_open_without_url(
57    bidi_session, current_session, wait_for_event
58):
59    # Unsubscribe in case a previous tests subscribed to the event
60    await bidi_session.session.unsubscribe(events=[CONTEXT_CREATED_EVENT])
61
62    await bidi_session.session.subscribe(events=[CONTEXT_CREATED_EVENT])
63
64    on_entry = wait_for_event(CONTEXT_CREATED_EVENT)
65    current_session.execute_script("""window.open();""")
66    context_info = await on_entry
67
68    assert_browsing_context(
69        context_info,
70        children=None,
71        context=None,
72        url="about:blank",
73        parent=None,
74    )
75
76    await bidi_session.session.unsubscribe(events=[CONTEXT_CREATED_EVENT])
77
78
79async def test_evaluate_window_open_with_url(
80    bidi_session, current_session, wait_for_event, inline
81):
82    # Unsubscribe in case a previous tests subscribed to the event
83    await bidi_session.session.unsubscribe(events=[CONTEXT_CREATED_EVENT])
84
85    url = inline("<div>foo</div>")
86
87    await bidi_session.session.subscribe(events=[CONTEXT_CREATED_EVENT])
88
89    on_entry = wait_for_event(CONTEXT_CREATED_EVENT)
90    current_session.execute_script(
91        """
92        const url = arguments[0];
93        window.open(url);
94    """,
95        args=[url],
96    )
97    context_info = await on_entry
98
99    assert_browsing_context(
100        context_info,
101        children=None,
102        context=None,
103        url="about:blank",
104        parent=None,
105    )
106
107
108async def test_navigate_creates_iframes(bidi_session, current_session, wait_for_event, inline):
109    # Unsubscribe in case a previous tests subscribed to the event
110    await bidi_session.session.unsubscribe(events=[CONTEXT_CREATED_EVENT])
111
112    top_level_context_id = current_session.window_handle
113
114    url_iframe1 = inline("<div>foo</div>")
115    url_iframe2 = inline("<div>bar</div>")
116    url_page = inline(
117        f"<iframe src='{url_iframe1}'></iframe><iframe src='{url_iframe2}'></iframe>"
118    )
119
120    events = []
121
122    async def on_event(method, data):
123        events.append(data)
124
125    remove_listener = bidi_session.add_event_listener(CONTEXT_CREATED_EVENT, on_event)
126    await bidi_session.session.subscribe(events=[CONTEXT_CREATED_EVENT])
127
128    current_session.url = url_page
129
130    wait = AsyncPoll(
131        current_session,
132        message="Didn't receive context created events for frames")
133    await wait.until(lambda _: len(events) >= 2)
134    assert len(events) == 2
135
136    assert_browsing_context(
137        events[0],
138        children=None,
139        context=None,
140        url=url_iframe1,
141        parent=top_level_context_id,
142    )
143
144    assert_browsing_context(
145        events[1],
146        children=None,
147        context=None,
148        url=url_iframe2,
149        parent=top_level_context_id,
150    )
151
152    remove_listener()
153    await bidi_session.session.unsubscribe(events=[CONTEXT_CREATED_EVENT])
154
155
156async def test_navigate_creates_nested_iframes(
157    bidi_session, current_session, wait_for_event, inline
158):
159    # Unsubscribe in case a previous tests subscribed to the event
160    await bidi_session.session.unsubscribe(events=[CONTEXT_CREATED_EVENT])
161
162    top_level_context_id = current_session.window_handle
163
164    url_nested_iframe = inline("<div>foo</div>")
165    url_iframe = inline(f"<iframe src='{url_nested_iframe}'></iframe")
166    url_page = inline(f"<iframe src='{url_iframe}'></iframe>")
167
168    events = []
169
170    async def on_event(method, data):
171        events.append(data)
172
173    remove_listener = bidi_session.add_event_listener(CONTEXT_CREATED_EVENT, on_event)
174    await bidi_session.session.subscribe(events=[CONTEXT_CREATED_EVENT])
175
176    current_session.url = url_page
177
178    wait = AsyncPoll(
179        current_session,
180        message="Didn't receive context created events for frames")
181    await wait.until(lambda _: len(events) >= 2)
182    assert len(events) == 2
183
184    assert_browsing_context(
185        events[0],
186        children=None,
187        context=None,
188        url=url_iframe,
189        parent=top_level_context_id,
190    )
191
192    assert_browsing_context(
193        events[1],
194        children=None,
195        context=None,
196        url=url_nested_iframe,
197        parent=events[0]["context"],
198    )
199
200    remove_listener()
201    await bidi_session.session.unsubscribe(events=[CONTEXT_CREATED_EVENT])
202