import sqlite3 from sqlite3 import Error import os import subprocess import platform import serial.tools.list_ports import random import datetime import time import pages import openplc import monitoring as monitor import sys import ctypes import socket import mimetypes import flask import flask_login app = flask.Flask(__name__) app.secret_key = str(os.urandom(16)) login_manager = flask_login.LoginManager() login_manager.init_app(app) openplc_runtime = openplc.runtime() class User(flask_login.UserMixin): pass # Allowed MIME types and magic numbers (file signatures) IMAGE_MAGIC_NUMBERS = { b'\xFF\xD8\xFF': 'image/jpeg', # JPEG b'\x89PNG\r\n\x1A\n': 'image/png', # PNG b'GIF87a': 'image/gif', # GIF87a b'GIF89a': 'image/gif', # GIF89a } # Function to check MIME type and file signature def is_allowed_file(file): # First, check the MIME type based on the file extension mime_type, _ = mimetypes.guess_type(file.filename) if mime_type not in IMAGE_MAGIC_NUMBERS.values(): return False try: # Read the first 8 bytes of the file to determine its magic number file.seek(0) # Ensure we're at the start of the file file_header = file.read(8) file.seek(0) # Reset file pointer after reading # Check if the file header matches a known image format for magic, expected_mime in IMAGE_MAGIC_NUMBERS.items(): if file_header.startswith(magic): return True return False except Exception: return False def configure_runtime(): global openplc_runtime database = "openplc.db" conn = create_connection(database) if (conn != None): try: print("Openning database") cur = conn.cursor() cur.execute("SELECT * FROM Settings") rows = cur.fetchall() cur.close() conn.close() for row in rows: if (row[0] == "Modbus_port"): if (row[1] != "disabled"): print("Enabling Modbus on port " + str(int(row[1]))) openplc_runtime.start_modbus(int(row[1])) else: print("Disabling Modbus") openplc_runtime.stop_modbus() elif (row[0] == "Dnp3_port"): if (row[1] != "disabled"): print("Enabling DNP3 on port " + str(int(row[1]))) openplc_runtime.start_dnp3(int(row[1])) else: print("Disabling DNP3") openplc_runtime.stop_dnp3() elif (row[0] == "Enip_port"): if (row[1] != "disabled"): print("Enabling EtherNet/IP on port " + str(int(row[1]))) openplc_runtime.start_enip(int(row[1])) else: print("Disabling EtherNet/IP") openplc_runtime.stop_enip() elif (row[0] == "snap7"): if (row[1] != "false"): print("Enabling S7 Protocol") openplc_runtime.start_snap7() else: print("Disabling S7 Protocol") openplc_runtime.stop_snap7() elif (row[0] == "Pstorage_polling"): if (row[1] != "disabled"): print("Enabling Persistent Storage with polling rate of " + str(int(row[1])) + " seconds") openplc_runtime.start_pstorage(int(row[1])) else: print("Disabling Persistent Storage") openplc_runtime.stop_pstorage() delete_persistent_file() except Error as e: print("error connecting to the database" + str(e)) else: print("Error opening DB") def delete_persistent_file(): if (os.path.isfile("persistent.file")): os.remove("persistent.file") print("persistent.file removed!") def generate_mbconfig(): database = "openplc.db" conn = create_connection(database) if (conn != None): try: cur = conn.cursor() cur.execute("SELECT COUNT(*) FROM Slave_dev") row = cur.fetchone() num_devices = int(row[0]) mbconfig = 'Num_Devices = "' + str(num_devices) + '"' cur.close() cur=conn.cursor() cur.execute("SELECT * FROM Settings") rows = cur.fetchall() cur.close() for row in rows: if (row[0] == "Slave_polling"): slave_polling = str(row[1]) elif (row[0] == "Slave_timeout"): slave_timeout = str(row[1]) mbconfig += '\nPolling_Period = "' + slave_polling + '"' mbconfig += '\nTimeout = "' + slave_timeout + '"' cur = conn.cursor() cur.execute("SELECT * FROM Slave_dev") rows = cur.fetchall() cur.close() conn.close() device_counter = 0 for row in rows: mbconfig += """ # ------------ # DEVICE """ mbconfig += str(device_counter) mbconfig += """ # ------------ """ mbconfig += 'device' + str(device_counter) + '.name = "' + str(row[1]) + '"\n' mbconfig += 'device' + str(device_counter) + '.slave_id = "' + str(row[3]) + '"\n' if (str(row[2]) == 'ESP32' or str(row[2]) == 'ESP8266' or str(row[2]) == 'TCP'): mbconfig += 'device' + str(device_counter) + '.protocol = "TCP"\n' mbconfig += 'device' + str(device_counter) + '.address = "' + str(row[9]) + '"\n' else: mbconfig += 'device' + str(device_counter) + '.protocol = "RTU"\n' if (str(row[4]).startswith("COM")): port_name = "/dev/ttyS" + str(int(str(row[4]).split("COM")[1]) - 1) else: port_name = str(row[4]) mbconfig += 'device' + str(device_counter) + '.address = "' + port_name + '"\n' mbconfig += 'device' + str(device_counter) + '.IP_Port = "' + str(row[10]) + '"\n' mbconfig += 'device' + str(device_counter) + '.RTU_Baud_Rate = "' + str(row[5]) + '"\n' mbconfig += 'device' + str(device_counter) + '.RTU_Parity = "' + str(row[6]) + '"\n' mbconfig += 'device' + str(device_counter) + '.RTU_Data_Bits = "' + str(row[7]) + '"\n' mbconfig += 'device' + str(device_counter) + '.RTU_Stop_Bits = "' + str(row[8]) + '"\n' mbconfig += 'device' + str(device_counter) + '.RTU_TX_Pause = "' + str(row[21]) + '"\n\n' mbconfig += 'device' + str(device_counter) + '.Discrete_Inputs_Start = "' + str(row[11]) + '"\n' mbconfig += 'device' + str(device_counter) + '.Discrete_Inputs_Size = "' + str(row[12]) + '"\n' mbconfig += 'device' + str(device_counter) + '.Coils_Start = "' + str(row[13]) + '"\n' mbconfig += 'device' + str(device_counter) + '.Coils_Size = "' + str(row[14]) + '"\n' mbconfig += 'device' + str(device_counter) + '.Input_Registers_Start = "' + str(row[15]) + '"\n' mbconfig += 'device' + str(device_counter) + '.Input_Registers_Size = "' + str(row[16]) + '"\n' mbconfig += 'device' + str(device_counter) + '.Holding_Registers_Read_Start = "' + str(row[17]) + '"\n' mbconfig += 'device' + str(device_counter) + '.Holding_Registers_Read_Size = "' + str(row[18]) + '"\n' mbconfig += 'device' + str(device_counter) + '.Holding_Registers_Start = "' + str(row[19]) + '"\n' mbconfig += 'device' + str(device_counter) + '.Holding_Registers_Size = "' + str(row[20]) + '"\n' device_counter += 1 with open('./mbconfig.cfg', 'w+') as f: f.write(mbconfig) except Error as e: print("error connecting to the database" + str(e)) else: print("Error opening DB") def draw_top_div(): global openplc_runtime top_div = ("
" "OpenPLC") if (openplc_runtime.status() == "Running"): top_div += "

Running: " + openplc_runtime.project_name + "

" elif (openplc_runtime.status() == "Compiling"): top_div += "

Compiling: " + openplc_runtime.project_name + "

" else: top_div += "

Stopped: " + openplc_runtime.project_name + "

" top_div += "
User" top_div += "

" + flask_login.current_user.name + "

" top_div += "
" return top_div def draw_status(): global openplc_runtime status_str = "" if (openplc_runtime.status() == "Running"): status_str = "

Status: Running

" status_str += "Stop PLC" else: status_str = "

Status: Stopped

" status_str += "Start PLC" return status_str def draw_blank_page(): return_str = pages.w3_style + pages.dashboard_head + draw_top_div() + """


Dashboard

Dashboard

Programs

Programs

Modbus

Slave Devices

Monitoring

Monitoring

Hardware

Hardware

Users

Users

Settings

Settings

Logout

Logout




""" return return_str def draw_compiling_page(): return_str = draw_blank_page() return_str += """

Compiling program



Go to Dashboard
""" return return_str @login_manager.user_loader def user_loader(username): database = "openplc.db" conn = create_connection(database) if (conn != None): try: cur = conn.cursor() cur.execute("SELECT username, password, name, pict_file FROM Users") rows = cur.fetchall() cur.close() conn.close() for row in rows: if (row[0] == username): user = User() user.id = row[0] user.name = row[2] user.pict_file = str(row[3]) return user return except Error as e: print("error connecting to the database" + str(e)) return else: return @login_manager.request_loader def request_loader(request): username = request.form.get('username') database = "openplc.db" conn = create_connection(database) if (conn != None): try: cur = conn.cursor() cur.execute("SELECT username, password, name, pict_file FROM Users") rows = cur.fetchall() cur.close() conn.close() for row in rows: if (row[0] == username): user = User() user.id = row[0] user.name = row[2] user.pict_file = str(row[3]) user.is_authenticated = (request.form['password'] == row[1]) return user return except Error as e: print("error connecting to the database" + str(e)) return else: return @app.before_request def before_request(): flask.session.permanent = True app.permanent_session_lifetime = datetime.timedelta(minutes=5) flask.session.modified = True @app.route('/') def index(): if flask_login.current_user.is_authenticated: return flask.redirect(flask.url_for('dashboard')) else: return flask.redirect(flask.url_for('login')) @app.route('/login', methods=['GET', 'POST']) def login(): if flask.request.method == 'GET': return pages.login_head + pages.login_body username = flask.request.form['username'] password = flask.request.form['password'] database = "openplc.db" conn = create_connection(database) if (conn != None): try: cur = conn.cursor() cur.execute("SELECT username, password, name, pict_file FROM Users") rows = cur.fetchall() cur.close() conn.close() for row in rows: if (row[0] == username): if (row[1] == password): user = User() user.id = row[0] user.name = row[2] user.pict_file = str(row[3]) flask_login.login_user(user) return flask.redirect(flask.url_for('dashboard')) else: return pages.login_head + pages.bad_login_body return pages.login_head + pages.bad_login_body except Error as e: print("error connecting to the database" + str(e)) return 'Error opening DB' else: return 'Error opening DB' return pages.login_head + pages.bad_login_body @app.route('/start_plc') def start_plc(): global openplc_runtime if (flask_login.current_user.is_authenticated == False): return flask.redirect(flask.url_for('login')) else: monitor.stop_monitor() openplc_runtime.start_runtime() time.sleep(1) configure_runtime() monitor.cleanup() monitor.parse_st(openplc_runtime.project_file) return flask.redirect(flask.url_for('dashboard')) @app.route('/stop_plc') def stop_plc(): global openplc_runtime if (flask_login.current_user.is_authenticated == False): return flask.redirect(flask.url_for('login')) else: openplc_runtime.stop_runtime() time.sleep(1) monitor.stop_monitor() return flask.redirect(flask.url_for('dashboard')) @app.route('/runtime_logs') def runtime_logs(): global openplc_runtime if (flask_login.current_user.is_authenticated == False): return flask.redirect(flask.url_for('login')) else: return openplc_runtime.logs() @app.route('/dashboard') def dashboard(): global openplc_runtime if (flask_login.current_user.is_authenticated == False): return flask.redirect(flask.url_for('login')) else: monitor.stop_monitor() if (openplc_runtime.status() == "Compiling"): return draw_compiling_page() return_str = pages.w3_style + pages.dashboard_head + draw_top_div() return_str += """


Dashboard

Dashboard

Programs

Programs

Modbus

Slave Devices

Monitoring

Monitoring

Hardware

Hardware

Users

Users

Settings

Settings

Logout

Logout



""" return_str += draw_status() return_str += """

Dashboard

Status: """ if (openplc_runtime.status() == "Running"): return_str += "Running

" else: return_str += "Stopped

" return_str += "

Program: " + openplc_runtime.project_name + "

" return_str += "

Description: " + openplc_runtime.project_description + "

" return_str += "

File: " + openplc_runtime.project_file + "

" return_str += "

Runtime: " + openplc_runtime.exec_time() + "

" return_str += pages.dashboard_tail return return_str @app.route('/programs', methods=['GET', 'POST']) def programs(): if (flask_login.current_user.is_authenticated == False): return flask.redirect(flask.url_for('login')) else: monitor.stop_monitor() if (openplc_runtime.status() == "Compiling"): return draw_compiling_page() list_all = False if (flask.request.args.get('list_all') == '1'): list_all = True return_str = pages.w3_style + pages.style + draw_top_div() return_str += """


Dashboard

Dashboard

Programs

Programs

Modbus

Slave Devices

Monitoring

Monitoring

Hardware

Hardware

Users

Users

Settings

Settings

Logout

Logout



""" return_str += draw_status() return_str += """

Programs

Here you can upload a new program to OpenPLC or revert back to a previous uploaded program shown on the table.

""" database = "openplc.db" conn = create_connection(database) if (conn != None): try: cur = conn.cursor() if (list_all == True): cur.execute("SELECT Prog_ID, Name, File, Date_upload FROM Programs ORDER BY Date_upload DESC") else: cur.execute("SELECT Prog_ID, Name, File, Date_upload FROM Programs ORDER BY Date_upload DESC LIMIT 10") rows = cur.fetchall() cur.close() conn.close() for row in rows: return_str += "" return_str += "" return_str += """
Program NameFileDate Uploaded
" + str(row[1]) + "" + str(row[2]) + "" + time.strftime('%b %d, %Y - %I:%M%p', time.localtime(row[3])) + "
List all programs

Upload Program


""" except Error as e: print("error connecting to the database" + str(e)) return_str += 'Error connecting to the database. Make sure that your openplc.db file is not corrupt.

Error: ' + str(e) else: return_str += 'Error connecting to the database. Make sure that your openplc.db file is not corrupt.' return return_str @app.route('/reload-program', methods=['GET', 'POST']) def reload_program(): if (flask_login.current_user.is_authenticated == False): return flask.redirect(flask.url_for('login')) else: if (openplc_runtime.status() == "Compiling"): return draw_compiling_page() prog_id = flask.request.args.get('table_id') return_str = pages.w3_style + pages.style + draw_top_div() return_str += """


Dashboard

Dashboard

Programs

Programs

Modbus

Slave Devices

Monitoring

Monitoring

Hardware

Hardware

Users

Users

Settings

Settings

Logout

Logout



""" return_str += draw_status() return_str += """

Program Info


""" database = "openplc.db" conn = create_connection(database) if (conn != None): try: cur = conn.cursor() cur.execute("SELECT * FROM Programs WHERE Prog_ID = ?", (int(prog_id),)) row = cur.fetchone() cur.close() conn.close() return_str += "" return_str += "" return_str += "" return_str += "" return_str += "

Launch programUpdate programRemove program
" return_str += """
""" except Error as e: print("error connecting to the database" + str(e)) return_str += 'Error connecting to the database. Make sure that your openplc.db file is not corrupt.

Error: ' + str(e) else: return_str += 'Error connecting to the database. Make sure that your openplc.db file is not corrupt.' return return_str @app.route('/update-program', methods=['GET', 'POST']) def update_program(): if (flask_login.current_user.is_authenticated == False): return flask.redirect(flask.url_for('login')) else: if (openplc_runtime.status() == "Compiling"): return draw_compiling_page() prog_id = flask.request.args.get('id') return_str = pages.w3_style + pages.style + draw_top_div() return_str += """


Dashboard

Dashboard

Programs

Programs

Modbus

Slave Devices

Monitoring

Monitoring

Hardware

Hardware

Users

Users

Settings

Settings

Logout

Logout



""" return_str += draw_status() return_str += """

Upload Program


""" return return_str @app.route('/update-program-action', methods=['GET', 'POST']) def update_program_action(): if (flask_login.current_user.is_authenticated == False): return flask.redirect(flask.url_for('login')) else: if (openplc_runtime.status() == "Compiling"): return draw_compiling_page() if ('file' not in flask.request.files): return draw_blank_page() + "

Error

You need to select a file to be uploaded!

Use the back-arrow on your browser to return

" prog_file = flask.request.files['file'] if (prog_file.filename == ''): return draw_blank_page() + "

Error

You need to select a file to be uploaded!

Use the back-arrow on your browser to return

" prog_id = flask.request.form['prog_id'] epoch_time = flask.request.form['epoch_time'] database = "openplc.db" conn = create_connection(database) if (conn != None): try: cur = conn.cursor() cur.execute("SELECT * FROM Programs WHERE Prog_ID = ?", (int(prog_id),)) row = cur.fetchone() cur.close() filename = str(row[3]) prog_file.save(os.path.join('st_files', filename)) #Redirect back to the compiling page return '' except Error as e: print("error connecting to the database" + str(e)) return 'Error connecting to the database. Make sure that your openplc.db file is not corrupt.

Error: ' + str(e) else: return 'Error connecting to the database. Make sure that your openplc.db file is not corrupt.' @app.route('/remove-program', methods=['GET', 'POST']) def remove_program(): if (flask_login.current_user.is_authenticated == False): return flask.redirect(flask.url_for('login')) else: if (openplc_runtime.status() == "Compiling"): return draw_compiling_page() prog_id = flask.request.args.get('id') database = "openplc.db" conn = create_connection(database) if (conn != None): try: cur = conn.cursor() cur.execute("DELETE FROM Programs WHERE Prog_ID = ?", (int(prog_id),)) conn.commit() cur.close() conn.close() return flask.redirect(flask.url_for('programs')) except Error as e: print("error connecting to the database" + str(e)) return 'Error connecting to the database. Make sure that your openplc.db file is not corrupt.

Error: ' + str(e) else: return 'Error connecting to the database. Make sure that your openplc.db file is not corrupt.' @app.route('/upload-program', methods=['GET', 'POST']) def upload_program(): if (flask_login.current_user.is_authenticated == False): return flask.redirect(flask.url_for('login')) else: if (openplc_runtime.status() == "Compiling"): return draw_compiling_page() if ('file' not in flask.request.files): return draw_blank_page() + "

Error

You need to select a file to be uploaded!

Use the back-arrow on your browser to return

" prog_file = flask.request.files['file'] if (prog_file.filename == ''): return draw_blank_page() + "

Error

You need to select a file to be uploaded!

Use the back-arrow on your browser to return

" filename = str(random.randint(1,1000000)) + ".st" prog_file.save(os.path.join('st_files', filename)) return_str = pages.w3_style + pages.style + draw_top_div() return_str += """


Dashboard

Dashboard

Programs

Programs

Modbus

Slave Devices

Monitoring

Monitoring

Hardware

Hardware

Users

Users

Settings

Settings

Logout

Logout



""" return_str += draw_status() return_str += """

Program Info


""" return_str += "" return_str += "" return_str += "" return_str += "" return_str += """

""" return return_str @app.route('/upload-program-action', methods=['GET', 'POST']) def upload_program_action(): if (flask_login.current_user.is_authenticated == False): return flask.redirect(flask.url_for('login')) else: if (openplc_runtime.status() == "Compiling"): return draw_compiling_page() prog_name = flask.request.form['prog_name'] prog_descr = flask.request.form['prog_descr'] prog_file = flask.request.form['prog_file'] epoch_time = flask.request.form['epoch_time'] #validate epoch_time format and range try: epoch_time = int(epoch_time) current_time = int(time.time()) #allow timestamps between 2015-01-01 and 1 year in the future min_allowed_time = 1420070400 #2015-01-01 00:00:00 max_allowed_time = current_time + (100 * 31536000) #current time + 1 year if epoch_time < min_allowed_time or epoch_time > max_allowed_time: return 'Invalid epoch time value: must be between 2015-01-01 and 1 year from now' except ValueError: return 'Invalid epoch time format: must be a valid integer timestamp' (prog_name, prog_descr, prog_file, epoch_time) = sanitize_input(prog_name, prog_descr, prog_file, int(epoch_time)) database = "openplc.db" conn = create_connection(database) if (conn != None): try: cur = conn.cursor() cur.execute("INSERT INTO Programs (Name, Description, File, Date_upload) VALUES (?, ?, ?, ?)", (prog_name, prog_descr, prog_file, epoch_time)) conn.commit() cur.close() conn.close() #Redirect back to the compiling page return '' except Error as e: print("error connecting to the database" + str(e)) return 'Error connecting to the database. Make sure that your openplc.db file is not corrupt.

Error: ' + str(e) else: return 'Error connecting to the database. Make sure that your openplc.db file is not corrupt.' @app.route('/compile-program', methods=['GET', 'POST']) def compile_program(): global openplc_runtime if (flask_login.current_user.is_authenticated == False): return flask.redirect(flask.url_for('login')) else: if (openplc_runtime.status() == "Compiling"): return draw_compiling_page() st_file = flask.request.args.get('file') #load information about the program being compiled into the openplc_runtime object database = "openplc.db" conn = create_connection(database) if (conn != None): try: cur = conn.cursor() cur.execute("SELECT * FROM Programs WHERE File=?", (st_file,)) row = cur.fetchone() openplc_runtime.project_name = str(row[1]) openplc_runtime.project_description = str(row[2]) openplc_runtime.project_file = str(row[3]) cur.close() conn.close() except Error as e: print("error connecting to the database" + str(e)) else: print("error connecting to the database") delete_persistent_file() openplc_runtime.compile_program(st_file) return draw_compiling_page() @app.route('/compilation-logs', methods=['GET', 'POST']) def compilation_logs(): if (flask_login.current_user.is_authenticated == False): return flask.redirect(flask.url_for('login')) else: return openplc_runtime.compilation_status() @app.route('/modbus', methods=['GET', 'POST']) def modbus(): if (flask_login.current_user.is_authenticated == False): return flask.redirect(flask.url_for('login')) else: monitor.stop_monitor() if (openplc_runtime.status() == "Compiling"): return draw_compiling_page() return_str = pages.w3_style + pages.style + draw_top_div() return_str += """


Dashboard

Dashboard

Programs

Programs

Modbus

Slave Devices

Monitoring

Monitoring

Hardware

Hardware

Users

Users

Settings

Settings

Logout

Logout



""" return_str += draw_status() return_str += """

Slave Devices

List of Slave devices attached to OpenPLC.

Attention: Slave devices are attached to address 100 onward (i.e. %IX100.0, %IW100, %QX100.0, and %QW100) """ database = "openplc.db" conn = create_connection(database) if (conn != None): try: cur = conn.cursor() cur.execute("SELECT dev_id, dev_name, dev_type, di_size, coil_size, ir_size, hr_read_size, hr_write_size FROM Slave_dev") rows = cur.fetchall() cur.close() conn.close() counter_di = 0 counter_do = 0 counter_ai = 0 counter_ao = 0 for row in rows: return_str += "" #calculate di if (row[3] == 0): di = "-" else: di = "%IX" + str(100 + (counter_di // 8)) + "." + str(counter_di%8) + " to " counter_di += row[3]; di += "%IX" + str(100 + ((counter_di-1) // 8)) + "." + str((counter_di-1)%8) #calculate do if (row[4] == 0): do = "-" else: do = "%QX" + str(100 + (counter_do // 8)) + "." + str(counter_do%8) + " to " counter_do += row[4]; do += "%QX" + str(100 + ((counter_do-1) // 8)) + "." + str((counter_do-1)%8) #calculate ai if (row[5] + row[6] == 0): ai = "-" else: ai = "%IW" + str(100 + counter_ai) + " to " counter_ai += row[5]+row[6]; ai += "%IW" + str(100 + (counter_ai-1)) #calculate ao if (row[7] == 0): ao = "-" else: ao = "%QW" + str(100 + counter_ao) + " to " counter_ao += row[7]; ao += "%QW" + str(100 + (counter_ao-1)) return_str += "" return_str += """
Device NameDevice TypeDIDOAIAO
" + str(row[1]) + "" + str(row[2]) + "" + di + "" + do + "" + ai + "" + ao + "

Add new device
""" except Error as e: print("error connecting to the database" + str(e)) return_str += 'Error connecting to the database. Make sure that your openplc.db file is not corrupt.

Error: ' + str(e) else: return_str += 'Error connecting to the database. Make sure that your openplc.db file is not corrupt.' return return_str @app.route('/add-modbus-device', methods=['GET', 'POST']) def add_modbus_device(): if (flask_login.current_user.is_authenticated == False): return flask.redirect(flask.url_for('login')) else: if (openplc_runtime.status() == "Compiling"): return draw_compiling_page() if (flask.request.method == 'GET'): return_str = pages.w3_style + pages.style + draw_top_div() return_str += """


Dashboard

Dashboard

Programs

Programs

Modbus

Slave Devices

Monitoring

Monitoring

Hardware

Hardware

Users

Users

Settings

Settings

Logout

Logout



""" return_str += draw_status() return_str += """

Add new device


" return_str += "" return_str += "" else: return_str += "" return_str += "" return_str += "
" return_str += "
" return_str += "

""" if (openplc_runtime.status() == "Running"): #Check Modbus Server status modbus_enabled = False modbus_port_cfg = 502 database = "openplc.db" conn = create_connection(database) if (conn != None): try: print("Openning database") cur = conn.cursor() cur.execute("SELECT * FROM Settings") rows = cur.fetchall() cur.close() conn.close() for row in rows: if (row[0] == "Modbus_port"): if (row[1] != "disabled"): modbus_enabled = True modbus_port_cfg = int(row[1]) else: modbus_enabled = False except Error as e: return "error connecting to the database" + str(e) else: return "Error opening DB" if modbus_enabled == True: monitor.start_monitor(modbus_port_cfg) data_index = 0 for debug_data in monitor.debug_vars: return_str += '' # onclick="document.location=\'point-info?table_id=' + str(data_index) + '\'">' return_str += '' else: return_str += 'bool_trueTRUE' elif (debug_data.type == 'UINT'): percentage = (debug_data.value*100)/65535 return_str += '

' + str(debug_data.value) + '

' elif (debug_data.type == 'INT'): percentage = ((debug_data.value + 32768)*100)/65535 debug_data.value = ctypes.c_short(debug_data.value).value return_str += '

' + str(debug_data.value) + '

' elif (debug_data.type == 'REAL') or (debug_data.type == 'LREAL'): return_str += "{:10.4f}".format(debug_data.value) else: return_str += str(debug_data.value) return_str += '' data_index += 1 return_str += """
Point NameTypeLocationWriteValue
' + debug_data.name + '' + debug_data.type + '' + debug_data.location + '' if (debug_data.location.find('QX') != -1): return_str += '' return_str += '' return_str += '' if (debug_data.type == 'BOOL'): if (debug_data.value == 0): return_str += 'bool_falseFALSE
" return_str += pages.monitoring_tail #Modbus Server is not enabled else: return_str += """

You must enable Modbus Server on Settings to be able to monitor your program!

""" #Runtime is not running else: return_str += """
""" return return_str @app.route('/monitor-update', methods=['GET', 'POST']) def monitor_update(): if (flask_login.current_user.is_authenticated == False): return flask.redirect(flask.url_for('login')) else: #if (openplc_runtime.status() == "Compiling"): return 'OpenPLC is compiling new code. Please wait' return_str = """ """ #if (openplc_runtime.status() == "Running"): if (True): mb_port_cfg = flask.request.args.get('mb_port') monitor.start_monitor(int(mb_port_cfg)) data_index = 0 for debug_data in monitor.debug_vars: return_str += '' # onclick="document.location=\'point-info?table_id=' + str(data_index) + '\'">' return_str += '' else: return_str += 'bool_trueTRUE' elif (debug_data.type == 'UINT'): percentage = (debug_data.value*100)/65535 return_str += '

' + str(debug_data.value) + '

' elif (debug_data.type == 'INT'): percentage = ((debug_data.value + 32768)*100)/65535 debug_data.value = ctypes.c_short(debug_data.value).value return_str += '

' + str(debug_data.value) + '

' elif (debug_data.type == 'REAL') or (debug_data.type == 'LREAL'): return_str += "{:10.4f}".format(debug_data.value) else: return_str += str(debug_data.value) return_str += '' data_index += 1 return_str += """
Point NameTypeLocationWriteValue
' + debug_data.name + '' + debug_data.type + '' + debug_data.location + '' if (debug_data.location.find('QX') != -1): return_str += '' return_str += '' return_str += '' if (debug_data.type == 'BOOL'): if (debug_data.value == 0): return_str += 'bool_falseFALSE
""" return return_str @app.route('/point-write', methods=['GET', 'POST']) def point_write(): if (flask_login.current_user.is_authenticated == False): return flask.redirect(flask.url_for('login')) else: point_value = flask.request.args.get('value') point_address = flask.request.args.get('address') monitor.write_value(point_address, int(point_value)) return '' @app.route('/point-info', methods=['GET', 'POST']) def point_info(): if (flask_login.current_user.is_authenticated == False): return flask.redirect(flask.url_for('login')) else: #if (openplc_runtime.status() == "Compiling"): return draw_compiling_page() point_id = flask.request.args.get('table_id') debug_data = monitor.debug_vars[int(point_id)] return_str = pages.w3_style + pages.settings_style + draw_top_div() return_str += """


Dashboard

Dashboard

Programs

Programs

Modbus

Slave Devices

Monitoring

Monitoring

Hardware

Hardware

Users

Users

Settings

Settings

Logout

Logout



""" return_str += draw_status() return_str += """

Point Details


Point Name: """ + debug_data.name + """

Type: """ + debug_data.type + """

Location: """ + debug_data.location + "

" if (debug_data.type == 'BOOL'): if (debug_data.value == 0): return_str += """

Status: bool_falseFALSE

""" else: return_str += """

Status: bool_trueTRUE

""" elif (debug_data.type == 'UINT'): percentage = (debug_data.value*100)/65535 return_str += """

Value:

' + str(debug_data.value) + '

' elif (debug_data.type == 'INT'): percentage = ((debug_data.value + 32768)*100)/65535 debug_data.value = ctypes.c_short(debug_data.value).value return_str += """

Value:

' + str(debug_data.value) + '

' elif (debug_data.type == 'REAL') or (debug_data.type == 'LREAL'): return_str += """

Value: """ + "{:10.4f}".format(debug_data.value) + "

" else: return_str += """

Value: """ + str(debug_data.value) + "

" return_str += """

""" if (debug_data.type == 'BOOL'): return_str += """ """ else: return_str += """ """ return_str += pages.point_info_tail return return_str @app.route('/point-update', methods=['GET', 'POST']) def point_update(): if (flask_login.current_user.is_authenticated == False): return flask.redirect(flask.url_for('login')) else: #if (openplc_runtime.status() == "Compiling"): return draw_compiling_page() point_id = flask.request.args.get('table_id') debug_data = monitor.debug_vars[int(point_id)] return_str = """

Point Name: """ + debug_data.name + """

Type: """ + debug_data.type + """

Location: """ + debug_data.location + "

" if (debug_data.type == 'BOOL'): if (debug_data.value == 0): return_str += """

Status: bool_falseFALSE

""" else: return_str += """

Status: bool_trueTRUE

""" elif (debug_data.type == 'UINT'): percentage = (debug_data.value*100)/65535 return_str += """

Value:

' + str(debug_data.value) + '

' elif (debug_data.type == 'INT'): percentage = ((debug_data.value + 32768)*100)/65535 debug_data.value = ctypes.c_short(debug_data.value).value return_str += """

Value:

' + str(debug_data.value) + '

' elif (debug_data.type == 'REAL') or (debug_data.type == 'LREAL'): return_str += """

Value: """ + "{:10.4f}".format(debug_data.value) + "

" else: return_str += """

Value: """ + str(debug_data.value) + "

" return_str += """

""" return return_str @app.route('/hardware', methods=['GET', 'POST']) def hardware(): if (flask_login.current_user.is_authenticated == False): return flask.redirect(flask.url_for('login')) else: monitor.stop_monitor() if (openplc_runtime.status() == "Compiling"): return draw_compiling_page() if (flask.request.method == 'GET'): with open('./scripts/openplc_driver') as f: current_driver = f.read().rstrip() return_str = pages.w3_style + pages.hardware_style + draw_top_div() + pages.hardware_head return_str += draw_status() return_str += """

Hardware

OpenPLC controls inputs and outputs through a piece of code called hardware layer (also known as driver). Therefore, to properly handle the inputs and outputs of your board, you must select the appropriate hardware layer for it. The Blank hardware layer is the default option on OpenPLC, which provides no support for native inputs and outputs.

OpenPLC Hardware Layer