Data Center

The Kofa data center takes care of managing CSV files and importing them.

Creating a data center

A data center can be created easily:

>>> from waeup.kofa.datacenter import DataCenter
>>> mydatacenter = DataCenter()
>>> mydatacenter
<waeup.kofa.datacenter.DataCenter object at 0x...>

Each data center has a location in the file system where files are stored:

>>> storagepath = mydatacenter.storage
>>> storagepath
'/tmp/tmp...'

Among other things, it provides two locations to put data of deleted and graduated items into:

>>> import os
>>> del_path = mydatacenter.deleted_path
>>> os.path.isdir(del_path)
True
>>> grad_path = mydatacenter.graduated_path
>>> os.path.isdir(grad_path)
True
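
These are plain directories inside the storage. A minimal sketch of how exported data of a deleted item might be parked there (the file name 'student_12345.csv' is made up for illustration):

>>> import shutil
>>> export = os.path.join(storagepath, 'student_12345.csv')
>>> open(export, 'wb').write('exported data\n')
>>> shutil.move(export, del_path)
>>> 'student_12345.csv' in os.listdir(del_path)
True

We remove the file again to keep the environment clean:

>>> os.unlink(os.path.join(del_path, 'student_12345.csv'))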

Overall it complies with the IDataCenter interface:

>>> from zope.interface import verify
>>> from waeup.kofa.interfaces import IDataCenter
>>> verify.verifyObject(IDataCenter, DataCenter())
True
>>> verify.verifyClass(IDataCenter, DataCenter)
True

Managing the storage path

We can set another storage path:

>>> import os
>>> os.mkdir('newlocation')
>>> newpath = os.path.abspath('newlocation')
>>> mydatacenter.setStoragePath(newpath)
[]

The result is a list of filenames that could not be copied. Luckily, this list is empty.
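
If the list were non-empty, a caller could warn about each file that was left behind; a minimal sketch (not a doctest; the warning text and `another_path` are our own invention):

    failed = mydatacenter.setStoragePath(another_path)
    for filename in failed:
        print 'Warning: could not copy %s' % filename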

When we set a new storage path, we can ask the data center to move all files from the old location to the new one. To see this feature in action, we first have to put a file into the old location:

>>> open(os.path.join(newpath, 'myfile.txt'), 'wb').write('hello')

Now we can set a new location and the file will be copied:

>>> verynewpath = os.path.abspath('verynewlocation')
>>> os.mkdir(verynewpath)
>>> mydatacenter.setStoragePath(verynewpath, move=True)
[]
>>> storagepath = mydatacenter.storage
>>> 'myfile.txt' in os.listdir(verynewpath)
True

We remove the created file to have a clean testing environment for upcoming examples:

>>> os.unlink(os.path.join(storagepath, 'myfile.txt'))

Uploading files

We can get a list of files stored in that location:

>>> mydatacenter.getPendingFiles()
[]

Let’s put a file into the storage:

>>> import os
>>> filepath = os.path.join(storagepath, 'data.csv')
>>> open(filepath, 'wb').write('Some Content\n')

Now we can find a file:

>>> mydatacenter.getPendingFiles()
[<waeup.kofa.datacenter.DataCenterFile object at 0x...>]

As we can see, the actual file is wrapped by a convenience wrapper that enables us to fetch some data about the file. The data is returned as formatted strings, so that it can easily be put into output pages:

>>> datafile = mydatacenter.getPendingFiles()[0]
>>> datafile.getSize()
'13 bytes'
>>> datafile.getDate() # Nearly current datetime...
'...'
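
Since both values come back as ready-formatted strings, a listing line for an output page can be composed with plain string formatting, using only the methods shown above:

>>> for pending in mydatacenter.getPendingFiles():
...     print '%s, uploaded %s' % (pending.getSize(), pending.getDate())
13 bytes, uploaded ...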

Clean up:

>>> import shutil
>>> shutil.rmtree(newpath)
>>> shutil.rmtree(verynewpath)

Distributing processed files

After files have been processed by a batch processor, we can put the resulting files into the desired destinations.

We recreate the datacenter root in case it is missing:

>>> import os
>>> dc_root = mydatacenter.storage
>>> fin_dir = os.path.join(dc_root, 'finished')
>>> unfin_dir = os.path.join(dc_root, 'unfinished')
>>> def recreate_dc_storage():
...   if os.path.exists(dc_root):
...     shutil.rmtree(dc_root)
...   os.mkdir(dc_root)
...   mydatacenter.setStoragePath(mydatacenter.storage)
>>> recreate_dc_storage()

We define a function that creates a set of faked result files:

>>> import os
>>> import tempfile
>>> def create_fake_results(source_basename, create_pending=True):
...   tmp_dir = tempfile.mkdtemp()
...   src = os.path.join(dc_root, source_basename)
...   pending_src = None
...   if create_pending:
...     pending_src = os.path.join(tmp_dir, 'mypendingsource.csv')
...   finished_src = os.path.join(tmp_dir, 'myfinishedsource.csv')
...   for path in (src, pending_src, finished_src):
...     if path is not None:
...       open(path, 'wb').write('blah')
...   return tmp_dir, src, finished_src, pending_src

Now we can create the set of result files that typically comes out of a successful processing of a regular source, and try to distribute them. Let’s start with a source file that was processed successfully:

>>> tmp_dir, src, finished_src, pending_src = create_fake_results(
...  'mysource.csv', create_pending=False)
>>> mydatacenter.distProcessedFiles(True, src, finished_src,
...                            pending_src, mode='create')
>>> sorted(os.listdir(dc_root))
['deleted', 'finished', 'graduated', 'logs', 'unfinished']
>>> sorted(os.listdir(fin_dir))
['mysource.create.finished.csv', 'mysource.csv']
>>> sorted(os.listdir(unfin_dir))
[]

The created temporary dir will be removed for us by the datacenter. This way we can be assured that fewer temporary dirs are left hanging around:

>>> os.path.exists(tmp_dir)
False

The root dir is empty, while the original file and the file containing all processed data were moved to ‘finished/’.

Now we restart, but this time we fake an erroneous action:

>>> recreate_dc_storage()
>>> tmp_dir, src, finished_src, pending_src = create_fake_results(
...  'mysource.csv')
>>> mydatacenter.distProcessedFiles(False, src, finished_src,
...                                 pending_src, mode='create')
>>> sorted(os.listdir(dc_root))
['deleted', 'finished', 'graduated', 'logs', 'mysource.create.pending.csv', 'unfinished']
>>> sorted(os.listdir(fin_dir))
['mysource.create.finished.csv']
>>> sorted(os.listdir(unfin_dir))
['mysource.csv']

While the original source was moved to the ‘unfinished’ dir, the pending file went to the root, and the set of already processed items is stored in ‘finished/’.

We fake processing the pending file and assume that everything went well this time:

>>> tmp_dir, src, finished_src, pending_src = create_fake_results(
...  'mysource.create.pending.csv', create_pending=False)
>>> mydatacenter.distProcessedFiles(True, src, finished_src,
...                                 pending_src, mode='create')
>>> sorted(os.listdir(dc_root))
['deleted', 'finished', 'graduated', 'logs', 'unfinished']
>>> sorted(os.listdir(fin_dir))
['mysource.create.finished.csv', 'mysource.csv']
>>> sorted(os.listdir(unfin_dir))
[]

The result is the same as in the first case shown above.
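
This pattern suggests a simple driver that retries until an import succeeds. A sketch (not a doctest), where `run_import()` is a hypothetical stand-in for the real batch processor and is assumed to return a success flag plus the two result files:

    import os

    def import_until_done(datacenter, src, run_import, mode='create'):
        while True:
            success, finished_src, pending_src = run_import(src)
            datacenter.distProcessedFiles(
                success, src, finished_src, pending_src, mode=mode)
            if success:
                return
            # On failure the new pending file lands in the datacenter
            # root, named <base>.<mode>.pending.csv as shown above.
            base = os.path.basename(src).split('.')[0]
            src = os.path.join(
                datacenter.storage, '%s.%s.pending.csv' % (base, mode))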

We restart again, but this time we fake several non-working imports in a row.

We start with a faulty initial import:

>>> recreate_dc_storage()
>>> tmp_dir, src, finished_src, pending_src = create_fake_results(
...  'mysource.csv')
>>> mydatacenter.distProcessedFiles(False, src, finished_src,
...                                 pending_src, mode='create')

We try to process the pending file, which fails again:

>>> tmp_dir, src, finished_src, pending_src = create_fake_results(
...  'mysource.create.pending.csv')
>>> mydatacenter.distProcessedFiles(False, src, finished_src,
...                                 pending_src, mode='create')

We try to process the new pending file:

>>> tmp_dir, src, finished_src, pending_src = create_fake_results(
...  'mysource.create.pending.csv')
>>> mydatacenter.distProcessedFiles(False, src, finished_src,
...                                 pending_src, mode='create')
>>> sorted(os.listdir(dc_root))
['deleted', 'finished', 'graduated', 'logs', 'mysource.create.pending.csv', 'unfinished']
>>> sorted(os.listdir(fin_dir))
['mysource.create.finished.csv']
>>> sorted(os.listdir(unfin_dir))
['mysource.csv']

Finally, we process the pending file and everything works:

>>> tmp_dir, src, finished_src, pending_src = create_fake_results(
...  'mysource.create.pending.csv', create_pending=False)
>>> mydatacenter.distProcessedFiles(True, src, finished_src,
...                                 pending_src, mode='create')
>>> sorted(os.listdir(dc_root))
['deleted', 'finished', 'graduated', 'logs', 'unfinished']
>>> sorted(os.listdir(fin_dir))
['mysource.create.finished.csv', 'mysource.csv']
>>> sorted(os.listdir(unfin_dir))
[]

The root dir is empty (contains no input files) and only the files in the ‘finished’ subdirectory remain.

We can get a list of imported files stored in the finished subfolder:

>>> mydatacenter.getFinishedFiles()
[<waeup.kofa.datacenter.DataCenterFile object at ...>]
>>> datafile = mydatacenter.getFinishedFiles()[0]
>>> datafile.getSize()
'2 bytes'
>>> datafile.getDate() # Nearly current datetime...
'...'

Clean up:

>>> shutil.rmtree(verynewpath)