Onepagecode

Under the Hood of a Python Trading Engine

Managing concurrency, ML pipelines, and custom backtesting metrics in a scalable backend.

Onepagecode
Mar 06, 2026

Download Source code using the button at the end of this article

# file path: app.py
import multiprocessing as mp
from flask import Flask, render_template
import os

app.py imports provide the minimal runtime building blocks the web entrypoint needs to expose the platform to users and other services. The multiprocessing module is brought in under the mp alias so the entrypoint can spawn or manage separate processes for CPU-bound work such as dataset generation, model training, or background data pulls without importing every multiprocessing primitive individually. Flask is imported so the file can create the application object that wires up HTTP endpoints, and render_template is used to produce the user-facing UI pages such as the home view. The os module is available for environment and filesystem interactions the web layer needs, for example resolving template paths or reading simple runtime configuration. This set of imports is intentionally lightweight compared with other modules that pull in asyncio, Strategy, backtesting datafeed code, and granular multiprocessing symbols; those modules host the heavier concurrency and domain logic, while app.py focuses on exposing routes and delegating intensive tasks to other processes or modules.

# file path: app.py
app = Flask(__name__)

Creating the Flask application instance under the name app initializes the web layer that accepts HTTP requests and acts as the central registry for routes, templates, static assets, and configuration for the platform. Passing __name__ to Flask tells the framework which module or package the application lives in, so it knows where to find templates and static files and how to build the application’s import and instance paths. That app object becomes the WSGI entrypoint and the object that route decorators like the one on the home handler attach to, so incoming requests are dispatched into the data engineering and model pipeline handlers you’ll wire up in the placeholder area. Conceptually this is the single, module-scoped application instance through which the UI, health checks, and any HTTP APIs are exposed to orchestrate and inspect the trading dataset generation and model components.

# file path: app.py
@app.route('/')
def home():
    return render_template('home.html')

The home function is the Flask view that serves the web UI entrypoint: when an HTTP GET arrives at the application’s root URL, Flask’s routing dispatches that request to home, which calls the templating helper to produce and return the rendered home page HTML to the client. Because the Flask application instance named app is set up earlier and the route decorator wires home to the root endpoint, this function sits at the front of the platform’s interaction surface and provides the user-facing landing page from which operators can navigate into dataset, model, or orchestration pages. The implementation contains no branching or business logic — it always returns the same rendered template — so its control flow is straightforward (request in, template out) and it mirrors the minimal, placeholder nature of other simple initializers in the codebase such as Strategy.init. The gap reserved for additional route handlers is where related endpoints that expose health checks, APIs, or other UI pages would be added alongside home to complete the web layer that sits atop the data and modeling pipelines.

# file path: app.py
if __name__ == "__main__":
    port = int(os.environ.get('PORT', 5002))
    app.run(debug=True, host='0.0.0.0', port=port, threaded=True, use_reloader=True)

As the web application entrypoint that exposes the platform’s HTTP endpoints for the data and model pipeline, this conditional is the runtime gate that launches the Flask development server only when app.py is executed directly, which is the case in which Python sets __name__ to "__main__". When the condition holds, the block reads PORT from the environment (falling back to 5002 if it is not set) so the listening port can be configured externally, then calls the application’s run method to bind to all network interfaces, enable the interactive debugger and automatic reloading, and handle concurrent requests with threads. The run call operates on the app object created earlier and makes the root route defined by home, plus any additional endpoints, available to clients; the os import supplies the environment lookup used to select the port. Because of the guard, importing app.py from other parts of the system (a WSGI server, for example) does not start the HTTP server. Binding to 0.0.0.0 with debug=True exposes the interactive debugger on every network interface, so this launch configuration is suited to local development only.
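The port lookup can be pulled into a small helper so the fallback behaviour can be checked without starting a server. This is a minimal sketch under stated assumptions: resolve_port is a hypothetical name that does not appear in the project, and both the empty-string handling and the range check are additions of this sketch rather than something app.py does.

```python
import os

DEFAULT_PORT = 5002  # same fallback value app.py uses


def resolve_port(environ=None, default=DEFAULT_PORT):
    """Return the port named by a PORT entry in `environ`, or `default` if unset."""
    if environ is None:
        environ = os.environ
    raw = environ.get("PORT")
    if raw is None or raw == "":
        # Assumption of this sketch: an empty value is treated like a missing one.
        return default
    port = int(raw)  # raises ValueError for non-numeric input
    if not 0 < port < 65536:
        raise ValueError(f"PORT out of range: {port}")
    return port


print(resolve_port({}))                # 5002
print(resolve_port({"PORT": "8080"}))  # 8080
```

Passing a plain dict instead of os.environ keeps the helper easy to exercise in tests.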

# file path: bot/bots.py
import asyncio
from strategies import Strategy
from backtesting import datafeed
from multiprocessing import Pool, cpu_count, Manager, Process, Queue

The file pulls in four complementary pieces that let the Bot layer both receive market data and manage concurrent runtime work. asyncio is brought in to run an asynchronous event loop for scheduled tasks, timers, and non-blocking I/O, so the Bot lifecycle (status checks, toggles, periodic model updates) can be driven without blocking the main thread. Strategy is imported from the strategies module as the behavioral building block the Bot subclasses combine with the lifecycle machinery; Strategy currently exposes a minimal initializer, and here it stands in for the domain logic the orchestration layer will call into. backtesting.datafeed supplies the normalized market and historical series the bots consume for live decisioning or simulated runs, so data flows from the datafeed into the Bot/Strategy pipeline. The multiprocessing names (Pool, cpu_count, Manager, Process, and Queue) provide process-level concurrency: Pool and cpu_count let the code size and spawn worker pools for CPU-bound work such as parallel backtests, batch model updates, or heavy feature computation; Manager and Queue create shared state and inter-process communication channels; and Process supports long-lived child processes for isolated agents or monitors. Together these imports reflect a hybrid concurrency approach, with asyncio for cooperative multitasking of I/O and scheduling and multiprocessing for parallel CPU work and isolation. That contrasts with the model-focused imports elsewhere in the project (pyfolio, the TensorFlow LSTM wrapper), which are about training and evaluation rather than runtime orchestration.
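The asyncio half of that approach can be sketched as a polling loop that keeps stepping a bot while it is active and yields control between steps. Everything here is hypothetical: FakeBot, step, and run_while_active are illustration names, not project code, and the multiprocessing side is deliberately left out.

```python
import asyncio


class FakeBot:
    """Hypothetical stand-in that exposes only an active flag and a counter."""

    def __init__(self):
        self.active = True
        self.ticks = 0

    def step(self):
        # Placeholder for "ask the strategy for a decision".
        self.ticks += 1


async def run_while_active(bot, interval=0.01, max_ticks=3):
    """Step the bot on a timer without blocking other coroutines."""
    while bot.active:
        bot.step()
        if bot.ticks >= max_ticks:
            bot.active = False  # stop condition so the demo terminates
        await asyncio.sleep(interval)
    return bot.ticks


async def main():
    bots = [FakeBot(), FakeBot()]
    # Both loops share one thread and interleave at each await.
    return await asyncio.gather(*(run_while_active(b) for b in bots))


results = asyncio.run(main())
print(results)  # [3, 3]
```

CPU-heavy work such as a backtest would not belong inside step in this form, because it would stall every other coroutine on the loop; that is the part the multiprocessing imports are positioned to handle.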

# file path: bot/bots.py
class Bot:
    def __init__(self, strategy, auto_update_model: bool):
        self.strategy = strategy
        self.auto_update_model = auto_update_model
        # Stored as _active so the status property below can read it.
        self._active = False
    @property
    def status(self):
        return self._active
    def toggle(self):
        # Flip the bot between active and inactive.
        self._active = not self._active
    def toggle_model_auto_update(self):
        if self.auto_update_model is True:
            print('Toggling automated model training and updating OFF')
            self.auto_update_model = False
        else:
            print('Toggling automated model training and updating ON')
            self.auto_update_model = True

Bot is the lightweight lifecycle manager that the concrete runtime agents inherit from, so DayTradingBot, SwingTradingBot, PositionTradingBot, and ClusterWatch share a common control surface for an attached Strategy. On construction Bot stores the supplied strategy reference, records the auto_update_model boolean, and initializes the internal _active flag to False; because Strategy.__init__ is a no-op in this codebase, the strategy is simply retained for later orchestration rather than being modified at construction time. The read-only status property exposes that internal flag, and the toggle method inverts it to flip the agent between active and inactive; other parts of the system read status to decide whether to run the strategy and call toggle to change runtime state. The toggle_model_auto_update method is a simple switch for the automated model lifecycle: it inspects the auto_update_model flag, prints a human-readable message indicating whether automated training and updating is being turned ON or OFF, and then flips the flag so subsequent checks reflect the new mode. Together these behaviors make Bot a small, shared orchestration layer that holds a Strategy and provides standard runtime and model-update toggles for the higher-level trading agent subclasses to use.

# file path: bot/bots.py
    def __init__(self, strategy, auto_update_model: bool):
        self.strategy = strategy
        self.auto_update_model = auto_update_model
        self._active = False

When a Bot instance is created, Bot.init captures the runtime dependencies and initial control flags the orchestration layer needs: it stores the provided strategy object so the bot has a concrete Strategy implementation to drive signals and order logic, it records the auto_update_model boolean so the orchestration can decide whether automated model retraining and replacement should be active, and it marks the bot as inactive by setting the lifecycle flag to false so the agent does not start executing until explicitly toggled. That stored auto_update_model value is the flag that toggle_model_auto_update reads and flips (emitting the console message) when the platform or an entry-point toggles automated training, and concrete subclasses such as DayTradingBot invoke this initializer via their own constructor to inherit the same startup contract and configuration.

# file path: bot/bots.py
    @property
    def status(self):
        return self._active

The status property on Bot is a simple read accessor that exposes the instance’s current activation flag by returning the internal _active attribute; in the orchestration layer this lets schedulers, the monitoring UI, and the concrete bots (DayTradingBot, SwingTradingBot, PositionTradingBot, and ClusterWatch) check whether a bot should be executing its Strategy or sitting idle. It fits into the lifecycle controls provided by Bot: __init__ establishes the initial inactive state, status gives other components a stable way to observe that state, and the companion toggle flips the same _active flag to enable or disable runtime behavior.

# file path: bot/bots.py
    @status.setter
    def toggle(self):
        self._active = not self._active

In the orchestration layer, the status property’s setter named toggle simply inverts the Bot’s internal _active boolean. Remember that the status property we looked at earlier returns that _active flag; assigning to status therefore doesn’t set an explicit value but triggers the setter, which flips _active between True and False. That flip is the lifecycle control used by higher-level code and internal loops to decide whether the bot should run its strategy or remain idle, so toggle directly switches the bot between active and inactive operation. This is the same boolean-toggle pattern used elsewhere in the class—compare toggle_model_auto_update, which flips the auto_update_model flag—except toggle governs the runtime operational state rather than model-update behavior.
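For comparison, Python’s property protocol expects a setter to reuse the property’s own name and to accept the assigned value. The sketch below shows that form next to a no-argument flip built on top of it; Switch is a hypothetical class used only for illustration, and the bool check is an assumption of this sketch.

```python
class Switch:
    """Hypothetical class, not part of the project."""

    def __init__(self):
        self._active = False

    @property
    def status(self):
        return self._active

    @status.setter
    def status(self, value):
        # The setter shares the property's name and receives the assigned value.
        if not isinstance(value, bool):
            raise TypeError("status must be a bool")
        self._active = value

    def toggle(self):
        # A no-argument flip is an ordinary method that goes through the property.
        self.status = not self.status


s = Switch()
s.toggle()
print(s.status)   # True
s.status = False
print(s.status)   # False
```

Keeping the explicit assignment and the flip as two separate entry points lets callers either set a known state or invert the current one.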

# file path: bot/bots.py
    def toggle_model_auto_update(self):
        if self.auto_update_model is True:
            print('Toggling automated model training and updating OFF')
            self.auto_update_model = False
        else:
            print('Toggling automated model training and updating ON')
            self.auto_update_model = True

Within the runtime orchestration layer that manages trading agents, Bot.toggle_model_auto_update provides an explicit control for the Bot instance to enable or disable automated model retraining and swapping while the agent is running. Bot.init seeds the bot with an initial self.auto_update_model value and an inactive state; when toggle_model_auto_update is invoked it reads that stored boolean and takes one of two paths: if the flag is currently true it flips the flag to false and emits a console message indicating automated model updates have been turned off, otherwise it flips the flag to true and emits a console message indicating they have been turned on. The method therefore updates the same attribute that Bot.init established (reading the initial state and writing the new state) and serves as a simple runtime toggle command analogous to the active-state toggle logic elsewhere on the Bot class; its side effects are the attribute change and the human-visible print output so operators or calling code can observe the transition.

# file path: bot/bots.py
class DayTradingBot(Bot):
    def __init__(self, strategy, auto_update_model: bool):
        super().__init__(strategy, auto_update_model)

DayTradingBot is a thin concrete agent class that represents the platform’s day-trading runtime instance and, at construction time, simply delegates initialization to the shared Bot lifecycle layer by passing along the supplied Strategy instance and the auto_update_model flag. Because Bot.init (as covered earlier) captures the provided strategy, records whether automated model updates should run, and marks the bot inactive to prevent immediate execution, creating a DayTradingBot wires a day-trading identity into that same lifecycle plumbing without adding extra initialization logic of its own. Strategy.init is currently a no-op, so handing the strategy through has no side effects beyond association. DayTradingBot follows the same inheritance pattern used by SwingTradingBot, PositionTradingBot, and ClusterWatch: it establishes a named runtime specialization via subclassing so the orchestration layer, schedulers, and monitoring components can instantiate and manage a distinct day-trading agent while relying on the common Bot controls.

# file path: bot/strategies.py
import pyfolio as pf
from models.tf.models import LongShortTermMemory
import tensorflow as tf

The Strategy implementation pulls in three things it needs to wire a trainable, TensorFlow-based trading model into the orchestration layer and to evaluate its performance. It imports pyfolio so Strategy can compute portfolio- and trade-level performance diagnostics (returns, drawdown, risk-adjusted metrics) for the signals and simulated PnL the bot produces; those metrics are consumed by the orchestration and monitoring surfaces that Bot and its concrete agents expose. It imports LongShortTermMemory from the project’s models.tf package so Strategy can instantiate the project’s LSTM model wrapper rather than building raw Keras layers inline; that follows the same model-wrapper pattern you saw with LongShortTermMemoryR where the model class encapsulates loss, optimizer, target selection and checkpointing options, letting Strategy treat the model as a single train/predict object. Finally, it imports TensorFlow so Strategy has access to the runtime and utilities needed by the model wrapper (for building, training, inference, device/seed control and serialization). This mirrors other modules in the codebase that import TensorFlow and lower-level Keras components directly, but here the file favors the higher-level project model class to keep Strategy focused on configuration, state and orchestration rather than layer-by-layer model construction.

# file path: bot/strategies.py
    def __init__(self) -> None:
        pass

The Strategy.init method is intentionally a no-op initializer that creates a lightweight base class instance without setting any attributes or producing side effects; in the architecture of the marketbot-main_cleaned project it exists purely as the default construction hook for concrete strategy implementations so that DayTradingBot, SwingTradingBot, PositionTradingBot, and ClusterWatch can receive and hold a Strategy-derived object via Bot.init without requiring any work at construction time. Conceptually it establishes the polymorphic base for strategy objects—the Template Method style where concrete strategies will override and supply parameters, state setup, or model wiring later—while guaranteeing that creating a bare Strategy is safe, returns None, and does not alter orchestration control flow or data beyond allowing Bot to store the reference.

# file path: bot/bots.py
    def __init__(self, strategy, auto_update_model: bool):
        super().__init__(strategy, auto_update_model)

DayTradingBot.init simply delegates its incoming strategy and auto_update_model parameter to Bot.init, relying on the base-class constructor to perform the runtime wiring and lifecycle setup we already discussed; it does not introduce additional initialization logic of its own. Because Strategy.init is effectively a no-op, DayTradingBot does not need to perform any additional strategy construction here and instead accepts a ready strategy object to be managed by the Bot layer. This mirrors the same minimal-constructor pattern used by SwingTradingBot, PositionTradingBot, and ClusterWatch, keeping the concrete bot classes thin and focused on providing a distinct execution identity (day trading behavior) while sharing the centralized lifecycle, status, and model-auto-update controls implemented in Bot.
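One consequence of this delegation is worth seeing concretely: a subclass whose constructor only forwards its arguments ends up in the same state as a subclass that defines no constructor at all. The classes below are hypothetical stand-ins for illustration, not the project’s Bot hierarchy.

```python
class Base:
    def __init__(self, strategy, auto_update_model: bool):
        self.strategy = strategy
        self.auto_update_model = auto_update_model
        self._active = False


class WithForwardingInit(Base):
    def __init__(self, strategy, auto_update_model: bool):
        # Forwards everything unchanged, as the concrete bots do.
        super().__init__(strategy, auto_update_model)


class WithoutInit(Base):
    """Defines no __init__, so Base.__init__ is inherited as-is."""


a = WithForwardingInit("some-strategy", True)
b = WithoutInit("some-strategy", True)
print(vars(a) == vars(b))  # True
```

The explicit forwarding constructor is therefore a stylistic choice; it mainly marks the spot where subclass-specific setup could be added later.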

# file path: bot/bots.py
class SwingTradingBot(Bot):
    def __init__(self, strategy, auto_update_model: bool):
        super().__init__(strategy, auto_update_model)

SwingTradingBot is a concrete runtime agent subclass of Bot that represents the swing-trading variant in the orchestration layer; when you create one you supply a Strategy implementation and an auto-update flag and the constructor simply delegates to Bot’s initializer so the provided Strategy instance is attached to the bot and the auto_update_model setting is recorded while the bot remains inactive until toggled. Because Strategy.init is currently a no-op, SwingTradingBot does not perform any extra strategy setup at construction time; it inherits all lifecycle controls and behaviors — status, toggle, and toggle_model_auto_update — from Bot so the orchestration, schedulers, and monitoring components interact with it the same way they do with DayTradingBot, PositionTradingBot, and ClusterWatch. The class exists to express a distinct swing-trading role in the runtime, with any swing-specific logic expected to live in the Strategy implementation or in future overrides on SwingTradingBot.

# file path: bot/bots.py
    def __init__(self, strategy, auto_update_model: bool):
        super().__init__(strategy, auto_update_model)

SwingTradingBot.init is the minimal constructor for the concrete swing-trading agent that delegates all setup to the shared Bot initializer: when a SwingTradingBot is created it hands the supplied Strategy instance and the auto_update_model flag up to Bot.init so the orchestration layer records the runtime dependency, the automatic-retraining preference, and leaves the agent inactive until explicitly toggled. Because Strategy.init is a no-op, there is no additional strategy-side initialization performed here; SwingTradingBot follows the same subclassing pattern used by DayTradingBot, PositionTradingBot, and ClusterWatch, relying on inheritance to reuse the common lifecycle and control surface provided by Bot.

# file path: bot/bots.py
class PositionTradingBot(Bot):
    def __init__(self, strategy, auto_update_model: bool):
        super().__init__(strategy, auto_update_model)

PositionTradingBot is the concrete agent class used to run position-oriented strategies within the orchestration layer; its constructor does no extra setup itself but delegates initialization to Bot.init, handing along the Strategy instance and the auto_update_model flag so the bot can hold the strategy reference and the orchestration can decide whether model retraining should be automatic. Because Strategy.init contains no initialization logic, PositionTradingBot effectively inherits Bot’s initialization behavior: the strategy is stored, the auto-update preference is recorded, and the agent starts in the inactive lifecycle state until the orchestration toggles it. This mirrors the same inheritance-based pattern used by DayTradingBot, SwingTradingBot, and ClusterWatch, enabling distinct runtime agent types while reusing Bot’s status, toggle, and toggle_model_auto_update controls for lifecycle and automated model-management flow.

# file path: bot/bots.py
    def __init__(self, strategy, auto_update_model: bool):
        super().__init__(strategy, auto_update_model)

PositionTradingBot.init does nothing custom itself; it simply delegates construction to Bot.init, passing along the supplied Strategy instance and the auto_update_model flag so the PositionTradingBot inherits the common lifecycle plumbing. By calling into Bot.init, the PositionTradingBot gets the runtime dependency wiring already explained: the strategy object is recorded for driving signals and orders, the auto-update preference is captured for model-retraining control, and the bot starts out inactive until toggled. This mirrors the initialization pattern used by DayTradingBot, SwingTradingBot, and ClusterWatch, and because Strategy.init is currently a no-op, there is no additional initialization work performed on the strategy at this point.

# file path: bot/bots.py
class ClusterWatch(Bot):
    def __init__(self, strategy, auto_update_model: bool):
        super().__init__(strategy, auto_update_model)

ClusterWatch is a concrete Bot subclass that serves as a named runtime agent in the orchestration layer; its initializer simply delegates to Bot.init so the instance inherits the same lifecycle controls and runtime wiring the orchestration expects — namely the injected Strategy reference, the auto_update_model flag, and the inactive-by-default lifecycle state managed by Bot. Because Strategy.init is a no-op, ClusterWatch does not perform any additional construction work when created; it relies on the composed Strategy instance and on the Bot lifecycle methods (status, toggle, toggle_model_auto_update) to drive behavior at runtime. Functionally it mirrors DayTradingBot, SwingTradingBot, and PositionTradingBot, following the same subclassing pattern to provide a distinct agent identity while reusing the Bot lifecycle; conceptually this fits the Strategy pattern where the Bot/ClusterWatch context delegates decision logic to an injected Strategy implementation and exposes a consistent control surface for schedulers and monitoring components.

# file path: bot/bots.py
    def __init__(self, strategy, auto_update_model: bool):
        super().__init__(strategy, auto_update_model)

ClusterWatch.init simply constructs a ClusterWatch by delegating construction to the shared Bot initializer: it accepts a Strategy instance and the auto_update_model flag and forwards them to Bot.init so the common lifecycle state and runtime dependencies are established in one place. Because Bot.init already captures and stores the provided strategy, records the automated model-update setting, and marks the bot inactive, ClusterWatch does not perform any additional setup of its own; it relies on the same inheritance pattern used by DayTradingBot, SwingTradingBot, and PositionTradingBot to provide a unified control surface. In terms of data flow, the strategy object and boolean arrive at ClusterWatch.init, are passed into the Bot layer, and from there the orchestration loops, schedulers, and monitoring components will consult that stored Strategy for signals and honor the auto-update configuration; Strategy.init itself is a no-op, so no further construction occurs on the strategy side during this step.

# file path: bot/strategies.py
class Strategy(object):
    def __init__(self) -> None:
        pass

Strategy serves as the minimal, named extension point that concrete trading strategies will inherit from and that the orchestration layer expects to receive when bots are constructed. Its constructor performs no setup or state initialization, so creating a Strategy instance simply yields an empty, typed container that DayTradingBot and SwingTradingBot (and through Bot.init the orchestration) can hold a reference to; Bot.init is responsible for wiring the strategy into the agent lifecycle and remembering the auto_update_model flag and active state. By leaving Strategy.init as a no-op, the file establishes a clear place for strategy parameters, initial state, and setup logic to be defined in subclasses or at instantiation time, making Strategy a lightweight base/marker class distinct from the lifecycle-focused Bot.init implementations that actually perform wiring and default state assignment.
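To illustrate how a subclass might fill in that extension point, here is a minimal sketch of a concrete strategy. MovingAverageCross, its parameters, and the generate_signal method are all hypothetical; the source does not show what interface the bots actually call on a strategy.

```python
class Strategy(object):
    def __init__(self) -> None:
        pass


class MovingAverageCross(Strategy):
    """Hypothetical subclass; the project does not define this class."""

    def __init__(self, fast: int = 3, slow: int = 5) -> None:
        super().__init__()
        if fast >= slow:
            raise ValueError("fast window must be shorter than slow window")
        self.fast = fast
        self.slow = slow

    def generate_signal(self, closes):
        """Return 1 (long), -1 (short), or 0 (flat or not enough data)."""
        if len(closes) < self.slow:
            return 0
        fast_avg = sum(closes[-self.fast:]) / self.fast
        slow_avg = sum(closes[-self.slow:]) / self.slow
        if fast_avg > slow_avg:
            return 1
        if fast_avg < slow_avg:
            return -1
        return 0


strategy = MovingAverageCross()
print(strategy.generate_signal([1, 2, 3, 4, 5]))  # 1
print(strategy.generate_signal([5, 4, 3, 2, 1]))  # -1
print(strategy.generate_signal([1, 2]))           # 0
```

Parameters and state live in the subclass constructor, which is the role the empty base initializer leaves open.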

# file path: database/dataset.py
from dataclasses import dataclass
from typing import Any, Union
import pandas as pd
from datetime import datetime
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import scale
import matplotlib.pyplot as plt
from utils.api.connections import client_connect
from utils.api.queries import get_data
import os

These imports pull together the standard-library helpers, data-frame and time handling, ML preprocessing and splitting, plotting, and the project’s API connectors that StockDataGenerator needs to turn raw market series into windowed, normalized feature/label datasets for model training. dataclass and the typing names are brought in to support typed data containers and function signatures used throughout the generator API. pandas provides the primary in-memory time-series container and operations that StockDataGenerator will use to hold and manipulate fetched series. datetime is available for constructing or interpreting the time bounds on queries and for any timestamp arithmetic during windowing. The sklearn train_test_split function and the scale preprocessor are the specific ML utilities used to create the train/test partitions and to normalize feature matrices before they are handed to the modeling layer. matplotlib.pyplot is pulled in to render the diagnostic plots StockDataGenerator exposes for feature and label visualization. client_connect and get_data are the project’s thin API client primitives used to open a credentials-backed connection and retrieve raw series from the market feeds; StockDataGenerator delegates market access to those utilities so the data-fetching logic remains centralized in utils.api.queries and utils.api.connections. Finally, os is used for filesystem operations related to saving or resolving data paths when the generator persists downloaded series. Compared to the similar import pattern you saw elsewhere, this file reuses get_data but references client_connect under the utils.api namespace rather than an alternate api.creds module, reflecting the project’s convention of centralizing connection and query helpers in the utils.api package so the dataset generator can remain focused on feature construction, normalization, plotting and train/test splitting for the training pipelines the bots ultimately consume.
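The normalization that scale performs is column-wise standardization: subtract the column mean and divide by the population standard deviation. A dependency-free sketch of that arithmetic for a single column follows; standardize is a hypothetical helper written for illustration, not the scikit-learn implementation.

```python
from statistics import fmean, pstdev


def standardize(values):
    """Return (x - mean) / std for each x, using the population std."""
    mean = fmean(values)
    std = pstdev(values)
    if std == 0:
        # A constant column has no spread; map it to zeros.
        return [0.0 for _ in values]
    return [(x - mean) / std for x in values]


closes = [10.0, 12.0, 14.0]
scaled = standardize(closes)
print(scaled)  # three values centred on zero with unit population std
```

One design point to keep in mind: standardizing the full series before splitting means the test rows contribute to the mean and standard deviation that the training rows are scaled with.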

# file path: database/dataset.py
class StockDataGenerator():
    def __init__(self, symbol=None, api='', data_path='', features=None,
                 target='close', num_steps=30, test_ratio=0.15, normalized=True,
                 close_price_only=True, verbose=0, period: Any=None,
                 period_type: Any=None, frequency: Any=None, frequency_type: Any=None,
                 save=False, lookback=0, up_down=False, method='regression', **kwargs):
        self.symbol = symbol
        self.num_steps = num_steps
        self.test_ratio = test_ratio
        self.close_price_only = close_price_only
        self.normalized = normalized
        self.client = None
        # Use a fresh dict per instance instead of a shared mutable default.
        self.features = features if features is not None else {}
        self.api = api
        self.period = period
        self.period_type = period_type
        self.frequency = frequency
        self.frequency_type = frequency_type
        self.save = save
        self.verbose = verbose
        self.lookback = lookback
        self.method = method
        if data_path == '':
            # Both supported APIs load credentials from the same INI file.
            if self.api in ('TDA', 'POLY'):
                self.client = client_connect(self.api, 'private/creds.ini')
            if self.api == 'TDA':
                fetched = get_data(
                    client=self.client, api=self.api, features=self.features, symbol=self.symbol,
                    save=self.save, period=self.period, period_type=self.period_type, frequency=self.frequency,
                    frequency_type=self.frequency_type
                )
                # With save=True, get_data also returns the path it wrote to.
                if self.save:
                    self.data, self.data_path = fetched
                else:
                    self.data = fetched
            elif self.api == 'POLY':
                self.data = get_data(
                    client=self.client, api=self.api,
                    features=self.features, symbol=self.symbol,
                    save=self.save, **kwargs
                )
        else:
            self.data = pd.read_csv(data_path, index_col='datetime')
            self.symbol = os.path.basename(data_path)
        self.num_features = len(self.data.columns) - 1 
        self.target = target
        self._process_dataset()
        self._train_test_split()
    def info(self):
        # Attribute names match the arrays stored by _train_test_split.
        return "StockDataSet [%s] train: %d test: %d" % (
            self.symbol, len(self.X_train), len(self.y_test)
        )
    def get_num_features(self): return self.num_features
    def _process_dataset(self, normalization=True, pc=False):
        if self.method == 'classification':
            # Label each bar by the sign of its close-to-close return.
            returns = self.data['close'].pct_change()
            self.data['close_class'] = returns.apply(lambda x: -1 if x < 0 else 1)
            self.target = 'close_class'
        if self.verbose >= 1: print('Normalizing data...')
        if pc:
            self.data['percentChangeOC'] = self.data['close'] / self.data['open'] - 1
            if self.verbose >= 1 and self.lookback > 0: print('Generating lookback features...')
            for i in range(1, self.lookback + 1):
                # shift(i) brings the value from i rows earlier onto the current row.
                self.data[f'percentChange-{i}'] = self.data['percentChangeOC'].shift(i)
        if self.verbose >= 1: print('Scaling data...')
        # Standardizes every column, including the target.
        self.data = pd.DataFrame(scale(X=self.data), index=self.data.index, columns=self.data.columns)
    def _train_test_split(self, test_percent=None):
        # Fall back to the ratio configured in the constructor.
        if test_percent is None:
            test_percent = self.test_ratio
        self.X = self.data.loc[:, self.data.columns != self.target]
        self.y = self.data.loc[:, self.data.columns == self.target]
        test_size = int(len(self.data) * test_percent)
        X_train = self.X.head(len(self.X) - test_size)
        y_train = self.y.head(len(self.y) - test_size)
        # The test set is the most recent rows, so it never overlaps the training rows.
        X_test = self.X.tail(test_size)
        y_test = self.y.tail(test_size)
        self.X_train = X_train.to_numpy()
        self.y_train = y_train.to_numpy()
        self.X_test = X_test.to_numpy()
        self.y_test = y_test.to_numpy()
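    def _generate_window(self):
        # Placeholder: windowing is not implemented yet.
        pass
    def plot_features(self, features: list):
        # Placeholder: the loops below do not draw anything yet.
        for feat in features:
            feat = self.data
        figures = []
        for i in self.data.columns.values:
            pass
    def fit(self):
        # Placeholder: no fitting logic yet.
        pass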

StockDataGenerator is the mid-level dataset builder that the training pipelines call to turn raw market series into normalized, window-ready feature and label arrays for model training. On construction it captures the dataset configuration (symbol, API/source, features, period/frequency, target column, normalization and splitting parameters, lookback and method) and then chooses one of two data acquisition paths: if a local data_path is given it reads a CSV into a pandas DataFrame and sets the symbol from the filename, otherwise it opens an API client by delegating to client_connect and asks the remote fetch routine get_data to assemble the requested feature set (the save flag changes whether get_data also returns a file path). After the raw series lands in self.data the constructor computes num_features as the number of columns minus one and then drives two processing steps: _process_dataset and _train_test_split. _process_dataset implements the dataset transformation policy: when method indicates classification it creates a discrete class target derived from close returns and switches the target column accordingly; when percent-change features are requested it constructs an open-to-close percent change and — if lookback is positive — generates shifted lookback columns; verbose controls simple progress prints; finally the entire DataFrame is scaled using the shared scale utility so downstream models see normalized inputs. _train_test_split then pivots the DataFrame into feature matrix X and target vector y based on the configured target, computes a test partition size from the configured ratio, slices train and test subsets, and stores them as numpy arrays ready for model callers. info and get_num_features are thin accessors that report dataset metadata for the test suite and higher-level training code; _generate_window, plot_features and fit are declared but left as no-ops in this implementation. 
Because TestPipeline and the LSTM training helpers expect the training data as numpy arrays, StockDataGenerator sits between raw market connectivity (client_connect and get_data) and the model compilation and training steps.

# file path: utils/api/connections.py
import logging
from wsgiref.validate import ErrorWrapper
from tda import auth
from tda.auth import Client as TDA_Client
from coinbase.wallet.client import Client as CB_Client
from configparser import ConfigParser
from typing import Union
from polygon import RESTClient as POLY_CLIENT
from fredapi import Fred

The imports set up everything client_connect needs to act as the centralized factory for data API clients that StockDataGenerator and the tests consume. logging is pulled in so client_connect can attach a stream handler that sends log output to the console. ErrorWrapper from wsgiref.validate is imported but never referenced in the client_connect listing; it is a helper class of the standard library's WSGI validator, so it appears to be a leftover import rather than part of the connection logic. The TDA pieces come from the tda package: auth runs the token and login flows, and the Client class is imported under the TDA_Client alias so the helper can annotate the TD Ameritrade client it returns. Coinbase access comes from the wallet client class in coinbase.wallet.client. ConfigParser provides the INI-style reader the factory uses to load API keys, secrets, token paths, and redirect URIs from the cfg file. typing.Union lets the function annotate its return as one of several concrete client types. Polygon's RESTClient is the REST client for Polygon market data, and Fred from fredapi creates clients for economic series. Compared with import blocks elsewhere in the project, which bring in pandas, TA libraries, multiprocessing, and dataset utilities, this one is limited to auth, config, and concrete API client classes, which reflects the module's factory responsibility.

# file path: utils/api/connections.py
def client_connect(api: str, cfg: str, log=False) -> Union[CB_Client, TDA_Client, POLY_CLIENT, Fred]:
    # Send log records to the console via the root logger.
    logging.getLogger('').addHandler(logging.StreamHandler())
    config = ConfigParser()
    config.read(cfg)
    # Every provider section is named '<API>_AUTH' and carries an API_KEY.
    API_KEY = config.get(f'{api}_AUTH', 'API_KEY')
    if api == 'TDA':
        REDIRECT = config.get(f'{api}_AUTH', 'REDIRECT')
        TOKEN_PATH = config.get(f'{api}_AUTH', 'TOKEN_PATH')
        try:
            # Reuse a saved token when one exists.
            client = auth.client_from_token_file(TOKEN_PATH, API_KEY)
        except FileNotFoundError:
            # No token file yet: fall back to the interactive browser login.
            from selenium import webdriver
            with webdriver.Chrome() as driver:
                client = auth.client_from_login_flow(
                    driver, API_KEY, REDIRECT, TOKEN_PATH
                )
        return client
    elif api == 'CB':
        API_SECRET = config.get(f'{api}_AUTH', 'API_SECRET')
        client = CB_Client(API_KEY, API_SECRET)
        return client
    elif api == 'POLY':
        client = POLY_CLIENT(API_KEY)
        return client
    elif api == 'FRED':
        client = Fred(API_KEY)
        return client

client_connect is the single place that turns an API name and a credentials file path into a ready-to-use market or economic data client, so callers such as StockDataGenerator and TestBasics get a consistent, centrally configured connection without duplicating setup. It first attaches a stream handler to the root logger and then uses ConfigParser to read the supplied file, expecting a section named <api>_AUTH that holds the API_KEY and any other provider-specific secrets or paths. The function then branches on api. For TDA it reads the redirect URI and token path and tries to build the client from the saved token file; if that file is missing, it falls back to a browser-driven login flow via Selenium that completes the interactive OAuth step and persists a token. For Coinbase it reads the API_SECRET and constructs a CB_Client, for Polygon it builds a POLY_CLIENT from the API key, and for FRED it instantiates a Fred client with the key. The only explicit error handling is the missing-token case in the TDA branch. An api value outside these four has no branch: if the config file lacks a matching section, ConfigParser raises on the first lookup; otherwise the function falls through and returns None. The returned client is what StockDataGenerator passes to its data-fetching routines so the dataset generator can request price and series data uniformly.
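The config lookups in client_connect imply a credentials file with one <API>_AUTH section per provider. The sketch below shows that layout and a small reader for it. The sample values are placeholders, and read_auth is a hypothetical helper written for illustration; the project's actual private/creds.ini is not shown in the article.

```python
from configparser import ConfigParser

# Hypothetical credentials file; every value is a placeholder.
SAMPLE_CFG = """
[TDA_AUTH]
API_KEY = your-tda-key
REDIRECT = https://localhost
TOKEN_PATH = private/tda_token.json

[POLY_AUTH]
API_KEY = your-polygon-key
"""


def read_auth(cfg_text: str, api: str) -> dict:
    """Return the key/value pairs of the '<api>_AUTH' section."""
    config = ConfigParser()
    # Keep option names case-sensitive; ConfigParser lower-cases them by default.
    config.optionxform = str
    config.read_string(cfg_text)
    section = f"{api}_AUTH"
    if not config.has_section(section):
        raise KeyError(f"no [{section}] section in credentials file")
    return dict(config.items(section))


if __name__ == "__main__":
    print(read_auth(SAMPLE_CFG, "TDA"))
```

Checking for the section up front gives a clearer error than letting the first lookup fail, which may be worth having when a new provider name is passed in by mistake.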

# file path: utils/api/queries.py
from tda.client.synchronous import Client as TDA_Client
from coinbase.wallet.client import Client as CB_Client
from polygon import RESTClient as POLY_CLIENT
import pandas as pd
from pandas import DataFrame
import json
import talib as ta
import datetime as dt
from datetime import datetime, timedelta
import numpy as np
from typing import Union
from tqdm import tqdm
import multiprocessing as mp
import os
from utils.api.connections import client_connect

The imports wire the data-layer helper into the external market APIs and the local data-processing stack so get_data can normalize parameters, call the right SDK, and build a consistent set of numeric series for the rest of the pipeline. The three SDK client classes TDA_Client, CB_Client, and POLY_CLIENT are the concrete drivers for TD Ameritrade, Coinbase, and Polygon, which is why get_data annotates its client parameter as a Union of those types; in the portion of get_data listed here only the TDA and Polygon paths are handled explicitly. pandas and the DataFrame alias are available for tabular time series, and pd.to_datetime converts the millisecond epoch timestamps in each candle; json serializes and re-parses the TDA response. talib (imported as ta) provides the technical-indicator functions that get_data applies to the price arrays, and numpy holds those arrays. The datetime module is imported both under the dt alias and as the direct datetime and timedelta names, presumably for start and end window arithmetic, although the listed portion does not call it directly. typing.Union documents the acceptable client types in signatures. tqdm, multiprocessing (as mp), and os are imported for progress reporting, parallel fetches, and filesystem work such as saving results, but none of them appears in the portion of get_data shown.
Finally, client_connect from utils.api.connections is the function that returns an authenticated SDK client. The listed get_data receives an already-constructed client as an argument, so the import is not exercised in the excerpt. Compared with the imports in utils/api/connections.py, this file takes TDA's Client from tda.client.synchronous rather than through the auth module and does not import Fred or ConfigParser, because its responsibility is executing per-request queries rather than authenticating.
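To make the idea of a consistent result concrete: in the get_data listing, TDA candles use full field names (open, close, and so on) while Polygon results use single-letter keys (o, c, and so on), and both timestamps are parsed as milliseconds since the epoch. The sketch below maps both shapes onto common column names using only the standard library. normalize_candles and FIELD_MAP are hypothetical names, and the payloads are made-up samples, not real API responses.

```python
from datetime import datetime, timezone

# Field names each provider uses for the same candle attributes,
# as they appear in the get_data listing.
FIELD_MAP = {
    "TDA": {"open": "open", "close": "close", "high": "high",
            "low": "low", "volume": "volume", "datetime": "datetime"},
    "POLY": {"open": "o", "close": "c", "high": "h",
             "low": "l", "volume": "v", "datetime": "t"},
}


def normalize_candles(candles: list, api: str) -> dict:
    """Turn a list of provider candles into column lists with common names."""
    fields = FIELD_MAP[api]
    columns = {name: [c[key] for c in candles]
               for name, key in fields.items() if name != "datetime"}
    # The listing parses both providers' timestamps with unit='ms'.
    columns["datetime"] = [
        datetime.fromtimestamp(c[fields["datetime"]] / 1000, tz=timezone.utc)
        for c in candles
    ]
    # Volume is cast to float, as get_data does before computing indicators.
    columns["volume"] = [float(v) for v in columns["volume"]]
    return columns


if __name__ == "__main__":
    sample = [{"o": 1.0, "c": 2.0, "h": 2.5, "l": 0.5, "v": 100, "t": 0}]
    print(normalize_candles(sample, "POLY"))
```

Keeping the provider differences in one lookup table means adding a provider is a data change rather than another isinstance branch.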

# file path: utils/api/queries.py
def get_data(client: Union[TDA_Client, CB_Client, POLY_CLIENT], features: dict, api: str, symbol: str, normalize=True, save=False, save_path='', log=False, **kwargs):
    if isinstance(client, TDA_Client):
        all_frequencies = {
            'ONE_MIN': client.PriceHistory.Frequency.EVERY_MINUTE,
            'FIVE_MIN': client.PriceHistory.Frequency.EVERY_FIVE_MINUTES,
            'TEN_MIN': client.PriceHistory.Frequency.EVERY_TEN_MINUTES,
            'FIFTEEN_MIN': client.PriceHistory.Frequency.EVERY_FIFTEEN_MINUTES,
            'THIRTY_MIN': client.PriceHistory.Frequency.EVERY_THIRTY_MINUTES,
            'DAILY': client.PriceHistory.Frequency.DAILY,
            'WEEKLY': client.PriceHistory.Frequency.WEEKLY,
            'MONTHLY': client.PriceHistory.Frequency.MONTHLY
        }
        all_frequency_types = {
            'MINUTE': client.PriceHistory.FrequencyType.MINUTE,
            'DAILY': client.PriceHistory.FrequencyType.DAILY,
            'WEEKLY': client.PriceHistory.FrequencyType.WEEKLY,
            'MONTHLY': client.PriceHistory.FrequencyType.MONTHLY
        }
        all_periods = {
            'ONE_YEAR': client.PriceHistory.Period.ONE_YEAR,
            'TWO_YEAR': client.PriceHistory.Period.TWO_YEARS,
            'THREE_YEAR': client.PriceHistory.Period.THREE_YEARS,
            'FIVE_YEAR': client.PriceHistory.Period.FIVE_YEARS,
            'TEN_YEAR': client.PriceHistory.Period.TEN_YEARS,
            'FIFTEEN_YEAR': client.PriceHistory.Period.FIFTEEN_YEARS,
            'YEAR_TO_DATE': client.PriceHistory.Period.YEAR_TO_DATE,
            'ONE_MONTH': client.PriceHistory.Period.ONE_MONTH,
            'TWO_MONTH': client.PriceHistory.Period.TWO_MONTHS,
            'THREE_MONTH': client.PriceHistory.Period.THREE_MONTHS,
            'SIX_MONTH': client.PriceHistory.Period.SIX_MONTHS,
            'ONE_DAY': client.PriceHistory.Period.ONE_DAY,
            'TWO_DAY': client.PriceHistory.Period.TWO_DAYS,
            'THREE_DAY': client.PriceHistory.Period.THREE_DAYS,
            'FOUR_DAY': client.PriceHistory.Period.FOUR_DAYS,
            'FIVE_DAY': client.PriceHistory.Period.FIVE_DAYS
        }
        all_period_types = {
            'DAY': client.PriceHistory.PeriodType.DAY,
            'MONTH': client.PriceHistory.PeriodType.MONTH,
            'YEAR': client.PriceHistory.PeriodType.YEAR,
            'YEAR_TO_DATE': client.PriceHistory.PeriodType.YEAR_TO_DATE
        }
        params = {
            'symbol': symbol,
            'period_type': all_period_types[kwargs['period_type']],
            'period': all_periods[kwargs['period']],
            'frequency_type': all_frequency_types[kwargs['frequency_type']],
            'frequency': all_frequencies[kwargs['frequency']],
            'need_extended_hours_data': 'true'
        }
        payload = { key: val for key, val in params.items() if val is not None }
        data = json.dumps(client.get_price_history(**payload).json(), indent=4)
    else:
        data = client.stocks_equities_aggregates(symbol, **kwargs)
    # Extract features
    if isinstance(client, TDA_Client):
        data = json.loads(data)
        symbol = data['symbol']
    else:
        data = data.__dict__
        symbol = data['ticker']
    if isinstance(client, TDA_Client):
        vars = {
            'open': np.array([ candle['open'] for candle in data['candles'] ]),
            'close': np.array([ candle['close'] for candle in data['candles'] ]),
            'high': np.array([ candle['high'] for candle in data['candles'] ]),
            'low': np.array([ candle['low'] for candle in data['candles'] ]),
            'volume': np.array([ candle['volume'] for candle in data['candles'] ]),
            'datetime': np.array([ pd.to_datetime(candle['datetime'], unit='ms') for candle in data['candles'] ])
        }
    elif isinstance(client, POLY_CLIENT):
        vars = {
            'open': np.array([ candle['o'] for candle in data['results'] ]),
            'close': np.array([ candle['c'] for candle in data['results'] ]),
            'high': np.array([ candle['h'] for candle in data['results'] ]),
            'low': np.array([ candle['l'] for candle in data['results'] ]),
            'volume': np.array([ candle['v'] for candle in data['results'] ]),
            'datetime': np.array([ pd.to_datetime(candle['t'], unit='ms') for candle in data['results'] ])
        }
    start_date = vars['datetime'][0]
    end_date = vars['datetime'][-1]
    vars['volume'] = vars['volume'].astype(float)
    for func, params in features.items():
        if func in ta.get_functions():
            if func == 'BBANDS':
                upperband, middleband, lowerband = ta.BBANDS(vars['close'], **params)
                vars['UPPER_BBAND'] = upperband
                vars['MIDDLE_BBAND'] = middleband
                vars['LOWER_BBAND'] = lowerband
            elif func == 'DEMA':
                vars['DEMA'] = ta.DEMA(vars['close'], **params)
            elif func == 'EMA':
                vars['EMA'] = ta.EMA(vars['close'], **params)
            elif func == 'HT_TRENDLINE':
                vars['HT_TRENDLINE'] = ta.HT_TRENDLINE(vars['close'], **params)
            elif func == 'KAMA':
                vars['KAMA'] = ta.KAMA(vars['close'], **params)
            elif func == 'MA':
                vars['MA'] = ta.MA(vars['close'], **params)
            elif func == 'MAMA':
                vars['MAMA'], vars['FAMA'] = ta.MAMA(vars['close'], **params)
            elif func == 'MIDPOINT':
                vars['MIDPOINT'] = ta.MIDPOINT(vars['close'], **params)
            elif func == 'MIDPRICE':
                vars['MIDPRICE'] = ta.MIDPRICE(vars['high'], vars['low'], **params)
            elif func == 'SAR':
                vars['SAR'] = ta.SAR(vars['high'], vars['low'], **params)
            elif func == 'SAREXT':
                vars['SAREXT'] = ta.SAREXT(vars['high'], vars['low'], **params)
            elif func == 'SMA':
                vars['SMA'] = ta.SMA(vars['close'], **params)
            elif func == 'T3':
                vars['T3'] = ta.T3(vars['close'], **params)
            elif func == 'TEMA':
                vars['TEMA'] = ta.TEMA(vars['close'], **params)
            elif func == 'TRIMA':
                vars['TRIMA'] = ta.TRIMA(vars['close'], **params)
            elif func == 'WMA':
                vars['WMA'] = ta.WMA(vars['close'], **params)
            elif func == 'ADX':
                vars['ADX'] = ta.ADX(vars['high'], vars['low'], vars['close'], **params)
            elif func == 'ADXR':
                vars['ADXR'] = ta.ADXR(vars['high'], vars['low'], vars['close'], **params)
            elif func == 'APO':
                vars['APO'] = ta.APO(vars['close'], **params)
            elif func == 'AROON':
                vars['AROONDOWN'], vars['AROONUP'] = getattr(ta, 'AROON')(vars['high'], vars['low'], **params)
            elif func == 'AROONOSC':
                vars['AROONOSC'] = getattr(ta, 'AROONOSC')(vars['high'], vars['low'], **params)
            elif func == 'BOP':
                vars['BOP'] = getattr(ta, 'BOP')(vars['open'], vars['high'], vars['low'], vars['close'], **params)
            elif func == 'CCI':
                vars['CCI'] = getattr(ta, 'CCI')(vars['high'], vars['low'], vars['close'], **params)
            elif func == 'CMO':
                vars['CMO'] = getattr(ta, 'CMO')(vars['close'], **params)
            elif func == 'DX':
                vars['DX'] = getattr(ta, 'DX')(vars['high'], vars['low'], vars['close'], **params)
            elif func == 'MACD':
                vars['MACD'], vars['MACD_SIGNAL'], vars['MACD_HIST'] = getattr(ta, 'MACD')(vars['close'], **params)
            elif func == 'MACDEXT':
                vars['MACDEXT'], vars['MACDEXT_SIGNAL'], vars['MACDEXT_EXT'] = getattr(ta, 'MACDEXT')(vars['close'], **params)
            elif func == 'MACDFIX':
                vars['MACDFIX'], vars['MACDFIX_SIGNAL'], vars['MACDFIX_EXT'] = getattr(ta, 'MACDFIX')(vars['close'], **params)
            elif func == 'MFI':
                vars['MFI'] = getattr(ta, 'MFI')(vars['high'], vars['low'], vars['close'], vars['volume'], **params)
            elif func == 'MINUS_DI':
                vars['MINUS_DI'] = getattr(ta, 'MINUS_DI')(vars['high'], vars['low'], vars['close'], **params)
            elif func == 'MINUS_DM':
                vars['MINUS_DM'] = getattr(ta, 'MINUS_DM')(vars['high'], vars['low'], **params)
            elif func == 'MOM':
                vars['MOM'] = getattr(ta, 'MOM')(vars['close'], **params)
            elif func == 'PLUS_DI':
                vars['PLUS_DI'] = getattr(ta, 'PLUS_DI')(vars['high'], vars['low'], vars['close'], **params)
            elif func == 'PLUS_DM':
                vars['PLUS_DM'] = getattr(ta, 'PLUS_DM')(vars['high'], vars['low'], **params)
            elif func == 'PPO':
                vars['PPO'] = getattr(ta, 'PPO')(vars['close'], **params)
            elif func == 'ROC':
                vars['ROC'] = getattr(ta, 'ROC')(vars['close'], **params)
            elif func == 'ROCP':
                vars['ROCP'] = getattr(ta, 'ROCP')(vars['close'], **params)
            elif func == 'ROCR100':
                vars['ROCR100'] = getattr(ta, 'ROCR100')(vars['close'], **params)
            elif func == 'RSI':
                vars['RSI'] = getattr(ta, 'RSI')(vars['close'], **params)
            elif func == 'STOCH':
                vars['STOCH_SLOWK'], vars['STOCH_SLOWD'] = getattr(ta, 'STOCH')(vars['high'], vars['low'], vars['close'], **params)
            elif func == 'STOCHF':
                vars['STOCH_FASTK'], vars['STOCH_FASTD'] = getattr(ta, 'STOCHF')(vars['high'], vars['low'], vars['close'], **params)
            elif func == 'STOCHRSI':
                vars['STOCHRSI_FASTK'], vars['STOCHRSI_FASTD'] = getattr(ta, 'STOCHRSI')(vars['close'], **params)
            elif func == 'TRIX':
                vars['TRIX'] = getattr(ta, 'TRIX')(vars['close'], **params)
            elif func == 'ULTOSC':

get_data is the data-layer helper that StockDataGenerator and the test harness call to fetch a raw time series for a symbol and compute the requested technical features before handing the result back to the dataset pipeline used by the rest of marketbot-main_cleaned. It begins by checking the client type. Given a TD Ameritrade client, it translates human-friendly period and frequency strings into the client's enum values, assembles a parameter map (including a flag that requests extended-hours data), drops any None entries so the payload contains only set parameters, and then calls the TDA price-history endpoint and captures the JSON response. For any other client it calls the client's stocks_equities_aggregates method to obtain the equivalent series. Next the function normalizes the container format: for TDA it parses the JSON and reads the symbol key, and for the other path it uses the response object's __dict__ and its ticker field. It then builds a vars dictionary of numpy arrays for open, close, high, low, volume, and datetime, choosing field names by client (TDA candles use full names; Polygon results use single-letter keys). It computes the start and end timestamps from the first and last datetime entries, casts volume to float, and loops over the requested features, passing each name that TA-Lib recognizes to the matching indicator function and storing the outputs in vars. The listing ends partway through the indicator branches.
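The indicator branches in get_data all follow one pattern: pick input series, call a function with the user's parameters, and store one or more outputs under fixed names. One possible alternative to the long elif chain is a lookup table, sketched below. This is not the project's implementation. The sma and midprice functions are pure-Python stand-ins so the sketch runs without TA-Lib, and INDICATORS and apply_features are hypothetical names; with TA-Lib installed, the table would presumably point at the library functions of the same name instead.

```python
def sma(close, timeperiod=3):
    """Stand-in simple moving average (None until the window is full)."""
    out = [None] * len(close)
    for i in range(timeperiod - 1, len(close)):
        window = close[i - timeperiod + 1 : i + 1]
        out[i] = sum(window) / timeperiod
    return out


def midprice(high, low, timeperiod=2):
    """Stand-in midpoint of the highest high and lowest low in the window."""
    out = [None] * len(high)
    for i in range(timeperiod - 1, len(high)):
        hi = max(high[i - timeperiod + 1 : i + 1])
        lo = min(low[i - timeperiod + 1 : i + 1])
        out[i] = (hi + lo) / 2
    return out


# name -> (function, input series names, output column names)
INDICATORS = {
    "SMA": (sma, ("close",), ("SMA",)),
    "MIDPRICE": (midprice, ("high", "low"), ("MIDPRICE",)),
}


def apply_features(series: dict, features: dict) -> dict:
    """Return a copy of series with one column per requested indicator output."""
    out = dict(series)
    for name, params in features.items():
        if name not in INDICATORS:
            continue  # names without a table entry are skipped
        func, inputs, outputs = INDICATORS[name]
        result = func(*(series[key] for key in inputs), **params)
        if len(outputs) == 1:
            result = (result,)  # treat single and multi-output functions alike
        for column, values in zip(outputs, result):
            out[column] = values
    return out
```

With this shape, supporting another indicator means adding one table row, and the output column names live next to the function they belong to.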

# file path: database/dataset.py
    def _process_dataset(self, normalization=True, pc=False):
        if self.method == 'classification':
            # Label each bar by the sign of its close-to-close return.
            self.data['close_class'] = self.data['close'].pct_change()
            self.data['close_class'] = self.data['close_class'].apply(lambda x: -1 if x < 0 else 1)
            self.target = 'close_class'
        if self.verbose >= 1: print('Normalizing data...')
        if pc:
            self.data['percentChangeOC'] = self.data['close'] / self.data['open'] - 1
            if self.verbose >= 1 and self.lookback > 0: print('Generating lookback features...')
            for i in range(1, self.lookback + 1):
                # Lag i holds the open-to-close change from i bars earlier.
                self.data[f'percentChange-{i}'] = self.data['percentChangeOC'].shift(i)
        if self.verbose >= 1: print('Scaling data...')
        self.data = pd.DataFrame(scale(X=self.data), index=self.data.index, columns=self.data.columns)

As part of the dataset generator layer, StockDataGenerator._process_dataset takes the raw price table that StockDataGenerator.__init__ populated and turns it into the features and target column that _train_test_split and the training pipelines expect. It first checks the generator mode: when method is 'classification' it creates a target column named close_class from the sign of the close-to-close return, giving a binary up/down label, and points self.target at that column so subsequent splitting and modeling use it. Progress messages are printed when verbose is at least 1. If the percent-change option (pc) is enabled, it derives a per-bar percent change between open and close and, when lookback is positive, adds one lagged copy of that series per step of the lookback horizon so the model sees recent dynamics as separate inputs. Because __init__ calls _process_dataset without arguments, pc keeps its default of False on the constructor path. After feature construction the whole DataFrame is standardized with sklearn's scale, and self.data is replaced by the scaled DataFrame with the original index and column names preserved. Every column is scaled, including the target, and the normalization argument is not consulted. Control flow therefore has three parts: the classification-target branch, the optional percent-change and lookback branch, and the scaling step that always runs. Once complete, the processed self.data and target name are ready for _train_test_split.
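The label and lag construction described above can be sketched in isolation with pandas. The helper names add_lagged_returns and up_down_labels are hypothetical, and the sketch assumes that lag i should hold the value from i bars earlier; it is an illustration under those assumptions, not the project's code.

```python
import pandas as pd


def add_lagged_returns(df: pd.DataFrame, lookback: int) -> pd.DataFrame:
    """Add the open-to-close change and `lookback` lagged copies of it."""
    out = df.copy()  # leave the caller's frame untouched
    out["percentChangeOC"] = out["close"] / out["open"] - 1
    for i in range(1, lookback + 1):
        # shift(i) moves values down i rows, so row t sees the value from t - i.
        out[f"percentChange-{i}"] = out["percentChangeOC"].shift(i)
    return out


def up_down_labels(close: pd.Series) -> pd.Series:
    """Return -1 where the close fell versus the previous bar, else 1."""
    returns = close.pct_change()
    # The first return is NaN; NaN < 0 is False, so the first label is 1.
    return returns.apply(lambda r: -1 if r < 0 else 1)


if __name__ == "__main__":
    frame = pd.DataFrame({"open": [10.0, 10.0, 10.0], "close": [11.0, 9.0, 10.0]})
    print(add_lagged_returns(frame, lookback=2))
    print(up_down_labels(frame["close"]).tolist())
```

The first `lookback` rows contain NaN in the lag columns, so a caller would typically drop or mask those rows before training.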

# file path: database/dataset.py
    def _train_test_split(self, test_percent=0.15):
        self.X = self.data.loc[:, self.data.columns != self.target]
        self.y = self.data.loc[:, self.data.columns == self.target]
        test_size = int(len(self.data) * test_percent)
        # Train on the leading rows and hold out the trailing rows,
        # so the two sets never overlap.
        X_train = self.X.head(len(self.X) - test_size)
        y_train = self.y.head(len(self.y) - test_size)
        X_test = self.X.tail(test_size)
        y_test = self.y.tail(test_size)
        self.X_train = X_train.to_numpy()
        self.y_train = y_train.to_numpy()
        self.X_test = X_test.to_numpy()
        self.y_test = y_test.to_numpy()

StockDataGenerator._train_test_split takes the processed DataFrame that StockDataGenerator.__init__ produced and turns it into the feature matrix and label vector the training layer expects, then slices those into train and test sets. It first stores on the instance a features DataFrame containing every column except the configured target and a labels DataFrame containing only the target column. It computes the number of test rows by multiplying the total row count by test_percent (default 0.15) and truncating to an integer. The training portion is the leading block of rows, up to the total minus the test count, and the test portion is the trailing block of test-count rows, so the held-out rows come after the training rows and do not overlap them. Finally, it converts all four pandas objects to NumPy arrays and assigns them to X_train, y_train, X_test, and y_test, leaving X and y on the instance as DataFrames. Because y is selected as a one-column DataFrame, the label arrays are two-dimensional with a single column. The intent is to present NumPy arrays ready for the modeling layer (for example the LongShortTermMemory runner used in tests) while keeping the DataFrame versions available for inspection or plotting.
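A minimal sketch of a chronological split on plain arrays, assuming the rows are already sorted by time. chronological_split is a hypothetical helper, not part of the project; it holds out the trailing rows without shuffling, which is the usual choice for time series because a shuffled split would let the model train on rows that come after its test rows.

```python
import numpy as np


def chronological_split(X, y, test_percent=0.15):
    """Hold out the trailing rows as the test set; no shuffling."""
    if not 0 <= test_percent < 1:
        raise ValueError("test_percent must be in [0, 1)")
    test_size = int(len(X) * test_percent)
    split = len(X) - test_size
    # Everything before `split` is training data, everything after is test data.
    return X[:split], y[:split], X[split:], y[split:]


if __name__ == "__main__":
    X = np.arange(20).reshape(10, 2)
    y = np.arange(10)
    X_train, y_train, X_test, y_test = chronological_split(X, y, test_percent=0.2)
    print(X_train.shape, X_test.shape)
```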

# file path: database/dataset.py
    def __init__(self, symbol=None, api='', data_path='', features=None,
                 target='close', num_steps=30, test_ratio=0.15, normalized=True,
                 close_price_only=True, verbose=0, period: Any=None,
                 period_type: Any=None, frequency: Any=None, frequency_type: Any=None,
                 save=False, lookback=0, up_down=False, method='regression', **kwargs):
        self.symbol = symbol
        self.num_steps = num_steps
        self.test_ratio = test_ratio
        self.close_price_only = close_price_only
        self.normalized = normalized
        self.client = None
        # Use a fresh dict per instance instead of a shared mutable default.
        self.features = features if features is not None else {}
        self.api = api
        self.period = period
        self.period_type = period_type
        self.frequency = frequency
        self.frequency_type = frequency_type
        self.save = save
        self.verbose = verbose
        self.lookback = lookback
        self.method = method
        if data_path == '':
            # No local file: connect to the provider and fetch the series.
            if self.api in ('TDA', 'POLY'):
                self.client = client_connect(self.api, 'private/creds.ini')
            if self.api == 'TDA':
                if self.save:
                    # With save=True, get_data also returns the saved file path.
                    self.data, self.data_path = get_data(
                        client=self.client, api=self.api, features=self.features, symbol=self.symbol,
                        save=self.save, period=self.period, period_type=self.period_type, frequency=self.frequency,
                        frequency_type=self.frequency_type
                    )
                else:
                    self.data = get_data(
                        client=self.client, api=self.api, features=self.features, symbol=self.symbol,
                        save=self.save, period=self.period, period_type=self.period_type, frequency=self.frequency,
                        frequency_type=self.frequency_type
                    )
            elif self.api == 'POLY':
                self.data = get_data(
                    client=self.client, api=self.api,
                    features=self.features, symbol=self.symbol,
                    save=self.save, **kwargs
                )
        else:
            # Local file: read the CSV and name the dataset after the file.
            self.data = pd.read_csv(data_path, index_col='datetime')
            self.symbol = os.path.basename(data_path)
        self.num_features = len(self.data.columns) - 1
        self.target = target
        self._process_dataset()
        # Honor the configured ratio rather than the method's default.
        self._train_test_split(test_percent=self.test_ratio)

StockDataGenerator.__init__ captures the constructor arguments as instance attributes (symbol, num_steps, test_ratio, normalized, close_price_only, features, api, the period and frequency settings, save, verbose, lookback, and method) and then resolves where the raw series comes from. If no data_path is given, it obtains an API client for the TDA or POLY provider through client_connect and fetches the raw series with get_data. The TDA path passes the period and frequency settings explicitly and, when save is set, unpacks a (data, path) pair instead of a single DataFrame; the POLY path forwards any extra keyword arguments to get_data. If a data_path is given, the constructor instead reads the CSV into a DataFrame indexed by its datetime column and sets symbol to the file's base name. With the raw DataFrame in self.data, it computes num_features as the column count minus one, stores the target column name, and runs _process_dataset followed by _train_test_split. Because num_features is computed before _process_dataset runs, it reflects the raw columns and does not count any columns added during processing. The initializer therefore wires external connectivity and raw-data acquisition into the preprocessing pipeline and leaves the instance with self.data, the feature count, and ready-to-train X and y arrays. It is the entry point the test suite and higher-level training code use to obtain a dataset, after which info and get_num_features are available for inspection.

# file path: database/dataset.py
    def info(self):
        # Attribute names match those set in _train_test_split.
        return "StockDataSet [%s] train: %d test: %d" % (
            self.symbol, len(self.X_train), len(self.y_test)
        )

StockDataGenerator.info produces a short, human-readable status line that identifies the symbol the generator was built for and reports the sizes of the training and test sets. It reads the symbol attribute that StockDataGenerator.__init__ captured and the train and test arrays populated by _process_dataset followed by _train_test_split, takes their lengths, and returns a formatted string so callers and the test harness can quickly see how many samples are available for training and evaluation. There is no branching or data transformation here; it is a lightweight reporting helper for logging and diagnostics within the dataset generator layer.

# file path: database/dataset.py
    def get_num_features(self): return self.num_features

get_num_features is a simple accessor on StockDataGenerator that returns the stored num_features attribute. Its role within the dataset-generator layer is purely informational: after StockDataGenerator.__init__ has resolved the data source, run get_data, and had _process_dataset and _train_test_split populate the internal data structures, higher-level training pipelines and the test harness call get_num_features to learn the input dimensionality so they can size model input layers, configure pipelines, and validate expectations. The method has no side effects or transformation logic; it only exposes the stored value so other components in the modeling stack can adapt to the dataset the generator produced.

# file path: database/dataset.py
    def _generate_window(self):
        pass

StockDataGenerator._generate_window is declared but has no body yet, and nothing in the listed code calls it. Its intended role is to convert the processed DataFrame from _process_dataset into the time-series sequences and labels that the rest of the training stack expects, following the sliding-window pattern: using the window length (self.num_steps) and any lookback offset (self.lookback), walk the DataFrame and, for each valid position, slice out a contiguous block of rows as the input sequence and take the corresponding future value as the label. In that design the instance configuration would control which columns enter each window (only the close series when close_price_only is set, otherwise the configured feature set), normalized would enable per-window scaling, and method would decide between a numeric next-step target for regression and a discrete up/down label for classification. Partial windows at the boundaries would be discarded, and an input shorter than one window would yield no windows. The output would be a feature tensor of shape (num_windows, num_steps, num_features) with a matching label vector, feeding _train_test_split and the training pipelines, with fit orchestrating the call.
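Since the method body is empty, the following is only a sketch of what a sliding-window generator of this kind could look like, assuming next-step regression labels and no per-window scaling. make_windows is a hypothetical standalone function, not the project's method.

```python
import numpy as np


def make_windows(features, target, num_steps):
    """Build overlapping windows and next-step labels.

    features: array of shape (n_rows, n_features)
    target:   array of shape (n_rows,)
    Returns X with shape (n_windows, num_steps, n_features) and y with
    shape (n_windows,), where y[i] is the target on the row right after
    window i.
    """
    features = np.asarray(features, dtype=float)
    target = np.asarray(target, dtype=float)
    n_windows = len(features) - num_steps
    if n_windows <= 0:
        # Too few rows for even one full window plus its label.
        return np.empty((0, num_steps, features.shape[1])), np.empty((0,))
    X = np.stack([features[i : i + num_steps] for i in range(n_windows)])
    y = target[num_steps:]
    return X, y


if __name__ == "__main__":
    feats = np.arange(12).reshape(6, 2)
    tgt = np.arange(6) * 10
    X, y = make_windows(feats, tgt, num_steps=3)
    print(X.shape, y)
```

Six rows with num_steps of 3 give three windows, because the last window must still leave one row after it to serve as the label.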

# file path: database/dataset.py
    def plot_features(self, features: list):
        for feat in features:
            feat = self.data
        figures = []
        for i in self.data.columns.values:
            pass

StockDataGenerator.plot_features is the dataset generator's visualization hook. It is meant to take the populated time series table (self.data, which StockDataGenerator.__init__ obtains via get_data and client_connect and which _process_dataset transforms) and produce per-feature plots for inspecting feature construction and normalization before model training. In the current implementation the first loop rebinds its loop variable to the full DataFrame on each pass and has no other effect, an empty figures list is created, and the loop over the DataFrame's column names has an empty body. No plot objects are produced, and the method returns None without mutating the dataset. Its role in the overall marketbot-main_cleaned flow is therefore to provide visual diagnostics for the feature pipeline that feeds the modeling layer, even though the present stub does not yet construct them.

# file path: database/dataset.py
    def fit():
        pass

StockDataGenerator.fit is intended to orchestrate the conversion of the raw time series already loaded by __init__ into the train and test arrays the modeling layer expects, by sequencing the dataset-layer helpers you've seen. In the snippet above it is still a stub: the body is a bare pass, and the signature omits self, so calling it on an instance would raise a TypeError until that is corrected. The intended flow, as implied by the surrounding helpers and configuration, is as follows. fit first validates that the series is long enough for the configured num_steps and lookback and that the method and up_down settings are consistent, then hands the raw DataFrame to _process_dataset to compute the target column and any derived features. If normalization is requested, the scaler is computed over the training-range features so the downstream model receives normalized inputs. fit then calls _generate_window to turn the ordered feature frame into overlapping windows of length num_steps (taking lookback into account and encoding method-specific targets such as binary up/down labels versus continuous regression targets) and invokes _train_test_split to slice those windows into train and test sets according to test_ratio. Finally it would store the resulting arrays and metadata on the instance (for example train_X, train_y, test_X, test_y, num_features and any fitted scaler), persist them when save is true, and emit status messages if verbose output is requested. The method is not expected to return a value; its role is to populate the StockDataGenerator instance so downstream training pipelines and the test suite can read the prepared datasets.
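
Two of the steps attributed to fit, the length check and the ratio-based split, can be illustrated on their own. The sketch below is an assumption about how such helpers might look, not code from the repository: check_length and chronological_split are hypothetical names, and the way lookback enters the minimum length is a guess.

```python
from typing import Sequence, Tuple


def check_length(num_rows: int, num_steps: int, lookback: int = 0) -> None:
    """Raise if the series cannot yield at least one window plus its label.

    Assumption: lookback simply adds to the number of rows required.
    """
    needed = num_steps + lookback + 1
    if num_rows < needed:
        raise ValueError(f"need at least {needed} rows, got {num_rows}")


def chronological_split(
    X: Sequence, y: Sequence, test_ratio: float
) -> Tuple[list, list, list, list]:
    """Keep the earliest windows for training and the latest for testing.

    No shuffling is done, so the test set always lies after the training set in time.
    """
    if not 0.0 < test_ratio < 1.0:
        raise ValueError("test_ratio must be strictly between 0 and 1")
    cut = int(len(X) * (1.0 - test_ratio))
    return list(X[:cut]), list(y[:cut]), list(X[cut:]), list(y[cut:])
```

Splitting by position rather than at random matters for time series: a shuffled split would let windows from the future appear in the training set.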

# file path: dependencies.yaml
name: marketbot
dependencies:
- pip
- python=3.9
- pandas
- tensorflow
- pytest
- transformers
- configparser
- xgboost
- tqdm
- scipy
- numpy
- matplotlib
- flask
- joblib
- pip:
  - polygon-api-client
  - sklearn
  - ta-lib
  - shap
  - tda-api
  - coinbase
  - fredapi
  - scikeras
  - rtgym
  - omegaconf
  - hydra-zen
  - pandas-market-calendars
  - optuna

dependencies.yaml declares the conda environment that the rest of the pipeline expects, so CI, Docker, and local setups can build from the same package list. Only the Python minor version is pinned (3.9); every other entry is unversioned, so the file fixes which packages are installed but not their exact versions, and two builds made at different times can resolve differently. The core numeric and visualization packages (pandas, numpy, scipy, matplotlib, tqdm) underpin StockDataGenerator and the preprocessing routines we already discussed. The machine learning entries (tensorflow, xgboost, scikeras and sklearn) allow both the TensorFlow LongShortTermMemory implementation and sklearn-style pipelines to be installed and orchestrated by the parameter-searchable pipeline layer. Note that sklearn in the pip section is the deprecated PyPI alias for scikit-learn; current installs should name scikit-learn directly. The file also includes the data clients polygon-api-client, tda-api, coinbase and fredapi, which are the runtime dependencies for get_data and the API connection helpers you saw in the imports, plus ta-lib, which StockDataGenerator._process_dataset expects when it constructs technical-analysis features. Experiment and serving tooling rounds out the list: optuna for hyperparameter search, hydra-zen and omegaconf for configuration management, joblib for model persistence, shap for explainability, pandas-market-calendars for market-aware time indexing, and flask for the lightweight deployment endpoints.
The file separates conda-managed packages from a pip subsection for packages distributed primarily on PyPI. The platform-specific dependencies_mac.yaml follows the same pattern and differs in its channel declarations, a slightly newer Python pin, and mac-native TensorFlow packaging (tensorflow-macos and tensorflow-metal installed via pip, with tensorflow-deps on the conda side). These differences show how the project keeps a common dependency surface while adapting to OS-specific distribution constraints. dependencies.yaml is therefore the reference package list for the components you walked through earlier: imports, get_data, StockDataGenerator, the train/test split and the TensorFlow model.
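
Since reproducibility depends on which entries carry a version constraint, a small check can list the ones that do not. The helper below is a hypothetical sketch: it works on an already-parsed dependency list (the sample is a shortened excerpt of the file above, written out by hand rather than loaded from disk) and treats any entry without a comparison operator as unpinned.

```python
from typing import Iterable, List, Union

Dep = Union[str, dict]


def unpinned(deps: Iterable[Dep]) -> List[str]:
    """Return dependency entries that carry no version constraint."""
    loose: List[str] = []
    for dep in deps:
        if isinstance(dep, dict):
            # The nested {"pip": [...]} block of a conda environment file.
            for sub in dep.values():
                loose.extend(unpinned(sub))
        elif not any(op in dep for op in ("=", "<", ">", "~")):
            loose.append(dep)
    return loose


# Shortened excerpt of dependencies.yaml, typed in as a Python structure.
sample = ["pip", "python=3.9", "pandas", {"pip": ["polygon-api-client", "optuna"]}]
print(unpinned(sample))  # ['pip', 'pandas', 'polygon-api-client', 'optuna']
```

Only python=3.9 is filtered out, which matches the observation that Python is the single pinned entry.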

# file path: dependencies_mac.yaml
name: marketbot
channels: 
  - conda-forge
  - defaults
  - apple
dependencies:
- pip
- python=3.10
- pandas
- pytest
- configparser
- xgboost
- tqdm
- tensorflow-deps
- flask
- pip:
  - tensorflow-macos
  - tensorflow-metal
  - scipy
  - numpy
  - polygon-api-client
  - sklearn
  - ta-lib
  - shap
  - tda-api
  - coinbase
  - fredapi
  - scikeras
  - rtgym
  - omegaconf
  - hydra-zen
  - joblib
  - pandas-market-calendars
  - optuna

dependencies_mac.yaml declares the macOS environment for running marketbot-main_cleaned locally. It names the environment, selects conda channels (including the apple channel for macOS-specific binaries), pins Python 3.10, and lists both conda-installed and pip-installed packages. As in the general file, only Python is version-pinned, so the file reproduces the package set rather than exact versions. The environment supplies the tooling used by get_data and StockDataGenerator (and the downstream training code) to fetch market series (polygon-api-client, tda-api, coinbase, fredapi), manipulate and window them (pandas, numpy, scipy, ta-lib, pandas-market-calendars, tqdm, joblib), and run model training, hyperparameter search and explainability (xgboost, scikeras, sklearn, optuna, shap). For TensorFlow it combines the conda meta-package tensorflow-deps with pip installs of tensorflow-macos and tensorflow-metal, so the LSTM implementation and any other tf-based pipelines can use GPU acceleration on Apple Silicon. flask, omegaconf, hydra-zen and rtgym cover local serving and experiments, and pytest supports the test harness. Compared with dependencies.yaml, this file moves Python to 3.10, adds the channels block, replaces the generic tensorflow entry with the macOS packages, moves scipy, numpy and joblib into the pip section, and omits transformers and matplotlib.

# file path: docker-compose.yaml
version: '0.1'
services:
  db:
    image: timescale/timescaledb:latest-pg10
    container_name: marketbot-db
    environment:
      - DATABASE_URL=postgresql://postgres:postgres@localhost:5432/postgres
      - POSTGRES_HOST_AUTH_METHOD=trust
    volumes:
      - db-data:/var/lib/postgresql/data
  frontend:
    image: node:alpine
    container_name: marketbot-frontend
    command: touch package.json && npm start
    volumes:
      - ./frontend:/app
    ports:
      - 8080:8080
    depends_on:
      - backend
  backend:
    image: jacksteussie/marketbot:latest
    container_name: marketbot-backend
    entrypoint: conda init bash; conda activate marketbot; python main.py
    ports:
      - 5002:5002
    depends_on:
      - db
  proxy:
    build: nginx
    restart: always
    ports:
      - 80:80
    depends_on:
      - frontend
      - backend
      - db
volumes:
  db-data:
    driver: local
  backend-data:
    driver: local

docker-compose.yaml declares the multi-container runtime for marketbot-main_cleaned, wiring together the persistent time-series store, the backend service that runs the research and training workflows, the frontend UI, and an nginx proxy so the whole stack can be started with one command. The db service uses a TimescaleDB image (Postgres with time-series extensions), sets connection configuration through environment variables, and mounts the named volume db-data to persist the database files between runs. The frontend service runs a lightweight Node image, mounts the local frontend folder into the container so UI code can be edited during development, and maps port 8080 to the host. The backend service uses a prebuilt marketbot image, boots by initializing and activating the conda environment and running main.py, and publishes port 5002. The proxy service is built from the nginx directory in the repository, is set to always restart, and publishes port 80. The depends_on entries order startup: db before backend, backend before frontend, and all three before proxy. Note that depends_on in this form only waits for a container to start, not for the service inside it to be ready, so the backend may still need to retry its first database connection. Two named volumes are declared with the local driver: db-data holds the Postgres data, while backend-data is declared but not mounted by any service in this file. At runtime the backend is the central execution node: it calls the data-layer helpers you've already seen (get_data and the StockDataGenerator lifecycle, including _process_dataset and _train_test_split) to fetch and prepare series, and can optionally persist or read time series from TimescaleDB.
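
Because start order alone does not guarantee that Postgres is accepting connections, a backend entrypoint often polls the database port before doing real work. The following is a minimal sketch using only the standard library; wait_for_port is a hypothetical helper and is not part of main.py as far as the article shows.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 30.0, interval: float = 0.5) -> bool:
    """Poll until a TCP connection to host:port succeeds or the timeout elapses.

    Returns True on the first successful connection, False if the deadline passes.
    """
    deadline = time.monotonic() + timeout
    while True:
        try:
            # create_connection raises OSError (refused, unreachable, timed out) on failure.
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)
```

Inside the compose network the backend could call wait_for_port("db", 5432) before opening its first connection, since db is the service name and 5432 is the port used in DATABASE_URL. An open port only shows that something is listening; a real readiness check would also run a trivial query.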

# file path: models/ensemble.py
from sklearn.ensemble import BaggingRegressor, VotingRegressor
import xgboost as xgb
import tensorflow as tf
from tensorflow import keras

EnsembleModel sits in the modeling layer and is meant to orchestrate heterogeneous learners and aggregation strategies; the imports here provide those building blocks, although the class body shown next does not use them yet. BaggingRegressor and VotingRegressor are scikit-learn ensemble wrappers that implement bagging-style resampling and prediction averaging for estimators that follow the sklearn API. Importing xgboost as xgb exposes gradient-boosted trees so tree-based regressors can serve as base learners. tensorflow and tensorflow.keras make Keras models available so LSTM or other neural network members could be trained, called for predictions, and serialized inside the ensemble. Compared with other modules you've seen that imported BaseEstimator or XGBRegressor directly, these imports emphasize composition: they would let EnsembleModel mix sklearn-style estimators, XGBoost models, and Keras models in a single prediction pipeline. One practical caveat is that VotingRegressor and BaggingRegressor expect sklearn-compatible estimators, so a raw Keras model would need a wrapper such as scikeras (listed in the dependency files) before it can be used as a member.

# file path: models/ensemble.py
class EnsembleModel(object):
    def __init__():
        return None

EnsembleModel is meant to be the modeling-layer wrapper that orchestrates multiple base learners and exposes a single unified model to the rest of the pipeline. At initialization it should capture the ensemble members, an aggregation or weighting strategy, and any shared configuration, so that the feature matrices produced by StockDataGenerator._train_test_split can be handed to one fit/predict interface and the ensemble can surface tunable hyperparameters during parameter search or deployment. In the current file none of that exists. EnsembleModel.__init__ is declared without a self parameter and contains only return None, so it stores no members, weights, or configuration, and instantiating EnsembleModel() would raise a TypeError because Python passes the new instance as the first argument. This contrasts with ClfSwitcher.__init__, which stores an estimator for later use. The surrounding imports for sklearn ensembles, xgboost and TensorFlow indicate the kinds of base learners the class is expected to wrap, following a composite/strategy-style design, but the initializer does not perform that wiring yet.

# file path: models/ensemble.py
    def __init__():
        return None

EnsembleModel.__init__ is currently a placeholder: it accepts no configuration, sets up no ensemble members, aggregation strategy, or shared hyperparameters, and attaches no attributes to the instance. Because the signature also lacks self, the class cannot be instantiated as written. For the modeling-layer role of EnsembleModel, the initializer would need to hold a collection of base learners and a weighting or voting strategy so that a unified fit/predict interface can accept the feature matrices produced by StockDataGenerator._train_test_split. Its minimalism mirrors the constructor pattern seen in Strategy.__init__, and the sklearn, xgboost, and TensorFlow/Keras imports show which integrations are planned.
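
To make the intended role concrete, here is a minimal sketch of an initializer that actually stores members and weights, together with a unified fit/predict pair. WeightedEnsemble is a hypothetical stand-in, not the repository's EnsembleModel. It assumes every member exposes fit(X, y) and predict(X) and that predict returns one scalar per sample; a Keras model returning a 2-D array would need flattening first.

```python
from typing import List, Optional, Sequence


class WeightedEnsemble:
    """Hypothetical stand-in for EnsembleModel: weighted average of member predictions."""

    def __init__(self, members: Sequence, weights: Optional[Sequence[float]] = None):
        if not members:
            raise ValueError("at least one member is required")
        raw = list(weights) if weights is not None else [1.0] * len(members)
        if len(raw) != len(members):
            raise ValueError("weights and members must have the same length")
        total = sum(raw)
        if total <= 0:
            raise ValueError("weights must sum to a positive number")
        self.members = list(members)
        self.weights = [w / total for w in raw]  # normalized so they sum to 1

    def fit(self, X, y) -> "WeightedEnsemble":
        # Train every member on the same data; members are duck-typed.
        for member in self.members:
            member.fit(X, y)
        return self

    def predict(self, X) -> List[float]:
        # Collect one prediction list per member, then average sample by sample.
        preds = [list(member.predict(X)) for member in self.members]
        n = len(preds[0])
        return [sum(w * p[i] for w, p in zip(self.weights, preds)) for i in range(n)]
```

Keeping the weights on the instance is what would later let a parameter search treat them as tunable hyperparameters.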

# file path: models/metrics.py
import numpy as np
import tensorflow as tf
from tensorflow.keras.metrics import Metric

The file imports NumPy, TensorFlow, and the Metric base class from TensorFlow's Keras metrics module so the portfolio statistics can be implemented as stateful, Keras-compatible metrics. NumPy is available for conventional array math and CPU-side summary calculations, although the classes shown below do not call it. TensorFlow supplies the tensor primitives and runtime integration so these metrics can participate in model training and evaluation. tensorflow.keras.metrics.Metric is the superclass that defines the update_state/result/reset_state API and lets subclasses register persistent state through add_weight (the same pattern you saw in CapitalAssetPricingModel.__init__). The file therefore imports a much smaller TensorFlow surface than the LSTM module: instead of defining a network, the classes here follow the custom-metric pattern so they can sit alongside the built-in MeanAbsoluteError/RMSE used by LongShortTermMemoryR.metrics while adding domain-specific logic (Sharpe, CAPM, Calmar).

# file path: models/metrics.py
class SharpeRatio(Metric):
    def __init__(self, name='SharpeRatio', **kwargs):
        super(SharpeRatio, self).__init__(name=name, **kwargs)
        self.add_weight(name='portfolioReturn', initializer='zeros')
        self.add_weight(name='riskFreeRate', initializer='zeros')
        self.add_weight(name='portfolioVolatility', initializer='zeros')
    def update_state(self, y_true, y_pred, sample_weight=None, *args, **kwargs):
        return super().update_state(*args, **kwargs)
    def result(self):
        return (self.portfolioReturn - self.riskFreeRate) / self.portfolioVolatility

SharpeRatio is the Metric subclass intended to produce a single risk-adjusted performance number from three running quantities: portfolioReturn, riskFreeRate, and portfolioVolatility. SharpeRatio.__init__ registers them as zero-initialized weights so the metric could accumulate values across batches or time windows. As written, however, the class is a skeleton rather than a working metric. First, the values returned by add_weight are never assigned to attributes, so the self.portfolioReturn, self.riskFreeRate, and self.portfolioVolatility lookups in result would raise an AttributeError. Second, update_state ignores y_true, y_pred, and sample_weight and forwards only the extra positional and keyword arguments to the base class, whose update_state is abstract and raises NotImplementedError in Keras; nothing is accumulated. Third, even with those fixed, the volatility weight starts at zero, so a result computed before any update would divide by zero. The intended result formula is the standard Sharpe ratio: excess return over the risk-free rate divided by portfolio volatility. CalmarRatio and CapitalAssetPricingModel follow the same skeleton (declare weights in __init__, stub update_state, expose a simple result formula), and all three need real update logic before they can provide streaming, comparable performance metrics for the outputs of StockDataGenerator and EnsembleModel.

# file path: models/metrics.py
    def __init__(self, name='SharpeRatio', **kwargs):
        super(SharpeRatio, self).__init__(name=name, **kwargs)
        self.add_weight(name='portfolioReturn', initializer='zeros')
        self.add_weight(name='riskFreeRate', initializer='zeros')
        self.add_weight(name='portfolioVolatility', initializer='zeros')

SharpeRatio.__init__ invokes the Metric base initializer and then registers three zero-initialized state variables named portfolioReturn, riskFreeRate, and portfolioVolatility. These are the quantities update_state is supposed to fill in during a backtest or batch evaluation and that result combines into the Sharpe measure (excess return relative to volatility). One detail matters for the later methods: add_weight returns the created variable, and this code discards it. For result to read self.portfolioReturn and its siblings, each call would need to be assigned, for example self.portfolioReturn = self.add_weight(...). CapitalAssetPricingModel.__init__ and CalmarRatio.__init__ register their own weights the same way and share the same omission.

# file path: models/metrics.py
    def __init__(self, name='CapitalAssetPricingModel', **kwargs):
        super(CapitalAssetPricingModel, self).__init__(name=name, **kwargs)
        self.add_weight(name='expectedReturnI', initializer='zeros')
        self.add_weight(name='expectedReturnM', initializer='zeros')
        self.add_weight(name='riskFreeRate', initializer='zeros')
        self.add_weight(name='beta', initializer='zeros')
        self.add_weight(name='riskPremium', initializer='zeros')

CapitalAssetPricingModel.__init__ calls the parent constructor with the metric name and any kwargs, then registers five zero-initialized weights via add_weight: expectedReturnI for the asset (instrument), expectedReturnM for the market, riskFreeRate, beta for the asset's sensitivity to the market, and riskPremium for the market's excess return. This mirrors SharpeRatio.__init__ and CalmarRatio.__init__, including the fact that the variables returned by add_weight are not assigned to attributes. In a completed implementation, update_state would update these variables as returns and market data stream in from the dataset generator and modeling layer, and result would read them to compute the CAPM expected return (the risk-free rate plus beta times the market risk premium).

# file path: models/metrics.py
    def update_state(self, y_true, y_pred, sample_weight=None, *args, **kwargs):
        return super().update_state(*args, **kwargs)

SharpeRatio.update_state is a placeholder. When the evaluation pipeline calls it with observed and predicted returns (and optionally sample weights), it computes nothing itself and calls the base-class update_state with only *args and **kwargs; y_true, y_pred, and sample_weight are dropped. The Keras Metric base class does not provide shared aggregation logic: its update_state is abstract and raises NotImplementedError, because each metric is expected to define how its own state changes. The weights registered in SharpeRatio.__init__ (portfolioReturn, riskFreeRate, and portfolioVolatility) are therefore never updated by this code. CapitalAssetPricingModel.update_state and CalmarRatio.update_state are identical, so all three metrics need their own update logic before result can return meaningful values. There is no branching or error handling inside the method.
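
As an illustration of what incremental accumulation for a Sharpe metric could look like, the sketch below keeps a running mean and variance with Welford's algorithm. It is deliberately framework-free so it runs without TensorFlow; StreamingSharpe is a hypothetical class that only borrows the update_state/result/reset_state method names. In a Keras Metric the same three accumulators would be variables created with add_weight and assigned to attributes. The risk-free rate is assumed to be a constant per-period value in the same units as the returns.

```python
import math
from typing import Iterable


class StreamingSharpe:
    """Framework-free sketch of a stateful Sharpe metric (Welford running variance)."""

    def __init__(self, risk_free_rate: float = 0.0):
        self.risk_free_rate = risk_free_rate  # assumed constant, per period
        self.reset_state()

    def reset_state(self) -> None:
        self.count = 0
        self.mean = 0.0
        self.m2 = 0.0  # running sum of squared deviations from the mean

    def update_state(self, returns: Iterable[float]) -> None:
        # Welford's update: one pass, no need to store past returns.
        for r in returns:
            self.count += 1
            delta = r - self.mean
            self.mean += delta / self.count
            self.m2 += delta * (r - self.mean)

    def result(self) -> float:
        # Sample standard deviation needs at least two observations.
        if self.count < 2:
            return float("nan")
        vol = math.sqrt(self.m2 / (self.count - 1))
        if vol == 0.0:
            return float("nan")
        return (self.mean - self.risk_free_rate) / vol
```

Feeding the returns in one batch or in several gives the same result, which is the property a stateful metric needs when evaluation is split across batches. The value is per period; annualizing it would require multiplying by the square root of the number of periods per year.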

# file path: models/metrics.py
    def update_state(self, y_true, y_pred, sample_weight=None, *args, **kwargs):
        return super().update_state(*args, **kwargs)

CapitalAssetPricingModel.update_state is the same placeholder as SharpeRatio.update_state and CalmarRatio.update_state. When the training or evaluation loop passes a batch of y_true, y_pred and optional sample weights, the method discards them and calls the parent update_state, which is abstract in Keras and performs no accumulation. The state variables registered in CapitalAssetPricingModel.__init__ (expectedReturnI, expectedReturnM, riskFreeRate, beta, and riskPremium) consequently stay at their initial zeros. A working version would have to compute these quantities here, for example by estimating beta from the covariance of asset and market returns, leaving result to express only the CAPM-specific final formula.

# file path: models/metrics.py
    def result(self):
        return (self.portfolioReturn - self.riskFreeRate) / self.portfolioVolatility

SharpeRatio.result expresses the standard Sharpe ratio: the excess portfolio return above the risk-free rate, divided by portfolio volatility, yielding a single risk-adjusted scalar. The design intent is that the three quantities live as persistent weights created in __init__ and updated incrementally by update_state, so result only combines aggregated state rather than recomputing raw time-series statistics. With the class as shown, that chain is incomplete: the attributes read here are never assigned in __init__, and update_state never changes the weights, so a call to result would fail rather than return a ratio. CalmarRatio.result follows the same pattern, performing simple arithmetic on stored summary statistics (average annual return over maximum drawdown) to produce a comparative score for model and strategy selection.

# file path: models/metrics.py
class CapitalAssetPricingModel(Metric):
    def __init__(self, name='CapitalAssetPricingModel', **kwargs):
        super(CapitalAssetPricingModel, self).__init__(name=name, **kwargs)
        self.add_weight(name='expectedReturnI', initializer='zeros')
        self.add_weight(name='expectedReturnM', initializer='zeros')
        self.add_weight(name='riskFreeRate', initializer='zeros')
        self.add_weight(name='beta', initializer='zeros')
        self.add_weight(name='riskPremium', initializer='zeros')
    def update_state(self, y_true, y_pred, sample_weight=None, *args, **kwargs):
        return super().update_state(*args, **kwargs)
    def result(self):
        return self.riskFreeRate + self.beta * (self.expectedReturnM - self.riskFreeRate)

CapitalAssetPricingModel is the metric class that evaluates a strategy or model through the CAPM expected-return formula as part of the platform's portfolio-performance layer. It follows the same Metric-based skeleton as SharpeRatio and CalmarRatio. The constructor registers weights for expectedReturnI (asset), expectedReturnM (market), riskFreeRate, beta, and an auxiliary riskPremium; update_state is the same placeholder that forwards to the abstract base method; and result encodes the CAPM formula as the risk-free rate plus beta times the market excess return. Note that result computes the market excess return inline from expectedReturnM and riskFreeRate, so the riskPremium and expectedReturnI weights are registered but never read. In the intended data flow, the research or training loop calls update_state repeatedly with observed and predicted returns across a backtest or validation stream, and result then produces the single-number CAPM estimate that downstream components (model selection, reporting in the EnsembleModel workflows, or the dashboard) use to compare strategies. As with SharpeRatio, the weights are not assigned to attributes and update_state accumulates nothing, so the class does not yet run end to end.

# file path: models/metrics.py
    def result(self):
        return self.riskFreeRate + self.beta * (self.expectedReturnM - self.riskFreeRate)

CapitalAssetPricingModel.result computes the expected return of the asset under the Capital Asset Pricing Model: the riskFreeRate plus beta multiplied by the market risk premium, that is, the difference between expectedReturnM and riskFreeRate. It is meant to read values that update_state has accumulated; since update_state currently accumulates nothing and the weights are not bound to attributes, the method documents the formula more than it implements a usable metric. It follows the same pattern as SharpeRatio.result in deriving a final statistic from stored state, but it differs conceptually: SharpeRatio.result expresses excess return normalized by volatility, whereas CapitalAssetPricingModel.result uses beta and the market premium to estimate the required or expected return under CAPM.
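
The two quantities that feed the CAPM formula can be computed from return series directly. The sketch below is a plain-Python illustration under stated assumptions: estimate_beta and capm_expected_return are hypothetical helper names, beta is estimated from sample covariance and variance, and all returns are per-period values in the same units.

```python
from typing import Sequence


def estimate_beta(asset: Sequence[float], market: Sequence[float]) -> float:
    """Beta = Cov(asset, market) / Var(market), using sample statistics."""
    n = len(asset)
    if n != len(market) or n < 2:
        raise ValueError("need two equal-length series with at least two points")
    mean_a = sum(asset) / n
    mean_m = sum(market) / n
    cov = sum((a - mean_a) * (m - mean_m) for a, m in zip(asset, market)) / (n - 1)
    var = sum((m - mean_m) ** 2 for m in market) / (n - 1)
    if var == 0.0:
        raise ValueError("market returns have zero variance")
    return cov / var


def capm_expected_return(risk_free: float, beta: float, market_return: float) -> float:
    """E[R_i] = R_f + beta * (E[R_m] - R_f), the same formula as result() above."""
    return risk_free + beta * (market_return - risk_free)
```

An asset whose returns are always twice the market's has a beta of 2, so its CAPM expected return sits two market risk premiums above the risk-free rate.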

# file path: models/metrics.py
class CalmarRatio(Metric):
    def __init__(self, name='CalmarRatio', **kwargs):
        super(CalmarRatio, self).__init__(name=name, **kwargs)
        self.add_weight(name='averageAnnualReturn', initializer='zeros')
        self.add_weight(name='maximumDrawdown', initializer='zeros')
    def update_state(self, y_true, y_pred, sample_weight=None, *args, **kwargs):
        return super().update_state(*args, **kwargs)
    def result(self):
        return self.averageAnnualReturn / self.maximumDrawdown

CalmarRatio is a metric class in the portfolio-performance layer that follows the same subclassing skeleton as SharpeRatio and CapitalAssetPricingModel. Its constructor invokes the Metric base initializer and registers two weights named averageAnnualReturn and maximumDrawdown. Its update_state performs no computation and forwards to the base-class update_state, which is abstract in Keras, so incoming per-step return data are not accumulated. Its result method returns averageAnnualReturn divided by maximumDrawdown, the Calmar ratio: a single scalar expressing annualized return per unit of worst historical drawdown, which evaluation and model-selection layers can use to compare strategies with sensitivity to downside risk. As with the other two metrics, the weights would need to be assigned to attributes and updated in update_state, and result would need to handle a zero drawdown, before the class returns usable values.

# file path: models/metrics.py
    def __init__(self, name='CalmarRatio', **kwargs):
        super(CalmarRatio, self).__init__(name=name, **kwargs)
        self.add_weight(name='averageAnnualReturn', initializer='zeros')
        self.add_weight(name='maximumDrawdown', initializer='zeros')

CalmarRatio.__init__ delegates construction to the metric superclass so the common metric machinery is in place, then registers the two named state variables it needs, averageAnnualReturn and maximumDrawdown, both seeded to zero. These weights are the slots that update_state is supposed to fill and that result later divides to produce the Calmar ratio. As in SharpeRatio.__init__ and CapitalAssetPricingModel.__init__, the variables returned by add_weight are not assigned to attributes, so result cannot yet reach them by name.

# file path: models/metrics.py
    def update_state(self, y_true, y_pred, sample_weight=None, *args, **kwargs):
        return super().update_state(*args, **kwargs)

CalmarRatio.update_state receives the observed and predicted return vectors (and optional sample weights) from the evaluation pipeline but performs no computation. It calls the base-class update_state without passing y_true or y_pred along, and that base method is abstract in Keras, so the averageAnnualReturn and maximumDrawdown weights are never updated. This is the same placeholder used by SharpeRatio.update_state and CapitalAssetPricingModel.update_state. A working implementation would compute the running equity curve, its peak, and the resulting drawdown here, keeping state declaration in __init__ and the final ratio in result.

# file path: models/metrics.py
    def result(self):
        return self.averageAnnualReturn / self.maximumDrawdown

CalmarRatio.result is meant to produce a single risk-adjusted performance scalar by reading the stored averageAnnualReturn and maximumDrawdown weights and returning their quotient. It contains no other transformation or branching, which also means it has no guard for a maximumDrawdown of zero, the value the weight is initialized to. Conceptually it mirrors SharpeRatio.result, which also reads pre-aggregated state and returns a ratio (excess return over volatility), and differs from CapitalAssetPricingModel.result, which synthesizes an expected return from riskFreeRate, beta, and the market premium. Because update_state does not yet populate the weights, the values consumed here are not real statistics until that method is implemented; once it is, the platform can surface a Calmar measure for model and strategy comparisons within marketbot-main_cleaned's performance layer.
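
For reference, the two inputs to the Calmar ratio can be derived from a series of periodic returns as sketched below. This is an illustrative version, not the project's code: max_drawdown and calmar_ratio are hypothetical helpers, the numerator is the compound annualized return (one common choice, which may differ from the "average annual return" the class name suggests), periods_per_year=252 assumes daily data, and every return is assumed to be greater than -1.

```python
from typing import Sequence


def max_drawdown(returns: Sequence[float]) -> float:
    """Largest peak-to-trough decline of the compounded equity curve, as a positive fraction."""
    equity = peak = 1.0
    worst = 0.0
    for r in returns:
        equity *= 1.0 + r
        peak = max(peak, equity)
        worst = max(worst, (peak - equity) / peak)
    return worst


def calmar_ratio(returns: Sequence[float], periods_per_year: int = 252) -> float:
    """Annualized compound return divided by maximum drawdown.

    Returns NaN when there is no data or no drawdown to divide by.
    """
    if not returns:
        return float("nan")
    growth = 1.0
    for r in returns:
        growth *= 1.0 + r
    annual = growth ** (periods_per_year / len(returns)) - 1.0
    mdd = max_drawdown(returns)
    return annual / mdd if mdd > 0 else float("nan")
```

The explicit NaN for a zero drawdown is the guard the stored-weights version lacks: a strategy that never loses has an undefined Calmar ratio rather than an infinite one.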

# file path: models/tf/models.py
from math import sqrt, floor
import tensorflow as tf
from tensorflow.keras.layers import LSTM, Bidirectional, Dropout, Dense, Softmax
from tensorflow.keras.losses import Huber
from tensorflow.keras.optimizers import Adam
from tensorflow.keras import Sequential
from matplotlib import pyplot as plt
import numpy as np
from queue import Queue
import sys
import shap

The imports bring together the numerical, neural-network, plotting and explainability primitives that LongShortTermMemoryR needs to act as the project’s LSTM modeling building block. sqrt and floor from math are used by plot_train_results to size its subplot grid. TensorFlow (tf) and the Keras layers LSTM, Bidirectional, Dropout and Dense are the building blocks of the recurrent Sequential network. Softmax, Huber and Adam are imported as well, although the class shown here never references them directly: the loss and optimizer arrive as constructor arguments, by default the strings 'mse' and 'Adam'. matplotlib.pyplot renders training histories and prediction plots, Queue orders the metric names that plot_train_results lays out, and shap computes SHAP values for post-hoc feature attribution so the model can produce explainability outputs alongside numeric metrics. numpy and sys are imported but are not used by the methods shown in this section. Compared with other model modules that also import TensorFlow, this file specifically pulls in Keras layer types and the Huber loss and pairs them with SHAP, whereas other files in the project may instead lean on sklearn/xgboost ensembles or higher-level wrappers and different visualization or portfolio tools such as pyfolio. The imports therefore reflect this file’s role: construct, train, evaluate and explain an LSTM time-series model that feeds into the same evaluation pipeline that produces the SharpeRatio and CAPM metrics.

# file path: models/tf/models.py
class LongShortTermMemoryR(tf.keras.Model):
    def __init__(self, loss='mse', opt='Adam', target='close', method='regression', enable_checkpoints=False):
        super().__init__()
        self.target = target
        self.model = None
        self.loss = loss
        self.optimizer = opt
        self.enable_checkpoints = enable_checkpoints
        self.method = method
    @property
    def metrics(self):
        metrics = [  
            tf.keras.metrics.MeanAbsoluteError(name='mae'),
            tf.keras.metrics.RootMeanSquaredError(name='rmse')
        ]
        return metrics
    @property
    def callbacks(self):
        callbacks = [
        ]
        return callbacks
    def info(self, summary=True): 
        self.model.summary()
        return 
    def compile_model(self, X_train, prediction_range=200, verbose=0, name=None) -> tf.keras.Model:
        model = Sequential(name=name)
        # Input shape is (timesteps, 1): one feature per time step
        model.add(LSTM(units=prediction_range, return_sequences=True, input_shape=(X_train.shape[1], 1)))
        model.add(Dropout(0.2))
        model.add(LSTM(units=prediction_range // 2, return_sequences=True))
        model.add(Dropout(0.2))
        model.add(Bidirectional(LSTM(units=prediction_range // 2, return_sequences=True)))
        model.add(Dropout(0.5))
        model.add(Bidirectional(tf.keras.layers.LSTM(units=prediction_range // 2)))
        model.add(Dropout(0.5))
        model.add(Dense(units=1))
        self.model = model
        self.model.compile(optimizer=self.optimizer, loss=self.loss, metrics=self.metrics)
        if verbose > 0: model.summary()
        return self.model
    def train_model(self, X_train, y_train, X_val=None, y_val=None, plot_metrics=False, epochs=25, verbose=0):
        self.callback_verbose = verbose 
        if self.model is None:
            raise TypeError('Model has not been created yet! (received None type as input to trainer)')
        if X_val is None or y_val is None:
            self.train_history = self.model.fit(
                X_train, y_train, epochs=epochs,
                verbose=verbose, callbacks=self.callbacks, 
                use_multiprocessing=True)
        else:
            self.train_history = self.model.fit(
                X_train, y_train, epochs=epochs,
                validation_data=(X_val, y_val),
                verbose=verbose, callbacks=self.callbacks, 
                use_multiprocessing=True)
        if plot_metrics == True:
            self.plot_train_results()
            self.show_train_pred(X_train, y_train)
            self.get_shap(X_train, X_val)
    def test_model(self, X_test, y_test, verbose=0):
        self.test_metrics = self.model.evaluate(X_test, y_test, verbose=verbose, callbacks=self.callbacks)
    def show_train_pred(self, X_train, y_train):
        y_pred = self.model.predict(X_train)
        fig = plt.figure(2)
        plt.plot(range(len(y_train)), y_train, label='y_truth')
        plt.plot(range(len(y_pred)), y_pred, label='y_pred')
        plt.legend()
        plt.show()
    def plot_train_results(self):
        epochs = [ i for i in range(len(self.train_history.history['loss'])) ]
        metrics = [ metric.name for metric in self.metrics ]
        n = len(self.metrics) + 1   
        m = floor(sqrt(n))
        k = abs(m ** 2 - n)
        metrics_queue = Queue(n) 
        metrics_queue.put('loss')
        for metric in metrics: metrics_queue.put(metric)
        if n == 1:
            fig = plt.plot(epochs, self.train_history.history['loss'])
            plt.savefig('models/logs/train_graphs.png')
            print(fig)
            return fig
        elif k == 0:
            fig, ax = plt.subplots(m, m)
            for r in range(m):
                for c in range(m):
                    if metrics_queue.empty():
                        break
                    else:
                        metric = metrics_queue.get(block=True)
                        ax[r][c].set_title(f'Training {metric}')
                        ax[r][c].plot(epochs, self.train_history.history[metric])
        if 0 < k < m: 
            fig, ax = plt.subplots(m + 1, m)
            fig.tight_layout()
            for r in range(m + 1):
                if r == m:
                    for c in range(k):
                        if metrics_queue.empty():
                            break
                        else:
                            metric = metrics_queue.get(block=True)
                            ax[r][c].set_title(f'Training {metric}')
                            ax[r][c].plot(epochs, self.train_history.history[metric])
                else:
                    for c in range(m):
                        if metrics_queue.empty():
                            break
                        else:
                            metric = metrics_queue.get(block=True)
                            ax[r][c].set_title(f'Training {metric}')
                            ax[r][c].plot(epochs, self.train_history.history[metric])
        elif m <= k < 2 * m + 1:
            fig, ax = plt.subplots(m + 1, m + 1)
            fig.tight_layout()
            for r in range(m + 1):
                if r == m + 1:
                    for c in range(k):
                        if metrics_queue.empty():
                            break
                        else:
                            metric = metrics_queue.get(block=True)
                            ax[r][c].set_title(f'Training {metric}')
                            ax[r][c].plot(epochs, self.train_history.history[metric])
                else:
                    for c in range(m + 1):
                        if metrics_queue.empty():
                            break
                        else:
                            metric = metrics_queue.get(block=True)
                            ax[r][c].set_title(f'Training {metric}')
                            ax[r][c].plot(epochs, self.train_history.history[metric])
        fig.savefig(f'models/logs/train_graphs_{self.model.name}.png')
        return fig
    def get_shap(self, X_train, X_test):
        explainer = shap.DeepExplainer(self.model, X_train[:])
        shap_values = explainer.shap_values(X_test[:])
        shap.summary_plot(shap_values, X_train)

LongShortTermMemoryR is the project’s TensorFlow LSTM modeling building block that encapsulates configuration, model construction, training, evaluation and basic explainability so the pipeline layer can instantiate a time‑series learner, fit it to windowed datasets produced by the dataset generator, and then inspect behavior during experiments and tests. The constructor stores model configuration such as the prediction target, loss, optimizer, method, and a flag for checkpoints onto instance attributes and leaves the actual Keras model reference as None until compilation. The metrics property exposes the regression evaluation metrics used at compile time (mean absolute error and root mean squared error), while the callbacks property provides a hook for training callbacks; these two are referenced when the model is compiled and when fit and evaluate are invoked. compile_model builds a Sequential LSTM stack tailored to the provided training input shape: it uses stacked LSTM layers, Dropout layers and Bidirectional wrappers, finishes with a single Dense output, assigns that network to the instance model attribute, and compiles it with the configured optimizer, loss and the metrics property so training and evaluation will populate those metric values. train_model enforces a guard that the model must be compiled first, then fits the instance model on X_train and y_train with optional validation data; it stores the returned history on the instance so downstream plotting and inspection can read epoch histories, and it supports an optional plotting pipeline that after training calls plot_train_results, show_train_pred and get_shap. test_model runs evaluation on test sets and writes the evaluation results to an instance attribute. 
show_train_pred runs the model forward on the training inputs, produces a simple truth versus prediction plot and shows it, while plot_train_results reads the stored training history, assembles a grid sized from the number of tracked metrics (loss plus the metrics property), iteratively places each metric into subplot axes, saves the composed figure to the models logs folder and returns the figure object; its control flow computes grid dimensions and branches to render square or rectangular subplot arrays depending on how many metrics are present. get_shap creates a DeepExplainer around the compiled model using the training examples and then computes and displays SHAP summary plots for the supplied test examples so users can get per‑feature attribution for LSTM inputs. In terms of data flow, X_train is used both to infer input_shape during compile_model and as the background sample for the DeepExplainer; training produces train_history which drives plotting, and evaluate produces test_metrics stored on the instance. Conceptually LongShortTermMemoryR acts as a lifecycle wrapper around a Keras model—constructing, compiling, training, evaluating and exposing visual and attribution outputs—so the higher level pipeline and tests can treat it as an interchangeable model component alongside other estimators and the ClfSwitcher adapter used elsewhere in the codebase.

# file path: pipelining/steps.py
from sklearn.base import BaseEstimator
from xgboost import XGBRegressor

The imports bring in the sklearn BaseEstimator abstraction and the XGBRegressor implementation from the xgboost library. BaseEstimator is pulled in so ClfSwitcher can present itself as an sklearn-style estimator with the standard parameter-get/set contract that the pipeline and hyperparameter searchers rely on; that allows ClfSwitcher to be embedded in sklearn pipelines and treated interchangeably with other estimators during grid or randomized searches. XGBRegressor supplies a concrete, high-performance gradient‑boosted tree regressor as the default delegate inside ClfSwitcher, giving a sensible out‑of‑the‑box estimator for the modeling layer that the training/evaluation pipeline will call into. Compared with other import patterns elsewhere in the codebase—where ensemble regressors like BaggingRegressor and VotingRegressor are imported or entire modules such as xgboost aliased and TensorFlow/Keras are pulled in—these two imports specifically provide the sklearn estimator base contract and a single, ready-to-use XGBoost regressor class so ClfSwitcher can act as a simple delegating adapter between the pipeline and whatever concrete estimator is supplied.

# file path: pipelining/steps.py
    def predict(self, X, y=None):
        return self.estimator.predict(X)

ClfSwitcher.predict acts as a simple delegator inside the pipeline layer: when the training or evaluation pipeline hands it feature data X (the same X produced by the dataset generator and passed through the pipeline steps), it forwards that data to whatever estimator instance was attached in ClfSwitcher.__init__ and returns the estimator’s predictions unchanged. This is the same delegation pattern you saw in SharpeRatio.update_state and CapitalAssetPricingModel.update_state: instead of implementing prediction logic itself, ClfSwitcher provides a uniform adapter so downstream pipeline code can call predict the same way whether the underlying estimator is a scikit-learn model, an XGBoost model, or any custom object that implements predict. There is no branching or additional transformation here: the optional y parameter is ignored, no local aggregation occurs, and the returned value flows back into the pipeline for scoring, metric aggregation, or further postprocessing. This thin wrapper is what lets parameter-search tooling swap estimators without changing downstream pipeline logic.
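
The delegation idea can be sketched without sklearn or XGBoost. The block below is an illustration only: EstimatorSwitcher, MeanRegressor and LastValueRegressor are hypothetical names invented for this example, and the real ClfSwitcher additionally inherits from sklearn's BaseEstimator, which this sketch leaves out.

```python
class MeanRegressor:
    """Hypothetical stand-in estimator: predicts the mean of the training targets."""
    def fit(self, X, y):
        self.mean_ = sum(y) / len(y)
        return self

    def predict(self, X):
        return [self.mean_ for _ in X]


class LastValueRegressor:
    """Hypothetical stand-in estimator: predicts the last feature of each row."""
    def fit(self, X, y):
        return self

    def predict(self, X):
        return [row[-1] for row in X]


class EstimatorSwitcher:
    """Thin adapter: holds any object with fit/predict and forwards calls to it."""
    def __init__(self, estimator=None):
        self.estimator = estimator if estimator is not None else MeanRegressor()

    def fit(self, X, y=None):
        self.estimator.fit(X, y)
        return self

    def predict(self, X, y=None):
        # y is accepted but ignored, mirroring the signature shown above
        return self.estimator.predict(X)


X = [[1.0, 2.0], [2.0, 3.0], [3.0, 4.0]]
y = [3.0, 4.0, 5.0]
for est in (MeanRegressor(), LastValueRegressor()):
    switcher = EstimatorSwitcher(est).fit(X, y)
    print(type(est).__name__, switcher.predict(X))
```

The calling loop never changes when the estimator does, which is the property a parameter search relies on when it treats the estimator itself as a tunable parameter.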

# file path: models/tf/models.py
    def __init__(self, loss='mse', opt='Adam', target='close', method='regression', enable_checkpoints=False):
        super().__init__()
        self.target = target
        self.model = None
        self.loss = loss
        self.optimizer = opt
        self.enable_checkpoints = enable_checkpoints
        self.method = method

LongShortTermMemoryR.__init__ sets up the LSTM wrapper’s configuration and initial state so the model can later be built, compiled, trained and inspected by the rest of the pipeline. It delegates to the superclass initializer so the instance is a tf.keras.Model, records the name of the target series (close by default), and stores the loss and optimizer identifiers that LongShortTermMemoryR.compile_model passes to Keras when it compiles the Sequential network. It leaves self.model as None to signal that the concrete layers are constructed later by compile_model; train_model guards against a missing model and raises if compile_model has not been run. The enable_checkpoints flag and the method string (regression by default) are stored as well, but none of the methods shown in this section read them yet: the callbacks property returns an empty list and the metrics property always returns the same two regression metrics, so both attributes act as placeholders for future behavior. Together these attributes form the small configuration object that the lifecycle methods info, compile_model, train_model, test_model, show_train_pred, plot_train_results and get_shap operate on.

# file path: models/tf/models.py
    def show_train_pred(self, X_train, y_train):
        y_pred = self.model.predict(X_train)
        fig = plt.figure(2)
        plt.plot(range(len(y_train)), y_train, label='y_truth')
        plt.plot(range(len(y_pred)), y_pred, label='y_pred')
        plt.legend()
        plt.show()

LongShortTermMemoryR.show_train_pred produces a quick visual check of how well the trained network reproduces the training targets. It calls predict directly on the wrapped Keras model stored in self.model (this path does not go through ClfSwitcher.predict), then draws two overlaid line traces against their index ranges: one for the true y_train values and one for the predictions. In the pipeline it is invoked from train_model after fitting when plot_metrics is enabled, where it serves as a mid-level diagnostic for confirming whether the LSTM is capturing the training signal. The method selects pyplot figure number 2, plots the ground-truth series and the model output series, adds a legend, and displays the resulting chart.

# file path: models/tf/models.py
    def plot_train_results(self):
        epochs = [ i for i in range(len(self.train_history.history['loss'])) ]
        metrics = [ metric.name for metric in self.metrics ]
        n = len(self.metrics) + 1   
        m = floor(sqrt(n))
        k = abs(m ** 2 - n)
        metrics_queue = Queue(n) 
        metrics_queue.put('loss')
        for metric in metrics: metrics_queue.put(metric)
        if n == 1:
            fig = plt.plot(epochs, self.train_history.history['loss'])
            plt.savefig('models/logs/train_graphs.png')
            print(fig)
            return fig
        elif k == 0:
            fig, ax = plt.subplots(m, m)
            for r in range(m):
                for c in range(m):
                    if metrics_queue.empty():
                        break
                    else:
                        metric = metrics_queue.get(block=True)
                        ax[r][c].set_title(f'Training {metric}')
                        ax[r][c].plot(epochs, self.train_history.history[metric])
        if 0 < k < m: 
            fig, ax = plt.subplots(m + 1, m)
            fig.tight_layout()
            for r in range(m + 1):
                if r == m:
                    for c in range(k):
                        if metrics_queue.empty():
                            break
                        else:
                            metric = metrics_queue.get(block=True)
                            ax[r][c].set_title(f'Training {metric}')
                            ax[r][c].plot(epochs, self.train_history.history[metric])
                else:
                    for c in range(m):
                        if metrics_queue.empty():
                            break
                        else:
                            metric = metrics_queue.get(block=True)
                            ax[r][c].set_title(f'Training {metric}')
                            ax[r][c].plot(epochs, self.train_history.history[metric])
        elif m <= k < 2 * m + 1:
            fig, ax = plt.subplots(m + 1, m + 1)
            fig.tight_layout()
            for r in range(m + 1):
                if r == m + 1:
                    for c in range(k):
                        if metrics_queue.empty():
                            break
                        else:
                            metric = metrics_queue.get(block=True)
                            ax[r][c].set_title(f'Training {metric}')
                            ax[r][c].plot(epochs, self.train_history.history[metric])
                else:
                    for c in range(m + 1):
                        if metrics_queue.empty():
                            break
                        else:
                            metric = metrics_queue.get(block=True)
                            ax[r][c].set_title(f'Training {metric}')
                            ax[r][c].plot(epochs, self.train_history.history[metric])
        fig.savefig(f'models/logs/train_graphs_{self.model.name}.png')
        return fig

plot_train_results builds a visual summary of the fit history that train_model stores on self.train_history so experimenters can inspect loss and metric trajectories after training. It first derives an epoch index sequence from the length of the recorded loss history and gathers metric names from the LongShortTermMemoryR.metrics property. To lay out the subplots it computes n as the total number of traces to plot (loss plus the other metrics), takes m as the floor of the square root of n to form a near-square grid, and computes k as the absolute difference between m squared and n, which is the number of traces that do not fit in an m by m grid. It then enqueues the trace names into a FIFO queue with loss first, so loss always lands in the first subplot, followed by the other metrics. There is an early return for the simplest case where only loss exists: a single line plot is produced, written to a fixed file under models/logs, and returned; note that in this branch the returned value is the list of line objects from plt.plot rather than a figure. For the multi-plot cases the function chooses one of three layouts based on the relationship between k and m: when n is a perfect square (k is 0) it creates an m by m grid and fills it row by row; when k is greater than 0 but less than m it creates an (m+1) by m grid and fills the last row only up to k columns; and when k is at least m it creates an (m+1) by (m+1) grid and fills it row by row until the queue is empty. With the two metrics defined on this class, n is 3, m is 1 and k is 2, so the third layout applies and the three traces occupy a 2 by 2 grid with one cell left blank. In each populated cell the function sets a title naming the training metric and plots the epoch sequence against the corresponding series from self.train_history.history. The resulting figure is saved to disk under a file name that includes the model’s name so multiple models produce distinct files, and the figure object is returned to the caller. In practice plot_train_results is invoked by train_model when plot_metrics is enabled and complements the visual check produced by show_train_pred and the SHAP summary produced by get_shap.
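
The grid-sizing arithmetic is easier to follow when separated from the plotting. The sketch below reproduces the branch logic described above as a pure function and, for comparison, shows a more compact alternative. Both function names are hypothetical, and grid_shape_compact is an assumption of this example, not something the project uses.

```python
from math import ceil, floor, sqrt

def grid_shape_article(n):
    """Mirror the m/k branch logic described above; returns (rows, cols)."""
    m = floor(sqrt(n))
    k = abs(m ** 2 - n)
    if k == 0:
        return (m, m)          # perfect square
    if 0 < k < m:
        return (m + 1, m)      # a few extra traces: add one row
    return (m + 1, m + 1)      # m <= k < 2 * m + 1: add a row and a column

def grid_shape_compact(n):
    """Alternative (assumption): near-square grid with the fewest rows that fit."""
    cols = ceil(sqrt(n))
    rows = ceil(n / cols)
    return (rows, cols)

for n in range(1, 10):
    print(n, grid_shape_article(n), grid_shape_compact(n))
```

Both variants always allocate at least n cells. They differ in how many cells stay empty: for six traces the article's logic allocates a 3 by 3 grid, while the compact variant needs only 2 by 3. If you adapt either one, passing squeeze=False to plt.subplots makes it return a 2D array of axes even for a single row or column, which keeps ax[r][c] indexing valid for every shape.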

# file path: models/tf/models.py
    def get_shap(self, X_train, X_test):
        explainer = shap.DeepExplainer(self.model, X_train[:])
        shap_values = explainer.shap_values(X_test[:])
        shap.summary_plot(shap_values, X_train)

get_shap wires the trained model into the SHAP explainability flow so the pipeline can see which parts of the input windows drive the LSTM’s outputs. It constructs a SHAP DeepExplainer around self.model, using the provided training windows as the background dataset, asks that explainer for SHAP values on the supplied test windows, and finally renders a SHAP summary plot that aggregates per-feature (and, given the windowed input shape, per-time-step) contributions. In the project’s pipeline, X_train and X_test are the windowed, normalized arrays produced by the dataset generator, so get_shap consumes arrays of the same shape the model was trained on. Two details of the code as written are worth noting: the X_train[:] and X_test[:] slices select every row, so the full training set serves as background, which can be slow for large datasets; and summary_plot receives X_train as its feature matrix even though shap_values were computed for X_test, so passing X_test there would keep the plotted feature values aligned with the explained samples.

# file path: models/tf/models.py
    @property
    def metrics(self):
        metrics = [  
            tf.keras.metrics.MeanAbsoluteError(name='mae'),
            tf.keras.metrics.RootMeanSquaredError(name='rmse')
        ]
        return metrics

The LongShortTermMemoryR.metrics property supplies the Keras metrics the LSTM wrapper asks TensorFlow to compute during training and evaluation: it constructs and returns a list containing a MeanAbsoluteError metric and a RootMeanSquaredError metric, each given short names so their values appear in training histories and evaluation outputs. Because LongShortTermMemoryR is the project’s time‑series regression building block that the pipeline instantiates and compiles, compile_model reads this metrics property when it configures the model, so these two regression‑oriented diagnostics are computed alongside the configured loss and recorded in the fit history and any later evaluate calls (for example, test_model uses the model’s evaluate path which reports the same metrics). The choice of MAE and RMSE complements the wrapper’s loss setting by providing both absolute and squared‑error perspectives on prediction quality for targets like the asset close price, making them immediately available for callbacks, plots, and test assertions elsewhere in the pipeline.
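
To see why the two metrics give different perspectives, it helps to compute them by hand on a tiny example. The sketch below uses plain Python instead of the Keras metric classes, and the prices are made up for illustration.

```python
from math import sqrt

def mean_absolute_error(y_true, y_pred):
    """Average magnitude of the errors; every error counts in proportion to its size."""
    return sum(abs(t - p) for t, p in zip(y_true, y_pred)) / len(y_true)

def root_mean_squared_error(y_true, y_pred):
    """Square root of the mean squared error; large errors are weighted more heavily."""
    return sqrt(sum((t - p) ** 2 for t, p in zip(y_true, y_pred)) / len(y_true))

y_true = [100.0, 102.0, 101.0, 105.0]  # made-up closing prices
y_pred = [101.0, 101.0, 103.0, 100.0]  # made-up predictions with one large miss

mae = mean_absolute_error(y_true, y_pred)
rmse = root_mean_squared_error(y_true, y_pred)
print(mae, rmse)
```

The absolute errors are 1, 1, 2 and 5, so MAE is 9 / 4 = 2.25, while RMSE is the square root of 31 / 4, roughly 2.78. The gap between the two comes from the single miss of 5: RMSE grows faster than MAE when a few predictions are far off, which is why tracking both is informative for price targets.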

# file path: models/tf/models.py
    @property
    def callbacks(self):
        callbacks = [
        ]
        return callbacks

LongShortTermMemoryR.callbacks is a read-only property that serves as the hook through which the training and evaluation code obtains Keras-style callbacks. Both train_model and test_model read self.callbacks and pass the result into fit and evaluate, so the value returned here directly determines whether any callback logic runs during those calls. In the current implementation the property returns an empty list, so fit and evaluate run without any callback side effects. Exposing callbacks as a property rather than a fixed field means the list is rebuilt on each access, which gives future code a single place to consult settings such as the enable_checkpoints flag from __init__ when constructing callbacks.

# file path: models/tf/models.py
    def info(self, summary=True): 
        self.model.summary()
        return

LongShortTermMemoryR.info invokes the wrapped tf.keras model’s built-in summary so the developer can see the network architecture and parameter counts printed to stdout. It reads the model that LongShortTermMemoryR.compile_model assigned to self.model and calls its summary routine; there is no branching, and the method returns None. Because self.model is None until compile_model has run, info is only usable after compilation; calling it earlier raises an AttributeError. In the project’s workflow it is the quick inspection point that complements LongShortTermMemoryR.show_train_pred and LongShortTermMemoryR.get_shap by exposing layer topology and sizes right after model construction. The summary parameter is present in the method signature, but the implementation ignores it and always prints the model summary.

# file path: models/tf/models.py
    def compile_model(self, X_train, prediction_range=200, verbose=0, name=None) -> tf.keras.Model:
        model = Sequential(name=name)
        # Input shape is (timesteps, 1): one feature per time step
        model.add(LSTM(units=prediction_range, return_sequences=True, input_shape=(X_train.shape[1], 1)))
        model.add(Dropout(0.2))
        model.add(LSTM(units=prediction_range // 2, return_sequences=True))
        model.add(Dropout(0.2))
        model.add(Bidirectional(LSTM(units=prediction_range // 2, return_sequences=True)))
        model.add(Dropout(0.5))
        model.add(Bidirectional(tf.keras.layers.LSTM(units=prediction_range // 2)))
        model.add(Dropout(0.5))
        model.add(Dense(units=1))
        self.model = model
        self.model.compile(optimizer=self.optimizer, loss=self.loss, metrics=self.metrics)
        if verbose > 0: model.summary()
        return self.model

LongShortTermMemoryR.compile_model constructs and compiles the recurrent network used by the pipeline and tests. It takes the windowed feature tensor produced by StockDataGenerator (the X_train passed in from TestPipeline.testModelCompileTrain) and uses its time-step dimension, X_train.shape[1], to set the input shape; the second input dimension is hard-coded to 1, so the network expects one feature per time step. It then builds a Sequential stack: an LSTM with prediction_range units, an LSTM with half that many units, and two Bidirectional LSTM layers that also use prediction_range // 2 units per direction, followed by a single-unit Dense output that yields a scalar regression prediction. Every recurrent layer except the last sets return_sequences=True so the next layer receives the full sequence, and a Dropout layer follows each recurrent layer, at 0.2 after the two plain LSTMs and 0.5 after the bidirectional ones. After assembling the layers the method assigns the network to self.model and compiles it with the optimizer and loss stored by LongShortTermMemoryR.__init__ and the metrics returned by the LongShortTermMemoryR.metrics property, which makes the model ready for train_model, test_model, show_train_pred, get_shap and plot_train_results. When verbose is greater than zero the method also prints the Keras model summary; in either case it returns the compiled model.
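
A back-of-the-envelope parameter count shows how large this stack is at the default prediction_range of 200. The sketch below uses the standard formula for an LSTM layer with biases (four gates, each with input weights, recurrent weights and a bias) and assumes the Bidirectional wrappers use their default concatenation of both directions. The helper names are hypothetical, and the numbers are derived from the formula rather than copied from an actual model summary.

```python
def lstm_params(input_dim, units):
    """Trainable parameters of one LSTM layer with biases: 4 gates x (inputs + recurrent + bias)."""
    return 4 * units * (input_dim + units + 1)

def stack_params(prediction_range=200, n_features=1):
    """Per-layer parameter counts for the stack described above (Dropout has none)."""
    half = prediction_range // 2
    return [
        ("lstm_1", lstm_params(n_features, prediction_range)),
        ("lstm_2", lstm_params(prediction_range, half)),
        # Bidirectional trains a forward and a backward copy, so the count doubles
        ("bidirectional_1", 2 * lstm_params(half, half)),
        # The previous bidirectional layer outputs both directions: 2 * half inputs
        ("bidirectional_2", 2 * lstm_params(2 * half, half)),
        # Dense(1) on the concatenated output: one weight per input plus a bias
        ("dense", 2 * half + 1),
    ]

layers = stack_params()
for name, count in layers:
    print(f"{name:<16}{count:>10,d}")
print(f"{'total':<16}{sum(c for _, c in layers):>10,d}")
```

Under these assumptions the default configuration comes to 683,801 trainable parameters, most of them in the two bidirectional layers, which gives some context for the heavier 0.5 dropout applied there.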

# file path: models/tf/models.py
    def train_model(self, X_train, y_train, X_val=None, y_val=None, plot_metrics=False, epochs=25, verbose=0):
        self.callback_verbose = verbose 
        if self.model is None:
            raise TypeError('Model has not been created yet! (received None type as input to trainer)')
        if X_val is None or y_val is None:
            self.train_history = self.model.fit(
                X_train, y_train, epochs=epochs,
                verbose=verbose, callbacks=self.callbacks, 
                use_multiprocessing=True)
        else:
            self.train_history = self.model.fit(
                X_train, y_train, epochs=epochs,
                validation_data=(X_val, y_val),
                verbose=verbose, callbacks=self.callbacks, 
                use_multiprocessing=True)
        if plot_metrics == True:
            self.plot_train_results()
            self.show_train_pred(X_train, y_train)
            self.get_shap(X_train, X_val)

Within the modeling layer, LongShortTermMemoryR.train_model takes the windowed training arrays and drives the TensorFlow fit loop for the network that LongShortTermMemoryR.compile_model created and compiled. It first stores the incoming verbosity level on callback_verbose so callback implementations can read it, then guards against being invoked before a model exists by raising a TypeError if self.model is None. The core behavior is a two-path fit: when X_val or y_val is missing it calls fit with only X_train and y_train, and when both are supplied it passes validation_data so the training run records validation metrics as well. In both cases it forwards the caller’s epochs and verbose settings, attaches the wrapper’s callbacks property, passes use_multiprocessing=True, and stores the returned History object on self.train_history so later inspection and plotting can read the recorded metric traces. Note that Keras documents use_multiprocessing as applying only to generator or keras.utils.Sequence inputs, so it has no effect when X_train is an in-memory array, and Keras 3 appears to have dropped the argument from fit altogether. After the fit completes, if plot_metrics is enabled the method calls plot_train_results to render and save the loss and metric curves, show_train_pred to compare training targets with predictions made directly by self.model, and get_shap to produce a DeepExplainer SHAP summary. Because get_shap is given X_val as its test set, enabling plot_metrics without validation data leads to an error when get_shap tries to slice None. The control flow therefore separates setup and guarding, the two fitting modes (with or without validation), and a post-training branch that triggers plotting and SHAP explainability.
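
The two nearly identical fit calls can be collapsed by building the keyword arguments once and adding validation_data only when it exists. The sketch below illustrates that pattern with a stand-in object so it runs without TensorFlow. RecordingModel and train are hypothetical names, RuntimeError is swapped in for the TypeError used above, and returning a has_validation flag is an assumption about how a caller might decide whether to run the SHAP step.

```python
class RecordingModel:
    """Hypothetical stand-in for a compiled Keras model; records what fit() receives."""
    def fit(self, X, y, **kwargs):
        self.last_fit_kwargs = kwargs
        return {"loss": [0.0] * kwargs.get("epochs", 1)}


def train(model, X_train, y_train, X_val=None, y_val=None,
          epochs=25, verbose=0, callbacks=None):
    if model is None:
        raise RuntimeError("Call compile_model before training.")
    # Arguments shared by both fitting modes
    fit_kwargs = {"epochs": epochs, "verbose": verbose, "callbacks": callbacks or []}
    has_validation = X_val is not None and y_val is not None
    if has_validation:
        fit_kwargs["validation_data"] = (X_val, y_val)
    history = model.fit(X_train, y_train, **fit_kwargs)
    # The flag lets the caller skip explainability when there is nothing held out
    return history, has_validation


model = RecordingModel()
history, can_explain = train(model, [[1], [2]], [1, 2], epochs=3)
print(sorted(model.last_fit_kwargs), can_explain)
```

Keeping a single fit call means options such as callbacks are configured in one place, and the returned flag gives the post-training branch an explicit condition for skipping the SHAP step when no validation set was supplied.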

# file path: models/tf/models.py
    def test_model(self, X_test, y_test, verbose=0):
        self.test_metrics = self.model.evaluate(X_test, y_test, verbose=verbose, callbacks=self.callbacks)

LongShortTermMemoryR.test_model runs the Keras evaluation step on the LSTM instance using the supplied test features and targets, forwarding the verbosity flag and the class’s callbacks so any logging or checkpoint behavior is honored during evaluation. The numeric outputs returned by the evaluation call — the configured loss followed by the metrics defined by LongShortTermMemoryR.metrics — are stored on the instance as test_metrics so downstream code (pipeline steps, experiment logging, or unit tests) can inspect final test performance. There is no branching here: it simply exercises the model’s evaluate path and captures its results.
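
Since test_metrics is stored as a positional list, downstream code has to know the order of its entries. A small helper can attach names explicitly. The sketch below is an illustration with made-up numbers: label_metrics is a hypothetical helper, and the default names assume the loss-first ordering described above.

```python
def label_metrics(values, names=("loss", "mae", "rmse")):
    """Pair positional evaluation results with names; fail loudly on a length mismatch."""
    values = list(values)
    if len(values) != len(names):
        raise ValueError(f"expected {len(names)} values, got {len(values)}")
    return dict(zip(names, values))

# Made-up numbers standing in for what evaluate() might return
test_metrics = [0.0123, 0.0871, 0.1109]
print(label_metrics(test_metrics))
```

Keras can also do this labelling itself: evaluate accepts return_dict=True, which returns a dictionary keyed by metric name instead of a list.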

# file path: pipelining/paramSearch.py
from models.tf.models import LongShortTermMemoryR
from joblib import Memory
from sklearn.metrics import make_scorer, mean_squared_error
from sklearn.model_selection import TimeSeriesSplit
import joblib
import os
import absl.logging
from time import sleep, time
import numpy as np
from sklearn.manifold import Isomap
import xgboost
import tensorflow as tf
import scipy
from sklearn.feature_selection import RFECV
import matplotlib.pyplot as plt
import tensorflow as tf
import optuna

The imports assemble the building blocks the hyperparameter search pipeline needs to construct, evaluate and manage candidate model configurations during time‑series experiments. LongShortTermMemoryR is pulled in so the pipeline can instantiate or inspect the LSTM learner as trials require. Joblib’s Memory is used to cache expensive preprocessing or feature transforms between trials so repeated evaluations reuse results; the broader joblib import supports persistence and parallel job control when fitting many candidates. Sklearn utilities shape scoring and validation: make_scorer and mean_squared_error let the pipeline define the numerical objective used to compare trials, TimeSeriesSplit supplies a temporal cross‑validation splitter in which training data always precedes test data, and RFECV provides recursive feature elimination with cross‑validation. Numpy and SciPy provide the numerical routines relied on across preprocessing, metric computation and statistical transforms. Isomap from sklearn.manifold and matplotlib.pyplot enable optional dimensionality reduction and plotting of diagnostics or feature embeddings. The xgboost package is imported as a whole, so estimators such as XGBRegressor are reachable through it as alternatives to the TensorFlow LSTM. TensorFlow is imported to support constructing, training and seeding the LongShortTermMemoryR model; it appears twice in the listing, and the second import is redundant. absl.logging provides consistent logging and verbosity control for the pipeline’s runtime messages, while os, time and sleep provide filesystem operations and basic timing/rate control around trial execution.

Finally, Optuna is imported because the Pipeline exposes an objective function to drive automated hyperparameter search, and Optuna orchestrates trial suggestion, pruning and study management. Compared with the import groups elsewhere in the codebase, these imports expand the toolkit to include feature selection, manifold embedding and an Optuna‑driven search loop in addition to the core LSTM and XGBoost modeling pieces already used in related modules.
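To make the temporal splitter concrete, here is a hand-rolled expanding-window split written with the standard library only. It is modeled on the default behaviour of sklearn's TimeSeriesSplit (no gap, no cap on training size) but is a stand-in for illustration, and `expanding_window_splits` is a hypothetical name.

```python
def expanding_window_splits(n_samples, n_splits):
    """Yield (train_indices, test_indices) pairs for ordered data.

    Each fold trains on everything before its test block, so the model
    never sees observations from the future of the block it is scored on.
    """
    test_size = n_samples // (n_splits + 1)
    if test_size == 0:
        raise ValueError("too many splits for the number of samples")
    first_test_start = n_samples - n_splits * test_size
    for test_start in range(first_test_start, n_samples, test_size):
        train_idx = list(range(test_start))
        test_idx = list(range(test_start, test_start + test_size))
        yield train_idx, test_idx


if __name__ == "__main__":
    for train_idx, test_idx in expanding_window_splits(8, 3):
        print("train", train_idx, "test", test_idx)
```

The training window grows with each fold while the test block slides forward, which is the property that makes this kind of split suitable for price series.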

# file path: pipelining/paramSearch.py
class Pipeline():
    def __init__(self, models, data, target, features, **kwargs):
        self.models = models
        self.data = data
        self.target = target
        self.features = features
        self.kwargs = kwargs
        self.verbose = kwargs.get('verbose', 0)
        self.memory = Memory(location='cache', verbose=0)
        self.logger = absl.logging.get_absl_logger()
        self.logger.set_verbosity(self.verbose)
        self.logger.info('Initialized Pipeline')
        self.logger.info('Models: {}'.format(self.models))
    def objective(trial):
        model = trial.suggest_categorical('model', ['lstm', 'xgbr', 'xgbrf'])
        features = trial.suggest_categorical('features', features)

The Pipeline class is the hyperparameter-search orchestration layer that sits above the dataset generator and modeling building blocks. It takes the set of candidate models, the prepared time‑series data and the available feature columns, and exposes an objective function that an optimizer can call to evaluate candidate configurations. On construction, Pipeline stores the models list, the dataset, the target name and the feature list along with any runtime kwargs, initializes a joblib Memory cache to reuse intermediate artifacts, and configures an absl logger whose verbosity is driven by the provided kwargs so experiment runs produce controlled logging. The objective method is the trial-level entry point used by Optuna-style search: it asks the trial to pick which model family to try (the LSTM versus the XGBoost variants) and to choose which feature set to use from the feature candidates provided to Pipeline. The excerpt shown stops after these two suggestions; the intended downstream flow is that the selections determine which learner is instantiated and evaluated. For the LSTM path, Pipeline uses LongShortTermMemoryR (which encapsulates model construction, training and inspection) and calls its info method to surface model metadata (LongShortTermMemoryR.info triggers the model summary) so the trial can inspect model topology as part of evaluation; for the tree‑based choices Pipeline routes toward the XGBoost estimator path.

Data flows from the stored data and selected feature subset into the chosen estimator, which then trains and is scored by the pipeline’s evaluation logic. The objective therefore acts as the bridge that converts a trial’s categorical choices into concrete model instantiation, metadata inspection via LongShortTermMemoryR.info, and subsequent model evaluation against the windowed time‑series dataset.

# file path: pipelining/paramSearch.py
    def __init__(self, models, data, target, features, **kwargs):
        self.models = models
        self.data = data
        self.target = target
        self.features = features
        self.kwargs = kwargs
        self.verbose = kwargs.get('verbose', 0)
        self.memory = Memory(location='cache', verbose=0)
        self.logger = absl.logging.get_absl_logger()
        self.logger.set_verbosity(self.verbose)
        self.logger.info('Initialized Pipeline')
        self.logger.info('Models: {}'.format(self.models))

Pipeline.__init__ wires the hyperparameter-search orchestration into the rest of the modeling stack by capturing the inputs the pipeline will need later and preparing the runtime helpers used throughout trial evaluation. It stores the provided models selection, the prepared time‑series data, the prediction target name, the list of feature columns and any extra configuration as instance attributes so the objective function and other Pipeline methods can read them during trial execution. It pulls a verbosity level out of kwargs (defaulting to 0) and uses it to configure an absl logger so training, evaluation and summary output from downstream routines (for example the LongShortTermMemoryR lifecycle methods you already saw) can be emitted at the desired detail. It also constructs a joblib Memory instance pointed at a local cache directory so expensive steps like dataset windowing or feature transforms can be memoized across trials. Finally it emits two informational log messages recording that the Pipeline was initialized and which model candidates were passed in. These steps set up the environment that later calls into LongShortTermMemoryR (whose compile_model, train_model and test_model you’ve already reviewed) and allow the pipeline to control output and reuse intermediate artifacts during hyperparameter search.

# file path: pipelining/paramSearch.py
    def objective(trial):
        model = trial.suggest_categorical('model', ['lstm', 'xgbr', 'xgbrf'])
        features = trial.suggest_categorical('features', features)

Pipeline.objective is the entry point that an optimizer will call to propose a candidate configuration for evaluation: it asks the trial to pick which model family to try (the three supported families are the LSTM implementation and two XGBoost variants) and it asks the trial to pick which feature set to use from the Pipeline’s provided features list. Those two decisions drive the rest of the pipeline: the chosen feature set determines how StockDataGenerator will assemble windowed inputs or flat feature arrays for training, and the chosen model family determines which downstream path runs. If the LSTM family is selected the pipeline proceeds to construct and compile a LongShortTermMemoryR instance (you already saw how compile_model, train_model and test_model operate), whereas an XGBoost choice routes to the tree‑based training and evaluation logic. As excerpted, though, the method would not run: it is declared inside the class without a self parameter, it reads features on the right-hand side before the local name has been assigned (it would need self.features), and it returns no score for the study to compare. In short, objective is meant to narrow the search space by selecting a model type and a feature subset for each trial, producing the configuration that the pipeline will then build, train and score.

# file path: pipelining/paramSearch.py
study = optuna.create_study(direction='maximize')

That statement instantiates an Optuna Study that will manage the hyperparameter-search session for the Pipeline layer, telling Optuna that the value returned by the objective should be maximized rather than minimized. The Study object becomes the container for all trial records, sampling/pruning state, and optimization results; later the code hands the objective to the Study so Optuna can repeatedly propose parameter sets (for example the model and features choices suggested inside Pipeline.objective) and evaluate them. Because no study_name or storage is passed, the study lives in memory and Optuna assigns it a generated name. The direction matters for the choice of metric: an error measure such as the imported mean_squared_error improves as it gets smaller, so under direction='maximize' the objective has to return a score where larger is better, for example the negated error. Subsequent calls to the Study’s optimize routine execute the trial loop that builds and evaluates candidate configurations, including LongShortTermMemoryR-based trials via the pipeline’s evaluation logic.
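One way to reconcile an error metric with a maximizing study is to negate it. The sketch below uses a hand-written mean squared error so it runs without scikit-learn; whether the repository's objective does this is not shown in the excerpt, so treat `score_for_maximize` as a hypothetical helper.

```python
def mean_squared_error(y_true, y_pred):
    """Average of squared differences between targets and predictions."""
    if len(y_true) != len(y_pred):
        raise ValueError("y_true and y_pred must have the same length")
    return sum((t - p) ** 2 for t, p in zip(y_true, y_pred)) / len(y_true)


def score_for_maximize(y_true, y_pred):
    """Negate the error so that a better fit yields a larger score."""
    return -mean_squared_error(y_true, y_pred)


if __name__ == "__main__":
    perfect = score_for_maximize([1, 2, 3], [1, 2, 3])
    off_by_one = score_for_maximize([1, 2, 3], [2, 3, 4])
    print(perfect > off_by_one)
```

The alternative is to leave the metric untouched and create the study with the minimizing direction instead.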

# file path: pipelining/paramSearch.py
study.optimize(objective, n_trials=100)

That call instructs the Optuna study to run the hyperparameter search loop for a fixed number of trials: Optuna will repeatedly create a Trial object, hand it to the Pipeline.objective function, collect the returned score, and internally track the best-performing trial according to the study’s direction (which was set earlier). Each trial drives the decision flow inside Pipeline.objective — sampling model and feature choices via trial.suggest_* and assembling a candidate configuration, which in turn leads to building, compiling and evaluating the selected model (for LSTM-based choices the objective will invoke the LongShortTermMemoryR flow that you already studied) against the prepared time‑series data; the evaluation metric returned by objective is what Optuna maximizes across trials. The numeric limit on trials bounds how many candidate configurations are explored during this automated search, and the study records trial histories and the current best configuration as the loop proceeds.

# file path: pipelining/steps.py
class ClfSwitcher(BaseEstimator):
    def __init__(self, estimator=XGBRegressor()):
        self.estimator = estimator
    def fit(self, X, y=None, **kwargs):
        self.estimator.fit(X, y)
        return self
    def predict(self, X, y=None):
        return self.estimator.predict(X)
    def predict_proba(self, X):
        prob = self.estimator.predict_proba(X)
        return prob
    def score(self, X, y):
        score = self.estimator.score(X, y)
        print(score)
        return score

ClfSwitcher is a thin adapter that gives the Pipeline a stable, sklearn-like estimator interface while allowing the training/evaluation layer to swap concrete estimator implementations without touching downstream code. Despite the Clf prefix, the default wrapped estimator is XGBRegressor, a regressor. Its constructor stores the supplied estimator instance on the object so the pipeline always holds a single attribute to call into. The fit method forwards the training feature and target arrays it receives from the Pipeline (which in turn gets windowed features from the dataset generator) to the underlying estimator’s fit and returns self so the object can be used in estimator chains and grid/search routines. The predict and predict_proba methods delegate to the wrapped estimator and return whatever those methods produce; predict_proba only works when the wrapped estimator is a classifier that defines it, which the default XGBRegressor does not. The score method forwards scoring to the underlying estimator, prints the numeric result to the console as a side effect, and returns that value for the Pipeline’s objective logging. Conceptually ClfSwitcher plays an adapter/strategy role: it does not implement any training loop or model construction itself (unlike LongShortTermMemoryR, which builds, compiles and trains a TensorFlow model) but instead standardizes how interchangeable estimators are invoked by the hyperparameter search and orchestration layers.
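The delegation pattern, with an explicit check before probability prediction, can be sketched in plain Python. `EstimatorSwitcher` and `MeanRegressor` are hypothetical names used only for this illustration; the real class additionally derives from sklearn's BaseEstimator.

```python
class MeanRegressor:
    """Hypothetical minimal regressor used only for this demo."""

    def fit(self, X, y):
        self.mean_ = sum(y) / len(y)
        return self

    def predict(self, X):
        return [self.mean_ for _ in X]


class EstimatorSwitcher:
    """Forwards calls to whichever estimator it currently wraps."""

    def __init__(self, estimator=None):
        self.estimator = estimator

    def fit(self, X, y=None, **fit_params):
        if self.estimator is None:
            raise ValueError("no estimator was supplied")
        # Pass extra fit parameters through instead of dropping them.
        self.estimator.fit(X, y, **fit_params)
        return self

    def predict(self, X):
        return self.estimator.predict(X)

    def predict_proba(self, X):
        # Regressors have no predict_proba; fail with a clear message.
        if not hasattr(self.estimator, "predict_proba"):
            raise AttributeError(
                f"{type(self.estimator).__name__} does not expose predict_proba"
            )
        return self.estimator.predict_proba(X)
```

Checking for the method up front turns a confusing failure deep inside a search loop into an error that names the wrapped estimator.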

# file path: pipelining/steps.py
    def fit(self, X, y=None, **kwargs):
        self.estimator.fit(X, y)
        return self

ClfSwitcher.fit is the training entry point the Pipeline calls to fit whatever estimator was provided to ClfSwitcher during initialization. It takes the feature matrix and target vector handed in by the pipeline, invokes the underlying estimator’s fit implementation, and returns the ClfSwitcher instance, following the scikit-learn convention that fit returns self so calls can be chained. The method signature accepts additional keyword arguments, but the implementation forwards only X and y to the wrapped estimator’s fit call, so any extra fit parameters are silently dropped; the fitted state lives on the estimator attribute that ClfSwitcher.__init__ set. Functionally this makes ClfSwitcher a lightweight adapter/strategy wrapper that lets the hyperparameter-search and training orchestration swap estimators without changing downstream predict, predict_proba, or score calls, which themselves simply delegate to the same underlying estimator after fit.

# file path: pipelining/steps.py
    def predict_proba(self, X):
        prob = self.estimator.predict_proba(X)
        return prob

Within the pipeline layer that orchestrates model selection and evaluation, ClfSwitcher.predict_proba is a thin delegation method that asks whatever estimator the ClfSwitcher was initialized with to produce class probability estimates. When the pipeline hands in the feature matrix X, predict_proba forwards that matrix to the wrapped estimator’s probability-prediction routine and returns the resulting probability array unchanged. This only succeeds for classifiers that implement predict_proba; with the default XGBRegressor the call raises an AttributeError. The method mirrors ClfSwitcher.predict and ClfSwitcher.fit, which you reviewed earlier: fit delegates training to the estimator, predict delegates point predictions, and score delegates scoring, so predict_proba keeps the same adapter/delegation pattern for downstream hyperparameter search and evaluation code that expects probabilistic outputs. There are no branches or additional transformations here: input X flows in from the dataset/pipeline and the estimator’s probability outputs flow back to the caller.

# file path: pipelining/steps.py
    def score(self, X, y):
        score = self.estimator.score(X, y)
        print(score)
        return score

ClfSwitcher.score is the small evaluation hook the pipeline uses to get a numeric performance value from whatever estimator the switcher is currently wrapping. When called with feature matrix X and targets y it simply forwards those arrays to the wrapped estimator’s own scoring method (the estimator that was set when ClfSwitcher was constructed or swapped in later), prints the returned numeric score to the console for runtime visibility, and then returns that same numeric value to the caller. In the pipeline context this means the Pipeline objective or any sklearn-style evaluator sees the estimator’s native scoring semantics (for example accuracy for classifiers or R² for regressors), and the value printed and returned is exactly whatever the underlying estimator computes. The method contains no branching or additional logic: it relies on the estimator having been fitted earlier via ClfSwitcher.fit and on the estimator’s score implementation to produce the metric.

# file path: pipelining/steps.py
    def __init__(self, estimator=XGBRegressor()):
        self.estimator = estimator

ClfSwitcher.__init__ accepts an estimator argument (defaulting to an XGBRegressor instance) and stores it on the instance as the estimator attribute. By doing so it sets up the delegation point the rest of ClfSwitcher relies on: downstream calls from the hyperparameter-search Pipeline to fit, predict, predict_proba and score are forwarded to whatever estimator was supplied here. Because the default is written as XGBRegressor() in the signature, that instance is created once, when the function is defined, and is shared by every ClfSwitcher constructed without an explicit estimator. Conceptually the class plays a strategy/adapter role: rather than constructing a model itself (unlike LongShortTermMemoryR.compile_model, which builds a Keras network), ClfSwitcher standardizes different sklearn-style estimator implementations behind a single interface so the pipeline can swap candidates or run grid searches without changing pipeline logic. The practical data flow is simple: the X and y arrays prepared earlier in the pipeline are passed into ClfSwitcher.fit and then delegated to the stored estimator.fit; subsequent prediction and scoring calls follow the same delegation to estimator.predict, estimator.predict_proba, and estimator.score. The only side effect of the constructor is writing the estimator attribute so the instance is ready to forward those calls.
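The consequence of putting an instance in a default argument is easy to demonstrate with a toy class. `FitCounter`, `SharedDefault` and `FreshDefault` are hypothetical names; the point is only how Python evaluates default values.

```python
class FitCounter:
    """Hypothetical estimator-like object that remembers how often it was fit."""

    def __init__(self):
        self.fits = 0

    def fit(self):
        self.fits += 1
        return self


class SharedDefault:
    # The default is evaluated once, when the function is defined.
    def __init__(self, estimator=FitCounter()):
        self.estimator = estimator


class FreshDefault:
    # None is a sentinel; a new object is created on every construction.
    def __init__(self, estimator=None):
        self.estimator = FitCounter() if estimator is None else estimator


if __name__ == "__main__":
    a, b = SharedDefault(), SharedDefault()
    a.estimator.fit()
    print(b.estimator.fits)  # state leaked from a to b

    c, d = FreshDefault(), FreshDefault()
    c.estimator.fit()
    print(d.estimator.fits)  # d is unaffected
```

For a wrapper that is fitted repeatedly during a search, sharing one underlying estimator across instances means a later fit overwrites the state an earlier instance was relying on.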

# file path: tests/basic_tests.py
from utils.api.queries import get_data
from api.creds import client_connect
from tda.client import Client
from data.dataset import StockDataGenerator
from models.tf.models import LongShortTermMemory
import unittest
import tracemalloc as tm

The imports wire together the three layers this end‑to‑end test exercises: market connectivity, dataset construction, and model training. get_data and client_connect bring in the API plumbing that establishes a TD Ameritrade client and fetches market series; the Client type from tda.client is the concrete client class the tests assert against to ensure the connection step succeeded. StockDataGenerator is the dataset builder that normalizes, windows and exposes the X/y arrays the modeling layer expects, and LongShortTermMemory is the temporal model class that provides the compile, train and test behaviors we inspected earlier. unittest supplies the test harness for structuring those assertions and flows, while tracemalloc is included to snapshot and track memory usage during the end‑to‑end runs. Together these imports reflect the same pattern seen in TestBasics and TestPipeline—TestBasics exercises client_connect and get_data directly to validate API access, and TestPipeline exercises StockDataGenerator and LongShortTermMemory to validate dataset compilation and model training—so the basic_tests module composes those pieces to verify the full ingestion-to-training path.

# file path: tests/basic_tests.py
import sys
import os

The test pulls in the standard library modules sys and os so the test can control and inspect the Python runtime and the filesystem during its end-to-end checks: sys is available for things like adjusting the module search path or managing interpreter-level behavior when the test harness needs to alter import resolution or abort early, while os provides platform-independent file and environment operations used to build paths, create and remove test artifacts, and read environment variables needed to locate credentials or toggle behavior. These low-level utilities support the higher-level imports that perform API calls, dataset construction, and model training by enabling reliable setup and teardown, cross-platform file handling, and runtime configuration for the StockDataGenerator and LongShortTermMemoryR training exercised by the test; unlike the project-specific imports, sys and os are generic test scaffolding from the standard library.

# file path: tests/basic_tests.py
global verbose

The test module names a module-level verbosity flag so the pieces exercised by the end‑to‑end test can share a single toggle for printing and framework verbosity. Note that a global statement at module scope has no effect in Python: names assigned at module level are already global, and the statement neither creates nor initializes verbose. The flag only comes into existence when the __main__ block at the bottom of the file assigns it, so running the tests through another entry point (for example python -m unittest) leaves verbose undefined and the first reference to it raises a NameError. When it is defined, the API client fetch routines, the StockDataGenerator dataset construction and logging, and the training/evaluation calls into the LSTM model can all be driven by the same level without threading a separate verbosity setting through every test. This mirrors the way LongShortTermMemoryR.test_model accepts a verbosity argument for model.evaluate: the flag provides a single source of truth that the test harness can pass along when it calls such methods or when it emits simple status prints (like the API status header elsewhere in the file). Conceptually it is the simple global‑flag pattern used to coordinate logging behavior across the three layers the test touches: market connectivity, dataset generation, and model training.

# file path: tests/basic_tests.py
@unittest.skip('')
class TestBasics(unittest.TestCase):
    def testGetData(self):
        tda_client = client_connect('TDA', 'private/creds.ini')
        self.assertEqual(type(tda_client), Client)
        if verbose > 0: print('\n[ TD Ameritrade API, Symbol: BLK ]')
        data = get_data(tda_client, symbol='BLK',
                        period='TEN_YEAR', period_type='YEAR',
                        frequency='DAILY', frequency_type='DAILY',
                        features={
                            'EMA': {}, '%B': {}, 'MIDPOINT': {}, 'CCI': {}, 'RSI': {}, 'VIX': {}, 'AROONOSC': {}
                        }, api='TDA', save=True, save_path='data/TDA/example.csv')
        if verbose > 0: print(data)
        tda_client.session.close()

TestBasics contains a single unit test, testGetData, that exercises the API-to-dataset ingestion path the rest of the platform depends on. The class is decorated with unittest.skip and an empty reason, so the runner currently reports it as skipped. When enabled, the test first calls client_connect with the TDA label and a local credentials file to obtain an authenticated TDA client object, and asserts that the returned object is a TDA Client to detect failures in authentication or credential parsing early. Next, the test optionally logs a short message and invokes get_data with a concrete symbol, a ten‑year daily timeframe and a dictionary of technical features; get_data is responsible for translating the human-friendly period and frequency strings into the broker client’s enums, issuing the price history request, computing the requested technical indicators, and returning the assembled time series (and, because save is enabled, persisting a CSV to the provided path). The returned dataset is printed when verbose is enabled, and the test closes the client’s session to release network resources. Conceptually, this test sits at the entry point of the data pipeline: it validates that client_connect can create a live client from the credentials file and that get_data can produce the feature-rich table that StockDataGenerator will consume (and ultimately feed into the LongShortTermMemory/LongShortTermMemoryR training flow). The map-from-string-to-client-enum approach inside get_data lets the test call into a single abstraction across APIs, so testGetData serves as a regression check for core market connectivity and feature construction.

# file path: tests/basic_tests.py
    def testGetData(self):
        tda_client = client_connect('TDA', 'private/creds.ini')
        self.assertEqual(type(tda_client), Client)
        if verbose > 0: print('\n[ TD Ameritrade API, Symbol: BLK ]')
        data = get_data(tda_client, symbol='BLK',
                        period='TEN_YEAR', period_type='YEAR',
                        frequency='DAILY', frequency_type='DAILY',
                        features={
                            'EMA': {}, '%B': {}, 'MIDPOINT': {}, 'CCI': {}, 'RSI': {}, 'VIX': {}, 'AROONOSC': {}
                        }, api='TDA', save=True, save_path='data/TDA/example.csv')
        if verbose > 0: print(data)
        tda_client.session.close()

TestBasics.testGetData is a smoke-test entry that exercises the market-connection and ingestion layers to ensure the pipeline can obtain a usable dataset for downstream dataset generation and modeling. It first asks client_connect to build an authenticated TD Ameritrade client from the provided credentials file; client_connect reads the config and returns the client object that the test asserts is the expected Client type. This guarantees the project’s low-level market connectivity is functioning before any further processing is attempted.

Next, the test invokes get_data with that client, asking for historical daily price data for the BLK symbol over the ten‑year period and requesting a specific set of technical features (EMA, %B, MIDPOINT, CCI, RSI, VIX, AROONOSC). get_data detects the TDA client type and translates the human-friendly period/frequency strings into the client’s PriceHistory enums, performs the API price-history request, assembles the returned series into a tabular structure, and then computes the requested technical indicators via the project’s feature-processing code (the indicator-generation logic referenced in get_data_part2). The call also exercises the save-path behavior so the resulting dataset is written to disk when asked. Throughout this flow there are conditional branches: the test asserts the client type (failing fast if the connection step failed), the test conditionally emits console logging when verbose is enabled, and get_data itself branches on client type and on which features were requested to decide how to fetch and compute data.

Finally, the test prints the returned data when verbose logging is on and closes the TDA client session to release network resources. In the repository’s layered architecture this function sits at the entry point that validates market connectivity and raw-data assembly, producing the same kind of time-series inputs that StockDataGenerator and the model-training layer expect to consume.
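The string-to-enum translation attributed to get_data can be illustrated with the standard library. The enums below are hypothetical stand-ins rather than the broker SDK's own types, and `to_enum` is an assumed helper name.

```python
from enum import Enum


class Period(Enum):
    """Hypothetical stand-in for a broker SDK's period enum."""
    ONE_YEAR = 1
    TEN_YEAR = 10


class FrequencyType(Enum):
    """Hypothetical stand-in for a broker SDK's frequency-type enum."""
    DAILY = "daily"
    WEEKLY = "weekly"


def to_enum(enum_cls, name):
    """Look up an enum member by its name, with a readable error on typos."""
    try:
        return enum_cls[name]
    except KeyError:
        valid = ", ".join(member.name for member in enum_cls)
        raise ValueError(
            f"{name!r} is not a valid {enum_cls.__name__}; expected one of: {valid}"
        ) from None


if __name__ == "__main__":
    print(to_enum(Period, "TEN_YEAR"))
    print(to_enum(FrequencyType, "DAILY"))
```

Keeping the public interface string-based means callers such as the tests never import SDK-specific types, which is what allows one function signature to serve several APIs.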

# file path: tests/basic_tests.py
class TestPipeline(unittest.TestCase):
    @unittest.skip('minimize API calls')
    def testDatasetCompiling(self):
        data = StockDataGenerator(
            'BLK', 'TDA',
            period='TEN_YEAR', period_type='YEAR',
            frequency='DAILY', frequency_type='DAILY',
            features={
                'EMA': {}, '%B': {}, 'RSI': {}
            }, save=True, verbose=verbose
        )
        data.client.session.close()
        if verbose > 0: print('\n', data.data)
        self.assertEqual(type(data), StockDataGenerator)
    def testModelCompileTrain(self):
        print('\n')
        data = StockDataGenerator(
            'BLK',
            data_path='data/TDA/example.csv',
            verbose=verbose, target='close'
        )
        )
        if verbose > 0: print(data.data)
        lstm = LongShortTermMemory()
        lstm.compile_model(data.X_train, verbose=verbose)
        lstm.train_model(data.X_train, data.y_train, epochs=25, verbose=verbose, plot_metrics=True)

TestPipeline is a unittest.TestCase that serves as the end‑to‑end smoke test for the marketbot-main_cleaned stack: it verifies that market connectivity, dataset construction, and the TensorFlow LSTM model plumbing work together. It defines two tests: testDatasetCompiling, which is annotated to be skipped to avoid live API usage, and testModelCompileTrain, which runs locally against a supplied example CSV. When run, testDatasetCompiling would instantiate StockDataGenerator with a TDA client and a set of feature definitions, which triggers the client_connect/get_data flow to pull raw price history, then StockDataGenerator._process_dataset and _train_test_split to normalize, window, and split the data; the test closes the client session and asserts that the result is a StockDataGenerator instance. The testModelCompileTrain path avoids the network by pointing StockDataGenerator at the example CSV so the generator produces data.data and numeric X_train and y_train arrays via its internal processing and splitting steps; the test optionally prints the dataset when verbose is enabled. After dataset creation, the test instantiates LongShortTermMemory, calls LongShortTermMemory.compile_model with the shape information derived from X_train so the Sequential LSTM/Bidirectional layers are constructed, then calls LongShortTermMemory.train_model to run model.fit for 25 epochs; train_model enforces that a model exists, executes the fit call (populating train_history and any callbacks), and—because plot_metrics is true—will invoke the plotting routine that lays out training metrics and may produce network/visual side effects. Remember that earlier we covered ClfSwitcher methods used elsewhere in the pipeline; TestPipeline does not exercise ClfSwitcher here but instead focuses on the raw data generator and the LongShortTermMemory compile/train lifecycle to catch regressions in core ingestion and training functionality for the marketbot-main_cleaned project.

# file path: tests/basic_tests.py
    @unittest.skip('minimize API calls')
    def testDatasetCompiling(self):
        data = StockDataGenerator(
            'BLK', 'TDA',
            period='TEN_YEAR', period_type='YEAR',
            frequency='DAILY', frequency_type='DAILY',
            features={
                'EMA': {}, '%B': {}, 'RSI': {}
            }, save=True, verbose=verbose
        )
        data.client.session.close()
        if verbose > 0: print('\n', data.data)
        self.assertEqual(type(data), StockDataGenerator)

TestPipeline.testDatasetCompiling is an end‑to‑end entry test that would validate the ingestion-to-dataset pipeline by instantiating StockDataGenerator with a live TD Ameritrade connection and a small feature set, but it is marked to be skipped to avoid live API traffic. When run, constructing StockDataGenerator triggers the connection and data‑pull path: client_connect is used to create the API client, get_data retrieves raw market series for the symbol and requested frequency, StockDataGenerator._process_dataset performs feature engineering and target construction, and StockDataGenerator._train_test_split turns the processed frame into train/test arrays — the imports already wire these market, dataset and model layers together so this single call exercises them end‑to‑end. The test then closes the client’s session to release network resources, conditionally prints the assembled DataFrame when verbose is enabled, and finally asserts that the object produced is a StockDataGenerator instance. It therefore serves as the integration checkpoint for data ingestion and dataset compilation and complements the sibling testModelCompileTrain that consumes the compiled dataset to validate LSTM compilation and training.
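An unconditional skip means the live path is never exercised. One alternative, sketched here under the assumption of an opt-in environment variable (the name `RUN_LIVE_API_TESTS` is hypothetical), is to gate live tests with `unittest.skipUnless`.

```python
import os
import unittest

# Opt-in switch: live tests run only when the variable is set to "1".
RUN_LIVE = os.environ.get("RUN_LIVE_API_TESTS") == "1"


class LiveApiTests(unittest.TestCase):
    @unittest.skipUnless(RUN_LIVE, "set RUN_LIVE_API_TESTS=1 to hit the live API")
    def test_live_download(self):
        # Placeholder body: a real test would connect and fetch data here.
        self.assertTrue(True)

    def test_offline_logic(self):
        # Always runs; needs no network access.
        self.assertEqual("ten_year".upper(), "TEN_YEAR")
```

This keeps routine runs free of API traffic while still letting a scheduled job or a developer turn the integration check on without editing the test file.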

# file path: tests/basic_tests.py
    def testModelCompileTrain(self):
        print('\n')
        data = StockDataGenerator(
            'BLK',
            data_path='data/TDA/example.csv',
            verbose=verbose, target='close'
        )
        )
        if verbose > 0: print(data.data)
        lstm = LongShortTermMemory()
        lstm.compile_model(data.X_train, verbose=verbose)
        lstm.train_model(data.X_train, data.y_train, epochs=25, verbose=verbose, plot_metrics=True)

The test entrypoint TestPipeline.testModelCompileTrain exercises the end-to-end path from dataset construction into model compilation and training, so regressions in market data ingestion or the LSTM training pipeline surface early. It begins by instantiating StockDataGenerator for the BLK symbol, pointing it at a local CSV and requesting the close price as the target. StockDataGenerator is responsible for loading the series (either from a client or from the provided file), running _process_dataset to normalize the data and create the target and features, and then performing _train_test_split and any windowing so that X_train and y_train are available as NumPy arrays. With verbose enabled, the test also prints the assembled data frame for inspection.

Next the test creates a LongShortTermMemory instance and passes X_train to compile_model, which derives the input shape from it, builds the Sequential LSTM network (stacked LSTM, Dropout and Bidirectional LSTM layers), and stores it on the model attribute. After compilation the test invokes train_model to fit the compiled model on X_train and y_train for 25 epochs. train_model guards against a missing model, calls the Keras fit loop (with callbacks and multiprocessing enabled), and stores the training history on the trainer instance. Because plot_metrics is true here, it then calls the plotting helpers to visualize training metrics, overlay predictions versus truth, and compute SHAP explanations for the trained network.

Overall the data flow is CSV → StockDataGenerator processing → X_train/y_train → LongShortTermMemory.compile_model → LongShortTermMemory.train_model. The key control points are the dataset construction steps inside StockDataGenerator and the train_model guard that ensures a model exists before fitting. The visible side effects are console output, the model and train_history attributes being written, and the plotting and SHAP operations being executed.
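The windowing step that produces X_train and y_train is described here but not shown. The following is a minimal sketch of how such a step is commonly written, assuming a single feature, a fixed lookback window and a chronological split. The names make_windows and chronological_split are hypothetical and are not taken from StockDataGenerator.

```python
import numpy as np

def make_windows(series, window=3):
    """Slice a 1-D series into overlapping windows and next-step targets."""
    values = np.asarray(series, dtype=float)
    if len(values) <= window:
        raise ValueError("series must be longer than the window")
    # Each row of X holds `window` consecutive observations.
    X = np.stack([values[i:i + window] for i in range(len(values) - window)])
    # y holds the observation that immediately follows each window.
    y = values[window:]
    # Keras LSTM layers expect input shaped (samples, timesteps, features).
    return X[..., np.newaxis], y

def chronological_split(X, y, train_fraction=0.8):
    """Split without shuffling so the test set is strictly later in time."""
    cut = int(len(X) * train_fraction)
    return X[:cut], y[:cut], X[cut:], y[cut:]

closes = [10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0]
X, y = make_windows(closes, window=3)
X_train, y_train, X_test, y_test = chronological_split(X, y)
print(X_train.shape, y_train.shape, X_test.shape, y_test.shape)
```

Keeping the split chronological matters for price series: shuffling before splitting would let the model train on observations that come after the ones it is evaluated on.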

# file path: tests/basic_tests.py
if __name__ == '__main__':
    verbose = 1 if '-v' in sys.argv else 0
    unittest.main(verbosity=2)

When the test module is executed as a script it performs a small startup decision and then hands control to Python’s test runner. It inspects sys.argv for a -v flag and sets a module-level verbose variable accordingly, so the test methods and their helpers can produce more diagnostic output when requested (this mirrors the global verbose usage found elsewhere in the project). It then invokes unittest.main with verbosity=2, so the test harness discovers and runs the test cases in this file and prints detailed per-test results. In practice that call is what kicks off the end-to-end checks that exercise the API client, StockDataGenerator dataset assembly, and LongShortTermMemory model training. The pattern follows the other test and runner entrypoints in the codebase: a simple CLI verbosity toggle followed by a direct call into unittest’s main runner.
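One caveat with this pattern: in the snippet shown, verbose is assigned only inside the main guard, so unless it is also defined elsewhere in the module, running the tests through another runner (for example python -m unittest) would leave the name undefined. A minimal sketch of one way around that is to resolve the flag at import time. The names verbosity_from_argv, VERBOSE and ExampleTest are hypothetical and not part of the project.

```python
import sys
import unittest

def verbosity_from_argv(argv):
    """Return 1 when a -v flag is present on the command line, otherwise 0."""
    return 1 if "-v" in argv else 0

# Evaluated at import time, so the name exists under any test runner.
VERBOSE = verbosity_from_argv(sys.argv)

class ExampleTest(unittest.TestCase):
    def test_flag_is_defined(self):
        if VERBOSE > 0:
            print("verbose diagnostics enabled")
        self.assertIn(VERBOSE, (0, 1))

# Run the case programmatically; a script entrypoint would call
# unittest.main(verbosity=2) here instead.
suite = unittest.defaultTestLoader.loadTestsFromTestCase(ExampleTest)
result = unittest.TextTestRunner(verbosity=2).run(suite)
```

Note that unittest.main also recognizes -v on its own command line, so the same flag ends up controlling both the runner and the module-level diagnostics.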

# file path: utils/api/api_status.py
from urllib import response
from api.creds import client_connect
import krakenex
from tda import client 
from pykrakenapi import KrakenAPI

The imports bring together a small set of external SDKs and a local connector so api_status can instantiate and interrogate real exchange and brokerage endpoints for health checks. The response import from urllib does not appear to do any work here: the probe code further down rebinds the name response to the result of the TD Ameritrade price-history call, so the imported module is shadowed before it is ever used. client_connect from api.creds is the project’s local connection factory that centralizes credential handling and returns authenticated client instances; this mirrors other modules that call a connection helper to keep auth logic out of the status and orchestration layers. krakenex and KrakenAPI provide the low-level and higher-level Kraken exchange bindings respectively, so the health checker can build a raw API handle and then query market data through the convenience wrapper. tda.client exposes the TD Ameritrade client SDK, whose PriceHistory enums are used to parameterize the TDA request. Compared with similar import groups elsewhere (which bring in tda.auth and an aliased TDA_Client, Coinbase’s CB_Client, Polygon’s REST client, and Fred), these imports are focused on the specific providers api_status needs to assess. Like those other modules, api_status relies on vendor SDKs plus a local factory (client_connect), so the check reports the same UP or DOWN signal regardless of which concrete client was used.

# file path: utils/api/api_status.py
print('      API      STATUS')

The line emits a simple, human-readable column header to standard output with two aligned labels — one for the endpoint identifier and one for its reported state — so that when the utilities module runs its endpoint checks the following lines create an easy-to-scan table in the console. Because this file centralizes health checking and normalization for the lightweight API client, that header frames the diagnostic output produced as the code iterates over each external API and prints the endpoint name alongside the normalized status; the spacing is fixed so the reported rows line up under the labels. This is purely presentation-level behavior and does not change the underlying status-normalization or state exposed programmatically (for example, it complements programmatic accessors such as Bot.status by providing a human-facing snapshot), and it is distinct from the import wiring and app instantiation elsewhere which configure clients and servers rather than formatting runtime diagnostic output.

# file path: utils/api/api_status.py
try:
    tda_client = client_connect('TDA', 'private/creds.ini')
    response = tda_client.get_price_history('AAPL',
        period_type=client.Client.PriceHistory.PeriodType.YEAR,
        period=client.Client.PriceHistory.Period.ONE_YEAR,
        frequency_type=client.Client.PriceHistory.FrequencyType.DAILY,
        frequency=client.Client.PriceHistory.Frequency.DAILY)
    # Any non-200 status raises here and is reported as DOWN below
    assert response.status_code == 200, response.raise_for_status()
    print('TD Ameritrade:   UP')
except Exception:
    print('TD Ameritrade:  DOWN')
try:
    # Only verifies that a client can be constructed from the credentials
    cb_client = client_connect('CB', 'private/creds.ini')
    print('Coinbase:        UP')
except Exception:
    print('Coinbase:       DOWN')
try:
    api = krakenex.API()
    k = KrakenAPI(api)
    ohlc, last = k.get_ohlc_data("BCHUSD")
    print('Kraken:          UP')
except Exception:
    print('Kraken:         DOWN')

The script runs three sequential endpoint health probes, for TD Ameritrade, Coinbase, and Kraken, each wrapped in its own try/except so a failure on one provider does not stop the others. For TD Ameritrade the code uses client_connect with the TDA key and the private/creds.ini credentials to obtain a tda_client, then attempts a real market data fetch: one year of daily price history for AAPL. The assert requires an HTTP 200 status; its message expression, response.raise_for_status(), is evaluated only when that check fails, so an error status raises from there, and in either case the failure lands in the except branch. A successful round trip prints the UP line, while any exception prints the DOWN line. For Coinbase the probe is limited to instantiating a cb_client via client_connect with the same credentials file, so UP means the client could be constructed; the code shown does not issue a market data request. For Kraken the probe instantiates a krakenex.API, wraps it with pykrakenapi’s KrakenAPI, requests OHLC data for the BCHUSD pair to validate both connectivity and basic market retrieval, and prints UP on success or DOWN on exception. Together these probes reduce endpoint liveness to simple two-column status lines under the header printed earlier, giving an operator a consistent, human-readable summary before starting live ingestion. As written, the results are printed rather than returned, so a higher layer would need to capture or refactor them to act on them programmatically.
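The three probes share one shape: run a check, catch any failure, print an aligned row. A minimal sketch of that shape as a reusable helper follows, assuming each provider check can be expressed as a zero-argument callable. The probe function and the healthy and broken stand-ins are hypothetical; they simulate provider calls and do not touch any real API.

```python
def probe(name, check, width=16):
    """Run one health check; return the formatted row and a boolean result."""
    try:
        check()
        is_up = True
    except Exception:
        # Any failure (auth, network, bad status) counts as DOWN.
        is_up = False
    label = name + ":"
    status = "UP" if is_up else "DOWN"
    # Left-align the label and right-align the status so rows line up.
    row = f"{label:<{width}}{status:>4}"
    return row, is_up

def healthy():
    """Stand-in for a provider call that succeeds."""
    return None

def broken():
    """Stand-in for a provider call that fails."""
    raise ConnectionError("simulated outage")

checks = {"TD Ameritrade": healthy, "Kraken": broken}
results = {}
for name, check in checks.items():
    row, results[name] = probe(name, check)
    print(row)
```

Returning the boolean alongside the printed row would let calling code decide whether to proceed with a given feed, which the print-only version cannot do.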

# file path: utils/api/queries.py
                vars['CDLGRAVESTONEDOJI'] = ta.CDLGRAVESTONEDOJI(vars['open'], vars['high'], vars['low'], vars['close'])
            elif func == 'CDLHAMMER':
                vars['CDLHAMMER'] = ta.CDLHAMMER(vars['open'], vars['high'], vars['low'], vars['close'])
            elif func == 'CDLHANGINGMAN':
                vars['CDLHANGINGMAN'] = ta.CDLHANGINGMAN(vars['open'], vars['high'], vars['low'], vars['close'])
            elif func == 'CDLHARAMI':
                vars['CDLHARAMI'] = ta.CDLHARAMI(vars['open'], vars['high'], vars['low'], vars['close'])
            elif func == 'CDLHARAMICROSS':
                vars['CDLHARAMICROSS'] = ta.CDLHARAMICROSS(vars['open'], vars['high'], vars['low'], vars['close'])
            elif func == 'CDLHIGHWAVE':
                vars['CDLHIGHWAVE'] = ta.CDLHIGHWAVE(vars['open'], vars['high'], vars['low'], vars['close'])
            elif func == 'CDLHIKKAKE':
                vars['CDLHIKKAKE'] = ta.CDLHIKKAKE(vars['open'], vars['high'], vars['low'], vars['close'])
            elif func == 'CDLHIKKAKEMOD':
                vars['CDLHIKKAKEMOD'] = ta.CDLHIKKAKEMOD(vars['open'], vars['high'], vars['low'], vars['close'])
            elif func == 'CDLHOMINGPIGEON':
                vars['CDLHOMINGPIGEON'] = ta.CDLHOMINGPIGEON(vars['open'], vars['high'], vars['low'], vars['close'])
            elif func == 'CDLIDENTICAL3CROWS':
                vars['CDLIDENTICAL3CROWS'] = ta.CDLIDENTICAL3CROWS(vars['open'], vars['high'], vars['low'], vars['close'])
            elif func == 'CDLINNECK':
                vars['CDLINNECK'] = ta.CDLINNECK(vars['open'], vars['high'], vars['low'], vars['close'])
            elif func == 'CDLINVERTEDHAMMER':
                vars['CDLINVERTEDHAMMER'] = ta.CDLINVERTEDHAMMER(vars['open'], vars['high'], vars['low'], vars['close'])
            elif func == 'CDLKICKING':
                vars['CDLKICKING'] = ta.CDLKICKING(vars['open'], vars['high'], vars['low'], vars['close'])
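            elif func == 'CDLKICKINGBYLENGTH':
                vars['CDLKICKINGBYLENGTH'] = ta.CDLKICKINGBYLENGTH(vars['open'], vars['high'], vars['low'], vars['close'])
            elif func == 'CDLLADDERBOTTOM':
                vars['CDLLADDERBOTTOM'] = ta.CDLLADDERBOTTOM(vars['open'], vars['high'], vars['low'], vars['close'])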
            elif func == 'CDLLONGLEGGEDDOJI':
                vars['CDLLONGLEGGEDDOJI'] = ta.CDLLONGLEGGEDDOJI(vars['open'], vars['high'], vars['low'], vars['close'])
            elif func == 'CDLLONGLINE':
                vars['CDLLONGLINE'] = ta.CDLLONGLINE(vars['open'], vars['high'], vars['low'], vars['close'])
            elif func == 'CDLMARUBOZU':
                vars['CDLMARUBOZU'] = ta.CDLMARUBOZU(vars['open'], vars['high'], vars['low'], vars['close'])
            elif func == 'CDLMATCHINGLOW':
                vars['CDLMATCHINGLOW'] = ta.CDLMATCHINGLOW(vars['open'], vars['high'], vars['low'], vars['close'])
            elif func == 'CDLMATHOLD':
                vars['CDLMATHOLD'] = ta.CDLMATHOLD(vars['open'], vars['high'], vars['low'], vars['close'], **params)
            elif func == 'CDLMORNINGDOJISTAR':
                vars['CDLMORNINGDOJISTAR'] = ta.CDLMORNINGDOJISTAR(vars['open'], vars['high'], vars['low'], vars['close'], **params)
            elif func == 'CDLMORNINGSTAR':
                vars['CDLMORNINGSTAR'] = ta.CDLMORNINGSTAR(vars['open'], vars['high'], vars['low'], vars['close'], **params)
            elif func == 'CDLONNECK':
                vars['CDLONNECK'] = ta.CDLONNECK(vars['open'], vars['high'], vars['low'], vars['close'])
            elif func == 'CDLPIERCING':
                vars['CDLPIERCING'] = ta.CDLPIERCING(vars['open'], vars['high'], vars['low'], vars['close'])
            elif func == 'CDLRICKSHAWMAN':
                vars['CDLRICKSHAWMAN'] = ta.CDLRICKSHAWMAN(vars['open'], vars['high'], vars['low'], vars['close'])
            elif func == 'CDLRISEFALL3METHODS':
                vars['CDLRISEFALL3METHODS'] = ta.CDLRISEFALL3METHODS(vars['open'], vars['high'], vars['low'], vars['close'])
            elif func == 'CDLSEPARATINGLINES':
                vars['CDLSEPARATINGLINES'] = ta.CDLSEPARATINGLINES(vars['open'], vars['high'], vars['low'], vars['close'])
            elif func == 'CDLSHOOTINGSTAR':
                vars['CDLSHOOTINGSTAR'] = ta.CDLSHOOTINGSTAR(vars['open'], vars['high'], vars['low'], vars['close'])
            elif func == 'CDLSHORTLINE':
                vars['CDLSHORTLINE'] = ta.CDLSHORTLINE(vars['open'], vars['high'], vars['low'], vars['close'])
            elif func == 'CDLSPINNINGTOP':
                vars['CDLSPINNINGTOP'] = ta.CDLSPINNINGTOP(vars['open'], vars['high'], vars['low'], vars['close'])
            elif func == 'CDLSTALLEDPATTERN':
                vars['CDLSTALLEDPATTERN'] = ta.CDLSTALLEDPATTERN(vars['open'], vars['high'], vars['low'], vars['close'])
            elif func == 'CDLSTICKSANDWICH':
                vars['CDLSTICKSANDWICH'] = ta.CDLSTICKSANDWICH(vars['open'], vars['high'], vars['low'], vars['close'])
            elif func == 'CDLTAKURI':
                vars['CDLTAKURI'] = ta.CDLTAKURI(vars['open'], vars['high'], vars['low'], vars['close'])
            elif func == 'CDLTASUKIGAP':
                vars['CDLTASUKIGAP'] = ta.CDLTASUKIGAP(vars['open'], vars['high'], vars['low'], vars['close'])
            elif func == 'CDLTHRUSTING':
                vars['CDLTHRUSTING'] = ta.CDLTHRUSTING(vars['open'], vars['high'], vars['low'], vars['close'])
            elif func == 'CDLTRISTAR':
                vars['CDLTRISTAR'] = ta.CDLTRISTAR(vars['open'], vars['high'], vars['low'], vars['close'])
            elif func == 'CDLUNIQUE3RIVER':
                vars['CDLUNIQUE3RIVER'] = ta.CDLUNIQUE3RIVER(vars['open'], vars['high'], vars['low'], vars['close'])
            elif func == 'CDLUPSIDEGAP2CROWS':
                vars['CDLUPSIDEGAP2CROWS'] = ta.CDLUPSIDEGAP2CROWS(vars['open'], vars['high'], vars['low'], vars['close'])
            elif func == 'CDLXSIDEGAP3METHODS':
                vars['CDLXSIDEGAP3METHODS'] = ta.CDLXSIDEGAP3METHODS(vars['open'], vars['high'], vars['low'], vars['close'])
        elif func == '%B':
            if 'UPPER_BBAND' not in vars:
                # Bands were not requested as features, so compute them here
                upperband, middleband, lowerband = ta.BBANDS(vars['close'], **params)
                vars['%B'] = (vars['close'] - lowerband) / (upperband - lowerband)
            else:
                vars['%B'] = (vars['close'] - vars['LOWER_BBAND']) / (vars['UPPER_BBAND'] - vars['LOWER_BBAND'])
        elif func == 'VIX':
            fred_client = client_connect('FRED', 'private/creds.ini')
            temp = fred_client.get_series('VIXCLS')
            vix = []
            for date in vars['datetime']:
                try:
                    vix.append(temp[f'{date:%Y-%m-%d}'])
                except Exception:
                    # No VIX observation for this date; dropna() below removes the row
                    vix.append(None)
            vars['VIX'] = vix
    df = DataFrame.from_dict(vars)
    df = df.dropna()
    df.index = df['datetime']
    df.drop('datetime', axis='columns', inplace=True)
    start_date = str(start_date)
    end_date = str(end_date)
    if save:
        if save_path == '':
            # Default location: data/<api>/<symbol>/<symbol>_<start>_<end>.csv
            os.makedirs(f'data/{api}/{symbol}/', exist_ok=True)
            save_path = f'data/{api}/{symbol}/{symbol}_{start_date}_{end_date}.csv'
        else:
            os.makedirs(f'data/{api}/', exist_ok=True)
        df.to_csv(save_path)
        return df, save_path
    else:
        return df

get_data_part2 is the second-stage assembler: it takes the prepared variable dictionary from get_data_part1 and turns it into a cleaned, feature-enriched pandas DataFrame ready for the dataset pipeline. It iterates through the requested feature names and, for a large set of candlestick pattern indicators, invokes the corresponding TA-Lib candlestick functions (for example CDLHAMMER, CDLHANGINGMAN and many others) to populate new series in the vars mapping, so StockDataGenerator can use those discrete pattern signals as features. Most patterns take only the open, high, low and close series; CDLMATHOLD, CDLMORNINGDOJISTAR and CDLMORNINGSTAR also receive the caller-supplied params.

For the percent-B feature it computes the Bollinger Bands if they are not already present and then expresses the close as its position between the bands, (close - lower) / (upper - lower); if the upper and lower band series are already available it reuses them to avoid recalculation. For the VIX feature it opens a FRED connection via client_connect, fetches the VIXCLS closing series, and aligns that external series to the generator’s datetime index by walking the datetime list and appending the matching value, or None when a date key is absent; rows with None are removed by the next cleaning step.

After feature construction it materializes vars into a pandas DataFrame, drops any rows containing missing values so downstream windowing and model training receive complete timesteps, sets the index to the datetime column, and removes the original datetime column. It converts the start and end dates to strings so they can be used in the default filename. If save is requested it makes sure the data directory exists (creating it when necessary), writes the CSV, and returns both the dataframe and the filepath; otherwise it returns just the dataframe. Because it calls client_connect to retrieve FRED series and may create directories and write CSVs, get_data_part2 has network and filesystem side effects, and its output feeds directly into StockDataGenerator’s later windowing, normalization and train/test splitting stages.
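To make the percent-B step concrete, here is a minimal sketch that reproduces the calculation with pandas rolling statistics instead of TA-Lib. It assumes a simple moving average, a population standard deviation and bands at two standard deviations; the parameters the project passes to ta.BBANDS are not shown in the source and may differ. The function name percent_b is hypothetical.

```python
import pandas as pd

def percent_b(close, window=5, num_std=2.0):
    """Position of the close between the lower (0.0) and upper (1.0) band."""
    close = pd.Series(close, dtype=float)
    middle = close.rolling(window).mean()
    # ddof=0 selects the population standard deviation (an assumption here).
    spread = close.rolling(window).std(ddof=0)
    upper = middle + num_std * spread
    lower = middle - num_std * spread
    return (close - lower) / (upper - lower)

frame = pd.DataFrame({"close": [10.0, 11.0, 12.0, 11.0, 10.0, 12.0, 14.0]})
frame["%B"] = percent_b(frame["close"], window=5)

# The first window-1 rows have no band values yet; dropping them mirrors
# the dropna() call that get_data_part2 applies to the assembled frame.
clean = frame.dropna()
print(clean)
```

A value of 0.5 means the close sits on the middle band, values above 1.0 mean it closed above the upper band, and values below 0.0 mean it closed below the lower band.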

Download the source code using the button below:
