import concurrent.futures
import logging
import traceback
from threading import Event, Lock, Thread
from typing import Callable, Tuple
from typing_extensions import Self
from .artvee_client import ArtveeClient
from .artwork import Artwork, ArtworkMetadata, CategoryType, Image, ImageMetadata
logger = logging.getLogger(__name__)
[docs]
class ArtveeScraper:
"""A web scraper which concurrently extracts artwork from Artvee. Callbacks are notified asynchronously for
each scraped artwork so that user-defined actions may be taken.
Attributes:
_artvee_client (ArtveeClient):
An HTTP client for accessing artwork.
_worker_pool (ThreadPoolExecutor):
A pool of threads to manage concurrent scraping tasks.
_categories (Tuple[CategoryType]):
Category types to scrape.
_boss_thread (Thread):
The main thread responsible for executing the scraping logic; delegates tasks to workers.
_stop_event (Event):
Signal to indicate the scraping process should be halted.
_listener_lock (Lock):
A lock which provides access the listeners in a thread-safe manner.
_listeners (dict):
A hashset of callbacks to invoke asynchronously.
Args:
artvee_client (ArtveeClient, optional):
An HTTP client for accessing artwork. Defaults to a new instance.
worker_threads (int, optional):
The number of worker threads to use for processing. Must be between 1 and 10. Defaults to 3.
categories (Tuple[CategoryType], optional):
Category types to scrape. Defaults to all categories.
Raises:
ValueError:
If `worker_threads` is not in the range [1, 10].
"""
def __init__(
self,
artvee_client: ArtveeClient = ArtveeClient(),
worker_threads: int = 3,
categories: Tuple[CategoryType] = tuple(CategoryType),
) -> None:
"""Initializes a newly created `ArtveeScraper` object."""
self._artvee_client = artvee_client
if not 1 <= worker_threads <= 10:
raise ValueError("Worker threads must be in range [1-10]")
self._worker_pool = concurrent.futures.ThreadPoolExecutor(
max_workers=worker_threads
)
self._categories = categories
self._boss_thread = Thread(target=self._exec, args=())
self._stop_event = Event()
self._listener_lock = Lock()
self._listeners = {}
def __enter__(self):
return self
def __exit__(self, *_):
self.join()
[docs]
def register_listener(
self,
on_event_listener: Callable[[Artwork, Exception | None], None],
) -> Self:
"""Registers a callback to be notified of events asynchronously.
A callback may only be registered once; additional attempts to register the same callback will
overwrite the previous registration. The `success` notification contains a fully populated `Artwork`,
whereas the `failure` notification contains a partially populated `Artwork` and associated exception.
Args:
on_event_listener (Callable[[Artwork, Exception | None], None]):
A callback function that will be notified async when an event occurs.
Raises:
ValueError:
If the provided `on_event_listener` argument is not callable.
Returns:
`self` for method chaining
"""
if not callable(on_event_listener):
raise ValueError("On event listener callback must be a callable type")
with self._listener_lock:
tmp = self._listeners.copy()
tmp[on_event_listener] = None # dict as a hashset
self._listeners = tmp
return self
[docs]
def deregister_listener(
self,
on_event_listener: Callable[[Artwork, Exception | None], None],
) -> Self:
"""Deregisters a callback so that it will no longer be notified of events.
Args:
on_event_listener (Callable[[Artwork, Exception | None]], None]):
The callback to no longer notify when an event occurs.
Returns:
`self` for method chaining
"""
with self._listener_lock:
tmp = self._listeners.copy()
tmp.pop(on_event_listener, None)
self._listeners = tmp
return self
[docs]
def start(self) -> None:
"""Begin scraping artwork
Returns:
None
"""
logger.debug("Starting")
self._boss_thread.start()
[docs]
def join(self) -> None:
"""Blocks the calling thread until all active tasks have been completed.
Returns:
None
"""
logger.debug("Waiting for all active tasks to complete")
self._boss_thread.join()
self._worker_pool.shutdown(wait=True, cancel_futures=False)
[docs]
def shutdown(self) -> None:
"""Initiates a shutdown in which running tasks are completed, but all pending tasks are canceled.
Returns:
None
"""
logger.debug("Shutting down")
self._stop_event.set()
self._boss_thread.join()
self._worker_pool.shutdown(wait=True, cancel_futures=True)
def _exec(self) -> None:
logger.debug("Executing scraper for categories %s", self._categories)
for category in self._categories:
if self._stop_event.is_set():
return
try:
page_count = self._artvee_client.get_page_count(category)
logger.debug("Category %s has %d page(s)", category, page_count)
for page in range(1, page_count + 1):
if self._stop_event.is_set():
return
logger.debug(
"Processing category %s, page (%d/%d)",
category,
page,
page_count,
)
try:
# Scrape the page's metadata
metadata_tuples = self._artvee_client.get_metadata(
category, page
)
# Submit tasks to download the images; invoke the listeners for each
results = self._worker_pool.map(
self._worker_task, metadata_tuples
)
# Block until all submitted tasks for a page have completed
for _ in results:
pass
except Exception:
logging.warning(
"An error occured while processing page %d of category %s; %s",
page,
category,
traceback.format_exc(),
)
except Exception:
logging.error(
"An error occured while processing category %s; %s",
category,
traceback.format_exc(),
)
def _worker_task(
self, metadata_tuple: Tuple[ArtworkMetadata, ImageMetadata]
) -> None:
"""Artwork processing task.
Retrieves the image, coalescing it with the metadata into an `Artwork` object.
If the image bytes are successfully retrieved, any registered listeners will be invoked with a fully populated `Artwork`.
If an exception is encountered while retrieving the image, any registered listeners will be invoked with a partially
populated `Artwork` along with the exception that was thrown.
Args:
metadata_tuple (Tuple[ArtworkMetadata, ImageMetadata]):
A pair of related metadata which provides the attributes of an artwork.
- `ArtworkMetadata`: Attributes of an artwork.
- `ImageMetadata`: Attributes of an image file.
Returns:
None
"""
artwork_metadata, img_metadata = metadata_tuple
# Unpack the metadata to partially populate the artwork
artwork = Artwork(**vars(artwork_metadata))
artwork.image = Image(**vars(img_metadata))
try:
artwork.image.raw = self._artvee_client.get_image(img_metadata)
self._invoke_listeners(artwork) # artwork is fully populated
except Exception as thrown:
self._invoke_listeners(
artwork, thrown
) # artwork is partially populated (missing raw image)
def _invoke_listeners(
self, artwork: Artwork, thrown: Exception | None = None
) -> None:
"""Invokes registered callback functions.
Args:
artwork (Artwork): The artwork. Partially populated if an exception has occurred.
thrown (Exception, optional): The processing exception. Defaults to None.
Returns:
None
"""
for listener in self._listeners.keys():
listener(artwork, thrown)