Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Report error when label is not specified #52

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions brother_ql/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@

from .exceptions import *

from .raster import BrotherQLRaster

from .brother_ql_create import create_label

55 changes: 32 additions & 23 deletions brother_ql/backends/__init__.py
Original file line number Diff line number Diff line change
@@ -1,40 +1,49 @@

from .generic import BrotherQLBackendGeneric


available_backends = [
'pyusb',
'network',
'linux_kernel',
"pyusb",
"network",
"linux_kernel",
]


def guess_backend(identifier):
""" guess the backend from a given identifier string for the device """
if identifier.startswith('usb://') or identifier.startswith('0x'):
return 'pyusb'
elif identifier.startswith('file://') or identifier.startswith('/dev/usb/') or identifier.startswith('lp'):
return 'linux_kernel'
elif identifier.startswith('tcp://'):
return 'network'
"""guess the backend from a given identifier string for the device"""
if identifier.startswith("usb://") or identifier.startswith("0x"):
return "pyusb"
elif (
identifier.startswith("file://")
or identifier.startswith("/dev/usb/")
or identifier.startswith("lp")
):
return "linux_kernel"
elif identifier.startswith("tcp://"):
return "network"
else:
raise ValueError('Cannot guess backend for given identifier: %s' % identifier)
raise ValueError("Cannot guess backend for given identifier: %s" % identifier)


def backend_factory(backend_name):
if backend_name == "pyusb":
from . import pyusb as pyusb_backend

if backend_name == 'pyusb':
from . import pyusb as pyusb_backend
list_available_devices = pyusb_backend.list_available_devices
backend_class = pyusb_backend.BrotherQLBackendPyUSB
elif backend_name == 'linux_kernel':
backend_class = pyusb_backend.BrotherQLBackendPyUSB
elif backend_name == "linux_kernel":
from . import linux_kernel as linux_kernel_backend

list_available_devices = linux_kernel_backend.list_available_devices
backend_class = linux_kernel_backend.BrotherQLBackendLinuxKernel
elif backend_name == 'network':
from . import network as network_backend
backend_class = linux_kernel_backend.BrotherQLBackendLinuxKernel
elif backend_name == "network":
from . import network as network_backend

list_available_devices = network_backend.list_available_devices
backend_class = network_backend.BrotherQLBackendNetwork
backend_class = network_backend.BrotherQLBackendNetwork
else:
raise NotImplementedError('Backend %s not implemented.' % backend_name)
raise NotImplementedError("Backend %s not implemented." % backend_name)

return {'list_available_devices': list_available_devices, 'backend_class': backend_class}
return {
"list_available_devices": list_available_devices,
"backend_class": backend_class,
}
15 changes: 7 additions & 8 deletions brother_ql/backends/generic.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,23 @@


import logging

logger = logging.getLogger(__name__)


def list_available_devices():
""" List all available devices for the respective backend """
"""List all available devices for the respective backend"""
# returns a list of dictionaries with the keys 'identifier' and 'instance':
# [ {'identifier': '/dev/usb/lp0', 'instance': os.open('/dev/usb/lp0', os.O_RDWR)}, ]
raise NotImplementedError()


class BrotherQLBackendGeneric(object):

def __init__(self, device_specifier):
"""
device_specifier can be either a string or an instance
of the required class type.
"""
self.write_dev = None
self.read_dev = None
self.read_dev = None
raise NotImplementedError()

def _write(self, data):
Expand All @@ -29,16 +27,17 @@ def _read(self, length=32):
return bytes(self.read_dev.read(length))

def write(self, data):
logger.debug('Writing %d bytes.', len(data))
logger.debug("Writing %d bytes.", len(data))
self._write(data)

def read(self, length=32):
try:
ret_bytes = self._read(length)
if ret_bytes: logger.debug('Read %d bytes.', len(ret_bytes))
if ret_bytes:
logger.debug("Read %d bytes.", len(ret_bytes))
return ret_bytes
except Exception as e:
logger.debug('Error reading... %s', e)
logger.debug("Error reading... %s", e)
raise

def dispose(self):
Expand Down
80 changes: 47 additions & 33 deletions brother_ql/backends/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,27 @@
* printing
"""

import logging, time
import logging
import time

from brother_ql.backends import backend_factory, guess_backend
from brother_ql.reader import interpret_response

logger = logging.getLogger(__name__)

def discover(backend_identifier='linux_kernel'):

def discover(backend_identifier="linux_kernel"):
if backend_identifier is None:
logger.info("Backend for discovery not specified, defaulting to linux_kernel")
backend_identifier = "linux_kernel"
be = backend_factory(backend_identifier)
list_available_devices = be['list_available_devices']
BrotherQLBackend = be['backend_class']
list_available_devices = be["list_available_devices"]
BrotherQLBackend = be["backend_class"]

available_devices = list_available_devices()
return available_devices


def send(instructions, printer_identifier=None, backend_identifier=None, blocking=True):
"""
Send instruction bytes to a printer.
Expand All @@ -34,11 +39,11 @@ def send(instructions, printer_identifier=None, backend_identifier=None, blockin
"""

status = {
'instructions_sent': True, # The instructions were sent to the printer.
'outcome': 'unknown', # String description of the outcome of the sending operation like: 'unknown', 'sent', 'printed', 'error'
'printer_state': None, # If the selected backend supports reading back the printer state, this key will contain it.
'did_print': False, # If True, a print was produced. It defaults to False if the outcome is uncertain (due to a backend without read-back capability).
'ready_for_next_job': False, # If True, the printer is ready to receive the next instructions. It defaults to False if the state is unknown.
"instructions_sent": True, # The instructions were sent to the printer.
"outcome": "unknown", # String description of the outcome of the sending operation like: 'unknown', 'sent', 'printed', 'error'
"printer_state": None, # If the selected backend supports reading back the printer state, this key will contain it.
"did_print": False, # If True, a print was produced. It defaults to False if the outcome is uncertain (due to a backend without read-back capability).
"ready_for_next_job": False, # If True, the printer is ready to receive the next instructions. It defaults to False if the state is unknown.
}
selected_backend = None
if backend_identifier:
Expand All @@ -47,23 +52,27 @@ def send(instructions, printer_identifier=None, backend_identifier=None, blockin
try:
selected_backend = guess_backend(printer_identifier)
except:
logger.info("No backend stated. Selecting the default linux_kernel backend.")
selected_backend = 'linux_kernel'
logger.info(
"No backend stated. Selecting the default linux_kernel backend."
)
selected_backend = "linux_kernel"

be = backend_factory(selected_backend)
list_available_devices = be['list_available_devices']
BrotherQLBackend = be['backend_class']
list_available_devices = be["list_available_devices"]
BrotherQLBackend = be["backend_class"]

printer = BrotherQLBackend(printer_identifier)

start = time.time()
logger.info('Sending instructions to the printer. Total: %d bytes.', len(instructions))
logger.info(
"Sending instructions to the printer. Total: %d bytes.", len(instructions)
)
printer.write(instructions)
status['outcome'] = 'sent'
status["outcome"] = "sent"

if not blocking:
return status
if selected_backend == 'network':
if selected_backend == "network":
""" No need to wait for completion. The network backend doesn't support readback. """
return status

Expand All @@ -75,29 +84,34 @@ def send(instructions, printer_identifier=None, backend_identifier=None, blockin
try:
result = interpret_response(data)
except ValueError:
logger.error("TIME %.3f - Couln't understand response: %s", time.time()-start, data)
logger.error(
"TIME %.3f - Couln't understand response: %s", time.time() - start, data
)
continue
status['printer_state'] = result
logger.debug('TIME %.3f - result: %s', time.time()-start, result)
if result['errors']:
logger.error('Errors occured: %s', result['errors'])
status['outcome'] = 'error'
status["printer_state"] = result
logger.debug("TIME %.3f - result: %s", time.time() - start, result)
if result["errors"]:
logger.error("Errors occured: %s", result["errors"])
status["outcome"] = "error"
break
if result['status_type'] == 'Printing completed':
status['did_print'] = True
status['outcome'] = 'printed'
if result['status_type'] == 'Phase change' and result['phase_type'] == 'Waiting to receive':
status['ready_for_next_job'] = True
if status['did_print'] and status['ready_for_next_job']:
if result["status_type"] == "Printing completed":
status["did_print"] = True
status["outcome"] = "printed"
if (
result["status_type"] == "Phase change"
and result["phase_type"] == "Waiting to receive"
):
status["ready_for_next_job"] = True
if status["did_print"] and status["ready_for_next_job"]:
break

if not status['did_print']:
if not status["did_print"]:
logger.warning("'printing completed' status not received.")
if not status['ready_for_next_job']:
if not status["ready_for_next_job"]:
logger.warning("'waiting to receive' status not received.")
if (not status['did_print']) or (not status['ready_for_next_job']):
logger.warning('Printing potentially not successful?')
if status['did_print'] and status['ready_for_next_job']:
if (not status["did_print"]) or (not status["ready_for_next_job"]):
logger.warning("Printing potentially not successful?")
if status["did_print"] and status["ready_for_next_job"]:
logger.info("Printing was successful. Waiting for the next job.")

return status
32 changes: 20 additions & 12 deletions brother_ql/backends/linux_kernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,14 @@
Works on Linux.
"""

import glob, os, time, select
import glob
import os
import time
import select

from .generic import BrotherQLBackendGeneric


def list_available_devices():
"""
List all available devices for the linux kernel backend
Expand All @@ -18,9 +22,10 @@ def list_available_devices():
Instance is set to None because we don't want to open (and thus potentially block) the device here.
"""

paths = glob.glob('/dev/usb/lp*')
paths = glob.glob("/dev/usb/lp*")

return [{"identifier": "file://" + path, "instance": None} for path in paths]

return [{'identifier': 'file://' + path, 'instance': None} for path in paths]

class BrotherQLBackendLinuxKernel(BrotherQLBackendGeneric):
"""
Expand All @@ -35,46 +40,49 @@ def __init__(self, device_specifier):

self.read_timeout = 0.01
# strategy : try_twice or select
self.strategy = 'select'
self.strategy = "select"
if isinstance(device_specifier, str):
if device_specifier.startswith('file://'):
if device_specifier.startswith("file://"):
device_specifier = device_specifier[7:]
self.dev = os.open(device_specifier, os.O_RDWR)
elif isinstance(device_specifier, int):
self.dev = device_specifier
else:
raise NotImplementedError('Currently the printer can be specified either via an appropriate string or via an os.open() handle.')
raise NotImplementedError(
"Currently the printer can be specified either via an appropriate string or via an os.open() handle."
)

self.write_dev = self.dev
self.read_dev = self.dev
self.read_dev = self.dev

def _write(self, data):
os.write(self.write_dev, data)

def _read(self, length=32):
if self.strategy == 'try_twice':
if self.strategy == "try_twice":
data = os.read(self.read_dev, length)
if data:
return data
else:
time.sleep(self.read_timeout)
return os.read(self.read_dev, length)
elif self.strategy == 'select':
data = b''
elif self.strategy == "select":
data = b""
start = time.time()
while (not data) and (time.time() - start < self.read_timeout):
result, _, _ = select.select([self.read_dev], [], [], 0)
if self.read_dev in result:
data += os.read(self.read_dev, length)
if data: break
if data:
break
time.sleep(0.001)
if not data:
# one last try if still no data:
return os.read(self.read_dev, length)
else:
return data
else:
raise NotImplementedError('Unknown strategy')
raise NotImplementedError("Unknown strategy")

def _dispose(self):
os.close(self.dev)
Loading