#!/usr/bin/python -u import signal import gi from gi.repository import GObject, Gtk, Gdk, GLib, GdkX11 gi.require_version('Gst', '1.0') from gi.repository import Gst from gi.repository import GstVideo VIDEOSINK = 'videoconvert ! xvimagesink {xvimagesinkparam} ' GST_PLAY_PIPELINE = 'filesrc location="{location}" ! decodebin ! videoconvert ! queue ! ' + \ 'videomixer name="mix" sink_1::xpos=1019 sink_1::ypos=571 ! ' + VIDEOSINK + ' name="videosink" ' + \ 'v4l2src device={camera} ! videoconvert ! videoscale ! queue ! video/x-raw,width=258,height=147 ! mix. ' # curl -O https://test-videos.co.uk/vids/bigbuckbunny/mp4/h264/720/Big_Buck_Bunny_720_10s_1MB.mp4 class Demo(object): def __init__(self): self.pipeline = None self.bus = None GObject.threads_init() Gst.init(None) def start(self, video_device): self.win = Gtk.Window.new(Gtk.WindowType.TOPLEVEL) self.win.set_default_size(720, 405) self.win.show_all() GLib.unix_signal_add(GLib.PRIORITY_DEFAULT, signal.SIGINT, self.stop) mock = 'Big_Buck_Bunny_720_10s_1MB.mp4' pipedesc = GST_PLAY_PIPELINE.format(location=mock, xvimagesinkparam='sync=false', camera=video_device) self.pipeline = Gst.parse_launch(pipedesc) self.bus = self.pipeline.get_bus() self.bus.add_signal_watch() self.bus.connect("message", self.on_message) self.bus.enable_sync_message_emission() self.bus.connect("sync-message::element", self.on_sync_message) self.pipeline.set_state(Gst.State.PLAYING) mixer = self.pipeline.get_by_name('mix') mixerpad = mixer.get_static_pad('sink_0') mixerpad.add_probe(Gst.PadProbeType.EVENT_DOWNSTREAM, self.playback_eos_cb, None) Gtk.main() def stop(self): self.pipeline.set_state(Gst.State.NULL) Gtk.main_quit() def playback_eos_cb(self, pad, info, user_data): if info.get_event().type == Gst.EventType.EOS: print('#### EOS EOS EOS EOS ####') status, running_time = self.pipeline.query_position(Gst.Format.TIME) print('[playback_eos_cb] query_position:', status, running_time) decodebin = self.pipeline.get_by_name('decodebin0') status = decodebin.seek_simple(Gst.Format.TIME, Gst.SeekFlags.NONE, 0) print('[playback_eos_cb] set_offset:', running_time / Gst.SECOND) pad.set_offset(running_time / Gst.SECOND) return Gst.PadProbeReturn.PASS def on_message(self, bus, msg): if msg.type == Gst.MessageType.ERROR: print('!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!') print(msg.parse_error()) print('!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!') def on_sync_message(self, bus, msg): name = msg.get_structure().get_name() if name == 'prepare-window-handle': xid = self.win.get_property('window').get_xid() msg.src.set_window_handle(xid) app = Demo() app.start('/dev/video2')
Run
Reset
Share
Import
Link
Embed
Language▼
English
中文
Python Fiddle
Python Cloud IDE
Follow @python_fiddle
Browser Version Not Supported
Due to Python Fiddle's reliance on advanced JavaScript techniques, older browsers might have problems running it correctly. Please download the latest version of your favourite browser.
Chrome 10+
Firefox 4+
Safari 5+
IE 10+
Let me try anyway!
url:
Go
Python Snippet
Stackoverflow Question