Commit f3782d5d authored by Javier Vegas-Regidor's avatar Javier Vegas-Regidor
Browse files

Added a custom downloader to prioritize the downloads better

parent ef91fddc
......@@ -16,7 +16,7 @@ CON_FILES = /esnas/autosubmit/con_files/
# Diagnostics to run, space separated. You must provide for each one the name and the parameters (comma separated) or
# an alias defined in the ALIAS section (see more below). If you are using the diagnostics just to CMORize, leave it
# empty
DIAGS = climpercent,atmos,sfcWind,1981,2012,11
DIAGS = daysover,atmos,sfcWind,1981,2012,11
# DIAGS = OHC
# Frequency of the data you want to use by default. Some diagnostics do not use this value: i.e. monmean always stores
# its results at monthly frequency (obvious) and has a parameter to specify input's frequency.
......@@ -87,7 +87,7 @@ OCEAN_TIMESTEP = 6
# CHUNK_SIZE is the size of each data file, given in months
# CHUNKS is the number of chunks. You can specify less chunks than present on the experiment
EXPID = testing_erainterim
STARTDATES = 20001101 20011101 20021101 20031101 20041101 20051101 20061101 20071101 20081101 20091101 20101101 20111101 20121101 20131101 20141101 20151101 20161101
STARTDATES = 20131101 20141101 20151101
# STARTDATES = 19840101 19850101
MEMBERS = 0
MEMBER_DIGITS = 1
......
# coding: utf-8
import csv
import os
import shutil
from datetime import datetime
import numpy as np
import os
from bscearth.utils.log import Log
from earthdiagnostics.modelingrealm import ModelingRealms
from earthdiagnostics.utils import Utils, TempFile
from publisher import Publisher
from earthdiagnostics.modelingrealm import ModelingRealms
from variable_type import VariableType
......@@ -59,6 +59,12 @@ class DataFile(Publisher):
super(DataFile, self).unsubscribe(who)
self._clean_local()
@property
def size(self):
    """
    Size in bytes of the local copy of the file.

    Returns None when the file is not available locally.
    """
    if self.local_status == LocalStatus.READY:
        # The original computed the size but never returned it, so the
        # property always evaluated to None.
        return os.path.getsize(self.local_file)
    return None
def _clean_local(self):
if self.local_status != LocalStatus.READY or len(self.suscribers) > 0 or self.upload_required():
return
......@@ -72,7 +78,6 @@ class DataFile(Publisher):
return self.local_status == LocalStatus.READY and self.storage_status == StorageStatus.PENDING
def download_required(self):
if not self.local_status == LocalStatus.PENDING:
return False
......@@ -459,4 +464,12 @@ class NetCDFFile(DataFile):
except Exception as ex:
Log.error('Can not create link to {1}: {0}'.format(ex, self.remote_file))
@property
def size(self):
    """
    Size in bytes of the file, preferring the local copy over storage.

    Returns None when the file is ready neither locally nor in storage.
    """
    if self.local_status == LocalStatus.READY:
        path = self.local_file
    elif self.storage_status == StorageStatus.READY:
        path = self.remote_file
    else:
        return None
    return os.path.getsize(path)
......@@ -4,9 +4,8 @@ import datetime
from datafile import StorageStatus, LocalStatus
from earthdiagnostics.constants import Basins, Basin
from earthdiagnostics.frequency import Frequency
from earthdiagnostics.variable_type import VariableType
from earthdiagnostics.modelingrealm import ModelingRealms
from earthdiagnostics.variable import VariableManager
from earthdiagnostics.variable_type import VariableType
from publisher import Publisher
......@@ -238,8 +237,10 @@ class Diagnostic(Publisher):
request.unsubscribe(self)
def all_requests_in_storage(self):
    """
    Check if all the data requested by the diagnostic is in storage.

    :return: True if every requested file has storage status READY
    :rtype: bool
    """
    # Delegate to pending_requests() so both methods share a single
    # definition of "pending"; the duplicated return here was unreachable.
    return self.pending_requests() == 0
def pending_requests(self):
    """
    Count the requested files that are not yet ready in storage.

    :return: number of requests whose storage status is not READY
    :rtype: int
    """
    # The original built a list of booleans (one per request, regardless of
    # status) and took its length, so it always returned the total number of
    # requests. Filter instead of map so only pending requests are counted.
    return sum(1 for request in self._requests
               if request.storage_status != StorageStatus.READY)
class DiagnosticOption(object):
......
#!/usr/bin/env python
# coding=utf-8
import argparse
import os
import shutil
import pkg_resources
import tempfile
from distutils.spawn import find_executable
import bscearth.utils.path
import netCDF4
import os
import pkg_resources
from bscearth.utils.date import *
import bscearth.utils.path
import tempfile
from earthdiagnostics.constants import Basins
from earthdiagnostics.config import Config
from earthdiagnostics import cdftools
from earthdiagnostics.cmormanager import CMORManager
from earthdiagnostics.threddsmanager import THREDDSManager
from earthdiagnostics.config import Config
from earthdiagnostics.constants import Basins
from earthdiagnostics.obsreconmanager import ObsReconManager
from earthdiagnostics import cdftools
from earthdiagnostics.threddsmanager import THREDDSManager
from earthdiagnostics.utils import TempFile, Utils
from earthdiagnostics.variable import VariableManager
from work_manager import WorkManager
from distutils.spawn import find_executable
class EarthDiags(object):
......
......@@ -99,6 +99,7 @@ class DaysOverPercentile(Diagnostic):
"""
Runs the diagnostic
"""
raise Exception('Pues me enfado y no respiro!!!')
iris.FUTURE.netcdf_promote = True
percentiles = iris.load_cube(self.percentiles_file.local_file)
......@@ -206,7 +207,3 @@ class DaysOverPercentile(Diagnostic):
result.add_aux_coord(iris.coords.AuxCoord(percentile, long_name='percentile'))
result.add_aux_coord(time_coord)
return result
# coding=utf-8
import datetime
import operator
import threading
import time
from bscearth.utils.log import Log
# noinspection PyCompatibility
from concurrent.futures import ThreadPoolExecutor
from earthdiagnostics.datafile import StorageStatus, LocalStatus
from earthdiagnostics.diagnostic import DiagnosticStatus, Diagnostic, DiagnosticOptionError
from earthdiagnostics.utils import Utils, TempFile
import threading
from earthdiagnostics.ocean import *
from earthdiagnostics.general import *
from earthdiagnostics.ocean import *
from earthdiagnostics.statistics import *
from earthdiagnostics.utils import Utils, TempFile
class WorkManager(object):
......@@ -54,7 +54,7 @@ class WorkManager(object):
self.threads = self.config.max_cores
Log.info('Using {0} threads', self.threads)
self.downloader = ThreadPoolExecutor(self.config.parallel_downloads)
self.downloader = Downloader()
self.uploader = ThreadPoolExecutor(self.config.parallel_uploads)
self.executor = ThreadPoolExecutor(self.threads)
......@@ -69,8 +69,9 @@ class WorkManager(object):
for file_object in self.data_manager.requested_files.values():
file_object.subscribe(self, self._file_object_status_changed)
if file_object.download_required():
self.downloader.submit(file_object.download)
self.downloader.submit(file_object)
self.downloader.start()
self.lock = threading.Lock()
self.lock.acquire()
......@@ -96,7 +97,7 @@ class WorkManager(object):
def _file_object_status_changed(self, file_object):
if file_object.download_required():
self.downloader.submit(file_object.download)
self.downloader.submit(file_object)
return
if file_object.upload_required():
self.uploader.submit(file_object.upload)
......@@ -223,3 +224,62 @@ class WorkManager(object):
Diagnostic.register(VerticalGradient)
class Downloader(object):
    """
    Download manager that runs in a background thread and downloads one
    file at a time, always picking the file that unblocks the most
    diagnostics first.
    """

    def __init__(self):
        self._downloads = []
        self._lock = threading.Lock()
        # Flag checked by the worker thread; set by shutdown()
        self.stop = False

    def start(self):
        """Launch the background download thread."""
        self._thread = threading.Thread(target=self.downloader)
        self._thread.start()

    def submit(self, datafile):
        """Queue a datafile for download."""
        with self._lock:
            self._downloads.append(datafile)

    def downloader(self):
        """
        Worker loop: repeatedly pick the highest-priority queued file and
        download it, until shutdown() is requested and the queue is empty.
        """
        def suscribers_waiting(datafile):
            # Number of diagnostics for which this file is the only
            # request still pending: downloading it unblocks them.
            waiting = 0
            for diag in datafile.suscribers:
                if not isinstance(diag, Diagnostic):
                    continue
                if diag.pending_requests() == 1:
                    waiting += 1
            return waiting

        def priority_key(datafile):
            # Ascending sort puts the best candidate first: most diagnostics
            # unblocked, then most suscribers, then largest file. size may be
            # None when the file is not yet available anywhere; treat as 0 so
            # the comparison can not raise.
            size = datafile.size
            if size is None:
                size = 0
            return (-suscribers_waiting(datafile),
                    -len(datafile.suscribers),
                    -size)

        try:
            while True:
                with self._lock:
                    if not self._downloads:
                        if self.stop:
                            return
                        datafile = None
                    else:
                        self._downloads.sort(key=priority_key)
                        datafile = self._downloads.pop(0)
                if datafile is None:
                    # Queue empty: poll again shortly. Sleep OUTSIDE the lock
                    # so submit() is never blocked, and keep looping — the
                    # original 'break' here killed the thread on the first
                    # empty poll, silently dropping later submissions.
                    time.sleep(0.01)
                    continue
                datafile.download()
        except Exception as ex:
            Log.critical('Unhandled error at downloader: {0}', ex)

    def shutdown(self):
        """Ask the worker thread to stop and wait for it to finish."""
        self.stop = True
        self._thread.join()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment