Detecting insertion/removal of USB input devices on Windows 10

2024/10/1 23:32:33

I already have some working Python code to detect the insertion of some USB device types (from here).

import wmi

raw_wql = "SELECT * FROM __InstanceCreationEvent WITHIN 2 WHERE TargetInstance ISA 'Win32_USBHub'"
c = wmi.WMI()
watcher = c.watch_for(raw_wql=raw_wql)
while 1:
    usb = watcher()
    print(usb)

Unfortunately, this script does not detect the insertion of all types of USB devices: inserting a USB flash drive is detected, but inserting a USB input device is not. The removal of USB devices is not detected at all.

Is there a way to extend the existing script accordingly?

EDIT: Better WQL query and Python code

I've improved the WQL query and Python code based on information I got in MSDN.

The following script is intended to output a message when a USB keyboard is plugged in or unplugged.

Issue: No message appears when a USB keyboard is plugged in, but both messages ("Keyboard connected" and "Keyboard disconnected") appear when the USB keyboard is unplugged. What's wrong with this code?

import wmi

device_connected_wql = "SELECT * FROM __InstanceCreationEvent WITHIN 2 WHERE TargetInstance ISA 'Win32_Keyboard'"
device_disconnected_wql = "SELECT * FROM __InstanceDeletionEvent WITHIN 2 WHERE TargetInstance ISA 'Win32_Keyboard'"

c = wmi.WMI()
connected_watcher = c.watch_for(raw_wql=device_connected_wql)
disconnected_watcher = c.watch_for(raw_wql=device_disconnected_wql)

while 1:
    connected = connected_watcher()
    disconnected = disconnected_watcher()
    if connected:
        print("Keyboard connected")
    if disconnected:
        print("Keyboard disconnected")

Answer

There are multiple ways to detect device changes.

Detecting USB Insertion / Removal Events in Windows using C++

One suggested way is to use the WM_DEVICECHANGE message: https://stackoverflow.com/a/4078996/2830850

One such example can be found below:

#Modified from: http://wiki.wxpython.org/HookingTheWndProc
##########################################################################
##
##  This is a modification of the original WndProcHookMixin by Kevin Moore,
##  modified to use ctypes only instead of pywin32, so it can be used
##  with no additional dependencies in Python 2.5
##
##########################################################################

import sys
import ctypes
#import GUID
from ctypes import c_long, c_int, wintypes

import wx

GWL_WNDPROC = -4
WM_DESTROY  = 2
DBT_DEVTYP_DEVICEINTERFACE = 0x00000005  # device interface class
DBT_DEVICEREMOVECOMPLETE = 0x8004  # device is gone
DBT_DEVICEARRIVAL = 0x8000  # system detected a new device
WM_DEVICECHANGE = 0x0219

class GUID(ctypes.Structure):
    _pack_ = 1
    _fields_ = [("Data1", ctypes.c_ulong),
                ("Data2", ctypes.c_ushort),
                ("Data3", ctypes.c_ushort),
                ("Data4", ctypes.c_ubyte * 8)]

## It's probably not necessary to make this distinction, but it never hurts to be safe
## Note: SetWindowLong and the int-sized WndProc arguments below assume a 32-bit
## Python process; a 64-bit process needs SetWindowLongPtrW and pointer-sized types.
if 'unicode' in wx.PlatformInfo:
    SetWindowLong = ctypes.windll.user32.SetWindowLongW
    CallWindowProc = ctypes.windll.user32.CallWindowProcW
else:
    SetWindowLong = ctypes.windll.user32.SetWindowLongA
    CallWindowProc = ctypes.windll.user32.CallWindowProcA

## Create a type that will be used to cast a python callable to a c callback function
## first arg is return type, the rest are the arguments
#WndProcType = ctypes.WINFUNCTYPE(c_int, wintypes.HWND, wintypes.UINT, wintypes.WPARAM, wintypes.LPARAM)
WndProcType = ctypes.WINFUNCTYPE(ctypes.c_long, ctypes.c_int, ctypes.c_uint, ctypes.c_int, ctypes.c_int)

if 'unicode' in wx.PlatformInfo:
    RegisterDeviceNotification = ctypes.windll.user32.RegisterDeviceNotificationW
else:
    RegisterDeviceNotification = ctypes.windll.user32.RegisterDeviceNotificationA
RegisterDeviceNotification.restype = wintypes.HANDLE
RegisterDeviceNotification.argtypes = [wintypes.HANDLE, ctypes.c_void_p, wintypes.DWORD]

UnregisterDeviceNotification = ctypes.windll.user32.UnregisterDeviceNotification
UnregisterDeviceNotification.restype = wintypes.BOOL
UnregisterDeviceNotification.argtypes = [wintypes.HANDLE]

class DEV_BROADCAST_DEVICEINTERFACE(ctypes.Structure):
    _fields_ = [("dbcc_size", ctypes.c_ulong),
                ("dbcc_devicetype", ctypes.c_ulong),
                ("dbcc_reserved", ctypes.c_ulong),
                ("dbcc_classguid", GUID),
                ("dbcc_name", ctypes.c_wchar * 256)]

class DEV_BROADCAST_HDR(ctypes.Structure):
    _fields_ = [("dbch_size", wintypes.DWORD),
                ("dbch_devicetype", wintypes.DWORD),
                ("dbch_reserved", wintypes.DWORD)]

GUID_DEVCLASS_PORTS = GUID(0x4D36E978, 0xE325, 0x11CE,
                           (ctypes.c_ubyte*8)(0xBF, 0xC1, 0x08, 0x00, 0x2B, 0xE1, 0x03, 0x18))
GUID_DEVINTERFACE_USB_DEVICE = GUID(0xA5DCBF10, 0x6530, 0x11D2,
                                    (ctypes.c_ubyte*8)(0x90, 0x1F, 0x00, 0xC0, 0x4F, 0xB9, 0x51, 0xED))

class WndProcHookMixin:
    """
    This class can be mixed in with any wxWindows window class in order to hook its WndProc function.
    You supply a set of message handler functions with the function addMsgHandler. When the window receives that
    message, the specified handler function is invoked. If the handler explicitly returns False then the standard
    WindowProc will not be invoked with the message. You can really screw things up this way, so be careful.
    This is not the correct way to deal with standard windows messages in wxPython (i.e. button click, paint, etc)
    use the standard wxWindows method of binding events for that. This is really for capturing custom windows messages
    or windows messages that are outside of the wxWindows world.
    """
    def __init__(self):
        self.__msgDict = {}
        ## We need to maintain a reference to the WndProcType wrapper
        ## because ctypes doesn't
        self.__localWndProcWrapped = None
        self.rtnHandles = []

    def hookWndProc(self):
        self.__localWndProcWrapped = WndProcType(self.localWndProc)
        self.__oldWndProc = SetWindowLong(self.GetHandle(), GWL_WNDPROC, self.__localWndProcWrapped)

    def unhookWndProc(self):
        SetWindowLong(self.GetHandle(), GWL_WNDPROC, self.__oldWndProc)
        ## Allow the ctypes wrapper to be garbage collected
        self.__localWndProcWrapped = None

    def addMsgHandler(self, messageNumber, handler):
        self.__msgDict[messageNumber] = handler

    def localWndProc(self, hWnd, msg, wParam, lParam):
        # call the handler if one exists
        # performance note: "in" is the fastest way to check for a key
        # when the key is unlikely to be found
        # (which is the case here, since most messages will not have handlers).
        # This is called via a ctypes shim for every single windows message
        # so dispatch speed is important
        if msg in self.__msgDict:
            # if the handler returns false, we terminate the message here
            # Note that we don't pass the hwnd or the message along
            # Handlers should be really, really careful about returning false here
            if self.__msgDict[msg](wParam, lParam) == False:
                # return an integer: the callback's declared result type is c_long
                return 0

        # Restore the old WndProc on Destroy.
        if msg == WM_DESTROY:
            self.unhookWndProc()

        return CallWindowProc(self.__oldWndProc, hWnd, msg, wParam, lParam)

    def registerDeviceNotification(self, guid, devicetype=DBT_DEVTYP_DEVICEINTERFACE):
        devIF = DEV_BROADCAST_DEVICEINTERFACE()
        devIF.dbcc_size = ctypes.sizeof(DEV_BROADCAST_DEVICEINTERFACE)
        devIF.dbcc_devicetype = DBT_DEVTYP_DEVICEINTERFACE

        if guid:
            # guid must be a GUID structure such as GUID_DEVINTERFACE_USB_DEVICE
            # (the original called GUID.GUID(guid), which relied on the separate
            # GUID module that is commented out above to parse a GUID string)
            devIF.dbcc_classguid = guid
        return RegisterDeviceNotification(self.GetHandle(), ctypes.byref(devIF), 0)

    def unregisterDeviceNotification(self, handle):
        if UnregisterDeviceNotification(handle) == 0:
            raise Exception("Unable to unregister device notification messages")

# a simple example
if __name__ == "__main__":

    class MyFrame(wx.Frame, WndProcHookMixin):
        def __init__(self, parent):
            WndProcHookMixin.__init__(self)
            wx.Frame.__init__(self, parent, -1, "Insert and Remove USB Device and Watch STDOUT", size=(640,480))

            self.Bind(wx.EVT_CLOSE, self.onClose)

            #Change the following guid to the GUID of the device you want notifications for
            #self.devNotifyHandle = self.registerDeviceNotification(guid="{3c5e1462-5695-4e18-876b-f3f3d08aaf18}")
            dbh = DEV_BROADCAST_DEVICEINTERFACE()
            dbh.dbcc_size = ctypes.sizeof(DEV_BROADCAST_DEVICEINTERFACE)
            dbh.dbcc_devicetype = DBT_DEVTYP_DEVICEINTERFACE
            dbh.dbcc_classguid = GUID_DEVINTERFACE_USB_DEVICE
            self.devNotifyHandle = RegisterDeviceNotification(self.GetHandle(), ctypes.byref(dbh), 0)

            self.addMsgHandler(WM_DEVICECHANGE, self.onDeviceChange)
            self.hookWndProc()

        def onDeviceChange(self, wParam, lParam):
            print("WM_DEVICECHANGE [WPARAM:%i][LPARAM:%i]" % (wParam, lParam))

            if wParam == DBT_DEVICEARRIVAL:
                print("Device Arrival")
            elif wParam == DBT_DEVICEREMOVECOMPLETE:
                print("Device Removed")

            if lParam:
                dbh = DEV_BROADCAST_HDR.from_address(lParam)
                if dbh.dbch_devicetype == DBT_DEVTYP_DEVICEINTERFACE:
                    dbd = DEV_BROADCAST_DEVICEINTERFACE.from_address(lParam)
                    #Verify that the USB VID and PID match our assigned VID and PID
                    if 'Vid_10c4&Pid_8382' in dbd.dbcc_name:
                        print("Was Our USB Device")

            return True

        def onClose(self, event):
            self.unregisterDeviceNotification(self.devNotifyHandle)
            event.Skip()

    app = wx.App(False)
    frame = MyFrame(None)
    frame.Show()
    app.MainLoop()

The above is taken from https://github.com/weiwei22844/UsefullPython/blob/fa8603b92cb0b3f6ce00c876f24138211f47e906/HookUsbMsg.py
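
The device-change handler in that example looks for the substring 'Vid_10c4&Pid_8382' in dbcc_name. As a rough sketch, the vendor and product IDs could instead be pulled out with a regular expression and compared as numbers. This assumes the name contains the usual "VID_xxxx&PID_xxxx" segment of a USB device interface path. The helper name parse_vid_pid and the sample path are made up for illustration and are not part of the quoted code.

```python
import re

# Matches the "VID_xxxx&PID_xxxx" part of a USB device interface path.
# Case is ignored because the prefix may appear as "Vid_"/"Pid_" or "VID_"/"PID_".
_VID_PID_RE = re.compile(r"VID_([0-9A-F]{4})&PID_([0-9A-F]{4})", re.IGNORECASE)


def parse_vid_pid(device_name):
    """Return (vid, pid) as integers, or None if the name carries no VID/PID."""
    match = _VID_PID_RE.search(device_name)
    if match is None:
        return None
    return int(match.group(1), 16), int(match.group(2), 16)


if __name__ == "__main__":
    # Hypothetical dbcc_name value, for illustration only.
    sample = r"\\?\USB#Vid_10c4&Pid_8382#0001#{a5dcbf10-6530-11d2-901f-00c04fb951ed}"
    print(parse_vid_pid(sample))
```

Inside onDeviceChange, a check such as parse_vid_pid(dbd.dbcc_name) == (0x10C4, 0x8382) would then replace the substring test and would not depend on the capitalization of the name.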

Now, coming back to your code:

import wmi

device_connected_wql = "SELECT * FROM __InstanceCreationEvent WITHIN 2 WHERE TargetInstance ISA 'Win32_Keyboard'"
device_disconnected_wql = "SELECT * FROM __InstanceDeletionEvent WITHIN 2 WHERE TargetInstance ISA 'Win32_Keyboard'"

c = wmi.WMI()
connected_watcher = c.watch_for(raw_wql=device_connected_wql)
disconnected_watcher = c.watch_for(raw_wql=device_disconnected_wql)

while 1:
    connected = connected_watcher()
    disconnected = disconnected_watcher()
    if connected:
        print("Keyboard connected")
    if disconnected:
        print("Keyboard disconnected")

You call connected_watcher and disconnected_watcher in series, and both are blocking calls. When a keyboard is plugged in, connected_watcher returns and connected is set, but then disconnected_watcher is called and blocks until a device is disconnected. The print statements only run after both calls have returned, which is why you see both messages together when you unplug the keyboard.

You can fix this by giving each query a timeout, so that neither call blocks indefinitely:

while 1:
    try:
        connected = connected_watcher(timeout_ms=10)
    except wmi.x_wmi_timed_out:
        pass
    else:
        if connected:
            print("Keyboard connected")

    try:
        disconnected = disconnected_watcher(timeout_ms=10)
    except wmi.x_wmi_timed_out:
        pass
    else:
        if disconnected:
            print("Keyboard disconnected")
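
The polling loop repeats the same try/except for each watcher. A minimal sketch of how that might be factored into a helper follows. The name poll_once, the label strings and the stand-in watchers are hypothetical; with the real wmi package you would pass wmi.x_wmi_timed_out as timeout_exc.

```python
def poll_once(watchers, timeout_exc, timeout_ms=10):
    """Call each watcher once with a short timeout.

    watchers: dict mapping a label to a callable that accepts timeout_ms.
    timeout_exc: exception type the watchers raise when nothing happened.
    Returns a list of (label, event) pairs for the watchers that fired.
    """
    results = []
    for label, watcher in watchers.items():
        try:
            event = watcher(timeout_ms=timeout_ms)
        except timeout_exc:
            continue  # no event within the timeout; move on to the next watcher
        results.append((label, event))
    return results


if __name__ == "__main__":
    # Stand-in watchers so the sketch runs without WMI.
    class FakeTimeout(Exception):
        pass

    def quiet(timeout_ms):
        raise FakeTimeout()

    def fires(timeout_ms):
        return "keyboard-event"

    watchers = {"Keyboard connected": fires, "Keyboard disconnected": quiet}
    for label, _event in poll_once(watchers, FakeTimeout):
        print(label)
```

In the question's script, the loop body would become a call to poll_once with connected_watcher and disconnected_watcher in the dict, printing each returned label.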

Another approach is to use threads. To make the code work from a thread other than the main one, that thread has to initialize COM before creating its WMI connection, as below:

import threading

import pythoncom
from wmi import WMI, x_wmi_timed_out


class VolumeRemovalWatcher:
    def __init__(self, callback=None):
        self.stop_wanted = False
        self.callback = callback

    def stop(self):
        self.stop_wanted = True

    def watch_for_events(self):
        # COM must be initialized in every thread other than the main one
        if threading.current_thread() is not threading.main_thread():
            pythoncom.CoInitialize()

        try:
            w = WMI()
            watcher = w.Win32_VolumeChangeEvent.watch_for(EventType=3)
            while not self.stop_wanted:
                try:
                    # time out regularly so the stop flag is re-checked
                    event = watcher(timeout_ms=1000)
                except x_wmi_timed_out:
                    pass
                else:
                    print(event.DriveName)
                    if self.callback is not None:
                        self.callback(event.DriveName)
        except Exception as e:
            print(e)
            return None
        finally:
            if threading.current_thread() is not threading.main_thread():
                pythoncom.CoUninitialize()

Credit for the above code goes to https://github.com/utytlanyjoe/pyWmiHandler/blob/89b934301990a4a955ec13db21caaf81d9a94f63/wmi_wrapper.py
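
For the keyboard case in the question, the threaded approach might look roughly like this: one worker thread per watcher, each forwarding its events to a shared queue so that neither blocks the other. This is a sketch under assumptions. The name run_watcher and its parameters are hypothetical, the watcher is created inside the worker thread (where real code would first call pythoncom.CoInitialize(), as in the class above), and stand-in watchers are used so the example runs without WMI.

```python
import queue
import threading
import time


def run_watcher(make_watcher, label, events, stop, timeout_exc, timeout_ms=1000):
    """Worker-thread body: create a watcher in this thread and forward its events.

    make_watcher: zero-argument callable returning a watcher. With real WMI it
        would call pythoncom.CoInitialize() and then wmi.WMI().watch_for(...).
    events: queue.Queue that receives (label, event) tuples.
    stop: threading.Event used to ask the worker to exit.
    timeout_exc: exception type raised by the watcher on timeout.
    """
    watcher = make_watcher()
    while not stop.is_set():
        try:
            event = watcher(timeout_ms=timeout_ms)
        except timeout_exc:
            continue  # nothing happened; loop around and re-check the stop flag
        events.put((label, event))


if __name__ == "__main__":
    class FakeTimeout(Exception):
        pass

    def make_fake(items):
        # Factory for a stand-in watcher: returns each item once, then times out.
        pending = list(items)

        def factory():
            def watcher(timeout_ms):
                if pending:
                    return pending.pop(0)
                time.sleep(timeout_ms / 1000.0)
                raise FakeTimeout()
            return watcher
        return factory

    events = queue.Queue()
    stop = threading.Event()
    workers = [
        threading.Thread(target=run_watcher,
                         args=(make_fake(["kbd"]), "Keyboard connected",
                               events, stop, FakeTimeout, 10)),
        threading.Thread(target=run_watcher,
                         args=(make_fake(["kbd"]), "Keyboard disconnected",
                               events, stop, FakeTimeout, 10)),
    ]
    for worker in workers:
        worker.start()

    # The main thread just consumes labelled events as they arrive.
    for _ in range(2):
        label, _event = events.get(timeout=2)
        print(label)

    stop.set()
    for worker in workers:
        worker.join()
```

Because each watcher blocks only its own thread, a "connected" event is reported as soon as it arrives instead of waiting for the next disconnect.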
