Batch Processing

Batch processing is much more than pure data import.

Overview

Basically, it means processing CSV files in order to mass-create, mass-remove, or mass-update data.

So you can feed CSV files to processors, which are part of the batch-processing mechanism.

Processors

Each CSV file processor (see the sketch after this list)

  • accepts a single data type identified by an interface.
  • knows about the places inside a site (University) where the data is to be stored, removed or updated.
  • can check headers before processing data.
  • supports the modes ‘create’, ‘update’ and ‘remove’.
  • optionally creates log entries.
  • creates CSV files containing the successfully and the unsuccessfully processed data, respectively.
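
In code, such a processor is a class derived from BatchProcessor. Schematically it looks roughly like the sketch below; the names are placeholders only, and a complete worked example follows in the section ‘Creating a Batch Processor’.

    class MyProcessor(BatchProcessor):
        util_name = 'myprocessor'     # name the utility is registered under
        name = 'My Processor'         # human readable name
        iface = IMyDataType           # interface of the handled data type
        location_fields = ['name']    # column(s) that locate an entry in the site
        factory_name = 'My Factory'   # factory used in 'create' mode

        # ... plus methods telling the processor where to find, add,
        # update and delete entries: parentsExist(), getParent(),
        # entryExists(), getEntry(), addEntry(), updateEntry(), delEntry()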

Output

The results of processing are written to loggers, if a logger was given. Besides this, new CSV files are created during processing:

  • a pending CSV file, containing datasets that could not be processed
  • a finished CSV file, containing datasets successfully processed.

The pending file is not created if everything works fine. The respective path returned in that case is None.

The pending file (if created) is a CSV file that contains the failed rows with an additional column --ERRORS-- in which the reasons for the processing failures are listed.

The complete paths of these files are returned. They will be in a temporary directory created only for this purpose. It is the caller’s responsibility to remove the temporary directories afterwards (the datacenter’s distProcessedFiles() method takes care of that).
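
A caller could therefore handle the outcome roughly as sketched below. The helper run_import() is made up for illustration only; doImport() and its arguments are introduced further down.

    import os
    import shutil

    def run_import(processor, csv_path, headers, mode, user, logger=None):
        # doImport() returns (<rows processed>, <rows failed>,
        #                     <path of finished file>, <path of pending file>)
        num, num_fail, finished, pending = processor.doImport(
            csv_path, headers, mode=mode, user=user, logger=logger)
        if pending is not None:
            # keep a copy of the pending file, so that the failed rows
            # can be corrected and imported again
            shutil.copy(pending, csv_path + '.pending')
        # remove the temporary directory created by doImport()
        shutil.rmtree(os.path.dirname(finished))
        return num, num_fail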

The overall process looks like this:

  -----+      +---------+
 /     |      |         |              +------+
| .csv +----->|Batch-   |              |      |
|      |      |processor+----changes-->| ZODB |
|  +------+   |         |              |      |
+--|      |   |         +              +------+
   | Mode +-->|         |                 -------+
   |      |   |         +----outputs-+-> /       |
   |  +----+->+---------+            |  |.pending|
   +--|Log |  ^                      |  |        |
      +----+  |                      |  +--------+
        +-----++                     v
        |Inter-|                  ----------+
        |face  |                 /          |
        +------+                | .finished |
                                |           |
                                +-----------+

Creating a Batch Processor

We create our own batch processor for our own datatype. This datatype must be based on an interface that the batcher can use for converting data.

Founding Stoneville

We start with the interface:

>>> from zope.interface import Interface
>>> from zope import schema
>>> class ICave(Interface):
...   """A cave."""
...   name = schema.TextLine(
...     title = u'Cave name',
...     default = u'Unnamed',
...     required = True)
...   dinoports = schema.Int(
...     title = u'Number of DinoPorts (tm)',
...     required = False,
...     default = 1)
...   owner = schema.TextLine(
...     title = u'Owner name',
...     required = True,
...     missing_value = 'Fred Estates Inc.')
...   taxpayer = schema.Bool(
...     title = u'Pays taxes',
...     required = True,
...     default = False)

Now a class that implements this interface:

>>> import grok
>>> class Cave(object):
...   grok.implements(ICave)
...   def __init__(self, name=u'Unnamed', dinoports=2,
...                owner='Fred Estates Inc.', taxpayer=False):
...     self.name = name
...     self.dinoports = dinoports
...     self.owner = owner
...     self.taxpayer = taxpayer

We also provide a factory for caves. Strictly speaking, this is not necessary, but it makes the batch processor we create afterwards easier to understand.

>>> from zope.component import getGlobalSiteManager
>>> from zope.component.factory import Factory
>>> from zope.component.interfaces import IFactory
>>> gsm = getGlobalSiteManager()
>>> cave_maker = Factory(Cave, 'A cave', 'Buy caves here!')
>>> gsm.registerUtility(cave_maker, IFactory, 'Lovely Cave')

Now we can create caves using a factory:

>>> from zope.component import createObject
>>> createObject('Lovely Cave')
<Cave object at 0x...>
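
The new cave carries the defaults set in the constructor:

>>> cave = createObject('Lovely Cave')
>>> cave.name, cave.dinoports, cave.owner, cave.taxpayer
(u'Unnamed', 2, 'Fred Estates Inc.', False)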

This is nice, but we still lack a place where we can put all the lovely caves we want to sell.

Furthermore, as a replacement for a real site, we define a place where all caves can be stored: Stoneville! This is a lovely place for upper-class cavemen (who are the only ones that can afford more than one dinoport).

We found Stoneville:

>>> stoneville = dict()

Everything in place.

Now, to improve local health conditions, imagine we want to populate Stoneville with lots of new, happy, dino-hunting natives who formerly slept on the bare ground and had no idea of bathrooms. Disgusting, isn’t it?

Lots of cavemen need lots of caves.

Of course we can do something like:

>>> cave1 = createObject('Lovely Cave')
>>> cave1.name = "Fred's home"
>>> cave1.owner = "Fred"
>>> stoneville[cave1.name] = cave1

and Stoneville has exactly

>>> len(stoneville)
1

inhabitant. But we don’t want to do this for hundreds or thousands of citizens-to-be, do we?

It is much easier to create a simple CSV file containing all the data and let a batch processor do the job.

The list is already here:

>>> open('newcomers.csv', 'wb').write(
... """name,dinoports,owner,taxpayer
... Barneys Home,2,Barney,1
... Wilmas Asylum,1,Wilma,1
... Freds Dinoburgers,10,Fred,0
... Joeys Drive-in,110,Joey,0
... """)

All we need now is a batch processor.

>>> from waeup.kofa.utils.batching import BatchProcessor
>>> from waeup.kofa.interfaces import IGNORE_MARKER
>>> class CaveProcessor(BatchProcessor):
...   util_name = 'caveprocessor'
...   grok.name(util_name)
...   name = 'Cave Processor'
...   iface = ICave
...   location_fields = ['name']
...   factory_name = 'Lovely Cave'
...
...   def parentsExist(self, row, site):
...     return True
...
...   def getParent(self, row, site):
...     return stoneville
...
...   def entryExists(self, row, site):
...     return row['name'] in stoneville.keys()
...
...   def getEntry(self, row, site):
...     if not self.entryExists(row, site):
...       return None
...     return stoneville[row['name']]
...
...   def delEntry(self, row, site):
...     del stoneville[row['name']]
...
...   def addEntry(self, obj, row, site):
...     stoneville[row['name']] = obj
...
...   def updateEntry(self, obj, row, site, filename):
...     # This is not strictly necessary, as the default
...     # updateEntry method does exactly the same
...     for key, value in row.items():
...       if value != IGNORE_MARKER:
...         setattr(obj, key, value)

If we also want the results to be logged, we must provide a logger (this is optional):

>>> import logging
>>> logger = logging.getLogger('stoneville')
>>> logger.setLevel(logging.DEBUG)
>>> logger.propagate = False
>>> handler = logging.FileHandler('stoneville.log', 'w')
>>> logger.addHandler(handler)

Create the fellows:

>>> processor = CaveProcessor()
>>> result = processor.doImport('newcomers.csv',
...                   ['name', 'dinoports', 'owner', 'taxpayer'],
...                    mode='create', user='Bob', logger=logger)
>>> result
(4, 0, '/.../newcomers.finished.csv', None)

The result means: four entries were processed and none failed. Furthermore, we get the path of a CSV file containing the successfully processed entries and the path of a CSV file containing the erroneous entries. As everything went well, the latter is None. Let’s check:

>>> sorted(stoneville.keys())
[u'Barneys Home', ..., u'Wilmas Asylum']

The values of the Cave instances have the correct type:

>>> barney = stoneville['Barneys Home']
>>> barney.dinoports
2

which is a number, not a string.

Apparently, when calling the processor, we passed some more information than just the CSV file path. What does it all mean?

While the first argument is the path to the CSV file, we also have to give an ordered list of header names. These replace the header field names that are actually in the file. This way we can override faulty headers.

The mode parameter tells which kind of operation we want to perform: create, update, or remove data.

The user parameter, finally, is optional and only used for logging.
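
The calls in this document therefore all follow the general pattern sketched below (with placeholder values):

    processor.doImport(
        'data.csv',              # path to the CSV file to process
        ['col1', 'col2'],        # header names, overriding the file's own header
        mode='create',           # one of 'create', 'update', 'remove'
        user='Bob',              # optional, only used for logging
        logger=logger)           # optional logger for result messages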

We can, by the way, see the results of our run in a logfile if we provided a logger during the call:

>>> print open('stoneville.log').read()
processed: newcomers.csv, create mode, 4 lines (4 successful/ 0 failed), ... s (... s/item)

We clean up the temporary directory created by doImport():

>>> import shutil
>>> import os
>>> shutil.rmtree(os.path.dirname(result[2]))

As we can see, the processing was successful. Had it not been, the problems could be read here, as we can see if we do the same operation again:

>>> result = processor.doImport('newcomers.csv',
...                   ['name', 'dinoports', 'owner', 'taxpayer'],
...                    mode='create', user='Bob', logger=logger)
>>> result
(4, 4, '/.../newcomers.finished.csv', '/.../newcomers.pending.csv')

This time we also get a path to a .pending file.

The log file will tell us this in more detail:

>>> print open('stoneville.log').read()
processed: newcomers.csv, create mode, 4 lines (4 successful/ 0 failed), ... s (... s/item)
processed: newcomers.csv, create mode, 4 lines (0 successful/ 4 failed), ... s (... s/item)

This time a new file was created, which keeps all the rows we could not process and an additional column with error messages:

>>> print open(result[3]).read()
owner,name,taxpayer,dinoports,--ERRORS--
Barney,Barneys Home,1,2,This object already exists.
Wilma,Wilmas Asylum,1,1,This object already exists.
Fred,Freds Dinoburgers,0,10,This object already exists.
Joey,Joeys Drive-in,0,110,This object already exists.

This way we can correct the faulty entries and afterwards retry without having the already processed rows in the way.
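
After correcting the failed rows by hand, the pending file itself could be fed back to doImport(), masking the error column with the --IGNORE-- marker explained below (a sketch with an illustrative file name):

    processor.doImport(
        'newcomers.pending.csv',   # the corrected pending file
        ['owner', 'name', 'taxpayer', 'dinoports', '--IGNORE--'],
        mode='create', user='Bob')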

We also notice that the values of the taxpayer column are returned as given in the input file. There we wrote ‘1’ for True and ‘0’ for False (which is accepted by the converters).

Clean up:

>>> shutil.rmtree(os.path.dirname(result[2]))

We can also tell the processor to ignore some columns of the input by passing --IGNORE-- as the column name:

>>> result = processor.doImport('newcomers.csv', ['name',
...                             '--IGNORE--', '--IGNORE--'],
...                    mode='update', user='Bob')
>>> result
(4, 0, '...', None)

Clean up:

>>> shutil.rmtree(os.path.dirname(result[2]))

If something goes wrong during processing, the respective --IGNORE-- columns won’t be populated in the resulting pending file:

>>> result = processor.doImport('newcomers.csv', ['name', 'dinoports',
...                             '--IGNORE--', '--IGNORE--'],
...                    mode='create', user='Bob')
>>> result
(4, 4, '...', '...')
>>> print open(result[3], 'rb').read()
name,dinoports,--ERRORS--
Barneys Home,2,This object already exists.
Wilmas Asylum,1,This object already exists.
Freds Dinoburgers,10,This object already exists.
Joeys Drive-in,110,This object already exists.

Clean up:

>>> shutil.rmtree(os.path.dirname(result[2]))

Updating Entries

To update entries, we just call the batch processor in a different mode:

>>> result = processor.doImport('newcomers.csv', ['name',
...                             'dinoports', 'owner'],
...                    mode='update', user='Bob')
>>> result
(4, 0, '...', None)

Now we want to record that Wilma got an extra port for her second dino:

>>> open('newcomers.csv', 'wb').write(
... """name,dinoports,owner
... Wilmas Asylum,2,Wilma
... """)
>>> wilma = stoneville['Wilmas Asylum']
>>> wilma.dinoports
1

Clean up:

>>> shutil.rmtree(os.path.dirname(result[2]))

We start the processor:

>>> result = processor.doImport('newcomers.csv', ['name',
...                    'dinoports', 'owner'], mode='update', user='Bob')
>>> result
(1, 0, '...', None)
>>> wilma = stoneville['Wilmas Asylum']
>>> wilma.dinoports
2

Wilma’s number of dinoports raised.

Clean up:

>>> shutil.rmtree(os.path.dirname(result[2]))

If we try to update a non-existing entry, an error occurs:

>>> open('newcomers.csv', 'wb').write(
... """name,dinoports,owner
... NOT-WILMAS-ASYLUM,2,Wilma
... """)
>>> result = processor.doImport('newcomers.csv', ['name',
...                             'dinoports', 'owner'],
...                    mode='update', user='Bob')
>>> result
(1, 1, '/.../newcomers.finished.csv', '/.../newcomers.pending.csv')

Clean up:

>>> shutil.rmtree(os.path.dirname(result[2]))

Invalid values will also be spotted:

>>> open('newcomers.csv', 'wb').write(
... """name,dinoports,owner
... Wilmas Asylum,NOT-A-NUMBER,Wilma
... """)
>>> result = processor.doImport('newcomers.csv', ['name',
...                             'dinoports', 'owner'],
...                    mode='update', user='Bob')
>>> result
(1, 1, '...', '...')

Clean up:

>>> shutil.rmtree(os.path.dirname(result[2]))

We can also update only some columns, leaving others out. We skip the ‘dinoports’ column in the next run:

>>> open('newcomers.csv', 'wb').write(
... """name,owner
... Wilmas Asylum,Barney
... """)
>>> result = processor.doImport('newcomers.csv', ['name', 'owner'],
...                             mode='update', user='Bob')
>>> result
(1, 0, '...', None)
>>> wilma.owner
u'Barney'

Clean up:

>>> shutil.rmtree(os.path.dirname(result[2]))

We cannot, however, leave out the ‘location field’ (‘name’ in our case), as it tells us which entry to update:

>>> open('newcomers.csv', 'wb').write(
... """name,dinoports,owner
... 2,Wilma
... """)
>>> processor.doImport('newcomers.csv', ['dinoports', 'owner'],
...                    mode='update', user='Bob')
Traceback (most recent call last):
...
FatalCSVError: Need at least columns 'name' for import!

This time we even get an exception!

Generally, empty strings in the input are ignored during update, i.e. the stored values remain untouched:

>>> open('newcomers.csv', 'wb').write(
... """name,dinoports,owner
... "Wilmas Asylum","","Wilma"
... """)
>>> result = processor.doImport('newcomers.csv', ['name',
...                             'dinoports', 'owner'],
...                    mode='update', user='Bob')
>>> result
(1, 0, '...', None)
>>> wilma.dinoports
2

Clean up:

>>> shutil.rmtree(os.path.dirname(result[2]))

By passing ignore_empty=False, however, we can tell the processor to set dinoports to None; this is allowed, although None is not a number, because we declared the field not required in the interface:

>>> open('newcomers.csv', 'wb').write(
... """name,dinoports,owner
... "Wilmas Asylum","","Wilma"
... """)
>>> result = processor.doImport('newcomers.csv', ['name',
...                             'dinoports', 'owner'],
...                    mode='update', user='Bob', ignore_empty=False)
>>> result
(1, 0, '...', None)
>>> wilma.dinoports is None
True

Clean up:

>>> shutil.rmtree(os.path.dirname(result[2]))

Removing Entries

In ‘remove’ mode we can delete entries. Here the validity of values in non-location fields doesn’t matter, because those fields are ignored.

>>> open('newcomers.csv', 'wb').write(
... """name,dinoports,owner
... "Wilmas Asylum","ILLEGAL-NUMBER",""
... """)
>>> result = processor.doImport('newcomers.csv', ['name',
...                             'dinoports', 'owner'],
...                    mode='remove', user='Bob')
>>> result
(1, 0, '...', None)
>>> sorted(stoneville.keys())
[u'Barneys Home', "Fred's home", u'Freds Dinoburgers', u'Joeys Drive-in']

Oops! Wilma is gone.

Clean up:

>>> shutil.rmtree(os.path.dirname(result[2]))

Finally, we remove the input file and the log file:

>>> import os
>>> os.unlink('newcomers.csv')
>>> os.unlink('stoneville.log')