Java - File IO示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;

public class FileIoDemo
{

public static void main(String[] args)
{
// 先声明文件对象
File file = new File("D:" + File.separator + "out.txt");
try
{
System.out.println("CanonicalPath = " + file.getCanonicalPath());
}
catch(IOException e1)
{
e1.printStackTrace();
}

// FileWriter外面套一层buffer
try(BufferedWriter bufferedWriter = new BufferedWriter(new FileWriter(file)))
{
bufferedWriter.write("line1\n");
bufferedWriter.write("line2\n");
bufferedWriter.write("line3\n");
bufferedWriter.write("line4\n");
bufferedWriter.write("line5\n");
}
catch(IOException e)
{
e.printStackTrace();
}

// FileReader外面套一层buffer
try(BufferedReader bufferedReader = new BufferedReader(new FileReader(file)))
{
String curLine = null;
while((curLine = bufferedReader.readLine()) != null)
{
System.out.println(curLine);
}
}
catch(IOException e)
{
e.printStackTrace();
}
}
}

Java - 多线程示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
/**
* 定义一个类A实现Runnable接口;然后通过new Thread(new R())方式新建线程。
*/
public class MultiThreadingDemo implements Runnable
{
private static final Object lock = new Object();

private int tickets = 50;


@Override
public void run()
{
for(int i = 0; i < 10; ++i)
{
synchronized(lock)
{
--tickets;
System.out.println(Thread.currentThread().getName() + " sell 1 ticket (" + i + "st), " + tickets + " left");
}

try
{
Thread.sleep((long) Math.random() * 1000);
}
catch(InterruptedException e)
{
e.printStackTrace();
}
}

}

public static void main(String[] args)
{
MultiThreadingDemo demo = new MultiThreadingDemo();
// new Thread(R)
Thread t1 = new Thread(demo);
// set thread name
Thread t2 = new Thread(demo, "thr2");
Thread t3 = new Thread(demo, "thr3");
Thread t4 = new Thread(demo, "thr4");
Thread t5 = new Thread(demo, "thr5");

t1.start();
t2.start();
t3.start();
t4.start();
t5.start();
}
}

Java - 反射示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.Method;

/**
* Reflection机制允许程序在正在执行的过程中,利用Reflection APIs取得任何“已知名称”的类的内部信息,并动态生成instances、变更fields内容、调用methods等。
*/
public class ReflectionDemo
{
private int myInt;
private String myString;

public ReflectionDemo()
{
this.myInt = 0;
this.myString = "zzz";
}

public ReflectionDemo(int myInt, String myString)
{
this.myInt = myInt;
this.myString = myString;
}

public void printMyInt()
{
System.out.println(myInt);
}

public boolean isMyString(String s)
{
return myString.equals(s);
}

@Override
public String toString()
{
return "[" + myInt + ", " + myString + "]";
}

public static void main(String[] args) throws ReflectiveOperationException
{
// create instance
/**
* 几种获取Class对象的方式:
* 1. 类名称.class
* Class demoClass = ReflectionDemo.class;
* 2. 实例.class
* Class demoClass = new ReflectionDemo().getClass();
* 3. Class.forName
* Class demoClass = Class.forName("ReflectionDemo");
* 4. ClassLoader.loadClass
* Class demoClass = ClassLoader.getSystemClassLoader().loadClass("ReflectionDemo");
*/
//
Class demoClass = Class.forName("ReflectionDemo");

ReflectionDemo demo1 = (ReflectionDemo) demoClass.newInstance();// <----- default constructor
System.out.println(demo1);

Constructor ctor = demoClass.getDeclaredConstructor(int.class, String.class);
ReflectionDemo demo2 = (ReflectionDemo) ctor.newInstance(666, "hahaha");// <----- constructor with arguments
System.out.println(demo2);

// change field
Field field = demoClass.getDeclaredField("myInt");
field.setAccessible(true);
field.set(demo1, 999);

// invoke method
Method method1 = demoClass.getDeclaredMethod("printMyInt");
method1.invoke(demo1);
method1.invoke(demo2);
Method method2 = demoClass.getDeclaredMethod("isMyString", String.class);
System.out.println(method2.invoke(demo1, "hahaha"));
System.out.println(method2.invoke(demo2, "hahaha"));

// return instance for usage
// return demo1;
// return demo2;
}
}

Python - 文件IO

文件打开/关闭

文件打开:file object = open(file_name [, access_mode][, buffering])

  • access_mode:决定打开文件的模式。这个参数是非强制的,默认文件访问模式为只读(r)。
  • buffering:如果buffering的值被设为0,就不会有寄存。如果buffering的值取1,访问文件时会寄存行。如果将buffering的值设为大于1的整数,表明了这就是的寄存区的缓冲大小。如果取负值,寄存区的缓冲大小则为系统默认。

打开方式:

  • b:二进制模式。
  • +:打开一个文件进行更新(可读可写)。
模式 r r+ w w+ a a+
+ + + +
+ + + + +
创建 + + + +
覆盖 + +
指针在开始 + + + +
指针在结尾 + +

文件关闭:fileObject.close()

读文件

读取指定字节:fileObject.read([count])
读取一行:fileObject.readline()
读取所有行:fileObject.readlines()

高性能读逐行:

1
2
3
with open("filename") as fd:
for line in fd:
do_things(line)

写文件

写文件:fileObject.write(string)

  • write()方法可将任何字符串写入一个打开的文件。需要重点注意的是,Python字符串可以是二进制数据,而不是仅仅是文字。
  • write()方法不会在字符串的结尾添加换行符\n

Python - 基于FFT的周期性检测方法

核心思想

信号的连续/离散性 v.s. 周期性/非周期性,在时域与频域中是相互决定的。即:时域连续,频域非周期;时域离散,频域周期;时域周期,频域离散;时域非周期,频域连续。

基于FFT的周期性检测方法,具体流程为:(1)对输入信号做FFT;(2)检测FFT结果离散性;(3)根据FFT结果是否为离散,给出周期性判断结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
# -*- coding: UTF-8 -*-

import numpy as np
from scipy.fftpack import fft, ifft
import matplotlib.pyplot as plt
import time
import random


def is_periodic(arr, z_value=0.02, frac_thr=0.5):
starttime = time.time()
f_arr = fft(arr)
f_arr = 2.0 / len(arr) * np.abs(f_arr)
idx_z = np.where(f_arr <= z_value)
print("idx_z count: %d, f_arr len: %d" % (len(idx_z[0]), len(f_arr)))
endtime = time.time()
print("time cost = %fs" % (endtime - starttime))
return len(idx_z[0]) * 1.0 / len(f_arr) >= frac_thr


def is_periodic2(arr, mmrate_thr=10, show=False, plot=False, filename=""):
starttime = time.time()

if type(arr) == list:
arr = np.array(arr)

# 准直流
# if len(np.where(arr > 0)[0]) >= 0.9 * len(arr):
# return False

# 标准化
arr_mean = np.mean(arr)
arr_std = np.std(arr)
if arr_std == 0:
arr_std = 1
arr = (arr - arr_mean) / arr_std
# 二值化
arr[np.where(arr > 0.01)[0]] = 1
arr[np.where(abs(arr) <= 0.01)[0]] = 0
arr[np.where(arr < -0.01)[0]] = -1

f_arr = fft(arr)
f_arr = 2.0 / len(arr) * np.abs(f_arr)

# 去除直流影响,仅考察0.1~0.5
median = float(np.median(f_arr[len(arr) // 10 : len(arr) // 10 * 5]))
max = float(np.max(f_arr[len(arr) // 10 : len(arr) // 10 * 5]))
# 计算max / median的比值,考察频域信号是否为离散冲击
if median != 0:
mmrate = max * 1.0 / median
else:
if max == 0:
mmrate = 0
else:
mmrate = mmrate_thr + 999
print("median: %f, max: %f, max_median rate: %f" % (median, max, mmrate))

endtime = time.time()
print("time cost = %fs" % (endtime - starttime))

if show or plot:
ts = len(arr)
n = len(arr)
fs = n * 1.0 / ts
x = np.linspace(0.0, ts, n, endpoint=False)
xf = np.linspace(0.0, 1.0 / 2.0 * fs, n // 2, endpoint=False)

plt.figure(figsize=(15, 6))
plt.subplot(121)
plt.plot(x, arr)
plt.title('Original Signal', fontsize=12)
#plt.ylim((0, 100))

plt.subplot(122)
plt.plot(xf, f_arr[: n // 2], 'g')
plt.title('FFT Transform Normalization', fontsize=12, color='g')

if plot:
plotfile = "%s_%.2f_%.2f_%.2f_" % (mmrate >= mmrate_thr, mmrate, max, median) + filename
plt.savefig(plotfile)
if show:
plt.show()

return mmrate >= mmrate_thr


if __name__ == '__main__':
# 采样时长ts = 288s。
ts = 288.0
# 采样点数n = 288。
n = 288
# 采样频率fs = 1Hz,即1s采1个点。
fs = n / ts

# 时域
x = np.linspace(0.0, ts, n, endpoint=False)

# ---------- 生成一段周期性数据 ----------
# 周期性冲击
y1 = np.zeros(n)
for i in range(0, n, 10):
y1[i] = random.randint(8, 12) * 1.0 / 10

# 周期性冲击 Type2
# full_list = []
# sub_list = [1, 0, 0, 1, 0]
# T = len(sub_list)
# T_count = n // T
# for _ in range(0, T_count):
# sub_list[0] = random.randint(8, 12) * 2.0 / 10
# sub_list[3] = random.randint(8, 12) * 2.0 / 10
# full_list = full_list + sub_list
# if len(full_list) > n:
# full_list = full_list[: n]
# else:
# full_list = full_list + [0] * (n - len(full_list))
# y1 = np.array(full_list)

# 正弦波 (相比而言,冲击总是正信号,有直流分量,频域0处会有较大值)
# y1 = np.sin(2 * np.pi * 0.1 * x) + 2 * np.sin(2 * np.pi * 0.2 * x)

# 近似周期性冲击 --> 此种信号(周期取值发生震荡)下性能不佳
# full_list = []
# T = 7
# T_count = n // T
# for _ in range(0, T_count):
# cur_list = [0] * random.randint(T - 1, T)
# cur_list[0] = random.randint(8, 12) * 1.0 / 10
# full_list = full_list + cur_list
# if len(full_list) > n:
# full_list = full_list[: n]
# else:
# full_list = full_list + [0] * (n - len(full_list))
# y1 = np.array(full_list)

# ---------- 生成一段非周期性数据 ----------
# 随机冲击
# y2 = np.zeros(n)
# idx = random.sample(range(0, len(x)), 5)
# for i in idx:
# y2[i] = random.randint(5, 10)
# 方波
y2 = np.zeros(n)
for i in range(100, 120):
y2[i] = 1

# ---------- 画图 ----------
plt.figure(figsize=(15, 6))
plt.subplot(231)
plt.plot(x[0:50], y1[0:50])
plt.title('Original Signal (Periodic)', fontsize=9)
plt.ylim((-1, 10))
plt.subplot(234)
plt.plot(x[0:], y2[0:])
plt.title('Original Signal (Aperiodic)', fontsize=9)
plt.ylim((-1, 10))

yf1 = fft(y1)
yf2 = fft(y2)
xf = np.linspace(0.0, 1.0 / 2.0 * fs, n // 2, endpoint=False)

# 归一化处理
yf1_n = 2.0 / n * np.abs(yf1)
xf1_n = xf
print("FFT Periodic")
print(yf1_n)
yf2_n = 2.0 / n * np.abs(yf2)
xf2_n = xf
print("FFT Aperiodic")
print(yf2_n)

plt.subplot(232)
plt.plot(xf1_n, yf1_n[: n // 2], 'g')
plt.title('FFT Transform Normalization (Periodic)', fontsize=9, color='g')
plt.ylim((0, 1))
plt.subplot(235)
plt.plot(xf2_n, yf2_n[: n // 2], 'g')
plt.title('FFT Transform Normalization (Aperiodic)', fontsize=9, color='g')
plt.ylim((0, 1))

# 周期性检测
plt.subplot(233)
# if is_periodic(y1):
if is_periodic2(y1):
plt.title('Periodic Signal Detected!', fontsize=9, color='r')
plt.plot(xf1_n, yf1_n[: n // 2], 'r')
print("y1 is periodic!!!".center(100, "-"))
else:
plt.title('Aperiodic Signal...', fontsize=9, color='k')
plt.plot(xf1_n, yf1_n[: n // 2], 'k')
plt.ylim((0, 1))

plt.subplot(236)
# if is_periodic(y2):
if is_periodic2(y2):
plt.plot(xf2_n, yf2_n[: n // 2], 'r')
plt.title('Periodic Signal Detected!', fontsize=9, color='r')
print("y2 is periodic!!!".center(100, "-"))
else:
plt.title('Aperiodic Signal...', fontsize=9, color='k')
plt.plot(xf2_n, yf2_n[: n // 2], 'k')
plt.ylim((0, 1))

plt.show()