Source code for qtpyvcp.actions.program_actions

import sys
import linuxcnc
import tempfile

from qtpy.QtCore import Qt, QTimer
# Set up logging
from qtpyvcp.utilities import logger
LOG = logger.getLogger(__name__)

from qtpyvcp.utilities.info import Info
from qtpyvcp.plugins import getPlugin

STATUS = getPlugin('status')
STAT = STATUS.stat
INFO = Info()
CMD = linuxcnc.command()

from qtpyvcp.actions.base_actions import setTaskMode


#==============================================================================
# Program actions
#==============================================================================

def load(fname, add_to_recents=True, isreload=False):
    if not fname:
        # load a blank file. Maybe should load [DISPLAY] OPEN_FILE
        clear()

    #setTaskMode(linuxcnc.MODE_AUTO)
    if not isreload:
        STATUS.addLock()
    
    filter_prog = INFO.getFilterProgram(fname)
    if not filter_prog:
        LOG.debug('Loading NC program: %s', fname)
        CMD.program_open(fname.encode('utf-8'))
        CMD.wait_complete()
    else:
        LOG.debug('Loading file with filter program: %s', fname)
        openFilterProgram(fname, filter_prog)

    if add_to_recents:
        addToRecents(fname)
    
    QTimer.singleShot(2000, STATUS.removeLock)

load.ok = lambda *args, **kwargs: True
load.bindOk = lambda *args, **kwargs: True

[docs]def reload(): """Reload the currently loaded NC program ActionButton syntax:: program.reload """ stat = linuxcnc.stat() stat.poll() fname = stat.file if os.path.exists(fname): load(stat.file, add_to_recents=False, isreload=True)
reload.ok = lambda *args, **kwargs: True reload.bindOk = lambda *args, **kwargs: True
[docs]def clear(): """Clear the loaded NC program ActionButton syntax:: program.clear """ _, blankfile = tempfile.mkstemp(prefix="new_program_", suffix=".ngc") with open(blankfile, 'w') as fp: fp.write("(New Program)\n\n\nM30") load(blankfile, add_to_recents=False)
clear.ok = lambda *args, **kwargs: True clear.bindOk = lambda *args, **kwargs: True def addToRecents(fname): files = STATUS.recent_files.getValue() if fname in files: files.remove(fname) files.insert(0, fname) STATUS.recent_files.setValue(files[:STATUS.max_recent_files]) # ------------------------------------------------------------------------- # program RUN action # -------------------------------------------------------------------------
[docs]def run(start_line=0): """Runs the loaded program, optionally starting from a specific line. ActionButton syntax:: program.run program.run:line Args: start_line (int, optional) : The line to start program from. Defaults to 0. """ if STAT.state == linuxcnc.RCS_EXEC and STAT.paused: CMD.auto(linuxcnc.AUTO_RESUME) elif setTaskMode(linuxcnc.MODE_AUTO): CMD.auto(linuxcnc.AUTO_RUN, start_line)
def _run_ok(widget=None): """Checks if it is OK to run a program. Args: widget (QWidget, optional) : If a widget is supplied it will be enabled/disabled according to the result, and will have it's statusTip property set to the reason the action is disabled. Returns: bool : True if Ok, else False. """ if STAT.estop: ok = False msg = "Can't run program when in E-Stop" elif not STAT.enabled: ok = False msg = "Can't run program when not enabled" elif not STATUS.allHomed(): ok = False msg = "Can't run program when not homed" elif not STAT.paused and not STAT.interp_state == linuxcnc.INTERP_IDLE: ok = False msg = "Can't run program when already running" elif STAT.file == "": ok = False msg = "Can't run program when no file loaded" else: ok = True msg = "Run program" _run_ok.msg = msg if widget is not None: widget.setEnabled(ok) widget.setStatusTip(msg) widget.setToolTip(msg) return ok def _run_bindOk(widget): STATUS.estop.onValueChanged(lambda: _run_ok(widget)) STATUS.enabled.onValueChanged(lambda: _run_ok(widget)) STATUS.all_axes_homed.onValueChanged(lambda: _run_ok(widget)) STATUS.interp_state.onValueChanged(lambda: _run_ok(widget)) STATUS.file.onValueChanged(lambda: _run_ok(widget)) run.ok = _run_ok run.bindOk = _run_bindOk # ------------------------------------------------------------------------- # program RUN from LINE action # ------------------------------------------------------------------------- def run_from_line(line=None): # TODO: This might should show a popup to select start line, # or it could get the start line from the gcode view or # even from the backplot. LOG.error('Run from line not implemented yet.') run_from_line.ok = _run_ok run_from_line.bindOk = _run_bindOk # ------------------------------------------------------------------------- # program STEP action # -------------------------------------------------------------------------
[docs]def step(): """Steps program line by line ActionButton syntax:: program.step """ if STAT.state == linuxcnc.RCS_EXEC and STAT.paused: CMD.auto(linuxcnc.AUTO_STEP) elif setTaskMode(linuxcnc.MODE_AUTO): CMD.auto(linuxcnc.AUTO_STEP)
step.ok = _run_ok step.bindOk = _run_bindOk # ------------------------------------------------------------------------- # program PAUSE action # -------------------------------------------------------------------------
[docs]def pause(): """Pause executing program ActionButton syntax:: program.pause """ LOG.debug("Pausing program execution") CMD.auto(linuxcnc.AUTO_PAUSE)
def _pause_ok(widget=None): """Checks if it is OK to pause the program. Args: widget (QWidget, optional) : If a widget is supplied it will be enabled/disabled according to the result, and will have it's statusTip property set to the reason the action is disabled. Returns: bool : True if Ok, else False. """ if STAT.state == linuxcnc.RCS_EXEC and not STAT.paused: msg = "Pause program execution" ok = True elif STAT.paused: msg = "Program is already paused" ok = False else: msg = "No program running to pause" ok = False _pause_ok.msg = msg if widget is not None: widget.setEnabled(ok) widget.setStatusTip(msg) widget.setToolTip(msg) return ok def _pause_bindOk(widget): STATUS.state.onValueChanged(lambda: _pause_ok(widget)) STATUS.paused.onValueChanged(lambda: _pause_ok(widget)) pause.ok = _pause_ok pause.bindOk = _pause_bindOk # ------------------------------------------------------------------------- # program RESUME action # -------------------------------------------------------------------------
[docs]def resume(): """Resume a previously paused program ActionButton syntax:: program.resume """ LOG.debug("Resuming program execution") CMD.auto(linuxcnc.AUTO_RESUME)
def _resume_ok(widget): """Checks if it is OK to resume a paused program. Args: widget (QWidget, optional) : If a widget is supplied it will be enabled/disabled according to the result, and will have it's statusTip property set to the reason the action is disabled. Returns: bool : True if Ok, else False. """ if STAT.state == linuxcnc.RCS_EXEC and STAT.paused: ok = True msg = "Resume program execution" else: ok = False msg = "No paused program to resume" _resume_ok.msg = msg if widget is not None: widget.setEnabled(ok) widget.setStatusTip(msg) widget.setToolTip(msg) return ok def _resume_bindOk(widget): STATUS.paused.onValueChanged(lambda: _resume_ok(widget)) STATUS.state.onValueChanged(lambda: _resume_ok(widget)) resume.ok = _resume_ok resume.bindOk = _resume_bindOk # ------------------------------------------------------------------------- # program ABORT action # -------------------------------------------------------------------------
[docs]def abort(): """Aborts any currently executing program, MDI command or homing operation. ActionButton syntax:: program.abort """ LOG.debug("Aborting program") CMD.abort()
def _abort_ok(widget=None): """Checks if it is OK to abort current operation. Args: widget (QWidget, optional) : If a widget is supplied it will be enabled/disabled according to the result, and will have it's statusTip property set to the reason the action is disabled. Returns: bool : True if Ok, else False. """ if STAT.state == linuxcnc.RCS_EXEC or STAT.state == linuxcnc.RCS_ERROR: ok = True msg = "" else: ok = False msg = "Nothing to abort" _abort_ok.msg = msg if widget is not None: widget.setEnabled(ok) widget.setStatusTip(msg) widget.setToolTip(msg) return ok def _abort_bindOk(widget): STATUS.state.onValueChanged(lambda: _abort_ok(widget)) abort.ok = _abort_ok abort.bindOk = _abort_bindOk # ------------------------------------------------------------------------- # BLOCK DELETE actions # -------------------------------------------------------------------------
[docs]class block_delete: """Block Delete Group"""
[docs] @staticmethod def on(): """Start ignoring lines beginning with '/'. ActionButton syntax:: program.block-delete.on """ LOG.debug("Setting block delete green<ON>") CMD.set_block_delete(True)
[docs] @staticmethod def off(): """Stop ignoring lines beginning with '/'. ActionButton syntax:: program.block-delete.off """ LOG.debug("Setting block delete red<OFF>") CMD.set_block_delete(False)
[docs] @staticmethod def toggle(): """Toggle ignoring lines beginning with '/'. ActionButton syntax:: program.block-delete.toggle """ if STAT.block_delete == True: block_delete.off() else: block_delete.on()
def _block_delete_ok(widget=None): """Checks if it is OK to set block_delete. Args: widget (QWidget, optional) : If a widget is supplied it will be enabled/disabled according to the result, and will have it's statusTip property set to the reason the action is disabled. Returns: bool : True if Ok, else False. """ if STAT.task_state == linuxcnc.STATE_ON: ok = True msg = "" else: ok = False msg = "Machine must be ON to set Block Del" _block_delete_ok.msg = msg if widget is not None: widget.setEnabled(ok) widget.setStatusTip(msg) widget.setToolTip(msg) return ok def _block_delete_bindOk(widget): widget.setChecked(STAT.block_delete) STATUS.task_state.onValueChanged(lambda: _block_delete_ok(widget)) STATUS.block_delete.onValueChanged(lambda s: widget.setChecked(s)) block_delete.on.ok = block_delete.off.ok = block_delete.toggle.ok = _block_delete_ok block_delete.on.bindOk = block_delete.off.bindOk = block_delete.toggle.bindOk = _block_delete_bindOk # ------------------------------------------------------------------------- # OPTIONAL STOP actions # -------------------------------------------------------------------------
[docs]class optional_stop: """Optional Stop Group"""
[docs] @staticmethod def on(): """Pause when a line beginning with M1 is encountered ActionButton syntax:: program.optional-stop.on """ LOG.debug("Setting optional stop green<ON>") CMD.set_optional_stop(True)
[docs] @staticmethod def off(): """Don't pause when a line beginning with M1 is encountered ActionButton syntax:: program.option-stop.off """ LOG.debug("Setting optional stop red<OFF>") CMD.set_optional_stop(False)
[docs] @staticmethod def toggle(): """Toggle pause when a line beginning with M1 is encountered ActionButton syntax:: program.optional-stop.toggle """ if STAT.optional_stop == True: optional_stop.off() else: optional_stop.on()
def _optional_stop_ok(widget=None): """Checks if it is OK to set optional_stop. Args: widget (QWidget, optional) : If a widget is supplied it will be enabled/disabled according to the result, and will have it's statusTip property set to the reason the action is disabled. Returns: bool : True if Ok, else False. """ if STAT.task_state == linuxcnc.STATE_ON: ok = True msg = "" else: ok = False msg = "Machine must be ON to set Opt Stop" _optional_stop_ok.msg = msg if widget is not None: widget.setEnabled(ok) widget.setStatusTip(msg) widget.setToolTip(msg) return ok def _optional_stop_bindOk(widget): widget.setChecked(STAT.block_delete) STATUS.task_state.onValueChanged(lambda: _optional_stop_ok(widget)) STATUS.optional_stop.onValueChanged(lambda s: widget.setChecked(s)) optional_stop.on.ok = optional_stop.off.ok = optional_stop.toggle.ok = _optional_stop_ok optional_stop.on.bindOk = optional_stop.off.bindOk = optional_stop.toggle.bindOk = _optional_stop_bindOk optional_skip = block_delete #============================================================================== # Program preprocessing handlers #============================================================================== import os, sys, time, select, re import tempfile, atexit, shutil FILTER_TEMP = None def openFilterProgram(infile, prog_name): temp_dir = _mktemp() outfile = os.path.join(temp_dir, os.path.basename(infile)) #FilterProgram(prog_name, infile, outfile, lambda r: r or _loadFilterResult(outfile)) FilterProgram(prog_name, infile, outfile, None) CMD.program_open(outfile) LOG.debug('Linuxcnc Command - program_open') def _loadFilterResult(fname): if fname: CMD.program_open(fname) def _mktemp(): global FILTER_TEMP if FILTER_TEMP is not None: return FILTER_TEMP FILTER_TEMP = tempfile.mkdtemp(prefix='emcflt-', suffix='.d') atexit.register(lambda: shutil.rmtree(FILTER_TEMP)) return FILTER_TEMP # slightly reworked code from gladevcp # loads a filter program and collects the result progress_re = re.compile("^FILTER_PROGRESS=(\\d*)$") class FilterProgram: def __init__(self, prog_name, infile, outfile, callback=None): import subprocess outfile = open(outfile, "w") infile = infile.replace("'", "'\\''") env = dict(os.environ) env['AXIS_PROGRESS_BAR'] = '1' self.p = subprocess.run(["sh", "-c", "%s '%s'" % (prog_name, infile)], stdin=subprocess.PIPE, stdout=outfile, stderr=subprocess.PIPE, env=env, text=True) self.stderr_text = [] self.program_filter = prog_name self.callback = callback #self.gid = STATUS.onValueChanged('periodic', self.update) #progress = Progress(1, 100) #progress.set_text(_("Filtering...")) # force file load until know what to do about update/finish def update(self, w): if self.p.poll() is not None: self.finish() STATUS.disconnect(self.gid) return False r, w, x = select.select([self.p.stderr], [], [], 0) if not r: return True stderr_line = self.p.stderr.readline() m = progress_re.match(stderr_line) if m: pass #progress.update(int(m.group(1)), 1) else: self.stderr_text.append(stderr_line) sys.stderr.write(stderr_line) return True def finish(self): # .. might be something left on stderr for line in self.p.stderr: m = progress_re.match(line) if not m: self.stderr_text.append(line) sys.stderr.write(line) r = self.p.returncode if r: self.error(r, "".join(self.stderr_text)) if self.callback: self.callback(r) def error(self, exitcode, stderr): LOG.error("Error loading filter program!") # dialog = gtk.MessageDialog(None, 0, gtk.MESSAGE_ERROR, gtk.BUTTONS_CLOSE, # _("The program %(program)r exited with code %(code)d. " # "Any error messages it produced are shown below:") # % {'program': self.program_filter, 'code': exitcode}) # diaLOG.format_secondary_text(stderr) # diaLOG.run() # diaLOG.destroy()