pyhaya’s diary

プログラミング、特にPythonについての記事を書きます。Djangoや機械学習などホットな話題をわかりやすく説明していきたいと思います。

flutter build apkしたら~/.gradle/caches/transforms-3/... (そのようなファイルやディレクトリはありません)と怒られた話

タイトルのとおりですが、flutter buildができなくなりました。きっかけはbuild時に 「~/.gradle/caches のバージョンが〜」というwarningが出ていたのでcacheディレクトリだから消せばいいかと考えて消したことでした。

ネットで情報を探していると

  • ~/.gradle/caches を消せばいい
  • ~/.AndroidStudio4.0/system/caches を消せばいい
  • Android Studioを開いて、「File → Invalidate cache & restart」をすればいい

など色々あったのですが、全部うまく行きませんでした。結局、

rm -rf ~/.gradle

で .gradle ディレクトリごと消したらビルドできるようになりました。.gradleディレクトリ内のcaches以外のものがcachesに影響していたのでしょうか?

flutter何もわからない。。。

実験タスクでのデータ管理をちょっと改善した話

TL;DR

実験した結果をクラウドストレージから手元に落として来て分析するときに、ファイルが多いといろいろ辛みがあった。スクリプトを使って使うファイルの管理をしたらデータの分析の効率が少し改善した。

背景

自分は業務でよく、機械学習ジョブを回してその結果を分析するということをやります。その際、ジョブ自体はクラウド上で行い、結果の詳細な分析はデータを手元に落として行うことが多いです。このように、やることによって場所を変えるのは、クラウド上の実行環境はあくまで「実行環境」なのでデバッグや分析をするために毎回ツールをセットアップする必要があったり、分析した結果を可視化するのが難しいという理由からです。

しかし、データを手元に落としてきたはきたで別のツラミがあります。例として、ジョブの実行結果がクラウドストレージに吐き出されていて、それを分析のためにダウンロードするといった状況を考えると、パッと思いつくだけでも以下のようなツラミが生じる可能性があります。

  • 複数のジョブを回す場合に、雑にデータをホイホイダウンロードしているとファイルが多くなったときにどのファイルがどのジョブのものかわからなくなる
  • 1つの実験から複数ファイルが出力として得られるときに、どのファイルが同じ実験から得られたファイルなのか対応がわからなくなる
  • 最初気にしていなかった実験条件が知りたくなったときにストレージのどのファイルを見たら調べられるかわからなくなる

タスクが明確でやりたいことが全部はっきりしているプロジェクトであったら上に挙げたツラミはもしかしたら生じる余地はないのかもしれません。しかし、多くのプロジェクトは多かれ少なかれ「不確かさ」をはらんでいるはずで、そのようなとき、上のツラミが生じることは十分あり得ると思います。この記事では、何らかの方法で少しでもこれを軽減できないかと考えて自分が考え出した1つの解決策を紹介します。

ナイーブな解決策と問題点

上のような問題点を解決する方法として真っ先に思いつくのは、「ファイルをダウンロードして、その後にリンクをどこかにメモしておく」という方法かと思います。しかし、この方法の場合には「リンクをどこかにメモしておく」というステップはスキップ可能で、ダウンロード→分析という過程が簡単に取れてしまいます。したがってこの方法ではツラミが生じる余地を十分減らせていないでしょう。

自分が考えた解決策

そこで私は、「ファイルをダウンロードする」と「リンクをどこかにメモしておく」の順番を逆にすることで問題を軽減できないかと考えました。つまり、必要なデータのリンクをどこかで一元管理しておいて、データのダウンロードはそれを使って行うという方法にすればデータの管理がしやすいのではないかと考えました。

以下が現状で私が使っているデータ管理の形式です。データ分析はPythonでやることが多いので、データ管理にもPythonを使って書いています。

resources = [
	{
		"bucket": "bucket1",
		"blob": "project1",
		"subblob": "experiment1-yyyymmd'd'",
		"files": [
			"parent_dir/file1",
			"file2",
			"file3",
		],
		"destination": "path/to/save/experiment1/yyyymmd'd'",
		"skil_if_exist": True,
		"description": "〇〇を✗✗にして実験した結果",
	},
	{
		"bucket": "bucket1",
		"blob": "project1",
		"subblob": "experiment1-yyyymmdd",
		"files": [
			"parent_dir/file1",
			"file2",
			"file3",
		],
		"destination": "path/to/save/experiment1/yyyymmdd",
		"skil_if_exist": True,
		"description": "〇〇を△△にして実験した結果",
	}
]

この形式にして実際に分析をしてみて感じた利点は、

  • 同じ実験のファイルが一目でわかりやすい
  • ファイル名とパス、ファイルの説明が一箇所にまとまっている
  • 特定の実験で追加のファイルが必要になったときに変更が容易
  • この形式と決めておけば実際のダウンロードに使うスクリプトは使い回せるので楽

だと思っています。

また、まだ試してはいませんが、ダウンロードしたファイルを分析するために読み込むときにも resources を利用できれば、例えば似た名前のファイルを間違って読み込んで分析した、といったミスも減らせる可能性もあります。

この手法のテンプレートはGitHubで公開していますので興味のある人は見てみてください。

github.com

他の手法との比較

Pythonで何かしらのジョブ実行をサポートするものはいくつもあります。その代表例とも言えるのがAirflowです。

airflow.apache.org

Airflowでは上のリンクにあるように、GCSへの多様な操作をできるようになっています。Airflowと比較すると上で紹介した方法というのは機能性という点では劣ってしまいますが、あえて(半分無理やり)Airflowに対する利点を挙げると以下のようなものがあると思います。

  • 実験データ管理だけしたいときには紹介した方法は必要最低限の機能が備わっている
    • Airflowはワークフローエンジンなので実験データ管理以外にも様々な機能がありすぎる
  • データを分析する部分は自由度が大きい
    • 分析部分についてはDAGを作って実行というより試行錯誤しながらやりたいことが多い

補足

実際にダウンロードする際に使うコード
def _download_resource(
    files: list[str],
    destination_dir: str,
    gcs_base: str,
) -> None:
    targets = " ".join([gcs_base + f for f in files])

    result = subprocess.call(
        [
            "gsutil",
            "-m",
            "cp",
            targets,
            destination_dir,
        ]
    )
    if result != 0:
        raise RuntimeError("Script Failed")

def _build_source_and_destination(
    project: str, resource: dict[str, Any]
) -> tuple[str, str]:
    source = f"gs://{resource['bucket']}/{resource['blob']}"
    if resource["subblob"] != "":
        source += f"/{resource['subblob']}/"

    destination = f"{config.REPO_ROOT}/data/{project}/{resource['destination']}"

    return source, destination


def download(project: str, resources: list[dict[str, Any]]):
    for rs in resources:
        source, destination = _build_source_and_destination(project, rs)

        _download_resource(
            rs["files"],
            destination,
            source,
        )

例えばGoogle Cloud StorageからCLI経由でダウンロードする際には、以下のようなコードを使うことができます。(権限があればCloud SDKを使う方法もあります)

「因果推論の科学」を読んだ

最近話題になっていた「因果推論の科学」という本を読んだので感想みたいなものをつらつら書いてみました。


この本はジューディア・パールが2018年に書いた "The Book of Why: The New Science of Cause and Effect " の邦訳版です。英語圏ではすでに非常に評判が良いらしくAmazonでも1,500近くのレビューがついて平均4.4の評価となっています。

著者のジューディア・パールはベイジアンネットワークの研究で有名な方で、チューリング賞も受賞されています(ソース: Wikipedia)。

簡単にまとめると

この本がベースとしているのが「因果のはしご」という考え方です。因果のはしごは3段からなり、下から順に以下のように説明されています。

  • 関連付け:現実を観察してその中に規則性を見つけ、予測に用いる
  • 介入:ある行動をしたときに結果がどうなるか予測する
  • 反事実:現実とは異なる状況を仮定したら結果はどのようになるか予測する

最初にこのはしごに沿ってどう統計学が発展してきたかという歴史的な話がされます。その中で、現状の深層学習を使ったAIはまだはしごの1段目にいるので「強いAI」を作るには因果関係を組み込んではしごを登る必要があるということが書かれています。では因果関係はどうやって定式化されるのか?という話が続いて因果ダイヤグラムという道具が出てきます。そして因果ダイヤグラムを使って介入や反事実がどう表現されるか、具体例を多く交えながら(かつ少ない数式で)説明されています。

データは何も教えてくれない

ちょっと過激なこのセクションタイトルは第一章に出てくる言葉です。

データを見れば、たとえば、ある薬を服用した人が服用しなかった人よりも早く回復したことだけはわかるかもしれない。しかし、「なぜそうなったか」という理由はわからない。もしかすると、その薬を服用した人は、そうする金銭的余裕があったからそうしたまでで、服用しなかったとしても、結局は同じくらい早く回復したかもしれない。

わたしたちがデータを使って知りたいことは、ある行動が結果に対してどのような影響をもたらしたか、つまり因果関係であることが多いです。しかしデータ単体では因果関係を明らかにすることはできません(因果のはしごの1段目の問には答えられるが2段目以上の問には答えられない)。上の例でいうと、薬を服用した人と服用していない人のそれぞれに対する回復までにかかる時間のデータがあって、そこから「薬に効果があったか」という問に答えを出したいとき、分析者の頭の中には下のような因果ダイヤグラムがあります(本当はもっと様々な要因がダイヤグラムには現れるはずですがここでは上の引用に挙げられている要因だけを考えます)。

データに加えて上のような「モデル」を組み合わせることで「薬に効果があったか」という問に対する1つの答えを得ることができます。このモデルでは、経済力が交絡因子であると考えているので経済力を固定してバッグドアを閉じという分析をすることになります。

このように、データから因果のはしごの2段目の問に答えるにはデータの背後にある因果関係をどのようにモデリングするかが重要になります。

データの生成過程を知ることの重要性

データそのものよりもその生成過程が重要であるという事実を、本ではモンティー・ホールのパラドックスを使ってわかりやすく説明しています。モンティー・ホールのパラドックスは一種のくじ引き的な状況で生じるパラドックスです。3つの扉がありそのうち1つの後ろには新車が置かれていて、参加者はその扉を選べば新車がもらえるという状況です。参加者が開ける扉を選択したあとにくじを作った主催者は参加者が選ばなかった扉の中から新車の扉以外を開き、参加者に選択した扉を変更できることを告げます。このとき参加者は選択した扉を変えるべきかというのが問題です。

直感的に考えると選択を変更しても変更しなくても確率は変わらない気がしますが、実際には変更しない場合に新車がもらえる確率は1/3、変更した場合には2/3となります。本の中ではこの状況を少し変えたバージョンも挙げて比較しています。参加者が最初に扉を選んだあとに主催者が扉を開けるとき、開ける扉は完全にランダムで新車がある扉も開ける可能性があるというバージョンです。実はこちらのバージョンでは参加者が選択を変更してもしなくても新車がもらえる確率は1/3です。

この2つの状況からデータを取ろうとすると両方とも「参加者の選択した扉」、「新車の位置」、「主催者の開いたドア」に関するデータが取れます。データを比較すればアレンジバージョンの方では参加者が選択を変えても新車を得られる確率が変わらないという事実はわかりますが、なぜそうなっているのかはわかりません。両者の状況では実は、データの生成過程が異なります。両者で因果ダイヤグラムを書くと前者では新車の位置と主催者の扉選択の間に因果関係がありますが後者ではありません。つまり、本の中の一節を引用するならば

情報をどのようにして得たかは、情報そのものと同じくらい重要

ということです。

モデルを作る難しさ

一方で「モデルを正しく作る」という作業も簡単なものではありません。本の中で

因果関係の存在は、因果関係が存在するという前提で状況を見ていないと発見できない

と書いてあるように、正しい分析のためにはデータが生成される過程を理解し、不確かな部分には仮説を立てることが重要です。余談ですが、本の中では因果推論の研究が長い間統計学で敬遠されてきた理由を述べていて下のような一節が出てきます。

クローは、無視された理由をこう推測している。パス解析は、「あらかじめ用意された手順にただ従えばいい、というものではなかった。パス解析を行う者は、まず自分で仮説を立て、複数の因果関係をまとめた適切なダイヤグラムを作成する必要があった」。クローの指摘は本質を突いている。あらゆる因果推論がそうであるように、パス解析にもまた科学的思考が不可欠だ。ところが統計学では、科学的思考は敬遠され、むしろ決められた手順に従うことをよしとする場合が多い。自らの科学的知識が試されるような手法は敬遠され、データの数値を使って決まった手順で計算すればいいという手法が好まれるのである。

「科学的知識」はデータサイエンスの文脈では「ドメイン知識」と言われることが多いかもしれません。やはり適切なモデルを作るための手順書のようなものは存在しないので粘り強く仮説検証をすることが重要ということがわかりますね。

感想

自分は因果推論に関しては初心者で、入門書としてこの本を読んでみました。中身は数学書のように数式がいっぱい出て来るというわけでもなく(そもそも縦書きですし)、様々な実例を挙げて説明がなされているのでとてもわかり易かったです。

実は過去にPythonを使った因果推論の参考書をちょこっと読んだことがあったのですが、その中ではバックドア基準はこういうもんだから受け入れろという感じの書き方だったのでいまいち理解できていませんでした。しかしこの本では文章でですが重要な部分がちゃんと説明されていたので、消化不良を起こさずに理解できた気がしています。

「ベイズ統計モデリングによるデータ分析入門」を読んだ

ベイズを勉強してみたいと思って本を探していたら、こちらの「RとStanで始めるベイズ統計モデリングによるデータ分析入門」がわかりやすいと評判だったので買ってみた。

自分はRはほとんど書いたことがないのだが、それでもこの本はわかりやすかった。

構成

この本の前半はプログラムを書くよりむしろベイズ統計モデリングの理論的な側面に重点をおいて説明されている。最初に中学・高校あたりで習う確率の知識をさらっと復習したあとに確率分布、ベイズ統計、そしてマルコフ連鎖モンテカルロ法(MCMC)の説明がある。

そしてRの簡単な説明をはさんで、実践編として

  • 一般化線形モデル
  • 一般化線形混合モデル
  • 状態空間モデル

を実際に試してみるという構成になっている。ガチガチの専門書のようにMCMCアルゴリズムが数式をふんだんに使って説明されているわけではなく、むしろアルゴリズムが何をしたくてこのようなことをやっているかという点が詳しく説明されている。

実践編ではシンプルな一般化線形モデルから具体的なデータセットを用いた演習が始まり、一般化線形混合モデル、状態空間モデルへと展開されていく。話の流れとしては前のものをどんどんと拡張していくという方向へ向かっていくので、ここの章の内容まででは表現できなかった部分がうまくモデリングされていく過程を実体験することができ、読んでいて飽きない。

また、実践編ではRを使ってチュートリアルのように自分で手を動かすことができる。自分は普段、データ分析を行う際にはPythonを使うため、Rはほとんど初心者であるが、RはJupyter Notebookでも使えるので環境の構築は簡単だった。一つ大変だったのはグラフの大きさをなかなか変えられなかったくらいだと思う。グラフの大きさを変えても一番外枠の大きさは変わらず、その中でグラフのアスペクト比だけが変わってしまって大変だった(伝われ)。

この本を読み終わって思ったのだが、この本のあとに久保拓弥先生の緑本を読むといいかもしれない。

この本もRを使っている本(ただしMCMCの実験にはStanではなくWinBUGSというソフトウェアを使っている)であるが、少し馬場先生の本と比べると専門色が濃く、馬場先生の本では詳しくは説明されていなかった過分散が詳しく説明されている他、階層ベイズモデルまで説明されている。自分も何回か読んだはずなのだがそこまで内容を覚えていないのでこの機会にもう一度読み返してみようかななどと思っている。

Docker for Windowsがストレージを解放しない問題

Docker for Windowsにおいて、イメージを削除してもディスクの容量が解放されないという問題があるらしい。私の環境でも確認してみたら、Dockerが50 GBほどの容量を使っていることがわかった。コンテナは毎回使うごとに消しているし、volumeやnetworkを docker volumes prune等を使って消してもこんなだったので解決策を探してみた。

結論から言うと、Windows Homeの場合にはDockerのデータを全部消さないと現時点では解決しないみたいである(Windows Proだと圧縮とかできるみたいだが)。
github.com
上のIssueは2019年に出されたみたいだがまだ解決していないようなので結構根深い問題なのかもしれない。

データが全部消えてもDockerfileは残してあるし、必要ならばすぐにビルドしなおせばよさそうだったのでやってみた。

f:id:pyhaya:20220128112523p:plain
Docker Desktop for Windowsの起動画面
Docker Desktop for Windowsを起動して右上のTrouble Shootingを開く(虫のマークのやつ)
f:id:pyhaya:20220128112529p:plain
Trouble Shootingの画面
Clean/Purge Data(赤くなっているやつ)を押す。 f:id:pyhaya:20220128112545p:plain WSLにチェックを入れてDelete。いままでビルドしたイメージが全部消えるのでいくつかビルドしなおしてちゃんとビルドできることを確認してみた後、Dockerのデータの容量を確認してみた。 f:id:pyhaya:20220128112527p:plain わかりにくいが容量が5 GBほどになっており、消す前の1/10の容量になった。
いままでビルドしてきたイメージやボリュームが消えてもすぐ復旧できるという状況の人は試してみると幸せになれるかもしれない。

SRGANで画像の高解像度化

今回はGANを使った初期の画像の高解像度化モデルであるSRGANを実装してみたので紹介したいと思います。

実行環境

今回のモデルは以下のような環境で実装しています。

ソースコードは以下に公開しています。
github.com


検証に用いたデータセットはDIV2Kで以下からダウンロードしました。
data.vision.ee.ethz.ch

訓練に用いるために、各画像から20枚ずつ32x32のランダムクロップを行いました。

SRGANとは

SRGANとは、その名前にもあるようにGAN(敵対的生成ネットワーク)を使って、入力画像に対して解像度が上がった画像を生成しようという目的で作られたモデルです。

f:id:pyhaya:20210822222355p:plain
https://arxiv.org/pdf/1609.04802.pdf から引用

モデルの実装

モデルはTensorflowのKeras APIを使って実装します。

import tensorflow as tf
from tensorflow.keras.layers import (
    Conv2D,
    Dense,
    PReLU,
    BatchNormalization,
    LeakyReLU,
    Flatten,
)
from tensorflow.keras import Sequential, Model


class BResidualBlock(Model):
    def __init__(self):
        super(BResidualBlock, self).__init__()

        self.conv = Conv2D(filters=64, kernel_size=3, strides=1, padding="same")
        self.bn = BatchNormalization(momentum=0.8)
        self.prelu = PReLU(shared_axes=[1, 2])

        self.conv2 = Conv2D(filters=64, kernel_size=3, strides=1, padding="same")
        self.bn2 = BatchNormalization(momentum=0.8)

    def call(self, input_tensor, training=True):
        x = self.conv(input_tensor)
        x = self.bn(x, training=training)
        x = self.prelu(x)

        x = self.conv2(x)
        x = self.bn2(x, training=training)
        x += input_tensor

        return x


class ResidualBlock(Model):
    def __init__(self):
        super(ResidualBlock, self).__init__()

        self.residual1 = BResidualBlock()
        self.residual2 = BResidualBlock()
        self.residual3 = BResidualBlock()
        self.residual4 = BResidualBlock()
        self.residual5 = BResidualBlock()

        self.conv = Conv2D(filters=64, kernel_size=3, padding="same")
        self.bn = BatchNormalization(momentum=0.8)

    def call(self, input_tensor, training=True):
        x = self.residual1(input_tensor)
        x = self.residual2(x, training=training)
        x = self.residual3(x, training=training)
        x = self.residual4(x, training=training)
        x = self.residual5(x, training=training)

        x = self.conv(x)
        x = self.bn(x)

        x += input_tensor

        return x


class DiscriminatorBlock(Model):
    def __init__(self, filters=128):
        super(DiscriminatorBlock, self).__init__()
        self.filters = filters

        self.conv1 = Conv2D(filters=filters, kernel_size=3, strides=1, padding="same")
        self.bn1 = BatchNormalization(momentum=0.8)
        self.lrelu1 = LeakyReLU(alpha=0.2)
        self.conv2 = Conv2D(filters=filters, kernel_size=3, strides=2, padding="same")
        self.bn2 = BatchNormalization(momentum=0.8)
        self.lrelu2 = LeakyReLU(alpha=0.2)

    def call(self, input_tensor, training=True):
        x = self.conv1(input_tensor)
        x = self.bn1(x, training=training)
        x = self.lrelu1(x)
        x = self.conv2(x)
        x = self.bn2(x, training=training)
        x = self.lrelu2(x)

        return x


class PixelShuffler(tf.keras.layers.Layer):
    def __init__(self):
        super(PixelShuffler, self).__init__()

    def call(self, input_tensor):
        x = tf.nn.depth_to_space(input_tensor, 2)

        return x


def make_generator():
    model = Sequential(
        [
            Conv2D(filters=64, kernel_size=9, padding="same"),
            PReLU(shared_axes=[1, 2]),
            ResidualBlock(),
            Conv2D(filters=256, kernel_size=3, padding="same"),
            PixelShuffler(),
            PReLU(shared_axes=[1, 2]),
            Conv2D(filters=256, kernel_size=3, padding="same"),
            PixelShuffler(),
            PReLU(shared_axes=[1, 2]),
            Conv2D(filters=3, kernel_size=9, padding="same"),
        ]
    )

    return model


def make_discriminator():
    model = Sequential(
        [
            Conv2D(filters=64, kernel_size=3, padding="same"),
            LeakyReLU(alpha=0.2),
            Conv2D(filters=64, kernel_size=3, strides=2, padding="same"),
            BatchNormalization(momentum=0.8),
            LeakyReLU(alpha=0.2),
            DiscriminatorBlock(128),
            DiscriminatorBlock(256),
            DiscriminatorBlock(512),
            Flatten(),
            Dense(1024),
            LeakyReLU(alpha=0.2),
            Dense(1, activation="sigmoid"),
        ]
    )

    return model

def make_vgg(height, width):
    vgg = tf.keras.applications.vgg19.VGG19(include_top=False, weights="imagenet")
    partial_vgg = tf.keras.Model(
        inputs=vgg.input, outputs=vgg.get_layer("block5_conv4").output
    )
    partial_vgg.trainable = False
    partial_vgg.build(input_shape=(None, height * 4, width * 4, 3))

    return partial_vgg

レーニングの実行

Generatorのトレーニン

このモデルでは、GeneratorとDiscriminatorを同時に訓練するのではなく、先にGeneratorを訓練しておきます。これによりGeneratorが "local optima" にハマるのを防ぎます。Generatorの訓練では損失関数にMeanSquaredErrorを使います。

class SRResNetTrainer:
    def __init__(
        self,
        epochs: int = 10000,
        batch_size: int = 32,
        learning_rate: float = 1e-4,
        training_data_path: str = "./datasets/train.tfrecords",
        validate_data_path: str = "./datasets/valid.tfrecords",
        height: int = 32,
        width: int = 32,
        g_weight: str = None,
        checkpoint_path: str = "./checkpoint",
        best_generator_loss: float = 1e9,
    ):
        self.epochs = epochs
        self.batch_size = batch_size

        self.generator = make_generator()
        if g_weight is not None and g_weight != "":
            print("Loading weights on generator...")
            self.generator.load_weights(g_weight)

        self.train_data, self.validate_data = prepare_from_tfrecords(
            train_data=training_data_path,
            validate_data=validate_data_path,
            height=height,
            width=width,
            batch_size=batch_size,
        )
        self.mse_loss = tf.keras.losses.MeanSquaredError()
        self.best_generator_loss = best_generator_loss
        self.generator_optimizer = tf.keras.optimizers.Adam(learning_rate)

        self.checkpoint_path = checkpoint_path
        self.make_checkpoint = len(checkpoint_path) > 0

        current_time = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
        train_log_dir = "./logs/" + current_time + "/train_generator"
        valid_log_dir = "./logs/" + current_time + "/valid_generator"
        self.train_summary_writer = tf.summary.create_file_writer(train_log_dir)
        self.valid_summary_writer = tf.summary.create_file_writer(valid_log_dir)

    @tf.function
    def train_step(self, lr: tf.Tensor, hr: tf.Tensor):
        with tf.GradientTape() as tape:
            generated_fake = self.generator(lr)
            g_loss = self.mse_loss(generated_fake, hr)

        generator_grad = tape.gradient(g_loss, self.generator.trainable_variables)
        self.generator_optimizer.apply_gradients(
            grads_and_vars=zip(generator_grad, self.generator.trainable_variables)
        )

        return g_loss

    @tf.function
    def validation_step(self, lr: tf.Tensor, hr: tf.Tensor):
        generated_fake = self.generator(lr)
        g_loss = self.mse_loss(generated_fake, hr)

        return g_loss

    def train(self, start_epoch=0):
        for step in range(start_epoch, self.epochs):
            g_loss_train = []
            for images in tqdm(self.train_data):
                g_loss = self.train_step(images["low"], images["high"])
                g_loss_train.append(g_loss.numpy())

            g_loss_train_mean = np.mean(g_loss_train)

            with self.train_summary_writer.as_default():
                tf.summary.scalar("g_loss", g_loss_train_mean, step=step)

            print(
                f"Epoch {step+ 1}| Generator-Loss: {g_loss_train_mean:.3e},",
            )

            g_loss_valid = []
            for images in tqdm(self.validate_data):
                g_loss = self.validation_step(images["low"], images["high"])
                g_loss_valid.append(g_loss)

            g_loss_valid_mean = np.mean(g_loss_valid)

            with self.valid_summary_writer.as_default():
                tf.summary.scalar("g_loss", g_loss_valid_mean, step=step)

            print(
                f"Validation| Generator-Loss: {g_loss_valid_mean:.3e},",
            )

            if self.make_checkpoint:
                self.generator.save_weights(f"{self.checkpoint_path}/generator_last")

                if g_loss_valid_mean < self.best_generator_loss:
                    self.best_generator_loss = g_loss_valid_mean
                    self.generator.save_weights(
                        f"{self.checkpoint_path}/generator_best"
                    )

                    print("Model Saved")
GANの訓練

Generatorを訓練したら、その重みを使ってGANの訓練を行います。先ほどとは違い、Generatorの損失関数にはBinaryCrossentropyとcontent lossと呼ばれるものを使っています。このロスでは、正解の高解像度画像とGeneratorの生成した画像をそれぞれVGG19に入れ、その中間層の出力を比べたときの二乗誤差を計算します。

class SRGANTrainer:
    def __init__(
        self,
        epochs: int = 100,
        batch_size: int = 16,
        learning_rate: float = 1e-4,
        height: int = 32,
        width: int = 32,
        g_weight: str = None,
        d_weight: str = None,
        training_data_path: str = "./datasets/train.tfrecords",
        validate_data_path: str = "./datasets/valid.tfrecords",
        checkpoint_path: str = "./checkpoints",
        best_generator_loss: float = 1e9,
    ):
        # -----------------------------
        # Hyper-parameters
        # -----------------------------
        self.epochs = epochs
        self.batch_size = batch_size

        # -----------------------------
        # Model
        # -----------------------------
        self.generator = make_generator()
        self.discriminator = make_discriminator()
        self.vgg = make_vgg(height=height, width=width)

        if g_weight is not None and g_weight != "":
            print("Loading weights on generator...")
            self.generator.load_weights(g_weight)
        if d_weight is not None and d_weight != "":
            print("Loading weights on discriminator...")
            self.discriminator.load_weights(d_weight)

        # -----------------------------
        # Data
        # -----------------------------
        self.train_data, self.validate_data = prepare_from_tfrecords(
            train_data=training_data_path,
            validate_data=validate_data_path,
            height=height,
            width=width,
            batch_size=batch_size,
        )

        # -----------------------------
        # Loss
        # -----------------------------
        self.discriminator_loss_fn = tf.keras.losses.BinaryCrossentropy(
            from_logits=False
        )
        self.mse_loss = tf.keras.losses.MeanSquaredError()
        self.bce_loss = tf.keras.losses.BinaryCrossentropy(from_logits=False)

        self.best_generator_loss = best_generator_loss

        # -----------------------------
        # Optimizer
        # -----------------------------
        self.discriminator_optimizer = tf.keras.optimizers.Adam(learning_rate)
        self.generator_optimizer = tf.keras.optimizers.Adam(learning_rate)

        # -----------------------------
        # Summary Writer
        # -----------------------------
        current_time = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
        train_log_dir = "./logs/" + current_time + "/train"
        valid_log_dir = "./logs/" + current_time + "/valid"
        self.train_summary_writer = tf.summary.create_file_writer(train_log_dir)
        self.valid_summary_writer = tf.summary.create_file_writer(valid_log_dir)

        self.checkpoint_path = checkpoint_path
        self.make_checkpoint = len(checkpoint_path) > 0

    @tf.function
    def _content_loss(self, lr: tf.Tensor, hr: tf.Tensor):
        lr = (lr + 1) * 127.5
        hr = (hr + 1) * 127.5

        lr = tf.keras.applications.vgg19.preprocess_input(lr)
        hr = tf.keras.applications.vgg19.preprocess_input(hr)
        lr_vgg = self.vgg(lr) / 12.75
        hr_vgg = self.vgg(hr) / 12.75

        return self.mse_loss(lr_vgg, hr_vgg)

    def _adversarial_loss(self, output):
        return self.bce_loss(tf.ones_like(output), output)

    @tf.function
    def train_step(self, lr: tf.Tensor, hr: tf.Tensor) -> tuple[tf.Tensor]:
        with tf.GradientTape() as g_tape, tf.GradientTape() as d_tape:
            generated_fake = self.generator(lr)

            real = self.discriminator(hr)
            fake = self.discriminator(generated_fake)

            d_loss = self.discriminator_loss_fn(real, tf.ones_like(real))
            d_loss += self.discriminator_loss_fn(fake, tf.zeros_like(fake))

            g_loss = self._content_loss(generated_fake, hr)
            g_loss += self._adversarial_loss(generated_fake) * 1e-3

        discriminator_grad = d_tape.gradient(
            d_loss, self.discriminator.trainable_variables
        )
        generator_grad = g_tape.gradient(g_loss, self.generator.trainable_variables)
        self.discriminator_optimizer.apply_gradients(
            grads_and_vars=zip(
                discriminator_grad, self.discriminator.trainable_variables
            )
        )
        self.generator_optimizer.apply_gradients(
            grads_and_vars=zip(generator_grad, self.generator.trainable_variables)
        )

        return g_loss, d_loss

    @tf.function
    def validation_step(self, lr: tf.Tensor, hr: tf.Tensor):
        generated_fake = self.generator(lr)
        real = self.discriminator(hr)
        fake = self.discriminator(generated_fake)

        d_loss = self.discriminator_loss_fn(real, tf.ones_like(real))
        d_loss += self.discriminator_loss_fn(fake, tf.zeros_like(fake))

        g_loss = self._content_loss(generated_fake, hr)
        g_loss += self._adversarial_loss(generated_fake) * 1e-3

        return g_loss, d_loss

    def train(self, start_epoch):
        for step in range(start_epoch, self.epochs):
            d_loss_train = []
            g_loss_train = []
            for images in tqdm(self.train_data):
                g_loss, d_loss = self.train_step(images["low"], images["high"])
                g_loss_train.append(g_loss.numpy())
                d_loss_train.append(d_loss.numpy())

            g_loss_train_mean = np.mean(g_loss_train)
            d_loss_train_mean = np.mean(d_loss_train)

            with self.train_summary_writer.as_default():
                tf.summary.scalar("g_loss", g_loss_train_mean, step=step)
                tf.summary.scalar("d_loss", d_loss_train_mean, step=step)

            print(
                f"Epoch {step+ 1}| Generator-Loss: {g_loss_train_mean:.3e},",
                f"Discriminator-Loss: {d_loss_train_mean:.3e}",
            )

            d_loss_valid = []
            g_loss_valid = []
            for images in tqdm(self.validate_data):
                g_loss, d_loss = self.validation_step(images["low"], images["high"])
                d_loss_valid.append(d_loss)
                g_loss_valid.append(g_loss)

            g_loss_valid_mean = np.mean(g_loss_valid)
            d_loss_valid_mean = np.mean(d_loss_valid)

            with self.valid_summary_writer.as_default():
                tf.summary.scalar("g_loss", g_loss_valid_mean, step=step)
                tf.summary.scalar("d_loss", d_loss_valid_mean, step=step)

            print(
                f"Validation| Generator-Loss: {g_loss_valid_mean:.3e},",
                f"Discriminator-Loss: {d_loss_valid_mean:.3e}",
            )

            if self.make_checkpoint:
                self.generator.save_weights(f"{self.checkpoint_path}/generator_last")
                self.discriminator.save_weights(
                    f"{self.checkpoint_path}/discriminator_last"
                )

                if g_loss_valid_mean < self.best_generator_loss:
                    self.best_generator_loss = g_loss_valid_mean
                    self.generator.save_weights(
                        f"{self.checkpoint_path}/generator_best"
                    )
                    self.discriminator.save_weights(
                        f"{self.checkpoint_path}/discriminator_best"
                    )

                    print("Model Saved")

訓練結果

訓練したモデルを使ってテストデータの高解像度化を行ってみました。ところどころノイズは載っていますが概ねよく高解像度化できているように見えます。

f:id:pyhaya:20210905214807p:plain

Django + Plotly.js でグラフをリアルタイム更新

最近、測定器からデータを取得してリアルタイムにデータを表示して確認したいという願望があってそれを実現する方法を考えてました。その1つの解決策としてDjangoを使ってWebアプリケーションのような形で実現する方法を思いつき、試しに作ってみたのでそれを共有したいと思います。

この記事では、プロットするデータは測定器から発生していますがPythonで得られるデータであればなんでもこの方法は応用できます。

f:id:pyhaya:20210815224932j:plain
Photo by Isaac Smith on Unsplash

なぜ Django?

  • 測定器からのデータは GPIB 通信というもので来ていたのでそれを処理できる必要がある -> Python, C
  • グラフを動的に変化させたい -> JavaScript

という要件があったので、Djangoを使ってWebアプリケーションとして書けば要件を満たすことができると考えました。

この記事で書かないこと

PyVISAを用いた測定器との通信方法

環境

構成

フロントエンド側 (HTML + JavaScript)

HTML

<script
  type="text/javascript"
  src="{% static 'core/js/plotly-2.3.0.min.js' %}"
></script>

<div id="canvas" style="width: 700px; height: 400px"></div>
<form id="tds-settings" action="{% url 'core:tds_data' %}">
  <div id="stage-position-settings">
    <div id="start">
      <p>Start:</p>
      <input type="number" id="start-position" /> &mu;m
    </div>
    <div id="end">
      <p>End:</p>
      <input type="number" id="end-position" /> &mu;m
    </div>
    <div id="step">
      <p>Step:</p>
      <input type="number" id="moving-step" /> &mu;m
    </div>
  </div>
  <p>Lock-in time:</p>
  <input type="number" id="lockin-time" /> ms
  <input type="submit" value="Start" id="start-button" />
</form>

グラフをプロットするエリア(id="canvas")と測定の設定を決めるフォームを用意します。フォームがsubmitされたときのアクションにはDjango側で用意するAPIのアドレスを入れます。

JavaScript

var trace1 = {
  x: [],
  y: [],
  type: "scatter",
};

var data = [trace1];
var layout = {
  width: 600,
  height: 400,
  margin: { l: 50, r: 0, b: 3, t: 20, pad: 5 },
  showlegend: true,
  legend: { orientation: "h" },
};

Plotly.newPlot("canvas", data, layout);
const _sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

document
  .getElementById("tds-settings")
  .addEventListener("submit", async (e) => {
    e.preventDefault();
    tds_measurement();
  });

async function tds_measurement() {
  let url = document.getElementById("tds-settings").action;
  let boot = tds_boot_url;

  let start = Number(document.getElementById("start-position").value);
  let end = Number(document.getElementById("end-position").value);
  let step = Number(document.getElementById("moving-step").value);
  let lockin = Number(document.getElementById("lockin-time").value);

  if (start >= end || start < 0 || end <= 0 || step <= 0 || lockin <= 0) {
    alert("Invalid Parameters");
    return;
  }

  let finished = false;
  fetch(boot, {
    method: "POST",
    body: `start=${start}&end=${end}&step=${step}&lockin=${lockin}`,
    headers: {
      "Content-Type": "application/x-www-form-urlencoded; charset=utf-8",
    },
  }).catch((error) => {
    finished = true;
  });
  await _sleep(1000);
  while (!finished) {
    await fetch(url, {
      method: "POST",
      body: "",
      headers: {
        "Content-Type": "application/x-www-form-urlencoded; charset=utf-8",
      },
    })
      .then((response) => {
        return response.json();
      })
      .then((responseJson) => {
        if (responseJson.status === "finished") {
          finished = true;
        }
        let x = responseJson.x;
        let y = responseJson.y;
        data[0].x = x;
        data[0].y = y;

        Plotly.update("canvas", data, layout);
      })
      .catch((error) => {
        alert("Error while measurement");
        finished = true;
      });

    await _sleep(500);
  }
}

JS側では、フォームがsubmitされるとまず、画面遷移が生じるないようにします(e.preventDefault())。Django側へは2回APIアクセスします。それぞれの役割は

  • 測定を開始する
  • 測定データを得る

となっていますが、なぜこのように2つに分けたかというと、1つにまとめてしまうと測定が終わるまでデータ点が得られないためです。Python側でエンドポイントを分けてあげることで測定シーケンスを進めるスレッドと測定データをフロント側に渡すスレッドができることになります。

測定を開始するリクエストを送ったあとは、測定が終了したという通知をPythonから受け取るまでは500msごとに測定データを要求します。

バックエンド側 (Python)

エンドポイントの設定
from django.urls import path

from . import views

app_name = "core"
urlpatterns = [
    path("tds/", views.TDS.as_view(), name="tds"),
    path("tds-data/", views.tds_data, name="tds_data"),
    path("tds-boot/", views.tds_boot, name="tds_boot"),
]

1つめは最初に書いたHTMLをテンプレートとするビュー、あとの2つはjsonを返すビューです。

ビュー
from django.views import View
from django.http import JsonResponse

wave_tds = WaveForm()
tds_running = False

class TDS(View):
    def get(self, request):
        return render(request, "core/tds.html")

def tds_boot(request):
    global tds_running

    tds_running = True
    wave_tds.clear()
    start = int(request.POST.get("start"))
    end = int(request.POST.get("end"))
    step = int(request.POST.get("step"))
    lockin = float(request.POST.get("lockin"))

    api_ops.tds_scan(start, end, step, lockin, wave_tds)  # 測定シーケンスの中身

    tds_running = False

    return JsonResponse({})


def tds_data(request):
    x = wave_tds.x
    y = wave_tds.y

    status = "running" if tds_running else "finished"

    return JsonResponse({"x": x, "y": y, "status": status})

測定シーケンスが走っているかいないかはグローバル変数 tds_runningで判断します。

実際の動作 (動画)

実際に動作している様子を動画にしました。データは適当にsinカーブを生成するようにしています。マウスを近づけると値が読めたり、詳しく見たい部分をズームアップできるのは嬉しいですね。
streamable.com