# ============================= IMPORT ALL MODULES ============================
import sys

import pyqtgraph as pg
from pyqtgraph.Qt import QtCore, QtWidgets
import pyqtgraph.console
from pyqtgraph.dockarea import *
from PyQt5.QtWidgets import *
from PyQt5 import *
from PyQt5 import QtWidgets
import numpy as np
from scipy.signal import hilbert, chirp

# NOTE(review): `iso_qc` is used by edt_enterIP_slot() but was never imported
# in this file. It is presumably a project-local helper module -- confirm.
# The import is optional so the GUI still works when the module is missing.
try:
    import iso_qc
except ImportError:
    iso_qc = None

# Connection settings shared by the slots of the connection dock.
my_connection = {'source': 'empty', 'ip_number': '000.000.0.000'}

# ============================= INITIALIZE THE APP ============================
app = QtWidgets.QApplication([])
app.setStyle('Fusion')  # set the application style

win = QtWidgets.QMainWindow()
program_area = DockArea()
win.setCentralWidget(program_area)
win.resize(1200, 850)
win.setWindowTitle('RT Python- Monitor V.02 ')
iconPath = 'icon.png'

# ==================================== GUI ====================================
# Widget docks initialization: each dock gets its title and initial size here.
dck_Connection = Dock("Connection settings", size=(150, 20))
dck_Monitoring = Dock("Monitoring ", size=(800, 800), closable=False)
dck_MonitoringSetup = Dock("Monitoring Setup", size=(100, 100))
dck_Values = Dock("Value", size=(150, 20))
dck_InfoStream = Dock("Stream Info", size=(100, 100))

# Hide the title bars to prevent moving these docks around.
dck_Connection.hideTitleBar()
dck_InfoStream.hideTitleBar()

# program_area construction: the docks are placed into the dock area.
program_area.addDock(dck_Connection, 'left')
program_area.addDock(dck_Monitoring, 'right')
program_area.addDock(dck_MonitoringSetup, 'bottom', dck_Connection)
program_area.addDock(dck_Values, 'right')
program_area.addDock(dck_InfoStream, 'bottom', dck_Monitoring)

# -------------------------- INFORMATIVE CONSOLE DOCK -------------------------
console = QPlainTextEdit()
console.setReadOnly(True)
console.setStyleSheet("background-color: white")
layout_console = pg.LayoutWidget()
dck_InfoStream.addWidget(layout_console)
layout_console.addWidget(console)

infoString = ''   # text currently shown in the console
cSentences = 0    # messages appended since the last reset


def setInfoString(msg: str) -> None:
    """Append ``msg`` to the info console, restarting the text periodically.

    Messages accumulate line by line. Once the running counter exceeds 4,
    the accumulated text is discarded, the console shows only ``msg`` and
    the counter restarts at 0.

    Args:
        msg: Message to display; a newline is appended automatically.
    """
    global infoString, cSentences
    cSentences += 1
    if cSentences > 4:
        # Start over with the newest message only.
        infoString = msg + '\n'
        cSentences = 0
    else:
        infoString += msg + '\n'
    console.setPlainText(infoString)


setInfoString("Wellcome to the monitor V 0.2")

# ------------------------- CONNECTION SETTINGS DOCK --------------------------
# Title label
intro_label = QLabel("CONNECTION SETTINGS\n")

# Radio button: real hardware (selected by default)
chk_HW = QtWidgets.QRadioButton()
chk_HW.setText("RT Hardware")
chk_HW.setChecked(True)

# Radio button: demo data
chk_Demo = QtWidgets.QRadioButton()
chk_Demo.setText("Demo Data")

# Label heading the IP section
lbl_conn = QLabel("\n IP ")

# Check box: use the file source instead of a typed IP (checked by default)
chk_IPS = QtWidgets.QCheckBox()
chk_IPS.setText("File- Source")
chk_IPS.setChecked(True)

# Label above the IP entry field
label_ip = QtWidgets.QLabel("Enter the IP number")

# Label that echoes the entered address
label_ctrl = QtWidgets.QLabel("...\n")


def edt_enterIP_slot() -> None:
    """React to a change of the IP entry field.

    The text is passed through ``iso_qc.ip_entry_control`` when that module
    is available (the original comment says it re-formats the text to the
    000.000.0.000 form) and is otherwise used unchanged. Previously the
    undefined name ``iso_qc`` raised ``NameError`` on every text change.
    The result is written back to the field, the "File- Source" box is
    unchecked and the control label shows the address.
    """
    entered = edt_enterIP.text()
    if iso_qc is not None:
        formatted = iso_qc.ip_entry_control(entered)
    else:
        # Fallback: no formatter available, keep the user's text as typed.
        formatted = entered
    edt_enterIP.setText(formatted)
    chk_IPS.setChecked(False)
    label_ctrl.setText('HW @:' + formatted)


edt_enterIP = QtWidgets.QLineEdit()
edt_enterIP.textChanged.connect(edt_enterIP_slot)


def chk_IPS_slot():
    """Handle a state change of the "File- Source" check box.

    Unchecked: only a message is logged. Checked: a message is logged and a
    non-empty IP entry is cleared, after which the box is checked again.
    """
    if not chk_IPS.isChecked():
        setInfoString('File Source skipped... ')
        return

    setInfoString('File Source activated')
    # Remove any entry left behind in the IP field.
    if edt_enterIP.text() != '':
        edt_enterIP.setText('')
        chk_IPS.setChecked(True)


chk_IPS.stateChanged.connect(chk_IPS_slot)


# Command: clear
def cmd_clear_slot():
    """Empty the IP field, re-check "File- Source" and log the reset."""
    edt_enterIP.setText('')
    chk_IPS.setChecked(True)
    setInfoString('Data cleared ... ')


cmd_clear = QPushButton('CLEAR')
cmd_clear.clicked.connect(cmd_clear_slot)


# Command: submit
def cmd_apply_slot():
    """Apply the chosen connection settings.

    First the IP-related widgets are hidden (demo data) or shown (hardware).
    Then, when hardware is selected, ``my_connection`` is filled either from
    the file source or from the typed IP; an empty IP only logs a hint.

    NOTE(review): the original indentation was lost; the second decision is
    assumed to sit at function level, since each of its branches re-tests
    ``chk_HW`` -- confirm.
    """
    ip_widgets = (edt_enterIP, label_ip, chk_IPS, label_ctrl, lbl_conn)

    if chk_Demo.isChecked():
        for widget in ip_widgets:
            widget.setVisible(False)
        chk_HW.setText("Connect to HW ?")
        setInfoString('USE DEMO DATA locally generated ?')
    elif chk_HW.isChecked():
        for widget in ip_widgets:
            widget.setVisible(True)
        chk_HW.setText("RT Hardware")

    # Every remaining branch requires the hardware option.
    if not chk_HW.isChecked():
        return

    if chk_IPS.isChecked():
        # Use the backdoor file.
        setInfoString('Backdoor FILE used for IP configuration... ')
        my_connection['source'] = 'ip_config.txt'
        setInfoString('File used as backdoor: ' + str(my_connection['source']))
    elif len(edt_enterIP.text()) != 0:
        # The user entered the IP number.
        ip_number = edt_enterIP.text()
        setInfoString('Establish connection @: ' + str(ip_number) + ' ?')
        my_connection['source'] = 'HW-Station'
        my_connection['ip_number'] = ip_number
        setInfoString('HW Station direct use.. ')
    else:
        # Entry error: neither file source nor an IP was given.
        setInfoString('... No IP Entry .. check the connection settings')


cmd_apply = QPushButton('Submit')
cmd_apply.clicked.connect(cmd_apply_slot)


# Command: connect
def cmd_connect_slot():
    """Log what the "Connect" button would connect to.

    With demo data selected a "not connected yet" message is logged;
    otherwise a message is logged only when ``my_connection['ip_number']``
    differs from the placeholder address.
    """
    if chk_Demo.isChecked():
        setInfoString('::Demo data not connected yet ... version not closed ')
        return
    if my_connection['ip_number'] != '000.000.0.000':
        setInfoString('Connection starts... with host@ :' + my_connection['ip_number'])


cmd_connect = QPushButton('Connect')
cmd_connect.clicked.connect(cmd_connect_slot)

# =============================================================================
# Layout description: putting the widgets into the connection dock
# =============================================================================
layout_conn = pg.LayoutWidget()  # control widget
dck_Connection.addWidget(layout_conn)

# (widget, row, column, row span, column span)
_conn_grid = (
    (intro_label, 0, 0, 1, 1),
    (chk_Demo, 1, 0, 1, 1),
    (chk_HW, 2, 0, 1, 1),
    (lbl_conn, 3, 0, 1, 1),
    (chk_IPS, 4, 0, 1, 1),
    (label_ip, 5, 0, 1, 1),
    (edt_enterIP, 6, 0, 1, 1),
    (label_ctrl, 7, 0, 1, 1),
    (cmd_clear, 8, 0, 1, 1),
    (cmd_apply, 8, 1, 1, 1),
    (cmd_connect, 9, 0, 1, 2),
)
for _widget, _row, _col, _rowspan, _colspan in _conn_grid:
    layout_conn.addWidget(_widget, _row, _col, _rowspan, _colspan)

# Message box; it is only configured here and not shown in this section.
msgBox = QtWidgets.QMessageBox()
msgBox.setWindowTitle("Connection starts...")
msgBox.setText("Even a broken clock gets to be right twice per day .. ")
msgBox.setDetailedText(" Don't listen to the message box !...")
msgBox.setStandardButtons(QtWidgets.QMessageBox.Ok | QtWidgets.QMessageBox.Cancel)
# ------------------------------ dck_Monitoring -------------------------------
K = 5                     # number of channels
chName = 'Channel'
CH_NameList = [chName + ':' + str(idx + 1) for idx in range(K)]

# Plot index -> channel name
d = dict(enumerate(CH_NameList))

DashBoard = pg.GraphicsLayoutWidget(show=True, title='Nyquist')
ch_objects = []
Curves = []
pltItems = []

# One plot row per channel, each curve in a random colour.
for idx in range(len(d)):
    plot_item = DashBoard.addPlot(title=d[idx])
    DashBoard.nextRow()
    plot_item.showGrid(x=True, y=True)
    ch_objects.append(plot_item)
    rgb = tuple(np.random.randint(0, 255) for _ in range(3))
    pltItems.append(plot_item)
    Curves.append(plot_item.plot(pen=rgb, linewidth=0.5))

dck_Monitoring.addWidget(DashBoard)

fs = 10e3
t = np.linspace(0, 1, int(fs))   # time axis of one generated chunk

# ------------------------------------------------------------------------------
# Pre-generated fields
signals = [[] for _ in range(len(CH_NameList))]   # per channel: list of chunks
buffer_counter = 0                                # number of timer ticks so far
ENDPOINT = 100000   # current upper x-axis limit; doubled whenever data outgrows it


# ========================= REAL TIME SIGNAL PROCESSING ========================
def real_time_processing():
    """Generate one chunk per channel, buffer it and redraw every curve.

    Called on each timer tick. For every channel a noisy sine chunk is
    appended to ``signals[i]``, the buffered chunks are concatenated and
    plotted, and the oldest chunk is dropped once the plotted array exceeds
    2 MB. The x-axis limit ``ENDPOINT`` persists between calls and doubles
    whenever the plotted data is longer than it; previously it was a local
    reset to 100000 on each call, so the axis could not grow past 200000
    even though more samples than that are buffered before trimming starts.

    NOTE(review): the original indentation was lost; ``setXRange`` is assumed
    to run on every tick rather than only when the limit grows -- confirm.
    """
    global buffer_counter, ENDPOINT
    buffer_counter += 1

    m = len(CH_NameList)
    for i in range(m):
        # ------------------------------ signal with some variation each tick
        noise_scale = 0.1 * np.random.normal()
        values = 1.0 + 0.5 * np.sin(2.0 * np.pi * 12.0 * (i + 1) * t)
        values += noise_scale * np.random.normal(size=len(values))
        # Alternative test signal (needs scipy.signal.chirp):
        # RNDSCL = 0.3*np.random.normal()
        # w = chirp(t, f0=6, f1=100, t1=10, method='quadratic')
        # values = w + (0 + 0.5 * np.sin(2.0*np.pi*12.0*(i+1)*t))*RNDSCL

        # ------------------------------------------------- buffer the values
        # Chunks of channel i are collected in signals[i].
        signals[i % m].append(values)
        channel = signals[i % m]
        ch = np.asarray(channel)          # stacked chunks, one row per chunk
        plotvalues = np.concatenate(ch)   # joined into one 1-D trace
        L = len(ch)

        print('signals is a list of lists:', type(signals), ':', len(signals))
        print('Channel :', i % m, ' << IS FROM signals[' + str(i % m) + ']')
        print('Channel:', type(channel), ':', len(channel))
        print('Channel2Numpy:', type(ch), ':', L)
        print('Last buffer input has size of', ch.shape)
        print('Length of conacetated fields:', len(plotvalues),
              '| curve length is:', plotvalues.shape)
        print('-' * 80)

        Curves[i].setData(plotvalues)

        # Keep the buffer bounded: drop the oldest chunk above 2 MB.
        size_mb = sys.getsizeof(plotvalues) / 1024. / 1024.0
        if size_mb > 2:
            print('#' * 80)
            print('Values plotted:', len(plotvalues), '@ MB:', str(size_mb))
            channel.pop(0)

        # Grow the visible x range so the newest samples stay on screen.
        if len(plotvalues) > ENDPOINT:
            print('=' * 100)
            ENDPOINT *= 2
        pltItems[i].setXRange(0, ENDPOINT)


# TIMER FUNCTIONS
timer = QtCore.QTimer()
timer.timeout.connect(real_time_processing)
timer.start(150)

win.show()

# Start the Qt event loop unless running in interactive mode or using PySide.
if __name__ == '__main__':
    import sys
    if (sys.flags.interactive != 1) or not hasattr(QtCore, 'PYQT_VERSION'):
        QtWidgets.QApplication.instance().exec_()