-
Notifications
You must be signed in to change notification settings - Fork 20
/
gstreamer.py
118 lines (104 loc) · 4.32 KB
/
gstreamer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
from functools import partial
import svgwrite
import gi
gi.require_version('Gst', '1.0')
gi.require_version('GstBase', '1.0')
from gi.repository import GLib, GObject, Gst, GstBase
from PIL import Image
# One-time library setup, executed as a side effect of importing this module.
# NOTE(review): GObject.threads_init() is reportedly a deprecated no-op on
# PyGObject >= 3.11 -- confirm against the targeted PyGObject version before
# removing it.
GObject.threads_init()
# Initialize GStreamer; None means no command-line arguments are handed to it.
Gst.init(None)
def on_bus_message(bus, message, loop):
    """Handle one message delivered by the pipeline bus signal watch.

    End-of-stream and error messages stop ``loop``; warnings and errors are
    written to stderr. Any other message type is ignored.

    Args:
        bus: The bus that emitted the signal (unused).
        message: The message to inspect.
        loop: Main loop to quit on end-of-stream or error.

    Returns:
        Always True.
    """
    msg_type = message.type

    if msg_type == Gst.MessageType.EOS:
        loop.quit()
        return True

    if msg_type == Gst.MessageType.WARNING:
        warning, details = message.parse_warning()
        sys.stderr.write('Warning: %s: %s\n' % (warning, details))
        return True

    if msg_type == Gst.MessageType.ERROR:
        error, details = message.parse_error()
        sys.stderr.write('Error: %s: %s\n' % (error, details))
        loop.quit()

    return True
def on_new_sample(sink, overlay, screen_size, appsink_size, user_function):
    """Process one frame from the appsink and refresh the SVG overlay.

    Pulls the pending sample from ``sink``, wraps its bytes in an RGB PIL
    image of ``appsink_size``, passes that image together with an empty SVG
    drawing of ``screen_size`` to ``user_function(img, svg_canvas)``, and
    then sets the overlay element's ``data`` property to the resulting SVG
    markup.

    Args:
        sink: The appsink element that emitted ``new-sample``.
        overlay: Element whose ``data`` property receives the SVG string.
        screen_size: (width, height) used for the SVG drawing.
        appsink_size: (width, height) of the RGB frames delivered by ``sink``.
        user_function: Callable invoked as ``user_function(img, svg_canvas)``.

    Returns:
        Gst.FlowReturn.OK, whether or not the buffer could be mapped.
    """
    sample = sink.emit('pull-sample')
    buf = sample.get_buffer()
    mapped, mapinfo = buf.map(Gst.MapFlags.READ)
    if not mapped:
        # Nothing was mapped, so there is nothing to release; skip the frame.
        return Gst.FlowReturn.OK

    try:
        img = Image.frombytes('RGB', (appsink_size[0], appsink_size[1]), mapinfo.data, 'raw')
        svg_canvas = svgwrite.Drawing('', size=(screen_size[0], screen_size[1]))
        user_function(img, svg_canvas)
        overlay.set_property('data', svg_canvas.tostring())
    finally:
        # Release the mapping even when user_function (or image construction)
        # raises, so a failing callback does not leave the buffer mapped.
        buf.unmap(mapinfo)
    return Gst.FlowReturn.OK
def detectCoralDevBoard():
    """Report whether the device-tree model string contains 'MX8MQ'.

    Reads ``/sys/firmware/devicetree/base/model`` and prints a short notice
    when the marker is found.

    Returns:
        True if the model string contains 'MX8MQ'; False otherwise, including
        when the file does not exist or cannot be read or decoded.
    """
    try:
        # Context manager so the file handle is closed deterministically.
        with open('/sys/firmware/devicetree/base/model') as model_file:
            is_dev_board = 'MX8MQ' in model_file.read()
    except (OSError, UnicodeError):
        # Missing or unreadable model file: treat as "not a dev board" without
        # also swallowing KeyboardInterrupt/SystemExit like a bare except.
        return False

    if is_dev_board:
        print('Detected EdgeTPU dev board.')
    return is_dev_board
def run_pipeline(user_function,
                 src_size=(640, 480),
                 appsink_size=(320, 180), videosrc='/dev/video0'):
    """Build and run the capture pipeline until EOS, error, or Ctrl-C.

    The camera stream is split with a ``tee``: one branch is scaled to
    ``appsink_size`` RGB frames and fed to ``user_function`` through an
    appsink, the other is displayed with an ``rsvgoverlay`` element whose
    SVG content is refreshed after each processed frame. The element chain
    differs depending on ``detectCoralDevBoard()``.

    Args:
        user_function: Callable invoked per frame as
            ``user_function(img, svg_canvas)``.
        src_size: (width, height) requested from the video source; also used
            as the size of the SVG drawing.
        appsink_size: (width, height) of the frames delivered to
            ``user_function``.
        videosrc: Device path passed to ``v4l2src``.

    Returns:
        None. Blocks until the main loop exits.
    """
    PIPELINE = 'v4l2src device={videosrc} ! {src_caps} ! {leaky_q} ! tee name=t'
    if detectCoralDevBoard():
        SRC_CAPS = 'video/x-raw,format=YUY2,width={width},height={height},framerate=30/1'
        PIPELINE += """
t. ! {leaky_q} ! glupload ! glfilterbin filter=glcolorscale
! {dl_caps} ! videoconvert ! {sink_caps} ! {sink_element}
t. ! {leaky_q} ! glupload ! glfilterbin filter=glcolorscale
! rsvgoverlay name=overlay ! waylandsink
"""
    else:
        SRC_CAPS = 'video/x-raw,width={width},height={height},framerate=30/1'
        PIPELINE += """
t. ! {leaky_q} ! videoconvert ! videoscale ! {sink_caps} ! {sink_element}
t. ! {leaky_q} ! videoconvert
! rsvgoverlay name=overlay ! videoconvert ! ximagesink
"""

    SINK_ELEMENT = 'appsink name=appsink sync=false emit-signals=true max-buffers=1 drop=true'
    DL_CAPS = 'video/x-raw,format=RGBA,width={width},height={height}'
    SINK_CAPS = 'video/x-raw,format=RGB,width={width},height={height}'
    LEAKY_Q = 'queue max-size-buffers=1 leaky=downstream'

    src_caps = SRC_CAPS.format(width=src_size[0], height=src_size[1])
    dl_caps = DL_CAPS.format(width=appsink_size[0], height=appsink_size[1])
    sink_caps = SINK_CAPS.format(width=appsink_size[0], height=appsink_size[1])
    pipeline = PIPELINE.format(videosrc=videosrc, leaky_q=LEAKY_Q,
                               src_caps=src_caps, dl_caps=dl_caps, sink_caps=sink_caps,
                               sink_element=SINK_ELEMENT)

    print('Gstreamer pipeline: ', pipeline)
    pipeline = Gst.parse_launch(pipeline)

    overlay = pipeline.get_by_name('overlay')
    appsink = pipeline.get_by_name('appsink')
    appsink.connect('new-sample', partial(on_new_sample,
                                          overlay=overlay, screen_size=src_size,
                                          appsink_size=appsink_size,
                                          user_function=user_function))

    # GLib.MainLoop is the non-deprecated spelling of GObject.MainLoop.
    loop = GLib.MainLoop()

    # Set up a pipeline bus watch to catch errors.
    bus = pipeline.get_bus()
    bus.add_signal_watch()
    bus.connect('message', on_bus_message, loop)

    # Run pipeline.
    pipeline.set_state(Gst.State.PLAYING)
    try:
        loop.run()
    except KeyboardInterrupt:
        # Ctrl-C is the expected way for a user to stop the pipeline.
        pass
    finally:
        # Clean up on every exit path; any other exception still propagates
        # to the caller once the pipeline has been shut down.
        pipeline.set_state(Gst.State.NULL)
        # Balance the add_signal_watch() call above.
        bus.remove_signal_watch()
        # Drain events still pending on the default main context.
        while GLib.MainContext.default().iteration(False):
            pass