gitweb.fluxo.info Git - ckandumper.git/commitdiff
Initial asyncio version
author Silvio Rhatto <rhatto@riseup.net>
Wed, 15 May 2019 18:06:03 +0000 (15:06 -0300)
committer Silvio Rhatto <rhatto@riseup.net>
Wed, 15 May 2019 18:06:03 +0000 (15:06 -0300)
ckandumper

index d70f35f869a0142f598c4d97b79c608fae7d6a41..d447899b865d8eb78fd0d59a8cb2d026d7ac0a2e 100755 (executable)
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
 # Dependencies
+import asyncio
 import argparse
-import sys, os, subprocess, pycurl, json
+import sys, os, subprocess, json
 from urllib.parse import urlencode
 
+class DownloadMultiple:
+    """Downloads multiple files simultaneously with error logging and fancy output"""
+
+    wget = '/usr/bin/wget'
+
+    def __init__(self, limit_rate, limit_concurrent):
+        self.limit_rate = limit_rate
+
+        if limit_concurrent is not None:
+            self.limit_concurrent = asyncio.Semaphore(int(limit_concurrent))
+        else:
+            self.limit_concurrent = asyncio.Semaphore(20)
+
+        if not os.path.exists(self.wget):
+            raise FileNotFoundError('wget not found at ' + self.wget + '; please install it first.')
+
+    def ensuredir(self, dest):
+        """Ensures that the destination folder exists"""
+        if not os.path.exists(dest) and not os.path.isdir(dest):
+            os.makedirs(dest, 0o755)
+        elif os.path.exists(dest) and not os.path.isdir(dest):
+            raise ValueError('File exists and is not a folder: ' + dest)
+
+    async def download_file(self, url, local_filename, semaphore):
+        """Downloads a file.
+
+        Using wget as it is more reliable
+
+        Straight from https://docs.python.org/3/library/asyncio-subprocess.html
+        """
+
+        async with semaphore:
+            print('Downloading ' + url + '...')
+            self.ensuredir(os.path.dirname(local_filename))
+
+            # Other opts: -q --show-progress
+            cmd  = self.wget + ' ' + self.limit_rate + ' -c -O "' + local_filename + '" ' + url
+            proc = await asyncio.create_subprocess_shell(cmd,
+                                                         stdout=asyncio.subprocess.PIPE,
+                                                         stderr=asyncio.subprocess.PIPE)
+
+            stdout, stderr = await proc.communicate()
+
+            print(f'[{cmd!r} exited with {proc.returncode}]')
+
+            if stdout:
+                print(f'[stdout]\n{stdout.decode()}')
+
+            if stderr:
+                print(f'[stderr]\n{stderr.decode()}')
+
+    async def gather(self, filepairs):
+        """Gather all files to be downloaded
+
+        See https://stackoverflow.com/questions/50308812/is-it-possible-to-limit-the-number-of-coroutines-running-corcurrently-in-asyncio#50328882
+        """
+
+        jobs = []
+
+        for url, filename in filepairs:
+            jobs.append(asyncio.ensure_future(self.download_file(url, filename, self.limit_concurrent)))
+
+        await asyncio.gather(*jobs)
+
+    def get(self, filepairs):
+        loop = asyncio.get_event_loop()
+        loop.set_debug(True)
+        loop.run_until_complete(self.gather(filepairs))
+
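
For reference, a minimal usage sketch of DownloadMultiple as defined above; the URLs and destination paths are hypothetical placeholders, and limit_rate is the complete wget flag (or an empty string), matching how CkanDumper builds it:

    downloader = DownloadMultiple('--limit-rate=100k', 4)
    downloader.get([
        ['https://example.org/api/3/action/package_list', 'dump/package_list.json'],
        ['https://example.org/api/3/action/group_list',   'dump/group_list.json'],
    ])
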
 class CkanDumper:
     """Dumps CKAN data: metadata plus entire datasets"""
 
@@ -38,32 +109,28 @@ class CkanDumper:
 
         if args.limit_rate != None:
             self.limit_rate = '--limit-rate=' + args.limit_rate
+        else:
+            self.limit_rate = ''
 
-    def download(self, url, local_filename):
-        """Downloads a file.
-
-        Using wget as it is more reliable
-        """
-        subprocess.call('/usr/bin/wget ' + self.limit_rate + ' -c -O "' + local_filename + '" ' + url, shell=True)
+        if args.limit_concurrent is not None:
+            self.limit_concurrent = int(args.limit_concurrent)
+        else:
+            self.limit_concurrent = 20
 
-    def ensuredir(self, dest):
-        """Ensures that the destination folder exists"""
-        if not os.path.exists(dest) and not os.path.isdir(dest):
-            os.makedirs(dest, 0o755);
-        elif os.path.exists(dest) and not os.path.isdir(dest):
-            raise ValueError('File exists and is not a folder:' + dest)
+        self.download = DownloadMultiple(self.limit_rate, self.limit_concurrent)
 
     def load_json(self, file):
         """Loads a file with contents serialized as JSON"""
         descriptor = open(file)
         data       = json.load(descriptor)
-        file.close()
+
+        descriptor.close()
+        return data
 
     def dump(self):
         """Downloads all content listed in a CKAN repository"""
-        self.ensuredir(self.dest)
-
-        # Move to dest folder
+        # Switch to dest folder
+        #self.ensuredir(self.dest)
         #os.chdir(self.dest)
 
         package_list = self.dest + os.sep + 'package_list.json'
@@ -73,45 +140,65 @@ class CkanDumper:
         #
         # Groups
         #
-        self.download(self.url + self.group_list, group_list)
-        groups = self.load_json(group_list)
+        self.download.get([[self.url + self.group_list, group_list]])
+
+        groups          = self.load_json(group_list)
+        group_downloads = []
 
         for group in groups['result']:
             group_folder = self.dest + os.sep + 'groups' + os.sep + group
             group_file   = group_folder + os.sep + 'group.json'
 
-            self.ensuredir(group_folder)
-            print("Downloading " + self.url + self.group_show + 'id=' + group + '...')
-            self.download(self.url + self.group_show + urlencode({ 'id': group }, False, '', 'utf-8'), group_file)
+            #print("Downloading " + self.url + self.group_show + 'id=' + group + '...')
+            #self.ensuredir(group_folder)
+            group_downloads.append([self.url + self.group_show + urlencode({ 'id': group }, False, '', 'utf-8'), group_file])
+
+        self.download.get(group_downloads)
+
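
Each pair queued above couples a CKAN API URL with its local destination file. A minimal sketch of how the per-group URL is assembled, assuming group_show holds the standard CKAN action path (its actual value is defined outside this hunk):

    from urllib.parse import urlencode

    url        = 'https://demo.ckan.org'      # hypothetical CKAN instance
    group_show = '/api/3/action/group_show?'  # assumed endpoint value
    request    = url + group_show + urlencode({'id': 'my-group'}, False, '', 'utf-8')
    # request == 'https://demo.ckan.org/api/3/action/group_show?id=my-group'
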
         #
         # Tags
         #
-        self.download(self.url + self.tag_list, tag_list)
-        tags = self.load_json(tag_list)
+        self.download.get([[self.url + self.tag_list, tag_list]])
+
+        tags           = self.load_json(tag_list)
+        tags_downloads = []
 
         for tag in tags['result']:
             tag_folder = self.dest + os.sep + 'tags' + os.sep + tag
             tag_file   = tag_folder + os.sep + 'tag.json'
 
-            self.ensuredir(tag_folder)
-            print("Downloading " + self.url + self.tag_show + 'id=' + tag + '...')
-            self.download(self.url + self.tag_show + urlencode({ 'id': tag }, False, '', 'utf-8'), tag_file)
+            #print("Downloading " + self.url + self.tag_show + 'id=' + tag + '...')
+            #self.ensuredir(tag_folder)
+            tags_downloads.append([self.url + self.tag_show + urlencode({ 'id': tag }, False, '', 'utf-8'), tag_file])
+
+        self.download.get(tags_downloads)
 
         #
         # Packages
         #
-        self.download(self.url + self.package_list, package_list)
-        packages = self.load_json(package_list)
+        self.download.get([[self.url + self.package_list, package_list]])
+
+        packages           = self.load_json(package_list)
+        packages_downloads = []
 
         for package in packages['result']:
             package_folder = self.dest + os.sep + 'packages' + os.sep + package
             package_file   = package_folder + os.sep + 'package.json'
 
-            self.ensuredir(package_folder + os.sep + 'data')
-            print("Downloading " + self.url + self.package_show + 'id=' + package + '...')
-            self.download(self.url + self.package_show + urlencode({ 'id': package }, False, '', 'utf-8'), package_file)
+            #print("Downloading " + self.url + self.package_show + 'id=' + package + '...')
+            #self.ensuredir(package_folder + os.sep + 'data')
+            packages_downloads.append([self.url + self.package_show + urlencode({ 'id': package }, False, '', 'utf-8'), package_file])
+
+        self.download.get(packages_downloads)
 
-            contents = self.load_json(package_file)
+        #
+        # Package contents
+        #
+        for package in packages['result']:
+            package_downloads = []
+            package_folder    = self.dest + os.sep + 'packages' + os.sep + package
+            package_file      = package_folder + os.sep + 'package.json'
+            contents          = self.load_json(package_file)
 
             for resource in contents['result']['resources']:
                 #if resource['name'] != None:
@@ -128,7 +215,9 @@ class CkanDumper:
 
                 resource_file = package_folder + os.sep + 'data' + os.sep + name + format
 
-                self.download(resource['url'], resource_file)
+                package_downloads.append([resource['url'], resource_file])
+
+            self.download.get(package_downloads)
 
             # Run only once during development
             #return
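
The resource loop above depends on the shape of a package_show response; a trimmed sketch of the structure it expects, with illustrative values (only the 'result' and 'resources' keys and the per-resource 'url' are confirmed by the code here; 'name' and 'format' are assumed from the surrounding variables):

    contents = {
        'result': {
            'resources': [
                {'name': 'example', 'format': 'CSV', 'url': 'https://example.org/data.csv'},
            ],
        },
    }
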
@@ -136,9 +225,10 @@ class CkanDumper:
 if __name__ == "__main__":
     # Parse CLI
     parser = argparse.ArgumentParser(description='Dump CKAN metadata and datasets.')
-    parser.add_argument('url',  nargs='+', help='CKAN instance URL')
-    parser.add_argument('dest', nargs='+', help='Destination folder')
-    parser.add_argument("--limit-rate",    help="Limit the download speed to amount bytes per second, per download")
+    parser.add_argument('url',  nargs='+',    help='CKAN instance URL')
+    parser.add_argument('dest', nargs='+',    help='Destination folder')
+    parser.add_argument("--limit-rate",       help="Limit the download speed to amount bytes per second, per download")
+    parser.add_argument("--limit-concurrent", help="Limit the total concurrent downloads")
     args = parser.parse_args()
 
     # Dispatch
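
With these options, a hypothetical invocation would look like:

    ckandumper --limit-rate=100k --limit-concurrent=10 https://demo.ckan.org ./dump

where --limit-rate takes any value accepted by wget's own --limit-rate flag.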