# Source code for wwt_data_formats.folder

# -*- mode: python; coding: utf-8 -*-
# Copyright 2019-2020 the .NET Foundation
# Licensed under the MIT License.

from __future__ import absolute_import, division, print_function

# Public names exported by a star-import of this module.
__all__ = [
    'Folder',
    'fetch_folder_tree',
    'make_absolutizing_url_mutator',
    'walk_cached_folder_tree',
]

import os.path
import re
import requests
from traitlets import Bool, Instance, Int, List, Unicode, Union, UseEnum
from xml.etree import ElementTree as etree

from . import LockedXmlTraits, XmlSer
from .abcs import UrlContainer
from .enums import FolderType

class Folder(LockedXmlTraits, UrlContainer):
    """A grouping of WWT content assets.

    Children can be: places (aka "Items"), imagesets, linesets, tours, folders,
    or IThumbnail objects (to be explored).

    """
    name = Unicode('').tag(xml=XmlSer.attr('Name'))
    group = Unicode('Explorer').tag(xml=XmlSer.attr('Group'))

    url = Unicode('').tag(xml=XmlSer.attr('Url'))
    """The URL at which the full contents of this folder can be downloaded in WTML
    format.

    """
    thumbnail = Unicode('').tag(xml=XmlSer.attr('Thumbnail'))
    browseable = Bool(True).tag(xml=XmlSer.attr('Browseable'))
    searchable = Bool(True).tag(xml=XmlSer.attr('Searchable'))
    type = UseEnum(
        FolderType,
        default_value=FolderType.SKY,
    ).tag(xml=XmlSer.attr('Type'))
    sub_type = Unicode('').tag(xml=XmlSer.attr('SubType'))

    msr_community_id = Int(0).tag(xml=XmlSer.attr('MSRCommunityId'))
    """The ID number of the WWT Community that this content came from."""

    msr_component_id = Int(0).tag(xml=XmlSer.attr('MSRComponentId'))
    """The ID number of this content item on the WWT Communities system."""

    permission = Int(0).tag(xml=XmlSer.attr('Permission'))
    "TBD."

    # A child may be another folder, a place, or an imageset.
    children = List(
        trait=Union([
            Instance('wwt_data_formats.folder.Folder', args=()),
            Instance('wwt_data_formats.place.Place', args=()),
            Instance('wwt_data_formats.imageset.ImageSet', args=()),
        ]),
        default_value=(),
    ).tag(xml=XmlSer.inner_list())

    def _tag_name(self):
        """Return the XML tag name used for this object, ``'Folder'``."""
        return 'Folder'

    def walk(self, download=False):
        """Iterate over this folder and everything nested beneath it.

        Parameters
        ----------
        download : bool, default False
            If true, a child folder that has no children of its own but does
            have a URL is replaced, in ``self.children``, by the folder loaded
            from that URL with :meth:`Folder.from_url`. The replacement keeps
            the original URL.

        Yields
        ------
        Tuples ``(depth, path, item)``. This folder comes first, as
        ``(0, (), self)``. *depth* is the nesting level below this folder and
        *path* is the tuple of child indices leading from this folder to
        *item*.

        """
        yield (0, (), self)

        for position, entry in enumerate(self.children):
            # Anything that is not a folder is a leaf directly below us.
            if not isinstance(entry, Folder):
                yield (1, (position,), entry)
                continue

            if download and entry.url and not entry.children:
                source_url = entry.url
                entry = Folder.from_url(source_url)
                entry.url = source_url
                self.children[position] = entry

            # Re-base what the child reports onto our own coordinates.
            for depth, subpath, descendant in entry.walk(download=download):
                yield (depth + 1, (position,) + subpath, descendant)

    def mutate_urls(self, mutator):
        """Apply *mutator* to the URLs of this folder and all of its children.

        Parameters
        ----------
        mutator : callable
            Called with a URL string; its return value replaces that URL.
            Empty ``url`` and ``thumbnail`` values are left untouched.

        """
        for attr in ('url', 'thumbnail'):
            current = getattr(self, attr)
            if current:
                setattr(self, attr, mutator(current))

        for entry in self.children:
            entry.mutate_urls(mutator)
def make_absolutizing_url_mutator(baseurl):
    """Build a URL mutator that resolves relative URLs against *baseurl*.

    Parameters
    ----------
    baseurl : string, absolute URL
        The absolute URL with which to combine relative URLs

    Returns
    -------
    A mutator function suitable for use with
    :meth:`wwt_data_formats.abcs.UrlContainer.mutate_urls`.

    Notes
    -----
    The returned function takes one URL. Empty values, and URLs that already
    name a network location, are handed back unchanged. Any other URL is
    joined onto *baseurl*.

    """
    from urllib.parse import urljoin, urlsplit

    def mutator(url):
        # Only non-empty URLs without a network location need resolving.
        is_relative = bool(url) and not urlsplit(url).netloc
        return urljoin(baseurl, url) if is_relative else url

    return mutator
def _sanitize_name(name): s = re.sub('[^-_a-zA-Z0-9]+', '_', name) s = re.sub('^_+', '', s) s = re.sub('_+$', '', s) return s
def fetch_folder_tree(root_url, root_cache_path, on_fetch=None):
    """Download a tree of WTML folders and cache it on disk.

    Parameters
    ----------
    root_url : str
        The URL of the root WTML folder.
    root_cache_path : str
        The directory that receives the cache. It is created if it does not
        exist. The root folder is saved there as ``index.wtml``.
    on_fetch : callable or None, optional
        If given, it is called with each URL just before that URL is
        requested.

    Notes
    -----
    A child folder that has no children of its own but does have a URL is
    downloaded and saved as ``index.wtml`` inside a subdirectory named
    ``{index:03d}_{sanitized folder name}``, where *index* is the child's
    position within its parent. Each URL is downloaded at most once; later
    references to an already-downloaded URL are skipped. Files are written as
    UTF-8.

    """
    done_urls = set()

    def get_folder(url):
        # (None, None) means this URL has already been fetched.
        if url in done_urls:
            return None, None

        # The callback is optional, so only call it when one was supplied.
        if on_fetch is not None:
            on_fetch(url)

        resp = requests.get(url)
        resp.encoding = 'utf-8-sig'  # see LockedXmlTraits.from_url()
        text = resp.text  # decode the body once and reuse it
        elem = etree.fromstring(text)
        done_urls.add(url)
        return text, Folder.from_xml(elem)

    def save_index(cache_dir, text):
        os.makedirs(cache_dir, exist_ok=True)
        # An explicit encoding keeps the file independent of the locale.
        with open(os.path.join(cache_dir, 'index.wtml'), 'wt', encoding='utf-8') as f:
            f.write(text)

    def walk(cur_folder, cur_cache_path):
        for index, child in enumerate(cur_folder.children):
            if not isinstance(child, Folder):
                continue

            subdir_base = f'{index:03d}_{_sanitize_name(child.name)}'
            child_cache_path = os.path.join(cur_cache_path, subdir_base)

            # Only childless folders with a URL need downloading; folders
            # with inline children are walked as they are.
            if not child.children and child.url:
                text, child = get_folder(child.url)
                if child is None:
                    continue

                save_index(child_cache_path, text)

            walk(child, child_cache_path)

    root_text, root_folder = get_folder(root_url)
    save_index(root_cache_path, root_text)
    walk(root_folder, root_cache_path)
def walk_cached_folder_tree(root_cache_path):
    """Iterate over a folder tree stored on disk under *root_cache_path*.

    Parameters
    ----------
    root_cache_path : str
        The directory holding the root ``index.wtml`` file. Subfolders are
        looked up in subdirectories named
        ``{index:03d}_{sanitized folder name}``.

    Yields
    ------
    Tuples ``(treepath, item)``, where *treepath* is the tuple of child
    indices leading from the root folder to *item*. The root folder comes
    first, with an empty tuple.

    Notes
    -----
    A child folder that has no children of its own but does have a URL is
    replaced by the folder loaded from its cached ``index.wtml``. Each such
    URL is expanded only once; later references to it are skipped.

    """
    visited_urls = set()

    def load_index(cache_dir):
        return Folder.from_file(os.path.join(cache_dir, 'index.wtml'))

    def descend(treepath, folder, cache_dir):
        yield (treepath, folder)

        for position, entry in enumerate(folder.children):
            entry_treepath = treepath + (position,)

            # Non-folder children are leaves.
            if not isinstance(entry, Folder):
                yield (entry_treepath, entry)
                continue

            entry_dir = os.path.join(
                cache_dir,
                f'{position:03d}_{_sanitize_name(entry.name)}',
            )

            if entry.url and not entry.children:
                if entry.url in visited_urls:
                    continue

                visited_urls.add(entry.url)
                entry = load_index(entry_dir)

            yield from descend(entry_treepath, entry, entry_dir)

    yield from descend((), load_index(root_cache_path), root_cache_path)