utils.batching - Batching Components

Kofa components for batch processing.

Batch processors consume CSV files to add, update, or remove large numbers of objects of a certain kind at once.

class waeup.kofa.utils.batching.AsyncExportJob(site, exporter_name, *args, **kwargs)[source]

Bases: zc.async.job.Job

An IJob that exports data to CSV files.

AsyncExportJob instances are regular AsyncJob instances with a different constructor API. Instead of a callable to execute, you must pass a site and some exporter_name to trigger an export.

The real work is done when an instance of this class is put into a queue. See waeup.kofa.async to learn more about asynchronous jobs.

The exporter_name must be the name under which an ICSVExporter utility was registered with the ZCA.

The site must be a valid site or None.

The result of an AsyncExportJob is the path to the generated CSV file. The file will reside in a temporary directory that should be removed after use.

__implemented__ = <implementedBy waeup.kofa.utils.batching.AsyncExportJob>
__init__(site, exporter_name, *args, **kwargs)[source]
__module__ = 'waeup.kofa.utils.batching'
__provides__

Special descriptor for class __provides__

The descriptor caches the implementedBy info, so that we can get declarations for objects without instance-specific interfaces a bit quicker.

For example:

>>> from zope.interface import Interface, implements, classProvides
>>> class IFooFactory(Interface):
...     pass
>>> class IFoo(Interface):
...     pass
>>> class C(object):
...     implements(IFoo)
...     classProvides(IFooFactory)
>>> [i.getName() for i in C.__provides__]
['IFooFactory']
>>> [i.getName() for i in C().__provides__]
['IFoo']
failed

A report job is marked failed if and only if it is finished and its result is None.

While a job is unfinished, the failed status is None.

Failed jobs normally provide a traceback to examine reasons.

finished

A job is marked finished if it is completed.

Please note: a finished report job does not necessarily provide an IReport result. See failed.

class waeup.kofa.utils.batching.BatchProcessor[source]

Bases: grokcore.component.components.GlobalUtility

A processor to add, update, or remove data.

This is a non-active baseclass.

__implemented__ = <implementedBy waeup.kofa.utils.batching.BatchProcessor>
__module__ = 'waeup.kofa.utils.batching'
__provides__

Special descriptor for class __provides__

addEntry(obj, row, site)[source]

Add the entry given by row data.

applyMapping(row, mapping)[source]

Apply mapping to a row of CSV data.
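The idea behind such a header mapping can be sketched in plain Python. Here `apply_mapping` and the convention of dropping unmapped columns via `None` are illustrative assumptions, not the actual `BatchProcessor` implementation:

```python
def apply_mapping(row, mapping):
    """Rename the keys of a CSV row dict according to mapping.

    Keys not present in mapping are kept as-is; keys mapped to None
    are dropped (assumed convention for ignored columns).
    """
    result = {}
    for key, value in row.items():
        target = mapping.get(key, key)
        if target is not None:
            result[target] = value
    return result

row = {'matric_no': 'M123', 'Name': 'Ada', 'ignore_me': 'x'}
mapping = {'Name': 'fullname', 'ignore_me': None}
apply_mapping(row, mapping)
# {'matric_no': 'M123', 'fullname': 'Ada'}
```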

available_fields
callFactory(*args, **kw)[source]
checkConversion(row, mode='ignore', ignore_empty=True)[source]

Validates all values in row.

checkHeaders(headerfields, mode='create')[source]
checkUpdateRequirements(obj, row, site)[source]

Checks requirements the object must fulfill when being updated.

This method is not used when deleting or adding objects.

Returns error messages as strings in case of requirement problems.

createLogfile(path, fail_path, num, warnings, mode, user, timedelta, logger=None)[source]

Write to log file.

delEntry(row, site)[source]

Delete entry given by row data.

doImport(path, headerfields, mode='create', user='Unknown', logger=None, ignore_empty=True)[source]

In contrast to most other methods, doImport is not supposed to be customized, neither in custom packages nor in derived batch processor classes. It is therefore the only place where data are actually imported.

Before this method starts creating or updating persistent data, it prepares two more files in a temporary folder of the filesystem: (1) a file for pending data with file extension .pending and (2) a file for successfully processed data with file extension .finished. Then the method starts iterating over all rows of the CSV file. Each row is treated as follows:

  1. An empty row is skipped.

  2. Empty strings or lists ([]) in the row are replaced by ignore markers.

  3. The BatchProcessor.checkConversion method validates and converts all values in the row. Conversion means the transformation of strings into Python objects. For instance, number expressions have to be transformed into integers, dates into datetime objects, phone number expressions into phone number objects, etc. The converter returns a dictionary with converted values or, if the validation of one of the elements fails, an appropriate warning message. If the conversion fails, a pending record is created and stored in the pending data file together with the warning message the converter has raised.

  4. In create mode only:

    The parent object must be found and a child object with the same object id must not exist. Otherwise the row is skipped, a corresponding warning message is raised, and a record is stored in the pending data file.

    Now doImport tries to add the new object with the data from the conversion dictionary. In some cases this may fail and a DuplicationError is raised. For example, a new payment ticket is created but the same payment for the same session has already been made. In this case the object id is unique, no other object with the same id exists, but making the ‘same’ payment twice does not make sense. The import is skipped and a record is stored in the pending data file.

  5. In update mode only:

    If the object can’t be found, the row is skipped, a ‘no such entry’ warning message is raised and a record is stored in the pending data file.

    The BatchProcessor.checkUpdateRequirements method checks additional requirements the object must fulfill before being updated. These requirements are not imposed by the data type but by the context of the object. For example, post-graduate students have a different registration workflow. With this method we can forbid certain workflow transitions or states.

    Finally, doImport updates the existing object with the data from the conversion dictionary.

  6. In remove mode only:

    If the object can’t be found, the row is skipped, a ‘no such entry’ warning message is raised and a record is stored in the pending data file.

    Finally, doImport removes the existing object.
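A minimal sketch of the per-row dispatch described above. `process_row`, the callback parameters, and the IGNORE marker are invented stand-ins for the real processor machinery, and the conversion step (3.) is omitted:

```python
IGNORE = '<IGNORE>'  # assumed marker for empty values

def process_row(row, mode, exists, add, update, remove, pending):
    """Simplified dispatch of one CSV row in a doImport-style loop."""
    if not any(row.values()):
        return 'skipped'                         # 1. empty row
    row = {k: (IGNORE if v in ('', []) else v)   # 2. ignore markers
           for k, v in row.items()}
    if mode == 'create':
        if exists(row):                          # 4. duplicate check
            pending.append((row, 'object exists'))
            return 'pending'
        add(row)
    elif mode == 'update':
        if not exists(row):                      # 5. missing object
            pending.append((row, 'no such entry'))
            return 'pending'
        update(row)
    elif mode == 'remove':
        if not exists(row):                      # 6. missing object
            pending.append((row, 'no such entry'))
            return 'pending'
        remove(row)
    return 'finished'
```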

emptyRow(row)[source]

Detect empty rows.

entryExists(row, site)[source]

Tell whether there already exists an entry for row data.

factory_name = ''
getEntry(row, site)[source]

Get the object for the entry in row.

getHeaders(mode='create')[source]
getMapping(path, headerfields, mode)[source]

Get a mapping from CSV file headerfields to actually used fieldnames.

getParent(row, site)[source]

Get the parent object for the entry in row.

get_csv_skeleton()[source]

Export a CSV file containing only a header row of the available fields.

A raw string with CSV data should be returned.

iface = <InterfaceClass zope.interface.Interface>
location_fields = []
name = u'Non-registered base processor'
parentsExist(row, site)[source]

Tell whether the parent object for data in row exists.

req
required_fields

Required fields that have no default.

A list of names of fields whose values cannot be set if not given during creation. These fields must therefore exist in the input.

Fields with a default != missing_value do not belong to this category.

stringFromErrs(errors, inv_errors)[source]
updateEntry(obj, row, site, filename)[source]

Update obj to the values given in row.

Returns a string describing the fields changed.

util_name = ''
writeFailedRow(writer, row, warnings)[source]

Write a row with error messages to error CSV.

If warnings is a list of strings, they will be concatenated.
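A sketch of such an error writer using the standard csv module; the `--ERRORS--` column name and the separator are assumptions for illustration:

```python
import csv
import io

def write_failed_row(writer, row, warnings):
    """Append a row to the error CSV, concatenating list warnings."""
    if isinstance(warnings, list):
        warnings = ' / '.join(warnings)
    data = dict(row)
    data['--ERRORS--'] = warnings  # hypothetical error column
    writer.writerow(data)

buf = io.StringIO()
writer = csv.DictWriter(buf, fieldnames=['id', 'name', '--ERRORS--'])
writer.writeheader()
write_failed_row(writer, {'id': 'A1', 'name': 'Ada'},
                 ['invalid date', 'missing email'])
```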

class waeup.kofa.utils.batching.ExportContainerFinder[source]

Bases: grokcore.component.components.GlobalUtility

Finder for local (site-wide) export container.

__call__()[source]

Get the local export container.

If no site can be determined or the site provides no export container, None is returned.

__implemented__ = <implementedBy waeup.kofa.utils.batching.ExportContainerFinder>
__module__ = 'waeup.kofa.utils.batching'
__provides__

Special descriptor for class __provides__

class waeup.kofa.utils.batching.ExportJobContainer[source]

Bases: object

A mix-in that provides functionality for asynchronous export jobs.

__implemented__ = <implementedBy waeup.kofa.utils.batching.ExportJobContainer>
__module__ = 'waeup.kofa.utils.batching'
__providedBy__

Special descriptor for class __provides__

__provides__

Special descriptor for class __provides__

__weakref__

list of weak references to the object (if defined)

delete_export_entry(entry)[source]

Delete the export denoted by entry.

Removes the given entry from the local running_exports list and also removes the corresponding job via the local job manager.

entry must be a tuple (<job id>, <exporter name>, <user id>) as created by start_export_job() or returned by get_running_export_jobs().

entry_from_job_id(job_id)[source]

Get entry tuple for job_id.

Returns None if no such entry can be found.

get_export_jobs_status(user_id=None)[source]

Get running/completed export jobs for user_id as list of tuples.

Each tuple holds <raw status>, <status translated>, <exporter title> in that order, where <status translated> and <exporter title> are translated strings representing the status of the job and the human-readable title of the exporter used.

get_running_export_jobs(user_id=None)[source]

Get export jobs for user with user_id as list of tuples.

Each tuple holds <job_id>, <exporter_name>, <user_id> in that order. The <exporter_name> is the utility name of the exporter used.

If user_id is None, all running jobs are returned.

running_exports = []
start_export_job(exporter_name, user_id, *args, **kwargs)[source]

Start asynchronous export job.

exporter_name is the name of an exporter utility to be used.

user_id is the ID of the user that triggers the export.

The job_id is stored along with exporter name and user id in a persistent list.

The method supports additional positional and keyword arguments, which are passed as-is to the respective AsyncExportJob.

Returns the job ID of the job started.
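The bookkeeping of (job_id, exporter_name, user_id) tuples can be illustrated without zc.async. `JobContainerSketch` is a simplified stand-in that fakes job IDs with a counter instead of delegating to a job manager:

```python
import itertools

class JobContainerSketch:
    """Illustrative (job_id, exporter_name, user_id) bookkeeping.

    The real ExportJobContainer delegates job creation to a
    zc.async job manager; here a counter fakes the job IDs.
    """
    _ids = itertools.count(1)

    def __init__(self):
        self.running_exports = []

    def start_export_job(self, exporter_name, user_id):
        job_id = str(next(self._ids))
        self.running_exports.append((job_id, exporter_name, user_id))
        return job_id

    def get_running_export_jobs(self, user_id=None):
        # user_id=None returns all running jobs, as documented above.
        return [e for e in self.running_exports
                if user_id is None or e[2] == user_id]

    def entry_from_job_id(self, job_id):
        for entry in self.running_exports:
            if entry[0] == job_id:
                return entry
        return None

container = JobContainerSketch()
jid = container.start_export_job('students', 'alice')
```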

class waeup.kofa.utils.batching.ExporterBase[source]

Bases: object

A base for exporters.

__implemented__ = <implementedBy waeup.kofa.utils.batching.ExporterBase>
__module__ = 'waeup.kofa.utils.batching'
__providedBy__

Special descriptor for class __provides__

__provides__

Special descriptor for class __provides__

__weakref__

list of weak references to the object (if defined)

close_outfile(filepath, outfile)[source]

Close outfile. If filepath is None, the contents of outfile are returned.

export(iterable, filepath=None)[source]

Export iterable as CSV file. If filepath is None, a raw string with CSV data should be returned.

export_all(site, filepath=None)[source]

Export all appropriate objects in site into filepath as CSV data. If filepath is None, a raw string with CSV data should be returned.

export_filtered(site, filepath=None, **kw)[source]

Export items denoted by kw. If filepath is None, a raw string with CSV data should be returned.

export_selected(site, filepath=None, **kw)[source]

Export those items specified by a list of identifiers called selected. If filepath is None, a raw string with CSV data should be returned.

fields = ('code', 'title', 'title_prefix')

Fieldnames considered by this exporter

get_csv_writer(filepath=None)[source]

Get a CSV dict writer instance open for writing.

Returns a tuple (<writer>, <outfile>) where <writer> is a csv.DictWriter instance and outfile is the real file which is written to. The latter is important when writing to StringIO and can normally be ignored otherwise.

The returned file will already be filled with the header row.

Please note that if you give a filepath, the returned outfile is open for writing only and you might have to close it before reopening it for reading.
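A sketch of the (writer, outfile) contract with `csv.DictWriter`, assuming (as an illustration) that the method writes to a StringIO buffer when no filepath is given:

```python
import csv
import io

def get_csv_writer(fields, filepath=None):
    """Return (writer, outfile) with the header row already written.

    With no filepath, outfile is a StringIO buffer; otherwise it is
    a real file open for writing only.
    """
    if filepath is None:
        outfile = io.StringIO()
    else:
        outfile = open(filepath, 'w', newline='')
    writer = csv.DictWriter(outfile, fieldnames=fields)
    writer.writeheader()
    return writer, outfile

writer, outfile = get_csv_writer(('code', 'title'))
writer.writerow({'code': 'FAC1', 'title': 'Science'})
```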

get_filtered(site, **kw)[source]

Get datasets to export filtered by keyword arguments. Returns an iterable.

get_selected(site, selected)[source]

Get datasets to export for selected items specified by a list of identifiers. Returns an iterable.

mangle_value(value, name, context=None)[source]

Hook for mangling values in derived classes.

title = 'Override this title'

The title under which this exporter will be displayed (if registered as a utility)

write_item(obj, writer)[source]

Write a row extracted from obj into CSV file using writer.
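How fields, mangle_value, and write_item cooperate can be sketched with a small self-contained exporter. `FacultyExporterSketch` mirrors the ExporterBase hooks but is an illustration under assumed behavior, not the real class:

```python
import csv
import io

class FacultyExporterSketch:
    """Simplified ExporterBase-style exporter (illustration only)."""
    fields = ('code', 'title', 'title_prefix')
    title = 'Faculties'

    def mangle_value(self, value, name, context=None):
        # Hook point for derived classes; here: render None as ''.
        return '' if value is None else value

    def write_item(self, obj, writer):
        # Extract one row from obj, mangling each field value.
        row = {name: self.mangle_value(getattr(obj, name, None), name, obj)
               for name in self.fields}
        writer.writerow(row)

    def export(self, iterable, filepath=None):
        outfile = io.StringIO() if filepath is None else open(
            filepath, 'w', newline='')
        writer = csv.DictWriter(outfile, fieldnames=self.fields)
        writer.writeheader()
        for obj in iterable:
            self.write_item(obj, writer)
        if filepath is None:
            return outfile.getvalue()  # raw CSV string, per the docs
        outfile.close()
```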

class waeup.kofa.utils.batching.VirtualExportJobContainer[source]

Bases: waeup.kofa.utils.batching.ExportJobContainer

A virtual export job container.

Virtual ExportJobContainers can be used as a mixin just like real ExportJobContainer.

They retrieve and store data in the site-wide ExportJobContainer.

Functionality is currently entirely as for regular ExportJobContainers, except that data is stored elsewhere.

VirtualExportJobContainers need a registered IExportContainerFinder utility to find a suitable container for storing data.

__implemented__ = <implementedBy waeup.kofa.utils.batching.VirtualExportJobContainer>
__module__ = 'waeup.kofa.utils.batching'
__provides__

Special descriptor for class __provides__

_site_container
logger
running_exports

Exports stored in the site-wide exports container.

waeup.kofa.utils.batching.export_job(site, exporter_name, **kw)[source]

Export all entries delivered by the exporter and store them in a temp file.

site gives the site to search. It will be passed to the exporter and also be set as ‘current site’ as the function is used in asynchronous jobs which run in their own threads and have no site set initially. Therefore site must also be a valid value for use with zope.component.hooks.setSite().

exporter_name is the utility name under which the desired exporter was registered with the ZCA.

The resulting CSV file will be stored in a new temporary directory (using tempfile.mkdtemp()). It will be named after the exporter used, with a .csv filename extension.

Returns the path to the created CSV file.

Note

It is the caller's responsibility to clean up the used file and its parent directory.
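The contract above (a CSV named after the exporter, stored under a fresh tempfile.mkdtemp() directory, cleanup left to the caller) can be sketched as follows; `run_export_job` is an invented stand-in for the real export_job function:

```python
import csv
import os
import shutil
import tempfile

def run_export_job(exporter_name, rows):
    """Stand-in for export_job: dump rows to <tmpdir>/<exporter>.csv
    and return the path; the caller must remove the directory."""
    workdir = tempfile.mkdtemp()
    path = os.path.join(workdir, exporter_name + '.csv')
    with open(path, 'w', newline='') as outfile:
        writer = csv.writer(outfile)
        writer.writerows(rows)
    return path

path = run_export_job('students', [['id', 'name'], ['A1', 'Ada']])
# ... hand the file to whoever consumes it, then clean up:
shutil.rmtree(os.path.dirname(path))
```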