In [4]:
# Constants
BASES = "ACGT"
TRAIN_DATASET = "data/train_data.tfrecords"
TEST_DATASET = "data/test_data.tfrecords"
EVAL_DATASET = "data/eval_data.tfrecords"
EPOCHS = 1000
BATCH_SIZE = 256
LEARNING_RATE = 0.004
L2 = 0.001
LOG_DIR = "logs"

In [3]:
!mkdir logs
!mkdir data
!curl -fL https://git.coolneng.duckdns.org/coolneng/locimend/raw/branch/master/data/HVR.fastq -o data/HVR.fastq
!curl -fL https://git.coolneng.duckdns.org/coolneng/locimend/raw/branch/master/data/curesim-HVR.fastq -o data/curesim-HVR.fastq

mkdir: cannot create directory ‘logs’: File exists
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 1074k  100 1074k    0     0   804k      0  0:00:01  0:00:01 --:--:--  804k
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 1484k  100 1484k    0     0   321k      0  0:00:04  0:00:04 --:--:--  321k


In [5]:
!pip install biopython

Collecting biopython
[?25l  Downloading https://files.pythonhosted.org/packages/5a/42/de1ed545df624180b84c613e5e4de4848f72989ce5846a74af6baa0737b9/biopython-1.79-cp37-cp37m-manylinux_2_5_x86_64.manylinux1_x86_64.whl (2.3MB)
[K     |████████████████████████████████| 2.3MB 5.2MB/s 
Installing collected packages: biopython
Successfully installed biopython-1.79


In [6]:
from typing import List, Tuple

from Bio.motifs import create
from Bio.SeqIO import parse
from numpy.random import random
from tensorflow import Tensor, int64, stack, cast, int32
from tensorflow.sparse import to_dense
from tensorflow.data import TFRecordDataset
from tensorflow.io import (
    FixedLenFeature,
    TFRecordWriter,
    VarLenFeature,
    parse_single_example,
)
from tensorflow.train import Example, Feature, Features, Int64List



def generate_example(sequence, label, base_counts) -> bytes:
    """
    Create a binary-string for each sequence containing the sequence and the bases' counts
    """
    schema = {
        "A_counts": Feature(int64_list=Int64List(value=[sum(base_counts["A"])])),
        "C_counts": Feature(int64_list=Int64List(value=[sum(base_counts["C"])])),
        "G_counts": Feature(int64_list=Int64List(value=[sum(base_counts["G"])])),
        "T_counts": Feature(int64_list=Int64List(value=[sum(base_counts["T"])])),
        "sequence": Feature(int64_list=Int64List(value=encode_sequence(sequence))),
        "label": Feature(int64_list=Int64List(value=encode_sequence(label))),
    }
    example = Example(features=Features(feature=schema))
    return example.SerializeToString()


def encode_sequence(sequence) -> List[int]:
    """
    Encode the DNA sequence using the indices of the BASES constant
    """
    encoded_sequence = [BASES.index(element) for element in sequence]
    return encoded_sequence


def read_fastq(data_file, label_file) -> List[bytes]:
    """
    Parses a data and a label FASTQ files and generates a List of serialized Examples
    """
    examples = []
    with open(data_file) as data, open(label_file) as labels:
        for element, label in zip(parse(data, "fastq"), parse(labels, "fastq")):
            motifs = create([element.seq])
            example = generate_example(
                sequence=str(element.seq),
                label=str(label.seq),
                base_counts=motifs.counts,
            )
            examples.append(example)
    return examples


def create_dataset(
    data_file, label_file, train_eval_test_split=[0.8, 0.1, 0.1]
) -> None:
    """
    Create a training, evaluation and test dataset with a 80/10/30 split respectively
    """
    data = read_fastq(data_file, label_file)
    with TFRecordWriter(TRAIN_DATASET) as training, TFRecordWriter(
        TEST_DATASET
    ) as test, TFRecordWriter(EVAL_DATASET) as evaluation:
        for element in data:
            if random() < train_eval_test_split[0]:
                training.write(element)
            elif random() < train_eval_test_split[0] + train_eval_test_split[1]:
                evaluation.write(element)
            else:
                test.write(element)


def transform_features(parsed_features) -> List[Tensor]:
    """
    Cast and transform the parsed features of an Example into a list of Tensors
    """
    sparse_features = ["sequence", "label"]
    for feature in sparse_features:
        parsed_features[feature] = cast(parsed_features[feature], int32)
        parsed_features[feature] = to_dense(parsed_features[feature])
    for base in BASES:
        parsed_features[f"{base}_counts"] = cast(
            parsed_features[f"{base}_counts"], int32
        )
    features = list(parsed_features.values())[:-1]
    return features


def process_input(byte_string) -> Tuple[Tensor, Tensor]:
    """
    Parse a byte-string into an Example object
    """
    schema = {
        "A_counts": FixedLenFeature(shape=[1], dtype=int64),
        "C_counts": FixedLenFeature(shape=[1], dtype=int64),
        "G_counts": FixedLenFeature(shape=[1], dtype=int64),
        "T_counts": FixedLenFeature(shape=[1], dtype=int64),
        "sequence": VarLenFeature(dtype=int64),
        "label": VarLenFeature(dtype=int64),
    }
    parsed_features = parse_single_example(byte_string, features=schema)
    features = transform_features(parsed_features)
    return stack(features, axis=-1), parsed_features["label"]


def read_dataset(filepath) -> TFRecordDataset:
    """
    Read TFRecords files and generate a dataset
    """
    data_input = TFRecordDataset(filenames=filepath)
    dataset = data_input.map(map_func=process_input)
    shuffled_dataset = dataset.shuffle(buffer_size=10000, seed=42)
    batched_dataset = shuffled_dataset.batch(batch_size=BATCH_SIZE).repeat(count=EPOCHS)
    return batched_dataset


def dataset_creation(
    data_file, label_file
) -> Tuple[TFRecordDataset, TFRecordDataset, TFRecordDataset]:
    """
    Generate the TFRecord files and split them into training, validation and test data
    """
    create_dataset(data_file, label_file)
    train_data = read_dataset(TRAIN_DATASET)
    eval_data = read_dataset(EVAL_DATASET)
    test_data = read_dataset(TEST_DATASET)
    return train_data, eval_data, test_data



In [7]:
from random import seed

from tensorflow.keras import Model, Sequential, layers
from tensorflow.keras.callbacks import TensorBoard
from tensorflow.keras.losses import sparse_categorical_crossentropy
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.regularizers import l2
from tensorflow.random import set_seed


def build_model() -> Model:
    """
    Build the CNN model
    """
    model = Sequential()
    model.add(
        layers.Conv1D(
            filters=16,
            kernel_size=5,
            activation="relu",
            kernel_regularizer=l2(L2),
        )
    )
    model.add(layers.MaxPool1D(pool_size=3, strides=1))
    model.add(
        layers.Conv1D(
            filters=16,
            kernel_size=3,
            activation="relu",
            kernel_regularizer=l2(L2),
        )
    )
    model.add(layers.MaxPool1D(pool_size=3, strides=1))
    model.add(layers.Flatten())
    model.add(
        layers.Dense(
            units=16,
            activation="relu",
            kernel_regularizer=l2(L2),
        )
    )
    model.add(layers.Dropout(rate=0.3))
    model.add(
        layers.Dense(
            units=16,
            activation="relu",
            kernel_regularizer=l2(L2),
        )
    )
    model.add(layers.Dropout(rate=0.3))
    # FIXME Change output size
    model.add(layers.Dense(units=len(BASES), activation="softmax"))
    model.compile(
        optimizer=Adam(LEARNING_RATE),
        loss=sparse_categorical_crossentropy,
        metrics=["accuracy"],
    )
    return model


def show_metrics(model, eval_dataset, test_dataset) -> None:
    """
    Show the model metrics
    """
    eval_metrics = model.evaluate(eval_dataset, verbose=0)
    test_metrics = model.evaluate(test_dataset, verbose=0)
    print(f"Final eval metrics - loss: {eval_metrics[0]} - accuracy: {eval_metrics[1]}")
    print(f"Final test metrics - loss: {test_metrics[0]} - accuracy: {test_metrics[1]}")


def run(data_file, label_file, seed_value=42) -> None:
    """
    Create a dataset, a model and runs training and evaluation on it
    """
    seed(seed_value)
    set_seed(seed_value)
    train_data, eval_data, test_data = dataset_creation(data_file, label_file)
    tensorboard = TensorBoard(log_dir=LOG_DIR, histogram_freq=1, profile_batch=0)
    model = build_model()
    print("Training the model")
    model.fit(
        train_data,
        epochs=EPOCHS,
        validation_data=eval_data,
        callbacks=[tensorboard],
        verbose=0,
    )
    print("Training complete. Obtaining final metrics...")
    show_metrics(model, eval_data, test_data)

In [8]:
run(data_file="data/curesim-HVR.fastq", label_file="data/HVR.fastq")

Training the model


TypeError: ignored