ビッグデータ分析で使用するPythonコード一覧
こちらに記事を移動しました。
ビッグデータ分析で使用するシェルコマンド
こちらに記事を移動しました。
TelegrafでWindowsのメトリクスを取得しInfluxDBに格納しGrafanaで可視化する
性能評価で Windows のメトリクスをリアルタイムで監視したかったので,Telagraf で Windows のメトリクスを取得して InfluxDB に格納し,Grafana で可視化した。
概要
ソフト | 内容 |
---|---|
InfluxDB | 時系列DB。メトリクスをここに格納する。 |
Grafana | 様々なデータソースを可視化するGUIツール。 |
Telegraf | 様々なメトリクスを influxdb に収集するコレクタ。 |
- 対象の Windows に Telegraf をインストールし,Telegraf から Windows のメトリクスを InfluxDB に流し,Grafana で可視化する。
- Telegraf -> InfluxDB -> Grafana
Docker コンテナ作成
InfluxDB と Grafana は Docker で用意する。
ディレクトリ
- ローカルに influxdb, grafana のデータを保持したり influxdb の設定ファイルを読み込んだり出来るようにディレクトリを用意する。
$ pwd C:\LoadTest\Docker $ ls grafana influxdb
influxdb.conf
データのディレクトリを設定する。
$ pwd C:\LoadTest\Docker $ vi influxb/etc/influxdb.conf
[meta] dir = "/var/lib/influxdb/meta" [data] dir = "/var/lib/influxdb/data" engine = "tsm1" wal-dir = "/var/lib/influxdb/wal"
docker-compose.yml
- InfluxDB, Grafana のコンテナを作成する。
$ pwd C:\LoadTest\Docker $ vi docker-compose.yml $ ls docker-compose.yml grafana influxdb
version: "3" services: influxdb: image: influxdb:latest ports: - "8086:8086" volumes: - ./influxdb/data:/var/lib/influxdb - ./influxdb/etc:/etc/influxdb grafana: image: grafana/grafana:latest ports: - "3000:3000" volumes: - ./grafana:/var/lib/grafana/ depends_on: - influxdb
コンテナ作成
$ pwd C:\LoadTest\Docker $ docker-compose up -d $ docker-compose ps Name Command State Ports ---------------------------------------------------------------------------------------- docker_grafana_1 /run.sh Up 0.0.0.0:3000->3000/tcp docker_influxdb_1 /entrypoint.sh influxd Up 0.0.0.0:8086->8086/tcp
InfluxDB
動作確認
$ curl -sl -I http://localhost:8086/ping HTTP/1.1 204 No Content Content-Type: application/json Request-Id: 47b37236-8a8b-11ea-8001-0242ac120002 X-Influxdb-Build: OSS X-Influxdb-Version: 1.8.0 X-Request-Id: 47b37236-8a8b-11ea-8001-0242ac120002 Date: Thu, 14 May 2020 00:44:43 GMT
DB 作成
- Windows のメトリクスを格納する DB を作成する。
$ curl -i -XPOST http://localhost:8086/query --data-urlencode "q=CREATE DATABASE win_perf" HTTP/1.1 200 OK Content-Type: application/json Request-Id: 4e59158c-957c-11ea-8002-0242ac130003 X-Influxdb-Build: OSS X-Influxdb-Version: 1.8.0 X-Request-Id: 4e59158c-957c-11ea-8002-0242ac130003 Date: Thu, 14 May 2020 00:46:11 GMT Transfer-Encoding: chunked {"results":[{"statement_id":0}]}
Telegraf
パフォーマンスを見たい Windows にインストールする。
インストール
https://portal.influxdata.com/downloads から Windows 用の Telegraf をダウンロードする。
今回は https://dl.influxdata.com/telegraf/releases/telegraf-1.14.2_windows_amd64.zip をダウンロードした。
これを任意のディレクトリに C:\Program Files\telegraf 解凍した。
中身はシンプル。 設定ファイルと実行ファイルのみ。
> tree /F . C:\PROGRAM FILES\TELEGRAF telegraf.conf telegraf.exe
telegraf.conf
設定ファイルは InfluxDB の接続先とDB名を設定する。
[[outputs.influxdb]] urls = ["http://xxx.xxx.xxx.xxx:8086"] database = "win_perf"
起動
> cd C:\"Program Files"\telegraf
> .\telegraf.exe --config .\telegraf.conf
サービス化
> C:\"Program Files"\telegraf\telegraf.exe --service install
サービス起動/停止
> net start telegraf > net stop telegraf
grafana
- https://grafana.com/docs/grafana/latest/installation/migrating_to2/ より cofnig.js は廃止された模様。
管理者作成
- http://localhost:3000/login にアクセする。
- 下記のように入力し Login をクリックする。
userid | password |
---|---|
admin | admin |
data source (InfluxDB) 追加
- Add data source をクリックする。
- InfluxDB を選択する。
- 下記設定し Test & Save をクリックする。
項目 | 設定 |
---|---|
Name | InfluxDB_WinPerf |
URL | http://xxx.xxx.xxx.xxx:8086 |
Access | Server |
Database | win_perf |
User | admin |
Password | admin |
HTTP Method | GET |
フォルダ作成
- + アイコンをクリックし,Create\Folder を選択する。
- Name : win_perf
- Create をクリックする。
Dashboard 作成
MySQL 構築
VM 構築
Vagrant.configure("2") do |config| config.vm.box = "ubuntu/bionic64" config.vm.synced_folder "./share", "/home/vagrant/share", owner: "vagrant", group: "vagrant" #--- MySQL 構築 ---# config.vm.define "mysql" do | mysql | mysql.disksize.size = '90GB' mysql.vm.provider "virtualbox" do |vb| vb.memory = 8192 end mysql.vm.hostname = "mysql" mysql.vm.network "private_network", ip: "192.168.33.31" mysql.vm.provision :hosts, :sync_hosts => true end #--- MySQL 構築 ---# end
MySQL インストール
全て y にしました。
パスワードの強度は 0 にしました。
$ sudo apt update $ sudo apt install mysql-server -y $ sudo mysql_secure_installation Securing the MySQL server deployment. Connecting to MySQL using a blank password. VALIDATE PASSWORD PLUGIN can be used to test passwords and improve security. It checks the strength of password and allows the users to set only those passwords which are secure enough. Would you like to setup VALIDATE PASSWORD plugin? Press y|Y for Yes, any other key for No: y There are three levels of password validation policy: LOW Length >= 8 MEDIUM Length >= 8, numeric, mixed case, and special characters STRONG Length >= 8, numeric, mixed case, special characters and dictionary file Please enter 0 = LOW, 1 = MEDIUM and 2 = STRONG: 0 Please set the password for root here. New password: Re-enter new password: Estimated strength of the password: 50 Do you wish to continue with the password provided?(Press y|Y for Yes, any other key for No) : y By default, a MySQL installation has an anonymous user, allowing anyone to log into MySQL without having to have a user account created for them. This is intended only for testing, and to make the installation go a bit smoother. You should remove them before moving into a production environment. Remove anonymous users? (Press y|Y for Yes, any other key for No) : y Success. Normally, root should only be allowed to connect from 'localhost'. This ensures that someone cannot guess at the root password from the network. Disallow root login remotely? (Press y|Y for Yes, any other key for No) : y Success. By default, MySQL comes with a database named 'test' that anyone can access. This is also intended only for testing, and should be removed before moving into a production environment. Remove test database and access to it? (Press y|Y for Yes, any other key for No) : y - Dropping test database... Success. - Removing privileges on test database... Success. Reloading the privilege tables will ensure that all changes made so far will take effect immediately. Reload privilege tables now? (Press y|Y for Yes, any other key for No) : y Success. All done!
MySQL 設定
$ sudo vi /etc/mysql/mysql.conf.d/mysqld.cnf $ sudo systemctl restart mysql $ sudo vi /etc/mysql/conf.d/mysqldump.cnf $ sudo vi /etc/mysql/conf.d/mysql.cnf
# [mysqld] 配下に追記
character-set-server = utf8
default_password_lifetime = 0
# [mysqldump] 配下に追記
default-character-set=utf8
# [mysql] 配下に追記
default-character-set=utf8
MySQL 動作確認
$ sudo mysql -u root -p Enter password: Welcome to the MySQL monitor. Commands end with ; or \g. Your MySQL connection id is 5 Server version: 5.7.28-0ubuntu0.18.04.4 (Ubuntu) Copyright (c) 2000, 2019, Oracle and/or its affiliates. All rights reserved. Oracle is a registered trademark of Oracle Corporation and/or its affiliates. Other names may be trademarks of their respective owners. Type 'help;' or '\h' for help. Type '\c' to clear the current input statement. mysql> quit Bye
Pig をインストールする
象さん(Hadoop)を Pig で操作するために Pig をインストールしました。
Pig インストール
$ cd ~ $ wget https://archive.apache.org/dist/pig/pig-0.17.0/pig-0.17.0.tar.gz $ cd /opt $ sudo tar xvf ~/pig-0.17.0.tar.gz $ sudo chown -R hadoop:hadoop ./pig-0.17.0/ # シンボリックリンク作成 $ sudo ln -s pig-0.17.0 pig $ sudo chown -h hadoop:hadoop ./pig
Pig PATH 設定
Pig の PATH を通します。
# hadoop ユーザを作ったので hadoop ユーザで PATH を通しています。ここは各々のユーザで実行してください。 $ sudo su - hadoop $ cd ~ $ vi .bashrc $ source .bashrc
export PIG_HOME=/opt/pig export PATH=$PATH:$PIG_HOME/bin export PIG_CLASSPATH=$PIG_HOME/conf:$HADOOP_INSTALL/etc/hadoop
Pig 動作確認
$ pig --version Apache Pig version 0.17.0 (r1797386) compiled Jun 02 2017, 15:41:58
検証用に Python で Big Data を自作する
Hadoop と RDB でどれくらい処理時間に差が出るか手元で検証してみたかったので Python で Big Data を自作しました。
データ構成
テーブル | 説明 |
---|---|
売上 | 100,000,000 件の売上明細。 |
店舗 | 1,000,000 件の店舗。 |
エリア | 1,000 件の店舗エリア。 |
商品 | 10,000,000 件の商品。 |
分類 | 10,000 件の商品分類。 |
Big Data 生成
$ cd ~ $ ls generate_big_data.py $ sudo apt install python3 -y $ python3 generate_big_data.py $ du -h ./* 184K /home/vagrant/category.csv 8.0K /home/vagrant/generate_big_data.py 122M /home/vagrant/product.csv 3.8G /home/vagrant/sales.csv 11M /home/vagrant/shop.csv
プログラム
一定の件数ごとにファイルに吐き出すようにし OOM にならないようにしました。
import random import datetime import time # 店舗数: 1,000,000 SHOP_CNT = 1000000 # エリア数: 1,000 AREA_CNT = 1000 # 商品数: 10,000,000 PRODCUT_CNT = 10000000 # 商品区分: 10,000 CATEGORY_CNT = 10000 # 売上数: 100,000,000 SALES_CNT = 100000000 # 最大価格: 100,000 PRICE_MAX = 100000 # 最大購入数: 100 COUNT_MAX = 100 SHOP_DST = 'shop.csv' AREA_DST = 'area.txt' PRODUCT_DST = 'product.csv' CATEGORY_DST = 'category.csv' SALES_DST = 'sales.csv' # Table: shop # Column: id,area_code # id: 1 - 1,000,000 # area_code: 1 - 1,000 print('{} start: generate shop csv'.format(datetime.datetime.now().strftime('%Y/%m/%d %H:%M:%S'))) start = time.time() rows = [] # header # rows.append('id,area_code\n') for i in range(SHOP_CNT): shop_id = str(i + 1) area_code = str(random.randrange(1, AREA_CNT, 1)) rows.append('{},{}\n'.format(shop_id, area_code)) # 100,000 件ごとに出力する if((i + 1) % 100000 == 0): cnt = i + 1 print('shop rows: {}'.format(cnt)) with open(SHOP_DST, 'a', encoding='utf-8') as f: f.writelines(rows) rows = [] elapsed_time = time.time() - start print('{} finish: generate shop csv({} sec)'.format(datetime.datetime.now().strftime('%Y/%m/%d %H:%M:%S'), elapsed_time)) # Table: area # Column: area_code,area_name # area_code: 1 - 1,000 # area_name: area_0 - area_1000 print('{} start: generate area csv'.format(datetime.datetime.now().strftime('%Y/%m/%d %H:%M:%S'))) start = time.time() rows = [] # header # rows.append('area_code,area_name\n') for i in range(AREA_CNT): area_code = str(i + 1) area_name = 'area_' + str(i + 1) rows.append('{},{}\n'.format(area_code, area_name)) # 100 件ごとに出力する if((i + 1) % 100 == 0): cnt = i + 1 print('area rows: {}'.format(cnt)) with open(AREA_DST, 'a', encoding='utf-8') as f: f.writelines(rows) rows = [] elapsed_time = time.time() - start print('{} finish: generate area csv({} sec)'.format(datetime.datetime.now().strftime('%Y/%m/%d %H:%M:%S'), elapsed_time)) # Table: product # Column: id,category_code # id: 1 - 10,000,000 # category_code: 1 - 10,000 print('{} start: generate product csv'.format(datetime.datetime.now().strftime('%Y/%m/%d %H:%M:%S'))) start = time.time() rows = [] # header # rows.append('id,category_code\n') for i in range(PRODCUT_CNT): product_id = str(i + 1) category_code = str(random.randrange(1, CATEGORY_CNT, 1)) rows.append('{},{}\n'.format(product_id, category_code)) # 1,000,000 件ごとに出力する if((i + 1) % 1000000 == 0): cnt = i + 1 print('product rows: {}'.format(cnt)) with open(PRODUCT_DST, 'a', encoding='utf-8') as f: f.writelines(rows) rows = [] elapsed_time = time.time() - start print('{} finish: generate product csv({} sec)'.format(datetime.datetime.now().strftime('%Y/%m/%d %H:%M:%S'), elapsed_time)) # Table: category # Column: category_code,name # category_code: 1 - 10,000 # name: category_1 - category_10000 print('{} start: generate category csv'.format(datetime.datetime.now().strftime('%Y/%m/%d %H:%M:%S'))) start = time.time() rows = [] # header # rows.append('id,name\n') for i in range(CATEGORY_CNT): category_code = str(i + 1) category_name = 'category_' + str(i + 1) rows.append('{},{}\n'.format(category_code, category_name)) # 1,000 件ごとに出力する if((i + 1) % 1000 == 0): cnt = i + 1 print('category rows: {}'.format(cnt)) with open(CATEGORY_DST, 'a', encoding='utf-8') as f: f.writelines(rows) rows = [] elapsed_time = time.time() - start print('{} finish: generate category csv({} sec)'.format(datetime.datetime.now().strftime('%Y/%m/%d %H:%M:%S'), elapsed_time)) # Table: sales # Column: id,shop_id,product_id,price,count,total_price # id: 1 - 10,000,000 print('{} start: generate sales csv'.format(datetime.datetime.now().strftime('%Y/%m/%d %H:%M:%S'))) start = time.time() rows = [] # header # rows.append('id,shop_id,product_id,price,count,total_price\n') cnt = 0 for i in range(SALES_CNT): sales_id = str(i + 1) shop_id = str(random.randrange(1, SHOP_CNT, 1)) product_id = str(random.randrange(1, PRODCUT_CNT, 1)) price = str(random.randrange(1, PRICE_MAX, 10)) count = str(random.randrange(1, COUNT_MAX, 1)) total_price = str(int(price) * int(count)) rows.append('{},{},{},{},{},{}\n'.format(sales_id, shop_id, product_id, price, count, total_price)) # 10,000,000 件ごとに出力する if((i + 1) % 10000000 == 0): cnt = i + 1 print('sales rows: {}'.format(cnt)) with open(SALES_DST, 'a', encoding='utf-8') as f: f.writelines(rows) rows = [] elapsed_time = time.time() - start print('{} finish: generate sales csv({} sec)'.format(datetime.datetime.now().strftime('%Y/%m/%d %H:%M:%S'), elapsed_time))
Hadoop Cluster を構築する
こちらに移行しました。